You are viewing a plain-text version of this content. The canonical link for it was a hyperlink on the word "here", which is not preserved in this plain-text copy.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/10/07 02:59:08 UTC

kafka git commit: KAFKA-5541: Streams should not re-throw if suspending/closing tasks fails

Repository: kafka
Updated Branches:
  refs/heads/trunk e2e8d4a57 -> a1ea53606


KAFKA-5541: Streams should not re-throw if suspending/closing tasks fails

Author: Matthias J. Sax <ma...@confluent.io>

Reviewers: Damian Guy <da...@gmail.com>, Bill Bejeck <bi...@confluent.io>, Guozhang Wang <wa...@gmail.com>

Closes #4037 from mjsax/kafka-5541-dont-rethrow-on-suspend-or-close-2


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a1ea5360
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a1ea5360
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a1ea5360

Branch: refs/heads/trunk
Commit: a1ea5360631f4366ba536fca37d9f906b0e58a5c
Parents: e2e8d4a
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Fri Oct 6 19:59:03 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Fri Oct 6 19:59:03 2017 -0700

----------------------------------------------------------------------
 .../processor/internals/AssignedTasks.java      | 77 +++++++++++++++-----
 .../processor/internals/StreamThread.java       |  6 +-
 .../processor/internals/TaskManager.java        | 13 +++-
 3 files changed, 74 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a1ea5360/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index 6ab807f..680bbd3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -210,7 +210,7 @@ class AssignedTasks implements RestoringTasks {
     }
 
     private RuntimeException suspendTasks(final Collection<Task> tasks) {
-        RuntimeException exception = null;
+        final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
         for (Iterator<Task> it = tasks.iterator(); it.hasNext(); ) {
             final Task task = it.next();
             try {
@@ -218,30 +218,30 @@ class AssignedTasks implements RestoringTasks {
                 suspended.put(task.id(), task);
             } catch (final TaskMigratedException closeAsZombieAndSwallow) {
                 // as we suspend a task, we are either shutting down or rebalancing, thus, we swallow and move on
-                closeZombieTask(task);
+                firstException.compareAndSet(null, closeZombieTask(task));
                 it.remove();
             } catch (final RuntimeException e) {
                 log.error("Suspending {} {} failed due to the following error:", taskTypeName, task.id(), e);
+                firstException.compareAndSet(null, e);
                 try {
                     task.close(false, false);
-                } catch (final Exception f) {
+                } catch (final RuntimeException f) {
                     log.error("After suspending failed, closing the same {} {} failed again due to the following error:", taskTypeName, task.id(), f);
                 }
-                if (exception == null) {
-                    exception = e;
-                }
             }
         }
-        return exception;
+        return firstException.get();
     }
 
-    private void closeZombieTask(final Task task) {
+    private RuntimeException closeZombieTask(final Task task) {
         log.warn("{} {} got migrated to another thread already. Closing it as zombie.", taskTypeName, task.id());
         try {
             task.close(false, true);
-        } catch (final Exception e) {
+        } catch (final RuntimeException e) {
             log.warn("Failed to close zombie {} {} due to {}; ignore and proceed.", taskTypeName, task.id(), e.getMessage());
+            return e;
         }
+        return null;
     }
 
     boolean hasRunningTasks() {
@@ -260,7 +260,10 @@ class AssignedTasks implements RestoringTasks {
                 try {
                     task.resume();
                 } catch (final TaskMigratedException e) {
-                    closeZombieTask(task);
+                    final RuntimeException fatalException = closeZombieTask(task);
+                    if (fatalException != null) {
+                        throw fatalException;
+                    }
                     suspended.remove(taskId);
                     throw e;
                 }
@@ -402,7 +405,10 @@ class AssignedTasks implements RestoringTasks {
                     processed++;
                 }
             } catch (final TaskMigratedException e) {
-                closeZombieTask(task);
+                final RuntimeException fatalException = closeZombieTask(task);
+                if (fatalException != null) {
+                    throw fatalException;
+                }
                 it.remove();
                 throw e;
             } catch (final RuntimeException e) {
@@ -429,7 +435,10 @@ class AssignedTasks implements RestoringTasks {
                     punctuated++;
                 }
             } catch (final TaskMigratedException e) {
-                closeZombieTask(task);
+                final RuntimeException fatalException = closeZombieTask(task);
+                if (fatalException != null) {
+                    throw fatalException;
+                }
                 it.remove();
                 throw e;
             } catch (final KafkaException e) {
@@ -448,7 +457,10 @@ class AssignedTasks implements RestoringTasks {
             try {
                 action.apply(task);
             } catch (final TaskMigratedException e) {
-                closeZombieTask(task);
+                final RuntimeException fatalException = closeZombieTask(task);
+                if (fatalException != null) {
+                    throw fatalException;
+                }
                 it.remove();
                 if (firstException == null) {
                     firstException = e;
@@ -488,20 +500,45 @@ class AssignedTasks implements RestoringTasks {
     }
 
     void close(final boolean clean) {
-        close(allTasks(), clean);
-        clear();
-    }
-
-    private void close(final Collection<Task> tasks, final boolean clean) {
-        for (final Task task : tasks) {
+        final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
+        for (final Task task : allTasks()) {
             try {
                 task.close(clean, false);
-            } catch (final Throwable t) {
+            } catch (final TaskMigratedException e) {
+                firstException.compareAndSet(null, closeZombieTask(task));
+            } catch (final RuntimeException t) {
+                firstException.compareAndSet(null, t);
                 log.error("Failed while closing {} {} due to the following error:",
                           task.getClass().getSimpleName(),
                           task.id(),
                           t);
+                firstException.compareAndSet(null, closeUncleanIfRequired(task, clean));
             }
         }
+
+        clear();
+
+        final RuntimeException fatalException = firstException.get();
+        if (fatalException != null) {
+            throw fatalException;
+        }
+    }
+
+    private RuntimeException closeUncleanIfRequired(final Task task,
+                                                    final boolean triedToCloseCleanlyPreviously) {
+        if (triedToCloseCleanlyPreviously) {
+            log.info("Try to close {} {} unclean.", task.getClass().getSimpleName(), task.id());
+            try {
+                task.close(false, false);
+            } catch (final RuntimeException fatalException) {
+                log.error("Failed while closing {} {} due to the following error:",
+                    task.getClass().getSimpleName(),
+                    task.id(),
+                    fatalException);
+                return fatalException;
+            }
+        }
+
+        return null;
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1ea5360/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index ea7d362..234d254 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -1193,7 +1193,11 @@ public class StreamThread extends Thread implements ThreadDataProvider {
 
         log.info("Shutting down");
 
-        taskManager.shutdown(cleanRun);
+        try {
+            taskManager.shutdown(cleanRun);
+        } catch (final Throwable e) {
+            log.error("Failed to close task manager due to the following error:", e);
+        }
         try {
             consumer.close();
         } catch (final Throwable e) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/a1ea5360/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 5387425..8dc477f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -206,10 +206,16 @@ class TaskManager {
     }
 
     void shutdown(final boolean clean) {
+        final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
+
         log.debug("Shutting down all active tasks {}, standby tasks {}, suspended tasks {}, and suspended standby tasks {}", active.runningTaskIds(), standby.runningTaskIds(),
                   active.previousTaskIds(), standby.previousTaskIds());
 
-        active.close(clean);
+        try {
+            active.close(clean);
+        } catch (final RuntimeException fatalException) {
+            firstException.compareAndSet(null, fatalException);
+        }
         standby.close(clean);
         try {
             threadMetadataProvider.close();
@@ -220,6 +226,11 @@ class TaskManager {
         restoreConsumer.assign(Collections.<TopicPartition>emptyList());
         taskCreator.close();
         standbyTaskCreator.close();
+
+        final RuntimeException fatalException = firstException.get();
+        if (fatalException != null) {
+            throw fatalException;
+        }
     }
 
     Set<TaskId> suspendedActiveTaskIds() {