You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ca...@apache.org on 2022/07/26 08:12:43 UTC
[kafka] branch trunk updated: MINOR: Use builder for mock task in DefaultStateUpdaterTest (#12436)
This is an automated email from the ASF dual-hosted git repository.
cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new f191e4781e MINOR: Use builder for mock task in DefaultStateUpdaterTest (#12436)
f191e4781e is described below
commit f191e4781ee16d131a58e4bccf5eb1fbd476ada6
Author: Bruno Cadonna <ca...@apache.org>
AuthorDate: Tue Jul 26 10:12:20 2022 +0200
MINOR: Use builder for mock task in DefaultStateUpdaterTest (#12436)
Reviewer: Guozhang Wang <wa...@gmail.com>
---
.../internals/DefaultStateUpdaterTest.java | 290 +++++++++------------
.../org/apache/kafka/test/StreamsTestUtils.java | 5 +
2 files changed, 125 insertions(+), 170 deletions(-)
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
index 087559742b..adc417ede6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
@@ -45,6 +45,9 @@ import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.apache.kafka.streams.StreamsConfig.producerPrefix;
+import static org.apache.kafka.test.StreamsTestUtils.TaskBuilder.standbyTask;
+import static org.apache.kafka.test.StreamsTestUtils.TaskBuilder.statefulTask;
+import static org.apache.kafka.test.StreamsTestUtils.TaskBuilder.statelessTask;
import static org.apache.kafka.test.TestUtils.waitForCondition;
import static org.easymock.EasyMock.anyBoolean;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -123,12 +126,12 @@ class DefaultStateUpdaterTest {
@Test
public void shouldThrowIfStatelessTaskNotInStateRestoring() {
- shouldThrowIfActiveTaskNotInStateRestoring(createStatelessTask(TASK_0_0));
+ shouldThrowIfActiveTaskNotInStateRestoring(statelessTask(TASK_0_0).build());
}
@Test
public void shouldThrowIfStatefulTaskNotInStateRestoring() {
- shouldThrowIfActiveTaskNotInStateRestoring(createActiveStatefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)));
+ shouldThrowIfActiveTaskNotInStateRestoring(statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).build());
}
private void shouldThrowIfActiveTaskNotInStateRestoring(final StreamTask task) {
@@ -137,7 +140,7 @@ class DefaultStateUpdaterTest {
@Test
public void shouldThrowIfStandbyTaskNotInStateRunning() {
- final StandbyTask task = createStandbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_B_0));
+ final StandbyTask task = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_B_0)).build();
shouldThrowIfTaskNotInGivenState(task, State.RUNNING);
}
@@ -152,29 +155,29 @@ class DefaultStateUpdaterTest {
@Test
public void shouldThrowIfAddingActiveTasksWithSameId() throws Exception {
- final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
- final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
+ final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
+ final StreamTask task2 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
shouldThrowIfAddingTasksWithSameId(task1, task2);
}
@Test
public void shouldThrowIfAddingStandbyTasksWithSameId() throws Exception {
- final StandbyTask task1 = createStandbyTaskInStateRunning(TASK_0_0, mkSet(TOPIC_PARTITION_B_0));
- final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_0, mkSet(TOPIC_PARTITION_B_0));
+ final StandbyTask task1 = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RUNNING).build();
+ final StandbyTask task2 = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RUNNING).build();
shouldThrowIfAddingTasksWithSameId(task1, task2);
}
@Test
public void shouldThrowIfAddingActiveAndStandbyTaskWithSameId() throws Exception {
- final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
- final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_0, mkSet(TOPIC_PARTITION_B_0));
+ final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
+ final StandbyTask task2 = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RUNNING).build();
shouldThrowIfAddingTasksWithSameId(task1, task2);
}
@Test
public void shouldThrowIfAddingStandbyAndActiveTaskWithSameId() throws Exception {
- final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
- final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_0, mkSet(TOPIC_PARTITION_B_0));
+ final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
+ final StandbyTask task2 = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RUNNING).build();
shouldThrowIfAddingTasksWithSameId(task2, task1);
}
@@ -188,15 +191,15 @@ class DefaultStateUpdaterTest {
@Test
public void shouldImmediatelyAddSingleStatelessTaskToRestoredTasks() throws Exception {
- final StreamTask task1 = createStatelessTaskInStateRestoring(TASK_0_0);
+ final StreamTask task1 = statelessTask(TASK_0_0).inState(State.RESTORING).build();
shouldImmediatelyAddStatelessTasksToRestoredTasks(task1);
}
@Test
public void shouldImmediatelyAddMultipleStatelessTasksToRestoredTasks() throws Exception {
- final StreamTask task1 = createStatelessTaskInStateRestoring(TASK_0_0);
- final StreamTask task2 = createStatelessTaskInStateRestoring(TASK_0_2);
- final StreamTask task3 = createStatelessTaskInStateRestoring(TASK_1_0);
+ final StreamTask task1 = statelessTask(TASK_0_0).inState(State.RESTORING).build();
+ final StreamTask task2 = statelessTask(TASK_0_2).inState(State.RESTORING).build();
+ final StreamTask task3 = statelessTask(TASK_1_0).inState(State.RESTORING).build();
shouldImmediatelyAddStatelessTasksToRestoredTasks(task1, task2, task3);
}
@@ -217,7 +220,7 @@ class DefaultStateUpdaterTest {
@Test
public void shouldRestoreSingleActiveStatefulTask() throws Exception {
final StreamTask task =
- createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0));
+ statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
when(changelogReader.completedChangelogs())
.thenReturn(Collections.emptySet())
.thenReturn(mkSet(TOPIC_PARTITION_A_0))
@@ -243,9 +246,9 @@ class DefaultStateUpdaterTest {
@Test
public void shouldRestoreMultipleActiveStatefulTasks() throws Exception {
- final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
- final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_0_2, mkSet(TOPIC_PARTITION_B_0));
- final StreamTask task3 = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_C_0));
+ final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
+ final StreamTask task2 = statefulTask(TASK_0_2, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
+ final StreamTask task3 = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_C_0)).inState(State.RESTORING).build();
when(changelogReader.completedChangelogs())
.thenReturn(Collections.emptySet())
.thenReturn(mkSet(TOPIC_PARTITION_C_0))
@@ -277,15 +280,15 @@ class DefaultStateUpdaterTest {
public void shouldDrainRestoredActiveTasks() throws Exception {
assertTrue(stateUpdater.drainRestoredActiveTasks(Duration.ZERO).isEmpty());
- final StreamTask task1 = createStatelessTaskInStateRestoring(TASK_0_0);
+ final StreamTask task1 = statelessTask(TASK_0_0).inState(State.RESTORING).build();
stateUpdater.start();
stateUpdater.add(task1);
verifyDrainingRestoredActiveTasks(task1);
- final StreamTask task2 = createStatelessTaskInStateRestoring(TASK_1_1);
- final StreamTask task3 = createStatelessTaskInStateRestoring(TASK_1_0);
- final StreamTask task4 = createStatelessTaskInStateRestoring(TASK_0_2);
+ final StreamTask task2 = statelessTask(TASK_1_1).inState(State.RESTORING).build();
+ final StreamTask task3 = statelessTask(TASK_1_0).inState(State.RESTORING).build();
+ final StreamTask task4 = statelessTask(TASK_0_2).inState(State.RESTORING).build();
stateUpdater.add(task2);
stateUpdater.add(task3);
stateUpdater.add(task4);
@@ -295,18 +298,16 @@ class DefaultStateUpdaterTest {
@Test
public void shouldUpdateSingleStandbyTask() throws Exception {
- final StandbyTask task = createStandbyTaskInStateRunning(
- TASK_0_0,
- mkSet(TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0)
- );
+ final StandbyTask task = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0))
+ .inState(State.RUNNING).build();
shouldUpdateStandbyTasks(task);
}
@Test
public void shouldUpdateMultipleStandbyTasks() throws Exception {
- final StandbyTask task1 = createStandbyTaskInStateRunning(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
- final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_2, mkSet(TOPIC_PARTITION_B_0));
- final StandbyTask task3 = createStandbyTaskInStateRunning(TASK_1_0, mkSet(TOPIC_PARTITION_C_0));
+ final StandbyTask task1 = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RUNNING).build();
+ final StandbyTask task2 = standbyTask(TASK_0_2, mkSet(TOPIC_PARTITION_B_0)).inState(State.RUNNING).build();
+ final StandbyTask task3 = standbyTask(TASK_1_0, mkSet(TOPIC_PARTITION_C_0)).inState(State.RUNNING).build();
shouldUpdateStandbyTasks(task1, task2, task3);
}
@@ -331,10 +332,10 @@ class DefaultStateUpdaterTest {
@Test
public void shouldRestoreActiveStatefulTasksAndUpdateStandbyTasks() throws Exception {
- final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
- final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_0_2, mkSet(TOPIC_PARTITION_B_0));
- final StandbyTask task3 = createStandbyTaskInStateRunning(TASK_1_0, mkSet(TOPIC_PARTITION_C_0));
- final StandbyTask task4 = createStandbyTaskInStateRunning(TASK_1_1, mkSet(TOPIC_PARTITION_D_0));
+ final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
+ final StreamTask task2 = statefulTask(TASK_0_2, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
+ final StandbyTask task3 = standbyTask(TASK_1_0, mkSet(TOPIC_PARTITION_C_0)).inState(State.RUNNING).build();
+ final StandbyTask task4 = standbyTask(TASK_1_1, mkSet(TOPIC_PARTITION_D_0)).inState(State.RUNNING).build();
when(changelogReader.completedChangelogs())
.thenReturn(Collections.emptySet())
.thenReturn(mkSet(TOPIC_PARTITION_A_0))
@@ -361,9 +362,9 @@ class DefaultStateUpdaterTest {
@Test
public void shouldRestoreActiveStatefulTaskThenUpdateStandbyTaskAndAgainRestoreActiveStatefulTask() throws Exception {
- final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
- final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_1_0, mkSet(TOPIC_PARTITION_C_0));
- final StreamTask task3 = createActiveStatefulTaskInStateRestoring(TASK_0_2, mkSet(TOPIC_PARTITION_B_0));
+ final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
+ final StandbyTask task2 = standbyTask(TASK_1_0, mkSet(TOPIC_PARTITION_C_0)).inState(State.RUNNING).build();
+ final StreamTask task3 = statefulTask(TASK_0_2, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
when(changelogReader.completedChangelogs())
.thenReturn(Collections.emptySet())
.thenReturn(mkSet(TOPIC_PARTITION_A_0))
@@ -391,9 +392,9 @@ class DefaultStateUpdaterTest {
@Test
public void shouldUpdateStandbyTaskAfterAllActiveStatefulTasksFailed() throws Exception {
- final StreamTask activeTask1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
- final StreamTask activeTask2 = createActiveStatefulTaskInStateRestoring(TASK_0_1, mkSet(TOPIC_PARTITION_B_0));
- final StandbyTask standbyTask = createStandbyTaskInStateRunning(TASK_1_0, mkSet(TOPIC_PARTITION_C_0));
+ final StreamTask activeTask1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
+ final StreamTask activeTask2 = statefulTask(TASK_0_1, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
+ final StandbyTask standbyTask = standbyTask(TASK_1_0, mkSet(TOPIC_PARTITION_C_0)).inState(State.RUNNING).build();
final TaskCorruptedException taskCorruptedException =
new TaskCorruptedException(mkSet(activeTask1.id(), activeTask2.id()));
final Map<TaskId, Task> updatingTasks1 = mkMap(
@@ -419,9 +420,9 @@ class DefaultStateUpdaterTest {
@Test
public void shouldUpdateStandbyTaskAfterAllActiveStatefulTasksRemoved() throws Exception {
- final StreamTask activeTask1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
- final StreamTask activeTask2 = createActiveStatefulTaskInStateRestoring(TASK_0_1, mkSet(TOPIC_PARTITION_B_0));
- final StandbyTask standbyTask = createStandbyTaskInStateRunning(TASK_1_0, mkSet(TOPIC_PARTITION_C_0));
+ final StreamTask activeTask1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
+ final StreamTask activeTask2 = statefulTask(TASK_0_1, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
+ final StandbyTask standbyTask = standbyTask(TASK_1_0, mkSet(TOPIC_PARTITION_C_0)).inState(State.RUNNING).build();
when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
when(changelogReader.allChangelogsCompleted()).thenReturn(false);
stateUpdater.start();
@@ -441,13 +442,13 @@ class DefaultStateUpdaterTest {
@Test
public void shouldRemoveActiveStatefulTask() throws Exception {
- final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
+ final StreamTask task = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
shouldRemoveStatefulTask(task);
}
@Test
public void shouldRemoveStandbyTask() throws Exception {
- final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
+ final StandbyTask task = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RUNNING).build();
shouldRemoveStatefulTask(task);
}
@@ -470,8 +471,8 @@ class DefaultStateUpdaterTest {
@Test
public void shouldRemovePausedTask() throws Exception {
- final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
- final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_1, mkSet(TOPIC_PARTITION_B_0));
+ final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
+ final StandbyTask task2 = standbyTask(TASK_0_1, mkSet(TOPIC_PARTITION_B_0)).inState(State.RUNNING).build();
stateUpdater.start();
stateUpdater.add(task1);
@@ -496,18 +497,18 @@ class DefaultStateUpdaterTest {
@Test
public void shouldNotRemoveActiveStatefulTaskFromRestoredActiveTasks() throws Exception {
- final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
+ final StreamTask task = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
shouldNotRemoveTaskFromRestoredActiveTasks(task);
}
@Test
public void shouldNotRemoveStatelessTaskFromRestoredActiveTasks() throws Exception {
- final StreamTask task = createStatelessTaskInStateRestoring(TASK_0_0);
+ final StreamTask task = statelessTask(TASK_0_0).inState(State.RESTORING).build();
shouldNotRemoveTaskFromRestoredActiveTasks(task);
}
private void shouldNotRemoveTaskFromRestoredActiveTasks(final StreamTask task) throws Exception {
- final StreamTask controlTask = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_B_0));
+ final StreamTask controlTask = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
when(changelogReader.completedChangelogs()).thenReturn(Collections.singleton(TOPIC_PARTITION_A_0));
when(changelogReader.allChangelogsCompleted()).thenReturn(false);
stateUpdater.start();
@@ -527,18 +528,18 @@ class DefaultStateUpdaterTest {
@Test
public void shouldNotRemoveActiveStatefulTaskFromFailedTasks() throws Exception {
- final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
+ final StreamTask task = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
shouldNotRemoveTaskFromFailedTasks(task);
}
@Test
public void shouldNotRemoveStandbyTaskFromFailedTasks() throws Exception {
- final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
+ final StandbyTask task = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RUNNING).build();
shouldNotRemoveTaskFromFailedTasks(task);
}
private void shouldNotRemoveTaskFromFailedTasks(final Task task) throws Exception {
- final StreamTask controlTask = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_B_0));
+ final StreamTask controlTask = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
final StreamsException streamsException = new StreamsException("Something happened", task.id());
when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
when(changelogReader.allChangelogsCompleted()).thenReturn(false);
@@ -568,22 +569,22 @@ class DefaultStateUpdaterTest {
@Test
public void shouldPauseActiveStatefulTask() throws Exception {
- final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
+ final StreamTask task = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
shouldPauseStatefulTask(task);
verify(changelogReader, never()).transitToUpdateStandby();
}
@Test
public void shouldPauseStandbyTask() throws Exception {
- final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
+ final StandbyTask task = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RUNNING).build();
shouldPauseStatefulTask(task);
verify(changelogReader, times(1)).transitToUpdateStandby();
}
@Test
public void shouldPauseActiveTaskAndTransitToUpdateStandby() throws Exception {
- final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
- final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_1, mkSet(TOPIC_PARTITION_B_0));
+ final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
+ final StandbyTask task2 = standbyTask(TASK_0_1, mkSet(TOPIC_PARTITION_B_0)).inState(State.RUNNING).build();
stateUpdater.start();
stateUpdater.add(task1);
@@ -629,8 +630,8 @@ class DefaultStateUpdaterTest {
@Test
public void shouldNotPauseActiveStatefulTaskInRestoredActiveTasks() throws Exception {
- final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
- final StreamTask controlTask = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_B_0));
+ final StreamTask task = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
+ final StreamTask controlTask = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
when(changelogReader.completedChangelogs()).thenReturn(Collections.singleton(TOPIC_PARTITION_A_0));
when(changelogReader.allChangelogsCompleted()).thenReturn(false);
stateUpdater.start();
@@ -649,18 +650,18 @@ class DefaultStateUpdaterTest {
@Test
public void shouldNotPauseActiveStatefulTaskInFailedTasks() throws Exception {
- final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
+ final StreamTask task = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
shouldNotPauseTaskInFailedTasks(task);
}
@Test
public void shouldNotPauseStandbyTaskInFailedTasks() throws Exception {
- final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
+ final StandbyTask task = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RUNNING).build();
shouldNotPauseTaskInFailedTasks(task);
}
private void shouldNotPauseTaskInFailedTasks(final Task task) throws Exception {
- final StreamTask controlTask = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_B_0));
+ final StreamTask controlTask = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
final StreamsException streamsException = new StreamsException("Something happened", task.id());
when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
when(changelogReader.allChangelogsCompleted()).thenReturn(false);
@@ -689,13 +690,13 @@ class DefaultStateUpdaterTest {
@Test
public void shouldNotPauseActiveStatefulTaskInRemovedTasks() throws Exception {
- final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
+ final StreamTask task = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
shouldNotPauseTaskInRemovedTasks(task);
}
@Test
public void shouldNotPauseStandbyTaskInRemovedTasks() throws Exception {
- final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
+ final StandbyTask task = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RUNNING).build();
shouldNotPauseTaskInRemovedTasks(task);
}
@@ -724,14 +725,14 @@ class DefaultStateUpdaterTest {
@Test
public void shouldResumeActiveStatefulTask() throws Exception {
- final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
+ final StreamTask task = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
shouldResumeStatefulTask(task);
verify(changelogReader, times(2)).enforceRestoreActive();
}
@Test
public void shouldResumeStandbyTask() throws Exception {
- final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
+ final StandbyTask task = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RUNNING).build();
shouldResumeStatefulTask(task);
verify(changelogReader, times(2)).transitToUpdateStandby();
}
@@ -765,8 +766,8 @@ class DefaultStateUpdaterTest {
@Test
public void shouldNotResumeActiveStatefulTaskInRestoredActiveTasks() throws Exception {
- final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
- final StreamTask controlTask = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_B_0));
+ final StreamTask task = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
+ final StreamTask controlTask = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
when(changelogReader.completedChangelogs()).thenReturn(Collections.singleton(TOPIC_PARTITION_A_0));
when(changelogReader.allChangelogsCompleted()).thenReturn(false);
stateUpdater.start();
@@ -786,13 +787,13 @@ class DefaultStateUpdaterTest {
@Test
public void shouldNotResumeActiveStatefulTaskInRemovedTasks() throws Exception {
- final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
+ final StreamTask task = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
shouldNotPauseTaskInRemovedTasks(task);
}
@Test
public void shouldNotResumeStandbyTaskInRemovedTasks() throws Exception {
- final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
+ final StandbyTask task = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RUNNING).build();
shouldNotResumeTaskInRemovedTasks(task);
}
@@ -817,18 +818,18 @@ class DefaultStateUpdaterTest {
@Test
public void shouldNotResumeActiveStatefulTaskInFailedTasks() throws Exception {
- final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
+ final StreamTask task = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
shouldNotPauseTaskInFailedTasks(task);
}
@Test
public void shouldNotResumeStandbyTaskInFailedTasks() throws Exception {
- final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
+ final StandbyTask task = standbyTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RUNNING).build();
shouldNotResumeTaskInFailedTasks(task);
}
private void shouldNotResumeTaskInFailedTasks(final Task task) throws Exception {
- final StreamTask controlTask = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_B_0));
+ final StreamTask controlTask = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
final StreamsException streamsException = new StreamsException("Something happened", task.id());
when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
when(changelogReader.allChangelogsCompleted()).thenReturn(false);
@@ -861,15 +862,15 @@ class DefaultStateUpdaterTest {
when(changelogReader.allChangelogsCompleted()).thenReturn(false);
stateUpdater.start();
- final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_B_0));
+ final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
stateUpdater.add(task1);
stateUpdater.remove(task1.id());
verifyDrainingRemovedTasks(task1);
- final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_1_1, mkSet(TOPIC_PARTITION_C_0));
- final StreamTask task3 = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_A_0));
- final StreamTask task4 = createActiveStatefulTaskInStateRestoring(TASK_0_2, mkSet(TOPIC_PARTITION_D_0));
+ final StreamTask task2 = statefulTask(TASK_1_1, mkSet(TOPIC_PARTITION_C_0)).inState(State.RESTORING).build();
+ final StreamTask task3 = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
+ final StreamTask task4 = statefulTask(TASK_0_2, mkSet(TOPIC_PARTITION_D_0)).inState(State.RESTORING).build();
stateUpdater.add(task2);
stateUpdater.remove(task2.id());
stateUpdater.add(task3);
@@ -882,8 +883,8 @@ class DefaultStateUpdaterTest {
@Test
public void shouldAddFailedTasksToQueueWhenRestoreThrowsStreamsExceptionWithoutTask() throws Exception {
- final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
- final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_2, mkSet(TOPIC_PARTITION_B_0));
+ final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
+ final StandbyTask task2 = standbyTask(TASK_0_2, mkSet(TOPIC_PARTITION_B_0)).inState(State.RUNNING).build();
final String exceptionMessage = "The Streams were crossed!";
final StreamsException streamsException = new StreamsException(exceptionMessage);
final Map<TaskId, Task> updatingTasks = mkMap(
@@ -906,9 +907,9 @@ class DefaultStateUpdaterTest {
@Test
public void shouldAddFailedTasksToQueueWhenRestoreThrowsStreamsExceptionWithTask() throws Exception {
- final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
- final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_0_2, mkSet(TOPIC_PARTITION_B_0));
- final StandbyTask task3 = createStandbyTaskInStateRunning(TASK_1_0, mkSet(TOPIC_PARTITION_C_0));
+ final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
+ final StreamTask task2 = statefulTask(TASK_0_2, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
+ final StandbyTask task3 = standbyTask(TASK_1_0, mkSet(TOPIC_PARTITION_C_0)).inState(State.RUNNING).build();
final String exceptionMessage = "The Streams were crossed!";
final StreamsException streamsException1 = new StreamsException(exceptionMessage, task1.id());
final StreamsException streamsException2 = new StreamsException(exceptionMessage, task3.id());
@@ -944,9 +945,9 @@ class DefaultStateUpdaterTest {
@Test
public void shouldAddFailedTasksToQueueWhenRestoreThrowsTaskCorruptedException() throws Exception {
- final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
- final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_2, mkSet(TOPIC_PARTITION_B_0));
- final StreamTask task3 = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_C_0));
+ final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
+ final StandbyTask task2 = standbyTask(TASK_0_2, mkSet(TOPIC_PARTITION_B_0)).inState(State.RUNNING).build();
+ final StreamTask task3 = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_C_0)).inState(State.RESTORING).build();
final Set<TaskId> expectedTaskIds = mkSet(task1.id(), task2.id());
final TaskCorruptedException taskCorruptedException = new TaskCorruptedException(expectedTaskIds);
final Map<TaskId, Task> updatingTasks = mkMap(
@@ -970,8 +971,8 @@ class DefaultStateUpdaterTest {
@Test
public void shouldAddFailedTasksToQueueWhenUncaughtExceptionIsThrown() throws Exception {
- final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
- final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_2, mkSet(TOPIC_PARTITION_B_0));
+ final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
+ final StandbyTask task2 = standbyTask(TASK_0_2, mkSet(TOPIC_PARTITION_B_0)).inState(State.RUNNING).build();
final IllegalStateException illegalStateException = new IllegalStateException("Nobody expects the Spanish inquisition!");
final Map<TaskId, Task> updatingTasks = mkMap(
mkEntry(task1.id(), task1),
@@ -995,10 +996,10 @@ class DefaultStateUpdaterTest {
public void shouldDrainFailedTasksAndExceptions() throws Exception {
assertTrue(stateUpdater.drainExceptionsAndFailedTasks().isEmpty());
- final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_B_0));
- final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_1_1, mkSet(TOPIC_PARTITION_C_0));
- final StreamTask task3 = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_A_0));
- final StreamTask task4 = createActiveStatefulTaskInStateRestoring(TASK_0_2, mkSet(TOPIC_PARTITION_D_0));
+ final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
+ final StreamTask task2 = statefulTask(TASK_1_1, mkSet(TOPIC_PARTITION_C_0)).inState(State.RESTORING).build();
+ final StreamTask task3 = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
+ final StreamTask task4 = statefulTask(TASK_0_2, mkSet(TOPIC_PARTITION_D_0)).inState(State.RESTORING).build();
final String exceptionMessage = "The Streams were crossed!";
final StreamsException streamsException1 = new StreamsException(exceptionMessage, task1.id());
final Map<TaskId, Task> updatingTasks1 = mkMap(
@@ -1043,10 +1044,10 @@ class DefaultStateUpdaterTest {
@Test
public void shouldAutoCheckpointTasksOnInterval() throws Exception {
- final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
- final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_0_2, mkSet(TOPIC_PARTITION_B_0));
- final StandbyTask task3 = createStandbyTaskInStateRunning(TASK_1_0, mkSet(TOPIC_PARTITION_C_0));
- final StandbyTask task4 = createStandbyTaskInStateRunning(TASK_1_1, mkSet(TOPIC_PARTITION_D_0));
+ final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
+ final StreamTask task2 = statefulTask(TASK_0_2, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
+ final StandbyTask task3 = standbyTask(TASK_1_0, mkSet(TOPIC_PARTITION_C_0)).inState(State.RUNNING).build();
+ final StandbyTask task4 = standbyTask(TASK_1_1, mkSet(TOPIC_PARTITION_D_0)).inState(State.RUNNING).build();
when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
when(changelogReader.allChangelogsCompleted()).thenReturn(false);
stateUpdater.start();
@@ -1069,10 +1070,10 @@ class DefaultStateUpdaterTest {
final Time time = new MockTime();
final DefaultStateUpdater stateUpdater = new DefaultStateUpdater(config, changelogReader, time);
try {
- final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
- final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_0_2, mkSet(TOPIC_PARTITION_B_0));
- final StandbyTask task3 = createStandbyTaskInStateRunning(TASK_1_0, mkSet(TOPIC_PARTITION_C_0));
- final StandbyTask task4 = createStandbyTaskInStateRunning(TASK_1_1, mkSet(TOPIC_PARTITION_D_0));
+ final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
+ final StreamTask task2 = statefulTask(TASK_0_2, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
+ final StandbyTask task3 = standbyTask(TASK_1_0, mkSet(TOPIC_PARTITION_C_0)).inState(State.RUNNING).build();
+ final StandbyTask task4 = standbyTask(TASK_1_1, mkSet(TOPIC_PARTITION_D_0)).inState(State.RUNNING).build();
when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
when(changelogReader.allChangelogsCompleted()).thenReturn(false);
stateUpdater.start();
@@ -1105,11 +1106,11 @@ class DefaultStateUpdaterTest {
public void shouldGetTasksFromInputQueue() {
stateUpdater.shutdown(Duration.ofMillis(Long.MAX_VALUE));
- final StreamTask activeTask1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
- final StreamTask activeTask2 = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_B_0));
- final StandbyTask standbyTask1 = createStandbyTaskInStateRunning(TASK_0_2, mkSet(TOPIC_PARTITION_C_0));
- final StandbyTask standbyTask2 = createStandbyTaskInStateRunning(TASK_1_1, mkSet(TOPIC_PARTITION_D_0));
- final StandbyTask standbyTask3 = createStandbyTaskInStateRunning(TASK_0_1, mkSet(TOPIC_PARTITION_A_1));
+ final StreamTask activeTask1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
+ final StreamTask activeTask2 = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
+ final StandbyTask standbyTask1 = standbyTask(TASK_0_2, mkSet(TOPIC_PARTITION_C_0)).inState(State.RUNNING).build();
+ final StandbyTask standbyTask2 = standbyTask(TASK_1_1, mkSet(TOPIC_PARTITION_D_0)).inState(State.RUNNING).build();
+ final StandbyTask standbyTask3 = standbyTask(TASK_0_1, mkSet(TOPIC_PARTITION_A_1)).inState(State.RUNNING).build();
stateUpdater.add(activeTask1);
stateUpdater.add(standbyTask1);
stateUpdater.add(standbyTask2);
@@ -1135,11 +1136,11 @@ class DefaultStateUpdaterTest {
@Test
public void shouldGetTasksFromUpdatingTasks() throws Exception {
- final StreamTask activeTask1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
- final StreamTask activeTask2 = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_B_0));
- final StandbyTask standbyTask1 = createStandbyTaskInStateRunning(TASK_0_2, mkSet(TOPIC_PARTITION_C_0));
- final StandbyTask standbyTask2 = createStandbyTaskInStateRunning(TASK_1_1, mkSet(TOPIC_PARTITION_D_0));
- final StandbyTask standbyTask3 = createStandbyTaskInStateRunning(TASK_0_1, mkSet(TOPIC_PARTITION_A_1));
+ final StreamTask activeTask1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
+ final StreamTask activeTask2 = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
+ final StandbyTask standbyTask1 = standbyTask(TASK_0_2, mkSet(TOPIC_PARTITION_C_0)).inState(State.RUNNING).build();
+ final StandbyTask standbyTask2 = standbyTask(TASK_1_1, mkSet(TOPIC_PARTITION_D_0)).inState(State.RUNNING).build();
+ final StandbyTask standbyTask3 = standbyTask(TASK_0_1, mkSet(TOPIC_PARTITION_A_1)).inState(State.RUNNING).build();
when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
when(changelogReader.allChangelogsCompleted()).thenReturn(false);
stateUpdater.start();
@@ -1168,8 +1169,8 @@ class DefaultStateUpdaterTest {
@Test
public void shouldGetTasksFromRestoredActiveTasks() throws Exception {
- final StreamTask activeTask1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
- final StreamTask activeTask2 = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_B_0));
+ final StreamTask activeTask1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
+ final StreamTask activeTask2 = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
when(changelogReader.completedChangelogs()).thenReturn(mkSet(TOPIC_PARTITION_A_0, TOPIC_PARTITION_B_0));
when(changelogReader.allChangelogsCompleted()).thenReturn(false);
stateUpdater.start();
@@ -1186,9 +1187,9 @@ class DefaultStateUpdaterTest {
@Test
public void shouldGetTasksFromExceptionsAndFailedTasks() throws Exception {
- final StreamTask activeTask1 = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_B_0));
- final StandbyTask standbyTask2 = createStandbyTaskInStateRunning(TASK_1_1, mkSet(TOPIC_PARTITION_D_0));
- final StandbyTask standbyTask1 = createStandbyTaskInStateRunning(TASK_0_1, mkSet(TOPIC_PARTITION_A_1));
+ final StreamTask activeTask1 = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
+ final StandbyTask standbyTask2 = standbyTask(TASK_1_1, mkSet(TOPIC_PARTITION_D_0)).inState(State.RUNNING).build();
+ final StandbyTask standbyTask1 = standbyTask(TASK_0_1, mkSet(TOPIC_PARTITION_A_1)).inState(State.RUNNING).build();
final TaskCorruptedException taskCorruptedException =
new TaskCorruptedException(mkSet(standbyTask1.id(), standbyTask2.id()));
final StreamsException streamsException = new StreamsException("The Streams were crossed!", activeTask1.id());
@@ -1220,9 +1221,9 @@ class DefaultStateUpdaterTest {
@Test
public void shouldGetTasksFromRemovedTasks() throws Exception {
- final StreamTask activeTask = createActiveStatefulTaskInStateRestoring(TASK_1_0, mkSet(TOPIC_PARTITION_B_0));
- final StandbyTask standbyTask2 = createStandbyTaskInStateRunning(TASK_1_1, mkSet(TOPIC_PARTITION_D_0));
- final StandbyTask standbyTask1 = createStandbyTaskInStateRunning(TASK_0_1, mkSet(TOPIC_PARTITION_A_1));
+ final StreamTask activeTask = statefulTask(TASK_1_0, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();
+ final StandbyTask standbyTask2 = standbyTask(TASK_1_1, mkSet(TOPIC_PARTITION_D_0)).inState(State.RUNNING).build();
+ final StandbyTask standbyTask1 = standbyTask(TASK_0_1, mkSet(TOPIC_PARTITION_A_1)).inState(State.RUNNING).build();
when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
when(changelogReader.allChangelogsCompleted()).thenReturn(false);
stateUpdater.start();
@@ -1243,8 +1244,8 @@ class DefaultStateUpdaterTest {
@Test
public void shouldGetTasksFromPausedTasks() throws Exception {
- final StreamTask activeTask = createActiveStatefulTaskInStateRestoring(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
- final StandbyTask standbyTask = createStandbyTaskInStateRunning(TASK_0_1, mkSet(TOPIC_PARTITION_A_0));
+ final StreamTask activeTask = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
+ final StandbyTask standbyTask = standbyTask(TASK_0_1, mkSet(TOPIC_PARTITION_A_0)).inState(State.RUNNING).build();
stateUpdater.start();
stateUpdater.add(activeTask);
stateUpdater.add(standbyTask);
@@ -1453,55 +1454,4 @@ class DefaultStateUpdaterTest {
);
assertTrue(stateUpdater.drainExceptionsAndFailedTasks().isEmpty());
}
-
- private StreamTask createActiveStatefulTaskInStateRestoring(final TaskId taskId,
- final Set<TopicPartition> changelogPartitions) {
- final StreamTask task = createActiveStatefulTask(taskId, changelogPartitions);
- when(task.state()).thenReturn(State.RESTORING);
- return task;
- }
-
- private StreamTask createActiveStatefulTask(final TaskId taskId,
- final Set<TopicPartition> changelogPartitions) {
- final StreamTask task = mock(StreamTask.class);
- setupStatefulTask(task, taskId, changelogPartitions);
- when(task.isActive()).thenReturn(true);
- return task;
- }
-
- private StreamTask createStatelessTaskInStateRestoring(final TaskId taskId) {
- final StreamTask task = createStatelessTask(taskId);
- when(task.state()).thenReturn(State.RESTORING);
- return task;
- }
-
- private StreamTask createStatelessTask(final TaskId taskId) {
- final StreamTask task = mock(StreamTask.class);
- when(task.changelogPartitions()).thenReturn(Collections.emptySet());
- when(task.isActive()).thenReturn(true);
- when(task.id()).thenReturn(taskId);
- return task;
- }
-
- private StandbyTask createStandbyTaskInStateRunning(final TaskId taskId,
- final Set<TopicPartition> changelogPartitions) {
- final StandbyTask task = createStandbyTask(taskId, changelogPartitions);
- when(task.state()).thenReturn(State.RUNNING);
- return task;
- }
-
- private StandbyTask createStandbyTask(final TaskId taskId,
- final Set<TopicPartition> changelogPartitions) {
- final StandbyTask task = mock(StandbyTask.class);
- setupStatefulTask(task, taskId, changelogPartitions);
- when(task.isActive()).thenReturn(false);
- return task;
- }
-
- private void setupStatefulTask(final Task task,
- final TaskId taskId,
- final Set<TopicPartition> changelogPartitions) {
- when(task.changelogPartitions()).thenReturn(changelogPartitions);
- when(task.id()).thenReturn(taskId);
- }
}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
index c88515e01d..77e3bec562 100644
--- a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
@@ -343,6 +343,11 @@ public final class StreamsTestUtils {
when(task.id()).thenReturn(taskId);
}
+ public TaskBuilder<T> inState(final Task.State state) {
+ when(task.state()).thenReturn(state);
+ return this;
+ }
+
public T build() {
return task;
}