You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/10/23 15:14:49 UTC
kafka git commit: KAFKA-6101; Reconnecting to broker does not exponentially backoff
Repository: kafka
Updated Branches:
refs/heads/trunk 6fc573225 -> 277fc927c
KAFKA-6101; Reconnecting to broker does not exponentially backoff
Author: tedyu <yu...@gmail.com>
Author: Ismael Juma <is...@juma.me.uk>
Reviewers: Soenke Liebau <so...@opencore.com>, Ismael Juma <is...@juma.me.uk>
Closes #4118 from tedyu/trunk
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/277fc927
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/277fc927
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/277fc927
Branch: refs/heads/trunk
Commit: 277fc927c0d6c2699b64a5a54461fb96a50f58ae
Parents: 6fc5732
Author: tedyu <yu...@gmail.com>
Authored: Mon Oct 23 15:52:30 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Mon Oct 23 15:52:30 2017 +0100
----------------------------------------------------------------------
.../kafka/clients/ClusterConnectionStates.java | 11 ++-
.../clients/ClusterConnectionStatesTest.java | 20 ++++++
.../apache/kafka/clients/NetworkClientTest.java | 76 +++++++++++++++++---
3 files changed, 96 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/277fc927/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
index 0055843..89e958d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
@@ -106,7 +106,14 @@ final class ClusterConnectionStates {
* @param now the current time
*/
public void connecting(String id, long now) {
- nodeState.put(id, new NodeConnectionState(ConnectionState.CONNECTING, now, this.reconnectBackoffInitMs));
+ if (nodeState.containsKey(id)) {
+ NodeConnectionState node = nodeState.get(id);
+ node.lastConnectAttemptMs = now;
+ node.state = ConnectionState.CONNECTING;
+ } else {
+ nodeState.put(id, new NodeConnectionState(ConnectionState.CONNECTING, now,
+ this.reconnectBackoffInitMs));
+ }
}
/**
@@ -274,7 +281,7 @@ final class ClusterConnectionStates {
}
public String toString() {
- return "NodeState(" + state + ", " + lastConnectAttemptMs + ")";
+ return "NodeState(" + state + ", " + lastConnectAttemptMs + ", " + failedAttempts + ")";
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/277fc927/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java b/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
index 21b8719..3b246f4 100644
--- a/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
@@ -178,4 +178,24 @@ public class ClusterConnectionStatesTest {
connectionStates.disconnected(nodeId1, time.milliseconds());
}
}
+
+ @Test
+ public void testExponentialReconnectBackoff() {
+ // Calculate fixed components for backoff process
+ final int reconnectBackoffExpBase = 2;
+ double reconnectBackoffMaxExp = Math.log(reconnectBackoffMax / (double) Math.max(reconnectBackoffMs, 1))
+ / Math.log(reconnectBackoffExpBase);
+
+ // Run through 10 disconnects and check that reconnect backoff value is within expected range for every attempt
+ for (int i = 0; i < 10; i++) {
+ connectionStates.connecting(nodeId1, time.milliseconds());
+ connectionStates.disconnected(nodeId1, time.milliseconds());
+ // Calculate expected backoff value without jitter
+ long expectedBackoff = Math.round(Math.pow(reconnectBackoffExpBase, Math.min(i, reconnectBackoffMaxExp))
+ * reconnectBackoffMs);
+ long currentBackoff = connectionStates.connectionDelay(nodeId1, time.milliseconds());
+ assertEquals(expectedBackoff, currentBackoff, reconnectBackoffJitter * expectedBackoff);
+ time.sleep(connectionStates.connectionDelay(nodeId1, time.milliseconds()) + 1);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/277fc927/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index ff0f8f2..edbd72d 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -54,17 +54,17 @@ public class NetworkClientTest {
protected final Cluster cluster = TestUtils.singletonCluster("test", nodeId);
protected final Node node = cluster.nodes().get(0);
protected final long reconnectBackoffMsTest = 10 * 1000;
- protected final long reconnectBackoffMaxTest = 10 * 1000;
- protected final NetworkClient client = createNetworkClient();
+ protected final long reconnectBackoffMaxMsTest = 10 * 10000;
+ private final NetworkClient client = createNetworkClient(reconnectBackoffMaxMsTest);
+ private final NetworkClient clientWithNoExponentialBackoff = createNetworkClient(reconnectBackoffMsTest);
private final NetworkClient clientWithStaticNodes = createNetworkClientWithStaticNodes();
-
private final NetworkClient clientWithNoVersionDiscovery = createNetworkClientWithNoVersionDiscovery();
- private NetworkClient createNetworkClient() {
+ private NetworkClient createNetworkClient(long reconnectBackoffMaxMs) {
return new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE,
- reconnectBackoffMsTest, reconnectBackoffMaxTest,
- 64 * 1024, 64 * 1024, requestTimeoutMs, time, true, new ApiVersions(), new LogContext());
+ reconnectBackoffMsTest, reconnectBackoffMaxMs, 64 * 1024, 64 * 1024,
+ requestTimeoutMs, time, true, new ApiVersions(), new LogContext());
}
private NetworkClient createNetworkClientWithStaticNodes() {
@@ -75,7 +75,7 @@ public class NetworkClientTest {
private NetworkClient createNetworkClientWithNoVersionDiscovery() {
return new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE,
- reconnectBackoffMsTest, reconnectBackoffMaxTest,
+ reconnectBackoffMsTest, reconnectBackoffMaxMsTest,
64 * 1024, 64 * 1024, requestTimeoutMs, time, false, new ApiVersions(), new LogContext());
}
@@ -217,6 +217,47 @@ public class NetworkClientTest {
}
@Test
+ public void testConnectionDelayWithNoExponentialBackoff() {
+ long now = time.milliseconds();
+ long delay = clientWithNoExponentialBackoff.connectionDelay(node, now);
+
+ assertEquals(0, delay);
+ }
+
+ @Test
+ public void testConnectionDelayConnectedWithNoExponentialBackoff() {
+ awaitReady(clientWithNoExponentialBackoff, node);
+
+ long now = time.milliseconds();
+ long delay = clientWithNoExponentialBackoff.connectionDelay(node, now);
+
+ assertEquals(Long.MAX_VALUE, delay);
+ }
+
+ @Test
+ public void testConnectionDelayDisconnectedWithNoExponentialBackoff() {
+ awaitReady(clientWithNoExponentialBackoff, node);
+
+ selector.close(node.idString());
+ clientWithNoExponentialBackoff.poll(requestTimeoutMs, time.milliseconds());
+ long delay = clientWithNoExponentialBackoff.connectionDelay(node, time.milliseconds());
+
+ assertEquals(reconnectBackoffMsTest, delay);
+
+ // Sleep until there is no connection delay
+ time.sleep(delay);
+ assertEquals(0, clientWithNoExponentialBackoff.connectionDelay(node, time.milliseconds()));
+
+ // Start connecting and disconnect before the connection is established
+ client.ready(node, time.milliseconds());
+ selector.close(node.idString());
+ client.poll(requestTimeoutMs, time.milliseconds());
+
+ // Second attempt should have the same behaviour as exponential backoff is disabled
+ assertEquals(reconnectBackoffMsTest, delay);
+ }
+
+ @Test
public void testConnectionDelay() {
long now = time.milliseconds();
long delay = client.connectionDelay(node, now);
@@ -238,11 +279,28 @@ public class NetworkClientTest {
public void testConnectionDelayDisconnected() {
awaitReady(client, node);
+ // First disconnection
selector.close(node.idString());
client.poll(requestTimeoutMs, time.milliseconds());
long delay = client.connectionDelay(node, time.milliseconds());
+ long expectedDelay = reconnectBackoffMsTest;
+ double jitter = 0.2;
+ assertEquals(expectedDelay, delay, expectedDelay * jitter);
- assertEquals(reconnectBackoffMsTest, delay);
+ // Sleep until there is no connection delay
+ time.sleep(delay);
+ assertEquals(0, client.connectionDelay(node, time.milliseconds()));
+
+ // Start connecting and disconnect before the connection is established
+ client.ready(node, time.milliseconds());
+ selector.close(node.idString());
+ client.poll(requestTimeoutMs, time.milliseconds());
+
+ // Second attempt should take twice as long with twice the jitter
+ expectedDelay = Math.round(delay * 2);
+ delay = client.connectionDelay(node, time.milliseconds());
+ jitter = 0.4;
+ assertEquals(expectedDelay, delay, expectedDelay * jitter);
}
@Test
@@ -281,7 +339,7 @@ public class NetworkClientTest {
assertFalse(client.canConnect(node, time.milliseconds()));
// ensure disconnect does not reset blackout period if already disconnected
- time.sleep(reconnectBackoffMsTest);
+ time.sleep(reconnectBackoffMaxMsTest);
assertTrue(client.canConnect(node, time.milliseconds()));
client.disconnect(node.idString());
assertTrue(client.canConnect(node, time.milliseconds()));