You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "cadonna (via GitHub)" <gi...@apache.org> on 2023/04/04 14:37:46 UTC

[GitHub] [kafka] cadonna commented on a diff in pull request #12607: KAFKA-14133: Replace EasyMock with Mockito in streams tests

cadonna commented on code in PR #12607:
URL: https://github.com/apache/kafka/pull/12607#discussion_r1157144930


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -899,65 +841,59 @@ public void shouldSuspendRevokedTaskRemovedFromStateUpdater() {
         when(stateUpdater.hasRemovedTasks()).thenReturn(true);
         when(stateUpdater.drainRemovedTasks()).thenReturn(mkSet(statefulTask));
         final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
-        replay(consumer);
 
         taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
 
-        verify(consumer);
-        Mockito.verify(statefulTask).suspend();
-        Mockito.verify(tasks).addTask(statefulTask);
+        verify(statefulTask).suspend();
+        verify(tasks).addTask(statefulTask);
     }
     @Test
     public void shouldHandleMultipleRemovedTasksFromStateUpdater() {
         final StreamTask taskToRecycle0 = statefulTask(taskId00, taskId00ChangelogPartitions)
-            .inState(State.RESTORING)
-            .withInputPartitions(taskId00Partitions).build();
+                .inState(State.RESTORING)
+                .withInputPartitions(taskId00Partitions).build();
         final StandbyTask taskToRecycle1 = standbyTask(taskId01, taskId01ChangelogPartitions)
-            .inState(State.RUNNING)
-            .withInputPartitions(taskId01Partitions).build();
+                .inState(State.RUNNING)
+                .withInputPartitions(taskId01Partitions).build();
         final StandbyTask convertedTask0 = standbyTask(taskId00, taskId00ChangelogPartitions).build();
         final StreamTask convertedTask1 = statefulTask(taskId01, taskId01ChangelogPartitions).build();
         final StreamTask taskToClose = statefulTask(taskId02, taskId02ChangelogPartitions)
-            .inState(State.RESTORING)
-            .withInputPartitions(taskId02Partitions).build();
+                .inState(State.RESTORING)
+                .withInputPartitions(taskId02Partitions).build();
         final StreamTask taskToUpdateInputPartitions = statefulTask(taskId03, taskId03ChangelogPartitions)
-            .inState(State.RESTORING)
-            .withInputPartitions(taskId03Partitions).build();
+                .inState(State.RESTORING)
+                .withInputPartitions(taskId03Partitions).build();
         when(stateUpdater.hasRemovedTasks()).thenReturn(true);
         when(stateUpdater.drainRemovedTasks())
-            .thenReturn(mkSet(taskToRecycle0, taskToRecycle1, taskToClose, taskToUpdateInputPartitions));
+                .thenReturn(mkSet(taskToRecycle0, taskToRecycle1, taskToClose, taskToUpdateInputPartitions));

Review Comment:
   nit: adding the indentation is not necessary.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -899,65 +841,59 @@ public void shouldSuspendRevokedTaskRemovedFromStateUpdater() {
         when(stateUpdater.hasRemovedTasks()).thenReturn(true);
         when(stateUpdater.drainRemovedTasks()).thenReturn(mkSet(statefulTask));
         final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
-        replay(consumer);
 
         taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
 
-        verify(consumer);
-        Mockito.verify(statefulTask).suspend();
-        Mockito.verify(tasks).addTask(statefulTask);
+        verify(statefulTask).suspend();
+        verify(tasks).addTask(statefulTask);
     }
     @Test
     public void shouldHandleMultipleRemovedTasksFromStateUpdater() {
         final StreamTask taskToRecycle0 = statefulTask(taskId00, taskId00ChangelogPartitions)
-            .inState(State.RESTORING)
-            .withInputPartitions(taskId00Partitions).build();
+                .inState(State.RESTORING)
+                .withInputPartitions(taskId00Partitions).build();
         final StandbyTask taskToRecycle1 = standbyTask(taskId01, taskId01ChangelogPartitions)
-            .inState(State.RUNNING)
-            .withInputPartitions(taskId01Partitions).build();
+                .inState(State.RUNNING)
+                .withInputPartitions(taskId01Partitions).build();
         final StandbyTask convertedTask0 = standbyTask(taskId00, taskId00ChangelogPartitions).build();
         final StreamTask convertedTask1 = statefulTask(taskId01, taskId01ChangelogPartitions).build();
         final StreamTask taskToClose = statefulTask(taskId02, taskId02ChangelogPartitions)
-            .inState(State.RESTORING)
-            .withInputPartitions(taskId02Partitions).build();
+                .inState(State.RESTORING)
+                .withInputPartitions(taskId02Partitions).build();
         final StreamTask taskToUpdateInputPartitions = statefulTask(taskId03, taskId03ChangelogPartitions)
-            .inState(State.RESTORING)
-            .withInputPartitions(taskId03Partitions).build();
+                .inState(State.RESTORING)
+                .withInputPartitions(taskId03Partitions).build();
         when(stateUpdater.hasRemovedTasks()).thenReturn(true);
         when(stateUpdater.drainRemovedTasks())
-            .thenReturn(mkSet(taskToRecycle0, taskToRecycle1, taskToClose, taskToUpdateInputPartitions));
+                .thenReturn(mkSet(taskToRecycle0, taskToRecycle1, taskToClose, taskToUpdateInputPartitions));
         when(stateUpdater.restoresActiveTasks()).thenReturn(true);
-        expect(activeTaskCreator.createActiveTaskFromStandby(eq(taskToRecycle1), eq(taskId01Partitions), eq(consumer)))
-            .andStubReturn(convertedTask1);
-        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(anyObject());
-        expectLastCall().times(2);
-        expect(standbyTaskCreator.createStandbyTaskFromActive(eq(taskToRecycle0), eq(taskId00Partitions)))
-            .andStubReturn(convertedTask0);
-        expect(consumer.assignment()).andReturn(emptySet()).anyTimes();
-        consumer.resume(anyObject());
-        expectLastCall().anyTimes();
+        when(activeTaskCreator.createActiveTaskFromStandby(taskToRecycle1, taskId01Partitions, consumer))
+                .thenReturn(convertedTask1);
+        when(standbyTaskCreator.createStandbyTaskFromActive(taskToRecycle0, taskId00Partitions))
+                .thenReturn(convertedTask0);
+        when(consumer.assignment()).thenReturn(emptySet());
         final TasksRegistry tasks = mock(TasksRegistry.class);
         when(tasks.removePendingTaskToCloseClean(taskToClose.id())).thenReturn(true);
+        when(tasks.removePendingTaskToCloseClean(taskToUpdateInputPartitions.id())).thenReturn(false);
         when(tasks.removePendingTaskToRecycle(taskToRecycle0.id())).thenReturn(taskId00Partitions);
         when(tasks.removePendingTaskToRecycle(taskToRecycle1.id())).thenReturn(taskId01Partitions);
         when(tasks.removePendingTaskToRecycle(
-            argThat(taskId -> !taskId.equals(taskToRecycle0.id()) && !taskId.equals(taskToRecycle1.id())))
+                argThat(taskId -> !taskId.equals(taskToRecycle0.id()) && !taskId.equals(taskToRecycle1.id())))

Review Comment:
   nit: adding indentation is not necessary.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -899,65 +841,59 @@ public void shouldSuspendRevokedTaskRemovedFromStateUpdater() {
         when(stateUpdater.hasRemovedTasks()).thenReturn(true);
         when(stateUpdater.drainRemovedTasks()).thenReturn(mkSet(statefulTask));
         final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
-        replay(consumer);
 
         taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
 
-        verify(consumer);
-        Mockito.verify(statefulTask).suspend();
-        Mockito.verify(tasks).addTask(statefulTask);
+        verify(statefulTask).suspend();
+        verify(tasks).addTask(statefulTask);
     }
     @Test
     public void shouldHandleMultipleRemovedTasksFromStateUpdater() {
         final StreamTask taskToRecycle0 = statefulTask(taskId00, taskId00ChangelogPartitions)
-            .inState(State.RESTORING)
-            .withInputPartitions(taskId00Partitions).build();
+                .inState(State.RESTORING)
+                .withInputPartitions(taskId00Partitions).build();
         final StandbyTask taskToRecycle1 = standbyTask(taskId01, taskId01ChangelogPartitions)
-            .inState(State.RUNNING)
-            .withInputPartitions(taskId01Partitions).build();
+                .inState(State.RUNNING)
+                .withInputPartitions(taskId01Partitions).build();
         final StandbyTask convertedTask0 = standbyTask(taskId00, taskId00ChangelogPartitions).build();
         final StreamTask convertedTask1 = statefulTask(taskId01, taskId01ChangelogPartitions).build();
         final StreamTask taskToClose = statefulTask(taskId02, taskId02ChangelogPartitions)
-            .inState(State.RESTORING)
-            .withInputPartitions(taskId02Partitions).build();
+                .inState(State.RESTORING)
+                .withInputPartitions(taskId02Partitions).build();
         final StreamTask taskToUpdateInputPartitions = statefulTask(taskId03, taskId03ChangelogPartitions)
-            .inState(State.RESTORING)
-            .withInputPartitions(taskId03Partitions).build();
+                .inState(State.RESTORING)
+                .withInputPartitions(taskId03Partitions).build();
         when(stateUpdater.hasRemovedTasks()).thenReturn(true);
         when(stateUpdater.drainRemovedTasks())
-            .thenReturn(mkSet(taskToRecycle0, taskToRecycle1, taskToClose, taskToUpdateInputPartitions));
+                .thenReturn(mkSet(taskToRecycle0, taskToRecycle1, taskToClose, taskToUpdateInputPartitions));
         when(stateUpdater.restoresActiveTasks()).thenReturn(true);
-        expect(activeTaskCreator.createActiveTaskFromStandby(eq(taskToRecycle1), eq(taskId01Partitions), eq(consumer)))
-            .andStubReturn(convertedTask1);
-        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(anyObject());
-        expectLastCall().times(2);
-        expect(standbyTaskCreator.createStandbyTaskFromActive(eq(taskToRecycle0), eq(taskId00Partitions)))
-            .andStubReturn(convertedTask0);
-        expect(consumer.assignment()).andReturn(emptySet()).anyTimes();
-        consumer.resume(anyObject());
-        expectLastCall().anyTimes();
+        when(activeTaskCreator.createActiveTaskFromStandby(taskToRecycle1, taskId01Partitions, consumer))
+                .thenReturn(convertedTask1);
+        when(standbyTaskCreator.createStandbyTaskFromActive(taskToRecycle0, taskId00Partitions))
+                .thenReturn(convertedTask0);

Review Comment:
   nit: we usually indent continuation lines by 4 spaces.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1119,17 +1051,15 @@ public void shouldRecycleRestoredTask() {
             .inState(State.CREATED)
             .withInputPartitions(taskId00Partitions).build();
         final TaskManager taskManager = setUpRecycleRestoredTask(statefulTask);
-        expect(standbyTaskCreator.createStandbyTaskFromActive(statefulTask, statefulTask.inputPartitions()))
-            .andStubReturn(standbyTask);
+        when(standbyTaskCreator.createStandbyTaskFromActive(statefulTask, statefulTask.inputPartitions()))
+            .thenReturn(standbyTask);
         activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(statefulTask.id());

Review Comment:
   This needs to become a verification
   
   ```java
           verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(statefulTask.id());
   ```
   
   in the verification block further down.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1193,15 +1119,13 @@ public void shouldCloseCleanRestoredTask() {
         final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = setUpCloseCleanRestoredTask(statefulTask, tasks);
         activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(statefulTask.id());

Review Comment:
   This needs to become a verification
   
   ```java
   verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(statefulTask.id());
   ```
   
   in the verification block.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1060,14 +996,12 @@ public void shouldTransitRestoredTaskToRunning() {
         final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = setUpTransitionToRunningOfRestoredTask(task, tasks);
         consumer.resume(task.inputPartitions());

Review Comment:
   This needs to become a verification
   ```java
   verify(consumer).resume(task.inputPartitions());
   ```
   further down in the verification block.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1292,15 +1210,13 @@ public void shouldUpdateInputPartitionsOfRestoredTask() {
         when(stateUpdater.restoresActiveTasks()).thenReturn(true);
         final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
         consumer.resume(statefulTask.inputPartitions());

Review Comment:
   This needs to become a verification
   
   ```java
   verify(consumer).resume(statefulTask.inputPartitions());
   ```
   
   in the verification block.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1534,66 +1440,61 @@ public void shouldTryToLockValidTaskDirsAtRebalanceStart() throws Exception {
             taskId10.toString(),
             "dummy"
         );
-        replay(stateDirectory);
         taskManager.handleRebalanceStart(singleton("topic"));
 
-        verify(stateDirectory);
         assertThat(taskManager.lockedTaskDirectories(), is(singleton(taskId01)));
     }
 
     @Test
     public void shouldPauseAllTopicsWithoutStateUpdaterOnRebalanceComplete() {
         final Set<TopicPartition> assigned = mkSet(t1p0, t1p1);
-        expect(consumer.assignment()).andReturn(assigned);
-        consumer.pause(assigned);
-        replay(consumer);
+        when(consumer.assignment()).thenReturn(assigned);
 
         taskManager.handleRebalanceComplete();
 
-        verify(consumer);
+        verify(consumer).pause(assigned);
     }
 
     @Test
     public void shouldNotPauseReadyTasksWithStateUpdaterOnRebalanceComplete() {
         final StreamTask statefulTask0 = statefulTask(taskId00, taskId00ChangelogPartitions)
             .inState(State.RUNNING)
             .withInputPartitions(taskId00Partitions).build();
-        final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+        final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
         when(tasks.allTasks()).thenReturn(mkSet(statefulTask0));
         final Set<TopicPartition> assigned = mkSet(t1p0, t1p1);
-        expect(consumer.assignment()).andReturn(assigned);
-        consumer.pause(mkSet(t1p1));
-        replay(consumer);
+        when(consumer.assignment()).thenReturn(assigned);
 
         taskManager.handleRebalanceComplete();
 
-        verify(consumer);
+        verify(consumer).pause(mkSet(t1p1));
     }
 
     @Test
     public void shouldReleaseLockForUnassignedTasksAfterRebalance() throws Exception {
         expectLockObtainedFor(taskId00, taskId01, taskId02);
-        expectUnlockFor(taskId02);
 
         makeTaskFolders(
             taskId00.toString(),  // active task
             taskId01.toString(),  // standby task
             taskId02.toString()   // unassigned but able to lock
         );
-        replay(stateDirectory);
         taskManager.handleRebalanceStart(singleton("topic"));
 
         assertThat(taskManager.lockedTaskDirectories(), is(mkSet(taskId00, taskId01, taskId02)));
 
         handleAssignment(taskId00Assignment, taskId01Assignment, emptyMap());
-        reset(consumer);
-        expectConsumerAssignmentPaused(consumer);
-        replay(consumer);
+
+        final Set<TopicPartition> assignment = singleton(new TopicPartition("assignment", 0));
+        when(consumer.assignment()).thenReturn(assignment);
 
         taskManager.handleRebalanceComplete();
         assertThat(taskManager.lockedTaskDirectories(), is(mkSet(taskId00, taskId01)));
-        verify(stateDirectory);
+        verify(consumer).pause(assignment);
+        verify(stateDirectory).unlock(taskId02);
+
+        verifyConsumerResumedWithAssignment(consumer);

Review Comment:
   This method should also take the assignment as an argument. Maybe it is even better to call
   ```java
   verify(consumer, atLeastOnce()).resume(assignment);
   ```
   instead of `verifyConsumerResumedWithAssignment(consumer)` here.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1667,6 +1566,8 @@ public void shouldComputeOffsetSumForStandbyTask() throws Exception {
         restoringTask.setChangelogOffsets(changelogOffsets);
 
         assertThat(taskManager.getTaskOffsetSums(), is(expectedOffsetSums));
+
+        verifyConsumerResumedWithAssignment(consumer);

Review Comment:
   Why did you add this verification? It was not there before the migration, right?



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1213,37 +1137,33 @@ public void shouldHandleExceptionThrownDuringCloseInCloseCleanRestoredTask() {
         final TaskManager taskManager = setUpCloseCleanRestoredTask(statefulTask, tasks);
         doThrow(RuntimeException.class).when(statefulTask).closeClean();
         activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(statefulTask.id());

Review Comment:
   This needs to become a verification
   
   ```java
   verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(statefulTask.id());
   ```
   
   in the verification block.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1268,16 +1188,14 @@ public void shouldCloseDirtyRestoredTask() {
         when(stateUpdater.restoresActiveTasks()).thenReturn(true);
         activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(statefulTask.id());

Review Comment:
   This needs to become a verification
   
   ```java
   verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(statefulTask.id());
   ```
   
   in the verification block.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -4370,27 +4080,26 @@ private Map<TaskId, StateMachineTask> handleAssignment(final Map<TaskId, Set<Top
 
     private void expectLockObtainedFor(final TaskId... tasks) throws Exception {
         for (final TaskId task : tasks) {
-            expect(stateDirectory.lock(task)).andReturn(true).once();
+            when(stateDirectory.lock(task)).thenReturn(true);
         }
     }
 
     private void expectLockFailedFor(final TaskId... tasks) throws Exception {
         for (final TaskId task : tasks) {
-            expect(stateDirectory.lock(task)).andReturn(false).once();
+            when(stateDirectory.lock(task)).thenReturn(false);
         }
     }
 
     private void expectUnlockFor(final TaskId... tasks) throws Exception {
         for (final TaskId task : tasks) {
-            stateDirectory.unlock(task);
-            expectLastCall();
+            doNothing().when(stateDirectory).unlock(task);
         }
     }
 
     private static void expectConsumerAssignmentPaused(final Consumer<byte[], byte[]> consumer) {
         final Set<TopicPartition> assignment = singleton(new TopicPartition("assignment", 0));
-        expect(consumer.assignment()).andReturn(assignment);
-        consumer.pause(assignment);
+        when(consumer.assignment()).thenReturn(assignment);
+        doNothing().when(consumer).pause(assignment);
     }

Review Comment:
   This method is not needed anymore.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org