Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/07/01 08:38:00 UTC

[GitHub] [kafka] skaundinya15 commented on a change in pull request #8846: KAFKA-9800: [KIP-580] Client Exponential Backoff Implementation

skaundinya15 commented on a change in pull request #8846:
URL: https://github.com/apache/kafka/pull/8846#discussion_r448066943



##########
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##########
@@ -74,22 +76,28 @@
     private final ClusterResourceListeners clusterResourceListeners;
     private boolean isClosed;
     private final Map<TopicPartition, Integer> lastSeenLeaderEpochs;
+    private final ExponentialBackoff refreshBackoff;
+    final static double RETRY_BACKOFF_JITTER = 0.2;

Review comment:
       Since `RETRY_BACKOFF_JITTER` and `RETRY_BACKOFF_EXP_BASE` will be the same for all the clients, should we move these two defaults into `CommonClientConfigs.java` instead? That way they can be shared by the `Consumer`, `Producer`, and `KafkaAdminClient`.
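       A minimal sketch of what sharing the two defaults could look like. The holder class `BackoffDefaults` is hypothetical and stands in for wherever the constants end up (for example `CommonClientConfigs`). Only the jitter value of 0.2 comes from the diff above; the exponential base of 2 is an assumption.

```java
// Hypothetical holder for backoff defaults shared by Consumer, Producer and KafkaAdminClient.
final class BackoffDefaults {
    // Jitter fraction taken from the Metadata diff (RETRY_BACKOFF_JITTER = 0.2).
    static final double RETRY_BACKOFF_JITTER = 0.2;
    // Assumed exponential base; the diff does not show its value.
    static final int RETRY_BACKOFF_EXP_BASE = 2;

    private BackoffDefaults() {
        // Constants only; not meant to be instantiated.
    }
}
```

       Each client would then read `BackoffDefaults.RETRY_BACKOFF_JITTER` and `BackoffDefaults.RETRY_BACKOFF_EXP_BASE` instead of declaring its own copies.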

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -721,6 +730,11 @@ protected Node curNode() {
             return curNode;
         }
 
+        final void incrementRetryBackoff(Call failedCall, long now) {

Review comment:
       Do we need to pass in the `failedCall` parameter here? I was thinking we could rewrite the function like so:
   ```java
   final void incrementRetryBackoff(long now) {
     this.nextAllowedTryMs = now + retryBackoff.backoff(tries);
     this.tries++;
   }
   ```
   Since the method lives on the `Call` object, it can update that call's own `tries` and `nextAllowedTryMs` through `this`, so there is no need to pass the instance in as a parameter.
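       A self-contained sketch of the suggested shape. `SketchBackoff` is a hypothetical stand-in for the PR's `ExponentialBackoff` (base times expBase^attempts, capped, with optional jitter), and `Call` is trimmed down to just the retry state; neither is the actual Kafka code.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical stand-in for ExponentialBackoff: base * expBase^attempts, jittered, capped at maxMs.
final class SketchBackoff {
    private final long baseMs;
    private final int expBase;
    private final long maxMs;
    private final double jitter;

    SketchBackoff(long baseMs, int expBase, long maxMs, double jitter) {
        this.baseMs = baseMs;
        this.expBase = expBase;
        this.maxMs = maxMs;
        this.jitter = jitter;
    }

    long backoff(long attempts) {
        double term = baseMs * Math.pow(expBase, attempts);
        double factor = jitter > 0
            ? ThreadLocalRandom.current().nextDouble(1 - jitter, 1 + jitter)
            : 1.0;
        return (long) Math.min(maxMs, term * factor);
    }
}

// Hypothetical, trimmed-down Call: the retry state lives on the call itself.
class Call {
    private final SketchBackoff retryBackoff;
    int tries = 0;
    long nextAllowedTryMs = 0;

    Call(SketchBackoff retryBackoff) {
        this.retryBackoff = retryBackoff;
    }

    // No Call parameter needed: the method updates this call's own fields.
    final void incrementRetryBackoff(long now) {
        this.nextAllowedTryMs = now + retryBackoff.backoff(tries);
        this.tries++;
    }
}
```

       With a 100 ms base and no jitter, the first failure at `now = 1000` schedules the next try at 1100 ms and the second failure at `now = 2000` schedules it at 2200 ms.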

##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
##########
@@ -1215,6 +1220,8 @@ private void completeTransaction() {
     abstract class TxnRequestHandler implements RequestCompletionHandler {
         protected final TransactionalRequestResult result;
         private boolean isRetry = false;
+        private int attempts = 0;
+        private long retryBackoffMs = retryBackoff.backoff(0);

Review comment:
       I've noticed that we call `retryBackoff.backoff(0)` in a couple of different places. Since this call just returns the base backoff, it might be worth adding a method to the `ExponentialBackoff` class that returns the base backoff with the random jitter applied.
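       A sketch of the suggested helper. `JitteredBackoff` and `baseBackoff()` are hypothetical names; the class only mimics the general behavior of an exponential backoff with jitter and is not the PR's `ExponentialBackoff`.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical stand-in for ExponentialBackoff, used only to show the helper.
final class JitteredBackoff {
    private final long baseMs;
    private final int expBase;
    private final long maxMs;
    private final double jitter;

    JitteredBackoff(long baseMs, int expBase, long maxMs, double jitter) {
        this.baseMs = baseMs;
        this.expBase = expBase;
        this.maxMs = maxMs;
        this.jitter = jitter;
    }

    // baseMs * expBase^attempts, randomized by +/- jitter, never above maxMs.
    long backoff(long attempts) {
        double term = baseMs * Math.pow(expBase, attempts);
        double factor = jitter > 0
            ? ThreadLocalRandom.current().nextDouble(1 - jitter, 1 + jitter)
            : 1.0;
        return (long) Math.min(maxMs, term * factor);
    }

    // Suggested helper: the first-attempt backoff, i.e. the base value with jitter applied.
    long baseBackoff() {
        return backoff(0);
    }
}
```

       Callers such as `TxnRequestHandler` could then write `retryBackoff.baseBackoff()`, which states the intent more clearly than the magic `0` argument.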

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -1295,7 +1309,7 @@ private void processRequests() {
 
                 // Ensure that we use a small poll timeout if there are pending calls which need to be sent
                 if (!pendingCalls.isEmpty())
-                    pollTimeout = Math.min(pollTimeout, retryBackoffMs);
+                    pollTimeout = Math.min(pollTimeout, retryBackoff.backoff(0));

Review comment:
       What's the reasoning behind using `retryBackoff.backoff(0)` here? From what I understand, before this change we wanted to take the minimum of the `pollTimeout` and the configured `retryBackoffMs`. Now that each call in `pendingCalls` can have a different backoff, we could take the minimum of those backoffs and compare that to the `pollTimeout`.
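       A sketch of the alternative described above, assuming each pending call exposes its current backoff in milliseconds. `PollTimeouts` and its nested `PendingCall` are hypothetical; the real admin client's `Call` does not necessarily carry such a field.

```java
import java.util.List;

final class PollTimeouts {
    // Hypothetical view of a pending call: only its current backoff matters here.
    static final class PendingCall {
        final long retryBackoffMs;

        PendingCall(long retryBackoffMs) {
            this.retryBackoffMs = retryBackoffMs;
        }
    }

    // Shrinks pollTimeout to the smallest backoff among pending calls;
    // leaves it unchanged when nothing is pending.
    static long pollTimeout(long pollTimeout, List<PendingCall> pendingCalls) {
        long minBackoff = pendingCalls.stream()
            .mapToLong(c -> c.retryBackoffMs)
            .min()
            .orElse(pollTimeout);
        return Math.min(pollTimeout, minBackoff);
    }
}
```

       For example, with a 5000 ms poll timeout and pending calls backing off for 400 ms and 150 ms, the poll timeout becomes 150 ms.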

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
##########
@@ -70,7 +70,7 @@
     private boolean shouldRebalance;
 
     public MockConsumer(OffsetResetStrategy offsetResetStrategy) {
-        this.subscriptions = new SubscriptionState(new LogContext(), offsetResetStrategy);
+        this.subscriptions = new SubscriptionState(new LogContext(), offsetResetStrategy, 100, 100);

Review comment:
       Nit: Can we use the default variables defined in `CommonClientConfigs.java`?

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -3904,9 +3914,14 @@ void handleResponse(AbstractResponse abstractResponse) {
                     if (!retryTopicPartitionOffsets.isEmpty()) {
                         Set<String> retryTopics = retryTopicPartitionOffsets.keySet().stream().map(
                             TopicPartition::topic).collect(Collectors.toSet());
-                        MetadataOperationContext<ListOffsetsResultInfo, ListOffsetsOptions> retryContext =
+                        MetadataOperationContext<ListOffsetsResultInfo, ListOffsetsOptions> operationContext =
                             new MetadataOperationContext<>(retryTopics, context.options(), context.deadline(), futures);
-                        rescheduleMetadataTask(retryContext, () -> getListOffsetsCalls(retryContext, retryTopicPartitionOffsets, futures));
+                        rescheduleMetadataTask(operationContext, () -> {
+                            List<Call> listOffsetsCalls = getListOffsetsCalls(operationContext, retryTopicPartitionOffsets, futures);
+                            listOffsetsCalls.forEach(call ->
+                                call.incrementRetryBackoff(this, time.milliseconds()));

Review comment:
       As mentioned before, since we are constructing new calls here, it might make more sense to set the properties for each new `Call` created with `this.tries` + 1 and `this.nextAllowedTryMs`.
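       A sketch of one literal reading of this suggestion: every freshly built call inherits `tries + 1` and `nextAllowedTryMs` from the call that spawned it. `RetryCall` and `inheritRetryState` are hypothetical names standing in for the admin client's `Call`.

```java
// Hypothetical minimal call carrying only retry state.
final class RetryCall {
    final String name;
    int tries;
    long nextAllowedTryMs;

    RetryCall(String name) {
        this.name = name;
    }

    // Carry the parent call's retry state over to a freshly constructed call.
    void inheritRetryState(RetryCall parent) {
        this.tries = parent.tries + 1;
        this.nextAllowedTryMs = parent.nextAllowedTryMs;
    }
}
```

       Inside the rescheduled task, `listOffsetsCalls.forEach(call -> call.inheritRetryState(this))` would then replace the `incrementRetryBackoff(this, ...)` loop.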

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
##########
@@ -105,7 +105,7 @@ public WorkerGroupMember(DistributedConfig config,
 
             this.metrics = new Metrics(metricConfig, reporters, time, metricsContext);
             this.retryBackoffMs = config.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG);
-            this.metadata = new Metadata(retryBackoffMs, config.getLong(CommonClientConfigs.METADATA_MAX_AGE_CONFIG),
+            this.metadata = new Metadata(retryBackoffMs, retryBackoffMs, config.getLong(CommonClientConfigs.METADATA_MAX_AGE_CONFIG),

Review comment:
       I know the scope of the KIP was limited to the Java clients and did not cover Connect or the Streams framework, but in this case, if we set `retry.backoff.ms` equal to `retry.backoff.max.ms`, what is the behavior of the `ExponentialBackoff` class? This `Metadata` instance ultimately affects the Connect framework, so I'm trying to understand the impact of setting both to the same value.
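       One way the equal-values case could be handled, sketched under the assumption that the exponent is capped at log_expBase(max / base): when `retry.backoff.ms` equals `retry.backoff.max.ms` the cap is 0, so every attempt returns the same value and the backoff is effectively static. `CappedExponentialBackoff` is hypothetical and may not match the PR's class.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical backoff that caps the exponent rather than only the result.
final class CappedExponentialBackoff {
    private final long baseMs;
    private final int expBase;
    private final long maxMs;
    private final double jitter;
    private final double expMax; // largest useful exponent; 0 means static backoff

    CappedExponentialBackoff(long baseMs, int expBase, long maxMs, double jitter) {
        this.baseMs = baseMs;
        this.expBase = expBase;
        this.maxMs = maxMs;
        this.jitter = jitter;
        this.expMax = maxMs > baseMs
            ? Math.log(maxMs / (double) Math.max(baseMs, 1)) / Math.log(expBase)
            : 0;
    }

    long backoff(long attempts) {
        if (expMax == 0) {
            // maxMs <= baseMs: no growth and no jitter, never above maxMs.
            return Math.min(baseMs, maxMs);
        }
        double exp = Math.min(attempts, expMax);
        double term = baseMs * Math.pow(expBase, exp);
        double factor = jitter > 0
            ? ThreadLocalRandom.current().nextDouble(1 - jitter, 1 + jitter)
            : 1.0;
        return Math.min(maxMs, (long) (term * factor));
    }
}
```

       Under this sketch, the `WorkerGroupMember` call above (the same value passed as both base and max) would keep Connect's metadata refresh backoff at exactly `retry.backoff.ms`, as before the KIP.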

##########
File path: core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
##########
@@ -281,8 +281,7 @@ object BrokerApiVersionsCommand {
       val logContext = new LogContext(s"[LegacyAdminClient clientId=$clientId] ")
       val time = Time.SYSTEM
       val metrics = new Metrics(time)
-      val metadata = new Metadata(100L, 60 * 60 * 1000L, logContext,
-        new ClusterResourceListeners)
+      val metadata = new Metadata(100L, 100L, 60 * 60 * 1000L, logContext, new ClusterResourceListeners)

Review comment:
       I think we should set the values for `Metadata` here to the defaults in `CommonClientConfigs.java`, so that exponential backoff is used whenever this command runs.

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -553,7 +558,11 @@ private KafkaAdminClient(AdminClientConfig config,
         this.timeoutProcessorFactory = (timeoutProcessorFactory == null) ?
             new TimeoutProcessorFactory() : timeoutProcessorFactory;
         this.maxRetries = config.getInt(AdminClientConfig.RETRIES_CONFIG);
-        this.retryBackoffMs = config.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG);
+        this.retryBackoff = new ExponentialBackoff(

Review comment:
       I think in the KIP we stated that if `retry.backoff.ms` is set greater than `retry.backoff.max.ms`, we will log a warning and use `retry.backoff.max.ms` as the static backoff. We should emit this `WARN`-level log in all the clients at startup. We also need to handle this case in `ExponentialBackoff.java`, as I don't think the current implementation accounts for it.
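       A sketch of the startup check described in the KIP. `BackoffConfigCheck` and `effectiveBaseBackoffMs` are hypothetical, and `java.util.logging` is used only to keep the example self-contained; the Kafka clients themselves log through SLF4J.

```java
import java.util.logging.Logger;

// Hypothetical startup check: if retry.backoff.ms exceeds retry.backoff.max.ms,
// warn and fall back to retry.backoff.max.ms as a static backoff.
final class BackoffConfigCheck {
    private static final Logger LOG = Logger.getLogger(BackoffConfigCheck.class.getName());

    static long effectiveBaseBackoffMs(long retryBackoffMs, long retryBackoffMaxMs) {
        if (retryBackoffMs > retryBackoffMaxMs) {
            LOG.warning("retry.backoff.ms (" + retryBackoffMs + ") is greater than retry.backoff.max.ms ("
                + retryBackoffMaxMs + "); using retry.backoff.max.ms as a static backoff");
            return retryBackoffMaxMs;
        }
        return retryBackoffMs;
    }
}
```

       Each client would run this once while building its backoff object, so the warning appears at startup rather than on every retry.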

##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
##########
@@ -1114,7 +1119,7 @@ private boolean maybeTerminateRequestWithError(TxnRequestHandler requestHandler)
         return false;
     }
 
-    private void enqueueRequest(TxnRequestHandler requestHandler) {
+    void enqueueRequest(TxnRequestHandler requestHandler) {

Review comment:
       I think this was made non-private because it had to be visible for testing. Could we add a comment at the top indicating that?

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -814,10 +821,10 @@ private void validateOffsetsAsync(Map<TopicPartition, FetchPosition> partitionsT
                 public void onSuccess(OffsetForEpochResult offsetsResult) {
                     List<SubscriptionState.LogTruncation> truncations = new ArrayList<>();
                     if (!offsetsResult.partitionsToRetry().isEmpty()) {
-                        subscriptions.setNextAllowedRetry(offsetsResult.partitionsToRetry(), time.milliseconds() + retryBackoffMs);
+                        subscriptions.requestFailed(offsetsResult.partitionsToRetry(), time.milliseconds());
                         metadata.requestUpdate();
                     }
-
+                    subscriptions.requestSucceeded(offsetsResult.endOffsets().keySet(), time.milliseconds());

Review comment:
       Does `offsetsResult.endOffsets().keySet()` exclude `offsetsResult.partitionsToRetry()`? If not we should remove the `partitionsToRetry` so we don't accidentally reset the attempts for those partitions.
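       If the two sets can overlap, a set difference before calling `requestSucceeded` avoids the problem. In this sketch the type parameter `P` stands in for `TopicPartition`, and `SucceededPartitions.succeeded` is a hypothetical helper name.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class SucceededPartitions {
    // Keys of endOffsets minus the partitions flagged for retry, so that
    // requestSucceeded() never resets the attempt count of a partition about to be retried.
    static <P> Set<P> succeeded(Map<P, ?> endOffsets, Set<P> partitionsToRetry) {
        Set<P> result = new HashSet<>(endOffsets.keySet());
        result.removeAll(partitionsToRetry);
        return result;
    }
}
```

       The call site would then become `subscriptions.requestSucceeded(SucceededPartitions.succeeded(offsetsResult.endOffsets(), offsetsResult.partitionsToRetry()), time.milliseconds())`.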

##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
##########
@@ -1303,6 +1311,10 @@ boolean isRetry() {
             return isRetry;
         }
 
+        int Attempts() {

Review comment:
       nit: Change `Attempts()` to `attempts()`

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -749,8 +763,8 @@ final void fail(long now, Throwable throwable) {
                 runnable.enqueue(this, now);
                 return;
             }
-            tries++;
-            nextAllowedTryMs = now + retryBackoffMs;
+
+            incrementRetryBackoff(this, now);

Review comment:
       Same comment as before: since this method runs on the `Call` object, we can just call `incrementRetryBackoff(now);` and don't need to pass in the `this` instance.



