Posted to commits@kafka.apache.org by gu...@apache.org on 2018/06/28 17:20:26 UTC

[kafka] branch 2.0 updated: MINOR: KAFKA-7112: Only resume restoration if state is still PARTITIONS_ASSIGNED after poll (#5306)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new 495e385  MINOR: KAFKA-7112: Only resume restoration if state is still PARTITIONS_ASSIGNED after poll (#5306)
495e385 is described below

commit 495e3857f77236555ae897da1dd0a63f14861bd3
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Thu Jun 28 10:19:42 2018 -0700

    MINOR: KAFKA-7112: Only resume restoration if state is still PARTITIONS_ASSIGNED after poll (#5306)
    
    Before KIP-266, consumer.poll(0) would call updateAssignmentMetadataIfNeeded(Long.MAX_VALUE), which guaranteed that a pending rebalance completed within that poll(0), i.e. both onPartitionsRevoked and onPartitionsAssigned were called. After KIP-266, however, only onPartitionsRevoked may be called if the timeout elapses first. Hence we need to double-check that the state is still PARTITIONS_ASSIGNED after the consumer.poll(duration) call.
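
    The reasoning above can be illustrated with a minimal, self-contained sketch. It is not the StreamThread code: the class PollStateSketch, its runOnce method, and the use of a UnaryOperator to stand in for the rebalance callbacks fired inside consumer.poll(...) are all hypothetical names introduced for illustration. It only shows the pattern of polling first and then re-checking the state before attempting restoration.

```java
import java.util.function.UnaryOperator;

// Hypothetical model of one loop iteration; not the real StreamThread.
final class PollStateSketch {
    enum State { PARTITIONS_REVOKED, PARTITIONS_ASSIGNED, RUNNING }

    private State state;
    private int restoreAttempts = 0;

    PollStateSketch(State initial) {
        this.state = initial;
    }

    State state() {
        return state;
    }

    int restoreAttempts() {
        return restoreAttempts;
    }

    // `poll` is an assumption: it models the rebalance callbacks that may
    // change the state while the consumer is polling.
    // `restorationComplete` stands in for the result of the restore step.
    void runOnce(UnaryOperator<State> poll, boolean restorationComplete) {
        // Every state polls first; the callbacks may move the state.
        state = poll.apply(state);

        // Only attempt restoration if the state is still (or has become)
        // PARTITIONS_ASSIGNED after the poll call.
        if (state == State.PARTITIONS_ASSIGNED) {
            restoreAttempts++;
            if (restorationComplete) {
                state = State.RUNNING;
            }
        }
    }

    public static void main(String[] args) {
        // Only the revocation callback fires before the timeout elapses.
        PollStateSketch revokedOnly = new PollStateSketch(State.PARTITIONS_ASSIGNED);
        revokedOnly.runOnce(s -> State.PARTITIONS_REVOKED, true);
        System.out.println(revokedOnly.state() + " attempts=" + revokedOnly.restoreAttempts());

        // The rebalance completes inside the poll, so restoration proceeds.
        PollStateSketch completed = new PollStateSketch(State.PARTITIONS_REVOKED);
        completed.runOnce(s -> State.PARTITIONS_ASSIGNED, true);
        System.out.println(completed.state() + " attempts=" + completed.restoreAttempts());
    }
}
```

    In the first scenario no restoration is attempted because the poll left the state in PARTITIONS_REVOKED; restoring unconditionally after the poll is the behavior this commit avoids.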
    
    Reviewers: Ted Yu <yu...@gmail.com>, Matthias J. Sax <ma...@confluent.io>
---
 .../streams/processor/internals/StreamThread.java  | 27 +++++++++++++---------
 1 file changed, 16 insertions(+), 11 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index a159e7b..77538ae 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -804,21 +804,26 @@ public class StreamThread extends Thread {
             // try to fetch some records with zero poll millis
             // to unblock the restoration as soon as possible
             records = pollRequests(Duration.ZERO);
+        } else if (state == State.PARTITIONS_REVOKED) {
+            // try to fetch some records with normal poll time
+            // in order to wait long enough to get the join response
+            records = pollRequests(pollTime);
+        } else if (state == State.RUNNING) {
+            // try to fetch some records with normal poll time
+            // in order to get long polling
+            records = pollRequests(pollTime);
+        } else {
+            // any other state should not happen
+            log.error("Unexpected state {} during normal iteration", state);
+            throw new StreamsException(logPrefix + "Unexpected state " + state + " during normal iteration");
+        }
 
+        // only try to initialize the assigned tasks
+        // if the state is still PARTITIONS_ASSIGNED after the poll call
+        if (state == State.PARTITIONS_ASSIGNED) {
             if (taskManager.updateNewAndRestoringTasks()) {
                 setState(State.RUNNING);
             }
-        } else {
-            // try to fetch some records if necessary
-            records = pollRequests(pollTime);
-
-            // if state changed after the poll call,
-            // try to initialize the assigned tasks again
-            if (state == State.PARTITIONS_ASSIGNED) {
-                if (taskManager.updateNewAndRestoringTasks()) {
-                    setState(State.RUNNING);
-                }
-            }
         }
 
         if (records != null && !records.isEmpty() && taskManager.hasActiveRunningTasks()) {