Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/05/18 01:42:00 UTC

[GitHub] [kafka] d8tltanc opened a new pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

d8tltanc opened a new pull request #8683:
URL: https://github.com/apache/kafka/pull/8683


   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r443888648



##########
File path: clients/src/main/java/org/apache/kafka/common/utils/ExponentialBackoff.java
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * An utility class for exponential backoff, timeout, etc...
+ * The formula is Term(n) = random(1 - jitter, 1 + jitter) * scaleFactor * (ratio) ^ n
+ * If scaleFactor is greater or equal than termMax, a constant term of will be provided
+ * This class is thread-safe
+ */
+public class ExponentialBackoff {
+    private final int ratio;
+    private final double expMax;
+    private final long scaleFactor;
+    private final double jitter;
+
+    public ExponentialBackoff(long scaleFactor, int ratio, long termMax, double jitter) {

Review comment:
       I was following the naming for geometric sequences on Wikipedia, but your suggestion also makes sense. @rajinisivaram Do you think we can use `initialInterval`, `multiplier`, and `maxInterval`?
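   For reference, here is a minimal sketch of a class that follows the Javadoc formula Term(n) = random(1 - jitter, 1 + jitter) * scaleFactor * ratio^n, using the `initialInterval` / `multiplier` / `maxInterval` names proposed above. `BackoffSketch` and `term` are hypothetical names, not the class under review; one assumption differs from the diff: this sketch simply caps the un-jittered term at `maxInterval` instead of computing an `expMax` exponent.

```java
import java.util.concurrent.ThreadLocalRandom;

/**
 * Hypothetical sketch of Term(n) = random(1 - jitter, 1 + jitter) * scaleFactor * ratio^n,
 * with the un-jittered term capped at maxInterval (an assumption of this sketch).
 */
final class BackoffSketch {
    private final long initialInterval; // "scaleFactor" in the Javadoc
    private final int multiplier;       // "ratio" in the Javadoc
    private final long maxInterval;     // "termMax" in the Javadoc
    private final double jitter;        // e.g. 0.2 for +/-20%

    BackoffSketch(long initialInterval, int multiplier, long maxInterval, double jitter) {
        if (jitter < 0 || jitter >= 1) {
            throw new IllegalArgumentException("jitter must be in [0, 1)");
        }
        this.initialInterval = initialInterval;
        this.multiplier = multiplier;
        this.maxInterval = maxInterval;
        this.jitter = jitter;
    }

    /** Returns the jittered term for the n-th consecutive attempt (n starts at 0). */
    long term(int n) {
        // Math.pow can overflow to Infinity for large n; Math.min still caps it at maxInterval.
        double base = Math.min(initialInterval * Math.pow(multiplier, n), (double) maxInterval);
        // All fields are final and ThreadLocalRandom needs no locking, so instances can be shared across threads.
        double factor = jitter == 0
                ? 1.0
                : ThreadLocalRandom.current().nextDouble(1 - jitter, 1 + jitter);
        return Math.round(factor * base);
    }
}
```

   With (100, 2, 2000, 0.2), consecutive terms land near 100, 200, 400, 800, 1600 and then stay near 2000, each randomized by up to 20% in either direction.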







[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r445054383



##########
File path: clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
##########
@@ -40,7 +40,12 @@
     private final MockTime time = new MockTime();
     private final long reconnectBackoffMs = 10 * 1000;
     private final long reconnectBackoffMax = 60 * 1000;
-    private final double reconnectBackoffJitter = 0.2;
+    private final long connectionSetupTimeoutMs = 10 * 1000;
+    private final long connectionSetupTimeoutMaxMs = 127 * 1000;
+    private final static int RECONNECT_BACKOFF_EXP_BASE = 2;
+    private final static double RECONNECT_BACKOFF_JITTER = 0.2;
+    private final static int CONNECTION_SETUP_TIMEOUT_EXP_BASE = 2;
+    private final static double CONNECTION_SETUP_TIMEOUT_JITTER = 0.2;

Review comment:
       Makes sense. I've made them package-private and lower-cased the constants.







[GitHub] [kafka] d8tltanc commented on pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#issuecomment-649113870


   @rajinisivaram @dajac The test failures are caused by the connection state transitioning from `CONNECTING` to `CHECKING_API_VERSIONS` and then to `CONNECTED`, instead of directly to `CONNECTED`. In this case, I should remove the node from the `connecting` HashSet when this transition happens. I've fixed this issue and updated the patch.
   
   Also, I've addressed the latest comments. Please let me know if you have more suggestions and if we can re-run the Jenkins tests. 
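   The fix described above can be sketched as follows: the set of connecting nodes is kept in sync with every state change, so leaving `CONNECTING` for any state, `CHECKING_API_VERSIONS` included, removes the node. `SketchState` and `ConnectingNodesSketch` are hypothetical names whose states mirror the ones named in the comment; this is not the actual `ClusterConnectionStates` code.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical states mirroring the transitions named in the comment. */
enum SketchState { DISCONNECTED, CONNECTING, CHECKING_API_VERSIONS, CONNECTED }

final class ConnectingNodesSketch {
    private final Map<String, SketchState> states = new HashMap<>();
    private final Set<String> connectingNodes = new HashSet<>();

    /** Records a state change and keeps the connecting set consistent with it. */
    void transition(String nodeId, SketchState next) {
        states.put(nodeId, next);
        if (next == SketchState.CONNECTING) {
            connectingNodes.add(nodeId);
        } else {
            // Leaving CONNECTING for any state (CHECKING_API_VERSIONS included) removes the node.
            connectingNodes.remove(nodeId);
        }
    }

    SketchState state(String nodeId) {
        return states.getOrDefault(nodeId, SketchState.DISCONNECTED);
    }

    /** Read-only view so callers cannot mutate the set behind the tracker's back. */
    Set<String> connectingNodes() {
        return Collections.unmodifiableSet(connectingNodes);
    }
}
```

   Driving removal from the single `transition` method, rather than from each individual transition path, avoids missing an intermediate state such as `CHECKING_API_VERSIONS`.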





[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r443903361



##########
File path: clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
##########
@@ -321,4 +325,37 @@ public void testIsPreparingConnection() {
         connectionStates.disconnected(nodeId1, time.milliseconds());
         assertFalse(connectionStates.isPreparingConnection(nodeId1));
     }
+
+    @Test
+    public void testExponentialConnectionSetupTimeout() {
+        assertTrue(connectionStates.canConnect(nodeId1, time.milliseconds()));
+
+        // Check the exponential timeout growth
+        for (int n = 0; n <= Math.log((double) connectionSetupTimeoutMaxMs / connectionSetupTimeoutMs) / Math.log(2); n++) {
+            connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT);
+            assertTrue(connectionStates.connectingNodes().contains(nodeId1));
+            assertEquals(connectionSetupTimeoutMs * Math.pow(2, n),
+                    connectionStates.connectionSetupTimeoutMs(nodeId1),
+                    connectionSetupTimeoutMs * Math.pow(2, n) * 0.2);
+            connectionStates.disconnected(nodeId1, time.milliseconds());
+            assertFalse(connectionStates.connectingNodes().contains(nodeId1));
+        }
+
+        // Check the timeout value upper bound
+        connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT);
+        assertEquals(connectionStates.connectionSetupTimeoutMs(nodeId1), connectionSetupTimeoutMaxMs, connectionSetupTimeoutMaxMs * 0.2);

Review comment:
       Good catch. The `expected` should be what you suggested. Refactored.
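   For this test, the loop bound is log(127000 / 10000) / log(2) ≈ 3.67, so it runs for n = 0..3 with un-jittered timeouts of 10000, 20000, 40000 and 80000 ms; the next attempt (uncapped 160000 ms) hits the 127000 ms cap. In JUnit 4, `Assert.assertEquals(double expected, double actual, double delta)` takes the expected value first, so the upper-bound check would read `assertEquals(connectionSetupTimeoutMaxMs, connectionStates.connectionSetupTimeoutMs(nodeId1), connectionSetupTimeoutMaxMs * 0.2)`. The hypothetical helper below just computes those expected values:

```java
import java.util.ArrayList;
import java.util.List;

final class ExpectedTimeouts {
    /**
     * Un-jittered setup timeouts for consecutive attempts: initialMs * 2^n, capped at maxMs.
     * These are the "expected" arguments; the value read from the connection state is the "actual".
     */
    static List<Long> baseTimeouts(long initialMs, long maxMs, int attempts) {
        List<Long> result = new ArrayList<>();
        for (int n = 0; n < attempts; n++) {
            result.add((long) Math.min(initialMs * Math.pow(2, n), (double) maxMs));
        }
        return result;
    }
}
```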







[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r443794598



##########
File path: clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
##########
@@ -103,6 +103,12 @@
         Utils.join(SecurityProtocol.names(), ", ") + ".";
     public static final String DEFAULT_SECURITY_PROTOCOL = "PLAINTEXT";
 
+    public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG = "socket.connection.setup.timeout.ms";
+    public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MS_DOC = "The amount of time the client will wait for the initial socket connection to be established. If the connection is not built before the timeout elapses the network client will close the socket channel.";
+
+    public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG = "socket.connection.setup.timeout.max.ms";
+    public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_DOC = "The maximum amount of time the client will wait for the initial socket connection to be established. The connection setup timeout will increase exponentially for each consecutive connection failure up to this maximum. To avoid connection storms, a randomization factor of 0.2 will be applied to the backoff resulting in a random range between 20% below and 20% above the computed value.";

Review comment:
       Yes. Refactored.
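   To make the two settings above concrete, here is a small sketch that sets them as client properties and computes the range implied by the 0.2 randomization factor in the doc string. `SetupTimeoutConfigSketch`, `clientProps` and `jitterRange` are hypothetical helpers; only the two property keys come from the diff.

```java
import java.util.Properties;

final class SetupTimeoutConfigSketch {
    /** Client properties for the two settings defined in CommonClientConfigs above. */
    static Properties clientProps(long timeoutMs, long maxTimeoutMs) {
        Properties props = new Properties();
        props.put("socket.connection.setup.timeout.ms", Long.toString(timeoutMs));
        props.put("socket.connection.setup.timeout.max.ms", Long.toString(maxTimeoutMs));
        return props;
    }

    /** Range produced by a randomization factor f on a computed timeout: [base * (1 - f), base * (1 + f)]. */
    static long[] jitterRange(long baseMs, double factor) {
        return new long[] {Math.round(baseMs * (1 - factor)), Math.round(baseMs * (1 + factor))};
    }
}
```

   For example, a computed timeout of 10000 ms with factor 0.2 falls between 8000 and 12000 ms, and 40000 ms falls between 32000 and 48000 ms.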







[GitHub] [kafka] rajinisivaram commented on pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
rajinisivaram commented on pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#issuecomment-651250337


   retest this please





[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r442389711



##########
File path: clients/src/main/java/org/apache/kafka/common/utils/GeometricProgression.java
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * An util class for exponential backoff, backoff, etc...
+ * The formula is Term(n) = random(1 - jitter, 1 + jitter) * scaleFactor * (ratio) ^ n
+ * If scaleFactor is greater or equal than termMax, a constant term of will be provided
+ * This class is thread-safe
+ */
+public class GeometricProgression {

Review comment:
       Good idea. I'll go with `ExponentialBackoff`.







[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r443785134



##########
File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##########
@@ -357,6 +398,36 @@ private NodeConnectionState nodeState(String id) {
         return state;
     }
 
+    /**
+     * Get the id set of nodes which are in CONNECTING state
+     */
+    public Set<String> connectingNodes() {
+        return this.connectingNodes;
+    }
+
+    /**
+     * Get the timestamp of the latest connection attempt of a given node
+     * @param id the connection to fetch the state for
+     */
+    public long lastConnectAttemptMs(String id) {
+        NodeConnectionState nodeState = this.nodeState.get(id);
+        return nodeState == null ? 0 : nodeState.lastConnectAttemptMs;
+    }
+
+    public long connectionSetupTimeoutMs(String id) {
+        NodeConnectionState nodeState = this.nodeState.get(id);
+        return nodeState.connectionSetupTimeoutMs;

Review comment:
       When `NetworkClient` initializes a connection to a given node (`NetworkClient::initiateConnect`), it's guaranteed that the `nodeState` will be initialized and won't be `null`. It's probably unreasonable for a caller to ask for the connection setup timeout of a node before the connection is initialized, which is why I prevent that kind of call by throwing an exception.
   
   However, it might be reasonable for a caller to get the `lastConnectAttemptMs` before initializing the connection. For example, the node provider may want to pick the node with the least recent connection attempt. Nodes that haven't been connected yet have no `NodeConnectionState`, which implies they have the highest priority, so we can treat their `lastConnectAttemptMs` as 0.
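   The asymmetry described above can be sketched like this: a node with no recorded attempt reports 0 and therefore wins a "least recent attempt" selection, while asking for its setup timeout fails fast. `AttemptTracker` and its methods are hypothetical; the explicit `IllegalStateException` is this sketch's choice, whereas the diff simply dereferences `nodeState`.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class AttemptTracker {
    private final Map<String, Long> lastConnectAttemptMs = new HashMap<>();
    private final Map<String, Long> setupTimeoutMs = new HashMap<>();

    /** Records a connection attempt and the setup timeout chosen for it. */
    void connecting(String nodeId, long nowMs, long timeoutMs) {
        lastConnectAttemptMs.put(nodeId, nowMs);
        setupTimeoutMs.put(nodeId, timeoutMs);
    }

    /** Nodes never attempted report 0, so they sort ahead of every attempted node. */
    long lastConnectAttemptMs(String nodeId) {
        return lastConnectAttemptMs.getOrDefault(nodeId, 0L);
    }

    /** Asking before any attempt is treated as a caller bug and fails fast. */
    long connectionSetupTimeoutMs(String nodeId) {
        Long timeout = setupTimeoutMs.get(nodeId);
        if (timeout == null) {
            throw new IllegalStateException("No connection state for node " + nodeId);
        }
        return timeout;
    }

    /** Picks the candidate whose last connection attempt is oldest (ties keep the first seen). */
    String leastRecentlyAttempted(List<String> candidates) {
        String best = null;
        long bestMs = Long.MAX_VALUE;
        for (String nodeId : candidates) {
            long attemptMs = lastConnectAttemptMs(nodeId);
            if (attemptMs < bestMs) {
                best = nodeId;
                bestMs = attemptMs;
            }
        }
        return best;
    }
}
```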







[GitHub] [kafka] d8tltanc commented on pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#issuecomment-649901859


   @rajinisivaram @dajac I've fixed the tests in the connect repo. They were indeed failing because I hadn't added the new configs to Connect. Please let me know if we can re-trigger the tests. Thanks.





[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r439632475



##########
File path: clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
##########
@@ -103,6 +103,12 @@
         Utils.join(SecurityProtocol.names(), ", ") + ".";
     public static final String DEFAULT_SECURITY_PROTOCOL = "PLAINTEXT";
 
+    public static final String SOCKET_CONNECTIONS_SETUP_TIMEOUT_MS_CONFIG = "socket.connections.setup.timeout.ms";

Review comment:
       I think in the KIP I used `socket.connections.setup.timeout.ms`.
   Since @cmccabe also suggests `socket.connection.setup.timeout.ms`, I'll update the KIP proposal to use `socket.connection.setup.timeout.ms`.







[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r442387660



##########
File path: clients/src/test/java/org/apache/kafka/common/utils/GeometricProgressionTest.java
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class GeometricProgressionTest {
+    @Test
+    public void testGeometricProgression() {
+        long scaleFactor = 100;
+        int ratio = 2;
+        long termMax = 2000;
+        double jitter = 0.2;
+        GeometricProgression geometricProgression = new GeometricProgression(
+                scaleFactor, ratio, termMax, jitter
+        );
+
+        for (int i = 0; i <= 100; i++) {
+            for (int n = 0; n <= 4; n++) {
+                assertEquals(scaleFactor * Math.pow(ratio, n), geometricProgression.term(n),
+                        scaleFactor * Math.pow(ratio, n) * jitter);
+            }
+            System.out.println(geometricProgression.term(5));

Review comment:
       Oh, right. I missed removing it.








[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r439751792



##########
File path: clients/src/main/java/org/apache/kafka/common/utils/GeometricProgression.java
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * An util class for exponential backoff, backoff, etc...
+ * The formula is Term(n) = random(1 - jitter, 1 + jitter) * scaleFactor * (ratio) ^ n
+ * If scaleFactor is greater or equal than termMax, a constant term of will be provided
+ * This class is thread-safe
+ */
+public class GeometricProgression {

Review comment:
       Naming is tricky here because exponential reconnect backoff, exponential retry backoff, and exponential timeout all share this utility class. Any suggestions?







[GitHub] [kafka] rajinisivaram commented on pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
rajinisivaram commented on pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#issuecomment-649454533


   retest this please





[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r443774588



##########
File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##########
@@ -357,6 +399,38 @@ private NodeConnectionState nodeState(String id) {
         return state;
     }
 
+    /**
+     * Get the id set of nodes which are in CONNECTING state
+     */
+    public Set<String> connectingNodes() {
+        return this.connectingNodes;
+    }
+
+    /**
+     * Get the timestamp of the latest connection attempt of a given node
+     * @param id the connection to fetch the state for
+     */
+    public long lastConnectAttemptMs(String id) {
+        NodeConnectionState nodeState = this.nodeState.get(id);
+        return nodeState == null ? 0 : nodeState.lastConnectAttemptMs;
+    }
+
+    public long connectionSetupTimeoutMs(String id) {

Review comment:
       Yes. Added.







[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r445048384



##########
File path: clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
##########
@@ -103,6 +103,14 @@
         Utils.join(SecurityProtocol.names(), ", ") + ".";
     public static final String DEFAULT_SECURITY_PROTOCOL = "PLAINTEXT";
 
+    public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG = "socket.connection.setup.timeout.ms";
+    public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MS_DOC = "The amount of time the client will wait for the socket connection to be established. If the connection is not built before the timeout elapses, clients will close the socket channel.";
+    public static final Long SOCKET_CONNECTION_SETUP_TIMEOUT_MS_DEFAULT = 10 * 1000L;

Review comment:
       Sure. Refactored.







[GitHub] [kafka] rajinisivaram commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
rajinisivaram commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r450706026



##########
File path: clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
##########
@@ -786,6 +809,29 @@ private void handleAbortedSends(List<ClientResponse> responses) {
         abortedSends.clear();
     }
 
+    /**
+     * Handle socket channel connection timeout. The timeout will hit iff a connection
+     * stays at the ConnectionState.CONNECTING state longer than the timeout value,
+     * as indicated by ClusterConnectionStates.NodeConnectionState.
+     *
+     * @param responses The list of responses to update
+     * @param now The current time
+     */
+    private void handleTimedOutConnections(List<ClientResponse> responses, long now) {
+        Set<String> connectingNodes = connectionStates.connectingNodes();
+        for (String nodeId : connectingNodes) {
+            if (connectionStates.isConnectionSetupTimeout(nodeId, now)) {
+                this.selector.close(nodeId);
+                log.debug(
+                    "Disconnecting from node {} due to socket connection setup timeout. " +
+                    "The timeout value is {} ms.",
+                    nodeId,
+                    connectionStates.connectionSetupTimeoutMs(nodeId));
+                processDisconnection(responses, nodeId, now, ChannelState.LOCAL_CLOSE);

Review comment:
       @guozhangwang Thanks for reporting the exception in this code.
   @d8tltanc @dajac This code segment is unsafe: we remove the node from `connectingNodes` in `processDisconnection()` while iterating over the set. We must be missing a test too (or we only have a test with a single connection).
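   One common way to make such a loop safe is to iterate over a snapshot of the set, so that removals from the live set inside the loop cannot invalidate the iterator. The sketch below assumes a hypothetical `TimeoutSweepSketch` with a per-node deadline map; the `remove` call stands in for `processDisconnection()`. It may also explain why a single-connection test passes: with one element in a `HashSet`, `hasNext()` typically returns false right after the removal, so the iterator never re-checks for concurrent modification.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class TimeoutSweepSketch {
    /**
     * Disconnects every node whose setup deadline has passed. The loop walks a copy of
     * connectingNodes, so removing entries from the live set inside the loop cannot
     * trigger a ConcurrentModificationException.
     */
    static List<String> handleTimedOut(Set<String> connectingNodes, Map<String, Long> deadlineMs, long nowMs) {
        List<String> timedOut = new ArrayList<>();
        for (String nodeId : new ArrayList<>(connectingNodes)) {
            Long deadline = deadlineMs.get(nodeId);
            if (deadline != null && nowMs >= deadline) {
                connectingNodes.remove(nodeId); // stand-in for processDisconnection()
                timedOut.add(nodeId);
            }
        }
        return timedOut;
    }
}
```

   `Iterator.remove()` would also work if the removal happened inside the loop itself, but since the removal here happens in a separate method, copying the set first is the simpler change.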







[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r439751751



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
##########
@@ -149,6 +155,16 @@
                                         atLeast(0),
                                         Importance.MEDIUM,
                                         REQUEST_TIMEOUT_MS_DOC)
+                                .define(SOCKET_CONNECTIONS_SETUP_TIMEOUT_MS_CONFIG,
+                                        Type.LONG,
+                                        10 * 1000,
+                                        Importance.MEDIUM,
+                                        CommonClientConfigs.SOCKET_CONNECTIONS_SETUP_TIMEOUT_MS_DOC)
+                                .define(SOCKET_CONNECTIONS_SETUP_TIMEOUT_MAX_MS_CONFIG,
+                                        Type.LONG,
+                                        127 * 1000,

Review comment:
      I think the current practice is to hard-code the defaults in each client's config definition. Do we still want to define the defaults in `CommonClientConfigs`?







[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r443770204



##########
File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##########
@@ -300,30 +325,47 @@ public AuthenticationException authenticationException(String id) {
      * Resets the failure count for a node and sets the reconnect backoff to the base
      * value configured via reconnect.backoff.ms
      *
-     * @param nodeState The node state object to update
+     * @param nodeState nodeState The node state object to update

Review comment:
       Removed.







[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r443806341



##########
File path: clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
##########
@@ -185,4 +191,9 @@ public static void warnIfDeprecatedDnsLookupValue(AbstractConfig config) {
                 CLIENT_DNS_LOOKUP_CONFIG, ClientDnsLookup.DEFAULT,
                 ClientDnsLookup.USE_ALL_DNS_IPS);
     }
+
+    public class Defaults {
+        public static final long SocketConnectionSetupTimeoutMs = 10 * 1000;
+        public static final long SocketConnectionSetupTimeoutMaxMs = 127 * 1000;

Review comment:
       I see. Refactored.







[GitHub] [kafka] rajinisivaram commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
rajinisivaram commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r442808154



##########
File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##########
@@ -158,9 +175,15 @@ public InetAddress currentAddress(String id) throws UnknownHostException {
      */
     public void disconnected(String id, long now) {
         NodeConnectionState nodeState = nodeState(id);
-        nodeState.state = ConnectionState.DISCONNECTED;
         nodeState.lastConnectAttemptMs = now;
-        updateReconnectBackoff(nodeState);
+        incrementReconnectBackoff(nodeState);

Review comment:
      The attempt number is incremented, but the backoff value is not necessarily incremented. I think we should leave it as `updateReconnectBackoff`.

##########
File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##########
@@ -158,9 +175,15 @@ public InetAddress currentAddress(String id) throws UnknownHostException {
      */
     public void disconnected(String id, long now) {
         NodeConnectionState nodeState = nodeState(id);
-        nodeState.state = ConnectionState.DISCONNECTED;
         nodeState.lastConnectAttemptMs = now;
-        updateReconnectBackoff(nodeState);
+        incrementReconnectBackoff(nodeState);
+        if (nodeState.state == ConnectionState.CONNECTING) {
+            incrementConnectionSetupTimeout(nodeState);

Review comment:
       `update` instead of `increment` here too?

##########
File path: clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
##########
@@ -103,6 +103,12 @@
         Utils.join(SecurityProtocol.names(), ", ") + ".";
     public static final String DEFAULT_SECURITY_PROTOCOL = "PLAINTEXT";
 
+    public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG = "socket.connection.setup.timeout.ms";
+    public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MS_DOC = "The amount of time the client will wait for the initial socket connection to be established. If the connection is not built before the timeout elapses the network client will close the socket channel.";
+
+    public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG = "socket.connection.setup.timeout.max.ms";
+    public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_DOC = "The maximum amount of time the client will wait for the initial socket connection to be established. The connection setup timeout will increase exponentially for each consecutive connection failure up to this maximum. To avoid connection storms, a randomization factor of 0.2 will be applied to the backoff resulting in a random range between 20% below and 20% above the computed value.";

Review comment:
      `initial socket connection`: isn't this applied to every connection attempt, not just the initial one?

##########
File path: clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
##########
@@ -185,4 +191,9 @@ public static void warnIfDeprecatedDnsLookupValue(AbstractConfig config) {
                 CLIENT_DNS_LOOKUP_CONFIG, ClientDnsLookup.DEFAULT,
                 ClientDnsLookup.USE_ALL_DNS_IPS);
     }
+
+    public class Defaults {
+        public static final long SocketConnectionSetupTimeoutMs = 10 * 1000;
+        public static final long SocketConnectionSetupTimeoutMaxMs = 127 * 1000;

Review comment:
      We only keep the defaults in a separate object in `KafkaConfig`. For this one, we should do as @dajac suggested above (a similar example is `DEFAULT_SECURITY_PROTOCOL`).

##########
File path: clients/src/main/java/org/apache/kafka/common/utils/ExponentialBackoff.java
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * An utility class for exponential backoff, timeout, etc...
+ * The formula is Term(n) = random(1 - jitter, 1 + jitter) * scaleFactor * (ratio) ^ n
+ * If scaleFactor is greater or equal than termMax, a constant term of will be provided
+ * This class is thread-safe
+ */
+public class ExponentialBackoff {
+    private final int ratio;
+    private final double expMax;
+    private final long scaleFactor;
+    private final double jitter;
+
+    public ExponentialBackoff(long scaleFactor, int ratio, long termMax, double jitter) {
+        this.scaleFactor = scaleFactor;
+        this.ratio = ratio;
+        this.jitter = jitter;
+        this.expMax = termMax > scaleFactor ?
+                Math.log(termMax / (double) Math.max(scaleFactor, 1)) / Math.log(ratio) : 0;
+    }
+
+    public long term(long n) {

Review comment:
       `term` is non-intuitive. `backoff` doesn't quite fit for connection timeout, but I guess it fits with `ExponentialBackoff`. So unless there is a better suggestion, `backoff` seems reasonable.

##########
File path: clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
##########
@@ -771,7 +794,7 @@ private void processDisconnection(List<ClientResponse> responses,
      * @param responses The list of responses to update
      * @param now The current time
      */
-    private void handleTimedOutRequests(List<ClientResponse> responses, long now) {
+    private void handleTimeoutRequests(List<ClientResponse> responses, long now) {

Review comment:
       I agree, the previous name was better

##########
File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##########
@@ -300,30 +325,47 @@ public AuthenticationException authenticationException(String id) {
      * Resets the failure count for a node and sets the reconnect backoff to the base
      * value configured via reconnect.backoff.ms
      *
-     * @param nodeState The node state object to update
+     * @param nodeState nodeState The node state object to update

Review comment:
       `nodeState` added twice?







[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r445062773



##########
File path: clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
##########
@@ -103,6 +103,14 @@
         Utils.join(SecurityProtocol.names(), ", ") + ".";
     public static final String DEFAULT_SECURITY_PROTOCOL = "PLAINTEXT";
 
+    public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG = "socket.connection.setup.timeout.ms";
+    public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MS_DOC = "The amount of time the client will wait for the socket connection to be established. If the connection is not built before the timeout elapses, clients will close the socket channel.";
+    public static final Long SOCKET_CONNECTION_SETUP_TIMEOUT_MS_DEFAULT = 10 * 1000L;
+
+    public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG = "socket.connection.setup.timeout.max.ms";
+    public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_DOC = "The maximum amount of time the client will wait for the socket connection to be established. The connection setup timeout will increase exponentially for each consecutive connection failure up to this maximum. To avoid connection storms, a randomization factor of 0.2 will be applied to the timeout resulting in a random range between 20% below and 20% above the computed value.";
+    public static final Long SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_DEFAULT = 127 * 1000L;

Review comment:
       Yes.
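   As a worked check of these defaults, assuming the `ExponentialBackoff` formula from the draft in this PR (exponent capped at `expMax`) with a multiplier of 2, the base used in `ClusterConnectionStatesTest`. Here n is the number of consecutive failed attempts so far, and the ±20% jitter is applied after the cap:

```latex
t(n) = J \cdot t_0 \cdot 2^{\min(n,\, e_{\max})}, \qquad
e_{\max} = \log_2 \frac{t_{\max}}{t_0}, \qquad J \sim \mathcal{U}(0.8,\, 1.2)

t_0 = 10\,\mathrm{s},\; t_{\max} = 127\,\mathrm{s}
\;\Rightarrow\; e_{\max} = \log_2 12.7 \approx 3.67

\text{pre-jitter } t(n)/J:\; 10,\ 20,\ 40,\ 80,\ 127,\ 127,\ \dots\ \mathrm{s}
\quad (n = 0, 1, 2, 3, 4, 5, \dots)

\text{at the cap: } 127 \times [0.8,\ 1.2] = [101.6,\ 152.4]\ \mathrm{s}
```

   Under these assumptions, the effective timeout can exceed `socket.connection.setup.timeout.max.ms` by up to 20%, which matches the `* 0.2` tolerance used in `testExponentialConnectionSetupTimeout`.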







[GitHub] [kafka] d8tltanc commented on pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#issuecomment-642135591


   @dajac Hi David. Yes, this patch is ready for review. Thanks.





[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r445911638



##########
File path: clients/src/main/java/org/apache/kafka/common/utils/ExponentialBackoff.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * An utility class for keeping the parameters and providing the value of exponential
+ * retry backoff, exponential reconnect backoff, exponential timeout, etc.
+ * The formula is
+ * Backoff(attempts) = random(1 - jitter, 1 + jitter) * initialInterval * multiplier ^ attempts
+ * If scaleFactor is greater or equal than termMax, a constant backoff of will be provided

Review comment:
       Replaced.







[GitHub] [kafka] dajac commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r442725547



##########
File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##########
@@ -357,6 +399,38 @@ private NodeConnectionState nodeState(String id) {
         return state;
     }
 
+    /**
+     * Get the id set of nodes which are in CONNECTING state
+     */
+    public Set<String> connectingNodes() {
+        return this.connectingNodes;
+    }
+
+    /**
+     * Get the timestamp of the latest connection attempt of a given node
+     * @param id the connection to fetch the state for
+     */
+    public long lastConnectAttemptMs(String id) {
+        NodeConnectionState nodeState = this.nodeState.get(id);
+        return nodeState == null ? 0 : nodeState.lastConnectAttemptMs;
+    }
+
+    public long connectionSetupTimeoutMs(String id) {

Review comment:
       Can we add a javadoc for this method?

##########
File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##########
@@ -357,6 +398,36 @@ private NodeConnectionState nodeState(String id) {
         return state;
     }
 
+    /**
+     * Get the id set of nodes which are in CONNECTING state
+     */
+    public Set<String> connectingNodes() {
+        return this.connectingNodes;
+    }
+
+    /**
+     * Get the timestamp of the latest connection attempt of a given node
+     * @param id the connection to fetch the state for
+     */
+    public long lastConnectAttemptMs(String id) {
+        NodeConnectionState nodeState = this.nodeState.get(id);
+        return nodeState == null ? 0 : nodeState.lastConnectAttemptMs;
+    }
+
+    public long connectionSetupTimeoutMs(String id) {
+        NodeConnectionState nodeState = this.nodeState.get(id);
+        return nodeState.connectionSetupTimeoutMs;
+    }
+
+    /**
+     * Test if the connection to the given node has reached its timeout
+     * @param id the connection to fetch the state for
+     * @param now the current time in ms
+     */
+    public boolean isConnectionSetupTimeout(String id, long now) {
+        return now - lastConnectAttemptMs(id) > connectionSetupTimeoutMs(id);

Review comment:
       Oh right. I missed the ones in `connecting`. Thanks for the clarification.

##########
File path: clients/src/main/java/org/apache/kafka/common/utils/ExponentialBackoff.java
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * An utility class for exponential backoff, timeout, etc...

Review comment:
      nit: I would rephrase this a bit and also briefly explain how we use it in AK (e.g. connect timeout, reconnection backoff, etc.). Also, I would suggest ending sentences with `.`.

##########
File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##########
@@ -300,30 +323,48 @@ public AuthenticationException authenticationException(String id) {
      * Resets the failure count for a node and sets the reconnect backoff to the base
      * value configured via reconnect.backoff.ms
      *
-     * @param nodeState The node state object to update
+     * @param nodeState nodeState The node state object to update
      */
     private void resetReconnectBackoff(NodeConnectionState nodeState) {
         nodeState.failedAttempts = 0;
-        nodeState.reconnectBackoffMs = this.reconnectBackoffInitMs;
+        nodeState.reconnectBackoffMs = reconnectBackoff.term(0);
+    }
+
+    /**
+     * Resets the failure count for a node and sets the connection setup timeout to the base
+     * value configured via socket.connection.setup.timeout.ms
+     *
+     * @param nodeState nodeState The node state object to update
+     */
+    private void resetConnectionSetupTimeout(NodeConnectionState nodeState) {
+        nodeState.failedConnectAttempts = 0;
+        nodeState.connectionSetupTimeoutMs = connectionSetupTimeout.term(0);
     }
 
     /**
-     * Update the node reconnect backoff exponentially.
+     * Increment the failure counter, update the node reconnect backoff exponentially,
+     * and record the current timestamp.
      * The delay is reconnect.backoff.ms * 2**(failures - 1) * (+/- 20% random jitter)
      * Up to a (pre-jitter) maximum of reconnect.backoff.max.ms
      *
      * @param nodeState The node state object to update
      */
-    private void updateReconnectBackoff(NodeConnectionState nodeState) {
-        if (this.reconnectBackoffMaxMs > this.reconnectBackoffInitMs) {
-            nodeState.failedAttempts += 1;
-            double backoffExp = Math.min(nodeState.failedAttempts - 1, this.reconnectBackoffMaxExp);
-            double backoffFactor = Math.pow(RECONNECT_BACKOFF_EXP_BASE, backoffExp);
-            long reconnectBackoffMs = (long) (this.reconnectBackoffInitMs * backoffFactor);
-            // Actual backoff is randomized to avoid connection storms.
-            double randomFactor = ThreadLocalRandom.current().nextDouble(0.8, 1.2);
-            nodeState.reconnectBackoffMs = (long) (randomFactor * reconnectBackoffMs);
-        }
+    private void incrementReconnectBackoff(NodeConnectionState nodeState, long now) {
+        nodeState.reconnectBackoffMs = reconnectBackoff.term(nodeState.failedAttempts);
+        nodeState.failedAttempts++;

Review comment:
       That makes sense. Thanks for the clarification.
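   The ordering discussed here (compute the backoff from the current failure count, then increment it) yields the same sequence as the removed `updateReconnectBackoff()`, which incremented first and used `failedAttempts - 1` as the exponent. A minimal sketch with jitter disabled so the values are deterministic. `NodeBackoffSketch` and its members are hypothetical, and the cap via `Math.min(attempts, maxExp)` is assumed from the `ExponentialBackoff` draft in this PR.

```java
// Hypothetical per-node state; jitter omitted so the sequence is deterministic.
class NodeBackoffSketch {
    private final long initialMs;
    private final int multiplier;
    private final double maxExp;
    int failedAttempts = 0;
    long backoffMs;

    NodeBackoffSketch(long initialMs, int multiplier, long maxMs) {
        this.initialMs = initialMs;
        this.multiplier = multiplier;
        // Exponent at which initialMs * multiplier^exp reaches maxMs (0 = constant).
        this.maxExp = maxMs > initialMs
                ? Math.log(maxMs / (double) Math.max(initialMs, 1)) / Math.log(multiplier)
                : 0;
        reset();
    }

    private long term(long attempts) {
        double exp = Math.min(attempts, maxExp);
        return (long) (initialMs * Math.pow(multiplier, exp));
    }

    // Mirrors resetReconnectBackoff(): zero failures, base value.
    void reset() {
        failedAttempts = 0;
        backoffMs = term(0);
    }

    // Mirrors incrementReconnectBackoff(): use the current count, then bump it.
    void onFailure() {
        backoffMs = term(failedAttempts);
        failedAttempts++;
    }
}
```

   Once the cap is reached the attempt counter keeps growing while the value stays flat, which is why "update" describes the backoff better than "increment".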

##########
File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##########
@@ -357,6 +398,36 @@ private NodeConnectionState nodeState(String id) {
         return state;
     }
 
+    /**
+     * Get the id set of nodes which are in CONNECTING state
+     */
+    public Set<String> connectingNodes() {
+        return this.connectingNodes;
+    }
+
+    /**
+     * Get the timestamp of the latest connection attempt of a given node
+     * @param id the connection to fetch the state for
+     */
+    public long lastConnectAttemptMs(String id) {
+        NodeConnectionState nodeState = this.nodeState.get(id);
+        return nodeState == null ? 0 : nodeState.lastConnectAttemptMs;
+    }
+
+    public long connectionSetupTimeoutMs(String id) {
+        NodeConnectionState nodeState = this.nodeState.get(id);
+        return nodeState.connectionSetupTimeoutMs;

Review comment:
      I would prefer to handle this the same way as in `lastConnectAttemptMs`, to remain consistent: if `nodeState` is `null`, we can return `0`.
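   A minimal sketch of the suggested null-safe accessor, with the javadoc requested earlier. `ConnectionStatesSketch` and `NodeState` are hypothetical, trimmed-down stand-ins for `ClusterConnectionStates` and its per-node state.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, trimmed-down stand-in for ClusterConnectionStates' per-node map.
class ConnectionStatesSketch {
    static final class NodeState {
        long lastConnectAttemptMs;
        long connectionSetupTimeoutMs;
    }

    final Map<String, NodeState> nodeState = new HashMap<>();

    /**
     * Get the timestamp of the latest connection attempt of a given node.
     * Returns 0 if the node is unknown.
     */
    long lastConnectAttemptMs(String id) {
        NodeState state = nodeState.get(id);
        return state == null ? 0 : state.lastConnectAttemptMs;
    }

    /**
     * Get the current connection setup timeout of a given node, in ms.
     * Returns 0 if the node is unknown, consistent with lastConnectAttemptMs().
     */
    long connectionSetupTimeoutMs(String id) {
        NodeState state = nodeState.get(id);
        return state == null ? 0 : state.connectionSetupTimeoutMs;
    }
}
```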

##########
File path: clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
##########
@@ -185,4 +191,9 @@ public static void warnIfDeprecatedDnsLookupValue(AbstractConfig config) {
                 CLIENT_DNS_LOOKUP_CONFIG, ClientDnsLookup.DEFAULT,
                 ClientDnsLookup.USE_ALL_DNS_IPS);
     }
+
+    public class Defaults {
+        public static final long SocketConnectionSetupTimeoutMs = 10 * 1000;
+        public static final long SocketConnectionSetupTimeoutMaxMs = 127 * 1000;

Review comment:
      We usually define constants with capital letters and underscores separating words. Moreover, we usually put defaults next to the config and the doc. We could use the following:
   - `SOCKET_CONNECTION_SETUP_TIMEOUT_MS_DEFAULT`; and
   - `SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_DEFAULT`.

##########
File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##########
@@ -357,6 +345,37 @@ private NodeConnectionState nodeState(String id) {
         return state;
     }
 
+    /**
+     * Get the id set of nodes which are in CONNECTING state
+     */
+    public Set<String> connectingNodes() {
+        return this.connectingNodes;
+    }
+
+    /**
+     * Get the timestamp of the latest connection attempt of a given node
+     * @param id the connection to fetch the state for
+     */
+    public long lastConnectAttemptMs(String id) {
+        NodeConnectionState nodeState = this.nodeState.get(id);
+        return nodeState == null ? 0 : nodeState.lastConnectAttemptMs;
+    }
+
+    // Visible for testing
+    long connectionSetupTimeoutMs(String id) {
+        NodeConnectionState nodeState = this.nodeState.get(id);
+        return nodeState.connectionSetupTimeoutMs;
+    }
+
+    /**
+     * Test if the connection to the given node has reached its timeout
+     * @param id the connection to fetch the state for
+     * @param now the current time in ms
+     */
+    public boolean isConnectionSetupTimeout(String id, long now) {
+        return now - lastConnectAttemptMs(id) > connectionSetupTimeoutMs(id);

Review comment:
      That is true today, since the only caller passes nodes from `connectingNodes`, but it may not stay that way. I would add the check Rajini suggested here to make the implementation safe.
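   The exact check Rajini proposed is not quoted in this excerpt. This sketch assumes it means refusing to evaluate a node that is not currently in the CONNECTING state. `TimeoutCheckSketch`, its `State` enum and `NodeState` are hypothetical and are not Kafka's `ConnectionState` or `ClusterConnectionStates`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; state names echo ConnectionState but this is not Kafka's enum.
class TimeoutCheckSketch {
    enum State { DISCONNECTED, CONNECTING, READY }

    static final class NodeState {
        State state = State.DISCONNECTED;
        long lastConnectAttemptMs;
        long connectionSetupTimeoutMs;
    }

    final Map<String, NodeState> nodeState = new HashMap<>();

    /**
     * Test if the connection to the given node has exceeded its setup timeout.
     * Assumed guard: only a node that is currently CONNECTING can time out.
     */
    boolean isConnectionSetupTimeout(String id, long now) {
        NodeState s = nodeState.get(id);
        if (s == null || s.state != State.CONNECTING)
            throw new IllegalStateException("Node " + id + " is not in connecting state");
        return now - s.lastConnectAttemptMs > s.connectionSetupTimeoutMs;
    }
}
```

   Failing loudly keeps a future caller from silently computing a timeout against stale or default values.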

##########
File path: clients/src/main/java/org/apache/kafka/common/utils/ExponentialBackoff.java
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * An utility class for exponential backoff, timeout, etc...
+ * The formula is Term(n) = random(1 - jitter, 1 + jitter) * scaleFactor * (ratio) ^ n
+ * If scaleFactor is greater or equal than termMax, a constant term of will be provided
+ * This class is thread-safe
+ */
+public class ExponentialBackoff {
+    private final int ratio;
+    private final double expMax;
+    private final long scaleFactor;
+    private final double jitter;
+
+    public ExponentialBackoff(long scaleFactor, int ratio, long termMax, double jitter) {

Review comment:
       I find the terminology used here not intuitive. What about using something like: `initialInterval`, `multiplier` and `maxInterval`? I think these are more common when it comes to configuring backoff.

##########
File path: clients/src/main/java/org/apache/kafka/common/utils/ExponentialBackoff.java
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * An utility class for exponential backoff, timeout, etc...
+ * The formula is Term(n) = random(1 - jitter, 1 + jitter) * scaleFactor * (ratio) ^ n
+ * If scaleFactor is greater or equal than termMax, a constant term of will be provided
+ * This class is thread-safe
+ */
+public class ExponentialBackoff {
+    private final int ratio;
+    private final double expMax;
+    private final long scaleFactor;
+    private final double jitter;
+
+    public ExponentialBackoff(long scaleFactor, int ratio, long termMax, double jitter) {
+        this.scaleFactor = scaleFactor;
+        this.ratio = ratio;
+        this.jitter = jitter;
+        this.expMax = termMax > scaleFactor ?
+                Math.log(termMax / (double) Math.max(scaleFactor, 1)) / Math.log(ratio) : 0;
+    }
+
+    public long term(long n) {

Review comment:
       nit: Let me try with another suggestion ;) What about `backoff(long attempts)`?
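   A minimal sketch combining the two naming suggestions (`initialInterval`, `multiplier`, `maxInterval`, and `backoff(long attempts)`) with the formula and `expMax` computation from the draft above. `ExponentialBackoffSketch` is a hypothetical name. The draft's `term()` body is not shown in this thread, so capping the exponent with `Math.min(attempts, expMax)` and the jitter handling are assumptions, including the `jitter <= 0` guard (needed because `ThreadLocalRandom.nextDouble(origin, bound)` requires `origin < bound`).

```java
import java.util.concurrent.ThreadLocalRandom;

/**
 * Sketch of the exponential backoff helper using the names proposed in review.
 * backoff(attempts) = random(1 - jitter, 1 + jitter) * initialInterval * multiplier ^ min(attempts, expMax)
 */
class ExponentialBackoffSketch {
    private final long initialInterval;
    private final int multiplier;
    private final double expMax;
    private final double jitter;

    ExponentialBackoffSketch(long initialInterval, int multiplier, long maxInterval, double jitter) {
        this.initialInterval = initialInterval;
        this.multiplier = multiplier;
        this.jitter = jitter;
        // Exponent at which initialInterval * multiplier^exp reaches maxInterval;
        // 0 yields a constant (pre-jitter) backoff when maxInterval <= initialInterval.
        this.expMax = maxInterval > initialInterval
                ? Math.log(maxInterval / (double) Math.max(initialInterval, 1)) / Math.log(multiplier)
                : 0;
    }

    long backoff(long attempts) {
        double exp = Math.min(attempts, expMax);
        double term = initialInterval * Math.pow(multiplier, exp);
        // ThreadLocalRandom lets a single instance be shared across threads.
        double randomFactor = jitter <= 0
                ? 1.0
                : ThreadLocalRandom.current().nextDouble(1 - jitter, 1 + jitter);
        return (long) (randomFactor * term);
    }
}
```

   With the connection setup defaults, `new ExponentialBackoffSketch(10_000, 2, 127_000, 0.2).backoff(n)` would return roughly 10 s, 20 s, 40 s, 80 s and then about 127 s, each within ±20%.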

##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -88,6 +88,8 @@ object Defaults {
   val MaxConnections: Int = Int.MaxValue
   val ConnectionsMaxIdleMs = 10 * 60 * 1000L
   val RequestTimeoutMs = 30000
+  val ConnectionSetupTimeoutMs = 10 * 1000L
+  val ConnectionSetupTimeoutMaxMs = 127 * 1000L

Review comment:
       We can use the ones defined in `CommonClientConfigs` here.

##########
File path: clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
##########
@@ -771,7 +794,7 @@ private void processDisconnection(List<ClientResponse> responses,
      * @param responses The list of responses to update
      * @param now The current time
      */
-    private void handleTimedOutRequests(List<ClientResponse> responses, long now) {
+    private void handleTimeoutRequests(List<ClientResponse> responses, long now) {

Review comment:
       nit: I personally prefer the previous name. I would also rename `handleTimeoutConnections` to `handleTimedOutConnections`.

##########
File path: clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
##########
@@ -321,4 +325,37 @@ public void testIsPreparingConnection() {
         connectionStates.disconnected(nodeId1, time.milliseconds());
         assertFalse(connectionStates.isPreparingConnection(nodeId1));
     }
+
+    @Test
+    public void testExponentialConnectionSetupTimeout() {
+        assertTrue(connectionStates.canConnect(nodeId1, time.milliseconds()));
+
+        // Check the exponential timeout growth
+        for (int n = 0; n <= Math.log((double) connectionSetupTimeoutMaxMs / connectionSetupTimeoutMs) / Math.log(2); n++) {

Review comment:
       nit: Could we define a local constant for the base `2` and reuse it everywhere?
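   A minimal sketch of the nit, naming the base once and reusing it both in the loop bound and in the expected values. `SetupTimeoutTestSketch`, `BACKOFF_BASE`, `maxExponent` and `expectedTimeoutMs` are hypothetical names, not part of the actual test.

```java
// Hypothetical helper for the test: the base is defined once and reused.
class SetupTimeoutTestSketch {
    static final int BACKOFF_BASE = 2;

    // Largest n whose pre-jitter timeout, timeoutMs * BACKOFF_BASE^n, stays below the max.
    static int maxExponent(long timeoutMs, long timeoutMaxMs) {
        return (int) Math.floor(Math.log((double) timeoutMaxMs / timeoutMs) / Math.log(BACKOFF_BASE));
    }

    // Expected pre-jitter timeout after n consecutive failures.
    static double expectedTimeoutMs(long timeoutMs, int n) {
        return timeoutMs * Math.pow(BACKOFF_BASE, n);
    }
}
```

   The loop could then use `n <= maxExponent(connectionSetupTimeoutMs, connectionSetupTimeoutMaxMs)`, which visits the same values of `n` as the original comparison against the `double` bound.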

##########
File path: clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
##########
@@ -321,4 +325,37 @@ public void testIsPreparingConnection() {
         connectionStates.disconnected(nodeId1, time.milliseconds());
         assertFalse(connectionStates.isPreparingConnection(nodeId1));
     }
+
+    @Test
+    public void testExponentialConnectionSetupTimeout() {
+        assertTrue(connectionStates.canConnect(nodeId1, time.milliseconds()));
+
+        // Check the exponential timeout growth
+        for (int n = 0; n <= Math.log((double) connectionSetupTimeoutMaxMs / connectionSetupTimeoutMs) / Math.log(2); n++) {
+            connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT);
+            assertTrue(connectionStates.connectingNodes().contains(nodeId1));
+            assertEquals(connectionSetupTimeoutMs * Math.pow(2, n),
+                    connectionStates.connectionSetupTimeoutMs(nodeId1),
+                    connectionSetupTimeoutMs * Math.pow(2, n) * 0.2);
+            connectionStates.disconnected(nodeId1, time.milliseconds());
+            assertFalse(connectionStates.connectingNodes().contains(nodeId1));
+        }
+
+        // Check the timeout value upper bound
+        connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT);
+        assertEquals(connectionStates.connectionSetupTimeoutMs(nodeId1), connectionSetupTimeoutMaxMs, connectionSetupTimeoutMaxMs * 0.2);

Review comment:
       nit: I would put `connectionSetupTimeoutMaxMs` first as it is the expected value. The same applies below.

##########
File path: clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
##########
@@ -321,4 +325,37 @@ public void testIsPreparingConnection() {
         connectionStates.disconnected(nodeId1, time.milliseconds());
         assertFalse(connectionStates.isPreparingConnection(nodeId1));
     }
+
+    @Test
+    public void testExponentialConnectionSetupTimeout() {
+        assertTrue(connectionStates.canConnect(nodeId1, time.milliseconds()));
+
+        // Check the exponential timeout growth
+        for (int n = 0; n <= Math.log((double) connectionSetupTimeoutMaxMs / connectionSetupTimeoutMs) / Math.log(2); n++) {
+            connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT);
+            assertTrue(connectionStates.connectingNodes().contains(nodeId1));
+            assertEquals(connectionSetupTimeoutMs * Math.pow(2, n),
+                    connectionStates.connectionSetupTimeoutMs(nodeId1),
+                    connectionSetupTimeoutMs * Math.pow(2, n) * 0.2);

Review comment:
       nit: For the jitter, what about defining a constant like `reconnectBackoffJitter`?

##########
File path: core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
##########
@@ -205,6 +205,8 @@ object BrokerApiVersionsCommand {
   private object AdminClient {
     val DefaultConnectionMaxIdleMs = 9 * 60 * 1000
     val DefaultRequestTimeoutMs = 5000
+    val DefaultSocketConnectionSetupMs = 10 * 1000
+    val DefaultSocketConnectionSetupMaxMs = 127 * 1000

Review comment:
       It seems that we can remove these two and use the ones in `CommonClientConfigs`.

##########
File path: clients/src/test/java/org/apache/kafka/common/utils/ExponentialBackoffTest.java
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class ExponentialBackoffTest {
+    @Test
+    public void testExponentialBackoff() {
+        long scaleFactor = 100;
+        int ratio = 2;
+        long termMax = 2000;
+        double jitter = 0.2;
+        ExponentialBackoff exponentialBackoff = new ExponentialBackoff(
+                scaleFactor, ratio, termMax, jitter
+        );
+
+        for (int i = 0; i <= 100; i++) {
+            for (int n = 0; n <= 4; n++) {
+                assertEquals(scaleFactor * Math.pow(ratio, n), exponentialBackoff.term(n),
+                        scaleFactor * Math.pow(ratio, n) * jitter);
+            }
+            assertTrue(exponentialBackoff.term(1000) <= termMax * (1 + jitter));

Review comment:
       I think it would be good to test with `n=5` as well, to ensure that `termMax` is already applied from that point on. Perhaps we could do the following:
   ```
   for (int n = 0; n <= 100; n++) {
     if (n <= 4)
       assertEquals(scaleFactor * Math.pow(ratio, n), exponentialBackoff.term(n),
          scaleFactor * Math.pow(ratio, n) * jitter);
     else
       assertTrue(exponentialBackoff.term(n) <= termMax * (1 + jitter));
   }
   ```
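   To make the `n=5` point concrete, here is a minimal, self-contained sketch of a jittered, capped exponential term in the spirit of the class under review. `ExponentialBackoffSketch` is a hypothetical name, and capping the exponent (rather than the value) at `expMax` is an assumption based on the constructor in the diff, not the PR's actual `term` body. With the test's parameters (`scaleFactor = 100`, `ratio = 2`, `termMax = 2000`), `expMax = log(20) / log(2) ≈ 4.32`, so `n = 4` is still uncapped while every `n >= 5` is already at `termMax` before jitter.

   ```java
   import java.util.concurrent.ThreadLocalRandom;

   // Hypothetical sketch, not the PR's ExponentialBackoff:
   // term(n) = random(1 - jitter, 1 + jitter) * scaleFactor * ratio ^ min(n, expMax)
   final class ExponentialBackoffSketch {
       private final long scaleFactor;
       private final int ratio;
       private final double expMax;
       private final double jitter;

       ExponentialBackoffSketch(long scaleFactor, int ratio, long termMax, double jitter) {
           this.scaleFactor = scaleFactor;
           this.ratio = ratio;
           this.jitter = jitter;
           // Exponent at which scaleFactor * ratio ^ exp reaches termMax (0 gives a constant term).
           this.expMax = termMax > scaleFactor
                   ? Math.log(termMax / (double) Math.max(scaleFactor, 1)) / Math.log(ratio)
                   : 0;
       }

       long term(long n) {
           // Cap the exponent so the value stops growing once it reaches termMax.
           double exp = Math.min(n, expMax);
           double term = scaleFactor * Math.pow(ratio, exp);
           // Skip the random draw when jitter is zero; nextDouble(1, 1) would throw.
           double randomFactor = jitter < Double.MIN_NORMAL
                   ? 1.0
                   : ThreadLocalRandom.current().nextDouble(1 - jitter, 1 + jitter);
           return (long) (randomFactor * term);
       }
   }
   ```

   A test shaped like the suggestion above can then assert the uncapped range for `n <= 4` and the `termMax * (1 + jitter)` bound for every larger `n`.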







[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r443903133



##########
File path: clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
##########
@@ -321,4 +325,37 @@ public void testIsPreparingConnection() {
         connectionStates.disconnected(nodeId1, time.milliseconds());
         assertFalse(connectionStates.isPreparingConnection(nodeId1));
     }
+
+    @Test
+    public void testExponentialConnectionSetupTimeout() {
+        assertTrue(connectionStates.canConnect(nodeId1, time.milliseconds()));
+
+        // Check the exponential timeout growth
+        for (int n = 0; n <= Math.log((double) connectionSetupTimeoutMaxMs / connectionSetupTimeoutMs) / Math.log(2); n++) {
+            connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT);
+            assertTrue(connectionStates.connectingNodes().contains(nodeId1));
+            assertEquals(connectionSetupTimeoutMs * Math.pow(2, n),
+                    connectionStates.connectionSetupTimeoutMs(nodeId1),
+                    connectionSetupTimeoutMs * Math.pow(2, n) * 0.2);

Review comment:
       Yes. I added static class constants for these parameters.







[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r439646696



##########
File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##########
@@ -357,6 +345,37 @@ private NodeConnectionState nodeState(String id) {
         return state;
     }
 
+    /**
+     * Get the id set of nodes which are in CONNECTING state
+     */
+    public Set<String> connectingNodes() {
+        return this.connectingNodes;
+    }
+
+    /**
+     * Get the timestamp of the latest connection attempt of a given node
+     * @param id the connection to fetch the state for
+     */
+    public long lastConnectAttemptMs(String id) {
+        NodeConnectionState nodeState = this.nodeState.get(id);
+        return nodeState == null ? 0 : nodeState.lastConnectAttemptMs;
+    }
+
+    // Visible for testing
+    long connectionSetupTimeoutMs(String id) {
+        NodeConnectionState nodeState = this.nodeState.get(id);
+        return nodeState.connectionSetupTimeoutMs;
+    }
+
+    /**
+     * Test if the connection to the given node has reached its timeout
+     * @param id the connection to fetch the state for
+     * @param now the current time in ms
+     */
+    public boolean isConnectionSetupTimeout(String id, long now) {
+        return now - lastConnectAttemptMs(id) > connectionSetupTimeoutMs(id);

Review comment:
       I don't think we need to check whether the node is in the connecting state, since the caller only applies this test to nodes that are already in the connecting state.







[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r445042974



##########
File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##########
@@ -357,6 +398,36 @@ private NodeConnectionState nodeState(String id) {
         return state;
     }
 
+    /**
+     * Get the id set of nodes which are in CONNECTING state
+     */
+    public Set<String> connectingNodes() {
+        return this.connectingNodes;
+    }
+
+    /**
+     * Get the timestamp of the latest connection attempt of a given node
+     * @param id the connection to fetch the state for
+     */
+    public long lastConnectAttemptMs(String id) {
+        NodeConnectionState nodeState = this.nodeState.get(id);
+        return nodeState == null ? 0 : nodeState.lastConnectAttemptMs;
+    }
+
+    public long connectionSetupTimeoutMs(String id) {
+        NodeConnectionState nodeState = this.nodeState.get(id);
+        return nodeState.connectionSetupTimeoutMs;

Review comment:
       Sure. `isConnectionSetupTimeout` now uses this check as well.







[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r442394058



##########
File path: clients/src/main/java/org/apache/kafka/common/utils/GeometricProgression.java
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * An util class for exponential backoff, backoff, etc...
+ * The formula is Term(n) = random(1 - jitter, 1 + jitter) * scaleFactor * (ratio) ^ n
+ * If scaleFactor is greater or equal than termMax, a constant term of will be provided
+ * This class is thread-safe
+ */
+public class GeometricProgression {
+    private final int ratio;
+    private final double expMax;
+    private final long scaleFactor;
+    private final double jitter;
+
+    public GeometricProgression(long scaleFactor, int ratio, long termMax, double jitter) {
+        this.scaleFactor = scaleFactor;
+        this.ratio = ratio;
+        this.jitter = jitter;
+        this.expMax = termMax > scaleFactor ?
+                Math.log(termMax / (double) Math.max(scaleFactor, 1)) / Math.log(ratio) : 0;
+    }
+
+    public long term(long n) {

Review comment:
       As you noted in your earlier comments, the same value of `attempts` may correspond to different terms:

   connection_timeout = constant * 2 ^ (attempts)
   reconnect_backoff = constant * 2 ^ (attempts - 1)
   (in KIP-580) retry_backoff = constant * 2 ^ (attempts - 1)

   So I think naming the parameter `retries` or `attempts` instead of `n` might also confuse people. Shall we think of another name?
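   A small worked sketch of the naming problem: one `failedAttempts` counter feeds two different exponents, following the `disconnected` hunk in this PR (`reconnectBackoff.term(failedAttempts - 1)` versus `connectionSetupTimeout.term(failedAttempts)`). Jitter is omitted, `cappedTerm` is a hypothetical helper rather than the PR's API, and the 10 s / 60 s backoff and 10 s / 127 s setup-timeout settings are the values used in `ClusterConnectionStatesTest`.

   ```java
   // Hypothetical, jitter-free helper: base * ratio ^ min(n, expMax).
   final class AttemptOffsetSketch {
       static long cappedTerm(long base, int ratio, long max, long n) {
           double expMax = max > base
                   ? Math.log(max / (double) Math.max(base, 1)) / Math.log(ratio)
                   : 0;
           return (long) (base * Math.pow(ratio, Math.min(n, expMax)));
       }

       public static void main(String[] args) {
           for (int failedAttempts = 1; failedAttempts <= 4; failedAttempts++) {
               // Same counter, different exponents: backoff uses n - 1, setup timeout uses n.
               long backoffMs = cappedTerm(10_000, 2, 60_000, failedAttempts - 1);
               long setupTimeoutMs = cappedTerm(10_000, 2, 127_000, failedAttempts);
               System.out.println("failedAttempts=" + failedAttempts
                       + " reconnectBackoffMs=" + backoffMs
                       + " connectionSetupTimeoutMs=" + setupTimeoutMs);
           }
       }
   }
   ```

   After the first failure this yields a 10 s reconnect backoff but a 20 s setup timeout, so a parameter called `attempts` would mean something different at each call site.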








[GitHub] [kafka] rajinisivaram commented on pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
rajinisivaram commented on pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#issuecomment-651250018


   retest this please





[GitHub] [kafka] dajac commented on pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
dajac commented on pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#issuecomment-642024625


   @d8tltanc Is this one ready for review? I can help review it if it is.





[GitHub] [kafka] rajinisivaram commented on pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
rajinisivaram commented on pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#issuecomment-651402845


   ok to test





[GitHub] [kafka] rajinisivaram commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
rajinisivaram commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r445008715



##########
File path: clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
##########
@@ -40,7 +40,12 @@
     private final MockTime time = new MockTime();
     private final long reconnectBackoffMs = 10 * 1000;
     private final long reconnectBackoffMax = 60 * 1000;
-    private final double reconnectBackoffJitter = 0.2;
+    private final long connectionSetupTimeoutMs = 10 * 1000;
+    private final long connectionSetupTimeoutMaxMs = 127 * 1000;
+    private final static int RECONNECT_BACKOFF_EXP_BASE = 2;
+    private final static double RECONNECT_BACKOFF_JITTER = 0.2;
+    private final static int CONNECTION_SETUP_TIMEOUT_EXP_BASE = 2;
+    private final static double CONNECTION_SETUP_TIMEOUT_JITTER = 0.2;

Review comment:
       Can we just make the constants in `ClusterConnectionStates` package-private and reuse them here?
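   A minimal sketch of that suggestion, assuming the constants live in the production class with default (package-private) access so a test in the same package can read them. `ConnectionStatesConstants` and `ConstantsReuseSketch` are hypothetical stand-ins, not code from the PR. Using the shared base also makes the test's loop bound explicit: with a 10 s initial and 127 s maximum setup timeout, `log(12.7) / log(2) ≈ 3.67`, so the loop covers `n = 0..3` (10 s, 20 s, 40 s, 80 s) before the upper-bound check.

   ```java
   // Hypothetical stand-in for ClusterConnectionStates: no access modifier means
   // package-private, so ClusterConnectionStatesTest (same package) can reuse these.
   final class ConnectionStatesConstants {
       static final int CONNECTION_SETUP_TIMEOUT_EXP_BASE = 2;
       static final double CONNECTION_SETUP_TIMEOUT_JITTER = 0.2;

       private ConnectionStatesConstants() { }
   }

   final class ConstantsReuseSketch {
       // Largest n for which timeoutMs * base ^ n does not exceed timeoutMaxMs,
       // i.e. the last n visited by the test's `n <= log(max / init) / log(base)` loop.
       static int expectedDoublings(long timeoutMs, long timeoutMaxMs) {
           return (int) Math.floor(Math.log((double) timeoutMaxMs / timeoutMs)
                   / Math.log(ConnectionStatesConstants.CONNECTION_SETUP_TIMEOUT_EXP_BASE));
       }

       // Delta for assertEquals(expected, actual, delta) on a jittered value.
       static double jitterTolerance(double expected) {
           return expected * ConnectionStatesConstants.CONNECTION_SETUP_TIMEOUT_JITTER;
       }
   }
   ```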

##########
File path: clients/src/main/java/org/apache/kafka/common/utils/ExponentialBackoff.java
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * An utility class for exponential backoff, timeout, etc...
+ * The formula is Term(n) = random(1 - jitter, 1 + jitter) * scaleFactor * (ratio) ^ n
+ * If scaleFactor is greater or equal than termMax, a constant term of will be provided
+ * This class is thread-safe
+ */
+public class ExponentialBackoff {
+    private final int ratio;
+    private final double expMax;
+    private final long scaleFactor;
+    private final double jitter;
+
+    public ExponentialBackoff(long scaleFactor, int ratio, long termMax, double jitter) {

Review comment:
       Sorry, I missed this earlier. Yes, I also prefer `initialInterval`, `multiplier`, and `maxInterval`, since they fit better with `ExponentialBackoff`.







[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r439611544



##########
File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##########
@@ -34,19 +36,22 @@
  *
  */
 final class ClusterConnectionStates {
-    private final long reconnectBackoffInitMs;
-    private final long reconnectBackoffMaxMs;
-    private final static int RECONNECT_BACKOFF_EXP_BASE = 2;
-    private final double reconnectBackoffMaxExp;
     private final Map<String, NodeConnectionState> nodeState;
     private final Logger log;
+    private Set<String> connectingNodes;
+    private GeometricProgression reconnectBackoff;
+    private GeometricProgression connectionSetupTimeout;
 
-    public ClusterConnectionStates(long reconnectBackoffMs, long reconnectBackoffMaxMs, LogContext logContext) {
+    public ClusterConnectionStates(long reconnectBackoffMs, long reconnectBackoffMaxMs,
+                                   long connectionSetupTimeoutMs, long connectionSetupTimeoutMaxMs,
+                                   LogContext logContext) {
         this.log = logContext.logger(ClusterConnectionStates.class);
-        this.reconnectBackoffInitMs = reconnectBackoffMs;
-        this.reconnectBackoffMaxMs = reconnectBackoffMaxMs;
-        this.reconnectBackoffMaxExp = Math.log(this.reconnectBackoffMaxMs / (double) Math.max(reconnectBackoffMs, 1)) / Math.log(RECONNECT_BACKOFF_EXP_BASE);
+        this.reconnectBackoff = new GeometricProgression(
+                reconnectBackoffMs, 2, reconnectBackoffMaxMs, 0.2);

Review comment:
       Yes, we are reusing the code.







[GitHub] [kafka] rajinisivaram commented on pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
rajinisivaram commented on pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#issuecomment-649518932


   @d8tltanc Looks like we haven't added the new configs to Connect: https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3103/





[GitHub] [kafka] rajinisivaram commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
rajinisivaram commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r439579612



##########
File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##########
@@ -34,19 +36,22 @@
  *
  */
 final class ClusterConnectionStates {
-    private final long reconnectBackoffInitMs;
-    private final long reconnectBackoffMaxMs;
-    private final static int RECONNECT_BACKOFF_EXP_BASE = 2;
-    private final double reconnectBackoffMaxExp;
     private final Map<String, NodeConnectionState> nodeState;
     private final Logger log;
+    private Set<String> connectingNodes;
+    private GeometricProgression reconnectBackoff;
+    private GeometricProgression connectionSetupTimeout;
 
-    public ClusterConnectionStates(long reconnectBackoffMs, long reconnectBackoffMaxMs, LogContext logContext) {
+    public ClusterConnectionStates(long reconnectBackoffMs, long reconnectBackoffMaxMs,
+                                   long connectionSetupTimeoutMs, long connectionSetupTimeoutMaxMs,
+                                   LogContext logContext) {
         this.log = logContext.logger(ClusterConnectionStates.class);
-        this.reconnectBackoffInitMs = reconnectBackoffMs;
-        this.reconnectBackoffMaxMs = reconnectBackoffMaxMs;
-        this.reconnectBackoffMaxExp = Math.log(this.reconnectBackoffMaxMs / (double) Math.max(reconnectBackoffMs, 1)) / Math.log(RECONNECT_BACKOFF_EXP_BASE);
+        this.reconnectBackoff = new GeometricProgression(
+                reconnectBackoffMs, 2, reconnectBackoffMaxMs, 0.2);

Review comment:
       Just to be sure: we are reusing the code but not changing the backoff behaviour, right?

##########
File path: clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
##########
@@ -554,6 +571,7 @@ private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long
         handleDisconnections(responses, updatedNow);
         handleConnections();
         handleInitiateApiVersionRequests(updatedNow);
+        handleTimeOutConnections(responses, updatedNow);

Review comment:
       `TimeOut` => `TimedOut`

##########
File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##########
@@ -296,36 +314,6 @@ public AuthenticationException authenticationException(String id) {
         return state != null ? state.authenticationException : null;
     }
 
-    /**
-     * Resets the failure count for a node and sets the reconnect backoff to the base
-     * value configured via reconnect.backoff.ms
-     *
-     * @param nodeState The node state object to update
-     */
-    private void resetReconnectBackoff(NodeConnectionState nodeState) {

Review comment:
       Can't we keep this method for performing the reset (perhaps under a new name) and have it cover all types of reset?

##########
File path: clients/src/main/java/org/apache/kafka/common/utils/GeometricProgression.java
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * An util class for exponential backoff, backoff, etc...
+ * The formula is Term(n) = random(1 - jitter, 1 + jitter) * scaleFactor * (ratio) ^ n
+ * If scaleFactor is greater or equal than termMax, a constant term of will be provided
+ * This class is thread-safe
+ */
+public class GeometricProgression {

Review comment:
       This is not exactly a geometric progression?
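   For reference, a strict geometric progression has a constant ratio between consecutive terms, a_n = s * r^n. Reading the Javadoc formula together with the constructor's `expMax`, and assuming `term` caps the exponent at `expMax` (the constructor suggests this, but the body of `term` is not shown), the class computes something closer to the following, with s = `scaleFactor`, r = `ratio`, T_max = `termMax`, and j = `jitter`. Both the random factor and the cap break the constant ratio:

   ```latex
   \[
   T(n) = u_n \, s \, r^{\min(n,\, e_{\max})}, \qquad
   u_n \sim \mathcal{U}(1 - j,\ 1 + j), \qquad
   e_{\max} =
   \begin{cases}
     \dfrac{\ln\left(T_{\max} / \max(s, 1)\right)}{\ln r} & \text{if } T_{\max} > s,\\[4pt]
     0 & \text{otherwise,}
   \end{cases}
   \]
   \[
   \frac{T(n+1)}{T(n)} = \frac{u_{n+1}}{u_n} \cdot
   \begin{cases}
     r & \text{if } n + 1 \le e_{\max},\\
     1 & \text{if } n \ge e_{\max}.
   \end{cases}
   \]
   ```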

##########
File path: clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
##########
@@ -786,6 +808,26 @@ private void handleAbortedSends(List<ClientResponse> responses) {
         abortedSends.clear();
     }
 
+    /**
+     * Handle socket channel connection timeout. The timeout will hit iff a connection
+     * stays at the ConnectionState.CONNECTING state longer than the timeout value,
+     * as indicated by ClusterConnectionStates.NodeConnectionState.
+     *
+     * @param responses The list of responses to update
+     * @param now The current time
+     */
+    private void handleTimeOutConnections(List<ClientResponse> responses, long now) {
+        Set<String> connectingNodes = connectionStates.connectingNodes();
+        for (String nodeId: connectingNodes) {
+            if (connectionStates.isConnectionSetupTimeout(nodeId, now)) {
+                // close connection to the node

Review comment:
       This comment is unnecessary since it is obvious from the line below.
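   One thing the loop above may need to watch for: `connectingNodes()` returns the live set (see the `ClusterConnectionStates` hunk), and `testExponentialConnectionSetupTimeout` expects `disconnected(...)` to remove the node from it. If closing the connection ends up calling `disconnected` inside the loop and the set is a `HashSet`, removing from it while iterating can throw `ConcurrentModificationException`. Below is a minimal collect-then-close sketch; `TimedOutConnectionsSketch` and its fields are hypothetical simplifications rather than Kafka's classes, and whether the PR's close path mutates the set synchronously is an assumption.

   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.HashSet;
   import java.util.List;
   import java.util.Map;
   import java.util.Set;

   // Hypothetical, simplified model: not Kafka's NetworkClient or ClusterConnectionStates.
   final class TimedOutConnectionsSketch {
       final Set<String> connectingNodes = new HashSet<>();
       final Map<String, Long> lastConnectAttemptMs = new HashMap<>();
       final Map<String, Long> setupTimeoutMs = new HashMap<>();

       void connecting(String id, long now, long timeoutMs) {
           connectingNodes.add(id);
           lastConnectAttemptMs.put(id, now);
           setupTimeoutMs.put(id, timeoutMs);
       }

       // Mirrors the test's expectation that disconnected() drops the node from the connecting set.
       void disconnected(String id) {
           connectingNodes.remove(id);
       }

       boolean isConnectionSetupTimeout(String id, long now) {
           return now - lastConnectAttemptMs.get(id) > setupTimeoutMs.get(id);
       }

       // Collect first, then disconnect, so the set is never modified while being iterated.
       List<String> handleTimedOutConnections(long now) {
           List<String> timedOut = new ArrayList<>();
           for (String id : connectingNodes) {
               if (isConnectionSetupTimeout(id, now))
                   timedOut.add(id);
           }
           for (String id : timedOut)
               disconnected(id);
           return timedOut;
       }
   }
   ```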

##########
File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##########
@@ -34,19 +36,22 @@
  *
  */
 final class ClusterConnectionStates {
-    private final long reconnectBackoffInitMs;
-    private final long reconnectBackoffMaxMs;
-    private final static int RECONNECT_BACKOFF_EXP_BASE = 2;

Review comment:
       Why did we remove the constant and hard-code the number inline?

##########
File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##########
@@ -158,9 +165,16 @@ public InetAddress currentAddress(String id) throws UnknownHostException {
      */
     public void disconnected(String id, long now) {
         NodeConnectionState nodeState = nodeState(id);
-        nodeState.state = ConnectionState.DISCONNECTED;
         nodeState.lastConnectAttemptMs = now;
-        updateReconnectBackoff(nodeState);
+        nodeState.failedAttempts++;
+        nodeState.reconnectBackoffMs = reconnectBackoff.term(nodeState.failedAttempts - 1);
+        if (nodeState.state == ConnectionState.CONNECTING) {
+            nodeState.connectionSetupTimeoutMs = connectionSetupTimeout.term(nodeState.failedAttempts);

Review comment:
       `failedAttempts` isn't really the number of failed connections, so I'm not sure what we are setting the timeout to.

##########
File path: clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
##########
@@ -103,6 +103,12 @@
         Utils.join(SecurityProtocol.names(), ", ") + ".";
     public static final String DEFAULT_SECURITY_PROTOCOL = "PLAINTEXT";
 
+    public static final String SOCKET_CONNECTIONS_SETUP_TIMEOUT_MS_CONFIG = "socket.connections.setup.timeout.ms";

Review comment:
       Didn't we decide to call this `socket.connection.setup.timeout.ms` during KIP discussion? (connection instead of connections)

##########
File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##########
@@ -357,6 +345,37 @@ private NodeConnectionState nodeState(String id) {
         return state;
     }
 
+    /**
+     * Get the id set of nodes which are in CONNECTING state
+     */
+    public Set<String> connectingNodes() {
+        return this.connectingNodes;
+    }
+
+    /**
+     * Get the timestamp of the latest connection attempt of a given node
+     * @param id the connection to fetch the state for
+     */
+    public long lastConnectAttemptMs(String id) {
+        NodeConnectionState nodeState = this.nodeState.get(id);
+        return nodeState == null ? 0 : nodeState.lastConnectAttemptMs;
+    }
+
+    // Visible for testing
+    long connectionSetupTimeoutMs(String id) {
+        NodeConnectionState nodeState = this.nodeState.get(id);
+        return nodeState.connectionSetupTimeoutMs;
+    }
+
+    /**
+     * Test if the connection to the given node has reached its timeout
+     * @param id the connection to fetch the state for
+     * @param now the current time in ms
+     */
+    public boolean isConnectionSetupTimeout(String id, long now) {
+        return now - lastConnectAttemptMs(id) > connectionSetupTimeoutMs(id);

Review comment:
       Shouldn't we also check that the node is in connecting state?
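       A minimal sketch of the guard being asked for, using a hypothetical, trimmed-down stand-in for `ClusterConnectionStates` (the class, field, and constructor shapes are assumptions, not Kafka's code). A node that is READY or DISCONNECTED returns `false` however old its last attempt is; an unknown node throws, mirroring the null check added in a later revision of this diff.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for ClusterConnectionStates, reduced to what the
// setup-timeout check needs.
class SetupTimeoutCheckSketch {
    enum ConnectionState { DISCONNECTED, CONNECTING, READY }

    static final class NodeState {
        final ConnectionState state;
        final long lastConnectAttemptMs;
        final long connectionSetupTimeoutMs;

        NodeState(ConnectionState state, long lastConnectAttemptMs, long connectionSetupTimeoutMs) {
            this.state = state;
            this.lastConnectAttemptMs = lastConnectAttemptMs;
            this.connectionSetupTimeoutMs = connectionSetupTimeoutMs;
        }
    }

    private final Map<String, NodeState> nodeState = new HashMap<>();

    void put(String id, NodeState state) {
        nodeState.put(id, state);
    }

    boolean isConnectionSetupTimeout(String id, long now) {
        NodeState state = nodeState.get(id);
        if (state == null)
            throw new IllegalStateException("No entry found for connection " + id);
        // Only a connection that is still being set up can hit the setup timeout.
        if (state.state != ConnectionState.CONNECTING)
            return false;
        return now - state.lastConnectAttemptMs > state.connectionSetupTimeoutMs;
    }

    public static void main(String[] args) {
        SetupTimeoutCheckSketch states = new SetupTimeoutCheckSketch();
        states.put("0", new NodeState(ConnectionState.CONNECTING, 0L, 10_000L));
        states.put("1", new NodeState(ConnectionState.READY, 0L, 10_000L));
        System.out.println(states.isConnectionSetupTimeout("0", 10_001L)); // true
        System.out.println(states.isConnectionSetupTimeout("1", 10_001L)); // false
    }
}
```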

##########
File path: clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
##########
@@ -786,6 +808,26 @@ private void handleAbortedSends(List<ClientResponse> responses) {
         abortedSends.clear();
     }
 
+    /**
+     * Handle socket channel connection timeout. The timeout will hit iff a connection
+     * stays at the ConnectionState.CONNECTING state longer than the timeout value,
+     * as indicated by ClusterConnectionStates.NodeConnectionState.
+     *
+     * @param responses The list of responses to update
+     * @param now The current time
+     */
+    private void handleTimeOutConnections(List<ClientResponse> responses, long now) {
+        Set<String> connectingNodes = connectionStates.connectingNodes();
+        for (String nodeId: connectingNodes) {
+            if (connectionStates.isConnectionSetupTimeout(nodeId, now)) {
+                // close connection to the node
+                this.selector.close(nodeId);
+                log.debug("Disconnecting from node {} due to socket connection setup timeout.", nodeId);

Review comment:
       Can we include the timeout in the log line?

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
##########
@@ -149,6 +155,16 @@
                                         atLeast(0),
                                         Importance.MEDIUM,
                                         REQUEST_TIMEOUT_MS_DOC)
+                                .define(SOCKET_CONNECTIONS_SETUP_TIMEOUT_MS_CONFIG,
+                                        Type.LONG,
+                                        10 * 1000,
+                                        Importance.MEDIUM,
+                                        CommonClientConfigs.SOCKET_CONNECTIONS_SETUP_TIMEOUT_MS_DOC)
+                                .define(SOCKET_CONNECTIONS_SETUP_TIMEOUT_MAX_MS_CONFIG,
+                                        Type.LONG,
+                                        127 * 1000,

Review comment:
       Can we define the defaults in `CommonClientConfigs`?
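       A minimal sketch of the suggestion, with hypothetical class and constant names standing in for `CommonClientConfigs` and `AdminClientConfig`. In the real config class the shared constant would be passed as the default argument of `ConfigDef.define(...)` in place of the inline `10 * 1000` and `127 * 1000`.

```java
import java.util.Map;

// Stand-in for AdminClientConfig (and likewise the producer/consumer configs):
// it refers to the shared defaults instead of repeating 10 * 1000 and 127 * 1000.
class AdminClientConfigSketch {
    static Map<String, Long> socketSetupDefaults() {
        return Map.of(
                CommonClientConfigsSketch.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG,
                CommonClientConfigsSketch.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MS,
                CommonClientConfigsSketch.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG,
                CommonClientConfigsSketch.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS);
    }

    public static void main(String[] args) {
        System.out.println(socketSetupDefaults());
    }
}

// Hypothetical stand-in for CommonClientConfigs: the single place the defaults are defined.
class CommonClientConfigsSketch {
    static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG = "socket.connection.setup.timeout.ms";
    static final long DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MS = 10 * 1000L;

    static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG = "socket.connection.setup.timeout.max.ms";
    static final long DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS = 127 * 1000L;
}
```

       Changing a default then means editing one constant rather than every client config and the broker-side `Defaults`.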







[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r439756327



##########
File path: clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
##########
@@ -554,6 +571,7 @@ private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long
         handleDisconnections(responses, updatedNow);
         handleConnections();
         handleInitiateApiVersionRequests(updatedNow);
+        handleTimeOutConnections(responses, updatedNow);

Review comment:
       Do you mean TimeOut => Timeout? Refactored.







[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r443935192



##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -88,6 +88,8 @@ object Defaults {
   val MaxConnections: Int = Int.MaxValue
   val ConnectionsMaxIdleMs = 10 * 60 * 1000L
   val RequestTimeoutMs = 30000
+  val ConnectionSetupTimeoutMs = 10 * 1000L
+  val ConnectionSetupTimeoutMaxMs = 127 * 1000L

Review comment:
       Yes. Refactored.







[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r442362673



##########
File path: clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
##########
@@ -103,6 +103,12 @@
         Utils.join(SecurityProtocol.names(), ", ") + ".";
     public static final String DEFAULT_SECURITY_PROTOCOL = "PLAINTEXT";
 
+    public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG = "socket.connection.setup.timeout.ms";
+    public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MS_DOC = "The amount of time the client will wait for the initial socket connection to be built. If the connection is not built before the timeout elapses the network client will close the socket channel. The default value will be 10 seconds.";

Review comment:
       Makes sense. I'll change the description and remove the defaults from the doc.







[GitHub] [kafka] dajac commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r445654781



##########
File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##########
@@ -357,6 +402,46 @@ private NodeConnectionState nodeState(String id) {
         return state;
     }
 
+    /**
+     * Get the id set of nodes which are in CONNECTING state
+     */
+    public Set<String> connectingNodes() {
+        return this.connectingNodes;
+    }
+
+    /**
+     * Get the timestamp of the latest connection attempt of a given node
+     * @param id the connection to fetch the state for
+     */
+    public long lastConnectAttemptMs(String id) {
+        NodeConnectionState nodeState = this.nodeState.get(id);
+        return nodeState == null ? 0 : nodeState.lastConnectAttemptMs;
+    }
+
+    /**
+     * Get the current socket connection setup timeout of the given node.
+     * The base value is defined via socket.connection.setup.timeout.
+     * @param id the connection to fetch the state for
+     */
+    public long connectionSetupTimeoutMs(String id) {
+        NodeConnectionState nodeState = this.nodeState.get(id);
+        if (nodeState == null)
+            throw new IllegalStateException("Connection to node " + id + " hasn't been initialized");
+        return nodeState.connectionSetupTimeoutMs;
+    }
+
+    /**
+     * Test if the connection to the given node has reached its timeout
+     * @param id the connection to fetch the state for
+     * @param now the current time in ms
+     */
+    public boolean isConnectionSetupTimeout(String id, long now) {
+        NodeConnectionState nodeState = this.nodeState.get(id);

Review comment:
       nit: You can also use `this.nodeState(id)` here.

##########
File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##########
@@ -357,6 +402,46 @@ private NodeConnectionState nodeState(String id) {
         return state;
     }
 
+    /**
+     * Get the id set of nodes which are in CONNECTING state
+     */
+    public Set<String> connectingNodes() {
+        return this.connectingNodes;
+    }
+
+    /**
+     * Get the timestamp of the latest connection attempt of a given node
+     * @param id the connection to fetch the state for
+     */
+    public long lastConnectAttemptMs(String id) {
+        NodeConnectionState nodeState = this.nodeState.get(id);
+        return nodeState == null ? 0 : nodeState.lastConnectAttemptMs;
+    }
+
+    /**
+     * Get the current socket connection setup timeout of the given node.
+     * The base value is defined via socket.connection.setup.timeout.
+     * @param id the connection to fetch the state for
+     */
+    public long connectionSetupTimeoutMs(String id) {
+        NodeConnectionState nodeState = this.nodeState.get(id);
+        if (nodeState == null)
+            throw new IllegalStateException("Connection to node " + id + " hasn't been initialized");

Review comment:
       nit: This can be replaced by `this.nodeState(id)`.

##########
File path: clients/src/main/java/org/apache/kafka/common/utils/ExponentialBackoff.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * An utility class for keeping the parameters and providing the value of exponential
+ * retry backoff, exponential reconnect backoff, exponential timeout, etc.
+ * The formula is
+ * Backoff(attempts) = random(1 - jitter, 1 + jitter) * initialInterval * multiplier ^ attempts
+ * If scaleFactor is greater or equal than termMax, a constant backoff of will be provided

Review comment:
       Replace `scaleFactor` and `termMax` with the new terms.
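       For reference, the Javadoc formula with the cap made explicit might read as below. This assumes the exponent is clamped at $e_{\max}$ as computed by `expMax` in the `GeometricProgression` constructor shown further down; $I$ is the initial interval, $m$ the multiplier, $M$ the maximum, $j$ the jitter, and $a$ the number of attempts (the symbol names are illustrative).

```latex
\mathrm{Backoff}(a) = U(1 - j,\, 1 + j) \cdot I \cdot m^{\min(a,\, e_{\max})},
\qquad
e_{\max} =
\begin{cases}
  \log_m \dfrac{M}{\max(I, 1)} & \text{if } M > I \\
  0 & \text{otherwise}
\end{cases}
```

       With the defaults discussed in this PR ($I = 10$ s, $M = 127$ s) and assuming $m = 2$, $e_{\max} = \log_2 12.7 \approx 3.67$, so the pre-jitter values are 10, 20, 40, 80, 127, 127, ... seconds. When $M \le I$ the exponent is always 0, which gives the constant backoff the Javadoc mentions.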

##########
File path: clients/src/main/java/org/apache/kafka/common/utils/ExponentialBackoff.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * An utility class for keeping the parameters and providing the value of exponential
+ * retry backoff, exponential reconnect backoff, exponential timeout, etc.
+ * The formula is

Review comment:
       nit: `The formula is` -> `The formula is:`







[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r442355123



##########
File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##########
@@ -357,6 +398,36 @@ private NodeConnectionState nodeState(String id) {
         return state;
     }
 
+    /**
+     * Get the id set of nodes which are in CONNECTING state
+     */
+    public Set<String> connectingNodes() {
+        return this.connectingNodes;
+    }
+
+    /**
+     * Get the timestamp of the latest connection attempt of a given node
+     * @param id the connection to fetch the state for
+     */
+    public long lastConnectAttemptMs(String id) {
+        NodeConnectionState nodeState = this.nodeState.get(id);
+        return nodeState == null ? 0 : nodeState.lastConnectAttemptMs;
+    }
+
+    public long connectionSetupTimeoutMs(String id) {
+        NodeConnectionState nodeState = this.nodeState.get(id);
+        return nodeState.connectionSetupTimeoutMs;
+    }
+
+    /**
+     * Test if the connection to the given node has reached its timeout
+     * @param id the connection to fetch the state for
+     * @param now the current time in ms
+     */
+    public boolean isConnectionSetupTimeout(String id, long now) {
+        return now - lastConnectAttemptMs(id) > connectionSetupTimeoutMs(id);

Review comment:
       I think so. The `lastConnectAttemptMs` is updated in both `connecting` (Line 145 & Line 157) and `disconnected`. 







[GitHub] [kafka] dajac commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r440658796



##########
File path: clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
##########
@@ -678,7 +696,11 @@ public Node leastLoadedNode(long now) {
             } else if (connectionStates.isPreparingConnection(node.idString())) {
                 foundConnecting = node;
             } else if (canConnect(node, now)) {
-                foundCanConnect = node;
+                if (foundCanConnect == null ||
+                        this.connectionStates.lastConnectAttemptMs(foundCanConnect.idString()) >
+                                this.connectionStates.lastConnectAttemptMs(node.idString())) {
+                    foundCanConnect = node;
+                }

Review comment:
       It would be great if you could update the javadoc of the method to reflect this.
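       A hypothetical sketch of the selection rule in this hunk, with a Javadoc sentence of the kind being requested (the method and parameter names are illustrative, not Kafka's). Nodes with no recorded attempt count as 0, as `lastConnectAttemptMs` returns above, so a never-tried node is preferred, and the strict `>` keeps the earlier candidate on ties.

```java
import java.util.List;
import java.util.Map;

class LeastRecentlyTriedSketch {
    /**
     * Among the nodes we are allowed to connect to, return the one whose last
     * connection attempt is the oldest; nodes never tried count as time 0.
     * Ties keep the node that appears first in {@code candidates}.
     */
    static String pickCanConnect(List<String> candidates, Map<String, Long> lastConnectAttemptMs) {
        String found = null;
        for (String node : candidates) {
            long last = lastConnectAttemptMs.getOrDefault(node, 0L);
            if (found == null || lastConnectAttemptMs.getOrDefault(found, 0L) > last)
                found = node;
        }
        return found;
    }

    public static void main(String[] args) {
        Map<String, Long> attempts = Map.of("0", 5_000L, "1", 1_000L, "2", 3_000L);
        System.out.println(pickCanConnect(List.of("0", "1", "2"), attempts)); // 1
    }
}
```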

##########
File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##########
@@ -357,6 +398,36 @@ private NodeConnectionState nodeState(String id) {
         return state;
     }
 
+    /**
+     * Get the id set of nodes which are in CONNECTING state
+     */
+    public Set<String> connectingNodes() {
+        return this.connectingNodes;
+    }
+
+    /**
+     * Get the timestamp of the latest connection attempt of a given node
+     * @param id the connection to fetch the state for
+     */
+    public long lastConnectAttemptMs(String id) {
+        NodeConnectionState nodeState = this.nodeState.get(id);
+        return nodeState == null ? 0 : nodeState.lastConnectAttemptMs;
+    }
+
+    public long connectionSetupTimeoutMs(String id) {
+        NodeConnectionState nodeState = this.nodeState.get(id);
+        return nodeState.connectionSetupTimeoutMs;
+    }
+
+    /**
+     * Test if the connection to the given node has reached its timeout
+     * @param id the connection to fetch the state for
+     * @param now the current time in ms
+     */
+    public boolean isConnectionSetupTimeout(String id, long now) {
+        return now - lastConnectAttemptMs(id) > connectionSetupTimeoutMs(id);

Review comment:
       Are you sure that using `lastConnectAttemptMs` is correct here? `lastConnectAttemptMs` is recorded when a connection is disconnected and as we respect the `reconnectBackoffMs` before reconnecting, the connection timeout computed here will also include the current `reconnectBackoffMs`. Is this what we want? It may be better to record the time in `connecting`.
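       A worked example of the concern, with hypothetical numbers and assuming `lastConnectAttemptMs` still holds the disconnect time when the check runs: the node disconnects at t = 0, waits out a 1000 ms reconnect backoff, and starts connecting at t = 1000 with a 10000 ms setup timeout.

```java
class SetupTimeoutClockSketch {
    // Same comparison as isConnectionSetupTimeout: strictly greater than the timeout.
    static boolean timedOut(long startMs, long now, long timeoutMs) {
        return now - startMs > timeoutMs;
    }

    public static void main(String[] args) {
        long disconnectedAtMs = 0L;          // what lastConnectAttemptMs holds after disconnected()
        long reconnectBackoffMs = 1_000L;
        long connectingAtMs = disconnectedAtMs + reconnectBackoffMs;
        long setupTimeoutMs = 10_000L;
        long now = 10_500L;                  // only 9_500 ms spent in CONNECTING

        // Measured from the disconnect: the backoff is counted against the timeout.
        System.out.println(timedOut(disconnectedAtMs, now, setupTimeoutMs)); // true
        // Measured from connecting(): only time actually spent CONNECTING counts.
        System.out.println(timedOut(connectingAtMs, now, setupTimeoutMs));   // false
    }
}
```

       Recording the timestamp in `connecting` keeps the effective timeout independent of the current `reconnectBackoffMs`.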

##########
File path: clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
##########
@@ -103,6 +103,12 @@
         Utils.join(SecurityProtocol.names(), ", ") + ".";
     public static final String DEFAULT_SECURITY_PROTOCOL = "PLAINTEXT";
 
+    public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG = "socket.connection.setup.timeout.ms";
+    public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MS_DOC = "The amount of time the client will wait for the initial socket connection to be built. If the connection is not built before the timeout elapses the network client will close the socket channel. The default value will be 10 seconds.";

Review comment:
       nit: I would use `to be established` instead of `to be built`. I also think we should avoid putting default values in the documentation here, because first, the default can be changed on a per-client basis, and second, the default will be documented based on the default value provided in the config definition.
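       One possible rewording that follows this nit; the exact text is an assumption, not what was merged, and the class name is hypothetical. The default is left out because the generated config documentation reports it from the `ConfigDef` definition.

```java
class SetupTimeoutDocSketch {
    // "established" instead of "built", and no hard-coded default value.
    static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MS_DOC =
            "The amount of time the client will wait for the initial socket connection to be established. "
            + "If the connection is not established before the timeout elapses, "
            + "the network client will close the socket channel.";

    public static void main(String[] args) {
        System.out.println(SOCKET_CONNECTION_SETUP_TIMEOUT_MS_DOC);
    }
}
```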

##########
File path: clients/src/main/java/org/apache/kafka/common/utils/GeometricProgression.java
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * An util class for exponential backoff, backoff, etc...
+ * The formula is Term(n) = random(1 - jitter, 1 + jitter) * scaleFactor * (ratio) ^ n
+ * If scaleFactor is greater or equal than termMax, a constant term of will be provided
+ * This class is thread-safe
+ */
+public class GeometricProgression {
+    private final int ratio;
+    private final double expMax;
+    private final long scaleFactor;
+    private final double jitter;
+
+    public GeometricProgression(long scaleFactor, int ratio, long termMax, double jitter) {
+        this.scaleFactor = scaleFactor;
+        this.ratio = ratio;
+        this.jitter = jitter;
+        this.expMax = termMax > scaleFactor ?
+                Math.log(termMax / (double) Math.max(scaleFactor, 1)) / Math.log(ratio) : 0;
+    }
+
+    public long term(long n) {

Review comment:
       nit: What about using `retries` instead of `n`? It may help readers understand that the exponential value is computed based on the number of tries or retries.
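       The body of `term` is cut off in this hunk. Below is a hypothetical sketch of how it could use the constructor's fields, with the parameter renamed to `retries` as suggested; it is an illustration, not the PR's implementation. The exponent is clamped at `expMax` so the pre-jitter value never exceeds `termMax`, and `expMax == 0` gives the constant term the Javadoc describes.

```java
import java.util.concurrent.ThreadLocalRandom;

class GeometricProgressionSketch {
    private final long scaleFactor;
    private final int ratio;
    private final double expMax;
    private final double jitter;

    GeometricProgressionSketch(long scaleFactor, int ratio, long termMax, double jitter) {
        this.scaleFactor = scaleFactor;
        this.ratio = ratio;
        this.jitter = jitter;
        // Same computation as the constructor in the hunk above.
        this.expMax = termMax > scaleFactor ?
                Math.log(termMax / (double) Math.max(scaleFactor, 1)) / Math.log(ratio) : 0;
    }

    long term(long retries) {
        if (expMax == 0)
            return scaleFactor;                      // termMax <= scaleFactor: constant term
        double exp = Math.min(retries, expMax);      // cap the exponent, not the jittered value
        double value = scaleFactor * Math.pow(ratio, exp);
        // nextDouble(origin, bound) rejects origin == bound, so skip it when jitter is 0.
        double randomFactor = jitter < Double.MIN_NORMAL ? 1.0 :
                ThreadLocalRandom.current().nextDouble(1 - jitter, 1 + jitter);
        return (long) (randomFactor * value);
    }

    public static void main(String[] args) {
        GeometricProgressionSketch p = new GeometricProgressionSketch(100, 2, 2000, 0.0);
        for (int retries = 0; retries <= 6; retries++)
            System.out.println(retries + " -> " + p.term(retries));
    }
}
```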

##########
File path: clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
##########
@@ -786,6 +808,29 @@ private void handleAbortedSends(List<ClientResponse> responses) {
         abortedSends.clear();
     }
 
+    /**
+     * Handle socket channel connection timeout. The timeout will hit iff a connection
+     * stays at the ConnectionState.CONNECTING state longer than the timeout value,
+     * as indicated by ClusterConnectionStates.NodeConnectionState.
+     *
+     * @param responses The list of responses to update
+     * @param now The current time
+     */
+    private void handleTimeoutConnections(List<ClientResponse> responses, long now) {
+        Set<String> connectingNodes = connectionStates.connectingNodes();
+        for (String nodeId: connectingNodes) {

Review comment:
       nit: We usually put a space before and after the `:`.

##########
File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##########
@@ -357,6 +398,36 @@ private NodeConnectionState nodeState(String id) {
         return state;
     }
 
+    /**
+     * Get the id set of nodes which are in CONNECTING state
+     */
+    public Set<String> connectingNodes() {
+        return this.connectingNodes;
+    }
+
+    /**
+     * Get the timestamp of the latest connection attempt of a given node
+     * @param id the connection to fetch the state for
+     */
+    public long lastConnectAttemptMs(String id) {
+        NodeConnectionState nodeState = this.nodeState.get(id);
+        return nodeState == null ? 0 : nodeState.lastConnectAttemptMs;
+    }
+
+    public long connectionSetupTimeoutMs(String id) {
+        NodeConnectionState nodeState = this.nodeState.get(id);
+        return nodeState.connectionSetupTimeoutMs;

Review comment:
       Should we ensure that `nodeState` is not `null` here?

##########
File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##########
@@ -300,30 +323,48 @@ public AuthenticationException authenticationException(String id) {
      * Resets the failure count for a node and sets the reconnect backoff to the base
      * value configured via reconnect.backoff.ms
      *
-     * @param nodeState The node state object to update
+     * @param nodeState nodeState The node state object to update
      */
     private void resetReconnectBackoff(NodeConnectionState nodeState) {
         nodeState.failedAttempts = 0;
-        nodeState.reconnectBackoffMs = this.reconnectBackoffInitMs;
+        nodeState.reconnectBackoffMs = reconnectBackoff.term(0);
+    }
+
+    /**
+     * Resets the failure count for a node and sets the connection setup timeout to the base
+     * value configured via socket.connection.setup.timeout.ms
+     *
+     * @param nodeState nodeState The node state object to update
+     */
+    private void resetConnectionSetupTimeout(NodeConnectionState nodeState) {
+        nodeState.failedConnectAttempts = 0;
+        nodeState.connectionSetupTimeoutMs = connectionSetupTimeout.term(0);
     }
 
     /**
-     * Update the node reconnect backoff exponentially.
+     * Increment the failure counter, update the node reconnect backoff exponentially,
+     * and record the current timestamp.
      * The delay is reconnect.backoff.ms * 2**(failures - 1) * (+/- 20% random jitter)
      * Up to a (pre-jitter) maximum of reconnect.backoff.max.ms
      *
      * @param nodeState The node state object to update
      */
-    private void updateReconnectBackoff(NodeConnectionState nodeState) {
-        if (this.reconnectBackoffMaxMs > this.reconnectBackoffInitMs) {
-            nodeState.failedAttempts += 1;
-            double backoffExp = Math.min(nodeState.failedAttempts - 1, this.reconnectBackoffMaxExp);
-            double backoffFactor = Math.pow(RECONNECT_BACKOFF_EXP_BASE, backoffExp);
-            long reconnectBackoffMs = (long) (this.reconnectBackoffInitMs * backoffFactor);
-            // Actual backoff is randomized to avoid connection storms.
-            double randomFactor = ThreadLocalRandom.current().nextDouble(0.8, 1.2);
-            nodeState.reconnectBackoffMs = (long) (randomFactor * reconnectBackoffMs);
-        }
+    private void incrementReconnectBackoff(NodeConnectionState nodeState, long now) {
+        nodeState.reconnectBackoffMs = reconnectBackoff.term(nodeState.failedAttempts);
+        nodeState.failedAttempts++;

Review comment:
       Shouldn't we increment before computing the new reconnect backoff?
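       A jitter-free sketch (hypothetical base of 50 ms, factor 2, no cap) comparing the two orderings that appear in this PR: `incrementReconnectBackoff` here, and the earlier `disconnected()` revision that increments first and then uses `failedAttempts - 1`. Both yield `base * 2^(failures - 1)` on the n-th consecutive failure, as in the Javadoc; the difference is whether `failedAttempts` already counts the current failure when the term is computed.

```java
import java.util.Arrays;

class IncrementOrderSketch {
    // Pre-jitter, uncapped term: 50 ms * 2^exp (hypothetical base value).
    static long term(long exp) {
        return 50L << exp;
    }

    // Ordering in incrementReconnectBackoff above: compute with the old count, then increment.
    static long[] computeThenIncrement(int failures) {
        long[] backoffs = new long[failures];
        int failedAttempts = 0;
        for (int i = 0; i < failures; i++) {
            backoffs[i] = term(failedAttempts);
            failedAttempts++;
        }
        return backoffs;
    }

    // Ordering in the earlier disconnected() revision: increment, then use failedAttempts - 1.
    static long[] incrementThenCompute(int failures) {
        long[] backoffs = new long[failures];
        int failedAttempts = 0;
        for (int i = 0; i < failures; i++) {
            failedAttempts++;
            backoffs[i] = term(failedAttempts - 1);
        }
        return backoffs;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(computeThenIncrement(4))); // [50, 100, 200, 400]
        System.out.println(Arrays.toString(incrementThenCompute(4))); // [50, 100, 200, 400]
    }
}
```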

##########
File path: clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
##########
@@ -103,6 +103,12 @@
         Utils.join(SecurityProtocol.names(), ", ") + ".";
     public static final String DEFAULT_SECURITY_PROTOCOL = "PLAINTEXT";
 
+    public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG = "socket.connection.setup.timeout.ms";
+    public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MS_DOC = "The amount of time the client will wait for the initial socket connection to be built. If the connection is not built before the timeout elapses the network client will close the socket channel. The default value will be 10 seconds.";
+
+    public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG = "socket.connection.setup.timeout.max.ms";
+    public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_DOC = "The maximum amount of time the client will wait for the initial socket connection to be built. The connection setup timeout will increase exponentially for each consecutive connection failure up to this maximum. To avoid connection storms, a randomization factor of 0.2 will be applied to the backoff resulting in a random range between 20% below and 20% above the computed value. The default value will be 127 seconds.";

Review comment:
       Same comment as above.

##########
File path: clients/src/test/java/org/apache/kafka/common/utils/GeometricProgressionTest.java
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class GeometricProgressionTest {
+    @Test
+    public void testGeometricProgression() {
+        long scaleFactor = 100;
+        int ratio = 2;
+        long termMax = 2000;
+        double jitter = 0.2;
+        GeometricProgression geometricProgression = new GeometricProgression(
+                scaleFactor, ratio, termMax, jitter
+        );
+
+        for (int i = 0; i <= 100; i++) {
+            for (int n = 0; n <= 4; n++) {
+                assertEquals(scaleFactor * Math.pow(ratio, n), geometricProgression.term(n),
+                        scaleFactor * Math.pow(ratio, n) * jitter);
+            }
+            System.out.println(geometricProgression.term(5));

Review comment:
       This one can be removed I suppose.

##########
File path: clients/src/main/java/org/apache/kafka/common/utils/GeometricProgression.java
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * An util class for exponential backoff, backoff, etc...
+ * The formula is Term(n) = random(1 - jitter, 1 + jitter) * scaleFactor * (ratio) ^ n
+ * If scaleFactor is greater or equal than termMax, a constant term of will be provided
+ * This class is thread-safe
+ */
+public class GeometricProgression {

Review comment:
       I would go with `ExponentialBackoff` even if we use it for computing an exponential timeout as well. I think that people will understand this.

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
##########
@@ -149,6 +155,16 @@
                                         atLeast(0),
                                         Importance.MEDIUM,
                                         REQUEST_TIMEOUT_MS_DOC)
+                                .define(SOCKET_CONNECTIONS_SETUP_TIMEOUT_MS_CONFIG,
+                                        Type.LONG,
+                                        10 * 1000,
+                                        Importance.MEDIUM,
+                                        CommonClientConfigs.SOCKET_CONNECTIONS_SETUP_TIMEOUT_MS_DOC)
+                                .define(SOCKET_CONNECTIONS_SETUP_TIMEOUT_MAX_MS_CONFIG,
+                                        Type.LONG,
+                                        127 * 1000,

Review comment:
       While I also recognize that we are not consistent with this, I would do it as suggested by Rajini. The defaults are the same everywhere so it makes sense to have it defined centrally for now.







[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r443934101



##########
File path: core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
##########
@@ -205,6 +205,8 @@ object BrokerApiVersionsCommand {
   private object AdminClient {
     val DefaultConnectionMaxIdleMs = 9 * 60 * 1000
     val DefaultRequestTimeoutMs = 5000
+    val DefaultSocketConnectionSetupMs = 10 * 1000
+    val DefaultSocketConnectionSetupMaxMs = 127 * 1000

Review comment:
       Yes. Removed.





[GitHub] [kafka] d8tltanc commented on pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#issuecomment-643656181


   Thanks for the feedback @rajinisivaram 
   I've modified the patch per your suggestions.
   Please take your time and let me know if we can start testing.



[GitHub] [kafka] d8tltanc commented on pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#issuecomment-646217062


   Thanks, @dajac for the comments. I've modified the PR per your suggestions. 
   @rajinisivaram Do you think we can start testing?



[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r439647104



##########
File path: clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
##########
@@ -786,6 +808,26 @@ private void handleAbortedSends(List<ClientResponse> responses) {
         abortedSends.clear();
     }
 
+    /**
+     * Handle socket channel connection timeout. The timeout will hit iff a connection
+     * stays at the ConnectionState.CONNECTING state longer than the timeout value,
+     * as indicated by ClusterConnectionStates.NodeConnectionState.
+     *
+     * @param responses The list of responses to update
+     * @param now The current time
+     */
+    private void handleTimeOutConnections(List<ClientResponse> responses, long now) {
+        Set<String> connectingNodes = connectionStates.connectingNodes();
+        for (String nodeId: connectingNodes) {
+            if (connectionStates.isConnectionSetupTimeout(nodeId, now)) {
+                // close connection to the node

Review comment:
       Line deleted.
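The sweep that `handleTimeOutConnections` performs can be sketched with plain collections. Everything here is a hypothetical stand-in, with no `Selector` or `ClusterConnectionStates`. One assumption worth checking: if closing a timed-out connection removes the node from the same set being iterated, the ids should be collected first to avoid a `ConcurrentModificationException`. The sketch below does that.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified stand-in for the connection-state bookkeeping.
final class ConnectionTimeoutSweep {
    private final Set<String> connectingNodes = new HashSet<>();
    private final Map<String, Long> lastConnectAttemptMs = new HashMap<>();
    private final Map<String, Long> setupTimeoutMs = new HashMap<>();

    void connecting(String nodeId, long now, long timeoutMs) {
        connectingNodes.add(nodeId);
        lastConnectAttemptMs.put(nodeId, now);
        setupTimeoutMs.put(nodeId, timeoutMs);
    }

    // A node times out iff it has stayed in CONNECTING longer than its timeout.
    private boolean isConnectionSetupTimeout(String nodeId, long now) {
        return now - lastConnectAttemptMs.get(nodeId) > setupTimeoutMs.get(nodeId);
    }

    // Collect timed-out ids first, then disconnect: removing from a HashSet while
    // iterating over it can throw ConcurrentModificationException.
    List<String> handleTimedOutConnections(long now) {
        List<String> timedOut = new ArrayList<>();
        for (String nodeId : connectingNodes) {
            if (isConnectionSetupTimeout(nodeId, now))
                timedOut.add(nodeId);
        }
        for (String nodeId : timedOut)
            connectingNodes.remove(nodeId); // stands in for closing the channel and marking it disconnected
        return timedOut;
    }

    boolean isConnecting(String nodeId) {
        return connectingNodes.contains(nodeId);
    }
}
```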





[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r443933957



##########
File path: clients/src/test/java/org/apache/kafka/common/utils/ExponentialBackoffTest.java
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class ExponentialBackoffTest {
+    @Test
+    public void testExponentialBackoff() {
+        long scaleFactor = 100;
+        int ratio = 2;
+        long termMax = 2000;
+        double jitter = 0.2;
+        ExponentialBackoff exponentialBackoff = new ExponentialBackoff(
+                scaleFactor, ratio, termMax, jitter
+        );
+
+        for (int i = 0; i <= 100; i++) {
+            for (int n = 0; n <= 4; n++) {
+                assertEquals(scaleFactor * Math.pow(ratio, n), exponentialBackoff.term(n),
+                        scaleFactor * Math.pow(ratio, n) * jitter);
+            }
+            assertTrue(exponentialBackoff.term(1000) <= termMax * (1 + jitter));

Review comment:
       Thanks for the suggestion. I've adopted this in my test.





[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r443785134



##########
File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##########
@@ -357,6 +398,36 @@ private NodeConnectionState nodeState(String id) {
         return state;
     }
 
+    /**
+     * Get the id set of nodes which are in CONNECTING state
+     */
+    public Set<String> connectingNodes() {
+        return this.connectingNodes;
+    }
+
+    /**
+     * Get the timestamp of the latest connection attempt of a given node
+     * @param id the connection to fetch the state for
+     */
+    public long lastConnectAttemptMs(String id) {
+        NodeConnectionState nodeState = this.nodeState.get(id);
+        return nodeState == null ? 0 : nodeState.lastConnectAttemptMs;
+    }
+
+    public long connectionSetupTimeoutMs(String id) {
+        NodeConnectionState nodeState = this.nodeState.get(id);
+        return nodeState.connectionSetupTimeoutMs;

Review comment:
       When `NetworkClient` initializes a connection to a given node (`NetworkClient::initiateConnect`), the `nodeState` is guaranteed to be initialized and won't be `null`. I don't think it's reasonable for a caller to ask for the connection setup timeout of a node before its connection has been initialized, which is why I prevent such calls by throwing an exception.
   
   However, it might be reasonable for a caller to get `lastConnectAttemptMs` before initializing the connection to the node. For example, the node provider may want to pick the node with the least recent connection attempt. Nodes that haven't been connected yet have no `NodeConnectionState`; this implies they should have the highest priority, so we may assume their `lastConnectAttemptMs` is 0.
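Here is a minimal sketch of the node-provider case described above. Treating a missing state as `lastConnectAttemptMs == 0` makes never-attempted nodes sort first. `LeastRecentlyAttempted` and `pick` are hypothetical names, not Kafka APIs.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: unknown nodes report 0, so they sort ahead of any attempted node.
final class LeastRecentlyAttempted {
    private final Map<String, Long> lastConnectAttemptMs = new HashMap<>();

    void recordAttempt(String nodeId, long now) {
        lastConnectAttemptMs.put(nodeId, now);
    }

    // Mirrors the diff: a missing state is treated as "never attempted" (0).
    long lastConnectAttemptMs(String nodeId) {
        return lastConnectAttemptMs.getOrDefault(nodeId, 0L);
    }

    // Returns the candidate with the oldest (smallest) attempt timestamp.
    String pick(List<String> candidates) {
        String best = null;
        long bestMs = Long.MAX_VALUE;
        for (String nodeId : candidates) {
            long ms = lastConnectAttemptMs(nodeId);
            if (ms < bestMs) {
                best = nodeId;
                bestMs = ms;
            }
        }
        return best;
    }
}
```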





[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r439752541



##########
File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##########
@@ -296,36 +314,6 @@ public AuthenticationException authenticationException(String id) {
         return state != null ? state.authenticationException : null;
     }
 
-    /**
-     * Resets the failure count for a node and sets the reconnect backoff to the base
-     * value configured via reconnect.backoff.ms
-     *
-     * @param nodeState The node state object to update
-     */
-    private void resetReconnectBackoff(NodeConnectionState nodeState) {

Review comment:
       Combining all types of reset into one method is probably not a good choice, because resetting the failed attempts and resetting the connection timeout may happen in different places.
   
   However, I agree we should have some abstraction on the update and reset logic. I'll put the logic in new class methods.
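One way to keep the two resets independent is sketched below with a hypothetical `NodeBackoffState` class. Field names loosely follow the diff, and the call sites mentioned in the comments are assumptions.

```java
// Hypothetical per-node state with separate reset helpers, so the reconnect
// backoff and the connection setup timeout can be reset independently.
final class NodeBackoffState {
    private final long reconnectBackoffBaseMs;
    private final long connectionSetupTimeoutBaseMs;

    long failedAttempts;
    long reconnectBackoffMs;
    long failedConnectAttempts;
    long connectionSetupTimeoutMs;

    NodeBackoffState(long reconnectBackoffBaseMs, long connectionSetupTimeoutBaseMs) {
        this.reconnectBackoffBaseMs = reconnectBackoffBaseMs;
        this.connectionSetupTimeoutBaseMs = connectionSetupTimeoutBaseMs;
        resetReconnectBackoff();
        resetConnectionSetupTimeout();
    }

    // Hypothetical call site: once the connection is fully ready.
    void resetReconnectBackoff() {
        failedAttempts = 0;
        reconnectBackoffMs = reconnectBackoffBaseMs;
    }

    // Hypothetical call site: once the socket connection is established,
    // which may happen at a different point than the reset above.
    void resetConnectionSetupTimeout() {
        failedConnectAttempts = 0;
        connectionSetupTimeoutMs = connectionSetupTimeoutBaseMs;
    }
}
```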





[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r443970898



##########
File path: clients/src/main/java/org/apache/kafka/common/utils/ExponentialBackoff.java
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * An utility class for exponential backoff, timeout, etc...

Review comment:
       Sure. How does it look now?





[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r442355123



##########
File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##########
@@ -357,6 +398,36 @@ private NodeConnectionState nodeState(String id) {
         return state;
     }
 
+    /**
+     * Get the id set of nodes which are in CONNECTING state
+     */
+    public Set<String> connectingNodes() {
+        return this.connectingNodes;
+    }
+
+    /**
+     * Get the timestamp of the latest connection attempt of a given node
+     * @param id the connection to fetch the state for
+     */
+    public long lastConnectAttemptMs(String id) {
+        NodeConnectionState nodeState = this.nodeState.get(id);
+        return nodeState == null ? 0 : nodeState.lastConnectAttemptMs;
+    }
+
+    public long connectionSetupTimeoutMs(String id) {
+        NodeConnectionState nodeState = this.nodeState.get(id);
+        return nodeState.connectionSetupTimeoutMs;
+    }
+
+    /**
+     * Test if the connection to the given node has reached its timeout
+     * @param id the connection to fetch the state for
+     * @param now the current time in ms
+     */
+    public boolean isConnectionSetupTimeout(String id, long now) {
+        return now - lastConnectAttemptMs(id) > connectionSetupTimeoutMs(id);

Review comment:
       Good catch. I'll make the logic record it in both `connecting` and `disconnected`.
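A hypothetical two-state sketch shows why the timestamp needs to be recorded on both transitions. If only `disconnected` stamped `lastConnectAttemptMs`, the setup timeout would be measured from the previous disconnect rather than from the start of the current attempt. The CONNECTING check inside `isConnectionSetupTimeout` is an extra assumption of this sketch.

```java
// Hypothetical sketch: both transitions stamp lastConnectAttemptMs.
final class AttemptClock {
    enum State { DISCONNECTED, CONNECTING }

    private final long connectionSetupTimeoutMs;
    private State state = State.DISCONNECTED;
    private long lastConnectAttemptMs = 0;

    AttemptClock(long connectionSetupTimeoutMs) {
        this.connectionSetupTimeoutMs = connectionSetupTimeoutMs;
    }

    void connecting(long now) {
        state = State.CONNECTING;
        lastConnectAttemptMs = now; // start of the current attempt
    }

    void disconnected(long now) {
        state = State.DISCONNECTED;
        lastConnectAttemptMs = now; // e.g. for the reconnect backoff
    }

    boolean isConnectionSetupTimeout(long now) {
        return state == State.CONNECTING && now - lastConnectAttemptMs > connectionSetupTimeoutMs;
    }
}
```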





[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r443785134



##########
File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##########
@@ -357,6 +398,36 @@ private NodeConnectionState nodeState(String id) {
         return state;
     }
 
+    /**
+     * Get the id set of nodes which are in CONNECTING state
+     */
+    public Set<String> connectingNodes() {
+        return this.connectingNodes;
+    }
+
+    /**
+     * Get the timestamp of the latest connection attempt of a given node
+     * @param id the connection to fetch the state for
+     */
+    public long lastConnectAttemptMs(String id) {
+        NodeConnectionState nodeState = this.nodeState.get(id);
+        return nodeState == null ? 0 : nodeState.lastConnectAttemptMs;
+    }
+
+    public long connectionSetupTimeoutMs(String id) {
+        NodeConnectionState nodeState = this.nodeState.get(id);
+        return nodeState.connectionSetupTimeoutMs;

Review comment:
       When `NetworkClient` initializes a connection to a given node (`NetworkClient::initiateConnect`), the `nodeState` is guaranteed to be initialized and won't be `null`. I don't think it's reasonable for a caller to ask for the connection setup timeout of a node before its connection has been initialized, which is why I prevent such calls by throwing an exception.
   
   However, it might be reasonable for a caller to get `lastConnectAttemptMs` before initializing the connection. For example, the node provider may want to pick the node with the least recent connection attempt. Nodes that haven't been connected yet have no `NodeConnectionState`; this implies they should have the highest priority, so we may assume their `lastConnectAttemptMs` is 0.





[GitHub] [kafka] d8tltanc commented on pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#issuecomment-648288498


   @dajac @rajinisivaram Thanks for the second round of review. I've addressed your comments and adopted your suggestions. Please let me know if you have more thoughts on this PR. 



[GitHub] [kafka] dajac commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r444951226



##########
File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##########
@@ -357,6 +398,36 @@ private NodeConnectionState nodeState(String id) {
         return state;
     }
 
+    /**
+     * Get the id set of nodes which are in CONNECTING state
+     */
+    public Set<String> connectingNodes() {
+        return this.connectingNodes;
+    }
+
+    /**
+     * Get the timestamp of the latest connection attempt of a given node
+     * @param id the connection to fetch the state for
+     */
+    public long lastConnectAttemptMs(String id) {
+        NodeConnectionState nodeState = this.nodeState.get(id);
+        return nodeState == null ? 0 : nodeState.lastConnectAttemptMs;
+    }
+
+    public long connectionSetupTimeoutMs(String id) {
+        NodeConnectionState nodeState = this.nodeState.get(id);
+        return nodeState.connectionSetupTimeoutMs;

Review comment:
       Thanks for the clarification. In this case, let's reuse the `nodeState` method, which checks for null and throws an `IllegalStateException` as you do here. We may be able to use it in `isConnectionSetupTimeout` as well.
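Here is a minimal sketch of the suggested refactoring. A single private `nodeState(id)` helper owns the null check, and both accessors reuse it. The surrounding class is a simplified hypothetical stand-in for `ClusterConnectionStates`, and the exception message is illustrative.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in: the null check lives in one private helper.
final class NodeStates {
    static final class NodeConnectionState {
        final long lastConnectAttemptMs;
        final long connectionSetupTimeoutMs;

        NodeConnectionState(long lastConnectAttemptMs, long connectionSetupTimeoutMs) {
            this.lastConnectAttemptMs = lastConnectAttemptMs;
            this.connectionSetupTimeoutMs = connectionSetupTimeoutMs;
        }
    }

    private final Map<String, NodeConnectionState> nodeState = new HashMap<>();

    void initiate(String id, long now, long timeoutMs) {
        nodeState.put(id, new NodeConnectionState(now, timeoutMs));
    }

    // Single place that rejects lookups for connections that were never initialized.
    private NodeConnectionState nodeState(String id) {
        NodeConnectionState state = nodeState.get(id);
        if (state == null)
            throw new IllegalStateException("Connection to node " + id + " hasn't been initialized");
        return state;
    }

    long connectionSetupTimeoutMs(String id) {
        return nodeState(id).connectionSetupTimeoutMs;
    }

    boolean isConnectionSetupTimeout(String id, long now) {
        NodeConnectionState state = nodeState(id);
        return now - state.lastConnectAttemptMs > state.connectionSetupTimeoutMs;
    }
}
```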

##########
File path: clients/src/main/java/org/apache/kafka/common/utils/ExponentialBackoff.java
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * An utility class for exponential backoff, timeout, etc...
+ * The formula is Term(n) = random(1 - jitter, 1 + jitter) * scaleFactor * (ratio) ^ n
+ * If scaleFactor is greater or equal than termMax, a constant term of will be provided
+ * This class is thread-safe
+ */
+public class ExponentialBackoff {
+    private final int ratio;
+    private final double expMax;
+    private final long scaleFactor;
+    private final double jitter;
+
+    public ExponentialBackoff(long scaleFactor, int ratio, long termMax, double jitter) {

Review comment:
       @rajinisivaram What do you think about the proposal?

##########
File path: clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
##########
@@ -40,7 +40,12 @@
     private final MockTime time = new MockTime();
     private final long reconnectBackoffMs = 10 * 1000;
     private final long reconnectBackoffMax = 60 * 1000;
-    private final double reconnectBackoffJitter = 0.2;
+    private final long connectionSetupTimeoutMs = 10 * 1000;
+    private final long connectionSetupTimeoutMaxMs = 127 * 1000;
+    private final static int RECONNECT_BACKOFF_EXP_BASE = 2;
+    private final static double RECONNECT_BACKOFF_JITTER = 0.2;
+    private final static int CONNECTION_SETUP_TIMEOUT_EXP_BASE = 2;
+    private final static double CONNECTION_SETUP_TIMEOUT_JITTER = 0.2;

Review comment:
       nit: It may be better to keep them in lower case to stay in line with the other constants. Or shall we convert all the others to upper case and make them static constants?





[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r445911349



##########
File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##########
@@ -357,6 +402,46 @@ private NodeConnectionState nodeState(String id) {
         return state;
     }
 
+    /**
+     * Get the id set of nodes which are in CONNECTING state
+     */
+    public Set<String> connectingNodes() {
+        return this.connectingNodes;
+    }
+
+    /**
+     * Get the timestamp of the latest connection attempt of a given node
+     * @param id the connection to fetch the state for
+     */
+    public long lastConnectAttemptMs(String id) {
+        NodeConnectionState nodeState = this.nodeState.get(id);
+        return nodeState == null ? 0 : nodeState.lastConnectAttemptMs;
+    }
+
+    /**
+     * Get the current socket connection setup timeout of the given node.
+     * The base value is defined via socket.connection.setup.timeout.
+     * @param id the connection to fetch the state for
+     */
+    public long connectionSetupTimeoutMs(String id) {
+        NodeConnectionState nodeState = this.nodeState.get(id);
+        if (nodeState == null)
+            throw new IllegalStateException("Connection to node " + id + " hasn't been initialized");

Review comment:
       Good catch. Refactored.

##########
File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##########
@@ -357,6 +402,46 @@ private NodeConnectionState nodeState(String id) {
         return state;
     }
 
+    /**
+     * Get the id set of nodes which are in CONNECTING state
+     */
+    public Set<String> connectingNodes() {
+        return this.connectingNodes;
+    }
+
+    /**
+     * Get the timestamp of the latest connection attempt of a given node
+     * @param id the connection to fetch the state for
+     */
+    public long lastConnectAttemptMs(String id) {
+        NodeConnectionState nodeState = this.nodeState.get(id);
+        return nodeState == null ? 0 : nodeState.lastConnectAttemptMs;
+    }
+
+    /**
+     * Get the current socket connection setup timeout of the given node.
+     * The base value is defined via socket.connection.setup.timeout.
+     * @param id the connection to fetch the state for
+     */
+    public long connectionSetupTimeoutMs(String id) {
+        NodeConnectionState nodeState = this.nodeState.get(id);
+        if (nodeState == null)
+            throw new IllegalStateException("Connection to node " + id + " hasn't been initialized");
+        return nodeState.connectionSetupTimeoutMs;
+    }
+
+    /**
+     * Test if the connection to the given node has reached its timeout
+     * @param id the connection to fetch the state for
+     * @param now the current time in ms
+     */
+    public boolean isConnectionSetupTimeout(String id, long now) {
+        NodeConnectionState nodeState = this.nodeState.get(id);

Review comment:
       Good catch. Refactored.





[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r439621170



##########
File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##########
@@ -34,19 +36,22 @@
  *
  */
 final class ClusterConnectionStates {
-    private final long reconnectBackoffInitMs;
-    private final long reconnectBackoffMaxMs;
-    private final static int RECONNECT_BACKOFF_EXP_BASE = 2;

Review comment:
       I was thinking that the existing code hardcoded the jitter. I added the constant back and also added a constant for the jitter.





[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r442382471



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
##########
@@ -149,6 +155,16 @@
                                         atLeast(0),
                                         Importance.MEDIUM,
                                         REQUEST_TIMEOUT_MS_DOC)
+                                .define(SOCKET_CONNECTIONS_SETUP_TIMEOUT_MS_CONFIG,
+                                        Type.LONG,
+                                        10 * 1000,
+                                        Importance.MEDIUM,
+                                        CommonClientConfigs.SOCKET_CONNECTIONS_SETUP_TIMEOUT_MS_DOC)
+                                .define(SOCKET_CONNECTIONS_SETUP_TIMEOUT_MAX_MS_CONFIG,
+                                        Type.LONG,
+                                        127 * 1000,

Review comment:
       Sounds good. Will refactor.





[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r443769844



##########
File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##########
@@ -158,9 +175,15 @@ public InetAddress currentAddress(String id) throws UnknownHostException {
      */
     public void disconnected(String id, long now) {
         NodeConnectionState nodeState = nodeState(id);
-        nodeState.state = ConnectionState.DISCONNECTED;
         nodeState.lastConnectAttemptMs = now;
-        updateReconnectBackoff(nodeState);
+        incrementReconnectBackoff(nodeState);
+        if (nodeState.state == ConnectionState.CONNECTING) {
+            incrementConnectionSetupTimeout(nodeState);

Review comment:
       Refactored





[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r439609952



##########
File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##########
@@ -158,9 +165,16 @@ public InetAddress currentAddress(String id) throws UnknownHostException {
      */
     public void disconnected(String id, long now) {
         NodeConnectionState nodeState = nodeState(id);
-        nodeState.state = ConnectionState.DISCONNECTED;
         nodeState.lastConnectAttemptMs = now;
-        updateReconnectBackoff(nodeState);
+        nodeState.failedAttempts++;
+        nodeState.reconnectBackoffMs = reconnectBackoff.term(nodeState.failedAttempts - 1);
+        if (nodeState.state == ConnectionState.CONNECTING) {
+            nodeState.connectionSetupTimeoutMs = connectionSetupTimeout.term(nodeState.failedAttempts);

Review comment:
       Correct. I'll add a new class property to record connection failures separately.
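A sketch of keeping the two counters apart is shown below. Every disconnect grows the reconnect backoff, but only a failure while still CONNECTING grows the connection setup timeout. `DisconnectAccounting` is hypothetical. The un-jittered doubling with a cap stands in for the `ExponentialBackoff` term used in the diff; it is not the PR's formula.

```java
// Hypothetical: failedAttempts counts every disconnect (reconnect backoff), while
// failedConnectAttempts only counts disconnects that happen during CONNECTING.
final class DisconnectAccounting {
    enum State { CONNECTING, CONNECTED, DISCONNECTED }

    private final long connectionSetupTimeoutMaxMs;

    State state = State.DISCONNECTED;
    int failedAttempts = 0;
    int failedConnectAttempts = 0;
    long connectionSetupTimeoutMs;

    DisconnectAccounting(long connectionSetupTimeoutBaseMs, long connectionSetupTimeoutMaxMs) {
        this.connectionSetupTimeoutMs = connectionSetupTimeoutBaseMs;
        this.connectionSetupTimeoutMaxMs = connectionSetupTimeoutMaxMs;
    }

    void disconnected() {
        failedAttempts++; // reconnect backoff grows on every disconnect
        if (state == State.CONNECTING) {
            failedConnectAttempts++;
            // Un-jittered doubling, clamped to the maximum.
            connectionSetupTimeoutMs = Math.min(connectionSetupTimeoutMs * 2, connectionSetupTimeoutMaxMs);
        }
        state = State.DISCONNECTED;
    }
}
```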





[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r439663059



##########
File path: clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
##########
@@ -786,6 +808,26 @@ private void handleAbortedSends(List<ClientResponse> responses) {
         abortedSends.clear();
     }
 
+    /**
+     * Handle socket channel connection timeout. The timeout will hit iff a connection
+     * stays at the ConnectionState.CONNECTING state longer than the timeout value,
+     * as indicated by ClusterConnectionStates.NodeConnectionState.
+     *
+     * @param responses The list of responses to update
+     * @param now The current time
+     */
+    private void handleTimeOutConnections(List<ClientResponse> responses, long now) {
+        Set<String> connectingNodes = connectionStates.connectingNodes();
+        for (String nodeId: connectingNodes) {
+            if (connectionStates.isConnectionSetupTimeout(nodeId, now)) {
+                // close connection to the node
+                this.selector.close(nodeId);
+                log.debug("Disconnecting from node {} due to socket connection setup timeout.", nodeId);

Review comment:
       Added timeout value to the log.
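A self-contained sketch of the timeout sweep with the timeout value included in the log line. `ConnectionStatesStub`, the `close` callback, and `java.util.logging` are stand-ins assumed for illustration; `NetworkClient` itself uses its own `Selector` and SLF4J logger, and the exact log wording here is hypothetical. The node IDs are copied before closing, on the assumption that closing a node removes it from the live connecting set, which would otherwise risk a `ConcurrentModificationException`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import java.util.logging.Logger;

// Hypothetical stand-in for ClusterConnectionStates.
class ConnectionStatesStub {
    final Set<String> connectingNodes = new HashSet<>();
    final Map<String, Long> lastConnectAttemptMs = new HashMap<>();
    final Map<String, Long> setupTimeoutMs = new HashMap<>();

    void connecting(String id, long now, long timeoutMs) {
        connectingNodes.add(id);
        lastConnectAttemptMs.put(id, now);
        setupTimeoutMs.put(id, timeoutMs);
    }

    void disconnected(String id) {
        connectingNodes.remove(id);
    }

    boolean isConnectionSetupTimeout(String id, long now) {
        return now - lastConnectAttemptMs.get(id) > setupTimeoutMs.get(id);
    }
}

class TimeoutSweep {
    private static final Logger log = Logger.getLogger(TimeoutSweep.class.getName());

    // Closes every CONNECTING node whose setup timeout has elapsed and returns their IDs.
    static List<String> handleTimeoutConnections(ConnectionStatesStub states, Consumer<String> close, long now) {
        List<String> timedOut = new ArrayList<>();
        // Iterate over a copy: disconnected() mutates the live set.
        for (String nodeId : new ArrayList<>(states.connectingNodes)) {
            if (states.isConnectionSetupTimeout(nodeId, now)) {
                close.accept(nodeId);
                states.disconnected(nodeId);
                timedOut.add(nodeId);
                log.fine(String.format("Disconnecting from node %s due to socket connection setup timeout. "
                        + "The timeout value is %d ms.", nodeId, states.setupTimeoutMs.get(nodeId)));
            }
        }
        return timedOut;
    }
}
```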







[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r442355123



##########
File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##########
@@ -357,6 +398,36 @@ private NodeConnectionState nodeState(String id) {
         return state;
     }
 
+    /**
+     * Get the id set of nodes which are in CONNECTING state
+     */
+    public Set<String> connectingNodes() {
+        return this.connectingNodes;
+    }
+
+    /**
+     * Get the timestamp of the latest connection attempt of a given node
+     * @param id the connection to fetch the state for
+     */
+    public long lastConnectAttemptMs(String id) {
+        NodeConnectionState nodeState = this.nodeState.get(id);
+        return nodeState == null ? 0 : nodeState.lastConnectAttemptMs;
+    }
+
+    public long connectionSetupTimeoutMs(String id) {
+        NodeConnectionState nodeState = this.nodeState.get(id);
+        return nodeState.connectionSetupTimeoutMs;
+    }
+
+    /**
+     * Test if the connection to the given node has reached its timeout
+     * @param id the connection to fetch the state for
+     * @param now the current time in ms
+     */
+    public boolean isConnectionSetupTimeout(String id, long now) {
+        return now - lastConnectAttemptMs(id) > connectionSetupTimeoutMs(id);

Review comment:
       I think so. The `lastConnectAttemptMs` is updated in both `connecting` and `disconnected`.
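To make the check concrete: because `connecting` refreshes `lastConnectAttemptMs`, the elapsed time is measured from the start of the current attempt. The comparison is also strict, so a connection counts as timed out only once strictly more than the timeout has elapsed. The class `AttemptClock` below is a hypothetical stand-in that reproduces only this arithmetic.

```java
// Hypothetical stand-in tracking a single node's last connection attempt.
class AttemptClock {
    private long lastConnectAttemptMs = 0L;

    // Both transitions refresh the timestamp, as described in the review comment.
    void connecting(long now) { lastConnectAttemptMs = now; }
    void disconnected(long now) { lastConnectAttemptMs = now; }

    // Same expression as in the diff. Because it is strictly greater-than,
    // reaching the timeout exactly does not yet count as a timeout.
    boolean isConnectionSetupTimeout(long now, long connectionSetupTimeoutMs) {
        return now - lastConnectAttemptMs > connectionSetupTimeoutMs;
    }
}
```

For example, with an attempt started at t = 1000 ms and a 10 s timeout, the check is false at t = 11000 and true at t = 11001. A new `connecting` call restarts the clock.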







[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r442354307



##########
File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##########
@@ -357,6 +398,36 @@ private NodeConnectionState nodeState(String id) {
         return state;
     }
 
+    /**
+     * Get the id set of nodes which are in CONNECTING state
+     */
+    public Set<String> connectingNodes() {
+        return this.connectingNodes;
+    }
+
+    /**
+     * Get the timestamp of the latest connection attempt of a given node
+     * @param id the connection to fetch the state for
+     */
+    public long lastConnectAttemptMs(String id) {
+        NodeConnectionState nodeState = this.nodeState.get(id);
+        return nodeState == null ? 0 : nodeState.lastConnectAttemptMs;
+    }
+
+    public long connectionSetupTimeoutMs(String id) {
+        NodeConnectionState nodeState = this.nodeState.get(id);
+        return nodeState.connectionSetupTimeoutMs;

Review comment:
       No. The caller will ensure that the node is in the connecting state. I'll add an IllegalStateException here.
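A minimal sketch of the guard described above. It assumes a plain `Map<String, Long>` in place of the real per-node state map. The class name and the exception message are illustrative only. Without the guard, a missing entry would surface as a less informative `NullPointerException`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical lookup standing in for ClusterConnectionStates.connectionSetupTimeoutMs.
class SetupTimeoutLookup {
    private final Map<String, Long> setupTimeoutByNode = new HashMap<>();

    void put(String id, long timeoutMs) {
        setupTimeoutByNode.put(id, timeoutMs);
    }

    long connectionSetupTimeoutMs(String id) {
        Long timeoutMs = setupTimeoutByNode.get(id);
        if (timeoutMs == null) {
            // Fail with a descriptive error instead of an NPE on unboxing.
            throw new IllegalStateException("No connection state found for node " + id);
        }
        return timeoutMs;
    }
}
```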







[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r443807553



##########
File path: clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
##########
@@ -771,7 +794,7 @@ private void processDisconnection(List<ClientResponse> responses,
      * @param responses The list of responses to update
      * @param now The current time
      */
-    private void handleTimedOutRequests(List<ClientResponse> responses, long now) {
+    private void handleTimeoutRequests(List<ClientResponse> responses, long now) {

Review comment:
       Sure. Refactored.
   







[GitHub] [kafka] d8tltanc commented on pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#issuecomment-650651194









[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r443793290



##########
File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##########
@@ -357,6 +345,37 @@ private NodeConnectionState nodeState(String id) {
         return state;
     }
 
+    /**
+     * Get the id set of nodes which are in CONNECTING state
+     */
+    public Set<String> connectingNodes() {
+        return this.connectingNodes;
+    }
+
+    /**
+     * Get the timestamp of the latest connection attempt of a given node
+     * @param id the connection to fetch the state for
+     */
+    public long lastConnectAttemptMs(String id) {
+        NodeConnectionState nodeState = this.nodeState.get(id);
+        return nodeState == null ? 0 : nodeState.lastConnectAttemptMs;
+    }
+
+    // Visible for testing
+    long connectionSetupTimeoutMs(String id) {
+        NodeConnectionState nodeState = this.nodeState.get(id);
+        return nodeState.connectionSetupTimeoutMs;
+    }
+
+    /**
+     * Test if the connection to the given node has reached its timeout
+     * @param id the connection to fetch the state for
+     * @param now the current time in ms
+     */
+    public boolean isConnectionSetupTimeout(String id, long now) {
+        return now - lastConnectAttemptMs(id) > connectionSetupTimeoutMs(id);

Review comment:
       Makes sense. Check added.







[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r442372873



##########
File path: clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
##########
@@ -103,6 +103,12 @@
         Utils.join(SecurityProtocol.names(), ", ") + ".";
     public static final String DEFAULT_SECURITY_PROTOCOL = "PLAINTEXT";
 
+    public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG = "socket.connection.setup.timeout.ms";
+    public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MS_DOC = "The amount of time the client will wait for the initial socket connection to be built. If the connection is not built before the timeout elapses the network client will close the socket channel. The default value will be 10 seconds.";
+
+    public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG = "socket.connection.setup.timeout.max.ms";
+    public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_DOC = "The maximum amount of time the client will wait for the initial socket connection to be built. The connection setup timeout will increase exponentially for each consecutive connection failure up to this maximum. To avoid connection storms, a randomization factor of 0.2 will be applied to the backoff resulting in a random range between 20% below and 20% above the computed value. The default value will be 127 seconds.";

Review comment:
       Refactored
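For reference, a client could set both properties through ordinary client configuration. The sketch below only builds a `java.util.Properties` object using the config names from the diff. The bootstrap address is a placeholder, and the values simply mirror the defaults stated in this revision's doc strings (10 s and 127 s). Passing these properties to a real client assumes a `kafka-clients` version that includes this change.

```java
import java.util.Properties;

// Hypothetical helper; only the two config keys come from the diff.
class SetupTimeoutConfigExample {
    static Properties clientProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // placeholder address
        // Initial timeout for each socket connection setup attempt.
        props.put("socket.connection.setup.timeout.ms", "10000");
        // Cap for the exponentially growing setup timeout (before +/-20% jitter).
        props.put("socket.connection.setup.timeout.max.ms", "127000");
        return props;
    }
}
```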







[GitHub] [kafka] dajac commented on pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
dajac commented on pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#issuecomment-648969079


   @d8tltanc Many tests seem to have failed with an IllegalStateException: `isConnectionSetupTimeout` appears to be called while the connection state is not CONNECTING. Do you have an idea where this comes from?





[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r441915398



##########
File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##########
@@ -300,30 +323,48 @@ public AuthenticationException authenticationException(String id) {
      * Resets the failure count for a node and sets the reconnect backoff to the base
      * value configured via reconnect.backoff.ms
      *
-     * @param nodeState The node state object to update
+     * @param nodeState nodeState The node state object to update
      */
     private void resetReconnectBackoff(NodeConnectionState nodeState) {
         nodeState.failedAttempts = 0;
-        nodeState.reconnectBackoffMs = this.reconnectBackoffInitMs;
+        nodeState.reconnectBackoffMs = reconnectBackoff.term(0);
+    }
+
+    /**
+     * Resets the failure count for a node and sets the connection setup timeout to the base
+     * value configured via socket.connection.setup.timeout.ms
+     *
+     * @param nodeState nodeState The node state object to update
+     */
+    private void resetConnectionSetupTimeout(NodeConnectionState nodeState) {
+        nodeState.failedConnectAttempts = 0;
+        nodeState.connectionSetupTimeoutMs = connectionSetupTimeout.term(0);
     }
 
     /**
-     * Update the node reconnect backoff exponentially.
+     * Increment the failure counter, update the node reconnect backoff exponentially,
+     * and record the current timestamp.
      * The delay is reconnect.backoff.ms * 2**(failures - 1) * (+/- 20% random jitter)
      * Up to a (pre-jitter) maximum of reconnect.backoff.max.ms
      *
      * @param nodeState The node state object to update
      */
-    private void updateReconnectBackoff(NodeConnectionState nodeState) {
-        if (this.reconnectBackoffMaxMs > this.reconnectBackoffInitMs) {
-            nodeState.failedAttempts += 1;
-            double backoffExp = Math.min(nodeState.failedAttempts - 1, this.reconnectBackoffMaxExp);
-            double backoffFactor = Math.pow(RECONNECT_BACKOFF_EXP_BASE, backoffExp);
-            long reconnectBackoffMs = (long) (this.reconnectBackoffInitMs * backoffFactor);
-            // Actual backoff is randomized to avoid connection storms.
-            double randomFactor = ThreadLocalRandom.current().nextDouble(0.8, 1.2);
-            nodeState.reconnectBackoffMs = (long) (randomFactor * reconnectBackoffMs);
-        }
+    private void incrementReconnectBackoff(NodeConnectionState nodeState, long now) {
+        nodeState.reconnectBackoffMs = reconnectBackoff.term(nodeState.failedAttempts);
+        nodeState.failedAttempts++;

Review comment:
       reconnectBackoff.term(0) will return ${reconnect.backoff.ms} * 2 ^ 0 * jitter
   reconnectBackoff.term(1) will return ${reconnect.backoff.ms} * 2 ^ 1 * jitter
   
   The difference between the reconnect backoff and the connection setup timeout is that, after the first failed attempt, the connection setup timeout should be the 1st term of the randomized geometric sequence, while the reconnect backoff should be the 0th term. So we should use (failedAttempts - 1) to fetch the reconnect backoff and (failedAttempts) to fetch the connection setup timeout.
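A minimal sketch of a randomized geometric sequence with the semantics described above: term(n) = initial * 2^n, capped at the maximum, then multiplied by a jitter factor. This is not the `ExponentialBackoff` class from the PR. The class name, the explicit `Random` argument (added for testability), and applying the cap before the jitter are assumptions.

```java
import java.util.Random;

// Hypothetical randomized geometric sequence, not the PR's ExponentialBackoff.
class GeometricBackoffSketch {
    private final long initialMs;
    private final long maxMs;
    private final double jitter;   // e.g. 0.2 => factor in [0.8, 1.2)
    private final Random random;

    GeometricBackoffSketch(long initialMs, long maxMs, double jitter, Random random) {
        this.initialMs = initialMs;
        this.maxMs = maxMs;
        this.jitter = jitter;
        this.random = random;
    }

    // n-th term: initialMs * 2^n, capped at maxMs, then scaled by a random factor.
    long term(int n) {
        double capped = Math.min(initialMs * Math.pow(2, n), maxMs);
        double factor = 1.0 + jitter * (2 * random.nextDouble() - 1);
        return (long) (capped * factor);
    }
}
```

After the first failure, failedAttempts is 1. The reconnect backoff therefore uses term(failedAttempts - 1) = term(0), and the setup timeout uses term(failedAttempts) = term(1), which is twice the configured base before jitter.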







[GitHub] [kafka] rajinisivaram commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
rajinisivaram commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r444945541



##########
File path: core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
##########
@@ -205,6 +205,8 @@ object BrokerApiVersionsCommand {
   private object AdminClient {
     val DefaultConnectionMaxIdleMs = 9 * 60 * 1000
     val DefaultRequestTimeoutMs = 5000
+    val DefaultSocketConnectionSetupMs = 10 * 1000
+    val DefaultSocketConnectionSetupMaxMs = 127 * 1000

Review comment:
       Looks like the constants haven't been removed.

##########
File path: clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
##########
@@ -103,6 +103,14 @@
         Utils.join(SecurityProtocol.names(), ", ") + ".";
     public static final String DEFAULT_SECURITY_PROTOCOL = "PLAINTEXT";
 
+    public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG = "socket.connection.setup.timeout.ms";
+    public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MS_DOC = "The amount of time the client will wait for the socket connection to be established. If the connection is not built before the timeout elapses, clients will close the socket channel.";
+    public static final Long SOCKET_CONNECTION_SETUP_TIMEOUT_MS_DEFAULT = 10 * 1000L;
+
+    public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG = "socket.connection.setup.timeout.max.ms";
+    public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_DOC = "The maximum amount of time the client will wait for the socket connection to be established. The connection setup timeout will increase exponentially for each consecutive connection failure up to this maximum. To avoid connection storms, a randomization factor of 0.2 will be applied to the timeout resulting in a random range between 20% below and 20% above the computed value.";
+    public static final Long SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_DEFAULT = 127 * 1000L;

Review comment:
       As above, this should be `DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS`.

##########
File path: clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
##########
@@ -103,6 +103,14 @@
         Utils.join(SecurityProtocol.names(), ", ") + ".";
     public static final String DEFAULT_SECURITY_PROTOCOL = "PLAINTEXT";
 
+    public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG = "socket.connection.setup.timeout.ms";
+    public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MS_DOC = "The amount of time the client will wait for the socket connection to be established. If the connection is not built before the timeout elapses, clients will close the socket channel.";
+    public static final Long SOCKET_CONNECTION_SETUP_TIMEOUT_MS_DEFAULT = 10 * 1000L;

Review comment:
       We should be consistent with the naming above for `DEFAULT_SECURITY_PROTOCOL` and name this `DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MS`. We seem to use xxx_DEFAULT in some places, but common configs here and in SslConfigs and SaslConfigs use DEFAULT_xxx, so let's stick to that.







[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r441915398



##########
File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##########
@@ -300,30 +323,48 @@ public AuthenticationException authenticationException(String id) {
      * Resets the failure count for a node and sets the reconnect backoff to the base
      * value configured via reconnect.backoff.ms
      *
-     * @param nodeState The node state object to update
+     * @param nodeState nodeState The node state object to update
      */
     private void resetReconnectBackoff(NodeConnectionState nodeState) {
         nodeState.failedAttempts = 0;
-        nodeState.reconnectBackoffMs = this.reconnectBackoffInitMs;
+        nodeState.reconnectBackoffMs = reconnectBackoff.term(0);
+    }
+
+    /**
+     * Resets the failure count for a node and sets the connection setup timeout to the base
+     * value configured via socket.connection.setup.timeout.ms
+     *
+     * @param nodeState nodeState The node state object to update
+     */
+    private void resetConnectionSetupTimeout(NodeConnectionState nodeState) {
+        nodeState.failedConnectAttempts = 0;
+        nodeState.connectionSetupTimeoutMs = connectionSetupTimeout.term(0);
     }
 
     /**
-     * Update the node reconnect backoff exponentially.
+     * Increment the failure counter, update the node reconnect backoff exponentially,
+     * and record the current timestamp.
      * The delay is reconnect.backoff.ms * 2**(failures - 1) * (+/- 20% random jitter)
      * Up to a (pre-jitter) maximum of reconnect.backoff.max.ms
      *
      * @param nodeState The node state object to update
      */
-    private void updateReconnectBackoff(NodeConnectionState nodeState) {
-        if (this.reconnectBackoffMaxMs > this.reconnectBackoffInitMs) {
-            nodeState.failedAttempts += 1;
-            double backoffExp = Math.min(nodeState.failedAttempts - 1, this.reconnectBackoffMaxExp);
-            double backoffFactor = Math.pow(RECONNECT_BACKOFF_EXP_BASE, backoffExp);
-            long reconnectBackoffMs = (long) (this.reconnectBackoffInitMs * backoffFactor);
-            // Actual backoff is randomized to avoid connection storms.
-            double randomFactor = ThreadLocalRandom.current().nextDouble(0.8, 1.2);
-            nodeState.reconnectBackoffMs = (long) (randomFactor * reconnectBackoffMs);
-        }
+    private void incrementReconnectBackoff(NodeConnectionState nodeState, long now) {
+        nodeState.reconnectBackoffMs = reconnectBackoff.term(nodeState.failedAttempts);
+        nodeState.failedAttempts++;

Review comment:
       reconnectBackoff.term(0) will return ${reconnect.backoff.ms} * 2 ^ 0 * jitter
   reconnectBackoff.term(1) will return ${reconnect.backoff.ms} * 2 ^ 1 * jitter
   
   The difference between the reconnect backoff and the connection setup timeout is that, after the first failed attempt, the connection setup timeout should be the 1st term of the randomized geometric sequence, while the reconnect backoff should be the 0th term. So we should use (failedAttempts - 1) to fetch the reconnect backoff and (failedAttempts) to fetch the connection setup timeout.







[GitHub] [kafka] dajac commented on pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
dajac commented on pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#issuecomment-654671084


   Investigating this.





[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r443891445



##########
File path: clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
##########
@@ -321,4 +325,37 @@ public void testIsPreparingConnection() {
         connectionStates.disconnected(nodeId1, time.milliseconds());
         assertFalse(connectionStates.isPreparingConnection(nodeId1));
     }
+
+    @Test
+    public void testExponentialConnectionSetupTimeout() {
+        assertTrue(connectionStates.canConnect(nodeId1, time.milliseconds()));
+
+        // Check the exponential timeout growth
+        for (int n = 0; n <= Math.log((double) connectionSetupTimeoutMaxMs / connectionSetupTimeoutMs) / Math.log(2); n++) {

Review comment:
       Yes. I'll create a constant to make the test robust.
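The loop bound in the test relies on floating-point logarithms. These can round down when the ratio is an exact power of two, for example when Math.log(8) / Math.log(2) evaluates to slightly less than 3. One way to make the bound robust, sketched here with hypothetical names, is to compute the largest uncapped exponent with integer arithmetic and store it in a constant.

```java
// Hypothetical helper for deriving the test's loop bound without floating point.
class BackoffSteps {
    // Largest n such that initialMs * 2^n <= maxMs; assumes 0 < initialMs.
    static int maxUncappedExponent(long initialMs, long maxMs) {
        if (initialMs <= 0) throw new IllegalArgumentException("initialMs must be positive");
        int n = 0;
        long value = initialMs;
        // value <= maxMs / 2 is equivalent to 2 * value <= maxMs for longs, and cannot overflow.
        while (value <= maxMs / 2) {
            value *= 2;
            n++;
        }
        return n;
    }
}
```

The test loop could then read `for (int n = 0; n <= MAX_UNCAPPED_EXP; n++)`, with the constant computed once from the test's timeout values.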







[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r442375424



##########
File path: clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
##########
@@ -786,6 +808,29 @@ private void handleAbortedSends(List<ClientResponse> responses) {
         abortedSends.clear();
     }
 
+    /**
+     * Handle socket channel connection timeout. The timeout will hit iff a connection
+     * stays at the ConnectionState.CONNECTING state longer than the timeout value,
+     * as indicated by ClusterConnectionStates.NodeConnectionState.
+     *
+     * @param responses The list of responses to update
+     * @param now The current time
+     */
+    private void handleTimeoutConnections(List<ClientResponse> responses, long now) {
+        Set<String> connectingNodes = connectionStates.connectingNodes();
+        for (String nodeId: connectingNodes) {

Review comment:
       Refactored







[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r445054740



##########
File path: core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
##########
@@ -205,6 +205,8 @@ object BrokerApiVersionsCommand {
   private object AdminClient {
     val DefaultConnectionMaxIdleMs = 9 * 60 * 1000
     val DefaultRequestTimeoutMs = 5000
+    val DefaultSocketConnectionSetupMs = 10 * 1000
+    val DefaultSocketConnectionSetupMaxMs = 127 * 1000

Review comment:
       Oh, that's true. Removed them.







[GitHub] [kafka] rajinisivaram commented on pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
rajinisivaram commented on pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#issuecomment-648866027


   retest this please





[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r442363632



##########
File path: clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
##########
@@ -678,7 +696,11 @@ public Node leastLoadedNode(long now) {
             } else if (connectionStates.isPreparingConnection(node.idString())) {
                 foundConnecting = node;
             } else if (canConnect(node, now)) {
-                foundCanConnect = node;
+                if (foundCanConnect == null ||
+                        this.connectionStates.lastConnectAttemptMs(foundCanConnect.idString()) >
+                                this.connectionStates.lastConnectAttemptMs(node.idString())) {
+                    foundCanConnect = node;
+                }

Review comment:
       Yes
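A simplified sketch of the selection rule in this hunk. Among nodes that can be connected to, it prefers the one whose last connection attempt is oldest, so repeated fetches rotate through candidates instead of retrying the same node. String node IDs and a `Map` of timestamps are assumptions. As with `lastConnectAttemptMs` in the diff, a node that was never attempted is treated as having timestamp 0. The strict comparison keeps the earlier candidate on ties.

```java
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the foundCanConnect selection in leastLoadedNode.
class LeastRecentlyAttempted {
    // Returns the candidate with the oldest last connection attempt, or null if there are none.
    static String pick(List<String> candidates, Map<String, Long> lastConnectAttemptMs) {
        String found = null;
        for (String node : candidates) {
            long attempt = lastConnectAttemptMs.getOrDefault(node, 0L);
            if (found == null || lastConnectAttemptMs.getOrDefault(found, 0L) > attempt) {
                found = node;
            }
        }
        return found;
    }
}
```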







[GitHub] [kafka] rajinisivaram commented on pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
rajinisivaram commented on pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#issuecomment-651726976


   The Streams test failures are unrelated; merging to trunk.





[GitHub] [kafka] cwildman commented on pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
cwildman commented on pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#issuecomment-674974228


   I noticed this didn't make the 2.6 release. Would it be possible to backport it to older 2.x versions, or is 2.6.1 coming soon? We're dealing with some environments that have bad tcp_syn settings. Thanks.





[GitHub] [kafka] rajinisivaram merged pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
rajinisivaram merged pull request #8683:
URL: https://github.com/apache/kafka/pull/8683


   





[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r439646696



##########
File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##########
@@ -357,6 +345,37 @@ private NodeConnectionState nodeState(String id) {
         return state;
     }
 
+    /**
+     * Get the id set of nodes which are in CONNECTING state
+     */
+    public Set<String> connectingNodes() {
+        return this.connectingNodes;
+    }
+
+    /**
+     * Get the timestamp of the latest connection attempt of a given node
+     * @param id the connection to fetch the state for
+     */
+    public long lastConnectAttemptMs(String id) {
+        NodeConnectionState nodeState = this.nodeState.get(id);
+        return nodeState == null ? 0 : nodeState.lastConnectAttemptMs;
+    }
+
+    // Visible for testing
+    long connectionSetupTimeoutMs(String id) {
+        NodeConnectionState nodeState = this.nodeState.get(id);
+        return nodeState.connectionSetupTimeoutMs;
+    }
+
+    /**
+     * Test if the connection to the given node has reached its timeout
+     * @param id the connection to fetch the state for
+     * @param now the current time in ms
+     */
+    public boolean isConnectionSetupTimeout(String id, long now) {
+        return now - lastConnectAttemptMs(id) > connectionSetupTimeoutMs(id);

Review comment:
       I don't think we need to check whether the node is in the CONNECTING state, because the caller only applies this test to nodes that are already in the CONNECTING state.
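A minimal sketch of the caller-side contract described in this comment, assuming a simplified tracker. `ConnectionTimeoutTracker` and `timedOutConnectingNodes` are hypothetical names for illustration, not the `ClusterConnectionStates` API. Because the caller only iterates the CONNECTING set, `isConnectionSetupTimeout` can assume the node's timestamps exist and skip a state check.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified stand-in for ClusterConnectionStates that keeps only
// the fields needed for the connection-setup timeout check.
class ConnectionTimeoutTracker {
    private final Map<String, Long> lastConnectAttemptMs = new HashMap<>();
    private final Map<String, Long> setupTimeoutMs = new HashMap<>();
    private final Set<String> connectingNodes = new HashSet<>();

    void connecting(String id, long now, long timeoutMs) {
        lastConnectAttemptMs.put(id, now);
        setupTimeoutMs.put(id, timeoutMs);
        connectingNodes.add(id);
    }

    void connected(String id) {
        connectingNodes.remove(id);
    }

    // Precondition: id is in connectingNodes, so both lookups are non-null.
    boolean isConnectionSetupTimeout(String id, long now) {
        return now - lastConnectAttemptMs.get(id) > setupTimeoutMs.get(id);
    }

    // The caller restricts the test to CONNECTING nodes, which is why
    // isConnectionSetupTimeout does not re-check the state itself.
    List<String> timedOutConnectingNodes(long now) {
        List<String> timedOut = new ArrayList<>();
        for (String id : connectingNodes) {
            if (isConnectionSetupTimeout(id, now)) {
                timedOut.add(id);
            }
        }
        return timedOut;
    }
}
```

Here a node that has already connected is never passed to the timeout test, even though its stale timestamp would exceed the timeout.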







[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r442355123



##########
File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##########
@@ -357,6 +398,36 @@ private NodeConnectionState nodeState(String id) {
         return state;
     }
 
+    /**
+     * Get the id set of nodes which are in CONNECTING state
+     */
+    public Set<String> connectingNodes() {
+        return this.connectingNodes;
+    }
+
+    /**
+     * Get the timestamp of the latest connection attempt of a given node
+     * @param id the connection to fetch the state for
+     */
+    public long lastConnectAttemptMs(String id) {
+        NodeConnectionState nodeState = this.nodeState.get(id);
+        return nodeState == null ? 0 : nodeState.lastConnectAttemptMs;
+    }
+
+    public long connectionSetupTimeoutMs(String id) {
+        NodeConnectionState nodeState = this.nodeState.get(id);
+        return nodeState.connectionSetupTimeoutMs;
+    }
+
+    /**
+     * Test if the connection to the given node has reached its timeout
+     * @param id the connection to fetch the state for
+     * @param now the current time in ms
+     */
+    public boolean isConnectionSetupTimeout(String id, long now) {
+        return now - lastConnectAttemptMs(id) > connectionSetupTimeoutMs(id);

Review comment:
       I think so. `lastConnectAttemptMs` is updated in both `connecting` and `disconnected` (lines 145 and 157).
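A simplified, hypothetical model of the point made here (`NodeAttemptClock` is illustrative, not Kafka code). Because both transitions stamp `lastConnectAttemptMs`, the value always reflects the most recent attempt. The accessor mirrors the quoted one and returns 0 for a node with no recorded state.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the two transitions that update lastConnectAttemptMs.
class NodeAttemptClock {
    enum State { CONNECTING, DISCONNECTED }

    private static final class NodeState {
        State state;
        long lastConnectAttemptMs;
    }

    private final Map<String, NodeState> nodeState = new HashMap<>();

    private NodeState getOrCreate(String id) {
        return nodeState.computeIfAbsent(id, k -> new NodeState());
    }

    void connecting(String id, long now) {
        NodeState s = getOrCreate(id);
        s.state = State.CONNECTING;
        s.lastConnectAttemptMs = now; // stamped when an attempt starts
    }

    void disconnected(String id, long now) {
        NodeState s = getOrCreate(id);
        s.state = State.DISCONNECTED;
        s.lastConnectAttemptMs = now; // stamped again when the attempt ends
    }

    // Same shape as the quoted accessor: unknown nodes report 0.
    long lastConnectAttemptMs(String id) {
        NodeState s = nodeState.get(id);
        return s == null ? 0 : s.lastConnectAttemptMs;
    }
}
```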







[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r443769782



##########
File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##########
@@ -158,9 +175,15 @@ public InetAddress currentAddress(String id) throws UnknownHostException {
      */
     public void disconnected(String id, long now) {
         NodeConnectionState nodeState = nodeState(id);
-        nodeState.state = ConnectionState.DISCONNECTED;
         nodeState.lastConnectAttemptMs = now;
-        updateReconnectBackoff(nodeState);
+        incrementReconnectBackoff(nodeState);

Review comment:
       Makes sense. Refactored.
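The diff replaces `updateReconnectBackoff` with `incrementReconnectBackoff`, but the refactored body is not shown. The following is a hypothetical sketch, assuming a per-node failure counter and a backoff that doubles per failure and is capped at a maximum. Jitter is omitted to keep it deterministic. `ReconnectBackoffState` and its fields are illustrative names, not the PR's.

```java
// Hypothetical sketch: each disconnect bumps a failure counter, and the
// reconnect backoff doubles with that counter until it reaches maxBackoffMs.
class ReconnectBackoffState {
    private final long initialBackoffMs;
    private final long maxBackoffMs;
    private int failedAttempts = 0;
    private long reconnectBackoffMs;
    private long lastConnectAttemptMs;

    ReconnectBackoffState(long initialBackoffMs, long maxBackoffMs) {
        this.initialBackoffMs = initialBackoffMs;
        this.maxBackoffMs = maxBackoffMs;
        this.reconnectBackoffMs = initialBackoffMs;
    }

    void disconnected(long now) {
        lastConnectAttemptMs = now;
        incrementReconnectBackoff();
    }

    private void incrementReconnectBackoff() {
        // Limit the shift so initialBackoffMs << shift cannot overflow for realistic values.
        int shift = Math.min(failedAttempts, 30);
        reconnectBackoffMs = Math.min(initialBackoffMs << shift, maxBackoffMs);
        failedAttempts++;
    }

    boolean canConnect(long now) {
        return now - lastConnectAttemptMs >= reconnectBackoffMs;
    }

    long reconnectBackoffMs() {
        return reconnectBackoffMs;
    }
}
```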







[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r445911915



##########
File path: clients/src/main/java/org/apache/kafka/common/utils/ExponentialBackoff.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * An utility class for keeping the parameters and providing the value of exponential
+ * retry backoff, exponential reconnect backoff, exponential timeout, etc.
+ * The formula is

Review comment:
       Added the `:`.
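For reference, the formula this Javadoc introduces is given in the other `ExponentialBackoff.java` revision quoted in this thread as `Term(n) = random(1 - jitter, 1 + jitter) * scaleFactor * (ratio) ^ n`. Combined with the `expMax` expression from that revision's constructor, it can be written as follows. Here s = scaleFactor, r = ratio, j = jitter, t_max = termMax, and U is a uniform random factor. Capping the exponent at min(n, e_max) is an assumption, since the method body is not shown. Since s · r^(e_max) = t_max when s ≥ 1, the cap keeps the un-jittered term at or below termMax. When t_max ≤ s, e_max = 0 and the term stops growing with n.

```latex
\mathrm{term}(n) = U(1 - j,\ 1 + j) \cdot s \cdot r^{\min(n,\ e_{\max})},
\qquad
e_{\max} =
\begin{cases}
\dfrac{\ln\left(t_{\max} / \max(s, 1)\right)}{\ln r} & \text{if } t_{\max} > s,\\[4pt]
0 & \text{otherwise.}
\end{cases}
```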







[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r443891445



##########
File path: clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
##########
@@ -321,4 +325,37 @@ public void testIsPreparingConnection() {
         connectionStates.disconnected(nodeId1, time.milliseconds());
         assertFalse(connectionStates.isPreparingConnection(nodeId1));
     }
+
+    @Test
+    public void testExponentialConnectionSetupTimeout() {
+        assertTrue(connectionStates.canConnect(nodeId1, time.milliseconds()));
+
+        // Check the exponential timeout growth
+        for (int n = 0; n <= Math.log((double) connectionSetupTimeoutMaxMs / connectionSetupTimeoutMs) / Math.log(2); n++) {

Review comment:
       Yes. I added static class properties to make the test robust.
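The loop bound in the quoted test counts how many doublings fit between `connectionSetupTimeoutMs` and `connectionSetupTimeoutMaxMs`. Below is a hypothetical helper (`TimeoutGrowth`, not part of the PR) that shows the same arithmetic, plus a jitter-tolerant check of the kind such a test might use. It assumes a ratio of 2 and symmetric jitter.

```java
// Hypothetical helper mirroring the arithmetic of the quoted test loop.
class TimeoutGrowth {
    // floor(log2(max / initial)): the last n for which initial * 2^n <= max.
    static int doublingsToMax(long initialMs, long maxMs) {
        return (int) Math.floor(Math.log((double) maxMs / initialMs) / Math.log(2));
    }

    // Jitter-free timeout after n failed attempts, capped at maxMs.
    static long expectedTimeoutMs(long initialMs, long maxMs, int n) {
        double raw = initialMs * Math.pow(2, n);
        return (long) Math.min(raw, maxMs);
    }

    // Accept an observed timeout if it lies in [(1 - jitter), (1 + jitter)] * expected.
    static boolean withinJitter(long observedMs, long expectedMs, double jitter) {
        return observedMs >= (1 - jitter) * expectedMs
                && observedMs <= (1 + jitter) * expectedMs;
    }
}
```

For example, an initial timeout of 10 ms and a maximum of 100 ms give floor(log2(10)) = 3. So n runs from 0 to 3 (10, 20, 40, 80 ms), and the next attempt is capped at 100 ms.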







[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r445048690



##########
File path: clients/src/main/java/org/apache/kafka/common/utils/ExponentialBackoff.java
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * An utility class for exponential backoff, timeout, etc...
+ * The formula is Term(n) = random(1 - jitter, 1 + jitter) * scaleFactor * (ratio) ^ n
+ * If scaleFactor is greater or equal than termMax, a constant term of will be provided
+ * This class is thread-safe
+ */
+public class ExponentialBackoff {
+    private final int ratio;
+    private final double expMax;
+    private final long scaleFactor;
+    private final double jitter;
+
+    public ExponentialBackoff(long scaleFactor, int ratio, long termMax, double jitter) {

Review comment:
       Sure. Let's use this proposal.







[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r443890115



##########
File path: clients/src/main/java/org/apache/kafka/common/utils/ExponentialBackoff.java
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * An utility class for exponential backoff, timeout, etc...
+ * The formula is Term(n) = random(1 - jitter, 1 + jitter) * scaleFactor * (ratio) ^ n
+ * If scaleFactor is greater or equal than termMax, a constant term of will be provided
+ * This class is thread-safe
+ */
+public class ExponentialBackoff {
+    private final int ratio;
+    private final double expMax;
+    private final long scaleFactor;
+    private final double jitter;
+
+    public ExponentialBackoff(long scaleFactor, int ratio, long termMax, double jitter) {
+        this.scaleFactor = scaleFactor;
+        this.ratio = ratio;
+        this.jitter = jitter;
+        this.expMax = termMax > scaleFactor ?
+                Math.log(termMax / (double) Math.max(scaleFactor, 1)) / Math.log(ratio) : 0;
+    }
+
+    public long term(long n) {

Review comment:
       Thanks for the suggestion. Let's use the signature `backoff(long attempts)` for now.
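The following sketch shows what `backoff(long attempts)` could look like. It reuses the field names and the `expMax` expression from the quoted constructor and the formula in its Javadoc. The method body is an assumption, because the diff is cut off at `term(long n)`. `ExponentialBackoffSketch` is a hypothetical name chosen to avoid implying that this is the merged class.

```java
import java.util.concurrent.ThreadLocalRandom;

// Sketch only. All fields are final and randomness comes from ThreadLocalRandom,
// so instances can be shared across threads.
class ExponentialBackoffSketch {
    private final long scaleFactor;
    private final int ratio;
    private final double jitter;
    private final double expMax;

    ExponentialBackoffSketch(long scaleFactor, int ratio, long termMax, double jitter) {
        this.scaleFactor = scaleFactor;
        this.ratio = ratio;
        this.jitter = jitter;
        // Same expression as the quoted constructor: exponent at which the term reaches termMax.
        this.expMax = termMax > scaleFactor
                ? Math.log(termMax / (double) Math.max(scaleFactor, 1)) / Math.log(ratio)
                : 0;
    }

    long backoff(long attempts) {
        if (expMax == 0) {
            // scaleFactor >= termMax: return a constant term (assumed: no growth, no jitter).
            return scaleFactor;
        }
        double exp = Math.min(attempts, expMax); // assumed cap on the exponent
        double term = scaleFactor * Math.pow(ratio, exp);
        // nextDouble(origin, bound) requires origin < bound, so skip the RNG when jitter is 0.
        double randomFactor = jitter <= 0.0
                ? 1.0
                : ThreadLocalRandom.current().nextDouble(1 - jitter, 1 + jitter);
        return (long) (randomFactor * term);
    }
}
```

Capping the exponent rather than the final value keeps the jitter spread proportional to the capped term. This avoids clustering every late retry at exactly `termMax`.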







[GitHub] [kafka] guozhangwang commented on pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#issuecomment-654585011


   @d8tltanc @rajinisivaram @dajac We observed some exception thrown from our soak with the following stack trace:
   
   ```
   java.util.ConcurrentModificationException
   	at java.util.HashMap$HashIterator.nextNode(HashMap.java:1445)
   	at java.util.HashMap$KeyIterator.next(HashMap.java:1469)
   	at org.apache.kafka.clients.NetworkClient.handleTimedOutConnections(NetworkClient.java:822)
   	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:574)
   	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
   	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
   	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
   	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:419)
   	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:359)
   ```
   
   This seems related to this PR. Could you take a look?
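The trace shows `handleTimedOutConnections` advancing a `HashMap` key iterator. The `connectingNodes()` accessor quoted earlier in this thread returns the live set. One plausible cause (an assumption, not a confirmed diagnosis) is that disconnecting a timed-out node removes it from that set mid-iteration. The following is a hypothetical, simplified reproduction and fix. `TimedOutConnectionSweeper` is illustrative, not Kafka code.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical reproduction: disconnect() mutates the same set being iterated.
class TimedOutConnectionSweeper {
    private final Set<String> connectingNodes = new HashSet<>();

    void connecting(String id) { connectingNodes.add(id); }

    private void disconnect(String id) { connectingNodes.remove(id); }

    // Unsafe: HashSet iterators are fail-fast, so removing an element inside the
    // for-each makes the next call to next() throw ConcurrentModificationException.
    void handleTimedOutUnsafe(Set<String> timedOut) {
        for (String id : connectingNodes) {
            if (timedOut.contains(id)) disconnect(id);
        }
    }

    // Safer: collect the timed-out ids first, then disconnect outside the loop.
    List<String> handleTimedOut(Set<String> timedOut) {
        List<String> toDisconnect = new ArrayList<>();
        for (String id : connectingNodes) {
            if (timedOut.contains(id)) toDisconnect.add(id);
        }
        for (String id : toDisconnect) disconnect(id);
        return toDisconnect;
    }

    int connectingCount() { return connectingNodes.size(); }
}
```

Iterating over a copy (for example `new ArrayList<>(connectingNodes)`) works equally well. Either way, the live set is never mutated while an iterator over it is open.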





[GitHub] [kafka] rajinisivaram commented on pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

Posted by GitBox <gi...@apache.org>.
rajinisivaram commented on pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#issuecomment-650080012


   retest this please

