Posted to commits@kafka.apache.org by rs...@apache.org on 2018/04/10 08:45:22 UTC

[kafka] branch trunk updated: Kafka-6693: Added consumer workload to Trogdor (#4775)

This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 989fe04   Kafka-6693: Added consumer workload to Trogdor (#4775)
989fe04 is described below

commit 989fe0497ec72ea3470a30a9ef47389b48ee5421
Author: Anna Povzner <an...@confluent.io>
AuthorDate: Tue Apr 10 01:45:08 2018 -0700

     Kafka-6693: Added consumer workload to Trogdor (#4775)
    
    Added a consumer-only workload to Trogdor. The topics must already exist and be pre-populated. The spec lets the user specify a topic name pattern and a range of partitions to assign, [startPartition, endPartition].
    
    Reviewers: Colin P. McCabe <cm...@confluent.io>, Rajini Sivaram <ra...@googlemail.com>
---
 .../apache/kafka/trogdor/common/WorkerUtils.java   |  83 +++++-
 .../kafka/trogdor/workload/ConsumeBenchSpec.java   | 139 ++++++++++
 .../kafka/trogdor/workload/ConsumeBenchWorker.java | 298 +++++++++++++++++++++
 .../kafka/trogdor/common/WorkerUtilsTest.java      |  58 +++-
 4 files changed, 572 insertions(+), 6 deletions(-)
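
For reference, the new spec is configured through the JSON fields defined in ConsumeBenchSpec.java below. A minimal task spec might look like the following sketch; the node name, broker address, numeric values, and the "class" discriminator are illustrative assumptions, not values taken from this commit:

    {
      "class": "org.apache.kafka.trogdor.workload.ConsumeBenchSpec",
      "startMs": 0,
      "durationMs": 720000,
      "consumerNode": "node0",
      "bootstrapServers": "localhost:9092",
      "targetMessagesPerSec": 1000,
      "maxMessages": 100000,
      "topicRegex": "test-topic.*",
      "startPartition": 0,
      "endPartition": 5
    }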

diff --git a/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java b/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java
index 98dbf38..3d4871a 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java
@@ -21,9 +21,14 @@ import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.DescribeTopicsOptions;
 import org.apache.kafka.clients.admin.DescribeTopicsResult;
+import org.apache.kafka.clients.admin.ListTopicsOptions;
+import org.apache.kafka.clients.admin.ListTopicsResult;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.admin.TopicListing;
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
 import org.apache.kafka.common.errors.NotEnoughReplicasException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.TopicExistsException;
@@ -39,6 +44,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.Future;
+import java.util.regex.Pattern;
 
 /**
  * Utilities for Trogdor TaskWorkers.
@@ -91,7 +97,7 @@ public final class WorkerUtils {
         }
     }
 
-    private static final int CREATE_TOPICS_REQUEST_TIMEOUT = 25000;
+    private static final int ADMIN_REQUEST_TIMEOUT = 25000;
     private static final int CREATE_TOPICS_CALL_TIMEOUT = 180000;
     private static final int MAX_CREATE_TOPICS_BATCH_SIZE = 10;
 
@@ -129,6 +135,32 @@ public final class WorkerUtils {
     }
 
     /**
+     * Returns a list of all existing topic partitions that match the following criteria: the
+     * topic name matches the given regular expression 'topicRegex', the topic is not internal,
+     * and the partition is in the range [startPartition, endPartition].
+     *
+     * @param log                The logger to use.
+     * @param bootstrapServers   The bootstrap server list.
+     * @param commonClientConf   Common client configuration, applied first.
+     * @param adminClientConf    AdminClient configuration, applied on top of the common config.
+     * @param topicRegex         Topic name regular expression.
+     * @param startPartition     Starting partition of the partition range.
+     * @param endPartition       Ending partition of the partition range (inclusive).
+     * @return                   List of matching topic partitions.
+     * @throws Throwable         If getting the list of topics or their descriptions fails.
+     */
+    public static Collection<TopicPartition> getMatchingTopicPartitions(
+        Logger log, String bootstrapServers,
+        Map<String, String> commonClientConf, Map<String, String> adminClientConf,
+        String topicRegex, int startPartition, int endPartition) throws Throwable {
+        try (AdminClient adminClient
+                 = createAdminClient(bootstrapServers, commonClientConf, adminClientConf)) {
+            return getMatchingTopicPartitions(adminClient, topicRegex, startPartition, endPartition);
+        } catch (Exception e) {
+            log.warn("Failed to get topic partitions matching {}", topicRegex, e);
+            throw e;
+        }
+    }
+
+    /**
      * The actual create topics functionality is separated into this method and called from the
      * above method to be able to unit test with mock adminClient.
      */
@@ -233,7 +265,7 @@ public final class WorkerUtils {
         Logger log, AdminClient adminClient,
         Collection<String> topicsToVerify, Map<String, NewTopic> topicsInfo) throws Throwable {
         DescribeTopicsResult topicsResult = adminClient.describeTopics(
-            topicsToVerify, new DescribeTopicsOptions().timeoutMs(CREATE_TOPICS_REQUEST_TIMEOUT));
+            topicsToVerify, new DescribeTopicsOptions().timeoutMs(ADMIN_REQUEST_TIMEOUT));
         Map<String, TopicDescription> topicDescriptionMap = topicsResult.all().get();
         for (TopicDescription desc: topicDescriptionMap.values()) {
             // map will always contain the topic since all topics in 'topicsExists' are in given
@@ -249,12 +281,53 @@ public final class WorkerUtils {
         }
     }
 
+    /**
+     * Returns a list of existing, non-internal topic partitions that match the given pattern
+     * and whose partition is in the range [startPartition, endPartition].
+     * @param adminClient     AdminClient to use for listing and describing topics
+     * @param topicRegex      Topic name regular expression to match
+     * @param startPartition  Starting partition of the partition range
+     * @param endPartition    Ending partition of the partition range (inclusive)
+     * @return                List of matching topic partitions
+     * @throws Throwable      If getting the list of existing topics or their descriptions fails
+     */
+    static Collection<TopicPartition> getMatchingTopicPartitions(
+        AdminClient adminClient, String topicRegex, int startPartition, int endPartition)
+        throws Throwable {
+        final Pattern topicNamePattern = Pattern.compile(topicRegex);
+
+        // first get list of matching topics
+        List<String> matchedTopics = new ArrayList<>();
+        ListTopicsResult res = adminClient.listTopics(
+            new ListTopicsOptions().timeoutMs(ADMIN_REQUEST_TIMEOUT));
+        Map<String, TopicListing> topicListingMap = res.namesToListings().get();
+        for (Map.Entry<String, TopicListing> topicListingEntry: topicListingMap.entrySet()) {
+            if (!topicListingEntry.getValue().isInternal()
+                && topicNamePattern.matcher(topicListingEntry.getKey()).matches()) {
+                matchedTopics.add(topicListingEntry.getKey());
+            }
+        }
+
+        // create a list of topic/partitions
+        List<TopicPartition> out = new ArrayList<>();
+        DescribeTopicsResult topicsResult = adminClient.describeTopics(
+            matchedTopics, new DescribeTopicsOptions().timeoutMs(ADMIN_REQUEST_TIMEOUT));
+        Map<String, TopicDescription> topicDescriptionMap = topicsResult.all().get();
+        for (TopicDescription desc: topicDescriptionMap.values()) {
+            List<TopicPartitionInfo> partitions = desc.partitions();
+            for (TopicPartitionInfo info: partitions) {
+                if ((info.partition() >= startPartition) && (info.partition() <= endPartition)) {
+                    out.add(new TopicPartition(desc.name(), info.partition()));
+                }
+            }
+        }
+        return out;
+    }
+
     private static AdminClient createAdminClient(
-        String bootstrapServers, Map<String, String> commonClientConf,
-        Map<String, String> adminClientConf) {
+        String bootstrapServers,
+        Map<String, String> commonClientConf, Map<String, String> adminClientConf) {
         Properties props = new Properties();
         props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
-        props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, CREATE_TOPICS_REQUEST_TIMEOUT);
+        props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, ADMIN_REQUEST_TIMEOUT);
         // first add common client config, and then admin client config to properties, possibly
         // over-writing default or common properties.
         addConfigsToProperties(props, commonClientConf, adminClientConf);
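
For a quick illustration of the new helper, here is a minimal sketch of calling the public overload; the broker address and regex are assumptions, and the empty maps simply mean "no extra client configuration":

    import java.util.Collection;
    import java.util.Collections;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.trogdor.common.WorkerUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class MatchingPartitionsExample {
        private static final Logger log =
            LoggerFactory.getLogger(MatchingPartitionsExample.class);

        public static void main(String[] args) throws Throwable {
            // Find partitions 0..2 of every non-internal topic whose name
            // ends in "-topic", using a hypothetical local broker.
            Collection<TopicPartition> partitions =
                WorkerUtils.getMatchingTopicPartitions(
                    log, "localhost:9092",
                    Collections.<String, String>emptyMap(),
                    Collections.<String, String>emptyMap(),
                    ".*-topic$", 0, 2);
            log.info("Matched partitions: {}", partitions);
        }
    }

The package-private overload that takes an AdminClient directly is the one exercised by the new unit tests with a mock client.
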
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java
new file mode 100644
index 0000000..cef913b
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.workload;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import org.apache.kafka.trogdor.common.Topology;
+import org.apache.kafka.trogdor.task.TaskController;
+import org.apache.kafka.trogdor.task.TaskSpec;
+import org.apache.kafka.trogdor.task.TaskWorker;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * The specification for a benchmark that consumes messages from a set of topics.
+ */
+public class ConsumeBenchSpec extends TaskSpec {
+
+    private final String consumerNode;
+    private final String bootstrapServers;
+    private final int targetMessagesPerSec;
+    private final int maxMessages;
+    private final Map<String, String> consumerConf;
+    private final Map<String, String> adminClientConf;
+    private final Map<String, String> commonClientConf;
+    private final String topicRegex;
+    private final int startPartition;
+    private final int endPartition;
+
+    @JsonCreator
+    public ConsumeBenchSpec(@JsonProperty("startMs") long startMs,
+                            @JsonProperty("durationMs") long durationMs,
+                            @JsonProperty("consumerNode") String consumerNode,
+                            @JsonProperty("bootstrapServers") String bootstrapServers,
+                            @JsonProperty("targetMessagesPerSec") int targetMessagesPerSec,
+                            @JsonProperty("maxMessages") int maxMessages,
+                            @JsonProperty("consumerConf") Map<String, String> consumerConf,
+                            @JsonProperty("commonClientConf") Map<String, String> commonClientConf,
+                            @JsonProperty("adminClientConf") Map<String, String> adminClientConf,
+                            @JsonProperty("topicRegex") String topicRegex,
+                            @JsonProperty("startPartition") int startPartition,
+                            @JsonProperty("endPartition") int endPartition) {
+        super(startMs, durationMs);
+        this.consumerNode = (consumerNode == null) ? "" : consumerNode;
+        this.bootstrapServers = (bootstrapServers == null) ? "" : bootstrapServers;
+        this.targetMessagesPerSec = targetMessagesPerSec;
+        this.maxMessages = maxMessages;
+        this.consumerConf = configOrEmptyMap(consumerConf);
+        this.commonClientConf = configOrEmptyMap(commonClientConf);
+        this.adminClientConf = configOrEmptyMap(adminClientConf);
+        this.topicRegex = topicRegex;
+        this.startPartition = startPartition;
+        this.endPartition = endPartition;
+    }
+
+    @JsonProperty
+    public String consumerNode() {
+        return consumerNode;
+    }
+
+    @JsonProperty
+    public String bootstrapServers() {
+        return bootstrapServers;
+    }
+
+    @JsonProperty
+    public int targetMessagesPerSec() {
+        return targetMessagesPerSec;
+    }
+
+    @JsonProperty
+    public int maxMessages() {
+        return maxMessages;
+    }
+
+    @JsonProperty
+    public Map<String, String> consumerConf() {
+        return consumerConf;
+    }
+
+    @JsonProperty
+    public Map<String, String> commonClientConf() {
+        return commonClientConf;
+    }
+
+    @JsonProperty
+    public Map<String, String> adminClientConf() {
+        return adminClientConf;
+    }
+
+    @JsonProperty
+    public String topicRegex() {
+        return topicRegex;
+    }
+
+    @JsonProperty
+    public int startPartition() {
+        return startPartition;
+    }
+
+    @JsonProperty
+    public int endPartition() {
+        return endPartition;
+    }
+
+    @Override
+    public TaskController newController(String id) {
+        return new TaskController() {
+            @Override
+            public Set<String> targetNodes(Topology topology) {
+                return Collections.singleton(consumerNode);
+            }
+        };
+    }
+
+    @Override
+    public TaskWorker newTaskWorker(String id) {
+        return new ConsumeBenchWorker(id, this);
+    }
+}
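
Because the spec is a plain Jackson bean, it can also be constructed directly, for example in a test. A sketch, with all values illustrative:

    import java.util.Collections;
    import org.apache.kafka.trogdor.workload.ConsumeBenchSpec;

    public class ConsumeBenchSpecExample {
        public static void main(String[] args) {
            // Argument order follows the @JsonCreator constructor above:
            // startMs, durationMs, consumerNode, bootstrapServers,
            // targetMessagesPerSec, maxMessages, consumerConf,
            // commonClientConf, adminClientConf, topicRegex,
            // startPartition, endPartition.
            ConsumeBenchSpec spec = new ConsumeBenchSpec(
                0L, 720000L, "node0", "localhost:9092",
                1000, 100000,
                Collections.<String, String>emptyMap(),
                Collections.<String, String>emptyMap(),
                Collections.<String, String>emptyMap(),
                "test-topic.*", 0, 5);
            System.out.println(spec.topicRegex());
        }
    }
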
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java
new file mode 100644
index 0000000..5c74d90
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.workload;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.trogdor.common.JsonUtil;
+import org.apache.kafka.trogdor.common.Platform;
+import org.apache.kafka.trogdor.common.ThreadUtils;
+import org.apache.kafka.trogdor.common.WorkerUtils;
+import org.apache.kafka.trogdor.task.WorkerStatusTracker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.kafka.trogdor.task.TaskWorker;
+
+import java.util.Collection;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class ConsumeBenchWorker implements TaskWorker {
+    private static final Logger log = LoggerFactory.getLogger(ConsumeBenchWorker.class);
+
+    private static final int THROTTLE_PERIOD_MS = 100;
+
+    private final String id;
+    private final ConsumeBenchSpec spec;
+    private final AtomicBoolean running = new AtomicBoolean(false);
+    private ScheduledExecutorService executor;
+    private WorkerStatusTracker status;
+    private KafkaFutureImpl<String> doneFuture;
+    private KafkaConsumer<byte[], byte[]> consumer;
+
+    public ConsumeBenchWorker(String id, ConsumeBenchSpec spec) {
+        this.id = id;
+        this.spec = spec;
+    }
+
+    @Override
+    public void start(Platform platform, WorkerStatusTracker status,
+                      KafkaFutureImpl<String> doneFuture) throws Exception {
+        if (!running.compareAndSet(false, true)) {
+            throw new IllegalStateException("ConsumeBenchWorker is already running.");
+        }
+        log.info("{}: Activating ConsumeBenchWorker with {}", id, spec);
+        this.executor = Executors.newScheduledThreadPool(
+            2, ThreadUtils.createThreadFactory("ConsumeBenchWorkerThread%d", false));
+        this.status = status;
+        this.doneFuture = doneFuture;
+        executor.submit(new Prepare());
+    }
+
+    public class Prepare implements Runnable {
+        @Override
+        public void run() {
+            try {
+                // find topics to consume from based on provided topic regular expression
+                if (spec.topicRegex() == null) {
+                    throw new ConfigException(
+                        "Must provide topic name or regular expression to match existing topics.");
+                }
+                Collection<TopicPartition> topicPartitions =
+                    WorkerUtils.getMatchingTopicPartitions(
+                        log, spec.bootstrapServers(),
+                        spec.commonClientConf(), spec.adminClientConf(),
+                        spec.topicRegex(), spec.startPartition(), spec.endPartition());
+                log.info("Will consume from {}", topicPartitions);
+
+                executor.submit(new ConsumeMessages(topicPartitions));
+            } catch (Throwable e) {
+                WorkerUtils.abort(log, "Prepare", e, doneFuture);
+            }
+        }
+    }
+
+    public class ConsumeMessages implements Callable<Void> {
+        private final Histogram latencyHistogram;
+        private final Histogram messageSizeHistogram;
+        private final Future<?> statusUpdaterFuture;
+        private final Throttle throttle;
+
+        ConsumeMessages(Collection<TopicPartition> topicPartitions) {
+            this.latencyHistogram = new Histogram(5000);
+            this.messageSizeHistogram = new Histogram(2 * 1024 * 1024);
+            this.statusUpdaterFuture = executor.scheduleAtFixedRate(
+                new StatusUpdater(latencyHistogram, messageSizeHistogram), 1, 1, TimeUnit.MINUTES);
+            Properties props = new Properties();
+            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, spec.bootstrapServers());
+            props.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer." + id);
+            props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-group-1");
+            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+            props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 100000);
+            // these defaults may be overwritten by the user-specified commonClientConf or
+            // consumerConf
+            WorkerUtils.addConfigsToProperties(props, spec.commonClientConf(), spec.consumerConf());
+            consumer = new KafkaConsumer<>(props, new ByteArrayDeserializer(),
+                                           new ByteArrayDeserializer());
+            consumer.assign(topicPartitions);
+            int perPeriod = WorkerUtils.perSecToPerPeriod(
+                spec.targetMessagesPerSec(), THROTTLE_PERIOD_MS);
+            this.throttle = new Throttle(perPeriod, THROTTLE_PERIOD_MS);
+        }
+
+        @Override
+        public Void call() throws Exception {
+            long messagesConsumed = 0;
+            long bytesConsumed = 0;
+            long startTimeMs = Time.SYSTEM.milliseconds();
+            long startBatchMs = startTimeMs;
+            try {
+                while (messagesConsumed < spec.maxMessages()) {
+                    ConsumerRecords<byte[], byte[]> records = consumer.poll(50);
+                    if (records.isEmpty()) {
+                        continue;
+                    }
+                    long endBatchMs = Time.SYSTEM.milliseconds();
+                    long elapsedBatchMs = endBatchMs - startBatchMs;
+                    for (ConsumerRecord<byte[], byte[]> record : records) {
+                        messagesConsumed++;
+                        long messageBytes = 0;
+                        if (record.key() != null) {
+                            messageBytes += record.serializedKeySize();
+                        }
+                        if (record.value() != null) {
+                            messageBytes += record.serializedValueSize();
+                        }
+                        latencyHistogram.add(elapsedBatchMs);
+                        messageSizeHistogram.add(messageBytes);
+                        bytesConsumed += messageBytes;
+                        throttle.increment();
+                    }
+                    startBatchMs = Time.SYSTEM.milliseconds();
+                }
+            } catch (Exception e) {
+                WorkerUtils.abort(log, "ConsumeRecords", e, doneFuture);
+            } finally {
+                statusUpdaterFuture.cancel(false);
+                StatusData statusData =
+                    new StatusUpdater(latencyHistogram, messageSizeHistogram).update();
+                long curTimeMs = Time.SYSTEM.milliseconds();
+                log.info("Consumed total number of messages={}, bytes={} in {} ms.  status: {}",
+                         messagesConsumed, bytesConsumed, curTimeMs - startTimeMs, statusData);
+            }
+            doneFuture.complete("");
+            return null;
+        }
+    }
+
+    public class StatusUpdater implements Runnable {
+        private final Histogram latencyHistogram;
+        private final Histogram messageSizeHistogram;
+
+        StatusUpdater(Histogram latencyHistogram, Histogram messageSizeHistogram) {
+            this.latencyHistogram = latencyHistogram;
+            this.messageSizeHistogram = messageSizeHistogram;
+        }
+
+        @Override
+        public void run() {
+            try {
+                update();
+            } catch (Exception e) {
+                WorkerUtils.abort(log, "StatusUpdater", e, doneFuture);
+            }
+        }
+
+        StatusData update() {
+            Histogram.Summary latSummary = latencyHistogram.summarize(StatusData.PERCENTILES);
+            Histogram.Summary msgSummary = messageSizeHistogram.summarize(StatusData.PERCENTILES);
+            StatusData statusData = new StatusData(
+                latSummary.numSamples(),
+                (long) (msgSummary.numSamples() * msgSummary.average()),
+                (long) msgSummary.average(),
+                latSummary.average(),
+                latSummary.percentiles().get(0).value(),
+                latSummary.percentiles().get(1).value(),
+                latSummary.percentiles().get(2).value());
+            status.update(JsonUtil.JSON_SERDE.valueToTree(statusData));
+            log.info("Status={}", JsonUtil.toJsonString(statusData));
+            return statusData;
+        }
+    }
+
+    public static class StatusData {
+        private final long totalMessagesReceived;
+        private final long totalBytesReceived;
+        private final long averageMessageSizeBytes;
+        private final float averageLatencyMs;
+        private final int p50LatencyMs;
+        private final int p95LatencyMs;
+        private final int p99LatencyMs;
+
+        /**
+         * The percentiles to use when calculating the histogram data.
+         * These should match up with the p50LatencyMs, p95LatencyMs, etc. fields.
+         */
+        final static float[] PERCENTILES = {0.5f, 0.95f, 0.99f};
+
+        @JsonCreator
+        StatusData(@JsonProperty("totalMessagesReceived") long totalMessagesReceived,
+                   @JsonProperty("totalBytesReceived") long totalBytesReceived,
+                   @JsonProperty("averageMessageSizeBytes") long averageMessageSizeBytes,
+                   @JsonProperty("averageLatencyMs") float averageLatencyMs,
+                   @JsonProperty("p50LatencyMs") int p50latencyMs,
+                   @JsonProperty("p95LatencyMs") int p95latencyMs,
+                   @JsonProperty("p99LatencyMs") int p99latencyMs) {
+            this.totalMessagesReceived = totalMessagesReceived;
+            this.totalBytesReceived = totalBytesReceived;
+            this.averageMessageSizeBytes = averageMessageSizeBytes;
+            this.averageLatencyMs = averageLatencyMs;
+            this.p50LatencyMs = p50latencyMs;
+            this.p95LatencyMs = p95latencyMs;
+            this.p99LatencyMs = p99latencyMs;
+        }
+
+        @JsonProperty
+        public long totalMessagesReceived() {
+            return totalMessagesReceived;
+        }
+
+        @JsonProperty
+        public long totalBytesReceived() {
+            return totalBytesReceived;
+        }
+
+        @JsonProperty
+        public long averageMessageSizeBytes() {
+            return averageMessageSizeBytes;
+        }
+
+        @JsonProperty
+        public float averageLatencyMs() {
+            return averageLatencyMs;
+        }
+
+        @JsonProperty
+        public int p50LatencyMs() {
+            return p50LatencyMs;
+        }
+
+        @JsonProperty
+        public int p95LatencyMs() {
+            return p95LatencyMs;
+        }
+
+        @JsonProperty
+        public int p99LatencyMs() {
+            return p99LatencyMs;
+        }
+    }
+
+    @Override
+    public void stop(Platform platform) throws Exception {
+        if (!running.compareAndSet(true, false)) {
+            throw new IllegalStateException("ConsumeBenchWorker is not running.");
+        }
+        log.info("{}: Deactivating ConsumeBenchWorker.", id);
+        doneFuture.complete("");
+        executor.shutdownNow();
+        executor.awaitTermination(1, TimeUnit.DAYS);
+        Utils.closeQuietly(consumer, "consumer");
+        this.consumer = null;
+        this.executor = null;
+        this.status = null;
+        this.doneFuture = null;
+    }
+
+}
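
The worker publishes periodic status snapshots built from the two histograms above. Serialized by StatusUpdater, a status update would look roughly like this sketch (all numbers illustrative):

    {
      "totalMessagesReceived": 100000,
      "totalBytesReceived": 51200000,
      "averageMessageSizeBytes": 512,
      "averageLatencyMs": 12.5,
      "p50LatencyMs": 10,
      "p95LatencyMs": 25,
      "p99LatencyMs": 40
    }

Note that "latency" here is the wall-clock duration of each poll batch, recorded once per record in the batch, rather than a per-record end-to-end latency.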
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/common/WorkerUtilsTest.java b/tools/src/test/java/org/apache/kafka/trogdor/common/WorkerUtilsTest.java
index fbe2389..a35efe1 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/common/WorkerUtilsTest.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/common/WorkerUtilsTest.java
@@ -18,9 +18,9 @@
 package org.apache.kafka.trogdor.common;
 
 
-
 import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.TopicPartitionInfo;
 
 import org.apache.kafka.common.Node;
@@ -38,8 +38,10 @@ import static org.junit.Assert.assertEquals;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -262,4 +264,58 @@ public class WorkerUtilsTest {
             Collections.singletonMap(ProducerConfig.ACKS_CONFIG, "0"));
         assertEquals(resultProps, props);
     }
+
+    @Test
+    public void testGetMatchingTopicPartitionsCorrectlyMatchesExactTopicName() throws Throwable {
+        final String topic1 = "existing-topic";
+        final String topic2 = "another-topic";
+        makeExistingTopicWithOneReplica(topic1, 10);
+        makeExistingTopicWithOneReplica(topic2, 20);
+
+        Collection<TopicPartition> topicPartitions =
+            WorkerUtils.getMatchingTopicPartitions(adminClient, topic2, 0, 2);
+        assertEquals(
+            Utils.mkSet(
+                new TopicPartition(topic2, 0), new TopicPartition(topic2, 1),
+                new TopicPartition(topic2, 2)
+            ),
+            new HashSet<>(topicPartitions)
+        );
+    }
+
+    @Test
+    public void testGetMatchingTopicPartitionsCorrectlyMatchesTopics() throws Throwable {
+        final String topic1 = "test-topic";
+        final String topic2 = "another-test-topic";
+        final String topic3 = "one-more";
+        makeExistingTopicWithOneReplica(topic1, 10);
+        makeExistingTopicWithOneReplica(topic2, 20);
+        makeExistingTopicWithOneReplica(topic3, 30);
+
+        Collection<TopicPartition> topicPartitions =
+            WorkerUtils.getMatchingTopicPartitions(adminClient, ".*-topic$", 0, 1);
+        assertEquals(
+            Utils.mkSet(
+                new TopicPartition(topic1, 0), new TopicPartition(topic1, 1),
+                new TopicPartition(topic2, 0), new TopicPartition(topic2, 1)
+            ),
+            new HashSet<>(topicPartitions)
+        );
+    }
+
+    private void makeExistingTopicWithOneReplica(String topicName, int numPartitions) {
+        List<TopicPartitionInfo> tpInfo = new ArrayList<>();
+        int brokerIndex = 0;
+        for (int i = 0; i < numPartitions; ++i) {
+            Node broker = cluster.get(brokerIndex);
+            tpInfo.add(new TopicPartitionInfo(
+                i, broker, singleReplica, Collections.<Node>emptyList()));
+            brokerIndex = (brokerIndex + 1) % cluster.size();
+        }
+        adminClient.addTopic(
+            false,
+            topicName,
+            tpInfo,
+            null);
+    }
 }

-- 
To stop receiving notification emails like this one, please contact
rsivaram@apache.org.