You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2021/06/15 08:28:16 UTC
[kafka] branch 2.8 updated: KAFKA-12948: Remove node from
ClusterConnectionStates.connectingNodes when node is removed (#10882)
This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.8 by this push:
new 3878165 KAFKA-12948: Remove node from ClusterConnectionStates.connectingNodes when node is removed (#10882)
3878165 is described below
commit 3878165a066791bf850030423724b4342da98985
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Tue Jun 15 09:18:30 2021 +0100
KAFKA-12948: Remove node from ClusterConnectionStates.connectingNodes when node is removed (#10882)
NetworkClient.poll() throws IllegalStateException when checking isConnectionSetupTimeout if all nodes in ClusterConnectionStates.connectingNodes aren't present in ClusterConnectionStates.nodeState. This commit ensures that when we remove a node from nodeState, we also remove from connectingNodes.
Reviewers: David Jacot <dj...@confluent.io>
---
.../kafka/clients/ClusterConnectionStates.java | 1 +
.../apache/kafka/clients/NetworkClientTest.java | 33 ++++++++++++++++++++++
2 files changed, 34 insertions(+)
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
index e00494c..20de2ec 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
@@ -389,6 +389,7 @@ final class ClusterConnectionStates {
*/
public void remove(String id) {
nodeState.remove(id);
+ connectingNodes.remove(id);
}
/**
diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index b13f854..dac6424 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -1064,6 +1064,39 @@ public class NetworkClientTest {
assertEquals(2, mockHostResolver.resolutionCount());
}
+ @Test
+ public void testCloseConnectingNode() {
+ Cluster cluster = TestUtils.clusterWith(2);
+ Node node0 = cluster.nodeById(0);
+ Node node1 = cluster.nodeById(1);
+ client.ready(node0, time.milliseconds());
+ selector.serverConnectionBlocked(node0.idString());
+ client.poll(1, time.milliseconds());
+ client.close(node0.idString());
+
+ // Poll without any connections should return without exceptions
+ client.poll(0, time.milliseconds());
+ assertFalse(NetworkClientUtils.isReady(client, node0, time.milliseconds()));
+ assertFalse(NetworkClientUtils.isReady(client, node1, time.milliseconds()));
+
+ // Connection to new node should work
+ client.ready(node1, time.milliseconds());
+ ByteBuffer buffer = RequestTestUtils.serializeResponseWithHeader(defaultApiVersionsResponse(), ApiKeys.API_VERSIONS.latestVersion(), 0);
+ selector.delayedReceive(new DelayedReceive(node1.idString(), new NetworkReceive(node1.idString(), buffer)));
+ while (!client.ready(node1, time.milliseconds()))
+ client.poll(1, time.milliseconds());
+ assertTrue(client.isReady(node1, time.milliseconds()));
+ selector.clear();
+
+ // New connection to node closed earlier should work
+ client.ready(node0, time.milliseconds());
+ buffer = RequestTestUtils.serializeResponseWithHeader(defaultApiVersionsResponse(), ApiKeys.API_VERSIONS.latestVersion(), 1);
+ selector.delayedReceive(new DelayedReceive(node0.idString(), new NetworkReceive(node0.idString(), buffer)));
+ while (!client.ready(node0, time.milliseconds()))
+ client.poll(1, time.milliseconds());
+ assertTrue(client.isReady(node0, time.milliseconds()));
+ }
+
private RequestHeader parseHeader(ByteBuffer buffer) {
buffer.getInt(); // skip size
return RequestHeader.parse(buffer.slice());