You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2019/11/02 17:23:40 UTC

[kafka] branch 2.4 updated: HOTFIX: Remove from restoringByPartition once restored (#7631)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new dc053e1  HOTFIX: Remove from restoringByPartition once restored (#7631)
dc053e1 is described below

commit dc053e15d7162dd73465c971e9483b0b2ff79d1a
Author: A. Sophie Blee-Goldman <so...@confluent.io>
AuthorDate: Sat Nov 2 10:22:36 2019 -0700

    HOTFIX: Remove from restoringByPartition once restored (#7631)
    
    Minor follow-up to #7608: For some reason the AssignedStreamsTasks#updateRestored method only updates the restoring and restoredPartitions data structures, but there is a third map that also tracks restoring tasks by partition: restoringByPartition. This fix removes a task's partitions from that map as well once the task has been restored.
    
    Also improves the TaskManager#closeLostTasks logging by checking each case separately and logging the specific failure before throwing.
    
    Reviewers: Matthias J. Sax <mj...@apache.org>, Guozhang Wang <wa...@gmail.com>
---
 .../processor/internals/AssignedStreamsTasks.java  | 26 ++++++++++----
 .../streams/processor/internals/AssignedTasks.java | 14 +++++---
 .../processor/internals/StoreChangelogReader.java  | 12 +++++--
 .../streams/processor/internals/TaskManager.java   | 41 +++++++++++++++++++---
 4 files changed, 77 insertions(+), 16 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
index 1400d5a..c9fe9fd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
@@ -346,6 +346,8 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements Restorin
             if (restoredPartitions.containsAll(task.changelogPartitions())) {
                 transitionToRunning(task);
                 it.remove();
+                restoringByPartition.keySet().removeAll(task.partitions());
+                restoringByPartition.keySet().removeAll(task.changelogPartitions());
                 log.debug("Stream task {} completed restoration as all its changelog partitions {} have been applied to restore state",
                     task.id(),
                     task.changelogPartitions());
@@ -361,6 +363,12 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements Restorin
         }
         if (allTasksRunning()) {
             restoredPartitions.clear();
+
+            if (!restoringByPartition.isEmpty()) {
+                log.error("Finished restoring all tasks but found leftover partitions in restoringByPartition: {}",
+                    restoringByPartition);
+                throw new IllegalStateException("Restoration is complete but not all partitions were cleared.");
+            }
         }
     }
 
@@ -504,12 +512,18 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements Restorin
     }
 
     @Override
-    public boolean isEmpty() {
-        return super.isEmpty()
-            && restoring.isEmpty()
-            && restoringByPartition.isEmpty()
-            && restoredPartitions.isEmpty()
-            && suspended.isEmpty();
+    public boolean isEmpty() throws IllegalStateException {
+        if (restoring.isEmpty() && !restoringByPartition.isEmpty()) {
+            log.error("Assigned stream tasks in an inconsistent state: the set of restoring tasks is empty but the " +
+                      "restoring by partitions map contained {}", restoringByPartition);
+            throw new IllegalStateException("Found inconsistent state: no tasks restoring but nonempty restoringByPartition");
+        } else {
+            return super.isEmpty()
+                       && restoring.isEmpty()
+                       && restoringByPartition.isEmpty()
+                       && restoredPartitions.isEmpty()
+                       && suspended.isEmpty();
+        }
     }
 
     public String toString(final String indent) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index 6d64d40..cc4c0f9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -182,10 +182,16 @@ abstract class AssignedTasks<T extends Task> {
         created.clear();
     }
 
-    boolean isEmpty() {
-        return runningByPartition.isEmpty()
-                   && running.isEmpty()
-                   && created.isEmpty();
+    boolean isEmpty() throws IllegalStateException {
+        if (running.isEmpty() && !runningByPartition.isEmpty()) {
+            log.error("Assigned stream tasks in an inconsistent state: the set of running tasks is empty but the " +
+                          "running by partitions map contained {}", runningByPartition);
+            throw new IllegalStateException("Found inconsistent state: no tasks running but nonempty runningByPartition");
+        } else {
+            return runningByPartition.isEmpty()
+                       && running.isEmpty()
+                       && created.isEmpty();
+        }
     }
 
     /**
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 4e5ae57..c14f1cc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -300,14 +300,22 @@ public class StoreChangelogReader implements ChangelogReader {
 
     @Override
     public boolean isEmpty() {
-        return partitionInfo.isEmpty()
-            && stateRestorers.isEmpty()
+        return stateRestorers.isEmpty()
             && needsRestoring.isEmpty()
             && restoreToOffsets.isEmpty()
             && needsInitializing.isEmpty()
             && completedRestorers.isEmpty();
     }
 
+    @Override
+    public String toString() {
+        return "RestoreToOffset: " + restoreToOffsets + "\n" +
+               "StateRestorers: " + stateRestorers + "\n" +
+               "NeedsRestoring: " + needsRestoring + "\n" +
+               "NeedsInitializing: " + needsInitializing + "\n" +
+               "CompletedRestorers: " + completedRestorers + "\n";
+    }
+
     private long processNext(final List<ConsumerRecord<byte[], byte[]>> records,
                              final StateRestorer restorer,
                              final Long endOffset) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 4d6dd4d..9c3539e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -130,6 +130,7 @@ public class TaskManager {
         }
 
         // Pause all the new partitions until the underlying state store is ready for all the active tasks.
+        log.debug("Pausing all active task partitions until the underlying state stores are ready");
         pausePartitions();
     }
 
@@ -137,6 +138,8 @@ public class TaskManager {
         final Set<TaskId> suspendedTasks = partitionsToTaskSet(assignment);
         suspendedTasks.removeAll(addedActiveTasks.keySet());
 
+        log.debug("Suspended tasks to be resumed: {}", suspendedTasks);
+
         for (final TaskId taskId : suspendedTasks) {
             final Set<TopicPartition> partitions = assignedActiveTasks.get(taskId);
             try {
@@ -160,7 +163,7 @@ public class TaskManager {
     }
 
     private void addNewStandbyTasks(final Map<TaskId, Set<TopicPartition>> newStandbyTasks) {
-        log.trace("New standby tasks to be created: {}", newStandbyTasks);
+        log.debug("New standby tasks to be created: {}", newStandbyTasks);
 
         for (final StandbyTask task : standbyTaskCreator.createTasks(consumer, newStandbyTasks)) {
             standby.addNewTask(task);
@@ -168,7 +171,8 @@ public class TaskManager {
     }
 
     /**
-     * Returns ids of tasks whose states are kept on the local storage.
+     * Returns ids of tasks whose states are kept on the local storage. This includes active, standby, and previously
+     * assigned but not yet cleaned up tasks
      */
     public Set<TaskId> cachedTasksIds() {
         // A client could contain some inactive tasks whose states are still kept on the local storage in the following scenarios:
@@ -269,13 +273,42 @@ public class TaskManager {
 
         if (exception != null) {
             throw exception;
-        } else if (!(active.isEmpty() && assignedActiveTasks.isEmpty() && changelogReader.isEmpty())) {
-            throw new IllegalStateException("TaskManager found leftover active task state after closing all zombies");
         }
 
+        verifyActiveTaskStateIsEmpty();
+
         return zombieTasks;
     }
 
+    private void verifyActiveTaskStateIsEmpty() throws RuntimeException {
+        final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
+
+        // Verify active has no remaining state, and catch if active.isEmpty throws so we can log any non-empty state
+        try {
+            if (!(active.isEmpty())) {
+                log.error("The set of active tasks was non-empty: {}", active);
+                firstException.compareAndSet(null, new IllegalStateException("TaskManager found leftover active task state after closing all zombies"));
+            }
+        } catch (final IllegalStateException e) {
+            firstException.compareAndSet(null, e);
+        }
+
+        if (!(assignedActiveTasks.isEmpty())) {
+            log.error("The set assignedActiveTasks was non-empty: {}", assignedActiveTasks);
+            firstException.compareAndSet(null, new IllegalStateException("TaskManager found leftover assignedActiveTasks after closing all zombies"));
+        }
+
+        if (!(changelogReader.isEmpty())) {
+            log.error("The changelog-reader's internal state was non-empty: {}", changelogReader);
+            firstException.compareAndSet(null, new IllegalStateException("TaskManager found leftover changelog reader state after closing all zombies"));
+        }
+
+        final RuntimeException fatalException = firstException.get();
+        if (fatalException != null) {
+            throw fatalException;
+        }
+    }
+
     void shutdown(final boolean clean) {
         final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);