You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ca...@apache.org on 2024/01/04 13:40:38 UTC

(kafka) branch 3.6 updated: KAFKA-15817: Avoid reconnecting to the same IP address (#14813) (#14980)

This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch 3.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.6 by this push:
     new ab92eddb7cb KAFKA-15817: Avoid reconnecting to the same IP address (#14813) (#14980)
ab92eddb7cb is described below

commit ab92eddb7cbdf40393ff9171649a09c3647eae48
Author: Bruno Cadonna <ca...@apache.org>
AuthorDate: Thu Jan 4 14:40:30 2024 +0100

    KAFKA-15817: Avoid reconnecting to the same IP address (#14813) (#14980)
    
    Avoid reconnecting to the same IP address if multiple addresses are available. This change is a follow-up to #9902. When re-resolving DNS after disconnecting, it is possible (likely, even) that we will resolve the same set of addresses in the same order, meaning we could attempt to reconnect to the same IP that we just disconnected from. In the case where we disconnected from the IP address because it has become unhealthy (due to a load balancer going down or a network routing layer restarting, for example), the client will n [...]
    
    Cherry-pick of 47e3777
    
    Reviewers: Lucas Brutschy <lb...@confluent.io>, Andrew Schofield <as...@confluent.io>
    
    Co-authored-by: Bob Barrett <bo...@confluent.io>
---
 .../kafka/clients/ClusterConnectionStates.java     | 36 +++++++++++++++++-----
 .../kafka/clients/ClusterConnectionStatesTest.java | 19 ++++++++++++
 2 files changed, 48 insertions(+), 7 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
index f4d90922587..91708d18d22 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
@@ -160,7 +160,7 @@ final class ClusterConnectionStates {
         // Create a new NodeConnectionState if nodeState does not already contain one
         // for the specified id or if the hostname associated with the node id changed.
         nodeState.put(id, new NodeConnectionState(ConnectionState.CONNECTING, now,
-                reconnectBackoff.backoff(0), connectionSetupTimeout.backoff(0), host, hostResolver));
+                reconnectBackoff.backoff(0), connectionSetupTimeout.backoff(0), host, hostResolver, log));
         connectingNodes.add(id);
     }
 
@@ -479,9 +479,11 @@ final class ClusterConnectionStates {
         private int addressIndex;
         private final String host;
         private final HostResolver hostResolver;
+        private InetAddress lastAttemptedAddress;
+        private Logger log;
 
         private NodeConnectionState(ConnectionState state, long lastConnectAttempt, long reconnectBackoffMs,
-                long connectionSetupTimeoutMs, String host, HostResolver hostResolver) {
+                long connectionSetupTimeoutMs, String host, HostResolver hostResolver, Logger log) {
             this.state = state;
             this.addresses = Collections.emptyList();
             this.addressIndex = -1;
@@ -493,6 +495,7 @@ final class ClusterConnectionStates {
             this.throttleUntilTimeMs = 0;
             this.host = host;
             this.hostResolver = hostResolver;
+            this.log = log;
         }
 
         public String host() {
@@ -506,12 +509,14 @@ final class ClusterConnectionStates {
          */
         private InetAddress currentAddress() throws UnknownHostException {
             if (addresses.isEmpty()) {
-                // (Re-)initialize list
-                addresses = ClientUtils.resolve(host, hostResolver);
-                addressIndex = 0;
+                resolveAddresses();
             }
 
-            return addresses.get(addressIndex);
+            // Save the address that we return so that we don't try it twice in a row when we re-resolve due to
+            // disconnecting or exhausting the addresses
+            InetAddress currentAddress = addresses.get(addressIndex);
+            lastAttemptedAddress = currentAddress;
+            return currentAddress;
         }
 
         /**
@@ -524,7 +529,24 @@ final class ClusterConnectionStates {
 
             addressIndex = (addressIndex + 1) % addresses.size();
             if (addressIndex == 0)
-                addresses = Collections.emptyList(); // Exhausted list. Re-resolve on next currentAddress() call
+                clearAddresses(); // Exhausted list. Re-resolve on next currentAddress() call
+        }
+
+        private void resolveAddresses() throws UnknownHostException {
+            // (Re-)initialize list
+            addresses = ClientUtils.resolve(host, hostResolver);
+            if (log.isDebugEnabled()) {
+                log.debug("Resolved host {} to addresses {}", host, addresses);
+            }
+            addressIndex = 0;
+
+            // We re-resolve DNS after disconnecting, but we don't want to immediately reconnect to the address we
+            // just disconnected from, in case we disconnected due to a problem with that IP (such as a load
+            // balancer instance failure). Check the first address in the list and skip it if it was the last address
+            // we tried and there are multiple addresses to choose from.
+            if (addresses.size() > 1 && addresses.get(addressIndex).equals(lastAttemptedAddress)) {
+                addressIndex++;
+            }
         }
 
         /**
diff --git a/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java b/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
index c672540cf9e..2da4bbeba3a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
@@ -420,6 +420,25 @@ public class ClusterConnectionStatesTest {
         assertEquals(0, connectionStates.nodesWithConnectionSetupTimeout(time.milliseconds()).size());
     }
 
+    @Test
+    public void testSkipLastAttemptedIp() throws UnknownHostException {
+        setupMultipleIPs();
+
+        assertTrue(ClientUtils.resolve(hostTwoIps, multipleIPHostResolver).size() > 1);
+
+        // Connect to the first IP
+        connectionStates.connecting(nodeId1, time.milliseconds(), hostTwoIps);
+        InetAddress addr1 = connectionStates.currentAddress(nodeId1);
+
+        // Disconnect, which will trigger re-resolution with the first IP still first
+        connectionStates.disconnected(nodeId1, time.milliseconds());
+
+        // Connect again, the first IP should get skipped
+        connectionStates.connecting(nodeId1, time.milliseconds(), hostTwoIps);
+        InetAddress addr2 = connectionStates.currentAddress(nodeId1);
+        assertNotSame(addr1, addr2);
+    }
+    
     private void setupMultipleIPs() {
         this.connectionStates = new ClusterConnectionStates(reconnectBackoffMs, reconnectBackoffMax,
                 connectionSetupTimeoutMs, connectionSetupTimeoutMaxMs, new LogContext(), this.multipleIPHostResolver);