You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "clolov (via GitHub)" <gi...@apache.org> on 2023/05/11 13:11:25 UTC

[GitHub] [kafka] clolov commented on a diff in pull request #13681: KAFKA-14133: Migrate ActiveTaskCreator mock in TaskManagerTest to Mockito

clolov commented on code in PR #13681:
URL: https://github.com/apache/kafka/pull/13681#discussion_r1191109013


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -845,17 +847,16 @@ public void shouldRecycleTasksRemovedFromStateUpdater() {
         when(tasks.removePendingTaskToRecycle(task00.id())).thenReturn(taskId00Partitions);
         when(tasks.removePendingTaskToRecycle(task01.id())).thenReturn(taskId01Partitions);
         taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
-        expect(activeTaskCreator.createActiveTaskFromStandby(eq(task01), eq(taskId01Partitions), eq(consumer)))
-            .andStubReturn(task01Converted);
-        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(anyObject());
-        expectLastCall().once();
+        when(activeTaskCreator.createActiveTaskFromStandby(Mockito.eq(task01), Mockito.eq(taskId01Partitions),

Review Comment:
   ```suggestion
           when(activeTaskCreator.createActiveTaskFromStandby(task01, taskId01Partitions,
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -879,13 +880,10 @@ public void shouldCloseTasksRemovedFromStateUpdater() {
         when(tasks.removePendingTaskToCloseClean(task00.id())).thenReturn(true);
         when(tasks.removePendingTaskToCloseClean(task01.id())).thenReturn(true);
         taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
-        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(anyObject());
-        expectLastCall().once();
-        replay(activeTaskCreator);
 
         taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
 
-        verify(activeTaskCreator);
+        Mockito.verify(activeTaskCreator, times(1)).closeAndRemoveTaskProducerIfNeeded(any());

Review Comment:
   ```suggestion
           Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(any());
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -960,10 +958,8 @@ public void shouldHandleMultipleRemovedTasksFromStateUpdater() {
         when(stateUpdater.drainRemovedTasks())
             .thenReturn(mkSet(taskToRecycle0, taskToRecycle1, taskToClose, taskToUpdateInputPartitions));
         when(stateUpdater.restoresActiveTasks()).thenReturn(true);
-        expect(activeTaskCreator.createActiveTaskFromStandby(eq(taskToRecycle1), eq(taskId01Partitions), eq(consumer)))
-            .andStubReturn(convertedTask1);
-        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(anyObject());
-        expectLastCall().times(2);
+        when(activeTaskCreator.createActiveTaskFromStandby(Mockito.eq(taskToRecycle1), Mockito.eq(taskId01Partitions),

Review Comment:
   ```suggestion
           when(activeTaskCreator.createActiveTaskFromStandby(taskToRecycle1, taskId01Partitions,
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -845,17 +847,16 @@ public void shouldRecycleTasksRemovedFromStateUpdater() {
         when(tasks.removePendingTaskToRecycle(task00.id())).thenReturn(taskId00Partitions);
         when(tasks.removePendingTaskToRecycle(task01.id())).thenReturn(taskId01Partitions);
         taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
-        expect(activeTaskCreator.createActiveTaskFromStandby(eq(task01), eq(taskId01Partitions), eq(consumer)))
-            .andStubReturn(task01Converted);
-        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(anyObject());
-        expectLastCall().once();
+        when(activeTaskCreator.createActiveTaskFromStandby(Mockito.eq(task01), Mockito.eq(taskId01Partitions),
+                Mockito.eq(consumer))).thenReturn(task01Converted);
         expect(standbyTaskCreator.createStandbyTaskFromActive(eq(task00), eq(taskId00Partitions)))
             .andStubReturn(task00Converted);
-        replay(activeTaskCreator, standbyTaskCreator);
+        replay(standbyTaskCreator);
 
         taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
 
-        verify(activeTaskCreator, standbyTaskCreator);
+        verify(standbyTaskCreator);
+        Mockito.verify(activeTaskCreator, times(1)).closeAndRemoveTaskProducerIfNeeded(any());

Review Comment:
   ```suggestion
           Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(any());
   ```
   As far as I know, `Mockito.verify(mock)` defaults to `times(1)`, so passing it explicitly is redundant.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2162,11 +2140,12 @@ public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
 
         // handleAssignment
         expect(standbyTaskCreator.createTasks(eq(taskId00Assignment))).andStubReturn(singleton(corruptedStandby));
-        expect(activeTaskCreator.createTasks(anyObject(), eq(taskId01Assignment))).andStubReturn(singleton(runningNonCorruptedActive));
+        when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId01Assignment)))
+                .thenReturn(singleton(runningNonCorruptedActive));

Review Comment:
   ```suggestion
               .thenReturn(singleton(runningNonCorruptedActive));
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2082,17 +2062,16 @@ public void shouldCommitNonCorruptedTasksOnTaskCorruptedException() {
         assignment.putAll(taskId01Assignment);
 
         // `handleAssignment`
-        expect(activeTaskCreator.createTasks(anyObject(), eq(assignment)))
-            .andStubReturn(asList(corruptedTask, nonCorruptedTask));
+        when(activeTaskCreator.createTasks(any(), Mockito.eq(assignment)))
+                .thenReturn(asList(corruptedTask, nonCorruptedTask));

Review Comment:
   ```suggestion
               .thenReturn(asList(corruptedTask, nonCorruptedTask));
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2202,14 +2181,15 @@ public void shouldNotAttemptToCommitInHandleCorruptedDuringARebalance() {
         final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>();
         assignment.putAll(taskId00Assignment);
         assignment.putAll(taskId01Assignment);
-        expect(activeTaskCreator.createTasks(anyObject(), eq(assignment))).andStubReturn(asList(corruptedActive, uncorruptedActive));
+        when(activeTaskCreator.createTasks(any(), Mockito.eq(assignment)))
+                .thenReturn(asList(corruptedActive, uncorruptedActive));

Review Comment:
   ```suggestion
               .thenReturn(asList(corruptedActive, uncorruptedActive));
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2082,17 +2062,16 @@ public void shouldCommitNonCorruptedTasksOnTaskCorruptedException() {
         assignment.putAll(taskId01Assignment);
 
         // `handleAssignment`
-        expect(activeTaskCreator.createTasks(anyObject(), eq(assignment)))
-            .andStubReturn(asList(corruptedTask, nonCorruptedTask));
+        when(activeTaskCreator.createTasks(any(), Mockito.eq(assignment)))
+                .thenReturn(asList(corruptedTask, nonCorruptedTask));

Review Comment:
   ```suggestion
               .thenReturn(asList(corruptedTask, nonCorruptedTask));
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2412,15 +2394,15 @@ public void markChangelogAsCorrupted(final Collection<TopicPartition> partitions
 
         expectRestoreToBeCompleted(consumer);
 
-        expect(activeTaskCreator.createTasks(anyObject(), eq(assignmentActive))).andReturn(asList(revokedActiveTask, unrevokedActiveTaskWithCommitNeeded, unrevokedActiveTaskWithoutCommitNeeded));
+        when(activeTaskCreator.createTasks(any(), Mockito.eq(assignmentActive)))
+                .thenReturn(asList(revokedActiveTask, unrevokedActiveTaskWithCommitNeeded, unrevokedActiveTaskWithoutCommitNeeded));

Review Comment:
   ```suggestion
               .thenReturn(asList(revokedActiveTask, unrevokedActiveTaskWithCommitNeeded, unrevokedActiveTaskWithoutCommitNeeded));
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2750,11 +2726,10 @@ public void shouldCommitAllActiveTasksThatNeedCommittingOnHandleRevocationWithEo
         );
         expectRestoreToBeCompleted(consumer);
 
-        expect(activeTaskCreator.createTasks(anyObject(), eq(assignmentActive)))
-            .andReturn(asList(task00, task01, task02));
+        when(activeTaskCreator.createTasks(any(), Mockito.eq(assignmentActive)))
+                .thenReturn(asList(task00, task01, task02));

Review Comment:
   ```suggestion
               .thenReturn(asList(task00, task01, task02));
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -3087,13 +3053,11 @@ public Set<TopicPartition> changelogPartitions() {
         final Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(t1p0, new OffsetAndMetadata(0L, null));
         task00.setCommittableOffsetsAndMetadata(offsets);
 
-        expect(activeTaskCreator.createTasks(anyObject(), eq(assignment))).andStubReturn(singletonList(task00));
-        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(eq(taskId00));
-        expectLastCall().andThrow(new RuntimeException("whatever"));
-        activeTaskCreator.closeThreadProducerIfNeeded();
-        expectLastCall();
+        when(activeTaskCreator.createTasks(any(), Mockito.eq(assignment))).thenReturn(singletonList(task00));
+        doThrow(new RuntimeException("whatever"))
+                .when(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(Mockito.eq(taskId00));

Review Comment:
   ```suggestion
               .when(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -3016,14 +2985,10 @@ public void closeDirty() {
             }
         };
 
-        expect(activeTaskCreator.createTasks(anyObject(), eq(assignment)))
-            .andStubReturn(asList(task00, task01, task02, task03));
-        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(anyObject());
-        expectLastCall().times(4);
-        activeTaskCreator.closeThreadProducerIfNeeded();
-        expectLastCall();
+        when(activeTaskCreator.createTasks(any(), Mockito.eq(assignment)))
+                .thenReturn(asList(task00, task01, task02, task03));

Review Comment:
   ```suggestion
               .thenReturn(asList(task00, task01, task02, task03));
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2822,16 +2797,14 @@ public void shouldCommitAllNeededTasksOnHandleRevocation() {
         );
         expectRestoreToBeCompleted(consumer);
 
-        expect(activeTaskCreator.createTasks(anyObject(), eq(assignmentActive)))
-            .andReturn(asList(task00, task01, task02));
-        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(taskId00);
-        expectLastCall();
+        when(activeTaskCreator.createTasks(any(), Mockito.eq(assignmentActive)))
+                .thenReturn(asList(task00, task01, task02));

Review Comment:
   ```suggestion
               .thenReturn(asList(task00, task01, task02));
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -3120,8 +3084,9 @@ public Set<TopicPartition> changelogPartitions() {
         assertThat(exception.getCause().getMessage(), is("whatever"));
         assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
         assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
+        Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(Mockito.eq(taskId00));

Review Comment:
   ```suggestion
           Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -3484,14 +3435,14 @@ public void shouldCommitActiveAndStandbyTasks() {
         final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, false);
 
         expectRestoreToBeCompleted(consumer);
-        expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment)))
-            .andStubReturn(singletonList(task00));
+        when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment)))
+                .thenReturn(singletonList(task00));

Review Comment:
   ```suggestion
               .thenReturn(singletonList(task00));
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2082,17 +2062,16 @@ public void shouldCommitNonCorruptedTasksOnTaskCorruptedException() {
         assignment.putAll(taskId01Assignment);
 
         // `handleAssignment`
-        expect(activeTaskCreator.createTasks(anyObject(), eq(assignment)))
-            .andStubReturn(asList(corruptedTask, nonCorruptedTask));
+        when(activeTaskCreator.createTasks(any(), Mockito.eq(assignment)))
+                .thenReturn(asList(corruptedTask, nonCorruptedTask));

Review Comment:
   Could you keep the indentation consistent with the rest of the file?



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -3636,9 +3586,9 @@ public void shouldCommitViaConsumerIfEosDisabled() {
     @Test
     public void shouldCommitViaProducerIfEosAlphaEnabled() {
         final StreamsProducer producer = EasyMock.mock(StreamsProducer.class);
-        expect(activeTaskCreator.streamsProducerForTask(anyObject(TaskId.class)))
-            .andReturn(producer)
-            .andReturn(producer);
+        when(activeTaskCreator.streamsProducerForTask(any(TaskId.class)))
+                .thenReturn(producer)

Review Comment:
   ```suggestion
               .thenReturn(producer)
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -960,10 +958,8 @@ public void shouldHandleMultipleRemovedTasksFromStateUpdater() {
         when(stateUpdater.drainRemovedTasks())
             .thenReturn(mkSet(taskToRecycle0, taskToRecycle1, taskToClose, taskToUpdateInputPartitions));
         when(stateUpdater.restoresActiveTasks()).thenReturn(true);
-        expect(activeTaskCreator.createActiveTaskFromStandby(eq(taskToRecycle1), eq(taskId01Partitions), eq(consumer)))
-            .andStubReturn(convertedTask1);
-        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(anyObject());
-        expectLastCall().times(2);
+        when(activeTaskCreator.createActiveTaskFromStandby(Mockito.eq(taskToRecycle1), Mockito.eq(taskId01Partitions),
+                Mockito.eq(consumer))).thenReturn(convertedTask1);

Review Comment:
   ```suggestion
                   consumer)).thenReturn(convertedTask1);
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -845,17 +847,16 @@ public void shouldRecycleTasksRemovedFromStateUpdater() {
         when(tasks.removePendingTaskToRecycle(task00.id())).thenReturn(taskId00Partitions);
         when(tasks.removePendingTaskToRecycle(task01.id())).thenReturn(taskId01Partitions);
         taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
-        expect(activeTaskCreator.createActiveTaskFromStandby(eq(task01), eq(taskId01Partitions), eq(consumer)))
-            .andStubReturn(task01Converted);
-        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(anyObject());
-        expectLastCall().once();
+        when(activeTaskCreator.createActiveTaskFromStandby(Mockito.eq(task01), Mockito.eq(taskId01Partitions),
+                Mockito.eq(consumer))).thenReturn(task01Converted);

Review Comment:
   ```suggestion
                   consumer)).thenReturn(task01Converted);
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -3528,14 +3479,14 @@ public void shouldCommitProvidedTasksIfNeeded() {
         );
 
         expectRestoreToBeCompleted(consumer);
-        expect(activeTaskCreator.createTasks(anyObject(), eq(assignmentActive)))
-            .andStubReturn(Arrays.asList(task00, task01, task02));
+        when(activeTaskCreator.createTasks(any(), Mockito.eq(assignmentActive)))
+                .thenReturn(Arrays.asList(task00, task01, task02));

Review Comment:
   ```suggestion
               .thenReturn(Arrays.asList(task00, task01, task02));
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -3587,12 +3537,12 @@ public void shouldNotCommitActiveAndStandbyTasksWhileRebalanceInProgress() throw
         makeTaskFolders(taskId00.toString(), task01.toString());
         expectLockObtainedFor(taskId00, taskId01);
         expectRestoreToBeCompleted(consumer);
-        expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment)))
-            .andStubReturn(singletonList(task00));
+        when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment)))
+                .thenReturn(singletonList(task00));

Review Comment:
   ```suggestion
               .thenReturn(singletonList(task00));
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -3636,9 +3586,9 @@ public void shouldCommitViaConsumerIfEosDisabled() {
     @Test
     public void shouldCommitViaProducerIfEosAlphaEnabled() {
         final StreamsProducer producer = EasyMock.mock(StreamsProducer.class);
-        expect(activeTaskCreator.streamsProducerForTask(anyObject(TaskId.class)))
-            .andReturn(producer)
-            .andReturn(producer);
+        when(activeTaskCreator.streamsProducerForTask(any(TaskId.class)))
+                .thenReturn(producer)
+                .thenReturn(producer);

Review Comment:
   ```suggestion
   ```
   You do not need to chain `thenReturn(producer)` multiple times. If you specify it once, Mockito keeps returning the same mock on every subsequent invocation.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2125,12 +2104,12 @@ public void shouldNotCommitNonRunningNonCorruptedTasks() {
         assignment.putAll(taskId01Assignment);
 
         // `handleAssignment`
-        expect(activeTaskCreator.createTasks(anyObject(), eq(assignment)))
-            .andStubReturn(asList(corruptedTask, nonRunningNonCorruptedTask));
+        when(activeTaskCreator.createTasks(any(), Mockito.eq(assignment)))
+                .thenReturn(asList(corruptedTask, nonRunningNonCorruptedTask));

Review Comment:
   ```suggestion
               .thenReturn(asList(corruptedTask, nonRunningNonCorruptedTask));
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -3636,9 +3586,9 @@ public void shouldCommitViaConsumerIfEosDisabled() {
     @Test
     public void shouldCommitViaProducerIfEosAlphaEnabled() {
         final StreamsProducer producer = EasyMock.mock(StreamsProducer.class);
-        expect(activeTaskCreator.streamsProducerForTask(anyObject(TaskId.class)))
-            .andReturn(producer)
-            .andReturn(producer);
+        when(activeTaskCreator.streamsProducerForTask(any(TaskId.class)))
+                .thenReturn(producer)

Review Comment:
   ```suggestion
               .thenReturn(producer)
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2330,7 +2311,8 @@ public void markChangelogAsCorrupted(final Collection<TopicPartition> partitions
         final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>();
         assignment.putAll(taskId00Assignment);
         assignment.putAll(taskId01Assignment);
-        expect(activeTaskCreator.createTasks(anyObject(), eq(assignment))).andStubReturn(asList(corruptedActiveTask, uncorruptedActiveTask));
+        when(activeTaskCreator.createTasks(any(), Mockito.eq(assignment)))
+                .thenReturn(asList(corruptedActiveTask, uncorruptedActiveTask));

Review Comment:
   ```suggestion
               .thenReturn(asList(corruptedActiveTask, uncorruptedActiveTask));
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2254,7 +2234,8 @@ public void markChangelogAsCorrupted(final Collection<TopicPartition> partitions
         final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>();
         assignment.putAll(taskId00Assignment);
         assignment.putAll(taskId01Assignment);
-        expect(activeTaskCreator.createTasks(anyObject(), eq(assignment))).andStubReturn(asList(corruptedActive, uncorruptedActive));
+        when(activeTaskCreator.createTasks(any(), Mockito.eq(assignment)))
+                .thenReturn(asList(corruptedActive, uncorruptedActive));

Review Comment:
   ```suggestion
               .thenReturn(asList(corruptedActive, uncorruptedActive));
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -4493,9 +4441,9 @@ public void shouldThrowTaskCorruptedExceptionForTimeoutExceptionOnCommitWithEosV
         final TaskManager taskManager = setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false);
 
         final StreamsProducer producer = mock(StreamsProducer.class);
-        expect(activeTaskCreator.threadProducer())
-            .andReturn(producer)
-            .andReturn(producer);
+        when(activeTaskCreator.threadProducer())
+                .thenReturn(producer)
+                .thenReturn(producer);

Review Comment:
   ```suggestion
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -3920,9 +3869,10 @@ public void shouldProcessActiveTasks() {
         assignment.put(taskId01, taskId01Partitions);
 
         expectRestoreToBeCompleted(consumer);
-        expect(activeTaskCreator.createTasks(anyObject(), eq(assignment))).andStubReturn(Arrays.asList(task00, task01));
+        when(activeTaskCreator.createTasks(any(), Mockito.eq(assignment)))
+                .thenReturn(Arrays.asList(task00, task01));

Review Comment:
   ```suggestion
               .thenReturn(Arrays.asList(task00, task01));
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -4448,10 +4396,10 @@ public void shouldNotFailForTimeoutExceptionOnCommitWithEosAlpha() {
         final TaskManager taskManager = setUpTaskManager(ProcessingMode.EXACTLY_ONCE_ALPHA, tasks, false);
 
         final StreamsProducer producer = mock(StreamsProducer.class);
-        expect(activeTaskCreator.streamsProducerForTask(anyObject(TaskId.class)))
-            .andReturn(producer)
-            .andReturn(producer)
-            .andReturn(producer);
+        when(activeTaskCreator.streamsProducerForTask(any(TaskId.class)))
+                .thenReturn(producer)
+                .thenReturn(producer)

Review Comment:
   You do not need to chain `thenReturn(producer)` 3 times. If you specify it once, Mockito takes care of returning the same result on every invocation.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -3636,9 +3586,9 @@ public void shouldCommitViaConsumerIfEosDisabled() {
     @Test
     public void shouldCommitViaProducerIfEosAlphaEnabled() {
         final StreamsProducer producer = EasyMock.mock(StreamsProducer.class);
-        expect(activeTaskCreator.streamsProducerForTask(anyObject(TaskId.class)))
-            .andReturn(producer)
-            .andReturn(producer);
+        when(activeTaskCreator.streamsProducerForTask(any(TaskId.class)))
+                .thenReturn(producer)

Review Comment:
   ```suggestion
               .thenReturn(producer)
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -3171,7 +3134,7 @@ public Set<TopicPartition> changelogPartitions() {
         assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
         assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
         // the active task creator should also get closed (so that it closes the thread producer if applicable)
-        verify(activeTaskCreator);
+        Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(Mockito.eq(taskId00));

Review Comment:
   ```suggestion
           Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -4448,10 +4396,10 @@ public void shouldNotFailForTimeoutExceptionOnCommitWithEosAlpha() {
         final TaskManager taskManager = setUpTaskManager(ProcessingMode.EXACTLY_ONCE_ALPHA, tasks, false);
 
         final StreamsProducer producer = mock(StreamsProducer.class);
-        expect(activeTaskCreator.streamsProducerForTask(anyObject(TaskId.class)))
-            .andReturn(producer)
-            .andReturn(producer)
-            .andReturn(producer);
+        when(activeTaskCreator.streamsProducerForTask(any(TaskId.class)))
+                .thenReturn(producer)
+                .thenReturn(producer)

Review Comment:
   ```suggestion
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -539,18 +542,15 @@ public void shouldCreateActiveTaskDuringAssignment() {
         final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
         final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
         final Set<Task> createdTasks = mkSet(activeTaskToBeCreated);
-        expect(activeTaskCreator.createTasks(consumer, mkMap(
-            mkEntry(activeTaskToBeCreated.id(), activeTaskToBeCreated.inputPartitions())))
-        ).andReturn(createdTasks);
+        final Map<TaskId, Set<TopicPartition>> tasksToBeCreated = mkMap(
+                mkEntry(activeTaskToBeCreated.id(), activeTaskToBeCreated.inputPartitions()));

Review Comment:
   ```suggestion
               mkEntry(activeTaskToBeCreated.id(), activeTaskToBeCreated.inputPartitions()));
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2476,10 +2458,9 @@ public void markChangelogAsCorrupted(final Collection<TopicPartition> partitions
 
         expectRestoreToBeCompleted(consumer);
 
-        expect(activeTaskCreator.createTasks(anyObject(), eq(assignmentActive))).andReturn(asList(revokedActiveTask, unrevokedActiveTask, unrevokedActiveTaskWithoutCommitNeeded));
+        when(activeTaskCreator.createTasks(any(), Mockito.eq(assignmentActive)))
+                .thenReturn(asList(revokedActiveTask, unrevokedActiveTask, unrevokedActiveTaskWithoutCommitNeeded));

Review Comment:
   ```suggestion
               .thenReturn(asList(revokedActiveTask, unrevokedActiveTask, unrevokedActiveTaskWithoutCommitNeeded));
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -4635,19 +4582,18 @@ public void shouldConvertStandbyTaskToActiveTask() {
         final StreamTask activeTask = mock(StreamTask.class);
         when(activeTask.id()).thenReturn(taskId00);
         when(activeTask.inputPartitions()).thenReturn(taskId00Partitions);
-
-        expect(activeTaskCreator.createTasks(anyObject(), eq(Collections.emptyMap()))).andReturn(Collections.emptySet());
         expect(standbyTaskCreator.createTasks(eq(taskId00Assignment))).andReturn(singletonList(standbyTask));
-        expect(activeTaskCreator.createActiveTaskFromStandby(eq(standbyTask), eq(taskId00Partitions), anyObject())).andReturn(activeTask);
-        expect(activeTaskCreator.createTasks(anyObject(), eq(Collections.emptyMap()))).andReturn(Collections.emptySet());
+        when(activeTaskCreator.createActiveTaskFromStandby(Mockito.eq(standbyTask), Mockito.eq(taskId00Partitions), any()))
+                .thenReturn(activeTask);

Review Comment:
   ```suggestion
               .thenReturn(activeTask);
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1266,16 +1259,15 @@ public void shouldHandleExceptionThrownDuringClosingTaskProducerInCloseCleanRest
             .withInputPartitions(taskId00Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = setUpCloseCleanRestoredTask(statefulTask, tasks);
-        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(statefulTask.id());
-        expectLastCall().andThrow(new RuntimeException("Something happened"));
-        replay(activeTaskCreator);
+        final TaskId taskId = statefulTask.id();
+        doThrow(new RuntimeException("Something happened"))
+                .when(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId);

Review Comment:
   ```suggestion
               .when(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId);
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -4448,10 +4396,10 @@ public void shouldNotFailForTimeoutExceptionOnCommitWithEosAlpha() {
         final TaskManager taskManager = setUpTaskManager(ProcessingMode.EXACTLY_ONCE_ALPHA, tasks, false);
 
         final StreamsProducer producer = mock(StreamsProducer.class);
-        expect(activeTaskCreator.streamsProducerForTask(anyObject(TaskId.class)))
-            .andReturn(producer)
-            .andReturn(producer)
-            .andReturn(producer);
+        when(activeTaskCreator.streamsProducerForTask(any(TaskId.class)))
+                .thenReturn(producer)
+                .thenReturn(producer)
+                .thenReturn(producer);

Review Comment:
   ```suggestion
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -3137,13 +3102,11 @@ public Set<TopicPartition> changelogPartitions() {
             }
         };
 
-        expect(activeTaskCreator.createTasks(anyObject(), eq(assignment))).andStubReturn(singletonList(task00));
-        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(eq(taskId00));
-        expectLastCall();
-        activeTaskCreator.closeThreadProducerIfNeeded();
-        expectLastCall().andThrow(new RuntimeException("whatever"));
+        when(activeTaskCreator.createTasks(any(), Mockito.eq(assignment))).thenReturn(singletonList(task00));
+        doThrow(new RuntimeException("whatever")).when(activeTaskCreator).closeThreadProducerIfNeeded();
         expect(standbyTaskCreator.createTasks(eq(emptyMap()))).andStubReturn(emptyList());
-        replay(activeTaskCreator, standbyTaskCreator);
+        expect(standbyTaskCreator.createTasks(eq(emptyMap()))).andStubReturn(emptyList());

Review Comment:
   Isn't this the same as line 3107? If it is, is there a reason we need a copy of it? If there is no such reason, can we get rid of the duplicate?



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -767,20 +770,19 @@ public void shouldAssignMultipleTasksInTasksRegistryWithStateUpdaterEnabled() {
         final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
         final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
         when(tasks.allTasks()).thenReturn(mkSet(activeTaskToClose));
-        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(activeTaskToClose.id());
-        expect(activeTaskCreator.createTasks(
-            consumer,
-            mkMap(mkEntry(activeTaskToCreate.id(), activeTaskToCreate.inputPartitions()))
-        )).andReturn(emptySet());
         expect(standbyTaskCreator.createTasks(Collections.emptyMap())).andReturn(emptySet());
-        replay(activeTaskCreator, standbyTaskCreator);
+        replay(standbyTaskCreator);
 
         taskManager.handleAssignment(
             mkMap(mkEntry(activeTaskToCreate.id(), activeTaskToCreate.inputPartitions())),
             Collections.emptyMap()
         );
 
-        verify(activeTaskCreator, standbyTaskCreator);
+        verify(standbyTaskCreator);
+        Mockito.verify(activeTaskCreator).createTasks(
+                consumer,

Review Comment:
   ```suggestion
               consumer,
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -767,20 +770,19 @@ public void shouldAssignMultipleTasksInTasksRegistryWithStateUpdaterEnabled() {
         final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
         final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
         when(tasks.allTasks()).thenReturn(mkSet(activeTaskToClose));
-        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(activeTaskToClose.id());
-        expect(activeTaskCreator.createTasks(
-            consumer,
-            mkMap(mkEntry(activeTaskToCreate.id(), activeTaskToCreate.inputPartitions()))
-        )).andReturn(emptySet());
         expect(standbyTaskCreator.createTasks(Collections.emptyMap())).andReturn(emptySet());
-        replay(activeTaskCreator, standbyTaskCreator);
+        replay(standbyTaskCreator);
 
         taskManager.handleAssignment(
             mkMap(mkEntry(activeTaskToCreate.id(), activeTaskToCreate.inputPartitions())),
             Collections.emptyMap()
         );
 
-        verify(activeTaskCreator, standbyTaskCreator);
+        verify(standbyTaskCreator);
+        Mockito.verify(activeTaskCreator).createTasks(
+                consumer,

Review Comment:
   ```suggestion
               consumer,
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -767,20 +770,19 @@ public void shouldAssignMultipleTasksInTasksRegistryWithStateUpdaterEnabled() {
         final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
         final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
         when(tasks.allTasks()).thenReturn(mkSet(activeTaskToClose));
-        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(activeTaskToClose.id());
-        expect(activeTaskCreator.createTasks(
-            consumer,
-            mkMap(mkEntry(activeTaskToCreate.id(), activeTaskToCreate.inputPartitions()))
-        )).andReturn(emptySet());
         expect(standbyTaskCreator.createTasks(Collections.emptyMap())).andReturn(emptySet());
-        replay(activeTaskCreator, standbyTaskCreator);
+        replay(standbyTaskCreator);
 
         taskManager.handleAssignment(
             mkMap(mkEntry(activeTaskToCreate.id(), activeTaskToCreate.inputPartitions())),
             Collections.emptyMap()
         );
 
-        verify(activeTaskCreator, standbyTaskCreator);
+        verify(standbyTaskCreator);
+        Mockito.verify(activeTaskCreator).createTasks(
+                consumer,
+                mkMap(mkEntry(activeTaskToCreate.id(), activeTaskToCreate.inputPartitions()))

Review Comment:
   ```suggestion
               mkMap(mkEntry(activeTaskToCreate.id(), activeTaskToCreate.inputPartitions()))
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -3428,10 +3380,10 @@ public void shouldShutDownStateUpdaterAndAddRemovedTasksToTaskRegistry() {
     public void shouldInitializeNewActiveTasks() {
         final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true);
         expectRestoreToBeCompleted(consumer);
-        expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment)))
-            .andStubReturn(singletonList(task00));
+        when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment)))
+                .thenReturn(singletonList(task00));

Review Comment:
   ```suggestion
               .thenReturn(singletonList(task00));
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -845,17 +847,16 @@ public void shouldRecycleTasksRemovedFromStateUpdater() {
         when(tasks.removePendingTaskToRecycle(task00.id())).thenReturn(taskId00Partitions);
         when(tasks.removePendingTaskToRecycle(task01.id())).thenReturn(taskId01Partitions);
         taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
-        expect(activeTaskCreator.createActiveTaskFromStandby(eq(task01), eq(taskId01Partitions), eq(consumer)))
-            .andStubReturn(task01Converted);
-        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(anyObject());
-        expectLastCall().once();
+        when(activeTaskCreator.createActiveTaskFromStandby(Mockito.eq(task01), Mockito.eq(taskId01Partitions),

Review Comment:
   If none of the arguments are argument captors or matchers that let you generalise (such as `any()`), you don't need to use matchers at all; you can pass the values directly.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -3636,9 +3586,9 @@ public void shouldCommitViaConsumerIfEosDisabled() {
     @Test
     public void shouldCommitViaProducerIfEosAlphaEnabled() {
         final StreamsProducer producer = EasyMock.mock(StreamsProducer.class);
-        expect(activeTaskCreator.streamsProducerForTask(anyObject(TaskId.class)))
-            .andReturn(producer)
-            .andReturn(producer);
+        when(activeTaskCreator.streamsProducerForTask(any(TaskId.class)))
+                .thenReturn(producer)
+                .thenReturn(producer);

Review Comment:
   ```suggestion
               .thenReturn(producer);
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -3876,14 +3825,14 @@ public void shouldMaybeCommitAllActiveTasksThatNeedCommit() {
         );
 
         expectRestoreToBeCompleted(consumer);
-        expect(activeTaskCreator.createTasks(anyObject(), eq(assignmentActive)))
-            .andStubReturn(asList(task00, task01, task02, task03));
+        when(activeTaskCreator.createTasks(any(), Mockito.eq(assignmentActive)))
+                .thenReturn(asList(task00, task01, task02, task03));

Review Comment:
   ```suggestion
               .thenReturn(asList(task00, task01, task02, task03));
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -4058,10 +4008,10 @@ public boolean process(final long wallClockTime) {
         };
 
         expectRestoreToBeCompleted(consumer);
-        expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment)))
-            .andStubReturn(singletonList(task00));
+        when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment)))
+                .thenReturn(singletonList(task00));

Review Comment:
   ```suggestion
               .thenReturn(singletonList(task00));
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org