You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/10/10 22:16:55 UTC
kafka git commit: KAFKA-5541: Follow-up; Try to clean uncleanly upon clean close failure before throwing the exception
Repository: kafka
Updated Branches:
refs/heads/trunk a5a9a901e -> c22c1775a
KAFKA-5541: Follow-up; Try to clean uncleanly upon clean close failure before throwing the exception
Author: Matthias J. Sax <ma...@confluent.io>
Reviewers: Bill Bejeck <bi...@confluent.io>, Guozhang Wang <wa...@gmail.com>
Closes #4046 from mjsax/kafka-5541-minor-follow-up
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c22c1775
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c22c1775
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c22c1775
Branch: refs/heads/trunk
Commit: c22c1775a550dbefe6bd4cdcf8820404351257a8
Parents: a5a9a90
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Tue Oct 10 15:16:53 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Oct 10 15:16:53 2017 -0700
----------------------------------------------------------------------
.../processor/internals/AssignedTasks.java | 34 +++++++++++---------
1 file changed, 18 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/c22c1775/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index 680bbd3..7426d6a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -507,12 +507,17 @@ class AssignedTasks implements RestoringTasks {
} catch (final TaskMigratedException e) {
firstException.compareAndSet(null, closeZombieTask(task));
} catch (final RuntimeException t) {
- firstException.compareAndSet(null, t);
log.error("Failed while closing {} {} due to the following error:",
task.getClass().getSimpleName(),
task.id(),
t);
- firstException.compareAndSet(null, closeUncleanIfRequired(task, clean));
+ if (clean) {
+ if (!closeUnclean(task)) {
+ firstException.compareAndSet(null, t);
+ }
+ } else {
+ firstException.compareAndSet(null, t);
+ }
}
}
@@ -524,21 +529,18 @@ class AssignedTasks implements RestoringTasks {
}
}
- private RuntimeException closeUncleanIfRequired(final Task task,
- final boolean triedToCloseCleanlyPreviously) {
- if (triedToCloseCleanlyPreviously) {
- log.info("Try to close {} {} unclean.", task.getClass().getSimpleName(), task.id());
- try {
- task.close(false, false);
- } catch (final RuntimeException fatalException) {
- log.error("Failed while closing {} {} due to the following error:",
- task.getClass().getSimpleName(),
- task.id(),
- fatalException);
- return fatalException;
- }
+ private boolean closeUnclean(final Task task) {
+ log.info("Try to close {} {} unclean.", task.getClass().getSimpleName(), task.id());
+ try {
+ task.close(false, false);
+ } catch (final RuntimeException fatalException) {
+ log.error("Failed while closing {} {} due to the following error:",
+ task.getClass().getSimpleName(),
+ task.id(),
+ fatalException);
+ return false;
}
- return null;
+ return true;
}
}