You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2019/04/24 04:50:19 UTC
[kafka] branch 2.2 updated: Fix for KAFKA-7974: Avoid zombie AdminClient when node host isn't resolvable (#6305)
This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.2 by this push:
new d531eb4 Fix for KAFKA-7974: Avoid zombie AdminClient when node host isn't resolvable (#6305)
d531eb4 is described below
commit d531eb480e8f7d25cdf8091163b4e3c1ae3fb93c
Author: Nicholas Parker <ni...@thelastpickle.com>
AuthorDate: Thu Mar 7 21:07:23 2019 +1300
Fix for KAFKA-7974: Avoid zombie AdminClient when node host isn't resolvable (#6305)
* Fix for KAFKA-7974: Avoid calling disconnect() when not connecting
* Resolve host only when currentAddress() is called
Moves away from automatically resolving the host when the connection entry is constructed, which can leave ClusterConnectionStates in an inconsistent state: if resolution fails, no entry is recorded for the node.
Instead, resolution is done on demand, ensuring that the entry in the connection list is present even if the resolution failed.
* Add Javadoc to ClusterConnectionStates.connecting()
---
.../kafka/clients/ClusterConnectionStates.java | 68 +++++++++++++++-------
.../org/apache/kafka/clients/NetworkClient.java | 10 ++--
.../kafka/clients/ClusterConnectionStatesTest.java | 14 ++---
.../apache/kafka/clients/NetworkClientTest.java | 8 ++-
4 files changed, 66 insertions(+), 34 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
index 376b35d..e9bd971 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
@@ -24,6 +24,7 @@ import org.slf4j.Logger;
import java.net.InetAddress;
import java.net.UnknownHostException;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -108,16 +109,18 @@ final class ClusterConnectionStates {
}
/**
- * Enter the connecting state for the given connection.
+ * Enter the connecting state for the given connection, moving to a new resolved address if necessary.
* @param id the id of the connection
- * @param now the current time
- * @throws UnknownHostException
+ * @param now the current time in ms
+ * @param host the host of the connection, to be resolved internally if needed
+ * @param clientDnsLookup the mode of DNS lookup to use when resolving the {@code host}
*/
- public void connecting(String id, long now, String host, ClientDnsLookup clientDnsLookup) throws UnknownHostException {
+ public void connecting(String id, long now, String host, ClientDnsLookup clientDnsLookup) {
NodeConnectionState connectionState = nodeState.get(id);
if (connectionState != null && connectionState.host().equals(host)) {
connectionState.lastConnectAttemptMs = now;
connectionState.state = ConnectionState.CONNECTING;
+ // Move to next resolved address, or if addresses are exhausted, mark node to be re-resolved
connectionState.moveToNextAddress();
return;
} else if (connectionState != null) {
@@ -130,14 +133,19 @@ final class ClusterConnectionStates {
this.reconnectBackoffInitMs, host, clientDnsLookup));
}
- public InetAddress currentAddress(String id) {
- return nodeState.get(id).currentAddress();
+ /**
+ * Returns a resolved address for the given connection, resolving it if necessary.
+ * @param id the id of the connection
+ * @throws UnknownHostException if the address was not resolvable
+ */
+ public InetAddress currentAddress(String id) throws UnknownHostException {
+ return nodeState(id).currentAddress();
}
/**
* Enter the disconnected state for the given node.
* @param id the connection we have disconnected
- * @param now the current time
+ * @param now the current time in ms
*/
public void disconnected(String id, long now) {
NodeConnectionState nodeState = nodeState(id);
@@ -212,7 +220,7 @@ final class ClusterConnectionStates {
/**
* Enter the authentication failed state for the given node.
* @param id the connection identifier
- * @param now the current time
+ * @param now the current time in ms
* @param exception the authentication exception
*/
public void authenticationFailed(String id, long now, AuthenticationException exception) {
@@ -227,7 +235,7 @@ final class ClusterConnectionStates {
* Return true if the connection is in the READY state and currently not throttled.
*
* @param id the connection identifier
- * @param now the current time
+ * @param now the current time in ms
*/
public boolean isReady(String id, long now) {
return isReady(nodeState.get(id), now);
@@ -241,7 +249,7 @@ final class ClusterConnectionStates {
* Return true if there is at least one node with connection in the READY state and not throttled. Returns false
* otherwise.
*
- * @param now the current time
+ * @param now the current time in ms
*/
public boolean hasReadyNodes(long now) {
for (Map.Entry<String, NodeConnectionState> entry : nodeState.entrySet()) {
@@ -353,14 +361,15 @@ final class ClusterConnectionStates {
// Connection is being throttled if current time < throttleUntilTimeMs.
long throttleUntilTimeMs;
private List<InetAddress> addresses;
- private int index = 0;
+ private int addressIndex;
private final String host;
private final ClientDnsLookup clientDnsLookup;
- public NodeConnectionState(ConnectionState state, long lastConnectAttempt, long reconnectBackoffMs,
- String host, ClientDnsLookup clientDnsLookup) throws UnknownHostException {
+ private NodeConnectionState(ConnectionState state, long lastConnectAttempt, long reconnectBackoffMs,
+ String host, ClientDnsLookup clientDnsLookup) {
this.state = state;
- this.addresses = ClientUtils.resolve(host, clientDnsLookup);
+ this.addresses = Collections.emptyList();
+ this.addressIndex = -1;
this.authenticationException = null;
this.lastConnectAttemptMs = lastConnectAttempt;
this.failedAttempts = 0;
@@ -374,17 +383,32 @@ final class ClusterConnectionStates {
return host;
}
- public InetAddress currentAddress() {
- return addresses.get(index);
+ /**
+ * Fetches the current selected IP address for this node, resolving {@link #host()} if necessary.
+ * @return the selected address
+ * @throws UnknownHostException if resolving {@link #host()} fails
+ */
+ private InetAddress currentAddress() throws UnknownHostException {
+ if (addresses.isEmpty()) {
+ // (Re-)initialize list
+ addresses = ClientUtils.resolve(host, clientDnsLookup);
+ addressIndex = 0;
+ }
+
+ return addresses.get(addressIndex);
}
- /*
- * implementing a ring buffer with the addresses
+ /**
+ * Jumps to the next available resolved address for this node. If no other addresses are available, marks the
+ * list to be refreshed on the next {@link #currentAddress()} call.
*/
- public void moveToNextAddress() throws UnknownHostException {
- index = (index + 1) % addresses.size();
- if (index == 0)
- addresses = ClientUtils.resolve(host, clientDnsLookup);
+ private void moveToNextAddress() {
+ if (addresses.isEmpty())
+ return; // Avoid div0. List will initialize on next currentAddress() call
+
+ addressIndex = (addressIndex + 1) % addresses.size();
+ if (addressIndex == 0)
+ addresses = Collections.emptyList(); // Exhausted list. Re-resolve on next currentAddress() call
}
public String toString() {
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 0c5230d..44446b3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -697,7 +697,7 @@ public class NetworkClient implements KafkaClient {
* @param responses The list of responses to update
* @param nodeId Id of the node to be disconnected
* @param now The current time
- * @param disconnectState The state of the disconnected channel
+ * @param disconnectState The state of the disconnected channel
*/
private void processDisconnection(List<ClientResponse> responses,
String nodeId,
@@ -910,23 +910,25 @@ public class NetworkClient implements KafkaClient {
/**
* Initiate a connection to the given node
+ * @param node the node to connect to
+ * @param now current time in epoch milliseconds
*/
private void initiateConnect(Node node, long now) {
String nodeConnectionId = node.idString();
try {
- this.connectionStates.connecting(nodeConnectionId, now, node.host(), clientDnsLookup);
- InetAddress address = this.connectionStates.currentAddress(nodeConnectionId);
+ connectionStates.connecting(nodeConnectionId, now, node.host(), clientDnsLookup);
+ InetAddress address = connectionStates.currentAddress(nodeConnectionId);
log.debug("Initiating connection to node {} using address {}", node, address);
selector.connect(nodeConnectionId,
new InetSocketAddress(address, node.port()),
this.socketSendBuffer,
this.socketReceiveBuffer);
} catch (IOException e) {
+ log.warn("Error connecting to node {}", node, e);
/* attempt failed, we'll try again after the backoff */
connectionStates.disconnected(nodeConnectionId, now);
/* maybe the problem is our metadata, update it */
metadataUpdater.requestUpdate();
- log.warn("Error connecting to node {}", node, e);
}
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java b/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
index 79afb75..19b701d 100644
--- a/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
@@ -53,7 +53,7 @@ public class ClusterConnectionStatesTest {
}
@Test
- public void testClusterConnectionStateChanges() throws UnknownHostException {
+ public void testClusterConnectionStateChanges() {
assertTrue(connectionStates.canConnect(nodeId1, time.milliseconds()));
// Start connecting to Node and check state
@@ -97,7 +97,7 @@ public class ClusterConnectionStatesTest {
}
@Test
- public void testMultipleNodeConnectionStates() throws UnknownHostException {
+ public void testMultipleNodeConnectionStates() {
// Check initial state, allowed to connect to all nodes, but no nodes shown as ready
assertTrue(connectionStates.canConnect(nodeId1, time.milliseconds()));
assertTrue(connectionStates.canConnect(nodeId2, time.milliseconds()));
@@ -135,7 +135,7 @@ public class ClusterConnectionStatesTest {
}
@Test
- public void testAuthorizationFailed() throws UnknownHostException {
+ public void testAuthorizationFailed() {
// Try connecting
connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT);
@@ -156,7 +156,7 @@ public class ClusterConnectionStatesTest {
}
@Test
- public void testRemoveNode() throws UnknownHostException {
+ public void testRemoveNode() {
connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT);
time.sleep(1000);
connectionStates.ready(nodeId1);
@@ -171,7 +171,7 @@ public class ClusterConnectionStatesTest {
}
@Test
- public void testMaxReconnectBackoff() throws UnknownHostException {
+ public void testMaxReconnectBackoff() {
long effectiveMaxReconnectBackoff = Math.round(reconnectBackoffMax * (1 + reconnectBackoffJitter));
connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT);
time.sleep(1000);
@@ -191,7 +191,7 @@ public class ClusterConnectionStatesTest {
}
@Test
- public void testExponentialReconnectBackoff() throws UnknownHostException {
+ public void testExponentialReconnectBackoff() {
// Calculate fixed components for backoff process
final int reconnectBackoffExpBase = 2;
double reconnectBackoffMaxExp = Math.log(reconnectBackoffMax / (double) Math.max(reconnectBackoffMs, 1))
@@ -211,7 +211,7 @@ public class ClusterConnectionStatesTest {
}
@Test
- public void testThrottled() throws UnknownHostException {
+ public void testThrottled() {
connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT);
time.sleep(1000);
connectionStates.ready(nodeId1);
diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index 5a4e1e8..e098236 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -82,7 +82,7 @@ public class NetworkClientTest {
private NetworkClient createNetworkClientWithNoVersionDiscovery() {
return new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE,
reconnectBackoffMsTest, reconnectBackoffMaxMsTest,
- 64 * 1024, 64 * 1024, defaultRequestTimeoutMs,
+ 64 * 1024, 64 * 1024, defaultRequestTimeoutMs,
ClientDnsLookup.DEFAULT, time, false, new ApiVersions(), new LogContext());
}
@@ -117,6 +117,12 @@ public class NetworkClientTest {
}
@Test
+ public void testDnsLookupFailure() {
+ /* Fail cleanly when the node has a bad hostname */
+ assertFalse(client.ready(new Node(1234, "badhost", 1234), time.milliseconds()));
+ }
+
+ @Test
public void testClose() {
client.ready(node, time.milliseconds());
awaitReady(client, node);