You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2019/03/07 08:07:38 UTC

[kafka] branch trunk updated: Fix for KAFKA-7974: Avoid zombie AdminClient when node host isn't resolvable (#6305)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7f6bf95  Fix for KAFKA-7974: Avoid zombie AdminClient when node host isn't resolvable (#6305)
7f6bf95 is described below

commit 7f6bf95c1e4875b1042746e3d73240496073f081
Author: Nicholas Parker <ni...@thelastpickle.com>
AuthorDate: Thu Mar 7 21:07:23 2019 +1300

    Fix for KAFKA-7974: Avoid zombie AdminClient when node host isn't resolvable (#6305)
    
    * Fix for KAFKA-7974: Avoid calling disconnect() when not connecting
    
    * Resolve host only when currentAddress() is called
    
    Stops resolving the host eagerly when the connection entry is constructed, which could leave ClusterConnectionStates in an inconsistent state.
    Instead, resolution is done on demand, so the entry is present in the connection list even if resolution fails.
    
    * Add Javadoc to ClusterConnectionStates.connecting()
---
 .../kafka/clients/ClusterConnectionStates.java     | 68 +++++++++++++++-------
 .../org/apache/kafka/clients/NetworkClient.java    | 10 ++--
 .../kafka/clients/ClusterConnectionStatesTest.java | 14 ++---
 .../apache/kafka/clients/NetworkClientTest.java    |  8 ++-
 4 files changed, 66 insertions(+), 34 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
index 376b35d..e9bd971 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
@@ -24,6 +24,7 @@ import org.slf4j.Logger;
 
 import java.net.InetAddress;
 import java.net.UnknownHostException;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -108,16 +109,18 @@ final class ClusterConnectionStates {
     }
 
     /**
-     * Enter the connecting state for the given connection.
+     * Enter the connecting state for the given connection, moving to a new resolved address if necessary.
      * @param id the id of the connection
-     * @param now the current time
-     * @throws UnknownHostException 
+     * @param now the current time in ms
+     * @param host the host of the connection, to be resolved internally if needed
+     * @param clientDnsLookup the mode of DNS lookup to use when resolving the {@code host}
      */
-    public void connecting(String id, long now, String host, ClientDnsLookup clientDnsLookup) throws UnknownHostException {
+    public void connecting(String id, long now, String host, ClientDnsLookup clientDnsLookup) {
         NodeConnectionState connectionState = nodeState.get(id);
         if (connectionState != null && connectionState.host().equals(host)) {
             connectionState.lastConnectAttemptMs = now;
             connectionState.state = ConnectionState.CONNECTING;
+            // Move to next resolved address, or if addresses are exhausted, mark node to be re-resolved
             connectionState.moveToNextAddress();
             return;
         } else if (connectionState != null) {
@@ -130,14 +133,19 @@ final class ClusterConnectionStates {
             this.reconnectBackoffInitMs, host, clientDnsLookup));
     }
 
-    public InetAddress currentAddress(String id) {
-        return nodeState.get(id).currentAddress();
+    /**
+     * Returns a resolved address for the given connection, resolving it if necessary.
+     * @param id the id of the connection
+     * @throws UnknownHostException if the address was not resolvable
+     */
+    public InetAddress currentAddress(String id) throws UnknownHostException {
+        return nodeState(id).currentAddress();
     }
 
     /**
      * Enter the disconnected state for the given node.
      * @param id the connection we have disconnected
-     * @param now the current time
+     * @param now the current time in ms
      */
     public void disconnected(String id, long now) {
         NodeConnectionState nodeState = nodeState(id);
@@ -212,7 +220,7 @@ final class ClusterConnectionStates {
     /**
      * Enter the authentication failed state for the given node.
      * @param id the connection identifier
-     * @param now the current time
+     * @param now the current time in ms
      * @param exception the authentication exception
      */
     public void authenticationFailed(String id, long now, AuthenticationException exception) {
@@ -227,7 +235,7 @@ final class ClusterConnectionStates {
      * Return true if the connection is in the READY state and currently not throttled.
      *
      * @param id the connection identifier
-     * @param now the current time
+     * @param now the current time in ms
      */
     public boolean isReady(String id, long now) {
         return isReady(nodeState.get(id), now);
@@ -241,7 +249,7 @@ final class ClusterConnectionStates {
      * Return true if there is at least one node with connection in the READY state and not throttled. Returns false
      * otherwise.
      *
-     * @param now the current time
+     * @param now the current time in ms
      */
     public boolean hasReadyNodes(long now) {
         for (Map.Entry<String, NodeConnectionState> entry : nodeState.entrySet()) {
@@ -353,14 +361,15 @@ final class ClusterConnectionStates {
         // Connection is being throttled if current time < throttleUntilTimeMs.
         long throttleUntilTimeMs;
         private List<InetAddress> addresses;
-        private int index = 0;
+        private int addressIndex;
         private final String host;
         private final ClientDnsLookup clientDnsLookup;
 
-        public NodeConnectionState(ConnectionState state, long lastConnectAttempt, long reconnectBackoffMs, 
-                String host, ClientDnsLookup clientDnsLookup) throws UnknownHostException {
+        private NodeConnectionState(ConnectionState state, long lastConnectAttempt, long reconnectBackoffMs,
+                String host, ClientDnsLookup clientDnsLookup) {
             this.state = state;
-            this.addresses = ClientUtils.resolve(host, clientDnsLookup);
+            this.addresses = Collections.emptyList();
+            this.addressIndex = -1;
             this.authenticationException = null;
             this.lastConnectAttemptMs = lastConnectAttempt;
             this.failedAttempts = 0;
@@ -374,17 +383,32 @@ final class ClusterConnectionStates {
             return host;
         }
 
-        public InetAddress currentAddress() {
-            return addresses.get(index);
+        /**
+         * Fetches the current selected IP address for this node, resolving {@link #host()} if necessary.
+         * @return the selected address
+         * @throws UnknownHostException if resolving {@link #host()} fails
+         */
+        private InetAddress currentAddress() throws UnknownHostException {
+            if (addresses.isEmpty()) {
+                // (Re-)initialize list
+                addresses = ClientUtils.resolve(host, clientDnsLookup);
+                addressIndex = 0;
+            }
+
+            return addresses.get(addressIndex);
         }
 
-        /*
-         * implementing a ring buffer with the addresses
+        /**
+         * Jumps to the next available resolved address for this node. If no other addresses are available, marks the
+         * list to be refreshed on the next {@link #currentAddress()} call.
          */
-        public void moveToNextAddress() throws UnknownHostException {
-            index = (index + 1) % addresses.size();
-            if (index == 0)
-                addresses = ClientUtils.resolve(host, clientDnsLookup);
+        private void moveToNextAddress() {
+            if (addresses.isEmpty())
+                return; // Avoid div0. List will initialize on next currentAddress() call
+
+            addressIndex = (addressIndex + 1) % addresses.size();
+            if (addressIndex == 0)
+                addresses = Collections.emptyList(); // Exhausted list. Re-resolve on next currentAddress() call
         }
 
         public String toString() {
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 0c5230d..44446b3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -697,7 +697,7 @@ public class NetworkClient implements KafkaClient {
      * @param responses The list of responses to update
      * @param nodeId Id of the node to be disconnected
      * @param now The current time
-     * @param disconnectState The state of the disconnected channel           
+     * @param disconnectState The state of the disconnected channel
      */
     private void processDisconnection(List<ClientResponse> responses,
                                       String nodeId,
@@ -910,23 +910,25 @@ public class NetworkClient implements KafkaClient {
 
     /**
      * Initiate a connection to the given node
+     * @param node the node to connect to
+     * @param now current time in epoch milliseconds
      */
     private void initiateConnect(Node node, long now) {
         String nodeConnectionId = node.idString();
         try {
-            this.connectionStates.connecting(nodeConnectionId, now, node.host(), clientDnsLookup);
-            InetAddress address = this.connectionStates.currentAddress(nodeConnectionId);
+            connectionStates.connecting(nodeConnectionId, now, node.host(), clientDnsLookup);
+            InetAddress address = connectionStates.currentAddress(nodeConnectionId);
             log.debug("Initiating connection to node {} using address {}", node, address);
             selector.connect(nodeConnectionId,
                     new InetSocketAddress(address, node.port()),
                     this.socketSendBuffer,
                     this.socketReceiveBuffer);
         } catch (IOException e) {
+            log.warn("Error connecting to node {}", node, e);
             /* attempt failed, we'll try again after the backoff */
             connectionStates.disconnected(nodeConnectionId, now);
             /* maybe the problem is our metadata, update it */
             metadataUpdater.requestUpdate();
-            log.warn("Error connecting to node {}", node, e);
         }
     }
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java b/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
index 79afb75..19b701d 100644
--- a/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
@@ -53,7 +53,7 @@ public class ClusterConnectionStatesTest {
     }
 
     @Test
-    public void testClusterConnectionStateChanges() throws UnknownHostException {
+    public void testClusterConnectionStateChanges() {
         assertTrue(connectionStates.canConnect(nodeId1, time.milliseconds()));
 
         // Start connecting to Node and check state
@@ -97,7 +97,7 @@ public class ClusterConnectionStatesTest {
     }
 
     @Test
-    public void testMultipleNodeConnectionStates() throws UnknownHostException {
+    public void testMultipleNodeConnectionStates() {
         // Check initial state, allowed to connect to all nodes, but no nodes shown as ready
         assertTrue(connectionStates.canConnect(nodeId1, time.milliseconds()));
         assertTrue(connectionStates.canConnect(nodeId2, time.milliseconds()));
@@ -135,7 +135,7 @@ public class ClusterConnectionStatesTest {
     }
 
     @Test
-    public void testAuthorizationFailed() throws UnknownHostException {
+    public void testAuthorizationFailed() {
         // Try connecting
         connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT);
 
@@ -156,7 +156,7 @@ public class ClusterConnectionStatesTest {
     }
 
     @Test
-    public void testRemoveNode() throws UnknownHostException {
+    public void testRemoveNode() {
         connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT);
         time.sleep(1000);
         connectionStates.ready(nodeId1);
@@ -171,7 +171,7 @@ public class ClusterConnectionStatesTest {
     }
 
     @Test
-    public void testMaxReconnectBackoff() throws UnknownHostException {
+    public void testMaxReconnectBackoff() {
         long effectiveMaxReconnectBackoff = Math.round(reconnectBackoffMax * (1 + reconnectBackoffJitter));
         connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT);
         time.sleep(1000);
@@ -191,7 +191,7 @@ public class ClusterConnectionStatesTest {
     }
 
     @Test
-    public void testExponentialReconnectBackoff() throws UnknownHostException {
+    public void testExponentialReconnectBackoff() {
         // Calculate fixed components for backoff process
         final int reconnectBackoffExpBase = 2;
         double reconnectBackoffMaxExp = Math.log(reconnectBackoffMax / (double) Math.max(reconnectBackoffMs, 1))
@@ -211,7 +211,7 @@ public class ClusterConnectionStatesTest {
     }
 
     @Test
-    public void testThrottled() throws UnknownHostException {
+    public void testThrottled() {
         connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT);
         time.sleep(1000);
         connectionStates.ready(nodeId1);
diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index 5a4e1e8..e098236 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -82,7 +82,7 @@ public class NetworkClientTest {
     private NetworkClient createNetworkClientWithNoVersionDiscovery() {
         return new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE,
                 reconnectBackoffMsTest, reconnectBackoffMaxMsTest,
-                64 * 1024, 64 * 1024, defaultRequestTimeoutMs, 
+                64 * 1024, 64 * 1024, defaultRequestTimeoutMs,
                 ClientDnsLookup.DEFAULT, time, false, new ApiVersions(), new LogContext());
     }
 
@@ -117,6 +117,12 @@ public class NetworkClientTest {
     }
 
     @Test
+    public void testDnsLookupFailure() {
+        /* Fail cleanly when the node has a bad hostname */
+        assertFalse(client.ready(new Node(1234, "badhost", 1234), time.milliseconds()));
+    }
+
+    @Test
     public void testClose() {
         client.ready(node, time.milliseconds());
         awaitReady(client, node);