You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this rendering.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/12/11 05:53:49 UTC
kafka git commit: KAFKA-4510: StreamThread must finish rebalance in state PENDING_SHUTDOWN
Repository: kafka
Updated Branches:
refs/heads/trunk 1d586cb50 -> 6f7ed15da
KAFKA-4510: StreamThread must finish rebalance in state PENDING_SHUTDOWN
Author: Matthias J. Sax <ma...@confluent.io>
Reviewers: Eno Thereska, Guozhang Wang
Closes #2227 from mjsax/kafka-4510-finish-rebalance-on-shutdown
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6f7ed15d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6f7ed15d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6f7ed15d
Branch: refs/heads/trunk
Commit: 6f7ed15dad8d914ae65a595eef327c0510f11469
Parents: 1d586cb
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Sat Dec 10 21:53:46 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Sat Dec 10 21:53:46 2016 -0800
----------------------------------------------------------------------
.../internals/InternalTopicManager.java | 2 +-
.../internals/StreamPartitionAssignor.java | 2 +-
.../processor/internals/StreamThread.java | 21 +++++++++++++++-----
3 files changed, 18 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/6f7ed15d/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
index a65a2ae..c586779 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
@@ -52,7 +52,7 @@ public class InternalTopicManager {
public static final String RETENTION_MS = "retention.ms";
public static final Long WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT = TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS);
- private final ZkClient zkClient;
+ final ZkClient zkClient;
private final int replicationFactor;
private final long windowChangeLogAdditionalRetention;
http://git-wip-us.apache.org/repos/asf/kafka/blob/6f7ed15d/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index 7e15f70..8a94b7e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -159,7 +159,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
private Map<TaskId, Set<TopicPartition>> standbyTasks;
private Map<TaskId, Set<TopicPartition>> activeTasks;
- private InternalTopicManager internalTopicManager;
+ InternalTopicManager internalTopicManager;
/**
* We need to have the PartitionAssignor and its StreamThread to be mutually accessible
http://git-wip-us.apache.org/repos/asf/kafka/blob/6f7ed15d/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index a2cac71..151bfd5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -167,6 +167,13 @@ public class StreamThread extends Thread {
}
}
+ private synchronized void setStateWhenNotInPendingShutdown(final State newState) {
+ if (state == State.PENDING_SHUTDOWN) {
+ return;
+ }
+ setState(newState);
+ }
+
public final PartitionGrouper partitionGrouper;
private final StreamsMetadataState streamsMetadataState;
public final String applicationId;
@@ -212,21 +219,21 @@ public class StreamThread extends Thread {
final ConsumerRebalanceListener rebalanceListener = new ConsumerRebalanceListener() {
@Override
public void onPartitionsAssigned(Collection<TopicPartition> assignment) {
+
try {
if (state == State.PENDING_SHUTDOWN) {
log.info("stream-thread [{}] New partitions [{}] assigned while shutting down.",
StreamThread.this.getName(), assignment);
- return;
}
log.info("stream-thread [{}] New partitions [{}] assigned at the end of consumer rebalance.",
StreamThread.this.getName(), assignment);
- setState(State.ASSIGNING_PARTITIONS);
+ setStateWhenNotInPendingShutdown(State.ASSIGNING_PARTITIONS);
addStreamTasks(assignment);
addStandbyTasks();
lastCleanMs = time.milliseconds(); // start the cleaning cycle
streamsMetadataState.onChange(partitionAssignor.getPartitionsByHostState(), partitionAssignor.clusterMetadata());
- setState(State.RUNNING);
+ setStateWhenNotInPendingShutdown(State.RUNNING);
} catch (Throwable t) {
rebalanceException = t;
throw t;
@@ -239,11 +246,10 @@ public class StreamThread extends Thread {
if (state == State.PENDING_SHUTDOWN) {
log.info("stream-thread [{}] New partitions [{}] revoked while shutting down.",
StreamThread.this.getName(), assignment);
- return;
}
log.info("stream-thread [{}] partitions [{}] revoked at the beginning of consumer rebalance.",
StreamThread.this.getName(), assignment);
- setState(State.PARTITIONS_REVOKED);
+ setStateWhenNotInPendingShutdown(State.PARTITIONS_REVOKED);
lastCleanMs = Long.MAX_VALUE; // stop the cleaning cycle until partitions are assigned
// suspend active tasks
suspendTasksAndState(true);
@@ -391,6 +397,11 @@ public class StreamThread extends Thread {
log.error("{} Failed to close restore consumer: ", logPrefix, e);
}
+ // TODO remove this
+ // hotfix to improve ZK behavior as long as KAFKA-4060 is not fixed (cf. KAFKA-4369)
+ // when removing this, make StreamPartitionAssignor#internalTopicManager and InternalTopicManager#zkClient "private" again
+ partitionAssignor.internalTopicManager.zkClient.close();
+
// remove all tasks
removeStreamTasks();
removeStandbyTasks();