You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2022/08/16 02:33:56 UTC
[kafka] branch trunk updated: KAFKA-10199: Handle task closure and recycling from state updater (#12466)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new dc72f6ec02 KAFKA-10199: Handle task closure and recycling from state updater (#12466)
dc72f6ec02 is described below
commit dc72f6ec02c7a7fbda083cd8a5a9f0081c7e58fd
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Mon Aug 15 19:33:46 2022 -0700
KAFKA-10199: Handle task closure and recycling from state updater (#12466)
1. Within the tryCompleteRestore function of the thread, try to drain the removed tasks from the state updater and handle them accordingly: 1) recycling, 2) closing, or 3) updating input partitions.
2. Catch up on some unit test coverage from previous PRs.
3. Some minor cleanups around exception handling.
Reviewers: Bruno Cadonna <ca...@apache.org>
---
.../processor/internals/ActiveTaskCreator.java | 1 +
.../streams/processor/internals/StandbyTask.java | 2 +-
.../processor/internals/StandbyTaskCreator.java | 1 +
.../streams/processor/internals/StreamTask.java | 2 +-
.../streams/processor/internals/TaskManager.java | 311 +++++++++++++--------
.../kafka/streams/processor/internals/Tasks.java | 90 +++---
.../internals/ProcessorTopologyFactories.java | 1 -
.../processor/internals/StandbyTaskTest.java | 5 +-
.../processor/internals/StreamTaskTest.java | 12 +-
.../processor/internals/TaskManagerTest.java | 178 +++++++++++-
.../streams/processor/internals/TasksTest.java | 64 ++++-
11 files changed, 471 insertions(+), 196 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
index 46455111db..d28d0d4444 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
@@ -135,6 +135,7 @@ class ActiveTaskCreator {
return threadProducer;
}
+ // TODO: convert to StreamTask when we remove TaskManager#StateMachineTask with mocks
public Collection<Task> createTasks(final Consumer<byte[], byte[]> consumer,
final Map<TaskId, Set<TopicPartition>> tasksToBeCreated) {
final List<Task> createdTasks = new ArrayList<>();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index bb7aef1dcd..87f19c4b1f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -234,7 +234,7 @@ public class StandbyTask extends AbstractTask implements Task {
closeTaskSensor.record();
transitionTo(State.CLOSED);
- log.info("Closed and recycled state, and converted type to active");
+ log.info("Closed and recycled state");
}
private void close(final boolean clean) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java
index 2f48cdb67f..26a3a49af3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java
@@ -67,6 +67,7 @@ class StandbyTaskCreator {
);
}
+ // TODO: convert to StandbyTask when we remove TaskManager#StateMachineTask with mocks
Collection<Task> createTasks(final Map<TaskId, Set<TopicPartition>> tasksToBeCreated) {
final List<Task> createdTasks = new ArrayList<>();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index f7bf8a5e74..8a30bcf6ca 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -568,7 +568,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
closeTaskSensor.record();
transitionTo(State.CLOSED);
- log.info("Closed and recycled state, and converted type to standby");
+ log.info("Closed and recycled state");
}
/**
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index cfd20d2299..03c36b0daf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -298,14 +298,16 @@ public class TaskManager {
logPrefix
);
- final LinkedHashMap<TaskId, RuntimeException> taskCloseExceptions = new LinkedHashMap<>();
final Map<TaskId, Set<TopicPartition>> activeTasksToCreate = new HashMap<>(activeTasks);
final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate = new HashMap<>(standbyTasks);
final Map<Task, Set<TopicPartition>> tasksToRecycle = new HashMap<>();
final Set<Task> tasksToCloseClean = new TreeSet<>(Comparator.comparing(Task::id));
- tasks.purgePendingTasks(activeTasks.keySet(), standbyTasks.keySet());
-
+ // first put aside those unrecognized tasks because of unknown named-topologies
+ tasks.clearPendingTasksToCreate();
+ tasks.addPendingActiveTasksToCreate(pendingTasksToCreate(activeTasksToCreate));
+ tasks.addPendingStandbyTasksToCreate(pendingTasksToCreate(standbyTasksToCreate));
+
// first rectify all existing tasks:
// 1. for tasks that are already owned, just update input partitions / resume and skip re-creating them
// 2. for tasks that have changed active/standby status, just recycle and skip re-creating them
@@ -316,20 +318,18 @@ public class TaskManager {
classifyTasksWithStateUpdater(activeTasksToCreate, standbyTasksToCreate, tasksToRecycle, tasksToCloseClean);
}
- tasks.addPendingActiveTasks(pendingTasksToCreate(activeTasksToCreate));
- tasks.addPendingStandbyTasks(pendingTasksToCreate(standbyTasksToCreate));
+ final Map<TaskId, RuntimeException> taskCloseExceptions = closeAndRecycleTasks(tasksToRecycle, tasksToCloseClean);
- // close and recycle those tasks
- closeAndRecycleTasks(
- tasksToRecycle,
- tasksToCloseClean,
- taskCloseExceptions
- );
+ throwTaskExceptions(taskCloseExceptions);
- if (!taskCloseExceptions.isEmpty()) {
- log.error("Hit exceptions while closing / recycling tasks: {}", taskCloseExceptions);
+ createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+ }
- for (final Map.Entry<TaskId, RuntimeException> entry : taskCloseExceptions.entrySet()) {
+ private void throwTaskExceptions(final Map<TaskId, RuntimeException> taskExceptions) {
+ if (!taskExceptions.isEmpty()) {
+ log.error("Get exceptions for the following tasks: {}", taskExceptions);
+
+ for (final Map.Entry<TaskId, RuntimeException> entry : taskExceptions.entrySet()) {
if (!(entry.getValue() instanceof TaskMigratedException)) {
final TaskId taskId = entry.getKey();
final RuntimeException exception = entry.getValue();
@@ -340,8 +340,8 @@ public class TaskManager {
throw new StreamsException(exception, taskId);
} else {
throw new StreamsException(
- "Unexpected failure to close " + taskCloseExceptions.size() +
- " task(s) [" + taskCloseExceptions.keySet() + "]. " +
+ "Unexpected failure to close " + taskExceptions.size() +
+ " task(s) [" + taskExceptions.keySet() + "]. " +
"First unexpected exception (for task " + taskId + ") follows.",
exception,
taskId
@@ -352,11 +352,9 @@ public class TaskManager {
// If all exceptions are task-migrated, we would just throw the first one. No need to wrap with a
// StreamsException since TaskMigrated is handled explicitly by the StreamThread
- final Map.Entry<TaskId, RuntimeException> first = taskCloseExceptions.entrySet().iterator().next();
+ final Map.Entry<TaskId, RuntimeException> first = taskExceptions.entrySet().iterator().next();
throw first.getValue();
}
-
- createNewTasks(activeTasksToCreate, standbyTasksToCreate);
}
private void createNewTasks(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
@@ -368,8 +366,8 @@ public class TaskManager {
tasks.addNewActiveTasks(newActiveTasks);
tasks.addNewStandbyTasks(newStandbyTask);
} else {
- tasks.addPendingTaskToRestore(newActiveTasks);
- tasks.addPendingTaskToRestore(newStandbyTask);
+ tasks.addPendingTaskToInit(newActiveTasks);
+ tasks.addPendingTaskToInit(newStandbyTask);
}
}
@@ -442,26 +440,27 @@ public class TaskManager {
classifyRunningTasks(activeTasksToCreate, standbyTasksToCreate, tasksToRecycle, tasksToCloseClean);
for (final Task task : stateUpdater.getTasks()) {
final TaskId taskId = task.id();
+ final Set<TopicPartition> topicPartitions = activeTasksToCreate.get(taskId);
if (activeTasksToCreate.containsKey(taskId)) {
if (task.isActive()) {
- final Set<TopicPartition> topicPartitions = activeTasksToCreate.get(taskId);
if (!task.inputPartitions().equals(topicPartitions)) {
- tasks.addPendingTaskThatNeedsInputPartitionsUpdate(taskId);
+ stateUpdater.remove(taskId);
+ tasks.addPendingTaskToUpdateInputPartitions(taskId, topicPartitions);
}
} else {
stateUpdater.remove(taskId);
- tasks.addPendingStandbyTaskToRecycle(taskId);
+ tasks.addPendingTaskToRecycle(taskId, topicPartitions);
}
activeTasksToCreate.remove(taskId);
} else if (standbyTasksToCreate.containsKey(taskId)) {
if (!task.isActive()) {
- final Set<TopicPartition> topicPartitions = standbyTasksToCreate.get(taskId);
if (!task.inputPartitions().equals(topicPartitions)) {
- tasks.addPendingTaskThatNeedsInputPartitionsUpdate(taskId);
+ stateUpdater.remove(taskId);
+ tasks.addPendingTaskToUpdateInputPartitions(taskId, topicPartitions);
}
} else {
stateUpdater.remove(taskId);
- tasks.addPendingActiveTaskToRecycle(taskId);
+ tasks.addPendingTaskToRecycle(taskId, topicPartitions);
}
standbyTasksToCreate.remove(taskId);
} else {
@@ -478,6 +477,8 @@ public class TaskManager {
final Map.Entry<TaskId, Set<TopicPartition>> entry = iter.next();
final TaskId taskId = entry.getKey();
if (taskId.topologyName() != null && !topologyMetadata.namedTopologiesView().contains(taskId.topologyName())) {
+ log.info("Cannot create the assigned task {} since it's topology name cannot be recognized, will put it " +
+ "aside as pending for now and create later when topology metadata gets refreshed", taskId);
pendingTasks.put(taskId, entry.getValue());
iter.remove();
}
@@ -485,9 +486,9 @@ public class TaskManager {
return pendingTasks;
}
- private void closeAndRecycleTasks(final Map<Task, Set<TopicPartition>> tasksToRecycle,
- final Set<Task> tasksToCloseClean,
- final LinkedHashMap<TaskId, RuntimeException> taskCloseExceptions) {
+ private Map<TaskId, RuntimeException> closeAndRecycleTasks(final Map<Task, Set<TopicPartition>> tasksToRecycle,
+ final Set<Task> tasksToCloseClean) {
+ final Map<TaskId, RuntimeException> taskCloseExceptions = new LinkedHashMap<>();
final Set<Task> tasksToCloseDirty = new TreeSet<>(Comparator.comparing(Task::id));
// for all tasks to close or recycle, we should first write a checkpoint as in post-commit
@@ -530,28 +531,32 @@ public class TaskManager {
tasksToCloseClean.removeAll(tasksToCloseDirty);
for (final Task task : tasksToCloseClean) {
try {
- final RuntimeException removeTaskException = completeTaskCloseClean(task);
- if (removeTaskException != null) {
- taskCloseExceptions.putIfAbsent(task.id(), removeTaskException);
- }
+ closeTaskClean(task);
} catch (final RuntimeException closeTaskException) {
final String uncleanMessage = String.format(
- "Failed to close task %s cleanly. Attempting to close remaining tasks before re-throwing:",
- task.id());
+ "Failed to close task %s cleanly. Attempting to close remaining tasks before re-throwing:",
+ task.id());
log.error(uncleanMessage, closeTaskException);
+
+ if (task.state() != State.CLOSED) {
+ tasksToCloseDirty.add(task);
+ }
+
taskCloseExceptions.putIfAbsent(task.id(), closeTaskException);
- tasksToCloseDirty.add(task);
}
}
tasksToRecycle.keySet().removeAll(tasksToCloseDirty);
for (final Map.Entry<Task, Set<TopicPartition>> entry : tasksToRecycle.entrySet()) {
final Task oldTask = entry.getKey();
+ final Set<TopicPartition> inputPartitions = entry.getValue();
try {
if (oldTask.isActive()) {
- convertActiveToStandby((StreamTask) oldTask, entry.getValue());
+ final StandbyTask standbyTask = convertActiveToStandby((StreamTask) oldTask, inputPartitions);
+ tasks.replaceActiveWithStandby(standbyTask);
} else {
- convertStandbyToActive((StandbyTask) oldTask, entry.getValue());
+ final StreamTask activeTask = convertStandbyToActive((StandbyTask) oldTask, inputPartitions);
+ tasks.replaceStandbyWithActive(activeTask);
}
} catch (final RuntimeException e) {
final String uncleanMessage = String.format("Failed to recycle task %s cleanly. " +
@@ -566,19 +571,18 @@ public class TaskManager {
for (final Task task : tasksToCloseDirty) {
closeTaskDirty(task);
}
+
+ return taskCloseExceptions;
}
- private void convertActiveToStandby(final StreamTask activeTask,
- final Set<TopicPartition> partitions) {
+ private StandbyTask convertActiveToStandby(final StreamTask activeTask, final Set<TopicPartition> partitions) {
final StandbyTask standbyTask = standbyTaskCreator.createStandbyTaskFromActive(activeTask, partitions);
activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(activeTask.id());
- tasks.replaceActiveWithStandby(standbyTask);
+ return standbyTask;
}
- private void convertStandbyToActive(final StandbyTask standbyTask,
- final Set<TopicPartition> partitions) {
- final StreamTask activeTask = activeTaskCreator.createActiveTaskFromStandby(standbyTask, partitions, mainConsumer);
- tasks.replaceStandbyWithActive(activeTask);
+ private StreamTask convertStandbyToActive(final StandbyTask standbyTask, final Set<TopicPartition> partitions) {
+ return activeTaskCreator.createActiveTaskFromStandby(standbyTask, partitions, mainConsumer);
}
/**
@@ -641,9 +645,9 @@ public class TaskManager {
}
}
} else {
- for (final Task task : tasks.drainPendingTaskToRestore()) {
- stateUpdater.add(task);
- }
+ addTasksToStateUpdater();
+
+ handleRemovedTasksFromStateUpdater();
// TODO: should add logic for checking and resuming when all active tasks have been restored
}
@@ -656,6 +660,72 @@ public class TaskManager {
return allRunning;
}
+ private void addTasksToStateUpdater() {
+ for (final Task task : tasks.drainPendingTaskToInit()) {
+ task.initializeIfNeeded();
+ stateUpdater.add(task);
+ }
+ }
+
+ private void handleRemovedTasksFromStateUpdater() {
+ final Map<TaskId, RuntimeException> taskExceptions = new LinkedHashMap<>();
+ final Set<Task> tasksToCloseDirty = new TreeSet<>(Comparator.comparing(Task::id));
+
+ for (final Task task : stateUpdater.drainRemovedTasks()) {
+ final TaskId taskId = task.id();
+ Set<TopicPartition> inputPartitions;
+ if ((inputPartitions = tasks.removePendingTaskToRecycle(task.id())) != null) {
+ try {
+ final Task newTask = task.isActive() ?
+ convertActiveToStandby((StreamTask) task, inputPartitions) :
+ convertStandbyToActive((StandbyTask) task, inputPartitions);
+ newTask.initializeIfNeeded();
+ stateUpdater.add(newTask);
+ } catch (final RuntimeException e) {
+ final String uncleanMessage = String.format("Failed to recycle task %s cleanly. " +
+ "Attempting to handle remaining tasks before re-throwing:", taskId);
+ log.error(uncleanMessage, e);
+
+ if (task.state() != State.CLOSED) {
+ tasksToCloseDirty.add(task);
+ }
+
+ taskExceptions.putIfAbsent(taskId, e);
+ }
+ } else if (tasks.removePendingTaskToClose(task.id())) {
+ try {
+ task.suspend();
+ task.closeClean();
+ if (task.isActive()) {
+ activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id());
+ }
+ } catch (final RuntimeException e) {
+ final String uncleanMessage = String.format("Failed to close task %s cleanly. " +
+ "Attempting to handle remaining tasks before re-throwing:", task.id());
+ log.error(uncleanMessage, e);
+
+ if (task.state() != State.CLOSED) {
+ tasksToCloseDirty.add(task);
+ }
+
+ taskExceptions.putIfAbsent(task.id(), e);
+ }
+ } else if ((inputPartitions = tasks.removePendingTaskToUpdateInputPartitions(task.id())) != null) {
+ task.updateInputPartitions(inputPartitions, topologyMetadata.nodeToSourceTopics(task.id()));
+ stateUpdater.add(task);
+ } else {
+ throw new IllegalStateException("Got a removed task " + task.id() + " from the state updater " +
+ " that is not for recycle, closing, or updating input partitions; this should not happen");
+ }
+ }
+
+ for (final Task task : tasksToCloseDirty) {
+ closeTaskDirty(task);
+ }
+
+ throwTaskExceptions(taskExceptions);
+ }
+
/**
* Handle the revoked partitions and prepare for closing the associated tasks in {@link #handleAssignment(Map, Map)}
* We should commit the revoking tasks first before suspending them as we will not officially own them anymore when
@@ -735,7 +805,7 @@ public class TaskManager {
task.postCommit(true);
} catch (final RuntimeException e) {
log.error("Exception caught while post-committing task " + task.id(), e);
- maybeWrapAndSetFirstException(firstException, e, task.id());
+ maybeSetFirstException(false, maybeWrapTaskException(e, task.id()), firstException);
}
}
}
@@ -750,7 +820,7 @@ public class TaskManager {
task.postCommit(false);
} catch (final RuntimeException e) {
log.error("Exception caught while post-committing task " + task.id(), e);
- maybeWrapAndSetFirstException(firstException, e, task.id());
+ maybeSetFirstException(false, maybeWrapTaskException(e, task.id()), firstException);
}
}
}
@@ -761,7 +831,7 @@ public class TaskManager {
task.suspend();
} catch (final RuntimeException e) {
log.error("Caught the following exception while trying to suspend revoked task " + task.id(), e);
- maybeWrapAndSetFirstException(firstException, e, task.id());
+ maybeSetFirstException(false, maybeWrapTaskException(e, task.id()), firstException);
}
}
@@ -961,19 +1031,12 @@ public class TaskManager {
}
}
- private RuntimeException completeTaskCloseClean(final Task task) {
+ private void closeTaskClean(final Task task) {
task.closeClean();
- try {
- tasks.removeTask(task);
-
- if (task.isActive()) {
- activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id());
- }
- } catch (final RuntimeException e) {
- log.error("Error removing active task {}: {}", task.id(), e.getMessage());
- return e;
+ tasks.removeTask(task);
+ if (task.isActive()) {
+ activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id());
}
- return null;
}
void shutdown(final boolean clean) {
@@ -1010,7 +1073,7 @@ public class TaskManager {
final RuntimeException fatalException = firstException.get();
if (fatalException != null) {
- throw new RuntimeException("Unexpected exception while closing task", fatalException);
+ throw fatalException;
}
}
@@ -1076,59 +1139,51 @@ public class TaskManager {
} else {
try {
taskExecutor.commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
-
- for (final Task task : activeTaskIterable()) {
- try {
- task.postCommit(true);
- } catch (final RuntimeException e) {
- log.error("Exception caught while post-committing task " + task.id(), e);
- maybeWrapAndSetFirstException(firstException, e, task.id());
- tasksToCloseDirty.add(task);
- tasksToCloseClean.remove(task);
- }
- }
- } catch (final TimeoutException timeoutException) {
- firstException.compareAndSet(null, timeoutException);
-
- tasksToCloseClean.removeAll(tasksToCommit);
- tasksToCloseDirty.addAll(tasksToCommit);
- } catch (final TaskCorruptedException taskCorruptedException) {
- firstException.compareAndSet(null, taskCorruptedException);
-
- final Set<TaskId> corruptedTaskIds = taskCorruptedException.corruptedTasks();
- final Set<Task> corruptedTasks = tasksToCommit
+ } catch (final RuntimeException e) {
+ log.error("Exception caught while committing tasks " + consumedOffsetsAndMetadataPerTask.keySet(), e);
+ // TODO: should record the task ids when handling this exception
+ maybeSetFirstException(false, e, firstException);
+
+ if (e instanceof TaskCorruptedException) {
+ final TaskCorruptedException taskCorruptedException = (TaskCorruptedException) e;
+ final Set<TaskId> corruptedTaskIds = taskCorruptedException.corruptedTasks();
+ final Set<Task> corruptedTasks = tasksToCommit
.stream()
.filter(task -> corruptedTaskIds.contains(task.id()))
.collect(Collectors.toSet());
+ tasksToCloseClean.removeAll(corruptedTasks);
+ tasksToCloseDirty.addAll(corruptedTasks);
+ } else {
+ // If the commit fails, everyone who participated in it must be closed dirty
+ tasksToCloseClean.removeAll(tasksToCommit);
+ tasksToCloseDirty.addAll(tasksToCommit);
+ }
+ }
- tasksToCloseClean.removeAll(corruptedTasks);
- tasksToCloseDirty.addAll(corruptedTasks);
- } catch (final RuntimeException e) {
- log.error("Exception caught while committing tasks during shutdown", e);
- firstException.compareAndSet(null, e);
-
- // If the commit fails, everyone who participated in it must be closed dirty
- tasksToCloseClean.removeAll(tasksToCommit);
- tasksToCloseDirty.addAll(tasksToCommit);
+ for (final Task task : activeTaskIterable()) {
+ try {
+ task.postCommit(true);
+ } catch (final RuntimeException e) {
+ log.error("Exception caught while post-committing task " + task.id(), e);
+ maybeSetFirstException(false, maybeWrapTaskException(e, task.id()), firstException);
+ tasksToCloseDirty.add(task);
+ tasksToCloseClean.remove(task);
+ }
}
}
for (final Task task : tasksToCloseClean) {
try {
task.suspend();
- final RuntimeException exception = completeTaskCloseClean(task);
- if (exception != null) {
- firstException.compareAndSet(null, exception);
- }
- } catch (final StreamsException e) {
- log.error("Exception caught while clean-closing task " + task.id(), e);
- e.setTaskId(task.id());
- firstException.compareAndSet(null, e);
- tasksToCloseDirty.add(task);
+ closeTaskClean(task);
} catch (final RuntimeException e) {
- log.error("Exception caught while clean-closing task " + task.id(), e);
- firstException.compareAndSet(null, new StreamsException(e, task.id()));
- tasksToCloseDirty.add(task);
+ log.error("Exception caught while clean-closing active task {}: {}", task.id(), e.getMessage());
+
+ if (task.state() != State.CLOSED) {
+ tasksToCloseDirty.add(task);
+ }
+ // ignore task migrated exception as it doesn't matter during shutdown
+ maybeSetFirstException(true, maybeWrapTaskException(e, task.id()), firstException);
}
}
@@ -1150,16 +1205,15 @@ public class TaskManager {
task.prepareCommit();
task.postCommit(true);
task.suspend();
- final RuntimeException exception = completeTaskCloseClean(task);
- if (exception != null) {
- maybeWrapAndSetFirstException(firstException, exception, task.id());
- }
- } catch (final TaskMigratedException e) {
- // just ignore the exception as it doesn't matter during shutdown
- tasksToCloseDirty.add(task);
+ closeTaskClean(task);
} catch (final RuntimeException e) {
- maybeWrapAndSetFirstException(firstException, e, task.id());
- tasksToCloseDirty.add(task);
+ log.error("Exception caught while clean-closing standby task {}: {}", task.id(), e.getMessage());
+
+ if (task.state() != State.CLOSED) {
+ tasksToCloseDirty.add(task);
+ }
+ // ignore task migrated exception as it doesn't matter during shutdown
+ maybeSetFirstException(true, maybeWrapTaskException(e, task.id()), firstException);
}
}
return tasksToCloseDirty;
@@ -1340,8 +1394,8 @@ public class TaskManager {
}
void createPendingTasks(final Set<String> currentNamedTopologies) {
- final Map<TaskId, Set<TopicPartition>> activeTasksToCreate = tasks.pendingActiveTasksForTopologies(currentNamedTopologies);
- final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate = tasks.pendingStandbyTasksForTopologies(currentNamedTopologies);
+ final Map<TaskId, Set<TopicPartition>> activeTasksToCreate = tasks.drainPendingActiveTasksForTopologies(currentNamedTopologies);
+ final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate = tasks.drainPendingStandbyTasksForTopologies(currentNamedTopologies);
createNewTasks(activeTasksToCreate, standbyTasksToCreate);
}
@@ -1432,14 +1486,21 @@ public class TaskManager {
return Collections.unmodifiableSet(lockedTaskDirectories);
}
- private void maybeWrapAndSetFirstException(final AtomicReference<RuntimeException> firstException,
- final RuntimeException exception,
- final TaskId taskId) {
- if (exception instanceof StreamsException) {
- ((StreamsException) exception).setTaskId(taskId);
+ private void maybeSetFirstException(final boolean ignoreTaskMigrated,
+ final RuntimeException exception,
+ final AtomicReference<RuntimeException> firstException) {
+ if (!ignoreTaskMigrated || !(exception instanceof TaskMigratedException)) {
firstException.compareAndSet(null, exception);
+ }
+ }
+
+ private StreamsException maybeWrapTaskException(final RuntimeException exception, final TaskId taskId) {
+ if (exception instanceof StreamsException) {
+ final StreamsException streamsException = (StreamsException) exception;
+ streamsException.setTaskId(taskId);
+ return streamsException;
} else {
- firstException.compareAndSet(null, new StreamsException(exception, taskId));
+ return new StreamsException(exception, taskId);
}
}
@@ -1479,4 +1540,8 @@ public class TaskManager {
void addTask(final Task task) {
tasks.addTask(task);
}
+
+ Tasks tasks() {
+ return tasks;
+ }
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
index e360556658..9628b42d92 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
@@ -43,83 +43,90 @@ import static org.apache.kafka.common.utils.Utils.union;
class Tasks {
private final Logger log;
- // TODO: change type to `StreamTask`
+ // TODO: convert to Stream/StandbyTask when we remove TaskManager#StateMachineTask with mocks
private final Map<TaskId, Task> activeTasksPerId = new TreeMap<>();
- // TODO: change type to `StandbyTask`
private final Map<TaskId, Task> standbyTasksPerId = new TreeMap<>();
// Tasks may have been assigned for a NamedTopology that is not yet known by this host. When that occurs we stash
// these unknown tasks until either the corresponding NamedTopology is added and we can create them at last, or
// we receive a new assignment and they are revoked from the thread.
-
- // Tasks may have been assigned but not yet created because:
- // 1. They are for a NamedTopology that is yet known by this host.
- // 2. They are to be recycled from an existing restoring task yet to be returned from the state updater.
- //
- // When that occurs we stash these pending tasks until either they are finally clear to be created,
- // or they are revoked from a new assignment.
private final Map<TaskId, Set<TopicPartition>> pendingActiveTasksToCreate = new HashMap<>();
private final Map<TaskId, Set<TopicPartition>> pendingStandbyTasksToCreate = new HashMap<>();
-
- private final Set<Task> pendingTasksToRestore = new HashSet<>();
-
- private final Set<TaskId> pendingActiveTasksToRecycle = new HashSet<>();
- private final Set<TaskId> pendingStandbyTasksToRecycle = new HashSet<>();
- private final Set<TaskId> pendingTasksThatNeedInputPartitionUpdate = new HashSet<>();
+ private final Map<TaskId, Set<TopicPartition>> pendingTasksToRecycle = new HashMap<>();
+ private final Map<TaskId, Set<TopicPartition>> pendingTasksToUpdateInputPartitions = new HashMap<>();
+ private final Set<Task> pendingTasksToInit = new HashSet<>();
private final Set<TaskId> pendingTasksToClose = new HashSet<>();
- // TODO: change type to `StreamTask`
+ // TODO: convert to Stream/StandbyTask when we remove TaskManager#StateMachineTask with mocks
private final Map<TopicPartition, Task> activeTasksPerPartition = new HashMap<>();
Tasks(final LogContext logContext) {
this.log = logContext.logger(getClass());
}
- void purgePendingTasks(final Set<TaskId> assignedActiveTasks, final Set<TaskId> assignedStandbyTasks) {
- pendingActiveTasksToCreate.keySet().retainAll(assignedActiveTasks);
- pendingStandbyTasksToCreate.keySet().retainAll(assignedStandbyTasks);
+ void clearPendingTasksToCreate() {
+ pendingActiveTasksToCreate.clear();
+ pendingStandbyTasksToCreate.clear();
+ }
+
+ Map<TaskId, Set<TopicPartition>> drainPendingActiveTasksForTopologies(final Set<String> currentTopologies) {
+ final Map<TaskId, Set<TopicPartition>> pendingActiveTasksForTopologies =
+ filterMap(pendingActiveTasksToCreate, t -> currentTopologies.contains(t.getKey().topologyName()));
+
+ pendingActiveTasksToCreate.keySet().removeAll(pendingActiveTasksForTopologies.keySet());
+
+ return pendingActiveTasksForTopologies;
+ }
+
+ Map<TaskId, Set<TopicPartition>> drainPendingStandbyTasksForTopologies(final Set<String> currentTopologies) {
+ final Map<TaskId, Set<TopicPartition>> pendingActiveTasksForTopologies =
+ filterMap(pendingStandbyTasksToCreate, t -> currentTopologies.contains(t.getKey().topologyName()));
+
+ pendingStandbyTasksToCreate.keySet().removeAll(pendingActiveTasksForTopologies.keySet());
+
+ return pendingActiveTasksForTopologies;
}
- void addPendingActiveTasks(final Map<TaskId, Set<TopicPartition>> pendingTasks) {
+ void addPendingActiveTasksToCreate(final Map<TaskId, Set<TopicPartition>> pendingTasks) {
pendingActiveTasksToCreate.putAll(pendingTasks);
}
- void addPendingStandbyTasks(final Map<TaskId, Set<TopicPartition>> pendingTasks) {
+ void addPendingStandbyTasksToCreate(final Map<TaskId, Set<TopicPartition>> pendingTasks) {
pendingStandbyTasksToCreate.putAll(pendingTasks);
}
- void addPendingActiveTaskToRecycle(final TaskId taskId) {
- pendingActiveTasksToRecycle.add(taskId);
+ Set<TopicPartition> removePendingTaskToRecycle(final TaskId taskId) {
+ return pendingTasksToRecycle.remove(taskId);
}
- void addPendingStandbyTaskToRecycle(final TaskId taskId) {
- pendingStandbyTasksToRecycle.add(taskId);
+ void addPendingTaskToRecycle(final TaskId taskId, final Set<TopicPartition> inputPartitions) {
+ pendingTasksToRecycle.put(taskId, inputPartitions);
}
- void addPendingTaskThatNeedsInputPartitionsUpdate(final TaskId taskId) {
- pendingTasksThatNeedInputPartitionUpdate.add(taskId);
+ Set<TopicPartition> removePendingTaskToUpdateInputPartitions(final TaskId taskId) {
+ return pendingTasksToUpdateInputPartitions.remove(taskId);
}
- void addPendingTaskToClose(final TaskId taskId) {
- pendingTasksToClose.add(taskId);
+ void addPendingTaskToUpdateInputPartitions(final TaskId taskId, final Set<TopicPartition> inputPartitions) {
+ pendingTasksToUpdateInputPartitions.put(taskId, inputPartitions);
}
- void addPendingTaskToRestore(final Collection<Task> tasks) {
- pendingTasksToRestore.addAll(tasks);
+ boolean removePendingTaskToClose(final TaskId taskId) {
+ return pendingTasksToClose.remove(taskId);
}
- Set<Task> drainPendingTaskToRestore() {
- final Set<Task> result = new HashSet<>(pendingTasksToRestore);
- pendingTasksToRestore.clear();
- return result;
+ void addPendingTaskToClose(final TaskId taskId) {
+ pendingTasksToClose.add(taskId);
}
- Map<TaskId, Set<TopicPartition>> pendingActiveTasksForTopologies(final Set<String> currentTopologies) {
- return filterMap(pendingActiveTasksToCreate, t -> currentTopologies.contains(t.getKey().topologyName()));
+ Set<Task> drainPendingTaskToInit() {
+ final Set<Task> result = new HashSet<>(pendingTasksToInit);
+ pendingTasksToInit.clear();
+ return result;
}
- Map<TaskId, Set<TopicPartition>> pendingStandbyTasksForTopologies(final Set<String> currentTopologies) {
- return filterMap(pendingStandbyTasksToCreate, t -> currentTopologies.contains(t.getKey().topologyName()));
+ void addPendingTaskToInit(final Collection<Task> tasks) {
+ pendingTasksToInit.addAll(tasks);
}
void addNewActiveTasks(final Collection<Task> newTasks) {
@@ -136,7 +143,6 @@ class Tasks {
}
activeTasksPerId.put(activeTask.id(), activeTask);
- pendingActiveTasksToCreate.remove(activeTask.id());
for (final TopicPartition topicPartition : activeTask.inputPartitions()) {
activeTasksPerPartition.put(topicPartition, activeTask);
}
@@ -158,7 +164,6 @@ class Tasks {
}
standbyTasksPerId.put(standbyTask.id(), standbyTask);
- pendingStandbyTasksToCreate.remove(standbyTask.id());
}
}
}
@@ -175,12 +180,10 @@ class Tasks {
throw new IllegalArgumentException("Attempted to remove an active task that is not owned: " + taskId);
}
removePartitionsForActiveTask(taskId);
- pendingActiveTasksToCreate.remove(taskId);
} else {
if (standbyTasksPerId.remove(taskId) == null) {
throw new IllegalArgumentException("Attempted to remove a standby task that is not owned: " + taskId);
}
- pendingStandbyTasksToCreate.remove(taskId);
}
}
@@ -269,7 +272,6 @@ class Tasks {
return tasks;
}
- // TODO: change return type to `StreamTask`
Collection<Task> activeTasks() {
return Collections.unmodifiableCollection(activeTasksPerId.values());
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyFactories.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyFactories.java
index 57e4490d73..ccc78a4873 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyFactories.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyFactories.java
@@ -25,7 +25,6 @@ import java.util.Map;
public final class ProcessorTopologyFactories {
private ProcessorTopologyFactories() {}
-
public static ProcessorTopology with(final List<ProcessorNode<?, ?, ?, ?>> processorNodes,
final Map<String, SourceNode<?, ?>> sourcesByTopic,
final List<StateStore> stateStoresByName,
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 02d742d8ab..ba484d210c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -69,6 +69,7 @@ import static org.apache.kafka.streams.processor.internals.Task.State.SUSPENDED;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.isA;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -565,9 +566,10 @@ public class StandbyTaskTest {
}
@Test
- public void shouldRecycleTask() {
+ public void shouldPrepareRecycleSuspendedTask() {
EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
stateManager.recycle();
+ EasyMock.expectLastCall().once();
EasyMock.replay(stateManager);
task = createStandbyTask();
@@ -578,6 +580,7 @@ public class StandbyTaskTest {
task.suspend();
task.prepareRecycle(); // SUSPENDED
+ assertThat(task.state(), is(Task.State.CLOSED));
// Currently, there are no metrics registered for standby tasks.
// This is a regression test so that, if we add some, we will be sure to deregister them.
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 68d2def110..61b8791af7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -2178,8 +2178,10 @@ public class StreamTaskTest {
}
@Test
- public void shouldUnregisterMetricsInCloseCleanAndRecycleState() {
+ public void shouldUnregisterMetricsAndCloseInPrepareRecycle() {
EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
+ stateManager.recycle();
+ EasyMock.expectLastCall().once();
EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes();
EasyMock.replay(stateManager, recordCollector);
@@ -2189,6 +2191,7 @@ public class StreamTaskTest {
assertThat(getTaskMetrics(), not(empty()));
task.prepareRecycle();
assertThat(getTaskMetrics(), empty());
+ assertThat(task.state(), is(Task.State.CLOSED));
}
@Test
@@ -2270,10 +2273,12 @@ public class StreamTaskTest {
}
@Test
- public void shouldOnlyRecycleSuspendedTasks() {
+ public void shouldPrepareRecycleSuspendedTask() {
+ EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.emptyMap()).anyTimes();
stateManager.recycle();
+ EasyMock.expectLastCall().once();
recordCollector.closeClean();
- EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.emptyMap()).anyTimes();
+ EasyMock.expectLastCall().once();
EasyMock.replay(stateManager, recordCollector);
task = createStatefulTask(createConfig("100"), true);
@@ -2287,6 +2292,7 @@ public class StreamTaskTest {
task.suspend();
task.prepareRecycle(); // SUSPENDED
+ assertThat(task.state(), is(Task.State.CLOSED));
EasyMock.verify(stateManager, recordCollector);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index c3233152c0..12ea6477e5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -257,23 +257,171 @@ public class TaskManagerTest {
@Test
public void shouldAddTasksToStateUpdater() {
+ final StreamTask task00 = statefulTask(taskId00, taskId00Partitions)
+ .withInputPartitions(taskId00Partitions)
+ .inState(State.RESTORING)
+ .build();
+ final StandbyTask task01 = standbyTask(taskId01, taskId01Partitions)
+ .withInputPartitions(taskId01Partitions)
+ .inState(State.RUNNING)
+ .build();
+ expect(changeLogReader.completedChangelogs()).andReturn(emptySet()).anyTimes();
+ expect(consumer.assignment()).andReturn(emptySet()).anyTimes();
+ consumer.resume(anyObject());
+ expectLastCall().anyTimes();
+ expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment))).andStubReturn(singletonList(task00));
+ expect(standbyTaskCreator.createTasks(eq(taskId01Assignment))).andStubReturn(singletonList(task01));
+ replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader);
+
+ taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, true);
+ taskManager.handleAssignment(taskId00Assignment, taskId01Assignment);
+ taskManager.tryToCompleteRestoration(time.milliseconds(), noOpResetter -> { });
+
+ Mockito.verify(task00).initializeIfNeeded();
+ Mockito.verify(task01).initializeIfNeeded();
+ Mockito.verify(stateUpdater).add(task00);
+ Mockito.verify(stateUpdater).add(task01);
+ }
+
+ @Test
+ public void shouldHandleRemovedTasksToRecycleFromStateUpdater() {
+ final StreamTask task00 = statefulTask(taskId00, taskId00Partitions)
+ .withInputPartitions(taskId00Partitions)
+ .inState(State.RESTORING)
+ .build();
+ final StandbyTask task01 = standbyTask(taskId01, taskId01Partitions)
+ .withInputPartitions(taskId01Partitions)
+ .inState(State.RUNNING)
+ .build();
+ final StandbyTask task00Converted = standbyTask(taskId00, taskId00Partitions)
+ .withInputPartitions(taskId00Partitions)
+ .build();
+ final StreamTask task01Converted = statefulTask(taskId01, taskId01Partitions)
+ .withInputPartitions(taskId01Partitions)
+ .build();
+ when(stateUpdater.drainRemovedTasks()).thenReturn(mkSet(task00, task01));
+
+ taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, true);
+ expect(activeTaskCreator.createActiveTaskFromStandby(eq(task01), eq(taskId01Partitions), eq(consumer)))
+ .andStubReturn(task01Converted);
+ activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(anyObject());
+ expectLastCall().once();
+ expect(standbyTaskCreator.createStandbyTaskFromActive(eq(task00), eq(taskId00Partitions)))
+ .andStubReturn(task00Converted);
+ expect(consumer.assignment()).andReturn(emptySet()).anyTimes();
+ consumer.resume(anyObject());
+ expectLastCall().anyTimes();
+ replay(activeTaskCreator, standbyTaskCreator, topologyBuilder, consumer);
+
+ taskManager.tasks().addPendingTaskToRecycle(taskId00, taskId00Partitions);
+ taskManager.tasks().addPendingTaskToRecycle(taskId01, taskId01Partitions);
+ taskManager.tryToCompleteRestoration(time.milliseconds(), noOpResetter -> { });
+
+ Mockito.verify(task00Converted).initializeIfNeeded();
+ Mockito.verify(task01Converted).initializeIfNeeded();
+ Mockito.verify(stateUpdater).add(task00Converted);
+ Mockito.verify(stateUpdater).add(task01Converted);
+ }
+
+ @Test
+ public void shouldHandleRemovedTasksToCloseFromStateUpdater() {
+ final StreamTask task00 = statefulTask(taskId00, taskId00Partitions)
+ .withInputPartitions(taskId00Partitions)
+ .inState(State.RESTORING)
+ .build();
+ final StandbyTask task01 = standbyTask(taskId01, taskId01Partitions)
+ .withInputPartitions(taskId01Partitions)
+ .inState(State.RUNNING)
+ .build();
+ when(stateUpdater.drainRemovedTasks()).thenReturn(mkSet(task00, task01));
+
+ taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, true);
+ activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(anyObject());
+ expectLastCall().once();
+ expect(consumer.assignment()).andReturn(emptySet()).anyTimes();
+ consumer.resume(anyObject());
+ expectLastCall().anyTimes();
+ replay(activeTaskCreator, standbyTaskCreator, topologyBuilder, consumer);
+
+ taskManager.tasks().addPendingTaskToClose(taskId00);
+ taskManager.tasks().addPendingTaskToClose(taskId01);
+
+ taskManager.tryToCompleteRestoration(time.milliseconds(), noOpResetter -> { });
+
+ Mockito.verify(task00).suspend();
+ Mockito.verify(task00).closeClean();
+ Mockito.verify(task01).suspend();
+ Mockito.verify(task01).closeClean();
+ }
+
+ @Test
+ public void shouldHandleRemovedTasksToUpdateInputPartitionsFromStateUpdater() {
+ final StreamTask task00 = statefulTask(taskId00, taskId00Partitions)
+ .withInputPartitions(taskId00Partitions)
+ .inState(State.RESTORING)
+ .build();
+ final StandbyTask task01 = standbyTask(taskId01, taskId01Partitions)
+ .withInputPartitions(taskId01Partitions)
+ .inState(State.RUNNING)
+ .build();
+ when(stateUpdater.drainRemovedTasks()).thenReturn(mkSet(task00, task01));
+
+ taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, true);
+ expect(consumer.assignment()).andReturn(emptySet()).anyTimes();
+ consumer.resume(anyObject());
+ expectLastCall().anyTimes();
+ replay(activeTaskCreator, standbyTaskCreator, topologyBuilder, consumer);
+
+ taskManager.tasks().addPendingTaskToUpdateInputPartitions(taskId00, taskId02Partitions);
+ taskManager.tasks().addPendingTaskToUpdateInputPartitions(taskId01, taskId03Partitions);
+
+ taskManager.tryToCompleteRestoration(time.milliseconds(), noOpResetter -> { });
+
+ Mockito.verify(task00).updateInputPartitions(taskId02Partitions, emptyMap());
+ Mockito.verify(stateUpdater).add(task00);
+ Mockito.verify(task01).updateInputPartitions(taskId03Partitions, emptyMap());
+ Mockito.verify(stateUpdater).add(task01);
+ }
+
+ @Test
+ public void shouldHandleRemovedTasksFromStateUpdater() {
+ // tasks to recycle
final StreamTask task00 = mock(StreamTask.class);
final StandbyTask task01 = mock(StandbyTask.class);
+ final StandbyTask task00Converted = mock(StandbyTask.class);
+ final StreamTask task01Converted = mock(StreamTask.class);
+ // task to close
+ final StreamTask task02 = mock(StreamTask.class);
+ // task to update inputs
+ final StreamTask task03 = mock(StreamTask.class);
when(task00.id()).thenReturn(taskId00);
when(task01.id()).thenReturn(taskId01);
+ when(task02.id()).thenReturn(taskId02);
+ when(task03.id()).thenReturn(taskId03);
when(task00.inputPartitions()).thenReturn(taskId00Partitions);
when(task01.inputPartitions()).thenReturn(taskId01Partitions);
+ when(task02.inputPartitions()).thenReturn(taskId02Partitions);
+ when(task03.inputPartitions()).thenReturn(taskId03Partitions);
when(task00.isActive()).thenReturn(true);
when(task01.isActive()).thenReturn(false);
+ when(task02.isActive()).thenReturn(true);
+ when(task03.isActive()).thenReturn(true);
when(task00.state()).thenReturn(State.RESTORING);
when(task01.state()).thenReturn(State.RUNNING);
- expect(changeLogReader.completedChangelogs()).andReturn(emptySet()).anyTimes();
+ when(task02.state()).thenReturn(State.RESTORING);
+ when(task03.state()).thenReturn(State.RESTORING);
+ when(stateUpdater.drainRemovedTasks()).thenReturn(mkSet(task00, task01, task02, task03));
+
+ expect(activeTaskCreator.createActiveTaskFromStandby(eq(task01), eq(taskId01Partitions), eq(consumer)))
+ .andStubReturn(task01Converted);
+ activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(anyObject());
+ expectLastCall().times(2);
+ expect(standbyTaskCreator.createStandbyTaskFromActive(eq(task00), eq(taskId00Partitions)))
+ .andStubReturn(task00Converted);
expect(consumer.assignment()).andReturn(emptySet()).anyTimes();
consumer.resume(anyObject());
expectLastCall().anyTimes();
- expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment))).andStubReturn(singletonList(task00));
- expect(standbyTaskCreator.createTasks(eq(taskId01Assignment))).andStubReturn(singletonList(task01));
- replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader);
+ replay(activeTaskCreator, standbyTaskCreator, topologyBuilder, consumer);
taskManager = new TaskManager(
time,
@@ -288,12 +436,20 @@ public class TaskManagerTest {
stateUpdater
);
taskManager.setMainConsumer(consumer);
- taskManager.handleAssignment(taskId00Assignment, taskId01Assignment);
+ taskManager.tasks().addPendingTaskToClose(taskId02);
+ taskManager.tasks().addPendingTaskToRecycle(taskId00, taskId00Partitions);
+ taskManager.tasks().addPendingTaskToRecycle(taskId01, taskId01Partitions);
+ taskManager.tasks().addPendingTaskToUpdateInputPartitions(taskId03, taskId03Partitions);
taskManager.tryToCompleteRestoration(time.milliseconds(), noOpResetter -> { });
- Mockito.verify(stateUpdater).add(task00);
- Mockito.verify(stateUpdater).add(task01);
+ Mockito.verify(task00Converted).initializeIfNeeded();
+ Mockito.verify(task01Converted).initializeIfNeeded();
+ Mockito.verify(stateUpdater).add(task00Converted);
+ Mockito.verify(stateUpdater).add(task01Converted);
+ Mockito.verify(task02).closeClean();
+ Mockito.verify(task03).updateInputPartitions(taskId03Partitions, emptyMap());
+ Mockito.verify(stateUpdater).add(task03);
}
@Test
@@ -1825,9 +1981,7 @@ public class TaskManagerTest {
RuntimeException.class,
() -> taskManager.shutdown(true)
);
- assertThat(exception.getMessage(), equalTo("Unexpected exception while closing task"));
- assertThat(exception.getCause().getMessage(), is("migrated; it means all tasks belonging to this thread should be migrated."));
- assertThat(exception.getCause().getCause().getMessage(), is("cause"));
+ assertThat(exception.getCause().getMessage(), is("oops"));
assertThat(closedDirtyTask01.get(), is(true));
assertThat(closedDirtyTask02.get(), is(true));
@@ -1887,7 +2041,6 @@ public class TaskManagerTest {
final RuntimeException exception = assertThrows(RuntimeException.class, () -> taskManager.shutdown(true));
assertThat(task00.state(), is(Task.State.CLOSED));
- assertThat(exception.getMessage(), is("Unexpected exception while closing task"));
assertThat(exception.getCause().getMessage(), is("whatever"));
assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
@@ -1938,8 +2091,7 @@ public class TaskManagerTest {
final RuntimeException exception = assertThrows(RuntimeException.class, () -> taskManager.shutdown(true));
assertThat(task00.state(), is(Task.State.CLOSED));
- assertThat(exception.getMessage(), is("Unexpected exception while closing task"));
- assertThat(exception.getCause().getMessage(), is("whatever"));
+ assertThat(exception.getMessage(), is("whatever"));
assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
// the active task creator should also get closed (so that it closes the thread producer if applicable)
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java
index 756aa53f86..6265fd4ca2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java
@@ -22,7 +22,10 @@ import org.apache.kafka.streams.processor.TaskId;
import org.junit.jupiter.api.Test;
import java.util.Collections;
+import java.util.HashSet;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.apache.kafka.test.StreamsTestUtils.TaskBuilder.standbyTask;
import static org.apache.kafka.test.StreamsTestUtils.TaskBuilder.statefulTask;
@@ -34,6 +37,8 @@ public class TasksTest {
private final static TopicPartition TOPIC_PARTITION_A_0 = new TopicPartition("topicA", 0);
private final static TopicPartition TOPIC_PARTITION_A_1 = new TopicPartition("topicA", 1);
+ private final static TopicPartition TOPIC_PARTITION_B_0 = new TopicPartition("topicB", 0);
+ private final static TopicPartition TOPIC_PARTITION_B_1 = new TopicPartition("topicB", 1);
private final static TaskId TASK_0_0 = new TaskId(0, 0);
private final static TaskId TASK_0_1 = new TaskId(0, 1);
private final static TaskId TASK_1_0 = new TaskId(1, 0);
@@ -41,7 +46,7 @@ public class TasksTest {
private final LogContext logContext = new LogContext();
@Test
- public void shouldCreateTasks() {
+ public void shouldKeepAddedTasks() {
final Tasks tasks = new Tasks(logContext);
final StreamTask statefulTask = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).build();
final StandbyTask standbyTask = standbyTask(TASK_0_1, mkSet(TOPIC_PARTITION_A_1)).build();
@@ -51,15 +56,56 @@ public class TasksTest {
tasks.addNewStandbyTasks(Collections.singletonList(standbyTask));
assertEquals(statefulTask, tasks.task(statefulTask.id()));
- assertTrue(tasks.activeTasks().contains(statefulTask));
- assertTrue(tasks.allTasks().contains(statefulTask));
- assertTrue(tasks.tasks(mkSet(statefulTask.id())).contains(statefulTask));
assertEquals(statelessTask, tasks.task(statelessTask.id()));
- assertTrue(tasks.activeTasks().contains(statelessTask));
- assertTrue(tasks.allTasks().contains(statelessTask));
- assertTrue(tasks.tasks(mkSet(statelessTask.id())).contains(statelessTask));
assertEquals(standbyTask, tasks.task(standbyTask.id()));
- assertTrue(tasks.allTasks().contains(standbyTask));
- assertTrue(tasks.tasks(mkSet(standbyTask.id())).contains(standbyTask));
+
+ assertEquals(mkSet(statefulTask, statelessTask), new HashSet<>(tasks.activeTasks()));
+ assertEquals(mkSet(statefulTask, statelessTask, standbyTask), tasks.allTasks());
+ assertEquals(mkSet(statefulTask, standbyTask), tasks.tasks(mkSet(statefulTask.id(), standbyTask.id())));
+ assertEquals(mkSet(statefulTask.id(), statelessTask.id(), standbyTask.id()), tasks.allTaskIds());
+ assertEquals(
+ mkMap(
+ mkEntry(statefulTask.id(), statefulTask),
+ mkEntry(statelessTask.id(), statelessTask),
+ mkEntry(standbyTask.id(), standbyTask)
+ ),
+ tasks.allTasksPerId());
+ assertTrue(tasks.owned(statefulTask.id()));
+ assertTrue(tasks.owned(statelessTask.id()));
+ assertTrue(tasks.owned(statefulTask.id()));
+ }
+
+ @Test
+ public void shouldDrainPendingTasksToCreate() {
+ final Tasks tasks = new Tasks(logContext);
+
+ tasks.addPendingActiveTasksToCreate(mkMap(
+ mkEntry(new TaskId(0, 0, "A"), mkSet(TOPIC_PARTITION_A_0)),
+ mkEntry(new TaskId(0, 1, "A"), mkSet(TOPIC_PARTITION_A_1)),
+ mkEntry(new TaskId(0, 0, "B"), mkSet(TOPIC_PARTITION_B_0)),
+ mkEntry(new TaskId(0, 1, "B"), mkSet(TOPIC_PARTITION_B_1))
+ ));
+
+ tasks.addPendingStandbyTasksToCreate(mkMap(
+ mkEntry(new TaskId(0, 0, "A"), mkSet(TOPIC_PARTITION_A_0)),
+ mkEntry(new TaskId(0, 1, "A"), mkSet(TOPIC_PARTITION_A_1)),
+ mkEntry(new TaskId(0, 0, "B"), mkSet(TOPIC_PARTITION_B_0)),
+ mkEntry(new TaskId(0, 1, "B"), mkSet(TOPIC_PARTITION_B_1))
+ ));
+
+ assertEquals(mkMap(
+ mkEntry(new TaskId(0, 0, "A"), mkSet(TOPIC_PARTITION_A_0)),
+ mkEntry(new TaskId(0, 1, "A"), mkSet(TOPIC_PARTITION_A_1))
+ ), tasks.drainPendingActiveTasksForTopologies(mkSet("A")));
+
+ assertEquals(mkMap(
+ mkEntry(new TaskId(0, 0, "A"), mkSet(TOPIC_PARTITION_A_0)),
+ mkEntry(new TaskId(0, 1, "A"), mkSet(TOPIC_PARTITION_A_1))
+ ), tasks.drainPendingStandbyTasksForTopologies(mkSet("A")));
+
+ tasks.clearPendingTasksToCreate();
+
+ assertEquals(Collections.emptyMap(), tasks.drainPendingActiveTasksForTopologies(mkSet("B")));
+ assertEquals(Collections.emptyMap(), tasks.drainPendingStandbyTasksForTopologies(mkSet("B")));
}
}
\ No newline at end of file