You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2022/01/21 17:08:57 UTC
[kafka] branch 3.0 updated: KAFKA-13388; Kafka Producer nodes stuck in CHECKING_API_VERSIONS (#11671)
This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 3234794 KAFKA-13388; Kafka Producer nodes stuck in CHECKING_API_VERSIONS (#11671)
3234794 is described below
commit 3234794a3a2381f4e0617edb70dcaff422cc10cc
Author: David Jacot <dj...@confluent.io>
AuthorDate: Fri Jan 21 17:44:56 2022 +0100
KAFKA-13388; Kafka Producer nodes stuck in CHECKING_API_VERSIONS (#11671)
At the moment, the `NetworkClient` will remain stuck in the `CHECKING_API_VERSIONS` state forever if the `Channel` does not become ready. To prevent this from happening, this patch changes the logic to transition to the `CHECKING_API_VERSIONS` state only when the `ApiVersionsRequest` is queued to be sent out. With this, the connection will time out if the `Channel` does not become ready within the connection setup timeout. Once the `ApiVersionsRequest` is queued up, the request timeout takes over.
Reviewers: Rajini Sivaram <ra...@googlemail.com>
---
.../org/apache/kafka/clients/NetworkClient.java | 6 +++++-
.../org/apache/kafka/clients/NetworkClientTest.java | 21 +++++++++++++++++++++
.../java/org/apache/kafka/test/MockSelector.java | 14 +++++++++++---
3 files changed, 37 insertions(+), 4 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index baceaba..946b1f6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -938,7 +938,6 @@ public class NetworkClient implements KafkaClient {
// Therefore, it is still necessary to check isChannelReady before attempting to send on this
// connection.
if (discoverBrokerVersions) {
- this.connectionStates.checkingApiVersions(node);
nodesNeedingApiVersionsFetch.put(node, new ApiVersionsRequest.Builder());
log.debug("Completed connection to node {}. Fetching API versions.", node);
} else {
@@ -955,6 +954,11 @@ public class NetworkClient implements KafkaClient {
String node = entry.getKey();
if (selector.isChannelReady(node) && inFlightRequests.canSendMore(node)) {
log.debug("Initiating API versions fetch from node {}.", node);
+ // We transition the connection to the CHECKING_API_VERSIONS state only when
+ // the ApiVersionsRequest is queued up to be sent out. Without this, the client
+ // could remain in the CHECKING_API_VERSIONS state forever if the channel does
+ // not become ready.
+ this.connectionStates.checkingApiVersions(node);
ApiVersionsRequest.Builder apiVersionRequestBuilder = entry.getValue();
ClientRequest clientRequest = newClientRequest(node, apiVersionRequestBuilder, now, true);
doSend(clientRequest, true, now);
diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index 4fbfd42..fe1e9d1 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -1097,6 +1097,27 @@ public class NetworkClientTest {
assertTrue(client.isReady(node0, time.milliseconds()));
}
+ @Test
+ public void testConnectionDoesNotRemainStuckInCheckingApiVersionsStateIfChannelNeverBecomesReady() {
+ final Cluster cluster = TestUtils.clusterWith(1);
+ final Node node = cluster.nodeById(0);
+
+ // Channel is ready by default so we mark it as not ready.
+ client.ready(node, time.milliseconds());
+ selector.channelNotReady(node.idString());
+
+ // Channel should not be ready.
+ client.poll(0, time.milliseconds());
+ assertFalse(NetworkClientUtils.isReady(client, node, time.milliseconds()));
+
+ // Connection should time out if the channel does not become ready within
+ // the connection setup timeout. This ensures that the client does not remain
+ // stuck in the CHECKING_API_VERSIONS state.
+ time.sleep((long) (connectionSetupTimeoutMsTest * 1.2) + 1);
+ client.poll(0, time.milliseconds());
+ assertTrue(client.connectionFailed(node));
+ }
+
private RequestHeader parseHeader(ByteBuffer buffer) {
buffer.getInt(); // skip size
return RequestHeader.parse(buffer.slice());
diff --git a/clients/src/test/java/org/apache/kafka/test/MockSelector.java b/clients/src/test/java/org/apache/kafka/test/MockSelector.java
index d1d79dc..2d7c151 100644
--- a/clients/src/test/java/org/apache/kafka/test/MockSelector.java
+++ b/clients/src/test/java/org/apache/kafka/test/MockSelector.java
@@ -29,9 +29,11 @@ import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.function.Predicate;
/**
@@ -48,6 +50,7 @@ public class MockSelector implements Selectable {
private final List<String> connected = new ArrayList<>();
private final List<DelayedReceive> delayedReceives = new ArrayList<>();
private final Predicate<InetSocketAddress> canConnect;
+ private final Set<String> ready = new HashSet<>();
public MockSelector(Time time) {
this(time, null);
@@ -62,6 +65,7 @@ public class MockSelector implements Selectable {
public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
if (canConnect == null || canConnect.test(address)) {
this.connected.add(id);
+ this.ready.add(id);
}
}
@@ -91,8 +95,8 @@ public class MockSelector implements Selectable {
/**
* Since MockSelector.connect will always succeed and add the
* connection id to the Set connected, we can only simulate
- * that the connection is still pending by remove the connection
- * id from the Set connected
+ * that the connection is still pending by removing the connection
+ * id from the Set connected.
*
* @param id connection id
*/
@@ -221,9 +225,13 @@ public class MockSelector implements Selectable {
public void unmuteAll() {
}
+ public void channelNotReady(String id) {
+ ready.remove(id);
+ }
+
@Override
public boolean isChannelReady(String id) {
- return true;
+ return ready.contains(id);
}
public void reset() {