You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ca...@apache.org on 2024/03/22 09:44:00 UTC
(kafka) branch trunk updated: KAFKA-14133: Move stateDirectory mock in TaskManagerTest to Mockito (#15254)
This is an automated email from the ASF dual-hosted git repository.
cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 997ca14f805 KAFKA-14133: Move stateDirectory mock in TaskManagerTest to Mockito (#15254)
997ca14f805 is described below
commit 997ca14f8057462e52ea00721c9cc16469ac5108
Author: Christo Lolov <lo...@amazon.com>
AuthorDate: Fri Mar 22 09:43:53 2024 +0000
KAFKA-14133: Move stateDirectory mock in TaskManagerTest to Mockito (#15254)
This pull requests migrates the StateDirectory mock in TaskManagerTest from EasyMock to Mockito.
The change is restricted to a single mock to minimize the scope and make it easier for review.
Reviewers: Ismael Juma <is...@juma.me.uk>, Bruno Cadonna <ca...@apache.org>
---
.../processor/internals/TaskManagerTest.java | 1070 ++++++++++----------
1 file changed, 517 insertions(+), 553 deletions(-)
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 36d2a3e3786..64ad0d1caf9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -58,9 +58,6 @@ import java.nio.file.Files;
import java.time.Duration;
import java.util.ArrayList;
-import org.easymock.EasyMockRunner;
-import org.easymock.Mock;
-import org.easymock.MockType;
import org.hamcrest.Matchers;
import org.junit.Before;
import org.junit.Rule;
@@ -69,10 +66,10 @@ import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.mockito.Answers;
import org.mockito.InOrder;
-import org.mockito.Mockito;
+import org.mockito.Mock;
import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.junit.MockitoRule;
-import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import java.io.File;
@@ -107,11 +104,6 @@ import static org.apache.kafka.streams.processor.internals.TopologyMetadata.UNNA
import static org.apache.kafka.test.StreamsTestUtils.TaskBuilder.standbyTask;
import static org.apache.kafka.test.StreamsTestUtils.TaskBuilder.statefulTask;
import static org.apache.kafka.test.StreamsTestUtils.TaskBuilder.statelessTask;
-import static org.easymock.EasyMock.eq;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
-import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.verify;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
@@ -125,20 +117,25 @@ import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyMap;
+import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.mock;
-@MockitoSettings(strictness = Strictness.STRICT_STUBS)
-@RunWith(EasyMockRunner.class)
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
public class TaskManagerTest {
private final String topic1 = "topic1";
@@ -188,26 +185,26 @@ public class TaskManagerTest {
final java.util.function.Consumer<Set<TopicPartition>> noOpResetter = partitions -> { };
- @org.mockito.Mock
+ @Mock
private InternalTopologyBuilder topologyBuilder;
- @Mock(type = MockType.DEFAULT)
+ @Mock
private StateDirectory stateDirectory;
- @org.mockito.Mock
+ @Mock
private ChangelogReader changeLogReader;
- @org.mockito.Mock
+ @Mock
private Consumer<byte[], byte[]> consumer;
- @org.mockito.Mock
+ @Mock
private ActiveTaskCreator activeTaskCreator;
- @org.mockito.Mock
+ @Mock
private StandbyTaskCreator standbyTaskCreator;
- @org.mockito.Mock
+ @Mock
private Admin adminClient;
- @org.mockito.Mock
+ @Mock
private ProcessorStateManager stateManager;
- @org.mockito.Mock(answer = Answers.RETURNS_DEEP_STUBS)
+ @Mock(answer = Answers.RETURNS_DEEP_STUBS)
private ProcessorStateManager.StateStoreMetadata stateStore;
- final StateUpdater stateUpdater = Mockito.mock(StateUpdater.class);
- final DefaultTaskManager schedulingTaskManager = Mockito.mock(DefaultTaskManager.class);
+ final StateUpdater stateUpdater = mock(StateUpdater.class);
+ final DefaultTaskManager schedulingTaskManager = mock(DefaultTaskManager.class);
private TaskManager taskManager;
private TopologyMetadata topologyMetadata;
@@ -267,7 +264,7 @@ public class TaskManagerTest {
taskManager.handleAssignment(activeTasks, standbyTasks);
- Mockito.verifyNoInteractions(stateUpdater);
+ verifyNoInteractions(stateUpdater);
}
@Test
@@ -276,7 +273,7 @@ public class TaskManagerTest {
.inState(State.RUNNING)
.withInputPartitions(taskId03Partitions).build();
updateExistingStandbyTaskIfStandbyIsReassignedWithoutStateUpdater(standbyTask, taskId03Partitions);
- Mockito.verify(standbyTask, never()).updateInputPartitions(Mockito.eq(taskId03Partitions), Mockito.any());
+ verify(standbyTask, never()).updateInputPartitions(eq(taskId03Partitions), any());
}
@Test
@@ -285,12 +282,12 @@ public class TaskManagerTest {
.inState(State.RUNNING)
.withInputPartitions(taskId03Partitions).build();
updateExistingStandbyTaskIfStandbyIsReassignedWithoutStateUpdater(standbyTask, taskId04Partitions);
- Mockito.verify(standbyTask).updateInputPartitions(Mockito.eq(taskId04Partitions), Mockito.any());
+ verify(standbyTask).updateInputPartitions(eq(taskId04Partitions), any());
}
private void updateExistingStandbyTaskIfStandbyIsReassignedWithoutStateUpdater(final Task standbyTask,
final Set<TopicPartition> newInputPartition) {
- final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+ final TasksRegistry tasks = mock(TasksRegistry.class);
when(tasks.allTasks()).thenReturn(mkSet(standbyTask));
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, false);
@@ -299,7 +296,7 @@ public class TaskManagerTest {
mkMap(mkEntry(standbyTask.id(), newInputPartition))
);
- Mockito.verify(standbyTask).resume();
+ verify(standbyTask).resume();
}
@Test
@@ -307,7 +304,7 @@ public class TaskManagerTest {
final StreamTask activeTask1 = statefulTask(taskId00, taskId00ChangelogPartitions)
.inState(State.RUNNING)
.withInputPartitions(taskId00Partitions).build();
- final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+ final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true, true);
when(tasks.activeTaskIds()).thenReturn(mkSet(taskId00, taskId01));
when(tasks.task(taskId00)).thenReturn(activeTask1);
@@ -316,9 +313,9 @@ public class TaskManagerTest {
taskManager.handleCorruption(mkSet(taskId00));
- Mockito.verify(consumer).assignment();
- Mockito.verify(schedulingTaskManager).lockTasks(mkSet(taskId00, taskId01));
- Mockito.verify(schedulingTaskManager).unlockTasks(mkSet(taskId00, taskId01));
+ verify(consumer).assignment();
+ verify(schedulingTaskManager).lockTasks(mkSet(taskId00, taskId01));
+ verify(schedulingTaskManager).unlockTasks(mkSet(taskId00, taskId01));
}
@Test
@@ -329,20 +326,20 @@ public class TaskManagerTest {
final StreamTask activeTask2 = statefulTask(taskId01, taskId01ChangelogPartitions)
.inState(State.RUNNING)
.withInputPartitions(taskId01Partitions).build();
- final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+ final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true, true);
final KafkaFuture<Void> mockFuture = KafkaFuture.completedFuture(null);
when(schedulingTaskManager.lockTasks(any())).thenReturn(mockFuture);
taskManager.commit(mkSet(activeTask1, activeTask2));
- Mockito.verify(schedulingTaskManager).lockTasks(mkSet(taskId00, taskId01));
- Mockito.verify(schedulingTaskManager).unlockTasks(mkSet(taskId00, taskId01));
+ verify(schedulingTaskManager).lockTasks(mkSet(taskId00, taskId01));
+ verify(schedulingTaskManager).unlockTasks(mkSet(taskId00, taskId01));
}
@Test
public void shouldLockActiveOnHandleAssignmentWithProcessingThreads() {
- final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+ final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true, true);
when(tasks.allTaskIds()).thenReturn(mkSet(taskId00, taskId01));
final KafkaFuture<Void> mockFuture = KafkaFuture.completedFuture(null);
@@ -353,8 +350,8 @@ public class TaskManagerTest {
mkMap(mkEntry(taskId01, taskId01Partitions))
);
- Mockito.verify(schedulingTaskManager).lockTasks(mkSet(taskId00, taskId01));
- Mockito.verify(schedulingTaskManager).unlockTasks(mkSet(taskId00, taskId01));
+ verify(schedulingTaskManager).lockTasks(mkSet(taskId00, taskId01));
+ verify(schedulingTaskManager).unlockTasks(mkSet(taskId00, taskId01));
}
@Test
@@ -365,7 +362,7 @@ public class TaskManagerTest {
final StreamTask activeTask2 = statefulTask(taskId01, taskId01ChangelogPartitions)
.inState(State.RUNNING)
.withInputPartitions(taskId01Partitions).build();
- final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+ final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true, true);
when(tasks.allTasks()).thenReturn(mkSet(activeTask1, activeTask2));
final KafkaFuture<Void> mockFuture = KafkaFuture.completedFuture(null);
@@ -373,8 +370,8 @@ public class TaskManagerTest {
taskManager.handleRevocation(taskId01Partitions);
- Mockito.verify(schedulingTaskManager).lockTasks(mkSet(taskId00, taskId01));
- Mockito.verify(schedulingTaskManager).unlockTasks(mkSet(taskId00, taskId01));
+ verify(schedulingTaskManager).lockTasks(mkSet(taskId00, taskId01));
+ verify(schedulingTaskManager).unlockTasks(mkSet(taskId00, taskId01));
}
@Test
@@ -385,7 +382,7 @@ public class TaskManagerTest {
final StreamTask activeTask2 = statefulTask(taskId01, taskId01ChangelogPartitions)
.inState(State.RUNNING)
.withInputPartitions(taskId01Partitions).build();
- final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+ final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true, true);
when(tasks.allTasks()).thenReturn(mkSet(activeTask1, activeTask2));
final KafkaFuture<Void> mockFuture = KafkaFuture.completedFuture(null);
@@ -393,8 +390,8 @@ public class TaskManagerTest {
taskManager.closeAndCleanUpTasks(mkSet(activeTask1), mkSet(), false);
- Mockito.verify(schedulingTaskManager).lockTasks(mkSet(taskId00));
- Mockito.verify(schedulingTaskManager).unlockTasks(mkSet(taskId00));
+ verify(schedulingTaskManager).lockTasks(mkSet(taskId00));
+ verify(schedulingTaskManager).unlockTasks(mkSet(taskId00));
}
@Test
@@ -405,14 +402,14 @@ public class TaskManagerTest {
final StreamTask activeTask2 = statefulTask(taskId01, taskId01ChangelogPartitions)
.inState(State.RUNNING)
.withInputPartitions(taskId01Partitions).build();
- final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+ final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(tasks.activeTasks()).thenReturn(mkSet(activeTask1, activeTask2));
taskManager.resumePollingForPartitionsWithAvailableSpace();
- Mockito.verify(activeTask1).resumePollingForPartitionsWithAvailableSpace();
- Mockito.verify(activeTask2).resumePollingForPartitionsWithAvailableSpace();
+ verify(activeTask1).resumePollingForPartitionsWithAvailableSpace();
+ verify(activeTask2).resumePollingForPartitionsWithAvailableSpace();
}
@Test
@@ -423,14 +420,14 @@ public class TaskManagerTest {
final StreamTask activeTask2 = statefulTask(taskId01, taskId01ChangelogPartitions)
.inState(State.RUNNING)
.withInputPartitions(taskId01Partitions).build();
- final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+ final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(tasks.activeTasks()).thenReturn(mkSet(activeTask1, activeTask2));
taskManager.updateLags();
- Mockito.verify(activeTask1).updateLags();
- Mockito.verify(activeTask2).updateLags();
+ verify(activeTask1).updateLags();
+ verify(activeTask2).updateLags();
}
@Test
@@ -438,7 +435,7 @@ public class TaskManagerTest {
final StreamTask activeTaskToRecycle = statefulTask(taskId03, taskId03ChangelogPartitions)
.inState(State.RESTORING)
.withInputPartitions(taskId03Partitions).build();
- final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+ final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(stateUpdater.getTasks()).thenReturn(mkSet(activeTaskToRecycle));
@@ -447,9 +444,9 @@ public class TaskManagerTest {
mkMap(mkEntry(activeTaskToRecycle.id(), activeTaskToRecycle.inputPartitions()))
);
- Mockito.verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
- Mockito.verify(tasks).addPendingTaskToRecycle(activeTaskToRecycle.id(), activeTaskToRecycle.inputPartitions());
- Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());
+ verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
+ verify(tasks).addPendingTaskToRecycle(activeTaskToRecycle.id(), activeTaskToRecycle.inputPartitions());
+ verify(standbyTaskCreator).createTasks(Collections.emptyMap());
}
@Test
@@ -457,7 +454,7 @@ public class TaskManagerTest {
final StandbyTask standbyTaskToRecycle = standbyTask(taskId03, taskId03ChangelogPartitions)
.inState(State.RUNNING)
.withInputPartitions(taskId03Partitions).build();
- final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+ final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(stateUpdater.getTasks()).thenReturn(mkSet(standbyTaskToRecycle));
@@ -466,10 +463,10 @@ public class TaskManagerTest {
Collections.emptyMap()
);
- Mockito.verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
- Mockito.verify(stateUpdater).remove(standbyTaskToRecycle.id());
- Mockito.verify(tasks).addPendingTaskToRecycle(standbyTaskToRecycle.id(), standbyTaskToRecycle.inputPartitions());
- Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());
+ verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
+ verify(stateUpdater).remove(standbyTaskToRecycle.id());
+ verify(tasks).addPendingTaskToRecycle(standbyTaskToRecycle.id(), standbyTaskToRecycle.inputPartitions());
+ verify(standbyTaskCreator).createTasks(Collections.emptyMap());
}
@Test
@@ -477,16 +474,16 @@ public class TaskManagerTest {
final StreamTask activeTaskToClose = statefulTask(taskId03, taskId03ChangelogPartitions)
.inState(State.RESTORING)
.withInputPartitions(taskId03Partitions).build();
- final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+ final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(stateUpdater.getTasks()).thenReturn(mkSet(activeTaskToClose));
taskManager.handleAssignment(Collections.emptyMap(), Collections.emptyMap());
- Mockito.verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
- Mockito.verify(stateUpdater).remove(activeTaskToClose.id());
- Mockito.verify(tasks).addPendingTaskToCloseClean(activeTaskToClose.id());
- Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());
+ verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
+ verify(stateUpdater).remove(activeTaskToClose.id());
+ verify(tasks).addPendingTaskToCloseClean(activeTaskToClose.id());
+ verify(standbyTaskCreator).createTasks(Collections.emptyMap());
}
@Test
@@ -494,16 +491,16 @@ public class TaskManagerTest {
final StandbyTask standbyTaskToClose = standbyTask(taskId02, taskId02ChangelogPartitions)
.inState(State.RUNNING)
.withInputPartitions(taskId02Partitions).build();
- final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+ final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(stateUpdater.getTasks()).thenReturn(mkSet(standbyTaskToClose));
taskManager.handleAssignment(Collections.emptyMap(), Collections.emptyMap());
- Mockito.verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
- Mockito.verify(stateUpdater).remove(standbyTaskToClose.id());
- Mockito.verify(tasks).addPendingTaskToCloseClean(standbyTaskToClose.id());
- Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());
+ verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
+ verify(stateUpdater).remove(standbyTaskToClose.id());
+ verify(tasks).addPendingTaskToCloseClean(standbyTaskToClose.id());
+ verify(standbyTaskCreator).createTasks(Collections.emptyMap());
}
@Test
@@ -512,7 +509,7 @@ public class TaskManagerTest {
.inState(State.RESTORING)
.withInputPartitions(taskId03Partitions).build();
final Set<TopicPartition> newInputPartitions = taskId02Partitions;
- final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+ final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(stateUpdater.getTasks()).thenReturn(mkSet(activeTaskToUpdateInputPartitions));
@@ -521,10 +518,10 @@ public class TaskManagerTest {
Collections.emptyMap()
);
- Mockito.verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
- Mockito.verify(stateUpdater).remove(activeTaskToUpdateInputPartitions.id());
- Mockito.verify(tasks).addPendingTaskToUpdateInputPartitions(activeTaskToUpdateInputPartitions.id(), newInputPartitions);
- Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());
+ verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
+ verify(stateUpdater).remove(activeTaskToUpdateInputPartitions.id());
+ verify(tasks).addPendingTaskToUpdateInputPartitions(activeTaskToUpdateInputPartitions.id(), newInputPartitions);
+ verify(standbyTaskCreator).createTasks(Collections.emptyMap());
}
@Test
@@ -533,7 +530,7 @@ public class TaskManagerTest {
.inState(State.RESTORING)
.withInputPartitions(taskId03Partitions).build();
final Set<TopicPartition> newInputPartitions = taskId02Partitions;
- final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+ final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(stateUpdater.getTasks()).thenReturn(mkSet(activeTaskToUpdateInputPartitions));
when(tasks.removePendingTaskToCloseClean(activeTaskToUpdateInputPartitions.id())).thenReturn(true);
@@ -543,11 +540,11 @@ public class TaskManagerTest {
Collections.emptyMap()
);
- Mockito.verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
- Mockito.verify(stateUpdater, never()).remove(activeTaskToUpdateInputPartitions.id());
- Mockito.verify(tasks).removePendingTaskToCloseClean(activeTaskToUpdateInputPartitions.id());
- Mockito.verify(tasks).addPendingTaskToCloseReviveAndUpdateInputPartitions(activeTaskToUpdateInputPartitions.id(), newInputPartitions);
- Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());
+ verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
+ verify(stateUpdater, never()).remove(activeTaskToUpdateInputPartitions.id());
+ verify(tasks).removePendingTaskToCloseClean(activeTaskToUpdateInputPartitions.id());
+ verify(tasks).addPendingTaskToCloseReviveAndUpdateInputPartitions(activeTaskToUpdateInputPartitions.id(), newInputPartitions);
+ verify(standbyTaskCreator).createTasks(Collections.emptyMap());
}
@Test
@@ -555,7 +552,7 @@ public class TaskManagerTest {
final StreamTask reassignedActiveTask = statefulTask(taskId03, taskId03ChangelogPartitions)
.inState(State.RESTORING)
.withInputPartitions(taskId03Partitions).build();
- final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+ final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(stateUpdater.getTasks()).thenReturn(mkSet(reassignedActiveTask));
@@ -564,8 +561,8 @@ public class TaskManagerTest {
Collections.emptyMap()
);
- Mockito.verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
- Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());
+ verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
+ verify(standbyTaskCreator).createTasks(Collections.emptyMap());
}
@Test
@@ -573,7 +570,7 @@ public class TaskManagerTest {
final StreamTask reassignedActiveTask = statefulTask(taskId03, taskId03ChangelogPartitions)
.inState(State.SUSPENDED)
.withInputPartitions(taskId03Partitions).build();
- final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+ final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(tasks.allTasks()).thenReturn(mkSet(reassignedActiveTask));
@@ -582,10 +579,10 @@ public class TaskManagerTest {
Collections.emptyMap()
);
- Mockito.verify(tasks).removeTask(reassignedActiveTask);
- Mockito.verify(stateUpdater).add(reassignedActiveTask);
- Mockito.verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
- Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());
+ verify(tasks).removeTask(reassignedActiveTask);
+ verify(stateUpdater).add(reassignedActiveTask);
+ verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
+ verify(standbyTaskCreator).createTasks(Collections.emptyMap());
}
@Test
@@ -593,7 +590,7 @@ public class TaskManagerTest {
final StreamTask reassignedRevokedActiveTask = statefulTask(taskId03, taskId03ChangelogPartitions)
.inState(State.RESTORING)
.withInputPartitions(taskId03Partitions).build();
- final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+ final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(stateUpdater.getTasks()).thenReturn(mkSet(reassignedRevokedActiveTask));
@@ -602,9 +599,9 @@ public class TaskManagerTest {
Collections.emptyMap()
);
- Mockito.verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
- Mockito.verify(tasks).removePendingActiveTaskToSuspend(reassignedRevokedActiveTask.id());
- Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());
+ verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
+ verify(tasks).removePendingActiveTaskToSuspend(reassignedRevokedActiveTask.id());
+ verify(standbyTaskCreator).createTasks(Collections.emptyMap());
}
@Test
@@ -612,7 +609,7 @@ public class TaskManagerTest {
final StreamTask reassignedLostTask = statefulTask(taskId03, taskId03ChangelogPartitions)
.inState(State.RESTORING)
.withInputPartitions(taskId03Partitions).build();
- final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+ final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(stateUpdater.getTasks()).thenReturn(mkSet(reassignedLostTask));
when(tasks.removePendingTaskToCloseClean(reassignedLostTask.id())).thenReturn(true);
@@ -622,10 +619,10 @@ public class TaskManagerTest {
Collections.emptyMap()
);
- Mockito.verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
- Mockito.verify(tasks).removePendingTaskToCloseClean(reassignedLostTask.id());
- Mockito.verify(tasks).addPendingTaskToAddBack(reassignedLostTask.id());
- Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());
+ verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
+ verify(tasks).removePendingTaskToCloseClean(reassignedLostTask.id());
+ verify(tasks).addPendingTaskToAddBack(reassignedLostTask.id());
+ verify(standbyTaskCreator).createTasks(Collections.emptyMap());
}
@Test
@@ -633,7 +630,7 @@ public class TaskManagerTest {
final StreamTask reassignedTask = statefulTask(taskId03, taskId03ChangelogPartitions)
.inState(State.RESTORING)
.withInputPartitions(taskId03Partitions).build();
- final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+ final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(stateUpdater.getTasks()).thenReturn(mkSet(reassignedTask));
when(tasks.removePendingActiveTaskToSuspend(reassignedTask.id())).thenReturn(true);
@@ -643,10 +640,10 @@ public class TaskManagerTest {
Collections.emptyMap()
);
- Mockito.verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
- Mockito.verify(tasks).removePendingActiveTaskToSuspend(reassignedTask.id());
- Mockito.verify(tasks).addPendingTaskToAddBack(reassignedTask.id());
- Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());
+ verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
+ verify(tasks).removePendingActiveTaskToSuspend(reassignedTask.id());
+ verify(tasks).addPendingTaskToAddBack(reassignedTask.id());
+ verify(standbyTaskCreator).createTasks(Collections.emptyMap());
}
@Test
@@ -654,7 +651,7 @@ public class TaskManagerTest {
final StandbyTask reassignedStandbyTask = standbyTask(taskId01, taskId01ChangelogPartitions)
.inState(State.RUNNING)
.withInputPartitions(taskId01Partitions).build();
- final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+ final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(stateUpdater.getTasks()).thenReturn(mkSet(reassignedStandbyTask));
when(tasks.removePendingTaskToRecycle(reassignedStandbyTask.id())).thenReturn(taskId01Partitions);
@@ -664,10 +661,10 @@ public class TaskManagerTest {
mkMap(mkEntry(reassignedStandbyTask.id(), reassignedStandbyTask.inputPartitions()))
);
- Mockito.verify(tasks).removePendingTaskToRecycle(reassignedStandbyTask.id());
- Mockito.verify(tasks).addPendingTaskToAddBack(reassignedStandbyTask.id());
- Mockito.verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
- Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());
+ verify(tasks).removePendingTaskToRecycle(reassignedStandbyTask.id());
+ verify(tasks).addPendingTaskToAddBack(reassignedStandbyTask.id());
+ verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
+ verify(standbyTaskCreator).createTasks(Collections.emptyMap());
}
@Test
@@ -676,7 +673,7 @@ public class TaskManagerTest {
.inState(State.RUNNING)
.withInputPartitions(taskId02Partitions).build();
final Set<TopicPartition> newInputPartitions = taskId03Partitions;
- final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+ final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(stateUpdater.getTasks()).thenReturn(mkSet(standbyTaskToUpdateInputPartitions));
@@ -685,11 +682,11 @@ public class TaskManagerTest {
mkMap(mkEntry(standbyTaskToUpdateInputPartitions.id(), newInputPartitions))
);
- Mockito.verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
- Mockito.verify(stateUpdater, never()).remove(standbyTaskToUpdateInputPartitions.id());
- Mockito.verify(tasks, never())
+ verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
+ verify(stateUpdater, never()).remove(standbyTaskToUpdateInputPartitions.id());
+ verify(tasks, never())
.addPendingTaskToUpdateInputPartitions(standbyTaskToUpdateInputPartitions.id(), newInputPartitions);
- Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());
+ verify(standbyTaskCreator).createTasks(Collections.emptyMap());
}
@Test
@@ -698,7 +695,7 @@ public class TaskManagerTest {
.inState(State.RUNNING)
.withInputPartitions(taskId02Partitions).build();
final Set<TopicPartition> newInputPartitions = taskId03Partitions;
- final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+ final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(stateUpdater.getTasks()).thenReturn(mkSet(standbyTaskToUpdateInputPartitions));
@@ -707,12 +704,12 @@ public class TaskManagerTest {
mkMap(mkEntry(standbyTaskToUpdateInputPartitions.id(), newInputPartitions))
);
- Mockito.verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
- Mockito.verify(stateUpdater, never()).remove(standbyTaskToUpdateInputPartitions.id());
- Mockito.verify(tasks, never()).removePendingTaskToCloseClean(standbyTaskToUpdateInputPartitions.id());
- Mockito.verify(tasks, never())
+ verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
+ verify(stateUpdater, never()).remove(standbyTaskToUpdateInputPartitions.id());
+ verify(tasks, never()).removePendingTaskToCloseClean(standbyTaskToUpdateInputPartitions.id());
+ verify(tasks, never())
.addPendingTaskToCloseReviveAndUpdateInputPartitions(standbyTaskToUpdateInputPartitions.id(), newInputPartitions);
- Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());
+ verify(standbyTaskCreator).createTasks(Collections.emptyMap());
}
@Test
@@ -720,7 +717,7 @@ public class TaskManagerTest {
final StandbyTask reassignedStandbyTask = standbyTask(taskId02, taskId02ChangelogPartitions)
.inState(State.RUNNING)
.withInputPartitions(taskId02Partitions).build();
- final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+ final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(stateUpdater.getTasks()).thenReturn(mkSet(reassignedStandbyTask));
@@ -729,8 +726,8 @@ public class TaskManagerTest {
mkMap(mkEntry(reassignedStandbyTask.id(), reassignedStandbyTask.inputPartitions()))
);
- Mockito.verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
- Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());
+ verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
+ verify(standbyTaskCreator).createTasks(Collections.emptyMap());
}
@Test
@@ -741,7 +738,7 @@ public class TaskManagerTest {
final StandbyTask standbyTaskToRecycle = standbyTask(taskId02, taskId02ChangelogPartitions)
.inState(State.RUNNING)
.withInputPartitions(taskId02Partitions).build();
- final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+ final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(stateUpdater.getTasks()).thenReturn(mkSet(activeTaskToClose, standbyTaskToRecycle));
@@ -750,12 +747,12 @@ public class TaskManagerTest {
Collections.emptyMap()
);
- Mockito.verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
- Mockito.verify(stateUpdater).remove(activeTaskToClose.id());
- Mockito.verify(tasks).addPendingTaskToCloseClean(activeTaskToClose.id());
- Mockito.verify(stateUpdater).remove(standbyTaskToRecycle.id());
- Mockito.verify(tasks).addPendingTaskToRecycle(standbyTaskToRecycle.id(), standbyTaskToRecycle.inputPartitions());
- Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());
+ verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
+ verify(stateUpdater).remove(activeTaskToClose.id());
+ verify(tasks).addPendingTaskToCloseClean(activeTaskToClose.id());
+ verify(stateUpdater).remove(standbyTaskToRecycle.id());
+ verify(tasks).addPendingTaskToRecycle(standbyTaskToRecycle.id(), standbyTaskToRecycle.inputPartitions());
+ verify(standbyTaskCreator).createTasks(Collections.emptyMap());
}
@Test
@@ -766,7 +763,7 @@ public class TaskManagerTest {
final StandbyTask standbyTask = standbyTask(taskId02, taskId02ChangelogPartitions)
.inState(State.RUNNING)
.withInputPartitions(taskId02Partitions).build();
- final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+ final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(stateUpdater.getTasks()).thenReturn(mkSet(standbyTask));
@@ -782,10 +779,9 @@ public class TaskManagerTest {
final StandbyTask standbyTask = standbyTask(taskId02, taskId02ChangelogPartitions)
.inState(State.RUNNING)
.withInputPartitions(taskId02Partitions).build();
- final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+ final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
- when(stateUpdater.getTasks()).thenReturn(mkSet(standbyTask));
when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId03, activeTask)));
assertEquals(taskManager.allOwnedTasks(), mkMap(mkEntry(taskId03, activeTask)));
}
@@ -795,7 +791,7 @@ public class TaskManagerTest {
final StreamTask activeTaskToBeCreated = statefulTask(taskId03, taskId03ChangelogPartitions)
.inState(State.CREATED)
.withInputPartitions(taskId03Partitions).build();
- final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+ final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
final Set<Task> createdTasks = mkSet(activeTaskToBeCreated);
final Map<TaskId, Set<TopicPartition>> tasksToBeCreated = mkMap(
@@ -804,8 +800,8 @@ public class TaskManagerTest {
taskManager.handleAssignment(tasksToBeCreated, Collections.emptyMap());
- Mockito.verify(tasks).addPendingTasksToInit(createdTasks);
- Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());
+ verify(tasks).addPendingTasksToInit(createdTasks);
+ verify(standbyTaskCreator).createTasks(Collections.emptyMap());
}
@Test
@@ -813,7 +809,7 @@ public class TaskManagerTest {
final StandbyTask standbyTaskToBeCreated = standbyTask(taskId02, taskId02ChangelogPartitions)
.inState(State.CREATED)
.withInputPartitions(taskId02Partitions).build();
- final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+ final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
final Set<Task> createdTasks = mkSet(standbyTaskToBeCreated);
when(standbyTaskCreator.createTasks(mkMap(
@@ -825,8 +821,8 @@ public class TaskManagerTest {
mkMap(mkEntry(standbyTaskToBeCreated.id(), standbyTaskToBeCreated.inputPartitions()))
);
- Mockito.verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
- Mockito.verify(tasks).addPendingTasksToInit(createdTasks);
+ verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
+ verify(tasks).addPendingTasksToInit(createdTasks);
}
@Test
@@ -845,12 +841,12 @@ public class TaskManagerTest {
taskManager.handleAssignment(emptyMap(), mkMap(mkEntry(taskId01, taskId01Partitions)));
- Mockito.verify(activeTaskToRecycle).prepareCommit();
- Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(activeTaskToRecycle.id());
- Mockito.verify(tasks).addPendingTasksToInit(mkSet(standbyTask));
- Mockito.verify(tasks).removeTask(activeTaskToRecycle);
- Mockito.verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
- Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());
+ verify(activeTaskToRecycle).prepareCommit();
+ verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(activeTaskToRecycle.id());
+ verify(tasks).addPendingTasksToInit(mkSet(standbyTask));
+ verify(tasks).removeTask(activeTaskToRecycle);
+ verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
+ verify(standbyTaskCreator).createTasks(Collections.emptyMap());
}
@Test
@@ -869,11 +865,11 @@ public class TaskManagerTest {
taskManager.handleAssignment(emptyMap(), mkMap(mkEntry(taskId01, taskId01Partitions)));
- Mockito.verify(activeTaskToRecycle).prepareCommit();
- Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(activeTaskToRecycle.id());
- Mockito.verify(tasks).replaceActiveWithStandby(standbyTask);
- Mockito.verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
- Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());
+ verify(activeTaskToRecycle).prepareCommit();
+ verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(activeTaskToRecycle.id());
+ verify(tasks).replaceActiveWithStandby(standbyTask);
+ verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
+ verify(standbyTaskCreator).createTasks(Collections.emptyMap());
}
@Test
@@ -881,7 +877,7 @@ public class TaskManagerTest {
final StandbyTask standbyTaskToRecycle = standbyTask(taskId03, taskId03ChangelogPartitions)
.inState(State.RUNNING)
.withInputPartitions(taskId03Partitions).build();
- final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+ final TasksRegistry tasks = mock(TasksRegistry.class);
when(tasks.allTasks()).thenReturn(mkSet(standbyTaskToRecycle));
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
@@ -895,7 +891,7 @@ public class TaskManagerTest {
assertEquals(illegalStateException.getMessage(), "Standby tasks should only be managed by the state updater, " +
"but standby task " + taskId03 + " is managed by the stream thread");
- Mockito.verifyNoInteractions(activeTaskCreator);
+ verifyNoInteractions(activeTaskCreator);
}
@Test
@@ -903,18 +899,18 @@ public class TaskManagerTest {
final StreamTask activeTaskToClose = statefulTask(taskId03, taskId03ChangelogPartitions)
.inState(State.RUNNING)
.withInputPartitions(taskId03Partitions).build();
- final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+ final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(tasks.allTasks()).thenReturn(mkSet(activeTaskToClose));
taskManager.handleAssignment(Collections.emptyMap(), Collections.emptyMap());
- Mockito.verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
- Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(activeTaskToClose.id());
- Mockito.verify(activeTaskToClose).prepareCommit();
- Mockito.verify(activeTaskToClose).closeClean();
- Mockito.verify(tasks).removeTask(activeTaskToClose);
- Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());
+ verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
+ verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(activeTaskToClose.id());
+ verify(activeTaskToClose).prepareCommit();
+ verify(activeTaskToClose).closeClean();
+ verify(tasks).removeTask(activeTaskToClose);
+ verify(standbyTaskCreator).createTasks(Collections.emptyMap());
}
@Test
@@ -922,7 +918,7 @@ public class TaskManagerTest {
final StandbyTask standbyTaskToClose = standbyTask(taskId03, taskId03ChangelogPartitions)
.inState(State.RUNNING)
.withInputPartitions(taskId03Partitions).build();
- final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+ final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(tasks.allTasks()).thenReturn(mkSet(standbyTaskToClose));
@@ -933,7 +929,7 @@ public class TaskManagerTest {
assertEquals(illegalStateException.getMessage(), "Standby tasks should only be managed by the state updater, " +
"but standby task " + taskId03 + " is managed by the stream thread");
- Mockito.verifyNoInteractions(activeTaskCreator);
+ verifyNoInteractions(activeTaskCreator);
}
@Test
@@ -942,7 +938,7 @@ public class TaskManagerTest {
.inState(State.RUNNING)
.withInputPartitions(taskId03Partitions).build();
final Set<TopicPartition> newInputPartitions = taskId02Partitions;
- final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+ final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(tasks.allTasks()).thenReturn(mkSet(activeTaskToUpdateInputPartitions));
when(tasks.updateActiveTaskInputPartitions(activeTaskToUpdateInputPartitions, newInputPartitions)).thenReturn(true);
@@ -952,9 +948,9 @@ public class TaskManagerTest {
Collections.emptyMap()
);
- Mockito.verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
- Mockito.verify(activeTaskToUpdateInputPartitions).updateInputPartitions(Mockito.eq(newInputPartitions), any());
- Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());
+ verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
+ verify(activeTaskToUpdateInputPartitions).updateInputPartitions(eq(newInputPartitions), any());
+ verify(standbyTaskCreator).createTasks(Collections.emptyMap());
}
@Test
@@ -962,7 +958,7 @@ public class TaskManagerTest {
final StreamTask activeTaskToResume = statefulTask(taskId03, taskId03ChangelogPartitions)
.inState(State.RUNNING)
.withInputPartitions(taskId03Partitions).build();
- final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+ final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(tasks.allTasks()).thenReturn(mkSet(activeTaskToResume));
@@ -971,8 +967,8 @@ public class TaskManagerTest {
Collections.emptyMap()
);
- Mockito.verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
- Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());
+ verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
+ verify(standbyTaskCreator).createTasks(Collections.emptyMap());
}
@Test
@@ -980,7 +976,7 @@ public class TaskManagerTest {
final StreamTask activeTaskToResume = statefulTask(taskId03, taskId03ChangelogPartitions)
.inState(State.SUSPENDED)
.withInputPartitions(taskId03Partitions).build();
- final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+ final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(tasks.allTasks()).thenReturn(mkSet(activeTaskToResume));
@@ -989,11 +985,11 @@ public class TaskManagerTest {
Collections.emptyMap()
);
- Mockito.verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
- Mockito.verify(activeTaskToResume).resume();
- Mockito.verify(stateUpdater).add(activeTaskToResume);
- Mockito.verify(tasks).removeTask(activeTaskToResume);
- Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());
+ verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
+ verify(activeTaskToResume).resume();
+ verify(stateUpdater).add(activeTaskToResume);
+ verify(tasks).removeTask(activeTaskToResume);
+ verify(standbyTaskCreator).createTasks(Collections.emptyMap());
}
@Test
@@ -1002,7 +998,7 @@ public class TaskManagerTest {
.inState(State.RUNNING)
.withInputPartitions(taskId02Partitions).build();
final Set<TopicPartition> newInputPartitions = taskId03Partitions;
- final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+ final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(tasks.allTasks()).thenReturn(mkSet(standbyTaskToUpdateInputPartitions));
@@ -1016,7 +1012,7 @@ public class TaskManagerTest {
assertEquals(illegalStateException.getMessage(), "Standby tasks should only be managed by the state updater, " +
"but standby task " + taskId02 + " is managed by the stream thread");
- Mockito.verifyNoInteractions(activeTaskCreator);
+ verifyNoInteractions(activeTaskCreator);
}
@Test
@@ -1027,7 +1023,7 @@ public class TaskManagerTest {
final StreamTask activeTaskToCreate = statefulTask(taskId02, taskId02ChangelogPartitions)
.inState(State.CREATED)
.withInputPartitions(taskId02Partitions).build();
- final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+ final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(tasks.allTasks()).thenReturn(mkSet(activeTaskToClose));
@@ -1036,12 +1032,12 @@ public class TaskManagerTest {
Collections.emptyMap()
);
- Mockito.verify(activeTaskCreator).createTasks(
+ verify(activeTaskCreator).createTasks(
consumer,
mkMap(mkEntry(activeTaskToCreate.id(), activeTaskToCreate.inputPartitions()))
);
- Mockito.verify(activeTaskToClose).closeClean();
- Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());
+ verify(activeTaskToClose).closeClean();
+ verify(standbyTaskCreator).createTasks(Collections.emptyMap());
}
@Test
@@ -1058,10 +1054,10 @@ public class TaskManagerTest {
taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
- Mockito.verify(task00).initializeIfNeeded();
- Mockito.verify(task01).initializeIfNeeded();
- Mockito.verify(stateUpdater).add(task00);
- Mockito.verify(stateUpdater).add(task01);
+ verify(task00).initializeIfNeeded();
+ verify(task01).initializeIfNeeded();
+ verify(stateUpdater).add(task00);
+ verify(stateUpdater).add(task01);
}
@Test
@@ -1080,13 +1076,13 @@ public class TaskManagerTest {
taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
- Mockito.verify(task00).initializeIfNeeded();
- Mockito.verify(task01).initializeIfNeeded();
- Mockito.verify(tasks).addPendingTasksToInit(
- Mockito.argThat(tasksToInit -> tasksToInit.contains(task00) && !tasksToInit.contains(task01))
+ verify(task00).initializeIfNeeded();
+ verify(task01).initializeIfNeeded();
+ verify(tasks).addPendingTasksToInit(
+ argThat(tasksToInit -> tasksToInit.contains(task00) && !tasksToInit.contains(task01))
);
- Mockito.verify(stateUpdater, never()).add(task00);
- Mockito.verify(stateUpdater).add(task01);
+ verify(stateUpdater, never()).add(task00);
+ verify(stateUpdater).add(task01);
}
@Test
@@ -1117,13 +1113,13 @@ public class TaskManagerTest {
taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
- Mockito.verify(task00Converted).initializeIfNeeded();
- Mockito.verify(task01Converted).initializeIfNeeded();
- Mockito.verify(tasks).addPendingTasksToInit(
- Mockito.argThat(tasksToInit -> tasksToInit.contains(task00Converted) && !tasksToInit.contains(task01Converted))
+ verify(task00Converted).initializeIfNeeded();
+ verify(task01Converted).initializeIfNeeded();
+ verify(tasks).addPendingTasksToInit(
+ argThat(tasksToInit -> tasksToInit.contains(task00Converted) && !tasksToInit.contains(task01Converted))
);
- Mockito.verify(stateUpdater, never()).add(task00Converted);
- Mockito.verify(stateUpdater).add(task01Converted);
+ verify(stateUpdater, never()).add(task00Converted);
+ verify(stateUpdater).add(task01Converted);
}
@Test
@@ -1151,13 +1147,13 @@ public class TaskManagerTest {
taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
- Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(any());
- Mockito.verify(task00).suspend();
- Mockito.verify(task01).suspend();
- Mockito.verify(task00Converted).initializeIfNeeded();
- Mockito.verify(task01Converted).initializeIfNeeded();
- Mockito.verify(stateUpdater).add(task00Converted);
- Mockito.verify(stateUpdater).add(task01Converted);
+ verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(any());
+ verify(task00).suspend();
+ verify(task01).suspend();
+ verify(task00Converted).initializeIfNeeded();
+ verify(task01Converted).initializeIfNeeded();
+ verify(stateUpdater).add(task00Converted);
+ verify(stateUpdater).add(task01Converted);
}
@Test
@@ -1178,11 +1174,11 @@ public class TaskManagerTest {
taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
- Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(any());
- Mockito.verify(task00).suspend();
- Mockito.verify(task00).closeClean();
- Mockito.verify(task01).suspend();
- Mockito.verify(task01).closeClean();
+ verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(any());
+ verify(task00).suspend();
+ verify(task00).closeClean();
+ verify(task01).suspend();
+ verify(task01).closeClean();
}
@Test
@@ -1204,14 +1200,14 @@ public class TaskManagerTest {
taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
- Mockito.verify(activeTask).updateInputPartitions(Mockito.eq(taskId02Partitions), anyMap());
- Mockito.verify(activeTask, never()).closeDirty();
- Mockito.verify(activeTask, never()).closeClean();
- Mockito.verify(stateUpdater).add(activeTask);
- Mockito.verify(standbyTask).updateInputPartitions(Mockito.eq(taskId03Partitions), anyMap());
- Mockito.verify(standbyTask, never()).closeDirty();
- Mockito.verify(standbyTask, never()).closeClean();
- Mockito.verify(stateUpdater).add(standbyTask);
+ verify(activeTask).updateInputPartitions(eq(taskId02Partitions), anyMap());
+ verify(activeTask, never()).closeDirty();
+ verify(activeTask, never()).closeClean();
+ verify(stateUpdater).add(activeTask);
+ verify(standbyTask).updateInputPartitions(eq(taskId03Partitions), anyMap());
+ verify(standbyTask, never()).closeDirty();
+ verify(standbyTask, never()).closeClean();
+ verify(stateUpdater).add(standbyTask);
}
@Test
@@ -1228,12 +1224,12 @@ public class TaskManagerTest {
taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
- Mockito.verify(activeTask).closeClean();
- Mockito.verify(activeTask).revive();
- Mockito.verify(activeTask).updateInputPartitions(Mockito.eq(taskId02Partitions), anyMap());
- Mockito.verify(activeTask).initializeIfNeeded();
- Mockito.verify(activeTask, never()).closeDirty();
- Mockito.verify(stateUpdater).add(activeTask);
+ verify(activeTask).closeClean();
+ verify(activeTask).revive();
+ verify(activeTask).updateInputPartitions(eq(taskId02Partitions), anyMap());
+ verify(activeTask).initializeIfNeeded();
+ verify(activeTask, never()).closeDirty();
+ verify(stateUpdater).add(activeTask);
}
@Test
@@ -1252,9 +1248,9 @@ public class TaskManagerTest {
taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
- Mockito.verify(statefulTask).suspend();
- Mockito.verify(tasks).addTask(statefulTask);
- Mockito.verifyNoInteractions(consumer);
+ verify(statefulTask).suspend();
+ verify(tasks).addTask(statefulTask);
+ verifyNoInteractions(consumer);
}
@Test
@@ -1301,21 +1297,21 @@ public class TaskManagerTest {
taskManager.checkStateUpdater(time.milliseconds(), noOpResetter -> { });
- Mockito.verify(activeTaskCreator, times(3)).closeAndRemoveTaskProducerIfNeeded(any());
- Mockito.verify(activeTaskCreator, times(3)).closeAndRemoveTaskProducerIfNeeded(any());
- Mockito.verify(convertedTask0).initializeIfNeeded();
- Mockito.verify(convertedTask1).initializeIfNeeded();
- Mockito.verify(stateUpdater).add(convertedTask0);
- Mockito.verify(stateUpdater).add(convertedTask1);
- Mockito.verify(taskToClose).closeClean();
- Mockito.verify(taskToUpdateInputPartitions).updateInputPartitions(Mockito.eq(taskId04Partitions), anyMap());
- Mockito.verify(stateUpdater).add(taskToUpdateInputPartitions);
- Mockito.verify(taskToCloseReviveAndUpdateInputPartitions).closeClean();
- Mockito.verify(taskToCloseReviveAndUpdateInputPartitions).revive();
- Mockito.verify(taskToCloseReviveAndUpdateInputPartitions).updateInputPartitions(Mockito.eq(taskId05Partitions), anyMap());
- Mockito.verify(taskToCloseReviveAndUpdateInputPartitions).initializeIfNeeded();
- Mockito.verify(stateUpdater).add(taskToCloseReviveAndUpdateInputPartitions);
- Mockito.verifyNoInteractions(consumer);
+ verify(activeTaskCreator, times(3)).closeAndRemoveTaskProducerIfNeeded(any());
+ verify(activeTaskCreator, times(3)).closeAndRemoveTaskProducerIfNeeded(any());
+ verify(convertedTask0).initializeIfNeeded();
+ verify(convertedTask1).initializeIfNeeded();
+ verify(stateUpdater).add(convertedTask0);
+ verify(stateUpdater).add(convertedTask1);
+ verify(taskToClose).closeClean();
+ verify(taskToUpdateInputPartitions).updateInputPartitions(eq(taskId04Partitions), anyMap());
+ verify(stateUpdater).add(taskToUpdateInputPartitions);
+ verify(taskToCloseReviveAndUpdateInputPartitions).closeClean();
+ verify(taskToCloseReviveAndUpdateInputPartitions).revive();
+ verify(taskToCloseReviveAndUpdateInputPartitions).updateInputPartitions(eq(taskId05Partitions), anyMap());
+ verify(taskToCloseReviveAndUpdateInputPartitions).initializeIfNeeded();
+ verify(stateUpdater).add(taskToCloseReviveAndUpdateInputPartitions);
+ verifyNoInteractions(consumer);
}
@Test
@@ -1364,8 +1360,8 @@ public class TaskManagerTest {
taskManager.handleRevocation(task.inputPartitions());
- Mockito.verify(tasks).addPendingActiveTaskToSuspend(task.id());
- Mockito.verify(stateUpdater, never()).remove(task.id());
+ verify(tasks).addPendingActiveTaskToSuspend(task.id());
+ verify(stateUpdater, never()).remove(task.id());
}
public void shouldAddMultipleActiveTasksWithRevokedInputPartitionsInStateUpdaterToPendingTasksToSuspend() {
@@ -1380,8 +1376,8 @@ public class TaskManagerTest {
taskManager.handleRevocation(union(HashSet::new, taskId00Partitions, taskId01Partitions));
- Mockito.verify(tasks).addPendingActiveTaskToSuspend(task1.id());
- Mockito.verify(tasks).addPendingActiveTaskToSuspend(task2.id());
+ verify(tasks).addPendingActiveTaskToSuspend(task1.id());
+ verify(tasks).addPendingActiveTaskToSuspend(task2.id());
}
@Test
@@ -1394,8 +1390,8 @@ public class TaskManagerTest {
taskManager.handleRevocation(taskId01Partitions);
- Mockito.verify(stateUpdater, never()).remove(task.id());
- Mockito.verify(tasks, never()).addPendingActiveTaskToSuspend(task.id());
+ verify(stateUpdater, never()).remove(task.id());
+ verify(tasks, never()).addPendingActiveTaskToSuspend(task.id());
}
@Test
@@ -1408,8 +1404,8 @@ public class TaskManagerTest {
taskManager.handleRevocation(taskId00Partitions);
- Mockito.verify(stateUpdater, never()).remove(task.id());
- Mockito.verify(tasks, never()).addPendingActiveTaskToSuspend(task.id());
+ verify(stateUpdater, never()).remove(task.id());
+ verify(tasks, never()).addPendingActiveTaskToSuspend(task.id());
}
@Test
@@ -1428,12 +1424,12 @@ public class TaskManagerTest {
taskManager.handleLostAll();
- Mockito.verify(stateUpdater).remove(task1.id());
- Mockito.verify(stateUpdater, never()).remove(task2.id());
- Mockito.verify(stateUpdater).remove(task3.id());
- Mockito.verify(tasks).addPendingTaskToCloseClean(task1.id());
- Mockito.verify(tasks, never()).addPendingTaskToCloseClean(task2.id());
- Mockito.verify(tasks).addPendingTaskToCloseClean(task3.id());
+ verify(stateUpdater).remove(task1.id());
+ verify(stateUpdater, never()).remove(task2.id());
+ verify(stateUpdater).remove(task3.id());
+ verify(tasks).addPendingTaskToCloseClean(task1.id());
+ verify(tasks, never()).addPendingTaskToCloseClean(task2.id());
+ verify(tasks).addPendingTaskToCloseClean(task3.id());
}
private TaskManager setupForRevocationAndLost(final Set<Task> tasksInStateUpdater,
@@ -1454,10 +1450,10 @@ public class TaskManagerTest {
taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
- Mockito.verify(task).completeRestoration(noOpResetter);
- Mockito.verify(task).clearTaskTimeout();
- Mockito.verify(tasks).addTask(task);
- Mockito.verify(consumer).resume(task.inputPartitions());
+ verify(task).completeRestoration(noOpResetter);
+ verify(task).clearTaskTimeout();
+ verify(tasks).addTask(task);
+ verify(consumer).resume(task.inputPartitions());
}
@Test
@@ -1472,10 +1468,10 @@ public class TaskManagerTest {
taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
- Mockito.verify(task).maybeInitTaskTimeoutOrThrow(anyLong(), Mockito.eq(timeoutException));
- Mockito.verify(tasks, never()).addTask(task);
- Mockito.verify(task, never()).clearTaskTimeout();
- Mockito.verifyNoInteractions(consumer);
+ verify(task).maybeInitTaskTimeoutOrThrow(anyLong(), eq(timeoutException));
+ verify(tasks, never()).addTask(task);
+ verify(task, never()).clearTaskTimeout();
+ verifyNoInteractions(consumer);
}
private TaskManager setUpTransitionToRunningOfRestoredTask(final StreamTask statefulTask,
@@ -1514,10 +1510,10 @@ public class TaskManagerTest {
taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
- Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(statefulTask.id());
- Mockito.verify(statefulTask).suspend();
- Mockito.verify(standbyTask).initializeIfNeeded();
- Mockito.verify(stateUpdater).add(standbyTask);
+ verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(statefulTask.id());
+ verify(statefulTask).suspend();
+ verify(standbyTask).initializeIfNeeded();
+ verify(stateUpdater).add(standbyTask);
}
@Test
@@ -1534,8 +1530,8 @@ public class TaskManagerTest {
() -> taskManager.checkStateUpdater(time.milliseconds(), noOpResetter)
);
- Mockito.verify(stateUpdater, never()).add(any());
- Mockito.verify(statefulTask).closeDirty();
+ verify(stateUpdater, never()).add(any());
+ verify(statefulTask).closeDirty();
}
@Test
@@ -1556,8 +1552,8 @@ public class TaskManagerTest {
() -> taskManager.checkStateUpdater(time.milliseconds(), noOpResetter)
);
- Mockito.verify(stateUpdater, never()).add(any());
- Mockito.verify(standbyTask).closeDirty();
+ verify(stateUpdater, never()).add(any());
+ verify(standbyTask).closeDirty();
}
private TaskManager setUpRecycleRestoredTask(final StreamTask statefulTask) {
@@ -1579,11 +1575,11 @@ public class TaskManagerTest {
taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
- Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(statefulTask.id());
- Mockito.verify(statefulTask).suspend();
- Mockito.verify(statefulTask).closeClean();
- Mockito.verify(statefulTask, never()).closeDirty();
- Mockito.verify(tasks, never()).removeTask(statefulTask);
+ verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(statefulTask.id());
+ verify(statefulTask).suspend();
+ verify(statefulTask).closeClean();
+ verify(statefulTask, never()).closeDirty();
+ verify(tasks, never()).removeTask(statefulTask);
}
@Test
@@ -1600,9 +1596,9 @@ public class TaskManagerTest {
() -> taskManager.checkStateUpdater(time.milliseconds(), noOpResetter)
);
- Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(statefulTask.id());
- Mockito.verify(statefulTask).closeDirty();
- Mockito.verify(tasks, never()).removeTask(statefulTask);
+ verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(statefulTask.id());
+ verify(statefulTask).closeDirty();
+ verify(tasks, never()).removeTask(statefulTask);
}
@Test
@@ -1621,8 +1617,8 @@ public class TaskManagerTest {
() -> taskManager.checkStateUpdater(time.milliseconds(), noOpResetter)
);
- Mockito.verify(statefulTask, never()).closeDirty();
- Mockito.verify(tasks, never()).removeTask(statefulTask);
+ verify(statefulTask, never()).closeDirty();
+ verify(tasks, never()).removeTask(statefulTask);
}
private TaskManager setUpCloseCleanRestoredTask(final StreamTask statefulTask,
@@ -1649,8 +1645,8 @@ public class TaskManagerTest {
taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
- Mockito.verify(stateUpdater).add(statefulTask);
- Mockito.verify(tasks, never()).removeTask(statefulTask);
+ verify(stateUpdater).add(statefulTask);
+ verify(tasks, never()).removeTask(statefulTask);
}
@Test
@@ -1668,11 +1664,11 @@ public class TaskManagerTest {
taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
- Mockito.verify(consumer).resume(statefulTask.inputPartitions());
- Mockito.verify(statefulTask).updateInputPartitions(Mockito.eq(taskId01Partitions), anyMap());
- Mockito.verify(statefulTask).completeRestoration(noOpResetter);
- Mockito.verify(statefulTask).clearTaskTimeout();
- Mockito.verify(tasks).addTask(statefulTask);
+ verify(consumer).resume(statefulTask.inputPartitions());
+ verify(statefulTask).updateInputPartitions(eq(taskId01Partitions), anyMap());
+ verify(statefulTask).completeRestoration(noOpResetter);
+ verify(statefulTask).clearTaskTimeout();
+ verify(tasks).addTask(statefulTask);
}
@Test
@@ -1689,11 +1685,11 @@ public class TaskManagerTest {
taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
- Mockito.verify(statefulTask).updateInputPartitions(Mockito.eq(taskId01Partitions), anyMap());
- Mockito.verify(statefulTask).closeClean();
- Mockito.verify(statefulTask).revive();
- Mockito.verify(statefulTask).initializeIfNeeded();
- Mockito.verify(stateUpdater).add(statefulTask);
+ verify(statefulTask).updateInputPartitions(eq(taskId01Partitions), anyMap());
+ verify(statefulTask).closeClean();
+ verify(statefulTask).revive();
+ verify(statefulTask).initializeIfNeeded();
+ verify(stateUpdater).add(statefulTask);
}
@Test
@@ -1712,9 +1708,9 @@ public class TaskManagerTest {
taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
- Mockito.verify(statefulTask).suspend();
- Mockito.verify(tasks).addTask(statefulTask);
- Mockito.verifyNoInteractions(consumer);
+ verify(statefulTask).suspend();
+ verify(tasks).addTask(statefulTask);
+ verifyNoInteractions(consumer);
}
@Test
@@ -1776,16 +1772,16 @@ public class TaskManagerTest {
taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
- Mockito.verify(tasks).addTask(taskToTransitToRunning);
- Mockito.verify(stateUpdater).add(recycledStandbyTask);
- Mockito.verify(stateUpdater).add(recycledStandbyTask);
- Mockito.verify(taskToCloseClean).closeClean();
- Mockito.verify(stateUpdater).add(taskToAddBack);
- Mockito.verify(taskToUpdateInputPartitions).updateInputPartitions(Mockito.eq(taskId05Partitions), anyMap());
- Mockito.verify(taskToCloseReviveAndUpdateInputPartitions).closeClean();
- Mockito.verify(taskToCloseReviveAndUpdateInputPartitions).revive();
- Mockito.verify(taskToCloseReviveAndUpdateInputPartitions).initializeIfNeeded();
- Mockito.verify(stateUpdater).add(taskToCloseReviveAndUpdateInputPartitions);
+ verify(tasks).addTask(taskToTransitToRunning);
+ verify(stateUpdater).add(recycledStandbyTask);
+ verify(stateUpdater).add(recycledStandbyTask);
+ verify(taskToCloseClean).closeClean();
+ verify(stateUpdater).add(taskToAddBack);
+ verify(taskToUpdateInputPartitions).updateInputPartitions(eq(taskId05Partitions), anyMap());
+ verify(taskToCloseReviveAndUpdateInputPartitions).closeClean();
+ verify(taskToCloseReviveAndUpdateInputPartitions).revive();
+ verify(taskToCloseReviveAndUpdateInputPartitions).initializeIfNeeded();
+ verify(stateUpdater).add(taskToCloseReviveAndUpdateInputPartitions);
}
@Test
@@ -1892,9 +1888,9 @@ public class TaskManagerTest {
() -> taskManager.checkStateUpdater(time.milliseconds(), noOpResetter)
);
- Mockito.verify(tasks).addTask(statefulTask0);
- Mockito.verify(tasks).addTask(statefulTask1);
- Mockito.verify(stateUpdater).add(statefulTask2);
+ verify(tasks).addTask(statefulTask0);
+ verify(tasks).addTask(statefulTask1);
+ verify(stateUpdater).add(statefulTask2);
assertEquals(mkSet(taskId00, taskId01), thrown.corruptedTasks());
assertEquals("Tasks [0_1, 0_0] are corrupted and hence need to be re-initialized", thrown.getMessage());
}
@@ -1913,19 +1909,17 @@ public class TaskManagerTest {
taskManager.handleAssignment(activeTasksAssignment, standbyTasksAssignment);
- Mockito.verify(topologyBuilder).addSubscribedTopicsFromAssignment(Mockito.eq(mkSet(t1p1, t1p2, t2p2)), Mockito.anyString());
- Mockito.verify(topologyBuilder, never()).addSubscribedTopicsFromAssignment(Mockito.eq(mkSet(t1p3, t1p4)), Mockito.anyString());
- Mockito.verify(activeTaskCreator).createTasks(any(), Mockito.eq(activeTasksAssignment));
+ verify(topologyBuilder).addSubscribedTopicsFromAssignment(eq(mkSet(t1p1, t1p2, t2p2)), anyString());
+ verify(topologyBuilder, never()).addSubscribedTopicsFromAssignment(eq(mkSet(t1p3, t1p4)), anyString());
+ verify(activeTaskCreator).createTasks(any(), eq(activeTasksAssignment));
}
@Test
public void shouldNotLockAnythingIfStateDirIsEmpty() {
- expect(stateDirectory.listNonEmptyTaskDirectories()).andReturn(new ArrayList<>()).once();
+ when(stateDirectory.listNonEmptyTaskDirectories()).thenReturn(new ArrayList<>());
- replay(stateDirectory);
taskManager.handleRebalanceStart(singleton("topic"));
- verify(stateDirectory);
assertTrue(taskManager.lockedTaskDirectories().isEmpty());
}
@@ -1940,10 +1934,8 @@ public class TaskManagerTest {
taskId10.toString(),
"dummy"
);
- replay(stateDirectory);
taskManager.handleRebalanceStart(singleton("topic"));
- verify(stateDirectory);
assertThat(taskManager.lockedTaskDirectories(), is(singleton(taskId01)));
}
@@ -1951,14 +1943,12 @@ public class TaskManagerTest {
public void shouldUnlockEmptyDirsAtRebalanceStart() throws Exception {
expectLockObtainedFor(taskId01, taskId10);
expectDirectoryNotEmpty(taskId01);
- expect(stateDirectory.directoryForTaskIsEmpty(taskId10)).andReturn(true);
- expectUnlockFor(taskId10);
+ when(stateDirectory.directoryForTaskIsEmpty(taskId10)).thenReturn(true);
makeTaskFolders(taskId01.toString(), taskId10.toString());
- replay(stateDirectory);
taskManager.handleRebalanceStart(singleton("topic"));
- verify(stateDirectory);
+ verify(stateDirectory).unlock(taskId10);
assertThat(taskManager.lockedTaskDirectories(), is(singleton(taskId01)));
}
@@ -1969,7 +1959,7 @@ public class TaskManagerTest {
taskManager.handleRebalanceComplete();
- Mockito.verify(consumer).pause(assigned);
+ verify(consumer).pause(assigned);
}
@Test
@@ -1977,7 +1967,7 @@ public class TaskManagerTest {
final StreamTask statefulTask0 = statefulTask(taskId00, taskId00ChangelogPartitions)
.inState(State.RUNNING)
.withInputPartitions(taskId00Partitions).build();
- final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+ final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(tasks.allTasks()).thenReturn(mkSet(statefulTask0));
final Set<TopicPartition> assigned = mkSet(t1p0, t1p1);
@@ -1985,21 +1975,19 @@ public class TaskManagerTest {
taskManager.handleRebalanceComplete();
- Mockito.verify(consumer).pause(mkSet(t1p1));
+ verify(consumer).pause(mkSet(t1p1));
}
@Test
public void shouldReleaseLockForUnassignedTasksAfterRebalance() throws Exception {
expectLockObtainedFor(taskId00, taskId01, taskId02);
expectDirectoryNotEmpty(taskId00, taskId01, taskId02);
- expectUnlockFor(taskId02);
makeTaskFolders(
taskId00.toString(), // active task
taskId01.toString(), // standby task
taskId02.toString() // unassigned but able to lock
);
- replay(stateDirectory);
taskManager.handleRebalanceStart(singleton("topic"));
assertThat(taskManager.lockedTaskDirectories(), is(mkSet(taskId00, taskId01, taskId02)));
@@ -2008,9 +1996,9 @@ public class TaskManagerTest {
taskManager.handleRebalanceComplete();
assertThat(taskManager.lockedTaskDirectories(), is(mkSet(taskId00, taskId01)));
- verify(stateDirectory);
- Mockito.verify(consumer).pause(assignment);
+ verify(stateDirectory).unlock(taskId02);
+ verify(consumer).pause(assignment);
}
@Test
@@ -2027,21 +2015,19 @@ public class TaskManagerTest {
final StandbyTask unassignedStandbyTask = standbyTask(taskId03, taskId03ChangelogPartitions)
.inState(State.CREATED)
.withInputPartitions(taskId03Partitions).build();
- final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+ final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId00, runningStatefulTask)));
when(stateUpdater.getTasks()).thenReturn(mkSet(standbyTask, restoringStatefulTask));
when(tasks.allTasks()).thenReturn(mkSet(runningStatefulTask));
expectLockObtainedFor(taskId00, taskId01, taskId02, taskId03);
expectDirectoryNotEmpty(taskId00, taskId01, taskId02, taskId03);
- expectUnlockFor(taskId03);
makeTaskFolders(
taskId00.toString(),
taskId01.toString(),
taskId02.toString(),
taskId03.toString()
);
- replay(stateDirectory);
final Set<TopicPartition> assigned = mkSet(t1p0, t1p1, t1p2);
when(consumer.assignment()).thenReturn(assigned);
@@ -2049,8 +2035,8 @@ public class TaskManagerTest {
taskManager.handleRebalanceStart(singleton("topic"));
taskManager.handleRebalanceComplete();
- Mockito.verify(consumer).pause(mkSet(t1p1, t1p2));
- verify(stateDirectory);
+ verify(consumer).pause(mkSet(t1p1, t1p2));
+ verify(stateDirectory).unlock(taskId03);
assertThat(taskManager.lockedTaskDirectories(), is(mkSet(taskId00, taskId01, taskId02)));
}
@@ -2086,10 +2072,9 @@ public class TaskManagerTest {
makeTaskFolders(taskId00.toString());
final Map<TopicPartition, Long> changelogOffsetInCheckpoint = mkMap(mkEntry(t1p0changelog, 24L));
writeCheckpointFile(taskId00, changelogOffsetInCheckpoint);
- final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+ final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(stateUpdater.getTasks()).thenReturn(mkSet(restoringStatefulTask));
- replay(stateDirectory);
taskManager.handleRebalanceStart(singleton("topic"));
assertThat(taskManager.getTaskOffsetSums(), is(mkMap(mkEntry(taskId00, changelogOffset))));
@@ -2105,10 +2090,9 @@ public class TaskManagerTest {
makeTaskFolders(taskId00.toString());
final Map<TopicPartition, Long> changelogOffsetInCheckpoint = mkMap(mkEntry(t1p0changelog, 24L));
writeCheckpointFile(taskId00, changelogOffsetInCheckpoint);
- final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+ final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(stateUpdater.getTasks()).thenReturn(mkSet(restoringStandbyTask));
- replay(stateDirectory);
taskManager.handleRebalanceStart(singleton("topic"));
assertThat(taskManager.getTaskOffsetSums(), is(mkMap(mkEntry(taskId00, changelogOffset))));
@@ -2131,7 +2115,7 @@ public class TaskManagerTest {
.thenReturn(mkMap(mkEntry(t1p1changelog, changelogOffsetOfRestoringStatefulTask)));
when(restoringStandbyTask.changelogOffsets())
.thenReturn(mkMap(mkEntry(t1p2changelog, changelogOffsetOfRestoringStandbyTask)));
- final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+ final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId00, runningStatefulTask)));
when(stateUpdater.getTasks()).thenReturn(mkSet(restoringStandbyTask, restoringStatefulTask));
@@ -2162,7 +2146,6 @@ public class TaskManagerTest {
expectLockObtainedFor(taskId00);
expectDirectoryNotEmpty(taskId00);
makeTaskFolders(taskId00.toString());
- replay(stateDirectory);
taskManager.handleRebalanceStart(singleton("topic"));
final StateMachineTask restoringTask = handleAssignment(
@@ -2186,7 +2169,6 @@ public class TaskManagerTest {
expectLockObtainedFor(taskId00);
expectDirectoryNotEmpty(taskId00);
makeTaskFolders(taskId00.toString());
- replay(stateDirectory);
taskManager.handleRebalanceStart(singleton("topic"));
final StateMachineTask restoringTask = handleAssignment(
@@ -2211,7 +2193,6 @@ public class TaskManagerTest {
makeTaskFolders(taskId00.toString());
writeCheckpointFile(taskId00, changelogOffsets);
- replay(stateDirectory);
taskManager.handleRebalanceStart(singleton("topic"));
assertThat(taskManager.getTaskOffsetSums(), is(expectedOffsetSums));
@@ -2228,11 +2209,10 @@ public class TaskManagerTest {
expectLockObtainedFor(taskId00);
makeTaskFolders(taskId00.toString());
writeCheckpointFile(taskId00, changelogOffsets);
- replay(stateDirectory);
taskManager.handleRebalanceStart(singleton("topic"));
final StateMachineTask uninitializedTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
- when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singleton(uninitializedTask));
+ when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singleton(uninitializedTask));
taskManager.handleAssignment(taskId00Assignment, emptyMap());
@@ -2252,13 +2232,12 @@ public class TaskManagerTest {
expectLockObtainedFor(taskId00);
makeTaskFolders(taskId00.toString());
writeCheckpointFile(taskId00, changelogOffsets);
- replay(stateDirectory);
final StateMachineTask closedTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
taskManager.handleRebalanceStart(singleton("topic"));
- when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singleton(closedTask));
+ when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singleton(closedTask));
taskManager.handleAssignment(taskId00Assignment, emptyMap());
@@ -2273,7 +2252,6 @@ public class TaskManagerTest {
public void shouldNotReportOffsetSumsForTaskWeCantLock() throws Exception {
expectLockFailedFor(taskId00);
makeTaskFolders(taskId00.toString());
- replay(stateDirectory);
taskManager.handleRebalanceStart(singleton("topic"));
assertTrue(taskManager.lockedTaskDirectories().isEmpty());
@@ -2285,12 +2263,10 @@ public class TaskManagerTest {
expectLockObtainedFor(taskId00);
makeTaskFolders(taskId00.toString());
expectDirectoryNotEmpty(taskId00);
- expect(stateDirectory.checkpointFileFor(taskId00)).andReturn(getCheckpointFile(taskId00));
- replay(stateDirectory);
+ when(stateDirectory.checkpointFileFor(taskId00)).thenReturn(getCheckpointFile(taskId00));
taskManager.handleRebalanceStart(singleton("topic"));
assertTrue(taskManager.getTaskOffsetSums().isEmpty());
- verify(stateDirectory);
}
@Test
@@ -2306,7 +2282,6 @@ public class TaskManagerTest {
expectLockObtainedFor(taskId00);
makeTaskFolders(taskId00.toString());
writeCheckpointFile(taskId00, changelogOffsets);
- replay(stateDirectory);
taskManager.handleRebalanceStart(singleton("topic"));
assertThat(taskManager.getTaskOffsetSums(), is(expectedOffsetSums));
@@ -2321,7 +2296,7 @@ public class TaskManagerTest {
// first `handleAssignment`
when(consumer.assignment()).thenReturn(assignment);
- when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
+ when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(task00));
taskManager.handleAssignment(taskId00Assignment, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
@@ -2334,7 +2309,7 @@ public class TaskManagerTest {
assertThat(task00.state(), is(Task.State.CLOSED));
assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
- Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
+ verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
}
@Test
@@ -2346,7 +2321,7 @@ public class TaskManagerTest {
}
};
- when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
+ when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(task00));
taskManager.handleAssignment(taskId00Assignment, emptyMap());
taskManager.handleRevocation(taskId00Partitions);
@@ -2362,7 +2337,7 @@ public class TaskManagerTest {
is("Encounter unexpected fatal error for task 0_0")
);
assertThat(thrown.getCause().getMessage(), is("KABOOM!"));
- Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
+ verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
}
@Test
@@ -2372,18 +2347,20 @@ public class TaskManagerTest {
// `handleAssignment`
when(consumer.assignment()).thenReturn(assignment);
- when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
+ when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(task00));
when(standbyTaskCreator.createTasks(taskId01Assignment)).thenReturn(singletonList(task01));
- makeTaskFolders(taskId00.toString(), taskId01.toString());
+ final ArrayList<TaskDirectory> taskFolders = new ArrayList<>(2);
+ taskFolders.add(new TaskDirectory(testFolder.newFolder(taskId00.toString()), null));
+ taskFolders.add(new TaskDirectory(testFolder.newFolder(taskId01.toString()), null));
+
+ when(stateDirectory.listNonEmptyTaskDirectories())
+ .thenReturn(taskFolders)
+ .thenReturn(new ArrayList<>());
+
expectLockObtainedFor(taskId00, taskId01);
expectDirectoryNotEmpty(taskId00, taskId01);
- // The second attempt will return empty tasks.
- makeTaskFolders();
- expectLockObtainedFor();
- replay(stateDirectory);
-
taskManager.handleRebalanceStart(emptySet());
assertThat(taskManager.lockedTaskDirectories(), Matchers.is(mkSet(taskId00, taskId01)));
@@ -2406,7 +2383,7 @@ public class TaskManagerTest {
taskManager.handleRebalanceStart(emptySet());
assertThat(taskManager.lockedTaskDirectories(), is(emptySet()));
- Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
+ verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
}
@Test
@@ -2415,7 +2392,7 @@ public class TaskManagerTest {
taskManager.handleLostAll();
- Mockito.verify(activeTaskCreator).reInitializeThreadProducer();
+ verify(activeTaskCreator).reInitializeThreadProducer();
}
@Test
@@ -2426,7 +2403,7 @@ public class TaskManagerTest {
// `handleAssignment`
when(consumer.assignment()).thenReturn(assignment);
- when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
+ when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(task00));
// `handleAssignment`
doThrow(new RuntimeException("KABOOM!")).when(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
@@ -2458,7 +2435,7 @@ public class TaskManagerTest {
final StandbyTask corruptedStandbyTask = standbyTask(taskId02, taskId02ChangelogPartitions)
.inState(State.RUNNING)
.withInputPartitions(taskId02Partitions).build();
- final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+ final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(tasks.task(taskId03)).thenReturn(corruptedActiveTask);
when(tasks.task(taskId02)).thenReturn(corruptedStandbyTask);
@@ -2471,16 +2448,16 @@ public class TaskManagerTest {
final InOrder standbyTaskOrder = inOrder(corruptedStandbyTask);
standbyTaskOrder.verify(corruptedStandbyTask).closeDirty();
standbyTaskOrder.verify(corruptedStandbyTask).revive();
- Mockito.verify(tasks).removeTask(corruptedActiveTask);
- Mockito.verify(tasks).removeTask(corruptedStandbyTask);
- Mockito.verify(tasks).addPendingTasksToInit(mkSet(corruptedActiveTask));
- Mockito.verify(tasks).addPendingTasksToInit(mkSet(corruptedStandbyTask));
- Mockito.verify(consumer).assignment();
+ verify(tasks).removeTask(corruptedActiveTask);
+ verify(tasks).removeTask(corruptedStandbyTask);
+ verify(tasks).addPendingTasksToInit(mkSet(corruptedActiveTask));
+ verify(tasks).addPendingTasksToInit(mkSet(corruptedStandbyTask));
+ verify(consumer).assignment();
}
@Test
public void shouldReviveCorruptTasks() {
- final ProcessorStateManager stateManager = Mockito.mock(ProcessorStateManager.class);
+ final ProcessorStateManager stateManager = mock(ProcessorStateManager.class);
final AtomicBoolean enforcedCheckpoint = new AtomicBoolean(false);
final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) {
@@ -2497,7 +2474,7 @@ public class TaskManagerTest {
when(consumer.assignment())
.thenReturn(assignment)
.thenReturn(taskId00Partitions);
- when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
+ when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(task00));
taskManager.handleAssignment(taskId00Assignment, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), tp -> assertThat(tp, is(empty()))), is(true));
@@ -2513,12 +2490,12 @@ public class TaskManagerTest {
assertThat(taskManager.activeTaskMap(), is(singletonMap(taskId00, task00)));
assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
- Mockito.verify(stateManager).markChangelogAsCorrupted(taskId00Partitions);
+ verify(stateManager).markChangelogAsCorrupted(taskId00Partitions);
}
@Test
public void shouldReviveCorruptTasksEvenIfTheyCannotCloseClean() {
- final ProcessorStateManager stateManager = Mockito.mock(ProcessorStateManager.class);
+ final ProcessorStateManager stateManager = mock(ProcessorStateManager.class);
final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) {
@Override
@@ -2531,7 +2508,7 @@ public class TaskManagerTest {
when(consumer.assignment())
.thenReturn(assignment)
.thenReturn(taskId00Partitions);
- when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
+ when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(task00));
taskManager.handleAssignment(taskId00Assignment, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), tp -> assertThat(tp, is(empty()))), is(true));
@@ -2545,12 +2522,12 @@ public class TaskManagerTest {
assertThat(taskManager.activeTaskMap(), is(singletonMap(taskId00, task00)));
assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
- Mockito.verify(stateManager).markChangelogAsCorrupted(taskId00Partitions);
+ verify(stateManager).markChangelogAsCorrupted(taskId00Partitions);
}
@Test
public void shouldCommitNonCorruptedTasksOnTaskCorruptedException() {
- final ProcessorStateManager stateManager = Mockito.mock(ProcessorStateManager.class);
+ final ProcessorStateManager stateManager = mock(ProcessorStateManager.class);
final StateMachineTask corruptedTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
final StateMachineTask nonCorruptedTask = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager);
@@ -2559,7 +2536,7 @@ public class TaskManagerTest {
firstAssignment.putAll(taskId01Assignment);
// `handleAssignment`
- when(activeTaskCreator.createTasks(any(), Mockito.eq(firstAssignment)))
+ when(activeTaskCreator.createTasks(any(), eq(firstAssignment)))
.thenReturn(asList(corruptedTask, nonCorruptedTask));
when(consumer.assignment())
@@ -2580,13 +2557,13 @@ public class TaskManagerTest {
assertThat(corruptedTask.partitionsForOffsetReset, equalTo(taskId00Partitions));
// check that we should not commit empty map either
- Mockito.verify(consumer, never()).commitSync(emptyMap());
- Mockito.verify(stateManager).markChangelogAsCorrupted(taskId00Partitions);
+ verify(consumer, never()).commitSync(emptyMap());
+ verify(stateManager).markChangelogAsCorrupted(taskId00Partitions);
}
@Test
public void shouldNotCommitNonRunningNonCorruptedTasks() {
- final ProcessorStateManager stateManager = Mockito.mock(ProcessorStateManager.class);
+ final ProcessorStateManager stateManager = mock(ProcessorStateManager.class);
final StateMachineTask corruptedTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
final StateMachineTask nonRunningNonCorruptedTask = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager);
@@ -2597,7 +2574,7 @@ public class TaskManagerTest {
assignment.putAll(taskId01Assignment);
// `handleAssignment`
- when(activeTaskCreator.createTasks(any(), Mockito.eq(assignment)))
+ when(activeTaskCreator.createTasks(any(), eq(assignment)))
.thenReturn(asList(corruptedTask, nonRunningNonCorruptedTask));
when(consumer.assignment()).thenReturn(taskId00Partitions);
@@ -2611,7 +2588,7 @@ public class TaskManagerTest {
assertThat(corruptedTask.partitionsForOffsetReset, equalTo(taskId00Partitions));
assertFalse(nonRunningNonCorruptedTask.commitPrepared);
- Mockito.verify(stateManager).markChangelogAsCorrupted(taskId00Partitions);
+ verify(stateManager).markChangelogAsCorrupted(taskId00Partitions);
}
@Test
@@ -2625,21 +2602,20 @@ public class TaskManagerTest {
final StreamTask corruptedTask = statefulTask(taskId02, taskId02ChangelogPartitions)
.withInputPartitions(taskId02Partitions)
.inState(State.RUNNING).build();
- final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+ final TasksRegistry tasks = mock(TasksRegistry.class);
when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId02, corruptedTask)));
when(tasks.task(taskId02)).thenReturn(corruptedTask);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
- when(stateUpdater.getTasks()).thenReturn(mkSet(activeRestoringTask, standbyTask));
when(consumer.assignment()).thenReturn(intersection(HashSet::new, taskId00Partitions, taskId01Partitions, taskId02Partitions));
taskManager.handleCorruption(mkSet(taskId02));
- Mockito.verify(activeRestoringTask, never()).commitNeeded();
- Mockito.verify(activeRestoringTask, never()).prepareCommit();
- Mockito.verify(activeRestoringTask, never()).postCommit(Mockito.anyBoolean());
- Mockito.verify(standbyTask, never()).commitNeeded();
- Mockito.verify(standbyTask, never()).prepareCommit();
- Mockito.verify(standbyTask, never()).postCommit(Mockito.anyBoolean());
+ verify(activeRestoringTask, never()).commitNeeded();
+ verify(activeRestoringTask, never()).prepareCommit();
+ verify(activeRestoringTask, never()).postCommit(anyBoolean());
+ verify(standbyTask, never()).commitNeeded();
+ verify(standbyTask, never()).prepareCommit();
+ verify(standbyTask, never()).postCommit(anyBoolean());
}
@Test
@@ -2654,7 +2630,7 @@ public class TaskManagerTest {
final StreamTask corruptedTask = statefulTask(taskId02, taskId02ChangelogPartitions)
.withInputPartitions(taskId02Partitions)
.inState(State.RUNNING).build();
- final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+ final TasksRegistry tasks = mock(TasksRegistry.class);
when(tasks.allTasksPerId()).thenReturn(mkMap(
mkEntry(taskId00, activeRestoringTask),
mkEntry(taskId01, standbyTask),
@@ -2666,16 +2642,16 @@ public class TaskManagerTest {
taskManager.handleCorruption(mkSet(taskId02));
- Mockito.verify(activeRestoringTask, never()).commitNeeded();
- Mockito.verify(activeRestoringTask, never()).prepareCommit();
- Mockito.verify(activeRestoringTask, never()).postCommit(Mockito.anyBoolean());
- Mockito.verify(standbyTask).prepareCommit();
- Mockito.verify(standbyTask).postCommit(Mockito.anyBoolean());
+ verify(activeRestoringTask, never()).commitNeeded();
+ verify(activeRestoringTask, never()).prepareCommit();
+ verify(activeRestoringTask, never()).postCommit(anyBoolean());
+ verify(standbyTask).prepareCommit();
+ verify(standbyTask).postCommit(anyBoolean());
}
@Test
public void shouldCleanAndReviveCorruptedStandbyTasksBeforeCommittingNonCorruptedTasks() {
- final ProcessorStateManager stateManager = Mockito.mock(ProcessorStateManager.class);
+ final ProcessorStateManager stateManager = mock(ProcessorStateManager.class);
final StateMachineTask corruptedStandby = new StateMachineTask(taskId00, taskId00Partitions, false, stateManager);
final StateMachineTask runningNonCorruptedActive = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager) {
@@ -2686,7 +2662,7 @@ public class TaskManagerTest {
};
// handleAssignment
- when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId01Assignment)))
+ when(activeTaskCreator.createTasks(any(), eq(taskId01Assignment)))
.thenReturn(singleton(runningNonCorruptedActive));
when(standbyTaskCreator.createTasks(taskId00Assignment)).thenReturn(singleton(corruptedStandby));
@@ -2707,13 +2683,13 @@ public class TaskManagerTest {
assertThat(corruptedStandby.commitPrepared, is(true));
assertThat(corruptedStandby.state(), is(Task.State.CREATED));
- Mockito.verify(stateManager).markChangelogAsCorrupted(taskId00Partitions);
+ verify(stateManager).markChangelogAsCorrupted(taskId00Partitions);
}
@Test
public void shouldNotAttemptToCommitInHandleCorruptedDuringARebalance() {
- final ProcessorStateManager stateManager = Mockito.mock(ProcessorStateManager.class);
- expect(stateDirectory.listNonEmptyTaskDirectories()).andStubReturn(new ArrayList<>());
+ final ProcessorStateManager stateManager = mock(ProcessorStateManager.class);
+ when(stateDirectory.listNonEmptyTaskDirectories()).thenReturn(new ArrayList<>());
final StateMachineTask corruptedActive = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
@@ -2726,15 +2702,13 @@ public class TaskManagerTest {
final Map<TaskId, Set<TopicPartition>> firstAssignement = new HashMap<>();
firstAssignement.putAll(taskId00Assignment);
firstAssignement.putAll(taskId01Assignment);
- when(activeTaskCreator.createTasks(any(), Mockito.eq(firstAssignement)))
+ when(activeTaskCreator.createTasks(any(), eq(firstAssignement)))
.thenReturn(asList(corruptedActive, uncorruptedActive));
when(consumer.assignment())
.thenReturn(assignment)
.thenReturn(union(HashSet::new, taskId00Partitions, taskId01Partitions));
- replay(stateDirectory);
-
uncorruptedActive.setCommittableOffsetsAndMetadata(offsets);
taskManager.handleAssignment(firstAssignement, emptyMap());
@@ -2759,7 +2733,7 @@ public class TaskManagerTest {
@Test
public void shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitWithALOS() {
- final ProcessorStateManager stateManager = Mockito.mock(ProcessorStateManager.class);
+ final ProcessorStateManager stateManager = mock(ProcessorStateManager.class);
final StateMachineTask corruptedActive = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
final StateMachineTask uncorruptedActive = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager) {
@@ -2775,7 +2749,7 @@ public class TaskManagerTest {
final Map<TaskId, Set<TopicPartition>> firstAssignment = new HashMap<>();
firstAssignment.putAll(taskId00Assignment);
firstAssignment.putAll(taskId01Assignment);
- when(activeTaskCreator.createTasks(any(), Mockito.eq(firstAssignment)))
+ when(activeTaskCreator.createTasks(any(), eq(firstAssignment)))
.thenReturn(asList(corruptedActive, uncorruptedActive));
when(consumer.assignment())
@@ -2812,7 +2786,7 @@ public class TaskManagerTest {
assertThat(corruptedActive.state(), is(Task.State.CREATED));
assertThat(uncorruptedActive.state(), is(Task.State.CREATED));
- Mockito.verify(stateManager).markChangelogAsCorrupted(taskId00Partitions);
+ verify(stateManager).markChangelogAsCorrupted(taskId00Partitions);
}
@Test
@@ -2820,7 +2794,7 @@ public class TaskManagerTest {
final TaskManager taskManager = setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false);
final StreamsProducer producer = mock(StreamsProducer.class);
when(activeTaskCreator.threadProducer()).thenReturn(producer);
- final ProcessorStateManager stateManager = Mockito.mock(ProcessorStateManager.class);
+ final ProcessorStateManager stateManager = mock(ProcessorStateManager.class);
final AtomicBoolean corruptedTaskChangelogMarkedAsCorrupted = new AtomicBoolean(false);
final StateMachineTask corruptedActiveTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) {
@@ -2846,7 +2820,7 @@ public class TaskManagerTest {
final Map<TaskId, Set<TopicPartition>> firstAssignment = new HashMap<>();
firstAssignment.putAll(taskId00Assignment);
firstAssignment.putAll(taskId01Assignment);
- when(activeTaskCreator.createTasks(any(), Mockito.eq(firstAssignment)))
+ when(activeTaskCreator.createTasks(any(), eq(firstAssignment)))
.thenReturn(asList(corruptedActiveTask, uncorruptedActiveTask));
when(consumer.assignment())
@@ -2892,8 +2866,8 @@ public class TaskManagerTest {
assertThat(uncorruptedActiveTask.state(), is(Task.State.CREATED));
assertThat(corruptedTaskChangelogMarkedAsCorrupted.get(), is(true));
assertThat(uncorruptedTaskChangelogMarkedAsCorrupted.get(), is(true));
- Mockito.verify(stateManager).markChangelogAsCorrupted(taskId00ChangelogPartitions);
- Mockito.verify(stateManager).markChangelogAsCorrupted(taskId01ChangelogPartitions);
+ verify(stateManager).markChangelogAsCorrupted(taskId00ChangelogPartitions);
+ verify(stateManager).markChangelogAsCorrupted(taskId01ChangelogPartitions);
}
@Test
@@ -2929,7 +2903,7 @@ public class TaskManagerTest {
.thenReturn(assignment)
.thenReturn(union(HashSet::new, taskId00Partitions, taskId01Partitions, taskId02Partitions));
- when(activeTaskCreator.createTasks(any(), Mockito.eq(assignmentActive)))
+ when(activeTaskCreator.createTasks(any(), eq(assignmentActive)))
.thenReturn(asList(revokedActiveTask, unrevokedActiveTaskWithCommitNeeded, unrevokedActiveTaskWithoutCommitNeeded));
doThrow(new TimeoutException()).when(consumer).commitSync(expectedCommittedOffsets);
@@ -2952,7 +2926,7 @@ public class TaskManagerTest {
final TaskManager taskManager = setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false);
final StreamsProducer producer = mock(StreamsProducer.class);
when(activeTaskCreator.threadProducer()).thenReturn(producer);
- final ProcessorStateManager stateManager = Mockito.mock(ProcessorStateManager.class);
+ final ProcessorStateManager stateManager = mock(ProcessorStateManager.class);
final StateMachineTask revokedActiveTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
final Map<TopicPartition, OffsetAndMetadata> revokedActiveTaskOffsets = singletonMap(t1p0, new OffsetAndMetadata(0L, null));
@@ -2987,7 +2961,7 @@ public class TaskManagerTest {
.thenReturn(assignment)
.thenReturn(union(HashSet::new, taskId00Partitions, taskId01Partitions, taskId02Partitions));
- when(activeTaskCreator.createTasks(any(), Mockito.eq(assignmentActive)))
+ when(activeTaskCreator.createTasks(any(), eq(assignmentActive)))
.thenReturn(asList(revokedActiveTask, unrevokedActiveTask, unrevokedActiveTaskWithoutCommitNeeded));
final ConsumerGroupMetadata groupMetadata = new ConsumerGroupMetadata("appId");
@@ -3012,8 +2986,8 @@ public class TaskManagerTest {
assertThat(revokedActiveTask.state(), is(State.SUSPENDED));
assertThat(unrevokedActiveTask.state(), is(State.CREATED));
assertThat(unrevokedActiveTaskWithoutCommitNeeded.state(), is(State.RUNNING));
- Mockito.verify(stateManager).markChangelogAsCorrupted(taskId00ChangelogPartitions);
- Mockito.verify(stateManager).markChangelogAsCorrupted(taskId01ChangelogPartitions);
+ verify(stateManager).markChangelogAsCorrupted(taskId00ChangelogPartitions);
+ verify(stateManager).markChangelogAsCorrupted(taskId01ChangelogPartitions);
}
@Test
@@ -3039,7 +3013,7 @@ public class TaskManagerTest {
final Task task01 = new StateMachineTask(taskId01, taskId01Partitions, false, stateManager);
when(consumer.assignment()).thenReturn(assignment);
- when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
+ when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(task00));
when(standbyTaskCreator.createTasks(taskId01Assignment)).thenReturn(singletonList(task01));
taskManager.handleAssignment(taskId00Assignment, taskId01Assignment);
@@ -3053,9 +3027,9 @@ public class TaskManagerTest {
assertThat(task01.state(), is(Task.State.RUNNING));
// expect these calls twice (because we're going to tryToCompleteRestoration twice)
- Mockito.verify(activeTaskCreator).createTasks(any(), Mockito.eq(emptyMap()));
- Mockito.verify(consumer, times(2)).assignment();
- Mockito.verify(consumer, times(2)).resume(assignment);
+ verify(activeTaskCreator).createTasks(any(), eq(emptyMap()));
+ verify(consumer, times(2)).assignment();
+ verify(consumer, times(2)).resume(assignment);
}
@Test
@@ -3063,7 +3037,7 @@ public class TaskManagerTest {
final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
when(consumer.assignment()).thenReturn(assignment);
- when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
+ when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(task00));
taskManager.handleAssignment(taskId00Assignment, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
@@ -3076,9 +3050,9 @@ public class TaskManagerTest {
assertThat(task00.state(), is(Task.State.RUNNING));
assertEquals(newPartitionsSet, task00.inputPartitions());
// expect these calls twice (because we're going to tryToCompleteRestoration twice)
- Mockito.verify(consumer, times(2)).resume(assignment);
- Mockito.verify(consumer, times(2)).assignment();
- Mockito.verify(activeTaskCreator).createTasks(any(), Mockito.eq(emptyMap()));
+ verify(consumer, times(2)).resume(assignment);
+ verify(consumer, times(2)).assignment();
+ verify(activeTaskCreator).createTasks(any(), eq(emptyMap()));
}
@Test
@@ -3086,7 +3060,7 @@ public class TaskManagerTest {
final Map<TaskId, Set<TopicPartition>> assignment = taskId00Assignment;
final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
- when(activeTaskCreator.createTasks(any(), Mockito.eq(assignment))).thenReturn(singletonList(task00));
+ when(activeTaskCreator.createTasks(any(), eq(assignment))).thenReturn(singletonList(task00));
taskManager.handleAssignment(assignment, emptyMap());
@@ -3097,9 +3071,9 @@ public class TaskManagerTest {
assertThat(task00.state(), is(Task.State.RUNNING));
assertThat(taskManager.activeTaskMap(), Matchers.equalTo(singletonMap(taskId00, task00)));
assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
- Mockito.verify(changeLogReader).enforceRestoreActive();
- Mockito.verify(consumer).assignment();
- Mockito.verify(consumer).resume(Mockito.eq(emptySet()));
+ verify(changeLogReader).enforceRestoreActive();
+ verify(consumer).assignment();
+ verify(consumer).resume(eq(emptySet()));
}
@Test
@@ -3121,7 +3095,7 @@ public class TaskManagerTest {
}
};
- when(activeTaskCreator.createTasks(any(), Mockito.eq(assignment))).thenReturn(asList(task00, task01));
+ when(activeTaskCreator.createTasks(any(), eq(assignment))).thenReturn(asList(task00, task01));
taskManager.handleAssignment(assignment, emptyMap());
@@ -3137,8 +3111,8 @@ public class TaskManagerTest {
Matchers.equalTo(mkMap(mkEntry(taskId00, task00), mkEntry(taskId01, task01)))
);
assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
- Mockito.verify(changeLogReader).enforceRestoreActive();
- Mockito.verifyNoInteractions(consumer);
+ verify(changeLogReader).enforceRestoreActive();
+ verifyNoInteractions(consumer);
}
@Test
@@ -3153,7 +3127,7 @@ public class TaskManagerTest {
}
};
- when(activeTaskCreator.createTasks(any(), Mockito.eq(assignment))).thenReturn(singletonList(task00));
+ when(activeTaskCreator.createTasks(any(), eq(assignment))).thenReturn(singletonList(task00));
taskManager.handleAssignment(assignment, emptyMap());
@@ -3167,8 +3141,8 @@ public class TaskManagerTest {
Matchers.equalTo(mkMap(mkEntry(taskId00, task00)))
);
assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
- Mockito.verify(changeLogReader).enforceRestoreActive();
- Mockito.verifyNoInteractions(consumer);
+ verify(changeLogReader).enforceRestoreActive();
+ verifyNoInteractions(consumer);
}
@Test
@@ -3178,7 +3152,7 @@ public class TaskManagerTest {
task00.setCommittableOffsetsAndMetadata(offsets);
when(consumer.assignment()).thenReturn(assignment);
- when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
+ when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(task00));
taskManager.handleAssignment(taskId00Assignment, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
@@ -3224,7 +3198,7 @@ public class TaskManagerTest {
);
when(consumer.assignment()).thenReturn(assignment);
- when(activeTaskCreator.createTasks(any(), Mockito.eq(assignmentActive)))
+ when(activeTaskCreator.createTasks(any(), eq(assignmentActive)))
.thenReturn(asList(task00, task01, task02));
when(activeTaskCreator.threadProducer()).thenReturn(producer);
@@ -3253,7 +3227,7 @@ public class TaskManagerTest {
assertThat(task02.commitPrepared, is(false));
assertThat(task10.commitPrepared, is(false));
- Mockito.verify(producer).commitTransaction(expectedCommittedOffsets, groupMetadata);
+ verify(producer).commitTransaction(expectedCommittedOffsets, groupMetadata);
}
@Test
@@ -3289,7 +3263,7 @@ public class TaskManagerTest {
);
when(consumer.assignment()).thenReturn(assignment);
- when(activeTaskCreator.createTasks(any(), Mockito.eq(assignmentActive)))
+ when(activeTaskCreator.createTasks(any(), eq(assignmentActive)))
.thenReturn(asList(task00, task01, task02));
when(standbyTaskCreator.createTasks(assignmentStandby))
.thenReturn(singletonList(task10));
@@ -3310,7 +3284,7 @@ public class TaskManagerTest {
assertThat(task02.commitPrepared, is(false));
assertThat(task10.commitPrepared, is(false));
- Mockito.verify(consumer).commitSync(expectedCommittedOffsets);
+ verify(consumer).commitSync(expectedCommittedOffsets);
}
@Test
@@ -3327,7 +3301,7 @@ public class TaskManagerTest {
when(consumer.assignment()).thenReturn(assignment);
- when(activeTaskCreator.createTasks(any(), Mockito.eq(assignmentActive))).thenReturn(singleton(task00));
+ when(activeTaskCreator.createTasks(any(), eq(assignmentActive))).thenReturn(singleton(task00));
when(standbyTaskCreator.createTasks(assignmentStandby)).thenReturn(singletonList(task10));
taskManager.handleAssignment(assignmentActive, assignmentStandby);
@@ -3355,7 +3329,7 @@ public class TaskManagerTest {
when(consumer.assignment()).thenReturn(assignment);
- when(activeTaskCreator.createTasks(any(), Mockito.eq(assignmentActive))).thenReturn(singleton(task00));
+ when(activeTaskCreator.createTasks(any(), eq(assignmentActive))).thenReturn(singleton(task00));
when(standbyTaskCreator.createTasks(assignmentStandby)).thenReturn(singletonList(task10));
taskManager.handleAssignment(assignmentActive, assignmentStandby);
@@ -3372,7 +3346,7 @@ public class TaskManagerTest {
public void shouldNotCommitCreatedTasksOnRevocationOrClosure() {
final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
- when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
+ when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(task00));
taskManager.handleAssignment(taskId00Assignment, emptyMap());
assertThat(task00.state(), is(Task.State.CREATED));
@@ -3382,7 +3356,7 @@ public class TaskManagerTest {
taskManager.handleAssignment(emptyMap(), emptyMap());
assertThat(task00.state(), is(Task.State.CLOSED));
- Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(eq(taskId00));
+ verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
}
@Test
@@ -3396,7 +3370,7 @@ public class TaskManagerTest {
};
when(consumer.assignment()).thenReturn(assignment);
- when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
+ when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(task00));
taskManager.handleAssignment(taskId00Assignment, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
assertThat(task00.state(), is(Task.State.RUNNING));
@@ -3463,7 +3437,7 @@ public class TaskManagerTest {
}
};
- when(activeTaskCreator.createTasks(any(), Mockito.eq(assignment)))
+ when(activeTaskCreator.createTasks(any(), eq(assignment)))
.thenReturn(asList(task00, task01, task02, task03));
taskManager.handleAssignment(assignment, emptyMap());
@@ -3491,8 +3465,8 @@ public class TaskManagerTest {
)
);
assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
- Mockito.verify(changeLogReader).enforceRestoreActive();
- Mockito.verify(changeLogReader).completedChangelogs();
+ verify(changeLogReader).enforceRestoreActive();
+ verify(changeLogReader).completedChangelogs();
final RuntimeException exception = assertThrows(
RuntimeException.class,
@@ -3509,9 +3483,9 @@ public class TaskManagerTest {
assertThat(task03.state(), is(Task.State.CLOSED));
assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
- Mockito.verify(activeTaskCreator, times(4)).closeAndRemoveTaskProducerIfNeeded(any());
+ verify(activeTaskCreator, times(4)).closeAndRemoveTaskProducerIfNeeded(any());
// the active task creator should also get closed (so that it closes the thread producer if applicable)
- Mockito.verify(activeTaskCreator).closeThreadProducerIfNeeded();
+ verify(activeTaskCreator).closeThreadProducerIfNeeded();
}
@Test
@@ -3529,7 +3503,7 @@ public class TaskManagerTest {
final Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(t1p0, new OffsetAndMetadata(0L, null));
task00.setCommittableOffsetsAndMetadata(offsets);
- when(activeTaskCreator.createTasks(any(), Mockito.eq(assignment))).thenReturn(singletonList(task00));
+ when(activeTaskCreator.createTasks(any(), eq(assignment))).thenReturn(singletonList(task00));
doThrow(new RuntimeException("whatever"))
.when(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
@@ -3549,8 +3523,8 @@ public class TaskManagerTest {
)
);
assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
- Mockito.verify(changeLogReader).enforceRestoreActive();
- Mockito.verify(changeLogReader).completedChangelogs();
+ verify(changeLogReader).enforceRestoreActive();
+ verify(changeLogReader).completedChangelogs();
final RuntimeException exception = assertThrows(RuntimeException.class, () -> taskManager.shutdown(true));
@@ -3558,9 +3532,9 @@ public class TaskManagerTest {
assertThat(exception.getCause().getMessage(), is("whatever"));
assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
- Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
+ verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
// the active task creator should also get closed (so that it closes the thread producer if applicable)
- Mockito.verify(activeTaskCreator).closeThreadProducerIfNeeded();
+ verify(activeTaskCreator).closeThreadProducerIfNeeded();
}
@Test
@@ -3576,7 +3550,7 @@ public class TaskManagerTest {
}
};
- when(activeTaskCreator.createTasks(any(), Mockito.eq(assignment))).thenReturn(singletonList(task00));
+ when(activeTaskCreator.createTasks(any(), eq(assignment))).thenReturn(singletonList(task00));
doThrow(new RuntimeException("whatever")).when(activeTaskCreator).closeThreadProducerIfNeeded();
taskManager.handleAssignment(assignment, emptyMap());
@@ -3595,8 +3569,8 @@ public class TaskManagerTest {
)
);
assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
- Mockito.verify(changeLogReader).enforceRestoreActive();
- Mockito.verify(changeLogReader).completedChangelogs();
+ verify(changeLogReader).enforceRestoreActive();
+ verify(changeLogReader).completedChangelogs();
final RuntimeException exception = assertThrows(RuntimeException.class, () -> taskManager.shutdown(true));
@@ -3605,7 +3579,7 @@ public class TaskManagerTest {
assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
// the active task creator should also get closed (so that it closes the thread producer if applicable)
- Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
+ verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
}
@Test
@@ -3666,7 +3640,7 @@ public class TaskManagerTest {
assertThat(task01.state(), is(Task.State.SUSPENDED));
assertThat(task02.state(), is(Task.State.SUSPENDED));
- Mockito.verifyNoInteractions(activeTaskCreator);
+ verifyNoInteractions(activeTaskCreator);
}
@Test
@@ -3698,8 +3672,8 @@ public class TaskManagerTest {
}
};
- when(activeTaskCreator.createTasks(any(), Mockito.eq(assignment))).thenReturn(asList(task00, task01, task02));
- doThrow(new RuntimeException("whatever")).when(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(Mockito.any());
+ when(activeTaskCreator.createTasks(any(), eq(assignment))).thenReturn(asList(task00, task01, task02));
+ doThrow(new RuntimeException("whatever")).when(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(any());
doThrow(new RuntimeException("whatever all")).when(activeTaskCreator).closeThreadProducerIfNeeded();
taskManager.handleAssignment(assignment, emptyMap());
@@ -3724,8 +3698,8 @@ public class TaskManagerTest {
)
);
assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
- Mockito.verify(changeLogReader).enforceRestoreActive();
- Mockito.verify(changeLogReader).completedChangelogs();
+ verify(changeLogReader).enforceRestoreActive();
+ verify(changeLogReader).completedChangelogs();
taskManager.shutdown(false);
@@ -3734,9 +3708,9 @@ public class TaskManagerTest {
assertThat(task02.state(), is(Task.State.CLOSED));
assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
- Mockito.verify(activeTaskCreator, times(3)).closeAndRemoveTaskProducerIfNeeded(any());
+ verify(activeTaskCreator, times(3)).closeAndRemoveTaskProducerIfNeeded(any());
// the active task creator should also get closed (so that it closes the thread producer if applicable)
- Mockito.verify(activeTaskCreator).closeThreadProducerIfNeeded();
+ verify(activeTaskCreator).closeThreadProducerIfNeeded();
}
@Test
@@ -3760,10 +3734,10 @@ public class TaskManagerTest {
assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
// the active task creator should also get closed (so that it closes the thread producer if applicable)
- Mockito.verify(activeTaskCreator).closeThreadProducerIfNeeded();
+ verify(activeTaskCreator).closeThreadProducerIfNeeded();
// `tryToCompleteRestoration`
- Mockito.verify(consumer).assignment();
- Mockito.verify(consumer).resume(Mockito.eq(emptySet()));
+ verify(consumer).assignment();
+ verify(consumer).resume(eq(emptySet()));
}
@Test
@@ -3782,12 +3756,12 @@ public class TaskManagerTest {
taskManager.shutdown(true);
- Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(failedStatefulTask.id());
- Mockito.verify(activeTaskCreator).closeThreadProducerIfNeeded();
- Mockito.verify(stateUpdater).shutdown(Duration.ofMillis(Long.MAX_VALUE));
- Mockito.verify(failedStatefulTask).prepareCommit();
- Mockito.verify(failedStatefulTask).suspend();
- Mockito.verify(failedStatefulTask).closeDirty();
+ verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(failedStatefulTask.id());
+ verify(activeTaskCreator).closeThreadProducerIfNeeded();
+ verify(stateUpdater).shutdown(Duration.ofMillis(Long.MAX_VALUE));
+ verify(failedStatefulTask).prepareCommit();
+ verify(failedStatefulTask).suspend();
+ verify(failedStatefulTask).closeDirty();
}
@Test
@@ -3797,7 +3771,7 @@ public class TaskManagerTest {
taskManager.shutdown(true);
- Mockito.verify(schedulingTaskManager).shutdown(Duration.ofMillis(Long.MAX_VALUE));
+ verify(schedulingTaskManager).shutdown(Duration.ofMillis(Long.MAX_VALUE));
}
@Test
@@ -3815,13 +3789,13 @@ public class TaskManagerTest {
taskManager.shutdown(true);
- Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(statefulTask1.id());
- Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(statefulTask2.id());
- Mockito.verify(activeTaskCreator).closeThreadProducerIfNeeded();
- Mockito.verify(stateUpdater).shutdown(Duration.ofMillis(Long.MAX_VALUE));
- Mockito.verify(tasks).addActiveTasks(restoredTasks);
- Mockito.verify(statefulTask1).closeClean();
- Mockito.verify(statefulTask2).closeClean();
+ verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(statefulTask1.id());
+ verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(statefulTask2.id());
+ verify(activeTaskCreator).closeThreadProducerIfNeeded();
+ verify(stateUpdater).shutdown(Duration.ofMillis(Long.MAX_VALUE));
+ verify(tasks).addActiveTasks(restoredTasks);
+ verify(statefulTask1).closeClean();
+ verify(statefulTask2).closeClean();
}
@Test
@@ -3838,13 +3812,13 @@ public class TaskManagerTest {
taskManager.shutdown(true);
- Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(removedStatefulTask.id());
- Mockito.verify(activeTaskCreator).closeThreadProducerIfNeeded();
- Mockito.verify(stateUpdater).shutdown(Duration.ofMillis(Long.MAX_VALUE));
- Mockito.verify(tasks).addActiveTasks(mkSet(removedStatefulTask));
- Mockito.verify(tasks).addStandbyTasks(mkSet(removedStandbyTask));
- Mockito.verify(removedStatefulTask).closeClean();
- Mockito.verify(removedStandbyTask).closeClean();
+ verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(removedStatefulTask.id());
+ verify(activeTaskCreator).closeThreadProducerIfNeeded();
+ verify(stateUpdater).shutdown(Duration.ofMillis(Long.MAX_VALUE));
+ verify(tasks).addActiveTasks(mkSet(removedStatefulTask));
+ verify(tasks).addStandbyTasks(mkSet(removedStandbyTask));
+ verify(removedStatefulTask).closeClean();
+ verify(removedStandbyTask).closeClean();
}
@Test
@@ -3852,7 +3826,7 @@ public class TaskManagerTest {
final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
when(consumer.assignment()).thenReturn(assignment);
- when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment)))
+ when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment)))
.thenReturn(singletonList(task00));
taskManager.handleAssignment(taskId00Assignment, emptyMap());
@@ -3862,7 +3836,7 @@ public class TaskManagerTest {
assertThat(taskManager.activeTaskMap(), Matchers.equalTo(singletonMap(taskId00, task00)));
assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
// verifies that we actually resume the assignment at the end of restoration.
- Mockito.verify(consumer).resume(assignment);
+ verify(consumer).resume(assignment);
}
@Test
@@ -3883,14 +3857,13 @@ public class TaskManagerTest {
@Test
public void shouldHandleRebalanceEvents() {
when(consumer.assignment()).thenReturn(assignment);
- expect(stateDirectory.listNonEmptyTaskDirectories()).andReturn(new ArrayList<>());
- replay(stateDirectory);
+ when(stateDirectory.listNonEmptyTaskDirectories()).thenReturn(new ArrayList<>());
assertThat(taskManager.rebalanceInProgress(), is(false));
taskManager.handleRebalanceStart(emptySet());
assertThat(taskManager.rebalanceInProgress(), is(true));
taskManager.handleRebalanceComplete();
assertThat(taskManager.rebalanceInProgress(), is(false));
- Mockito.verify(consumer).pause(assignment);
+ verify(consumer).pause(assignment);
}
@Test
@@ -3901,7 +3874,7 @@ public class TaskManagerTest {
final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, false, stateManager);
when(consumer.assignment()).thenReturn(assignment);
- when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment)))
+ when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment)))
.thenReturn(singletonList(task00));
when(standbyTaskCreator.createTasks(taskId01Assignment))
.thenReturn(singletonList(task01));
@@ -3919,7 +3892,7 @@ public class TaskManagerTest {
assertThat(task00.commitNeeded, is(false));
assertThat(task01.commitNeeded, is(false));
- Mockito.verify(consumer).commitSync(offsets);
+ verify(consumer).commitSync(offsets);
}
@Test
@@ -3943,7 +3916,7 @@ public class TaskManagerTest {
);
when(consumer.assignment()).thenReturn(assignment);
- when(activeTaskCreator.createTasks(any(), Mockito.eq(assignmentActive)))
+ when(activeTaskCreator.createTasks(any(), eq(assignmentActive)))
.thenReturn(Arrays.asList(task00, task01, task02));
when(standbyTaskCreator.createTasks(assignmentStandby))
.thenReturn(Arrays.asList(task03, task04, task05));
@@ -3995,13 +3968,11 @@ public class TaskManagerTest {
expectDirectoryNotEmpty(taskId00, taskId01);
expectLockObtainedFor(taskId00, taskId01);
when(consumer.assignment()).thenReturn(assignment);
- when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment)))
+ when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment)))
.thenReturn(singletonList(task00));
when(standbyTaskCreator.createTasks(taskId01Assignment))
.thenReturn(singletonList(task01));
- replay(stateDirectory);
-
taskManager.handleAssignment(taskId00Assignment, taskId01Assignment);
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
@@ -4034,12 +4005,12 @@ public class TaskManagerTest {
taskManager.commitAll();
- Mockito.verify(consumer).commitSync(offsets);
+ verify(consumer).commitSync(offsets);
}
@Test
public void shouldCommitViaProducerIfEosAlphaEnabled() {
- final StreamsProducer producer = Mockito.mock(StreamsProducer.class);
+ final StreamsProducer producer = mock(StreamsProducer.class);
when(activeTaskCreator.streamsProducerForTask(any(TaskId.class)))
.thenReturn(producer);
@@ -4048,14 +4019,14 @@ public class TaskManagerTest {
shouldCommitViaProducerIfEosEnabled(ProcessingMode.EXACTLY_ONCE_ALPHA, offsetsT01, offsetsT02);
- Mockito.verify(producer).commitTransaction(offsetsT01, new ConsumerGroupMetadata("appId"));
- Mockito.verify(producer).commitTransaction(offsetsT02, new ConsumerGroupMetadata("appId"));
- Mockito.verifyNoMoreInteractions(producer);
+ verify(producer).commitTransaction(offsetsT01, new ConsumerGroupMetadata("appId"));
+ verify(producer).commitTransaction(offsetsT02, new ConsumerGroupMetadata("appId"));
+ verifyNoMoreInteractions(producer);
}
@Test
public void shouldCommitViaProducerIfEosV2Enabled() {
- final StreamsProducer producer = Mockito.mock(StreamsProducer.class);
+ final StreamsProducer producer = mock(StreamsProducer.class);
when(activeTaskCreator.threadProducer()).thenReturn(producer);
final Map<TopicPartition, OffsetAndMetadata> offsetsT01 = singletonMap(t1p1, new OffsetAndMetadata(0L, null));
@@ -4066,8 +4037,8 @@ public class TaskManagerTest {
shouldCommitViaProducerIfEosEnabled(ProcessingMode.EXACTLY_ONCE_V2, offsetsT01, offsetsT02);
- Mockito.verify(producer).commitTransaction(allOffsets, new ConsumerGroupMetadata("appId"));
- Mockito.verifyNoMoreInteractions(producer);
+ verify(producer).commitTransaction(allOffsets, new ConsumerGroupMetadata("appId"));
+ verifyNoMoreInteractions(producer);
}
private void shouldCommitViaProducerIfEosEnabled(final ProcessingMode processingMode,
@@ -4099,7 +4070,7 @@ public class TaskManagerTest {
};
when(consumer.assignment()).thenReturn(assignment);
- when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
+ when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(task00));
taskManager.handleAssignment(taskId00Assignment, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
@@ -4144,7 +4115,7 @@ public class TaskManagerTest {
when(adminClient.deleteRecords(singletonMap(t1p1, RecordsToDelete.beforeOffset(17L))))
.thenReturn(new DeleteRecordsResult(singletonMap(t1p1, completedFuture())));
- final InOrder inOrder = Mockito.inOrder(adminClient);
+ final InOrder inOrder = inOrder(adminClient);
final Map<TopicPartition, Long> purgableOffsets = new HashMap<>();
final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) {
@@ -4155,7 +4126,7 @@ public class TaskManagerTest {
};
when(consumer.assignment()).thenReturn(assignment);
- when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
+ when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(task00));
taskManager.handleAssignment(taskId00Assignment, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
@@ -4188,7 +4159,7 @@ public class TaskManagerTest {
};
when(consumer.assignment()).thenReturn(assignment);
- when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
+ when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(task00));
taskManager.handleAssignment(taskId00Assignment, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
@@ -4257,7 +4228,7 @@ public class TaskManagerTest {
);
when(consumer.assignment()).thenReturn(assignment);
- when(activeTaskCreator.createTasks(any(), Mockito.eq(assignmentActive)))
+ when(activeTaskCreator.createTasks(any(), eq(assignmentActive)))
.thenReturn(asList(task00, task01, task02, task03));
when(standbyTaskCreator.createTasks(assignmentStandby))
.thenReturn(singletonList(task04));
@@ -4286,7 +4257,7 @@ public class TaskManagerTest {
assertThat(taskManager.maybeCommitActiveTasksPerUserRequested(), equalTo(3));
- Mockito.verify(consumer).commitSync(expectedCommittedOffsets);
+ verify(consumer).commitSync(expectedCommittedOffsets);
}
@Test
@@ -4299,7 +4270,7 @@ public class TaskManagerTest {
firstAssignment.put(taskId01, taskId01Partitions);
when(consumer.assignment()).thenReturn(assignment);
- when(activeTaskCreator.createTasks(any(), Mockito.eq(firstAssignment)))
+ when(activeTaskCreator.createTasks(any(), eq(firstAssignment)))
.thenReturn(Arrays.asList(task00, task01));
taskManager.handleAssignment(firstAssignment, emptyMap());
@@ -4411,7 +4382,7 @@ public class TaskManagerTest {
};
when(consumer.assignment()).thenReturn(assignment);
- when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
+ when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(task00));
taskManager.handleAssignment(taskId00Assignment, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
@@ -4434,7 +4405,7 @@ public class TaskManagerTest {
};
when(consumer.assignment()).thenReturn(assignment);
- when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment)))
+ when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment)))
.thenReturn(singletonList(task00));
taskManager.handleAssignment(taskId00Assignment, emptyMap());
@@ -4461,7 +4432,7 @@ public class TaskManagerTest {
};
when(consumer.assignment()).thenReturn(assignment);
- when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
+ when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(task00));
taskManager.handleAssignment(taskId00Assignment, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
@@ -4481,7 +4452,7 @@ public class TaskManagerTest {
};
when(consumer.assignment()).thenReturn(assignment);
- when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
+ when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(task00));
taskManager.handleAssignment(taskId00Assignment, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
@@ -4506,7 +4477,7 @@ public class TaskManagerTest {
};
when(consumer.assignment()).thenReturn(assignment);
- when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
+ when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(task00));
taskManager.handleAssignment(taskId00Assignment, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
@@ -4526,12 +4497,12 @@ public class TaskManagerTest {
}
};
- when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
+ when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(task00));
taskManager.handleAssignment(taskId00Assignment, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(false));
assertThat(task00.state(), is(Task.State.RESTORING));
- Mockito.verifyNoInteractions(consumer);
+ verifyNoInteractions(consumer);
}
@Test
@@ -4541,7 +4512,7 @@ public class TaskManagerTest {
task00.setCommittableOffsetsAndMetadata(offsets);
when(consumer.assignment()).thenReturn(assignment);
- when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
+ when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(task00));
try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(TaskManager.class)) {
appender.setClassLoggerToDebug(TaskManager.class);
@@ -4695,7 +4666,7 @@ public class TaskManagerTest {
allActiveTasks.addAll(restoringTasks);
when(standbyTaskCreator.createTasks(standbyAssignment)).thenReturn(standbyTasks);
- when(activeTaskCreator.createTasks(any(), Mockito.eq(allActiveTasksAssignment))).thenReturn(allActiveTasks);
+ when(activeTaskCreator.createTasks(any(), eq(allActiveTasksAssignment))).thenReturn(allActiveTasks);
lenient().when(consumer.assignment()).thenReturn(assignment);
@@ -4720,28 +4691,21 @@ public class TaskManagerTest {
return allTasks;
}
- private void expectLockObtainedFor(final TaskId... tasks) throws Exception {
+ private void expectLockObtainedFor(final TaskId... tasks) {
for (final TaskId task : tasks) {
- expect(stateDirectory.lock(task)).andReturn(true).once();
+ when(stateDirectory.lock(task)).thenReturn(true);
}
}
- private void expectLockFailedFor(final TaskId... tasks) throws Exception {
+ private void expectLockFailedFor(final TaskId... tasks) {
for (final TaskId task : tasks) {
- expect(stateDirectory.lock(task)).andReturn(false).once();
- }
- }
-
- private void expectUnlockFor(final TaskId... tasks) throws Exception {
- for (final TaskId task : tasks) {
- stateDirectory.unlock(task);
- expectLastCall();
+ when(stateDirectory.lock(task)).thenReturn(false);
}
}
private void expectDirectoryNotEmpty(final TaskId... tasks) {
for (final TaskId taskId : tasks) {
- expect(stateDirectory.directoryForTaskIsEmpty(taskId)).andReturn(false);
+ when(stateDirectory.directoryForTaskIsEmpty(taskId)).thenReturn(false);
}
}
@@ -4790,7 +4754,7 @@ public class TaskManagerTest {
assertNull(task00.timeout);
assertNull(task01.timeout);
- Mockito.verify(consumer, times(2)).commitSync(any(Map.class));
+ verify(consumer, times(2)).commitSync(any(Map.class));
}
@Test
@@ -4832,7 +4796,7 @@ public class TaskManagerTest {
equalTo(Collections.singleton(taskId00))
);
- Mockito.verify(consumer, times(2)).groupMetadata();
+ verify(consumer, times(2)).groupMetadata();
}
@Test
@@ -4867,7 +4831,7 @@ public class TaskManagerTest {
equalTo(mkSet(taskId00, taskId01))
);
- Mockito.verify(consumer).groupMetadata();
+ verify(consumer).groupMetadata();
}
@Test
@@ -4922,7 +4886,7 @@ public class TaskManagerTest {
final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>(taskId00Assignment);
assignment.putAll(taskId01Assignment);
- when(activeTaskCreator.createTasks(any(), Mockito.eq(assignment))).thenReturn(asList(task00, task01));
+ when(activeTaskCreator.createTasks(any(), eq(assignment))).thenReturn(asList(task00, task01));
taskManager.handleAssignment(assignment, Collections.emptyMap());
@@ -4933,29 +4897,29 @@ public class TaskManagerTest {
assertThat(thrown.getCause().getMessage(), is("KABOOM!"));
assertThat(task00.state(), is(Task.State.SUSPENDED));
assertThat(task01.state(), is(Task.State.SUSPENDED));
- Mockito.verifyNoInteractions(consumer);
+ verifyNoInteractions(consumer);
}
@Test
public void shouldConvertActiveTaskToStandbyTask() {
- final StreamTask activeTask = Mockito.mock(StreamTask.class);
+ final StreamTask activeTask = mock(StreamTask.class);
when(activeTask.id()).thenReturn(taskId00);
when(activeTask.inputPartitions()).thenReturn(taskId00Partitions);
when(activeTask.isActive()).thenReturn(true);
- final StandbyTask standbyTask = Mockito.mock(StandbyTask.class);
+ final StandbyTask standbyTask = mock(StandbyTask.class);
when(standbyTask.id()).thenReturn(taskId00);
- when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singletonList(activeTask));
- when(standbyTaskCreator.createStandbyTaskFromActive(Mockito.any(), Mockito.eq(taskId00Partitions))).thenReturn(standbyTask);
+ when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(activeTask));
+ when(standbyTaskCreator.createStandbyTaskFromActive(any(), eq(taskId00Partitions))).thenReturn(standbyTask);
taskManager.handleAssignment(taskId00Assignment, Collections.emptyMap());
taskManager.handleAssignment(Collections.emptyMap(), taskId00Assignment);
- Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
- Mockito.verify(activeTaskCreator).createTasks(any(), Mockito.eq(emptyMap()));
- Mockito.verify(standbyTaskCreator, times(2)).createTasks(Collections.emptyMap());
- Mockito.verifyNoInteractions(consumer);
+ verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
+ verify(activeTaskCreator).createTasks(any(), eq(emptyMap()));
+ verify(standbyTaskCreator, times(2)).createTasks(Collections.emptyMap());
+ verifyNoInteractions(consumer);
}
@Test
@@ -4969,15 +4933,15 @@ public class TaskManagerTest {
when(activeTask.id()).thenReturn(taskId00);
when(activeTask.inputPartitions()).thenReturn(taskId00Partitions);
when(standbyTaskCreator.createTasks(taskId00Assignment)).thenReturn(singletonList(standbyTask));
- when(activeTaskCreator.createActiveTaskFromStandby(Mockito.eq(standbyTask), Mockito.eq(taskId00Partitions), any()))
+ when(activeTaskCreator.createActiveTaskFromStandby(eq(standbyTask), eq(taskId00Partitions), any()))
.thenReturn(activeTask);
taskManager.handleAssignment(Collections.emptyMap(), taskId00Assignment);
taskManager.handleAssignment(taskId00Assignment, Collections.emptyMap());
- Mockito.verify(activeTaskCreator, times(2)).createTasks(any(), Mockito.eq(emptyMap()));
- Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());
- Mockito.verifyNoInteractions(consumer);
+ verify(activeTaskCreator, times(2)).createTasks(any(), eq(emptyMap()));
+ verify(standbyTaskCreator).createTasks(Collections.emptyMap());
+ verifyNoInteractions(consumer);
}
@Test
@@ -5002,14 +4966,14 @@ public class TaskManagerTest {
for (int i = 0; i < names.length; ++i) {
taskFolders.add(new TaskDirectory(testFolder.newFolder(names[i]), null));
}
- expect(stateDirectory.listNonEmptyTaskDirectories()).andReturn(taskFolders).once();
+ when(stateDirectory.listNonEmptyTaskDirectories()).thenReturn(taskFolders);
}
private void writeCheckpointFile(final TaskId task, final Map<TopicPartition, Long> offsets) throws Exception {
final File checkpointFile = getCheckpointFile(task);
Files.createFile(checkpointFile.toPath());
new OffsetCheckpoint(checkpointFile).write(offsets);
- expect(stateDirectory.checkpointFileFor(task)).andReturn(checkpointFile);
+ lenient().when(stateDirectory.checkpointFileFor(task)).thenReturn(checkpointFile);
expectDirectoryNotEmpty(task);
}