Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/06/02 03:47:58 UTC

[GitHub] [kafka] abbccdda commented on a change in pull request #8421: KAFKA-9800: [KIP-580] Admin Client Exponential Backoff Implementation

abbccdda commented on a change in pull request #8421:
URL: https://github.com/apache/kafka/pull/8421#discussion_r433603033



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -1309,7 +1362,8 @@ private void processRequests() {
          * @param now       The current time in milliseconds.
          */
         void enqueue(Call call, long now) {
-            if (call.tries > maxRetries) {

Review comment:
       Since the refactoring is already done, the above comment seems unnecessary if we make `enqueue` private.

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
##########
@@ -67,9 +67,14 @@
      * <code>retry.backoff.ms</code>
      */
     public static final String RETRY_BACKOFF_MS_CONFIG = CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG;
-    private static final String RETRY_BACKOFF_MS_DOC = "The amount of time to wait before attempting to " +
-                "retry a failed request. This avoids repeatedly sending requests in a tight loop under " +
-                "some failure scenarios.";
+    private static final String RETRY_BACKOFF_MS_DOC = CommonClientConfigs.RETRY_BACKOFF_MS_DOC;
+
+    /**
+     * <code>retry.backoff.max.ms</code>
+     */
+    // TODO: Add validator rules and force backoff_max_ms > backoff_ms if possible (I guess it's impossible)

Review comment:
       Let's use a JIRA ticket to track follow-up work instead of a TODO comment.

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -708,11 +755,67 @@ protected Node curNode() {
             return curNode;
         }
 
+        public CallRetryContext callRetryContext() {
+            return callRetryContext;
+        }
+
+        private boolean shouldRetry(Throwable t) {
+            return t instanceof RetriableException || t instanceof UnsupportedVersionException;
+        }
+
+        /**
+         * Depending on what the exception is, we may choose to fail the Call, or retry it.
+         *
+         * @param failedCall    The failed call.
+         * @param t             The failure exception.

Review comment:
       Let's be consistent and name it `throwable`.

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -1344,6 +1419,9 @@ void call(Call call, long now) {
             if (hardShutdownTimeMs.get() != INVALID_SHUTDOWN_TIME) {
                 log.debug("The AdminClient is not accepting new calls. Timing out {}.", call);

Review comment:
       We could mention the pending shutdown in the error log instead.

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -1344,6 +1419,9 @@ void call(Call call, long now) {
             if (hardShutdownTimeMs.get() != INVALID_SHUTDOWN_TIME) {
                 log.debug("The AdminClient is not accepting new calls. Timing out {}.", call);
                 call.fail(Long.MAX_VALUE, new TimeoutException("The AdminClient thread is not accepting new calls."));
+            } else if (call.callRetryContext().tries() > maxRetries) {
+                log.debug("Max retries {} for {} reached", maxRetries, call);
+                call.fail(time.milliseconds(), new TimeoutException());

Review comment:
       Should we put a "max retries reached" message inside the exception?
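       A minimal sketch of what a message-carrying exception could look like. Everything here is an assumption for illustration: the class and method names are hypothetical, the message wording is made up, and `java.util.concurrent.TimeoutException` stands in for Kafka's own `TimeoutException` so the snippet compiles without the Kafka client on the classpath.

```java
import java.util.concurrent.TimeoutException;

// Hypothetical helper, not part of the PR: builds a timeout exception whose
// message says why the call was failed.
class MaxRetriesMessageSketch {

    // callName and maxRetries are illustrative parameters; in the PR the
    // values would come from the Call object and the admin client config.
    static TimeoutException maxRetriesExceeded(String callName, int maxRetries) {
        return new TimeoutException(String.format(
            "Call %s failed after reaching the maximum of %d retries.", callName, maxRetries));
    }
}
```

       With a message in place, a caller who only sees the exception (and not the debug log) can still tell a retry exhaustion apart from other timeouts.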

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -1135,7 +1215,7 @@ private void handleResponses(long now, List<ClientResponse> responses) {
                     if (authException != null) {
                         call.fail(now, authException);
                     } else {
-                        call.fail(now, new DisconnectException(String.format(
+                        call.retry(call, new DisconnectException(String.format(

Review comment:
       This doesn't seem aligned with the original logic: the old code failed the call here, while this change retries it.

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -1335,7 +1409,8 @@ void enqueue(Call call, long now) {
         /**
          * Initiate a new call.
          *
-         * This will fail if the AdminClient is scheduled to shut down.
+         * This will fail if the AdminClient is scheduled to shut down, or

Review comment:
       Over time, explanatory comments get outdated. Since the logic in this function is pretty concise, I would recommend removing the meta comment.

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -683,21 +686,65 @@ public Node provide() {
         }
     }
 
+    /**
+     * Provides context which the retry may refer to
+     */
+    class CallRetryContext {
+
+        private int tries = 0;
+        private long nextAllowedTryMs = 0;
+        private final double JITTER_MIN = 0.8;
+        private final double JITTER_MAX = 1.2;

Review comment:
       Checkstyle failure:
   ```
   Name 'JITTER_MAX' must match pattern '^[a-z][a-zA-Z0-9]*$'. [MemberName]
   ```
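   One possible fix, sketched under assumptions: Checkstyle's `MemberName` rule applies to non-static fields, whereas `static final` fields are checked by `ConstantName`, which accepts upper-case names. Declaring the jitter bounds as `static final` should therefore satisfy the check without renaming them. The class below is a hypothetical stand-in for `CallRetryContext`, and the backoff formula (doubling per try, capped, then multiplied by a random jitter factor) is an assumed reading of exponential backoff with jitter, not the PR's actual code.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical stand-in for the PR's CallRetryContext; names and formula are assumptions.
class RetryBackoffSketch {
    // static final constants fall under ConstantName, so upper-case names are accepted.
    private static final double JITTER_MIN = 0.8;
    private static final double JITTER_MAX = 1.2;

    private final long retryBackoffMs;
    private final long retryBackoffMaxMs;
    private int tries = 0;

    RetryBackoffSketch(long retryBackoffMs, long retryBackoffMaxMs) {
        this.retryBackoffMs = retryBackoffMs;
        this.retryBackoffMaxMs = retryBackoffMaxMs;
    }

    // Returns the delay before the next attempt and counts the failed try.
    long nextBackoffMs() {
        // Cap the exponent so the shift stays well inside the range of a long.
        int exponent = Math.min(tries, 30);
        long exponential = Math.min(retryBackoffMaxMs, retryBackoffMs * (1L << exponent));
        tries++;
        // Random factor in [JITTER_MIN, JITTER_MAX) spreads out simultaneous retries.
        double jitter = ThreadLocalRandom.current().nextDouble(JITTER_MIN, JITTER_MAX);
        // Re-apply the cap so jitter never pushes the delay past the maximum.
        return Math.min(retryBackoffMaxMs, (long) (exponential * jitter));
    }
}
```

   For example, with `retryBackoffMs = 100` and `retryBackoffMaxMs = 1000`, the first delay falls between 80 and 120 ms and later delays never exceed 1000 ms.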




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org