Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/06/16 08:35:25 UTC

[GitHub] [kafka] dajac commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

dajac commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r440658796



##########
File path: clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
##########
@@ -678,7 +696,11 @@ public Node leastLoadedNode(long now) {
             } else if (connectionStates.isPreparingConnection(node.idString())) {
                 foundConnecting = node;
             } else if (canConnect(node, now)) {
-                foundCanConnect = node;
+                if (foundCanConnect == null ||
+                        this.connectionStates.lastConnectAttemptMs(foundCanConnect.idString()) >
+                                this.connectionStates.lastConnectAttemptMs(node.idString())) {
+                    foundCanConnect = node;
+                }

Review comment:
       It would be great if you could update the javadoc of the method to reflect this.
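A minimal, self-contained sketch of the selection rule in the hunk above: among nodes that can be connected to, prefer the one whose last connection attempt is oldest. A missing entry counts as `0`, mirroring `lastConnectAttemptMs` for unknown nodes. The class name `LeastRecentlyAttemptedPicker` and the map-based state are hypothetical stand-ins for `ClusterConnectionStates`; the javadoc text is only a suggested wording.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for the "foundCanConnect" branch of leastLoadedNode. */
public class LeastRecentlyAttemptedPicker {

    /**
     * Among nodes that can be connected to, returns the one whose most recent
     * connection attempt is the oldest, or null if there are no candidates.
     * Nodes with no recorded attempt count as time 0, so they are preferred.
     */
    static String pick(List<String> connectableNodes, Map<String, Long> lastConnectAttemptMs) {
        String found = null;
        for (String node : connectableNodes) {
            long candidateMs = lastConnectAttemptMs.getOrDefault(node, 0L);
            // Same comparison as the diff: switch only if the current pick was attempted more recently.
            if (found == null || lastConnectAttemptMs.getOrDefault(found, 0L) > candidateMs) {
                found = node;
            }
        }
        return found;
    }

    public static void main(String[] args) {
        Map<String, Long> attempts = Map.of("1", 500L, "2", 100L, "3", 300L);
        System.out.println(pick(List.of("1", "2", "3"), attempts)); // prints 2
    }
}
```

Because the comparison is strict, ties keep the earlier candidate in iteration order.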

##########
File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##########
@@ -357,6 +398,36 @@ private NodeConnectionState nodeState(String id) {
         return state;
     }
 
+    /**
+     * Get the id set of nodes which are in CONNECTING state
+     */
+    public Set<String> connectingNodes() {
+        return this.connectingNodes;
+    }
+
+    /**
+     * Get the timestamp of the latest connection attempt of a given node
+     * @param id the connection to fetch the state for
+     */
+    public long lastConnectAttemptMs(String id) {
+        NodeConnectionState nodeState = this.nodeState.get(id);
+        return nodeState == null ? 0 : nodeState.lastConnectAttemptMs;
+    }
+
+    public long connectionSetupTimeoutMs(String id) {
+        NodeConnectionState nodeState = this.nodeState.get(id);
+        return nodeState.connectionSetupTimeoutMs;
+    }
+
+    /**
+     * Test if the connection to the given node has reached its timeout
+     * @param id the connection to fetch the state for
+     * @param now the current time in ms
+     */
+    public boolean isConnectionSetupTimeout(String id, long now) {
+        return now - lastConnectAttemptMs(id) > connectionSetupTimeoutMs(id);

Review comment:
       Are you sure that using `lastConnectAttemptMs` is correct here? `lastConnectAttemptMs` is recorded when a connection is disconnected, and since we respect `reconnectBackoffMs` before reconnecting, the connection timeout computed here will also include the current `reconnectBackoffMs`. Is this what we want? It may be better to record the time in `connecting`.
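A rough sketch of the alternative suggested above: record a start time when the node enters CONNECTING and measure the setup timeout from there, so any reconnect backoff already waited is excluded. The class `ConnectionSetupTimer`, its `connectingStartMs` map, and the numbers in `main` are hypothetical and only illustrate the difference.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: measure the setup timeout from entry into CONNECTING. */
public class ConnectionSetupTimer {
    private final Map<String, Long> connectingStartMs = new HashMap<>();
    private final long connectionSetupTimeoutMs;

    ConnectionSetupTimer(long connectionSetupTimeoutMs) {
        this.connectionSetupTimeoutMs = connectionSetupTimeoutMs;
    }

    /** Called when the connection attempt actually starts, i.e. after any reconnect backoff. */
    void connecting(String id, long now) {
        connectingStartMs.put(id, now);
    }

    /** True if the node has been CONNECTING for longer than the setup timeout. */
    boolean isConnectionSetupTimeout(String id, long now) {
        Long start = connectingStartMs.get(id);
        if (start == null)
            throw new IllegalStateException("Node " + id + " is not in CONNECTING state");
        return now - start > connectionSetupTimeoutMs;
    }

    public static void main(String[] args) {
        ConnectionSetupTimer timer = new ConnectionSetupTimer(10_000);
        long disconnectedAtMs = 0;        // where lastConnectAttemptMs would be recorded
        long reconnectBackoffMs = 8_000;  // backoff respected before reconnecting
        timer.connecting("1", disconnectedAtMs + reconnectBackoffMs);
        // 5 s into the new attempt: not timed out, although 13 s have passed since the disconnect.
        System.out.println(timer.isConnectionSetupTimeout("1", 13_000)); // prints false
    }
}
```

Measured from the disconnect instead, the same check would report a timeout at 13 s, which is the backoff leakage the comment describes.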

##########
File path: clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
##########
@@ -103,6 +103,12 @@
         Utils.join(SecurityProtocol.names(), ", ") + ".";
     public static final String DEFAULT_SECURITY_PROTOCOL = "PLAINTEXT";
 
+    public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG = "socket.connection.setup.timeout.ms";
+    public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MS_DOC = "The amount of time the client will wait for the initial socket connection to be built. If the connection is not built before the timeout elapses the network client will close the socket channel. The default value will be 10 seconds.";

Review comment:
       nit: I would use `to be established` instead of `to be built`. I also think we should avoid putting default values in the documentation here: first, the default can be changed on a per-client basis, and second, the default will be documented from the value provided in the config definition.

##########
File path: clients/src/main/java/org/apache/kafka/common/utils/GeometricProgression.java
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * An util class for exponential backoff, backoff, etc...
+ * The formula is Term(n) = random(1 - jitter, 1 + jitter) * scaleFactor * (ratio) ^ n
+ * If scaleFactor is greater or equal than termMax, a constant term of will be provided
+ * This class is thread-safe
+ */
+public class GeometricProgression {
+    private final int ratio;
+    private final double expMax;
+    private final long scaleFactor;
+    private final double jitter;
+
+    public GeometricProgression(long scaleFactor, int ratio, long termMax, double jitter) {
+        this.scaleFactor = scaleFactor;
+        this.ratio = ratio;
+        this.jitter = jitter;
+        this.expMax = termMax > scaleFactor ?
+                Math.log(termMax / (double) Math.max(scaleFactor, 1)) / Math.log(ratio) : 0;
+    }
+
+    public long term(long n) {

Review comment:
       nit: What about using `retries` instead of `n`? It may help readers understand that the exponential value is computed from the number of tries or retries.
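For illustration, a sketch of `term` with the parameter renamed to `retries`, following the formula in the class javadoc and capping the exponent at the `expMax` computed by the constructor in the diff. The body of `term` is not shown in the hunk, so this is an assumed implementation; the class name `ExponentialTermSketch` is hypothetical. Randomization is skipped when `jitter` is 0 because `ThreadLocalRandom.nextDouble(origin, bound)` requires `origin < bound`.

```java
import java.util.concurrent.ThreadLocalRandom;

/**
 * Hypothetical sketch of the javadoc formula:
 * term(retries) = random(1 - jitter, 1 + jitter) * scaleFactor * ratio ^ min(retries, expMax)
 */
public class ExponentialTermSketch {
    private final long scaleFactor;
    private final int ratio;
    private final double expMax;
    private final double jitter;

    ExponentialTermSketch(long scaleFactor, int ratio, long termMax, double jitter) {
        this.scaleFactor = scaleFactor;
        this.ratio = ratio;
        this.jitter = jitter;
        // Same exponent cap as the constructor in the diff: termMax = scaleFactor * ratio ^ expMax.
        this.expMax = termMax > scaleFactor
                ? Math.log(termMax / (double) Math.max(scaleFactor, 1)) / Math.log(ratio) : 0;
    }

    /** @param retries the number of consecutive failed attempts so far */
    long term(long retries) {
        if (expMax == 0)
            return scaleFactor; // scaleFactor >= termMax: constant term
        double exp = Math.min(retries, expMax);
        double value = scaleFactor * Math.pow(ratio, exp);
        double randomFactor = jitter < Double.MIN_NORMAL
                ? 1.0
                : ThreadLocalRandom.current().nextDouble(1 - jitter, 1 + jitter);
        return (long) (randomFactor * value);
    }
}
```

With `scaleFactor = 100`, `ratio = 2`, `termMax = 2000` and no jitter, `term(0)` is 100, `term(3)` is 800, and any `retries` at or above about 4.32 stays at the 2000 cap (up to floating-point rounding). With `jitter = 0.2` a capped term therefore stays within 20% of 2000.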

##########
File path: clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
##########
@@ -786,6 +808,29 @@ private void handleAbortedSends(List<ClientResponse> responses) {
         abortedSends.clear();
     }
 
+    /**
+     * Handle socket channel connection timeout. The timeout will hit iff a connection
+     * stays at the ConnectionState.CONNECTING state longer than the timeout value,
+     * as indicated by ClusterConnectionStates.NodeConnectionState.
+     *
+     * @param responses The list of responses to update
+     * @param now The current time
+     */
+    private void handleTimeoutConnections(List<ClientResponse> responses, long now) {
+        Set<String> connectingNodes = connectionStates.connectingNodes();
+        for (String nodeId: connectingNodes) {

Review comment:
       nit: We usually put a space before and after the `:`.
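A small sketch of the loop with the usual `for (String nodeId : ...)` spacing. It also iterates over a snapshot. In this revision `connectingNodes()` returns the backing set directly, so if timing out a node removes it from that set (an assumption here; the loop body is not shown), removing elements from a `HashSet` while iterating it with for-each would throw `ConcurrentModificationException`. The class `TimeoutLoopSketch` and its `disconnect` helper are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

/** Hypothetical sketch of a timeout sweep over the CONNECTING set. */
public class TimeoutLoopSketch {
    // Stand-in for ClusterConnectionStates.connectingNodes, exposed without copying.
    private final Set<String> connectingNodes = new HashSet<>();

    void connecting(String nodeId) {
        connectingNodes.add(nodeId);
    }

    Set<String> connectingNodes() {
        return connectingNodes;
    }

    /** Assumed behavior: disconnecting removes the node from the CONNECTING set. */
    private void disconnect(String nodeId) {
        connectingNodes.remove(nodeId);
    }

    /** Disconnects every CONNECTING node that has timed out and returns their ids. */
    List<String> handleTimeoutConnections(Predicate<String> isTimedOut) {
        List<String> timedOut = new ArrayList<>();
        // Iterate over a copy so that disconnect() can safely mutate the live set.
        for (String nodeId : new ArrayList<>(connectingNodes)) {
            if (isTimedOut.test(nodeId)) {
                disconnect(nodeId);
                timedOut.add(nodeId);
            }
        }
        return timedOut;
    }
}
```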

##########
File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##########
@@ -357,6 +398,36 @@ private NodeConnectionState nodeState(String id) {
         return state;
     }
 
+    /**
+     * Get the id set of nodes which are in CONNECTING state
+     */
+    public Set<String> connectingNodes() {
+        return this.connectingNodes;
+    }
+
+    /**
+     * Get the timestamp of the latest connection attempt of a given node
+     * @param id the connection to fetch the state for
+     */
+    public long lastConnectAttemptMs(String id) {
+        NodeConnectionState nodeState = this.nodeState.get(id);
+        return nodeState == null ? 0 : nodeState.lastConnectAttemptMs;
+    }
+
+    public long connectionSetupTimeoutMs(String id) {
+        NodeConnectionState nodeState = this.nodeState.get(id);
+        return nodeState.connectionSetupTimeoutMs;

Review comment:
       Should we ensure that `nodeState` is not `null` here?
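One way to address this, sketched under the assumption that the existing private `nodeState(String id)` helper (visible in the hunk header) is meant to fail fast on a missing entry: route the lookup through that helper instead of `this.nodeState.get(id)`. The exception type and message below are illustrative, and the class and nested state type are hypothetical minimal stand-ins.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: look up node state through a helper that rejects unknown ids. */
public class NodeStateLookupSketch {
    // Minimal stand-in holding only the field used here.
    static final class NodeConnectionState {
        final long connectionSetupTimeoutMs;

        NodeConnectionState(long connectionSetupTimeoutMs) {
            this.connectionSetupTimeoutMs = connectionSetupTimeoutMs;
        }
    }

    private final Map<String, NodeConnectionState> nodeState = new HashMap<>();

    void put(String id, long connectionSetupTimeoutMs) {
        nodeState.put(id, new NodeConnectionState(connectionSetupTimeoutMs));
    }

    /** Throws a descriptive exception instead of letting callers hit a NullPointerException. */
    private NodeConnectionState nodeState(String id) {
        NodeConnectionState state = nodeState.get(id);
        if (state == null)
            throw new IllegalStateException("No entry found for connection " + id);
        return state;
    }

    long connectionSetupTimeoutMs(String id) {
        return nodeState(id).connectionSetupTimeoutMs;
    }
}
```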

##########
File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##########
@@ -300,30 +323,48 @@ public AuthenticationException authenticationException(String id) {
      * Resets the failure count for a node and sets the reconnect backoff to the base
      * value configured via reconnect.backoff.ms
      *
-     * @param nodeState The node state object to update
+     * @param nodeState nodeState The node state object to update
      */
     private void resetReconnectBackoff(NodeConnectionState nodeState) {
         nodeState.failedAttempts = 0;
-        nodeState.reconnectBackoffMs = this.reconnectBackoffInitMs;
+        nodeState.reconnectBackoffMs = reconnectBackoff.term(0);
+    }
+
+    /**
+     * Resets the failure count for a node and sets the connection setup timeout to the base
+     * value configured via socket.connection.setup.timeout.ms
+     *
+     * @param nodeState nodeState The node state object to update
+     */
+    private void resetConnectionSetupTimeout(NodeConnectionState nodeState) {
+        nodeState.failedConnectAttempts = 0;
+        nodeState.connectionSetupTimeoutMs = connectionSetupTimeout.term(0);
     }
 
     /**
-     * Update the node reconnect backoff exponentially.
+     * Increment the failure counter, update the node reconnect backoff exponentially,
+     * and record the current timestamp.
      * The delay is reconnect.backoff.ms * 2**(failures - 1) * (+/- 20% random jitter)
      * Up to a (pre-jitter) maximum of reconnect.backoff.max.ms
      *
      * @param nodeState The node state object to update
      */
-    private void updateReconnectBackoff(NodeConnectionState nodeState) {
-        if (this.reconnectBackoffMaxMs > this.reconnectBackoffInitMs) {
-            nodeState.failedAttempts += 1;
-            double backoffExp = Math.min(nodeState.failedAttempts - 1, this.reconnectBackoffMaxExp);
-            double backoffFactor = Math.pow(RECONNECT_BACKOFF_EXP_BASE, backoffExp);
-            long reconnectBackoffMs = (long) (this.reconnectBackoffInitMs * backoffFactor);
-            // Actual backoff is randomized to avoid connection storms.
-            double randomFactor = ThreadLocalRandom.current().nextDouble(0.8, 1.2);
-            nodeState.reconnectBackoffMs = (long) (randomFactor * reconnectBackoffMs);
-        }
+    private void incrementReconnectBackoff(NodeConnectionState nodeState, long now) {
+        nodeState.reconnectBackoffMs = reconnectBackoff.term(nodeState.failedAttempts);
+        nodeState.failedAttempts++;

Review comment:
       Shouldn't we increment before computing the new reconnect backoff?
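To make the ordering question concrete, here is a sketch comparing both orders without jitter, using illustrative values of 50 ms for `reconnect.backoff.ms` and a simplified 1000 ms cap. It assumes, as the formula in the `GeometricProgression` javadoc suggests, that `term(k)` means base * 2^k. Under that assumption, computing from the current count and then incrementing gives base * 2^(failures - 1), which is what the updated javadoc describes. Incrementing first and then calling `term(failedAttempts)` would start at twice the base, unless the exponent is adjusted to `failedAttempts - 1`. The class `BackoffOrderingSketch` is hypothetical.

```java
/** Hypothetical comparison of the two increment orders, without jitter. */
public class BackoffOrderingSketch {
    static final long BASE_MS = 50;    // illustrative reconnect.backoff.ms
    static final long MAX_MS = 1_000;  // illustrative reconnect.backoff.max.ms

    /** Unjittered term: BASE_MS * 2^exponent, with a simplified cap at MAX_MS. */
    static long term(long exponent) {
        return Math.min(MAX_MS, BASE_MS * (1L << Math.min(exponent, 20)));
    }

    /** Backoff applied after each of the first `failures` consecutive failures. */
    static long[] backoffs(int failures, boolean incrementFirst) {
        long[] result = new long[failures];
        int failedAttempts = 0;
        for (int i = 0; i < failures; i++) {
            if (incrementFirst) {
                failedAttempts++;
                result[i] = term(failedAttempts);  // exponent = failures
            } else {
                result[i] = term(failedAttempts);  // exponent = failures - 1 (order used in the diff)
                failedAttempts++;
            }
        }
        return result;
    }
}
```

With these values, the order in the diff yields 50, 100, 200, 400 ms for the first four failures, while incrementing first yields 100, 200, 400, 800 ms.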

##########
File path: clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
##########
@@ -103,6 +103,12 @@
         Utils.join(SecurityProtocol.names(), ", ") + ".";
     public static final String DEFAULT_SECURITY_PROTOCOL = "PLAINTEXT";
 
+    public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG = "socket.connection.setup.timeout.ms";
+    public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MS_DOC = "The amount of time the client will wait for the initial socket connection to be built. If the connection is not built before the timeout elapses the network client will close the socket channel. The default value will be 10 seconds.";
+
+    public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG = "socket.connection.setup.timeout.max.ms";
+    public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_DOC = "The maximum amount of time the client will wait for the initial socket connection to be built. The connection setup timeout will increase exponentially for each consecutive connection failure up to this maximum. To avoid connection storms, a randomization factor of 0.2 will be applied to the backoff resulting in a random range between 20% below and 20% above the computed value. The default value will be 127 seconds.";

Review comment:
       Same comment as above.

##########
File path: clients/src/test/java/org/apache/kafka/common/utils/GeometricProgressionTest.java
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class GeometricProgressionTest {
+    @Test
+    public void testGeometricProgression() {
+        long scaleFactor = 100;
+        int ratio = 2;
+        long termMax = 2000;
+        double jitter = 0.2;
+        GeometricProgression geometricProgression = new GeometricProgression(
+                scaleFactor, ratio, termMax, jitter
+        );
+
+        for (int i = 0; i <= 100; i++) {
+            for (int n = 0; n <= 4; n++) {
+                assertEquals(scaleFactor * Math.pow(ratio, n), geometricProgression.term(n),
+                        scaleFactor * Math.pow(ratio, n) * jitter);
+            }
+            System.out.println(geometricProgression.term(5));

Review comment:
       This one can be removed, I suppose.

##########
File path: clients/src/main/java/org/apache/kafka/common/utils/GeometricProgression.java
##########
@@ -0,0 +1,51 @@
+package org.apache.kafka.common.utils;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * An util class for exponential backoff, backoff, etc...
+ * The formula is Term(n) = random(1 - jitter, 1 + jitter) * scaleFactor * (ratio) ^ n
+ * If scaleFactor is greater or equal than termMax, a constant term of will be provided
+ * This class is thread-safe
+ */
+public class GeometricProgression {

Review comment:
       I would go with `ExponentialBackoff`, even though we also use it to compute an exponential timeout. I think people will understand this.

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
##########
@@ -149,6 +155,16 @@
                                         atLeast(0),
                                         Importance.MEDIUM,
                                         REQUEST_TIMEOUT_MS_DOC)
+                                .define(SOCKET_CONNECTIONS_SETUP_TIMEOUT_MS_CONFIG,
+                                        Type.LONG,
+                                        10 * 1000,
+                                        Importance.MEDIUM,
+                                        CommonClientConfigs.SOCKET_CONNECTIONS_SETUP_TIMEOUT_MS_DOC)
+                                .define(SOCKET_CONNECTIONS_SETUP_TIMEOUT_MAX_MS_CONFIG,
+                                        Type.LONG,
+                                        127 * 1000,

Review comment:
       While I also recognize that we are not consistent about this, I would do it as Rajini suggested. The defaults are the same everywhere, so it makes sense to define them centrally for now.
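A sketch of what "defined centrally" could look like: the shared defaults live in one place, and each client's definition refers to those constants instead of repeating the `10 * 1000` and `127 * 1000` literals. In the real code the constant would be passed as the default argument to `.define(...)`, as in the hunk above. Here a plain map stands in for the config definition so the example is self-contained. The class names and `DEFAULT_*` constant names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical holder for defaults shared by every client. */
final class CommonDefaultsSketch {
    static final long DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MS = 10 * 1000L;
    static final long DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS = 127 * 1000L;

    private CommonDefaultsSketch() { }
}

public class ClientDefaultsSketch {
    /** Stand-in for one client's config definition; it references, not copies, the shared values. */
    static Map<String, Long> adminClientDefaults() {
        Map<String, Long> defaults = new HashMap<>();
        defaults.put("socket.connection.setup.timeout.ms",
                CommonDefaultsSketch.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MS);
        defaults.put("socket.connection.setup.timeout.max.ms",
                CommonDefaultsSketch.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS);
        return defaults;
    }
}
```

Changing a default then means editing one constant, and the generated documentation picks it up from the definition rather than from the doc string.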




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org