You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2020/07/08 19:06:28 UTC

[kafka] branch 2.5 updated: KAFKA-10134: Use long poll if we do not have fetchable partitions (#8934)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.5 by this push:
     new 4844604  KAFKA-10134: Use long poll if we do not have fetchable partitions (#8934)
4844604 is described below

commit 48446042ea322c168238398a9fd2ed236a7ac869
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Wed Jul 8 09:51:50 2020 -0700

    KAFKA-10134: Use long poll if we do not have fetchable partitions (#8934)
    
    The intention of using poll(0) is to not block on rebalance but still return some data; however, `updateAssignmentMetadataIfNeeded` have three different logic: 1) discover coordinator if necessary, 2) join-group if necessary, 3) refresh metadata and fetch position if necessary. We only want to make 2) to be non-blocking but not others, since e.g. when the coordinator is down, then heartbeat would expire and cause the consumer to fetch with timeout 0 as well, causing unnecessarily high CPU.
    
    Since splitting this function is a rather big change to make as a last minute blocker fix for 2.6, so I made a smaller change to make updateAssignmentMetadataIfNeeded has an optional boolean flag to indicate if 2) above should wait until either expired or complete, otherwise do not wait on the join-group future and just poll with zero timer.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 .../apache/kafka/clients/consumer/KafkaConsumer.java    | 17 ++++++++---------
 .../clients/consumer/internals/ConsumerCoordinator.java | 13 ++++++++++---
 .../kafka/clients/consumer/KafkaConsumerTest.java       |  9 +++++++--
 3 files changed, 25 insertions(+), 14 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 145dd05..438c563 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1228,19 +1228,14 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                 throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
             }
 
-            // poll for new data until the timeout expires
             do {
                 client.maybeTriggerWakeup();
 
                 if (includeMetadataInTimeout) {
-                    // try to update assignment metadata BUT do not need to block on the timer,
-                    // since even if we are 1) in the middle of a rebalance or 2) have partitions
-                    // with unknown starting positions we may still want to return some data
-                    // as long as there are some partitions fetchable; NOTE we always use a timer with 0ms
-                    // to never block on completing the rebalance procedure if there's any
-                    updateAssignmentMetadataIfNeeded(time.timer(0L));
+                    // try to update assignment metadata BUT do not need to block on the timer for join group
+                    updateAssignmentMetadataIfNeeded(timer, false);
                 } else {
-                    while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE))) {
+                    while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE), true)) {
                         log.warn("Still waiting for metadata");
                     }
                 }
@@ -1272,7 +1267,11 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * Visible for testing
      */
     boolean updateAssignmentMetadataIfNeeded(final Timer timer) {
-        if (coordinator != null && !coordinator.poll(timer)) {
+        return updateAssignmentMetadataIfNeeded(timer, true);
+    }
+
+    boolean updateAssignmentMetadataIfNeeded(final Timer timer, final boolean waitForJoinGroup) {
+        if (coordinator != null && !coordinator.poll(timer, waitForJoinGroup)) {
             return false;
         }
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 2978611..66e713a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -436,18 +436,24 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         }
     }
 
+    // for testing
+    boolean poll(Timer timer) {
+        return poll(timer, true);
+    }
+
     /**
      * Poll for coordinator events. This ensures that the coordinator is known and that the consumer
      * has joined the group (if it is using group management). This also handles periodic offset commits
      * if they are enabled.
      * <p>
-     * Returns early if the timeout expires
+     * Returns early if the timeout expires or if waiting on rejoin is not required
      *
      * @param timer Timer bounding how long this method can block
+     * @param waitForJoinGroup Boolean flag indicating if we should wait until re-join group completes
      * @throws KafkaException if the rebalance callback throws an exception
      * @return true iff the operation succeeded
      */
-    public boolean poll(Timer timer) {
+    public boolean poll(Timer timer, boolean waitForJoinGroup) {
         maybeUpdateSubscriptionMetadata();
 
         invokeCompletedOffsetCommitCallbacks();
@@ -487,7 +493,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                     maybeUpdateSubscriptionMetadata();
                 }
 
-                if (!ensureActiveGroup(timer)) {
+                // if not wait for join group, we would just use a timer of 0
+                if (!ensureActiveGroup(waitForJoinGroup ? timer : time.timer(0L))) {
                     return false;
                 }
             }
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 00d4db4..4644ffd 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -1785,14 +1785,19 @@ public class KafkaConsumerTest {
         initMetadata(client, Utils.mkMap(Utils.mkEntry(topic, 1), Utils.mkEntry(topic2, 1), Utils.mkEntry(topic3, 1)));
 
         consumer.subscribe(Arrays.asList(topic, topic2), getConsumerRebalanceListener(consumer));
+
         Node node = metadata.fetch().nodes().get(0);
         Node coordinator = prepareRebalance(client, node, assignor, Arrays.asList(tp0, t2p0), null);
 
-        // a first rebalance to get the assignment, we need two poll calls since we need two round trips to finish join / sync-group
-        consumer.poll(Duration.ZERO);
+        // a first poll with zero millisecond would not complete the rebalance
         consumer.poll(Duration.ZERO);
 
         assertEquals(Utils.mkSet(topic, topic2), consumer.subscription());
+        assertEquals(Collections.emptySet(), consumer.assignment());
+
+        // a second poll with non-zero milliseconds would complete three round-trips (discover, join, sync)
+        consumer.poll(Duration.ofMillis(100L));
+
         assertEquals(Utils.mkSet(tp0, t2p0), consumer.assignment());
 
         // prepare a response of the outstanding fetch so that we have data available on the next poll