Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/06/10 20:05:33 UTC

[GitHub] [kafka] d8tltanc opened a new pull request #8846: KAFKA-9800: [KIP-580] Client Exponential Backoff Implementation

d8tltanc opened a new pull request #8846:
URL: https://github.com/apache/kafka/pull/8846


   Please refer to Jira for more details:
   https://issues.apache.org/jira/browse/KAFKA-9800
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] d8tltanc commented on pull request #8846: KAFKA-9800: [KIP-580] Client Exponential Backoff Implementation

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on pull request #8846:
URL: https://github.com/apache/kafka/pull/8846#issuecomment-645877170


   Thanks @abbccdda for the suggestions on the utility class.
   
   @skaundinya15 @ijuma I've opened this new PR for KIP-580 exponential retry backoff implementation. I've finalized the patch and it's ready for reviews. Please take your time and feel free to leave comments. Thanks.





[GitHub] [kafka] d8tltanc commented on a change in pull request #8846: KAFKA-9800: [KIP-580] Client Exponential Backoff Implementation

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on a change in pull request #8846:
URL: https://github.com/apache/kafka/pull/8846#discussion_r447942236



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
##########
@@ -67,9 +67,14 @@
      * <code>retry.backoff.ms</code>
      */
     public static final String RETRY_BACKOFF_MS_CONFIG = CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG;
-    private static final String RETRY_BACKOFF_MS_DOC = "The amount of time to wait before attempting to " +
-                "retry a failed request. This avoids repeatedly sending requests in a tight loop under " +
-                "some failure scenarios.";
+    private static final String RETRY_BACKOFF_MS_DOC = CommonClientConfigs.RETRY_BACKOFF_MS_DOC;
+
+    /**
+     * <code>retry.backoff.max.ms</code>
+     */
+    // TODO: Add validator rules and force backoff_max_ms > backoff_ms if possible (I guess it's impossible)

Review comment:
       Since the utility class now provides a static backoff when `backoff_ms` >= `backoff_max_ms`, we don't need the checker anymore.







[GitHub] [kafka] d8tltanc commented on pull request #8846: KAFKA-9800: [KIP-580] Client Exponential Backoff Implementation

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on pull request #8846:
URL: https://github.com/apache/kafka/pull/8846#issuecomment-652013968


   @skaundinya15 @abbccdda @ijuma 
   The utility class GeometricProgression was renamed in #8683 and merged into trunk. This patch is now rebased and ready for review.





[GitHub] [kafka] d8tltanc commented on a change in pull request #8846: KAFKA-9800: [KIP-580] Client Exponential Backoff Implementation

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on a change in pull request #8846:
URL: https://github.com/apache/kafka/pull/8846#discussion_r468742346



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -2739,8 +2753,8 @@ private void rescheduleFindCoordinatorTask(ConsumerGroupOperationContext<?, ?> c
         context.setNode(null);
 
         Call call = nextCall.get();
-        call.tries = failedCall.tries + 1;
-        call.nextAllowedTryMs = calculateNextAllowedRetryMs();
+
+        call.incrementRetryBackoff(failedCall, time.milliseconds());

Review comment:
       What about renaming `incrementRetryBackoff` to `updateRetryBackoff`?







[GitHub] [kafka] d8tltanc commented on a change in pull request #8846: KAFKA-9800: [KIP-580] Client Exponential Backoff Implementation

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on a change in pull request #8846:
URL: https://github.com/apache/kafka/pull/8846#discussion_r468830657



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
##########
@@ -105,7 +105,7 @@ public WorkerGroupMember(DistributedConfig config,
 
             this.metrics = new Metrics(metricConfig, reporters, time, metricsContext);
             this.retryBackoffMs = config.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG);
-            this.metadata = new Metadata(retryBackoffMs, config.getLong(CommonClientConfigs.METADATA_MAX_AGE_CONFIG),
+            this.metadata = new Metadata(retryBackoffMs, retryBackoffMs, config.getLong(CommonClientConfigs.METADATA_MAX_AGE_CONFIG),

Review comment:
       Yes. If we set `retryBackoffMs` and `retryBackoffMaxMs` to the same value, `Metadata` will have a static backoff.







[GitHub] [kafka] d8tltanc commented on a change in pull request #8846: KAFKA-9800: [KIP-580] Client Exponential Backoff Implementation

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on a change in pull request #8846:
URL: https://github.com/apache/kafka/pull/8846#discussion_r468738156



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -1295,7 +1309,7 @@ private void processRequests() {
 
                 // Ensure that we use a small poll timeout if there are pending calls which need to be sent
                 if (!pendingCalls.isEmpty())
-                    pollTimeout = Math.min(pollTimeout, retryBackoffMs);
+                    pollTimeout = Math.min(pollTimeout, retryBackoff.backoff(0));

Review comment:
       Yes, `retryBackoff.backoff(0)` is equal to `retryBackoffMs`:
   ```
       public long backoff(long attempts) {
           if (expMax == 0) {
               return initialInterval;
           }
   ```







[GitHub] [kafka] showuon commented on pull request #8846: KAFKA-9800: [KIP-580] Client Exponential Backoff Implementation

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #8846:
URL: https://github.com/apache/kafka/pull/8846#issuecomment-984230083


   @ctan888, sorry again, but I'd really like to see this KIP implemented in Kafka. I've been waiting for your reply for more than a month (via GitHub and direct email). I just want to let you know that I'll start helping to complete this PR for KIP-580, co-authoring with you, next week (12/6). If you have any comments or thoughts, please let me know. Thank you very much!





[GitHub] [kafka] skaundinya15 commented on a change in pull request #8846: KAFKA-9800: [KIP-580] Client Exponential Backoff Implementation

Posted by GitBox <gi...@apache.org>.
skaundinya15 commented on a change in pull request #8846:
URL: https://github.com/apache/kafka/pull/8846#discussion_r444631685



##########
File path: clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
##########
@@ -84,6 +84,9 @@
     public static final String RETRY_BACKOFF_MS_CONFIG = "retry.backoff.ms";
     public static final String RETRY_BACKOFF_MS_DOC = "The amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios.";
 
+    public static final String RETRY_BACKOFF_MAX_MS_CONFIG = "retry.backoff.max.ms";
+    public static final String RETRY_BACKOFF_MAX_MS_DOC = "The maximum amount of time in milliseconds to wait when retrying a request to the broker that has repeatedly failed. If provided, the backoff per client will increase exponentially for each failed request, up to this maximum. To prevent all clients from being synchronized upon retry, a randomization factor of 0.2 will be applied to the backoff, resulting in a random range between 20% below and 20% above the computed value. If retry.backoff.ms is set to be higher than retry.backoff.max.ms, then retry.backoff.max.ms will be used as a constant backoff from the beginning without any exponential increase";

Review comment:
       Nit: Can we change the wording to the following?
   ```suggestion
       public static final String RETRY_BACKOFF_MAX_MS_DOC = "The maximum amount of time in milliseconds to wait when retrying a request to the broker that has repeatedly failed. If provided, the backoff per client will increase exponentially for each failed request, up to this maximum. To prevent all clients from being synchronized upon retry, a randomized jitter with a factor of 0.2 will be applied to the backoff, resulting in the backoff falling within a range between 20% below and 20% above the computed value. If retry.backoff.ms is set to be higher than retry.backoff.max.ms, then retry.backoff.max.ms will be used as a constant backoff from the beginning without any exponential increase";
   ```
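To illustrate the behavior this doc string describes, here is a minimal sketch in Java. The class name `BackoffSketch`, its fields, and the multiplier of 2 used in the usage note are assumptions made for the example; this is not the utility class from the PR. The sketch also assumes that jitter is skipped in the constant-backoff fallback.

```java
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical illustration of capped exponential backoff with jitter. */
final class BackoffSketch {
    private final long initialMs;   // plays the role of retry.backoff.ms
    private final long maxMs;       // plays the role of retry.backoff.max.ms
    private final int multiplier;   // assumed growth factor per failed attempt
    private final double jitter;    // 0.2 gives a range of 20% below to 20% above

    BackoffSketch(long initialMs, long maxMs, int multiplier, double jitter) {
        this.initialMs = initialMs;
        this.maxMs = maxMs;
        this.multiplier = multiplier;
        this.jitter = jitter;
    }

    /** True when the initial value is not below the maximum, so no growth is possible. */
    boolean isStatic() {
        return initialMs >= maxMs;
    }

    /** Backoff before jitter: geometric growth, capped at the maximum. */
    long computed(int attempts) {
        if (isStatic()) {
            return maxMs; // constant backoff from the beginning
        }
        double term = initialMs * Math.pow(multiplier, attempts);
        return (long) Math.min(term, (double) maxMs);
    }

    /** Backoff with a random factor drawn from [1 - jitter, 1 + jitter). */
    long backoff(int attempts) {
        if (isStatic() || jitter <= 0) {
            return computed(attempts); // assumption: no jitter in these cases
        }
        double factor = ThreadLocalRandom.current().nextDouble(1 - jitter, 1 + jitter);
        return (long) (factor * computed(attempts));
    }
}
```

With `new BackoffSketch(100, 1000, 2, 0.2)`, the values before jitter for attempts 0 through 4 are 100, 200, 400, 800, and 1000 ms. With `new BackoffSketch(500, 200, 2, 0.2)`, every call returns 200 ms.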







[GitHub] [kafka] d8tltanc commented on a change in pull request #8846: KAFKA-9800: [KIP-580] Client Exponential Backoff Implementation

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on a change in pull request #8846:
URL: https://github.com/apache/kafka/pull/8846#discussion_r468736687



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -721,6 +730,11 @@ protected Node curNode() {
             return curNode;
         }
 
+        final void incrementRetryBackoff(Call failedCall, long now) {

Review comment:
       For DescribeConsumerGroup and ListOffsets, AdminClient will construct a chain of requests. For example, DescribeConsumerGroup will send a FindCoordinatorRequest and then a DescribeGroupRequest (Denote them as F and D). 
   
   Let's consider this case:
   F (Error, tries = 0) -> F (OK, tries = 1) -> D (Error, tries = 1) -> F (OK, tries = 2) -> D (OK, tries = 2)
   
   Both F and D need to know the number of attempts made by the previously failed request in order to set their own attempt counts properly. Since F and D are separate objects and do not share any state, probably the only option is to pass in the failed call instance so that each can construct its status properly.







[GitHub] [kafka] d8tltanc commented on a change in pull request #8846: [WIP] KAFKA-9800: [KIP-580] Client Exponential Backoff Implementation

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on a change in pull request #8846:
URL: https://github.com/apache/kafka/pull/8846#discussion_r439565657



##########
File path: clients/src/main/java/org/apache/kafka/common/utils/GeometricProgression.java
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * An util class for exponential backoff, backoff, etc...
+ * The formula is Term(n) = random(1 - jitter, 1 + jitter) * scaleFactor * (ratio) ^ n
+ * If scaleFactor is greater or equal than termMax, a constant term of will be provided
+ * This class is thread-safe
+ */
+public class GeometricProgression {
+    private final int ratio;
+    private final double expMax;
+    private final long scaleFactor;

Review comment:
       I agree with the potential renaming. But it would be a little bit hard since exponential timeout also depends on this util class. Maybe we can come up with another variable name.







[GitHub] [kafka] abbccdda commented on pull request #8846: KAFKA-9800: [KIP-580] Client Exponential Backoff Implementation

Posted by GitBox <gi...@apache.org>.
abbccdda commented on pull request #8846:
URL: https://github.com/apache/kafka/pull/8846#issuecomment-675675970


   Some compilation errors:
   ```
   11:41:23 /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk15-scala2.13/clients/src/main/java/org/apache/kafka/clients/Metadata.java:474: error: cannot assign a value to final variable refreshBackoffMs
   11:41:23         this.refreshBackoffMs = this.refreshBackoff.backoff(this.attempts);
   11:41:23             ^
   11:41:23 /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk15-scala2.13/clients/src/main/java/org/apache/kafka/clients/Metadata.java:481: error: cannot assign a value to final variable refreshBackoffMs
   11:41:23         this.refreshBackoffMs = this.refreshBackoff.baseBackoff();
   11:41:23       
   ```





[GitHub] [kafka] skaundinya15 commented on a change in pull request #8846: KAFKA-9800: [KIP-580] Client Exponential Backoff Implementation

Posted by GitBox <gi...@apache.org>.
skaundinya15 commented on a change in pull request #8846:
URL: https://github.com/apache/kafka/pull/8846#discussion_r448066943



##########
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##########
@@ -74,22 +76,28 @@
     private final ClusterResourceListeners clusterResourceListeners;
     private boolean isClosed;
     private final Map<TopicPartition, Integer> lastSeenLeaderEpochs;
+    private final ExponentialBackoff refreshBackoff;
+    final static double RETRY_BACKOFF_JITTER = 0.2;

Review comment:
       Since `RETRY_BACKOFF_JITTER` and `RETRY_BACKOFF_EXP_BASE` will be the same for all the clients, should we move these two defaults in `CommonClientConfigs.java` instead? That way we can access this across the `Consumer`, `Producer`, and `KafkaAdminClient`.

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -721,6 +730,11 @@ protected Node curNode() {
             return curNode;
         }
 
+        final void incrementRetryBackoff(Call failedCall, long now) {

Review comment:
       Do we need to pass in the `failedCall` parameter here? I was thinking we could rewrite the function like so:
   ```java
   final void incrementRetryBackoff(long now) {
     this.nextAllowedTryMs = now + retryBackoff.backoff(tries);
     this.tries++;
   }
   ```
   Since we'd want to increment the retry backoff for a call, we should call the function on the `Call` object itself, rather than passing in the instance.

##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
##########
@@ -1215,6 +1220,8 @@ private void completeTransaction() {
     abstract class TxnRequestHandler implements RequestCompletionHandler {
         protected final TransactionalRequestResult result;
         private boolean isRetry = false;
+        private int attempts = 0;
+        private long retryBackoffMs = retryBackoff.backoff(0);

Review comment:
       I've noticed that we call `retryBackoff.backoff(0)` in a couple different places. Since this call is just getting the base backoff, it might be worth adding a method to the `ExponentialBackoff` class that just returns the base backoff with the random jitter applied.

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -1295,7 +1309,7 @@ private void processRequests() {
 
                 // Ensure that we use a small poll timeout if there are pending calls which need to be sent
                 if (!pendingCalls.isEmpty())
-                    pollTimeout = Math.min(pollTimeout, retryBackoffMs);
+                    pollTimeout = Math.min(pollTimeout, retryBackoff.backoff(0));

Review comment:
       What's the reasoning behind doing `retryBackoff.backoff(0)` here? From what I understand, we previously wanted to take the minimum of the `pollTimeout` and the configured `retryBackoffMs`. Since the backoff can now differ for each call in `pendingCalls`, we could take the minimum of all the backoffs in the `pendingCalls` list and compare that to the `pollTimeout`.
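A rough sketch of that suggestion, in Java. `PollTimeoutSketch` and its method are hypothetical names, and the pending calls are reduced to a list of their next-allowed-try timestamps; the real `pendingCalls` structure in `KafkaAdminClient` is not reproduced here.

```java
import java.util.List;

/** Hypothetical helper: bounds the poll timeout by the soonest pending retry. */
final class PollTimeoutSketch {
    /**
     * @param pollTimeout      the timeout computed so far, in milliseconds
     * @param now              the current time, in milliseconds
     * @param nextAllowedTryMs the earliest retry time of each pending call
     * @return the smaller of pollTimeout and the shortest remaining wait
     */
    static long pollTimeout(long pollTimeout, long now, List<Long> nextAllowedTryMs) {
        for (long next : nextAllowedTryMs) {
            // A call whose retry time has already passed can be sent immediately.
            long remaining = Math.max(0L, next - now);
            pollTimeout = Math.min(pollTimeout, remaining);
        }
        return pollTimeout;
    }
}
```

For example, with a timeout of 1000 ms, `now = 50`, and pending retry times of 150 and 450, the result is 100 ms. An empty list leaves the timeout unchanged.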

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
##########
@@ -70,7 +70,7 @@
     private boolean shouldRebalance;
 
     public MockConsumer(OffsetResetStrategy offsetResetStrategy) {
-        this.subscriptions = new SubscriptionState(new LogContext(), offsetResetStrategy);
+        this.subscriptions = new SubscriptionState(new LogContext(), offsetResetStrategy, 100, 100);

Review comment:
       Nit: Can we use the default variables defined in `CommonClientConfigs.java`?

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -3904,9 +3914,14 @@ void handleResponse(AbstractResponse abstractResponse) {
                     if (!retryTopicPartitionOffsets.isEmpty()) {
                         Set<String> retryTopics = retryTopicPartitionOffsets.keySet().stream().map(
                             TopicPartition::topic).collect(Collectors.toSet());
-                        MetadataOperationContext<ListOffsetsResultInfo, ListOffsetsOptions> retryContext =
+                        MetadataOperationContext<ListOffsetsResultInfo, ListOffsetsOptions> operationContext =
                             new MetadataOperationContext<>(retryTopics, context.options(), context.deadline(), futures);
-                        rescheduleMetadataTask(retryContext, () -> getListOffsetsCalls(retryContext, retryTopicPartitionOffsets, futures));
+                        rescheduleMetadataTask(operationContext, () -> {
+                            List<Call> listOffsetsCalls = getListOffsetsCalls(operationContext, retryTopicPartitionOffsets, futures);
+                            listOffsetsCalls.forEach(call ->
+                                call.incrementRetryBackoff(this, time.milliseconds()));

Review comment:
       As mentioned before, since we are constructing new calls here, it might make more sense to set the properties for each new `Call` created with `this.tries` + 1 and `this.nextAllowedTryMs`.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
##########
@@ -105,7 +105,7 @@ public WorkerGroupMember(DistributedConfig config,
 
             this.metrics = new Metrics(metricConfig, reporters, time, metricsContext);
             this.retryBackoffMs = config.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG);
-            this.metadata = new Metadata(retryBackoffMs, config.getLong(CommonClientConfigs.METADATA_MAX_AGE_CONFIG),
+            this.metadata = new Metadata(retryBackoffMs, retryBackoffMs, config.getLong(CommonClientConfigs.METADATA_MAX_AGE_CONFIG),

Review comment:
       I know the scope of the KIP was limited to the Java clients and not delving into anything with respect to Connect or the Streams framework, but in this case, if we set the `retry.backoff.ms` equal to `retry.backoff.max.ms` what is the behavior in the `ExponentialBackoff` class? Ultimately the `Metadata` portion of this will affect the Connect framework, so I'm trying to understand what will be the impact of setting both to be the same.

##########
File path: core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
##########
@@ -281,8 +281,7 @@ object BrokerApiVersionsCommand {
       val logContext = new LogContext(s"[LegacyAdminClient clientId=$clientId] ")
       val time = Time.SYSTEM
       val metrics = new Metrics(time)
-      val metadata = new Metadata(100L, 60 * 60 * 1000L, logContext,
-        new ClusterResourceListeners)
+      val metadata = new Metadata(100L, 100L, 60 * 60 * 1000L, logContext, new ClusterResourceListeners)

Review comment:
       I think we should set the values for `Metadata` here with the default values we have in `CommonClientConfigs.java`, so that whenever this command is executed, the exponential backoff will be utilized.

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -553,7 +558,11 @@ private KafkaAdminClient(AdminClientConfig config,
         this.timeoutProcessorFactory = (timeoutProcessorFactory == null) ?
             new TimeoutProcessorFactory() : timeoutProcessorFactory;
         this.maxRetries = config.getInt(AdminClientConfig.RETRIES_CONFIG);
-        this.retryBackoffMs = config.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG);
+        this.retryBackoff = new ExponentialBackoff(

Review comment:
       I think in the KIP we had stated that if `retry.backoff.ms` was set to be greater than `retry.backoff.max.ms`, we will log a warning and state that `retry.backoff.max.ms` will be used as the static backoff. It would be prudent to put this `WARN` level log in all the clients upon startup. Also, we need to account for this case in `ExponentialBackoff.java`, as I don't think it's accounted for in the current implementation.
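A minimal sketch of such a startup check, in Java. The class and method names are hypothetical, and the message wording is only an example. A real client would emit the message through its logger at `WARN` level; the sketch returns it instead so that it stays self-contained.

```java
import java.util.Optional;

/** Hypothetical startup check for the two backoff settings. */
final class BackoffConfigCheck {
    /** Returns a warning message when retry.backoff.ms exceeds retry.backoff.max.ms. */
    static Optional<String> warning(long retryBackoffMs, long retryBackoffMaxMs) {
        if (retryBackoffMs > retryBackoffMaxMs) {
            return Optional.of(String.format(
                "retry.backoff.ms (%d) is greater than retry.backoff.max.ms (%d); "
                    + "retry.backoff.max.ms will be used as a static backoff.",
                retryBackoffMs, retryBackoffMaxMs));
        }
        return Optional.empty();
    }
}
```

Calling `BackoffConfigCheck.warning(200, 100)` yields a message, while `BackoffConfigCheck.warning(100, 1000)` yields an empty `Optional`.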

##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
##########
@@ -1114,7 +1119,7 @@ private boolean maybeTerminateRequestWithError(TxnRequestHandler requestHandler)
         return false;
     }
 
-    private void enqueueRequest(TxnRequestHandler requestHandler) {
+    void enqueueRequest(TxnRequestHandler requestHandler) {

Review comment:
       I think this was made non-private because it had to be visible for testing. Could we add a comment at the top indicating that?

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -814,10 +821,10 @@ private void validateOffsetsAsync(Map<TopicPartition, FetchPosition> partitionsT
                 public void onSuccess(OffsetForEpochResult offsetsResult) {
                     List<SubscriptionState.LogTruncation> truncations = new ArrayList<>();
                     if (!offsetsResult.partitionsToRetry().isEmpty()) {
-                        subscriptions.setNextAllowedRetry(offsetsResult.partitionsToRetry(), time.milliseconds() + retryBackoffMs);
+                        subscriptions.requestFailed(offsetsResult.partitionsToRetry(), time.milliseconds());
                         metadata.requestUpdate();
                     }
-
+                    subscriptions.requestSucceeded(offsetsResult.endOffsets().keySet(), time.milliseconds());

Review comment:
       Does `offsetsResult.endOffsets().keySet()` exclude `offsetsResult.partitionsToRetry()`? If not, we should remove the `partitionsToRetry` so we don't accidentally reset the attempts for those partitions.
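If the two collections could overlap, the defensive version of this call might look like the following Java sketch. `SucceededPartitions` is a hypothetical helper, and the element type is left generic rather than tied to `TopicPartition`.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical helper: partitions that succeeded, excluding any marked for retry. */
final class SucceededPartitions {
    static <T> Set<T> succeeded(Set<T> endOffsetKeys, Set<T> partitionsToRetry) {
        // Copy first so the caller's key set is never modified.
        Set<T> result = new HashSet<>(endOffsetKeys);
        result.removeAll(partitionsToRetry);
        return result;
    }
}
```

The attempt counters would then be reset only for the partitions in the returned set.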

##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
##########
@@ -1303,6 +1311,10 @@ boolean isRetry() {
             return isRetry;
         }
 
+        int Attempts() {

Review comment:
       nit: Change `Attempts()` to `attempts()`

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -749,8 +763,8 @@ final void fail(long now, Throwable throwable) {
                 runnable.enqueue(this, now);
                 return;
             }
-            tries++;
-            nextAllowedTryMs = now + retryBackoffMs;
+
+            incrementRetryBackoff(this, now);

Review comment:
       Similar comment as before - since this function is happening on the `Call` object, we could just call `incrementRetryBackoff(now);` and don't need to pass in the `this` instance.







[GitHub] [kafka] d8tltanc commented on a change in pull request #8846: KAFKA-9800: [KIP-580] Client Exponential Backoff Implementation

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on a change in pull request #8846:
URL: https://github.com/apache/kafka/pull/8846#discussion_r468736687



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -721,6 +730,11 @@ protected Node curNode() {
             return curNode;
         }
 
+        final void incrementRetryBackoff(Call failedCall, long now) {

Review comment:
       For DescribeConsumerGroup and ListOffsets, AdminClient will construct a chain of requests. For example, DescribeConsumerGroup will send a FindCoordinatorRequest and then a DescribeGroupRequest (Denote them as F and D). 
   
   Let's consider this case:
   F (Error, tries = 0) -> F (OK, tries = 1) -> D (Error, tries = 1) -> F (OK, tries = 2) -> D (OK, tries = 2)
   
   F may come after either a failed F or failed D. It should increase the attempts based on either the attempts of F or D.
   D will come after a succeeded F. It should increase the attempts based on the attempts of F.
   
   Both F and D need to know the number of attempts made by the previously failed request in order to set their own attempt counts properly. Since F and D are separate objects and do not share any state, probably the only option is to pass in the failed call instance so that each can construct its status properly.
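The chain above can be traced with a small Java sketch. `CallSketch`, `retryAfter`, and `continueAfter` are hypothetical names chosen for the illustration and are not the PR's `Call` API. The backoff is a plain doubling capped at a maximum, with jitter left out so the numbers are deterministic.

```java
/** Hypothetical stand-in for the admin client's internal Call object. */
final class CallSketch {
    final String name;
    int tries = 0;
    long nextAllowedTryMs = 0L;

    CallSketch(String name) {
        this.name = name;
    }

    /** Builds the call that follows a failure: one more try, delayed by a backoff. */
    static CallSketch retryAfter(String name, CallSketch failed, long now, long baseMs, long maxMs) {
        CallSketch next = new CallSketch(name);
        next.tries = failed.tries + 1;
        // Doubling backoff keyed on the failed call's tries; the shift is capped to avoid overflow.
        long backoff = Math.min(maxMs, baseMs << Math.min(failed.tries, 20));
        next.nextAllowedTryMs = now + backoff;
        return next;
    }

    /** Builds the follow-up call after a success: the try count carries over, with no delay. */
    static CallSketch continueAfter(String name, CallSketch succeeded, long now) {
        CallSketch next = new CallSketch(name);
        next.tries = succeeded.tries;
        next.nextAllowedTryMs = now;
        return next;
    }
}
```

Starting from a failed F with `tries = 0`, a sequence of `retryAfter`, `continueAfter`, `retryAfter`, `continueAfter` produces the counts 1, 1, 2, 2, matching the F and D chain in the example.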







[GitHub] [kafka] abbccdda commented on pull request #8846: KAFKA-9800: [KIP-580] Client Exponential Backoff Implementation

Posted by GitBox <gi...@apache.org>.
abbccdda commented on pull request #8846:
URL: https://github.com/apache/kafka/pull/8846#issuecomment-675647225


   retest this please





[GitHub] [kafka] d8tltanc commented on a change in pull request #8846: KAFKA-9800: [KIP-580] Client Exponential Backoff Implementation

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on a change in pull request #8846:
URL: https://github.com/apache/kafka/pull/8846#discussion_r468757303



##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
##########
@@ -1303,6 +1311,10 @@ boolean isRetry() {
             return isRetry;
         }
 
+        int Attempts() {

Review comment:
       Good catch. Refactored.







[GitHub] [kafka] d8tltanc commented on a change in pull request #8846: KAFKA-9800: [KIP-580] Client Exponential Backoff Implementation

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on a change in pull request #8846:
URL: https://github.com/apache/kafka/pull/8846#discussion_r468756144



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -814,10 +821,10 @@ private void validateOffsetsAsync(Map<TopicPartition, FetchPosition> partitionsT
                 public void onSuccess(OffsetForEpochResult offsetsResult) {
                     List<SubscriptionState.LogTruncation> truncations = new ArrayList<>();
                     if (!offsetsResult.partitionsToRetry().isEmpty()) {
-                        subscriptions.setNextAllowedRetry(offsetsResult.partitionsToRetry(), time.milliseconds() + retryBackoffMs);
+                        subscriptions.requestFailed(offsetsResult.partitionsToRetry(), time.milliseconds());
                         metadata.requestUpdate();
                     }
-
+                    subscriptions.requestSucceeded(offsetsResult.endOffsets().keySet(), time.milliseconds());

Review comment:
       Yes. It excludes partitions to retry.
   ```
   switch (error) {
       case UNKNOWN_LEADER_EPOCH:
           logger().debug("Attempt to fetch offsets for partition {} failed due to {}, retrying.",
                   topicPartition, error);
           partitionsToRetry.add(topicPartition);
           break;
       case UNKNOWN_TOPIC_OR_PARTITION:
           logger().warn("Received unknown topic or partition error in OffsetsForLeaderEpoch request for partition {}",
                   topicPartition);
           partitionsToRetry.add(topicPartition);
           break;
       case TOPIC_AUTHORIZATION_FAILED:
           unauthorizedTopics.add(topicPartition.topic());
           break;
       default:
           logger().warn("Attempt to fetch offsets for partition {} failed due to: {}, retrying.",
                   topicPartition, error.message());
           partitionsToRetry.add(topicPartition);
   }
   ```







[GitHub] [kafka] d8tltanc commented on a change in pull request #8846: KAFKA-9800: [KIP-580] Client Exponential Backoff Implementation

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on a change in pull request #8846:
URL: https://github.com/apache/kafka/pull/8846#discussion_r468832838



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -553,7 +558,11 @@ private KafkaAdminClient(AdminClientConfig config,
         this.timeoutProcessorFactory = (timeoutProcessorFactory == null) ?
             new TimeoutProcessorFactory() : timeoutProcessorFactory;
         this.maxRetries = config.getInt(AdminClientConfig.RETRIES_CONFIG);
-        this.retryBackoffMs = config.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG);
+        this.retryBackoff = new ExponentialBackoff(

Review comment:
       Yes. We can do something similar to `postProcessReconnectBackoffConfigs`
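
A rough sketch of what such a post-processing step could look like for the retry settings. This is an assumption, not code from the PR: the class `RetryBackoffPostProcessSketch` and its `postProcess` method are hypothetical, plain maps stand in for the real config objects, and the rule shown (default the maximum to the user's `retry.backoff.ms` when only that value was set explicitly) is only one plausible choice.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; plain maps stand in for Kafka's config classes.
final class RetryBackoffPostProcessSketch {
    static final String RETRY_BACKOFF_MS = "retry.backoff.ms";
    static final String RETRY_BACKOFF_MAX_MS = "retry.backoff.max.ms";

    // originals: keys the user set explicitly; parsed: values after defaults were applied.
    // Returns the overrides to apply on top of the parsed values.
    static Map<String, Object> postProcess(Map<String, ?> originals, Map<String, Object> parsed) {
        Map<String, Object> overrides = new HashMap<>();
        boolean backoffSet = originals.containsKey(RETRY_BACKOFF_MS);
        boolean maxSet = originals.containsKey(RETRY_BACKOFF_MAX_MS);
        if (backoffSet && !maxSet) {
            // Keep the old constant-backoff behavior when only retry.backoff.ms was configured.
            overrides.put(RETRY_BACKOFF_MAX_MS, parsed.get(RETRY_BACKOFF_MS));
        }
        return overrides;
    }
}
```

With this rule, a user who only sets `retry.backoff.ms` keeps a constant backoff, and the exponential growth only applies once `retry.backoff.max.ms` is configured as well.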







[GitHub] [kafka] d8tltanc commented on a change in pull request #8846: KAFKA-9800: [KIP-580] Client Exponential Backoff Implementation

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on a change in pull request #8846:
URL: https://github.com/apache/kafka/pull/8846#discussion_r469473196



##########
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##########
@@ -74,22 +76,28 @@
     private final ClusterResourceListeners clusterResourceListeners;
     private boolean isClosed;
     private final Map<TopicPartition, Integer> lastSeenLeaderEpochs;
+    private final ExponentialBackoff refreshBackoff;
+    final static double RETRY_BACKOFF_JITTER = 0.2;

Review comment:
       Makes sense. Moved them into `CommonClientConfigs` for reuse.







[GitHub] [kafka] d8tltanc commented on a change in pull request #8846: KAFKA-9800: [KIP-580] Client Exponential Backoff Implementation

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on a change in pull request #8846:
URL: https://github.com/apache/kafka/pull/8846#discussion_r468756144



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -814,10 +821,10 @@ private void validateOffsetsAsync(Map<TopicPartition, FetchPosition> partitionsT
                 public void onSuccess(OffsetForEpochResult offsetsResult) {
                     List<SubscriptionState.LogTruncation> truncations = new ArrayList<>();
                     if (!offsetsResult.partitionsToRetry().isEmpty()) {
-                        subscriptions.setNextAllowedRetry(offsetsResult.partitionsToRetry(), time.milliseconds() + retryBackoffMs);
+                        subscriptions.requestFailed(offsetsResult.partitionsToRetry(), time.milliseconds());
                         metadata.requestUpdate();
                     }
-
+                    subscriptions.requestSucceeded(offsetsResult.endOffsets().keySet(), time.milliseconds());

Review comment:
       Yes. It excludes partitions to retry.
   ```
   switch (error) {
       case NONE:
           logger().debug("Handling OffsetsForLeaderEpoch response for {}. Got offset {} for epoch {}",
                   topicPartition, epochEndOffset.endOffset(), epochEndOffset.leaderEpoch());
           endOffsets.put(topicPartition, epochEndOffset);
           break;
       case UNKNOWN_LEADER_EPOCH:
           logger().debug("Attempt to fetch offsets for partition {} failed due to {}, retrying.",
                   topicPartition, error);
           partitionsToRetry.add(topicPartition);
           break;
       case UNKNOWN_TOPIC_OR_PARTITION:
           logger().warn("Received unknown topic or partition error in OffsetsForLeaderEpoch request for partition {}",
                   topicPartition);
           partitionsToRetry.add(topicPartition);
           break;
       case TOPIC_AUTHORIZATION_FAILED:
           unauthorizedTopics.add(topicPartition.topic());
           break;
       default:
           logger().warn("Attempt to fetch offsets for partition {} failed due to: {}, retrying.",
                   topicPartition, error.message());
           partitionsToRetry.add(topicPartition);
   }
   ```







[GitHub] [kafka] abbccdda commented on pull request #8846: KAFKA-9800: [KIP-580] Client Exponential Backoff Implementation

Posted by GitBox <gi...@apache.org>.
abbccdda commented on pull request #8846:
URL: https://github.com/apache/kafka/pull/8846#issuecomment-676628747


   retest this please





[GitHub] [kafka] showuon commented on pull request #8846: KAFKA-9800: [KIP-580] Client Exponential Backoff Implementation

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #8846:
URL: https://github.com/apache/kafka/pull/8846#issuecomment-949368638


   @d8tltanc , this is a good KIP, but this PR has been pending for a long time. Do you need help? I can co-author with you and help complete it, if you want. Just let me know. Thank you.





[GitHub] [kafka] d8tltanc commented on a change in pull request #8846: KAFKA-9800: [KIP-580] Client Exponential Backoff Implementation

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on a change in pull request #8846:
URL: https://github.com/apache/kafka/pull/8846#discussion_r468829772



##########
File path: core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
##########
@@ -281,8 +281,7 @@ object BrokerApiVersionsCommand {
       val logContext = new LogContext(s"[LegacyAdminClient clientId=$clientId] ")
       val time = Time.SYSTEM
       val metrics = new Metrics(time)
-      val metadata = new Metadata(100L, 60 * 60 * 1000L, logContext,
-        new ClusterResourceListeners)
+      val metadata = new Metadata(100L, 100L, 60 * 60 * 1000L, logContext, new ClusterResourceListeners)

Review comment:
       Yes. Refactored.







[GitHub] [kafka] abbccdda commented on a change in pull request #8846: KAFKA-9800: [KIP-580] Client Exponential Backoff Implementation

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #8846:
URL: https://github.com/apache/kafka/pull/8846#discussion_r474843042



##########
File path: clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
##########
@@ -84,6 +84,14 @@
 
     public static final String RETRY_BACKOFF_MS_CONFIG = "retry.backoff.ms";
     public static final String RETRY_BACKOFF_MS_DOC = "The amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios.";

Review comment:
       Should we update the comment to indicate this is the initial backoff time now?

##########
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##########
@@ -74,22 +76,28 @@
     private final ClusterResourceListeners clusterResourceListeners;
     private boolean isClosed;
     private final Map<TopicPartition, Integer> lastSeenLeaderEpochs;
+    private final ExponentialBackoff refreshBackoff;
+    final static double RETRY_BACKOFF_JITTER = CommonClientConfigs.RETRY_BACKOFF_JITTER;

Review comment:
       Are these two necessary? Couldn't we just directly use the config value instead?

##########
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##########
@@ -457,6 +467,18 @@ private void clearRecoverableErrors() {
      * to avoid retrying immediately.
      */
     public synchronized void failedUpdate(long now) {
+        this.incrementRefreshBackoff(now);
+    }
+
+    private void incrementRefreshBackoff(long now) {
+        this.refreshBackoffMs = this.refreshBackoff.backoff(this.attempts);

Review comment:
       For the general purpose ExponentialBackoff class, could we let the struct itself track the number of attempts made over time?
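
One way to read this suggestion, as a minimal sketch only: the backoff object owns the attempt counter, so callers report failures and successes instead of passing `attempts` in. The class `AttemptTrackingBackoff` and its method names are hypothetical and are not the `ExponentialBackoff` API; jitter is left out so the behavior stays deterministic.

```java
// Hypothetical sketch: a backoff object that tracks its own attempt count.
final class AttemptTrackingBackoff {
    private final long initialMs;
    private final int multiplier;
    private final long maxMs;
    private int attempts = 0;

    AttemptTrackingBackoff(long initialMs, int multiplier, long maxMs) {
        this.initialMs = initialMs;
        this.multiplier = multiplier;
        this.maxMs = maxMs;
    }

    // Returns the backoff for the failure being recorded, then bumps the counter.
    synchronized long recordFailure() {
        double term = initialMs * Math.pow(multiplier, attempts);
        attempts++;
        return (long) Math.min(term, (double) maxMs);
    }

    // A success resets the progression to the initial backoff.
    synchronized void recordSuccess() {
        attempts = 0;
    }

    synchronized int attempts() {
        return attempts;
    }
}
```

With something like this, `failedUpdate` would only need to call `recordFailure()`, and the caller would no longer keep a separate `attempts` field.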

##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -2277,26 +2295,36 @@ public void testDescribeConsumerGroupNumRetries() throws Exception {
         final Time time = new MockTime();
 
         try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster,
-            AdminClientConfig.RETRIES_CONFIG, "0")) {
+                AdminClientConfig.RETRIES_CONFIG, "1",
+                AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, "0")) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
 
-            env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
-
-            DescribeGroupsResponseData data = new DescribeGroupsResponseData();
+            // Case 1

Review comment:
       What do the 4 cases imply?

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -350,7 +351,11 @@
 
     private final int maxRetries;
 
-    private final long retryBackoffMs;
+    private ExponentialBackoff retryBackoff;
+
+    final static double RETRY_BACKOFF_JITTER = CommonClientConfigs.RETRY_BACKOFF_JITTER;

Review comment:
       Similar question, I don't see any obvious benefit for having 2 separate static variables.

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
##########
@@ -128,14 +136,17 @@ public synchronized String prettyString() {
         }
     }
 
-    public SubscriptionState(LogContext logContext, OffsetResetStrategy defaultResetStrategy) {
+    public SubscriptionState(LogContext logContext, OffsetResetStrategy defaultResetStrategy,

Review comment:
       Could we create a constructor with default retry parameters? We could reduce code changes and make the default behavior consistent across different tests.

##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -2235,39 +2227,65 @@ public void testOffsetCommitRetryBackoff() throws Exception {
 
             mockClient.setNodeApiVersions(NodeApiVersions.create());
 
-            AtomicLong firstAttemptTime = new AtomicLong(0);
-            AtomicLong secondAttemptTime = new AtomicLong(0);
-
             final String groupId = "group-0";
             final TopicPartition tp1 = new TopicPartition("foo", 0);
 
             mockClient.prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
-            mockClient.prepareResponse(body -> {
-                firstAttemptTime.set(time.milliseconds());
-                return true;
-            }, prepareOffsetCommitResponse(tp1, Errors.NOT_COORDINATOR));
-
+            mockClient.prepareResponse(prepareOffsetCommitResponse(tp1, Errors.NOT_COORDINATOR));
 
             mockClient.prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
-            mockClient.prepareResponse(body -> {
-                secondAttemptTime.set(time.milliseconds());
-                return true;
-            }, prepareOffsetCommitResponse(tp1, Errors.NONE));
-
+            mockClient.prepareResponse(prepareOffsetCommitResponse(tp1, Errors.NONE));
 
             Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
             offsets.put(tp1, new OffsetAndMetadata(123L));
             final KafkaFuture<Void> future = env.adminClient().alterConsumerGroupOffsets(groupId, offsets).all();
 
-            TestUtils.waitForCondition(() -> mockClient.numAwaitingResponses() == 1, "Failed awaiting CommitOffsets first request failure");
+            TestUtils.waitForCondition(() -> mockClient.numAwaitingResponses() == 2, "Failed awaiting CommitOffsets first request failure");
             TestUtils.waitForCondition(() -> ((KafkaAdminClient) env.adminClient()).numPendingCalls() == 1, "Failed to add retry CommitOffsets call on first failure");
-            time.sleep(retryBackoff);
 
+            long sleepTime1 = (long) (retryBackoff * Math.pow(retryBackoffExpBase, 0) * (1 - retryBackoffJitter)) - 1;
+            long sleepTime2 = (long) (retryBackoff * Math.pow(retryBackoffExpBase, 0) * (retryBackoffJitter * 2)) + 2;
+
+            time.sleep(sleepTime1);
+            assertEquals(1, ((KafkaAdminClient) env.adminClient()).numPendingCalls());
+
+            time.sleep(sleepTime2);
             future.get();
+        }
+    }
 
-            long actualRetryBackoff = secondAttemptTime.get() - firstAttemptTime.get();
-            assertEquals("CommitOffsets retry did not await expected backoff!", retryBackoff, actualRetryBackoff);
+    private void prepareFindCoordinatorResponse(AdminClientUnitTestEnv env, boolean successful) {
+        if (successful) {
+            env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
+        } else {
+            env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, env.cluster().controller()));
+        }
+    }
+
+    private void prepareDescribeGroupResponse(AdminClientUnitTestEnv env, boolean successful) {
+        DescribeGroupsResponseData data;

Review comment:
       nit: I think we could just use a different error code here, instead of building the entire data in branches. Also, `data` should be final.

##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -4068,6 +4046,295 @@ public void testListOffsetsMetadataNonRetriableErrors() throws Exception {
         }
     }
 
+    private void prepareMetadataResponse(AdminClientUnitTestEnv env, boolean successful) {
+        if (successful) {
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE));
+        } else {
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.LEADER_NOT_AVAILABLE));
+        }
+    }
+
+    private void prepareListoffsetsResponse(List<TopicPartition> tps, AdminClientUnitTestEnv env, boolean successful) {
+        Map<TopicPartition, PartitionData> responseData = new HashMap<>();
+        if (successful) {
+            tps.forEach(tp -> {
+                responseData.put(tp, new PartitionData(Errors.NONE, 123456789L, 345L, Optional.of(543)));
+            });
+            env.kafkaClient().prepareResponse(new ListOffsetResponse(responseData));
+        } else {
+            tps.forEach(tp -> {
+                responseData.put(tp, new PartitionData(Errors.LEADER_NOT_AVAILABLE, 123456789L, 345L, Optional.of(543)));
+            });
+            env.kafkaClient().prepareResponse(new ListOffsetResponse(responseData));
+        }
+    }
+
+    private void verifyListOffsetsResult(Map<TopicPartition, OffsetSpec> partitionOffsets, AdminClientUnitTestEnv env,
+                                         boolean successful) throws ExecutionException, InterruptedException {
+        ListOffsetsResult result = env.adminClient().listOffsets(partitionOffsets);
+        if (successful) {
+            Map<TopicPartition, ListOffsetsResultInfo> offsets = result.all().get();
+            assertFalse(offsets.isEmpty());
+            partitionOffsets.keySet().forEach(tp -> {
+                assertEquals(345L, offsets.get(tp).offset());
+                assertEquals(543, offsets.get(tp).leaderEpoch().get().intValue());
+                assertEquals(123456789L, offsets.get(tp).timestamp());
+            });
+        } else {
+            TestUtils.assertFutureError(result.all(), TimeoutException.class);
+        }
+    }
+
+    @Deprecated
+    @Test
+    public void testListOffsetsMaxAllowedNumRetries() throws Exception {
+
+        Node node0 = new Node(0, "localhost", 8120);
+        List<PartitionInfo> pInfos = new ArrayList<>();
+        pInfos.add(new PartitionInfo("foo", 0, node0, new Node[]{node0}, new Node[]{node0}));
+        pInfos.add(new PartitionInfo("bar", 0, node0, new Node[]{node0}, new Node[]{node0}));
+        pInfos.add(new PartitionInfo("baz", 0, node0, new Node[]{node0}, new Node[]{node0}));
+
+        MockTime time = new MockTime();
+        String maxAllowedNumTries = "2";
+        String retryBackoff = "0";
+
+        final Cluster cluster =
+                new Cluster(
+                        "mockClusterId",
+                        Arrays.asList(node0),
+                        pInfos,
+                        Collections.<String>emptySet(),
+                        Collections.<String>emptySet(),
+                        node0);
+
+        final TopicPartition tp1 = new TopicPartition("foo", 0);
+        final TopicPartition tp2 = new TopicPartition("bar", 0);
+        final TopicPartition tp3 = new TopicPartition("baz", 0);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster,
+                AdminClientConfig.RETRIES_CONFIG, maxAllowedNumTries,
+                AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, retryBackoff)) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            Map<TopicPartition, OffsetSpec> partitionOffsets = new HashMap<>();
+            partitionOffsets.put(tp1, OffsetSpec.latest());
+            partitionOffsets.put(tp2, OffsetSpec.earliest());
+            partitionOffsets.put(tp3, OffsetSpec.forTimestamp(time.milliseconds()));
+            List<TopicPartition> tps = new ArrayList<>(partitionOffsets.keySet());
+
+            // Case 1

Review comment:
       Would be better to describe what `cases` are referring to.

##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
##########
@@ -341,10 +342,11 @@ int attempts() {
         return attempts.get();
     }
 
-    void reenqueued(long now) {
+    void reenqueued(long newRetryBackoffMs, long now) {
         attempts.getAndIncrement();
         lastAttemptMs = Math.max(lastAppendTime, now);
         lastAppendTime = Math.max(lastAppendTime, now);
+        retryBackoffMs = newRetryBackoffMs;

Review comment:
       nit: just pass `retryBackoffMs` and use `this.retryBackoffMs = retryBackoffMs;`

##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -620,13 +611,14 @@ public void testCreateTopicsRetryBackoff() throws Exception {
             TestUtils.waitForCondition(() -> ((KafkaAdminClient) env.adminClient()).numPendingCalls() == 1,
                 "Failed to add retry CreateTopics call");
 
-            time.sleep(retryBackoff);
+            long sleepTime1 = (long) (retryBackoff * Math.pow(retryBackoffExpBase, 0) * (1 - retryBackoffJitter)) - 1;
+            long sleepTime2 = (long) (retryBackoff * Math.pow(retryBackoffExpBase, 0) * (retryBackoffJitter * 2)) + 2;
 
-            future.get();
+            time.sleep(sleepTime1);
+            assertEquals(1, ((KafkaAdminClient) env.adminClient()).numPendingCalls());
 
-            long actualRetryBackoff = secondAttemptTime.get() - firstAttemptTime.get();
-            assertEquals("CreateTopics retry did not await expected backoff",
-                    retryBackoff, actualRetryBackoff);
+            time.sleep(sleepTime2);

Review comment:
       Do we lose precision of the backoff time here? Could we do (sleepTime - 1) and verify it is not done yet?
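
For context, the arithmetic behind the two sleeps in the diff can be sketched as below. The helper class `JitterWindow` is hypothetical and only mirrors the expressions from the test for the first attempt (exponent 0): the first sleep stops just short of the earliest possible retry time, and the two sleeps together go just past the latest possible one.

```java
// Hypothetical helper mirroring the sleep arithmetic in the test diff (attempt 0).
final class JitterWindow {
    // One millisecond before the lower edge of the jitter window: backoff * (1 - jitter) - 1.
    static long justBeforeEarliest(long backoffMs, double jitter) {
        return (long) (backoffMs * (1 - jitter)) - 1;
    }

    // Width of the jitter window plus a small margin: backoff * (2 * jitter) + 2.
    static long pastLatest(long backoffMs, double jitter) {
        return (long) (backoffMs * (jitter * 2)) + 2;
    }
}
```

Because the jitter makes the exact retry time random, a test written this way can only assert "not yet retried" before the window and "retried" after it, not an exact backoff value.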

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
##########
@@ -708,16 +719,35 @@ public synchronized void resume(TopicPartition tp) {
         assignedState(tp).resume();
     }
 
-    synchronized void requestFailed(Set<TopicPartition> partitions, long nextRetryTimeMs) {
+    synchronized void requestFailed(Set<TopicPartition> partitions, long now) {

Review comment:
       I think we could make `requestFailed`/`requestSucceeded` templates and pass `incrementRetryBackoff`/`resetRetryBackoff` in as function parameters.
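
A minimal sketch of that idea, under assumptions: `PartitionStatesSketch`, its nested `State`, and the `String` partition keys are hypothetical simplifications of `SubscriptionState`, and the per-partition backoff is reduced to an attempt counter.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical, simplified stand-in for SubscriptionState.
final class PartitionStatesSketch {
    static final class State {
        int attempts;
    }

    private final Map<String, State> states = new HashMap<>();

    synchronized void assign(String partition) {
        states.put(partition, new State());
    }

    // Shared template: look up each assigned partition and apply the given action.
    private void forEachAssigned(Set<String> partitions, Consumer<State> action) {
        for (String partition : partitions) {
            State state = states.get(partition);
            if (state != null)
                action.accept(state);
        }
    }

    synchronized void requestFailed(Set<String> partitions) {
        forEachAssigned(partitions, state -> state.attempts++);
    }

    synchronized void requestSucceeded(Set<String> partitions) {
        forEachAssigned(partitions, state -> state.attempts = 0);
    }

    synchronized int attempts(String partition) {
        return states.get(partition).attempts;
    }
}
```

The two public methods then differ only in the lambda they pass, which keeps the lookup and null handling in one place.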

##########
File path: clients/src/test/java/org/apache/kafka/common/utils/MockTime.java
##########
@@ -38,6 +38,8 @@
     private final CopyOnWriteArrayList<MockTimeListener> listeners = new CopyOnWriteArrayList<>();
 
     private final long autoTickMs;
+    // Record the sum of auto ticked milliseconds
+    private AtomicLong autoTickedMs;

Review comment:
       should be final

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
##########
@@ -105,7 +105,7 @@ public WorkerGroupMember(DistributedConfig config,
 
             this.metrics = new Metrics(metricConfig, reporters, time, metricsContext);
             this.retryBackoffMs = config.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG);
-            this.metadata = new Metadata(retryBackoffMs, config.getLong(CommonClientConfigs.METADATA_MAX_AGE_CONFIG),
+            this.metadata = new Metadata(retryBackoffMs, retryBackoffMs, config.getLong(CommonClientConfigs.METADATA_MAX_AGE_CONFIG),

Review comment:
       Similarly for `Metadata`, we should have a separate constructor without `refreshBackoffMaxMs` that sets it to the same value as `retryBackoffMs` by default, to avoid misconfiguration.
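
As an illustration of the constructor shape being asked for (a sketch only; `MetadataSketch` is a hypothetical class holding just the three numeric settings, not the real `Metadata` signature):

```java
// Hypothetical sketch of a delegating constructor with a safe default.
final class MetadataSketch {
    final long refreshBackoffMs;
    final long refreshBackoffMaxMs;
    final long metadataExpireMs;

    // Callers that do not care about exponential backoff get max == initial,
    // which keeps the backoff constant.
    MetadataSketch(long refreshBackoffMs, long metadataExpireMs) {
        this(refreshBackoffMs, refreshBackoffMs, metadataExpireMs);
    }

    MetadataSketch(long refreshBackoffMs, long refreshBackoffMaxMs, long metadataExpireMs) {
        this.refreshBackoffMs = refreshBackoffMs;
        this.refreshBackoffMaxMs = refreshBackoffMaxMs;
        this.metadataExpireMs = metadataExpireMs;
    }
}
```

Call sites such as the one in this hunk could then stay on the shorter constructor and would not need to repeat `retryBackoffMs` twice.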

##########
File path: clients/src/test/java/org/apache/kafka/common/utils/MockTime.java
##########
@@ -75,8 +78,18 @@ public long nanoseconds() {
     }
 
     private void maybeSleep(long ms) {
-        if (ms != 0)
+        if (ms != 0) {
             sleep(ms);
+            autoTickedMs.getAndAdd(ms);
+        }
+    }
+
+    public long autoTickedMs() {
+        return autoTickedMs.get();
+    }
+
+    public void resetAutoTickedCounter() {

Review comment:
       nit: I think autoTickedMs is a recorder instead of a counter.







[GitHub] [kafka] d8tltanc commented on a change in pull request #8846: KAFKA-9800: [KIP-580] Client Exponential Backoff Implementation

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on a change in pull request #8846:
URL: https://github.com/apache/kafka/pull/8846#discussion_r468736687



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -721,6 +730,11 @@ protected Node curNode() {
             return curNode;
         }
 
+        final void incrementRetryBackoff(Call failedCall, long now) {

Review comment:
       For DescribeConsumerGroup and ListOffsets, AdminClient constructs a chain of requests. For example, DescribeConsumerGroup sends a FindCoordinatorRequest and then a DescribeGroupRequest (denote them F and D). Consider this case:
   F (OK, tries = 0) -> D (Error, tries = 0) -> F (OK, tries = 1) -> D (Error, tries = 1) -> F (OK, tries = 2) -> D (OK, tries = 2)
   Both F and D need to know the number of attempts of the previously failed request in order to set their own attempts properly. Since F and D are separate objects and do not share any state, we can probably only pass in the failed call instance so that each can construct its state properly.
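
The F/D sequence above can be sketched roughly as follows. This is an illustration under assumptions, not the AdminClient code: `ChainedCall` and its methods are hypothetical, and a call here carries nothing but a name and a `tries` counter.

```java
// Hypothetical stand-in for AdminClient's chained calls.
final class ChainedCall {
    final String name;
    final int tries;

    ChainedCall(String name, int tries) {
        this.name = name;
        this.tries = tries;
    }

    // The next call in the chain after a success inherits the same tries.
    ChainedCall next(String nextName) {
        return new ChainedCall(nextName, tries);
    }

    // When this call fails, the chain restarts and the new first call
    // takes tries + 1 from the failed instance.
    ChainedCall restartAfterFailure(String firstName) {
        return new ChainedCall(firstName, tries + 1);
    }
}
```

Replaying the example: `F(0).next("D")` gives D with tries = 0, `restartAfterFailure("F")` on that D gives F with tries = 1, and so on until D succeeds with tries = 2.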







[GitHub] [kafka] abbccdda commented on pull request #8846: KAFKA-9800: [KIP-580] Client Exponential Backoff Implementation

Posted by GitBox <gi...@apache.org>.
abbccdda commented on pull request #8846:
URL: https://github.com/apache/kafka/pull/8846#issuecomment-676632413


   retest this please
   
   





[GitHub] [kafka] d8tltanc commented on pull request #8846: KAFKA-9800: [KIP-580] Client Exponential Backoff Implementation

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on pull request #8846:
URL: https://github.com/apache/kafka/pull/8846#issuecomment-673114249


   @abbccdda Thanks for the review offers! I'll rebase this PR.





[GitHub] [kafka] skaundinya15 commented on a change in pull request #8846: KAFKA-9800: [KIP-580] Client Exponential Backoff Implementation

Posted by GitBox <gi...@apache.org>.
skaundinya15 commented on a change in pull request #8846:
URL: https://github.com/apache/kafka/pull/8846#discussion_r444631685



##########
File path: clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
##########
@@ -84,6 +84,9 @@
     public static final String RETRY_BACKOFF_MS_CONFIG = "retry.backoff.ms";
     public static final String RETRY_BACKOFF_MS_DOC = "The amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios.";
 
+    public static final String RETRY_BACKOFF_MAX_MS_CONFIG = "retry.backoff.max.ms";
+    public static final String RETRY_BACKOFF_MAX_MS_DOC = "The maximum amount of time in milliseconds to wait when retrying a request to the broker that has repeatedly failed. If provided, the backoff per client will increase exponentially for each failed request, up to this maximum. To prevent all clients from being synchronized upon retry, a randomization factor of 0.2 will be applied to the backoff, resulting in a random range between 20% below and 20% above the computed value. If retry.backoff.ms is set to be higher than retry.backoff.max.ms, then retry.backoff.max.ms will be used as a constant backoff from the beginning without any exponential increase";

Review comment:
       Nit: Can we change the wording to the following?
   ```suggestion
       public static final String RETRY_BACKOFF_MAX_MS_DOC = "The maximum amount of time in milliseconds to wait when retrying a request to the broker that has repeatedly failed. If provided, the backoff per client will increase exponentially for each failed request, up to this maximum. To prevent all clients from being synchronized upon retry, a randomization jitter with a factor of 0.2 will be applied to the backoff, resulting in the backoff falling within a range between 20% below and 20% above the computed value. If retry.backoff.ms is set to be higher than retry.backoff.max.ms, then retry.backoff.max.ms will be used as a constant backoff from the beginning without any exponential increase";
   ```

##########
File path: clients/src/main/java/org/apache/kafka/common/utils/GeometricProgression.java
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * An util class for exponential backoff, backoff, etc...
+ * The formula is Term(n) = random(1 - jitter, 1 + jitter) * scaleFactor * (ratio) ^ n
+ * If scaleFactor is greater or equal than termMax, a constant term of will be provided
+ * This class is thread-safe
+ */
+public class GeometricProgression {
+    private final int ratio;
+    private final double expMax;
+    private final long scaleFactor;

Review comment:
       It seems that the refactoring and renaming to `ExponentialBackoff` has happened in patch #8683. Could you refactor this patch on top of that so it's more in line with what will be going into `trunk`?







[GitHub] [kafka] abbccdda commented on pull request #8846: KAFKA-9800: [KIP-580] Client Exponential Backoff Implementation

Posted by GitBox <gi...@apache.org>.
abbccdda commented on pull request #8846:
URL: https://github.com/apache/kafka/pull/8846#issuecomment-673088220


   @d8tltanc Thanks for the great work! I could resume reviewing this PR sometime next week. Do you mind rebasing it?





[GitHub] [kafka] d8tltanc commented on a change in pull request #8846: [WIP] KAFKA-9800: [KIP-580] Client Exponential Backoff Implementation

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on a change in pull request #8846:
URL: https://github.com/apache/kafka/pull/8846#discussion_r441799632



##########
File path: clients/src/main/java/org/apache/kafka/common/utils/GeometricProgression.java
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * An util class for exponential backoff, backoff, etc...
+ * The formula is Term(n) = random(1 - jitter, 1 + jitter) * scaleFactor * (ratio) ^ n
+ * If scaleFactor is greater or equal than termMax, a constant term of will be provided
+ * This class is thread-safe
+ */
+public class GeometricProgression {
+    private final int ratio;
+    private final double expMax;
+    private final long scaleFactor;

Review comment:
       @dajac suggests the name `ExponentialBackoff`, even if we also use it for computing an exponential timeout. He thinks that people will understand this. I'd go with this suggestion.







[GitHub] [kafka] d8tltanc commented on a change in pull request #8846: KAFKA-9800: [KIP-580] Client Exponential Backoff Implementation

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on a change in pull request #8846:
URL: https://github.com/apache/kafka/pull/8846#discussion_r468824804



##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
##########
@@ -1215,6 +1220,8 @@ private void completeTransaction() {
     abstract class TxnRequestHandler implements RequestCompletionHandler {
         protected final TransactionalRequestResult result;
         private boolean isRetry = false;
+        private int attempts = 0;
+        private long retryBackoffMs = retryBackoff.backoff(0);

Review comment:
       Makes sense. Refactored.







[GitHub] [kafka] d8tltanc commented on a change in pull request #8846: KAFKA-9800: [KIP-580] Client Exponential Backoff Implementation

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on a change in pull request #8846:
URL: https://github.com/apache/kafka/pull/8846#discussion_r468758893



##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
##########
@@ -1114,7 +1119,7 @@ private boolean maybeTerminateRequestWithError(TxnRequestHandler requestHandler)
         return false;
     }
 
-    private void enqueueRequest(TxnRequestHandler requestHandler) {
+    void enqueueRequest(TxnRequestHandler requestHandler) {

Review comment:
       Yes. Comment added.







[GitHub] [kafka] d8tltanc commented on a change in pull request #8846: KAFKA-9800: [KIP-580] Client Exponential Backoff Implementation

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on a change in pull request #8846:
URL: https://github.com/apache/kafka/pull/8846#discussion_r469471723



##########
File path: clients/src/main/java/org/apache/kafka/common/utils/GeometricProgression.java
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * An util class for exponential backoff, backoff, etc...
+ * The formula is Term(n) = random(1 - jitter, 1 + jitter) * scaleFactor * (ratio) ^ n
+ * If scaleFactor is greater or equal than termMax, a constant term of will be provided
+ * This class is thread-safe
+ */
+public class GeometricProgression {
+    private final int ratio;
+    private final double expMax;
+    private final long scaleFactor;
+    private final double jitter;
+
+    public GeometricProgression(long scaleFactor, int ratio, long termMax, double jitter) {
+        this.scaleFactor = scaleFactor;

Review comment:
       Resolved as this class has been merged in another PR.

##########
File path: clients/src/main/java/org/apache/kafka/common/utils/GeometricProgression.java
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * An util class for exponential backoff, backoff, etc...
+ * The formula is Term(n) = random(1 - jitter, 1 + jitter) * scaleFactor * (ratio) ^ n
+ * If scaleFactor is greater or equal than termMax, a constant term of will be provided

Review comment:
       Resolved as this class has been merged in another PR.

##########
File path: clients/src/main/java/org/apache/kafka/common/utils/GeometricProgression.java
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * An util class for exponential backoff, backoff, etc...

Review comment:
       Resolved as this class has been merged in another PR.







[GitHub] [kafka] d8tltanc commented on a change in pull request #8846: KAFKA-9800: [KIP-580] Client Exponential Backoff Implementation

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on a change in pull request #8846:
URL: https://github.com/apache/kafka/pull/8846#discussion_r468832838



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -553,7 +558,11 @@ private KafkaAdminClient(AdminClientConfig config,
         this.timeoutProcessorFactory = (timeoutProcessorFactory == null) ?
             new TimeoutProcessorFactory() : timeoutProcessorFactory;
         this.maxRetries = config.getInt(AdminClientConfig.RETRIES_CONFIG);
-        this.retryBackoffMs = config.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG);
+        this.retryBackoff = new ExponentialBackoff(

Review comment:
       Yes. We can do something similar to `postProcessReconnectBackoffConfigs` and `warnIfDeprecatedDnsLookupValue`.
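
The question this reply answers is not shown here. Assuming it concerns the case where `retry.backoff.ms` exceeds `retry.backoff.max.ms`, a post-processing check could look roughly like the sketch below. The class name, the method `checkRetryBackoff`, and the use of a plain `Map` in place of the real config classes are all hypothetical and chosen only to keep the example self-contained.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch of a post-parse sanity check on the two backoff configs.
final class RetryBackoffConfigCheckSketch {
    static final String RETRY_BACKOFF_MS = "retry.backoff.ms";
    static final String RETRY_BACKOFF_MAX_MS = "retry.backoff.max.ms";

    // Returns a warning message when the initial backoff exceeds the maximum,
    // in which case the maximum would act as a constant backoff.
    static Optional<String> checkRetryBackoff(Map<String, Object> parsedValues) {
        long backoffMs = ((Number) parsedValues.get(RETRY_BACKOFF_MS)).longValue();
        long backoffMaxMs = ((Number) parsedValues.get(RETRY_BACKOFF_MAX_MS)).longValue();
        if (backoffMs > backoffMaxMs) {
            return Optional.of(RETRY_BACKOFF_MS + " (" + backoffMs + ") is higher than "
                + RETRY_BACKOFF_MAX_MS + " (" + backoffMaxMs + "); a constant backoff of "
                + backoffMaxMs + " ms will be used");
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Map<String, Object> parsed = new HashMap<>();
        parsed.put(RETRY_BACKOFF_MS, 500L);
        parsed.put(RETRY_BACKOFF_MAX_MS, 200L);
        // A real client would log the warning instead of printing it.
        checkRetryBackoff(parsed).ifPresent(System.out::println);
    }
}
```

Warning rather than throwing keeps existing configurations working, which matches the documented fallback to a constant backoff.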







[GitHub] [kafka] d8tltanc commented on a change in pull request #8846: KAFKA-9800: [KIP-580] Client Exponential Backoff Implementation

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on a change in pull request #8846:
URL: https://github.com/apache/kafka/pull/8846#discussion_r468746415



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
##########
@@ -70,7 +70,7 @@
     private boolean shouldRebalance;
 
     public MockConsumer(OffsetResetStrategy offsetResetStrategy) {
-        this.subscriptions = new SubscriptionState(new LogContext(), offsetResetStrategy);
+        this.subscriptions = new SubscriptionState(new LogContext(), offsetResetStrategy, 100, 100);

Review comment:
       If we pass the same value for `retryBackoffMs` and `retryBackoffMaxMs`, a static backoff of 100 ms will be applied. I think the MockConsumer should use the static backoff, since some of the existing tests assume that. What do you think?







[GitHub] [kafka] d8tltanc commented on a change in pull request #8846: KAFKA-9800: [KIP-580] Client Exponential Backoff Implementation

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on a change in pull request #8846:
URL: https://github.com/apache/kafka/pull/8846#discussion_r468736687



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -721,6 +730,11 @@ protected Node curNode() {
             return curNode;
         }
 
+        final void incrementRetryBackoff(Call failedCall, long now) {

Review comment:
       For DescribeConsumerGroup and ListOffsets, AdminClient constructs a chain of requests. For example, DescribeConsumerGroup sends a FindCoordinatorRequest and then a DescribeGroupRequest (denote them as F and D).
   
   Let's consider this case:
   F (OK, tries = 0) -> D (Error, tries = 0) -> F (OK, tries = 1) -> D (Error, tries = 1) -> F (OK, tries = 2) -> D (OK, tries = 2)
   
   Both F and D need to be aware of the number of attempts of the previously failed request in order to set their own attempt count properly. Since F and D are separate objects and do not share any state, we can probably only pass the failed call instance so that they can initialize their state properly.
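
The hand-off described above can be sketched as follows. This is an illustration only: the `Call` class, `next`, and `restartAfterFailure` are hypothetical names and do not reflect the actual `KafkaAdminClient` internals; the sketch only shows how passing the previous call lets each new call inherit the attempt count.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: each call in the chain is built from the previous call,
// so the attempt count survives even though the calls share no other state.
final class ChainedCallSketch {
    static final class Call {
        final String name;
        final int tries;

        Call(String name, int tries) {
            this.name = name;
            this.tries = tries;
        }

        // Next step of the chain after a success: same attempt count.
        Call next(String nextName) {
            return new Call(nextName, tries);
        }

        // Restart of the chain after this call failed: one more attempt.
        Call restartAfterFailure(String firstName) {
            return new Call(firstName, tries + 1);
        }
    }

    // Simulates a chain where F always succeeds and D fails a given number of times.
    static List<String> run(int failuresOfD) {
        List<String> trace = new ArrayList<>();
        Call f = new Call("F", 0);
        int remainingFailures = failuresOfD;
        while (true) {
            trace.add("F (OK, tries = " + f.tries + ")");
            Call d = f.next("D");
            if (remainingFailures > 0) {
                remainingFailures--;
                trace.add("D (Error, tries = " + d.tries + ")");
                f = d.restartAfterFailure("F");
            } else {
                trace.add("D (OK, tries = " + d.tries + ")");
                return trace;
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(String.join(" -> ", run(2)));
    }
}
```

Running the sketch with two failures of D reproduces the sequence from the example case.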







[GitHub] [kafka] d8tltanc commented on a change in pull request #8846: KAFKA-9800: [KIP-580] Client Exponential Backoff Implementation

Posted by GitBox <gi...@apache.org>.
d8tltanc commented on a change in pull request #8846:
URL: https://github.com/apache/kafka/pull/8846#discussion_r469472239



##########
File path: clients/src/main/java/org/apache/kafka/common/utils/GeometricProgression.java
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * An util class for exponential backoff, backoff, etc...
+ * The formula is Term(n) = random(1 - jitter, 1 + jitter) * scaleFactor * (ratio) ^ n
+ * If scaleFactor is greater or equal than termMax, a constant term of will be provided
+ * This class is thread-safe
+ */
+public class GeometricProgression {
+    private final int ratio;
+    private final double expMax;
+    private final long scaleFactor;

Review comment:
       Resolved as this class has been merged.

##########
File path: clients/src/main/java/org/apache/kafka/common/utils/GeometricProgression.java
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * An util class for exponential backoff, backoff, etc...
+ * The formula is Term(n) = random(1 - jitter, 1 + jitter) * scaleFactor * (ratio) ^ n
+ * If scaleFactor is greater or equal than termMax, a constant term of will be provided
+ * This class is thread-safe
+ */
+public class GeometricProgression {
+    private final int ratio;
+    private final double expMax;
+    private final long scaleFactor;
+    private final double jitter;
+
+    public GeometricProgression(long scaleFactor, int ratio, long termMax, double jitter) {
+        this.scaleFactor = scaleFactor;
+        this.ratio = ratio;
+        this.jitter = jitter;
+        this.expMax = termMax > scaleFactor ?

Review comment:
       Resolved as this class has been merged in another PR.

##########
File path: clients/src/main/java/org/apache/kafka/common/utils/GeometricProgression.java
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * An util class for exponential backoff, backoff, etc...
+ * The formula is Term(n) = random(1 - jitter, 1 + jitter) * scaleFactor * (ratio) ^ n
+ * If scaleFactor is greater or equal than termMax, a constant term of will be provided
+ * This class is thread-safe
+ */
+public class GeometricProgression {

Review comment:
       Resolved as this class has been merged in another PR.

##########
File path: clients/src/main/java/org/apache/kafka/common/utils/GeometricProgression.java
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * An util class for exponential backoff, backoff, etc...
+ * The formula is Term(n) = random(1 - jitter, 1 + jitter) * scaleFactor * (ratio) ^ n
+ * If scaleFactor is greater or equal than termMax, a constant term of will be provided

Review comment:
       Resolved as this class has been merged in another PR.







[GitHub] [kafka] abbccdda commented on a change in pull request #8846: [WIP] KAFKA-9800: [KIP-580] Client Exponential Backoff Implementation

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #8846:
URL: https://github.com/apache/kafka/pull/8846#discussion_r439549625



##########
File path: clients/src/main/java/org/apache/kafka/common/utils/GeometricProgression.java
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * An util class for exponential backoff, backoff, etc...
+ * The formula is Term(n) = random(1 - jitter, 1 + jitter) * scaleFactor * (ratio) ^ n
+ * If scaleFactor is greater or equal than termMax, a constant term of will be provided
+ * This class is thread-safe
+ */
+public class GeometricProgression {
+    private final int ratio;
+    private final double expMax;
+    private final long scaleFactor;
+    private final double jitter;
+
+    public GeometricProgression(long scaleFactor, int ratio, long termMax, double jitter) {
+        this.scaleFactor = scaleFactor;

Review comment:
       nit: align the order of initialization with parameter list.

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
##########
@@ -67,9 +67,14 @@
      * <code>retry.backoff.ms</code>
      */
     public static final String RETRY_BACKOFF_MS_CONFIG = CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG;
-    private static final String RETRY_BACKOFF_MS_DOC = "The amount of time to wait before attempting to " +
-                "retry a failed request. This avoids repeatedly sending requests in a tight loop under " +
-                "some failure scenarios.";
+    private static final String RETRY_BACKOFF_MS_DOC = CommonClientConfigs.RETRY_BACKOFF_MS_DOC;
+
+    /**
+     * <code>retry.backoff.max.ms</code>
+     */
+    // TODO: Add validator rules and force backoff_max_ms > backoff_ms if possible (I guess it's impossible)

Review comment:
       Could we create a JIRA instead of a TODO?

##########
File path: clients/src/main/java/org/apache/kafka/common/utils/GeometricProgression.java
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * An util class for exponential backoff, backoff, etc...
+ * The formula is Term(n) = random(1 - jitter, 1 + jitter) * scaleFactor * (ratio) ^ n
+ * If scaleFactor is greater or equal than termMax, a constant term of will be provided
+ * This class is thread-safe
+ */
+public class GeometricProgression {
+    private final int ratio;
+    private final double expMax;
+    private final long scaleFactor;
+    private final double jitter;
+
+    public GeometricProgression(long scaleFactor, int ratio, long termMax, double jitter) {
+        this.scaleFactor = scaleFactor;
+        this.ratio = ratio;
+        this.jitter = jitter;
+        this.expMax = termMax > scaleFactor ?

Review comment:
       Since all users of this class would apply exponential back-off, I don't think it is necessary to overload the class with the capability for static back-off.

##########
File path: clients/src/main/java/org/apache/kafka/common/utils/GeometricProgression.java
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * An util class for exponential backoff, backoff, etc...

Review comment:
       nit: An util class for... -> Utility class

##########
File path: clients/src/main/java/org/apache/kafka/common/utils/GeometricProgression.java
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * An util class for exponential backoff, backoff, etc...
+ * The formula is Term(n) = random(1 - jitter, 1 + jitter) * scaleFactor * (ratio) ^ n
+ * If scaleFactor is greater or equal than termMax, a constant term of will be provided

Review comment:
       period

##########
File path: clients/src/main/java/org/apache/kafka/common/utils/GeometricProgression.java
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * An util class for exponential backoff, backoff, etc...
+ * The formula is Term(n) = random(1 - jitter, 1 + jitter) * scaleFactor * (ratio) ^ n
+ * If scaleFactor is greater or equal than termMax, a constant term of will be provided

Review comment:
       equal to

##########
File path: clients/src/main/java/org/apache/kafka/common/utils/GeometricProgression.java
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * An util class for exponential backoff, backoff, etc...
+ * The formula is Term(n) = random(1 - jitter, 1 + jitter) * scaleFactor * (ratio) ^ n
+ * If scaleFactor is greater or equal than termMax, a constant term of will be provided
+ * This class is thread-safe
+ */
+public class GeometricProgression {

Review comment:
       The class name and fields are too general. IMHO, let's name them something directly related to retry back-off.

##########
File path: clients/src/main/java/org/apache/kafka/common/utils/GeometricProgression.java
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * An util class for exponential backoff, backoff, etc...
+ * The formula is Term(n) = random(1 - jitter, 1 + jitter) * scaleFactor * (ratio) ^ n
+ * If scaleFactor is greater or equal than termMax, a constant term of will be provided
+ * This class is thread-safe
+ */
+public class GeometricProgression {
+    private final int ratio;
+    private final double expMax;
+    private final long scaleFactor;

Review comment:
       This is a little hard to understand. Should we name it `baseBackOffMs`?
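
       For illustration only (this is not code from the PR): a minimal sketch of the formula in the Javadoc above, written with back-off-specific names. The class name `BackoffSketch` and the names `maxBackoffMs` and `cappedTerm` are hypothetical, and the behavior at the cap is an assumption: the un-jittered term is clamped to `maxBackoffMs`, so it is constant whenever `baseBackoffMs >= maxBackoffMs`.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical sketch; names and cap behavior are assumptions, not the PR's code.
final class BackoffSketch {
    private final long baseBackoffMs;  // "scaleFactor" in the diff
    private final int ratio;           // growth factor per attempt
    private final long maxBackoffMs;   // "termMax" in the Javadoc
    private final double jitter;       // e.g. 0.2 gives a factor in [0.8, 1.2)

    BackoffSketch(long baseBackoffMs, int ratio, long maxBackoffMs, double jitter) {
        this.baseBackoffMs = baseBackoffMs;
        this.ratio = ratio;
        this.maxBackoffMs = maxBackoffMs;
        this.jitter = jitter;
    }

    // Un-jittered term baseBackoffMs * ratio^attempts, clamped to maxBackoffMs.
    double cappedTerm(int attempts) {
        double term = baseBackoffMs * Math.pow(ratio, attempts);
        return Math.min(term, maxBackoffMs);
    }

    // Term(n) = random(1 - jitter, 1 + jitter) * cappedTerm(n).
    long backoff(int attempts) {
        // nextDouble(origin, bound) requires origin < bound, so skip it when jitter is 0.
        double randomFactor = jitter == 0.0
            ? 1.0
            : ThreadLocalRandom.current().nextDouble(1 - jitter, 1 + jitter);
        return (long) (randomFactor * cappedTerm(attempts));
    }
}
```

       `ThreadLocalRandom.current()` returns a generator bound to the calling thread, and all fields are final, so a class shaped like this can be shared across threads without extra synchronization.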







[GitHub] [kafka] d8tltanc edited a comment on pull request #8846: KAFKA-9800: [KIP-580] Client Exponential Backoff Implementation

Posted by GitBox <gi...@apache.org>.
d8tltanc edited a comment on pull request #8846:
URL: https://github.com/apache/kafka/pull/8846#issuecomment-645877170





