You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ca...@apache.org on 2022/07/26 08:12:43 UTC

[kafka] branch trunk updated: MINOR: Use builder for mock task in DefaultStateUpdaterTest (#12436)

This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f191e4781e MINOR: Use builder for mock task in DefaultStateUpdaterTest (#12436)
f191e4781e is described below

commit f191e4781ee16d131a58e4bccf5eb1fbd476ada6
Author: Bruno Cadonna <ca...@apache.org>
AuthorDate: Tue Jul 26 10:12:20 2022 +0200

    MINOR: Use builder for mock task in DefaultStateUpdaterTest (#12436)
    
    
    Reviewer: Guozhang Wang <wa...@gmail.com>
---
 .../internals/DefaultStateUpdaterTest.java         | 290 +++++++++------------
 .../org/apache/kafka/test/StreamsTestUtils.java    |   5 +
 2 files changed, 125 insertions(+), 170 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
index 087559742b..adc417ede6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
@@ -45,6 +45,9 @@ import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
 import static org.apache.kafka.common.utils.Utils.mkSet;
 import static org.apache.kafka.streams.StreamsConfig.producerPrefix;
+import static org.apache.kafka.test.StreamsTestUtils.TaskBuilder.standbyTask;
+import static org.apache.kafka.test.StreamsTestUtils.TaskBuilder.statefulTask;
+import static org.apache.kafka.test.StreamsTestUtils.TaskBuilder.statelessTask;
 import static org.apache.kafka.test.TestUtils.waitForCondition;
 import static org.easymock.EasyMock.anyBoolean;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -123,12 +126,12 @@ class DefaultStateUpdaterTest {
 
     @Test
     public void shouldThrowIfStatelessTaskNotInStateRestoring() {
-        shouldThrowIfActiveTaskNotInStateRestoring(createStatelessTask(TASK_0_0));
+        shouldThrowIfActiveTaskNotInStateRestoring(statelessTask(TASK_0_0).build());
     }
 
     @Test
     public void shouldThrowIfStatefulTaskNotInStateRestoring() {
-        shouldThrowIfActiveTaskNotInStateRestoring(createActiveStatefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)));
+        shouldThrowIfActiveTaskNotInStateRestoring(statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).build());
     }
 
     private void shouldThrowIfActiveTaskNotInStateRestoring(final StreamTask task) {
@@ -137,7 +140,7 @@ class DefaultStateUpdaterTest {
 
     @Test
     public void shouldThrowIfStandbyTaskNotInStateRunning() {
-        final StandbyTask task = createStandbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_B_0));
+        final StandbyTask task = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_B_0)).build();
         shouldThrowIfTaskNotInGivenState(task, State.RUNNING);
     }
 
@@ -152,29 +155,29 @@ class DefaultStateUpdaterTest {
 
     @Test
     public void shouldThrowIfAddingActiveTasksWithSameId() throws Exception {
-        final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
-        final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
+        final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
+        final StreamTask task2 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
         shouldThrowIfAddingTasksWithSameId(task1, task2);
     }
 
     @Test
     public void shouldThrowIfAddingStandbyTasksWithSameId() throws Exception {
-        final StandbyTask task1 = createStandbyTaskInStateRunning(TASK_0_0, mkSet(TOPIC_PARTITION_B_0));
-        final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_0, mkSet(TOPIC_PARTITION_B_0));
+        final StandbyTask task1 = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RUNNING).build();
+        final StandbyTask task2 = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RUNNING).build();
         shouldThrowIfAddingTasksWithSameId(task1, task2);
     }
 
     @Test
     public void shouldThrowIfAddingActiveAndStandbyTaskWithSameId() throws Exception {
-        final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
-        final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_0, mkSet(TOPIC_PARTITION_B_0));
+        final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
+        final StandbyTask task2 = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RUNNING).build();
         shouldThrowIfAddingTasksWithSameId(task1, task2);
     }
 
     @Test
     public void shouldThrowIfAddingStandbyAndActiveTaskWithSameId() throws Exception {
-        final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
-        final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_0, mkSet(TOPIC_PARTITION_B_0));
+        final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
+        final StandbyTask task2 = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RUNNING).build();
         shouldThrowIfAddingTasksWithSameId(task2, task1);
     }
 
@@ -188,15 +191,15 @@ class DefaultStateUpdaterTest {
 
     @Test
     public void shouldImmediatelyAddSingleStatelessTaskToRestoredTasks() throws Exception {
-        final StreamTask task1 = createStatelessTaskInStateRestoring(TASK_0_0);
+        final StreamTask task1 = statelessTask(TASK_0_0).inState(State.RESTORING).build();
         shouldImmediatelyAddStatelessTasksToRestoredTasks(task1);
     }
 
     @Test
     public void shouldImmediatelyAddMultipleStatelessTasksToRestoredTasks() throws Exception {
-        final StreamTask task1 = createStatelessTaskInStateRestoring(TASK_0_0);
-        final StreamTask task2 = createStatelessTaskInStateRestoring(TASK_0_2);
-        final StreamTask task3 = createStatelessTaskInStateRestoring(TASK_1_0);
+        final StreamTask task1 = statelessTask(TASK_0_0).inState(State.RESTORING).build();
+        final StreamTask task2 = statelessTask(TASK_0_2).inState(State.RESTORING).build();
+        final StreamTask task3 = statelessTask(TASK_1_0).inState(State.RESTORING).build();
         shouldImmediatelyAddStatelessTasksToRestoredTasks(task1, task2, task3);
     }
 
@@ -217,7 +220,7 @@ class DefaultStateUpdaterTest {
     @Test
     public void shouldRestoreSingleActiveStatefulTask() throws Exception {
         final StreamTask task =
-            createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0));
+            statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
         when(changelogReader.completedChangelogs())
             .thenReturn(Collections.emptySet())
             .thenReturn(mkSet(TOPIC_PARTITION_A_0))
@@ -243,9 +246,9 @@ class DefaultStateUpdaterTest {
 
     @Test
     public void shouldRestoreMultipleActiveStatefulTasks() throws Exception {
-        final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
-        final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_0_2, mkSet(TOPIC_PARTITION_B_0));
-        final StreamTask task3 = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_C_0));
+        final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
+        final StreamTask task2 = statefulTask(TASK_0_2, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
+        final StreamTask task3 = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_C_0)).inState(State.RESTORING).build();
         when(changelogReader.completedChangelogs())
             .thenReturn(Collections.emptySet())
             .thenReturn(mkSet(TOPIC_PARTITION_C_0))
@@ -277,15 +280,15 @@ class DefaultStateUpdaterTest {
     public void shouldDrainRestoredActiveTasks() throws Exception {
         assertTrue(stateUpdater.drainRestoredActiveTasks(Duration.ZERO).isEmpty());
 
-        final StreamTask task1 = createStatelessTaskInStateRestoring(TASK_0_0);
+        final StreamTask task1 = statelessTask(TASK_0_0).inState(State.RESTORING).build();
         stateUpdater.start();
         stateUpdater.add(task1);
 
         verifyDrainingRestoredActiveTasks(task1);
 
-        final StreamTask task2 = createStatelessTaskInStateRestoring(TASK_1_1);
-        final StreamTask task3 = createStatelessTaskInStateRestoring(TASK_1_0);
-        final StreamTask task4 = createStatelessTaskInStateRestoring(TASK_0_2);
+        final StreamTask task2 = statelessTask(TASK_1_1).inState(State.RESTORING).build();
+        final StreamTask task3 = statelessTask(TASK_1_0).inState(State.RESTORING).build();
+        final StreamTask task4 = statelessTask(TASK_0_2).inState(State.RESTORING).build();
         stateUpdater.add(task2);
         stateUpdater.add(task3);
         stateUpdater.add(task4);
@@ -295,18 +298,16 @@ class DefaultStateUpdaterTest {
 
     @Test
     public void shouldUpdateSingleStandbyTask() throws Exception {
-        final StandbyTask task = createStandbyTaskInStateRunning(
-            TASK_0_0,
-            mkSet(TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0)
-        );
+        final StandbyTask task = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0))
+            .inState(State.RUNNING).build();
         shouldUpdateStandbyTasks(task);
     }
 
     @Test
     public void shouldUpdateMultipleStandbyTasks() throws Exception {
-        final StandbyTask task1 = createStandbyTaskInStateRunning(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
-        final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_2, mkSet(TOPIC_PARTITION_B_0));
-        final StandbyTask task3 = createStandbyTaskInStateRunning(TASK_1_0, mkSet(TOPIC_PARTITION_C_0));
+        final StandbyTask task1 = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RUNNING).build();
+        final StandbyTask task2 = standbyTask(TASK_0_2, mkSet(TOPIC_PARTITION_B_0)).inState(State.RUNNING).build();
+        final StandbyTask task3 = standbyTask(TASK_1_0, mkSet(TOPIC_PARTITION_C_0)).inState(State.RUNNING).build();
         shouldUpdateStandbyTasks(task1, task2, task3);
     }
 
@@ -331,10 +332,10 @@ class DefaultStateUpdaterTest {
 
     @Test
     public void shouldRestoreActiveStatefulTasksAndUpdateStandbyTasks() throws Exception {
-        final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
-        final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_0_2, mkSet(TOPIC_PARTITION_B_0));
-        final StandbyTask task3 = createStandbyTaskInStateRunning(TASK_1_0, mkSet(TOPIC_PARTITION_C_0));
-        final StandbyTask task4 = createStandbyTaskInStateRunning(TASK_1_1, mkSet(TOPIC_PARTITION_D_0));
+        final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
+        final StreamTask task2 = statefulTask(TASK_0_2, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
+        final StandbyTask task3 = standbyTask(TASK_1_0, mkSet(TOPIC_PARTITION_C_0)).inState(State.RUNNING).build();
+        final StandbyTask task4 = standbyTask(TASK_1_1, mkSet(TOPIC_PARTITION_D_0)).inState(State.RUNNING).build();
         when(changelogReader.completedChangelogs())
             .thenReturn(Collections.emptySet())
             .thenReturn(mkSet(TOPIC_PARTITION_A_0))
@@ -361,9 +362,9 @@ class DefaultStateUpdaterTest {
 
     @Test
     public void shouldRestoreActiveStatefulTaskThenUpdateStandbyTaskAndAgainRestoreActiveStatefulTask() throws Exception {
-        final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
-        final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_1_0, mkSet(TOPIC_PARTITION_C_0));
-        final StreamTask task3 = createActiveStatefulTaskInStateRestoring(TASK_0_2, mkSet(TOPIC_PARTITION_B_0));
+        final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
+        final StandbyTask task2 = standbyTask(TASK_1_0, mkSet(TOPIC_PARTITION_C_0)).inState(State.RUNNING).build();
+        final StreamTask task3 = statefulTask(TASK_0_2, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
         when(changelogReader.completedChangelogs())
             .thenReturn(Collections.emptySet())
             .thenReturn(mkSet(TOPIC_PARTITION_A_0))
@@ -391,9 +392,9 @@ class DefaultStateUpdaterTest {
 
     @Test
     public void shouldUpdateStandbyTaskAfterAllActiveStatefulTasksFailed() throws Exception {
-        final StreamTask activeTask1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
-        final StreamTask activeTask2 = createActiveStatefulTaskInStateRestoring(TASK_0_1, mkSet(TOPIC_PARTITION_B_0));
-        final StandbyTask standbyTask = createStandbyTaskInStateRunning(TASK_1_0, mkSet(TOPIC_PARTITION_C_0));
+        final StreamTask activeTask1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
+        final StreamTask activeTask2 = statefulTask(TASK_0_1, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
+        final StandbyTask standbyTask = standbyTask(TASK_1_0, mkSet(TOPIC_PARTITION_C_0)).inState(State.RUNNING).build();
         final TaskCorruptedException taskCorruptedException =
             new TaskCorruptedException(mkSet(activeTask1.id(), activeTask2.id()));
         final Map<TaskId, Task> updatingTasks1 = mkMap(
@@ -419,9 +420,9 @@ class DefaultStateUpdaterTest {
 
     @Test
     public void shouldUpdateStandbyTaskAfterAllActiveStatefulTasksRemoved() throws Exception {
-        final StreamTask activeTask1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
-        final StreamTask activeTask2 = createActiveStatefulTaskInStateRestoring(TASK_0_1, mkSet(TOPIC_PARTITION_B_0));
-        final StandbyTask standbyTask = createStandbyTaskInStateRunning(TASK_1_0, mkSet(TOPIC_PARTITION_C_0));
+        final StreamTask activeTask1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
+        final StreamTask activeTask2 = statefulTask(TASK_0_1, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
+        final StandbyTask standbyTask = standbyTask(TASK_1_0, mkSet(TOPIC_PARTITION_C_0)).inState(State.RUNNING).build();
         when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
         when(changelogReader.allChangelogsCompleted()).thenReturn(false);
         stateUpdater.start();
@@ -441,13 +442,13 @@ class DefaultStateUpdaterTest {
 
     @Test
     public void shouldRemoveActiveStatefulTask() throws Exception {
-        final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
+        final StreamTask task = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
         shouldRemoveStatefulTask(task);
     }
 
     @Test
     public void shouldRemoveStandbyTask() throws Exception {
-        final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
+        final StandbyTask task = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RUNNING).build();
         shouldRemoveStatefulTask(task);
     }
 
@@ -470,8 +471,8 @@ class DefaultStateUpdaterTest {
 
     @Test
     public void shouldRemovePausedTask() throws Exception {
-        final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
-        final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_1, mkSet(TOPIC_PARTITION_B_0));
+        final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
+        final StandbyTask task2 = standbyTask(TASK_0_1, mkSet(TOPIC_PARTITION_B_0)).inState(State.RUNNING).build();
 
         stateUpdater.start();
         stateUpdater.add(task1);
@@ -496,18 +497,18 @@ class DefaultStateUpdaterTest {
 
     @Test
     public void shouldNotRemoveActiveStatefulTaskFromRestoredActiveTasks() throws Exception {
-        final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
+        final StreamTask task = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
         shouldNotRemoveTaskFromRestoredActiveTasks(task);
     }
 
     @Test
     public void shouldNotRemoveStatelessTaskFromRestoredActiveTasks() throws Exception {
-        final StreamTask task = createStatelessTaskInStateRestoring(TASK_0_0);
+        final StreamTask task = statelessTask(TASK_0_0).inState(State.RESTORING).build();
         shouldNotRemoveTaskFromRestoredActiveTasks(task);
     }
 
     private void shouldNotRemoveTaskFromRestoredActiveTasks(final StreamTask task) throws Exception {
-        final StreamTask controlTask = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_B_0));
+        final StreamTask controlTask = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
         when(changelogReader.completedChangelogs()).thenReturn(Collections.singleton(TOPIC_PARTITION_A_0));
         when(changelogReader.allChangelogsCompleted()).thenReturn(false);
         stateUpdater.start();
@@ -527,18 +528,18 @@ class DefaultStateUpdaterTest {
 
     @Test
     public void shouldNotRemoveActiveStatefulTaskFromFailedTasks() throws Exception {
-        final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
+        final StreamTask task = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
         shouldNotRemoveTaskFromFailedTasks(task);
     }
 
     @Test
     public void shouldNotRemoveStandbyTaskFromFailedTasks() throws Exception {
-        final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
+        final StandbyTask task = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RUNNING).build();
         shouldNotRemoveTaskFromFailedTasks(task);
     }
 
     private void shouldNotRemoveTaskFromFailedTasks(final Task task) throws Exception {
-        final StreamTask controlTask = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_B_0));
+        final StreamTask controlTask = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
         final StreamsException streamsException = new StreamsException("Something happened", task.id());
         when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
         when(changelogReader.allChangelogsCompleted()).thenReturn(false);
@@ -568,22 +569,22 @@ class DefaultStateUpdaterTest {
 
     @Test
     public void shouldPauseActiveStatefulTask() throws Exception {
-        final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
+        final StreamTask task = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
         shouldPauseStatefulTask(task);
         verify(changelogReader, never()).transitToUpdateStandby();
     }
 
     @Test
     public void shouldPauseStandbyTask() throws Exception {
-        final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
+        final StandbyTask task = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RUNNING).build();
         shouldPauseStatefulTask(task);
         verify(changelogReader, times(1)).transitToUpdateStandby();
     }
 
     @Test
     public void shouldPauseActiveTaskAndTransitToUpdateStandby() throws Exception {
-        final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
-        final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_1, mkSet(TOPIC_PARTITION_B_0));
+        final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
+        final StandbyTask task2 = standbyTask(TASK_0_1, mkSet(TOPIC_PARTITION_B_0)).inState(State.RUNNING).build();
 
         stateUpdater.start();
         stateUpdater.add(task1);
@@ -629,8 +630,8 @@ class DefaultStateUpdaterTest {
 
     @Test
     public void shouldNotPauseActiveStatefulTaskInRestoredActiveTasks() throws Exception {
-        final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
-        final StreamTask controlTask = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_B_0));
+        final StreamTask task = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
+        final StreamTask controlTask = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
         when(changelogReader.completedChangelogs()).thenReturn(Collections.singleton(TOPIC_PARTITION_A_0));
         when(changelogReader.allChangelogsCompleted()).thenReturn(false);
         stateUpdater.start();
@@ -649,18 +650,18 @@ class DefaultStateUpdaterTest {
 
     @Test
     public void shouldNotPauseActiveStatefulTaskInFailedTasks() throws Exception {
-        final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
+        final StreamTask task = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
         shouldNotPauseTaskInFailedTasks(task);
     }
 
     @Test
     public void shouldNotPauseStandbyTaskInFailedTasks() throws Exception {
-        final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
+        final StandbyTask task = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RUNNING).build();
         shouldNotPauseTaskInFailedTasks(task);
     }
 
     private void shouldNotPauseTaskInFailedTasks(final Task task) throws Exception {
-        final StreamTask controlTask = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_B_0));
+        final StreamTask controlTask = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
         final StreamsException streamsException = new StreamsException("Something happened", task.id());
         when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
         when(changelogReader.allChangelogsCompleted()).thenReturn(false);
@@ -689,13 +690,13 @@ class DefaultStateUpdaterTest {
 
     @Test
     public void shouldNotPauseActiveStatefulTaskInRemovedTasks() throws Exception {
-        final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
+        final StreamTask task = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
         shouldNotPauseTaskInRemovedTasks(task);
     }
 
     @Test
     public void shouldNotPauseStandbyTaskInRemovedTasks() throws Exception {
-        final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
+        final StandbyTask task = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RUNNING).build();
         shouldNotPauseTaskInRemovedTasks(task);
     }
 
@@ -724,14 +725,14 @@ class DefaultStateUpdaterTest {
 
     @Test
     public void shouldResumeActiveStatefulTask() throws Exception {
-        final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
+        final StreamTask task = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
         shouldResumeStatefulTask(task);
         verify(changelogReader, times(2)).enforceRestoreActive();
     }
 
     @Test
     public void shouldResumeStandbyTask() throws Exception {
-        final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
+        final StandbyTask task = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RUNNING).build();
         shouldResumeStatefulTask(task);
         verify(changelogReader, times(2)).transitToUpdateStandby();
     }
@@ -765,8 +766,8 @@ class DefaultStateUpdaterTest {
 
     @Test
     public void shouldNotResumeActiveStatefulTaskInRestoredActiveTasks() throws Exception {
-        final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
-        final StreamTask controlTask = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_B_0));
+        final StreamTask task = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
+        final StreamTask controlTask = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
         when(changelogReader.completedChangelogs()).thenReturn(Collections.singleton(TOPIC_PARTITION_A_0));
         when(changelogReader.allChangelogsCompleted()).thenReturn(false);
         stateUpdater.start();
@@ -786,13 +787,13 @@ class DefaultStateUpdaterTest {
 
     @Test
     public void shouldNotResumeActiveStatefulTaskInRemovedTasks() throws Exception {
-        final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
+        final StreamTask task = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
         shouldNotPauseTaskInRemovedTasks(task);
     }
 
     @Test
     public void shouldNotResumeStandbyTaskInRemovedTasks() throws Exception {
-        final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
+        final StandbyTask task = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RUNNING).build();
         shouldNotResumeTaskInRemovedTasks(task);
     }
 
@@ -817,18 +818,18 @@ class DefaultStateUpdaterTest {
 
     @Test
     public void shouldNotResumeActiveStatefulTaskInFailedTasks() throws Exception {
-        final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
+        final StreamTask task = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
         shouldNotPauseTaskInFailedTasks(task);
     }
 
     @Test
     public void shouldNotResumeStandbyTaskInFailedTasks() throws Exception {
-        final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
+        final StandbyTask task = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RUNNING).build();
         shouldNotResumeTaskInFailedTasks(task);
     }
 
     private void shouldNotResumeTaskInFailedTasks(final Task task) throws Exception {
-        final StreamTask controlTask = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_B_0));
+        final StreamTask controlTask = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
         final StreamsException streamsException = new StreamsException("Something happened", task.id());
         when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
         when(changelogReader.allChangelogsCompleted()).thenReturn(false);
@@ -861,15 +862,15 @@ class DefaultStateUpdaterTest {
         when(changelogReader.allChangelogsCompleted()).thenReturn(false);
         stateUpdater.start();
 
-        final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_B_0));
+        final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
         stateUpdater.add(task1);
         stateUpdater.remove(task1.id());
 
         verifyDrainingRemovedTasks(task1);
 
-        final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_1_1, mkSet(TOPIC_PARTITION_C_0));
-        final StreamTask task3 = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_A_0));
-        final StreamTask task4 = createActiveStatefulTaskInStateRestoring(TASK_0_2, mkSet(TOPIC_PARTITION_D_0));
+        final StreamTask task2 = statefulTask(TASK_1_1, mkSet(TOPIC_PARTITION_C_0)).inState(State.RESTORING).build();
+        final StreamTask task3 = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
+        final StreamTask task4 = statefulTask(TASK_0_2, mkSet(TOPIC_PARTITION_D_0)).inState(State.RESTORING).build();
         stateUpdater.add(task2);
         stateUpdater.remove(task2.id());
         stateUpdater.add(task3);
@@ -882,8 +883,8 @@ class DefaultStateUpdaterTest {
 
     @Test
     public void shouldAddFailedTasksToQueueWhenRestoreThrowsStreamsExceptionWithoutTask() throws Exception {
-        final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
-        final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_2, mkSet(TOPIC_PARTITION_B_0));
+        final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
+        final StandbyTask task2 = standbyTask(TASK_0_2, mkSet(TOPIC_PARTITION_B_0)).inState(State.RUNNING).build();
         final String exceptionMessage = "The Streams were crossed!";
         final StreamsException streamsException = new StreamsException(exceptionMessage);
         final Map<TaskId, Task> updatingTasks = mkMap(
@@ -906,9 +907,9 @@ class DefaultStateUpdaterTest {
 
     @Test
     public void shouldAddFailedTasksToQueueWhenRestoreThrowsStreamsExceptionWithTask() throws Exception {
-        final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
-        final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_0_2, mkSet(TOPIC_PARTITION_B_0));
-        final StandbyTask task3 = createStandbyTaskInStateRunning(TASK_1_0, mkSet(TOPIC_PARTITION_C_0));
+        final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
+        final StreamTask task2 = statefulTask(TASK_0_2, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
+        final StandbyTask task3 = standbyTask(TASK_1_0, mkSet(TOPIC_PARTITION_C_0)).inState(State.RUNNING).build();
         final String exceptionMessage = "The Streams were crossed!";
         final StreamsException streamsException1 = new StreamsException(exceptionMessage, task1.id());
         final StreamsException streamsException2 = new StreamsException(exceptionMessage, task3.id());
@@ -944,9 +945,9 @@ class DefaultStateUpdaterTest {
 
     @Test
     public void shouldAddFailedTasksToQueueWhenRestoreThrowsTaskCorruptedException() throws Exception {
-        final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
-        final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_2, mkSet(TOPIC_PARTITION_B_0));
-        final StreamTask task3 = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_C_0));
+        final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
+        final StandbyTask task2 = standbyTask(TASK_0_2, mkSet(TOPIC_PARTITION_B_0)).inState(State.RUNNING).build();
+        final StreamTask task3 = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_C_0)).inState(State.RESTORING).build();
         final Set<TaskId> expectedTaskIds = mkSet(task1.id(), task2.id());
         final TaskCorruptedException taskCorruptedException = new TaskCorruptedException(expectedTaskIds);
         final Map<TaskId, Task> updatingTasks = mkMap(
@@ -970,8 +971,8 @@ class DefaultStateUpdaterTest {
 
     @Test
     public void shouldAddFailedTasksToQueueWhenUncaughtExceptionIsThrown() throws Exception {
-        final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
-        final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_2, mkSet(TOPIC_PARTITION_B_0));
+        final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
+        final StandbyTask task2 = standbyTask(TASK_0_2, mkSet(TOPIC_PARTITION_B_0)).inState(State.RUNNING).build();
         final IllegalStateException illegalStateException = new IllegalStateException("Nobody expects the Spanish inquisition!");
         final Map<TaskId, Task> updatingTasks = mkMap(
             mkEntry(task1.id(), task1),
@@ -995,10 +996,10 @@ class DefaultStateUpdaterTest {
     public void shouldDrainFailedTasksAndExceptions() throws Exception {
         assertTrue(stateUpdater.drainExceptionsAndFailedTasks().isEmpty());
 
-        final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_B_0));
-        final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_1_1, mkSet(TOPIC_PARTITION_C_0));
-        final StreamTask task3 = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_A_0));
-        final StreamTask task4 = createActiveStatefulTaskInStateRestoring(TASK_0_2, mkSet(TOPIC_PARTITION_D_0));
+        final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
+        final StreamTask task2 = statefulTask(TASK_1_1, mkSet(TOPIC_PARTITION_C_0)).inState(State.RESTORING).build();
+        final StreamTask task3 = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
+        final StreamTask task4 = statefulTask(TASK_0_2, mkSet(TOPIC_PARTITION_D_0)).inState(State.RESTORING).build();
         final String exceptionMessage = "The Streams were crossed!";
         final StreamsException streamsException1 = new StreamsException(exceptionMessage, task1.id());
         final Map<TaskId, Task> updatingTasks1 = mkMap(
@@ -1043,10 +1044,10 @@ class DefaultStateUpdaterTest {
 
     @Test
     public void shouldAutoCheckpointTasksOnInterval() throws Exception {
-        final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
-        final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_0_2, mkSet(TOPIC_PARTITION_B_0));
-        final StandbyTask task3 = createStandbyTaskInStateRunning(TASK_1_0, mkSet(TOPIC_PARTITION_C_0));
-        final StandbyTask task4 = createStandbyTaskInStateRunning(TASK_1_1, mkSet(TOPIC_PARTITION_D_0));
+        final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
+        final StreamTask task2 = statefulTask(TASK_0_2, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
+        final StandbyTask task3 = standbyTask(TASK_1_0, mkSet(TOPIC_PARTITION_C_0)).inState(State.RUNNING).build();
+        final StandbyTask task4 = standbyTask(TASK_1_1, mkSet(TOPIC_PARTITION_D_0)).inState(State.RUNNING).build();
         when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
         when(changelogReader.allChangelogsCompleted()).thenReturn(false);
         stateUpdater.start();
@@ -1069,10 +1070,10 @@ class DefaultStateUpdaterTest {
         final Time time = new MockTime();
         final DefaultStateUpdater stateUpdater = new DefaultStateUpdater(config, changelogReader, time);
         try {
-            final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
-            final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_0_2, mkSet(TOPIC_PARTITION_B_0));
-            final StandbyTask task3 = createStandbyTaskInStateRunning(TASK_1_0, mkSet(TOPIC_PARTITION_C_0));
-            final StandbyTask task4 = createStandbyTaskInStateRunning(TASK_1_1, mkSet(TOPIC_PARTITION_D_0));
+            final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
+            final StreamTask task2 = statefulTask(TASK_0_2, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
+            final StandbyTask task3 = standbyTask(TASK_1_0, mkSet(TOPIC_PARTITION_C_0)).inState(State.RUNNING).build();
+            final StandbyTask task4 = standbyTask(TASK_1_1, mkSet(TOPIC_PARTITION_D_0)).inState(State.RUNNING).build();
             when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
             when(changelogReader.allChangelogsCompleted()).thenReturn(false);
             stateUpdater.start();
@@ -1105,11 +1106,11 @@ class DefaultStateUpdaterTest {
     public void shouldGetTasksFromInputQueue() {
         stateUpdater.shutdown(Duration.ofMillis(Long.MAX_VALUE));
 
-        final StreamTask activeTask1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
-        final StreamTask activeTask2 = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_B_0));
-        final StandbyTask standbyTask1 = createStandbyTaskInStateRunning(TASK_0_2, mkSet(TOPIC_PARTITION_C_0));
-        final StandbyTask standbyTask2 = createStandbyTaskInStateRunning(TASK_1_1, mkSet(TOPIC_PARTITION_D_0));
-        final StandbyTask standbyTask3 = createStandbyTaskInStateRunning(TASK_0_1, mkSet(TOPIC_PARTITION_A_1));
+        final StreamTask activeTask1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
+        final StreamTask activeTask2 = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
+        final StandbyTask standbyTask1 = standbyTask(TASK_0_2, mkSet(TOPIC_PARTITION_C_0)).inState(State.RUNNING).build();
+        final StandbyTask standbyTask2 = standbyTask(TASK_1_1, mkSet(TOPIC_PARTITION_D_0)).inState(State.RUNNING).build();
+        final StandbyTask standbyTask3 = standbyTask(TASK_0_1, mkSet(TOPIC_PARTITION_A_1)).inState(State.RUNNING).build();
         stateUpdater.add(activeTask1);
         stateUpdater.add(standbyTask1);
         stateUpdater.add(standbyTask2);
@@ -1135,11 +1136,11 @@ class DefaultStateUpdaterTest {
 
     @Test
     public void shouldGetTasksFromUpdatingTasks() throws Exception {
-        final StreamTask activeTask1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
-        final StreamTask activeTask2 = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_B_0));
-        final StandbyTask standbyTask1 = createStandbyTaskInStateRunning(TASK_0_2, mkSet(TOPIC_PARTITION_C_0));
-        final StandbyTask standbyTask2 = createStandbyTaskInStateRunning(TASK_1_1, mkSet(TOPIC_PARTITION_D_0));
-        final StandbyTask standbyTask3 = createStandbyTaskInStateRunning(TASK_0_1, mkSet(TOPIC_PARTITION_A_1));
+        final StreamTask activeTask1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
+        final StreamTask activeTask2 = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
+        final StandbyTask standbyTask1 = standbyTask(TASK_0_2, mkSet(TOPIC_PARTITION_C_0)).inState(State.RUNNING).build();
+        final StandbyTask standbyTask2 = standbyTask(TASK_1_1, mkSet(TOPIC_PARTITION_D_0)).inState(State.RUNNING).build();
+        final StandbyTask standbyTask3 = standbyTask(TASK_0_1, mkSet(TOPIC_PARTITION_A_1)).inState(State.RUNNING).build();
         when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
         when(changelogReader.allChangelogsCompleted()).thenReturn(false);
         stateUpdater.start();
@@ -1168,8 +1169,8 @@ class DefaultStateUpdaterTest {
 
     @Test
     public void shouldGetTasksFromRestoredActiveTasks() throws Exception {
-        final StreamTask activeTask1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
-        final StreamTask activeTask2 = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_B_0));
+        final StreamTask activeTask1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
+        final StreamTask activeTask2 = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
         when(changelogReader.completedChangelogs()).thenReturn(mkSet(TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0));
         when(changelogReader.allChangelogsCompleted()).thenReturn(false);
         stateUpdater.start();
@@ -1186,9 +1187,9 @@ class DefaultStateUpdaterTest {
 
     @Test
     public void shouldGetTasksFromExceptionsAndFailedTasks() throws Exception {
-        final StreamTask activeTask1 = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_B_0));
-        final StandbyTask standbyTask2 = createStandbyTaskInStateRunning(TASK_1_1, mkSet(TOPIC_PARTITION_D_0));
-        final StandbyTask standbyTask1 = createStandbyTaskInStateRunning(TASK_0_1, mkSet(TOPIC_PARTITION_A_1));
+        final StreamTask activeTask1 = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
+        final StandbyTask standbyTask2 = standbyTask(TASK_1_1, mkSet(TOPIC_PARTITION_D_0)).inState(State.RUNNING).build();
+        final StandbyTask standbyTask1 = standbyTask(TASK_0_1, mkSet(TOPIC_PARTITION_A_1)).inState(State.RUNNING).build();
         final TaskCorruptedException taskCorruptedException =
             new TaskCorruptedException(mkSet(standbyTask1.id(), standbyTask2.id()));
         final StreamsException streamsException = new StreamsException("The Streams were crossed!", activeTask1.id());
@@ -1220,9 +1221,9 @@ class DefaultStateUpdaterTest {
 
     @Test
     public void shouldGetTasksFromRemovedTasks() throws Exception {
-        final StreamTask activeTask = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_B_0));
-        final StandbyTask standbyTask2 = createStandbyTaskInStateRunning(TASK_1_1, mkSet(TOPIC_PARTITION_D_0));
-        final StandbyTask standbyTask1 = createStandbyTaskInStateRunning(TASK_0_1, mkSet(TOPIC_PARTITION_A_1));
+        final StreamTask activeTask = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
+        final StandbyTask standbyTask2 = standbyTask(TASK_1_1, mkSet(TOPIC_PARTITION_D_0)).inState(State.RUNNING).build();
+        final StandbyTask standbyTask1 = standbyTask(TASK_0_1, mkSet(TOPIC_PARTITION_A_1)).inState(State.RUNNING).build();
         when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
         when(changelogReader.allChangelogsCompleted()).thenReturn(false);
         stateUpdater.start();
@@ -1243,8 +1244,8 @@ class DefaultStateUpdaterTest {
 
     @Test
     public void shouldGetTasksFromPausedTasks() throws Exception {
-        final StreamTask activeTask = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
-        final StandbyTask standbyTask = createStandbyTaskInStateRunning(TASK_0_1, mkSet(TOPIC_PARTITION_A_0));
+        final StreamTask activeTask = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
+        final StandbyTask standbyTask = standbyTask(TASK_0_1, mkSet(TOPIC_PARTITION_A_0)).inState(State.RUNNING).build();
         stateUpdater.start();
         stateUpdater.add(activeTask);
         stateUpdater.add(standbyTask);
@@ -1453,55 +1454,4 @@ class DefaultStateUpdaterTest {
         );
         assertTrue(stateUpdater.drainExceptionsAndFailedTasks().isEmpty());
     }
-
-    private StreamTask createActiveStatefulTaskInStateRestoring(final TaskId taskId,
-                                                                final Set<TopicPartition> changelogPartitions) {
-        final StreamTask task = createActiveStatefulTask(taskId, changelogPartitions);
-        when(task.state()).thenReturn(State.RESTORING);
-        return task;
-    }
-
-    private StreamTask createActiveStatefulTask(final TaskId taskId,
-                                                final Set<TopicPartition> changelogPartitions) {
-        final StreamTask task = mock(StreamTask.class);
-        setupStatefulTask(task, taskId, changelogPartitions);
-        when(task.isActive()).thenReturn(true);
-        return task;
-    }
-
-    private StreamTask createStatelessTaskInStateRestoring(final TaskId taskId) {
-        final StreamTask task = createStatelessTask(taskId);
-        when(task.state()).thenReturn(State.RESTORING);
-        return task;
-    }
-
-    private StreamTask createStatelessTask(final TaskId taskId) {
-        final StreamTask task = mock(StreamTask.class);
-        when(task.changelogPartitions()).thenReturn(Collections.emptySet());
-        when(task.isActive()).thenReturn(true);
-        when(task.id()).thenReturn(taskId);
-        return task;
-    }
-
-    private StandbyTask createStandbyTaskInStateRunning(final TaskId taskId,
-                                                        final Set<TopicPartition> changelogPartitions) {
-        final StandbyTask task = createStandbyTask(taskId, changelogPartitions);
-        when(task.state()).thenReturn(State.RUNNING);
-        return task;
-    }
-
-    private StandbyTask createStandbyTask(final TaskId taskId,
-                                          final Set<TopicPartition> changelogPartitions) {
-        final StandbyTask task = mock(StandbyTask.class);
-        setupStatefulTask(task, taskId, changelogPartitions);
-        when(task.isActive()).thenReturn(false);
-        return task;
-    }
-
-    private void setupStatefulTask(final Task task,
-                                   final TaskId taskId,
-                                   final Set<TopicPartition> changelogPartitions) {
-        when(task.changelogPartitions()).thenReturn(changelogPartitions);
-        when(task.id()).thenReturn(taskId);
-    }
 }
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
index c88515e01d..77e3bec562 100644
--- a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
@@ -343,6 +343,11 @@ public final class StreamsTestUtils {
             when(task.id()).thenReturn(taskId);
         }
 
+        public TaskBuilder<T> inState(final Task.State state) {
+            when(task.state()).thenReturn(state);
+            return this;
+        }
+
         public T build() {
             return task;
         }