You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2019/04/03 00:09:54 UTC

[kafka] branch trunk updated: KAFKA-8183: Add retries to WorkerUtils#verifyTopics (#6532)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4825bc4  KAFKA-8183: Add retries to WorkerUtils#verifyTopics (#6532)
4825bc4 is described below

commit 4825bc47a0341076009bdcb15d9a195f206a1f32
Author: Stanislav Kozlovski <st...@outlook.com>
AuthorDate: Tue Apr 2 17:09:40 2019 -0700

    KAFKA-8183: Add retries to WorkerUtils#verifyTopics (#6532)
    
    Reviewers: Colin P. McCabe <cm...@apache.org>
---
 .../apache/kafka/trogdor/common/WorkerUtils.java   | 34 ++++++++++++++++++----
 1 file changed, 28 insertions(+), 6 deletions(-)

diff --git a/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java b/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java
index ad54c06..adce304 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java
@@ -44,6 +44,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.regex.Pattern;
 
@@ -160,7 +161,7 @@ public final class WorkerUtils {
                 log.warn("Topic(s) {} already exist.", topicsExists);
                 throw new TopicExistsException("One or more topics already exist.");
             } else {
-                verifyTopics(log, adminClient, topicsExists, topics);
+                verifyTopics(log, adminClient, topicsExists, topics, 3, 2500);
             }
         }
     }
@@ -240,17 +241,20 @@ public final class WorkerUtils {
      * @param topicsToVerify     List of topics to verify
      * @param topicsInfo         Map of topic name to topic description, which includes topics in
      *                           'topicsToVerify' list.
+     * @param retryCount         The number of times to retry the fetching of the topics
+     * @param retryBackoffMs     The amount of time, in milliseconds, to wait in between retries
      * @throws UnknownTopicOrPartitionException If at least one topic contained in 'topicsInfo'
-     * does not exist
+     * does not exist after retrying.
      * @throws RuntimeException  If one or more topics have different number of partitions than
      * described in 'topicsInfo'
      */
     private static void verifyTopics(
         Logger log, AdminClient adminClient,
-        Collection<String> topicsToVerify, Map<String, NewTopic> topicsInfo) throws Throwable {
-        DescribeTopicsResult topicsResult = adminClient.describeTopics(
-            topicsToVerify, new DescribeTopicsOptions().timeoutMs(ADMIN_REQUEST_TIMEOUT));
-        Map<String, TopicDescription> topicDescriptionMap = topicsResult.all().get();
+        Collection<String> topicsToVerify, Map<String, NewTopic> topicsInfo, int retryCount, long retryBackoffMs) throws Throwable {
+
+        Map<String, TopicDescription> topicDescriptionMap = topicDescriptions(topicsToVerify, adminClient,
+                retryCount, retryBackoffMs);
+
         for (TopicDescription desc: topicDescriptionMap.values()) {
             // map will always contain the topic since all topics in 'topicsExists' are in given
             // 'topics' map
@@ -265,6 +269,24 @@ public final class WorkerUtils {
         }
     }
 
+    private static Map<String, TopicDescription> topicDescriptions(Collection<String> topicsToVerify,
+                                                                   AdminClient adminClient,
+                                                                   int retryCount, long retryBackoffMs)
+            throws ExecutionException, InterruptedException {
+        UnknownTopicOrPartitionException lastException = null;
+        for (int i = 0; i < retryCount; i++) {
+            try {
+                DescribeTopicsResult topicsResult = adminClient.describeTopics(
+                        topicsToVerify, new DescribeTopicsOptions().timeoutMs(ADMIN_REQUEST_TIMEOUT));
+                return topicsResult.all().get();
+            } catch (UnknownTopicOrPartitionException exception) {
+                lastException = exception;
+                Thread.sleep(retryBackoffMs);
+            }
+        }
+        throw lastException;
+    }
+
     /**
      * Returns list of existing, not internal, topics/partitions that match given pattern and
      * where partitions are in range [startPartition, endPartition]