You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "clolov (via GitHub)" <gi...@apache.org> on 2023/06/19 10:37:47 UTC

[GitHub] [kafka] clolov opened a new pull request, #13873: KAFKA-14133: Migrate Consumer mock in TaskManagerTest to Mockito

clolov opened a new pull request, #13873:
URL: https://github.com/apache/kafka/pull/13873

   This pull requests migrates the Consumer mock in TaskManagerTest from EasyMock to Mockito.
   The change is restricted to a single mock to minimize the scope and make it easier for review.
   
   The reasoning as to why we would like to migrate a single mock rather than all mocks in the file at the same time has been discussed in https://github.com/apache/kafka/pull/12607#issuecomment-1500829973
   
   It takes the same approach as in:
   https://github.com/apache/kafka/pull/13529
   https://github.com/apache/kafka/pull/13621
   https://github.com/apache/kafka/pull/13681
   https://github.com/apache/kafka/pull/13711


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] clolov commented on a diff in pull request #13873: KAFKA-14133: Migrate Consumer mock in TaskManagerTest to Mockito

Posted by "clolov (via GitHub)" <gi...@apache.org>.
clolov commented on code in PR #13873:
URL: https://github.com/apache/kafka/pull/13873#discussion_r1242149622


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1961,10 +1920,10 @@ public void postCommit(final boolean enforceCheckpoint) {
         };
 
         // `handleAssignment`
-        expectRestoreToBeCompleted(consumer);
+        expectAssignmentToBeCalled(consumer);
         when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
-        expect(consumer.assignment()).andReturn(taskId00Partitions);
-        replay(consumer);
+        final Set<TopicPartition> partitions = union(HashSet::new, taskId00Partitions);
+        when(consumer.assignment()).thenReturn(partitions);

Review Comment:
   ~~Oh, silly me, there is a mkSet method in Utils. Okay, I will remedy this.~~
   Silly me x2, the thing is already a set



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] clolov commented on a diff in pull request #13873: KAFKA-14133: Migrate Consumer mock in TaskManagerTest to Mockito

Posted by "clolov (via GitHub)" <gi...@apache.org>.
clolov commented on code in PR #13873:
URL: https://github.com/apache/kafka/pull/13873#discussion_r1239994017


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -918,9 +915,6 @@ public void shouldHandleMultipleRemovedTasksFromStateUpdater() {
             .thenReturn(convertedTask1);
         when(standbyTaskCreator.createStandbyTaskFromActive(taskToRecycle0, taskId00Partitions))
             .thenReturn(convertedTask0);
-        expect(consumer.assignment()).andReturn(emptySet()).anyTimes();
-        consumer.resume(anyObject());
-        expectLastCall().anyTimes();

Review Comment:
   Ah, interesting, okay, I will try changing it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #13873: KAFKA-14133: Migrate Consumer mock in TaskManagerTest to Mockito

Posted by "cadonna (via GitHub)" <gi...@apache.org>.
cadonna commented on code in PR #13873:
URL: https://github.com/apache/kafka/pull/13873#discussion_r1241990023


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -4521,11 +4385,18 @@ public void shouldListNotPausedTasks() {
         assertEquals(taskManager.notPausedTasks().size(), 0);
     }
 
-    private static void expectRestoreToBeCompleted(final Consumer<byte[], byte[]> consumer) {
+    private static void expectAssignmentToBeCalled(final Consumer<byte[], byte[]> consumer) {
         final Set<TopicPartition> assignment = singleton(new TopicPartition("assignment", 0));
-        expect(consumer.assignment()).andReturn(assignment);
-        consumer.resume(assignment);
-        expectLastCall();
+        when(consumer.assignment()).thenReturn(assignment);
+    }

Review Comment:
   I would inline this function since it became an one-liner.
   
   ```java
   when(consumer.assignment()).thenReturn(singleton(new TopicPartition("assignment", 0)));
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2034,12 +1993,9 @@ public void shouldCommitNonCorruptedTasksOnTaskCorruptedException() {
         // `handleAssignment`
         when(activeTaskCreator.createTasks(any(), Mockito.eq(assignment)))
             .thenReturn(asList(corruptedTask, nonCorruptedTask));
-        expectRestoreToBeCompleted(consumer);
-        expect(consumer.assignment()).andReturn(taskId00Partitions);
-        // check that we should not commit empty map either
-        consumer.commitSync(eq(emptyMap()));
-        expectLastCall().andStubThrow(new AssertionError("should not invoke commitSync when offset map is empty"));
-        replay(consumer);
+        expectAssignmentToBeCalled(consumer);
+        final Set<TopicPartition> partitions = union(HashSet::new, taskId00Partitions);

Review Comment:
   Same here



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2449,17 +2398,15 @@ public void markChangelogAsCorrupted(final Collection<TopicPartition> partitions
         assertThat(revokedActiveTask.state(), is(State.SUSPENDED));
         assertThat(unrevokedActiveTask.state(), is(State.CREATED));
         assertThat(unrevokedActiveTaskWithoutCommitNeeded.state(), is(State.RUNNING));
+        verifyResumeWasCalledWith(consumer, partitions);

Review Comment:
   Why do you verify? It was not verified in the original code.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2377,6 +2326,7 @@ public void markChangelogAsCorrupted(final Collection<TopicPartition> partitions
         assertThat(revokedActiveTask.state(), is(State.SUSPENDED));
         assertThat(unrevokedActiveTaskWithCommitNeeded.state(), is(State.CREATED));
         assertThat(unrevokedActiveTaskWithoutCommitNeeded.state(), is(State.RUNNING));
+        verifyResumeWasCalledWith(consumer, partition);

Review Comment:
   Why do you verify? It was not verified in the original code.
   
   I added a couple of this comments below. However, I am not sure whether the original author did not want to verify the consumer or if they forgot about it. I will leave it to you if you want to keep them or not.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2805,6 +2716,7 @@ public void shouldNotCommitOnHandleAssignmentIfNoTaskClosed() {
 
         assertThat(task00.commitNeeded, is(true));
         assertThat(task10.commitPrepared, is(false));
+        verifyResumeWasCalled(consumer);

Review Comment:
   Why do you verify? It was not verified in the original code.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2834,14 +2744,14 @@ public void shouldNotCommitOnHandleAssignmentIfOnlyStandbyTaskClosed() {
         taskManager.handleAssignment(assignmentActive, Collections.emptyMap());
 
         assertThat(task00.commitNeeded, is(true));
+        verifyResumeWasCalled(consumer);

Review Comment:
   Why do you verify? It was not verified in the original code.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2688,20 +2614,8 @@ public void shouldCommitAllActiveTasksThatNeedCommittingOnHandleRevocationWithEo
             .thenReturn(singletonList(task10));
 
         final ConsumerGroupMetadata groupMetadata = new ConsumerGroupMetadata("appId");
-        expect(consumer.groupMetadata()).andReturn(groupMetadata);
-        producer.commitTransaction(expectedCommittedOffsets, groupMetadata);
-        expectLastCall();
-
-        task00.committedOffsets();
-        EasyMock.expectLastCall();
-        task01.committedOffsets();
-        EasyMock.expectLastCall();
-        task02.committedOffsets();
-        EasyMock.expectLastCall();
-        task10.committedOffsets();
-        EasyMock.expectLastCall();

Review Comment:
   Yeah, that seems weird...



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -4521,11 +4385,18 @@ public void shouldListNotPausedTasks() {
         assertEquals(taskManager.notPausedTasks().size(), 0);
     }
 
-    private static void expectRestoreToBeCompleted(final Consumer<byte[], byte[]> consumer) {
+    private static void expectAssignmentToBeCalled(final Consumer<byte[], byte[]> consumer) {
         final Set<TopicPartition> assignment = singleton(new TopicPartition("assignment", 0));
-        expect(consumer.assignment()).andReturn(assignment);
-        consumer.resume(assignment);
-        expectLastCall();
+        when(consumer.assignment()).thenReturn(assignment);
+    }
+
+    private static void verifyResumeWasCalled(final Consumer<byte[], byte[]> consumer) {
+        final Set<TopicPartition> assignment = singleton(new TopicPartition("assignment", 0));
+        Mockito.verify(consumer, atLeastOnce()).resume(assignment);
+    }
+
+    private static void verifyResumeWasCalledWith(final Consumer<byte[], byte[]> consumer, Set<TopicPartition> assignment) {

Review Comment:
   I would inline this function.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1998,10 +1957,10 @@ public void suspend() {
             }
         };
 
-        expectRestoreToBeCompleted(consumer);
+        expectAssignmentToBeCalled(consumer);
         when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
-        expect(consumer.assignment()).andReturn(taskId00Partitions);
-        replay(consumer);
+        final Set<TopicPartition> partitions = union(HashSet::new, taskId00Partitions);
+        when(consumer.assignment()).thenReturn(partitions);

Review Comment:
   Same as above also here



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2541,6 +2482,7 @@ public void shouldAddNewActiveTasks() {
         assertThat(taskManager.activeTaskMap(), Matchers.equalTo(singletonMap(taskId00, task00)));
         assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
         Mockito.verify(changeLogReader).enforceRestoreActive();
+        Mockito.verify(consumer).resume(emptySet());

Review Comment:
   Why do you verify? It was not verified in the original code.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1961,10 +1920,10 @@ public void postCommit(final boolean enforceCheckpoint) {
         };
 
         // `handleAssignment`
-        expectRestoreToBeCompleted(consumer);
+        expectAssignmentToBeCalled(consumer);
         when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
-        expect(consumer.assignment()).andReturn(taskId00Partitions);
-        replay(consumer);
+        final Set<TopicPartition> partitions = union(HashSet::new, taskId00Partitions);
+        when(consumer.assignment()).thenReturn(partitions);

Review Comment:
   Why do you union a single set? 
   ```suggestion
           when(consumer.assignment()).thenReturn(taskId00Partitions);
   ```
   ... and isn't this a duplicate of line 1923 (except the differing partitions).
   ```java
   expectAssignmentToBeCalled(consumer);
   ```
   Could it be that you can remove line 1923?



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2469,19 +2416,18 @@ public void shouldCloseStandbyUnassignedTasksWhenCreatingNewTasks() {
         assertThat(task00.state(), is(Task.State.CLOSED));
         assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
         assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
+        verifyResumeWasCalled(consumer);

Review Comment:
   Why do you verify? It was not verified in the original code.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2494,17 +2440,16 @@ public void shouldAddNonResumedSuspendedTasks() {
         assertThat(task01.state(), is(Task.State.RUNNING));
 
         Mockito.verify(activeTaskCreator).createTasks(any(), Mockito.eq(emptyMap()));
+        verifyResumeWasCalled(consumer);

Review Comment:
   Why do you verify? It was not verified in the original code.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2418,19 +2368,18 @@ public void markChangelogAsCorrupted(final Collection<TopicPartition> partitions
             mkEntry(taskId02, taskId02Partitions)
             );
 
-        expectRestoreToBeCompleted(consumer);
+        expectAssignmentToBeCalled(consumer);
 
         when(activeTaskCreator.createTasks(any(), Mockito.eq(assignmentActive)))
             .thenReturn(asList(revokedActiveTask, unrevokedActiveTask, unrevokedActiveTaskWithoutCommitNeeded));
 
         final ConsumerGroupMetadata groupMetadata = new ConsumerGroupMetadata("appId");
-        expect(consumer.groupMetadata()).andReturn(groupMetadata);
+        when(consumer.groupMetadata()).thenReturn(groupMetadata);
 
         doThrow(new TimeoutException()).when(producer).commitTransaction(expectedCommittedOffsets, groupMetadata);
 
-        expect(consumer.assignment()).andStubReturn(union(HashSet::new, taskId00Partitions, taskId01Partitions, taskId02Partitions));
-
-        replay(consumer, stateManager);

Review Comment:
   Did you miss to replay the state manager?



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2688,20 +2614,8 @@ public void shouldCommitAllActiveTasksThatNeedCommittingOnHandleRevocationWithEo
             .thenReturn(singletonList(task10));
 
         final ConsumerGroupMetadata groupMetadata = new ConsumerGroupMetadata("appId");
-        expect(consumer.groupMetadata()).andReturn(groupMetadata);
-        producer.commitTransaction(expectedCommittedOffsets, groupMetadata);
-        expectLastCall();
-
-        task00.committedOffsets();
-        EasyMock.expectLastCall();
-        task01.committedOffsets();
-        EasyMock.expectLastCall();
-        task02.committedOffsets();
-        EasyMock.expectLastCall();
-        task10.committedOffsets();
-        EasyMock.expectLastCall();
-
-        replay(consumer);
+        when(consumer.groupMetadata()).thenReturn(groupMetadata);
+        doNothing().when(producer).commitTransaction(expectedCommittedOffsets, groupMetadata);

Review Comment:
   I think something strange happened here. Not your fault. The producer is used like an EasyMock mock but it is specified as a Mockito mock. The producer does not do anything in the test as far as I can see.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -4034,14 +3913,13 @@ public Set<TopicPartition> changelogPartitions() {
         };
 
         when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
-        replay(consumer);
 
         taskManager.handleAssignment(taskId00Assignment, emptyMap());
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(false));
         assertThat(task00.state(), is(Task.State.RESTORING));
         // this could be a bit mysterious; we're verifying _no_ interactions on the consumer,
         // since the taskManager should _not_ resume the assignment while we're still in RESTORING
-        verify(consumer);
+        Mockito.verifyNoMoreInteractions(consumer);

Review Comment:
   ```suggestion
           Mockito.verifyNoInteractions(consumer);
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2775,6 +2686,8 @@ public void shouldCommitAllNeededTasksOnHandleRevocation() {
         assertThat(task01.commitPrepared, is(true));
         assertThat(task02.commitPrepared, is(false));
         assertThat(task10.commitPrepared, is(false));
+        verifyResumeWasCalled(consumer);

Review Comment:
   Why do you verify? It was not verified in the original code.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2629,19 +2558,16 @@ public void shouldSuspendActiveTasksDuringRevocation() {
         final Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(t1p0, new OffsetAndMetadata(0L, null));
         task00.setCommittableOffsetsAndMetadata(offsets);
 
-        expectRestoreToBeCompleted(consumer);
+        expectAssignmentToBeCalled(consumer);
         when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
-        consumer.commitSync(offsets);
-        expectLastCall();
-
-        replay(consumer);
 
         taskManager.handleAssignment(taskId00Assignment, emptyMap());
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
         assertThat(task00.state(), is(Task.State.RUNNING));
 
         taskManager.handleRevocation(taskId00Partitions);
         assertThat(task00.state(), is(Task.State.SUSPENDED));
+        verifyResumeWasCalled(consumer);

Review Comment:
   Why do you verify? It was not verified in the original code.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -4521,11 +4385,18 @@ public void shouldListNotPausedTasks() {
         assertEquals(taskManager.notPausedTasks().size(), 0);
     }
 
-    private static void expectRestoreToBeCompleted(final Consumer<byte[], byte[]> consumer) {
+    private static void expectAssignmentToBeCalled(final Consumer<byte[], byte[]> consumer) {
         final Set<TopicPartition> assignment = singleton(new TopicPartition("assignment", 0));
-        expect(consumer.assignment()).andReturn(assignment);
-        consumer.resume(assignment);
-        expectLastCall();
+        when(consumer.assignment()).thenReturn(assignment);
+    }
+
+    private static void verifyResumeWasCalled(final Consumer<byte[], byte[]> consumer) {
+        final Set<TopicPartition> assignment = singleton(new TopicPartition("assignment", 0));
+        Mockito.verify(consumer, atLeastOnce()).resume(assignment);

Review Comment:
   I would inline this function.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] clolov commented on a diff in pull request #13873: KAFKA-14133: Migrate Consumer mock in TaskManagerTest to Mockito

Posted by "clolov (via GitHub)" <gi...@apache.org>.
clolov commented on code in PR #13873:
URL: https://github.com/apache/kafka/pull/13873#discussion_r1242148687


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1961,10 +1920,10 @@ public void postCommit(final boolean enforceCheckpoint) {
         };
 
         // `handleAssignment`
-        expectRestoreToBeCompleted(consumer);
+        expectAssignmentToBeCalled(consumer);
         when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
-        expect(consumer.assignment()).andReturn(taskId00Partitions);
-        replay(consumer);
+        final Set<TopicPartition> partitions = union(HashSet::new, taskId00Partitions);
+        when(consumer.assignment()).thenReturn(partitions);

Review Comment:
   Because I couldn't figure out a better way to create a set with only one element and Collections does not have a singletonSet 😞 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] KAFKA-14133: Migrate Consumer mock in TaskManagerTest to Mockito [kafka]

Posted by "clolov (via GitHub)" <gi...@apache.org>.
clolov commented on PR #13873:
URL: https://github.com/apache/kafka/pull/13873#issuecomment-1770389966

   I will provide an update on this pull request in order to not close it!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] clolov commented on a diff in pull request #13873: KAFKA-14133: Migrate Consumer mock in TaskManagerTest to Mockito

Posted by "clolov (via GitHub)" <gi...@apache.org>.
clolov commented on code in PR #13873:
URL: https://github.com/apache/kafka/pull/13873#discussion_r1239992529


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -886,11 +885,9 @@ public void shouldSuspendRevokedTaskRemovedFromStateUpdater() {
         when(stateUpdater.hasRemovedTasks()).thenReturn(true);
         when(stateUpdater.drainRemovedTasks()).thenReturn(mkSet(statefulTask));
         final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
-        replay(consumer);
 
         taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
 
-        verify(consumer);

Review Comment:
   Oh, I see, I wasn't aware of this - I thought it was a leftover from previous expectations which were never deleted



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] clolov commented on a diff in pull request #13873: KAFKA-14133: Migrate Consumer mock in TaskManagerTest to Mockito

Posted by "clolov (via GitHub)" <gi...@apache.org>.
clolov commented on code in PR #13873:
URL: https://github.com/apache/kafka/pull/13873#discussion_r1239990255


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -918,9 +915,6 @@ public void shouldHandleMultipleRemovedTasksFromStateUpdater() {
             .thenReturn(convertedTask1);
         when(standbyTaskCreator.createStandbyTaskFromActive(taskToRecycle0, taskId00Partitions))
             .thenReturn(convertedTask0);
-        expect(consumer.assignment()).andReturn(emptySet()).anyTimes();

Review Comment:
   I confirm that this is indeed the reason I removed the expectation



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] divijvaidya commented on a diff in pull request #13873: KAFKA-14133: Migrate Consumer mock in TaskManagerTest to Mockito

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on code in PR #13873:
URL: https://github.com/apache/kafka/pull/13873#discussion_r1234973245


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1046,15 +1036,13 @@ public void shouldTransitRestoredTaskToRunning() {
             .withInputPartitions(taskId00Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = setUpTransitionToRunningOfRestoredTask(task, tasks);
-        consumer.resume(task.inputPartitions());
-        replay(consumer);
 
         taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
 
         Mockito.verify(task).completeRestoration(noOpResetter);
         Mockito.verify(task).clearTaskTimeout();
         Mockito.verify(tasks).addTask(task);
-        verify(consumer);
+        Mockito.verify(consumer).resume(task.inputPartitions());

Review Comment:
   could you please add `verifyNoMoreInteraction`for consumer mock here. Asking because looks like there should be more than one invocation of consumer.resume() in this test but we are only testing for one.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -4521,11 +4385,18 @@ public void shouldListNotPausedTasks() {
         assertEquals(taskManager.notPausedTasks().size(), 0);
     }
 
-    private static void expectRestoreToBeCompleted(final Consumer<byte[], byte[]> consumer) {
+    private static void expectAssignmentToBeCalled(final Consumer<byte[], byte[]> consumer) {
         final Set<TopicPartition> assignment = singleton(new TopicPartition("assignment", 0));
-        expect(consumer.assignment()).andReturn(assignment);
-        consumer.resume(assignment);
-        expectLastCall();
+        when(consumer.assignment()).thenReturn(assignment);
+    }
+
+    private static void verifyResumeWasCalled(final Consumer<byte[], byte[]> consumer) {
+        final Set<TopicPartition> assignment = singleton(new TopicPartition("assignment", 0));
+        Mockito.verify(consumer, atLeastOnce()).resume(assignment);
+    }
+
+    private static void verifyResumeWasCalledWith(final Consumer<byte[], byte[]> consumer, Set<TopicPartition> assignment) {

Review Comment:
   s/verifyResumeWasCalledWith /verifyResumeWasCalledWithAssignment



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -4521,11 +4385,18 @@ public void shouldListNotPausedTasks() {
         assertEquals(taskManager.notPausedTasks().size(), 0);
     }
 
-    private static void expectRestoreToBeCompleted(final Consumer<byte[], byte[]> consumer) {
+    private static void expectAssignmentToBeCalled(final Consumer<byte[], byte[]> consumer) {
         final Set<TopicPartition> assignment = singleton(new TopicPartition("assignment", 0));
-        expect(consumer.assignment()).andReturn(assignment);
-        consumer.resume(assignment);
-        expectLastCall();
+        when(consumer.assignment()).thenReturn(assignment);
+    }
+
+    private static void verifyResumeWasCalled(final Consumer<byte[], byte[]> consumer) {
+        final Set<TopicPartition> assignment = singleton(new TopicPartition("assignment", 0));
+        Mockito.verify(consumer, atLeastOnce()).resume(assignment);

Review Comment:
   why are we checking for atLeastOnce and not the exact times? Isn't this relaxing constrains from what we were doing with easy mock?
   
   (same for verifyResumeWasCalledWith)



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -918,9 +915,6 @@ public void shouldHandleMultipleRemovedTasksFromStateUpdater() {
             .thenReturn(convertedTask1);
         when(standbyTaskCreator.createStandbyTaskFromActive(taskToRecycle0, taskId00Partitions))
             .thenReturn(convertedTask0);
-        expect(consumer.assignment()).andReturn(emptySet()).anyTimes();

Review Comment:
   was this an unnecessary stub? asking because I didn't see mockito stub for consumer.assignment().



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -3219,17 +3128,6 @@ public void shouldCloseStandbyTasksOnShutdown() {
         // `handleAssignment`
         when(standbyTaskCreator.createTasks(assignment)).thenReturn(singletonList(task00));
 
-        // `tryToCompleteRestoration`
-        expect(consumer.assignment()).andReturn(emptySet());
-        consumer.resume(eq(emptySet()));
-        expectLastCall();
-
-        // `shutdown`
-        consumer.commitSync(Collections.emptyMap());

Review Comment:
   verify



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2688,20 +2614,8 @@ public void shouldCommitAllActiveTasksThatNeedCommittingOnHandleRevocationWithEo
             .thenReturn(singletonList(task10));
 
         final ConsumerGroupMetadata groupMetadata = new ConsumerGroupMetadata("appId");
-        expect(consumer.groupMetadata()).andReturn(groupMetadata);
-        producer.commitTransaction(expectedCommittedOffsets, groupMetadata);
-        expectLastCall();
-
-        task00.committedOffsets();
-        EasyMock.expectLastCall();
-        task01.committedOffsets();
-        EasyMock.expectLastCall();
-        task02.committedOffsets();
-        EasyMock.expectLastCall();
-        task10.committedOffsets();
-        EasyMock.expectLastCall();

Review Comment:
   verify



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -4050,12 +3928,8 @@ public void shouldHaveRemainingPartitionsUncleared() {
         final Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(t1p0, new OffsetAndMetadata(0L, null));
         task00.setCommittableOffsetsAndMetadata(offsets);
 
-        expectRestoreToBeCompleted(consumer);
+        expectAssignmentToBeCalled(consumer);
         when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
-        consumer.commitSync(offsets);

Review Comment:
   verify this?



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -3418,16 +3311,12 @@ public void shouldCommitProvidedTasksIfNeeded() {
             mkEntry(taskId05, taskId05Partitions)
         );
 
-        expectRestoreToBeCompleted(consumer);
+        expectAssignmentToBeCalled(consumer);
         when(activeTaskCreator.createTasks(any(), Mockito.eq(assignmentActive)))
             .thenReturn(Arrays.asList(task00, task01, task02));
         when(standbyTaskCreator.createTasks(assignmentStandby))
             .thenReturn(Arrays.asList(task03, task04, task05));
 
-        consumer.commitSync(eq(emptyMap()));

Review Comment:
   verify



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] KAFKA-14133: Migrate Consumer mock in TaskManagerTest to Mockito [kafka]

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #13873:
URL: https://github.com/apache/kafka/pull/13873#issuecomment-1752300357

   This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has  merge conflicts, please update it with the latest from trunk (or appropriate release branch) <p> If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] clolov commented on a diff in pull request #13873: KAFKA-14133: Migrate Consumer mock in TaskManagerTest to Mockito

Posted by "clolov (via GitHub)" <gi...@apache.org>.
clolov commented on code in PR #13873:
URL: https://github.com/apache/kafka/pull/13873#discussion_r1242151305


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1961,10 +1920,10 @@ public void postCommit(final boolean enforceCheckpoint) {
         };
 
         // `handleAssignment`
-        expectRestoreToBeCompleted(consumer);
+        expectAssignmentToBeCalled(consumer);
         when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
-        expect(consumer.assignment()).andReturn(taskId00Partitions);
-        replay(consumer);
+        final Set<TopicPartition> partitions = union(HashSet::new, taskId00Partitions);
+        when(consumer.assignment()).thenReturn(partitions);

Review Comment:
   You are correct, it appears I can remove line 1923



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] clolov commented on a diff in pull request #13873: KAFKA-14133: Migrate Consumer mock in TaskManagerTest to Mockito

Posted by "clolov (via GitHub)" <gi...@apache.org>.
clolov commented on code in PR #13873:
URL: https://github.com/apache/kafka/pull/13873#discussion_r1242226466


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2688,20 +2614,8 @@ public void shouldCommitAllActiveTasksThatNeedCommittingOnHandleRevocationWithEo
             .thenReturn(singletonList(task10));
 
         final ConsumerGroupMetadata groupMetadata = new ConsumerGroupMetadata("appId");
-        expect(consumer.groupMetadata()).andReturn(groupMetadata);
-        producer.commitTransaction(expectedCommittedOffsets, groupMetadata);
-        expectLastCall();
-
-        task00.committedOffsets();
-        EasyMock.expectLastCall();
-        task01.committedOffsets();
-        EasyMock.expectLastCall();
-        task02.committedOffsets();
-        EasyMock.expectLastCall();
-        task10.committedOffsets();
-        EasyMock.expectLastCall();
-
-        replay(consumer);
+        when(consumer.groupMetadata()).thenReturn(groupMetadata);
+        doNothing().when(producer).commitTransaction(expectedCommittedOffsets, groupMetadata);

Review Comment:
   Alright, I removed it!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #13873: KAFKA-14133: Migrate Consumer mock in TaskManagerTest to Mockito

Posted by "cadonna (via GitHub)" <gi...@apache.org>.
cadonna commented on code in PR #13873:
URL: https://github.com/apache/kafka/pull/13873#discussion_r1250518557


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2469,19 +2416,18 @@ public void shouldCloseStandbyUnassignedTasksWhenCreatingNewTasks() {
         assertThat(task00.state(), is(Task.State.CLOSED));
         assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
         assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
+        verifyResumeWasCalled(consumer);

Review Comment:
   I understand what you mean regarding the verification. However, the original author did not call `verify()` on the consumer. Therefore, I was wondering why you added a verification. As I commented [here](https://github.com/apache/kafka/pull/13873#discussion_r1242098191) I am not sure if the original author just missed to add the `verify()`. So I am fine with you adding the verification, I just was not sure if you had a reason that I did not see.
   
   Regarding inlining the verification, yes, I would be in favor of inlining. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] clolov commented on a diff in pull request #13873: KAFKA-14133: Migrate Consumer mock in TaskManagerTest to Mockito

Posted by "clolov (via GitHub)" <gi...@apache.org>.
clolov commented on code in PR #13873:
URL: https://github.com/apache/kafka/pull/13873#discussion_r1239995959


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -4521,11 +4385,18 @@ public void shouldListNotPausedTasks() {
         assertEquals(taskManager.notPausedTasks().size(), 0);
     }
 
-    private static void expectRestoreToBeCompleted(final Consumer<byte[], byte[]> consumer) {
+    private static void expectAssignmentToBeCalled(final Consumer<byte[], byte[]> consumer) {
         final Set<TopicPartition> assignment = singleton(new TopicPartition("assignment", 0));
-        expect(consumer.assignment()).andReturn(assignment);
-        consumer.resume(assignment);
-        expectLastCall();
+        when(consumer.assignment()).thenReturn(assignment);
+    }
+
+    private static void verifyResumeWasCalled(final Consumer<byte[], byte[]> consumer) {
+        final Set<TopicPartition> assignment = singleton(new TopicPartition("assignment", 0));
+        Mockito.verify(consumer, atLeastOnce()).resume(assignment);

Review Comment:
   There are a couple of tests where consumer.resume() is called twice. I will amend this to be stricter if possible



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] clolov commented on a diff in pull request #13873: KAFKA-14133: Migrate Consumer mock in TaskManagerTest to Mockito

Posted by "clolov (via GitHub)" <gi...@apache.org>.
clolov commented on code in PR #13873:
URL: https://github.com/apache/kafka/pull/13873#discussion_r1242131444


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -3418,16 +3311,12 @@ public void shouldCommitProvidedTasksIfNeeded() {
             mkEntry(taskId05, taskId05Partitions)
         );
 
-        expectRestoreToBeCompleted(consumer);
+        expectAssignmentToBeCalled(consumer);
         when(activeTaskCreator.createTasks(any(), Mockito.eq(assignmentActive)))
             .thenReturn(Arrays.asList(task00, task01, task02));
         when(standbyTaskCreator.createTasks(assignmentStandby))
             .thenReturn(Arrays.asList(task03, task04, task05));
 
-        consumer.commitSync(eq(emptyMap()));

Review Comment:
   This is just not called, that's why I removed it originally



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #13873: KAFKA-14133: Migrate Consumer mock in TaskManagerTest to Mockito

Posted by "cadonna (via GitHub)" <gi...@apache.org>.
cadonna commented on code in PR #13873:
URL: https://github.com/apache/kafka/pull/13873#discussion_r1250520050


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -4521,11 +4387,18 @@ public void shouldListNotPausedTasks() {
         assertEquals(taskManager.notPausedTasks().size(), 0);
     }
 
-    private static void expectRestoreToBeCompleted(final Consumer<byte[], byte[]> consumer) {
+    private static void expectAssignmentToBeCalled(final Consumer<byte[], byte[]> consumer) {
         final Set<TopicPartition> assignment = singleton(new TopicPartition("assignment", 0));
-        expect(consumer.assignment()).andReturn(assignment);
-        consumer.resume(assignment);
-        expectLastCall();
+        when(consumer.assignment()).thenReturn(assignment);
+    }
+
+    private static void verifyResumeWasCalled(final Consumer<byte[], byte[]> consumer) {
+        final Set<TopicPartition> assignment = singleton(new TopicPartition("assignment", 0));
+        Mockito.verify(consumer, atLeastOnce()).resume(assignment);
+    }
+
+    private static void verifyResumeWasCalledWith(final Consumer<byte[], byte[]> consumer, Set<TopicPartition> assignment) {

Review Comment:
   There is a checkstyle error in the builds because you missed the `final` on the second parameter here.
   ```suggestion
       private static void verifyResumeWasCalledWith(final Consumer<byte[], byte[]> consumer, final Set<TopicPartition> assignment) {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] clolov commented on a diff in pull request #13873: KAFKA-14133: Migrate Consumer mock in TaskManagerTest to Mockito

Posted by "clolov (via GitHub)" <gi...@apache.org>.
clolov commented on code in PR #13873:
URL: https://github.com/apache/kafka/pull/13873#discussion_r1242144879


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2418,19 +2368,18 @@ public void markChangelogAsCorrupted(final Collection<TopicPartition> partitions
             mkEntry(taskId02, taskId02Partitions)
             );
 
-        expectRestoreToBeCompleted(consumer);
+        expectAssignmentToBeCalled(consumer);
 
         when(activeTaskCreator.createTasks(any(), Mockito.eq(assignmentActive)))
             .thenReturn(asList(revokedActiveTask, unrevokedActiveTask, unrevokedActiveTaskWithoutCommitNeeded));
 
         final ConsumerGroupMetadata groupMetadata = new ConsumerGroupMetadata("appId");
-        expect(consumer.groupMetadata()).andReturn(groupMetadata);
+        when(consumer.groupMetadata()).thenReturn(groupMetadata);
 
         doThrow(new TimeoutException()).when(producer).commitTransaction(expectedCommittedOffsets, groupMetadata);
 
-        expect(consumer.assignment()).andStubReturn(union(HashSet::new, taskId00Partitions, taskId01Partitions, taskId02Partitions));
-
-        replay(consumer, stateManager);

Review Comment:
   😱 



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2418,19 +2368,18 @@ public void markChangelogAsCorrupted(final Collection<TopicPartition> partitions
             mkEntry(taskId02, taskId02Partitions)
             );
 
-        expectRestoreToBeCompleted(consumer);
+        expectAssignmentToBeCalled(consumer);
 
         when(activeTaskCreator.createTasks(any(), Mockito.eq(assignmentActive)))
             .thenReturn(asList(revokedActiveTask, unrevokedActiveTask, unrevokedActiveTaskWithoutCommitNeeded));
 
         final ConsumerGroupMetadata groupMetadata = new ConsumerGroupMetadata("appId");
-        expect(consumer.groupMetadata()).andReturn(groupMetadata);
+        when(consumer.groupMetadata()).thenReturn(groupMetadata);
 
         doThrow(new TimeoutException()).when(producer).commitTransaction(expectedCommittedOffsets, groupMetadata);
 
-        expect(consumer.assignment()).andStubReturn(union(HashSet::new, taskId00Partitions, taskId01Partitions, taskId02Partitions));
-
-        replay(consumer, stateManager);

Review Comment:
   I will re-add it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] clolov commented on a diff in pull request #13873: KAFKA-14133: Migrate Consumer mock in TaskManagerTest to Mockito

Posted by "clolov (via GitHub)" <gi...@apache.org>.
clolov commented on code in PR #13873:
URL: https://github.com/apache/kafka/pull/13873#discussion_r1242140474


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2469,19 +2416,18 @@ public void shouldCloseStandbyUnassignedTasksWhenCreatingNewTasks() {
         assertThat(task00.state(), is(Task.State.CLOSED));
         assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
         assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
+        verifyResumeWasCalled(consumer);

Review Comment:
   I will answer here, but the answer applies to all the other places you have asked it as well.
   The original code for `expectRestoreToBeCompleted` was:
   ```
   private static void expectRestoreToBeCompleted(final Consumer<byte[], byte[]> consumer) {
           final Set<TopicPartition> assignment = singleton(new TopicPartition("assignment", 0));
           expect(consumer.assignment()).andReturn(assignment);
           consumer.resume(assignment);
           expectLastCall();
   }
   ```
   This should in theory be changed to
   ```
   private static void expectRestoreToBeCompleted(final Consumer<byte[], byte[]> consumer) {
           final Set<TopicPartition> assignment = singleton(new TopicPartition("assignment", 0));
           when(consumer.assignment()).thenReturn(assignment);
           Mockito.verify(consumer).resume(assignment);
   }
   ```
   However, if we have the verify there it will fail as it needs to be verified after the object/method under test is exercised. Hence I moved it to a separate function and added it as a verification at the end of the test.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] clolov commented on a diff in pull request #13873: KAFKA-14133: Migrate Consumer mock in TaskManagerTest to Mockito

Posted by "clolov (via GitHub)" <gi...@apache.org>.
clolov commented on code in PR #13873:
URL: https://github.com/apache/kafka/pull/13873#discussion_r1242121402


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -3219,17 +3128,6 @@ public void shouldCloseStandbyTasksOnShutdown() {
         // `handleAssignment`
         when(standbyTaskCreator.createTasks(assignment)).thenReturn(singletonList(task00));
 
-        // `tryToCompleteRestoration`
-        expect(consumer.assignment()).andReturn(emptySet());
-        consumer.resume(eq(emptySet()));
-        expectLastCall();
-
-        // `shutdown`
-        consumer.commitSync(Collections.emptyMap());

Review Comment:
   I don't think there is a need for verification here. The consumer was only replayed in EasyMock and the replayed behaviour comes by default in Mockito



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] clolov commented on a diff in pull request #13873: KAFKA-14133: Migrate Consumer mock in TaskManagerTest to Mockito

Posted by "clolov (via GitHub)" <gi...@apache.org>.
clolov commented on code in PR #13873:
URL: https://github.com/apache/kafka/pull/13873#discussion_r1242132233


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -4050,12 +3928,8 @@ public void shouldHaveRemainingPartitionsUncleared() {
         final Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(t1p0, new OffsetAndMetadata(0L, null));
         task00.setCommittableOffsetsAndMetadata(offsets);
 
-        expectRestoreToBeCompleted(consumer);
+        expectAssignmentToBeCalled(consumer);
         when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
-        consumer.commitSync(offsets);

Review Comment:
   This is not called, this is why I removed it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #13873: KAFKA-14133: Migrate Consumer mock in TaskManagerTest to Mockito

Posted by "cadonna (via GitHub)" <gi...@apache.org>.
cadonna commented on code in PR #13873:
URL: https://github.com/apache/kafka/pull/13873#discussion_r1236586513


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -886,11 +885,9 @@ public void shouldSuspendRevokedTaskRemovedFromStateUpdater() {
         when(stateUpdater.hasRemovedTasks()).thenReturn(true);
         when(stateUpdater.drainRemovedTasks()).thenReturn(mkSet(statefulTask));
         final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
-        replay(consumer);
 
         taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
 
-        verify(consumer);

Review Comment:
   Here you need to add `Mockito.verifyNoInteractions(consumer)` because that was the intent of replaying a consumer without expectations and verifying it.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -886,11 +885,9 @@ public void shouldSuspendRevokedTaskRemovedFromStateUpdater() {
         when(stateUpdater.hasRemovedTasks()).thenReturn(true);
         when(stateUpdater.drainRemovedTasks()).thenReturn(mkSet(statefulTask));
         final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
-        replay(consumer);
 
         taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
 
-        verify(consumer);
         Mockito.verify(statefulTask).suspend();
         Mockito.verify(tasks).addTask(statefulTask);
     }

Review Comment:
   nit (and probably my fault 🙂) 
   ```suggestion
       }
   
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1066,14 +1054,12 @@ public void shouldHandleTimeoutExceptionInTransitRestoredTaskToRunning() {
         final TaskManager taskManager = setUpTransitionToRunningOfRestoredTask(task, tasks);
         final TimeoutException timeoutException = new TimeoutException();
         doThrow(timeoutException).when(task).completeRestoration(noOpResetter);
-        replay(consumer);
 
         taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
 
         Mockito.verify(task).maybeInitTaskTimeoutOrThrow(anyLong(), Mockito.eq(timeoutException));
         Mockito.verify(tasks, never()).addTask(task);
         Mockito.verify(task, never()).clearTaskTimeout();
-        verify(consumer);

Review Comment:
   Also here, please add `Mockito.verifyNoInteractions(consumer)`.
   
   You also miss this verification in other places.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1046,15 +1036,13 @@ public void shouldTransitRestoredTaskToRunning() {
             .withInputPartitions(taskId00Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = setUpTransitionToRunningOfRestoredTask(task, tasks);
-        consumer.resume(task.inputPartitions());
-        replay(consumer);
 
         taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
 
         Mockito.verify(task).completeRestoration(noOpResetter);
         Mockito.verify(task).clearTaskTimeout();
         Mockito.verify(tasks).addTask(task);
-        verify(consumer);
+        Mockito.verify(consumer).resume(task.inputPartitions());

Review Comment:
   I think, I cannot completely follow your reasoning here.  Why would you add `verifyNoMoreInteractions`? The important thing here is that the consumer resumes polling from the input partitions. However, I also see that with easymock this test verifies that `consumer.resume(task.inputPartitions())` is the only method called on the consumer mock. I am fine either way.  



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -918,9 +915,6 @@ public void shouldHandleMultipleRemovedTasksFromStateUpdater() {
             .thenReturn(convertedTask1);
         when(standbyTaskCreator.createStandbyTaskFromActive(taskToRecycle0, taskId00Partitions))
             .thenReturn(convertedTask0);
-        expect(consumer.assignment()).andReturn(emptySet()).anyTimes();
-        consumer.resume(anyObject());
-        expectLastCall().anyTimes();

Review Comment:
   It seems I was a bit sloppy here. When a task is removed from the state updater, there should be no interactions with the consumer. Please remove the expecations and verify for no interactions with the consumer mock.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -918,9 +915,6 @@ public void shouldHandleMultipleRemovedTasksFromStateUpdater() {
             .thenReturn(convertedTask1);
         when(standbyTaskCreator.createStandbyTaskFromActive(taskToRecycle0, taskId00Partitions))
             .thenReturn(convertedTask0);
-        expect(consumer.assignment()).andReturn(emptySet()).anyTimes();

Review Comment:
   Mockito returns an empty collection by default. @clolov Could you confirm if this was the reason you removed the stub?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] clolov commented on a diff in pull request #13873: KAFKA-14133: Migrate Consumer mock in TaskManagerTest to Mockito

Posted by "clolov (via GitHub)" <gi...@apache.org>.
clolov commented on code in PR #13873:
URL: https://github.com/apache/kafka/pull/13873#discussion_r1239994806


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2688,20 +2614,8 @@ public void shouldCommitAllActiveTasksThatNeedCommittingOnHandleRevocationWithEo
             .thenReturn(singletonList(task10));
 
         final ConsumerGroupMetadata groupMetadata = new ConsumerGroupMetadata("appId");
-        expect(consumer.groupMetadata()).andReturn(groupMetadata);
-        producer.commitTransaction(expectedCommittedOffsets, groupMetadata);
-        expectLastCall();
-
-        task00.committedOffsets();
-        EasyMock.expectLastCall();
-        task01.committedOffsets();
-        EasyMock.expectLastCall();
-        task02.committedOffsets();
-        EasyMock.expectLastCall();
-        task10.committedOffsets();
-        EasyMock.expectLastCall();

Review Comment:
   I actually do not know why these were being verified with EasyMock given that taskXY are not mocks.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2688,20 +2614,8 @@ public void shouldCommitAllActiveTasksThatNeedCommittingOnHandleRevocationWithEo
             .thenReturn(singletonList(task10));
 
         final ConsumerGroupMetadata groupMetadata = new ConsumerGroupMetadata("appId");
-        expect(consumer.groupMetadata()).andReturn(groupMetadata);
-        producer.commitTransaction(expectedCommittedOffsets, groupMetadata);
-        expectLastCall();
-
-        task00.committedOffsets();
-        EasyMock.expectLastCall();
-        task01.committedOffsets();
-        EasyMock.expectLastCall();
-        task02.committedOffsets();
-        EasyMock.expectLastCall();
-        task10.committedOffsets();
-        EasyMock.expectLastCall();

Review Comment:
   I actually do not know why these were being verified with EasyMock given that taskXY are not mocks



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] clolov commented on a diff in pull request #13873: KAFKA-14133: Migrate Consumer mock in TaskManagerTest to Mockito

Posted by "clolov (via GitHub)" <gi...@apache.org>.
clolov commented on code in PR #13873:
URL: https://github.com/apache/kafka/pull/13873#discussion_r1242121402


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -3219,17 +3128,6 @@ public void shouldCloseStandbyTasksOnShutdown() {
         // `handleAssignment`
         when(standbyTaskCreator.createTasks(assignment)).thenReturn(singletonList(task00));
 
-        // `tryToCompleteRestoration`
-        expect(consumer.assignment()).andReturn(emptySet());
-        consumer.resume(eq(emptySet()));
-        expectLastCall();
-
-        // `shutdown`
-        consumer.commitSync(Collections.emptyMap());

Review Comment:
   ~~I don't think there is a need for verification here. The consumer was only replayed in EasyMock and the replayed behaviour comes by default in Mockito~~
   This is just not called, that's why I removed it originally



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] clolov commented on a diff in pull request #13873: KAFKA-14133: Migrate Consumer mock in TaskManagerTest to Mockito

Posted by "clolov (via GitHub)" <gi...@apache.org>.
clolov commented on code in PR #13873:
URL: https://github.com/apache/kafka/pull/13873#discussion_r1242142731


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2469,19 +2416,18 @@ public void shouldCloseStandbyUnassignedTasksWhenCreatingNewTasks() {
         assertThat(task00.state(), is(Task.State.CLOSED));
         assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
         assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
+        verifyResumeWasCalled(consumer);

Review Comment:
   On the same topic, yes, I could inline the setup and if you are okay with it I will then inline the verification as well. Let me know your thoughts!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] clolov commented on a diff in pull request #13873: KAFKA-14133: Migrate Consumer mock in TaskManagerTest to Mockito

Posted by "clolov (via GitHub)" <gi...@apache.org>.
clolov commented on code in PR #13873:
URL: https://github.com/apache/kafka/pull/13873#discussion_r1242148687


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1961,10 +1920,10 @@ public void postCommit(final boolean enforceCheckpoint) {
         };
 
         // `handleAssignment`
-        expectRestoreToBeCompleted(consumer);
+        expectAssignmentToBeCalled(consumer);
         when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
-        expect(consumer.assignment()).andReturn(taskId00Partitions);
-        replay(consumer);
+        final Set<TopicPartition> partitions = union(HashSet::new, taskId00Partitions);
+        when(consumer.assignment()).thenReturn(partitions);

Review Comment:
   ~~Because I couldn't figure out a better way to create a set with only one element and Collections does not have a singletonSet 😞 ~~



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1961,10 +1920,10 @@ public void postCommit(final boolean enforceCheckpoint) {
         };
 
         // `handleAssignment`
-        expectRestoreToBeCompleted(consumer);
+        expectAssignmentToBeCalled(consumer);
         when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
-        expect(consumer.assignment()).andReturn(taskId00Partitions);
-        replay(consumer);
+        final Set<TopicPartition> partitions = union(HashSet::new, taskId00Partitions);
+        when(consumer.assignment()).thenReturn(partitions);

Review Comment:
   ~~Because I couldn't figure out a better way to create a set with only one element and Collections does not have a singletonSet~~ 😞



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] clolov commented on a diff in pull request #13873: KAFKA-14133: Migrate Consumer mock in TaskManagerTest to Mockito

Posted by "clolov (via GitHub)" <gi...@apache.org>.
clolov commented on code in PR #13873:
URL: https://github.com/apache/kafka/pull/13873#discussion_r1242149622


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1961,10 +1920,10 @@ public void postCommit(final boolean enforceCheckpoint) {
         };
 
         // `handleAssignment`
-        expectRestoreToBeCompleted(consumer);
+        expectAssignmentToBeCalled(consumer);
         when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
-        expect(consumer.assignment()).andReturn(taskId00Partitions);
-        replay(consumer);
+        final Set<TopicPartition> partitions = union(HashSet::new, taskId00Partitions);
+        when(consumer.assignment()).thenReturn(partitions);

Review Comment:
   Oh, silly me, there is a mkSet method in Utils. Okay, I will remedy this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] clolov commented on a diff in pull request #13873: KAFKA-14133: Migrate Consumer mock in TaskManagerTest to Mockito

Posted by "clolov (via GitHub)" <gi...@apache.org>.
clolov commented on code in PR #13873:
URL: https://github.com/apache/kafka/pull/13873#discussion_r1258199104


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2469,19 +2416,18 @@ public void shouldCloseStandbyUnassignedTasksWhenCreatingNewTasks() {
         assertThat(task00.state(), is(Task.State.CLOSED));
         assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
         assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
+        verifyResumeWasCalled(consumer);

Review Comment:
   Okay, thank you, I will aim to get this changed in the next few days!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org