You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2022/01/21 17:08:57 UTC

[kafka] branch 3.0 updated: KAFKA-13388; Kafka Producer nodes stuck in CHECKING_API_VERSIONS (#11671)

This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 3234794  KAFKA-13388; Kafka Producer nodes stuck in CHECKING_API_VERSIONS (#11671)
3234794 is described below

commit 3234794a3a2381f4e0617edb70dcaff422cc10cc
Author: David Jacot <dj...@confluent.io>
AuthorDate: Fri Jan 21 17:44:56 2022 +0100

    KAFKA-13388; Kafka Producer nodes stuck in CHECKING_API_VERSIONS (#11671)
    
    At the moment, the `NetworkClient` will remain stuck in the `CHECKING_API_VERSIONS` state forever if the `Channel` does not become ready. To prevent this from happening, this patch changes the logic to transition to the `CHECKING_API_VERSIONS` state only when the `ApiVersionsRequest` is queued to be sent out. With this, the connection will time out if the `Channel` does not become ready within the connection setup timeout. Once the `ApiVersionsRequest` is queued up, the request timeout takes over.
    
    Reviewers: Rajini Sivaram <ra...@googlemail.com>
---
 .../org/apache/kafka/clients/NetworkClient.java     |  6 +++++-
 .../org/apache/kafka/clients/NetworkClientTest.java | 21 +++++++++++++++++++++
 .../java/org/apache/kafka/test/MockSelector.java    | 14 +++++++++++---
 3 files changed, 37 insertions(+), 4 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index baceaba..946b1f6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -938,7 +938,6 @@ public class NetworkClient implements KafkaClient {
             // Therefore, it is still necessary to check isChannelReady before attempting to send on this
             // connection.
             if (discoverBrokerVersions) {
-                this.connectionStates.checkingApiVersions(node);
                 nodesNeedingApiVersionsFetch.put(node, new ApiVersionsRequest.Builder());
                 log.debug("Completed connection to node {}. Fetching API versions.", node);
             } else {
@@ -955,6 +954,11 @@ public class NetworkClient implements KafkaClient {
             String node = entry.getKey();
             if (selector.isChannelReady(node) && inFlightRequests.canSendMore(node)) {
                 log.debug("Initiating API versions fetch from node {}.", node);
+                // We transition the connection to the CHECKING_API_VERSIONS state only when
+                // the ApiVersionsRequest is queued up to be sent out. Without this, the client
+                // could remain in the CHECKING_API_VERSIONS state forever if the channel does
+                // not become ready.
+                this.connectionStates.checkingApiVersions(node);
                 ApiVersionsRequest.Builder apiVersionRequestBuilder = entry.getValue();
                 ClientRequest clientRequest = newClientRequest(node, apiVersionRequestBuilder, now, true);
                 doSend(clientRequest, true, now);
diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index 4fbfd42..fe1e9d1 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -1097,6 +1097,27 @@ public class NetworkClientTest {
         assertTrue(client.isReady(node0, time.milliseconds()));
     }
 
+    @Test
+    public void testConnectionDoesNotRemainStuckInCheckingApiVersionsStateIfChannelNeverBecomesReady() {
+        final Cluster cluster = TestUtils.clusterWith(1);
+        final Node node = cluster.nodeById(0);
+
+        // Channel is ready by default so we mark it as not ready.
+        client.ready(node, time.milliseconds());
+        selector.channelNotReady(node.idString());
+
+        // Channel should not be ready.
+        client.poll(0, time.milliseconds());
+        assertFalse(NetworkClientUtils.isReady(client, node, time.milliseconds()));
+
+        // Connection should time out if the channel does not become ready within
+        // the connection setup timeout. This ensures that the client does not remain
+        // stuck in the CHECKING_API_VERSIONS state.
+        time.sleep((long) (connectionSetupTimeoutMsTest * 1.2) + 1);
+        client.poll(0, time.milliseconds());
+        assertTrue(client.connectionFailed(node));
+    }
+
     private RequestHeader parseHeader(ByteBuffer buffer) {
         buffer.getInt(); // skip size
         return RequestHeader.parse(buffer.slice());
diff --git a/clients/src/test/java/org/apache/kafka/test/MockSelector.java b/clients/src/test/java/org/apache/kafka/test/MockSelector.java
index d1d79dc..2d7c151 100644
--- a/clients/src/test/java/org/apache/kafka/test/MockSelector.java
+++ b/clients/src/test/java/org/apache/kafka/test/MockSelector.java
@@ -29,9 +29,11 @@ import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.function.Predicate;
 
 /**
@@ -48,6 +50,7 @@ public class MockSelector implements Selectable {
     private final List<String> connected = new ArrayList<>();
     private final List<DelayedReceive> delayedReceives = new ArrayList<>();
     private final Predicate<InetSocketAddress> canConnect;
+    private final Set<String> ready = new HashSet<>();
 
     public MockSelector(Time time) {
         this(time, null);
@@ -62,6 +65,7 @@ public class MockSelector implements Selectable {
     public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
         if (canConnect == null || canConnect.test(address)) {
             this.connected.add(id);
+            this.ready.add(id);
         }
     }
 
@@ -91,8 +95,8 @@ public class MockSelector implements Selectable {
     /**
      * Since MockSelector.connect will always succeed and add the
      * connection id to the Set connected, we can only simulate
-     * that the connection is still pending by remove the connection
-     * id from the Set connected
+     * that the connection is still pending by removing the connection
+     * id from the Set connected.
      *
      * @param id connection id
      */
@@ -221,9 +225,13 @@ public class MockSelector implements Selectable {
     public void unmuteAll() {
     }
 
+    public void channelNotReady(String id) {
+        ready.remove(id);
+    }
+
     @Override
     public boolean isChannelReady(String id) {
-        return true;
+        return ready.contains(id);
     }
 
     public void reset() {