You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/02/02 02:32:52 UTC

kafka git commit: MINOR: Reorder StreamThread shutdown sequence

Repository: kafka
Updated Branches:
  refs/heads/trunk 57da044a9 -> 4adfd7960


MINOR: Reorder StreamThread shutdown sequence

We need to close producer first before closing tasks to make sure all messages are acked and hence checkpoint offsets are updated before closing tasks and their state. It was re-ordered mistakenly before.

Author: Guozhang Wang <wa...@gmail.com>

Reviewers: Yasuhiro Matsuda <ya...@confluent.io>, Ewen Cheslack-Postava <ew...@confluent.io>

Closes #845 from guozhangwang/KStreamState


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4adfd796
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4adfd796
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4adfd796

Branch: refs/heads/trunk
Commit: 4adfd7960c4bef187454d1ff5186f3be690abbf5
Parents: 57da044
Author: Guozhang Wang <wa...@gmail.com>
Authored: Mon Feb 1 17:32:36 2016 -0800
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Mon Feb 1 17:32:36 2016 -0800

----------------------------------------------------------------------
 .../internals/ProcessorStateManager.java        |  3 ---
 .../processor/internals/StreamThread.java       | 22 ++++++++++++--------
 2 files changed, 13 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/4adfd796/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index bc7f4b5..b90af48 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -335,9 +335,6 @@ public class ProcessorStateManager {
             checkpoint.write(checkpointOffsets);
         }
 
-        // un-assign the change log partition
-        restoreConsumer.assign(Collections.<TopicPartition>emptyList());
-
         // release the state directory directoryLock
         directoryLock.release();
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/4adfd796/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index eccd02c..8948fc8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -261,20 +261,16 @@ public class StreamThread extends Thread {
     private void shutdown() {
         log.info("Shutting down stream thread [" + this.getName() + "]");
 
-        // We need to first remove the tasks before shutting down the underlying clients
-        // as they may be required in the previous steps; and exceptions should not
-        // prevent this call from going through all shutdown steps.
         try {
             commitAll();
         } catch (Throwable e) {
             // already logged in commitAll()
         }
-        try {
-            removeStreamTasks();
-            removeStandbyTasks();
-        } catch (Throwable e) {
-            // already logged in removeStreamTasks() and removeStandbyTasks()
-        }
+
+        // We need to first close the underlying clients before closing the state
+        // manager, for example we need to make sure producer's message sends
+        // have all been acked before the state manager records
+        // changelog sent offsets
         try {
             producer.close();
         } catch (Throwable e) {
@@ -291,6 +287,14 @@ public class StreamThread extends Thread {
             log.error("Failed to close restore consumer in thread [" + this.getName() + "]: ", e);
         }
 
+        // Exceptions should not prevent this call from going through all shutdown steps
+        try {
+            removeStreamTasks();
+            removeStandbyTasks();
+        } catch (Throwable e) {
+            // already logged in removeStreamTasks() and removeStandbyTasks()
+        }
+
         log.info("Stream thread shutdown complete [" + this.getName() + "]");
     }