You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2022/06/07 14:27:18 UTC

[kafka] branch 3.1 updated: HOTFIX: only try to clear discover-coordinator future upon commit (#12244) (#12259)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.1 by this push:
     new a53b4102d3 HOTFIX: only try to clear discover-coordinator future upon commit (#12244) (#12259)
a53b4102d3 is described below

commit a53b4102d327a118883b0123ec5ce69e40626483
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Tue Jun 7 07:27:02 2022 -0700

    HOTFIX: only try to clear discover-coordinator future upon commit (#12244) (#12259)
    
    This is a cherrypick commit of 3.1. Another way of fixing KAFKA-13563 other than #11631.
    
    Instead of letting the consumer to always try to discover coordinator in pool with either mode (subscribe / assign), we defer the clearance of discover future upon committing async only. More specifically, under manual assign mode, there are only three places where we need the coordinator:
    
    commitAsync (both by the consumer itself or triggered by caller), this is where we want to fix.
    commitSync, which we already try to re-discovery coordinator.
    committed (both by the consumer itself based on reset policy, or triggered by caller), which we already try to re-discovery coordinator.
    The benefits are that for manual assign mode that does not try to trigger any of the above three, then we never would be discovering coordinator. The original fix in #11631 would let the consumer to discover coordinator even if none of the above operations are required.
    
    Reviewers: Luke Chen <sh...@gmail.com>, David Jacot <dj...@confluent.io>
---
 .../consumer/internals/ConsumerCoordinator.java    | 23 +++++++--
 .../kafka/clients/consumer/KafkaConsumerTest.java  |  1 -
 .../internals/ConsumerCoordinatorTest.java         | 57 ++++++++++++++++++++--
 3 files changed, 73 insertions(+), 8 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index a7194a0895..588654a833 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -66,6 +66,7 @@ import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
 
 import java.nio.ByteBuffer;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -521,14 +522,18 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                 }
             }
         } else {
-            // For manually assigned partitions, if coordinator is unknown, make sure we lookup one and await metadata.
+            // For manually assigned partitions, we do not try to pro-actively lookup coordinator;
+            // instead we only try to refresh metadata when necessary.
             // If connections to all nodes fail, wakeups triggered while attempting to send fetch
             // requests result in polls returning immediately, causing a tight loop of polls. Without
             // the wakeup, poll() with no channels would block for the timeout, delaying re-connection.
             // awaitMetadataUpdate() in ensureCoordinatorReady initiates new connections with configured backoff and avoids the busy loop.
-            if (coordinatorUnknownAndUnready(timer)) {
-                return false;
+            if (metadata.updateRequested() && !client.hasReadyNodes(timer.currentTimeMs())) {
+                client.awaitMetadataUpdate(timer);
             }
+
+            // if there is pending coordinator requests, ensure they have a chance to be transmitted.
+            client.pollNoWakeup();
         }
 
         maybeAutoCommitOffsetsAsync(timer.currentTimeMs());
@@ -946,7 +951,17 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
     public void commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
         invokeCompletedOffsetCommitCallbacks();
 
-        if (!coordinatorUnknown()) {
+        if (!coordinatorUnknownAndUnready(time.timer(Duration.ZERO))) {
+            // we need to make sure coordinator is ready before committing, since
+            // this is for async committing we do not try to block, but just try once to
+            // clear the previous discover-coordinator future, resend, or get responses;
+            // if the coordinator is not ready yet then we would just proceed and put that into the
+            // pending requests, and future poll calls would still try to complete them.
+            //
+            // the key here though is that we have to try sending the discover-coordinator if
+            // it's not known or ready, since this is the only place we can send such request
+            // under manual assignment (there we would not have heartbeat thread trying to auto-rediscover
+            // the coordinator).
             doCommitOffsetsAsync(offsets, callback);
         } else {
             // we don't know the current coordinator, so try to find it and then send the commit
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index f8b86b83cd..dabef9ad1e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -716,7 +716,6 @@ public class KafkaConsumerTest {
         consumer.seekToEnd(singleton(tp0));
         consumer.seekToBeginning(singleton(tp1));
 
-        client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node), node);
         client.prepareResponse(body -> {
             ListOffsetsRequest request = (ListOffsetsRequest) body;
             List<ListOffsetsPartition> partitions = request.topics().stream().flatMap(t -> {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index b07399558f..dbee99f0ac 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -486,10 +486,62 @@ public abstract class ConsumerCoordinatorTest {
         coordinator.poll(time.timer(0));
         assertTrue(coordinator.coordinatorUnknown());
 
-        // should find an available node in next find coordinator request
+        // should not try to find coordinator since we are in manual assignment
+        // hence the prepared response should not be returned
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.poll(time.timer(Long.MAX_VALUE));
+        assertTrue(coordinator.coordinatorUnknown());
+    }
+
+    @Test
+    public void testAutoCommitAsyncWithUserAssignedType() {
+        try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, true, subscriptions)) {
+            subscriptions.assignFromUser(Collections.singleton(t1p));
+            // set timeout to 0 because we expect no requests sent
+            coordinator.poll(time.timer(0));
+            assertTrue(coordinator.coordinatorUnknown());
+            assertFalse(client.hasInFlightRequests());
+
+            // elapse auto commit interval and set committable position
+            time.sleep(autoCommitIntervalMs);
+            subscriptions.seekUnvalidated(t1p, new SubscriptionState.FetchPosition(100L));
+
+            // should try to find coordinator since we are auto committing
+            coordinator.poll(time.timer(0));
+            assertTrue(coordinator.coordinatorUnknown());
+            assertTrue(client.hasInFlightRequests());
+
+            client.respond(groupCoordinatorResponse(node, Errors.NONE));
+            coordinator.poll(time.timer(0));
+            assertFalse(coordinator.coordinatorUnknown());
+            // after we've discovered the coordinator we should send
+            // out the commit request immediately
+            assertTrue(client.hasInFlightRequests());
+        }
+    }
+
+    @Test
+    public void testCommitAsyncWithUserAssignedType() {
+        subscriptions.assignFromUser(Collections.singleton(t1p));
+        // set timeout to 0 because we expect no requests sent
+        coordinator.poll(time.timer(0));
+        assertTrue(coordinator.coordinatorUnknown());
+        assertFalse(client.hasInFlightRequests());
+
+        // should try to find coordinator since we are commit async
+        coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), (offsets, exception) -> {
+            fail("Commit should not get responses, but got offsets:" + offsets + ", and exception:" + exception);
+        });
+        coordinator.poll(time.timer(0));
+        assertTrue(coordinator.coordinatorUnknown());
+        assertTrue(client.hasInFlightRequests());
+
+        client.respond(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.poll(time.timer(0));
         assertFalse(coordinator.coordinatorUnknown());
+        // after we've discovered the coordinator we should send
+        // out the commit request immediately
+        assertTrue(client.hasInFlightRequests());
     }
 
     @Test
@@ -1901,8 +1953,7 @@ public abstract class ConsumerCoordinatorTest {
 
     @Test
     public void testAutoCommitDynamicAssignment() {
-        try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, true, subscriptions)
-        ) {
+        try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, true, subscriptions)) {
             subscriptions.subscribe(singleton(topic1), rebalanceListener);
             joinAsFollowerAndReceiveAssignment(coordinator, singletonList(t1p));
             subscriptions.seek(t1p, 100);