You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2018/04/10 08:45:22 UTC
[kafka] branch trunk updated: Kafka-6693: Added consumer workload
to Trogdor (#4775)
This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 989fe04 Kafka-6693: Added consumer workload to Trogdor (#4775)
989fe04 is described below
commit 989fe0497ec72ea3470a30a9ef47389b48ee5421
Author: Anna Povzner <an...@confluent.io>
AuthorDate: Tue Apr 10 01:45:08 2018 -0700
Kafka-6693: Added consumer workload to Trogdor (#4775)
Added a consumer-only workload to Trogdor. The topics must already exist and be pre-populated. The spec lets the user specify a topic name pattern and a range of partitions, [startPartition, endPartition], to assign to the consumer.
Reviewers: Colin P. Mccabe <cm...@confluent.io>, Rajini Sivaram <ra...@googlemail.com>
---
.../apache/kafka/trogdor/common/WorkerUtils.java | 83 +++++-
.../kafka/trogdor/workload/ConsumeBenchSpec.java | 139 ++++++++++
.../kafka/trogdor/workload/ConsumeBenchWorker.java | 298 +++++++++++++++++++++
.../kafka/trogdor/common/WorkerUtilsTest.java | 58 +++-
4 files changed, 572 insertions(+), 6 deletions(-)
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java b/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java
index 98dbf38..3d4871a 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java
@@ -21,9 +21,14 @@ import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DescribeTopicsOptions;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
+import org.apache.kafka.clients.admin.ListTopicsOptions;
+import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.admin.TopicListing;
import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.errors.NotEnoughReplicasException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicExistsException;
@@ -39,6 +44,7 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Future;
+import java.util.regex.Pattern;
/**
* Utilities for Trogdor TaskWorkers.
@@ -91,7 +97,7 @@ public final class WorkerUtils {
}
}
- private static final int CREATE_TOPICS_REQUEST_TIMEOUT = 25000;
+ private static final int ADMIN_REQUEST_TIMEOUT = 25000;
private static final int CREATE_TOPICS_CALL_TIMEOUT = 180000;
private static final int MAX_CREATE_TOPICS_BATCH_SIZE = 10;
@@ -129,6 +135,32 @@ public final class WorkerUtils {
}
/**
+ * Returns a list of all existing topic partitions that match the following criteria: the topic
+ * name matches the given regular expression 'topicRegex', the topic is not internal, and the
+ * partitions are in the range [startPartition, endPartition].
+ *
+ * @param log The logger to use.
+ * @param bootstrapServers The bootstrap server list.
+ * @param topicRegex Topic name regular expression
+ * @param startPartition Starting partition of partition range
+ * @param endPartition Ending partition of partition range
+ * @return List of topic partitions
+ * @throws Throwable If getting list of topics or their descriptions fails.
+ */
+ public static Collection<TopicPartition> getMatchingTopicPartitions(
+ Logger log, String bootstrapServers,
+ Map<String, String> commonClientConf, Map<String, String> adminClientConf,
+ String topicRegex, int startPartition, int endPartition) throws Throwable {
+ try (AdminClient adminClient
+ = createAdminClient(bootstrapServers, commonClientConf, adminClientConf)) {
+ return getMatchingTopicPartitions(adminClient, topicRegex, startPartition, endPartition);
+ } catch (Exception e) {
+ log.warn("Failed to get topic partitions matching {}", topicRegex, e);
+ throw e;
+ }
+ }
+
+ /**
* The actual create topics functionality is separated into this method and called from the
* above method to be able to unit test with mock adminClient.
*/
@@ -233,7 +265,7 @@ public final class WorkerUtils {
Logger log, AdminClient adminClient,
Collection<String> topicsToVerify, Map<String, NewTopic> topicsInfo) throws Throwable {
DescribeTopicsResult topicsResult = adminClient.describeTopics(
- topicsToVerify, new DescribeTopicsOptions().timeoutMs(CREATE_TOPICS_REQUEST_TIMEOUT));
+ topicsToVerify, new DescribeTopicsOptions().timeoutMs(ADMIN_REQUEST_TIMEOUT));
Map<String, TopicDescription> topicDescriptionMap = topicsResult.all().get();
for (TopicDescription desc: topicDescriptionMap.values()) {
// map will always contain the topic since all topics in 'topicsExists' are in given
@@ -249,12 +281,53 @@ public final class WorkerUtils {
}
}
+ /**
+ * Returns the list of existing, non-internal topic partitions whose topic name matches the
+ * given pattern and whose partition number is in the range [startPartition, endPartition].
+ * @param adminClient AdminClient
+ * @param topicRegex Topic regular expression to match
+ * @return List of matching topic partitions
+ * @throws Throwable If failed to get list of existing topics
+ */
+ static Collection<TopicPartition> getMatchingTopicPartitions(
+ AdminClient adminClient, String topicRegex, int startPartition, int endPartition)
+ throws Throwable {
+ final Pattern topicNamePattern = Pattern.compile(topicRegex);
+
+ // first get list of matching topics
+ List<String> matchedTopics = new ArrayList<>();
+ ListTopicsResult res = adminClient.listTopics(
+ new ListTopicsOptions().timeoutMs(ADMIN_REQUEST_TIMEOUT));
+ Map<String, TopicListing> topicListingMap = res.namesToListings().get();
+ for (Map.Entry<String, TopicListing> topicListingEntry: topicListingMap.entrySet()) {
+ if (!topicListingEntry.getValue().isInternal()
+ && topicNamePattern.matcher(topicListingEntry.getKey()).matches()) {
+ matchedTopics.add(topicListingEntry.getKey());
+ }
+ }
+
+ // create a list of topic/partitions
+ List<TopicPartition> out = new ArrayList<>();
+ DescribeTopicsResult topicsResult = adminClient.describeTopics(
+ matchedTopics, new DescribeTopicsOptions().timeoutMs(ADMIN_REQUEST_TIMEOUT));
+ Map<String, TopicDescription> topicDescriptionMap = topicsResult.all().get();
+ for (TopicDescription desc: topicDescriptionMap.values()) {
+ List<TopicPartitionInfo> partitions = desc.partitions();
+ for (TopicPartitionInfo info: partitions) {
+ if ((info.partition() >= startPartition) && (info.partition() <= endPartition)) {
+ out.add(new TopicPartition(desc.name(), info.partition()));
+ }
+ }
+ }
+ return out;
+ }
+
private static AdminClient createAdminClient(
- String bootstrapServers, Map<String, String> commonClientConf,
- Map<String, String> adminClientConf) {
+ String bootstrapServers,
+ Map<String, String> commonClientConf, Map<String, String> adminClientConf) {
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
- props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, CREATE_TOPICS_REQUEST_TIMEOUT);
+ props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, ADMIN_REQUEST_TIMEOUT);
// first add common client config, and then admin client config to properties, possibly
// over-writing default or common properties.
addConfigsToProperties(props, commonClientConf, adminClientConf);
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java
new file mode 100644
index 0000000..cef913b
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.workload;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import org.apache.kafka.trogdor.common.Topology;
+import org.apache.kafka.trogdor.task.TaskController;
+import org.apache.kafka.trogdor.task.TaskSpec;
+import org.apache.kafka.trogdor.task.TaskWorker;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * The specification for a benchmark that consumes messages from a set of topics.
+ */
+public class ConsumeBenchSpec extends TaskSpec {
+
+ private final String consumerNode;
+ private final String bootstrapServers;
+ private final int targetMessagesPerSec;
+ private final int maxMessages;
+ private final Map<String, String> consumerConf;
+ private final Map<String, String> adminClientConf;
+ private final Map<String, String> commonClientConf;
+ private final String topicRegex;
+ private final int startPartition;
+ private final int endPartition;
+
+
+ @JsonCreator
+ public ConsumeBenchSpec(@JsonProperty("startMs") long startMs,
+ @JsonProperty("durationMs") long durationMs,
+ @JsonProperty("consumerNode") String consumerNode,
+ @JsonProperty("bootstrapServers") String bootstrapServers,
+ @JsonProperty("targetMessagesPerSec") int targetMessagesPerSec,
+ @JsonProperty("maxMessages") int maxMessages,
+ @JsonProperty("consumerConf") Map<String, String> consumerConf,
+ @JsonProperty("commonClientConf") Map<String, String> commonClientConf,
+ @JsonProperty("adminClientConf") Map<String, String> adminClientConf,
+ @JsonProperty("topicRegex") String topicRegex,
+ @JsonProperty("startPartition") int startPartition,
+ @JsonProperty("endPartition") int endPartition) {
+ super(startMs, durationMs);
+ this.consumerNode = (consumerNode == null) ? "" : consumerNode;
+ this.bootstrapServers = (bootstrapServers == null) ? "" : bootstrapServers;
+ this.targetMessagesPerSec = targetMessagesPerSec;
+ this.maxMessages = maxMessages;
+ this.consumerConf = configOrEmptyMap(consumerConf);
+ this.commonClientConf = configOrEmptyMap(commonClientConf);
+ this.adminClientConf = configOrEmptyMap(adminClientConf);
+ this.topicRegex = topicRegex;
+ this.startPartition = startPartition;
+ this.endPartition = endPartition;
+ }
+
+ @JsonProperty
+ public String consumerNode() {
+ return consumerNode;
+ }
+
+ @JsonProperty
+ public String bootstrapServers() {
+ return bootstrapServers;
+ }
+
+ @JsonProperty
+ public int targetMessagesPerSec() {
+ return targetMessagesPerSec;
+ }
+
+ @JsonProperty
+ public int maxMessages() {
+ return maxMessages;
+ }
+
+ @JsonProperty
+ public Map<String, String> consumerConf() {
+ return consumerConf;
+ }
+
+ @JsonProperty
+ public Map<String, String> commonClientConf() {
+ return commonClientConf;
+ }
+
+ @JsonProperty
+ public Map<String, String> adminClientConf() {
+ return adminClientConf;
+ }
+
+ @JsonProperty
+ public String topicRegex() {
+ return topicRegex;
+ }
+
+ @JsonProperty
+ public int startPartition() {
+ return startPartition;
+ }
+
+ @JsonProperty
+ public int endPartition() {
+ return endPartition;
+ }
+
+ @Override
+ public TaskController newController(String id) {
+ return new TaskController() {
+ @Override
+ public Set<String> targetNodes(Topology topology) {
+ return Collections.singleton(consumerNode);
+ }
+ };
+ }
+
+ @Override
+ public TaskWorker newTaskWorker(String id) {
+ return new ConsumeBenchWorker(id, this);
+ }
+}
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java
new file mode 100644
index 0000000..5c74d90
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.workload;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.trogdor.common.JsonUtil;
+import org.apache.kafka.trogdor.common.Platform;
+import org.apache.kafka.trogdor.common.ThreadUtils;
+import org.apache.kafka.trogdor.common.WorkerUtils;
+import org.apache.kafka.trogdor.task.WorkerStatusTracker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.kafka.trogdor.task.TaskWorker;
+
+import java.util.Collection;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class ConsumeBenchWorker implements TaskWorker {
+ private static final Logger log = LoggerFactory.getLogger(ConsumeBenchWorker.class);
+
+ private static final int THROTTLE_PERIOD_MS = 100;
+
+ private final String id;
+ private final ConsumeBenchSpec spec;
+ private final AtomicBoolean running = new AtomicBoolean(false);
+ private ScheduledExecutorService executor;
+ private WorkerStatusTracker status;
+ private KafkaFutureImpl<String> doneFuture;
+ private KafkaConsumer<byte[], byte[]> consumer;
+
+ public ConsumeBenchWorker(String id, ConsumeBenchSpec spec) {
+ this.id = id;
+ this.spec = spec;
+ }
+
+ @Override
+ public void start(Platform platform, WorkerStatusTracker status,
+ KafkaFutureImpl<String> doneFuture) throws Exception {
+ if (!running.compareAndSet(false, true)) {
+ throw new IllegalStateException("ConsumeBenchWorker is already running.");
+ }
+ log.info("{}: Activating ConsumeBenchWorker with {}", id, spec);
+ this.executor = Executors.newScheduledThreadPool(
+ 2, ThreadUtils.createThreadFactory("ConsumeBenchWorkerThread%d", false));
+ this.status = status;
+ this.doneFuture = doneFuture;
+ executor.submit(new Prepare());
+ }
+
+ public class Prepare implements Runnable {
+ @Override
+ public void run() {
+ try {
+ // find topics to consume from based on provided topic regular expression
+ if (spec.topicRegex() == null) {
+ throw new ConfigException(
+ "Must provide topic name or regular expression to match existing topics.");
+ }
+ Collection<TopicPartition> topicPartitions =
+ WorkerUtils.getMatchingTopicPartitions(
+ log, spec.bootstrapServers(),
+ spec.commonClientConf(), spec.adminClientConf(),
+ spec.topicRegex(), spec.startPartition(), spec.endPartition());
+ log.info("Will consume from {}", topicPartitions);
+
+ executor.submit(new ConsumeMessages(topicPartitions));
+ } catch (Throwable e) {
+ WorkerUtils.abort(log, "Prepare", e, doneFuture);
+ }
+ }
+ }
+
+ public class ConsumeMessages implements Callable<Void> {
+ private final Histogram latencyHistogram;
+ private final Histogram messageSizeHistogram;
+ private final Future<?> statusUpdaterFuture;
+ private final Throttle throttle;
+
+ ConsumeMessages(Collection<TopicPartition> topicPartitions) {
+ this.latencyHistogram = new Histogram(5000);
+ this.messageSizeHistogram = new Histogram(2 * 1024 * 1024);
+ this.statusUpdaterFuture = executor.scheduleAtFixedRate(
+ new StatusUpdater(latencyHistogram, messageSizeHistogram), 1, 1, TimeUnit.MINUTES);
+ Properties props = new Properties();
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, spec.bootstrapServers());
+ props.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer." + id);
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-group-1");
+ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 100000);
+ // these defaults may be overwritten by the user-specified commonClientConf or
+ // consumerConf
+ WorkerUtils.addConfigsToProperties(props, spec.commonClientConf(), spec.consumerConf());
+ consumer = new KafkaConsumer<>(props, new ByteArrayDeserializer(),
+ new ByteArrayDeserializer());
+ consumer.assign(topicPartitions);
+ int perPeriod = WorkerUtils.perSecToPerPeriod(
+ spec.targetMessagesPerSec(), THROTTLE_PERIOD_MS);
+ this.throttle = new Throttle(perPeriod, THROTTLE_PERIOD_MS);
+ }
+
+ @Override
+ public Void call() throws Exception {
+ long messagesConsumed = 0;
+ long bytesConsumed = 0;
+ long startTimeMs = Time.SYSTEM.milliseconds();
+ long startBatchMs = startTimeMs;
+ try {
+ while (messagesConsumed < spec.maxMessages()) {
+ ConsumerRecords<byte[], byte[]> records = consumer.poll(50);
+ if (records.isEmpty()) {
+ continue;
+ }
+ long endBatchMs = Time.SYSTEM.milliseconds();
+ long elapsedBatchMs = endBatchMs - startBatchMs;
+ for (ConsumerRecord<byte[], byte[]> record : records) {
+ messagesConsumed++;
+ long messageBytes = 0;
+ if (record.key() != null) {
+ messageBytes += record.serializedKeySize();
+ }
+ if (record.value() != null) {
+ messageBytes += record.serializedValueSize();
+ }
+ latencyHistogram.add(elapsedBatchMs);
+ messageSizeHistogram.add(messageBytes);
+ bytesConsumed += messageBytes;
+ throttle.increment();
+ }
+ startBatchMs = Time.SYSTEM.milliseconds();
+ }
+ } catch (Exception e) {
+ WorkerUtils.abort(log, "ConsumeRecords", e, doneFuture);
+ } finally {
+ statusUpdaterFuture.cancel(false);
+ StatusData statusData =
+ new StatusUpdater(latencyHistogram, messageSizeHistogram).update();
+ long curTimeMs = Time.SYSTEM.milliseconds();
+ log.info("Consumed total number of messages={}, bytes={} in {} ms. status: {}",
+ messagesConsumed, bytesConsumed, curTimeMs - startTimeMs, statusData);
+ }
+ doneFuture.complete("");
+ return null;
+ }
+ }
+
+ public class StatusUpdater implements Runnable {
+ private final Histogram latencyHistogram;
+ private final Histogram messageSizeHistogram;
+
+ StatusUpdater(Histogram latencyHistogram, Histogram messageSizeHistogram) {
+ this.latencyHistogram = latencyHistogram;
+ this.messageSizeHistogram = messageSizeHistogram;
+ }
+
+ @Override
+ public void run() {
+ try {
+ update();
+ } catch (Exception e) {
+ WorkerUtils.abort(log, "StatusUpdater", e, doneFuture);
+ }
+ }
+
+ StatusData update() {
+ Histogram.Summary latSummary = latencyHistogram.summarize(StatusData.PERCENTILES);
+ Histogram.Summary msgSummary = messageSizeHistogram.summarize(StatusData.PERCENTILES);
+ StatusData statusData = new StatusData(
+ latSummary.numSamples(),
+ (long) (msgSummary.numSamples() * msgSummary.average()),
+ (long) msgSummary.average(),
+ latSummary.average(),
+ latSummary.percentiles().get(0).value(),
+ latSummary.percentiles().get(1).value(),
+ latSummary.percentiles().get(2).value());
+ status.update(JsonUtil.JSON_SERDE.valueToTree(statusData));
+ log.info("Status={}", JsonUtil.toJsonString(statusData));
+ return statusData;
+ }
+ }
+
+ public static class StatusData {
+ private final long totalMessagesReceived;
+ private final long totalBytesReceived;
+ private final long averageMessageSizeBytes;
+ private final float averageLatencyMs;
+ private final int p50LatencyMs;
+ private final int p95LatencyMs;
+ private final int p99LatencyMs;
+
+ /**
+ * The percentiles to use when calculating the histogram data.
+ * These should match up with the p50LatencyMs, p95LatencyMs, etc. fields.
+ */
+ final static float[] PERCENTILES = {0.5f, 0.95f, 0.99f};
+
+ @JsonCreator
+ StatusData(@JsonProperty("totalMessagesReceived") long totalMessagesReceived,
+ @JsonProperty("totalBytesReceived") long totalBytesReceived,
+ @JsonProperty("averageMessageSizeBytes") long averageMessageSizeBytes,
+ @JsonProperty("averageLatencyMs") float averageLatencyMs,
+ @JsonProperty("p50LatencyMs") int p50latencyMs,
+ @JsonProperty("p95LatencyMs") int p95latencyMs,
+ @JsonProperty("p99LatencyMs") int p99latencyMs) {
+ this.totalMessagesReceived = totalMessagesReceived;
+ this.totalBytesReceived = totalBytesReceived;
+ this.averageMessageSizeBytes = averageMessageSizeBytes;
+ this.averageLatencyMs = averageLatencyMs;
+ this.p50LatencyMs = p50latencyMs;
+ this.p95LatencyMs = p95latencyMs;
+ this.p99LatencyMs = p99latencyMs;
+ }
+
+ @JsonProperty
+ public long totalMessagesReceived() {
+ return totalMessagesReceived;
+ }
+
+ @JsonProperty
+ public long totalBytesReceived() {
+ return totalBytesReceived;
+ }
+
+ @JsonProperty
+ public long averageMessageSizeBytes() {
+ return averageMessageSizeBytes;
+ }
+
+ @JsonProperty
+ public float averageLatencyMs() {
+ return averageLatencyMs;
+ }
+
+ @JsonProperty
+ public int p50LatencyMs() {
+ return p50LatencyMs;
+ }
+
+ @JsonProperty
+ public int p95LatencyMs() {
+ return p95LatencyMs;
+ }
+
+ @JsonProperty
+ public int p99LatencyMs() {
+ return p99LatencyMs;
+ }
+ }
+
+ @Override
+ public void stop(Platform platform) throws Exception {
+ if (!running.compareAndSet(true, false)) {
+ throw new IllegalStateException("ConsumeBenchWorker is not running.");
+ }
+ log.info("{}: Deactivating ConsumeBenchWorker.", id);
+ doneFuture.complete("");
+ executor.shutdownNow();
+ executor.awaitTermination(1, TimeUnit.DAYS);
+ Utils.closeQuietly(consumer, "consumer");
+ this.consumer = null;
+ this.executor = null;
+ this.status = null;
+ this.doneFuture = null;
+ }
+
+}
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/common/WorkerUtilsTest.java b/tools/src/test/java/org/apache/kafka/trogdor/common/WorkerUtilsTest.java
index fbe2389..a35efe1 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/common/WorkerUtilsTest.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/common/WorkerUtilsTest.java
@@ -18,9 +18,9 @@
package org.apache.kafka.trogdor.common;
-
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.Node;
@@ -38,8 +38,10 @@ import static org.junit.Assert.assertEquals;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -262,4 +264,58 @@ public class WorkerUtilsTest {
Collections.singletonMap(ProducerConfig.ACKS_CONFIG, "0"));
assertEquals(resultProps, props);
}
+
+ @Test
+ public void testGetMatchingTopicPartitionsCorrectlyMatchesExactTopicName() throws Throwable {
+ final String topic1 = "existing-topic";
+ final String topic2 = "another-topic";
+ makeExistingTopicWithOneReplica(topic1, 10);
+ makeExistingTopicWithOneReplica(topic2, 20);
+
+ Collection<TopicPartition> topicPartitions =
+ WorkerUtils.getMatchingTopicPartitions(adminClient, topic2, 0, 2);
+ assertEquals(
+ Utils.mkSet(
+ new TopicPartition(topic2, 0), new TopicPartition(topic2, 1),
+ new TopicPartition(topic2, 2)
+ ),
+ new HashSet<>(topicPartitions)
+ );
+ }
+
+ @Test
+ public void testGetMatchingTopicPartitionsCorrectlyMatchesTopics() throws Throwable {
+ final String topic1 = "test-topic";
+ final String topic2 = "another-test-topic";
+ final String topic3 = "one-more";
+ makeExistingTopicWithOneReplica(topic1, 10);
+ makeExistingTopicWithOneReplica(topic2, 20);
+ makeExistingTopicWithOneReplica(topic3, 30);
+
+ Collection<TopicPartition> topicPartitions =
+ WorkerUtils.getMatchingTopicPartitions(adminClient, ".*-topic$", 0, 1);
+ assertEquals(
+ Utils.mkSet(
+ new TopicPartition(topic1, 0), new TopicPartition(topic1, 1),
+ new TopicPartition(topic2, 0), new TopicPartition(topic2, 1)
+ ),
+ new HashSet<>(topicPartitions)
+ );
+ }
+
+ private void makeExistingTopicWithOneReplica(String topicName, int numPartitions) {
+ List<TopicPartitionInfo> tpInfo = new ArrayList<>();
+ int brokerIndex = 0;
+ for (int i = 0; i < numPartitions; ++i) {
+ Node broker = cluster.get(brokerIndex);
+ tpInfo.add(new TopicPartitionInfo(
+ i, broker, singleReplica, Collections.<Node>emptyList()));
+ brokerIndex = (brokerIndex + 1) % cluster.size();
+ }
+ adminClient.addTopic(
+ false,
+ topicName,
+ tpInfo,
+ null);
+ }
}
--
To stop receiving notification emails like this one, please contact
rsivaram@apache.org.