You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2020/09/29 14:18:46 UTC

[kafka] branch trunk updated: MINOR; Preserve ThrottlingQuotaExceededException when request timeouts after being retried due to a quota violation (KIP-599) (#9344)

This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a0fec75  MINOR; Preserve ThrottlingQuotaExceededException when request timeouts after being retried due to a quota violation (KIP-599) (#9344)
a0fec75 is described below

commit a0fec75d3cee3d23bd517fe0acc65270a6cb0f88
Author: David Jacot <dj...@confluent.io>
AuthorDate: Tue Sep 29 16:17:37 2020 +0200

    MINOR; Preserve ThrottlingQuotaExceededException when request timeouts after being retried due to a quota violation (KIP-599) (#9344)
    
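    This PR adds the logic to preserve the ThrottlingQuotaExceededException when topics are retried due to a quota violation and the request subsequently times out. The throttleTimeMs is also reduced by the time elapsed since the retry was scheduled (never going below zero), as the request could remain pending or in-flight for quite a long time.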
    
    I have run various tests on clusters with quotas enabled and, indeed, find it better to preserve the exception. Otherwise, the caller only sees a timeout and does not really understand what is going on. Preserving the exception allows the caller to take appropriate measures and also to take the throttleTimeMs into consideration.
    
    Reviewers: Rajini Sivaram <ra...@googlemail.com>
---
 .../kafka/clients/admin/KafkaAdminClient.java      | 95 +++++++++++++++++-----
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 12 ++-
 2 files changed, 85 insertions(+), 22 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 9acd341..3f39bbe 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -1445,6 +1445,24 @@ public class KafkaAdminClient extends AdminClient {
                 entry.getValue().completeExceptionally(new ApiException(messageFormatter.apply(entry.getKey()))));
     }
 
+    /**
+     * Fail futures in the given Map which were retried due to exceeding quota. We propagate
+     * the initial error back to the caller if the request timed out.
+     */
+    private static <K, V> void maybeCompleteQuotaExceededException(
+            boolean shouldRetryOnQuotaViolation,
+            Throwable throwable,
+            Map<K, KafkaFutureImpl<V>> futures,
+            Map<K, ThrottlingQuotaExceededException> quotaExceededExceptions,
+            int throttleTimeDelta) {
+        if (shouldRetryOnQuotaViolation && throwable instanceof TimeoutException) {
+            quotaExceededExceptions.forEach((key, value) -> futures.get(key).completeExceptionally(
+                new ThrottlingQuotaExceededException(
+                    Math.max(0, value.throttleTimeMs() - throttleTimeDelta),
+                    value.getMessage())));
+        }
+    }
+
     @Override
     public CreateTopicsResult createTopics(final Collection<NewTopic> newTopics,
                                            final CreateTopicsOptions options) {
@@ -1464,7 +1482,8 @@ public class KafkaAdminClient extends AdminClient {
         if (!topics.isEmpty()) {
             final long now = time.milliseconds();
             final long deadline = calcDeadlineMs(now, options.timeoutMs());
-            final Call call = getCreateTopicsCall(options, topicFutures, topics, deadline);
+            final Call call = getCreateTopicsCall(options, topicFutures, topics,
+                Collections.emptyMap(), now, deadline);
             runnable.call(call, now);
         }
         return new CreateTopicsResult(new HashMap<>(topicFutures));
@@ -1473,6 +1492,8 @@ public class KafkaAdminClient extends AdminClient {
     private Call getCreateTopicsCall(final CreateTopicsOptions options,
                                      final Map<String, KafkaFutureImpl<TopicMetadataAndConfig>> futures,
                                      final CreatableTopicCollection topics,
+                                     final Map<String, ThrottlingQuotaExceededException> quotaExceededExceptions,
+                                     final long now,
                                      final long deadline) {
         return new Call("createTopics", deadline, new ControllerNodeProvider()) {
             @Override
@@ -1491,6 +1512,7 @@ public class KafkaAdminClient extends AdminClient {
                 // Handle server responses for particular topics.
                 final CreateTopicsResponse response = (CreateTopicsResponse) abstractResponse;
                 final CreatableTopicCollection retryTopics = new CreatableTopicCollection();
+                final Map<String, ThrottlingQuotaExceededException> retryTopicQuotaExceededExceptions = new HashMap<>();
                 for (CreatableTopicResult result : response.data().topics()) {
                     KafkaFutureImpl<TopicMetadataAndConfig> future = futures.get(result.name());
                     if (future == null) {
@@ -1499,11 +1521,13 @@ public class KafkaAdminClient extends AdminClient {
                         ApiError error = new ApiError(result.errorCode(), result.errorMessage());
                         if (error.isFailure()) {
                             if (error.is(Errors.THROTTLING_QUOTA_EXCEEDED)) {
+                                ThrottlingQuotaExceededException quotaExceededException = new ThrottlingQuotaExceededException(
+                                    response.throttleTimeMs(), error.messageWithFallback());
                                 if (options.shouldRetryOnQuotaViolation()) {
                                     retryTopics.add(topics.find(result.name()).duplicate());
+                                    retryTopicQuotaExceededExceptions.put(result.name(), quotaExceededException);
                                 } else {
-                                    future.completeExceptionally(new ThrottlingQuotaExceededException(
-                                        response.throttleTimeMs(), error.messageWithFallback()));
+                                    future.completeExceptionally(quotaExceededException);
                                 }
                             } else {
                                 future.completeExceptionally(error.exception());
@@ -1535,8 +1559,10 @@ public class KafkaAdminClient extends AdminClient {
                     completeUnrealizedFutures(futures.entrySet().stream(),
                         topic -> "The controller response did not contain a result for topic " + topic);
                 } else {
-                    final Call call = getCreateTopicsCall(options, futures, retryTopics, deadline);
-                    runnable.call(call, time.milliseconds());
+                    final long now = time.milliseconds();
+                    final Call call = getCreateTopicsCall(options, futures, retryTopics,
+                        retryTopicQuotaExceededExceptions, now, deadline);
+                    runnable.call(call, now);
                 }
             }
 
@@ -1554,6 +1580,11 @@ public class KafkaAdminClient extends AdminClient {
 
             @Override
             void handleFailure(Throwable throwable) {
+                // If any topics were retried due to a quota exceeded exception, we propagate
+                // the initial error back to the caller if the request timed out.
+                maybeCompleteQuotaExceededException(options.shouldRetryOnQuotaViolation(),
+                    throwable, futures, quotaExceededExceptions, (int) (time.milliseconds() - now));
+                // Fail all the other remaining futures
                 completeAllExceptionally(futures.values(), throwable);
             }
         };
@@ -1578,7 +1609,8 @@ public class KafkaAdminClient extends AdminClient {
         if (!validTopicNames.isEmpty()) {
             final long now = time.milliseconds();
             final long deadline = calcDeadlineMs(now, options.timeoutMs());
-            final Call call = getDeleteTopicsCall(options, topicFutures, validTopicNames, deadline);
+            final Call call = getDeleteTopicsCall(options, topicFutures, validTopicNames,
+                Collections.emptyMap(), now, deadline);
             runnable.call(call, now);
         }
         return new DeleteTopicsResult(new HashMap<>(topicFutures));
@@ -1587,6 +1619,8 @@ public class KafkaAdminClient extends AdminClient {
     private Call getDeleteTopicsCall(final DeleteTopicsOptions options,
                                      final Map<String, KafkaFutureImpl<Void>> futures,
                                      final List<String> topics,
+                                     final Map<String, ThrottlingQuotaExceededException> quotaExceededExceptions,
+                                     final long now,
                                      final long deadline) {
         return new Call("deleteTopics", deadline, new ControllerNodeProvider()) {
             @Override
@@ -1604,6 +1638,7 @@ public class KafkaAdminClient extends AdminClient {
                 // Handle server responses for particular topics.
                 final DeleteTopicsResponse response = (DeleteTopicsResponse) abstractResponse;
                 final List<String> retryTopics = new ArrayList<>();
+                final Map<String, ThrottlingQuotaExceededException> retryTopicQuotaExceededExceptions = new HashMap<>();
                 for (DeletableTopicResult result : response.data().responses()) {
                     KafkaFutureImpl<Void> future = futures.get(result.name());
                     if (future == null) {
@@ -1612,11 +1647,13 @@ public class KafkaAdminClient extends AdminClient {
                         ApiError error = new ApiError(result.errorCode(), result.errorMessage());
                         if (error.isFailure()) {
                             if (error.is(Errors.THROTTLING_QUOTA_EXCEEDED)) {
+                                ThrottlingQuotaExceededException quotaExceededException = new ThrottlingQuotaExceededException(
+                                    response.throttleTimeMs(), error.messageWithFallback());
                                 if (options.shouldRetryOnQuotaViolation()) {
                                     retryTopics.add(result.name());
+                                    retryTopicQuotaExceededExceptions.put(result.name(), quotaExceededException);
                                 } else {
-                                    future.completeExceptionally(new ThrottlingQuotaExceededException(
-                                        response.throttleTimeMs(), error.messageWithFallback()));
+                                    future.completeExceptionally(quotaExceededException);
                                 }
                             } else {
                                 future.completeExceptionally(error.exception());
@@ -1632,13 +1669,20 @@ public class KafkaAdminClient extends AdminClient {
                     completeUnrealizedFutures(futures.entrySet().stream(),
                         topic -> "The controller response did not contain a result for topic " + topic);
                 } else {
-                    final Call call = getDeleteTopicsCall(options, futures, retryTopics, deadline);
-                    runnable.call(call, time.milliseconds());
+                    final long now = time.milliseconds();
+                    final Call call = getDeleteTopicsCall(options, futures, retryTopics,
+                        retryTopicQuotaExceededExceptions, now, deadline);
+                    runnable.call(call, now);
                 }
             }
 
             @Override
             void handleFailure(Throwable throwable) {
+                // If any topics were retried due to a quota exceeded exception, we propagate
+                // the initial error back to the caller if the request timed out.
+                maybeCompleteQuotaExceededException(options.shouldRetryOnQuotaViolation(),
+                    throwable, futures, quotaExceededExceptions, (int) (time.milliseconds() - now));
+                // Fail all the other remaining futures
                 completeAllExceptionally(futures.values(), throwable);
             }
         };
@@ -2478,7 +2522,7 @@ public class KafkaAdminClient extends AdminClient {
 
     @Override
     public CreatePartitionsResult createPartitions(final Map<String, NewPartitions> newPartitions,
-        final CreatePartitionsOptions options) {
+                                                   final CreatePartitionsOptions options) {
         final Map<String, KafkaFutureImpl<Void>> futures = new HashMap<>(newPartitions.size());
         final CreatePartitionsTopicCollection topics = new CreatePartitionsTopicCollection(newPartitions.size());
         for (Map.Entry<String, NewPartitions> entry : newPartitions.entrySet()) {
@@ -2498,16 +2542,19 @@ public class KafkaAdminClient extends AdminClient {
         if (!topics.isEmpty()) {
             final long now = time.milliseconds();
             final long deadline = calcDeadlineMs(now, options.timeoutMs());
-            final Call call = getCreatePartitionsCall(options, futures, topics, deadline);
+            final Call call = getCreatePartitionsCall(options, futures, topics,
+                Collections.emptyMap(), now, deadline);
             runnable.call(call, now);
         }
         return new CreatePartitionsResult(new HashMap<>(futures));
     }
 
     private Call getCreatePartitionsCall(final CreatePartitionsOptions options,
-        final Map<String, KafkaFutureImpl<Void>> futures,
-        final CreatePartitionsTopicCollection topics,
-        final long deadline) {
+                                         final Map<String, KafkaFutureImpl<Void>> futures,
+                                         final CreatePartitionsTopicCollection topics,
+                                         final Map<String, ThrottlingQuotaExceededException> quotaExceededExceptions,
+                                         final long now,
+                                         final long deadline) {
         return new Call("createPartitions", deadline, new ControllerNodeProvider()) {
             @Override
             public CreatePartitionsRequest.Builder createRequest(int timeoutMs) {
@@ -2525,6 +2572,7 @@ public class KafkaAdminClient extends AdminClient {
                 // Handle server responses for particular topics.
                 final CreatePartitionsResponse response = (CreatePartitionsResponse) abstractResponse;
                 final CreatePartitionsTopicCollection retryTopics = new CreatePartitionsTopicCollection();
+                final Map<String, ThrottlingQuotaExceededException> retryTopicQuotaExceededExceptions = new HashMap<>();
                 for (CreatePartitionsTopicResult result : response.data().results()) {
                     KafkaFutureImpl<Void> future = futures.get(result.name());
                     if (future == null) {
@@ -2533,11 +2581,13 @@ public class KafkaAdminClient extends AdminClient {
                         ApiError error = new ApiError(result.errorCode(), result.errorMessage());
                         if (error.isFailure()) {
                             if (error.is(Errors.THROTTLING_QUOTA_EXCEEDED)) {
+                                ThrottlingQuotaExceededException quotaExceededException = new ThrottlingQuotaExceededException(
+                                    response.throttleTimeMs(), error.messageWithFallback());
                                 if (options.shouldRetryOnQuotaViolation()) {
                                     retryTopics.add(topics.find(result.name()).duplicate());
+                                    retryTopicQuotaExceededExceptions.put(result.name(), quotaExceededException);
                                 } else {
-                                    future.completeExceptionally(new ThrottlingQuotaExceededException(
-                                        response.throttleTimeMs(), error.messageWithFallback()));
+                                    future.completeExceptionally(quotaExceededException);
                                 }
                             } else {
                                 future.completeExceptionally(error.exception());
@@ -2553,13 +2603,20 @@ public class KafkaAdminClient extends AdminClient {
                     completeUnrealizedFutures(futures.entrySet().stream(),
                         topic -> "The controller response did not contain a result for topic " + topic);
                 } else {
-                    final Call call = getCreatePartitionsCall(options, futures, retryTopics, deadline);
-                    runnable.call(call, time.milliseconds());
+                    final long now = time.milliseconds();
+                    final Call call = getCreatePartitionsCall(options, futures, retryTopics,
+                        retryTopicQuotaExceededExceptions, now, deadline);
+                    runnable.call(call, now);
                 }
             }
 
             @Override
             void handleFailure(Throwable throwable) {
+                // If any topics were retried due to a quota exceeded exception, we propagate
+                // the initial error back to the caller if the request timed out.
+                maybeCompleteQuotaExceededException(options.shouldRetryOnQuotaViolation(),
+                    throwable, futures, quotaExceededExceptions, (int) (time.milliseconds() - now));
+                // Fail all the other remaining futures
                 completeAllExceptionally(futures.values(), throwable);
             }
         };
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 80d5294..a4ae6f4 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -733,7 +733,9 @@ public class KafkaAdminClientTest {
             time.sleep(defaultApiTimeout + 1);
 
             assertNull(result.values().get("topic1").get());
-            TestUtils.assertFutureThrows(result.values().get("topic2"), TimeoutException.class);
+            ThrottlingQuotaExceededException e = TestUtils.assertFutureThrows(result.values().get("topic2"),
+                ThrottlingQuotaExceededException.class);
+            assertEquals(0, e.throttleTimeMs());
             TestUtils.assertFutureThrows(result.values().get("topic3"), TopicExistsException.class);
         }
     }
@@ -895,7 +897,9 @@ public class KafkaAdminClientTest {
             time.sleep(defaultApiTimeout + 1);
 
             assertNull(result.values().get("topic1").get());
-            TestUtils.assertFutureThrows(result.values().get("topic2"), TimeoutException.class);
+            ThrottlingQuotaExceededException e = TestUtils.assertFutureThrows(result.values().get("topic2"),
+                ThrottlingQuotaExceededException.class);
+            assertEquals(0, e.throttleTimeMs());
             TestUtils.assertFutureThrows(result.values().get("topic3"), TopicExistsException.class);
         }
     }
@@ -1727,7 +1731,9 @@ public class KafkaAdminClientTest {
             time.sleep(defaultApiTimeout + 1);
 
             assertNull(result.values().get("topic1").get());
-            TestUtils.assertFutureThrows(result.values().get("topic2"), TimeoutException.class);
+            ThrottlingQuotaExceededException e = TestUtils.assertFutureThrows(result.values().get("topic2"),
+                ThrottlingQuotaExceededException.class);
+            assertEquals(0, e.throttleTimeMs());
             TestUtils.assertFutureThrows(result.values().get("topic3"), TopicExistsException.class);
         }
     }