Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/09/08 13:52:36 UTC

[GitHub] [kafka] clolov opened a new pull request, #12607: KAFKA-14133: Replace EasyMock with Mockito in streams tests

clolov opened a new pull request, #12607:
URL: https://github.com/apache/kafka/pull/12607

   Batch 5 of the tests detailed in https://issues.apache.org/jira/browse/KAFKA-14133 which use EasyMock and need to be moved to Mockito.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #12607: KAFKA-14133: Replace EasyMock with Mockito in streams tests

Posted by "cadonna (via GitHub)" <gi...@apache.org>.
cadonna commented on code in PR #12607:
URL: https://github.com/apache/kafka/pull/12607#discussion_r1157144930


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -899,65 +841,59 @@ public void shouldSuspendRevokedTaskRemovedFromStateUpdater() {
         when(stateUpdater.hasRemovedTasks()).thenReturn(true);
         when(stateUpdater.drainRemovedTasks()).thenReturn(mkSet(statefulTask));
         final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
-        replay(consumer);
 
         taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
 
-        verify(consumer);
-        Mockito.verify(statefulTask).suspend();
-        Mockito.verify(tasks).addTask(statefulTask);
+        verify(statefulTask).suspend();
+        verify(tasks).addTask(statefulTask);
     }
     @Test
     public void shouldHandleMultipleRemovedTasksFromStateUpdater() {
         final StreamTask taskToRecycle0 = statefulTask(taskId00, taskId00ChangelogPartitions)
-            .inState(State.RESTORING)
-            .withInputPartitions(taskId00Partitions).build();
+                .inState(State.RESTORING)
+                .withInputPartitions(taskId00Partitions).build();
         final StandbyTask taskToRecycle1 = standbyTask(taskId01, taskId01ChangelogPartitions)
-            .inState(State.RUNNING)
-            .withInputPartitions(taskId01Partitions).build();
+                .inState(State.RUNNING)
+                .withInputPartitions(taskId01Partitions).build();
         final StandbyTask convertedTask0 = standbyTask(taskId00, taskId00ChangelogPartitions).build();
         final StreamTask convertedTask1 = statefulTask(taskId01, taskId01ChangelogPartitions).build();
         final StreamTask taskToClose = statefulTask(taskId02, taskId02ChangelogPartitions)
-            .inState(State.RESTORING)
-            .withInputPartitions(taskId02Partitions).build();
+                .inState(State.RESTORING)
+                .withInputPartitions(taskId02Partitions).build();
         final StreamTask taskToUpdateInputPartitions = statefulTask(taskId03, taskId03ChangelogPartitions)
-            .inState(State.RESTORING)
-            .withInputPartitions(taskId03Partitions).build();
+                .inState(State.RESTORING)
+                .withInputPartitions(taskId03Partitions).build();
         when(stateUpdater.hasRemovedTasks()).thenReturn(true);
         when(stateUpdater.drainRemovedTasks())
-            .thenReturn(mkSet(taskToRecycle0, taskToRecycle1, taskToClose, taskToUpdateInputPartitions));
+                .thenReturn(mkSet(taskToRecycle0, taskToRecycle1, taskToClose, taskToUpdateInputPartitions));

Review Comment:
   nit: adding the indentation is not necessary.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -899,65 +841,59 @@ public void shouldSuspendRevokedTaskRemovedFromStateUpdater() {
         when(stateUpdater.hasRemovedTasks()).thenReturn(true);
         when(stateUpdater.drainRemovedTasks()).thenReturn(mkSet(statefulTask));
         final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
-        replay(consumer);
 
         taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
 
-        verify(consumer);
-        Mockito.verify(statefulTask).suspend();
-        Mockito.verify(tasks).addTask(statefulTask);
+        verify(statefulTask).suspend();
+        verify(tasks).addTask(statefulTask);
     }
     @Test
     public void shouldHandleMultipleRemovedTasksFromStateUpdater() {
         final StreamTask taskToRecycle0 = statefulTask(taskId00, taskId00ChangelogPartitions)
-            .inState(State.RESTORING)
-            .withInputPartitions(taskId00Partitions).build();
+                .inState(State.RESTORING)
+                .withInputPartitions(taskId00Partitions).build();
         final StandbyTask taskToRecycle1 = standbyTask(taskId01, taskId01ChangelogPartitions)
-            .inState(State.RUNNING)
-            .withInputPartitions(taskId01Partitions).build();
+                .inState(State.RUNNING)
+                .withInputPartitions(taskId01Partitions).build();
         final StandbyTask convertedTask0 = standbyTask(taskId00, taskId00ChangelogPartitions).build();
         final StreamTask convertedTask1 = statefulTask(taskId01, taskId01ChangelogPartitions).build();
         final StreamTask taskToClose = statefulTask(taskId02, taskId02ChangelogPartitions)
-            .inState(State.RESTORING)
-            .withInputPartitions(taskId02Partitions).build();
+                .inState(State.RESTORING)
+                .withInputPartitions(taskId02Partitions).build();
         final StreamTask taskToUpdateInputPartitions = statefulTask(taskId03, taskId03ChangelogPartitions)
-            .inState(State.RESTORING)
-            .withInputPartitions(taskId03Partitions).build();
+                .inState(State.RESTORING)
+                .withInputPartitions(taskId03Partitions).build();
         when(stateUpdater.hasRemovedTasks()).thenReturn(true);
         when(stateUpdater.drainRemovedTasks())
-            .thenReturn(mkSet(taskToRecycle0, taskToRecycle1, taskToClose, taskToUpdateInputPartitions));
+                .thenReturn(mkSet(taskToRecycle0, taskToRecycle1, taskToClose, taskToUpdateInputPartitions));
         when(stateUpdater.restoresActiveTasks()).thenReturn(true);
-        expect(activeTaskCreator.createActiveTaskFromStandby(eq(taskToRecycle1), eq(taskId01Partitions), eq(consumer)))
-            .andStubReturn(convertedTask1);
-        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(anyObject());
-        expectLastCall().times(2);
-        expect(standbyTaskCreator.createStandbyTaskFromActive(eq(taskToRecycle0), eq(taskId00Partitions)))
-            .andStubReturn(convertedTask0);
-        expect(consumer.assignment()).andReturn(emptySet()).anyTimes();
-        consumer.resume(anyObject());
-        expectLastCall().anyTimes();
+        when(activeTaskCreator.createActiveTaskFromStandby(taskToRecycle1, taskId01Partitions, consumer))
+                .thenReturn(convertedTask1);
+        when(standbyTaskCreator.createStandbyTaskFromActive(taskToRecycle0, taskId00Partitions))
+                .thenReturn(convertedTask0);
+        when(consumer.assignment()).thenReturn(emptySet());
         final TasksRegistry tasks = mock(TasksRegistry.class);
         when(tasks.removePendingTaskToCloseClean(taskToClose.id())).thenReturn(true);
+        when(tasks.removePendingTaskToCloseClean(taskToUpdateInputPartitions.id())).thenReturn(false);
         when(tasks.removePendingTaskToRecycle(taskToRecycle0.id())).thenReturn(taskId00Partitions);
         when(tasks.removePendingTaskToRecycle(taskToRecycle1.id())).thenReturn(taskId01Partitions);
         when(tasks.removePendingTaskToRecycle(
-            argThat(taskId -> !taskId.equals(taskToRecycle0.id()) && !taskId.equals(taskToRecycle1.id())))
+                argThat(taskId -> !taskId.equals(taskToRecycle0.id()) && !taskId.equals(taskToRecycle1.id())))

Review Comment:
   nit: adding indentation is not necessary.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -899,65 +841,59 @@ public void shouldSuspendRevokedTaskRemovedFromStateUpdater() {
         when(stateUpdater.hasRemovedTasks()).thenReturn(true);
         when(stateUpdater.drainRemovedTasks()).thenReturn(mkSet(statefulTask));
         final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
-        replay(consumer);
 
         taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
 
-        verify(consumer);
-        Mockito.verify(statefulTask).suspend();
-        Mockito.verify(tasks).addTask(statefulTask);
+        verify(statefulTask).suspend();
+        verify(tasks).addTask(statefulTask);
     }
     @Test
     public void shouldHandleMultipleRemovedTasksFromStateUpdater() {
         final StreamTask taskToRecycle0 = statefulTask(taskId00, taskId00ChangelogPartitions)
-            .inState(State.RESTORING)
-            .withInputPartitions(taskId00Partitions).build();
+                .inState(State.RESTORING)
+                .withInputPartitions(taskId00Partitions).build();
         final StandbyTask taskToRecycle1 = standbyTask(taskId01, taskId01ChangelogPartitions)
-            .inState(State.RUNNING)
-            .withInputPartitions(taskId01Partitions).build();
+                .inState(State.RUNNING)
+                .withInputPartitions(taskId01Partitions).build();
         final StandbyTask convertedTask0 = standbyTask(taskId00, taskId00ChangelogPartitions).build();
         final StreamTask convertedTask1 = statefulTask(taskId01, taskId01ChangelogPartitions).build();
         final StreamTask taskToClose = statefulTask(taskId02, taskId02ChangelogPartitions)
-            .inState(State.RESTORING)
-            .withInputPartitions(taskId02Partitions).build();
+                .inState(State.RESTORING)
+                .withInputPartitions(taskId02Partitions).build();
         final StreamTask taskToUpdateInputPartitions = statefulTask(taskId03, taskId03ChangelogPartitions)
-            .inState(State.RESTORING)
-            .withInputPartitions(taskId03Partitions).build();
+                .inState(State.RESTORING)
+                .withInputPartitions(taskId03Partitions).build();
         when(stateUpdater.hasRemovedTasks()).thenReturn(true);
         when(stateUpdater.drainRemovedTasks())
-            .thenReturn(mkSet(taskToRecycle0, taskToRecycle1, taskToClose, taskToUpdateInputPartitions));
+                .thenReturn(mkSet(taskToRecycle0, taskToRecycle1, taskToClose, taskToUpdateInputPartitions));
         when(stateUpdater.restoresActiveTasks()).thenReturn(true);
-        expect(activeTaskCreator.createActiveTaskFromStandby(eq(taskToRecycle1), eq(taskId01Partitions), eq(consumer)))
-            .andStubReturn(convertedTask1);
-        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(anyObject());
-        expectLastCall().times(2);
-        expect(standbyTaskCreator.createStandbyTaskFromActive(eq(taskToRecycle0), eq(taskId00Partitions)))
-            .andStubReturn(convertedTask0);
-        expect(consumer.assignment()).andReturn(emptySet()).anyTimes();
-        consumer.resume(anyObject());
-        expectLastCall().anyTimes();
+        when(activeTaskCreator.createActiveTaskFromStandby(taskToRecycle1, taskId01Partitions, consumer))
+                .thenReturn(convertedTask1);
+        when(standbyTaskCreator.createStandbyTaskFromActive(taskToRecycle0, taskId00Partitions))
+                .thenReturn(convertedTask0);

Review Comment:
   nit: we usually indent 4 spaces. 



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1119,17 +1051,15 @@ public void shouldRecycleRestoredTask() {
             .inState(State.CREATED)
             .withInputPartitions(taskId00Partitions).build();
         final TaskManager taskManager = setUpRecycleRestoredTask(statefulTask);
-        expect(standbyTaskCreator.createStandbyTaskFromActive(statefulTask, statefulTask.inputPartitions()))
-            .andStubReturn(standbyTask);
+        when(standbyTaskCreator.createStandbyTaskFromActive(statefulTask, statefulTask.inputPartitions()))
+            .thenReturn(standbyTask);
         activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(statefulTask.id());

Review Comment:
   This needs to become a verification
   
   ```java
           verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(statefulTask.id());
   ```
   
   in the verification block further down.
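
A minimal sketch of the change requested above, assuming a hypothetical `TaskCreator` interface and `closeTask` helper that stand in for `ActiveTaskCreator` and the code under test (only the method name comes from the diff). In EasyMock's record phase a bare call on a mock declares an expectation. On a Mockito mock the same line is simply recorded as an invocation, so the expectation has to be stated with `verify` after the code under test has run.

```java
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

class VerificationSketch {

    // Hypothetical stand-in for ActiveTaskCreator; only the method name is taken from the diff.
    interface TaskCreator {
        void closeAndRemoveTaskProducerIfNeeded(String taskId);
    }

    // Hypothetical code under test: closing a task also closes its producer.
    static void closeTask(final TaskCreator creator, final String taskId) {
        creator.closeAndRemoveTaskProducerIfNeeded(taskId);
    }

    static void shouldCloseProducerOfClosedTask() {
        final TaskCreator creator = mock(TaskCreator.class);

        // No bare creator.closeAndRemoveTaskProducerIfNeeded("0_0") here: Mockito would
        // record it as an invocation instead of treating it as an expectation.

        closeTask(creator, "0_0");

        // Verification block: the former EasyMock expectation, stated after the fact.
        verify(creator).closeAndRemoveTaskProducerIfNeeded("0_0");
    }
}
```

If the bare call were left in the set-up, `verify` (which defaults to `times(1)`) would either fail because it sees two invocations, or pass even though the code under test never made the call.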



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1193,15 +1119,13 @@ public void shouldCloseCleanRestoredTask() {
         final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = setUpCloseCleanRestoredTask(statefulTask, tasks);
         activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(statefulTask.id());

Review Comment:
   This needs to become a verification
   
   ```java
   verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(statefulTask.id());
   ```
   
   in the verification block.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1060,14 +996,12 @@ public void shouldTransitRestoredTaskToRunning() {
         final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = setUpTransitionToRunningOfRestoredTask(task, tasks);
         consumer.resume(task.inputPartitions());

Review Comment:
   This needs to become a verification
   ```java
   verify(consumer).resume(task.inputPartitions());
   ```
   further down in the verification block.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1292,15 +1210,13 @@ public void shouldUpdateInputPartitionsOfRestoredTask() {
         when(stateUpdater.restoresActiveTasks()).thenReturn(true);
         final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
         consumer.resume(statefulTask.inputPartitions());

Review Comment:
   This needs to become a verification
   
   ```java
   verify(consumer).resume(statefulTask.inputPartitions());
   ```
   
   in the verification block.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1534,66 +1440,61 @@ public void shouldTryToLockValidTaskDirsAtRebalanceStart() throws Exception {
             taskId10.toString(),
             "dummy"
         );
-        replay(stateDirectory);
         taskManager.handleRebalanceStart(singleton("topic"));
 
-        verify(stateDirectory);
         assertThat(taskManager.lockedTaskDirectories(), is(singleton(taskId01)));
     }
 
     @Test
     public void shouldPauseAllTopicsWithoutStateUpdaterOnRebalanceComplete() {
         final Set<TopicPartition> assigned = mkSet(t1p0, t1p1);
-        expect(consumer.assignment()).andReturn(assigned);
-        consumer.pause(assigned);
-        replay(consumer);
+        when(consumer.assignment()).thenReturn(assigned);
 
         taskManager.handleRebalanceComplete();
 
-        verify(consumer);
+        verify(consumer).pause(assigned);
     }
 
     @Test
     public void shouldNotPauseReadyTasksWithStateUpdaterOnRebalanceComplete() {
         final StreamTask statefulTask0 = statefulTask(taskId00, taskId00ChangelogPartitions)
             .inState(State.RUNNING)
             .withInputPartitions(taskId00Partitions).build();
-        final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+        final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
         when(tasks.allTasks()).thenReturn(mkSet(statefulTask0));
         final Set<TopicPartition> assigned = mkSet(t1p0, t1p1);
-        expect(consumer.assignment()).andReturn(assigned);
-        consumer.pause(mkSet(t1p1));
-        replay(consumer);
+        when(consumer.assignment()).thenReturn(assigned);
 
         taskManager.handleRebalanceComplete();
 
-        verify(consumer);
+        verify(consumer).pause(mkSet(t1p1));
     }
 
     @Test
     public void shouldReleaseLockForUnassignedTasksAfterRebalance() throws Exception {
         expectLockObtainedFor(taskId00, taskId01, taskId02);
-        expectUnlockFor(taskId02);
 
         makeTaskFolders(
             taskId00.toString(),  // active task
             taskId01.toString(),  // standby task
             taskId02.toString()   // unassigned but able to lock
         );
-        replay(stateDirectory);
         taskManager.handleRebalanceStart(singleton("topic"));
 
         assertThat(taskManager.lockedTaskDirectories(), is(mkSet(taskId00, taskId01, taskId02)));
 
         handleAssignment(taskId00Assignment, taskId01Assignment, emptyMap());
-        reset(consumer);
-        expectConsumerAssignmentPaused(consumer);
-        replay(consumer);
+
+        final Set<TopicPartition> assignment = singleton(new TopicPartition("assignment", 0));
+        when(consumer.assignment()).thenReturn(assignment);
 
         taskManager.handleRebalanceComplete();
         assertThat(taskManager.lockedTaskDirectories(), is(mkSet(taskId00, taskId01)));
-        verify(stateDirectory);
+        verify(consumer).pause(assignment);
+        verify(stateDirectory).unlock(taskId02);
+
+        verifyConsumerResumedWithAssignment(consumer);

Review Comment:
   This method should also take the assignment as an argument. Maybe it is even better to call
   ```java
   verify(consumer, atLeastOnce()).resume(assignment);
   ```
   instead of `verifyConsumerResumedWithAssignment(consumer)` here.
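
A sketch of the first option (the helper taking the assignment as an argument), under stated assumptions: `PartitionConsumer` is a hypothetical stand-in for Kafka's `Consumer`, partitions are modelled as plain strings, and `resumeAssigned` is a made-up piece of code under test. The body of the real `verifyConsumerResumedWithAssignment` is not shown in this thread, so the helper below is only one way it might look.

```java
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.Collections;
import java.util.Set;

class ResumeVerificationSketch {

    // Hypothetical stand-in for org.apache.kafka.clients.consumer.Consumer.
    interface PartitionConsumer {
        Set<String> assignment();
        void resume(Set<String> partitions);
    }

    // Hypothetical code under test: resumes whatever is currently assigned.
    static void resumeAssigned(final PartitionConsumer consumer) {
        consumer.resume(consumer.assignment());
    }

    // The helper receives the expected assignment instead of hard-coding it.
    static void verifyConsumerResumedWithAssignment(final PartitionConsumer consumer,
                                                    final Set<String> assignment) {
        verify(consumer, atLeastOnce()).resume(assignment);
    }

    static void run() {
        final PartitionConsumer consumer = mock(PartitionConsumer.class);
        final Set<String> assignment = Collections.singleton("assignment-0");
        when(consumer.assignment()).thenReturn(assignment);

        resumeAssigned(consumer);
        resumeAssigned(consumer); // atLeastOnce() tolerates repeated calls

        verifyConsumerResumedWithAssignment(consumer, assignment);
    }
}
```

Passing the assignment keeps the stubbed value and the verified value visibly identical in the test, which is presumably why inlining the `verify` call is suggested as the simpler alternative.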



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1667,6 +1566,8 @@ public void shouldComputeOffsetSumForStandbyTask() throws Exception {
         restoringTask.setChangelogOffsets(changelogOffsets);
 
         assertThat(taskManager.getTaskOffsetSums(), is(expectedOffsetSums));
+
+        verifyConsumerResumedWithAssignment(consumer);

Review Comment:
   Why did you add this verification? It was not there before the migration, right?



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1213,37 +1137,33 @@ public void shouldHandleExceptionThrownDuringCloseInCloseCleanRestoredTask() {
         final TaskManager taskManager = setUpCloseCleanRestoredTask(statefulTask, tasks);
         doThrow(RuntimeException.class).when(statefulTask).closeClean();
         activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(statefulTask.id());

Review Comment:
   This needs to become a verification
   
   ```java
   verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(statefulTask.id());
   ```
   
   in the verification block.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1268,16 +1188,14 @@ public void shouldCloseDirtyRestoredTask() {
         when(stateUpdater.restoresActiveTasks()).thenReturn(true);
         activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(statefulTask.id());

Review Comment:
   This needs to become a verification
   
   ```java
   verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(statefulTask.id());
   ```
   
   in the verification block.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -4370,27 +4080,26 @@ private Map<TaskId, StateMachineTask> handleAssignment(final Map<TaskId, Set<Top
 
     private void expectLockObtainedFor(final TaskId... tasks) throws Exception {
         for (final TaskId task : tasks) {
-            expect(stateDirectory.lock(task)).andReturn(true).once();
+            when(stateDirectory.lock(task)).thenReturn(true);
         }
     }
 
     private void expectLockFailedFor(final TaskId... tasks) throws Exception {
         for (final TaskId task : tasks) {
-            expect(stateDirectory.lock(task)).andReturn(false).once();
+            when(stateDirectory.lock(task)).thenReturn(false);
         }
     }
 
     private void expectUnlockFor(final TaskId... tasks) throws Exception {
         for (final TaskId task : tasks) {
-            stateDirectory.unlock(task);
-            expectLastCall();
+            doNothing().when(stateDirectory).unlock(task);
         }
     }
 
     private static void expectConsumerAssignmentPaused(final Consumer<byte[], byte[]> consumer) {
         final Set<TopicPartition> assignment = singleton(new TopicPartition("assignment", 0));
-        expect(consumer.assignment()).andReturn(assignment);
-        consumer.pause(assignment);
+        when(consumer.assignment()).thenReturn(assignment);
+        doNothing().when(consumer).pause(assignment);
     }

Review Comment:
   This method is not needed anymore.





[GitHub] [kafka] clolov commented on a diff in pull request #12607: KAFKA-14133: Replace EasyMock with Mockito in streams tests

Posted by GitBox <gi...@apache.org>.
clolov commented on code in PR #12607:
URL: https://github.com/apache/kafka/pull/12607#discussion_r968206590


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -4091,21 +3815,20 @@ public void shouldListNotPausedTasks() {
         topologyMetadata.pauseTopology(UNNAMED_TOPOLOGY);
 
         assertEquals(taskManager.notPausedTasks().size(), 0);
+
+        verifyConsumerResumedWithAssignment(consumer);

Review Comment:
   The function below set an expectation for a call that was not stubbed:
   ```java
   private static void expectRestoreToBeCompleted(final Consumer<byte[], byte[]> consumer,
                                                  final ChangelogReader changeLogReader,
                                                  final boolean changeLogUpdateRequired) {
       final Set<TopicPartition> assignment = singleton(new TopicPartition("assignment", 0));
       expect(consumer.assignment()).andReturn(assignment);
       consumer.resume(assignment); // <--- THIS IS THE EXPECTATION
       expectLastCall();
       expect(changeLogReader.completedChangelogs()).andReturn(emptySet()).times(changeLogUpdateRequired ? 1 : 0, 1);
   }
   ```
   I wanted to keep this verification, so I moved it to a separate function.





[GitHub] [kafka] clolov commented on a diff in pull request #12607: KAFKA-14133: Replace EasyMock with Mockito in streams tests

Posted by GitBox <gi...@apache.org>.
clolov commented on code in PR #12607:
URL: https://github.com/apache/kafka/pull/12607#discussion_r968209811


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -4068,18 +3796,14 @@ public void shouldConvertStandbyTaskToActiveTask() {
         when(activeTask.id()).thenReturn(taskId00);
         when(activeTask.inputPartitions()).thenReturn(taskId00Partitions);
 
-        expect(activeTaskCreator.createTasks(anyObject(), eq(Collections.emptyMap()))).andReturn(Collections.emptySet());
-        expect(standbyTaskCreator.createTasks(eq(taskId00Assignment))).andReturn(singletonList(standbyTask));
-        expect(activeTaskCreator.createActiveTaskFromStandby(eq(standbyTask), eq(taskId00Partitions), anyObject())).andReturn(activeTask);
-        expect(activeTaskCreator.createTasks(anyObject(), eq(Collections.emptyMap()))).andReturn(Collections.emptySet());
-        expect(standbyTaskCreator.createTasks(eq(Collections.emptyMap()))).andReturn(Collections.emptySet());
-
-        replay(standbyTaskCreator, activeTaskCreator, consumer);
+        when(activeTaskCreator.createTasks(any(), eq(Collections.emptyMap()))).thenReturn(Collections.emptySet());

Review Comment:
   If we use `@RunWith(MockitoJUnitRunner.StrictStubs.class)` and run the whole test class, Mockito verifies that all stubbings have indeed been used.
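
A minimal sketch of what strict stubs catch. To stay free of a JUnit runner it uses Mockito's `MockitoSession` with `Strictness.STRICT_STUBS` rather than the runner named above; the assumption here is that both apply an equivalent unused-stubbing check. `TaskCreator` is a hypothetical interface, not the real Kafka class.

```java
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;

import org.mockito.Mockito;
import org.mockito.MockitoSession;
import org.mockito.exceptions.misusing.UnnecessaryStubbingException;
import org.mockito.quality.Strictness;

class StrictStubsSketch {

    // Hypothetical stand-in for a task creator with a stubbed factory method.
    interface TaskCreator {
        Collection<String> createTasks(Map<String, Set<String>> assignment);
    }

    // Returns true if Mockito reports the stubbing below as unnecessary.
    static boolean unusedStubbingIsReported() {
        final MockitoSession session = Mockito.mockitoSession()
            .strictness(Strictness.STRICT_STUBS)
            .startMocking();

        final TaskCreator creator = mock(TaskCreator.class);
        // Stubbed, but nothing ever calls createTasks(...) afterwards.
        when(creator.createTasks(Collections.emptyMap())).thenReturn(Collections.emptySet());

        try {
            // Ending the session triggers the strict-stubs validation.
            session.finishMocking();
            return false;
        } catch (final UnnecessaryStubbingException e) {
            return true;
        }
    }
}
```

With this kind of check in place, a stubbing that is no longer on the call path fails the test run instead of lingering silently after the migration.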





[GitHub] [kafka] cadonna commented on pull request #12607: KAFKA-14133: Replace EasyMock with Mockito in streams tests

Posted by "cadonna (via GitHub)" <gi...@apache.org>.
cadonna commented on PR #12607:
URL: https://github.com/apache/kafka/pull/12607#issuecomment-1507025872

   > What worries you about the testing strategy in the class?
   
   My worry is that we verify interactions with mocks (mainly strict mocks) in unit tests in which we test unrelated aspects. That makes the test code more complicated than necessary. One example is [this](https://github.com/apache/kafka/pull/13529#discussion_r1165561113) from the other PR I linked above. In other words, we should probably not only migrate to Mockito but also refactor the unit tests a bit where it makes sense, to reduce the verifications on the mocks and thus the complexity of the tests.




[GitHub] [kafka] clolov closed pull request #12607: KAFKA-14133: Replace EasyMock with Mockito in streams tests

Posted by "clolov (via GitHub)" <gi...@apache.org>.
clolov closed pull request #12607: KAFKA-14133: Replace EasyMock with Mockito in streams tests
URL: https://github.com/apache/kafka/pull/12607




[GitHub] [kafka] guozhangwang commented on a diff in pull request #12607: KAFKA-14133: Replace EasyMock with Mockito in streams tests

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on code in PR #12607:
URL: https://github.com/apache/kafka/pull/12607#discussion_r972413688


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -4091,21 +3815,20 @@ public void shouldListNotPausedTasks() {
         topologyMetadata.pauseTopology(UNNAMED_TOPOLOGY);
 
         assertEquals(taskManager.notPausedTasks().size(), 0);
+
+        verifyConsumerResumedWithAssignment(consumer);

Review Comment:
   Ah I see, thanks!





[GitHub] [kafka] clolov commented on a diff in pull request #12607: KAFKA-14133: Replace EasyMock with Mockito in streams tests

Posted by GitBox <gi...@apache.org>.
clolov commented on code in PR #12607:
URL: https://github.com/apache/kafka/pull/12607#discussion_r968207227


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -271,11 +265,13 @@ public void shouldClassifyExistingTasksWithStateUpdater() {
 
         taskManager.handleAssignment(standbyTasks, restoringActiveTasks);
 
-        Mockito.verify(stateUpdater).getTasks();
-        Mockito.verify(stateUpdater).remove(standbyTaskToRecycle.id());
-        Mockito.verify(stateUpdater).remove(standbyTaskToClose.id());
-        Mockito.verify(stateUpdater).remove(restoringActiveTaskToRecycle.id());
-        Mockito.verify(stateUpdater).remove(restoringActiveTaskToClose.id());
+        verify(stateUpdater).getTasks();
+        verify(stateUpdater).remove(standbyTaskToRecycle.id());
+        verify(stateUpdater).remove(standbyTaskToClose.id());
+        verify(stateUpdater).remove(restoringActiveTaskToRecycle.id());
+        verify(stateUpdater).remove(restoringActiveTaskToClose.id());
+
+        verifyConsumerResumedWithAssignment(consumer);

Review Comment:
   Same as above.





[GitHub] [kafka] clolov commented on pull request #12607: KAFKA-14133: Replace EasyMock with Mockito in streams tests

Posted by GitBox <gi...@apache.org>.
clolov commented on PR #12607:
URL: https://github.com/apache/kafka/pull/12607#issuecomment-1240754283

   There were stubbings which were no longer on the call path, so I have removed them. To check that I wasn't changing the test behaviour, I used EasyMock.verify on the mocks and confirmed that the stubbings were indeed unused prior to my change. There are multiple possibilities for refactoring, but I chose to keep the changes as close as possible to the EasyMock implementation, as the PR is already big.




[GitHub] [kafka] clolov commented on pull request #12607: KAFKA-14133: Replace EasyMock with Mockito in streams tests

Posted by GitBox <gi...@apache.org>.
clolov commented on PR #12607:
URL: https://github.com/apache/kafka/pull/12607#issuecomment-1240757288

   @guozhangwang and @cadonna for visibility




[GitHub] [kafka] clolov commented on pull request #12607: KAFKA-14133: Replace EasyMock with Mockito in streams tests

Posted by GitBox <gi...@apache.org>.
clolov commented on PR #12607:
URL: https://github.com/apache/kafka/pull/12607#issuecomment-1241764996

   Hey @guozhangwang and thank you for the insightful comments! I will aim to give explanations and amend the PR in the upcoming days.




[GitHub] [kafka] clolov commented on a diff in pull request #12607: KAFKA-14133: Replace EasyMock with Mockito in streams tests

Posted by GitBox <gi...@apache.org>.
clolov commented on code in PR #12607:
URL: https://github.com/apache/kafka/pull/12607#discussion_r968200658


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -325,25 +318,21 @@ public void shouldHandleRemovedTasksToRecycleFromStateUpdater() {
         when(stateUpdater.drainRemovedTasks()).thenReturn(mkSet(task00, task01));
 
         taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, true);
-        expect(activeTaskCreator.createActiveTaskFromStandby(eq(task01), eq(taskId01Partitions), eq(consumer)))
-            .andStubReturn(task01Converted);
-        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(anyObject());
-        expectLastCall().once();
-        expect(standbyTaskCreator.createStandbyTaskFromActive(eq(task00), eq(taskId00Partitions)))
-            .andStubReturn(task00Converted);
-        expect(consumer.assignment()).andReturn(emptySet()).anyTimes();
-        consumer.resume(anyObject());
-        expectLastCall().anyTimes();
-        replay(activeTaskCreator, standbyTaskCreator, topologyBuilder, consumer);
+        when(activeTaskCreator.createActiveTaskFromStandby(eq(task01), eq(taskId01Partitions), eq(consumer)))
+            .thenReturn(task01Converted);
+        when(standbyTaskCreator.createStandbyTaskFromActive(eq(task00), eq(taskId00Partitions)))
+            .thenReturn(task00Converted);
+        when(consumer.assignment()).thenReturn(emptySet());
 
         taskManager.tasks().addPendingTaskToRecycle(taskId00, taskId00Partitions);
         taskManager.tasks().addPendingTaskToRecycle(taskId01, taskId01Partitions);
         taskManager.tryToCompleteRestoration(time.milliseconds(), noOpResetter -> { });
 
-        Mockito.verify(task00Converted).initializeIfNeeded();
-        Mockito.verify(task01Converted).initializeIfNeeded();
-        Mockito.verify(stateUpdater).add(task00Converted);
-        Mockito.verify(stateUpdater).add(task01Converted);
+        verify(task00Converted).initializeIfNeeded();
+        verify(task01Converted).initializeIfNeeded();
+        verify(stateUpdater).add(task00Converted);
+        verify(stateUpdater).add(task01Converted);
+        verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(any());

Review Comment:
   By default it expects the method call to happen only once:
   ```
   public static <T> T verify(T mock)
   
   Verifies certain behavior happened once.
   ```
   https://javadoc.io/doc/org.mockito/mockito-core/latest/org/mockito/Mockito.html#verify(T)
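
   To make "happened once" concrete: in Mockito, `verify(mock)` is shorthand for `verify(mock, times(1))`, and `times(1)` means exactly one call, so a second call fails the check. The sketch below is a hand-rolled, JDK-only stand-in that models this counting behaviour. It is not Mockito code, and the names `CallRecorder`, `record`, `verifyOnce` and `verifyTimes` are hypothetical, chosen only for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hand-rolled stand-in that models call-count verification. NOT Mockito code.
final class CallRecorder {
    private final Map<String, Integer> counts = new HashMap<>();

    // Records one invocation of the named method.
    void record(final String method) {
        counts.merge(method, 1, Integer::sum);
    }

    // Models verify(mock, times(n)): the recorded count must match exactly.
    void verifyTimes(final String method, final int expected) {
        final int actual = counts.getOrDefault(method, 0);
        if (actual != expected) {
            throw new AssertionError(
                method + ": expected " + expected + " call(s) but saw " + actual);
        }
    }

    // Models verify(mock): shorthand for exactly one call.
    void verifyOnce(final String method) {
        verifyTimes(method, 1);
    }

    public static void main(final String[] args) {
        final CallRecorder recorder = new CallRecorder();
        final String method = "closeAndRemoveTaskProducerIfNeeded";

        recorder.record(method);
        recorder.verifyOnce(method); // passes: exactly one call so far

        recorder.record(method);
        try {
            recorder.verifyOnce(method); // fails: two calls are not "once"
        } catch (final AssertionError e) {
            System.out.println("verification failed as expected: " + e.getMessage());
        }
    }
}
```

   Under this reading, the plain `verify(activeTaskCreator)` form in the diff above should cover what `expectLastCall().once()` expressed, without an explicit `times(1)`.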






[GitHub] [kafka] clolov commented on a diff in pull request #12607: KAFKA-14133: Replace EasyMock with Mockito in streams tests

Posted by GitBox <gi...@apache.org>.
clolov commented on code in PR #12607:
URL: https://github.com/apache/kafka/pull/12607#discussion_r968204971


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1300,28 +1229,16 @@ public void shouldCloseActiveTasksWhenHandlingLostTasks() throws Exception {
 
         // `handleAssignment`
         expectRestoreToBeCompleted(consumer, changeLogReader);
-        expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment))).andStubReturn(singletonList(task00));
-        expect(standbyTaskCreator.createTasks(eq(taskId01Assignment))).andStubReturn(singletonList(task01));
-        topologyBuilder.addSubscribedTopicsFromAssignment(anyObject(), anyString());
-        expectLastCall().anyTimes();
+        when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(task00));
+        when(standbyTaskCreator.createTasks(eq(taskId01Assignment))).thenReturn(singletonList(task01));
 
+        // The second attempt will return empty tasks.
         makeTaskFolders(taskId00.toString(), taskId01.toString());
         expectLockObtainedFor(taskId00, taskId01);
 
-        // The second attempt will return empty tasks.
-        makeTaskFolders();

Review Comment:
   I have altered the makeTaskFolders function to take the second call into account.
   
   Before:
   ```
   expect(stateDirectory.listNonEmptyTaskDirectories()).andReturn(taskFolders).once();
   ```
   After:
   ```
   when(stateDirectory.listNonEmptyTaskDirectories()).thenReturn(taskFolders).thenReturn(Collections.emptyList());
   ```
   
   As far as I understand, a subsequent stubbing of the same call in Mockito overwrites the previous one, so I cannot do exactly what was done with EasyMock.
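
   The chained `thenReturn(...).thenReturn(...)` in the "After" snippet hands out its values in order and then keeps returning the last one. The sketch below models that behaviour with a hand-rolled, JDK-only stand-in. It is not Mockito code: the class name `ConsecutiveReturns` is hypothetical, and the folder names are placeholder strings rather than the real return type of `listNonEmptyTaskDirectories()`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hand-rolled stand-in for a chained stubbing. NOT Mockito code.
final class ConsecutiveReturns<T> {
    private final List<T> values = new ArrayList<>();
    private int calls = 0;

    @SafeVarargs
    ConsecutiveReturns(final T first, final T... rest) {
        values.add(first);
        values.addAll(Arrays.asList(rest));
    }

    // Returns the configured values in order; once they run out,
    // every further call returns the last configured value.
    T next() {
        final int index = Math.min(calls, values.size() - 1);
        calls++;
        return values.get(index);
    }

    public static void main(final String[] args) {
        // Placeholder folder names, assumed only for this illustration.
        final List<String> taskFolders = Arrays.asList("0_0", "0_1");
        final ConsecutiveReturns<List<String>> listing =
            new ConsecutiveReturns<>(taskFolders, Collections.<String>emptyList());

        System.out.println(listing.next()); // first attempt: the task folders
        System.out.println(listing.next()); // second attempt: empty list
        System.out.println(listing.next()); // later attempts: still empty
    }
}
```

   This is why a single chained stubbing can stand in for the two separate `makeTaskFolders` calls: the first listing sees the folders and every later listing sees none.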





[GitHub] [kafka] guozhangwang commented on a diff in pull request #12607: KAFKA-14133: Replace EasyMock with Mockito in streams tests

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on code in PR #12607:
URL: https://github.com/apache/kafka/pull/12607#discussion_r972414206


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -4068,18 +3796,14 @@ public void shouldConvertStandbyTaskToActiveTask() {
         when(activeTask.id()).thenReturn(taskId00);
         when(activeTask.inputPartitions()).thenReturn(taskId00Partitions);
 
-        expect(activeTaskCreator.createTasks(anyObject(), eq(Collections.emptyMap()))).andReturn(Collections.emptySet());
-        expect(standbyTaskCreator.createTasks(eq(taskId00Assignment))).andReturn(singletonList(standbyTask));
-        expect(activeTaskCreator.createActiveTaskFromStandby(eq(standbyTask), eq(taskId00Partitions), anyObject())).andReturn(activeTask);
-        expect(activeTaskCreator.createTasks(anyObject(), eq(Collections.emptyMap()))).andReturn(Collections.emptySet());
-        expect(standbyTaskCreator.createTasks(eq(Collections.emptyMap()))).andReturn(Collections.emptySet());
-
-        replay(standbyTaskCreator, activeTaskCreator, consumer);
+        when(activeTaskCreator.createTasks(any(), eq(Collections.emptyMap()))).thenReturn(Collections.emptySet());

Review Comment:
   Ack.





[GitHub] [kafka] clolov commented on pull request #12607: KAFKA-14133: Replace EasyMock with Mockito in streams tests

Posted by "clolov (via GitHub)" <gi...@apache.org>.
clolov commented on PR #12607:
URL: https://github.com/apache/kafka/pull/12607#issuecomment-1513271209

   Okay, this makes sense to me. I will aim to start opening PRs in the same manner as yours in the upcoming days and let's see where we go!




[GitHub] [kafka] guozhangwang commented on a diff in pull request #12607: KAFKA-14133: Replace EasyMock with Mockito in streams tests

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on code in PR #12607:
URL: https://github.com/apache/kafka/pull/12607#discussion_r966289656


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1300,28 +1229,16 @@ public void shouldCloseActiveTasksWhenHandlingLostTasks() throws Exception {
 
         // `handleAssignment`
         expectRestoreToBeCompleted(consumer, changeLogReader);
-        expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment))).andStubReturn(singletonList(task00));
-        expect(standbyTaskCreator.createTasks(eq(taskId01Assignment))).andStubReturn(singletonList(task01));
-        topologyBuilder.addSubscribedTopicsFromAssignment(anyObject(), anyString());
-        expectLastCall().anyTimes();
+        when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(task00));
+        when(standbyTaskCreator.createTasks(eq(taskId01Assignment))).thenReturn(singletonList(task01));
 
+        // The second attempt will return empty tasks.
         makeTaskFolders(taskId00.toString(), taskId01.toString());
         expectLockObtainedFor(taskId00, taskId01);
 
-        // The second attempt will return empty tasks.
-        makeTaskFolders();

Review Comment:
   Why can we remove those calls?



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -325,25 +318,21 @@ public void shouldHandleRemovedTasksToRecycleFromStateUpdater() {
         when(stateUpdater.drainRemovedTasks()).thenReturn(mkSet(task00, task01));
 
         taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, true);
-        expect(activeTaskCreator.createActiveTaskFromStandby(eq(task01), eq(taskId01Partitions), eq(consumer)))
-            .andStubReturn(task01Converted);
-        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(anyObject());
-        expectLastCall().once();
-        expect(standbyTaskCreator.createStandbyTaskFromActive(eq(task00), eq(taskId00Partitions)))
-            .andStubReturn(task00Converted);
-        expect(consumer.assignment()).andReturn(emptySet()).anyTimes();
-        consumer.resume(anyObject());
-        expectLastCall().anyTimes();
-        replay(activeTaskCreator, standbyTaskCreator, topologyBuilder, consumer);
+        when(activeTaskCreator.createActiveTaskFromStandby(eq(task01), eq(taskId01Partitions), eq(consumer)))
+            .thenReturn(task01Converted);
+        when(standbyTaskCreator.createStandbyTaskFromActive(eq(task00), eq(taskId00Partitions)))
+            .thenReturn(task00Converted);
+        when(consumer.assignment()).thenReturn(emptySet());
 
         taskManager.tasks().addPendingTaskToRecycle(taskId00, taskId00Partitions);
         taskManager.tasks().addPendingTaskToRecycle(taskId01, taskId01Partitions);
         taskManager.tryToCompleteRestoration(time.milliseconds(), noOpResetter -> { });
 
-        Mockito.verify(task00Converted).initializeIfNeeded();
-        Mockito.verify(task01Converted).initializeIfNeeded();
-        Mockito.verify(stateUpdater).add(task00Converted);
-        Mockito.verify(stateUpdater).add(task01Converted);
+        verify(task00Converted).initializeIfNeeded();
+        verify(task01Converted).initializeIfNeeded();
+        verify(stateUpdater).add(task00Converted);
+        verify(stateUpdater).add(task01Converted);
+        verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(any());

Review Comment:
   How do we specify that we expect this function call only once? Should we use `verify(activeTaskCreator, times(1)).func();` instead?



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -4091,21 +3815,20 @@ public void shouldListNotPausedTasks() {
         topologyMetadata.pauseTopology(UNNAMED_TOPOLOGY);
 
         assertEquals(taskManager.notPausedTasks().size(), 0);
+
+        verifyConsumerResumedWithAssignment(consumer);

Review Comment:
   Why add this additional verification?



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -271,11 +265,13 @@ public void shouldClassifyExistingTasksWithStateUpdater() {
 
         taskManager.handleAssignment(standbyTasks, restoringActiveTasks);
 
-        Mockito.verify(stateUpdater).getTasks();
-        Mockito.verify(stateUpdater).remove(standbyTaskToRecycle.id());
-        Mockito.verify(stateUpdater).remove(standbyTaskToClose.id());
-        Mockito.verify(stateUpdater).remove(restoringActiveTaskToRecycle.id());
-        Mockito.verify(stateUpdater).remove(restoringActiveTaskToClose.id());
+        verify(stateUpdater).getTasks();
+        verify(stateUpdater).remove(standbyTaskToRecycle.id());
+        verify(stateUpdater).remove(standbyTaskToClose.id());
+        verify(stateUpdater).remove(restoringActiveTaskToRecycle.id());
+        verify(stateUpdater).remove(restoringActiveTaskToClose.id());
+
+        verifyConsumerResumedWithAssignment(consumer);

Review Comment:
   There are several tests where we are adding this verification. Could you elaborate a bit on why?



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -4068,18 +3796,14 @@ public void shouldConvertStandbyTaskToActiveTask() {
         when(activeTask.id()).thenReturn(taskId00);
         when(activeTask.inputPartitions()).thenReturn(taskId00Partitions);
 
-        expect(activeTaskCreator.createTasks(anyObject(), eq(Collections.emptyMap()))).andReturn(Collections.emptySet());
-        expect(standbyTaskCreator.createTasks(eq(taskId00Assignment))).andReturn(singletonList(standbyTask));
-        expect(activeTaskCreator.createActiveTaskFromStandby(eq(standbyTask), eq(taskId00Partitions), anyObject())).andReturn(activeTask);
-        expect(activeTaskCreator.createTasks(anyObject(), eq(Collections.emptyMap()))).andReturn(Collections.emptySet());
-        expect(standbyTaskCreator.createTasks(eq(Collections.emptyMap()))).andReturn(Collections.emptySet());
-
-        replay(standbyTaskCreator, activeTaskCreator, consumer);
+        when(activeTaskCreator.createTasks(any(), eq(Collections.emptyMap()))).thenReturn(Collections.emptySet());

Review Comment:
   Here the goals are both to verify that those functions are called and to return the results when they are called (you can see we did not use `andStubReturn` here). When replacing them with `when`, we would no longer verify the calls, right? Ditto elsewhere.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -325,25 +318,21 @@ public void shouldHandleRemovedTasksToRecycleFromStateUpdater() {
         when(stateUpdater.drainRemovedTasks()).thenReturn(mkSet(task00, task01));
 
         taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, true);
-        expect(activeTaskCreator.createActiveTaskFromStandby(eq(task01), eq(taskId01Partitions), eq(consumer)))
-            .andStubReturn(task01Converted);
-        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(anyObject());
-        expectLastCall().once();
-        expect(standbyTaskCreator.createStandbyTaskFromActive(eq(task00), eq(taskId00Partitions)))
-            .andStubReturn(task00Converted);
-        expect(consumer.assignment()).andReturn(emptySet()).anyTimes();
-        consumer.resume(anyObject());
-        expectLastCall().anyTimes();
-        replay(activeTaskCreator, standbyTaskCreator, topologyBuilder, consumer);
+        when(activeTaskCreator.createActiveTaskFromStandby(eq(task01), eq(taskId01Partitions), eq(consumer)))
+            .thenReturn(task01Converted);
+        when(standbyTaskCreator.createStandbyTaskFromActive(eq(task00), eq(taskId00Partitions)))
+            .thenReturn(task00Converted);
+        when(consumer.assignment()).thenReturn(emptySet());
 
         taskManager.tasks().addPendingTaskToRecycle(taskId00, taskId00Partitions);
         taskManager.tasks().addPendingTaskToRecycle(taskId01, taskId01Partitions);
         taskManager.tryToCompleteRestoration(time.milliseconds(), noOpResetter -> { });
 
-        Mockito.verify(task00Converted).initializeIfNeeded();
-        Mockito.verify(task01Converted).initializeIfNeeded();
-        Mockito.verify(stateUpdater).add(task00Converted);
-        Mockito.verify(stateUpdater).add(task01Converted);
+        verify(task00Converted).initializeIfNeeded();
+        verify(task01Converted).initializeIfNeeded();
+        verify(stateUpdater).add(task00Converted);
+        verify(stateUpdater).add(task01Converted);
+        verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(any());

Review Comment:
   Ditto elsewhere for replacing `expectLastCall().once()`?



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2180,29 +2035,16 @@ public void shouldCommitAllActiveTasksThatNeedCommittingOnHandleRevocationWithEo
         );
         expectRestoreToBeCompleted(consumer, changeLogReader);
 
-        expect(activeTaskCreator.createTasks(anyObject(), eq(assignmentActive)))
-            .andReturn(asList(task00, task01, task02));
+        when(activeTaskCreator.createTasks(any(), eq(assignmentActive)))
+            .thenReturn(asList(task00, task01, task02));
 
-        expect(activeTaskCreator.threadProducer()).andReturn(producer);
+        when(activeTaskCreator.threadProducer()).thenReturn(producer);
         activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(taskId00);
-        expect(standbyTaskCreator.createTasks(eq(assignmentStandby)))
-            .andReturn(singletonList(task10));
+        when(standbyTaskCreator.createTasks(eq(assignmentStandby)))
+            .thenReturn(singletonList(task10));
 
         final ConsumerGroupMetadata groupMetadata = new ConsumerGroupMetadata("appId");
-        expect(consumer.groupMetadata()).andReturn(groupMetadata);
-        producer.commitTransaction(expectedCommittedOffsets, groupMetadata);
-        expectLastCall();
-
-        task00.committedOffsets();

Review Comment:
   Why can we remove those verifications now?





[GitHub] [kafka] clolov commented on pull request #12607: KAFKA-14133: Replace EasyMock with Mockito in streams tests

Posted by GitBox <gi...@apache.org>.
clolov commented on PR #12607:
URL: https://github.com/apache/kafka/pull/12607#issuecomment-1240756214

   I am aware that there are merge conflicts and I will aim to address them over the coming days.




[GitHub] [kafka] guozhangwang commented on a diff in pull request #12607: KAFKA-14133: Replace EasyMock with Mockito in streams tests

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on code in PR #12607:
URL: https://github.com/apache/kafka/pull/12607#discussion_r972412952


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -325,25 +318,21 @@ public void shouldHandleRemovedTasksToRecycleFromStateUpdater() {
         when(stateUpdater.drainRemovedTasks()).thenReturn(mkSet(task00, task01));
 
         taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, true);
-        expect(activeTaskCreator.createActiveTaskFromStandby(eq(task01), eq(taskId01Partitions), eq(consumer)))
-            .andStubReturn(task01Converted);
-        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(anyObject());
-        expectLastCall().once();
-        expect(standbyTaskCreator.createStandbyTaskFromActive(eq(task00), eq(taskId00Partitions)))
-            .andStubReturn(task00Converted);
-        expect(consumer.assignment()).andReturn(emptySet()).anyTimes();
-        consumer.resume(anyObject());
-        expectLastCall().anyTimes();
-        replay(activeTaskCreator, standbyTaskCreator, topologyBuilder, consumer);
+        when(activeTaskCreator.createActiveTaskFromStandby(eq(task01), eq(taskId01Partitions), eq(consumer)))
+            .thenReturn(task01Converted);
+        when(standbyTaskCreator.createStandbyTaskFromActive(eq(task00), eq(taskId00Partitions)))
+            .thenReturn(task00Converted);
+        when(consumer.assignment()).thenReturn(emptySet());
 
         taskManager.tasks().addPendingTaskToRecycle(taskId00, taskId00Partitions);
         taskManager.tasks().addPendingTaskToRecycle(taskId01, taskId01Partitions);
         taskManager.tryToCompleteRestoration(time.milliseconds(), noOpResetter -> { });
 
-        Mockito.verify(task00Converted).initializeIfNeeded();
-        Mockito.verify(task01Converted).initializeIfNeeded();
-        Mockito.verify(stateUpdater).add(task00Converted);
-        Mockito.verify(stateUpdater).add(task01Converted);
+        verify(task00Converted).initializeIfNeeded();
+        verify(task01Converted).initializeIfNeeded();
+        verify(stateUpdater).add(task00Converted);
+        verify(stateUpdater).add(task01Converted);
+        verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(any());

Review Comment:
   Ack, thanks!





[GitHub] [kafka] clolov commented on pull request #12607: KAFKA-14133: Replace EasyMock with Mockito in streams tests

Posted by GitBox <gi...@apache.org>.
clolov commented on PR #12607:
URL: https://github.com/apache/kafka/pull/12607#issuecomment-1326276846

   Polite bump for a review here @cadonna. I have rebased on the latest trunk and I believe I have answered the majority of the comments already on the pull request. Given the size of the change, I would prefer to get rid of StateMachineTask in a separate pull request rather than as part of this one.




[GitHub] [kafka] guozhangwang commented on a diff in pull request #12607: KAFKA-14133: Replace EasyMock with Mockito in streams tests

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on code in PR #12607:
URL: https://github.com/apache/kafka/pull/12607#discussion_r972413951


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2180,29 +2035,16 @@ public void shouldCommitAllActiveTasksThatNeedCommittingOnHandleRevocationWithEo
         );
         expectRestoreToBeCompleted(consumer, changeLogReader);
 
-        expect(activeTaskCreator.createTasks(anyObject(), eq(assignmentActive)))
-            .andReturn(asList(task00, task01, task02));
+        when(activeTaskCreator.createTasks(any(), eq(assignmentActive)))
+            .thenReturn(asList(task00, task01, task02));
 
-        expect(activeTaskCreator.threadProducer()).andReturn(producer);
+        when(activeTaskCreator.threadProducer()).thenReturn(producer);
         activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(taskId00);
-        expect(standbyTaskCreator.createTasks(eq(assignmentStandby)))
-            .andReturn(singletonList(task10));
+        when(standbyTaskCreator.createTasks(eq(assignmentStandby)))
+            .thenReturn(singletonList(task10));
 
         final ConsumerGroupMetadata groupMetadata = new ConsumerGroupMetadata("appId");
-        expect(consumer.groupMetadata()).andReturn(groupMetadata);
-        producer.commitTransaction(expectedCommittedOffsets, groupMetadata);
-        expectLastCall();
-
-        task00.committedOffsets();

Review Comment:
   Sounds great, thanks.





[GitHub] [kafka] clolov commented on pull request #12607: KAFKA-14133: Replace EasyMock with Mockito in streams tests

Posted by "clolov (via GitHub)" <gi...@apache.org>.
clolov commented on PR #12607:
URL: https://github.com/apache/kafka/pull/12607#issuecomment-1504960483

   Heya @cadonna, thank you for the review! The approach you have taken in the other PR makes sense to me, so I will aim to open a few pull requests in a similar fashion. What worries you about the testing strategy in the class?




[GitHub] [kafka] clolov commented on pull request #12607: KAFKA-14133: Replace EasyMock with Mockito in streams tests

Posted by "clolov (via GitHub)" <gi...@apache.org>.
clolov commented on PR #12607:
URL: https://github.com/apache/kafka/pull/12607#issuecomment-1609466842

   I will close this PR as we have taken a different approach. Please refer to PRs linking to this one in order to find out how we have implemented the changes.




[GitHub] [kafka] clolov commented on a diff in pull request #12607: KAFKA-14133: Replace EasyMock with Mockito in streams tests

Posted by GitBox <gi...@apache.org>.
clolov commented on code in PR #12607:
URL: https://github.com/apache/kafka/pull/12607#discussion_r968208668


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2180,29 +2035,16 @@ public void shouldCommitAllActiveTasksThatNeedCommittingOnHandleRevocationWithEo
         );
         expectRestoreToBeCompleted(consumer, changeLogReader);
 
-        expect(activeTaskCreator.createTasks(anyObject(), eq(assignmentActive)))
-            .andReturn(asList(task00, task01, task02));
+        when(activeTaskCreator.createTasks(any(), eq(assignmentActive)))
+            .thenReturn(asList(task00, task01, task02));
 
-        expect(activeTaskCreator.threadProducer()).andReturn(producer);
+        when(activeTaskCreator.threadProducer()).thenReturn(producer);
         activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(taskId00);
-        expect(standbyTaskCreator.createTasks(eq(assignmentStandby)))
-            .andReturn(singletonList(task10));
+        when(standbyTaskCreator.createTasks(eq(assignmentStandby)))
+            .thenReturn(singletonList(task10));
 
         final ConsumerGroupMetadata groupMetadata = new ConsumerGroupMetadata("appId");
-        expect(consumer.groupMetadata()).andReturn(groupMetadata);
-        producer.commitTransaction(expectedCommittedOffsets, groupMetadata);
-        expectLastCall();
-
-        task00.committedOffsets();

Review Comment:
   As far as I am aware, Mockito cannot verify interactions with objects which are not mocks. That being said, given you suggested I remove the StateMachineTask, this might change in subsequent commits.





[GitHub] [kafka] clolov commented on a diff in pull request #12607: KAFKA-14133: Replace EasyMock with Mockito in streams tests

Posted by GitBox <gi...@apache.org>.
clolov commented on code in PR #12607:
URL: https://github.com/apache/kafka/pull/12607#discussion_r968206590


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -4091,21 +3815,20 @@ public void shouldListNotPausedTasks() {
         topologyMetadata.pauseTopology(UNNAMED_TOPOLOGY);
 
         assertEquals(taskManager.notPausedTasks().size(), 0);
+
+        verifyConsumerResumedWithAssignment(consumer);

Review Comment:
   The below method set an expectation for a method call which was not stubbed:
   ```
   private static void expectRestoreToBeCompleted(final Consumer<byte[], byte[]> consumer,
                                                  final ChangelogReader changeLogReader,
                                                  final boolean changeLogUpdateRequired) {
       final Set<TopicPartition> assignment = singleton(new TopicPartition("assignment", 0));
       expect(consumer.assignment()).andReturn(assignment);
       consumer.resume(assignment); // <--- THIS IS THE EXPECTATION
       expectLastCall();
       expect(changeLogReader.completedChangelogs()).andReturn(emptySet()).times(changeLogUpdateRequired ? 1 : 0, 1);
   }
   ```
   I wanted to keep this verification, so I moved it to a separate function.





[GitHub] [kafka] clolov commented on pull request #12607: KAFKA-14133: Replace EasyMock with Mockito in streams tests

Posted by "clolov (via GitHub)" <gi...@apache.org>.
clolov commented on PR #12607:
URL: https://github.com/apache/kafka/pull/12607#issuecomment-1413735823

   @cadonna I have rebased this on the latest trunk. Is it possible to get a review? This pull request is one of the last remaining ones needed to complete the Mockito migration for streams.




[GitHub] [kafka] cadonna commented on pull request #12607: KAFKA-14133: Replace EasyMock with Mockito in streams tests

Posted by "cadonna (via GitHub)" <gi...@apache.org>.
cadonna commented on PR #12607:
URL: https://github.com/apache/kafka/pull/12607#issuecomment-1500829973

   @clolov Since this PR is rather long and hard to review, I took a stab at subdividing it into smaller PRs by migrating single mocks. I opened a PR for the migration of the topology builder mock: https://github.com/apache/kafka/pull/13529.
   Let me know what you think about it.
   If you think that is a good idea, feel free to migrate the other mocks. I think we also need to rethink the testing strategy in this test class a bit. Sorry that this migration is getting so complicated.

