You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/12/11 05:53:49 UTC

kafka git commit: KAFKA-4510: StreamThread must finish rebalance in state PENDING_SHUTDOWN

Repository: kafka
Updated Branches:
  refs/heads/trunk 1d586cb50 -> 6f7ed15da


KAFKA-4510: StreamThread must finish rebalance in state PENDING_SHUTDOWN

Author: Matthias J. Sax <ma...@confluent.io>

Reviewers: Eno Thereska, Guozhang Wang

Closes #2227 from mjsax/kafka-4510-finish-rebalance-on-shutdown


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6f7ed15d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6f7ed15d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6f7ed15d

Branch: refs/heads/trunk
Commit: 6f7ed15dad8d914ae65a595eef327c0510f11469
Parents: 1d586cb
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Sat Dec 10 21:53:46 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Sat Dec 10 21:53:46 2016 -0800

----------------------------------------------------------------------
 .../internals/InternalTopicManager.java         |  2 +-
 .../internals/StreamPartitionAssignor.java      |  2 +-
 .../processor/internals/StreamThread.java       | 21 +++++++++++++++-----
 3 files changed, 18 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/6f7ed15d/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
index a65a2ae..c586779 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
@@ -52,7 +52,7 @@ public class InternalTopicManager {
     public static final String RETENTION_MS = "retention.ms";
     public static final Long WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT = TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS);
 
-    private final ZkClient zkClient;
+    final ZkClient zkClient;
     private final int replicationFactor;
     private final long windowChangeLogAdditionalRetention;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/6f7ed15d/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index 7e15f70..8a94b7e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -159,7 +159,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
     private Map<TaskId, Set<TopicPartition>> standbyTasks;
     private Map<TaskId, Set<TopicPartition>> activeTasks;
 
-    private InternalTopicManager internalTopicManager;
+    InternalTopicManager internalTopicManager;
 
     /**
      * We need to have the PartitionAssignor and its StreamThread to be mutually accessible

http://git-wip-us.apache.org/repos/asf/kafka/blob/6f7ed15d/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index a2cac71..151bfd5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -167,6 +167,13 @@ public class StreamThread extends Thread {
         }
     }
 
+    private synchronized void setStateWhenNotInPendingShutdown(final State newState) {
+        if (state == State.PENDING_SHUTDOWN) {
+            return;
+        }
+        setState(newState);
+    }
+
     public final PartitionGrouper partitionGrouper;
     private final StreamsMetadataState streamsMetadataState;
     public final String applicationId;
@@ -212,21 +219,21 @@ public class StreamThread extends Thread {
     final ConsumerRebalanceListener rebalanceListener = new ConsumerRebalanceListener() {
         @Override
         public void onPartitionsAssigned(Collection<TopicPartition> assignment) {
+
             try {
                 if (state == State.PENDING_SHUTDOWN) {
                     log.info("stream-thread [{}] New partitions [{}] assigned while shutting down.",
                         StreamThread.this.getName(), assignment);
-                    return;
                 }
                 log.info("stream-thread [{}] New partitions [{}] assigned at the end of consumer rebalance.",
                     StreamThread.this.getName(), assignment);
 
-                setState(State.ASSIGNING_PARTITIONS);
+                setStateWhenNotInPendingShutdown(State.ASSIGNING_PARTITIONS);
                 addStreamTasks(assignment);
                 addStandbyTasks();
                 lastCleanMs = time.milliseconds(); // start the cleaning cycle
                 streamsMetadataState.onChange(partitionAssignor.getPartitionsByHostState(), partitionAssignor.clusterMetadata());
-                setState(State.RUNNING);
+                setStateWhenNotInPendingShutdown(State.RUNNING);
             } catch (Throwable t) {
                 rebalanceException = t;
                 throw t;
@@ -239,11 +246,10 @@ public class StreamThread extends Thread {
                 if (state == State.PENDING_SHUTDOWN) {
                     log.info("stream-thread [{}] New partitions [{}] revoked while shutting down.",
                         StreamThread.this.getName(), assignment);
-                    return;
                 }
                 log.info("stream-thread [{}] partitions [{}] revoked at the beginning of consumer rebalance.",
                         StreamThread.this.getName(), assignment);
-                setState(State.PARTITIONS_REVOKED);
+                setStateWhenNotInPendingShutdown(State.PARTITIONS_REVOKED);
                 lastCleanMs = Long.MAX_VALUE; // stop the cleaning cycle until partitions are assigned
                 // suspend active tasks
                 suspendTasksAndState(true);
@@ -391,6 +397,11 @@ public class StreamThread extends Thread {
             log.error("{} Failed to close restore consumer: ", logPrefix, e);
         }
 
+        // TODO remove this
+        // hotfix to improve ZK behavior as long as KAFKA-4060 is not fixed (c.f. KAFKA-4369)
+        // when removing this, make StreamPartitionAssignor#internalTopicManager "private" again
+        partitionAssignor.internalTopicManager.zkClient.close();
+
         // remove all tasks
         removeStreamTasks();
         removeStandbyTasks();