You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2020/02/20 21:28:33 UTC

[kafka] branch 2.5 updated: KAFKA-9562: part 1: ignore exceptions while flushing stores in close(dirty) (#8116)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.5 by this push:
     new c3da9ac  KAFKA-9562: part 1: ignore exceptions while flushing stores in close(dirty) (#8116)
c3da9ac is described below

commit c3da9ac86a21ff9bae459f8bc5d3d491360bc758
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Thu Feb 20 15:28:11 2020 -0600

    KAFKA-9562: part 1: ignore exceptions while flushing stores in close(dirty) (#8116)
    
    Reviewers: Guozhang Wang <wa...@gmail.com>
---
 .../apache/kafka/streams/processor/internals/StreamTask.java  |  6 +++++-
 .../kafka/streams/processor/internals/StreamThread.java       | 11 +++++++----
 2 files changed, 12 insertions(+), 5 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index ae06bc6..54da00d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -655,7 +655,11 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
             }
         } else {
             // In the case of unclean close we still need to make sure all the stores are flushed before closing any
-            super.flushState();
+            try {
+                stateMgr.flush();
+            } catch (final ProcessorStateException e) {
+                // ignore any exceptions while flushing (all stores would have had a chance to flush anyway)
+            }
 
             if (eosEnabled) {
                 maybeAbortTransactionAndCloseRecordCollector(isZombie);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index b6b3d83..0268c7a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -701,10 +701,13 @@ public class StreamThread extends Thread {
                     enforceRebalance();
                 }
             } catch (final TaskMigratedException ignoreAndRejoinGroup) {
-                log.warn("Detected task {} that got migrated to another thread. " +
-                        "This implies that this thread missed a rebalance and dropped out of the consumer group. " +
-                        "Will try to rejoin the consumer group. Below is the detailed description of the task:\n{}",
-                    ignoreAndRejoinGroup.migratedTask().id(), ignoreAndRejoinGroup.migratedTask().toString(">"));
+                log.warn("Detected task " + ignoreAndRejoinGroup.migratedTask().id() +
+                             " that got migrated to another thread. This implies that this thread missed" +
+                             " a rebalance and dropped out of the consumer group. Will try to rejoin the" +
+                             " consumer group. Below is the detailed description of the task:\n" +
+                             ignoreAndRejoinGroup.migratedTask().toString(">"),
+                         ignoreAndRejoinGroup
+                         );
 
                 enforceRebalance();
             }