You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ca...@apache.org on 2023/05/08 14:25:03 UTC

[kafka] branch trunk updated: KAFKA-14133: Migrate ChangeLogReader mock in TaskManagerTest to Mockito (#13621)

This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2b98f8553ba KAFKA-14133: Migrate ChangeLogReader mock in TaskManagerTest to Mockito (#13621)
2b98f8553ba is described below

commit 2b98f8553ba12ab5d8cb88f5cd0d1198cb424df6
Author: Christo Lolov <lo...@amazon.com>
AuthorDate: Mon May 8 15:24:52 2023 +0100

    KAFKA-14133: Migrate ChangeLogReader mock in TaskManagerTest to Mockito (#13621)
    
    Migrates the ChangeLogReader mock in TaskManagerTest from EasyMock to Mockito.
    
    Reviewer: Bruno Cadonna <ca...@apache.org>
---
 .../processor/internals/TaskManagerTest.java       | 250 ++++++++++-----------
 1 file changed, 116 insertions(+), 134 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 6072c96ca6e..48d046e1b90 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -184,7 +184,7 @@ public class TaskManagerTest {
     private InternalTopologyBuilder topologyBuilder;
     @Mock(type = MockType.DEFAULT)
     private StateDirectory stateDirectory;
-    @Mock(type = MockType.NICE)
+    @org.mockito.Mock
     private ChangelogReader changeLogReader;
     @Mock(type = MockType.STRICT)
     private Consumer<byte[], byte[]> consumer;
@@ -1826,7 +1826,7 @@ public class TaskManagerTest {
         task00.setCommittableOffsetsAndMetadata(offsets);
 
         // first `handleAssignment`
-        expectRestoreToBeCompleted(consumer, changeLogReader);
+        expectRestoreToBeCompleted(consumer);
         expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment))).andStubReturn(singletonList(task00));
         expect(activeTaskCreator.createTasks(anyObject(), eq(emptyMap()))).andStubReturn(emptyList());
         activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(taskId00);
@@ -1841,7 +1841,7 @@ public class TaskManagerTest {
         consumer.commitSync(offsets);
         expectLastCall();
 
-        replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader);
+        replay(activeTaskCreator, standbyTaskCreator, consumer);
 
         taskManager.handleAssignment(taskId00Assignment, emptyMap());
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
@@ -1866,14 +1866,14 @@ public class TaskManagerTest {
         };
 
         // first `handleAssignment`
-        expectRestoreToBeCompleted(consumer, changeLogReader);
+        expectRestoreToBeCompleted(consumer);
         expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment))).andStubReturn(singletonList(task00));
         expect(activeTaskCreator.createTasks(anyObject(), eq(emptyMap()))).andStubReturn(emptyList());
         activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(taskId00);
         expectLastCall();
         expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(emptyList());
 
-        replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader);
+        replay(activeTaskCreator, standbyTaskCreator, consumer);
 
         taskManager.handleAssignment(taskId00Assignment, emptyMap());
         taskManager.handleRevocation(taskId00Partitions);
@@ -1897,7 +1897,7 @@ public class TaskManagerTest {
         final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, false);
 
         // `handleAssignment`
-        expectRestoreToBeCompleted(consumer, changeLogReader);
+        expectRestoreToBeCompleted(consumer);
         expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment))).andStubReturn(singletonList(task00));
         expect(standbyTaskCreator.createTasks(eq(taskId01Assignment))).andStubReturn(singletonList(task01));
 
@@ -1916,7 +1916,7 @@ public class TaskManagerTest {
         activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(taskId00);
         expectLastCall();
 
-        replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader);
+        replay(activeTaskCreator, standbyTaskCreator, consumer);
 
         taskManager.handleAssignment(taskId00Assignment, taskId01Assignment);
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
@@ -1959,7 +1959,7 @@ public class TaskManagerTest {
         task00.setCommittableOffsetsAndMetadata(offsets);
 
         // `handleAssignment`
-        expectRestoreToBeCompleted(consumer, changeLogReader);
+        expectRestoreToBeCompleted(consumer);
         expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment))).andStubReturn(singletonList(task00));
         expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(emptyList());
 
@@ -1969,7 +1969,7 @@ public class TaskManagerTest {
         activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(taskId00);
         expectLastCall().andThrow(new RuntimeException("KABOOM!"));
 
-        replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader);
+        replay(activeTaskCreator, standbyTaskCreator, consumer);
 
         taskManager.handleAssignment(taskId00Assignment, emptyMap());
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
@@ -2009,11 +2009,11 @@ public class TaskManagerTest {
         };
 
         // `handleAssignment`
-        expectRestoreToBeCompleted(consumer, changeLogReader);
+        expectRestoreToBeCompleted(consumer);
         expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment))).andStubReturn(singletonList(task00));
         expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
         expect(consumer.assignment()).andReturn(taskId00Partitions);
-        replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader);
+        replay(activeTaskCreator, standbyTaskCreator, consumer);
 
         taskManager.handleAssignment(taskId00Assignment, emptyMap());
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), tp -> assertThat(tp, is(empty()))), is(true));
@@ -2047,11 +2047,11 @@ public class TaskManagerTest {
             }
         };
 
-        expectRestoreToBeCompleted(consumer, changeLogReader);
+        expectRestoreToBeCompleted(consumer);
         expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment))).andStubReturn(singletonList(task00));
         expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
         expect(consumer.assignment()).andReturn(taskId00Partitions);
-        replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader);
+        replay(activeTaskCreator, standbyTaskCreator, consumer);
 
         taskManager.handleAssignment(taskId00Assignment, emptyMap());
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), tp -> assertThat(tp, is(empty()))), is(true));
@@ -2086,12 +2086,13 @@ public class TaskManagerTest {
             .andStubReturn(asList(corruptedTask, nonCorruptedTask));
         expect(standbyTaskCreator.createTasks(anyObject()))
             .andStubReturn(Collections.emptySet());
-        expectRestoreToBeCompleted(consumer, changeLogReader);
+        expectRestoreToBeCompleted(consumer);
         expect(consumer.assignment()).andReturn(taskId00Partitions);
         // check that we should not commit empty map either
         consumer.commitSync(eq(emptyMap()));
         expectLastCall().andStubThrow(new AssertionError("should not invoke commitSync when offset map is empty"));
-        replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader);
+
+        replay(activeTaskCreator, standbyTaskCreator, consumer);
 
         taskManager.handleAssignment(assignment, emptyMap());
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), tp -> assertThat(tp, is(empty()))), is(true));
@@ -2129,7 +2130,7 @@ public class TaskManagerTest {
         expect(standbyTaskCreator.createTasks(anyObject()))
             .andStubReturn(Collections.emptySet());
         expect(consumer.assignment()).andReturn(taskId00Partitions);
-        replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader);
+        replay(activeTaskCreator, standbyTaskCreator, consumer);
 
         taskManager.handleAssignment(assignment, emptyMap());
 
@@ -2163,9 +2164,9 @@ public class TaskManagerTest {
         expect(standbyTaskCreator.createTasks(eq(taskId00Assignment))).andStubReturn(singleton(corruptedStandby));
         expect(activeTaskCreator.createTasks(anyObject(), eq(taskId01Assignment))).andStubReturn(singleton(runningNonCorruptedActive));
 
-        expectRestoreToBeCompleted(consumer, changeLogReader);
+        expectRestoreToBeCompleted(consumer);
 
-        replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader);
+        replay(activeTaskCreator, standbyTaskCreator, consumer);
 
         taskManager.handleAssignment(taskId01Assignment, taskId00Assignment);
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
@@ -2204,11 +2205,11 @@ public class TaskManagerTest {
         expect(activeTaskCreator.createTasks(anyObject(), eq(assignment))).andStubReturn(asList(corruptedActive, uncorruptedActive));
         expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
 
-        expectRestoreToBeCompleted(consumer, changeLogReader);
+        expectRestoreToBeCompleted(consumer);
 
         expect(consumer.assignment()).andStubReturn(union(HashSet::new, taskId00Partitions, taskId01Partitions));
 
-        replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader, stateDirectory, stateManager);
+        replay(activeTaskCreator, standbyTaskCreator, consumer, stateDirectory, stateManager);
 
         uncorruptedActive.setCommittableOffsetsAndMetadata(offsets);
 
@@ -2256,14 +2257,14 @@ public class TaskManagerTest {
         expect(activeTaskCreator.createTasks(anyObject(), eq(assignment))).andStubReturn(asList(corruptedActive, uncorruptedActive));
         expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
 
-        expectRestoreToBeCompleted(consumer, changeLogReader);
+        expectRestoreToBeCompleted(consumer);
 
         consumer.commitSync(offsets);
         expectLastCall().andThrow(new TimeoutException());
 
         expect(consumer.assignment()).andStubReturn(union(HashSet::new, taskId00Partitions, taskId01Partitions));
 
-        replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader);
+        replay(activeTaskCreator, standbyTaskCreator, consumer);
 
         taskManager.handleAssignment(assignment, emptyMap());
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
@@ -2332,7 +2333,7 @@ public class TaskManagerTest {
         expect(activeTaskCreator.createTasks(anyObject(), eq(assignment))).andStubReturn(asList(corruptedActiveTask, uncorruptedActiveTask));
         expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
 
-        expectRestoreToBeCompleted(consumer, changeLogReader);
+        expectRestoreToBeCompleted(consumer);
 
         final ConsumerGroupMetadata groupMetadata = new ConsumerGroupMetadata("appId");
         expect(consumer.groupMetadata()).andReturn(groupMetadata);
@@ -2341,7 +2342,7 @@ public class TaskManagerTest {
 
         expect(consumer.assignment()).andStubReturn(union(HashSet::new, taskId00Partitions, taskId01Partitions));
 
-        replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader, stateManager);
+        replay(activeTaskCreator, standbyTaskCreator, consumer, stateManager);
 
         taskManager.handleAssignment(assignment, emptyMap());
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
@@ -2409,7 +2410,7 @@ public class TaskManagerTest {
             mkEntry(taskId02, taskId02Partitions)
         );
 
-        expectRestoreToBeCompleted(consumer, changeLogReader);
+        expectRestoreToBeCompleted(consumer);
 
         expect(activeTaskCreator.createTasks(anyObject(), eq(assignmentActive))).andReturn(asList(revokedActiveTask, unrevokedActiveTaskWithCommitNeeded, unrevokedActiveTaskWithoutCommitNeeded));
         expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
@@ -2419,7 +2420,7 @@ public class TaskManagerTest {
         expectLastCall().andThrow(new TimeoutException());
         expect(consumer.assignment()).andStubReturn(union(HashSet::new, taskId00Partitions, taskId01Partitions, taskId02Partitions));
 
-        replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader);
+        replay(activeTaskCreator, standbyTaskCreator, consumer);
 
         taskManager.handleAssignment(assignmentActive, emptyMap());
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
@@ -2473,7 +2474,7 @@ public class TaskManagerTest {
             mkEntry(taskId02, taskId02Partitions)
             );
 
-        expectRestoreToBeCompleted(consumer, changeLogReader);
+        expectRestoreToBeCompleted(consumer);
 
         expect(activeTaskCreator.createTasks(anyObject(), eq(assignmentActive))).andReturn(asList(revokedActiveTask, unrevokedActiveTask, unrevokedActiveTaskWithoutCommitNeeded));
         expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
@@ -2487,7 +2488,7 @@ public class TaskManagerTest {
 
         expect(consumer.assignment()).andStubReturn(union(HashSet::new, taskId00Partitions, taskId01Partitions, taskId02Partitions));
 
-        replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader, stateManager);
+        replay(activeTaskCreator, standbyTaskCreator, consumer, stateManager);
 
         taskManager.handleAssignment(assignmentActive, emptyMap());
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
@@ -2512,13 +2513,13 @@ public class TaskManagerTest {
     public void shouldCloseStandbyUnassignedTasksWhenCreatingNewTasks() {
         final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, false);
 
-        expectRestoreToBeCompleted(consumer, changeLogReader);
+        expectRestoreToBeCompleted(consumer);
         expect(standbyTaskCreator.createTasks(eq(taskId00Assignment))).andStubReturn(singletonList(task00));
         expect(activeTaskCreator.createTasks(anyObject(), anyObject())).andStubReturn(Collections.emptySet());
         expect(standbyTaskCreator.createTasks(eq(Collections.emptyMap()))).andStubReturn(Collections.emptySet());
         consumer.commitSync(Collections.emptyMap());
         expectLastCall();
-        replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader);
+        replay(activeTaskCreator, standbyTaskCreator, consumer);
 
         taskManager.handleAssignment(emptyMap(), taskId00Assignment);
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
@@ -2535,14 +2536,14 @@ public class TaskManagerTest {
         final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, true);
         final Task task01 = new StateMachineTask(taskId01, taskId01Partitions, false);
 
-        expectRestoreToBeCompleted(consumer, changeLogReader);
+        expectRestoreToBeCompleted(consumer);
         // expect these calls twice (because we're going to tryToCompleteRestoration twice)
-        expectRestoreToBeCompleted(consumer, changeLogReader);
+        expectRestoreToBeCompleted(consumer);
         expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment))).andReturn(singletonList(task00));
         expect(activeTaskCreator.createTasks(anyObject(), eq(Collections.emptyMap()))).andReturn(Collections.emptySet());
         expect(standbyTaskCreator.createTasks(eq(taskId01Assignment))).andReturn(singletonList(task01)).anyTimes();
         expect(standbyTaskCreator.createTasks(eq(Collections.emptyMap()))).andReturn(Collections.emptySet());
-        replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader);
+        replay(activeTaskCreator, standbyTaskCreator, consumer);
 
         taskManager.handleAssignment(taskId00Assignment, taskId01Assignment);
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
@@ -2561,13 +2562,13 @@ public class TaskManagerTest {
     public void shouldUpdateInputPartitionsAfterRebalance() {
         final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, true);
 
-        expectRestoreToBeCompleted(consumer, changeLogReader);
+        expectRestoreToBeCompleted(consumer);
         // expect these calls twice (because we're going to tryToCompleteRestoration twice)
-        expectRestoreToBeCompleted(consumer, changeLogReader, false);
+        expectRestoreToBeCompleted(consumer);
         expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment))).andReturn(singletonList(task00));
         expect(activeTaskCreator.createTasks(anyObject(), eq(Collections.emptyMap()))).andReturn(Collections.emptySet());
         expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
-        replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader);
+        replay(activeTaskCreator, standbyTaskCreator, consumer);
 
         taskManager.handleAssignment(taskId00Assignment, emptyMap());
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
@@ -2579,7 +2580,7 @@ public class TaskManagerTest {
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
         assertThat(task00.state(), is(Task.State.RUNNING));
         assertEquals(newPartitionsSet, task00.inputPartitions());
-        verify(activeTaskCreator, consumer, changeLogReader);
+        verify(activeTaskCreator, consumer);
     }
 
     @Test
@@ -2587,15 +2588,12 @@ public class TaskManagerTest {
         final Map<TaskId, Set<TopicPartition>> assignment = taskId00Assignment;
         final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, true);
 
-        expect(changeLogReader.completedChangelogs()).andReturn(emptySet());
         expect(consumer.assignment()).andReturn(emptySet());
         consumer.resume(eq(emptySet()));
         expectLastCall();
-        changeLogReader.enforceRestoreActive();
-        expectLastCall();
         expect(activeTaskCreator.createTasks(anyObject(), eq(assignment))).andStubReturn(singletonList(task00));
         expect(standbyTaskCreator.createTasks(eq(emptyMap()))).andStubReturn(emptyList());
-        replay(consumer, activeTaskCreator, standbyTaskCreator, changeLogReader);
+        replay(consumer, activeTaskCreator, standbyTaskCreator);
 
         taskManager.handleAssignment(assignment, emptyMap());
 
@@ -2607,6 +2605,7 @@ public class TaskManagerTest {
         assertThat(taskManager.activeTaskMap(), Matchers.equalTo(singletonMap(taskId00, task00)));
         assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
         verify(activeTaskCreator);
+        Mockito.verify(changeLogReader).enforceRestoreActive();
     }
 
     @Test
@@ -2630,15 +2629,12 @@ public class TaskManagerTest {
 
         consumer.commitSync(Collections.emptyMap());
         expectLastCall();
-        expect(changeLogReader.completedChangelogs()).andReturn(emptySet());
         expect(consumer.assignment()).andReturn(emptySet());
         consumer.resume(eq(emptySet()));
         expectLastCall();
-        changeLogReader.enforceRestoreActive();
-        expectLastCall();
         expect(activeTaskCreator.createTasks(anyObject(), eq(assignment))).andStubReturn(asList(task00, task01));
         expect(standbyTaskCreator.createTasks(eq(emptyMap()))).andStubReturn(emptyList());
-        replay(consumer, activeTaskCreator, standbyTaskCreator, changeLogReader);
+        replay(consumer, activeTaskCreator, standbyTaskCreator);
 
         taskManager.handleAssignment(assignment, emptyMap());
 
@@ -2655,6 +2651,7 @@ public class TaskManagerTest {
         );
         assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
         verify(activeTaskCreator);
+        Mockito.verify(changeLogReader).enforceRestoreActive();
     }
 
     @Test
@@ -2671,15 +2668,13 @@ public class TaskManagerTest {
 
         consumer.commitSync(Collections.emptyMap());
         expectLastCall();
-        expect(changeLogReader.completedChangelogs()).andReturn(emptySet());
         expect(consumer.assignment()).andReturn(emptySet());
         consumer.resume(eq(emptySet()));
         expectLastCall();
-        changeLogReader.enforceRestoreActive();
         expectLastCall();
         expect(activeTaskCreator.createTasks(anyObject(), eq(assignment))).andStubReturn(singletonList(task00));
         expect(standbyTaskCreator.createTasks(eq(emptyMap()))).andStubReturn(emptyList());
-        replay(consumer, activeTaskCreator, standbyTaskCreator, changeLogReader);
+        replay(consumer, activeTaskCreator, standbyTaskCreator);
 
         taskManager.handleAssignment(assignment, emptyMap());
 
@@ -2694,6 +2689,7 @@ public class TaskManagerTest {
         );
         assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
         verify(activeTaskCreator);
+        Mockito.verify(changeLogReader).enforceRestoreActive();
     }
 
     @Test
@@ -2702,13 +2698,13 @@ public class TaskManagerTest {
         final Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(t1p0, new OffsetAndMetadata(0L, null));
         task00.setCommittableOffsetsAndMetadata(offsets);
 
-        expectRestoreToBeCompleted(consumer, changeLogReader);
+        expectRestoreToBeCompleted(consumer);
         expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment))).andReturn(singletonList(task00));
         expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
         consumer.commitSync(offsets);
         expectLastCall();
 
-        replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader);
+        replay(activeTaskCreator, standbyTaskCreator, consumer);
 
         taskManager.handleAssignment(taskId00Assignment, emptyMap());
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
@@ -2752,7 +2748,7 @@ public class TaskManagerTest {
         final Map<TaskId, Set<TopicPartition>> assignmentStandby = mkMap(
             mkEntry(taskId10, taskId10Partitions)
         );
-        expectRestoreToBeCompleted(consumer, changeLogReader);
+        expectRestoreToBeCompleted(consumer);
 
         expect(activeTaskCreator.createTasks(anyObject(), eq(assignmentActive)))
             .andReturn(asList(task00, task01, task02));
@@ -2776,7 +2772,7 @@ public class TaskManagerTest {
         task10.committedOffsets();
         EasyMock.expectLastCall();
 
-        replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader);
+        replay(activeTaskCreator, standbyTaskCreator, consumer);
 
         taskManager.handleAssignment(assignmentActive, assignmentStandby);
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
@@ -2824,7 +2820,7 @@ public class TaskManagerTest {
         final Map<TaskId, Set<TopicPartition>> assignmentStandby = mkMap(
             mkEntry(taskId10, taskId10Partitions)
         );
-        expectRestoreToBeCompleted(consumer, changeLogReader);
+        expectRestoreToBeCompleted(consumer);
 
         expect(activeTaskCreator.createTasks(anyObject(), eq(assignmentActive)))
             .andReturn(asList(task00, task01, task02));
@@ -2835,7 +2831,7 @@ public class TaskManagerTest {
         consumer.commitSync(expectedCommittedOffsets);
         expectLastCall();
 
-        replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader);
+        replay(activeTaskCreator, standbyTaskCreator, consumer);
 
         taskManager.handleAssignment(assignmentActive, assignmentStandby);
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
@@ -2866,14 +2862,14 @@ public class TaskManagerTest {
         final Map<TaskId, Set<TopicPartition>> assignmentActive = singletonMap(taskId00, taskId00Partitions);
         final Map<TaskId, Set<TopicPartition>> assignmentStandby = singletonMap(taskId10, taskId10Partitions);
 
-        expectRestoreToBeCompleted(consumer, changeLogReader);
+        expectRestoreToBeCompleted(consumer);
 
         expect(activeTaskCreator.createTasks(anyObject(), eq(assignmentActive))).andReturn(singleton(task00));
         expect(activeTaskCreator.createTasks(anyObject(), eq(Collections.emptyMap()))).andReturn(Collections.emptySet());
         expect(standbyTaskCreator.createTasks(eq(assignmentStandby))).andReturn(singletonList(task10));
         expect(standbyTaskCreator.createTasks(eq(Collections.emptyMap()))).andReturn(Collections.emptySet());
 
-        replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader);
+        replay(activeTaskCreator, standbyTaskCreator, consumer);
 
         taskManager.handleAssignment(assignmentActive, assignmentStandby);
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
@@ -2898,14 +2894,14 @@ public class TaskManagerTest {
         final Map<TaskId, Set<TopicPartition>> assignmentActive = singletonMap(taskId00, taskId00Partitions);
         final Map<TaskId, Set<TopicPartition>> assignmentStandby = singletonMap(taskId10, taskId10Partitions);
 
-        expectRestoreToBeCompleted(consumer, changeLogReader);
+        expectRestoreToBeCompleted(consumer);
 
         expect(activeTaskCreator.createTasks(anyObject(), eq(assignmentActive))).andReturn(singleton(task00));
         expect(activeTaskCreator.createTasks(anyObject(), eq(Collections.emptyMap()))).andReturn(Collections.emptySet());
         expect(standbyTaskCreator.createTasks(eq(assignmentStandby))).andReturn(singletonList(task10));
         expect(standbyTaskCreator.createTasks(eq(Collections.emptyMap()))).andReturn(Collections.emptySet());
 
-        replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader);
+        replay(activeTaskCreator, standbyTaskCreator, consumer);
 
         taskManager.handleAssignment(assignmentActive, assignmentStandby);
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
@@ -2926,7 +2922,7 @@ public class TaskManagerTest {
         activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(eq(taskId00));
         expectLastCall().once();
         expect(activeTaskCreator.createTasks(anyObject(), eq(Collections.emptyMap()))).andReturn(Collections.emptySet());
-        replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader);
+        replay(activeTaskCreator, standbyTaskCreator, consumer);
 
         taskManager.handleAssignment(taskId00Assignment, emptyMap());
         assertThat(task00.state(), is(Task.State.CREATED));
@@ -2948,10 +2944,10 @@ public class TaskManagerTest {
             }
         };
 
-        expectRestoreToBeCompleted(consumer, changeLogReader);
+        expectRestoreToBeCompleted(consumer);
         expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment))).andReturn(singletonList(task00));
         expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
-        replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader);
+        replay(activeTaskCreator, standbyTaskCreator, consumer);
         taskManager.handleAssignment(taskId00Assignment, emptyMap());
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
         assertThat(task00.state(), is(Task.State.RUNNING));
@@ -3020,9 +3016,6 @@ public class TaskManagerTest {
             }
         };
 
-        resetToStrict(changeLogReader);
-        changeLogReader.enforceRestoreActive();
-        expect(changeLogReader.completedChangelogs()).andReturn(emptySet());
         expect(activeTaskCreator.createTasks(anyObject(), eq(assignment)))
             .andStubReturn(asList(task00, task01, task02, task03));
         activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(anyObject());
@@ -3030,7 +3023,7 @@ public class TaskManagerTest {
         activeTaskCreator.closeThreadProducerIfNeeded();
         expectLastCall();
         expect(standbyTaskCreator.createTasks(eq(emptyMap()))).andStubReturn(emptyList());
-        replay(activeTaskCreator, standbyTaskCreator, changeLogReader);
+        replay(activeTaskCreator, standbyTaskCreator);
 
         taskManager.handleAssignment(assignment, emptyMap());
 
@@ -3057,6 +3050,8 @@ public class TaskManagerTest {
             )
         );
         assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
+        Mockito.verify(changeLogReader).enforceRestoreActive();
+        Mockito.verify(changeLogReader).completedChangelogs();
 
         final RuntimeException exception = assertThrows(
             RuntimeException.class,
@@ -3074,7 +3069,7 @@ public class TaskManagerTest {
         assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
         assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
         // the active task creator should also get closed (so that it closes the thread producer if applicable)
-        verify(activeTaskCreator, changeLogReader);
+        verify(activeTaskCreator);
     }
 
     @Test
@@ -3092,16 +3087,13 @@ public class TaskManagerTest {
         final Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(t1p0, new OffsetAndMetadata(0L, null));
         task00.setCommittableOffsetsAndMetadata(offsets);
 
-        resetToStrict(changeLogReader);
-        changeLogReader.enforceRestoreActive();
-        expect(changeLogReader.completedChangelogs()).andReturn(emptySet());
         expect(activeTaskCreator.createTasks(anyObject(), eq(assignment))).andStubReturn(singletonList(task00));
         activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(eq(taskId00));
         expectLastCall().andThrow(new RuntimeException("whatever"));
         activeTaskCreator.closeThreadProducerIfNeeded();
         expectLastCall();
         expect(standbyTaskCreator.createTasks(eq(emptyMap()))).andStubReturn(emptyList());
-        replay(activeTaskCreator, standbyTaskCreator, changeLogReader);
+        replay(activeTaskCreator, standbyTaskCreator);
 
         taskManager.handleAssignment(assignment, emptyMap());
 
@@ -3119,6 +3111,8 @@ public class TaskManagerTest {
             )
         );
         assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
+        Mockito.verify(changeLogReader).enforceRestoreActive();
+        Mockito.verify(changeLogReader).completedChangelogs();
 
         final RuntimeException exception = assertThrows(RuntimeException.class, () -> taskManager.shutdown(true));
 
@@ -3127,7 +3121,7 @@ public class TaskManagerTest {
         assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
         assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
         // the active task creator should also get closed (so that it closes the thread producer if applicable)
-        verify(activeTaskCreator, changeLogReader);
+        verify(activeTaskCreator);
     }
 
     @Test
@@ -3143,16 +3137,13 @@ public class TaskManagerTest {
             }
         };
 
-        resetToStrict(changeLogReader);
-        changeLogReader.enforceRestoreActive();
-        expect(changeLogReader.completedChangelogs()).andReturn(emptySet());
         expect(activeTaskCreator.createTasks(anyObject(), eq(assignment))).andStubReturn(singletonList(task00));
         activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(eq(taskId00));
         expectLastCall();
         activeTaskCreator.closeThreadProducerIfNeeded();
         expectLastCall().andThrow(new RuntimeException("whatever"));
         expect(standbyTaskCreator.createTasks(eq(emptyMap()))).andStubReturn(emptyList());
-        replay(activeTaskCreator, standbyTaskCreator, changeLogReader);
+        replay(activeTaskCreator, standbyTaskCreator);
 
         taskManager.handleAssignment(assignment, emptyMap());
 
@@ -3170,6 +3161,8 @@ public class TaskManagerTest {
             )
         );
         assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
+        Mockito.verify(changeLogReader).enforceRestoreActive();
+        Mockito.verify(changeLogReader).completedChangelogs();
 
         final RuntimeException exception = assertThrows(RuntimeException.class, () -> taskManager.shutdown(true));
 
@@ -3178,7 +3171,7 @@ public class TaskManagerTest {
         assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
         assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
         // the active task creator should also get closed (so that it closes the thread producer if applicable)
-        verify(activeTaskCreator, changeLogReader);
+        verify(activeTaskCreator);
     }
 
     @Test
@@ -3272,16 +3265,13 @@ public class TaskManagerTest {
             }
         };
 
-        resetToStrict(changeLogReader);
-        changeLogReader.enforceRestoreActive();
-        expect(changeLogReader.completedChangelogs()).andReturn(emptySet());
         expect(activeTaskCreator.createTasks(anyObject(), eq(assignment))).andStubReturn(asList(task00, task01, task02));
         activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(anyObject());
         expectLastCall().andThrow(new RuntimeException("whatever")).times(3);
         activeTaskCreator.closeThreadProducerIfNeeded();
         expectLastCall().andThrow(new RuntimeException("whatever all"));
         expect(standbyTaskCreator.createTasks(eq(emptyMap()))).andStubReturn(emptyList());
-        replay(activeTaskCreator, standbyTaskCreator, changeLogReader);
+        replay(activeTaskCreator, standbyTaskCreator);
 
         taskManager.handleAssignment(assignment, emptyMap());
 
@@ -3305,6 +3295,8 @@ public class TaskManagerTest {
             )
         );
         assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
+        Mockito.verify(changeLogReader).enforceRestoreActive();
+        Mockito.verify(changeLogReader).completedChangelogs();
 
         taskManager.shutdown(false);
 
@@ -3314,7 +3306,7 @@ public class TaskManagerTest {
         assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
         assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
         // the active task creator should also get closed (so that it closes the thread producer if applicable)
-        verify(activeTaskCreator, changeLogReader);
+        verify(activeTaskCreator);
     }
 
     @Test
@@ -3327,7 +3319,6 @@ public class TaskManagerTest {
         expect(standbyTaskCreator.createTasks(eq(assignment))).andStubReturn(singletonList(task00));
 
         // `tryToCompleteRestoration`
-        expect(changeLogReader.completedChangelogs()).andReturn(emptySet());
         expect(consumer.assignment()).andReturn(emptySet());
         consumer.resume(eq(emptySet()));
         expectLastCall();
@@ -3338,7 +3329,7 @@ public class TaskManagerTest {
         activeTaskCreator.closeThreadProducerIfNeeded();
         expectLastCall();
 
-        replay(consumer, activeTaskCreator, standbyTaskCreator, changeLogReader);
+        replay(consumer, activeTaskCreator, standbyTaskCreator);
 
         taskManager.handleAssignment(emptyMap(), assignment);
         assertThat(task00.state(), is(Task.State.CREATED));
@@ -3436,11 +3427,11 @@ public class TaskManagerTest {
     @Test
     public void shouldInitializeNewActiveTasks() {
         final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true);
-        expectRestoreToBeCompleted(consumer, changeLogReader);
+        expectRestoreToBeCompleted(consumer);
         expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment)))
             .andStubReturn(singletonList(task00));
         expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
-        replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader);
+        replay(activeTaskCreator, standbyTaskCreator, consumer);
 
         taskManager.handleAssignment(taskId00Assignment, emptyMap());
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
@@ -3456,11 +3447,11 @@ public class TaskManagerTest {
     public void shouldInitializeNewStandbyTasks() {
         final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, false);
 
-        expectRestoreToBeCompleted(consumer, changeLogReader);
+        expectRestoreToBeCompleted(consumer);
         expect(activeTaskCreator.createTasks(anyObject(), anyObject())).andStubReturn(Collections.emptySet());
         expect(standbyTaskCreator.createTasks(eq(taskId01Assignment))).andStubReturn(singletonList(task01));
 
-        replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader);
+        replay(activeTaskCreator, standbyTaskCreator, consumer);
 
         taskManager.handleAssignment(emptyMap(), taskId01Assignment);
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
@@ -3492,7 +3483,7 @@ public class TaskManagerTest {
         task00.setCommittableOffsetsAndMetadata(offsets);
         final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, false);
 
-        expectRestoreToBeCompleted(consumer, changeLogReader);
+        expectRestoreToBeCompleted(consumer);
         expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment)))
             .andStubReturn(singletonList(task00));
         expect(standbyTaskCreator.createTasks(eq(taskId01Assignment)))
@@ -3500,7 +3491,7 @@ public class TaskManagerTest {
         consumer.commitSync(offsets);
         expectLastCall();
 
-        replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader);
+        replay(activeTaskCreator, standbyTaskCreator, consumer);
 
         taskManager.handleAssignment(taskId00Assignment, taskId01Assignment);
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
@@ -3536,7 +3527,7 @@ public class TaskManagerTest {
             mkEntry(taskId05, taskId05Partitions)
         );
 
-        expectRestoreToBeCompleted(consumer, changeLogReader);
+        expectRestoreToBeCompleted(consumer);
         expect(activeTaskCreator.createTasks(anyObject(), eq(assignmentActive)))
             .andStubReturn(Arrays.asList(task00, task01, task02));
         expect(standbyTaskCreator.createTasks(eq(assignmentStandby)))
@@ -3544,7 +3535,7 @@ public class TaskManagerTest {
 
         consumer.commitSync(eq(emptyMap()));
 
-        replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader);
+        replay(activeTaskCreator, standbyTaskCreator, consumer);
 
         taskManager.handleAssignment(assignmentActive, assignmentStandby);
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
@@ -3570,12 +3561,12 @@ public class TaskManagerTest {
     public void shouldNotCommitOffsetsIfOnlyStandbyTasksAssigned() {
         final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, false);
 
-        expectRestoreToBeCompleted(consumer, changeLogReader);
+        expectRestoreToBeCompleted(consumer);
         expect(activeTaskCreator.createTasks(anyObject(), anyObject())).andStubReturn(Collections.emptySet());
         expect(standbyTaskCreator.createTasks(eq(taskId00Assignment))).andStubReturn(singletonList(task00));
         expectLastCall();
 
-        replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader);
+        replay(activeTaskCreator, standbyTaskCreator, consumer);
 
         taskManager.handleAssignment(Collections.emptyMap(), taskId00Assignment);
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
@@ -3595,13 +3586,13 @@ public class TaskManagerTest {
 
         makeTaskFolders(taskId00.toString(), task01.toString());
         expectLockObtainedFor(taskId00, taskId01);
-        expectRestoreToBeCompleted(consumer, changeLogReader);
+        expectRestoreToBeCompleted(consumer);
         expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment)))
             .andStubReturn(singletonList(task00));
         expect(standbyTaskCreator.createTasks(eq(taskId01Assignment)))
             .andStubReturn(singletonList(task01));
 
-        replay(activeTaskCreator, standbyTaskCreator, stateDirectory, consumer, changeLogReader);
+        replay(activeTaskCreator, standbyTaskCreator, stateDirectory, consumer);
 
         taskManager.handleAssignment(taskId00Assignment, taskId01Assignment);
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
@@ -3710,10 +3701,10 @@ public class TaskManagerTest {
             }
         };
 
-        expectRestoreToBeCompleted(consumer, changeLogReader);
+        expectRestoreToBeCompleted(consumer);
         expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment))).andStubReturn(singletonList(task00));
         expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
-        replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader);
+        replay(activeTaskCreator, standbyTaskCreator, consumer);
 
         taskManager.handleAssignment(taskId00Assignment, emptyMap());
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
@@ -3736,11 +3727,11 @@ public class TaskManagerTest {
             }
         };
 
-        expectRestoreToBeCompleted(consumer, changeLogReader);
+        expectRestoreToBeCompleted(consumer);
         expect(activeTaskCreator.createTasks(anyObject(), anyObject())).andStubReturn(Collections.emptySet());
         expect(standbyTaskCreator.createTasks(eq(taskId01Assignment))).andStubReturn(singletonList(task01));
 
-        replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader);
+        replay(activeTaskCreator, standbyTaskCreator, consumer);
 
         taskManager.handleAssignment(emptyMap(), taskId01Assignment);
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
@@ -3771,11 +3762,11 @@ public class TaskManagerTest {
             }
         };
 
-        expectRestoreToBeCompleted(consumer, changeLogReader);
+        expectRestoreToBeCompleted(consumer);
         expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment))).andStubReturn(singletonList(task00));
         expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
 
-        replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader);
+        replay(activeTaskCreator, standbyTaskCreator, consumer);
 
         taskManager.handleAssignment(taskId00Assignment, emptyMap());
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
@@ -3807,10 +3798,10 @@ public class TaskManagerTest {
             }
         };
 
-        expectRestoreToBeCompleted(consumer, changeLogReader);
+        expectRestoreToBeCompleted(consumer);
         expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment))).andStubReturn(singletonList(task00));
         expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
-        replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader);
+        replay(activeTaskCreator, standbyTaskCreator, consumer);
 
         taskManager.handleAssignment(taskId00Assignment, emptyMap());
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
@@ -3833,14 +3824,14 @@ public class TaskManagerTest {
     public void shouldIgnorePurgeDataErrors() {
         final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true);
 
-        expectRestoreToBeCompleted(consumer, changeLogReader);
+        expectRestoreToBeCompleted(consumer);
 
         final KafkaFutureImpl<DeletedRecords> futureDeletedRecords = new KafkaFutureImpl<>();
         final DeleteRecordsResult deleteRecordsResult = new DeleteRecordsResult(singletonMap(t1p1, futureDeletedRecords));
         futureDeletedRecords.completeExceptionally(new Exception("KABOOM!"));
         expect(adminClient.deleteRecords(anyObject())).andReturn(deleteRecordsResult).times(2);
 
-        replay(activeTaskCreator, adminClient, consumer, changeLogReader);
+        replay(activeTaskCreator, adminClient, consumer);
 
         taskManager.addTask(task00);
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
@@ -3884,7 +3875,7 @@ public class TaskManagerTest {
             mkEntry(taskId10, taskId10Partitions)
         );
 
-        expectRestoreToBeCompleted(consumer, changeLogReader);
+        expectRestoreToBeCompleted(consumer);
         expect(activeTaskCreator.createTasks(anyObject(), eq(assignmentActive)))
             .andStubReturn(asList(task00, task01, task02, task03));
         expect(standbyTaskCreator.createTasks(eq(assignmentStandby)))
@@ -3892,7 +3883,7 @@ public class TaskManagerTest {
         consumer.commitSync(expectedCommittedOffsets);
         expectLastCall();
 
-        replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader);
+        replay(activeTaskCreator, standbyTaskCreator, consumer);
 
         taskManager.handleAssignment(assignmentActive, assignmentStandby);
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
@@ -3928,10 +3919,10 @@ public class TaskManagerTest {
         assignment.put(taskId00, taskId00Partitions);
         assignment.put(taskId01, taskId01Partitions);
 
-        expectRestoreToBeCompleted(consumer, changeLogReader);
+        expectRestoreToBeCompleted(consumer);
         expect(activeTaskCreator.createTasks(anyObject(), eq(assignment))).andStubReturn(Arrays.asList(task00, task01));
         expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
-        replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader);
+        replay(activeTaskCreator, standbyTaskCreator, consumer);
 
         taskManager.handleAssignment(assignment, emptyMap());
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
@@ -4041,10 +4032,10 @@ public class TaskManagerTest {
             }
         };
 
-        expectRestoreToBeCompleted(consumer, changeLogReader);
+        expectRestoreToBeCompleted(consumer);
         expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment))).andStubReturn(singletonList(task00));
         expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
-        replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader);
+        replay(activeTaskCreator, standbyTaskCreator, consumer);
 
         taskManager.handleAssignment(taskId00Assignment, emptyMap());
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
@@ -4066,11 +4057,11 @@ public class TaskManagerTest {
             }
         };
 
-        expectRestoreToBeCompleted(consumer, changeLogReader);
+        expectRestoreToBeCompleted(consumer);
         expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment)))
             .andStubReturn(singletonList(task00));
         expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
-        replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader);
+        replay(activeTaskCreator, standbyTaskCreator, consumer);
 
         taskManager.handleAssignment(taskId00Assignment, emptyMap());
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
@@ -4095,10 +4086,10 @@ public class TaskManagerTest {
             }
         };
 
-        expectRestoreToBeCompleted(consumer, changeLogReader);
+        expectRestoreToBeCompleted(consumer);
         expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment))).andStubReturn(singletonList(task00));
         expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
-        replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader);
+        replay(activeTaskCreator, standbyTaskCreator, consumer);
 
         taskManager.handleAssignment(taskId00Assignment, emptyMap());
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
@@ -4117,10 +4108,10 @@ public class TaskManagerTest {
             }
         };
 
-        expectRestoreToBeCompleted(consumer, changeLogReader);
+        expectRestoreToBeCompleted(consumer);
         expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment))).andStubReturn(singletonList(task00));
         expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
-        replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader);
+        replay(activeTaskCreator, standbyTaskCreator, consumer);
 
         taskManager.handleAssignment(taskId00Assignment, emptyMap());
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
@@ -4144,12 +4135,12 @@ public class TaskManagerTest {
             }
         };
 
-        expectRestoreToBeCompleted(consumer, changeLogReader);
+        expectRestoreToBeCompleted(consumer);
         expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment)))
             .andStubReturn(singletonList(task00));
         expect(standbyTaskCreator.createTasks(anyObject()))
             .andStubReturn(Collections.emptySet());
-        replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader);
+        replay(activeTaskCreator, standbyTaskCreator, consumer);
 
         taskManager.handleAssignment(taskId00Assignment, emptyMap());
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
@@ -4169,10 +4160,9 @@ public class TaskManagerTest {
             }
         };
 
-        expect(changeLogReader.completedChangelogs()).andReturn(emptySet());
         expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment))).andStubReturn(singletonList(task00));
         expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
-        replay(activeTaskCreator, standbyTaskCreator, changeLogReader, consumer);
+        replay(activeTaskCreator, standbyTaskCreator, consumer);
 
         taskManager.handleAssignment(taskId00Assignment, emptyMap());
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(false));
@@ -4188,13 +4178,13 @@ public class TaskManagerTest {
         final Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(t1p0, new OffsetAndMetadata(0L, null));
         task00.setCommittableOffsetsAndMetadata(offsets);
 
-        expectRestoreToBeCompleted(consumer, changeLogReader);
+        expectRestoreToBeCompleted(consumer);
         expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment))).andReturn(singletonList(task00));
         expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
         consumer.commitSync(offsets);
         expectLastCall();
 
-        replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader);
+        replay(activeTaskCreator, standbyTaskCreator, consumer);
 
         try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(TaskManager.class)) {
             LogCaptureAppender.setClassLoggerToDebug(TaskManager.class);
@@ -4351,8 +4341,8 @@ public class TaskManagerTest {
         expect(standbyTaskCreator.createTasks(eq(standbyAssignment))).andStubReturn(standbyTasks);
         expect(activeTaskCreator.createTasks(anyObject(), eq(allActiveTasksAssignment))).andStubReturn(allActiveTasks);
 
-        expectRestoreToBeCompleted(consumer, changeLogReader);
-        replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader);
+        expectRestoreToBeCompleted(consumer);
+        replay(activeTaskCreator, standbyTaskCreator, consumer);
 
         taskManager.handleAssignment(allActiveTasksAssignment, standbyAssignment);
         taskManager.tryToCompleteRestoration(time.milliseconds(), null);
@@ -4671,19 +4661,11 @@ public class TaskManagerTest {
         assertEquals(taskManager.notPausedTasks().size(), 0);
     }
 
-    private static void expectRestoreToBeCompleted(final Consumer<byte[], byte[]> consumer,
-                                                   final ChangelogReader changeLogReader) {
-        expectRestoreToBeCompleted(consumer, changeLogReader, true);
-    }
-
-    private static void expectRestoreToBeCompleted(final Consumer<byte[], byte[]> consumer,
-                                                   final ChangelogReader changeLogReader,
-                                                   final boolean changeLogUpdateRequired) {
+    private static void expectRestoreToBeCompleted(final Consumer<byte[], byte[]> consumer) {
         final Set<TopicPartition> assignment = singleton(new TopicPartition("assignment", 0));
         expect(consumer.assignment()).andReturn(assignment);
         consumer.resume(assignment);
         expectLastCall();
-        expect(changeLogReader.completedChangelogs()).andReturn(emptySet()).times(changeLogUpdateRequired ? 1 : 0, 1);
     }
 
     private static KafkaFutureImpl<DeletedRecords> completedFuture() {