You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2020/03/05 01:24:54 UTC

[kafka] branch 2.5 updated: KAFKA-9530; Fix flaky test `testDescribeGroupWithShortInitializationTimeout` (#8154)

This is an automated email from the ASF dual-hosted git repository.

davidarthur pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.5 by this push:
     new 42b1d64  KAFKA-9530; Fix flaky test `testDescribeGroupWithShortInitializationTimeout` (#8154)
42b1d64 is described below

commit 42b1d64b6228f7ff645063b8c0949752a864345d
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Sun Feb 23 10:16:32 2020 -0800

    KAFKA-9530; Fix flaky test `testDescribeGroupWithShortInitializationTimeout` (#8154)
    
    With a short timeout, a call in KafkaAdminClient may time out and the client might disconnect. Currently this can be exposed to the user as either a TimeoutException or a DisconnectException. To be consistent, rather than exposing the underlying retriable error, we handle both cases with a TimeoutException.
    
    Reviewers: Boyang Chen <bo...@confluent.io>, Ismael Juma <is...@juma.me.uk>
---
 .../kafka/clients/admin/KafkaAdminClient.java      | 33 ++++++++++------------
 1 file changed, 15 insertions(+), 18 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index c2dd452..f16a71e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -708,11 +708,7 @@ public class KafkaAdminClient extends AdminClient {
                 // TimeoutException. In this case, we do not get any more retries - the call has
                 // failed. We increment tries anyway in order to display an accurate log message.
                 tries++;
-                if (log.isDebugEnabled()) {
-                    log.debug("{} aborted at {} after {} attempt(s)", this, now, tries,
-                        new Exception(prettyPrintException(throwable)));
-                }
-                handleFailure(new TimeoutException("Aborted due to timeout."));
+                failWithTimeout(now, throwable);
                 return;
             }
             // If this is an UnsupportedVersionException that we can retry, do so. Note that a
@@ -729,14 +725,10 @@ public class KafkaAdminClient extends AdminClient {
 
             // If the call has timed out, fail.
             if (calcTimeoutMsRemainingAsInt(now, deadlineMs) < 0) {
-                if (log.isDebugEnabled()) {
-                    log.debug("{} timed out at {} after {} attempt(s)", this, now, tries,
-                        new Exception(prettyPrintException(throwable)));
-                }
-                handleFailure(throwable);
+                failWithTimeout(now, throwable);
                 return;
             }
-            // If the exception is not retryable, fail.
+            // If the exception is not retriable, fail.
             if (!(throwable instanceof RetriableException)) {
                 if (log.isDebugEnabled()) {
                     log.debug("{} failed with non-retriable exception after {} attempt(s)", this, tries,
@@ -747,11 +739,7 @@ public class KafkaAdminClient extends AdminClient {
             }
             // If we are out of retries, fail.
             if (tries > maxRetries) {
-                if (log.isDebugEnabled()) {
-                    log.debug("{} failed after {} attempt(s)", this, tries,
-                        new Exception(prettyPrintException(throwable)));
-                }
-                handleFailure(throwable);
+                failWithTimeout(now, throwable);
                 return;
             }
             if (log.isDebugEnabled()) {
@@ -761,6 +749,15 @@ public class KafkaAdminClient extends AdminClient {
             runnable.enqueue(this, now);
         }
 
+        private void failWithTimeout(long now, Throwable cause) {
+            if (log.isDebugEnabled()) {
+                log.debug("{} timed out at {} after {} attempt(s)", this, now, tries,
+                    new Exception(prettyPrintException(cause)));
+            }
+            handleFailure(new TimeoutException(this + " timed out at " + now
+                + " after " + tries + " attempt(s)", cause));
+        }
+
         /**
          * Create an AbstractRequest.Builder for this Call.
          *
@@ -781,7 +778,7 @@ public class KafkaAdminClient extends AdminClient {
 
         /**
          * Handle a failure. This will only be called if the failure exception was not
-         * retryable, or if we hit a timeout.
+         * retriable, or if we hit a timeout.
          *
          * @param throwable     The exception.
          */
@@ -1113,7 +1110,7 @@ public class KafkaAdminClient extends AdminClient {
                 }
 
                 // Handle the result of the call. This may involve retrying the call, if we got a
-                // retryible exception.
+                // retriable exception.
                 if (response.versionMismatch() != null) {
                     call.fail(now, response.versionMismatch());
                 } else if (response.wasDisconnected()) {