You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2021/06/15 08:28:16 UTC

[kafka] branch 2.8 updated: KAFKA-12948: Remove node from ClusterConnectionStates.connectingNodes when node is removed (#10882)

This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.8 by this push:
     new 3878165  KAFKA-12948: Remove node from ClusterConnectionStates.connectingNodes when node is removed (#10882)
3878165 is described below

commit 3878165a066791bf850030423724b4342da98985
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Tue Jun 15 09:18:30 2021 +0100

    KAFKA-12948: Remove node from ClusterConnectionStates.connectingNodes when node is removed (#10882)
    
    NetworkClient.poll() throws IllegalStateException while checking isConnectionSetupTimeout if any node in ClusterConnectionStates.connectingNodes is not present in ClusterConnectionStates.nodeState. This commit ensures that when we remove a node from nodeState, we also remove it from connectingNodes.
    
    Reviewers: David Jacot <dj...@confluent.io>
---
 .../kafka/clients/ClusterConnectionStates.java     |  1 +
 .../apache/kafka/clients/NetworkClientTest.java    | 33 ++++++++++++++++++++++
 2 files changed, 34 insertions(+)

diff --git a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
index e00494c..20de2ec 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
@@ -389,6 +389,7 @@ final class ClusterConnectionStates {
      */
     public void remove(String id) {
         nodeState.remove(id);
+        connectingNodes.remove(id);
     }
 
     /**
diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index b13f854..dac6424 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -1064,6 +1064,39 @@ public class NetworkClientTest {
         assertEquals(2, mockHostResolver.resolutionCount());
     }
 
+    @Test
+    public void testCloseConnectingNode() {
+        Cluster cluster = TestUtils.clusterWith(2);
+        Node node0 = cluster.nodeById(0);
+        Node node1 = cluster.nodeById(1);
+        client.ready(node0, time.milliseconds());
+        selector.serverConnectionBlocked(node0.idString());
+        client.poll(1, time.milliseconds());
+        client.close(node0.idString());
+
+        // Poll without any connections should return without exceptions
+        client.poll(0, time.milliseconds());
+        assertFalse(NetworkClientUtils.isReady(client, node0, time.milliseconds()));
+        assertFalse(NetworkClientUtils.isReady(client, node1, time.milliseconds()));
+
+        // Connection to new node should work
+        client.ready(node1, time.milliseconds());
+        ByteBuffer buffer = RequestTestUtils.serializeResponseWithHeader(defaultApiVersionsResponse(), ApiKeys.API_VERSIONS.latestVersion(), 0);
+        selector.delayedReceive(new DelayedReceive(node1.idString(), new NetworkReceive(node1.idString(), buffer)));
+        while (!client.ready(node1, time.milliseconds()))
+            client.poll(1, time.milliseconds());
+        assertTrue(client.isReady(node1, time.milliseconds()));
+        selector.clear();
+
+        // New connection to node closed earlier should work
+        client.ready(node0, time.milliseconds());
+        buffer = RequestTestUtils.serializeResponseWithHeader(defaultApiVersionsResponse(), ApiKeys.API_VERSIONS.latestVersion(), 1);
+        selector.delayedReceive(new DelayedReceive(node0.idString(), new NetworkReceive(node0.idString(), buffer)));
+        while (!client.ready(node0, time.milliseconds()))
+            client.poll(1, time.milliseconds());
+        assertTrue(client.isReady(node0, time.milliseconds()));
+    }
+
     private RequestHeader parseHeader(ByteBuffer buffer) {
         buffer.getInt(); // skip size
         return RequestHeader.parse(buffer.slice());