You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by di...@apache.org on 2023/11/06 09:41:47 UTC

(kafka) branch trunk updated: KAFKA-14133: Move StandbyTaskTest to Mockito (#14679)

This is an automated email from the ASF dual-hosted git repository.

divijv pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ba394aa28a2 KAFKA-14133: Move StandbyTaskTest to Mockito (#14679)
ba394aa28a2 is described below

commit ba394aa28a28ede013f809e0038d84c8062f5b50
Author: Christo Lolov <lo...@amazon.com>
AuthorDate: Mon Nov 6 09:41:37 2023 +0000

    KAFKA-14133: Move StandbyTaskTest to Mockito (#14679)
    
    Reviewers: Divij Vaidya <di...@amazon.com>
---
 .../processor/internals/StandbyTaskTest.java       | 215 ++++++---------------
 1 file changed, 58 insertions(+), 157 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 49988e410ad..f308e683b1d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -44,14 +44,12 @@ import org.apache.kafka.test.MockKeyValueStoreBuilder;
 import org.apache.kafka.test.MockRestoreConsumer;
 import org.apache.kafka.test.MockTimestampExtractor;
 import org.apache.kafka.test.TestUtils;
-import org.easymock.EasyMock;
-import org.easymock.EasyMockRunner;
-import org.easymock.Mock;
-import org.easymock.MockType;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
 
 import java.io.File;
 import java.io.IOException;
@@ -79,8 +77,15 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
-
-@RunWith(EasyMockRunner.class)
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
 public class StandbyTaskTest {
 
     private final String threadName = "threadName";
@@ -127,13 +132,13 @@ public class StandbyTaskTest {
         new IntegerSerializer()
     );
 
-    @Mock(type = MockType.NICE)
+    @Mock
     private ProcessorStateManager stateManager;
 
     @Before
     public void setup() throws Exception {
-        EasyMock.expect(stateManager.taskId()).andStubReturn(taskId);
-        EasyMock.expect(stateManager.taskType()).andStubReturn(TaskType.STANDBY);
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateManager.taskType()).thenReturn(TaskType.STANDBY);
 
         restoreStateConsumer.reset();
         restoreStateConsumer.updatePartitions(storeChangelogTopicName1, asList(
@@ -170,11 +175,9 @@ public class StandbyTaskTest {
 
     @Test
     public void shouldThrowLockExceptionIfFailedToLockStateDirectory() throws IOException {
-        stateDirectory = EasyMock.createNiceMock(StateDirectory.class);
-        EasyMock.expect(stateDirectory.lock(taskId)).andReturn(false);
-        EasyMock.expect(stateManager.taskType()).andStubReturn(TaskType.STANDBY);
-
-        EasyMock.replay(stateDirectory, stateManager);
+        stateDirectory = mock(StateDirectory.class);
+        when(stateDirectory.lock(taskId)).thenReturn(false);
+        when(stateManager.taskType()).thenReturn(TaskType.STANDBY);
 
         task = createStandbyTask();
 
@@ -184,10 +187,7 @@ public class StandbyTaskTest {
 
     @Test
     public void shouldTransitToRunningAfterInitialization() {
-        EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
-        stateManager.registerStateStores(EasyMock.anyObject(), EasyMock.anyObject());
-        EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
-        EasyMock.replay(stateManager);
+        doNothing().when(stateManager).registerStateStores(any(), any());
 
         task = createStandbyTask();
 
@@ -201,13 +201,10 @@ public class StandbyTaskTest {
         task.initializeIfNeeded();
 
         assertEquals(RUNNING, task.state());
-
-        EasyMock.verify(stateManager);
     }
 
     @Test
     public void shouldThrowIfCommittingOnIllegalState() {
-        EasyMock.replay(stateManager);
         task = createStandbyTask();
         task.suspend();
         task.closeClean();
@@ -215,35 +212,25 @@ public class StandbyTaskTest {
         assertThrows(IllegalStateException.class, task::prepareCommit);
     }
 
-
     @Test
     public void shouldAlwaysCheckpointStateIfEnforced() {
-        stateManager.flush();
-        EasyMock.expectLastCall().once();
-        stateManager.checkpoint();
-        EasyMock.expectLastCall().once();
-        EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.emptyMap()).anyTimes();
-        EasyMock.replay(stateManager);
+        when(stateManager.changelogOffsets()).thenReturn(Collections.emptyMap());
 
         task = createStandbyTask();
 
         task.initializeIfNeeded();
         task.maybeCheckpoint(true);
 
-        EasyMock.verify(stateManager);
+        verify(stateManager).flush();
+        verify(stateManager).checkpoint();
     }
 
     @Test
     public void shouldOnlyCheckpointStateWithBigAdvanceIfNotEnforced() {
-        stateManager.flush();
-        EasyMock.expectLastCall().once();
-        stateManager.checkpoint();
-        EasyMock.expectLastCall().once();
-        EasyMock.expect(stateManager.changelogOffsets())
-                .andReturn(Collections.singletonMap(partition, 50L))
-                .andReturn(Collections.singletonMap(partition, 11000L))
-                .andReturn(Collections.singletonMap(partition, 12000L));
-        EasyMock.replay(stateManager);
+        when(stateManager.changelogOffsets())
+                .thenReturn(Collections.singletonMap(partition, 50L))
+                .thenReturn(Collections.singletonMap(partition, 11000L))
+                .thenReturn(Collections.singletonMap(partition, 12000L));
 
         task = createStandbyTask();
         task.initializeIfNeeded();
@@ -255,22 +242,18 @@ public class StandbyTaskTest {
         task.maybeCheckpoint(false);  // this should not checkpoint
         assertEquals(Collections.singletonMap(partition, 11000L), task.offsetSnapshotSinceLastFlush);
 
-        EasyMock.verify(stateManager);
+        verify(stateManager).flush();
+        verify(stateManager).checkpoint();
     }
 
     @Test
     public void shouldFlushAndCheckpointStateManagerOnCommit() {
-        EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
-        stateManager.flush();
-        EasyMock.expectLastCall();
-        stateManager.checkpoint();
-        EasyMock.expectLastCall().once();
-        EasyMock.expect(stateManager.changelogOffsets())
-                .andReturn(Collections.singletonMap(partition, 50L))
-                .andReturn(Collections.singletonMap(partition, 11000L))
-                .andReturn(Collections.singletonMap(partition, 11000L));
-        EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.singleton(partition)).anyTimes();
-        EasyMock.replay(stateManager);
+        when(stateManager.changelogOffsets()).thenReturn(Collections.emptyMap());
+        doNothing().when(stateManager).flush();
+        when(stateManager.changelogOffsets())
+                .thenReturn(Collections.singletonMap(partition, 50L))
+                .thenReturn(Collections.singletonMap(partition, 11000L))
+                .thenReturn(Collections.singletonMap(partition, 11000L));
 
         task = createStandbyTask();
         task.initializeIfNeeded();
@@ -283,32 +266,21 @@ public class StandbyTaskTest {
         task.prepareCommit();
         task.postCommit(false);  // this should not checkpoint
 
-        EasyMock.verify(stateManager);
+        verify(stateManager).checkpoint();
     }
 
     @Test
     public void shouldReturnStateManagerChangelogOffsets() {
-        EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.singletonMap(partition, 50L));
-        EasyMock.replay(stateManager);
+        when(stateManager.changelogOffsets()).thenReturn(Collections.singletonMap(partition, 50L));
 
         task = createStandbyTask();
 
         assertEquals(Collections.singletonMap(partition, 50L), task.changelogOffsets());
-
-        EasyMock.verify(stateManager);
     }
 
     @Test
     public void shouldNotFlushAndThrowOnCloseDirty() {
-        EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
-        stateManager.close();
-        EasyMock.expectLastCall().andThrow(new ProcessorStateException("KABOOM!")).anyTimes();
-        stateManager.flush();
-        EasyMock.expectLastCall().andThrow(new AssertionError("Flush should not be called")).anyTimes();
-        stateManager.checkpoint();
-        EasyMock.expectLastCall().andThrow(new AssertionError("Checkpoint should not be called")).anyTimes();
-        EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
-        EasyMock.replay(stateManager);
+        doThrow(new ProcessorStateException("KABOOM!")).when(stateManager).close();
         final MetricName metricName = setupCloseTaskMetric();
 
         task = createStandbyTask();
@@ -321,35 +293,26 @@ public class StandbyTaskTest {
         final double expectedCloseTaskMetric = 1.0;
         verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName);
 
-        EasyMock.verify(stateManager);
+        verify(stateManager, never()).flush();
+        verify(stateManager, never()).checkpoint();
     }
 
     @Test
     public void shouldNotThrowFromStateManagerCloseInCloseDirty() {
-        EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
-        stateManager.close();
-        EasyMock.expectLastCall().andThrow(new RuntimeException("KABOOM!")).anyTimes();
-        EasyMock.replay(stateManager);
+        doThrow(new RuntimeException("KABOOM!")).when(stateManager).close();
 
         task = createStandbyTask();
         task.initializeIfNeeded();
 
         task.suspend();
         task.closeDirty();
-
-        EasyMock.verify(stateManager);
     }
 
     @Test
     public void shouldSuspendAndCommitBeforeCloseClean() {
-        stateManager.close();
-        EasyMock.expectLastCall();
-        stateManager.checkpoint();
-        EasyMock.expectLastCall().once();
-        EasyMock.expect(stateManager.changelogOffsets())
-                .andReturn(Collections.singletonMap(partition, 60L));
-        EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.singleton(partition)).anyTimes();
-        EasyMock.replay(stateManager);
+        doNothing().when(stateManager).close();
+        when(stateManager.changelogOffsets())
+                .thenReturn(Collections.singletonMap(partition, 60L));
         final MetricName metricName = setupCloseTaskMetric();
 
         task = createStandbyTask();
@@ -363,13 +326,11 @@ public class StandbyTaskTest {
 
         final double expectedCloseTaskMetric = 1.0;
         verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName);
-
-        EasyMock.verify(stateManager);
+        verify(stateManager).checkpoint();
     }
 
     @Test
     public void shouldRequireSuspendingCreatedTasksBeforeClose() {
-        EasyMock.replay(stateManager);
         task = createStandbyTask();
         assertThat(task.state(), equalTo(CREATED));
         assertThrows(IllegalStateException.class, () -> task.closeClean());
@@ -380,15 +341,11 @@ public class StandbyTaskTest {
 
     @Test
     public void shouldOnlyNeedCommitWhenChangelogOffsetChanged() {
-        EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.singleton(partition)).anyTimes();
-        EasyMock.expect(stateManager.changelogOffsets())
-            .andReturn(Collections.singletonMap(partition, 50L))
-            .andReturn(Collections.singletonMap(partition, 10100L)).anyTimes();
-        stateManager.flush();
-        EasyMock.expectLastCall();
-        stateManager.checkpoint();
-        EasyMock.expectLastCall();
-        EasyMock.replay(stateManager);
+        when(stateManager.changelogOffsets())
+            .thenReturn(Collections.singletonMap(partition, 50L))
+            .thenReturn(Collections.singletonMap(partition, 10100L));
+        doNothing().when(stateManager).flush();
+        doNothing().when(stateManager).checkpoint();
 
         task = createStandbyTask();
         task.initializeIfNeeded();
@@ -401,17 +358,11 @@ public class StandbyTaskTest {
 
         task.prepareCommit();
         task.postCommit(true);
-
-        EasyMock.verify(stateManager);
     }
 
     @Test
     public void shouldThrowOnCloseCleanError() {
-        EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
-        stateManager.close();
-        EasyMock.expectLastCall().andThrow(new RuntimeException("KABOOM!")).anyTimes();
-        EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.singleton(partition)).anyTimes();
-        EasyMock.replay(stateManager);
+        doThrow(new RuntimeException("KABOOM!")).when(stateManager).close();
         final MetricName metricName = setupCloseTaskMetric();
 
         task = createStandbyTask();
@@ -422,21 +373,13 @@ public class StandbyTaskTest {
 
         final double expectedCloseTaskMetric = 0.0;
         verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName);
-
-        EasyMock.verify(stateManager);
-        EasyMock.reset(stateManager);
-        EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.singleton(partition)).anyTimes();
-        EasyMock.replay(stateManager);
     }
 
     @Test
     public void shouldThrowOnCloseCleanCheckpointError() {
-        EasyMock.expect(stateManager.changelogOffsets())
-            .andReturn(Collections.singletonMap(partition, 50L));
-        stateManager.checkpoint();
-        EasyMock.expectLastCall().andThrow(new RuntimeException("KABOOM!")).anyTimes();
-        EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
-        EasyMock.replay(stateManager);
+        when(stateManager.changelogOffsets())
+            .thenReturn(Collections.singletonMap(partition, 50L));
+        doThrow(new RuntimeException("KABOOM!")).when(stateManager).checkpoint();
         final MetricName metricName = setupCloseTaskMetric();
 
         task = createStandbyTask();
@@ -449,19 +392,10 @@ public class StandbyTaskTest {
 
         final double expectedCloseTaskMetric = 0.0;
         verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName);
-
-        EasyMock.verify(stateManager);
-        EasyMock.reset(stateManager);
-        EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
-        EasyMock.replay(stateManager);
     }
 
     @Test
     public void shouldUnregisterMetricsInCloseClean() {
-        EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
-        EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
-        EasyMock.replay(stateManager);
-
         task = createStandbyTask();
         task.initializeIfNeeded();
 
@@ -474,10 +408,6 @@ public class StandbyTaskTest {
 
     @Test
     public void shouldUnregisterMetricsInCloseDirty() {
-        EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
-        EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
-        EasyMock.replay(stateManager);
-
         task = createStandbyTask();
         task.initializeIfNeeded();
 
@@ -491,10 +421,7 @@ public class StandbyTaskTest {
 
     @Test
     public void shouldCloseStateManagerOnTaskCreated() {
-        stateManager.close();
-        EasyMock.expectLastCall();
-
-        EasyMock.replay(stateManager);
+        doNothing().when(stateManager).close();
 
         final MetricName metricName = setupCloseTaskMetric();
 
@@ -506,20 +433,15 @@ public class StandbyTaskTest {
         final double expectedCloseTaskMetric = 1.0;
         verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName);
 
-        EasyMock.verify(stateManager);
-
         assertEquals(Task.State.CLOSED, task.state());
     }
 
     @SuppressWarnings("deprecation")
     @Test
     public void shouldDeleteStateDirOnTaskCreatedAndEosAlphaUncleanClose() {
-        stateManager.close();
-        EasyMock.expectLastCall();
+        doNothing().when(stateManager).close();
 
-        EasyMock.expect(stateManager.baseDir()).andReturn(baseDir);
-
-        EasyMock.replay(stateManager);
+        when(stateManager.baseDir()).thenReturn(baseDir);
 
         final MetricName metricName = setupCloseTaskMetric();
 
@@ -537,19 +459,14 @@ public class StandbyTaskTest {
         final double expectedCloseTaskMetric = 1.0;
         verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName);
 
-        EasyMock.verify(stateManager);
-
         assertEquals(Task.State.CLOSED, task.state());
     }
 
     @Test
     public void shouldDeleteStateDirOnTaskCreatedAndEosV2UncleanClose() {
-        stateManager.close();
-        EasyMock.expectLastCall();
-
-        EasyMock.expect(stateManager.baseDir()).andReturn(baseDir);
+        doNothing().when(stateManager).close();
 
-        EasyMock.replay(stateManager);
+        when(stateManager.baseDir()).thenReturn(baseDir);
 
         final MetricName metricName = setupCloseTaskMetric();
 
@@ -567,18 +484,11 @@ public class StandbyTaskTest {
         final double expectedCloseTaskMetric = 1.0;
         verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName);
 
-        EasyMock.verify(stateManager);
-
         assertEquals(Task.State.CLOSED, task.state());
     }
 
     @Test
     public void shouldPrepareRecycleSuspendedTask() {
-        EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
-        stateManager.recycle();
-        EasyMock.expectLastCall().once();
-        EasyMock.replay(stateManager);
-
         task = createStandbyTask();
         assertThrows(IllegalStateException.class, () -> task.prepareRecycle()); // CREATED
 
@@ -593,12 +503,11 @@ public class StandbyTaskTest {
         // This is a regression test so that, if we add some, we will be sure to deregister them.
         assertThat(getTaskMetrics(), empty());
 
-        EasyMock.verify(stateManager);
+        verify(stateManager).recycle();
     }
 
     @Test
     public void shouldAlwaysSuspendCreatedTasks() {
-        EasyMock.replay(stateManager);
         task = createStandbyTask();
         assertThat(task.state(), equalTo(CREATED));
         task.suspend();
@@ -607,8 +516,6 @@ public class StandbyTaskTest {
 
     @Test
     public void shouldAlwaysSuspendRunningTasks() {
-        EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
-        EasyMock.replay(stateManager);
         task = createStandbyTask();
         task.initializeIfNeeded();
         assertThat(task.state(), equalTo(RUNNING));
@@ -618,8 +525,6 @@ public class StandbyTaskTest {
 
     @Test
     public void shouldInitTaskTimeoutAndEventuallyThrow() {
-        EasyMock.replay(stateManager);
-
         task = createStandbyTask();
 
         task.maybeInitTaskTimeoutOrThrow(0L, null);
@@ -636,8 +541,6 @@ public class StandbyTaskTest {
 
     @Test
     public void shouldClearTaskTimeout() {
-        EasyMock.replay(stateManager);
-
         task = createStandbyTask();
 
         task.maybeInitTaskTimeoutOrThrow(0L, null);
@@ -647,8 +550,6 @@ public class StandbyTaskTest {
 
     @Test
     public void shouldRecordRestoredRecords() {
-        EasyMock.replay(stateManager);
-
         task = createStandbyTask();
 
         final KafkaMetric totalMetric = getMetric("update", "%s-total", task.id().toString());