You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by di...@apache.org on 2023/11/06 09:41:47 UTC
(kafka) branch trunk updated: KAFKA-14133: Move StandbyTaskTest to Mockito (#14679)
This is an automated email from the ASF dual-hosted git repository.
divijv pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ba394aa28a2 KAFKA-14133: Move StandbyTaskTest to Mockito (#14679)
ba394aa28a2 is described below
commit ba394aa28a28ede013f809e0038d84c8062f5b50
Author: Christo Lolov <lo...@amazon.com>
AuthorDate: Mon Nov 6 09:41:37 2023 +0000
KAFKA-14133: Move StandbyTaskTest to Mockito (#14679)
Reviewers: Divij Vaidya <di...@amazon.com>
---
.../processor/internals/StandbyTaskTest.java | 215 ++++++---------------
1 file changed, 58 insertions(+), 157 deletions(-)
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 49988e410ad..f308e683b1d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -44,14 +44,12 @@ import org.apache.kafka.test.MockKeyValueStoreBuilder;
import org.apache.kafka.test.MockRestoreConsumer;
import org.apache.kafka.test.MockTimestampExtractor;
import org.apache.kafka.test.TestUtils;
-import org.easymock.EasyMock;
-import org.easymock.EasyMockRunner;
-import org.easymock.Mock;
-import org.easymock.MockType;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
import java.io.File;
import java.io.IOException;
@@ -79,8 +77,15 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
-
-@RunWith(EasyMockRunner.class)
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
public class StandbyTaskTest {
private final String threadName = "threadName";
@@ -127,13 +132,13 @@ public class StandbyTaskTest {
new IntegerSerializer()
);
- @Mock(type = MockType.NICE)
+ @Mock
private ProcessorStateManager stateManager;
@Before
public void setup() throws Exception {
- EasyMock.expect(stateManager.taskId()).andStubReturn(taskId);
- EasyMock.expect(stateManager.taskType()).andStubReturn(TaskType.STANDBY);
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateManager.taskType()).thenReturn(TaskType.STANDBY);
restoreStateConsumer.reset();
restoreStateConsumer.updatePartitions(storeChangelogTopicName1, asList(
@@ -170,11 +175,9 @@ public class StandbyTaskTest {
@Test
public void shouldThrowLockExceptionIfFailedToLockStateDirectory() throws IOException {
- stateDirectory = EasyMock.createNiceMock(StateDirectory.class);
- EasyMock.expect(stateDirectory.lock(taskId)).andReturn(false);
- EasyMock.expect(stateManager.taskType()).andStubReturn(TaskType.STANDBY);
-
- EasyMock.replay(stateDirectory, stateManager);
+ stateDirectory = mock(StateDirectory.class);
+ when(stateDirectory.lock(taskId)).thenReturn(false);
+ when(stateManager.taskType()).thenReturn(TaskType.STANDBY);
task = createStandbyTask();
@@ -184,10 +187,7 @@ public class StandbyTaskTest {
@Test
public void shouldTransitToRunningAfterInitialization() {
- EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
- stateManager.registerStateStores(EasyMock.anyObject(), EasyMock.anyObject());
- EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
- EasyMock.replay(stateManager);
+ doNothing().when(stateManager).registerStateStores(any(), any());
task = createStandbyTask();
@@ -201,13 +201,10 @@ public class StandbyTaskTest {
task.initializeIfNeeded();
assertEquals(RUNNING, task.state());
-
- EasyMock.verify(stateManager);
}
@Test
public void shouldThrowIfCommittingOnIllegalState() {
- EasyMock.replay(stateManager);
task = createStandbyTask();
task.suspend();
task.closeClean();
@@ -215,35 +212,25 @@ public class StandbyTaskTest {
assertThrows(IllegalStateException.class, task::prepareCommit);
}
-
@Test
public void shouldAlwaysCheckpointStateIfEnforced() {
- stateManager.flush();
- EasyMock.expectLastCall().once();
- stateManager.checkpoint();
- EasyMock.expectLastCall().once();
- EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.emptyMap()).anyTimes();
- EasyMock.replay(stateManager);
+ when(stateManager.changelogOffsets()).thenReturn(Collections.emptyMap());
task = createStandbyTask();
task.initializeIfNeeded();
task.maybeCheckpoint(true);
- EasyMock.verify(stateManager);
+ verify(stateManager).flush();
+ verify(stateManager).checkpoint();
}
@Test
public void shouldOnlyCheckpointStateWithBigAdvanceIfNotEnforced() {
- stateManager.flush();
- EasyMock.expectLastCall().once();
- stateManager.checkpoint();
- EasyMock.expectLastCall().once();
- EasyMock.expect(stateManager.changelogOffsets())
- .andReturn(Collections.singletonMap(partition, 50L))
- .andReturn(Collections.singletonMap(partition, 11000L))
- .andReturn(Collections.singletonMap(partition, 12000L));
- EasyMock.replay(stateManager);
+ when(stateManager.changelogOffsets())
+ .thenReturn(Collections.singletonMap(partition, 50L))
+ .thenReturn(Collections.singletonMap(partition, 11000L))
+ .thenReturn(Collections.singletonMap(partition, 12000L));
task = createStandbyTask();
task.initializeIfNeeded();
@@ -255,22 +242,18 @@ public class StandbyTaskTest {
task.maybeCheckpoint(false); // this should not checkpoint
assertEquals(Collections.singletonMap(partition, 11000L), task.offsetSnapshotSinceLastFlush);
- EasyMock.verify(stateManager);
+ verify(stateManager).flush();
+ verify(stateManager).checkpoint();
}
@Test
public void shouldFlushAndCheckpointStateManagerOnCommit() {
- EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
- stateManager.flush();
- EasyMock.expectLastCall();
- stateManager.checkpoint();
- EasyMock.expectLastCall().once();
- EasyMock.expect(stateManager.changelogOffsets())
- .andReturn(Collections.singletonMap(partition, 50L))
- .andReturn(Collections.singletonMap(partition, 11000L))
- .andReturn(Collections.singletonMap(partition, 11000L));
- EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.singleton(partition)).anyTimes();
- EasyMock.replay(stateManager);
+ when(stateManager.changelogOffsets()).thenReturn(Collections.emptyMap());
+ doNothing().when(stateManager).flush();
+ when(stateManager.changelogOffsets())
+ .thenReturn(Collections.singletonMap(partition, 50L))
+ .thenReturn(Collections.singletonMap(partition, 11000L))
+ .thenReturn(Collections.singletonMap(partition, 11000L));
task = createStandbyTask();
task.initializeIfNeeded();
@@ -283,32 +266,21 @@ public class StandbyTaskTest {
task.prepareCommit();
task.postCommit(false); // this should not checkpoint
- EasyMock.verify(stateManager);
+ verify(stateManager).checkpoint();
}
@Test
public void shouldReturnStateManagerChangelogOffsets() {
- EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.singletonMap(partition, 50L));
- EasyMock.replay(stateManager);
+ when(stateManager.changelogOffsets()).thenReturn(Collections.singletonMap(partition, 50L));
task = createStandbyTask();
assertEquals(Collections.singletonMap(partition, 50L), task.changelogOffsets());
-
- EasyMock.verify(stateManager);
}
@Test
public void shouldNotFlushAndThrowOnCloseDirty() {
- EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
- stateManager.close();
- EasyMock.expectLastCall().andThrow(new ProcessorStateException("KABOOM!")).anyTimes();
- stateManager.flush();
- EasyMock.expectLastCall().andThrow(new AssertionError("Flush should not be called")).anyTimes();
- stateManager.checkpoint();
- EasyMock.expectLastCall().andThrow(new AssertionError("Checkpoint should not be called")).anyTimes();
- EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
- EasyMock.replay(stateManager);
+ doThrow(new ProcessorStateException("KABOOM!")).when(stateManager).close();
final MetricName metricName = setupCloseTaskMetric();
task = createStandbyTask();
@@ -321,35 +293,26 @@ public class StandbyTaskTest {
final double expectedCloseTaskMetric = 1.0;
verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName);
- EasyMock.verify(stateManager);
+ verify(stateManager, never()).flush();
+ verify(stateManager, never()).checkpoint();
}
@Test
public void shouldNotThrowFromStateManagerCloseInCloseDirty() {
- EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
- stateManager.close();
- EasyMock.expectLastCall().andThrow(new RuntimeException("KABOOM!")).anyTimes();
- EasyMock.replay(stateManager);
+ doThrow(new RuntimeException("KABOOM!")).when(stateManager).close();
task = createStandbyTask();
task.initializeIfNeeded();
task.suspend();
task.closeDirty();
-
- EasyMock.verify(stateManager);
}
@Test
public void shouldSuspendAndCommitBeforeCloseClean() {
- stateManager.close();
- EasyMock.expectLastCall();
- stateManager.checkpoint();
- EasyMock.expectLastCall().once();
- EasyMock.expect(stateManager.changelogOffsets())
- .andReturn(Collections.singletonMap(partition, 60L));
- EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.singleton(partition)).anyTimes();
- EasyMock.replay(stateManager);
+ doNothing().when(stateManager).close();
+ when(stateManager.changelogOffsets())
+ .thenReturn(Collections.singletonMap(partition, 60L));
final MetricName metricName = setupCloseTaskMetric();
task = createStandbyTask();
@@ -363,13 +326,11 @@ public class StandbyTaskTest {
final double expectedCloseTaskMetric = 1.0;
verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName);
-
- EasyMock.verify(stateManager);
+ verify(stateManager).checkpoint();
}
@Test
public void shouldRequireSuspendingCreatedTasksBeforeClose() {
- EasyMock.replay(stateManager);
task = createStandbyTask();
assertThat(task.state(), equalTo(CREATED));
assertThrows(IllegalStateException.class, () -> task.closeClean());
@@ -380,15 +341,11 @@ public class StandbyTaskTest {
@Test
public void shouldOnlyNeedCommitWhenChangelogOffsetChanged() {
- EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.singleton(partition)).anyTimes();
- EasyMock.expect(stateManager.changelogOffsets())
- .andReturn(Collections.singletonMap(partition, 50L))
- .andReturn(Collections.singletonMap(partition, 10100L)).anyTimes();
- stateManager.flush();
- EasyMock.expectLastCall();
- stateManager.checkpoint();
- EasyMock.expectLastCall();
- EasyMock.replay(stateManager);
+ when(stateManager.changelogOffsets())
+ .thenReturn(Collections.singletonMap(partition, 50L))
+ .thenReturn(Collections.singletonMap(partition, 10100L));
+ doNothing().when(stateManager).flush();
+ doNothing().when(stateManager).checkpoint();
task = createStandbyTask();
task.initializeIfNeeded();
@@ -401,17 +358,11 @@ public class StandbyTaskTest {
task.prepareCommit();
task.postCommit(true);
-
- EasyMock.verify(stateManager);
}
@Test
public void shouldThrowOnCloseCleanError() {
- EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
- stateManager.close();
- EasyMock.expectLastCall().andThrow(new RuntimeException("KABOOM!")).anyTimes();
- EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.singleton(partition)).anyTimes();
- EasyMock.replay(stateManager);
+ doThrow(new RuntimeException("KABOOM!")).when(stateManager).close();
final MetricName metricName = setupCloseTaskMetric();
task = createStandbyTask();
@@ -422,21 +373,13 @@ public class StandbyTaskTest {
final double expectedCloseTaskMetric = 0.0;
verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName);
-
- EasyMock.verify(stateManager);
- EasyMock.reset(stateManager);
- EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.singleton(partition)).anyTimes();
- EasyMock.replay(stateManager);
}
@Test
public void shouldThrowOnCloseCleanCheckpointError() {
- EasyMock.expect(stateManager.changelogOffsets())
- .andReturn(Collections.singletonMap(partition, 50L));
- stateManager.checkpoint();
- EasyMock.expectLastCall().andThrow(new RuntimeException("KABOOM!")).anyTimes();
- EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
- EasyMock.replay(stateManager);
+ when(stateManager.changelogOffsets())
+ .thenReturn(Collections.singletonMap(partition, 50L));
+ doThrow(new RuntimeException("KABOOM!")).when(stateManager).checkpoint();
final MetricName metricName = setupCloseTaskMetric();
task = createStandbyTask();
@@ -449,19 +392,10 @@ public class StandbyTaskTest {
final double expectedCloseTaskMetric = 0.0;
verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName);
-
- EasyMock.verify(stateManager);
- EasyMock.reset(stateManager);
- EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
- EasyMock.replay(stateManager);
}
@Test
public void shouldUnregisterMetricsInCloseClean() {
- EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
- EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
- EasyMock.replay(stateManager);
-
task = createStandbyTask();
task.initializeIfNeeded();
@@ -474,10 +408,6 @@ public class StandbyTaskTest {
@Test
public void shouldUnregisterMetricsInCloseDirty() {
- EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
- EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
- EasyMock.replay(stateManager);
-
task = createStandbyTask();
task.initializeIfNeeded();
@@ -491,10 +421,7 @@ public class StandbyTaskTest {
@Test
public void shouldCloseStateManagerOnTaskCreated() {
- stateManager.close();
- EasyMock.expectLastCall();
-
- EasyMock.replay(stateManager);
+ doNothing().when(stateManager).close();
final MetricName metricName = setupCloseTaskMetric();
@@ -506,20 +433,15 @@ public class StandbyTaskTest {
final double expectedCloseTaskMetric = 1.0;
verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName);
- EasyMock.verify(stateManager);
-
assertEquals(Task.State.CLOSED, task.state());
}
@SuppressWarnings("deprecation")
@Test
public void shouldDeleteStateDirOnTaskCreatedAndEosAlphaUncleanClose() {
- stateManager.close();
- EasyMock.expectLastCall();
+ doNothing().when(stateManager).close();
- EasyMock.expect(stateManager.baseDir()).andReturn(baseDir);
-
- EasyMock.replay(stateManager);
+ when(stateManager.baseDir()).thenReturn(baseDir);
final MetricName metricName = setupCloseTaskMetric();
@@ -537,19 +459,14 @@ public class StandbyTaskTest {
final double expectedCloseTaskMetric = 1.0;
verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName);
- EasyMock.verify(stateManager);
-
assertEquals(Task.State.CLOSED, task.state());
}
@Test
public void shouldDeleteStateDirOnTaskCreatedAndEosV2UncleanClose() {
- stateManager.close();
- EasyMock.expectLastCall();
-
- EasyMock.expect(stateManager.baseDir()).andReturn(baseDir);
+ doNothing().when(stateManager).close();
- EasyMock.replay(stateManager);
+ when(stateManager.baseDir()).thenReturn(baseDir);
final MetricName metricName = setupCloseTaskMetric();
@@ -567,18 +484,11 @@ public class StandbyTaskTest {
final double expectedCloseTaskMetric = 1.0;
verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName);
- EasyMock.verify(stateManager);
-
assertEquals(Task.State.CLOSED, task.state());
}
@Test
public void shouldPrepareRecycleSuspendedTask() {
- EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
- stateManager.recycle();
- EasyMock.expectLastCall().once();
- EasyMock.replay(stateManager);
-
task = createStandbyTask();
assertThrows(IllegalStateException.class, () -> task.prepareRecycle()); // CREATED
@@ -593,12 +503,11 @@ public class StandbyTaskTest {
// This is a regression test so that, if we add some, we will be sure to deregister them.
assertThat(getTaskMetrics(), empty());
- EasyMock.verify(stateManager);
+ verify(stateManager).recycle();
}
@Test
public void shouldAlwaysSuspendCreatedTasks() {
- EasyMock.replay(stateManager);
task = createStandbyTask();
assertThat(task.state(), equalTo(CREATED));
task.suspend();
@@ -607,8 +516,6 @@ public class StandbyTaskTest {
@Test
public void shouldAlwaysSuspendRunningTasks() {
- EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
- EasyMock.replay(stateManager);
task = createStandbyTask();
task.initializeIfNeeded();
assertThat(task.state(), equalTo(RUNNING));
@@ -618,8 +525,6 @@ public class StandbyTaskTest {
@Test
public void shouldInitTaskTimeoutAndEventuallyThrow() {
- EasyMock.replay(stateManager);
-
task = createStandbyTask();
task.maybeInitTaskTimeoutOrThrow(0L, null);
@@ -636,8 +541,6 @@ public class StandbyTaskTest {
@Test
public void shouldClearTaskTimeout() {
- EasyMock.replay(stateManager);
-
task = createStandbyTask();
task.maybeInitTaskTimeoutOrThrow(0L, null);
@@ -647,8 +550,6 @@ public class StandbyTaskTest {
@Test
public void shouldRecordRestoredRecords() {
- EasyMock.replay(stateManager);
-
task = createStandbyTask();
final KafkaMetric totalMetric = getMetric("update", "%s-total", task.id().toString());