You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2020/03/05 01:24:54 UTC
[kafka] branch 2.5 updated: KAFKA-9530;
Fix flaky test `testDescribeGroupWithShortInitializationTimeout`
(#8154)
This is an automated email from the ASF dual-hosted git repository.
davidarthur pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.5 by this push:
new 42b1d64 KAFKA-9530; Fix flaky test `testDescribeGroupWithShortInitializationTimeout` (#8154)
42b1d64 is described below
commit 42b1d64b6228f7ff645063b8c0949752a864345d
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Sun Feb 23 10:16:32 2020 -0800
KAFKA-9530; Fix flaky test `testDescribeGroupWithShortInitializationTimeout` (#8154)
With a short timeout, a call in KafkaAdminClient may timeout and the client might disconnect. Currently this can be exposed to the user as either a TimeoutException or a DisconnectException. To be consistent, rather than exposing the underlying retriable error, we handle both cases with a TimeoutException.
Reviewers: Boyang Chen <bo...@confluent.io>, Ismael Juma <is...@juma.me.uk>
---
.../kafka/clients/admin/KafkaAdminClient.java | 33 ++++++++++------------
1 file changed, 15 insertions(+), 18 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index c2dd452..f16a71e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -708,11 +708,7 @@ public class KafkaAdminClient extends AdminClient {
// TimeoutException. In this case, we do not get any more retries - the call has
// failed. We increment tries anyway in order to display an accurate log message.
tries++;
- if (log.isDebugEnabled()) {
- log.debug("{} aborted at {} after {} attempt(s)", this, now, tries,
- new Exception(prettyPrintException(throwable)));
- }
- handleFailure(new TimeoutException("Aborted due to timeout."));
+ failWithTimeout(now, throwable);
return;
}
// If this is an UnsupportedVersionException that we can retry, do so. Note that a
@@ -729,14 +725,10 @@ public class KafkaAdminClient extends AdminClient {
// If the call has timed out, fail.
if (calcTimeoutMsRemainingAsInt(now, deadlineMs) < 0) {
- if (log.isDebugEnabled()) {
- log.debug("{} timed out at {} after {} attempt(s)", this, now, tries,
- new Exception(prettyPrintException(throwable)));
- }
- handleFailure(throwable);
+ failWithTimeout(now, throwable);
return;
}
- // If the exception is not retryable, fail.
+ // If the exception is not retriable, fail.
if (!(throwable instanceof RetriableException)) {
if (log.isDebugEnabled()) {
log.debug("{} failed with non-retriable exception after {} attempt(s)", this, tries,
@@ -747,11 +739,7 @@ public class KafkaAdminClient extends AdminClient {
}
// If we are out of retries, fail.
if (tries > maxRetries) {
- if (log.isDebugEnabled()) {
- log.debug("{} failed after {} attempt(s)", this, tries,
- new Exception(prettyPrintException(throwable)));
- }
- handleFailure(throwable);
+ failWithTimeout(now, throwable);
return;
}
if (log.isDebugEnabled()) {
@@ -761,6 +749,15 @@ public class KafkaAdminClient extends AdminClient {
runnable.enqueue(this, now);
}
+ private void failWithTimeout(long now, Throwable cause) {
+ if (log.isDebugEnabled()) {
+ log.debug("{} timed out at {} after {} attempt(s)", this, now, tries,
+ new Exception(prettyPrintException(cause)));
+ }
+ handleFailure(new TimeoutException(this + " timed out at " + now
+ + " after " + tries + " attempt(s)", cause));
+ }
+
/**
* Create an AbstractRequest.Builder for this Call.
*
@@ -781,7 +778,7 @@ public class KafkaAdminClient extends AdminClient {
/**
* Handle a failure. This will only be called if the failure exception was not
- * retryable, or if we hit a timeout.
+ * retriable, or if we hit a timeout.
*
* @param throwable The exception.
*/
@@ -1113,7 +1110,7 @@ public class KafkaAdminClient extends AdminClient {
}
// Handle the result of the call. This may involve retrying the call, if we got a
- // retryible exception.
+ // retriable exception.
if (response.versionMismatch() != null) {
call.fail(now, response.versionMismatch());
} else if (response.wasDisconnected()) {