You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ca...@apache.org on 2024/03/22 09:44:00 UTC

(kafka) branch trunk updated: KAFKA-14133: Move stateDirectory mock in TaskManagerTest to Mockito (#15254)

This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 997ca14f805 KAFKA-14133: Move stateDirectory mock in TaskManagerTest to Mockito (#15254)
997ca14f805 is described below

commit 997ca14f8057462e52ea00721c9cc16469ac5108
Author: Christo Lolov <lo...@amazon.com>
AuthorDate: Fri Mar 22 09:43:53 2024 +0000

    KAFKA-14133: Move stateDirectory mock in TaskManagerTest to Mockito (#15254)
    
    This pull requests migrates the StateDirectory mock in TaskManagerTest from EasyMock to Mockito.
    The change is restricted to a single mock to minimize the scope and make it easier for review.
    
    Reviewers: Ismael Juma <is...@juma.me.uk>, Bruno Cadonna <ca...@apache.org>
---
 .../processor/internals/TaskManagerTest.java       | 1070 ++++++++++----------
 1 file changed, 517 insertions(+), 553 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 36d2a3e3786..64ad0d1caf9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -58,9 +58,6 @@ import java.nio.file.Files;
 import java.time.Duration;
 import java.util.ArrayList;
 
-import org.easymock.EasyMockRunner;
-import org.easymock.Mock;
-import org.easymock.MockType;
 import org.hamcrest.Matchers;
 import org.junit.Before;
 import org.junit.Rule;
@@ -69,10 +66,10 @@ import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.mockito.Answers;
 import org.mockito.InOrder;
-import org.mockito.Mockito;
+import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoJUnitRunner;
 import org.mockito.junit.MockitoRule;
-import org.mockito.junit.jupiter.MockitoSettings;
 import org.mockito.quality.Strictness;
 
 import java.io.File;
@@ -107,11 +104,6 @@ import static org.apache.kafka.streams.processor.internals.TopologyMetadata.UNNA
 import static org.apache.kafka.test.StreamsTestUtils.TaskBuilder.standbyTask;
 import static org.apache.kafka.test.StreamsTestUtils.TaskBuilder.statefulTask;
 import static org.apache.kafka.test.StreamsTestUtils.TaskBuilder.statelessTask;
-import static org.easymock.EasyMock.eq;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
-import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.verify;
 import static org.hamcrest.CoreMatchers.hasItem;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.empty;
@@ -125,20 +117,25 @@ import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyMap;
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.lenient;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 import static org.mockito.Mockito.mock;
 
-@MockitoSettings(strictness = Strictness.STRICT_STUBS)
-@RunWith(EasyMockRunner.class)
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
 public class TaskManagerTest {
 
     private final String topic1 = "topic1";
@@ -188,26 +185,26 @@ public class TaskManagerTest {
 
     final java.util.function.Consumer<Set<TopicPartition>> noOpResetter = partitions -> { };
 
-    @org.mockito.Mock
+    @Mock
     private InternalTopologyBuilder topologyBuilder;
-    @Mock(type = MockType.DEFAULT)
+    @Mock
     private StateDirectory stateDirectory;
-    @org.mockito.Mock
+    @Mock
     private ChangelogReader changeLogReader;
-    @org.mockito.Mock
+    @Mock
     private Consumer<byte[], byte[]> consumer;
-    @org.mockito.Mock
+    @Mock
     private ActiveTaskCreator activeTaskCreator;
-    @org.mockito.Mock
+    @Mock
     private StandbyTaskCreator standbyTaskCreator;
-    @org.mockito.Mock
+    @Mock
     private Admin adminClient;
-    @org.mockito.Mock
+    @Mock
     private ProcessorStateManager stateManager;
-    @org.mockito.Mock(answer = Answers.RETURNS_DEEP_STUBS)
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
     private ProcessorStateManager.StateStoreMetadata stateStore;
-    final StateUpdater stateUpdater = Mockito.mock(StateUpdater.class);
-    final DefaultTaskManager schedulingTaskManager = Mockito.mock(DefaultTaskManager.class);
+    final StateUpdater stateUpdater = mock(StateUpdater.class);
+    final DefaultTaskManager schedulingTaskManager = mock(DefaultTaskManager.class);
 
     private TaskManager taskManager;
     private TopologyMetadata topologyMetadata;
@@ -267,7 +264,7 @@ public class TaskManagerTest {
 
         taskManager.handleAssignment(activeTasks, standbyTasks);
 
-        Mockito.verifyNoInteractions(stateUpdater);
+        verifyNoInteractions(stateUpdater);
     }
 
     @Test
@@ -276,7 +273,7 @@ public class TaskManagerTest {
             .inState(State.RUNNING)
             .withInputPartitions(taskId03Partitions).build();
         updateExistingStandbyTaskIfStandbyIsReassignedWithoutStateUpdater(standbyTask, taskId03Partitions);
-        Mockito.verify(standbyTask, never()).updateInputPartitions(Mockito.eq(taskId03Partitions), Mockito.any());
+        verify(standbyTask, never()).updateInputPartitions(eq(taskId03Partitions), any());
     }
 
     @Test
@@ -285,12 +282,12 @@ public class TaskManagerTest {
             .inState(State.RUNNING)
             .withInputPartitions(taskId03Partitions).build();
         updateExistingStandbyTaskIfStandbyIsReassignedWithoutStateUpdater(standbyTask, taskId04Partitions);
-        Mockito.verify(standbyTask).updateInputPartitions(Mockito.eq(taskId04Partitions), Mockito.any());
+        verify(standbyTask).updateInputPartitions(eq(taskId04Partitions), any());
     }
 
     private void updateExistingStandbyTaskIfStandbyIsReassignedWithoutStateUpdater(final Task standbyTask,
                                                                                    final Set<TopicPartition> newInputPartition) {
-        final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+        final TasksRegistry tasks = mock(TasksRegistry.class);
         when(tasks.allTasks()).thenReturn(mkSet(standbyTask));
         final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, false);
 
@@ -299,7 +296,7 @@ public class TaskManagerTest {
             mkMap(mkEntry(standbyTask.id(), newInputPartition))
         );
 
-        Mockito.verify(standbyTask).resume();
+        verify(standbyTask).resume();
     }
 
     @Test
@@ -307,7 +304,7 @@ public class TaskManagerTest {
         final StreamTask activeTask1 = statefulTask(taskId00, taskId00ChangelogPartitions)
             .inState(State.RUNNING)
             .withInputPartitions(taskId00Partitions).build();
-        final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+        final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true, true);
         when(tasks.activeTaskIds()).thenReturn(mkSet(taskId00, taskId01));
         when(tasks.task(taskId00)).thenReturn(activeTask1);
@@ -316,9 +313,9 @@ public class TaskManagerTest {
 
         taskManager.handleCorruption(mkSet(taskId00));
 
-        Mockito.verify(consumer).assignment();
-        Mockito.verify(schedulingTaskManager).lockTasks(mkSet(taskId00, taskId01));
-        Mockito.verify(schedulingTaskManager).unlockTasks(mkSet(taskId00, taskId01));
+        verify(consumer).assignment();
+        verify(schedulingTaskManager).lockTasks(mkSet(taskId00, taskId01));
+        verify(schedulingTaskManager).unlockTasks(mkSet(taskId00, taskId01));
     }
 
     @Test
@@ -329,20 +326,20 @@ public class TaskManagerTest {
         final StreamTask activeTask2 = statefulTask(taskId01, taskId01ChangelogPartitions)
             .inState(State.RUNNING)
             .withInputPartitions(taskId01Partitions).build();
-        final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+        final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true, true);
         final KafkaFuture<Void> mockFuture = KafkaFuture.completedFuture(null);
         when(schedulingTaskManager.lockTasks(any())).thenReturn(mockFuture);
 
         taskManager.commit(mkSet(activeTask1, activeTask2));
 
-        Mockito.verify(schedulingTaskManager).lockTasks(mkSet(taskId00, taskId01));
-        Mockito.verify(schedulingTaskManager).unlockTasks(mkSet(taskId00, taskId01));
+        verify(schedulingTaskManager).lockTasks(mkSet(taskId00, taskId01));
+        verify(schedulingTaskManager).unlockTasks(mkSet(taskId00, taskId01));
     }
 
     @Test
     public void shouldLockActiveOnHandleAssignmentWithProcessingThreads() {
-        final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+        final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true, true);
         when(tasks.allTaskIds()).thenReturn(mkSet(taskId00, taskId01));
         final KafkaFuture<Void> mockFuture = KafkaFuture.completedFuture(null);
@@ -353,8 +350,8 @@ public class TaskManagerTest {
                 mkMap(mkEntry(taskId01, taskId01Partitions))
         );
 
-        Mockito.verify(schedulingTaskManager).lockTasks(mkSet(taskId00, taskId01));
-        Mockito.verify(schedulingTaskManager).unlockTasks(mkSet(taskId00, taskId01));
+        verify(schedulingTaskManager).lockTasks(mkSet(taskId00, taskId01));
+        verify(schedulingTaskManager).unlockTasks(mkSet(taskId00, taskId01));
     }
 
     @Test
@@ -365,7 +362,7 @@ public class TaskManagerTest {
         final StreamTask activeTask2 = statefulTask(taskId01, taskId01ChangelogPartitions)
             .inState(State.RUNNING)
             .withInputPartitions(taskId01Partitions).build();
-        final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+        final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true, true);
         when(tasks.allTasks()).thenReturn(mkSet(activeTask1, activeTask2));
         final KafkaFuture<Void> mockFuture = KafkaFuture.completedFuture(null);
@@ -373,8 +370,8 @@ public class TaskManagerTest {
 
         taskManager.handleRevocation(taskId01Partitions);
 
-        Mockito.verify(schedulingTaskManager).lockTasks(mkSet(taskId00, taskId01));
-        Mockito.verify(schedulingTaskManager).unlockTasks(mkSet(taskId00, taskId01));
+        verify(schedulingTaskManager).lockTasks(mkSet(taskId00, taskId01));
+        verify(schedulingTaskManager).unlockTasks(mkSet(taskId00, taskId01));
     }
 
     @Test
@@ -385,7 +382,7 @@ public class TaskManagerTest {
         final StreamTask activeTask2 = statefulTask(taskId01, taskId01ChangelogPartitions)
             .inState(State.RUNNING)
             .withInputPartitions(taskId01Partitions).build();
-        final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+        final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true, true);
         when(tasks.allTasks()).thenReturn(mkSet(activeTask1, activeTask2));
         final KafkaFuture<Void> mockFuture = KafkaFuture.completedFuture(null);
@@ -393,8 +390,8 @@ public class TaskManagerTest {
 
         taskManager.closeAndCleanUpTasks(mkSet(activeTask1), mkSet(), false);
 
-        Mockito.verify(schedulingTaskManager).lockTasks(mkSet(taskId00));
-        Mockito.verify(schedulingTaskManager).unlockTasks(mkSet(taskId00));
+        verify(schedulingTaskManager).lockTasks(mkSet(taskId00));
+        verify(schedulingTaskManager).unlockTasks(mkSet(taskId00));
     }
 
     @Test
@@ -405,14 +402,14 @@ public class TaskManagerTest {
         final StreamTask activeTask2 = statefulTask(taskId01, taskId01ChangelogPartitions)
             .inState(State.RUNNING)
             .withInputPartitions(taskId01Partitions).build();
-        final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+        final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
         when(tasks.activeTasks()).thenReturn(mkSet(activeTask1, activeTask2));
 
         taskManager.resumePollingForPartitionsWithAvailableSpace();
 
-        Mockito.verify(activeTask1).resumePollingForPartitionsWithAvailableSpace();
-        Mockito.verify(activeTask2).resumePollingForPartitionsWithAvailableSpace();
+        verify(activeTask1).resumePollingForPartitionsWithAvailableSpace();
+        verify(activeTask2).resumePollingForPartitionsWithAvailableSpace();
     }
 
     @Test
@@ -423,14 +420,14 @@ public class TaskManagerTest {
         final StreamTask activeTask2 = statefulTask(taskId01, taskId01ChangelogPartitions)
             .inState(State.RUNNING)
             .withInputPartitions(taskId01Partitions).build();
-        final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+        final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
         when(tasks.activeTasks()).thenReturn(mkSet(activeTask1, activeTask2));
 
         taskManager.updateLags();
 
-        Mockito.verify(activeTask1).updateLags();
-        Mockito.verify(activeTask2).updateLags();
+        verify(activeTask1).updateLags();
+        verify(activeTask2).updateLags();
     }
 
     @Test
@@ -438,7 +435,7 @@ public class TaskManagerTest {
         final StreamTask activeTaskToRecycle = statefulTask(taskId03, taskId03ChangelogPartitions)
             .inState(State.RESTORING)
             .withInputPartitions(taskId03Partitions).build();
-        final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+        final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
         when(stateUpdater.getTasks()).thenReturn(mkSet(activeTaskToRecycle));
 
@@ -447,9 +444,9 @@ public class TaskManagerTest {
             mkMap(mkEntry(activeTaskToRecycle.id(), activeTaskToRecycle.inputPartitions()))
         );
 
-        Mockito.verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
-        Mockito.verify(tasks).addPendingTaskToRecycle(activeTaskToRecycle.id(), activeTaskToRecycle.inputPartitions());
-        Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());
+        verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
+        verify(tasks).addPendingTaskToRecycle(activeTaskToRecycle.id(), activeTaskToRecycle.inputPartitions());
+        verify(standbyTaskCreator).createTasks(Collections.emptyMap());
     }
 
     @Test
@@ -457,7 +454,7 @@ public class TaskManagerTest {
         final StandbyTask standbyTaskToRecycle = standbyTask(taskId03, taskId03ChangelogPartitions)
             .inState(State.RUNNING)
             .withInputPartitions(taskId03Partitions).build();
-        final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+        final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
         when(stateUpdater.getTasks()).thenReturn(mkSet(standbyTaskToRecycle));
 
@@ -466,10 +463,10 @@ public class TaskManagerTest {
             Collections.emptyMap()
         );
 
-        Mockito.verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
-        Mockito.verify(stateUpdater).remove(standbyTaskToRecycle.id());
-        Mockito.verify(tasks).addPendingTaskToRecycle(standbyTaskToRecycle.id(), standbyTaskToRecycle.inputPartitions());
-        Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());
+        verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
+        verify(stateUpdater).remove(standbyTaskToRecycle.id());
+        verify(tasks).addPendingTaskToRecycle(standbyTaskToRecycle.id(), standbyTaskToRecycle.inputPartitions());
+        verify(standbyTaskCreator).createTasks(Collections.emptyMap());
     }
 
     @Test
@@ -477,16 +474,16 @@ public class TaskManagerTest {
         final StreamTask activeTaskToClose = statefulTask(taskId03, taskId03ChangelogPartitions)
             .inState(State.RESTORING)
             .withInputPartitions(taskId03Partitions).build();
-        final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+        final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
         when(stateUpdater.getTasks()).thenReturn(mkSet(activeTaskToClose));
 
         taskManager.handleAssignment(Collections.emptyMap(), Collections.emptyMap());
 
-        Mockito.verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
-        Mockito.verify(stateUpdater).remove(activeTaskToClose.id());
-        Mockito.verify(tasks).addPendingTaskToCloseClean(activeTaskToClose.id());
-        Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());
+        verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
+        verify(stateUpdater).remove(activeTaskToClose.id());
+        verify(tasks).addPendingTaskToCloseClean(activeTaskToClose.id());
+        verify(standbyTaskCreator).createTasks(Collections.emptyMap());
     }
 
     @Test
@@ -494,16 +491,16 @@ public class TaskManagerTest {
         final StandbyTask standbyTaskToClose = standbyTask(taskId02, taskId02ChangelogPartitions)
             .inState(State.RUNNING)
             .withInputPartitions(taskId02Partitions).build();
-        final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+        final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
         when(stateUpdater.getTasks()).thenReturn(mkSet(standbyTaskToClose));
 
         taskManager.handleAssignment(Collections.emptyMap(), Collections.emptyMap());
 
-        Mockito.verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
-        Mockito.verify(stateUpdater).remove(standbyTaskToClose.id());
-        Mockito.verify(tasks).addPendingTaskToCloseClean(standbyTaskToClose.id());
-        Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());
+        verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
+        verify(stateUpdater).remove(standbyTaskToClose.id());
+        verify(tasks).addPendingTaskToCloseClean(standbyTaskToClose.id());
+        verify(standbyTaskCreator).createTasks(Collections.emptyMap());
     }
 
     @Test
@@ -512,7 +509,7 @@ public class TaskManagerTest {
             .inState(State.RESTORING)
             .withInputPartitions(taskId03Partitions).build();
         final Set<TopicPartition> newInputPartitions = taskId02Partitions;
-        final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+        final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
         when(stateUpdater.getTasks()).thenReturn(mkSet(activeTaskToUpdateInputPartitions));
 
@@ -521,10 +518,10 @@ public class TaskManagerTest {
             Collections.emptyMap()
         );
 
-        Mockito.verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
-        Mockito.verify(stateUpdater).remove(activeTaskToUpdateInputPartitions.id());
-        Mockito.verify(tasks).addPendingTaskToUpdateInputPartitions(activeTaskToUpdateInputPartitions.id(), newInputPartitions);
-        Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());
+        verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
+        verify(stateUpdater).remove(activeTaskToUpdateInputPartitions.id());
+        verify(tasks).addPendingTaskToUpdateInputPartitions(activeTaskToUpdateInputPartitions.id(), newInputPartitions);
+        verify(standbyTaskCreator).createTasks(Collections.emptyMap());
     }
 
     @Test
@@ -533,7 +530,7 @@ public class TaskManagerTest {
             .inState(State.RESTORING)
             .withInputPartitions(taskId03Partitions).build();
         final Set<TopicPartition> newInputPartitions = taskId02Partitions;
-        final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+        final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
         when(stateUpdater.getTasks()).thenReturn(mkSet(activeTaskToUpdateInputPartitions));
         when(tasks.removePendingTaskToCloseClean(activeTaskToUpdateInputPartitions.id())).thenReturn(true);
@@ -543,11 +540,11 @@ public class TaskManagerTest {
             Collections.emptyMap()
         );
 
-        Mockito.verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
-        Mockito.verify(stateUpdater, never()).remove(activeTaskToUpdateInputPartitions.id());
-        Mockito.verify(tasks).removePendingTaskToCloseClean(activeTaskToUpdateInputPartitions.id());
-        Mockito.verify(tasks).addPendingTaskToCloseReviveAndUpdateInputPartitions(activeTaskToUpdateInputPartitions.id(), newInputPartitions);
-        Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());
+        verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
+        verify(stateUpdater, never()).remove(activeTaskToUpdateInputPartitions.id());
+        verify(tasks).removePendingTaskToCloseClean(activeTaskToUpdateInputPartitions.id());
+        verify(tasks).addPendingTaskToCloseReviveAndUpdateInputPartitions(activeTaskToUpdateInputPartitions.id(), newInputPartitions);
+        verify(standbyTaskCreator).createTasks(Collections.emptyMap());
     }
 
     @Test
@@ -555,7 +552,7 @@ public class TaskManagerTest {
         final StreamTask reassignedActiveTask = statefulTask(taskId03, taskId03ChangelogPartitions)
             .inState(State.RESTORING)
             .withInputPartitions(taskId03Partitions).build();
-        final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+        final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
         when(stateUpdater.getTasks()).thenReturn(mkSet(reassignedActiveTask));
 
@@ -564,8 +561,8 @@ public class TaskManagerTest {
             Collections.emptyMap()
         );
 
-        Mockito.verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
-        Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());
+        verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
+        verify(standbyTaskCreator).createTasks(Collections.emptyMap());
     }
 
     @Test
@@ -573,7 +570,7 @@ public class TaskManagerTest {
         final StreamTask reassignedActiveTask = statefulTask(taskId03, taskId03ChangelogPartitions)
             .inState(State.SUSPENDED)
             .withInputPartitions(taskId03Partitions).build();
-        final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+        final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
         when(tasks.allTasks()).thenReturn(mkSet(reassignedActiveTask));
 
@@ -582,10 +579,10 @@ public class TaskManagerTest {
             Collections.emptyMap()
         );
 
-        Mockito.verify(tasks).removeTask(reassignedActiveTask);
-        Mockito.verify(stateUpdater).add(reassignedActiveTask);
-        Mockito.verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
-        Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());
+        verify(tasks).removeTask(reassignedActiveTask);
+        verify(stateUpdater).add(reassignedActiveTask);
+        verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
+        verify(standbyTaskCreator).createTasks(Collections.emptyMap());
     }
 
     @Test
@@ -593,7 +590,7 @@ public class TaskManagerTest {
         final StreamTask reassignedRevokedActiveTask = statefulTask(taskId03, taskId03ChangelogPartitions)
             .inState(State.RESTORING)
             .withInputPartitions(taskId03Partitions).build();
-        final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+        final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
         when(stateUpdater.getTasks()).thenReturn(mkSet(reassignedRevokedActiveTask));
 
@@ -602,9 +599,9 @@ public class TaskManagerTest {
             Collections.emptyMap()
         );
 
-        Mockito.verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
-        Mockito.verify(tasks).removePendingActiveTaskToSuspend(reassignedRevokedActiveTask.id());
-        Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());
+        verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
+        verify(tasks).removePendingActiveTaskToSuspend(reassignedRevokedActiveTask.id());
+        verify(standbyTaskCreator).createTasks(Collections.emptyMap());
     }
 
     @Test
@@ -612,7 +609,7 @@ public class TaskManagerTest {
         final StreamTask reassignedLostTask = statefulTask(taskId03, taskId03ChangelogPartitions)
             .inState(State.RESTORING)
             .withInputPartitions(taskId03Partitions).build();
-        final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+        final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
         when(stateUpdater.getTasks()).thenReturn(mkSet(reassignedLostTask));
         when(tasks.removePendingTaskToCloseClean(reassignedLostTask.id())).thenReturn(true);
@@ -622,10 +619,10 @@ public class TaskManagerTest {
             Collections.emptyMap()
         );
 
-        Mockito.verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
-        Mockito.verify(tasks).removePendingTaskToCloseClean(reassignedLostTask.id());
-        Mockito.verify(tasks).addPendingTaskToAddBack(reassignedLostTask.id());
-        Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());
+        verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
+        verify(tasks).removePendingTaskToCloseClean(reassignedLostTask.id());
+        verify(tasks).addPendingTaskToAddBack(reassignedLostTask.id());
+        verify(standbyTaskCreator).createTasks(Collections.emptyMap());
     }
 
     @Test
@@ -633,7 +630,7 @@ public class TaskManagerTest {
         final StreamTask reassignedTask = statefulTask(taskId03, taskId03ChangelogPartitions)
             .inState(State.RESTORING)
             .withInputPartitions(taskId03Partitions).build();
-        final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+        final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
         when(stateUpdater.getTasks()).thenReturn(mkSet(reassignedTask));
         when(tasks.removePendingActiveTaskToSuspend(reassignedTask.id())).thenReturn(true);
@@ -643,10 +640,10 @@ public class TaskManagerTest {
             Collections.emptyMap()
         );
 
-        Mockito.verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
-        Mockito.verify(tasks).removePendingActiveTaskToSuspend(reassignedTask.id());
-        Mockito.verify(tasks).addPendingTaskToAddBack(reassignedTask.id());
-        Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());
+        verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
+        verify(tasks).removePendingActiveTaskToSuspend(reassignedTask.id());
+        verify(tasks).addPendingTaskToAddBack(reassignedTask.id());
+        verify(standbyTaskCreator).createTasks(Collections.emptyMap());
     }
 
     @Test
@@ -654,7 +651,7 @@ public class TaskManagerTest {
         final StandbyTask reassignedStandbyTask = standbyTask(taskId01, taskId01ChangelogPartitions)
             .inState(State.RUNNING)
             .withInputPartitions(taskId01Partitions).build();
-        final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+        final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
         when(stateUpdater.getTasks()).thenReturn(mkSet(reassignedStandbyTask));
         when(tasks.removePendingTaskToRecycle(reassignedStandbyTask.id())).thenReturn(taskId01Partitions);
@@ -664,10 +661,10 @@ public class TaskManagerTest {
             mkMap(mkEntry(reassignedStandbyTask.id(), reassignedStandbyTask.inputPartitions()))
         );
 
-        Mockito.verify(tasks).removePendingTaskToRecycle(reassignedStandbyTask.id());
-        Mockito.verify(tasks).addPendingTaskToAddBack(reassignedStandbyTask.id());
-        Mockito.verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
-        Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());
+        verify(tasks).removePendingTaskToRecycle(reassignedStandbyTask.id());
+        verify(tasks).addPendingTaskToAddBack(reassignedStandbyTask.id());
+        verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
+        verify(standbyTaskCreator).createTasks(Collections.emptyMap());
     }
 
     @Test
@@ -676,7 +673,7 @@ public class TaskManagerTest {
             .inState(State.RUNNING)
             .withInputPartitions(taskId02Partitions).build();
         final Set<TopicPartition> newInputPartitions = taskId03Partitions;
-        final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+        final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
         when(stateUpdater.getTasks()).thenReturn(mkSet(standbyTaskToUpdateInputPartitions));
 
@@ -685,11 +682,11 @@ public class TaskManagerTest {
             mkMap(mkEntry(standbyTaskToUpdateInputPartitions.id(), newInputPartitions))
         );
 
-        Mockito.verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
-        Mockito.verify(stateUpdater, never()).remove(standbyTaskToUpdateInputPartitions.id());
-        Mockito.verify(tasks, never())
+        verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
+        verify(stateUpdater, never()).remove(standbyTaskToUpdateInputPartitions.id());
+        verify(tasks, never())
             .addPendingTaskToUpdateInputPartitions(standbyTaskToUpdateInputPartitions.id(), newInputPartitions);
-        Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());
+        verify(standbyTaskCreator).createTasks(Collections.emptyMap());
     }
 
     @Test
@@ -698,7 +695,7 @@ public class TaskManagerTest {
                 .inState(State.RUNNING)
                 .withInputPartitions(taskId02Partitions).build();
         final Set<TopicPartition> newInputPartitions = taskId03Partitions;
-        final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+        final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
         when(stateUpdater.getTasks()).thenReturn(mkSet(standbyTaskToUpdateInputPartitions));
 
@@ -707,12 +704,12 @@ public class TaskManagerTest {
                 mkMap(mkEntry(standbyTaskToUpdateInputPartitions.id(), newInputPartitions))
         );
 
-        Mockito.verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
-        Mockito.verify(stateUpdater, never()).remove(standbyTaskToUpdateInputPartitions.id());
-        Mockito.verify(tasks, never()).removePendingTaskToCloseClean(standbyTaskToUpdateInputPartitions.id());
-        Mockito.verify(tasks, never())
+        verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
+        verify(stateUpdater, never()).remove(standbyTaskToUpdateInputPartitions.id());
+        verify(tasks, never()).removePendingTaskToCloseClean(standbyTaskToUpdateInputPartitions.id());
+        verify(tasks, never())
                 .addPendingTaskToCloseReviveAndUpdateInputPartitions(standbyTaskToUpdateInputPartitions.id(), newInputPartitions);
-        Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());
+        verify(standbyTaskCreator).createTasks(Collections.emptyMap());
     }
 
     @Test
@@ -720,7 +717,7 @@ public class TaskManagerTest {
         final StandbyTask reassignedStandbyTask = standbyTask(taskId02, taskId02ChangelogPartitions)
             .inState(State.RUNNING)
             .withInputPartitions(taskId02Partitions).build();
-        final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+        final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
         when(stateUpdater.getTasks()).thenReturn(mkSet(reassignedStandbyTask));
 
@@ -729,8 +726,8 @@ public class TaskManagerTest {
             mkMap(mkEntry(reassignedStandbyTask.id(), reassignedStandbyTask.inputPartitions()))
         );
 
-        Mockito.verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
-        Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());
+        verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
+        verify(standbyTaskCreator).createTasks(Collections.emptyMap());
     }
 
     @Test
@@ -741,7 +738,7 @@ public class TaskManagerTest {
         final StandbyTask standbyTaskToRecycle = standbyTask(taskId02, taskId02ChangelogPartitions)
             .inState(State.RUNNING)
             .withInputPartitions(taskId02Partitions).build();
-        final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+        final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
         when(stateUpdater.getTasks()).thenReturn(mkSet(activeTaskToClose, standbyTaskToRecycle));
 
@@ -750,12 +747,12 @@ public class TaskManagerTest {
             Collections.emptyMap()
         );
 
-        Mockito.verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
-        Mockito.verify(stateUpdater).remove(activeTaskToClose.id());
-        Mockito.verify(tasks).addPendingTaskToCloseClean(activeTaskToClose.id());
-        Mockito.verify(stateUpdater).remove(standbyTaskToRecycle.id());
-        Mockito.verify(tasks).addPendingTaskToRecycle(standbyTaskToRecycle.id(), standbyTaskToRecycle.inputPartitions());
-        Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());
+        verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
+        verify(stateUpdater).remove(activeTaskToClose.id());
+        verify(tasks).addPendingTaskToCloseClean(activeTaskToClose.id());
+        verify(stateUpdater).remove(standbyTaskToRecycle.id());
+        verify(tasks).addPendingTaskToRecycle(standbyTaskToRecycle.id(), standbyTaskToRecycle.inputPartitions());
+        verify(standbyTaskCreator).createTasks(Collections.emptyMap());
     }
 
     @Test
@@ -766,7 +763,7 @@ public class TaskManagerTest {
         final StandbyTask standbyTask = standbyTask(taskId02, taskId02ChangelogPartitions)
             .inState(State.RUNNING)
             .withInputPartitions(taskId02Partitions).build();
-        final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+        final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
 
         when(stateUpdater.getTasks()).thenReturn(mkSet(standbyTask));
@@ -782,10 +779,9 @@ public class TaskManagerTest {
         final StandbyTask standbyTask = standbyTask(taskId02, taskId02ChangelogPartitions)
             .inState(State.RUNNING)
             .withInputPartitions(taskId02Partitions).build();
-        final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+        final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
 
-        when(stateUpdater.getTasks()).thenReturn(mkSet(standbyTask));
         when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId03, activeTask)));
         assertEquals(taskManager.allOwnedTasks(), mkMap(mkEntry(taskId03, activeTask)));
     }
@@ -795,7 +791,7 @@ public class TaskManagerTest {
         final StreamTask activeTaskToBeCreated = statefulTask(taskId03, taskId03ChangelogPartitions)
             .inState(State.CREATED)
             .withInputPartitions(taskId03Partitions).build();
-        final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+        final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
         final Set<Task> createdTasks = mkSet(activeTaskToBeCreated);
         final Map<TaskId, Set<TopicPartition>> tasksToBeCreated = mkMap(
@@ -804,8 +800,8 @@ public class TaskManagerTest {
 
         taskManager.handleAssignment(tasksToBeCreated, Collections.emptyMap());
 
-        Mockito.verify(tasks).addPendingTasksToInit(createdTasks);
-        Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());
+        verify(tasks).addPendingTasksToInit(createdTasks);
+        verify(standbyTaskCreator).createTasks(Collections.emptyMap());
     }
 
     @Test
@@ -813,7 +809,7 @@ public class TaskManagerTest {
         final StandbyTask standbyTaskToBeCreated = standbyTask(taskId02, taskId02ChangelogPartitions)
             .inState(State.CREATED)
             .withInputPartitions(taskId02Partitions).build();
-        final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+        final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
         final Set<Task> createdTasks = mkSet(standbyTaskToBeCreated);
         when(standbyTaskCreator.createTasks(mkMap(
@@ -825,8 +821,8 @@ public class TaskManagerTest {
             mkMap(mkEntry(standbyTaskToBeCreated.id(), standbyTaskToBeCreated.inputPartitions()))
         );
 
-        Mockito.verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
-        Mockito.verify(tasks).addPendingTasksToInit(createdTasks);
+        verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
+        verify(tasks).addPendingTasksToInit(createdTasks);
     }
 
     @Test
@@ -845,12 +841,12 @@ public class TaskManagerTest {
 
         taskManager.handleAssignment(emptyMap(), mkMap(mkEntry(taskId01, taskId01Partitions)));
 
-        Mockito.verify(activeTaskToRecycle).prepareCommit();
-        Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(activeTaskToRecycle.id());
-        Mockito.verify(tasks).addPendingTasksToInit(mkSet(standbyTask));
-        Mockito.verify(tasks).removeTask(activeTaskToRecycle);
-        Mockito.verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
-        Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());
+        verify(activeTaskToRecycle).prepareCommit();
+        verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(activeTaskToRecycle.id());
+        verify(tasks).addPendingTasksToInit(mkSet(standbyTask));
+        verify(tasks).removeTask(activeTaskToRecycle);
+        verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
+        verify(standbyTaskCreator).createTasks(Collections.emptyMap());
     }
 
     @Test
@@ -869,11 +865,11 @@ public class TaskManagerTest {
 
         taskManager.handleAssignment(emptyMap(), mkMap(mkEntry(taskId01, taskId01Partitions)));
 
-        Mockito.verify(activeTaskToRecycle).prepareCommit();
-        Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(activeTaskToRecycle.id());
-        Mockito.verify(tasks).replaceActiveWithStandby(standbyTask);
-        Mockito.verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
-        Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());
+        verify(activeTaskToRecycle).prepareCommit();
+        verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(activeTaskToRecycle.id());
+        verify(tasks).replaceActiveWithStandby(standbyTask);
+        verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
+        verify(standbyTaskCreator).createTasks(Collections.emptyMap());
     }
 
     @Test
@@ -881,7 +877,7 @@ public class TaskManagerTest {
         final StandbyTask standbyTaskToRecycle = standbyTask(taskId03, taskId03ChangelogPartitions)
             .inState(State.RUNNING)
             .withInputPartitions(taskId03Partitions).build();
-        final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+        final TasksRegistry tasks = mock(TasksRegistry.class);
         when(tasks.allTasks()).thenReturn(mkSet(standbyTaskToRecycle));
         final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
 
@@ -895,7 +891,7 @@ public class TaskManagerTest {
 
         assertEquals(illegalStateException.getMessage(), "Standby tasks should only be managed by the state updater, " +
             "but standby task " + taskId03 + " is managed by the stream thread");
-        Mockito.verifyNoInteractions(activeTaskCreator);
+        verifyNoInteractions(activeTaskCreator);
     }
 
     @Test
@@ -903,18 +899,18 @@ public class TaskManagerTest {
         final StreamTask activeTaskToClose = statefulTask(taskId03, taskId03ChangelogPartitions)
             .inState(State.RUNNING)
             .withInputPartitions(taskId03Partitions).build();
-        final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+        final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
         when(tasks.allTasks()).thenReturn(mkSet(activeTaskToClose));
 
         taskManager.handleAssignment(Collections.emptyMap(), Collections.emptyMap());
 
-        Mockito.verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
-        Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(activeTaskToClose.id());
-        Mockito.verify(activeTaskToClose).prepareCommit();
-        Mockito.verify(activeTaskToClose).closeClean();
-        Mockito.verify(tasks).removeTask(activeTaskToClose);
-        Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());
+        verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
+        verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(activeTaskToClose.id());
+        verify(activeTaskToClose).prepareCommit();
+        verify(activeTaskToClose).closeClean();
+        verify(tasks).removeTask(activeTaskToClose);
+        verify(standbyTaskCreator).createTasks(Collections.emptyMap());
     }
 
     @Test
@@ -922,7 +918,7 @@ public class TaskManagerTest {
         final StandbyTask standbyTaskToClose = standbyTask(taskId03, taskId03ChangelogPartitions)
             .inState(State.RUNNING)
             .withInputPartitions(taskId03Partitions).build();
-        final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+        final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
         when(tasks.allTasks()).thenReturn(mkSet(standbyTaskToClose));
 
@@ -933,7 +929,7 @@ public class TaskManagerTest {
 
         assertEquals(illegalStateException.getMessage(), "Standby tasks should only be managed by the state updater, " +
             "but standby task " + taskId03 + " is managed by the stream thread");
-        Mockito.verifyNoInteractions(activeTaskCreator);
+        verifyNoInteractions(activeTaskCreator);
     }
 
     @Test
@@ -942,7 +938,7 @@ public class TaskManagerTest {
             .inState(State.RUNNING)
             .withInputPartitions(taskId03Partitions).build();
         final Set<TopicPartition> newInputPartitions = taskId02Partitions;
-        final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+        final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
         when(tasks.allTasks()).thenReturn(mkSet(activeTaskToUpdateInputPartitions));
         when(tasks.updateActiveTaskInputPartitions(activeTaskToUpdateInputPartitions, newInputPartitions)).thenReturn(true);
@@ -952,9 +948,9 @@ public class TaskManagerTest {
             Collections.emptyMap()
         );
 
-        Mockito.verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
-        Mockito.verify(activeTaskToUpdateInputPartitions).updateInputPartitions(Mockito.eq(newInputPartitions), any());
-        Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());
+        verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
+        verify(activeTaskToUpdateInputPartitions).updateInputPartitions(eq(newInputPartitions), any());
+        verify(standbyTaskCreator).createTasks(Collections.emptyMap());
     }
 
     @Test
@@ -962,7 +958,7 @@ public class TaskManagerTest {
         final StreamTask activeTaskToResume = statefulTask(taskId03, taskId03ChangelogPartitions)
             .inState(State.RUNNING)
             .withInputPartitions(taskId03Partitions).build();
-        final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+        final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
         when(tasks.allTasks()).thenReturn(mkSet(activeTaskToResume));
 
@@ -971,8 +967,8 @@ public class TaskManagerTest {
             Collections.emptyMap()
         );
 
-        Mockito.verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
-        Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());
+        verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
+        verify(standbyTaskCreator).createTasks(Collections.emptyMap());
     }
 
     @Test
@@ -980,7 +976,7 @@ public class TaskManagerTest {
         final StreamTask activeTaskToResume = statefulTask(taskId03, taskId03ChangelogPartitions)
             .inState(State.SUSPENDED)
             .withInputPartitions(taskId03Partitions).build();
-        final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+        final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
         when(tasks.allTasks()).thenReturn(mkSet(activeTaskToResume));
 
@@ -989,11 +985,11 @@ public class TaskManagerTest {
             Collections.emptyMap()
         );
 
-        Mockito.verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
-        Mockito.verify(activeTaskToResume).resume();
-        Mockito.verify(stateUpdater).add(activeTaskToResume);
-        Mockito.verify(tasks).removeTask(activeTaskToResume);
-        Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());
+        verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
+        verify(activeTaskToResume).resume();
+        verify(stateUpdater).add(activeTaskToResume);
+        verify(tasks).removeTask(activeTaskToResume);
+        verify(standbyTaskCreator).createTasks(Collections.emptyMap());
     }
 
     @Test
@@ -1002,7 +998,7 @@ public class TaskManagerTest {
             .inState(State.RUNNING)
             .withInputPartitions(taskId02Partitions).build();
         final Set<TopicPartition> newInputPartitions = taskId03Partitions;
-        final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+        final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
         when(tasks.allTasks()).thenReturn(mkSet(standbyTaskToUpdateInputPartitions));
 
@@ -1016,7 +1012,7 @@ public class TaskManagerTest {
 
         assertEquals(illegalStateException.getMessage(), "Standby tasks should only be managed by the state updater, " +
             "but standby task " + taskId02 + " is managed by the stream thread");
-        Mockito.verifyNoInteractions(activeTaskCreator);
+        verifyNoInteractions(activeTaskCreator);
     }
 
     @Test
@@ -1027,7 +1023,7 @@ public class TaskManagerTest {
         final StreamTask activeTaskToCreate = statefulTask(taskId02, taskId02ChangelogPartitions)
             .inState(State.CREATED)
             .withInputPartitions(taskId02Partitions).build();
-        final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+        final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
         when(tasks.allTasks()).thenReturn(mkSet(activeTaskToClose));
 
@@ -1036,12 +1032,12 @@ public class TaskManagerTest {
             Collections.emptyMap()
         );
 
-        Mockito.verify(activeTaskCreator).createTasks(
+        verify(activeTaskCreator).createTasks(
             consumer,
             mkMap(mkEntry(activeTaskToCreate.id(), activeTaskToCreate.inputPartitions()))
         );
-        Mockito.verify(activeTaskToClose).closeClean();
-        Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());
+        verify(activeTaskToClose).closeClean();
+        verify(standbyTaskCreator).createTasks(Collections.emptyMap());
     }
 
     @Test
@@ -1058,10 +1054,10 @@ public class TaskManagerTest {
 
         taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
 
-        Mockito.verify(task00).initializeIfNeeded();
-        Mockito.verify(task01).initializeIfNeeded();
-        Mockito.verify(stateUpdater).add(task00);
-        Mockito.verify(stateUpdater).add(task01);
+        verify(task00).initializeIfNeeded();
+        verify(task01).initializeIfNeeded();
+        verify(stateUpdater).add(task00);
+        verify(stateUpdater).add(task01);
     }
 
     @Test
@@ -1080,13 +1076,13 @@ public class TaskManagerTest {
 
         taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
 
-        Mockito.verify(task00).initializeIfNeeded();
-        Mockito.verify(task01).initializeIfNeeded();
-        Mockito.verify(tasks).addPendingTasksToInit(
-            Mockito.argThat(tasksToInit -> tasksToInit.contains(task00) && !tasksToInit.contains(task01))
+        verify(task00).initializeIfNeeded();
+        verify(task01).initializeIfNeeded();
+        verify(tasks).addPendingTasksToInit(
+            argThat(tasksToInit -> tasksToInit.contains(task00) && !tasksToInit.contains(task01))
         );
-        Mockito.verify(stateUpdater, never()).add(task00);
-        Mockito.verify(stateUpdater).add(task01);
+        verify(stateUpdater, never()).add(task00);
+        verify(stateUpdater).add(task01);
     }
 
     @Test
@@ -1117,13 +1113,13 @@ public class TaskManagerTest {
 
         taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
 
-        Mockito.verify(task00Converted).initializeIfNeeded();
-        Mockito.verify(task01Converted).initializeIfNeeded();
-        Mockito.verify(tasks).addPendingTasksToInit(
-            Mockito.argThat(tasksToInit -> tasksToInit.contains(task00Converted) && !tasksToInit.contains(task01Converted))
+        verify(task00Converted).initializeIfNeeded();
+        verify(task01Converted).initializeIfNeeded();
+        verify(tasks).addPendingTasksToInit(
+            argThat(tasksToInit -> tasksToInit.contains(task00Converted) && !tasksToInit.contains(task01Converted))
         );
-        Mockito.verify(stateUpdater, never()).add(task00Converted);
-        Mockito.verify(stateUpdater).add(task01Converted);
+        verify(stateUpdater, never()).add(task00Converted);
+        verify(stateUpdater).add(task01Converted);
     }
 
     @Test
@@ -1151,13 +1147,13 @@ public class TaskManagerTest {
 
         taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
 
-        Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(any());
-        Mockito.verify(task00).suspend();
-        Mockito.verify(task01).suspend();
-        Mockito.verify(task00Converted).initializeIfNeeded();
-        Mockito.verify(task01Converted).initializeIfNeeded();
-        Mockito.verify(stateUpdater).add(task00Converted);
-        Mockito.verify(stateUpdater).add(task01Converted);
+        verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(any());
+        verify(task00).suspend();
+        verify(task01).suspend();
+        verify(task00Converted).initializeIfNeeded();
+        verify(task01Converted).initializeIfNeeded();
+        verify(stateUpdater).add(task00Converted);
+        verify(stateUpdater).add(task01Converted);
     }
 
     @Test
@@ -1178,11 +1174,11 @@ public class TaskManagerTest {
 
         taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
 
-        Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(any());
-        Mockito.verify(task00).suspend();
-        Mockito.verify(task00).closeClean();
-        Mockito.verify(task01).suspend();
-        Mockito.verify(task01).closeClean();
+        verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(any());
+        verify(task00).suspend();
+        verify(task00).closeClean();
+        verify(task01).suspend();
+        verify(task01).closeClean();
     }
 
     @Test
@@ -1204,14 +1200,14 @@ public class TaskManagerTest {
 
         taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
 
-        Mockito.verify(activeTask).updateInputPartitions(Mockito.eq(taskId02Partitions), anyMap());
-        Mockito.verify(activeTask, never()).closeDirty();
-        Mockito.verify(activeTask, never()).closeClean();
-        Mockito.verify(stateUpdater).add(activeTask);
-        Mockito.verify(standbyTask).updateInputPartitions(Mockito.eq(taskId03Partitions), anyMap());
-        Mockito.verify(standbyTask, never()).closeDirty();
-        Mockito.verify(standbyTask, never()).closeClean();
-        Mockito.verify(stateUpdater).add(standbyTask);
+        verify(activeTask).updateInputPartitions(eq(taskId02Partitions), anyMap());
+        verify(activeTask, never()).closeDirty();
+        verify(activeTask, never()).closeClean();
+        verify(stateUpdater).add(activeTask);
+        verify(standbyTask).updateInputPartitions(eq(taskId03Partitions), anyMap());
+        verify(standbyTask, never()).closeDirty();
+        verify(standbyTask, never()).closeClean();
+        verify(stateUpdater).add(standbyTask);
     }
 
     @Test
@@ -1228,12 +1224,12 @@ public class TaskManagerTest {
 
         taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
 
-        Mockito.verify(activeTask).closeClean();
-        Mockito.verify(activeTask).revive();
-        Mockito.verify(activeTask).updateInputPartitions(Mockito.eq(taskId02Partitions), anyMap());
-        Mockito.verify(activeTask).initializeIfNeeded();
-        Mockito.verify(activeTask, never()).closeDirty();
-        Mockito.verify(stateUpdater).add(activeTask);
+        verify(activeTask).closeClean();
+        verify(activeTask).revive();
+        verify(activeTask).updateInputPartitions(eq(taskId02Partitions), anyMap());
+        verify(activeTask).initializeIfNeeded();
+        verify(activeTask, never()).closeDirty();
+        verify(stateUpdater).add(activeTask);
     }
 
     @Test
@@ -1252,9 +1248,9 @@ public class TaskManagerTest {
 
         taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
 
-        Mockito.verify(statefulTask).suspend();
-        Mockito.verify(tasks).addTask(statefulTask);
-        Mockito.verifyNoInteractions(consumer);
+        verify(statefulTask).suspend();
+        verify(tasks).addTask(statefulTask);
+        verifyNoInteractions(consumer);
     }
 
     @Test
@@ -1301,21 +1297,21 @@ public class TaskManagerTest {
 
         taskManager.checkStateUpdater(time.milliseconds(), noOpResetter -> { });
 
-        Mockito.verify(activeTaskCreator, times(3)).closeAndRemoveTaskProducerIfNeeded(any());
-        Mockito.verify(activeTaskCreator, times(3)).closeAndRemoveTaskProducerIfNeeded(any());
-        Mockito.verify(convertedTask0).initializeIfNeeded();
-        Mockito.verify(convertedTask1).initializeIfNeeded();
-        Mockito.verify(stateUpdater).add(convertedTask0);
-        Mockito.verify(stateUpdater).add(convertedTask1);
-        Mockito.verify(taskToClose).closeClean();
-        Mockito.verify(taskToUpdateInputPartitions).updateInputPartitions(Mockito.eq(taskId04Partitions), anyMap());
-        Mockito.verify(stateUpdater).add(taskToUpdateInputPartitions);
-        Mockito.verify(taskToCloseReviveAndUpdateInputPartitions).closeClean();
-        Mockito.verify(taskToCloseReviveAndUpdateInputPartitions).revive();
-        Mockito.verify(taskToCloseReviveAndUpdateInputPartitions).updateInputPartitions(Mockito.eq(taskId05Partitions), anyMap());
-        Mockito.verify(taskToCloseReviveAndUpdateInputPartitions).initializeIfNeeded();
-        Mockito.verify(stateUpdater).add(taskToCloseReviveAndUpdateInputPartitions);
-        Mockito.verifyNoInteractions(consumer);
+        verify(activeTaskCreator, times(3)).closeAndRemoveTaskProducerIfNeeded(any());
+        verify(activeTaskCreator, times(3)).closeAndRemoveTaskProducerIfNeeded(any());
+        verify(convertedTask0).initializeIfNeeded();
+        verify(convertedTask1).initializeIfNeeded();
+        verify(stateUpdater).add(convertedTask0);
+        verify(stateUpdater).add(convertedTask1);
+        verify(taskToClose).closeClean();
+        verify(taskToUpdateInputPartitions).updateInputPartitions(eq(taskId04Partitions), anyMap());
+        verify(stateUpdater).add(taskToUpdateInputPartitions);
+        verify(taskToCloseReviveAndUpdateInputPartitions).closeClean();
+        verify(taskToCloseReviveAndUpdateInputPartitions).revive();
+        verify(taskToCloseReviveAndUpdateInputPartitions).updateInputPartitions(eq(taskId05Partitions), anyMap());
+        verify(taskToCloseReviveAndUpdateInputPartitions).initializeIfNeeded();
+        verify(stateUpdater).add(taskToCloseReviveAndUpdateInputPartitions);
+        verifyNoInteractions(consumer);
     }
 
     @Test
@@ -1364,8 +1360,8 @@ public class TaskManagerTest {
 
         taskManager.handleRevocation(task.inputPartitions());
 
-        Mockito.verify(tasks).addPendingActiveTaskToSuspend(task.id());
-        Mockito.verify(stateUpdater, never()).remove(task.id());
+        verify(tasks).addPendingActiveTaskToSuspend(task.id());
+        verify(stateUpdater, never()).remove(task.id());
     }
 
     public void shouldAddMultipleActiveTasksWithRevokedInputPartitionsInStateUpdaterToPendingTasksToSuspend() {
@@ -1380,8 +1376,8 @@ public class TaskManagerTest {
 
         taskManager.handleRevocation(union(HashSet::new, taskId00Partitions, taskId01Partitions));
 
-        Mockito.verify(tasks).addPendingActiveTaskToSuspend(task1.id());
-        Mockito.verify(tasks).addPendingActiveTaskToSuspend(task2.id());
+        verify(tasks).addPendingActiveTaskToSuspend(task1.id());
+        verify(tasks).addPendingActiveTaskToSuspend(task2.id());
     }
 
     @Test
@@ -1394,8 +1390,8 @@ public class TaskManagerTest {
 
         taskManager.handleRevocation(taskId01Partitions);
 
-        Mockito.verify(stateUpdater, never()).remove(task.id());
-        Mockito.verify(tasks, never()).addPendingActiveTaskToSuspend(task.id());
+        verify(stateUpdater, never()).remove(task.id());
+        verify(tasks, never()).addPendingActiveTaskToSuspend(task.id());
     }
 
     @Test
@@ -1408,8 +1404,8 @@ public class TaskManagerTest {
 
         taskManager.handleRevocation(taskId00Partitions);
 
-        Mockito.verify(stateUpdater, never()).remove(task.id());
-        Mockito.verify(tasks, never()).addPendingActiveTaskToSuspend(task.id());
+        verify(stateUpdater, never()).remove(task.id());
+        verify(tasks, never()).addPendingActiveTaskToSuspend(task.id());
     }
 
     @Test
@@ -1428,12 +1424,12 @@ public class TaskManagerTest {
 
         taskManager.handleLostAll();
 
-        Mockito.verify(stateUpdater).remove(task1.id());
-        Mockito.verify(stateUpdater, never()).remove(task2.id());
-        Mockito.verify(stateUpdater).remove(task3.id());
-        Mockito.verify(tasks).addPendingTaskToCloseClean(task1.id());
-        Mockito.verify(tasks, never()).addPendingTaskToCloseClean(task2.id());
-        Mockito.verify(tasks).addPendingTaskToCloseClean(task3.id());
+        verify(stateUpdater).remove(task1.id());
+        verify(stateUpdater, never()).remove(task2.id());
+        verify(stateUpdater).remove(task3.id());
+        verify(tasks).addPendingTaskToCloseClean(task1.id());
+        verify(tasks, never()).addPendingTaskToCloseClean(task2.id());
+        verify(tasks).addPendingTaskToCloseClean(task3.id());
     }
 
     private TaskManager setupForRevocationAndLost(final Set<Task> tasksInStateUpdater,
@@ -1454,10 +1450,10 @@ public class TaskManagerTest {
 
         taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
 
-        Mockito.verify(task).completeRestoration(noOpResetter);
-        Mockito.verify(task).clearTaskTimeout();
-        Mockito.verify(tasks).addTask(task);
-        Mockito.verify(consumer).resume(task.inputPartitions());
+        verify(task).completeRestoration(noOpResetter);
+        verify(task).clearTaskTimeout();
+        verify(tasks).addTask(task);
+        verify(consumer).resume(task.inputPartitions());
     }
 
     @Test
@@ -1472,10 +1468,10 @@ public class TaskManagerTest {
 
         taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
 
-        Mockito.verify(task).maybeInitTaskTimeoutOrThrow(anyLong(), Mockito.eq(timeoutException));
-        Mockito.verify(tasks, never()).addTask(task);
-        Mockito.verify(task, never()).clearTaskTimeout();
-        Mockito.verifyNoInteractions(consumer);
+        verify(task).maybeInitTaskTimeoutOrThrow(anyLong(), eq(timeoutException));
+        verify(tasks, never()).addTask(task);
+        verify(task, never()).clearTaskTimeout();
+        verifyNoInteractions(consumer);
     }
 
     private TaskManager setUpTransitionToRunningOfRestoredTask(final StreamTask statefulTask,
@@ -1514,10 +1510,10 @@ public class TaskManagerTest {
 
         taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
 
-        Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(statefulTask.id());
-        Mockito.verify(statefulTask).suspend();
-        Mockito.verify(standbyTask).initializeIfNeeded();
-        Mockito.verify(stateUpdater).add(standbyTask);
+        verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(statefulTask.id());
+        verify(statefulTask).suspend();
+        verify(standbyTask).initializeIfNeeded();
+        verify(stateUpdater).add(standbyTask);
     }
 
     @Test
@@ -1534,8 +1530,8 @@ public class TaskManagerTest {
             () -> taskManager.checkStateUpdater(time.milliseconds(), noOpResetter)
         );
 
-        Mockito.verify(stateUpdater, never()).add(any());
-        Mockito.verify(statefulTask).closeDirty();
+        verify(stateUpdater, never()).add(any());
+        verify(statefulTask).closeDirty();
     }
 
     @Test
@@ -1556,8 +1552,8 @@ public class TaskManagerTest {
             () -> taskManager.checkStateUpdater(time.milliseconds(), noOpResetter)
         );
 
-        Mockito.verify(stateUpdater, never()).add(any());
-        Mockito.verify(standbyTask).closeDirty();
+        verify(stateUpdater, never()).add(any());
+        verify(standbyTask).closeDirty();
     }
 
     private TaskManager setUpRecycleRestoredTask(final StreamTask statefulTask) {
@@ -1579,11 +1575,11 @@ public class TaskManagerTest {
 
         taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
 
-        Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(statefulTask.id());
-        Mockito.verify(statefulTask).suspend();
-        Mockito.verify(statefulTask).closeClean();
-        Mockito.verify(statefulTask, never()).closeDirty();
-        Mockito.verify(tasks, never()).removeTask(statefulTask);
+        verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(statefulTask.id());
+        verify(statefulTask).suspend();
+        verify(statefulTask).closeClean();
+        verify(statefulTask, never()).closeDirty();
+        verify(tasks, never()).removeTask(statefulTask);
     }
 
     @Test
@@ -1600,9 +1596,9 @@ public class TaskManagerTest {
             () -> taskManager.checkStateUpdater(time.milliseconds(), noOpResetter)
         );
 
-        Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(statefulTask.id());
-        Mockito.verify(statefulTask).closeDirty();
-        Mockito.verify(tasks, never()).removeTask(statefulTask);
+        verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(statefulTask.id());
+        verify(statefulTask).closeDirty();
+        verify(tasks, never()).removeTask(statefulTask);
     }
 
     @Test
@@ -1621,8 +1617,8 @@ public class TaskManagerTest {
             () -> taskManager.checkStateUpdater(time.milliseconds(), noOpResetter)
         );
 
-        Mockito.verify(statefulTask, never()).closeDirty();
-        Mockito.verify(tasks, never()).removeTask(statefulTask);
+        verify(statefulTask, never()).closeDirty();
+        verify(tasks, never()).removeTask(statefulTask);
     }
 
     private TaskManager setUpCloseCleanRestoredTask(final StreamTask statefulTask,
@@ -1649,8 +1645,8 @@ public class TaskManagerTest {
 
         taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
 
-        Mockito.verify(stateUpdater).add(statefulTask);
-        Mockito.verify(tasks, never()).removeTask(statefulTask);
+        verify(stateUpdater).add(statefulTask);
+        verify(tasks, never()).removeTask(statefulTask);
     }
 
     @Test
@@ -1668,11 +1664,11 @@ public class TaskManagerTest {
 
         taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
 
-        Mockito.verify(consumer).resume(statefulTask.inputPartitions());
-        Mockito.verify(statefulTask).updateInputPartitions(Mockito.eq(taskId01Partitions), anyMap());
-        Mockito.verify(statefulTask).completeRestoration(noOpResetter);
-        Mockito.verify(statefulTask).clearTaskTimeout();
-        Mockito.verify(tasks).addTask(statefulTask);
+        verify(consumer).resume(statefulTask.inputPartitions());
+        verify(statefulTask).updateInputPartitions(eq(taskId01Partitions), anyMap());
+        verify(statefulTask).completeRestoration(noOpResetter);
+        verify(statefulTask).clearTaskTimeout();
+        verify(tasks).addTask(statefulTask);
     }
 
     @Test
@@ -1689,11 +1685,11 @@ public class TaskManagerTest {
 
         taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
 
-        Mockito.verify(statefulTask).updateInputPartitions(Mockito.eq(taskId01Partitions), anyMap());
-        Mockito.verify(statefulTask).closeClean();
-        Mockito.verify(statefulTask).revive();
-        Mockito.verify(statefulTask).initializeIfNeeded();
-        Mockito.verify(stateUpdater).add(statefulTask);
+        verify(statefulTask).updateInputPartitions(eq(taskId01Partitions), anyMap());
+        verify(statefulTask).closeClean();
+        verify(statefulTask).revive();
+        verify(statefulTask).initializeIfNeeded();
+        verify(stateUpdater).add(statefulTask);
     }
 
     @Test
@@ -1712,9 +1708,9 @@ public class TaskManagerTest {
 
         taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
 
-        Mockito.verify(statefulTask).suspend();
-        Mockito.verify(tasks).addTask(statefulTask);
-        Mockito.verifyNoInteractions(consumer);
+        verify(statefulTask).suspend();
+        verify(tasks).addTask(statefulTask);
+        verifyNoInteractions(consumer);
     }
 
     @Test
@@ -1776,16 +1772,16 @@ public class TaskManagerTest {
 
         taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
 
-        Mockito.verify(tasks).addTask(taskToTransitToRunning);
-        Mockito.verify(stateUpdater).add(recycledStandbyTask);
-        Mockito.verify(stateUpdater).add(recycledStandbyTask);
-        Mockito.verify(taskToCloseClean).closeClean();
-        Mockito.verify(stateUpdater).add(taskToAddBack);
-        Mockito.verify(taskToUpdateInputPartitions).updateInputPartitions(Mockito.eq(taskId05Partitions), anyMap());
-        Mockito.verify(taskToCloseReviveAndUpdateInputPartitions).closeClean();
-        Mockito.verify(taskToCloseReviveAndUpdateInputPartitions).revive();
-        Mockito.verify(taskToCloseReviveAndUpdateInputPartitions).initializeIfNeeded();
-        Mockito.verify(stateUpdater).add(taskToCloseReviveAndUpdateInputPartitions);
+        verify(tasks).addTask(taskToTransitToRunning);
+        verify(stateUpdater).add(recycledStandbyTask);
+        verify(stateUpdater).add(recycledStandbyTask);
+        verify(taskToCloseClean).closeClean();
+        verify(stateUpdater).add(taskToAddBack);
+        verify(taskToUpdateInputPartitions).updateInputPartitions(eq(taskId05Partitions), anyMap());
+        verify(taskToCloseReviveAndUpdateInputPartitions).closeClean();
+        verify(taskToCloseReviveAndUpdateInputPartitions).revive();
+        verify(taskToCloseReviveAndUpdateInputPartitions).initializeIfNeeded();
+        verify(stateUpdater).add(taskToCloseReviveAndUpdateInputPartitions);
     }
 
     @Test
@@ -1892,9 +1888,9 @@ public class TaskManagerTest {
             () -> taskManager.checkStateUpdater(time.milliseconds(), noOpResetter)
         );
 
-        Mockito.verify(tasks).addTask(statefulTask0);
-        Mockito.verify(tasks).addTask(statefulTask1);
-        Mockito.verify(stateUpdater).add(statefulTask2);
+        verify(tasks).addTask(statefulTask0);
+        verify(tasks).addTask(statefulTask1);
+        verify(stateUpdater).add(statefulTask2);
         assertEquals(mkSet(taskId00, taskId01), thrown.corruptedTasks());
         assertEquals("Tasks [0_1, 0_0] are corrupted and hence need to be re-initialized", thrown.getMessage());
     }
@@ -1913,19 +1909,17 @@ public class TaskManagerTest {
 
         taskManager.handleAssignment(activeTasksAssignment, standbyTasksAssignment);
 
-        Mockito.verify(topologyBuilder).addSubscribedTopicsFromAssignment(Mockito.eq(mkSet(t1p1, t1p2, t2p2)), Mockito.anyString());
-        Mockito.verify(topologyBuilder, never()).addSubscribedTopicsFromAssignment(Mockito.eq(mkSet(t1p3, t1p4)), Mockito.anyString());
-        Mockito.verify(activeTaskCreator).createTasks(any(), Mockito.eq(activeTasksAssignment));
+        verify(topologyBuilder).addSubscribedTopicsFromAssignment(eq(mkSet(t1p1, t1p2, t2p2)), anyString());
+        verify(topologyBuilder, never()).addSubscribedTopicsFromAssignment(eq(mkSet(t1p3, t1p4)), anyString());
+        verify(activeTaskCreator).createTasks(any(), eq(activeTasksAssignment));
     }
 
     @Test
     public void shouldNotLockAnythingIfStateDirIsEmpty() {
-        expect(stateDirectory.listNonEmptyTaskDirectories()).andReturn(new ArrayList<>()).once();
+        when(stateDirectory.listNonEmptyTaskDirectories()).thenReturn(new ArrayList<>());
 
-        replay(stateDirectory);
         taskManager.handleRebalanceStart(singleton("topic"));
 
-        verify(stateDirectory);
         assertTrue(taskManager.lockedTaskDirectories().isEmpty());
     }
 
@@ -1940,10 +1934,8 @@ public class TaskManagerTest {
             taskId10.toString(),
             "dummy"
         );
-        replay(stateDirectory);
         taskManager.handleRebalanceStart(singleton("topic"));
 
-        verify(stateDirectory);
         assertThat(taskManager.lockedTaskDirectories(), is(singleton(taskId01)));
     }
 
@@ -1951,14 +1943,12 @@ public class TaskManagerTest {
     public void shouldUnlockEmptyDirsAtRebalanceStart() throws Exception {
         expectLockObtainedFor(taskId01, taskId10);
         expectDirectoryNotEmpty(taskId01);
-        expect(stateDirectory.directoryForTaskIsEmpty(taskId10)).andReturn(true);
-        expectUnlockFor(taskId10);
+        when(stateDirectory.directoryForTaskIsEmpty(taskId10)).thenReturn(true);
 
         makeTaskFolders(taskId01.toString(), taskId10.toString());
-        replay(stateDirectory);
         taskManager.handleRebalanceStart(singleton("topic"));
 
-        verify(stateDirectory);
+        verify(stateDirectory).unlock(taskId10);
         assertThat(taskManager.lockedTaskDirectories(), is(singleton(taskId01)));
     }
 
@@ -1969,7 +1959,7 @@ public class TaskManagerTest {
 
         taskManager.handleRebalanceComplete();
 
-        Mockito.verify(consumer).pause(assigned);
+        verify(consumer).pause(assigned);
     }
 
     @Test
@@ -1977,7 +1967,7 @@ public class TaskManagerTest {
         final StreamTask statefulTask0 = statefulTask(taskId00, taskId00ChangelogPartitions)
             .inState(State.RUNNING)
             .withInputPartitions(taskId00Partitions).build();
-        final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+        final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
         when(tasks.allTasks()).thenReturn(mkSet(statefulTask0));
         final Set<TopicPartition> assigned = mkSet(t1p0, t1p1);
@@ -1985,21 +1975,19 @@ public class TaskManagerTest {
 
         taskManager.handleRebalanceComplete();
 
-        Mockito.verify(consumer).pause(mkSet(t1p1));
+        verify(consumer).pause(mkSet(t1p1));
     }
 
     @Test
     public void shouldReleaseLockForUnassignedTasksAfterRebalance() throws Exception {
         expectLockObtainedFor(taskId00, taskId01, taskId02);
         expectDirectoryNotEmpty(taskId00, taskId01, taskId02);
-        expectUnlockFor(taskId02);
 
         makeTaskFolders(
             taskId00.toString(),  // active task
             taskId01.toString(),  // standby task
             taskId02.toString()   // unassigned but able to lock
         );
-        replay(stateDirectory);
         taskManager.handleRebalanceStart(singleton("topic"));
 
         assertThat(taskManager.lockedTaskDirectories(), is(mkSet(taskId00, taskId01, taskId02)));
@@ -2008,9 +1996,9 @@ public class TaskManagerTest {
 
         taskManager.handleRebalanceComplete();
         assertThat(taskManager.lockedTaskDirectories(), is(mkSet(taskId00, taskId01)));
-        verify(stateDirectory);
 
-        Mockito.verify(consumer).pause(assignment);
+        verify(stateDirectory).unlock(taskId02);
+        verify(consumer).pause(assignment);
     }
 
     @Test
@@ -2027,21 +2015,19 @@ public class TaskManagerTest {
         final StandbyTask unassignedStandbyTask = standbyTask(taskId03, taskId03ChangelogPartitions)
             .inState(State.CREATED)
             .withInputPartitions(taskId03Partitions).build();
-        final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+        final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
         when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId00, runningStatefulTask)));
         when(stateUpdater.getTasks()).thenReturn(mkSet(standbyTask, restoringStatefulTask));
         when(tasks.allTasks()).thenReturn(mkSet(runningStatefulTask));
         expectLockObtainedFor(taskId00, taskId01, taskId02, taskId03);
         expectDirectoryNotEmpty(taskId00, taskId01, taskId02, taskId03);
-        expectUnlockFor(taskId03);
         makeTaskFolders(
             taskId00.toString(),
             taskId01.toString(),
             taskId02.toString(),
             taskId03.toString()
         );
-        replay(stateDirectory);
 
         final Set<TopicPartition> assigned = mkSet(t1p0, t1p1, t1p2);
         when(consumer.assignment()).thenReturn(assigned);
@@ -2049,8 +2035,8 @@ public class TaskManagerTest {
         taskManager.handleRebalanceStart(singleton("topic"));
         taskManager.handleRebalanceComplete();
 
-        Mockito.verify(consumer).pause(mkSet(t1p1, t1p2));
-        verify(stateDirectory);
+        verify(consumer).pause(mkSet(t1p1, t1p2));
+        verify(stateDirectory).unlock(taskId03);
         assertThat(taskManager.lockedTaskDirectories(), is(mkSet(taskId00, taskId01, taskId02)));
     }
 
@@ -2086,10 +2072,9 @@ public class TaskManagerTest {
         makeTaskFolders(taskId00.toString());
         final Map<TopicPartition, Long> changelogOffsetInCheckpoint = mkMap(mkEntry(t1p0changelog, 24L));
         writeCheckpointFile(taskId00, changelogOffsetInCheckpoint);
-        final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+        final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
         when(stateUpdater.getTasks()).thenReturn(mkSet(restoringStatefulTask));
-        replay(stateDirectory);
         taskManager.handleRebalanceStart(singleton("topic"));
 
         assertThat(taskManager.getTaskOffsetSums(), is(mkMap(mkEntry(taskId00, changelogOffset))));
@@ -2105,10 +2090,9 @@ public class TaskManagerTest {
         makeTaskFolders(taskId00.toString());
         final Map<TopicPartition, Long> changelogOffsetInCheckpoint = mkMap(mkEntry(t1p0changelog, 24L));
         writeCheckpointFile(taskId00, changelogOffsetInCheckpoint);
-        final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+        final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
         when(stateUpdater.getTasks()).thenReturn(mkSet(restoringStandbyTask));
-        replay(stateDirectory);
         taskManager.handleRebalanceStart(singleton("topic"));
 
         assertThat(taskManager.getTaskOffsetSums(), is(mkMap(mkEntry(taskId00, changelogOffset))));
@@ -2131,7 +2115,7 @@ public class TaskManagerTest {
             .thenReturn(mkMap(mkEntry(t1p1changelog, changelogOffsetOfRestoringStatefulTask)));
         when(restoringStandbyTask.changelogOffsets())
             .thenReturn(mkMap(mkEntry(t1p2changelog, changelogOffsetOfRestoringStandbyTask)));
-        final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+        final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
         when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId00, runningStatefulTask)));
         when(stateUpdater.getTasks()).thenReturn(mkSet(restoringStandbyTask, restoringStatefulTask));
@@ -2162,7 +2146,6 @@ public class TaskManagerTest {
         expectLockObtainedFor(taskId00);
         expectDirectoryNotEmpty(taskId00);
         makeTaskFolders(taskId00.toString());
-        replay(stateDirectory);
 
         taskManager.handleRebalanceStart(singleton("topic"));
         final StateMachineTask restoringTask = handleAssignment(
@@ -2186,7 +2169,6 @@ public class TaskManagerTest {
         expectLockObtainedFor(taskId00);
         expectDirectoryNotEmpty(taskId00);
         makeTaskFolders(taskId00.toString());
-        replay(stateDirectory);
 
         taskManager.handleRebalanceStart(singleton("topic"));
         final StateMachineTask restoringTask = handleAssignment(
@@ -2211,7 +2193,6 @@ public class TaskManagerTest {
         makeTaskFolders(taskId00.toString());
         writeCheckpointFile(taskId00, changelogOffsets);
 
-        replay(stateDirectory);
         taskManager.handleRebalanceStart(singleton("topic"));
 
         assertThat(taskManager.getTaskOffsetSums(), is(expectedOffsetSums));
@@ -2228,11 +2209,10 @@ public class TaskManagerTest {
         expectLockObtainedFor(taskId00);
         makeTaskFolders(taskId00.toString());
         writeCheckpointFile(taskId00, changelogOffsets);
-        replay(stateDirectory);
 
         taskManager.handleRebalanceStart(singleton("topic"));
         final StateMachineTask uninitializedTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
-        when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singleton(uninitializedTask));
+        when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singleton(uninitializedTask));
 
         taskManager.handleAssignment(taskId00Assignment, emptyMap());
 
@@ -2252,13 +2232,12 @@ public class TaskManagerTest {
         expectLockObtainedFor(taskId00);
         makeTaskFolders(taskId00.toString());
         writeCheckpointFile(taskId00, changelogOffsets);
-        replay(stateDirectory);
 
         final StateMachineTask closedTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
 
         taskManager.handleRebalanceStart(singleton("topic"));
 
-        when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singleton(closedTask));
+        when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singleton(closedTask));
 
         taskManager.handleAssignment(taskId00Assignment, emptyMap());
 
@@ -2273,7 +2252,6 @@ public class TaskManagerTest {
     public void shouldNotReportOffsetSumsForTaskWeCantLock() throws Exception {
         expectLockFailedFor(taskId00);
         makeTaskFolders(taskId00.toString());
-        replay(stateDirectory);
         taskManager.handleRebalanceStart(singleton("topic"));
         assertTrue(taskManager.lockedTaskDirectories().isEmpty());
 
@@ -2285,12 +2263,10 @@ public class TaskManagerTest {
         expectLockObtainedFor(taskId00);
         makeTaskFolders(taskId00.toString());
         expectDirectoryNotEmpty(taskId00);
-        expect(stateDirectory.checkpointFileFor(taskId00)).andReturn(getCheckpointFile(taskId00));
-        replay(stateDirectory);
+        when(stateDirectory.checkpointFileFor(taskId00)).thenReturn(getCheckpointFile(taskId00));
         taskManager.handleRebalanceStart(singleton("topic"));
 
         assertTrue(taskManager.getTaskOffsetSums().isEmpty());
-        verify(stateDirectory);
     }
 
     @Test
@@ -2306,7 +2282,6 @@ public class TaskManagerTest {
         expectLockObtainedFor(taskId00);
         makeTaskFolders(taskId00.toString());
         writeCheckpointFile(taskId00, changelogOffsets);
-        replay(stateDirectory);
         taskManager.handleRebalanceStart(singleton("topic"));
 
         assertThat(taskManager.getTaskOffsetSums(), is(expectedOffsetSums));
@@ -2321,7 +2296,7 @@ public class TaskManagerTest {
         // first `handleAssignment`
         when(consumer.assignment()).thenReturn(assignment);
 
-        when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
+        when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(task00));
 
         taskManager.handleAssignment(taskId00Assignment, emptyMap());
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
@@ -2334,7 +2309,7 @@ public class TaskManagerTest {
         assertThat(task00.state(), is(Task.State.CLOSED));
         assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
         assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
-        Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
+        verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
     }
 
     @Test
@@ -2346,7 +2321,7 @@ public class TaskManagerTest {
             }
         };
 
-        when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
+        when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(task00));
 
         taskManager.handleAssignment(taskId00Assignment, emptyMap());
         taskManager.handleRevocation(taskId00Partitions);
@@ -2362,7 +2337,7 @@ public class TaskManagerTest {
             is("Encounter unexpected fatal error for task 0_0")
         );
         assertThat(thrown.getCause().getMessage(), is("KABOOM!"));
-        Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
+        verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
     }
 
     @Test
@@ -2372,18 +2347,20 @@ public class TaskManagerTest {
 
         // `handleAssignment`
         when(consumer.assignment()).thenReturn(assignment);
-        when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
+        when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(task00));
         when(standbyTaskCreator.createTasks(taskId01Assignment)).thenReturn(singletonList(task01));
 
-        makeTaskFolders(taskId00.toString(), taskId01.toString());
+        final ArrayList<TaskDirectory> taskFolders = new ArrayList<>(2);
+        taskFolders.add(new TaskDirectory(testFolder.newFolder(taskId00.toString()), null));
+        taskFolders.add(new TaskDirectory(testFolder.newFolder(taskId01.toString()), null));
+
+        when(stateDirectory.listNonEmptyTaskDirectories())
+            .thenReturn(taskFolders)
+            .thenReturn(new ArrayList<>());
+
         expectLockObtainedFor(taskId00, taskId01);
         expectDirectoryNotEmpty(taskId00, taskId01);
 
-        // The second attempt will return empty tasks.
-        makeTaskFolders();
-        expectLockObtainedFor();
-        replay(stateDirectory);
-
         taskManager.handleRebalanceStart(emptySet());
         assertThat(taskManager.lockedTaskDirectories(), Matchers.is(mkSet(taskId00, taskId01)));
 
@@ -2406,7 +2383,7 @@ public class TaskManagerTest {
         taskManager.handleRebalanceStart(emptySet());
 
         assertThat(taskManager.lockedTaskDirectories(), is(emptySet()));
-        Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
+        verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
     }
 
     @Test
@@ -2415,7 +2392,7 @@ public class TaskManagerTest {
 
         taskManager.handleLostAll();
 
-        Mockito.verify(activeTaskCreator).reInitializeThreadProducer();
+        verify(activeTaskCreator).reInitializeThreadProducer();
     }
 
     @Test
@@ -2426,7 +2403,7 @@ public class TaskManagerTest {
 
         // `handleAssignment`
         when(consumer.assignment()).thenReturn(assignment);
-        when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
+        when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(task00));
 
         // `handleAssignment`
         doThrow(new RuntimeException("KABOOM!")).when(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
@@ -2458,7 +2435,7 @@ public class TaskManagerTest {
         final StandbyTask corruptedStandbyTask = standbyTask(taskId02, taskId02ChangelogPartitions)
             .inState(State.RUNNING)
             .withInputPartitions(taskId02Partitions).build();
-        final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+        final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
         when(tasks.task(taskId03)).thenReturn(corruptedActiveTask);
         when(tasks.task(taskId02)).thenReturn(corruptedStandbyTask);
@@ -2471,16 +2448,16 @@ public class TaskManagerTest {
         final InOrder standbyTaskOrder = inOrder(corruptedStandbyTask);
         standbyTaskOrder.verify(corruptedStandbyTask).closeDirty();
         standbyTaskOrder.verify(corruptedStandbyTask).revive();
-        Mockito.verify(tasks).removeTask(corruptedActiveTask);
-        Mockito.verify(tasks).removeTask(corruptedStandbyTask);
-        Mockito.verify(tasks).addPendingTasksToInit(mkSet(corruptedActiveTask));
-        Mockito.verify(tasks).addPendingTasksToInit(mkSet(corruptedStandbyTask));
-        Mockito.verify(consumer).assignment();
+        verify(tasks).removeTask(corruptedActiveTask);
+        verify(tasks).removeTask(corruptedStandbyTask);
+        verify(tasks).addPendingTasksToInit(mkSet(corruptedActiveTask));
+        verify(tasks).addPendingTasksToInit(mkSet(corruptedStandbyTask));
+        verify(consumer).assignment();
     }
 
     @Test
     public void shouldReviveCorruptTasks() {
-        final ProcessorStateManager stateManager = Mockito.mock(ProcessorStateManager.class);
+        final ProcessorStateManager stateManager = mock(ProcessorStateManager.class);
 
         final AtomicBoolean enforcedCheckpoint = new AtomicBoolean(false);
         final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) {
@@ -2497,7 +2474,7 @@ public class TaskManagerTest {
         when(consumer.assignment())
             .thenReturn(assignment)
             .thenReturn(taskId00Partitions);
-        when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
+        when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(task00));
 
         taskManager.handleAssignment(taskId00Assignment, emptyMap());
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), tp -> assertThat(tp, is(empty()))), is(true));
@@ -2513,12 +2490,12 @@ public class TaskManagerTest {
         assertThat(taskManager.activeTaskMap(), is(singletonMap(taskId00, task00)));
         assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
 
-        Mockito.verify(stateManager).markChangelogAsCorrupted(taskId00Partitions);
+        verify(stateManager).markChangelogAsCorrupted(taskId00Partitions);
     }
 
     @Test
     public void shouldReviveCorruptTasksEvenIfTheyCannotCloseClean() {
-        final ProcessorStateManager stateManager = Mockito.mock(ProcessorStateManager.class);
+        final ProcessorStateManager stateManager = mock(ProcessorStateManager.class);
 
         final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) {
             @Override
@@ -2531,7 +2508,7 @@ public class TaskManagerTest {
         when(consumer.assignment())
             .thenReturn(assignment)
             .thenReturn(taskId00Partitions);
-        when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
+        when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(task00));
 
         taskManager.handleAssignment(taskId00Assignment, emptyMap());
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), tp -> assertThat(tp, is(empty()))), is(true));
@@ -2545,12 +2522,12 @@ public class TaskManagerTest {
         assertThat(taskManager.activeTaskMap(), is(singletonMap(taskId00, task00)));
         assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
 
-        Mockito.verify(stateManager).markChangelogAsCorrupted(taskId00Partitions);
+        verify(stateManager).markChangelogAsCorrupted(taskId00Partitions);
     }
 
     @Test
     public void shouldCommitNonCorruptedTasksOnTaskCorruptedException() {
-        final ProcessorStateManager stateManager = Mockito.mock(ProcessorStateManager.class);
+        final ProcessorStateManager stateManager = mock(ProcessorStateManager.class);
 
         final StateMachineTask corruptedTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
         final StateMachineTask nonCorruptedTask = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager);
@@ -2559,7 +2536,7 @@ public class TaskManagerTest {
         firstAssignment.putAll(taskId01Assignment);
 
         // `handleAssignment`
-        when(activeTaskCreator.createTasks(any(), Mockito.eq(firstAssignment)))
+        when(activeTaskCreator.createTasks(any(), eq(firstAssignment)))
             .thenReturn(asList(corruptedTask, nonCorruptedTask));
 
         when(consumer.assignment())
@@ -2580,13 +2557,13 @@ public class TaskManagerTest {
         assertThat(corruptedTask.partitionsForOffsetReset, equalTo(taskId00Partitions));
 
         // check that we should not commit empty map either
-        Mockito.verify(consumer, never()).commitSync(emptyMap());
-        Mockito.verify(stateManager).markChangelogAsCorrupted(taskId00Partitions);
+        verify(consumer, never()).commitSync(emptyMap());
+        verify(stateManager).markChangelogAsCorrupted(taskId00Partitions);
     }
 
     @Test
     public void shouldNotCommitNonRunningNonCorruptedTasks() {
-        final ProcessorStateManager stateManager = Mockito.mock(ProcessorStateManager.class);
+        final ProcessorStateManager stateManager = mock(ProcessorStateManager.class);
 
         final StateMachineTask corruptedTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
         final StateMachineTask nonRunningNonCorruptedTask = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager);
@@ -2597,7 +2574,7 @@ public class TaskManagerTest {
         assignment.putAll(taskId01Assignment);
 
         // `handleAssignment`
-        when(activeTaskCreator.createTasks(any(), Mockito.eq(assignment)))
+        when(activeTaskCreator.createTasks(any(), eq(assignment)))
             .thenReturn(asList(corruptedTask, nonRunningNonCorruptedTask));
         when(consumer.assignment()).thenReturn(taskId00Partitions);
 
@@ -2611,7 +2588,7 @@ public class TaskManagerTest {
         assertThat(corruptedTask.partitionsForOffsetReset, equalTo(taskId00Partitions));
 
         assertFalse(nonRunningNonCorruptedTask.commitPrepared);
-        Mockito.verify(stateManager).markChangelogAsCorrupted(taskId00Partitions);
+        verify(stateManager).markChangelogAsCorrupted(taskId00Partitions);
     }
 
     @Test
@@ -2625,21 +2602,20 @@ public class TaskManagerTest {
         final StreamTask corruptedTask = statefulTask(taskId02, taskId02ChangelogPartitions)
             .withInputPartitions(taskId02Partitions)
             .inState(State.RUNNING).build();
-        final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+        final TasksRegistry tasks = mock(TasksRegistry.class);
         when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId02, corruptedTask)));
         when(tasks.task(taskId02)).thenReturn(corruptedTask);
         final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
-        when(stateUpdater.getTasks()).thenReturn(mkSet(activeRestoringTask, standbyTask));
         when(consumer.assignment()).thenReturn(intersection(HashSet::new, taskId00Partitions, taskId01Partitions, taskId02Partitions));
 
         taskManager.handleCorruption(mkSet(taskId02));
 
-        Mockito.verify(activeRestoringTask, never()).commitNeeded();
-        Mockito.verify(activeRestoringTask, never()).prepareCommit();
-        Mockito.verify(activeRestoringTask, never()).postCommit(Mockito.anyBoolean());
-        Mockito.verify(standbyTask, never()).commitNeeded();
-        Mockito.verify(standbyTask, never()).prepareCommit();
-        Mockito.verify(standbyTask, never()).postCommit(Mockito.anyBoolean());
+        verify(activeRestoringTask, never()).commitNeeded();
+        verify(activeRestoringTask, never()).prepareCommit();
+        verify(activeRestoringTask, never()).postCommit(anyBoolean());
+        verify(standbyTask, never()).commitNeeded();
+        verify(standbyTask, never()).prepareCommit();
+        verify(standbyTask, never()).postCommit(anyBoolean());
     }
 
     @Test
@@ -2654,7 +2630,7 @@ public class TaskManagerTest {
         final StreamTask corruptedTask = statefulTask(taskId02, taskId02ChangelogPartitions)
             .withInputPartitions(taskId02Partitions)
             .inState(State.RUNNING).build();
-        final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+        final TasksRegistry tasks = mock(TasksRegistry.class);
         when(tasks.allTasksPerId()).thenReturn(mkMap(
             mkEntry(taskId00, activeRestoringTask),
             mkEntry(taskId01, standbyTask),
@@ -2666,16 +2642,16 @@ public class TaskManagerTest {
 
         taskManager.handleCorruption(mkSet(taskId02));
 
-        Mockito.verify(activeRestoringTask, never()).commitNeeded();
-        Mockito.verify(activeRestoringTask, never()).prepareCommit();
-        Mockito.verify(activeRestoringTask, never()).postCommit(Mockito.anyBoolean());
-        Mockito.verify(standbyTask).prepareCommit();
-        Mockito.verify(standbyTask).postCommit(Mockito.anyBoolean());
+        verify(activeRestoringTask, never()).commitNeeded();
+        verify(activeRestoringTask, never()).prepareCommit();
+        verify(activeRestoringTask, never()).postCommit(anyBoolean());
+        verify(standbyTask).prepareCommit();
+        verify(standbyTask).postCommit(anyBoolean());
     }
 
     @Test
     public void shouldCleanAndReviveCorruptedStandbyTasksBeforeCommittingNonCorruptedTasks() {
-        final ProcessorStateManager stateManager = Mockito.mock(ProcessorStateManager.class);
+        final ProcessorStateManager stateManager = mock(ProcessorStateManager.class);
 
         final StateMachineTask corruptedStandby = new StateMachineTask(taskId00, taskId00Partitions, false, stateManager);
         final StateMachineTask runningNonCorruptedActive = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager) {
@@ -2686,7 +2662,7 @@ public class TaskManagerTest {
         };
 
         // handleAssignment
-        when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId01Assignment)))
+        when(activeTaskCreator.createTasks(any(), eq(taskId01Assignment)))
             .thenReturn(singleton(runningNonCorruptedActive));
         when(standbyTaskCreator.createTasks(taskId00Assignment)).thenReturn(singleton(corruptedStandby));
 
@@ -2707,13 +2683,13 @@ public class TaskManagerTest {
 
         assertThat(corruptedStandby.commitPrepared, is(true));
         assertThat(corruptedStandby.state(), is(Task.State.CREATED));
-        Mockito.verify(stateManager).markChangelogAsCorrupted(taskId00Partitions);
+        verify(stateManager).markChangelogAsCorrupted(taskId00Partitions);
     }
 
     @Test
     public void shouldNotAttemptToCommitInHandleCorruptedDuringARebalance() {
-        final ProcessorStateManager stateManager = Mockito.mock(ProcessorStateManager.class);
-        expect(stateDirectory.listNonEmptyTaskDirectories()).andStubReturn(new ArrayList<>());
+        final ProcessorStateManager stateManager = mock(ProcessorStateManager.class);
+        when(stateDirectory.listNonEmptyTaskDirectories()).thenReturn(new ArrayList<>());
 
         final StateMachineTask corruptedActive = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
 
@@ -2726,15 +2702,13 @@ public class TaskManagerTest {
         final Map<TaskId, Set<TopicPartition>> firstAssignement = new HashMap<>();
         firstAssignement.putAll(taskId00Assignment);
         firstAssignement.putAll(taskId01Assignment);
-        when(activeTaskCreator.createTasks(any(), Mockito.eq(firstAssignement)))
+        when(activeTaskCreator.createTasks(any(), eq(firstAssignement)))
             .thenReturn(asList(corruptedActive, uncorruptedActive));
 
         when(consumer.assignment())
             .thenReturn(assignment)
             .thenReturn(union(HashSet::new, taskId00Partitions, taskId01Partitions));
 
-        replay(stateDirectory);
-
         uncorruptedActive.setCommittableOffsetsAndMetadata(offsets);
 
         taskManager.handleAssignment(firstAssignement, emptyMap());
@@ -2759,7 +2733,7 @@ public class TaskManagerTest {
 
     @Test
     public void shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitWithALOS() {
-        final ProcessorStateManager stateManager = Mockito.mock(ProcessorStateManager.class);
+        final ProcessorStateManager stateManager = mock(ProcessorStateManager.class);
 
         final StateMachineTask corruptedActive = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
         final StateMachineTask uncorruptedActive = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager) {
@@ -2775,7 +2749,7 @@ public class TaskManagerTest {
         final Map<TaskId, Set<TopicPartition>> firstAssignment = new HashMap<>();
         firstAssignment.putAll(taskId00Assignment);
         firstAssignment.putAll(taskId01Assignment);
-        when(activeTaskCreator.createTasks(any(), Mockito.eq(firstAssignment)))
+        when(activeTaskCreator.createTasks(any(), eq(firstAssignment)))
             .thenReturn(asList(corruptedActive, uncorruptedActive));
 
         when(consumer.assignment())
@@ -2812,7 +2786,7 @@ public class TaskManagerTest {
 
         assertThat(corruptedActive.state(), is(Task.State.CREATED));
         assertThat(uncorruptedActive.state(), is(Task.State.CREATED));
-        Mockito.verify(stateManager).markChangelogAsCorrupted(taskId00Partitions);
+        verify(stateManager).markChangelogAsCorrupted(taskId00Partitions);
     }
 
     @Test
@@ -2820,7 +2794,7 @@ public class TaskManagerTest {
         final TaskManager taskManager = setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false);
         final StreamsProducer producer = mock(StreamsProducer.class);
         when(activeTaskCreator.threadProducer()).thenReturn(producer);
-        final ProcessorStateManager stateManager = Mockito.mock(ProcessorStateManager.class);
+        final ProcessorStateManager stateManager = mock(ProcessorStateManager.class);
 
         final AtomicBoolean corruptedTaskChangelogMarkedAsCorrupted = new AtomicBoolean(false);
         final StateMachineTask corruptedActiveTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) {
@@ -2846,7 +2820,7 @@ public class TaskManagerTest {
         final Map<TaskId, Set<TopicPartition>> firstAssignment = new HashMap<>();
         firstAssignment.putAll(taskId00Assignment);
         firstAssignment.putAll(taskId01Assignment);
-        when(activeTaskCreator.createTasks(any(), Mockito.eq(firstAssignment)))
+        when(activeTaskCreator.createTasks(any(), eq(firstAssignment)))
             .thenReturn(asList(corruptedActiveTask, uncorruptedActiveTask));
 
         when(consumer.assignment())
@@ -2892,8 +2866,8 @@ public class TaskManagerTest {
         assertThat(uncorruptedActiveTask.state(), is(Task.State.CREATED));
         assertThat(corruptedTaskChangelogMarkedAsCorrupted.get(), is(true));
         assertThat(uncorruptedTaskChangelogMarkedAsCorrupted.get(), is(true));
-        Mockito.verify(stateManager).markChangelogAsCorrupted(taskId00ChangelogPartitions);
-        Mockito.verify(stateManager).markChangelogAsCorrupted(taskId01ChangelogPartitions);
+        verify(stateManager).markChangelogAsCorrupted(taskId00ChangelogPartitions);
+        verify(stateManager).markChangelogAsCorrupted(taskId01ChangelogPartitions);
     }
 
     @Test
@@ -2929,7 +2903,7 @@ public class TaskManagerTest {
             .thenReturn(assignment)
             .thenReturn(union(HashSet::new, taskId00Partitions, taskId01Partitions, taskId02Partitions));
 
-        when(activeTaskCreator.createTasks(any(), Mockito.eq(assignmentActive)))
+        when(activeTaskCreator.createTasks(any(), eq(assignmentActive)))
             .thenReturn(asList(revokedActiveTask, unrevokedActiveTaskWithCommitNeeded, unrevokedActiveTaskWithoutCommitNeeded));
 
         doThrow(new TimeoutException()).when(consumer).commitSync(expectedCommittedOffsets);
@@ -2952,7 +2926,7 @@ public class TaskManagerTest {
         final TaskManager taskManager = setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false);
         final StreamsProducer producer = mock(StreamsProducer.class);
         when(activeTaskCreator.threadProducer()).thenReturn(producer);
-        final ProcessorStateManager stateManager = Mockito.mock(ProcessorStateManager.class);
+        final ProcessorStateManager stateManager = mock(ProcessorStateManager.class);
 
         final StateMachineTask revokedActiveTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
         final Map<TopicPartition, OffsetAndMetadata> revokedActiveTaskOffsets = singletonMap(t1p0, new OffsetAndMetadata(0L, null));
@@ -2987,7 +2961,7 @@ public class TaskManagerTest {
             .thenReturn(assignment)
             .thenReturn(union(HashSet::new, taskId00Partitions, taskId01Partitions, taskId02Partitions));
 
-        when(activeTaskCreator.createTasks(any(), Mockito.eq(assignmentActive)))
+        when(activeTaskCreator.createTasks(any(), eq(assignmentActive)))
             .thenReturn(asList(revokedActiveTask, unrevokedActiveTask, unrevokedActiveTaskWithoutCommitNeeded));
 
         final ConsumerGroupMetadata groupMetadata = new ConsumerGroupMetadata("appId");
@@ -3012,8 +2986,8 @@ public class TaskManagerTest {
         assertThat(revokedActiveTask.state(), is(State.SUSPENDED));
         assertThat(unrevokedActiveTask.state(), is(State.CREATED));
         assertThat(unrevokedActiveTaskWithoutCommitNeeded.state(), is(State.RUNNING));
-        Mockito.verify(stateManager).markChangelogAsCorrupted(taskId00ChangelogPartitions);
-        Mockito.verify(stateManager).markChangelogAsCorrupted(taskId01ChangelogPartitions);
+        verify(stateManager).markChangelogAsCorrupted(taskId00ChangelogPartitions);
+        verify(stateManager).markChangelogAsCorrupted(taskId01ChangelogPartitions);
     }
 
     @Test
@@ -3039,7 +3013,7 @@ public class TaskManagerTest {
         final Task task01 = new StateMachineTask(taskId01, taskId01Partitions, false, stateManager);
 
         when(consumer.assignment()).thenReturn(assignment);
-        when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
+        when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(task00));
         when(standbyTaskCreator.createTasks(taskId01Assignment)).thenReturn(singletonList(task01));
 
         taskManager.handleAssignment(taskId00Assignment, taskId01Assignment);
@@ -3053,9 +3027,9 @@ public class TaskManagerTest {
         assertThat(task01.state(), is(Task.State.RUNNING));
 
         // expect these calls twice (because we're going to tryToCompleteRestoration twice)
-        Mockito.verify(activeTaskCreator).createTasks(any(), Mockito.eq(emptyMap()));
-        Mockito.verify(consumer, times(2)).assignment();
-        Mockito.verify(consumer, times(2)).resume(assignment);
+        verify(activeTaskCreator).createTasks(any(), eq(emptyMap()));
+        verify(consumer, times(2)).assignment();
+        verify(consumer, times(2)).resume(assignment);
     }
 
     @Test
@@ -3063,7 +3037,7 @@ public class TaskManagerTest {
         final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
 
         when(consumer.assignment()).thenReturn(assignment);
-        when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
+        when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(task00));
 
         taskManager.handleAssignment(taskId00Assignment, emptyMap());
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
@@ -3076,9 +3050,9 @@ public class TaskManagerTest {
         assertThat(task00.state(), is(Task.State.RUNNING));
         assertEquals(newPartitionsSet, task00.inputPartitions());
         // expect these calls twice (because we're going to tryToCompleteRestoration twice)
-        Mockito.verify(consumer, times(2)).resume(assignment);
-        Mockito.verify(consumer, times(2)).assignment();
-        Mockito.verify(activeTaskCreator).createTasks(any(), Mockito.eq(emptyMap()));
+        verify(consumer, times(2)).resume(assignment);
+        verify(consumer, times(2)).assignment();
+        verify(activeTaskCreator).createTasks(any(), eq(emptyMap()));
     }
 
     @Test
@@ -3086,7 +3060,7 @@ public class TaskManagerTest {
         final Map<TaskId, Set<TopicPartition>> assignment = taskId00Assignment;
         final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
 
-        when(activeTaskCreator.createTasks(any(), Mockito.eq(assignment))).thenReturn(singletonList(task00));
+        when(activeTaskCreator.createTasks(any(), eq(assignment))).thenReturn(singletonList(task00));
 
         taskManager.handleAssignment(assignment, emptyMap());
 
@@ -3097,9 +3071,9 @@ public class TaskManagerTest {
         assertThat(task00.state(), is(Task.State.RUNNING));
         assertThat(taskManager.activeTaskMap(), Matchers.equalTo(singletonMap(taskId00, task00)));
         assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
-        Mockito.verify(changeLogReader).enforceRestoreActive();
-        Mockito.verify(consumer).assignment();
-        Mockito.verify(consumer).resume(Mockito.eq(emptySet()));
+        verify(changeLogReader).enforceRestoreActive();
+        verify(consumer).assignment();
+        verify(consumer).resume(eq(emptySet()));
     }
 
     @Test
@@ -3121,7 +3095,7 @@ public class TaskManagerTest {
             }
         };
 
-        when(activeTaskCreator.createTasks(any(), Mockito.eq(assignment))).thenReturn(asList(task00, task01));
+        when(activeTaskCreator.createTasks(any(), eq(assignment))).thenReturn(asList(task00, task01));
 
         taskManager.handleAssignment(assignment, emptyMap());
 
@@ -3137,8 +3111,8 @@ public class TaskManagerTest {
             Matchers.equalTo(mkMap(mkEntry(taskId00, task00), mkEntry(taskId01, task01)))
         );
         assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
-        Mockito.verify(changeLogReader).enforceRestoreActive();
-        Mockito.verifyNoInteractions(consumer);
+        verify(changeLogReader).enforceRestoreActive();
+        verifyNoInteractions(consumer);
     }
 
     @Test
@@ -3153,7 +3127,7 @@ public class TaskManagerTest {
             }
         };
 
-        when(activeTaskCreator.createTasks(any(), Mockito.eq(assignment))).thenReturn(singletonList(task00));
+        when(activeTaskCreator.createTasks(any(), eq(assignment))).thenReturn(singletonList(task00));
 
         taskManager.handleAssignment(assignment, emptyMap());
 
@@ -3167,8 +3141,8 @@ public class TaskManagerTest {
             Matchers.equalTo(mkMap(mkEntry(taskId00, task00)))
         );
         assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
-        Mockito.verify(changeLogReader).enforceRestoreActive();
-        Mockito.verifyNoInteractions(consumer);
+        verify(changeLogReader).enforceRestoreActive();
+        verifyNoInteractions(consumer);
     }
 
     @Test
@@ -3178,7 +3152,7 @@ public class TaskManagerTest {
         task00.setCommittableOffsetsAndMetadata(offsets);
 
         when(consumer.assignment()).thenReturn(assignment);
-        when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
+        when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(task00));
 
         taskManager.handleAssignment(taskId00Assignment, emptyMap());
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
@@ -3224,7 +3198,7 @@ public class TaskManagerTest {
         );
         when(consumer.assignment()).thenReturn(assignment);
 
-        when(activeTaskCreator.createTasks(any(), Mockito.eq(assignmentActive)))
+        when(activeTaskCreator.createTasks(any(), eq(assignmentActive)))
             .thenReturn(asList(task00, task01, task02));
 
         when(activeTaskCreator.threadProducer()).thenReturn(producer);
@@ -3253,7 +3227,7 @@ public class TaskManagerTest {
         assertThat(task02.commitPrepared, is(false));
         assertThat(task10.commitPrepared, is(false));
 
-        Mockito.verify(producer).commitTransaction(expectedCommittedOffsets, groupMetadata);
+        verify(producer).commitTransaction(expectedCommittedOffsets, groupMetadata);
     }
 
     @Test
@@ -3289,7 +3263,7 @@ public class TaskManagerTest {
         );
         when(consumer.assignment()).thenReturn(assignment);
 
-        when(activeTaskCreator.createTasks(any(), Mockito.eq(assignmentActive)))
+        when(activeTaskCreator.createTasks(any(), eq(assignmentActive)))
             .thenReturn(asList(task00, task01, task02));
         when(standbyTaskCreator.createTasks(assignmentStandby))
             .thenReturn(singletonList(task10));
@@ -3310,7 +3284,7 @@ public class TaskManagerTest {
         assertThat(task02.commitPrepared, is(false));
         assertThat(task10.commitPrepared, is(false));
 
-        Mockito.verify(consumer).commitSync(expectedCommittedOffsets);
+        verify(consumer).commitSync(expectedCommittedOffsets);
     }
 
     @Test
@@ -3327,7 +3301,7 @@ public class TaskManagerTest {
 
         when(consumer.assignment()).thenReturn(assignment);
 
-        when(activeTaskCreator.createTasks(any(), Mockito.eq(assignmentActive))).thenReturn(singleton(task00));
+        when(activeTaskCreator.createTasks(any(), eq(assignmentActive))).thenReturn(singleton(task00));
         when(standbyTaskCreator.createTasks(assignmentStandby)).thenReturn(singletonList(task10));
 
         taskManager.handleAssignment(assignmentActive, assignmentStandby);
@@ -3355,7 +3329,7 @@ public class TaskManagerTest {
 
         when(consumer.assignment()).thenReturn(assignment);
 
-        when(activeTaskCreator.createTasks(any(), Mockito.eq(assignmentActive))).thenReturn(singleton(task00));
+        when(activeTaskCreator.createTasks(any(), eq(assignmentActive))).thenReturn(singleton(task00));
         when(standbyTaskCreator.createTasks(assignmentStandby)).thenReturn(singletonList(task10));
 
         taskManager.handleAssignment(assignmentActive, assignmentStandby);
@@ -3372,7 +3346,7 @@ public class TaskManagerTest {
     public void shouldNotCommitCreatedTasksOnRevocationOrClosure() {
         final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
 
-        when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
+        when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(task00));
 
         taskManager.handleAssignment(taskId00Assignment, emptyMap());
         assertThat(task00.state(), is(Task.State.CREATED));
@@ -3382,7 +3356,7 @@ public class TaskManagerTest {
 
         taskManager.handleAssignment(emptyMap(), emptyMap());
         assertThat(task00.state(), is(Task.State.CLOSED));
-        Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(eq(taskId00));
+        verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
     }
 
     @Test
@@ -3396,7 +3370,7 @@ public class TaskManagerTest {
         };
 
         when(consumer.assignment()).thenReturn(assignment);
-        when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
+        when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(task00));
         taskManager.handleAssignment(taskId00Assignment, emptyMap());
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
         assertThat(task00.state(), is(Task.State.RUNNING));
@@ -3463,7 +3437,7 @@ public class TaskManagerTest {
             }
         };
 
-        when(activeTaskCreator.createTasks(any(), Mockito.eq(assignment)))
+        when(activeTaskCreator.createTasks(any(), eq(assignment)))
             .thenReturn(asList(task00, task01, task02, task03));
 
         taskManager.handleAssignment(assignment, emptyMap());
@@ -3491,8 +3465,8 @@ public class TaskManagerTest {
             )
         );
         assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
-        Mockito.verify(changeLogReader).enforceRestoreActive();
-        Mockito.verify(changeLogReader).completedChangelogs();
+        verify(changeLogReader).enforceRestoreActive();
+        verify(changeLogReader).completedChangelogs();
 
         final RuntimeException exception = assertThrows(
             RuntimeException.class,
@@ -3509,9 +3483,9 @@ public class TaskManagerTest {
         assertThat(task03.state(), is(Task.State.CLOSED));
         assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
         assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
-        Mockito.verify(activeTaskCreator, times(4)).closeAndRemoveTaskProducerIfNeeded(any());
+        verify(activeTaskCreator, times(4)).closeAndRemoveTaskProducerIfNeeded(any());
         // the active task creator should also get closed (so that it closes the thread producer if applicable)
-        Mockito.verify(activeTaskCreator).closeThreadProducerIfNeeded();
+        verify(activeTaskCreator).closeThreadProducerIfNeeded();
     }
 
     @Test
@@ -3529,7 +3503,7 @@ public class TaskManagerTest {
         final Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(t1p0, new OffsetAndMetadata(0L, null));
         task00.setCommittableOffsetsAndMetadata(offsets);
 
-        when(activeTaskCreator.createTasks(any(), Mockito.eq(assignment))).thenReturn(singletonList(task00));
+        when(activeTaskCreator.createTasks(any(), eq(assignment))).thenReturn(singletonList(task00));
         doThrow(new RuntimeException("whatever"))
             .when(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
 
@@ -3549,8 +3523,8 @@ public class TaskManagerTest {
             )
         );
         assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
-        Mockito.verify(changeLogReader).enforceRestoreActive();
-        Mockito.verify(changeLogReader).completedChangelogs();
+        verify(changeLogReader).enforceRestoreActive();
+        verify(changeLogReader).completedChangelogs();
 
         final RuntimeException exception = assertThrows(RuntimeException.class, () -> taskManager.shutdown(true));
 
@@ -3558,9 +3532,9 @@ public class TaskManagerTest {
         assertThat(exception.getCause().getMessage(), is("whatever"));
         assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
         assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
-        Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
+        verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
         // the active task creator should also get closed (so that it closes the thread producer if applicable)
-        Mockito.verify(activeTaskCreator).closeThreadProducerIfNeeded();
+        verify(activeTaskCreator).closeThreadProducerIfNeeded();
     }
 
     @Test
@@ -3576,7 +3550,7 @@ public class TaskManagerTest {
             }
         };
 
-        when(activeTaskCreator.createTasks(any(), Mockito.eq(assignment))).thenReturn(singletonList(task00));
+        when(activeTaskCreator.createTasks(any(), eq(assignment))).thenReturn(singletonList(task00));
         doThrow(new RuntimeException("whatever")).when(activeTaskCreator).closeThreadProducerIfNeeded();
 
         taskManager.handleAssignment(assignment, emptyMap());
@@ -3595,8 +3569,8 @@ public class TaskManagerTest {
             )
         );
         assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
-        Mockito.verify(changeLogReader).enforceRestoreActive();
-        Mockito.verify(changeLogReader).completedChangelogs();
+        verify(changeLogReader).enforceRestoreActive();
+        verify(changeLogReader).completedChangelogs();
 
         final RuntimeException exception = assertThrows(RuntimeException.class, () -> taskManager.shutdown(true));
 
@@ -3605,7 +3579,7 @@ public class TaskManagerTest {
         assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
         assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
         // the active task creator should also get closed (so that it closes the thread producer if applicable)
-        Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
+        verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
     }
 
     @Test
@@ -3666,7 +3640,7 @@ public class TaskManagerTest {
         assertThat(task01.state(), is(Task.State.SUSPENDED));
         assertThat(task02.state(), is(Task.State.SUSPENDED));
 
-        Mockito.verifyNoInteractions(activeTaskCreator);
+        verifyNoInteractions(activeTaskCreator);
     }
 
     @Test
@@ -3698,8 +3672,8 @@ public class TaskManagerTest {
             }
         };
 
-        when(activeTaskCreator.createTasks(any(), Mockito.eq(assignment))).thenReturn(asList(task00, task01, task02));
-        doThrow(new RuntimeException("whatever")).when(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(Mockito.any());
+        when(activeTaskCreator.createTasks(any(), eq(assignment))).thenReturn(asList(task00, task01, task02));
+        doThrow(new RuntimeException("whatever")).when(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(any());
         doThrow(new RuntimeException("whatever all")).when(activeTaskCreator).closeThreadProducerIfNeeded();
 
         taskManager.handleAssignment(assignment, emptyMap());
@@ -3724,8 +3698,8 @@ public class TaskManagerTest {
             )
         );
         assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
-        Mockito.verify(changeLogReader).enforceRestoreActive();
-        Mockito.verify(changeLogReader).completedChangelogs();
+        verify(changeLogReader).enforceRestoreActive();
+        verify(changeLogReader).completedChangelogs();
 
         taskManager.shutdown(false);
 
@@ -3734,9 +3708,9 @@ public class TaskManagerTest {
         assertThat(task02.state(), is(Task.State.CLOSED));
         assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
         assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
-        Mockito.verify(activeTaskCreator, times(3)).closeAndRemoveTaskProducerIfNeeded(any());
+        verify(activeTaskCreator, times(3)).closeAndRemoveTaskProducerIfNeeded(any());
         // the active task creator should also get closed (so that it closes the thread producer if applicable)
-        Mockito.verify(activeTaskCreator).closeThreadProducerIfNeeded();
+        verify(activeTaskCreator).closeThreadProducerIfNeeded();
     }
 
     @Test
@@ -3760,10 +3734,10 @@ public class TaskManagerTest {
         assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
         assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
         // the active task creator should also get closed (so that it closes the thread producer if applicable)
-        Mockito.verify(activeTaskCreator).closeThreadProducerIfNeeded();
+        verify(activeTaskCreator).closeThreadProducerIfNeeded();
         // `tryToCompleteRestoration`
-        Mockito.verify(consumer).assignment();
-        Mockito.verify(consumer).resume(Mockito.eq(emptySet()));
+        verify(consumer).assignment();
+        verify(consumer).resume(eq(emptySet()));
     }
 
     @Test
@@ -3782,12 +3756,12 @@ public class TaskManagerTest {
 
         taskManager.shutdown(true);
 
-        Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(failedStatefulTask.id());
-        Mockito.verify(activeTaskCreator).closeThreadProducerIfNeeded();
-        Mockito.verify(stateUpdater).shutdown(Duration.ofMillis(Long.MAX_VALUE));
-        Mockito.verify(failedStatefulTask).prepareCommit();
-        Mockito.verify(failedStatefulTask).suspend();
-        Mockito.verify(failedStatefulTask).closeDirty();
+        verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(failedStatefulTask.id());
+        verify(activeTaskCreator).closeThreadProducerIfNeeded();
+        verify(stateUpdater).shutdown(Duration.ofMillis(Long.MAX_VALUE));
+        verify(failedStatefulTask).prepareCommit();
+        verify(failedStatefulTask).suspend();
+        verify(failedStatefulTask).closeDirty();
     }
 
     @Test
@@ -3797,7 +3771,7 @@ public class TaskManagerTest {
 
         taskManager.shutdown(true);
 
-        Mockito.verify(schedulingTaskManager).shutdown(Duration.ofMillis(Long.MAX_VALUE));
+        verify(schedulingTaskManager).shutdown(Duration.ofMillis(Long.MAX_VALUE));
     }
 
     @Test
@@ -3815,13 +3789,13 @@ public class TaskManagerTest {
 
         taskManager.shutdown(true);
 
-        Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(statefulTask1.id());
-        Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(statefulTask2.id());
-        Mockito.verify(activeTaskCreator).closeThreadProducerIfNeeded();
-        Mockito.verify(stateUpdater).shutdown(Duration.ofMillis(Long.MAX_VALUE));
-        Mockito.verify(tasks).addActiveTasks(restoredTasks);
-        Mockito.verify(statefulTask1).closeClean();
-        Mockito.verify(statefulTask2).closeClean();
+        verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(statefulTask1.id());
+        verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(statefulTask2.id());
+        verify(activeTaskCreator).closeThreadProducerIfNeeded();
+        verify(stateUpdater).shutdown(Duration.ofMillis(Long.MAX_VALUE));
+        verify(tasks).addActiveTasks(restoredTasks);
+        verify(statefulTask1).closeClean();
+        verify(statefulTask2).closeClean();
     }
 
     @Test
@@ -3838,13 +3812,13 @@ public class TaskManagerTest {
 
         taskManager.shutdown(true);
 
-        Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(removedStatefulTask.id());
-        Mockito.verify(activeTaskCreator).closeThreadProducerIfNeeded();
-        Mockito.verify(stateUpdater).shutdown(Duration.ofMillis(Long.MAX_VALUE));
-        Mockito.verify(tasks).addActiveTasks(mkSet(removedStatefulTask));
-        Mockito.verify(tasks).addStandbyTasks(mkSet(removedStandbyTask));
-        Mockito.verify(removedStatefulTask).closeClean();
-        Mockito.verify(removedStandbyTask).closeClean();
+        verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(removedStatefulTask.id());
+        verify(activeTaskCreator).closeThreadProducerIfNeeded();
+        verify(stateUpdater).shutdown(Duration.ofMillis(Long.MAX_VALUE));
+        verify(tasks).addActiveTasks(mkSet(removedStatefulTask));
+        verify(tasks).addStandbyTasks(mkSet(removedStandbyTask));
+        verify(removedStatefulTask).closeClean();
+        verify(removedStandbyTask).closeClean();
     }
 
     @Test
@@ -3852,7 +3826,7 @@ public class TaskManagerTest {
         final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
         when(consumer.assignment()).thenReturn(assignment);
 
-        when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment)))
+        when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment)))
             .thenReturn(singletonList(task00));
 
         taskManager.handleAssignment(taskId00Assignment, emptyMap());
@@ -3862,7 +3836,7 @@ public class TaskManagerTest {
         assertThat(taskManager.activeTaskMap(), Matchers.equalTo(singletonMap(taskId00, task00)));
         assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
         // verifies that we actually resume the assignment at the end of restoration.
-        Mockito.verify(consumer).resume(assignment);
+        verify(consumer).resume(assignment);
     }
 
     @Test
@@ -3883,14 +3857,13 @@ public class TaskManagerTest {
     @Test
     public void shouldHandleRebalanceEvents() {
         when(consumer.assignment()).thenReturn(assignment);
-        expect(stateDirectory.listNonEmptyTaskDirectories()).andReturn(new ArrayList<>());
-        replay(stateDirectory);
+        when(stateDirectory.listNonEmptyTaskDirectories()).thenReturn(new ArrayList<>());
         assertThat(taskManager.rebalanceInProgress(), is(false));
         taskManager.handleRebalanceStart(emptySet());
         assertThat(taskManager.rebalanceInProgress(), is(true));
         taskManager.handleRebalanceComplete();
         assertThat(taskManager.rebalanceInProgress(), is(false));
-        Mockito.verify(consumer).pause(assignment);
+        verify(consumer).pause(assignment);
     }
 
     @Test
@@ -3901,7 +3874,7 @@ public class TaskManagerTest {
         final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, false, stateManager);
 
         when(consumer.assignment()).thenReturn(assignment);
-        when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment)))
+        when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment)))
             .thenReturn(singletonList(task00));
         when(standbyTaskCreator.createTasks(taskId01Assignment))
             .thenReturn(singletonList(task01));
@@ -3919,7 +3892,7 @@ public class TaskManagerTest {
         assertThat(task00.commitNeeded, is(false));
         assertThat(task01.commitNeeded, is(false));
 
-        Mockito.verify(consumer).commitSync(offsets);
+        verify(consumer).commitSync(offsets);
     }
 
     @Test
@@ -3943,7 +3916,7 @@ public class TaskManagerTest {
         );
 
         when(consumer.assignment()).thenReturn(assignment);
-        when(activeTaskCreator.createTasks(any(), Mockito.eq(assignmentActive)))
+        when(activeTaskCreator.createTasks(any(), eq(assignmentActive)))
             .thenReturn(Arrays.asList(task00, task01, task02));
         when(standbyTaskCreator.createTasks(assignmentStandby))
             .thenReturn(Arrays.asList(task03, task04, task05));
@@ -3995,13 +3968,11 @@ public class TaskManagerTest {
         expectDirectoryNotEmpty(taskId00, taskId01);
         expectLockObtainedFor(taskId00, taskId01);
         when(consumer.assignment()).thenReturn(assignment);
-        when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment)))
+        when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment)))
             .thenReturn(singletonList(task00));
         when(standbyTaskCreator.createTasks(taskId01Assignment))
             .thenReturn(singletonList(task01));
 
-        replay(stateDirectory);
-
         taskManager.handleAssignment(taskId00Assignment, taskId01Assignment);
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
 
@@ -4034,12 +4005,12 @@ public class TaskManagerTest {
 
         taskManager.commitAll();
 
-        Mockito.verify(consumer).commitSync(offsets);
+        verify(consumer).commitSync(offsets);
     }
 
     @Test
     public void shouldCommitViaProducerIfEosAlphaEnabled() {
-        final StreamsProducer producer = Mockito.mock(StreamsProducer.class);
+        final StreamsProducer producer = mock(StreamsProducer.class);
         when(activeTaskCreator.streamsProducerForTask(any(TaskId.class)))
             .thenReturn(producer);
 
@@ -4048,14 +4019,14 @@ public class TaskManagerTest {
 
         shouldCommitViaProducerIfEosEnabled(ProcessingMode.EXACTLY_ONCE_ALPHA, offsetsT01, offsetsT02);
 
-        Mockito.verify(producer).commitTransaction(offsetsT01, new ConsumerGroupMetadata("appId"));
-        Mockito.verify(producer).commitTransaction(offsetsT02, new ConsumerGroupMetadata("appId"));
-        Mockito.verifyNoMoreInteractions(producer);
+        verify(producer).commitTransaction(offsetsT01, new ConsumerGroupMetadata("appId"));
+        verify(producer).commitTransaction(offsetsT02, new ConsumerGroupMetadata("appId"));
+        verifyNoMoreInteractions(producer);
     }
 
     @Test
     public void shouldCommitViaProducerIfEosV2Enabled() {
-        final StreamsProducer producer = Mockito.mock(StreamsProducer.class);
+        final StreamsProducer producer = mock(StreamsProducer.class);
         when(activeTaskCreator.threadProducer()).thenReturn(producer);
 
         final Map<TopicPartition, OffsetAndMetadata> offsetsT01 = singletonMap(t1p1, new OffsetAndMetadata(0L, null));
@@ -4066,8 +4037,8 @@ public class TaskManagerTest {
 
         shouldCommitViaProducerIfEosEnabled(ProcessingMode.EXACTLY_ONCE_V2, offsetsT01, offsetsT02);
 
-        Mockito.verify(producer).commitTransaction(allOffsets, new ConsumerGroupMetadata("appId"));
-        Mockito.verifyNoMoreInteractions(producer);
+        verify(producer).commitTransaction(allOffsets, new ConsumerGroupMetadata("appId"));
+        verifyNoMoreInteractions(producer);
     }
 
     private void shouldCommitViaProducerIfEosEnabled(final ProcessingMode processingMode,
@@ -4099,7 +4070,7 @@ public class TaskManagerTest {
         };
 
         when(consumer.assignment()).thenReturn(assignment);
-        when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
+        when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(task00));
 
         taskManager.handleAssignment(taskId00Assignment, emptyMap());
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
@@ -4144,7 +4115,7 @@ public class TaskManagerTest {
         when(adminClient.deleteRecords(singletonMap(t1p1, RecordsToDelete.beforeOffset(17L))))
             .thenReturn(new DeleteRecordsResult(singletonMap(t1p1, completedFuture())));
 
-        final InOrder inOrder = Mockito.inOrder(adminClient);
+        final InOrder inOrder = inOrder(adminClient);
 
         final Map<TopicPartition, Long> purgableOffsets = new HashMap<>();
         final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) {
@@ -4155,7 +4126,7 @@ public class TaskManagerTest {
         };
 
         when(consumer.assignment()).thenReturn(assignment);
-        when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
+        when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(task00));
 
         taskManager.handleAssignment(taskId00Assignment, emptyMap());
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
@@ -4188,7 +4159,7 @@ public class TaskManagerTest {
         };
 
         when(consumer.assignment()).thenReturn(assignment);
-        when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
+        when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(task00));
 
         taskManager.handleAssignment(taskId00Assignment, emptyMap());
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
@@ -4257,7 +4228,7 @@ public class TaskManagerTest {
         );
 
         when(consumer.assignment()).thenReturn(assignment);
-        when(activeTaskCreator.createTasks(any(), Mockito.eq(assignmentActive)))
+        when(activeTaskCreator.createTasks(any(), eq(assignmentActive)))
             .thenReturn(asList(task00, task01, task02, task03));
         when(standbyTaskCreator.createTasks(assignmentStandby))
             .thenReturn(singletonList(task04));
@@ -4286,7 +4257,7 @@ public class TaskManagerTest {
 
         assertThat(taskManager.maybeCommitActiveTasksPerUserRequested(), equalTo(3));
 
-        Mockito.verify(consumer).commitSync(expectedCommittedOffsets);
+        verify(consumer).commitSync(expectedCommittedOffsets);
     }
 
     @Test
@@ -4299,7 +4270,7 @@ public class TaskManagerTest {
         firstAssignment.put(taskId01, taskId01Partitions);
 
         when(consumer.assignment()).thenReturn(assignment);
-        when(activeTaskCreator.createTasks(any(), Mockito.eq(firstAssignment)))
+        when(activeTaskCreator.createTasks(any(), eq(firstAssignment)))
             .thenReturn(Arrays.asList(task00, task01));
 
         taskManager.handleAssignment(firstAssignment, emptyMap());
@@ -4411,7 +4382,7 @@ public class TaskManagerTest {
         };
 
         when(consumer.assignment()).thenReturn(assignment);
-        when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
+        when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(task00));
 
         taskManager.handleAssignment(taskId00Assignment, emptyMap());
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
@@ -4434,7 +4405,7 @@ public class TaskManagerTest {
         };
 
         when(consumer.assignment()).thenReturn(assignment);
-        when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment)))
+        when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment)))
             .thenReturn(singletonList(task00));
 
         taskManager.handleAssignment(taskId00Assignment, emptyMap());
@@ -4461,7 +4432,7 @@ public class TaskManagerTest {
         };
 
         when(consumer.assignment()).thenReturn(assignment);
-        when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
+        when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(task00));
 
         taskManager.handleAssignment(taskId00Assignment, emptyMap());
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
@@ -4481,7 +4452,7 @@ public class TaskManagerTest {
         };
 
         when(consumer.assignment()).thenReturn(assignment);
-        when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
+        when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(task00));
 
         taskManager.handleAssignment(taskId00Assignment, emptyMap());
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
@@ -4506,7 +4477,7 @@ public class TaskManagerTest {
         };
 
         when(consumer.assignment()).thenReturn(assignment);
-        when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
+        when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(task00));
 
         taskManager.handleAssignment(taskId00Assignment, emptyMap());
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
@@ -4526,12 +4497,12 @@ public class TaskManagerTest {
             }
         };
 
-        when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
+        when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(task00));
 
         taskManager.handleAssignment(taskId00Assignment, emptyMap());
         assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(false));
         assertThat(task00.state(), is(Task.State.RESTORING));
-        Mockito.verifyNoInteractions(consumer);
+        verifyNoInteractions(consumer);
     }
 
     @Test
@@ -4541,7 +4512,7 @@ public class TaskManagerTest {
         task00.setCommittableOffsetsAndMetadata(offsets);
 
         when(consumer.assignment()).thenReturn(assignment);
-        when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
+        when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(task00));
 
         try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(TaskManager.class)) {
             appender.setClassLoggerToDebug(TaskManager.class);
@@ -4695,7 +4666,7 @@ public class TaskManagerTest {
         allActiveTasks.addAll(restoringTasks);
 
         when(standbyTaskCreator.createTasks(standbyAssignment)).thenReturn(standbyTasks);
-        when(activeTaskCreator.createTasks(any(), Mockito.eq(allActiveTasksAssignment))).thenReturn(allActiveTasks);
+        when(activeTaskCreator.createTasks(any(), eq(allActiveTasksAssignment))).thenReturn(allActiveTasks);
 
         lenient().when(consumer.assignment()).thenReturn(assignment);
 
@@ -4720,28 +4691,21 @@ public class TaskManagerTest {
         return allTasks;
     }
 
-    private void expectLockObtainedFor(final TaskId... tasks) throws Exception {
+    private void expectLockObtainedFor(final TaskId... tasks) {
         for (final TaskId task : tasks) {
-            expect(stateDirectory.lock(task)).andReturn(true).once();
+            when(stateDirectory.lock(task)).thenReturn(true);
         }
     }
 
-    private void expectLockFailedFor(final TaskId... tasks) throws Exception {
+    private void expectLockFailedFor(final TaskId... tasks) {
         for (final TaskId task : tasks) {
-            expect(stateDirectory.lock(task)).andReturn(false).once();
-        }
-    }
-
-    private void expectUnlockFor(final TaskId... tasks) throws Exception {
-        for (final TaskId task : tasks) {
-            stateDirectory.unlock(task);
-            expectLastCall();
+            when(stateDirectory.lock(task)).thenReturn(false);
         }
     }
 
     private void expectDirectoryNotEmpty(final TaskId... tasks) {
         for (final TaskId taskId : tasks) {
-            expect(stateDirectory.directoryForTaskIsEmpty(taskId)).andReturn(false);
+            when(stateDirectory.directoryForTaskIsEmpty(taskId)).thenReturn(false);
         }
     }
 
@@ -4790,7 +4754,7 @@ public class TaskManagerTest {
         assertNull(task00.timeout);
         assertNull(task01.timeout);
 
-        Mockito.verify(consumer, times(2)).commitSync(any(Map.class));
+        verify(consumer, times(2)).commitSync(any(Map.class));
     }
 
     @Test
@@ -4832,7 +4796,7 @@ public class TaskManagerTest {
             equalTo(Collections.singleton(taskId00))
         );
 
-        Mockito.verify(consumer, times(2)).groupMetadata();
+        verify(consumer, times(2)).groupMetadata();
     }
 
     @Test
@@ -4867,7 +4831,7 @@ public class TaskManagerTest {
             equalTo(mkSet(taskId00, taskId01))
         );
 
-        Mockito.verify(consumer).groupMetadata();
+        verify(consumer).groupMetadata();
     }
 
     @Test
@@ -4922,7 +4886,7 @@ public class TaskManagerTest {
 
         final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>(taskId00Assignment);
         assignment.putAll(taskId01Assignment);
-        when(activeTaskCreator.createTasks(any(), Mockito.eq(assignment))).thenReturn(asList(task00, task01));
+        when(activeTaskCreator.createTasks(any(), eq(assignment))).thenReturn(asList(task00, task01));
 
         taskManager.handleAssignment(assignment, Collections.emptyMap());
 
@@ -4933,29 +4897,29 @@ public class TaskManagerTest {
         assertThat(thrown.getCause().getMessage(), is("KABOOM!"));
         assertThat(task00.state(), is(Task.State.SUSPENDED));
         assertThat(task01.state(), is(Task.State.SUSPENDED));
-        Mockito.verifyNoInteractions(consumer);
+        verifyNoInteractions(consumer);
     }
 
     @Test
     public void shouldConvertActiveTaskToStandbyTask() {
-        final StreamTask activeTask = Mockito.mock(StreamTask.class);
+        final StreamTask activeTask = mock(StreamTask.class);
         when(activeTask.id()).thenReturn(taskId00);
         when(activeTask.inputPartitions()).thenReturn(taskId00Partitions);
         when(activeTask.isActive()).thenReturn(true);
 
-        final StandbyTask standbyTask = Mockito.mock(StandbyTask.class);
+        final StandbyTask standbyTask = mock(StandbyTask.class);
         when(standbyTask.id()).thenReturn(taskId00);
 
-        when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singletonList(activeTask));
-        when(standbyTaskCreator.createStandbyTaskFromActive(Mockito.any(), Mockito.eq(taskId00Partitions))).thenReturn(standbyTask);
+        when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(activeTask));
+        when(standbyTaskCreator.createStandbyTaskFromActive(any(), eq(taskId00Partitions))).thenReturn(standbyTask);
 
         taskManager.handleAssignment(taskId00Assignment, Collections.emptyMap());
         taskManager.handleAssignment(Collections.emptyMap(), taskId00Assignment);
 
-        Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
-        Mockito.verify(activeTaskCreator).createTasks(any(), Mockito.eq(emptyMap()));
-        Mockito.verify(standbyTaskCreator, times(2)).createTasks(Collections.emptyMap());
-        Mockito.verifyNoInteractions(consumer);
+        verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
+        verify(activeTaskCreator).createTasks(any(), eq(emptyMap()));
+        verify(standbyTaskCreator, times(2)).createTasks(Collections.emptyMap());
+        verifyNoInteractions(consumer);
     }
 
     @Test
@@ -4969,15 +4933,15 @@ public class TaskManagerTest {
         when(activeTask.id()).thenReturn(taskId00);
         when(activeTask.inputPartitions()).thenReturn(taskId00Partitions);
         when(standbyTaskCreator.createTasks(taskId00Assignment)).thenReturn(singletonList(standbyTask));
-        when(activeTaskCreator.createActiveTaskFromStandby(Mockito.eq(standbyTask), Mockito.eq(taskId00Partitions), any()))
+        when(activeTaskCreator.createActiveTaskFromStandby(eq(standbyTask), eq(taskId00Partitions), any()))
             .thenReturn(activeTask);
 
         taskManager.handleAssignment(Collections.emptyMap(), taskId00Assignment);
         taskManager.handleAssignment(taskId00Assignment, Collections.emptyMap());
 
-        Mockito.verify(activeTaskCreator, times(2)).createTasks(any(), Mockito.eq(emptyMap()));
-        Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());
-        Mockito.verifyNoInteractions(consumer);
+        verify(activeTaskCreator, times(2)).createTasks(any(), eq(emptyMap()));
+        verify(standbyTaskCreator).createTasks(Collections.emptyMap());
+        verifyNoInteractions(consumer);
     }
 
     @Test
@@ -5002,14 +4966,14 @@ public class TaskManagerTest {
         for (int i = 0; i < names.length; ++i) {
             taskFolders.add(new TaskDirectory(testFolder.newFolder(names[i]), null));
         }
-        expect(stateDirectory.listNonEmptyTaskDirectories()).andReturn(taskFolders).once();
+        when(stateDirectory.listNonEmptyTaskDirectories()).thenReturn(taskFolders);
     }
 
     private void writeCheckpointFile(final TaskId task, final Map<TopicPartition, Long> offsets) throws Exception {
         final File checkpointFile = getCheckpointFile(task);
         Files.createFile(checkpointFile.toPath());
         new OffsetCheckpoint(checkpointFile).write(offsets);
-        expect(stateDirectory.checkpointFileFor(task)).andReturn(checkpointFile);
+        lenient().when(stateDirectory.checkpointFileFor(task)).thenReturn(checkpointFile);
         expectDirectoryNotEmpty(task);
     }