You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by di...@apache.org on 2023/07/20 16:16:25 UTC

[kafka] branch trunk updated: KAFKA-14133: Migrate various mocks in TaskManagerTest to Mockito (#13874)

This is an automated email from the ASF dual-hosted git repository.

divijv pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8f313eaed4f KAFKA-14133: Migrate various mocks in TaskManagerTest to Mockito (#13874)
8f313eaed4f is described below

commit 8f313eaed4f9dc9730016e0316529ba0ed4c62b4
Author: Christo Lolov <lo...@amazon.com>
AuthorDate: Thu Jul 20 17:16:18 2023 +0100

    KAFKA-14133: Migrate various mocks in TaskManagerTest to Mockito (#13874)
    
    Reviewers: Divij Vaidya <di...@amazon.com>
---
 .../processor/internals/TaskManagerTest.java       | 97 +++++++++-------------
 1 file changed, 41 insertions(+), 56 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 22da72feecd..3c626a8adba 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -2014,10 +2014,7 @@ public class TaskManagerTest {
 
     @Test
     public void shouldReviveCorruptTasks() {
-        final ProcessorStateManager stateManager = EasyMock.createStrictMock(ProcessorStateManager.class);
-        stateManager.markChangelogAsCorrupted(taskId00Partitions);
-        EasyMock.expectLastCall().once();
-        replay(stateManager);
+        final ProcessorStateManager stateManager = Mockito.mock(ProcessorStateManager.class);
 
         final AtomicBoolean enforcedCheckpoint = new AtomicBoolean(false);
         final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) {
@@ -2050,15 +2047,13 @@ public class TaskManagerTest {
         assertThat(taskManager.activeTaskMap(), is(singletonMap(taskId00, task00)));
         assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
 
-        verify(stateManager);
         verify(consumer);
+        Mockito.verify(stateManager).markChangelogAsCorrupted(taskId00Partitions);
     }
 
     @Test
     public void shouldReviveCorruptTasksEvenIfTheyCannotCloseClean() {
-        final ProcessorStateManager stateManager = EasyMock.createStrictMock(ProcessorStateManager.class);
-        stateManager.markChangelogAsCorrupted(taskId00Partitions);
-        replay(stateManager);
+        final ProcessorStateManager stateManager = Mockito.mock(ProcessorStateManager.class);
 
         final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) {
             @Override
@@ -2085,15 +2080,13 @@ public class TaskManagerTest {
         assertThat(taskManager.activeTaskMap(), is(singletonMap(taskId00, task00)));
         assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
 
-        verify(stateManager);
         verify(consumer);
+        Mockito.verify(stateManager).markChangelogAsCorrupted(taskId00Partitions);
     }
 
     @Test
     public void shouldCommitNonCorruptedTasksOnTaskCorruptedException() {
-        final ProcessorStateManager stateManager = EasyMock.createStrictMock(ProcessorStateManager.class);
-        stateManager.markChangelogAsCorrupted(taskId00Partitions);
-        replay(stateManager);
+        final ProcessorStateManager stateManager = Mockito.mock(ProcessorStateManager.class);
 
         final StateMachineTask corruptedTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
         final StateMachineTask nonCorruptedTask = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager);
@@ -2125,13 +2118,12 @@ public class TaskManagerTest {
         assertThat(corruptedTask.partitionsForOffsetReset, equalTo(taskId00Partitions));
 
         verify(consumer);
+        Mockito.verify(stateManager).markChangelogAsCorrupted(taskId00Partitions);
     }
 
     @Test
     public void shouldNotCommitNonRunningNonCorruptedTasks() {
-        final ProcessorStateManager stateManager = EasyMock.createStrictMock(ProcessorStateManager.class);
-        stateManager.markChangelogAsCorrupted(taskId00Partitions);
-        replay(stateManager);
+        final ProcessorStateManager stateManager = Mockito.mock(ProcessorStateManager.class);
 
         final StateMachineTask corruptedTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
         final StateMachineTask nonRunningNonCorruptedTask = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager);
@@ -2158,13 +2150,12 @@ public class TaskManagerTest {
 
         assertFalse(nonRunningNonCorruptedTask.commitPrepared);
         verify(consumer);
+        Mockito.verify(stateManager).markChangelogAsCorrupted(taskId00Partitions);
     }
 
     @Test
     public void shouldCleanAndReviveCorruptedStandbyTasksBeforeCommittingNonCorruptedTasks() {
-        final ProcessorStateManager stateManager = EasyMock.createStrictMock(ProcessorStateManager.class);
-        stateManager.markChangelogAsCorrupted(taskId00Partitions);
-        replay(stateManager);
+        final ProcessorStateManager stateManager = Mockito.mock(ProcessorStateManager.class);
 
         final StateMachineTask corruptedStandby = new StateMachineTask(taskId00, taskId00Partitions, false, stateManager);
         final StateMachineTask runningNonCorruptedActive = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager) {
@@ -2199,11 +2190,12 @@ public class TaskManagerTest {
         assertThat(corruptedStandby.commitPrepared, is(true));
         assertThat(corruptedStandby.state(), is(Task.State.CREATED));
         verify(consumer);
+        Mockito.verify(stateManager).markChangelogAsCorrupted(taskId00Partitions);
     }
 
     @Test
     public void shouldNotAttemptToCommitInHandleCorruptedDuringARebalance() {
-        final ProcessorStateManager stateManager = EasyMock.createNiceMock(ProcessorStateManager.class);
+        final ProcessorStateManager stateManager = Mockito.mock(ProcessorStateManager.class);
         expect(stateDirectory.listNonEmptyTaskDirectories()).andStubReturn(new ArrayList<>());
 
         final StateMachineTask corruptedActive = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
@@ -2224,7 +2216,7 @@ public class TaskManagerTest {
 
         expect(consumer.assignment()).andStubReturn(union(HashSet::new, taskId00Partitions, taskId01Partitions));
 
-        replay(consumer, stateDirectory, stateManager);
+        replay(consumer, stateDirectory);
 
         uncorruptedActive.setCommittableOffsetsAndMetadata(offsets);
 
@@ -2250,10 +2242,8 @@ public class TaskManagerTest {
     }
 
     @Test
-    public void shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitWithALSO() {
-        final ProcessorStateManager stateManager = EasyMock.createStrictMock(ProcessorStateManager.class);
-        stateManager.markChangelogAsCorrupted(taskId00Partitions);
-        replay(stateManager);
+    public void shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitWithALOS() {
+        final ProcessorStateManager stateManager = Mockito.mock(ProcessorStateManager.class);
 
         final StateMachineTask corruptedActive = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
         final StateMachineTask uncorruptedActive = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager) {
@@ -2310,6 +2300,7 @@ public class TaskManagerTest {
         assertThat(corruptedActive.state(), is(Task.State.CREATED));
         assertThat(uncorruptedActive.state(), is(Task.State.CREATED));
         verify(consumer);
+        Mockito.verify(stateManager).markChangelogAsCorrupted(taskId00Partitions);
     }
 
     @Test
@@ -2317,7 +2308,7 @@ public class TaskManagerTest {
         final TaskManager taskManager = setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false);
         final StreamsProducer producer = mock(StreamsProducer.class);
         when(activeTaskCreator.threadProducer()).thenReturn(producer);
-        final ProcessorStateManager stateManager = EasyMock.createMock(ProcessorStateManager.class);
+        final ProcessorStateManager stateManager = Mockito.mock(ProcessorStateManager.class);
 
         final AtomicBoolean corruptedTaskChangelogMarkedAsCorrupted = new AtomicBoolean(false);
         final StateMachineTask corruptedActiveTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) {
@@ -2327,7 +2318,6 @@ public class TaskManagerTest {
                 corruptedTaskChangelogMarkedAsCorrupted.set(true);
             }
         };
-        stateManager.markChangelogAsCorrupted(taskId00ChangelogPartitions);
 
         final AtomicBoolean uncorruptedTaskChangelogMarkedAsCorrupted = new AtomicBoolean(false);
         final StateMachineTask uncorruptedActiveTask = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager) {
@@ -2339,7 +2329,6 @@ public class TaskManagerTest {
         };
         final Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(t1p1, new OffsetAndMetadata(0L, null));
         uncorruptedActiveTask.setCommittableOffsetsAndMetadata(offsets);
-        stateManager.markChangelogAsCorrupted(taskId01ChangelogPartitions);
 
         // handleAssignment
         final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>();
@@ -2357,7 +2346,7 @@ public class TaskManagerTest {
 
         expect(consumer.assignment()).andStubReturn(union(HashSet::new, taskId00Partitions, taskId01Partitions));
 
-        replay(consumer, stateManager);
+        replay(consumer);
 
         taskManager.handleAssignment(assignment, emptyMap());
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
@@ -2394,6 +2383,8 @@ public class TaskManagerTest {
         assertThat(corruptedTaskChangelogMarkedAsCorrupted.get(), is(true));
         assertThat(uncorruptedTaskChangelogMarkedAsCorrupted.get(), is(true));
         verify(consumer);
+        Mockito.verify(stateManager).markChangelogAsCorrupted(taskId00ChangelogPartitions);
+        Mockito.verify(stateManager).markChangelogAsCorrupted(taskId01ChangelogPartitions);
     }
 
     @Test
@@ -2454,7 +2445,7 @@ public class TaskManagerTest {
         final TaskManager taskManager = setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false);
         final StreamsProducer producer = mock(StreamsProducer.class);
         when(activeTaskCreator.threadProducer()).thenReturn(producer);
-        final ProcessorStateManager stateManager = EasyMock.createMock(ProcessorStateManager.class);
+        final ProcessorStateManager stateManager = Mockito.mock(ProcessorStateManager.class);
 
         final StateMachineTask revokedActiveTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
         final Map<TopicPartition, OffsetAndMetadata> revokedActiveTaskOffsets = singletonMap(t1p0, new OffsetAndMetadata(0L, null));
@@ -2479,9 +2470,6 @@ public class TaskManagerTest {
         expectedCommittedOffsets.putAll(revokedActiveTaskOffsets);
         expectedCommittedOffsets.putAll(unrevokedTaskOffsets);
 
-        stateManager.markChangelogAsCorrupted(taskId00ChangelogPartitions);
-        stateManager.markChangelogAsCorrupted(taskId01ChangelogPartitions);
-
         final Map<TaskId, Set<TopicPartition>> assignmentActive = mkMap(
             mkEntry(taskId00, taskId00Partitions),
             mkEntry(taskId01, taskId01Partitions),
@@ -2500,7 +2488,7 @@ public class TaskManagerTest {
 
         expect(consumer.assignment()).andStubReturn(union(HashSet::new, taskId00Partitions, taskId01Partitions, taskId02Partitions));
 
-        replay(consumer, stateManager);
+        replay(consumer);
 
         taskManager.handleAssignment(assignmentActive, emptyMap());
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
@@ -2519,6 +2507,8 @@ public class TaskManagerTest {
         assertThat(revokedActiveTask.state(), is(State.SUSPENDED));
         assertThat(unrevokedActiveTask.state(), is(State.CREATED));
         assertThat(unrevokedActiveTaskWithoutCommitNeeded.state(), is(State.RUNNING));
+        Mockito.verify(stateManager).markChangelogAsCorrupted(taskId00ChangelogPartitions);
+        Mockito.verify(stateManager).markChangelogAsCorrupted(taskId01ChangelogPartitions);
     }
 
     @Test
@@ -3594,24 +3584,23 @@ public class TaskManagerTest {
 
     @Test
     public void shouldCommitViaProducerIfEosAlphaEnabled() {
-        final StreamsProducer producer = EasyMock.mock(StreamsProducer.class);
+        final StreamsProducer producer = Mockito.mock(StreamsProducer.class);
         when(activeTaskCreator.streamsProducerForTask(any(TaskId.class)))
             .thenReturn(producer);
 
         final Map<TopicPartition, OffsetAndMetadata> offsetsT01 = singletonMap(t1p1, new OffsetAndMetadata(0L, null));
         final Map<TopicPartition, OffsetAndMetadata> offsetsT02 = singletonMap(t1p2, new OffsetAndMetadata(1L, null));
 
-        producer.commitTransaction(offsetsT01, new ConsumerGroupMetadata("appId"));
-        expectLastCall();
-        producer.commitTransaction(offsetsT02, new ConsumerGroupMetadata("appId"));
-        expectLastCall();
+        shouldCommitViaProducerIfEosEnabled(ProcessingMode.EXACTLY_ONCE_ALPHA, offsetsT01, offsetsT02);
 
-        shouldCommitViaProducerIfEosEnabled(ProcessingMode.EXACTLY_ONCE_ALPHA, producer, offsetsT01, offsetsT02);
+        Mockito.verify(producer).commitTransaction(offsetsT01, new ConsumerGroupMetadata("appId"));
+        Mockito.verify(producer).commitTransaction(offsetsT02, new ConsumerGroupMetadata("appId"));
+        Mockito.verifyNoMoreInteractions(producer);
     }
 
     @Test
     public void shouldCommitViaProducerIfEosV2Enabled() {
-        final StreamsProducer producer = EasyMock.mock(StreamsProducer.class);
+        final StreamsProducer producer = Mockito.mock(StreamsProducer.class);
         when(activeTaskCreator.threadProducer()).thenReturn(producer);
 
         final Map<TopicPartition, OffsetAndMetadata> offsetsT01 = singletonMap(t1p1, new OffsetAndMetadata(0L, null));
@@ -3620,14 +3609,13 @@ public class TaskManagerTest {
         allOffsets.putAll(offsetsT01);
         allOffsets.putAll(offsetsT02);
 
-        producer.commitTransaction(allOffsets, new ConsumerGroupMetadata("appId"));
-        expectLastCall();
+        shouldCommitViaProducerIfEosEnabled(ProcessingMode.EXACTLY_ONCE_V2, offsetsT01, offsetsT02);
 
-        shouldCommitViaProducerIfEosEnabled(ProcessingMode.EXACTLY_ONCE_V2, producer, offsetsT01, offsetsT02);
+        Mockito.verify(producer).commitTransaction(allOffsets, new ConsumerGroupMetadata("appId"));
+        Mockito.verifyNoMoreInteractions(producer);
     }
 
     private void shouldCommitViaProducerIfEosEnabled(final ProcessingMode processingMode,
-                                                     final StreamsProducer producer,
                                                      final Map<TopicPartition, OffsetAndMetadata> offsetsT01,
                                                      final Map<TopicPartition, OffsetAndMetadata> offsetsT02) {
         final TaskManager taskManager = setUpTaskManager(processingMode, false);
@@ -3643,11 +3631,11 @@ public class TaskManagerTest {
 
         reset(consumer);
         expect(consumer.groupMetadata()).andStubReturn(new ConsumerGroupMetadata("appId"));
-        replay(consumer, producer);
+        replay(consumer);
 
         taskManager.commitAll();
 
-        verify(producer, consumer);
+        verify(consumer);
     }
 
     @Test
@@ -4533,21 +4521,18 @@ public class TaskManagerTest {
 
     @Test
     public void shouldConvertActiveTaskToStandbyTask() {
-        final StreamTask activeTask = EasyMock.mock(StreamTask.class);
-        expect(activeTask.id()).andStubReturn(taskId00);
-        expect(activeTask.inputPartitions()).andStubReturn(taskId00Partitions);
-        expect(activeTask.isActive()).andStubReturn(true);
-        expect(activeTask.prepareCommit()).andStubReturn(Collections.emptyMap());
+        final StreamTask activeTask = Mockito.mock(StreamTask.class);
+        when(activeTask.id()).thenReturn(taskId00);
+        when(activeTask.inputPartitions()).thenReturn(taskId00Partitions);
+        when(activeTask.isActive()).thenReturn(true);
 
-        final StandbyTask standbyTask = EasyMock.mock(StandbyTask.class);
-        expect(standbyTask.id()).andStubReturn(taskId00);
+        final StandbyTask standbyTask = Mockito.mock(StandbyTask.class);
+        when(standbyTask.id()).thenReturn(taskId00);
 
         when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singletonList(activeTask));
-        activeTask.prepareRecycle();
-        expectLastCall().once();
         when(standbyTaskCreator.createStandbyTaskFromActive(Mockito.any(), Mockito.eq(taskId00Partitions))).thenReturn(standbyTask);
 
-        replay(activeTask, standbyTask, consumer);
+        replay(consumer);
 
         taskManager.handleAssignment(taskId00Assignment, Collections.emptyMap());
         taskManager.handleAssignment(Collections.emptyMap(), taskId00Assignment);