You are viewing a plain-text version of this content. The canonical link for it was a hyperlink on the original archive page and is not preserved in this copy.
Posted to commits@kafka.apache.org by mi...@apache.org on 2021/09/17 13:37:12 UTC
[kafka] branch trunk updated: KAFKA-12762: Use connection timeout when polling the network for new connections (#10649)
This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 55701dc KAFKA-12762: Use connection timeout when polling the network for new connections (#10649)
55701dc is described below
commit 55701dc00ab72233e7c238550a423fdce479afc1
Author: Edoardo Comar <ec...@uk.ibm.com>
AuthorDate: Fri Sep 17 14:35:49 2021 +0100
KAFKA-12762: Use connection timeout when polling the network for new connections (#10649)
Reviewers: Rajini Sivaram <ra...@googlemail.com>, Ismael Juma <is...@juma.me.uk>, Tom Bentley <tb...@redhat.com>
Co-authored-by: Mickael Maison <mi...@gmail.com>
Co-authored-by: Edoardo Comar <ec...@uk.ibm.com>
Co-authored-by: Tom Bentley <to...@users.noreply.github.com>
---
.../main/java/org/apache/kafka/clients/ClientUtils.java | 6 +++++-
.../apache/kafka/clients/ClusterConnectionStates.java | 11 +++++++----
.../main/java/org/apache/kafka/clients/admin/Admin.java | 2 +-
.../apache/kafka/clients/admin/KafkaAdminClient.java | 14 ++++++++++++--
.../kafka/clients/ClusterConnectionStatesTest.java | 12 ++++++++----
.../kafka/clients/admin/AdminClientTestUtils.java | 8 ++++++++
.../kafka/api/PlaintextAdminIntegrationTest.scala | 17 ++++++++++++++++-
7 files changed, 57 insertions(+), 13 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
index 72fec52..86d4678 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
@@ -33,6 +33,7 @@ import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
+import java.util.stream.Collectors;
import static org.apache.kafka.common.utils.Utils.getHost;
import static org.apache.kafka.common.utils.Utils.getPort;
@@ -107,7 +108,10 @@ public final class ClientUtils {
+ /**
+ * Resolves {@code host} with the given resolver and returns the addresses kept by
+ * {@code filterPreferredAddresses}. The returned addresses are logged at debug level.
+ *
+ * @param host the host name to resolve
+ * @param hostResolver the resolver used to look up {@code host}
+ * @return the addresses returned by {@code filterPreferredAddresses}
+ * @throws UnknownHostException if resolution fails (presumably raised by {@code hostResolver.resolve})
+ */
static List<InetAddress> resolve(String host, HostResolver hostResolver) throws UnknownHostException {
InetAddress[] addresses = hostResolver.resolve(host);
- return filterPreferredAddresses(addresses);
+ List<InetAddress> result = filterPreferredAddresses(addresses);
+ // Guarded so the stream/join work is skipped entirely when debug logging is disabled.
+ if (log.isDebugEnabled()) {
+ log.debug("Resolved host {} as {}", host, result.stream().map(InetAddress::getHostAddress).collect(Collectors.joining(",")));
+ }
+ return result;
}
/**
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
index 524d54b..95efdbe 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
@@ -97,19 +97,22 @@ final class ClusterConnectionStates {
/**
* Returns the number of milliseconds to wait, based on the connection state, before attempting to send data. When
- * disconnected, this respects the reconnect backoff time. When connecting or connected, this handles slow/stalled
- * connections.
+ * disconnected, this respects the reconnect backoff time. When connecting, return a delay based on the connection timeout.
+ * When connected, wait indefinitely (i.e. until a wakeup).
* @param id the connection to check
* @param now the current time in ms
*/
public long connectionDelay(String id, long now) {
NodeConnectionState state = nodeState.get(id);
if (state == null) return 0;
- if (state.state.isDisconnected()) {
+
+ if (state.state == ConnectionState.CONNECTING) {
+ return connectionSetupTimeoutMs(id);
+ } else if (state.state.isDisconnected()) {
long timeWaited = now - state.lastConnectAttemptMs;
return Math.max(state.reconnectBackoffMs - timeWaited, 0);
} else {
- // When connecting or connected, we should be able to delay indefinitely since other events (connection or
+ // When connected, we should be able to delay indefinitely since other events (connection or
// data acked) will cause a wakeup once data can be sent.
return Long.MAX_VALUE;
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
index 4b6fe49..377f009 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
@@ -140,7 +140,7 @@ public interface Admin extends AutoCloseable {
* @return The new KafkaAdminClient.
*/
static Admin create(Map<String, Object> conf) {
- return KafkaAdminClient.createInternal(new AdminClientConfig(conf, true), null);
+ // No custom TimeoutProcessorFactory and no custom HostResolver: createInternal substitutes a
+ // DefaultHostResolver when the resolver argument is null.
+ AdminClientConfig adminConfig = new AdminClientConfig(conf, true);
+ return KafkaAdminClient.createInternal(adminConfig, null, null);
}
/**
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index cbe1a33..c34c748 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -22,6 +22,8 @@ import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.DefaultHostResolver;
+import org.apache.kafka.clients.HostResolver;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.StaleMetadataException;
@@ -465,6 +467,11 @@ public class KafkaAdminClient extends AdminClient {
}
+ /**
+ * Pre-existing two-argument signature, kept unchanged. Equivalent to the three-argument variant
+ * called with a null HostResolver, which that variant replaces with a DefaultHostResolver.
+ */
static KafkaAdminClient createInternal(AdminClientConfig config, TimeoutProcessorFactory timeoutProcessorFactory) {
+ final HostResolver noCustomResolver = null;
+ return createInternal(config, timeoutProcessorFactory, noCustomResolver);
+ }
+
+ static KafkaAdminClient createInternal(AdminClientConfig config, TimeoutProcessorFactory timeoutProcessorFactory,
+ HostResolver hostResolver) {
Metrics metrics = null;
NetworkClient networkClient = null;
Time time = Time.SYSTEM;
@@ -503,8 +510,9 @@ public class KafkaAdminClient extends AdminClient {
selector = new Selector(config.getLong(AdminClientConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
metrics, time, metricGrpPrefix, channelBuilder, logContext);
networkClient = new NetworkClient(
- selector,
metadataManager.updater(),
+ null,
+ selector,
clientId,
1,
config.getLong(AdminClientConfig.RECONNECT_BACKOFF_MS_CONFIG),
@@ -517,7 +525,9 @@ public class KafkaAdminClient extends AdminClient {
time,
true,
apiVersions,
- logContext);
+ null,
+ logContext,
+ (hostResolver == null) ? new DefaultHostResolver() : hostResolver);
return new KafkaAdminClient(config, clientId, time, metadataManager, metrics, networkClient,
timeoutProcessorFactory, logContext);
} catch (Throwable exc) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java b/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
index 21ef8d7..72cc123 100644
--- a/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
@@ -90,31 +90,35 @@ public class ClusterConnectionStatesTest {
@Test
public void testClusterConnectionStateChanges() {
assertTrue(connectionStates.canConnect(nodeId1, time.milliseconds()));
+ assertEquals(0, connectionStates.connectionDelay(nodeId1, time.milliseconds()));
// Start connecting to Node and check state
connectionStates.connecting(nodeId1, time.milliseconds(), "localhost");
- assertEquals(connectionStates.connectionState(nodeId1), ConnectionState.CONNECTING);
+ assertEquals(ConnectionState.CONNECTING, connectionStates.connectionState(nodeId1));
assertTrue(connectionStates.isConnecting(nodeId1));
assertFalse(connectionStates.isReady(nodeId1, time.milliseconds()));
assertFalse(connectionStates.isBlackedOut(nodeId1, time.milliseconds()));
assertFalse(connectionStates.hasReadyNodes(time.milliseconds()));
+ long connectionDelay = connectionStates.connectionDelay(nodeId1, time.milliseconds());
+ double connectionDelayDelta = connectionSetupTimeoutMs * connectionSetupTimeoutJitter;
+ assertEquals(connectionSetupTimeoutMs, connectionDelay, connectionDelayDelta);
time.sleep(100);
// Successful connection
connectionStates.ready(nodeId1);
- assertEquals(connectionStates.connectionState(nodeId1), ConnectionState.READY);
+ assertEquals(ConnectionState.READY, connectionStates.connectionState(nodeId1));
assertTrue(connectionStates.isReady(nodeId1, time.milliseconds()));
assertTrue(connectionStates.hasReadyNodes(time.milliseconds()));
assertFalse(connectionStates.isConnecting(nodeId1));
assertFalse(connectionStates.isBlackedOut(nodeId1, time.milliseconds()));
- assertEquals(connectionStates.connectionDelay(nodeId1, time.milliseconds()), Long.MAX_VALUE);
+ assertEquals(Long.MAX_VALUE, connectionStates.connectionDelay(nodeId1, time.milliseconds()));
time.sleep(15000);
// Disconnected from broker
connectionStates.disconnected(nodeId1, time.milliseconds());
- assertEquals(connectionStates.connectionState(nodeId1), ConnectionState.DISCONNECTED);
+ assertEquals(ConnectionState.DISCONNECTED, connectionStates.connectionState(nodeId1));
assertTrue(connectionStates.isDisconnected(nodeId1));
assertTrue(connectionStates.isBlackedOut(nodeId1, time.milliseconds()));
assertFalse(connectionStates.isConnecting(nodeId1));
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java
index 3db5739..587434a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java
@@ -21,6 +21,7 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import org.apache.kafka.clients.HostResolver;
import org.apache.kafka.clients.admin.CreateTopicsResult.TopicMetadataAndConfig;
import org.apache.kafka.clients.admin.internals.MetadataOperationContext;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -116,4 +117,11 @@ public class AdminClientTestUtils {
Map<TopicPartition, KafkaFutureImpl<ListOffsetsResult.ListOffsetsResultInfo>> futures) {
return adminClient.getListOffsetsCalls(context, topicPartitionOffsets, futures);
}
+
+ /**
+ * Creates a KafkaAdminClient whose network client resolves hosts with the supplied resolver.
+ * Public so that tests outside this package (e.g. the core Scala integration tests) can inject one.
+ *
+ * @param conf admin client configuration properties
+ * @param hostResolver resolver handed to the client; a null value falls back to DefaultHostResolver
+ * @return the new admin client
+ */
+ public static Admin create(Map<String, Object> conf, HostResolver hostResolver) {
+ AdminClientConfig adminConfig = new AdminClientConfig(conf, true);
+ return KafkaAdminClient.createInternal(adminConfig, null, hostResolver);
+ }
}
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index 36586ed..ef8174d0 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -17,6 +17,7 @@
package kafka.api
import java.io.File
+import java.net.InetAddress
import java.lang.{Long => JLong}
import java.time.{Duration => JDuration}
import java.util.Arrays.asList
@@ -24,13 +25,13 @@ import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import java.util.concurrent.{CountDownLatch, ExecutionException, TimeUnit}
import java.util.{Collections, Optional, Properties}
import java.{time, util}
-
import kafka.log.LogConfig
import kafka.security.authorizer.AclEntry
import kafka.server.{Defaults, DynamicConfig, KafkaConfig, KafkaServer}
import kafka.utils.TestUtils._
import kafka.utils.{Log4jController, TestUtils}
import kafka.zk.KafkaZkClient
+import org.apache.kafka.clients.HostResolver
import org.apache.kafka.clients.admin._
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
@@ -101,6 +102,20 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}
@Test
+ def testAdminClientHandlingBadIPWithoutTimeout(): Unit = {
+ val config = createConfig
+ config.put(AdminClientConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG, "1000")
+ // Every host resolves to a bogus address first, followed by the real one. 192.0.2.1 is in TEST-NET-1
+ // (RFC 5737), reserved for documentation and never assigned to a real host, unlike private (RFC 1918)
+ // ranges that may be live on some networks. The first connection attempt is therefore expected to fail
+ // or hang until the 1000 ms setup timeout, after which the client should move on to the real address.
+ val returnBadAddressFirst = new HostResolver {
+ override def resolve(host: String): Array[InetAddress] = {
+ Array[InetAddress](InetAddress.getByName("192.0.2.1"), InetAddress.getByName(host))
+ }
+ }
+ client = AdminClientTestUtils.create(config, returnBadAddressFirst)
+ // simply check that a call, e.g. describeCluster, returns normally
+ client.describeCluster().nodes().get()
+ }
+
+ @Test
def testCreateExistingTopicsThrowTopicExistsException(): Unit = {
client = Admin.create(createConfig)
val topic = "mytopic"