You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mi...@apache.org on 2021/09/17 13:37:12 UTC

[kafka] branch trunk updated: KAFKA-12762: Use connection timeout when polling the network for new connections (#10649)

This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 55701dc  KAFKA-12762: Use connection timeout when polling the network for new connections (#10649)
55701dc is described below

commit 55701dc00ab72233e7c238550a423fdce479afc1
Author: Edoardo Comar <ec...@uk.ibm.com>
AuthorDate: Fri Sep 17 14:35:49 2021 +0100

    KAFKA-12762: Use connection timeout when polling the network for new connections (#10649)
    
    
    Reviewers: Rajini Sivaram <ra...@googlemail.com>, Ismael Juma <is...@juma.me.uk>, Tom Bentley <tb...@redhat.com>
    
    Co-authored-by: Mickael Maison <mi...@gmail.com>
    Co-authored-by: Edoardo Comar <ec...@uk.ibm.com>
    Co-authored-by: Tom Bentley <to...@users.noreply.github.com>
---
 .../main/java/org/apache/kafka/clients/ClientUtils.java |  6 +++++-
 .../apache/kafka/clients/ClusterConnectionStates.java   | 11 +++++++----
 .../main/java/org/apache/kafka/clients/admin/Admin.java |  2 +-
 .../apache/kafka/clients/admin/KafkaAdminClient.java    | 14 ++++++++++++--
 .../kafka/clients/ClusterConnectionStatesTest.java      | 12 ++++++++----
 .../kafka/clients/admin/AdminClientTestUtils.java       |  8 ++++++++
 .../kafka/api/PlaintextAdminIntegrationTest.scala       | 17 ++++++++++++++++-
 7 files changed, 57 insertions(+), 13 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
index 72fec52..86d4678 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
@@ -33,6 +33,7 @@ import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.stream.Collectors;
 
 import static org.apache.kafka.common.utils.Utils.getHost;
 import static org.apache.kafka.common.utils.Utils.getPort;
@@ -107,7 +108,10 @@ public final class ClientUtils {
 
     static List<InetAddress> resolve(String host, HostResolver hostResolver) throws UnknownHostException {
         InetAddress[] addresses = hostResolver.resolve(host);
-        return filterPreferredAddresses(addresses);
+        List<InetAddress> result = filterPreferredAddresses(addresses);
+        if (log.isDebugEnabled())
+            log.debug("Resolved host {} as {}", host, result.stream().map(i -> i.getHostAddress()).collect(Collectors.joining(",")));
+        return result;
     }
 
     /**
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
index 524d54b..95efdbe 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
@@ -97,19 +97,22 @@ final class ClusterConnectionStates {
 
     /**
      * Returns the number of milliseconds to wait, based on the connection state, before attempting to send data. When
-     * disconnected, this respects the reconnect backoff time. When connecting or connected, this handles slow/stalled
-     * connections.
+     * disconnected, this respects the reconnect backoff time. When connecting, return a delay based on the connection timeout.
+     * When connected, wait indefinitely (i.e. until a wakeup).
      * @param id the connection to check
      * @param now the current time in ms
      */
     public long connectionDelay(String id, long now) {
         NodeConnectionState state = nodeState.get(id);
         if (state == null) return 0;
-        if (state.state.isDisconnected()) {
+
+        if (state.state == ConnectionState.CONNECTING) {
+            return connectionSetupTimeoutMs(id);
+        } else if (state.state.isDisconnected()) {
             long timeWaited = now - state.lastConnectAttemptMs;
             return Math.max(state.reconnectBackoffMs - timeWaited, 0);
         } else {
-            // When connecting or connected, we should be able to delay indefinitely since other events (connection or
+            // When connected, we should be able to delay indefinitely since other events (connection or
             // data acked) will cause a wakeup once data can be sent.
             return Long.MAX_VALUE;
         }
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
index 4b6fe49..377f009 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
@@ -140,7 +140,7 @@ public interface Admin extends AutoCloseable {
      * @return The new KafkaAdminClient.
      */
     static Admin create(Map<String, Object> conf) {
-        return KafkaAdminClient.createInternal(new AdminClientConfig(conf, true), null);
+        return KafkaAdminClient.createInternal(new AdminClientConfig(conf, true), null, null);
     }
 
     /**
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index cbe1a33..c34c748 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -22,6 +22,8 @@ import org.apache.kafka.clients.ClientRequest;
 import org.apache.kafka.clients.ClientResponse;
 import org.apache.kafka.clients.ClientUtils;
 import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.DefaultHostResolver;
+import org.apache.kafka.clients.HostResolver;
 import org.apache.kafka.clients.KafkaClient;
 import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.clients.StaleMetadataException;
@@ -465,6 +467,11 @@ public class KafkaAdminClient extends AdminClient {
     }
 
     static KafkaAdminClient createInternal(AdminClientConfig config, TimeoutProcessorFactory timeoutProcessorFactory) {
+        return createInternal(config, timeoutProcessorFactory, null);
+    }
+
+    static KafkaAdminClient createInternal(AdminClientConfig config, TimeoutProcessorFactory timeoutProcessorFactory,
+                                           HostResolver hostResolver) {
         Metrics metrics = null;
         NetworkClient networkClient = null;
         Time time = Time.SYSTEM;
@@ -503,8 +510,9 @@ public class KafkaAdminClient extends AdminClient {
             selector = new Selector(config.getLong(AdminClientConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
                     metrics, time, metricGrpPrefix, channelBuilder, logContext);
             networkClient = new NetworkClient(
-                selector,
                 metadataManager.updater(),
+                null,
+                selector,
                 clientId,
                 1,
                 config.getLong(AdminClientConfig.RECONNECT_BACKOFF_MS_CONFIG),
@@ -517,7 +525,9 @@ public class KafkaAdminClient extends AdminClient {
                 time,
                 true,
                 apiVersions,
-                logContext);
+                null,
+                logContext,
+                (hostResolver == null) ? new DefaultHostResolver() : hostResolver);
             return new KafkaAdminClient(config, clientId, time, metadataManager, metrics, networkClient,
                 timeoutProcessorFactory, logContext);
         } catch (Throwable exc) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java b/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
index 21ef8d7..72cc123 100644
--- a/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
@@ -90,31 +90,35 @@ public class ClusterConnectionStatesTest {
     @Test
     public void testClusterConnectionStateChanges() {
         assertTrue(connectionStates.canConnect(nodeId1, time.milliseconds()));
+        assertEquals(0, connectionStates.connectionDelay(nodeId1, time.milliseconds()));
 
         // Start connecting to Node and check state
         connectionStates.connecting(nodeId1, time.milliseconds(), "localhost");
-        assertEquals(connectionStates.connectionState(nodeId1), ConnectionState.CONNECTING);
+        assertEquals(ConnectionState.CONNECTING, connectionStates.connectionState(nodeId1));
         assertTrue(connectionStates.isConnecting(nodeId1));
         assertFalse(connectionStates.isReady(nodeId1, time.milliseconds()));
         assertFalse(connectionStates.isBlackedOut(nodeId1, time.milliseconds()));
         assertFalse(connectionStates.hasReadyNodes(time.milliseconds()));
+        long connectionDelay = connectionStates.connectionDelay(nodeId1, time.milliseconds());
+        double connectionDelayDelta = connectionSetupTimeoutMs * connectionSetupTimeoutJitter;
+        assertEquals(connectionSetupTimeoutMs, connectionDelay, connectionDelayDelta);
 
         time.sleep(100);
 
         // Successful connection
         connectionStates.ready(nodeId1);
-        assertEquals(connectionStates.connectionState(nodeId1), ConnectionState.READY);
+        assertEquals(ConnectionState.READY, connectionStates.connectionState(nodeId1));
         assertTrue(connectionStates.isReady(nodeId1, time.milliseconds()));
         assertTrue(connectionStates.hasReadyNodes(time.milliseconds()));
         assertFalse(connectionStates.isConnecting(nodeId1));
         assertFalse(connectionStates.isBlackedOut(nodeId1, time.milliseconds()));
-        assertEquals(connectionStates.connectionDelay(nodeId1, time.milliseconds()), Long.MAX_VALUE);
+        assertEquals(Long.MAX_VALUE, connectionStates.connectionDelay(nodeId1, time.milliseconds()));
 
         time.sleep(15000);
 
         // Disconnected from broker
         connectionStates.disconnected(nodeId1, time.milliseconds());
-        assertEquals(connectionStates.connectionState(nodeId1), ConnectionState.DISCONNECTED);
+        assertEquals(ConnectionState.DISCONNECTED, connectionStates.connectionState(nodeId1));
         assertTrue(connectionStates.isDisconnected(nodeId1));
         assertTrue(connectionStates.isBlackedOut(nodeId1, time.milliseconds()));
         assertFalse(connectionStates.isConnecting(nodeId1));
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java
index 3db5739..587434a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java
@@ -21,6 +21,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import org.apache.kafka.clients.HostResolver;
 import org.apache.kafka.clients.admin.CreateTopicsResult.TopicMetadataAndConfig;
 import org.apache.kafka.clients.admin.internals.MetadataOperationContext;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -116,4 +117,11 @@ public class AdminClientTestUtils {
                                                                   Map<TopicPartition, KafkaFutureImpl<ListOffsetsResult.ListOffsetsResultInfo>> futures) {
         return adminClient.getListOffsetsCalls(context, topicPartitionOffsets, futures); 
     }
+
+    /**
+     * Helper to create a KafkaAdminClient with a custom HostResolver accessible to tests outside this package.
+     */
+    public static Admin create(Map<String, Object> conf, HostResolver hostResolver) {
+        return KafkaAdminClient.createInternal(new AdminClientConfig(conf, true), null, hostResolver);
+    }
 }
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index 36586ed..ef8174d0 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -17,6 +17,7 @@
 package kafka.api
 
 import java.io.File
+import java.net.InetAddress
 import java.lang.{Long => JLong}
 import java.time.{Duration => JDuration}
 import java.util.Arrays.asList
@@ -24,13 +25,13 @@ import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
 import java.util.concurrent.{CountDownLatch, ExecutionException, TimeUnit}
 import java.util.{Collections, Optional, Properties}
 import java.{time, util}
-
 import kafka.log.LogConfig
 import kafka.security.authorizer.AclEntry
 import kafka.server.{Defaults, DynamicConfig, KafkaConfig, KafkaServer}
 import kafka.utils.TestUtils._
 import kafka.utils.{Log4jController, TestUtils}
 import kafka.zk.KafkaZkClient
+import org.apache.kafka.clients.HostResolver
 import org.apache.kafka.clients.admin._
 import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
@@ -101,6 +102,20 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
   }
 
   @Test
+  def testAdminClientHandlingBadIPWithoutTimeout(): Unit = {
+    val config = createConfig
+    config.put(AdminClientConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG, "1000")
+    val returnBadAddressFirst = new HostResolver {
+      override def resolve(host: String): Array[InetAddress] = {
+        Array[InetAddress](InetAddress.getByName("10.200.20.100"), InetAddress.getByName(host))
+      }
+    }
+    client = AdminClientTestUtils.create(config, returnBadAddressFirst)
+    // simply check that a call, e.g. describeCluster, returns normally
+    client.describeCluster().nodes().get()
+  }
+
+  @Test
   def testCreateExistingTopicsThrowTopicExistsException(): Unit = {
     client = Admin.create(createConfig)
     val topic = "mytopic"