You are viewing a plain text version of this content. The canonical version is in the original mailing-list archive (the hyperlink was not preserved in this plain-text copy).
Posted to commits@kafka.apache.org by di...@apache.org on 2023/07/20 16:16:25 UTC
[kafka] branch trunk updated: KAFKA-14133: Migrate various mocks in TaskManagerTest to Mockito (#13874)
This is an automated email from the ASF dual-hosted git repository.
divijv pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8f313eaed4f KAFKA-14133: Migrate various mocks in TaskManagerTest to Mockito (#13874)
8f313eaed4f is described below
commit 8f313eaed4f9dc9730016e0316529ba0ed4c62b4
Author: Christo Lolov <lo...@amazon.com>
AuthorDate: Thu Jul 20 17:16:18 2023 +0100
KAFKA-14133: Migrate various mocks in TaskManagerTest to Mockito (#13874)
Reviewers: Divij Vaidya <di...@amazon.com>
---
.../processor/internals/TaskManagerTest.java | 97 +++++++++-------------
1 file changed, 41 insertions(+), 56 deletions(-)
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 22da72feecd..3c626a8adba 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -2014,10 +2014,7 @@ public class TaskManagerTest {
@Test
public void shouldReviveCorruptTasks() {
- final ProcessorStateManager stateManager = EasyMock.createStrictMock(ProcessorStateManager.class);
- stateManager.markChangelogAsCorrupted(taskId00Partitions);
- EasyMock.expectLastCall().once();
- replay(stateManager);
+ final ProcessorStateManager stateManager = Mockito.mock(ProcessorStateManager.class);
final AtomicBoolean enforcedCheckpoint = new AtomicBoolean(false);
final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) {
@@ -2050,15 +2047,13 @@ public class TaskManagerTest {
assertThat(taskManager.activeTaskMap(), is(singletonMap(taskId00, task00)));
assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
- verify(stateManager);
verify(consumer);
+ Mockito.verify(stateManager).markChangelogAsCorrupted(taskId00Partitions);
}
@Test
public void shouldReviveCorruptTasksEvenIfTheyCannotCloseClean() {
- final ProcessorStateManager stateManager = EasyMock.createStrictMock(ProcessorStateManager.class);
- stateManager.markChangelogAsCorrupted(taskId00Partitions);
- replay(stateManager);
+ final ProcessorStateManager stateManager = Mockito.mock(ProcessorStateManager.class);
final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) {
@Override
@@ -2085,15 +2080,13 @@ public class TaskManagerTest {
assertThat(taskManager.activeTaskMap(), is(singletonMap(taskId00, task00)));
assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
- verify(stateManager);
verify(consumer);
+ Mockito.verify(stateManager).markChangelogAsCorrupted(taskId00Partitions);
}
@Test
public void shouldCommitNonCorruptedTasksOnTaskCorruptedException() {
- final ProcessorStateManager stateManager = EasyMock.createStrictMock(ProcessorStateManager.class);
- stateManager.markChangelogAsCorrupted(taskId00Partitions);
- replay(stateManager);
+ final ProcessorStateManager stateManager = Mockito.mock(ProcessorStateManager.class);
final StateMachineTask corruptedTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
final StateMachineTask nonCorruptedTask = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager);
@@ -2125,13 +2118,12 @@ public class TaskManagerTest {
assertThat(corruptedTask.partitionsForOffsetReset, equalTo(taskId00Partitions));
verify(consumer);
+ Mockito.verify(stateManager).markChangelogAsCorrupted(taskId00Partitions);
}
@Test
public void shouldNotCommitNonRunningNonCorruptedTasks() {
- final ProcessorStateManager stateManager = EasyMock.createStrictMock(ProcessorStateManager.class);
- stateManager.markChangelogAsCorrupted(taskId00Partitions);
- replay(stateManager);
+ final ProcessorStateManager stateManager = Mockito.mock(ProcessorStateManager.class);
final StateMachineTask corruptedTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
final StateMachineTask nonRunningNonCorruptedTask = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager);
@@ -2158,13 +2150,12 @@ public class TaskManagerTest {
assertFalse(nonRunningNonCorruptedTask.commitPrepared);
verify(consumer);
+ Mockito.verify(stateManager).markChangelogAsCorrupted(taskId00Partitions);
}
@Test
public void shouldCleanAndReviveCorruptedStandbyTasksBeforeCommittingNonCorruptedTasks() {
- final ProcessorStateManager stateManager = EasyMock.createStrictMock(ProcessorStateManager.class);
- stateManager.markChangelogAsCorrupted(taskId00Partitions);
- replay(stateManager);
+ final ProcessorStateManager stateManager = Mockito.mock(ProcessorStateManager.class);
final StateMachineTask corruptedStandby = new StateMachineTask(taskId00, taskId00Partitions, false, stateManager);
final StateMachineTask runningNonCorruptedActive = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager) {
@@ -2199,11 +2190,12 @@ public class TaskManagerTest {
assertThat(corruptedStandby.commitPrepared, is(true));
assertThat(corruptedStandby.state(), is(Task.State.CREATED));
verify(consumer);
+ Mockito.verify(stateManager).markChangelogAsCorrupted(taskId00Partitions);
}
@Test
public void shouldNotAttemptToCommitInHandleCorruptedDuringARebalance() {
- final ProcessorStateManager stateManager = EasyMock.createNiceMock(ProcessorStateManager.class);
+ final ProcessorStateManager stateManager = Mockito.mock(ProcessorStateManager.class);
expect(stateDirectory.listNonEmptyTaskDirectories()).andStubReturn(new ArrayList<>());
final StateMachineTask corruptedActive = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
@@ -2224,7 +2216,7 @@ public class TaskManagerTest {
expect(consumer.assignment()).andStubReturn(union(HashSet::new, taskId00Partitions, taskId01Partitions));
- replay(consumer, stateDirectory, stateManager);
+ replay(consumer, stateDirectory);
uncorruptedActive.setCommittableOffsetsAndMetadata(offsets);
@@ -2250,10 +2242,8 @@ public class TaskManagerTest {
}
@Test
- public void shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitWithALSO() {
- final ProcessorStateManager stateManager = EasyMock.createStrictMock(ProcessorStateManager.class);
- stateManager.markChangelogAsCorrupted(taskId00Partitions);
- replay(stateManager);
+ public void shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitWithALOS() {
+ final ProcessorStateManager stateManager = Mockito.mock(ProcessorStateManager.class);
final StateMachineTask corruptedActive = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
final StateMachineTask uncorruptedActive = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager) {
@@ -2310,6 +2300,7 @@ public class TaskManagerTest {
assertThat(corruptedActive.state(), is(Task.State.CREATED));
assertThat(uncorruptedActive.state(), is(Task.State.CREATED));
verify(consumer);
+ Mockito.verify(stateManager).markChangelogAsCorrupted(taskId00Partitions);
}
@Test
@@ -2317,7 +2308,7 @@ public class TaskManagerTest {
final TaskManager taskManager = setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false);
final StreamsProducer producer = mock(StreamsProducer.class);
when(activeTaskCreator.threadProducer()).thenReturn(producer);
- final ProcessorStateManager stateManager = EasyMock.createMock(ProcessorStateManager.class);
+ final ProcessorStateManager stateManager = Mockito.mock(ProcessorStateManager.class);
final AtomicBoolean corruptedTaskChangelogMarkedAsCorrupted = new AtomicBoolean(false);
final StateMachineTask corruptedActiveTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) {
@@ -2327,7 +2318,6 @@ public class TaskManagerTest {
corruptedTaskChangelogMarkedAsCorrupted.set(true);
}
};
- stateManager.markChangelogAsCorrupted(taskId00ChangelogPartitions);
final AtomicBoolean uncorruptedTaskChangelogMarkedAsCorrupted = new AtomicBoolean(false);
final StateMachineTask uncorruptedActiveTask = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager) {
@@ -2339,7 +2329,6 @@ public class TaskManagerTest {
};
final Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(t1p1, new OffsetAndMetadata(0L, null));
uncorruptedActiveTask.setCommittableOffsetsAndMetadata(offsets);
- stateManager.markChangelogAsCorrupted(taskId01ChangelogPartitions);
// handleAssignment
final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>();
@@ -2357,7 +2346,7 @@ public class TaskManagerTest {
expect(consumer.assignment()).andStubReturn(union(HashSet::new, taskId00Partitions, taskId01Partitions));
- replay(consumer, stateManager);
+ replay(consumer);
taskManager.handleAssignment(assignment, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
@@ -2394,6 +2383,8 @@ public class TaskManagerTest {
assertThat(corruptedTaskChangelogMarkedAsCorrupted.get(), is(true));
assertThat(uncorruptedTaskChangelogMarkedAsCorrupted.get(), is(true));
verify(consumer);
+ Mockito.verify(stateManager).markChangelogAsCorrupted(taskId00ChangelogPartitions);
+ Mockito.verify(stateManager).markChangelogAsCorrupted(taskId01ChangelogPartitions);
}
@Test
@@ -2454,7 +2445,7 @@ public class TaskManagerTest {
final TaskManager taskManager = setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false);
final StreamsProducer producer = mock(StreamsProducer.class);
when(activeTaskCreator.threadProducer()).thenReturn(producer);
- final ProcessorStateManager stateManager = EasyMock.createMock(ProcessorStateManager.class);
+ final ProcessorStateManager stateManager = Mockito.mock(ProcessorStateManager.class);
final StateMachineTask revokedActiveTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
final Map<TopicPartition, OffsetAndMetadata> revokedActiveTaskOffsets = singletonMap(t1p0, new OffsetAndMetadata(0L, null));
@@ -2479,9 +2470,6 @@ public class TaskManagerTest {
expectedCommittedOffsets.putAll(revokedActiveTaskOffsets);
expectedCommittedOffsets.putAll(unrevokedTaskOffsets);
- stateManager.markChangelogAsCorrupted(taskId00ChangelogPartitions);
- stateManager.markChangelogAsCorrupted(taskId01ChangelogPartitions);
-
final Map<TaskId, Set<TopicPartition>> assignmentActive = mkMap(
mkEntry(taskId00, taskId00Partitions),
mkEntry(taskId01, taskId01Partitions),
@@ -2500,7 +2488,7 @@ public class TaskManagerTest {
expect(consumer.assignment()).andStubReturn(union(HashSet::new, taskId00Partitions, taskId01Partitions, taskId02Partitions));
- replay(consumer, stateManager);
+ replay(consumer);
taskManager.handleAssignment(assignmentActive, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
@@ -2519,6 +2507,8 @@ public class TaskManagerTest {
assertThat(revokedActiveTask.state(), is(State.SUSPENDED));
assertThat(unrevokedActiveTask.state(), is(State.CREATED));
assertThat(unrevokedActiveTaskWithoutCommitNeeded.state(), is(State.RUNNING));
+ Mockito.verify(stateManager).markChangelogAsCorrupted(taskId00ChangelogPartitions);
+ Mockito.verify(stateManager).markChangelogAsCorrupted(taskId01ChangelogPartitions);
}
@Test
@@ -3594,24 +3584,23 @@ public class TaskManagerTest {
@Test
public void shouldCommitViaProducerIfEosAlphaEnabled() {
- final StreamsProducer producer = EasyMock.mock(StreamsProducer.class);
+ final StreamsProducer producer = Mockito.mock(StreamsProducer.class);
when(activeTaskCreator.streamsProducerForTask(any(TaskId.class)))
.thenReturn(producer);
final Map<TopicPartition, OffsetAndMetadata> offsetsT01 = singletonMap(t1p1, new OffsetAndMetadata(0L, null));
final Map<TopicPartition, OffsetAndMetadata> offsetsT02 = singletonMap(t1p2, new OffsetAndMetadata(1L, null));
- producer.commitTransaction(offsetsT01, new ConsumerGroupMetadata("appId"));
- expectLastCall();
- producer.commitTransaction(offsetsT02, new ConsumerGroupMetadata("appId"));
- expectLastCall();
+ shouldCommitViaProducerIfEosEnabled(ProcessingMode.EXACTLY_ONCE_ALPHA, offsetsT01, offsetsT02);
- shouldCommitViaProducerIfEosEnabled(ProcessingMode.EXACTLY_ONCE_ALPHA, producer, offsetsT01, offsetsT02);
+ Mockito.verify(producer).commitTransaction(offsetsT01, new ConsumerGroupMetadata("appId"));
+ Mockito.verify(producer).commitTransaction(offsetsT02, new ConsumerGroupMetadata("appId"));
+ Mockito.verifyNoMoreInteractions(producer);
}
@Test
public void shouldCommitViaProducerIfEosV2Enabled() {
- final StreamsProducer producer = EasyMock.mock(StreamsProducer.class);
+ final StreamsProducer producer = Mockito.mock(StreamsProducer.class);
when(activeTaskCreator.threadProducer()).thenReturn(producer);
final Map<TopicPartition, OffsetAndMetadata> offsetsT01 = singletonMap(t1p1, new OffsetAndMetadata(0L, null));
@@ -3620,14 +3609,13 @@ public class TaskManagerTest {
allOffsets.putAll(offsetsT01);
allOffsets.putAll(offsetsT02);
- producer.commitTransaction(allOffsets, new ConsumerGroupMetadata("appId"));
- expectLastCall();
+ shouldCommitViaProducerIfEosEnabled(ProcessingMode.EXACTLY_ONCE_V2, offsetsT01, offsetsT02);
- shouldCommitViaProducerIfEosEnabled(ProcessingMode.EXACTLY_ONCE_V2, producer, offsetsT01, offsetsT02);
+ Mockito.verify(producer).commitTransaction(allOffsets, new ConsumerGroupMetadata("appId"));
+ Mockito.verifyNoMoreInteractions(producer);
}
private void shouldCommitViaProducerIfEosEnabled(final ProcessingMode processingMode,
- final StreamsProducer producer,
final Map<TopicPartition, OffsetAndMetadata> offsetsT01,
final Map<TopicPartition, OffsetAndMetadata> offsetsT02) {
final TaskManager taskManager = setUpTaskManager(processingMode, false);
@@ -3643,11 +3631,11 @@ public class TaskManagerTest {
reset(consumer);
expect(consumer.groupMetadata()).andStubReturn(new ConsumerGroupMetadata("appId"));
- replay(consumer, producer);
+ replay(consumer);
taskManager.commitAll();
- verify(producer, consumer);
+ verify(consumer);
}
@Test
@@ -4533,21 +4521,18 @@ public class TaskManagerTest {
@Test
public void shouldConvertActiveTaskToStandbyTask() {
- final StreamTask activeTask = EasyMock.mock(StreamTask.class);
- expect(activeTask.id()).andStubReturn(taskId00);
- expect(activeTask.inputPartitions()).andStubReturn(taskId00Partitions);
- expect(activeTask.isActive()).andStubReturn(true);
- expect(activeTask.prepareCommit()).andStubReturn(Collections.emptyMap());
+ final StreamTask activeTask = Mockito.mock(StreamTask.class);
+ when(activeTask.id()).thenReturn(taskId00);
+ when(activeTask.inputPartitions()).thenReturn(taskId00Partitions);
+ when(activeTask.isActive()).thenReturn(true);
- final StandbyTask standbyTask = EasyMock.mock(StandbyTask.class);
- expect(standbyTask.id()).andStubReturn(taskId00);
+ final StandbyTask standbyTask = Mockito.mock(StandbyTask.class);
+ when(standbyTask.id()).thenReturn(taskId00);
when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singletonList(activeTask));
- activeTask.prepareRecycle();
- expectLastCall().once();
when(standbyTaskCreator.createStandbyTaskFromActive(Mockito.any(), Mockito.eq(taskId00Partitions))).thenReturn(standbyTask);
- replay(activeTask, standbyTask, consumer);
+ replay(consumer);
taskManager.handleAssignment(taskId00Assignment, Collections.emptyMap());
taskManager.handleAssignment(Collections.emptyMap(), taskId00Assignment);