You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/10/10 22:16:55 UTC

kafka git commit: KAFKA-5541: Follow-up; Try to close uncleanly upon clean close failure before throwing the exception

Repository: kafka
Updated Branches:
  refs/heads/trunk a5a9a901e -> c22c1775a


KAFKA-5541: Follow-up; Try to close uncleanly upon clean close failure before throwing the exception

Author: Matthias J. Sax <ma...@confluent.io>

Reviewers: Bill Bejeck <bi...@confluent.io>, Guozhang Wang <wa...@gmail.com>

Closes #4046 from mjsax/kafka-5541-minor-follow-up


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c22c1775
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c22c1775
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c22c1775

Branch: refs/heads/trunk
Commit: c22c1775a550dbefe6bd4cdcf8820404351257a8
Parents: a5a9a90
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Tue Oct 10 15:16:53 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Oct 10 15:16:53 2017 -0700

----------------------------------------------------------------------
 .../processor/internals/AssignedTasks.java      | 34 +++++++++++---------
 1 file changed, 18 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c22c1775/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index 680bbd3..7426d6a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -507,12 +507,17 @@ class AssignedTasks implements RestoringTasks {
             } catch (final TaskMigratedException e) {
                 firstException.compareAndSet(null, closeZombieTask(task));
             } catch (final RuntimeException t) {
-                firstException.compareAndSet(null, t);
                 log.error("Failed while closing {} {} due to the following error:",
                           task.getClass().getSimpleName(),
                           task.id(),
                           t);
-                firstException.compareAndSet(null, closeUncleanIfRequired(task, clean));
+                if (clean) {
+                    if (!closeUnclean(task)) {
+                        firstException.compareAndSet(null, t);
+                    }
+                } else {
+                    firstException.compareAndSet(null, t);
+                }
             }
         }
 
@@ -524,21 +529,18 @@ class AssignedTasks implements RestoringTasks {
         }
     }
 
-    private RuntimeException closeUncleanIfRequired(final Task task,
-                                                    final boolean triedToCloseCleanlyPreviously) {
-        if (triedToCloseCleanlyPreviously) {
-            log.info("Try to close {} {} unclean.", task.getClass().getSimpleName(), task.id());
-            try {
-                task.close(false, false);
-            } catch (final RuntimeException fatalException) {
-                log.error("Failed while closing {} {} due to the following error:",
-                    task.getClass().getSimpleName(),
-                    task.id(),
-                    fatalException);
-                return fatalException;
-            }
+    private boolean closeUnclean(final Task task) {
+        log.info("Try to close {} {} unclean.", task.getClass().getSimpleName(), task.id());
+        try {
+            task.close(false, false);
+        } catch (final RuntimeException fatalException) {
+            log.error("Failed while closing {} {} due to the following error:",
+                task.getClass().getSimpleName(),
+                task.id(),
+                fatalException);
+            return false;
         }
 
-        return null;
+        return true;
     }
 }