You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2019/04/03 00:09:54 UTC
[kafka] branch trunk updated: KAFKA-8183: Add retries to
WorkerUtils#verifyTopics (#6532)
This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4825bc4 KAFKA-8183: Add retries to WorkerUtils#verifyTopics (#6532)
4825bc4 is described below
commit 4825bc47a0341076009bdcb15d9a195f206a1f32
Author: Stanislav Kozlovski <st...@outlook.com>
AuthorDate: Tue Apr 2 17:09:40 2019 -0700
KAFKA-8183: Add retries to WorkerUtils#verifyTopics (#6532)
Reviewers: Colin P. McCabe <cm...@apache.org>
---
.../apache/kafka/trogdor/common/WorkerUtils.java | 34 ++++++++++++++++++----
1 file changed, 28 insertions(+), 6 deletions(-)
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java b/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java
index ad54c06..adce304 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java
@@ -44,6 +44,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.regex.Pattern;
@@ -160,7 +161,7 @@ public final class WorkerUtils {
log.warn("Topic(s) {} already exist.", topicsExists);
throw new TopicExistsException("One or more topics already exist.");
} else {
- verifyTopics(log, adminClient, topicsExists, topics);
+ verifyTopics(log, adminClient, topicsExists, topics, 3, 2500);
}
}
}
@@ -240,17 +241,20 @@ public final class WorkerUtils {
* @param topicsToVerify List of topics to verify
* @param topicsInfo Map of topic name to topic description, which includes topics in
* 'topicsToVerify' list.
+ * @param retryCount The number of times to retry the fetching of the topics
+ * @param retryBackoffMs The amount of time, in milliseconds, to wait in between retries
* @throws UnknownTopicOrPartitionException If at least one topic contained in 'topicsInfo'
- * does not exist
+ * does not exist after retrying.
* @throws RuntimeException If one or more topics have different number of partitions than
* described in 'topicsInfo'
*/
private static void verifyTopics(
Logger log, AdminClient adminClient,
- Collection<String> topicsToVerify, Map<String, NewTopic> topicsInfo) throws Throwable {
- DescribeTopicsResult topicsResult = adminClient.describeTopics(
- topicsToVerify, new DescribeTopicsOptions().timeoutMs(ADMIN_REQUEST_TIMEOUT));
- Map<String, TopicDescription> topicDescriptionMap = topicsResult.all().get();
+ Collection<String> topicsToVerify, Map<String, NewTopic> topicsInfo, int retryCount, long retryBackoffMs) throws Throwable {
+
+ Map<String, TopicDescription> topicDescriptionMap = topicDescriptions(topicsToVerify, adminClient,
+ retryCount, retryBackoffMs);
+
for (TopicDescription desc: topicDescriptionMap.values()) {
// map will always contain the topic since all topics in 'topicsExists' are in given
// 'topics' map
@@ -265,6 +269,24 @@ public final class WorkerUtils {
}
}
+ private static Map<String, TopicDescription> topicDescriptions(Collection<String> topicsToVerify,
+ AdminClient adminClient,
+ int retryCount, long retryBackoffMs)
+ throws ExecutionException, InterruptedException {
+ UnknownTopicOrPartitionException lastException = null;
+ for (int i = 0; i < retryCount; i++) {
+ try {
+ DescribeTopicsResult topicsResult = adminClient.describeTopics(
+ topicsToVerify, new DescribeTopicsOptions().timeoutMs(ADMIN_REQUEST_TIMEOUT));
+ return topicsResult.all().get();
+ } catch (UnknownTopicOrPartitionException exception) {
+ lastException = exception;
+ Thread.sleep(retryBackoffMs);
+ }
+ }
+ throw lastException;
+ }
+
/**
* Returns list of existing, not internal, topics/partitions that match given pattern and
* where partitions are in range [startPartition, endPartition]