Posted to commits@kafka.apache.org by gu...@apache.org on 2018/06/28 17:20:26 UTC
[kafka] branch 2.0 updated: MINOR: KAFKA-7112: Only resume
restoration if state is still PARTITIONS_ASSIGNED after poll (#5306)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.0 by this push:
new 495e385 MINOR: KAFKA-7112: Only resume restoration if state is still PARTITIONS_ASSIGNED after poll (#5306)
495e385 is described below
commit 495e3857f77236555ae897da1dd0a63f14861bd3
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Thu Jun 28 10:19:42 2018 -0700
MINOR: KAFKA-7112: Only resume restoration if state is still PARTITIONS_ASSIGNED after poll (#5306)
Before KIP-266, consumer.poll(0) would call updateAssignmentMetadataIfNeeded(Long.MAX_VALUE), which guaranteed that any rebalance completed within that call, i.e. both onPartitionsRevoked and onPartitionsAssigned were invoked within the same poll(0). After KIP-266, however, the timeout may elapse after only onPartitionsRevoked has been called. Hence we need to double-check that the state is still PARTITIONS_ASSIGNED after the consumer.poll(duration) call.
Reviewers: Ted Yu <yu...@gmail.com>, Matthias J. Sax <ma...@confluent.io>
---
.../streams/processor/internals/StreamThread.java | 27 +++++++++++++---------
1 file changed, 16 insertions(+), 11 deletions(-)
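The control flow described in the commit message can be illustrated with a small stand-alone model. This is only a sketch under stated assumptions: RestoreGuardSketch, FakeConsumer, onRevoked, onAssigned and restoreTasks are hypothetical names invented for illustration and are not Kafka classes or methods. The fake poll stands in for a consumer.poll(duration) call that may fire only the "revoked" callback before its timeout elapses.

```java
import java.time.Duration;

/**
 * Toy model of the post-poll state check. Nothing here is a Kafka API;
 * all type and method names are hypothetical stand-ins.
 */
class RestoreGuardSketch {

    enum State { PARTITIONS_REVOKED, PARTITIONS_ASSIGNED, RUNNING }

    /** Stand-in for a consumer whose poll may trigger rebalance callbacks. */
    interface FakeConsumer {
        void poll(Duration timeout, RestoreGuardSketch thread);
    }

    State state;
    int restoreAttempts = 0;

    RestoreGuardSketch(State initial) {
        this.state = initial;
    }

    // Invoked by the fake consumer to mimic the rebalance listener callbacks.
    void onRevoked()  { state = State.PARTITIONS_REVOKED; }
    void onAssigned() { state = State.PARTITIONS_ASSIGNED; }

    // Stand-in for the restoration step; in this toy it always reports completion.
    boolean restoreTasks() {
        restoreAttempts++;
        return true;
    }

    void runOnce(FakeConsumer consumer, Duration pollTime) {
        if (state == State.PARTITIONS_ASSIGNED) {
            // zero timeout, so restoration is not delayed
            consumer.poll(Duration.ZERO, this);
        } else if (state == State.PARTITIONS_REVOKED || state == State.RUNNING) {
            // normal timeout
            consumer.poll(pollTime, this);
        } else {
            throw new IllegalStateException("Unexpected state " + state);
        }

        // The poll may have changed the state through a callback,
        // so restoration is attempted only if the state is PARTITIONS_ASSIGNED now.
        if (state == State.PARTITIONS_ASSIGNED) {
            if (restoreTasks()) {
                state = State.RUNNING;
            }
        }
    }

    public static void main(String[] args) {
        RestoreGuardSketch thread = new RestoreGuardSketch(State.PARTITIONS_ASSIGNED);
        // This poll fires only the "revoked" callback, as if the timeout elapsed mid-rebalance.
        thread.runOnce((timeout, t) -> t.onRevoked(), Duration.ofMillis(100));
        System.out.println(thread.state + ", restore attempts = " + thread.restoreAttempts);
    }
}
```

In this model, a poll that only revokes partitions leaves the thread in PARTITIONS_REVOKED with no restoration attempt, while a poll that leaves or puts the thread in PARTITIONS_ASSIGNED lets restoration proceed in the same iteration.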
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index a159e7b..77538ae 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -804,21 +804,26 @@ public class StreamThread extends Thread {
             // try to fetch some records with zero poll millis
             // to unblock the restoration as soon as possible
             records = pollRequests(Duration.ZERO);
+        } else if (state == State.PARTITIONS_REVOKED) {
+            // try to fetch some records with normal poll time
+            // in order to wait long enough to get the join response
+            records = pollRequests(pollTime);
+        } else if (state == State.RUNNING) {
+            // try to fetch some records with normal poll time
+            // in order to get long polling
+            records = pollRequests(pollTime);
+        } else {
+            // any other state should not happen
+            log.error("Unexpected state {} during normal iteration", state);
+            throw new StreamsException(logPrefix + "Unexpected state " + state + " during normal iteration");
+        }
+        // only try to initialize the assigned tasks
+        // if the state is still in PARTITIONS_ASSIGNED after the poll call
+        if (state == State.PARTITIONS_ASSIGNED) {
             if (taskManager.updateNewAndRestoringTasks()) {
                 setState(State.RUNNING);
             }
-        } else {
-            // try to fetch some records if necessary
-            records = pollRequests(pollTime);
-
-            // if state changed after the poll call,
-            // try to initialize the assigned tasks again
-            if (state == State.PARTITIONS_ASSIGNED) {
-                if (taskManager.updateNewAndRestoringTasks()) {
-                    setState(State.RUNNING);
-                }
-            }
         }
         if (records != null && !records.isEmpty() && taskManager.hasActiveRunningTasks()) {