Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/06/12 18:50:12 UTC

[GitHub] [kafka] rajinisivaram commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

rajinisivaram commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r439579612



##########
File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##########
@@ -34,19 +36,22 @@
  *
  */
 final class ClusterConnectionStates {
-    private final long reconnectBackoffInitMs;
-    private final long reconnectBackoffMaxMs;
-    private final static int RECONNECT_BACKOFF_EXP_BASE = 2;
-    private final double reconnectBackoffMaxExp;
     private final Map<String, NodeConnectionState> nodeState;
     private final Logger log;
+    private Set<String> connectingNodes;
+    private GeometricProgression reconnectBackoff;
+    private GeometricProgression connectionSetupTimeout;
 
-    public ClusterConnectionStates(long reconnectBackoffMs, long reconnectBackoffMaxMs, LogContext logContext) {
+    public ClusterConnectionStates(long reconnectBackoffMs, long reconnectBackoffMaxMs,
+                                   long connectionSetupTimeoutMs, long connectionSetupTimeoutMaxMs,
+                                   LogContext logContext) {
         this.log = logContext.logger(ClusterConnectionStates.class);
-        this.reconnectBackoffInitMs = reconnectBackoffMs;
-        this.reconnectBackoffMaxMs = reconnectBackoffMaxMs;
-        this.reconnectBackoffMaxExp = Math.log(this.reconnectBackoffMaxMs / (double) Math.max(reconnectBackoffMs, 1)) / Math.log(RECONNECT_BACKOFF_EXP_BASE);
+        this.reconnectBackoff = new GeometricProgression(
+                reconnectBackoffMs, 2, reconnectBackoffMaxMs, 0.2);

Review comment:
       Just to be sure: we are reusing the code but not changing the backoff behaviour, right?
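
One way to check this is to compare jitter-free terms. The removed lines compute an exponent cap, `reconnectBackoffMaxExp = log(max / max(init, 1)) / log(2)`. The sketch below assumes two things. First, the old update code (not shown in this hunk) multiplied the initial backoff by `2^min(failedAttempts - 1, reconnectBackoffMaxExp)`. Second, the new class caps the term at `termMax`. Under those assumptions, it computes the backoff both ways so the results can be compared term by term. The class and method names are hypothetical, and jitter is left out.

```java
// Hypothetical helper: two jitter-free ways of computing the n-th reconnect backoff term,
// where n = failedAttempts - 1 (matching the term(...) call in disconnected()).
final class BackoffComparison {
    private static final int EXP_BASE = 2;

    // Assumed old style: cap the exponent, as reconnectBackoffMaxExp does in the removed lines.
    static double cappedExponent(long initMs, long maxMs, int n) {
        double maxExp = Math.log(maxMs / (double) Math.max(initMs, 1)) / Math.log(EXP_BASE);
        double exp = Math.min(n, maxExp);
        return initMs * Math.pow(EXP_BASE, exp);
    }

    // Assumed new style: let the term grow geometrically and cap the term itself at maxMs.
    static double cappedTerm(long initMs, long maxMs, int n) {
        return Math.min(initMs * Math.pow(EXP_BASE, n), maxMs);
    }
}
```

If the two agree up to floating-point rounding, the only remaining thing to confirm is that the jitter range is unchanged. The sketch deliberately omits jitter.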

##########
File path: clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
##########
@@ -554,6 +571,7 @@ private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long
         handleDisconnections(responses, updatedNow);
         handleConnections();
         handleInitiateApiVersionRequests(updatedNow);
+        handleTimeOutConnections(responses, updatedNow);

Review comment:
       `TimeOut` => `TimedOut`

##########
File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##########
@@ -296,36 +314,6 @@ public AuthenticationException authenticationException(String id) {
         return state != null ? state.authenticationException : null;
     }
 
-    /**
-     * Resets the failure count for a node and sets the reconnect backoff to the base
-     * value configured via reconnect.backoff.ms
-     *
-     * @param nodeState The node state object to update
-     */
-    private void resetReconnectBackoff(NodeConnectionState nodeState) {

Review comment:
       Can't we keep this method for performing the reset (perhaps under a different name) and have it include all types of reset?
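
The following is a minimal sketch of a combined reset. It assumes the per-node state carries the `failedAttempts`, `reconnectBackoffMs` and `connectionSetupTimeoutMs` fields used elsewhere in this PR. It also assumes the base values are `reconnect.backoff.ms` and the initial connection setup timeout. `NodeStateSketch`, `ResetSketch` and `resetConnectionState` are hypothetical names.

```java
// Hypothetical stand-in for ClusterConnectionStates.NodeConnectionState,
// reduced to the fields this PR touches.
final class NodeStateSketch {
    long failedAttempts;
    long reconnectBackoffMs;
    long connectionSetupTimeoutMs;
}

final class ResetSketch {
    private final long reconnectBackoffInitMs;        // base value from reconnect.backoff.ms
    private final long connectionSetupTimeoutInitMs;  // base connection setup timeout

    ResetSketch(long reconnectBackoffInitMs, long connectionSetupTimeoutInitMs) {
        this.reconnectBackoffInitMs = reconnectBackoffInitMs;
        this.connectionSetupTimeoutInitMs = connectionSetupTimeoutInitMs;
    }

    /**
     * Resets every per-node value that grows with repeated failures back to its base value,
     * so backoff and setup timeout are always reset together in one place.
     */
    void resetConnectionState(NodeStateSketch nodeState) {
        nodeState.failedAttempts = 0;
        nodeState.reconnectBackoffMs = reconnectBackoffInitMs;
        nodeState.connectionSetupTimeoutMs = connectionSetupTimeoutInitMs;
    }
}
```

A single method like this keeps a future per-node value from being reset in one code path but forgotten in another.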

##########
File path: clients/src/main/java/org/apache/kafka/common/utils/GeometricProgression.java
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * A utility class for exponential backoff, timeout, etc.
+ * The formula is Term(n) = random(1 - jitter, 1 + jitter) * scaleFactor * (ratio) ^ n
+ * If scaleFactor is greater than or equal to termMax, a constant term will be provided
+ * This class is thread-safe
+ */
+public class GeometricProgression {

Review comment:
       This is not exactly a geometric progression?
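
For reference, here is a minimal sketch of the Javadoc formula `Term(n) = random(1 - jitter, 1 + jitter) * scaleFactor * ratio^n`. It assumes the exponent is capped so the un-jittered term never exceeds `termMax`. It also assumes the constant returned when `scaleFactor >= termMax` is `scaleFactor` itself. In a strict geometric progression, each term is exactly `ratio` times the previous one. The random factor and the cap break that property, which is why the name may mislead. `JitteredCappedProgression` is a hypothetical name, not the class in the PR.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical sketch of: Term(n) = random(1 - jitter, 1 + jitter) * scaleFactor * ratio^n,
// with the exponent capped so the un-jittered term never exceeds termMax.
final class JitteredCappedProgression {
    private final long scaleFactor;
    private final int ratio;
    private final double jitter;
    private final double expMax; // largest exponent before the term would exceed termMax

    JitteredCappedProgression(long scaleFactor, int ratio, long termMax, double jitter) {
        this.scaleFactor = scaleFactor;
        this.ratio = ratio;
        this.jitter = jitter;
        this.expMax = termMax > scaleFactor
                ? Math.log(termMax / (double) Math.max(scaleFactor, 1)) / Math.log(ratio)
                : 0;
    }

    long term(long n) {
        // Assumption: when scaleFactor >= termMax the sequence is the constant scaleFactor.
        if (expMax == 0) {
            return scaleFactor;
        }
        double exp = Math.min(n, expMax);
        double base = scaleFactor * Math.pow(ratio, exp);
        // The random factor means consecutive terms do not have a fixed ratio.
        // nextDouble(origin, bound) requires origin < bound, so jitter <= 0 means no jitter.
        double randomFactor = jitter <= 0
                ? 1.0
                : ThreadLocalRandom.current().nextDouble(1 - jitter, 1 + jitter);
        return (long) (randomFactor * base);
    }
}
```

With `jitter = 0` the sketch yields 100, 200, 400, 800 for `scaleFactor = 100`, `ratio = 2`, and then stays at roughly `termMax`. That is geometric only until the cap is reached.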

##########
File path: clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
##########
@@ -786,6 +808,26 @@ private void handleAbortedSends(List<ClientResponse> responses) {
         abortedSends.clear();
     }
 
+    /**
+     * Handle socket channel connection timeout. The timeout will hit iff a connection
+     * stays at the ConnectionState.CONNECTING state longer than the timeout value,
+     * as indicated by ClusterConnectionStates.NodeConnectionState.
+     *
+     * @param responses The list of responses to update
+     * @param now The current time
+     */
+    private void handleTimeOutConnections(List<ClientResponse> responses, long now) {
+        Set<String> connectingNodes = connectionStates.connectingNodes();
+        for (String nodeId: connectingNodes) {
+            if (connectionStates.isConnectionSetupTimeout(nodeId, now)) {
+                // close connection to the node

Review comment:
       This comment is unnecessary since it is obvious from the line below.

##########
File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##########
@@ -34,19 +36,22 @@
  *
  */
 final class ClusterConnectionStates {
-    private final long reconnectBackoffInitMs;
-    private final long reconnectBackoffMaxMs;
-    private final static int RECONNECT_BACKOFF_EXP_BASE = 2;

Review comment:
       Why did we remove the constant and hard-code the number inline?
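
Keeping named constants documents the literals that the new constructor call passes inline. This minimal sketch assumes the jitter `0.2` deserves a name too. `RECONNECT_BACKOFF_JITTER` and the helper method are hypothetical. The helper computes only the jitter-free, capped term, so it does not depend on the new class's API.

```java
// Hypothetical sketch: name the values that the new constructor call passes inline.
final class ReconnectBackoffConstants {
    static final int RECONNECT_BACKOFF_EXP_BASE = 2;
    static final double RECONNECT_BACKOFF_JITTER = 0.2;

    // Jitter-free n-th reconnect backoff using the named base, capped at maxMs.
    static long unjitteredBackoffMs(long initMs, long maxMs, int n) {
        double term = initMs * Math.pow(RECONNECT_BACKOFF_EXP_BASE, n);
        return (long) Math.min(term, maxMs);
    }
}
```

At the call site this would read `new GeometricProgression(reconnectBackoffMs, RECONNECT_BACKOFF_EXP_BASE, reconnectBackoffMaxMs, RECONNECT_BACKOFF_JITTER)`.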

##########
File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##########
@@ -158,9 +165,16 @@ public InetAddress currentAddress(String id) throws UnknownHostException {
      */
     public void disconnected(String id, long now) {
         NodeConnectionState nodeState = nodeState(id);
-        nodeState.state = ConnectionState.DISCONNECTED;
         nodeState.lastConnectAttemptMs = now;
-        updateReconnectBackoff(nodeState);
+        nodeState.failedAttempts++;
+        nodeState.reconnectBackoffMs = reconnectBackoff.term(nodeState.failedAttempts - 1);
+        if (nodeState.state == ConnectionState.CONNECTING) {
+            nodeState.connectionSetupTimeoutMs = connectionSetupTimeout.term(nodeState.failedAttempts);

Review comment:
       `failedAttempts` isn't really the number of failed connections, so I'm not sure what we are setting the timeout to.
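
One possible fix is to keep a separate counter and derive the setup timeout from it. This minimal sketch assumes `failedAttempts` is incremented on every disconnect, including disconnects of connections that were already established. Under that assumption, the new counter counts only disconnects that happen while the node is still `CONNECTING`. `failedConnectAttempts`, the enum and the helper types are hypothetical. The timeout is shown jitter-free, doubling up to the maximum.

```java
// Hypothetical sketch: count setup failures separately from all disconnects.
final class SetupTimeoutSketch {
    enum State { CONNECTING, READY, DISCONNECTED }

    static final class Node {
        State state = State.DISCONNECTED;
        long failedAttempts;          // every disconnect, as in the PR
        long failedConnectAttempts;   // only disconnects while still CONNECTING
        long connectionSetupTimeoutMs;
    }

    private final long setupTimeoutInitMs;
    private final long setupTimeoutMaxMs;

    SetupTimeoutSketch(long setupTimeoutInitMs, long setupTimeoutMaxMs) {
        this.setupTimeoutInitMs = setupTimeoutInitMs;
        this.setupTimeoutMaxMs = setupTimeoutMaxMs;
    }

    // Jitter-free timeout after the given number of failed setups, doubling up to the max.
    long setupTimeoutMs(long failedConnectAttempts) {
        double term = setupTimeoutInitMs * Math.pow(2, failedConnectAttempts);
        return (long) Math.min(term, setupTimeoutMaxMs);
    }

    void disconnected(Node node) {
        node.failedAttempts++;
        if (node.state == State.CONNECTING) {
            // The connection was never established, so grow the setup timeout.
            node.failedConnectAttempts++;
            node.connectionSetupTimeoutMs = setupTimeoutMs(node.failedConnectAttempts);
        }
        node.state = State.DISCONNECTED;
    }
}
```

With this split, a node that drops an established connection does not inflate its next setup timeout.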

##########
File path: clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
##########
@@ -103,6 +103,12 @@
         Utils.join(SecurityProtocol.names(), ", ") + ".";
     public static final String DEFAULT_SECURITY_PROTOCOL = "PLAINTEXT";
 
+    public static final String SOCKET_CONNECTIONS_SETUP_TIMEOUT_MS_CONFIG = "socket.connections.setup.timeout.ms";

Review comment:
       Didn't we decide to call this `socket.connection.setup.timeout.ms` (connection instead of connections) during the KIP discussion?

##########
File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##########
@@ -357,6 +345,37 @@ private NodeConnectionState nodeState(String id) {
         return state;
     }
 
+    /**
+     * Get the id set of nodes which are in CONNECTING state
+     */
+    public Set<String> connectingNodes() {
+        return this.connectingNodes;
+    }
+
+    /**
+     * Get the timestamp of the latest connection attempt of a given node
+     * @param id the connection to fetch the state for
+     */
+    public long lastConnectAttemptMs(String id) {
+        NodeConnectionState nodeState = this.nodeState.get(id);
+        return nodeState == null ? 0 : nodeState.lastConnectAttemptMs;
+    }
+
+    // Visible for testing
+    long connectionSetupTimeoutMs(String id) {
+        NodeConnectionState nodeState = this.nodeState.get(id);
+        return nodeState.connectionSetupTimeoutMs;
+    }
+
+    /**
+     * Test if the connection to the given node has reached its timeout
+     * @param id the connection to fetch the state for
+     * @param now the current time in ms
+     */
+    public boolean isConnectionSetupTimeout(String id, long now) {
+        return now - lastConnectAttemptMs(id) > connectionSetupTimeoutMs(id);

Review comment:
       Shouldn't we also check that the node is in connecting state?
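
The following is a minimal sketch of the check with a state guard added. It assumes that a node with no tracked state, or one that has already left `CONNECTING`, should simply not be reported as timed out. The guard also avoids the null dereference in `connectionSetupTimeoutMs(id)` for unknown nodes. The map-based class and the enum are hypothetical stand-ins for `ClusterConnectionStates` and `ConnectionState`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for ClusterConnectionStates, reduced to what the timeout check needs.
final class TimeoutCheckSketch {
    enum ConnectionState { CONNECTING, READY, DISCONNECTED }

    static final class NodeState {
        ConnectionState state;
        long lastConnectAttemptMs;
        long connectionSetupTimeoutMs;

        NodeState(ConnectionState state, long lastConnectAttemptMs, long connectionSetupTimeoutMs) {
            this.state = state;
            this.lastConnectAttemptMs = lastConnectAttemptMs;
            this.connectionSetupTimeoutMs = connectionSetupTimeoutMs;
        }
    }

    final Map<String, NodeState> nodeState = new HashMap<>();

    boolean isConnectionSetupTimeout(String id, long now) {
        NodeState state = nodeState.get(id);
        // Only a connection that is still being set up can hit the setup timeout.
        if (state == null || state.state != ConnectionState.CONNECTING) {
            return false;
        }
        return now - state.lastConnectAttemptMs > state.connectionSetupTimeoutMs;
    }
}
```

Throwing an `IllegalStateException` for a non-connecting node would be a stricter alternative, if callers are expected to pass only connecting nodes.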

##########
File path: clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
##########
@@ -786,6 +808,26 @@ private void handleAbortedSends(List<ClientResponse> responses) {
         abortedSends.clear();
     }
 
+    /**
+     * Handle socket channel connection timeout. The timeout will hit iff a connection
+     * stays at the ConnectionState.CONNECTING state longer than the timeout value,
+     * as indicated by ClusterConnectionStates.NodeConnectionState.
+     *
+     * @param responses The list of responses to update
+     * @param now The current time
+     */
+    private void handleTimeOutConnections(List<ClientResponse> responses, long now) {
+        Set<String> connectingNodes = connectionStates.connectingNodes();
+        for (String nodeId: connectingNodes) {
+            if (connectionStates.isConnectionSetupTimeout(nodeId, now)) {
+                // close connection to the node
+                this.selector.close(nodeId);
+                log.debug("Disconnecting from node {} due to socket connection setup timeout.", nodeId);

Review comment:
       Can we include the timeout in the log line?
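
For example, the timeout could be passed as a second SLF4J-style placeholder argument: `log.debug("Disconnecting from node {} due to socket connection setup timeout. The timeout value is {} ms.", nodeId, timeoutMs)`. Here `timeoutMs` is assumed to be the node's current setup timeout. To stay runnable without a logging dependency, the sketch below builds the same message with `String.format`. The class name and the message wording are suggestions, not the PR's text.

```java
// Hypothetical helper: the disconnect log message with the setup timeout included.
final class TimeoutLogMessage {
    static String disconnectMessage(String nodeId, long timeoutMs) {
        return String.format(
                "Disconnecting from node %s due to socket connection setup timeout. The timeout value is %d ms.",
                nodeId, timeoutMs);
    }
}
```

Since the timeout grows with each failed attempt, logging the value shows which step of the progression was hit.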

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
##########
@@ -149,6 +155,16 @@
                                         atLeast(0),
                                         Importance.MEDIUM,
                                         REQUEST_TIMEOUT_MS_DOC)
+                                .define(SOCKET_CONNECTIONS_SETUP_TIMEOUT_MS_CONFIG,
+                                        Type.LONG,
+                                        10 * 1000,
+                                        Importance.MEDIUM,
+                                        CommonClientConfigs.SOCKET_CONNECTIONS_SETUP_TIMEOUT_MS_DOC)
+                                .define(SOCKET_CONNECTIONS_SETUP_TIMEOUT_MAX_MS_CONFIG,
+                                        Type.LONG,
+                                        127 * 1000,

Review comment:
       Can we define the defaults in `CommonClientConfigs`?
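
A minimal sketch of shared defaults, using the values from this hunk (`10 * 1000` and `127 * 1000`). The class and constant names are hypothetical and follow the singular "connection" naming suggested earlier in this review.

```java
// Hypothetical sketch of shared defaults that every client config could reference.
final class CommonClientConfigsSketch {
    // Default initial socket connection setup timeout (10 seconds, as in this hunk).
    static final long DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MS = 10 * 1000L;
    // Default maximum socket connection setup timeout (127 seconds, as in this hunk).
    static final long DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS = 127 * 1000L;
}
```

Each client config would then pass the shared constant to `define(...)`, as in `.define(SOCKET_CONNECTIONS_SETUP_TIMEOUT_MS_CONFIG, Type.LONG, CommonClientConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MS, Importance.MEDIUM, CommonClientConfigs.SOCKET_CONNECTIONS_SETUP_TIMEOUT_MS_DOC)`, instead of repeating the literal.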




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org