You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2019/11/02 17:23:40 UTC
[kafka] branch 2.4 updated: HOTFIX: Remove from restoringByPartition once restored (#7631)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.4 by this push:
new dc053e1 HOTFIX: Remove from restoringByPartition once restored (#7631)
dc053e1 is described below
commit dc053e15d7162dd73465c971e9483b0b2ff79d1a
Author: A. Sophie Blee-Goldman <so...@confluent.io>
AuthorDate: Sat Nov 2 10:22:36 2019 -0700
HOTFIX: Remove from restoringByPartition once restored (#7631)
Minor follow-up to #7608: the AssignedStreamsTasks#updateRestored method only updated the restoring and restoredPartitions data structures, but a third map also tracks restoring tasks by partition: restoringByPartition. Restored tasks were not being removed from that map.
Also improves the TaskManager#closeLostTasks logging by checking each case separately and logging the specific failure before throwing.
Reviewers: Matthias J. Sax <mj...@apache.org>, Guozhang Wang <wa...@gmail.com>
---
.../processor/internals/AssignedStreamsTasks.java | 26 ++++++++++----
.../streams/processor/internals/AssignedTasks.java | 14 +++++---
.../processor/internals/StoreChangelogReader.java | 12 +++++--
.../streams/processor/internals/TaskManager.java | 41 +++++++++++++++++++---
4 files changed, 77 insertions(+), 16 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
index 1400d5a..c9fe9fd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
@@ -346,6 +346,8 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements Restorin
if (restoredPartitions.containsAll(task.changelogPartitions())) {
transitionToRunning(task);
it.remove();
+ restoringByPartition.keySet().removeAll(task.partitions());
+ restoringByPartition.keySet().removeAll(task.changelogPartitions());
log.debug("Stream task {} completed restoration as all its changelog partitions {} have been applied to restore state",
task.id(),
task.changelogPartitions());
@@ -361,6 +363,12 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements Restorin
}
if (allTasksRunning()) {
restoredPartitions.clear();
+
+ if (!restoringByPartition.isEmpty()) {
+ log.error("Finished restoring all tasks but found leftover partitions in restoringByPartition: {}",
+ restoringByPartition);
+ throw new IllegalStateException("Restoration is complete but not all partitions were cleared.");
+ }
}
}
@@ -504,12 +512,18 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements Restorin
}
@Override
- public boolean isEmpty() {
- return super.isEmpty()
- && restoring.isEmpty()
- && restoringByPartition.isEmpty()
- && restoredPartitions.isEmpty()
- && suspended.isEmpty();
+ public boolean isEmpty() throws IllegalStateException {
+ if (restoring.isEmpty() && !restoringByPartition.isEmpty()) {
+ log.error("Assigned stream tasks in an inconsistent state: the set of restoring tasks is empty but the " +
+ "restoring by partitions map contained {}", restoringByPartition);
+ throw new IllegalStateException("Found inconsistent state: no tasks restoring but nonempty restoringByPartition");
+ } else {
+ return super.isEmpty()
+ && restoring.isEmpty()
+ && restoringByPartition.isEmpty()
+ && restoredPartitions.isEmpty()
+ && suspended.isEmpty();
+ }
}
public String toString(final String indent) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index 6d64d40..cc4c0f9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -182,10 +182,16 @@ abstract class AssignedTasks<T extends Task> {
created.clear();
}
- boolean isEmpty() {
- return runningByPartition.isEmpty()
- && running.isEmpty()
- && created.isEmpty();
+ boolean isEmpty() throws IllegalStateException {
+ if (running.isEmpty() && !runningByPartition.isEmpty()) {
+ log.error("Assigned stream tasks in an inconsistent state: the set of running tasks is empty but the " +
+ "running by partitions map contained {}", runningByPartition);
+ throw new IllegalStateException("Found inconsistent state: no tasks running but nonempty runningByPartition");
+ } else {
+ return runningByPartition.isEmpty()
+ && running.isEmpty()
+ && created.isEmpty();
+ }
}
/**
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 4e5ae57..c14f1cc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -300,14 +300,22 @@ public class StoreChangelogReader implements ChangelogReader {
@Override
public boolean isEmpty() {
- return partitionInfo.isEmpty()
- && stateRestorers.isEmpty()
+ return stateRestorers.isEmpty()
&& needsRestoring.isEmpty()
&& restoreToOffsets.isEmpty()
&& needsInitializing.isEmpty()
&& completedRestorers.isEmpty();
}
+ @Override
+ public String toString() {
+ return "RestoreToOffset: " + restoreToOffsets + "\n" +
+ "StateRestorers: " + stateRestorers + "\n" +
+ "NeedsRestoring: " + needsRestoring + "\n" +
+ "NeedsInitializing: " + needsInitializing + "\n" +
+ "CompletedRestorers: " + completedRestorers + "\n";
+ }
+
private long processNext(final List<ConsumerRecord<byte[], byte[]>> records,
final StateRestorer restorer,
final Long endOffset) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 4d6dd4d..9c3539e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -130,6 +130,7 @@ public class TaskManager {
}
// Pause all the new partitions until the underlying state store is ready for all the active tasks.
+ log.debug("Pausing all active task partitions until the underlying state stores are ready");
pausePartitions();
}
@@ -137,6 +138,8 @@ public class TaskManager {
final Set<TaskId> suspendedTasks = partitionsToTaskSet(assignment);
suspendedTasks.removeAll(addedActiveTasks.keySet());
+ log.debug("Suspended tasks to be resumed: {}", suspendedTasks);
+
for (final TaskId taskId : suspendedTasks) {
final Set<TopicPartition> partitions = assignedActiveTasks.get(taskId);
try {
@@ -160,7 +163,7 @@ public class TaskManager {
}
private void addNewStandbyTasks(final Map<TaskId, Set<TopicPartition>> newStandbyTasks) {
- log.trace("New standby tasks to be created: {}", newStandbyTasks);
+ log.debug("New standby tasks to be created: {}", newStandbyTasks);
for (final StandbyTask task : standbyTaskCreator.createTasks(consumer, newStandbyTasks)) {
standby.addNewTask(task);
@@ -168,7 +171,8 @@ public class TaskManager {
}
/**
- * Returns ids of tasks whose states are kept on the local storage.
+ * Returns ids of tasks whose states are kept on the local storage. This includes active, standby, and previously
+ * assigned but not yet cleaned up tasks
*/
public Set<TaskId> cachedTasksIds() {
// A client could contain some inactive tasks whose states are still kept on the local storage in the following scenarios:
@@ -269,13 +273,42 @@ public class TaskManager {
if (exception != null) {
throw exception;
- } else if (!(active.isEmpty() && assignedActiveTasks.isEmpty() && changelogReader.isEmpty())) {
- throw new IllegalStateException("TaskManager found leftover active task state after closing all zombies");
}
+ verifyActiveTaskStateIsEmpty();
+
return zombieTasks;
}
+ private void verifyActiveTaskStateIsEmpty() throws RuntimeException {
+ final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
+
+ // Verify active has no remaining state, and catch if active.isEmpty throws so we can log any non-empty state
+ try {
+ if (!(active.isEmpty())) {
+ log.error("The set of active tasks was non-empty: {}", active);
+ firstException.compareAndSet(null, new IllegalStateException("TaskManager found leftover active task state after closing all zombies"));
+ }
+ } catch (final IllegalStateException e) {
+ firstException.compareAndSet(null, e);
+ }
+
+ if (!(assignedActiveTasks.isEmpty())) {
+ log.error("The set assignedActiveTasks was non-empty: {}", assignedActiveTasks);
+ firstException.compareAndSet(null, new IllegalStateException("TaskManager found leftover assignedActiveTasks after closing all zombies"));
+ }
+
+ if (!(changelogReader.isEmpty())) {
+ log.error("The changelog-reader's internal state was non-empty: {}", changelogReader);
+ firstException.compareAndSet(null, new IllegalStateException("TaskManager found leftover changelog reader state after closing all zombies"));
+ }
+
+ final RuntimeException fatalException = firstException.get();
+ if (fatalException != null) {
+ throw fatalException;
+ }
+ }
+
void shutdown(final boolean clean) {
final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);