You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2018/03/20 13:51:55 UTC

[kafka] branch trunk updated: Trogdor's ProducerBench does not fail if topics exists (#4673)

This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5c24295  Trogdor's ProducerBench does not fail if topics exists (#4673)
5c24295 is described below

commit 5c24295d44b4f015a620914992542c4d2083c611
Author: Anna Povzner <an...@confluent.io>
AuthorDate: Tue Mar 20 06:51:45 2018 -0700

    Trogdor's ProducerBench does not fail if topics exists (#4673)
    
    Added configs to ProducerBenchSpec:
    topicPrefix: name of topics will be of format topicPrefix + topic index. If not provided, default is "produceBenchTopic".
    partitionsPerTopic: number of partitions per topic. If not provided, default is 1.
    replicationFactor: replication factor per topic. If not provided, default is 3.
    
    The behavior of producer bench is changed such that if some or all topics already exist (with topic names = topicPrefix + topic index), and they have the same number of partitions as requested, the worker uses those topics and does not fail. The producer bench fails if one or more existing topics has number of partitions that is different from expected number of partitions.
    
    Added unit test for WorkerUtils -- for existing methods and new methods.
    
    Fixed bug in MockAdminClient, where createTopics() would over-write existing topic's replication factor and number of partitions while correctly completing the appropriate futures exceptionally with TopicExistsException.
    
    Reviewers: Colin P. Mccabe <cm...@confluent.io>, Rajini Sivaram <ra...@googlemail.com>
---
 checkstyle/suppressions.xml                        |   2 +
 .../kafka/clients/admin/MockAdminClient.java       |   1 +
 .../apache/kafka/trogdor/common/WorkerUtils.java   | 194 ++++++++++++++-----
 .../kafka/trogdor/workload/ProduceBenchSpec.java   |  33 +++-
 .../kafka/trogdor/workload/ProduceBenchWorker.java |  21 +-
 .../kafka/trogdor/workload/RoundTripWorker.java    |   7 +-
 .../trogdor/common/JsonSerializationTest.java      |   2 +-
 .../kafka/trogdor/common/WorkerUtilsTest.java      | 211 +++++++++++++++++++++
 8 files changed, 405 insertions(+), 66 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 62fe4ed..e3bf151 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -212,6 +212,8 @@
               files="SignalLogger.java"/>
     <suppress checks="IllegalImport"
               files="SignalLogger.java"/>
+    <suppress checks="ParameterNumber"
+              files="ProduceBenchSpec.java"/>
 
     <!-- Log4J-Appender -->
     <suppress checks="CyclomaticComplexity"
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
index c950163..c141a8a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
@@ -152,6 +152,7 @@ public class MockAdminClient extends AdminClient {
             if (allTopics.containsKey(topicName)) {
                 future.completeExceptionally(new TopicExistsException(String.format("Topic %s exists already.", topicName)));
                 createTopicResult.put(topicName, future);
+                continue;
             }
             int replicationFactor = newTopic.replicationFactor();
             List<Node> replicas = new ArrayList<>(replicationFactor);
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java b/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java
index 58f8278..99c13c0 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java
@@ -19,9 +19,14 @@ package org.apache.kafka.trogdor.common;
 
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.DescribeTopicsOptions;
+import org.apache.kafka.clients.admin.DescribeTopicsResult;
 import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.NotEnoughReplicasException;
 import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.TopicExistsException;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
@@ -33,7 +38,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
 /**
@@ -71,7 +75,7 @@ public final class WorkerUtils {
     }
 
     private static final int CREATE_TOPICS_REQUEST_TIMEOUT = 25000;
-    private static final int CREATE_TOPICS_CALL_TIMEOUT = 90000;
+    private static final int CREATE_TOPICS_CALL_TIMEOUT = 180000;
     private static final int MAX_CREATE_TOPICS_BATCH_SIZE = 10;
 
             //Map<String, Map<Integer, List<Integer>>> topics) throws Throwable {
@@ -82,61 +86,151 @@ public final class WorkerUtils {
      * @param log               The logger to use.
      * @param bootstrapServers  The bootstrap server list.
      * @param topics            Maps topic names to partition assignments.
+     * @param failOnExisting    If true, the method will throw TopicExistsException if one or
+     *                          more topics already exist. Otherwise, the existing topics are
+     *                          verified for number of partitions. In this case, if number of
+     *                          partitions of an existing topic does not match the requested
+     *                          number of partitions, the method throws RuntimeException.
      */
-    public static void createTopics(Logger log, String bootstrapServers,
-            Collection<NewTopic> topics) throws Throwable {
-        Properties props = new Properties();
-        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
-        props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, CREATE_TOPICS_REQUEST_TIMEOUT);
-        try (AdminClient adminClient = AdminClient.create(props)) {
-            long startMs = Time.SYSTEM.milliseconds();
-            int tries = 0;
+    public static void createTopics(
+        Logger log, String bootstrapServers,
+        Map<String, NewTopic> topics, boolean failOnExisting) throws Throwable {
+        // this method wraps the call to createTopics() that takes admin client, so that we can
+        // unit test the functionality with MockAdminClient. The exception is caught and
+        // re-thrown so that admin client is closed when the method returns.
+        try (AdminClient adminClient = createAdminClient(bootstrapServers)) {
+            createTopics(log, adminClient, topics, failOnExisting);
+        } catch (Exception e) {
+            log.warn("Failed to create or verify topics {}", topics, e);
+            throw e;
+        }
+    }
+
+    /**
+     * The actual create topics functionality is separated into this method and called from the
+     * above method to be able to unit test with mock adminClient.
+     */
+    static void createTopics(
+        Logger log, AdminClient adminClient,
+        Map<String, NewTopic> topics, boolean failOnExisting) throws Throwable {
+        if (topics.isEmpty()) {
+            log.warn("Request to create topics has an empty topic list.");
+            return;
+        }
 
-            Map<String, NewTopic> newTopics = new HashMap<>();
-            for (NewTopic newTopic : topics) {
-                newTopics.put(newTopic.name(), newTopic);
+        Collection<String> topicsExists = createTopics(log, adminClient, topics.values());
+        if (!topicsExists.isEmpty()) {
+            if (failOnExisting) {
+                log.warn("Topic(s) {} already exist.", topicsExists);
+                throw new TopicExistsException("One or more topics already exist.");
+            } else {
+                verifyTopics(log, adminClient, topicsExists, topics);
             }
-            List<String> topicsToCreate = new ArrayList<>(newTopics.keySet());
-            while (true) {
-                log.info("Attemping to create {} topics (try {})...", topicsToCreate.size(), ++tries);
-                Map<String, Future<Void>> creations = new HashMap<>();
-                while (!topicsToCreate.isEmpty()) {
-                    List<NewTopic> newTopicsBatch = new ArrayList<>();
-                    for (int i = 0; (i < MAX_CREATE_TOPICS_BATCH_SIZE) &&
-                            !topicsToCreate.isEmpty(); i++) {
-                        String topicName = topicsToCreate.remove(0);
-                        newTopicsBatch.add(newTopics.get(topicName));
-                    }
-                    creations.putAll(adminClient.createTopics(newTopicsBatch).values());
+        }
+    }
+
+    /**
+     * Creates Kafka topics and returns a list of topics that already exist
+     * @param log             The logger to use
+     * @param adminClient     AdminClient
+     * @param topics          List of topics to create
+     * @return                Collection of topics names that already exist.
+     * @throws Throwable if creation of one or more topics fails (except for topic exists case).
+     */
+    private static Collection<String> createTopics(Logger log, AdminClient adminClient,
+                                                   Collection<NewTopic> topics) throws Throwable {
+        long startMs = Time.SYSTEM.milliseconds();
+        int tries = 0;
+        List<String> existingTopics = new ArrayList<>();
+
+        Map<String, NewTopic> newTopics = new HashMap<>();
+        for (NewTopic newTopic : topics) {
+            newTopics.put(newTopic.name(), newTopic);
+        }
+        List<String> topicsToCreate = new ArrayList<>(newTopics.keySet());
+        while (true) {
+            log.info("Attempting to create {} topics (try {})...", topicsToCreate.size(), ++tries);
+            Map<String, Future<Void>> creations = new HashMap<>();
+            while (!topicsToCreate.isEmpty()) {
+                List<NewTopic> newTopicsBatch = new ArrayList<>();
+                for (int i = 0; (i < MAX_CREATE_TOPICS_BATCH_SIZE) &&
+                                !topicsToCreate.isEmpty(); i++) {
+                    String topicName = topicsToCreate.remove(0);
+                    newTopicsBatch.add(newTopics.get(topicName));
                 }
-                // We retry cases where the topic creation failed with a
-                // timeout.  This is a workaround for KAFKA-6368.
-                for (Map.Entry<String, Future<Void>> entry : creations.entrySet()) {
-                    String topicName = entry.getKey();
-                    Future<Void> future = entry.getValue();
-                    try {
-                        future.get();
-                        log.debug("Successfully created {}.", topicName);
-                    } catch (ExecutionException e) {
-                        if (e.getCause() instanceof TimeoutException) {
-                            log.warn("Timed out attempting to create {}: {}", topicName, e.getCause().getMessage());
-                            topicsToCreate.add(topicName);
-                        } else {
-                            log.warn("Failed to create {}", topicName, e.getCause());
-                            throw e.getCause();
-                        }
+                creations.putAll(adminClient.createTopics(newTopicsBatch).values());
+            }
+            // We retry cases where the topic creation failed with a
+            // timeout.  This is a workaround for KAFKA-6368.
+            for (Map.Entry<String, Future<Void>> entry : creations.entrySet()) {
+                String topicName = entry.getKey();
+                Future<Void> future = entry.getValue();
+                try {
+                    future.get();
+                    log.debug("Successfully created {}.", topicName);
+                } catch (Exception e) {
+                    if ((e.getCause() instanceof TimeoutException)
+                        || (e.getCause() instanceof NotEnoughReplicasException)) {
+                        log.warn("Attempt to create topic `{}` failed: {}", topicName,
+                                 e.getCause().getMessage());
+                        topicsToCreate.add(topicName);
+                    } else if (e.getCause() instanceof TopicExistsException) {
+                        log.info("Topic {} already exists.", topicName);
+                        existingTopics.add(topicName);
+                    } else {
+                        log.warn("Failed to create {}", topicName, e.getCause());
+                        throw e.getCause();
                     }
                 }
-                if (topicsToCreate.isEmpty()) {
-                    break;
-                }
-                if (Time.SYSTEM.milliseconds() > startMs + CREATE_TOPICS_CALL_TIMEOUT) {
-                    String str = "Unable to create topic(s): " +
-                            Utils.join(topicsToCreate, ", ") + "after " + tries + " attempt(s)";
-                    log.warn(str);
-                    throw new TimeoutException(str);
-                }
+            }
+            if (topicsToCreate.isEmpty()) {
+                break;
+            }
+            if (Time.SYSTEM.milliseconds() > startMs + CREATE_TOPICS_CALL_TIMEOUT) {
+                String str = "Unable to create topic(s): " +
+                             Utils.join(topicsToCreate, ", ") + "after " + tries + " attempt(s)";
+                log.warn(str);
+                throw new TimeoutException(str);
+            }
+        }
+        return existingTopics;
+    }
+
+    /**
+     * Verifies that topics in 'topicsToVerify' list have the same number of partitions as
+     * described in 'topicsInfo'
+     * @param log                The logger to use
+     * @param adminClient        AdminClient
+     * @param topicsToVerify     List of topics to verify
+     * @param topicsInfo         Map of topic name to topic description, which includes topics in
+     *                           'topicsToVerify' list.
+     * @throws RuntimeException  If one or more topics have different number of partitions than
+     * described in 'topicsInfo'
+     */
+    private static void verifyTopics(
+        Logger log, AdminClient adminClient,
+        Collection<String> topicsToVerify, Map<String, NewTopic> topicsInfo) throws Throwable {
+        DescribeTopicsResult topicsResult = adminClient.describeTopics(
+            topicsToVerify, new DescribeTopicsOptions().timeoutMs(CREATE_TOPICS_REQUEST_TIMEOUT));
+        Map<String, TopicDescription> topicDescriptionMap = topicsResult.all().get();
+        for (TopicDescription desc: topicDescriptionMap.values()) {
+            // map will always contain the topic since all topics in 'topicsExists' are in given
+            // 'topics' map
+            int partitions = topicsInfo.get(desc.name()).numPartitions();
+            if (desc.partitions().size() != partitions) {
+                String str = "Topic '" + desc.name() + "' exists, but has "
+                             + desc.partitions().size() + " partitions, while requested "
+                             + " number of partitions is " + partitions;
+                log.warn(str);
+                throw new RuntimeException(str);
             }
         }
     }
+
+    private static AdminClient createAdminClient(String bootstrapServers) {
+        Properties props = new Properties();
+        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, CREATE_TOPICS_REQUEST_TIMEOUT);
+        return AdminClient.create(props);
+    }
 }
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java
index a798e73..7b1bedd 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java
@@ -33,6 +33,11 @@ import java.util.Set;
  * The specification for a benchmark that produces messages to a set of topics.
  */
 public class ProduceBenchSpec extends TaskSpec {
+
+    private static final String DEFAULT_TOPIC_PREFIX = "produceBenchTopic";
+    private static final int DEFAULT_NUM_PARTITIONS = 1;
+    private static final short DEFAULT_REPLICATION_FACTOR = 3;
+
     private final String producerNode;
     private final String bootstrapServers;
     private final int targetMessagesPerSec;
@@ -42,6 +47,9 @@ public class ProduceBenchSpec extends TaskSpec {
     private final Map<String, String> producerConf;
     private final int totalTopics;
     private final int activeTopics;
+    private final String topicPrefix;
+    private final int numPartitions;
+    private final short replicationFactor;
 
     @JsonCreator
     public ProduceBenchSpec(@JsonProperty("startMs") long startMs,
@@ -54,7 +62,10 @@ public class ProduceBenchSpec extends TaskSpec {
                          @JsonProperty("valueGenerator") PayloadGenerator valueGenerator,
                          @JsonProperty("producerConf") Map<String, String> producerConf,
                          @JsonProperty("totalTopics") int totalTopics,
-                         @JsonProperty("activeTopics") int activeTopics) {
+                         @JsonProperty("activeTopics") int activeTopics,
+                         @JsonProperty("topicPrefix") String topicPrefix,
+                         @JsonProperty("partitionsPerTopic") int partitionsPerTopic,
+                         @JsonProperty("replicationFactor") short replicationFactor) {
         super(startMs, durationMs);
         this.producerNode = (producerNode == null) ? "" : producerNode;
         this.bootstrapServers = (bootstrapServers == null) ? "" : bootstrapServers;
@@ -67,6 +78,11 @@ public class ProduceBenchSpec extends TaskSpec {
         this.producerConf = (producerConf == null) ? new TreeMap<String, String>() : producerConf;
         this.totalTopics = totalTopics;
         this.activeTopics = activeTopics;
+        this.topicPrefix = (topicPrefix == null) ? DEFAULT_TOPIC_PREFIX : topicPrefix;
+        this.numPartitions = (partitionsPerTopic == 0)
+                             ? DEFAULT_NUM_PARTITIONS : partitionsPerTopic;
+        this.replicationFactor = (replicationFactor == 0)
+                                 ? DEFAULT_REPLICATION_FACTOR : replicationFactor;
     }
 
     @JsonProperty
@@ -114,6 +130,21 @@ public class ProduceBenchSpec extends TaskSpec {
         return activeTopics;
     }
 
+    @JsonProperty
+    public String topicPrefix() {
+        return topicPrefix;
+    }
+
+    @JsonProperty
+    public int numPartitions() {
+        return numPartitions;
+    }
+
+    @JsonProperty
+    public short replicationFactor() {
+        return replicationFactor;
+    }
+
     @Override
     public TaskController newController(String id) {
         return new TaskController() {
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java
index 51f52d3..e291bae 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java
@@ -37,8 +37,7 @@ import org.apache.kafka.trogdor.task.TaskWorker;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.List;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.Callable;
@@ -51,11 +50,7 @@ import java.util.concurrent.atomic.AtomicReference;
 
 public class ProduceBenchWorker implements TaskWorker {
     private static final Logger log = LoggerFactory.getLogger(ProduceBenchWorker.class);
-
-    private static final short NUM_PARTITIONS = 1;
-
-    private static final short REPLICATION_FACTOR = 3;
-
+    
     private static final int THROTTLE_PERIOD_MS = 100;
 
     private final String id;
@@ -76,8 +71,8 @@ public class ProduceBenchWorker implements TaskWorker {
      * @param topicIndex        The topic number.
      * @return                  The topic name.
      */
-    public static String topicIndexToName(int topicIndex) {
-        return String.format("topic%05d", topicIndex);
+    public String topicIndexToName(int topicIndex) {
+        return String.format("%s%05d", spec.topicPrefix(), topicIndex);
     }
 
     public ProduceBenchWorker(String id, ProduceBenchSpec spec) {
@@ -111,11 +106,13 @@ public class ProduceBenchWorker implements TaskWorker {
                         "activeTopics was %d, but totalTopics was only %d.  activeTopics must " +
                             "be less than or equal to totalTopics.", spec.activeTopics(), spec.totalTopics()));
                 }
-                List<NewTopic> newTopics = new ArrayList<>();
+                Map<String, NewTopic> newTopics = new HashMap<>();
                 for (int i = 0; i < spec.totalTopics(); i++) {
-                    newTopics.add(new NewTopic(topicIndexToName(i), NUM_PARTITIONS, REPLICATION_FACTOR));
+                    String name = topicIndexToName(i);
+                    newTopics.put(name, new NewTopic(name, spec.numPartitions(), spec.replicationFactor()));
                 }
-                WorkerUtils.createTopics(log, spec.bootstrapServers(), newTopics);
+                WorkerUtils.createTopics(log, spec.bootstrapServers(), newTopics, false);
+
                 executor.submit(new SendRecords());
             } catch (Throwable e) {
                 WorkerUtils.abort(log, "Prepare", e, doneFuture);
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java
index 1b9cb8f..a05785c 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java
@@ -123,8 +123,11 @@ public class RoundTripWorker implements TaskWorker {
                 if ((spec.partitionAssignments() == null) || spec.partitionAssignments().isEmpty()) {
                     throw new ConfigException("Invalid null or empty partitionAssignments.");
                 }
-                WorkerUtils.createTopics(log, spec.bootstrapServers(),
-                    Collections.singletonList(new NewTopic(TOPIC_NAME, spec.partitionAssignments())));
+                WorkerUtils.createTopics(
+                    log, spec.bootstrapServers(),
+                    Collections.singletonMap(TOPIC_NAME,
+                                             new NewTopic(TOPIC_NAME, spec.partitionAssignments())),
+                    true);
                 executor.submit(new ProducerRunnable());
                 executor.submit(new ConsumerRunnable());
             } catch (Throwable e) {
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java b/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java
index 77a7932..dee7614 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java
@@ -49,7 +49,7 @@ public class JsonSerializationTest {
         verify(new WorkerRunning(null, 0, null));
         verify(new WorkerStopping(null, 0, null));
         verify(new ProduceBenchSpec(0, 0, null, null,
-            0, 0, null, null, null, 0, 0));
+            0, 0, null, null, null, 0, 0, "test-topic", 1, (short) 3));
         verify(new RoundTripWorkloadSpec(0, 0, null, null,
             0, null, null, 0));
         verify(new SampleTaskSpec(0, 0, 0, null));
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/common/WorkerUtilsTest.java b/tools/src/test/java/org/apache/kafka/trogdor/common/WorkerUtilsTest.java
new file mode 100644
index 0000000..22b7846
--- /dev/null
+++ b/tools/src/test/java/org/apache/kafka/trogdor/common/WorkerUtilsTest.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.common;
+
+
+
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.common.TopicPartitionInfo;
+
+import org.apache.kafka.common.Node;
+import org.apache.kafka.clients.admin.MockAdminClient;
+
+import org.apache.kafka.common.errors.TopicExistsException;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.kafka.clients.admin.NewTopic;
+import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+
+public class WorkerUtilsTest {
+
+    private static final Logger log = LoggerFactory.getLogger(WorkerUtilsTest.class);
+
+    private final Node broker1 = new Node(0, "testHost-1", 1234);
+    private final Node broker2 = new Node(1, "testHost-2", 1234);
+    private final Node broker3 = new Node(1, "testHost-3", 1234);
+    private final List<Node> cluster = Arrays.asList(broker1, broker2, broker3);
+    private final List<Node> singleReplica = Collections.singletonList(broker1);
+
+    private static final String TEST_TOPIC = "test-topic-1";
+    private static final short TEST_REPLICATION_FACTOR = 1;
+    private static final int TEST_PARTITIONS = 1;
+    private static final NewTopic NEW_TEST_TOPIC =
+        new NewTopic(TEST_TOPIC, TEST_PARTITIONS, TEST_REPLICATION_FACTOR);
+
+    private MockAdminClient adminClient;
+
+
+    @Before
+    public void setUp() throws Exception {
+        adminClient = new MockAdminClient(cluster, broker1);
+    }
+
+    @Test
+    public void testCreateOneTopic() throws Throwable {
+        Map<String, NewTopic> newTopics = Collections.singletonMap(TEST_TOPIC, NEW_TEST_TOPIC);
+
+        WorkerUtils.createTopics(log, adminClient, newTopics, true);
+        assertEquals(Collections.singleton(TEST_TOPIC), adminClient.listTopics().names().get());
+        assertEquals(
+            new TopicDescription(
+                TEST_TOPIC, false,
+                Collections.singletonList(
+                    new TopicPartitionInfo(0, broker1, singleReplica, Collections.<Node>emptyList()))),
+            adminClient.describeTopics(
+                Collections.singleton(TEST_TOPIC)).values().get(TEST_TOPIC).get()
+        );
+    }
+
+    @Test
+    public void testCreateRetriesOnTimeout() throws Throwable {
+        adminClient.timeoutNextRequest(1);
+
+        WorkerUtils.createTopics(
+            log, adminClient, Collections.singletonMap(TEST_TOPIC, NEW_TEST_TOPIC), true);
+
+        assertEquals(
+            new TopicDescription(
+                TEST_TOPIC, false,
+                Collections.singletonList(
+                    new TopicPartitionInfo(0, broker1, singleReplica, Collections.<Node>emptyList()))),
+            adminClient.describeTopics(
+                Collections.singleton(TEST_TOPIC)).values().get(TEST_TOPIC).get()
+        );
+    }
+
+    @Test
+    public void testCreateZeroTopicsDoesNothing() throws Throwable {
+        WorkerUtils.createTopics(log, adminClient, Collections.<String, NewTopic>emptyMap(), true);
+        assertEquals(0, adminClient.listTopics().names().get().size());
+    }
+
+    @Test(expected = TopicExistsException.class)
+    public void testCreateTopicsFailsIfAtLeastOneTopicExists() throws Throwable {
+        adminClient.addTopic(
+            false,
+            TEST_TOPIC,
+            Collections.singletonList(new TopicPartitionInfo(0, broker1, singleReplica, Collections.<Node>emptyList())),
+            null);
+
+        Map<String, NewTopic> newTopics = new HashMap<>();
+        newTopics.put(TEST_TOPIC, NEW_TEST_TOPIC);
+        newTopics.put("another-topic",
+                      new NewTopic("another-topic", TEST_PARTITIONS, TEST_REPLICATION_FACTOR));
+        newTopics.put("one-more-topic",
+                      new NewTopic("one-more-topic", TEST_PARTITIONS, TEST_REPLICATION_FACTOR));
+
+        WorkerUtils.createTopics(log, adminClient, newTopics, true);
+    }
+
+    @Test(expected = RuntimeException.class)
+    public void testExistingTopicsMustHaveRequestedNumberOfPartitions() throws Throwable {
+        List<TopicPartitionInfo> tpInfo = new ArrayList<>();
+        tpInfo.add(new TopicPartitionInfo(0, broker1, singleReplica, Collections.<Node>emptyList()));
+        tpInfo.add(new TopicPartitionInfo(1, broker2, singleReplica, Collections.<Node>emptyList()));
+        adminClient.addTopic(
+            false,
+            TEST_TOPIC,
+            tpInfo,
+            null);
+
+        WorkerUtils.createTopics(
+            log, adminClient, Collections.singletonMap(TEST_TOPIC, NEW_TEST_TOPIC), false);
+    }
+
+    @Test
+    public void testExistingTopicsNotCreated() throws Throwable {
+        final String existingTopic = "existing-topic";
+        List<TopicPartitionInfo> tpInfo = new ArrayList<>();
+        tpInfo.add(new TopicPartitionInfo(0, broker1, singleReplica, Collections.<Node>emptyList()));
+        tpInfo.add(new TopicPartitionInfo(1, broker2, singleReplica, Collections.<Node>emptyList()));
+        tpInfo.add(new TopicPartitionInfo(2, broker3, singleReplica, Collections.<Node>emptyList()));
+        adminClient.addTopic(
+            false,
+            existingTopic,
+            tpInfo,
+            null);
+
+        WorkerUtils.createTopics(
+            log, adminClient,
+            Collections.singletonMap(
+                existingTopic,
+                new NewTopic(existingTopic, tpInfo.size(), TEST_REPLICATION_FACTOR)), false);
+
+        assertEquals(Collections.singleton(existingTopic), adminClient.listTopics().names().get());
+    }
+
+    @Test
+    public void testCreatesNotExistingTopics() throws Throwable {
+        // should be no topics before the call
+        assertEquals(0, adminClient.listTopics().names().get().size());
+
+        WorkerUtils.createTopics(
+            log, adminClient, Collections.singletonMap(TEST_TOPIC, NEW_TEST_TOPIC), false);
+
+        assertEquals(Collections.singleton(TEST_TOPIC), adminClient.listTopics().names().get());
+        assertEquals(
+            new TopicDescription(
+                TEST_TOPIC, false,
+                Collections.singletonList(
+                    new TopicPartitionInfo(0, broker1, singleReplica, Collections.<Node>emptyList()))),
+            adminClient.describeTopics(Collections.singleton(TEST_TOPIC)).values().get(TEST_TOPIC).get()
+        );
+    }
+
+    @Test
+    public void testCreatesOneTopicVerifiesOneTopic() throws Throwable {
+        final String existingTopic = "existing-topic";
+        List<TopicPartitionInfo> tpInfo = new ArrayList<>();
+        tpInfo.add(new TopicPartitionInfo(0, broker1, singleReplica, Collections.<Node>emptyList()));
+        tpInfo.add(new TopicPartitionInfo(1, broker2, singleReplica, Collections.<Node>emptyList()));
+        adminClient.addTopic(
+            false,
+            existingTopic,
+            tpInfo,
+            null);
+
+        Map<String, NewTopic> topics = new HashMap<>();
+        topics.put(existingTopic,
+                   new NewTopic(existingTopic, tpInfo.size(), TEST_REPLICATION_FACTOR));
+        topics.put(TEST_TOPIC, NEW_TEST_TOPIC);
+
+        WorkerUtils.createTopics(log, adminClient, topics, false);
+
+        assertEquals(Utils.mkSet(existingTopic, TEST_TOPIC), adminClient.listTopics().names().get());
+    }
+
+    @Test
+    public void testCreateNonExistingTopicsWithZeroTopicsDoesNothing() throws Throwable {
+        WorkerUtils.createTopics(
+            log, adminClient, Collections.<String, NewTopic>emptyMap(), false);
+        assertEquals(0, adminClient.listTopics().names().get().size());
+    }
+
+}

-- 
To stop receiving notification emails like this one, please contact
rsivaram@apache.org.