You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2020/07/08 16:53:10 UTC
[kafka] branch 2.6 updated: KAFKA-10134: Use long poll if we do not
have fetchable partitions (#8934)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.6 by this push:
new d62c332 KAFKA-10134: Use long poll if we do not have fetchable partitions (#8934)
d62c332 is described below
commit d62c33297d05ae3dd1cac58b700b1015a933ffc1
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Wed Jul 8 09:51:50 2020 -0700
KAFKA-10134: Use long poll if we do not have fetchable partitions (#8934)
The intention of using poll(0) is to not block on rebalance but still return some data; however, `updateAssignmentMetadataIfNeeded` performs three different pieces of logic: 1) discover the coordinator if necessary, 2) join the group if necessary, 3) refresh metadata and fetch positions if necessary. We only want to make 2) non-blocking, but not the others, since, e.g., when the coordinator is down, the heartbeat would expire and cause the consumer to fetch with timeout 0 as well, causing unnecessarily high CPU usage.
Since splitting this function is a rather big change to make as a last-minute blocker fix for 2.6, I made a smaller change: updateAssignmentMetadataIfNeeded now takes an optional boolean flag indicating whether 2) above should wait until the timer expires or the join-group completes; otherwise we do not wait on the join-group future and just poll with a zero timer.
Reviewers: Jason Gustafson <ja...@confluent.io>
---
.../apache/kafka/clients/consumer/KafkaConsumer.java | 17 ++++++++---------
.../clients/consumer/internals/ConsumerCoordinator.java | 13 ++++++++++---
.../kafka/clients/consumer/KafkaConsumerTest.java | 9 +++++++--
3 files changed, 25 insertions(+), 14 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 610cfde..33a2fbb 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1222,19 +1222,14 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
}
- // poll for new data until the timeout expires
do {
client.maybeTriggerWakeup();
if (includeMetadataInTimeout) {
- // try to update assignment metadata BUT do not need to block on the timer,
- // since even if we are 1) in the middle of a rebalance or 2) have partitions
- // with unknown starting positions we may still want to return some data
- // as long as there are some partitions fetchable; NOTE we always use a timer with 0ms
- // to never block on completing the rebalance procedure if there's any
- updateAssignmentMetadataIfNeeded(time.timer(0L));
+ // try to update assignment metadata BUT do not need to block on the timer for join group
+ updateAssignmentMetadataIfNeeded(timer, false);
} else {
- while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE))) {
+ while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE), true)) {
log.warn("Still waiting for metadata");
}
}
@@ -1266,7 +1261,11 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* Visible for testing
*/
boolean updateAssignmentMetadataIfNeeded(final Timer timer) {
- if (coordinator != null && !coordinator.poll(timer)) {
+ return updateAssignmentMetadataIfNeeded(timer, true);
+ }
+
+ boolean updateAssignmentMetadataIfNeeded(final Timer timer, final boolean waitForJoinGroup) {
+ if (coordinator != null && !coordinator.poll(timer, waitForJoinGroup)) {
return false;
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 66e702a..9a932f9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -452,18 +452,24 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
}
}
+ // for testing
+ boolean poll(Timer timer) {
+ return poll(timer, true);
+ }
+
/**
* Poll for coordinator events. This ensures that the coordinator is known and that the consumer
* has joined the group (if it is using group management). This also handles periodic offset commits
* if they are enabled.
* <p>
- * Returns early if the timeout expires
+ * Returns early if the timeout expires or if waiting on rejoin is not required
*
* @param timer Timer bounding how long this method can block
+ * @param waitForJoinGroup Boolean flag indicating if we should wait until re-join group completes
* @throws KafkaException if the rebalance callback throws an exception
* @return true iff the operation succeeded
*/
- public boolean poll(Timer timer) {
+ public boolean poll(Timer timer, boolean waitForJoinGroup) {
maybeUpdateSubscriptionMetadata();
invokeCompletedOffsetCommitCallbacks();
@@ -503,7 +509,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
maybeUpdateSubscriptionMetadata();
}
- if (!ensureActiveGroup(timer)) {
+ // if not wait for join group, we would just use a timer of 0
+ if (!ensureActiveGroup(waitForJoinGroup ? timer : time.timer(0L))) {
return false;
}
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index e2ec870..0abcdd0 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -1838,14 +1838,19 @@ public class KafkaConsumerTest {
initMetadata(client, Utils.mkMap(Utils.mkEntry(topic, 1), Utils.mkEntry(topic2, 1), Utils.mkEntry(topic3, 1)));
consumer.subscribe(Arrays.asList(topic, topic2), getConsumerRebalanceListener(consumer));
+
Node node = metadata.fetch().nodes().get(0);
Node coordinator = prepareRebalance(client, node, assignor, Arrays.asList(tp0, t2p0), null);
- // a first rebalance to get the assignment, we need two poll calls since we need two round trips to finish join / sync-group
- consumer.poll(Duration.ZERO);
+ // a first poll with zero millisecond would not complete the rebalance
consumer.poll(Duration.ZERO);
assertEquals(Utils.mkSet(topic, topic2), consumer.subscription());
+ assertEquals(Collections.emptySet(), consumer.assignment());
+
+ // a second poll with non-zero milliseconds would complete three round-trips (discover, join, sync)
+ consumer.poll(Duration.ofMillis(100L));
+
assertEquals(Utils.mkSet(tp0, t2p0), consumer.assignment());
// prepare a response of the outstanding fetch so that we have data available on the next poll