You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/10/07 02:59:08 UTC
kafka git commit: KAFKA-5541: Streams should not re-throw if suspending/closing tasks fails
Repository: kafka
Updated Branches:
refs/heads/trunk e2e8d4a57 -> a1ea53606
KAFKA-5541: Streams should not re-throw if suspending/closing tasks fails
Author: Matthias J. Sax <ma...@confluent.io>
Reviewers: Damian Guy <da...@gmail.com>, Bill Bejeck <bi...@confluent.io>, Guozhang Wang <wa...@gmail.com>
Closes #4037 from mjsax/kafka-5541-dont-rethrow-on-suspend-or-close-2
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a1ea5360
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a1ea5360
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a1ea5360
Branch: refs/heads/trunk
Commit: a1ea5360631f4366ba536fca37d9f906b0e58a5c
Parents: e2e8d4a
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Fri Oct 6 19:59:03 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Fri Oct 6 19:59:03 2017 -0700
----------------------------------------------------------------------
.../processor/internals/AssignedTasks.java | 77 +++++++++++++++-----
.../processor/internals/StreamThread.java | 6 +-
.../processor/internals/TaskManager.java | 13 +++-
3 files changed, 74 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/a1ea5360/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index 6ab807f..680bbd3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -210,7 +210,7 @@ class AssignedTasks implements RestoringTasks {
}
private RuntimeException suspendTasks(final Collection<Task> tasks) {
- RuntimeException exception = null;
+ final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
for (Iterator<Task> it = tasks.iterator(); it.hasNext(); ) {
final Task task = it.next();
try {
@@ -218,30 +218,30 @@ class AssignedTasks implements RestoringTasks {
suspended.put(task.id(), task);
} catch (final TaskMigratedException closeAsZombieAndSwallow) {
// as we suspend a task, we are either shutting down or rebalancing, thus, we swallow and move on
- closeZombieTask(task);
+ firstException.compareAndSet(null, closeZombieTask(task));
it.remove();
} catch (final RuntimeException e) {
log.error("Suspending {} {} failed due to the following error:", taskTypeName, task.id(), e);
+ firstException.compareAndSet(null, e);
try {
task.close(false, false);
- } catch (final Exception f) {
+ } catch (final RuntimeException f) {
log.error("After suspending failed, closing the same {} {} failed again due to the following error:", taskTypeName, task.id(), f);
}
- if (exception == null) {
- exception = e;
- }
}
}
- return exception;
+ return firstException.get();
}
- private void closeZombieTask(final Task task) {
+ private RuntimeException closeZombieTask(final Task task) {
log.warn("{} {} got migrated to another thread already. Closing it as zombie.", taskTypeName, task.id());
try {
task.close(false, true);
- } catch (final Exception e) {
+ } catch (final RuntimeException e) {
log.warn("Failed to close zombie {} {} due to {}; ignore and proceed.", taskTypeName, task.id(), e.getMessage());
+ return e;
}
+ return null;
}
boolean hasRunningTasks() {
@@ -260,7 +260,10 @@ class AssignedTasks implements RestoringTasks {
try {
task.resume();
} catch (final TaskMigratedException e) {
- closeZombieTask(task);
+ final RuntimeException fatalException = closeZombieTask(task);
+ if (fatalException != null) {
+ throw fatalException;
+ }
suspended.remove(taskId);
throw e;
}
@@ -402,7 +405,10 @@ class AssignedTasks implements RestoringTasks {
processed++;
}
} catch (final TaskMigratedException e) {
- closeZombieTask(task);
+ final RuntimeException fatalException = closeZombieTask(task);
+ if (fatalException != null) {
+ throw fatalException;
+ }
it.remove();
throw e;
} catch (final RuntimeException e) {
@@ -429,7 +435,10 @@ class AssignedTasks implements RestoringTasks {
punctuated++;
}
} catch (final TaskMigratedException e) {
- closeZombieTask(task);
+ final RuntimeException fatalException = closeZombieTask(task);
+ if (fatalException != null) {
+ throw fatalException;
+ }
it.remove();
throw e;
} catch (final KafkaException e) {
@@ -448,7 +457,10 @@ class AssignedTasks implements RestoringTasks {
try {
action.apply(task);
} catch (final TaskMigratedException e) {
- closeZombieTask(task);
+ final RuntimeException fatalException = closeZombieTask(task);
+ if (fatalException != null) {
+ throw fatalException;
+ }
it.remove();
if (firstException == null) {
firstException = e;
@@ -488,20 +500,45 @@ class AssignedTasks implements RestoringTasks {
}
void close(final boolean clean) {
- close(allTasks(), clean);
- clear();
- }
-
- private void close(final Collection<Task> tasks, final boolean clean) {
- for (final Task task : tasks) {
+ final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
+ for (final Task task : allTasks()) {
try {
task.close(clean, false);
- } catch (final Throwable t) {
+ } catch (final TaskMigratedException e) {
+ firstException.compareAndSet(null, closeZombieTask(task));
+ } catch (final RuntimeException t) {
+ firstException.compareAndSet(null, t);
log.error("Failed while closing {} {} due to the following error:",
task.getClass().getSimpleName(),
task.id(),
t);
+ firstException.compareAndSet(null, closeUncleanIfRequired(task, clean));
}
}
+
+ clear();
+
+ final RuntimeException fatalException = firstException.get();
+ if (fatalException != null) {
+ throw fatalException;
+ }
+ }
+
+ private RuntimeException closeUncleanIfRequired(final Task task,
+ final boolean triedToCloseCleanlyPreviously) {
+ if (triedToCloseCleanlyPreviously) {
+ log.info("Try to close {} {} unclean.", task.getClass().getSimpleName(), task.id());
+ try {
+ task.close(false, false);
+ } catch (final RuntimeException fatalException) {
+ log.error("Failed while closing {} {} due to the following error:",
+ task.getClass().getSimpleName(),
+ task.id(),
+ fatalException);
+ return fatalException;
+ }
+ }
+
+ return null;
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a1ea5360/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index ea7d362..234d254 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -1193,7 +1193,11 @@ public class StreamThread extends Thread implements ThreadDataProvider {
log.info("Shutting down");
- taskManager.shutdown(cleanRun);
+ try {
+ taskManager.shutdown(cleanRun);
+ } catch (final Throwable e) {
+ log.error("Failed to close task manager due to the following error:", e);
+ }
try {
consumer.close();
} catch (final Throwable e) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/a1ea5360/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 5387425..8dc477f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -206,10 +206,16 @@ class TaskManager {
}
void shutdown(final boolean clean) {
+ final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
+
log.debug("Shutting down all active tasks {}, standby tasks {}, suspended tasks {}, and suspended standby tasks {}", active.runningTaskIds(), standby.runningTaskIds(),
active.previousTaskIds(), standby.previousTaskIds());
- active.close(clean);
+ try {
+ active.close(clean);
+ } catch (final RuntimeException fatalException) {
+ firstException.compareAndSet(null, fatalException);
+ }
standby.close(clean);
try {
threadMetadataProvider.close();
@@ -220,6 +226,11 @@ class TaskManager {
restoreConsumer.assign(Collections.<TopicPartition>emptyList());
taskCreator.close();
standbyTaskCreator.close();
+
+ final RuntimeException fatalException = firstException.get();
+ if (fatalException != null) {
+ throw fatalException;
+ }
}
Set<TaskId> suspendedActiveTaskIds() {