You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/10/23 15:14:49 UTC

kafka git commit: KAFKA-6101; Reconnecting to broker does not exponentially backoff

Repository: kafka
Updated Branches:
  refs/heads/trunk 6fc573225 -> 277fc927c


KAFKA-6101; Reconnecting to broker does not exponentially backoff

Author: tedyu <yu...@gmail.com>
Author: Ismael Juma <is...@juma.me.uk>

Reviewers: Soenke Liebau <so...@opencore.com>, Ismael Juma <is...@juma.me.uk>

Closes #4118 from tedyu/trunk


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/277fc927
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/277fc927
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/277fc927

Branch: refs/heads/trunk
Commit: 277fc927c0d6c2699b64a5a54461fb96a50f58ae
Parents: 6fc5732
Author: tedyu <yu...@gmail.com>
Authored: Mon Oct 23 15:52:30 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Mon Oct 23 15:52:30 2017 +0100

----------------------------------------------------------------------
 .../kafka/clients/ClusterConnectionStates.java  | 11 ++-
 .../clients/ClusterConnectionStatesTest.java    | 20 ++++++
 .../apache/kafka/clients/NetworkClientTest.java | 76 +++++++++++++++++---
 3 files changed, 96 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/277fc927/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
index 0055843..89e958d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
@@ -106,7 +106,14 @@ final class ClusterConnectionStates {
      * @param now the current time
      */
     public void connecting(String id, long now) {
-        nodeState.put(id, new NodeConnectionState(ConnectionState.CONNECTING, now, this.reconnectBackoffInitMs));
+        NodeConnectionState node = nodeState.get(id);
+        if (node != null) {
+            // Reuse the existing entry: only state and lastConnectAttemptMs change, other fields are kept
+            node.lastConnectAttemptMs = now;
+            node.state = ConnectionState.CONNECTING;
+        } else {
+            nodeState.put(id, new NodeConnectionState(ConnectionState.CONNECTING, now, this.reconnectBackoffInitMs));
+        }
     }
 
     /**
@@ -274,7 +281,7 @@ final class ClusterConnectionStates {
         }
 
         public String toString() {
-            return "NodeState(" + state + ", " + lastConnectAttemptMs + ")";
+            return "NodeState(" + state + ", " + lastConnectAttemptMs + ", " + failedAttempts + ")";
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/277fc927/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java b/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
index 21b8719..3b246f4 100644
--- a/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
@@ -178,4 +178,24 @@ public class ClusterConnectionStatesTest {
             connectionStates.disconnected(nodeId1, time.milliseconds());
         }
     }
+
+    @Test
+    public void testExponentialReconnectBackoff() {
+        // Exponent cap: log_base(max / initial); Math.max(..., 1) avoids dividing by a zero initial backoff
+        final int reconnectBackoffExpBase = 2;
+        double reconnectBackoffMaxExp = Math.log(reconnectBackoffMax / (double) Math.max(reconnectBackoffMs, 1))
+            / Math.log(reconnectBackoffExpBase);
+
+        // Run through 10 connect/disconnect cycles and check the backoff is within the jitter tolerance each time
+        for (int i = 0; i < 10; i++) {
+            connectionStates.connecting(nodeId1, time.milliseconds());
+            connectionStates.disconnected(nodeId1, time.milliseconds());
+            // Expected backoff without jitter: reconnectBackoffMs * base^min(i, maxExp)
+            long expectedBackoff = Math.round(Math.pow(reconnectBackoffExpBase, Math.min(i, reconnectBackoffMaxExp))
+                * reconnectBackoffMs);
+            long currentBackoff = connectionStates.connectionDelay(nodeId1, time.milliseconds());
+            assertEquals(expectedBackoff, currentBackoff, reconnectBackoffJitter * expectedBackoff);
+            time.sleep(connectionStates.connectionDelay(nodeId1, time.milliseconds()) + 1);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/277fc927/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index ff0f8f2..edbd72d 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -54,17 +54,17 @@ public class NetworkClientTest {
     protected final Cluster cluster = TestUtils.singletonCluster("test", nodeId);
     protected final Node node = cluster.nodes().get(0);
     protected final long reconnectBackoffMsTest = 10 * 1000;
-    protected final long reconnectBackoffMaxTest = 10 * 1000;
-    protected final NetworkClient client = createNetworkClient();
+    protected final long reconnectBackoffMaxMsTest = 10 * 10000;
 
+    private final NetworkClient client = createNetworkClient(reconnectBackoffMaxMsTest);
+    private final NetworkClient clientWithNoExponentialBackoff = createNetworkClient(reconnectBackoffMsTest);
     private final NetworkClient clientWithStaticNodes = createNetworkClientWithStaticNodes();
-
     private final NetworkClient clientWithNoVersionDiscovery = createNetworkClientWithNoVersionDiscovery();
 
-    private NetworkClient createNetworkClient() {
+    private NetworkClient createNetworkClient(long reconnectBackoffMaxMs) {
         return new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE,
-                reconnectBackoffMsTest, reconnectBackoffMaxTest,
-                64 * 1024, 64 * 1024, requestTimeoutMs, time, true, new ApiVersions(), new LogContext());
+                reconnectBackoffMsTest, reconnectBackoffMaxMs, 64 * 1024, 64 * 1024,
+                requestTimeoutMs, time, true, new ApiVersions(), new LogContext());
     }
 
     private NetworkClient createNetworkClientWithStaticNodes() {
@@ -75,7 +75,7 @@ public class NetworkClientTest {
 
     private NetworkClient createNetworkClientWithNoVersionDiscovery() {
         return new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE,
-                reconnectBackoffMsTest, reconnectBackoffMaxTest,
+                reconnectBackoffMsTest, reconnectBackoffMaxMsTest,
                 64 * 1024, 64 * 1024, requestTimeoutMs, time, false, new ApiVersions(), new LogContext());
     }
 
@@ -217,6 +217,47 @@ public class NetworkClientTest {
     }
 
     @Test
+    public void testConnectionDelayWithNoExponentialBackoff() {
+        long now = time.milliseconds();
+        long delay = clientWithNoExponentialBackoff.connectionDelay(node, now);
+
+        assertEquals(0, delay);
+    }
+
+    @Test
+    public void testConnectionDelayConnectedWithNoExponentialBackoff() {
+        awaitReady(clientWithNoExponentialBackoff, node);
+
+        long now = time.milliseconds();
+        long delay = clientWithNoExponentialBackoff.connectionDelay(node, now);
+
+        assertEquals(Long.MAX_VALUE, delay);
+    }
+
+    @Test
+    public void testConnectionDelayDisconnectedWithNoExponentialBackoff() {
+        awaitReady(clientWithNoExponentialBackoff, node);
+
+        selector.close(node.idString());
+        clientWithNoExponentialBackoff.poll(requestTimeoutMs, time.milliseconds());
+        long delay = clientWithNoExponentialBackoff.connectionDelay(node, time.milliseconds());
+
+        assertEquals(reconnectBackoffMsTest, delay);
+
+        // Sleep until there is no connection delay
+        time.sleep(delay);
+        assertEquals(0, clientWithNoExponentialBackoff.connectionDelay(node, time.milliseconds()));
+
+        // Start connecting with the same client and disconnect before the connection is established
+        clientWithNoExponentialBackoff.ready(node, time.milliseconds());
+        selector.close(node.idString());
+        clientWithNoExponentialBackoff.poll(requestTimeoutMs, time.milliseconds());
+
+        // Re-read the delay: the second attempt must back off for the same interval as backoff is not exponential
+        assertEquals(reconnectBackoffMsTest, clientWithNoExponentialBackoff.connectionDelay(node, time.milliseconds()));
+    }
+
+    @Test
     public void testConnectionDelay() {
         long now = time.milliseconds();
         long delay = client.connectionDelay(node, now);
@@ -238,11 +279,28 @@ public class NetworkClientTest {
     public void testConnectionDelayDisconnected() {
         awaitReady(client, node);
 
+        // First disconnection
         selector.close(node.idString());
         client.poll(requestTimeoutMs, time.milliseconds());
         long delay = client.connectionDelay(node, time.milliseconds());
+        long expectedDelay = reconnectBackoffMsTest;
+        double jitter = 0.2;
+        assertEquals(expectedDelay, delay, expectedDelay * jitter);
 
-        assertEquals(reconnectBackoffMsTest, delay);
+        // Sleep until there is no connection delay
+        time.sleep(delay);
+        assertEquals(0, client.connectionDelay(node, time.milliseconds()));
+
+        // Start connecting and disconnect before the connection is established
+        client.ready(node, time.milliseconds());
+        selector.close(node.idString());
+        client.poll(requestTimeoutMs, time.milliseconds());
+
+        // Second attempt doubles the nominal backoff. Compare against the nominal value, not twice the
+        // observed first delay, so the jitter of the first attempt is not compounded into the expectation
+        expectedDelay = reconnectBackoffMsTest * 2;
+        delay = client.connectionDelay(node, time.milliseconds());
+        assertEquals(expectedDelay, delay, expectedDelay * jitter);
     }
 
     @Test
@@ -281,7 +339,7 @@ public class NetworkClientTest {
         assertFalse(client.canConnect(node, time.milliseconds()));
 
         // ensure disconnect does not reset blackout period if already disconnected
-        time.sleep(reconnectBackoffMsTest);
+        time.sleep(reconnectBackoffMaxMsTest);
         assertTrue(client.canConnect(node, time.milliseconds()));
         client.disconnect(node.idString());
         assertTrue(client.canConnect(node, time.milliseconds()));