You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2020/02/20 21:28:33 UTC
[kafka] branch 2.5 updated: KAFKA-9562: part 1: ignore exceptions
while flushing stores in close(dirty) (#8116)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.5 by this push:
new c3da9ac KAFKA-9562: part 1: ignore exceptions while flushing stores in close(dirty) (#8116)
c3da9ac is described below
commit c3da9ac86a21ff9bae459f8bc5d3d491360bc758
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Thu Feb 20 15:28:11 2020 -0600
KAFKA-9562: part 1: ignore exceptions while flushing stores in close(dirty) (#8116)
Reviewers: Guozhang Wang <wa...@gmail.com>
---
.../apache/kafka/streams/processor/internals/StreamTask.java | 6 +++++-
.../kafka/streams/processor/internals/StreamThread.java | 11 +++++++----
2 files changed, 12 insertions(+), 5 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index ae06bc6..54da00d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -655,7 +655,11 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
}
} else {
// In the case of unclean close we still need to make sure all the stores are flushed before closing any
- super.flushState();
+ try {
+ stateMgr.flush();
+ } catch (final ProcessorStateException e) {
+ // ignore any exceptions while flushing (all stores would have had a chance to flush anyway)
+ }
if (eosEnabled) {
maybeAbortTransactionAndCloseRecordCollector(isZombie);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index b6b3d83..0268c7a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -701,10 +701,13 @@ public class StreamThread extends Thread {
enforceRebalance();
}
} catch (final TaskMigratedException ignoreAndRejoinGroup) {
- log.warn("Detected task {} that got migrated to another thread. " +
- "This implies that this thread missed a rebalance and dropped out of the consumer group. " +
- "Will try to rejoin the consumer group. Below is the detailed description of the task:\n{}",
- ignoreAndRejoinGroup.migratedTask().id(), ignoreAndRejoinGroup.migratedTask().toString(">"));
+ log.warn("Detected task " + ignoreAndRejoinGroup.migratedTask().id() +
+ " that got migrated to another thread. This implies that this thread missed" +
+ " a rebalance and dropped out of the consumer group. Will try to rejoin the" +
+ " consumer group. Below is the detailed description of the task:\n" +
+ ignoreAndRejoinGroup.migratedTask().toString(">"),
+ ignoreAndRejoinGroup
+ );
enforceRebalance();
}