Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/12/26 07:40:26 UTC

[GitHub] [kafka] showuon opened a new pull request #11627: KAFKA-13565: add consumer exponential backoff

showuon opened a new pull request #11627:
URL: https://github.com/apache/kafka/pull/11627


   Co-Authored-By: Cheng Tan <31...@users.noreply.github.com>
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] showuon commented on a change in pull request #11627: KAFKA-13565: add consumer exponential backoff for KIP-580

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #11627:
URL: https://github.com/apache/kafka/pull/11627#discussion_r777395274



##########
File path: clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
##########
@@ -102,11 +119,16 @@
     public static final String DEFAULT_SECURITY_PROTOCOL = "PLAINTEXT";
 
     public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG = "socket.connection.setup.timeout.ms";
-    public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MS_DOC = "The amount of time the client will wait for the socket connection to be established. If the connection is not built before the timeout elapses, clients will close the socket channel.";
+    public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MS_DOC = "The amount of time the client will wait for the socket connection to be established. " +
+        "If the connection is not built before the timeout elapses, clients will close the socket channel. " +
+        "This value is the initial backoff value and will increase exponentially for each consecutive connection failure, " +
+        "up to the <code>socket.connection.setup.timeout.max.ms</code> value.";
     public static final Long DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MS = 10 * 1000L;
 
     public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG = "socket.connection.setup.timeout.max.ms";
-    public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_DOC = "The maximum amount of time the client will wait for the socket connection to be established. The connection setup timeout will increase exponentially for each consecutive connection failure up to this maximum. To avoid connection storms, a randomization factor of 0.2 will be applied to the timeout resulting in a random range between 20% below and 20% above the computed value.";
+    public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_DOC = "The maximum amount of time the client will wait for the socket connection to be established. " +
+        "The connection setup timeout will increase exponentially for each consecutive connection failure up to this maximum. To avoid connection storms, " +
+        "a randomization factor of 0.2 will be applied to the timeout resulting in a random range between 20% below and 20% above the computed value.";

Review comment:
       This only splits the doc string across 3 lines for better readability.







[GitHub] [kafka] showuon commented on a change in pull request #11627: KAFKA-13565: add consumer exponential backoff for KIP-580

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #11627:
URL: https://github.com/apache/kafka/pull/11627#discussion_r777398361



##########
File path: clients/src/main/java/org/apache/kafka/common/utils/ExponentialBackoff.java
##########
@@ -32,7 +32,16 @@
     private final double expMax;
     private final long initialInterval;
     private final double jitter;
+    private long attemptedCount = 0;

Review comment:
       Add `attemptedCount` to the `ExponentialBackoff` class. Callers can use it instead of maintaining an attempt count on their side. This works well when the `ExponentialBackoff` instance backs off in only a single place, or when the caller is inside a lambda expression.
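
As a rough illustration of the idea in this comment (not the code from the pull request), the sketch below keeps the attempt counter inside the backoff object. The class name `CountingBackoff`, its constructor parameters, and the methods `backoff()`, `resetAttemptedCount()` and `attemptedCount()` are hypothetical; only the field names `initialInterval`, `jitter` and `attemptedCount` appear in the diff above.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical sketch: an exponential backoff helper that tracks its own attempt count.
// It is not thread-safe and is not the ExponentialBackoff class from the pull request.
final class CountingBackoff {
    private final long initialInterval;
    private final int multiplier;
    private final long maxInterval;
    private final double jitter;
    private long attemptedCount = 0;

    CountingBackoff(long initialInterval, int multiplier, long maxInterval, double jitter) {
        this.initialInterval = initialInterval;
        this.multiplier = multiplier;
        this.maxInterval = maxInterval;
        this.jitter = jitter;
    }

    // Returns the backoff for the current attempt, then increments the internal counter.
    long backoff() {
        return backoff(attemptedCount++);
    }

    // Stateless variant: the caller supplies the attempt count.
    long backoff(long attempts) {
        // Pre-jitter term: initialInterval * multiplier^attempts, capped at maxInterval.
        double term = Math.min(initialInterval * Math.pow(multiplier, attempts), (double) maxInterval);
        if (jitter <= 0.0) {
            return (long) term;
        }
        // Scale by a random factor in [1 - jitter, 1 + jitter).
        double randomFactor = ThreadLocalRandom.current().nextDouble(1 - jitter, 1 + jitter);
        return (long) (randomFactor * term);
    }

    void resetAttemptedCount() {
        attemptedCount = 0;
    }

    long attemptedCount() {
        return attemptedCount;
    }
}
```

With the counter inside the object, a lambda can simply call `backoff()`. A local `int` counter could not be incremented inside the lambda, because captured local variables must be effectively final.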







[GitHub] [kafka] showuon commented on a change in pull request #11627: KAFKA-13565: add consumer exponential backoff for KIP-580

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #11627:
URL: https://github.com/apache/kafka/pull/11627#discussion_r777395790



##########
File path: clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
##########
@@ -192,14 +214,39 @@
      * @return                          The new values which have been set as described in postProcessParsedConfig.
      */
     public static Map<String, Object> postProcessReconnectBackoffConfigs(AbstractConfig config,
-                                                    Map<String, Object> parsedValues) {
+                                                                         Map<String, Object> parsedValues) {
         HashMap<String, Object> rval = new HashMap<>();
         if ((!config.originals().containsKey(RECONNECT_BACKOFF_MAX_MS_CONFIG)) &&
                 config.originals().containsKey(RECONNECT_BACKOFF_MS_CONFIG)) {
-            log.debug("Disabling exponential reconnect backoff because {} is set, but {} is not.",
+            log.info("Disabling exponential reconnect backoff because {} is set, but {} is not.",

Review comment:
       Since we log at `warn` level when exponential backoff for `RETRY_BACKOFF_MS` and `SOCKET_CONNECTION_SETUP_TIMEOUT_MS` is disabled, I think we should log at least at `info` level here.
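
For readers unfamiliar with this post-processing step, here is a minimal sketch of the condition shown in the hunk, using plain maps instead of `AbstractConfig`. The class and method names are hypothetical, and the override (pinning the maximum to the base value so that the backoff stays constant) is an assumption about what follows the log statement, since that part of the method is not shown in the diff.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the "disable exponential backoff" rule, using plain maps.
final class BackoffConfigSketch {
    static final String BACKOFF_MS = "reconnect.backoff.ms";
    static final String BACKOFF_MAX_MS = "reconnect.backoff.max.ms";

    // originals: keys the user set explicitly; parsedValues: parsed values including defaults.
    static Map<String, Object> postProcess(Map<String, Object> originals,
                                           Map<String, Object> parsedValues) {
        Map<String, Object> overrides = new HashMap<>();
        if (!originals.containsKey(BACKOFF_MAX_MS) && originals.containsKey(BACKOFF_MS)) {
            // Assumption: with only the base value set, pin the maximum to the base
            // so that the effective backoff no longer grows.
            overrides.put(BACKOFF_MAX_MS, parsedValues.get(BACKOFF_MS));
        }
        return overrides;
    }
}
```

Under this assumption, a user who sets only `reconnect.backoff.ms` keeps a fixed backoff, which is why the log message is worth surfacing above `debug` level.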







[GitHub] [kafka] showuon commented on a change in pull request #11627: KAFKA-13565: add consumer exponential backoff for KIP-580

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #11627:
URL: https://github.com/apache/kafka/pull/11627#discussion_r777394154



##########
File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##########
@@ -536,7 +535,12 @@ private void clearAddresses() {
         }
 
         public String toString() {
-            return "NodeState(" + state + ", " + lastConnectAttemptMs + ", " + failedAttempts + ", " + throttleUntilTimeMs + ")";
+            return "NodeState(" +
+                "state=" + state + ", " +
+                "lastConnectAttemptMs=" + lastConnectAttemptMs + ", " +
+                "failedAttempts=" + failedAttempts + ", " +
+                "failedConnectAttempts=" + failedConnectAttempts + ", " +
+                "throttleUntilTimeMs=" + throttleUntilTimeMs + ")";

Review comment:
       The original `toString` output is hard to read because the values have no leading variable names. Update it to label each field, and add `failedConnectAttempts`.







[GitHub] [kafka] showuon commented on a change in pull request #11627: KAFKA-13565: add consumer exponential backoff for KIP-580

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #11627:
URL: https://github.com/apache/kafka/pull/11627#discussion_r777398999



##########
File path: clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
##########
@@ -191,6 +192,21 @@
 
     private final Collection<TopicPartition> singleTopicPartition = Collections.singleton(new TopicPartition(topic, 0));
 
+    private SubscriptionState subscription;
+    private Time time;
+
+    @BeforeEach
+    public void setup() {
+        this.time = new MockTime();
+        // default to reset to the earliest offset
+        this.subscription = createSubscriptionState(OffsetResetStrategy.EARLIEST);
+    }
+
+    private SubscriptionState createSubscriptionState(OffsetResetStrategy offsetResetStrategy) {
+        // use static backoff time for testing
+        return new SubscriptionState(new LogContext(), offsetResetStrategy, 100, 100);
+    }
+

Review comment:
       Refactor the tests







[GitHub] [kafka] kirktrue commented on pull request #11627: KAFKA-13565: add consumer exponential backoff for KIP-580

Posted by GitBox <gi...@apache.org>.
kirktrue commented on pull request #11627:
URL: https://github.com/apache/kafka/pull/11627#issuecomment-1028296460


   This looks pretty straightforward to me, though I'm still learning the consumer client piece.





[GitHub] [kafka] showuon commented on pull request #11627: KAFKA-13565: add consumer exponential backoff for KIP-580

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #11627:
URL: https://github.com/apache/kafka/pull/11627#issuecomment-1019081971


   @dajac @kirktrue,  please help review. Thanks.





[GitHub] [kafka] showuon commented on a change in pull request #11627: KAFKA-13565: add consumer exponential backoff for KIP-580

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #11627:
URL: https://github.com/apache/kafka/pull/11627#discussion_r777393726



##########
File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##########
@@ -357,8 +357,7 @@ private void resetConnectionSetupTimeout(NodeConnectionState nodeState) {
     }
 
     /**
-     * Increment the failure counter, update the node reconnect backoff exponentially,
-     * and record the current timestamp.
+     * Increment the failure counter, update the node reconnect backoff exponentially.

Review comment:
       This method doesn't `record the current timestamp`, so the Javadoc is updated to match.







[GitHub] [kafka] showuon commented on pull request #11627: KAFKA-13565: add consumer exponential backoff for KIP-580

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #11627:
URL: https://github.com/apache/kafka/pull/11627#issuecomment-1056898400


   @dajac , please take a look when available. Thanks.





[GitHub] [kafka] guozhangwang commented on pull request #11627: KAFKA-13565: add consumer exponential backoff for KIP-580

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #11627:
URL: https://github.com/apache/kafka/pull/11627#issuecomment-1005389524


   Thanks @showuon , I've made a quick pass and it looks promising to me. I'm pinging @dajac and @kirktrue who would be the best reviewers for this.





[GitHub] [kafka] showuon commented on a change in pull request #11627: KAFKA-13565: add consumer exponential backoff for KIP-580

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #11627:
URL: https://github.com/apache/kafka/pull/11627#discussion_r777394827



##########
File path: clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
##########
@@ -70,17 +70,34 @@
     public static final String CLIENT_RACK_DOC = "A rack identifier for this client. This can be any string value which indicates where this client is physically located. It corresponds with the broker config 'broker.rack'";
 
     public static final String RECONNECT_BACKOFF_MS_CONFIG = "reconnect.backoff.ms";
-    public static final String RECONNECT_BACKOFF_MS_DOC = "The base amount of time to wait before attempting to reconnect to a given host. This avoids repeatedly connecting to a host in a tight loop. This backoff applies to all connection attempts by the client to a broker.";
+    public static final String RECONNECT_BACKOFF_MS_DOC = "The base amount of time to wait before attempting to reconnect to a given host. " +
+        "This avoids repeatedly connecting to a host in a tight loop. This backoff applies to all connection attempts by the client to a broker. " +
+        "This value is the initial backoff value and will increase exponentially for each consecutive connection failure, up to the <code>reconnect.backoff.max.ms</code> value.";

Review comment:
       Add a final sentence stating that this is the initial backoff value and that it increases exponentially up to `reconnect.backoff.max.ms`.
   
   The same applies to `retry.backoff.ms` and `socket.connection.setup.timeout.ms` below.







[GitHub] [kafka] showuon commented on a change in pull request #11627: KAFKA-13565: add consumer exponential backoff for KIP-580

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #11627:
URL: https://github.com/apache/kafka/pull/11627#discussion_r777393490



##########
File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##########
@@ -372,7 +371,7 @@ private void updateReconnectBackoff(NodeConnectionState nodeState) {
     /**
      * Increment the failure counter and update the node connection setup timeout exponentially.
      * The delay is socket.connection.setup.timeout.ms * 2**(failures) * (+/- 20% random jitter)
-     * Up to a (pre-jitter) maximum of reconnect.backoff.max.ms
+     * Up to a (pre-jitter) maximum of socket.connection.setup.timeout.max.ms

Review comment:
       Side fix: this Javadoc describes `socket.connection.setup.timeout.ms`, so the maximum it refers to should be `socket.connection.setup.timeout.max.ms`, not `reconnect.backoff.max.ms`.
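
To make the formula in this Javadoc concrete, the sketch below works through the arithmetic: base timeout times 2 to the power of the failure count, capped before jitter, then widened by +/- 20%. The class and method names are hypothetical. The 10 s base matches `DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MS` in the diff, while the 30 s maximum is an assumed value chosen only for the example.

```java
// Hypothetical worked example of: base * 2**(failures) * (+/- 20% jitter),
// with a pre-jitter cap at the configured maximum.
final class SetupTimeoutBounds {

    // Pre-jitter timeout: baseMs * 2^failures, capped at maxMs.
    static long preJitterTimeoutMs(long baseMs, long maxMs, int failures) {
        double uncapped = baseMs * Math.pow(2, failures);
        return (long) Math.min(uncapped, (double) maxMs);
    }

    // Lower and upper bound after applying +/- jitter (0.2 means 20%).
    static long[] jitteredRangeMs(long preJitterMs, double jitter) {
        return new long[] {
            Math.round(preJitterMs * (1 - jitter)),
            Math.round(preJitterMs * (1 + jitter))
        };
    }

    public static void main(String[] args) {
        long baseMs = 10_000L; // default shown in the diff: 10 * 1000L
        long maxMs = 30_000L;  // assumed maximum, for illustration only
        for (int failures = 0; failures <= 3; failures++) {
            long pre = preJitterTimeoutMs(baseMs, maxMs, failures);
            long[] range = jitteredRangeMs(pre, 0.2);
            System.out.printf("failures=%d preJitter=%d range=[%d, %d]%n",
                failures, pre, range[0], range[1]);
        }
    }
}
```

Because the cap applies before jitter, the jittered value in this sketch can exceed the configured maximum by up to 20%, which is consistent with the "(pre-jitter) maximum" wording in the Javadoc.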







[GitHub] [kafka] showuon commented on pull request #11627: KAFKA-13565: add consumer exponential backoff for KIP-580

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #11627:
URL: https://github.com/apache/kafka/pull/11627#issuecomment-1004004071


   @skaundinya15 @abbccdda @guozhangwang , please help review when available. Thank you.

