You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "cadonna (via GitHub)" <gi...@apache.org> on 2023/06/21 09:27:24 UTC

[GitHub] [kafka] cadonna commented on a diff in pull request #13873: KAFKA-14133: Migrate Consumer mock in TaskManagerTest to Mockito

cadonna commented on code in PR #13873:
URL: https://github.com/apache/kafka/pull/13873#discussion_r1236586513


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -886,11 +885,9 @@ public void shouldSuspendRevokedTaskRemovedFromStateUpdater() {
         when(stateUpdater.hasRemovedTasks()).thenReturn(true);
         when(stateUpdater.drainRemovedTasks()).thenReturn(mkSet(statefulTask));
         final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
-        replay(consumer);
 
         taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
 
-        verify(consumer);

Review Comment:
   Here you need to add `Mockito.verifyNoInteractions(consumer)` because that was the intent of replaying a consumer without expectations and verifying it.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -886,11 +885,9 @@ public void shouldSuspendRevokedTaskRemovedFromStateUpdater() {
         when(stateUpdater.hasRemovedTasks()).thenReturn(true);
         when(stateUpdater.drainRemovedTasks()).thenReturn(mkSet(statefulTask));
         final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
-        replay(consumer);
 
         taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
 
-        verify(consumer);
         Mockito.verify(statefulTask).suspend();
         Mockito.verify(tasks).addTask(statefulTask);
     }

Review Comment:
   nit (and probably my fault 🙂) 
   ```suggestion
       }
   
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1066,14 +1054,12 @@ public void shouldHandleTimeoutExceptionInTransitRestoredTaskToRunning() {
         final TaskManager taskManager = setUpTransitionToRunningOfRestoredTask(task, tasks);
         final TimeoutException timeoutException = new TimeoutException();
         doThrow(timeoutException).when(task).completeRestoration(noOpResetter);
-        replay(consumer);
 
         taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
 
         Mockito.verify(task).maybeInitTaskTimeoutOrThrow(anyLong(), Mockito.eq(timeoutException));
         Mockito.verify(tasks, never()).addTask(task);
         Mockito.verify(task, never()).clearTaskTimeout();
-        verify(consumer);

Review Comment:
   Also here, please add `Mockito.verifyNoInteractions(consumer)`.
   
   You also miss this verification in other places.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1046,15 +1036,13 @@ public void shouldTransitRestoredTaskToRunning() {
             .withInputPartitions(taskId00Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = setUpTransitionToRunningOfRestoredTask(task, tasks);
-        consumer.resume(task.inputPartitions());
-        replay(consumer);
 
         taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
 
         Mockito.verify(task).completeRestoration(noOpResetter);
         Mockito.verify(task).clearTaskTimeout();
         Mockito.verify(tasks).addTask(task);
-        verify(consumer);
+        Mockito.verify(consumer).resume(task.inputPartitions());

Review Comment:
   I think, I cannot completely follow your reasoning here.  Why would you add `verifyNoMoreInteractions`? The important thing here is that the consumer resumes polling from the input partitions. However, I also see that with easymock this test verifies that `consumer.resume(task.inputPartitions())` is the only method called on the consumer mock. I am fine either way.  



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -918,9 +915,6 @@ public void shouldHandleMultipleRemovedTasksFromStateUpdater() {
             .thenReturn(convertedTask1);
         when(standbyTaskCreator.createStandbyTaskFromActive(taskToRecycle0, taskId00Partitions))
             .thenReturn(convertedTask0);
-        expect(consumer.assignment()).andReturn(emptySet()).anyTimes();
-        consumer.resume(anyObject());
-        expectLastCall().anyTimes();

Review Comment:
   It seems I was a bit sloppy here. When a task is removed from the state updater, there should be no interactions with the consumer. Please remove the expecations and verify for no interactions with the consumer mock.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -918,9 +915,6 @@ public void shouldHandleMultipleRemovedTasksFromStateUpdater() {
             .thenReturn(convertedTask1);
         when(standbyTaskCreator.createStandbyTaskFromActive(taskToRecycle0, taskId00Partitions))
             .thenReturn(convertedTask0);
-        expect(consumer.assignment()).andReturn(emptySet()).anyTimes();

Review Comment:
   Mockito returns an empty collection by default. @clolov Could you confirm if this was the reason you removed the stub?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org