Posted to jira@kafka.apache.org by "clolov (via GitHub)" <gi...@apache.org> on 2023/05/12 11:11:03 UTC

[GitHub] [kafka] clolov opened a new pull request, #13711: KAFKA-14133: Migrate StandbyTaskCreator mock in TaskManagerTest to Mockito

clolov opened a new pull request, #13711:
URL: https://github.com/apache/kafka/pull/13711

   Similar to:
   https://github.com/apache/kafka/pull/13529
   https://github.com/apache/kafka/pull/13621
   https://github.com/apache/kafka/pull/13681
   
   **This needs to be rebased on top of https://github.com/apache/kafka/pull/13681 before merging**


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] clolov commented on pull request #13711: KAFKA-14133: Migrate StandbyTaskCreator mock in TaskManagerTest to Mockito

Posted by "clolov (via GitHub)" <gi...@apache.org>.
clolov commented on PR #13711:
URL: https://github.com/apache/kafka/pull/13711#issuecomment-1588950243

   Okay, the name has been changed and this has been rebased. I ran checkstyleTest and spotbugsTest and they are passing locally. If everything passes in the automated tests you should be able to merge it at your leisure!




[GitHub] [kafka] clolov commented on a diff in pull request #13711: KAFKA-14133: Migrate StandbyTaskCreator mock in TaskManagerTest to Mockito

Posted by "clolov (via GitHub)" <gi...@apache.org>.
clolov commented on code in PR #13711:
URL: https://github.com/apache/kafka/pull/13711#discussion_r1218169011


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1740,13 +1740,14 @@ public void shouldComputeOffsetSumFromCheckpointFileForUninitializedTask() throw
         taskManager.handleRebalanceStart(singleton("topic"));
         final StateMachineTask uninitializedTask = new StateMachineTask(taskId00, taskId00Partitions, true);
         expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment))).andStubReturn(singleton(uninitializedTask));
-        expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
-        replay(activeTaskCreator, standbyTaskCreator);
+        replay(activeTaskCreator);
         taskManager.handleAssignment(taskId00Assignment, emptyMap());
 
         assertThat(uninitializedTask.state(), is(State.CREATED));
 
         assertThat(taskManager.getTaskOffsetSums(), is(expectedOffsetSums));
+
+        Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());

Review Comment:
   To be honest, here I followed what was done in PR https://github.com/apache/kafka/pull/13681
   For example, the original in that PR was:
   ```
   ...
   expect(activeTaskCreator.createTasks(consumer, Collections.emptyMap())).andReturn(emptySet());
   ...
   ```
   (https://github.com/apache/kafka/pull/13681/files#diff-48bc1476f0437fd711093c7c80ce73eda10be0511705799b4248545c203d521dL294)
   
   
   and with Mockito we changed it to
   ```
   ...
   Mockito.verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
   ...
   ```
   (https://github.com/apache/kafka/pull/13681/files#diff-48bc1476f0437fd711093c7c80ce73eda10be0511705799b4248545c203d521dR306)
   
   I followed the same logic for what I thought was a stronger verification, but I am happy to remove it if you think it is unnecessary. I realise the reasoning for doing the above in that PR might have made more sense because there was more than one argument.
   
   Let me know!





[GitHub] [kafka] cadonna commented on pull request #13711: KAFKA-14133: Migrate StandbyTaskCreator mock in TaskManagerTest to Mockito

Posted by "cadonna (via GitHub)" <gi...@apache.org>.
cadonna commented on PR #13711:
URL: https://github.com/apache/kafka/pull/13711#issuecomment-1576391236

   Could you also please add some more info to the PR description instead of only referencing other PRs? One or two sentences followed by the references is totally fine.




[GitHub] [kafka] cadonna commented on pull request #13711: KAFKA-14133: Migrate StandbyTaskCreator mock in TaskManagerTest to Mockito

Posted by "cadonna (via GitHub)" <gi...@apache.org>.
cadonna commented on PR #13711:
URL: https://github.com/apache/kafka/pull/13711#issuecomment-1587103195

   The issue with `shouldInitializeNewStandbyTasks()` seems to be a missed cleanup due to a bug in EasyMock. If I rename the method to `shouldInitialiseNewStandbyTasks()` (notice the s instead of the z), it works for me locally. The order in which the test methods are executed probably matters.
   I know, this sounds ridiculous, and actually it is ridiculous. :neutral_face:
   So rename the method. As soon as we have migrated the whole test class, the issue should be gone anyway.
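The order dependence suspected above can be illustrated in isolation. The following is a library-free sketch, not a reproduction of the EasyMock bug: `OrderDependenceDemo`, `leakyTest`, and `sensitiveTest` are hypothetical names, and the shared list merely stands in for whatever state was not cleaned up. It shows why a test can pass alone yet fail in a full run, and why anything that changes the execution order (for example, a rename under a runner that orders by method name) can make the failure appear or disappear.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of an order-dependent test; not taken from TaskManagerTest.
final class OrderDependenceDemo {
    // Shared state that every test should leave clean; one test below "forgets" to.
    static final List<String> leftovers = new ArrayList<>();

    // Simulates a test that passes but leaks state (the "missed cleanup").
    static boolean leakyTest() {
        leftovers.add("stale expectation");
        return true;
    }

    // Simulates a test that only passes when it starts from a clean slate.
    static boolean sensitiveTest() {
        return leftovers.isEmpty();
    }

    // Runs both simulated tests in the requested order and reports
    // whether the sensitive one passed.
    static boolean sensitivePassesWhenRun(final boolean sensitiveFirst) {
        leftovers.clear();
        if (sensitiveFirst) {
            final boolean passed = sensitiveTest();
            leakyTest();
            return passed;
        }
        leakyTest();
        return sensitiveTest();
    }

    public static void main(final String[] args) {
        System.out.println("sensitive test first: " + sensitivePassesWhenRun(true));
        System.out.println("leaky test first:     " + sensitivePassesWhenRun(false));
    }
}
```

Running the sensitive test in isolation corresponds to the first case, which is consistent with the observation that the test succeeded alone but failed as part of the whole class.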




[GitHub] [kafka] cadonna commented on a diff in pull request #13711: KAFKA-14133: Migrate StandbyTaskCreator mock in TaskManagerTest to Mockito

Posted by "cadonna (via GitHub)" <gi...@apache.org>.
cadonna commented on code in PR #13711:
URL: https://github.com/apache/kafka/pull/13711#discussion_r1217778982


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1854,6 +1855,8 @@ public void shouldCloseActiveUnassignedSuspendedTasksWhenClosingRevokedTasks() {
         assertThat(task00.state(), is(Task.State.CLOSED));
         assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
         assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
+
+        Mockito.verify(standbyTaskCreator, times(2)).createTasks(Collections.emptyMap());

Review Comment:
   No verification needed here either.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1889,6 +1891,8 @@ public void closeClean() {
             is("Encounter unexpected fatal error for task 0_0")
         );
         assertThat(thrown.getCause().getMessage(), is("KABOOM!"));
+
+        Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());

Review Comment:
   See above



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2232,6 +2238,8 @@ public void shouldNotAttemptToCommitInHandleCorruptedDuringARebalance() {
 
         assertThat(uncorruptedActive.state(), is(State.RUNNING));
         verify(consumer);
+
+        Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());

Review Comment:
   See above



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1766,15 +1767,16 @@ public void shouldComputeOffsetSumFromCheckpointFileForClosedTask() throws Excep
 
         taskManager.handleRebalanceStart(singleton("topic"));
         expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment))).andStubReturn(singleton(closedTask));
-        expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
-        replay(activeTaskCreator, standbyTaskCreator);
+        replay(activeTaskCreator);
         taskManager.handleAssignment(taskId00Assignment, emptyMap());
 
         closedTask.suspend();
         closedTask.closeClean();
         assertThat(closedTask.state(), is(State.CLOSED));
 
         assertThat(taskManager.getTaskOffsetSums(), is(expectedOffsetSums));
+
+        Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());

Review Comment:
   Also here, I do not think you need to verify.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2507,19 +2517,20 @@ public void markChangelogAsCorrupted(final Collection<TopicPartition> partitions
         assertThat(revokedActiveTask.state(), is(State.SUSPENDED));
         assertThat(unrevokedActiveTask.state(), is(State.CREATED));
         assertThat(unrevokedActiveTaskWithoutCommitNeeded.state(), is(State.RUNNING));
+
+        Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());

Review Comment:
   No verification needed.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2581,6 +2593,7 @@ public void shouldUpdateInputPartitionsAfterRebalance() {
         assertThat(task00.state(), is(Task.State.RUNNING));
         assertEquals(newPartitionsSet, task00.inputPartitions());
         verify(activeTaskCreator, consumer);
+        Mockito.verify(standbyTaskCreator, times(2)).createTasks(Collections.emptyMap());

Review Comment:
   No verification needed.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1988,6 +1991,8 @@ public void shouldThrowWhenHandlingClosingTasksOnProducerCloseError() {
         );
         assertThat(thrown.getCause(), instanceOf(RuntimeException.class));
         assertThat(thrown.getCause().getMessage(), is("KABOOM!"));
+
+        Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());

Review Comment:
   See above



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1740,13 +1740,14 @@ public void shouldComputeOffsetSumFromCheckpointFileForUninitializedTask() throw
         taskManager.handleRebalanceStart(singleton("topic"));
         final StateMachineTask uninitializedTask = new StateMachineTask(taskId00, taskId00Partitions, true);
         expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment))).andStubReturn(singleton(uninitializedTask));
-        expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
-        replay(activeTaskCreator, standbyTaskCreator);
+        replay(activeTaskCreator);
         taskManager.handleAssignment(taskId00Assignment, emptyMap());
 
         assertThat(uninitializedTask.state(), is(State.CREATED));
 
         assertThat(taskManager.getTaskOffsetSums(), is(expectedOffsetSums));
+
+        Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());

Review Comment:
   I do not think you need this verification, since the call is not verified with EasyMock either (i.e., `verify()` is never called on the mock).
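The distinction behind this comment, stubbing a return value versus verifying that a call happened, can be sketched without any mocking library. Everything below is hypothetical and for illustration only: `TaskFactory`, `RecordingTaskFactory`, and `AssignmentHandler` are stand-ins, not Kafka classes, and the hand-written double only approximates what EasyMock or Mockito do. A stub-only test relies on the canned return value and asserts on resulting state; a verifying test additionally asserts on the recorded interaction, which makes the test stricter than the original if the original never verified it.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the collaborator being mocked; not Kafka's StandbyTaskCreator.
interface TaskFactory {
    Collection<String> createTasks(Map<String, Integer> assignment);
}

// Hand-written test double: always returns an empty collection (the "stub" part)
// and records every argument it receives (what a later "verify" would inspect).
final class RecordingTaskFactory implements TaskFactory {
    private final List<Map<String, Integer>> calls = new ArrayList<>();

    @Override
    public Collection<String> createTasks(final Map<String, Integer> assignment) {
        calls.add(assignment);
        return Collections.emptySet();
    }

    // Number of recorded calls whose argument equals the expected one.
    int timesCalledWith(final Map<String, Integer> expected) {
        int count = 0;
        for (final Map<String, Integer> call : calls) {
            if (call.equals(expected)) {
                count++;
            }
        }
        return count;
    }
}

// Hypothetical code under test: asks the factory for tasks and reports how many it got.
final class AssignmentHandler {
    private final TaskFactory standbyFactory;

    AssignmentHandler(final TaskFactory standbyFactory) {
        this.standbyFactory = standbyFactory;
    }

    int handleAssignment(final Map<String, Integer> standbyAssignment) {
        return standbyFactory.createTasks(standbyAssignment).size();
    }
}

final class StubVersusVerifyDemo {
    public static void main(final String[] args) {
        final RecordingTaskFactory factory = new RecordingTaskFactory();
        final AssignmentHandler handler = new AssignmentHandler(factory);

        // Stub-only style: rely on the canned return value and assert on state.
        final int created = handler.handleAssignment(Collections.emptyMap());
        System.out.println("tasks created: " + created);

        // Verification style: additionally pin down the interaction itself.
        System.out.println("calls with empty map: "
            + factory.timesCalledWith(Collections.emptyMap()));
    }
}
```

In this sketch, dropping the second check leaves the state-based assertion intact, which mirrors the reviewer's point: if the original test never checked the interaction, the migrated test does not need to either.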



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2932,6 +2947,8 @@ public void shouldNotCommitCreatedTasksOnRevocationOrClosure() {
 
         taskManager.handleAssignment(emptyMap(), emptyMap());
         assertThat(task00.state(), is(Task.State.CLOSED));
+
+        Mockito.verify(standbyTaskCreator, times(2)).createTasks(Collections.emptyMap());

Review Comment:
   No verification needed.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2880,6 +2893,8 @@ public void shouldNotCommitOnHandleAssignmentIfNoTaskClosed() {
 
         assertThat(task00.commitNeeded, is(true));
         assertThat(task10.commitPrepared, is(false));
+
+        Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());

Review Comment:
   No verification needed.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2067,6 +2072,8 @@ public void suspend() {
 
         verify(stateManager);
         verify(consumer);
+
+        Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());

Review Comment:
   See above



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2956,6 +2972,8 @@ public void suspend() {
         assertThat(task00.state(), is(Task.State.SUSPENDED));
 
         verify(consumer);
+
+        Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());

Review Comment:
   No verification needed.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2108,6 +2113,8 @@ public void shouldCommitNonCorruptedTasksOnTaskCorruptedException() {
         assertThat(corruptedTask.partitionsForOffsetReset, equalTo(taskId00Partitions));
 
         verify(consumer);
+
+        Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());

Review Comment:
   See above



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2031,6 +2035,8 @@ public void postCommit(final boolean enforceCheckpoint) {
 
         verify(stateManager);
         verify(consumer);
+
+        Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());

Review Comment:
   See above



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -3122,6 +3140,8 @@ public Set<TopicPartition> changelogPartitions() {
         assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
         // the active task creator should also get closed (so that it closes the thread producer if applicable)
         verify(activeTaskCreator);
+
+        Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());
     }

Review Comment:
   No verification needed.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -3307,6 +3327,8 @@ public void suspend() {
         assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
         // the active task creator should also get closed (so that it closes the thread producer if applicable)
         verify(activeTaskCreator);
+
+        Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());

Review Comment:
   No verification needed.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2144,6 +2149,8 @@ public void shouldNotCommitNonRunningNonCorruptedTasks() {
         verify(activeTaskCreator);
         assertFalse(nonRunningNonCorruptedTask.commitPrepared);
         verify(consumer);
+
+        Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());

Review Comment:
   See above



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -3172,6 +3191,8 @@ public Set<TopicPartition> changelogPartitions() {
         assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
         // the active task creator should also get closed (so that it closes the thread producer if applicable)
         verify(activeTaskCreator);
+
+        Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());

Review Comment:
   No verification needed.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -3716,6 +3738,8 @@ public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
         final RuntimeException thrown =
             assertThrows(RuntimeException.class, () -> taskManager.commitAll());
         assertThat(thrown.getMessage(), equalTo("opsh."));
+
+        Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());

Review Comment:
   No verification needed.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2295,6 +2302,8 @@ public void markChangelogAsCorrupted(final Collection<TopicPartition> partitions
         assertThat(corruptedActive.state(), is(Task.State.CREATED));
         assertThat(uncorruptedActive.state(), is(Task.State.CREATED));
         verify(consumer);
+
+        Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());

Review Comment:
   See above



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2379,6 +2387,8 @@ public void markChangelogAsCorrupted(final Collection<TopicPartition> partitions
         assertThat(corruptedTaskChangelogMarkedAsCorrupted.get(), is(true));
         assertThat(uncorruptedTaskChangelogMarkedAsCorrupted.get(), is(true));
         verify(consumer);
+
+        Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());

Review Comment:
   No verification needed.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -3441,6 +3462,8 @@ public void shouldInitializeNewActiveTasks() {
         assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
         // verifies that we actually resume the assignment at the end of restoration.
         verify(consumer);
+
+        Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());

Review Comment:
   No verification needed.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -3818,6 +3842,8 @@ public Map<TopicPartition, Long> purgeableOffsets() {
         taskManager.maybePurgeCommittedRecords();
 
         verify(adminClient);
+
+        Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());

Review Comment:
   No verification needed.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2433,6 +2442,8 @@ public void markChangelogAsCorrupted(final Collection<TopicPartition> partitions
         assertThat(revokedActiveTask.state(), is(State.SUSPENDED));
         assertThat(unrevokedActiveTaskWithCommitNeeded.state(), is(State.CREATED));
         assertThat(unrevokedActiveTaskWithoutCommitNeeded.state(), is(State.RUNNING));
+
+        Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());

Review Comment:
   No verification needed.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -3958,6 +3983,8 @@ public void shouldProcessActiveTasks() {
         // check that if there's no records proccssible, we would stop early
         assertThat(taskManager.process(3, time), is(5));
         assertThat(taskManager.process(3, time), is(0));
+
+        Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());

Review Comment:
   No verification needed.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2556,6 +2567,8 @@ public void shouldAddNonResumedSuspendedTasks() {
         assertThat(task01.state(), is(Task.State.RUNNING));
 
         verify(activeTaskCreator);
+
+        Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());

Review Comment:
   No verification needed.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2606,6 +2618,7 @@ public void shouldAddNewActiveTasks() {
         assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
         verify(activeTaskCreator);
         Mockito.verify(changeLogReader).enforceRestoreActive();
+        Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());

Review Comment:
   No verification needed.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2652,6 +2664,7 @@ public void initializeIfNeeded() {
         assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
         verify(activeTaskCreator);
         Mockito.verify(changeLogReader).enforceRestoreActive();
+        Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());

Review Comment:
   No verification needed.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -4075,6 +4102,8 @@ public boolean process(final long wallClockTime) {
         assertThat(exception.taskId().isPresent(), is(true));
         assertThat(exception.taskId().get(), is(taskId00));
         assertThat(exception.getCause().getMessage(), is("oops"));
+
+        Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());

Review Comment:
   No verification needed.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -4046,6 +4072,8 @@ public boolean process(final long wallClockTime) {
         task00.addRecords(partition, singletonList(getConsumerRecord(partition, 0L)));
 
         assertThrows(TaskMigratedException.class, () -> taskManager.process(1, time));
+
+        Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());

Review Comment:
   No verification needed.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -3780,6 +3803,8 @@ public Map<TopicPartition, Long> purgeableOffsets() {
         taskManager.maybePurgeCommittedRecords();
 
         verify(adminClient);
+
+        Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());

Review Comment:
   No verification needed.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -4110,15 +4140,16 @@ public boolean maybePunctuateStreamTime() {
 
         expectRestoreToBeCompleted(consumer);
         expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment))).andStubReturn(singletonList(task00));
-        expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
-        replay(activeTaskCreator, standbyTaskCreator, consumer);
+        replay(activeTaskCreator, consumer);
 
         taskManager.handleAssignment(taskId00Assignment, emptyMap());
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
 
         assertThat(task00.state(), is(Task.State.RUNNING));
 
         assertThrows(KafkaException.class, () -> taskManager.punctuate());
+
+        Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());

Review Comment:
   No verification needed.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2911,18 +2925,19 @@ public void shouldNotCommitOnHandleAssignmentIfOnlyStandbyTaskClosed() {
         taskManager.handleAssignment(assignmentActive, Collections.emptyMap());
 
         assertThat(task00.commitNeeded, is(true));
+
+        Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());

Review Comment:
   No verification needed.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -4161,15 +4192,16 @@ public Set<TopicPartition> changelogPartitions() {
         };
 
         expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment))).andStubReturn(singletonList(task00));
-        expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
-        replay(activeTaskCreator, standbyTaskCreator, consumer);
+        replay(activeTaskCreator, consumer);
 
         taskManager.handleAssignment(taskId00Assignment, emptyMap());
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(false));
         assertThat(task00.state(), is(Task.State.RESTORING));
         // this could be a bit mysterious; we're verifying _no_ interactions on the consumer,
         // since the taskManager should _not_ resume the assignment while we're still in RESTORING
         verify(consumer);
+
+        Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());

Review Comment:
   No verification needed.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2690,6 +2702,7 @@ public void completeRestoration(final java.util.function.Consumer<Set<TopicParti
         assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
         verify(activeTaskCreator);
         Mockito.verify(changeLogReader).enforceRestoreActive();
+        Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());

Review Comment:
   No verification needed.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -4149,6 +4178,8 @@ public boolean maybePunctuateSystemTime() {
 
         // one for stream and one for system time
         assertThat(taskManager.punctuate(), equalTo(2));
+
+        Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());

Review Comment:
   No verification needed.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -4204,6 +4235,8 @@ public void shouldHaveRemainingPartitionsUncleared() {
                     "tasks have been cleaned up by the handleAssignment callback.")
             );
         }
+
+        Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());

Review Comment:
   No verification needed.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -3070,6 +3087,8 @@ public void closeDirty() {
         assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
         // the active task creator should also get closed (so that it closes the thread producer if applicable)
         verify(activeTaskCreator);
+
+        Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());

Review Comment:
   No verification needed.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2700,18 +2713,19 @@ public void shouldSuspendActiveTasksDuringRevocation() {
 
         expectRestoreToBeCompleted(consumer);
         expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment))).andReturn(singletonList(task00));
-        expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
         consumer.commitSync(offsets);
         expectLastCall();
 
-        replay(activeTaskCreator, standbyTaskCreator, consumer);
+        replay(activeTaskCreator, consumer);
 
         taskManager.handleAssignment(taskId00Assignment, emptyMap());
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
         assertThat(task00.state(), is(Task.State.RUNNING));
 
         taskManager.handleRevocation(taskId00Partitions);
         assertThat(task00.state(), is(Task.State.SUSPENDED));
+
+        Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());

Review Comment:
   No verification needed.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -4088,15 +4117,16 @@ public boolean maybePunctuateStreamTime() {
 
         expectRestoreToBeCompleted(consumer);
         expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment))).andStubReturn(singletonList(task00));
-        expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
-        replay(activeTaskCreator, standbyTaskCreator, consumer);
+        replay(activeTaskCreator, consumer);
 
         taskManager.handleAssignment(taskId00Assignment, emptyMap());
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
 
         assertThat(task00.state(), is(Task.State.RUNNING));
 
         assertThrows(TaskMigratedException.class, () -> taskManager.punctuate());
+
+        Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());

Review Comment:
   No verification needed.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -4595,6 +4627,8 @@ public void suspend() {
         assertThat(thrown.getCause().getMessage(), is("KABOOM!"));
         assertThat(task00.state(), is(Task.State.SUSPENDED));
         assertThat(task01.state(), is(Task.State.SUSPENDED));
+
+        Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());

Review Comment:
   No verification needed.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -352,16 +356,17 @@ public void shouldRemoveUnusedStandbyTaskFromStateUpdater() {
             .withInputPartitions(taskId02Partitions).build();
         final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
         final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
-        when(stateUpdater.getTasks()).thenReturn(mkSet(standbyTaskToClose));
+        final Set<Task> standbyTasks = mkSet(standbyTaskToClose);
+        when(stateUpdater.getTasks()).thenReturn(standbyTasks);

Review Comment:
   nit: Why did you introduce variable `standbyTasks`? The variable is not used anywhere else. 





[GitHub] [kafka] clolov commented on pull request #13711: KAFKA-14133: Migrate StandbyTaskCreator mock in TaskManagerTest to Mockito

Posted by "clolov (via GitHub)" <gi...@apache.org>.
clolov commented on PR #13711:
URL: https://github.com/apache/kafka/pull/13711#issuecomment-1576886607

   Yup, I will address all comments today, thank you for the review!




[GitHub] [kafka] divijvaidya merged pull request #13711: KAFKA-14133: Migrate StandbyTaskCreator mock in TaskManagerTest to Mockito

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya merged PR #13711:
URL: https://github.com/apache/kafka/pull/13711




[GitHub] [kafka] clolov commented on pull request #13711: KAFKA-14133: Migrate StandbyTaskCreator mock in TaskManagerTest to Mockito

Posted by "clolov (via GitHub)" <gi...@apache.org>.
clolov commented on PR #13711:
URL: https://github.com/apache/kafka/pull/13711#issuecomment-1578656781

   Heya @cadonna, I hope I have addressed all of your comments:
   * Updated the PR's overview
   * Rebased
   * Comments on the code itself
   
   Interestingly enough, the test `shouldInitializeNewStandbyTasks` fails locally when I run all tests, but succeeds when it is run in isolation. If the build passes I will blame it on something in my environment, but if it fails I would be glad for another pair of eyes on what might be causing it.




[GitHub] [kafka] divijvaidya commented on pull request #13711: KAFKA-14133: Migrate StandbyTaskCreator mock in TaskManagerTest to Mockito

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on PR #13711:
URL: https://github.com/apache/kafka/pull/13711#issuecomment-1592990499

   Failing tests are not related. Merging this code.
   ```
   [Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest.testReplicateSourceDefault()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13711/5/testReport/junit/org.apache.kafka.connect.mirror.integration/IdentityReplicationIntegrationTest/Build___JDK_11_and_Scala_2_13___testReplicateSourceDefault__/)
   [Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest.testReplicateSourceDefault()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13711/5/testReport/junit/org.apache.kafka.connect.mirror.integration/IdentityReplicationIntegrationTest/Build___JDK_11_and_Scala_2_13___testReplicateSourceDefault___2/)
   [Build / JDK 17 and Scala 2.13 / org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13711/5/testReport/junit/org.apache.kafka.controller/QuorumControllerTest/Build___JDK_17_and_Scala_2_13___testBalancePartitionLeaders__/)
   [Build / JDK 8 and Scala 2.12 / org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13711/5/testReport/junit/org.apache.kafka.controller/QuorumControllerTest/Build___JDK_8_and_Scala_2_12___testBalancePartitionLeaders__/)
   ```




[GitHub] [kafka] clolov commented on pull request #13711: KAFKA-14133: Migrate StandbyTaskCreator mock in TaskManagerTest to Mockito

Posted by "clolov (via GitHub)" <gi...@apache.org>.
clolov commented on PR #13711:
URL: https://github.com/apache/kafka/pull/13711#issuecomment-1592595315

   Good morning! I believe that the failing checks are not connected with this change. Is it okay to merge this PR?




[GitHub] [kafka] cadonna commented on pull request #13711: KAFKA-14133: Migrate StandbyTaskCreator mock in TaskManagerTest to Mockito

Posted by "cadonna (via GitHub)" <gi...@apache.org>.
cadonna commented on PR #13711:
URL: https://github.com/apache/kafka/pull/13711#issuecomment-1576388176

   @clolov Could you please do the mentioned rebase?




[GitHub] [kafka] clolov commented on a diff in pull request #13711: KAFKA-14133: Migrate StandbyTaskCreator mock in TaskManagerTest to Mockito

Posted by "clolov (via GitHub)" <gi...@apache.org>.
clolov commented on code in PR #13711:
URL: https://github.com/apache/kafka/pull/13711#discussion_r1218178041


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1740,13 +1740,14 @@ public void shouldComputeOffsetSumFromCheckpointFileForUninitializedTask() throw
         taskManager.handleRebalanceStart(singleton("topic"));
         final StateMachineTask uninitializedTask = new StateMachineTask(taskId00, taskId00Partitions, true);
         expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment))).andStubReturn(singleton(uninitializedTask));
-        expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
-        replay(activeTaskCreator, standbyTaskCreator);
+        replay(activeTaskCreator);
         taskManager.handleAssignment(taskId00Assignment, emptyMap());
 
         assertThat(uninitializedTask.state(), is(State.CREATED));
 
         assertThat(taskManager.getTaskOffsetSums(), is(expectedOffsetSums));
+
+        Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());

Review Comment:
   Oh, I see what you mean. You mean that in the EasyMock version there are no calls to
   ```
   verify(standbyTaskCreator)
   ```
   in these specific tests.
   Yeah, sure, I will change it!


