You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2017/11/28 22:10:04 UTC
kafka git commit: KAFKA-6255; Add ProduceBench to Trogdor
Repository: kafka
Updated Branches:
refs/heads/trunk 1a1d923f2 -> 58877a0de
KAFKA-6255; Add ProduceBench to Trogdor
Author: Colin P. Mccabe <cm...@confluent.io>
Reviewers: Rajini Sivaram <ra...@googlemail.com>
Closes #4245 from cmccabe/KAFKA-6255
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/58877a0d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/58877a0d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/58877a0d
Branch: refs/heads/trunk
Commit: 58877a0deacd6c13436af83ef5be1a6f75a3ac4a
Parents: 1a1d923
Author: Colin P. Mccabe <cm...@confluent.io>
Authored: Tue Nov 28 22:09:55 2017 +0000
Committer: Rajini Sivaram <ra...@googlemail.com>
Committed: Tue Nov 28 22:09:55 2017 +0000
----------------------------------------------------------------------
.../services/trogdor/produce_bench_workload.py | 65 ++++
.../kafkatest/tests/core/produce_bench_test.py | 57 +++
.../kafka/trogdor/coordinator/Coordinator.java | 2 +-
.../kafka/trogdor/coordinator/NodeManager.java | 9 +-
.../kafka/trogdor/workload/Histogram.java | 207 +++++++++++
.../workload/ProduceBenchController.java | 37 ++
.../trogdor/workload/ProduceBenchSpec.java | 104 ++++++
.../trogdor/workload/ProduceBenchWorker.java | 358 +++++++++++++++++++
.../apache/kafka/trogdor/workload/Throttle.java | 64 ++++
.../kafka/trogdor/workload/HistogramTest.java | 85 +++++
.../kafka/trogdor/workload/ThrottleTest.java | 70 ++++
11 files changed, 1054 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/58877a0d/tests/kafkatest/services/trogdor/produce_bench_workload.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/trogdor/produce_bench_workload.py b/tests/kafkatest/services/trogdor/produce_bench_workload.py
new file mode 100644
index 0000000..9d1f005
--- /dev/null
+++ b/tests/kafkatest/services/trogdor/produce_bench_workload.py
@@ -0,0 +1,65 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from ducktape.services.service import Service
+from kafkatest.services.trogdor.task_spec import TaskSpec
+
+
+class ProduceBenchWorkloadSpec(TaskSpec):
+ def __init__(self, start_ms, duration_ms, producer_node, bootstrap_servers,
+ target_messages_per_sec, max_messages, producer_conf,
+ total_topics, active_topics):
+ super(ProduceBenchWorkloadSpec, self).__init__(start_ms, duration_ms)
+ self.producer_node = producer_node
+ self.bootstrap_servers = bootstrap_servers
+ self.target_messages_per_sec = target_messages_per_sec
+ self.max_messages = max_messages
+ self.producer_conf = producer_conf
+ self.total_topics = total_topics
+ self.active_topics = active_topics
+
+ def message(self):
+ return {
+ "class": "org.apache.kafka.trogdor.workload.ProduceBenchSpec",
+ "startMs": self.start_ms,
+ "durationMs": self.duration_ms,
+ "producerNode": self.producer_node,
+ "bootstrapServers": self.bootstrap_servers,
+ "targetMessagesPerSec": self.target_messages_per_sec,
+ "maxMessages": self.max_messages,
+ "producerConf": self.producer_conf,
+ "totalTopics": self.total_topics,
+ "activeTopics": self.active_topics,
+ }
+
+
+class ProduceBenchWorkloadService(Service):
+ def __init__(self, context, kafka):
+ Service.__init__(self, context, num_nodes=1)
+ self.bootstrap_servers = kafka.bootstrap_servers(validate=False)
+ self.producer_node = self.nodes[0].account.hostname
+
+ def free(self):
+ Service.free(self)
+
+ def wait_node(self, node, timeout_sec=None):
+ pass
+
+ def stop_node(self, node):
+ pass
+
+ def clean_node(self, node):
+ pass
http://git-wip-us.apache.org/repos/asf/kafka/blob/58877a0d/tests/kafkatest/tests/core/produce_bench_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/produce_bench_test.py b/tests/kafkatest/tests/core/produce_bench_test.py
new file mode 100644
index 0000000..99df666
--- /dev/null
+++ b/tests/kafkatest/tests/core/produce_bench_test.py
@@ -0,0 +1,57 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+from ducktape.tests.test import Test
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.trogdor.produce_bench_workload import ProduceBenchWorkloadService, ProduceBenchWorkloadSpec
+from kafkatest.services.trogdor.task_spec import TaskSpec
+from kafkatest.services.trogdor.trogdor import TrogdorService
+from kafkatest.services.zookeeper import ZookeeperService
+
+
+class ProduceBenchTest(Test):
+ def __init__(self, test_context):
+ """:type test_context: ducktape.tests.test.TestContext"""
+ super(ProduceBenchTest, self).__init__(test_context)
+ self.zk = ZookeeperService(test_context, num_nodes=3)
+ self.kafka = KafkaService(test_context, num_nodes=3, zk=self.zk)
+ self.workload_service = ProduceBenchWorkloadService(test_context, self.kafka)
+ self.trogdor = TrogdorService(context=self.test_context,
+ client_services=[self.kafka, self.workload_service])
+
+ def setUp(self):
+ self.trogdor.start()
+ self.zk.start()
+ self.kafka.start()
+
+ def teardown(self):
+ self.trogdor.stop()
+ self.kafka.stop()
+ self.zk.stop()
+
+ def test_produce_bench(self):
+ spec = ProduceBenchWorkloadSpec(0, TaskSpec.MAX_DURATION_MS,
+ self.workload_service.producer_node,
+ self.workload_service.bootstrap_servers,
+ target_messages_per_sec=1000,
+ max_messages=100000,
+ producer_conf={},
+ total_topics=10,
+ active_topics=2)
+ workload1 = self.trogdor.create_task("workload1", spec)
+ workload1.wait_for_done(timeout_sec=360)
+ tasks = self.trogdor.tasks()
+ self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True, indent=2))
http://git-wip-us.apache.org/repos/asf/kafka/blob/58877a0d/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java
index 8c26d8d..b2dc474 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java
@@ -64,7 +64,7 @@ public final class Coordinator {
* Create a new Coordinator.
*
* @param platform The platform object to use.
- * @param time The timekeeper to use for this Coordinator.
+ * @param scheduler The scheduler to use for this Coordinator.
* @param restServer The REST server to use.
* @param resource The AgentRestResoure to use.
*/
http://git-wip-us.apache.org/repos/asf/kafka/blob/58877a0d/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java
index aea9617..fd871bc 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java
@@ -221,11 +221,14 @@ public final class NodeManager {
}
} else if (state instanceof WorkerDone) {
if (!(worker.state instanceof WorkerDone)) {
- String error = ((WorkerDone) state).error();
+ WorkerDone workerDoneState = (WorkerDone) state;
+ String error = workerDoneState.error();
if (error.isEmpty()) {
- log.warn("{}: Worker {} finished with no error.", node.name(), id);
+ log.info("{}: Worker {} finished with status '{}'",
+ node.name(), id, workerDoneState.status());
} else {
- log.warn("{}: Worker {} finished with error '{}'", node.name(), id, error);
+ log.warn("{}: Worker {} finished with error '{}' and status '{}'",
+ node.name(), id, error, workerDoneState.status());
}
taskManager.handleWorkerCompletion(node.name(), worker.id, error);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/58877a0d/tools/src/main/java/org/apache/kafka/trogdor/workload/Histogram.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/Histogram.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/Histogram.java
new file mode 100644
index 0000000..cee2b4a
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/Histogram.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.workload;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * A histogram that can easily find the average, median etc of a large number of samples in a
+ * restricted domain.
+ */
+public class Histogram {
+    /**
+     * counts[i] is the number of samples with value i; the last bucket also receives
+     * values that were clipped. Reads and writes are guarded by the instance monitor.
+     */
+    private final int[] counts;
+
+    private final Logger log = LoggerFactory.getLogger(Histogram.class);
+
+    /**
+     * Create a histogram with one bucket for each value from 0 to maxValue, inclusive.
+     *
+     * @param maxValue  The largest value that gets its own bucket.
+     */
+    public Histogram(int maxValue) {
+        this.counts = new int[maxValue + 1];
+    }
+
+    /**
+     * Add a new value to the histogram.
+     *
+     * Note that the value will be clipped to the maximum value available in the Histogram instance.
+     * So if the histogram has 100 buckets, inserting 101 will increment the last bucket.
+     *
+     * @param value             The value to record.  Must be non-negative.
+     * @throws RuntimeException If value is negative.
+     */
+    public void add(int value) {
+        if (value < 0) {
+            throw new RuntimeException("invalid negative value.");
+        }
+        if (value >= counts.length) {
+            value = counts.length - 1;
+        }
+        synchronized (this) {
+            // Saturate instead of wrapping around to a negative count.
+            int curCount = counts[value];
+            if (curCount < Integer.MAX_VALUE) {
+                counts[value] = curCount + 1;
+            }
+        }
+    }
+
+    /**
+     * Add a new value to the histogram.
+     *
+     * Note that the value will be clipped to the maximum value available in the Histogram instance.
+     * This method is provided for convenience, but handles the same numeric range as the method which
+     * takes an int.
+     *
+     * @param value             The value to record.  Must be non-negative.
+     * @throws RuntimeException If value is negative.
+     */
+    public void add(long value) {
+        if (value > Integer.MAX_VALUE) {
+            add(Integer.MAX_VALUE);
+        } else if (value < Integer.MIN_VALUE) {
+            // Still negative after clamping, so add(int) rejects it.
+            add(Integer.MIN_VALUE);
+        } else {
+            add((int) value);
+        }
+    }
+
+    public static class Summary {
+        /**
+         * The total number of samples.
+         */
+        private final long numSamples;
+
+        /**
+         * The average of all samples.
+         */
+        private final float average;
+
+        /**
+         * Percentile information.
+         *
+         * percentile(fraction=0.99) will have a value which is greater than or equal to 99%
+         * of the samples.  percentile(fraction=0.5) is the median sample.  And so forth.
+         */
+        private final List<PercentileSummary> percentiles;
+
+        Summary(long numSamples, float average, List<PercentileSummary> percentiles) {
+            this.numSamples = numSamples;
+            this.average = average;
+            this.percentiles = percentiles;
+        }
+
+        public long numSamples() {
+            return numSamples;
+        }
+
+        public float average() {
+            return average;
+        }
+
+        public List<PercentileSummary> percentiles() {
+            return percentiles;
+        }
+    }
+
+    /**
+     * Information about a percentile.
+     */
+    public static class PercentileSummary {
+        /**
+         * The fraction of samples which are less than or equal to the value of this percentile.
+         */
+        private final float fraction;
+
+        /**
+         * The value of this percentile.
+         */
+        private final int value;
+
+        PercentileSummary(float fraction, int value) {
+            this.fraction = fraction;
+            this.value = value;
+        }
+
+        public float fraction() {
+            return fraction;
+        }
+
+        public int value() {
+            return value;
+        }
+    }
+
+    /**
+     * Summarize the histogram without computing any percentiles.
+     *
+     * @return  A Summary with the sample count, the average and an empty percentile list.
+     */
+    public Summary summarize() {
+        return summarize(new float[0]);
+    }
+
+    /**
+     * Summarize the histogram.
+     *
+     * @param percentiles       Fractions in [0, 1], sorted in non-decreasing order.
+     * @return                  A Summary with the sample count, the average, and one
+     *                          PercentileSummary per requested fraction, in input order.
+     * @throws RuntimeException If percentiles is not sorted, contains a negative element,
+     *                          or contains an element greater than 1.
+     */
+    public Summary summarize(float[] percentiles) {
+        // Take a consistent snapshot so that concurrent add() calls don't skew the result.
+        int[] countsCopy = new int[counts.length];
+        synchronized (this) {
+            System.arraycopy(counts, 0, countsCopy, 0, counts.length);
+        }
+        // Verify that the percentiles array is sorted and positive.
+        float prev = 0f;
+        for (int i = 0; i < percentiles.length; i++) {
+            if (percentiles[i] < prev) {
+                throw new RuntimeException("Invalid percentiles fraction array. Bad element " +
+                    percentiles[i] + ". The array must be sorted and non-negative.");
+            }
+            if (percentiles[i] > 1.0f) {
+                throw new RuntimeException("Invalid percentiles fraction array. Bad element " +
+                    percentiles[i] + ". Elements must be less than or equal to 1.");
+            }
+            // Remember this element; without this the sortedness check above never fires.
+            prev = percentiles[i];
+        }
+        // Find out how many total samples we have, and what the average is.
+        // The running total is an exact long; a float accumulator loses precision
+        // once the sum exceeds 2^24.
+        long numSamples = 0;
+        long total = 0;
+        for (int i = 0; i < countsCopy.length; i++) {
+            long count = countsCopy[i];
+            numSamples += count;
+            total += i * count;
+        }
+        float average = (numSamples == 0) ? 0.0f : (float) ((double) total / numSamples);
+
+        List<PercentileSummary> percentileSummaries =
+            summarizePercentiles(countsCopy, percentiles, numSamples);
+        return new Summary(numSamples, average, percentileSummaries);
+    }
+
+    /**
+     * Walk the buckets in ascending order and assign each requested percentile the first
+     * bucket at which the cumulative sample count reaches (long) (numSamples * fraction).
+     * Percentiles that are still unassigned when the last bucket is reached get that bucket.
+     */
+    private List<PercentileSummary> summarizePercentiles(int[] countsCopy, float[] percentiles,
+                                                         long numSamples) {
+        if (percentiles.length == 0) {
+            return Collections.emptyList();
+        }
+        List<PercentileSummary> summaries = new ArrayList<>(percentiles.length);
+        int i = 0, j = 0;
+        long seen = 0, next = (long) (numSamples * percentiles[0]);
+        while (true) {
+            if (i == countsCopy.length - 1) {
+                for (; j < percentiles.length; j++) {
+                    summaries.add(new PercentileSummary(percentiles[j], i));
+                }
+                return summaries;
+            }
+            seen += countsCopy[i];
+            // Several percentiles may fall into the same bucket.
+            while (seen >= next) {
+                summaries.add(new PercentileSummary(percentiles[j], i));
+                j++;
+                if (j == percentiles.length) {
+                    return summaries;
+                }
+                next = (long) (numSamples * percentiles[j]);
+            }
+            i++;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/58877a0d/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchController.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchController.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchController.java
new file mode 100644
index 0000000..c56a22a
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchController.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.workload;
+
+import org.apache.kafka.trogdor.common.Topology;
+import org.apache.kafka.trogdor.task.TaskController;
+
+import java.util.Collections;
+import java.util.Set;
+
+public class ProduceBenchController implements TaskController {
+ private final ProduceBenchSpec spec;
+
+ public ProduceBenchController(ProduceBenchSpec spec) {
+ this.spec = spec;
+ }
+
+ @Override
+ public Set<String> targetNodes(Topology topology) {
+ return Collections.singleton(spec.producerNode());
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/58877a0d/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java
new file mode 100644
index 0000000..3fb185e
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.workload;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.trogdor.task.TaskController;
+import org.apache.kafka.trogdor.task.TaskSpec;
+import org.apache.kafka.trogdor.task.TaskWorker;
+
+import java.util.Map;
+
+/**
+ * The specification for a benchmark that produces messages to a set of topics.
+ */
+public class ProduceBenchSpec extends TaskSpec {
+    private final String producerNode;
+    private final String bootstrapServers;
+    private final int targetMessagesPerSec;
+    private final int maxMessages;
+    private final Map<String, String> producerConf;
+    private final int totalTopics;
+    private final int activeTopics;
+
+    /**
+     * Create a ProduceBench spec.
+     *
+     * @param producerNode          The node that runs the producer.
+     * @param bootstrapServers      The bootstrap servers of the cluster to produce to.
+     * @param targetMessagesPerSec  The target production rate.
+     * @param maxMessages           The number of throttled send rounds to run.
+     * @param producerConf          Extra producer configuration.  May be null (for example
+     *                              when the field is omitted from the JSON spec), in which
+     *                              case it is treated as empty.
+     * @param totalTopics           The number of topics to create.
+     * @param activeTopics          The number of topics to produce to.
+     */
+    @JsonCreator
+    public ProduceBenchSpec(@JsonProperty("startMs") long startMs,
+                            @JsonProperty("durationMs") long durationMs,
+                            @JsonProperty("producerNode") String producerNode,
+                            @JsonProperty("bootstrapServers") String bootstrapServers,
+                            @JsonProperty("targetMessagesPerSec") int targetMessagesPerSec,
+                            @JsonProperty("maxMessages") int maxMessages,
+                            @JsonProperty("producerConf") Map<String, String> producerConf,
+                            @JsonProperty("totalTopics") int totalTopics,
+                            @JsonProperty("activeTopics") int activeTopics) {
+        super(startMs, durationMs);
+        this.producerNode = producerNode;
+        this.bootstrapServers = bootstrapServers;
+        this.targetMessagesPerSec = targetMessagesPerSec;
+        this.maxMessages = maxMessages;
+        // Normalize a missing producerConf to an empty map so callers can iterate it safely.
+        this.producerConf = (producerConf == null) ?
+            java.util.Collections.<String, String>emptyMap() : producerConf;
+        this.totalTopics = totalTopics;
+        this.activeTopics = activeTopics;
+    }
+
+    @JsonProperty
+    public String producerNode() {
+        return producerNode;
+    }
+
+    @JsonProperty
+    public String bootstrapServers() {
+        return bootstrapServers;
+    }
+
+    @JsonProperty
+    public int targetMessagesPerSec() {
+        return targetMessagesPerSec;
+    }
+
+    @JsonProperty
+    public int maxMessages() {
+        return maxMessages;
+    }
+
+    /**
+     * @return  The extra producer configuration; never null.
+     */
+    @JsonProperty
+    public Map<String, String> producerConf() {
+        return producerConf;
+    }
+
+    @JsonProperty
+    public int totalTopics() {
+        return totalTopics;
+    }
+
+    @JsonProperty
+    public int activeTopics() {
+        return activeTopics;
+    }
+
+    @Override
+    public TaskController newController(String id) {
+        return new ProduceBenchController(this);
+    }
+
+    @Override
+    public TaskWorker newTaskWorker(String id) {
+        return new ProduceBenchWorker(id, this);
+    }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/58877a0d/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java
new file mode 100644
index 0000000..0d50e3e
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.workload;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.trogdor.common.JsonUtil;
+import org.apache.kafka.trogdor.common.Platform;
+import org.apache.kafka.trogdor.common.ThreadUtils;
+import org.apache.kafka.trogdor.task.TaskWorker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Runs the ProduceBench workload.
+ *
+ * The work proceeds in stages, each of which runs as a task on a single-threaded scheduled
+ * executor and submits the next stage when it succeeds:
+ * ValidateSpec, then CreateBenchmarkTopics, then SendRecords.  SendRecords records the
+ * latency of each send in a Histogram.  A StatusUpdater periodically publishes a latency
+ * summary to the status reference.
+ */
+public class ProduceBenchWorker implements TaskWorker {
+    private static final Logger log = LoggerFactory.getLogger(ProduceBenchWorker.class);
+
+    private static final short REPLICATION_FACTOR = 3;
+
+    private static final int MESSAGE_SIZE = 512;
+
+    private static final int THROTTLE_PERIOD_MS = 100;
+
+    private final String id;
+
+    private final ProduceBenchSpec spec;
+
+    private final AtomicBoolean running = new AtomicBoolean(false);
+
+    private ScheduledExecutorService executor;
+
+    private AtomicReference<String> status;
+
+    private KafkaFutureImpl<String> doneFuture;
+
+    public ProduceBenchWorker(String id, ProduceBenchSpec spec) {
+        this.id = id;
+        this.spec = spec;
+    }
+
+    @Override
+    public void start(Platform platform, AtomicReference<String> status,
+                      KafkaFutureImpl<String> doneFuture) throws Exception {
+        if (!running.compareAndSet(false, true)) {
+            throw new IllegalStateException("ProduceBenchWorker is already running.");
+        }
+        log.info("{}: Activating ProduceBenchWorker.", id);
+        this.executor = Executors.newScheduledThreadPool(1,
+            ThreadUtils.createThreadFactory("ProduceBenchWorkerThread%d", false));
+        this.status = status;
+        this.doneFuture = doneFuture;
+        executor.submit(new ValidateSpec());
+    }
+
+    private static String topicIndexToName(int topicIndex) {
+        return String.format("topic%05d", topicIndex);
+    }
+
+    /**
+     * Log a failure, complete doneFuture exceptionally, and rethrow so that the calling
+     * executor task terminates.
+     */
+    private void abort(String what, Exception e) throws KafkaException {
+        log.warn(what + " caught an exception: ", e);
+        doneFuture.completeExceptionally(new KafkaException(what + " caught an exception.", e));
+        throw new KafkaException(e);
+    }
+
+    public class ValidateSpec implements Callable<Void> {
+        @Override
+        public Void call() throws Exception {
+            try {
+                if (spec.activeTopics() == 0) {
+                    throw new ConfigException("Can't have activeTopics == 0.");
+                }
+                if (spec.totalTopics() < spec.activeTopics()) {
+                    throw new ConfigException(String.format(
+                        "activeTopics was %d, but totalTopics was only %d. activeTopics must " +
+                            "be less than or equal to totalTopics.", spec.activeTopics(), spec.totalTopics()));
+                }
+                executor.submit(new CreateBenchmarkTopics());
+            } catch (Exception e) {
+                abort("ValidateSpec", e);
+            }
+            return null;
+        }
+    }
+
+    /**
+     * Creates topics 0 through totalTopics - 1 with one partition each, issuing
+     * createTopics requests in batches of at most MAX_BATCH_SIZE and then waiting for all of them.
+     */
+    public class CreateBenchmarkTopics implements Callable<Void> {
+        private final static int MAX_BATCH_SIZE = 10;
+
+        @Override
+        public Void call() throws Exception {
+            try {
+                Properties props = new Properties();
+                props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, spec.bootstrapServers());
+                try (AdminClient adminClient = AdminClient.create(props)) {
+                    List<String> topicsToCreate = new ArrayList<>();
+                    for (int i = 0; i < spec.totalTopics(); i++) {
+                        topicsToCreate.add(topicIndexToName(i));
+                    }
+                    log.info("Creating " + spec.totalTopics() + " topics...");
+                    List<Future<Void>> futures = new ArrayList<>();
+                    while (!topicsToCreate.isEmpty()) {
+                        List<NewTopic> newTopics = new ArrayList<>();
+                        for (int i = 0; (i < MAX_BATCH_SIZE) && !topicsToCreate.isEmpty(); i++) {
+                            String topic = topicsToCreate.remove(0);
+                            newTopics.add(new NewTopic(topic, 1, REPLICATION_FACTOR));
+                        }
+                        futures.add(adminClient.createTopics(newTopics).all());
+                    }
+                    for (Future<Void> future : futures) {
+                        future.get();
+                    }
+                    log.info("Successfully created " + spec.totalTopics() + " topics.");
+                }
+                executor.submit(new SendRecords());
+            } catch (Exception e) {
+                abort("CreateBenchmarkTopics", e);
+            }
+            return null;
+        }
+    }
+
+    /**
+     * Records the time between a send() call and its completion callback in the
+     * SendRecords histogram.
+     */
+    private static class SendRecordsCallback implements Callback {
+        private final SendRecords sendRecords;
+        private final long startMs;
+
+        SendRecordsCallback(SendRecords sendRecords, long startMs) {
+            this.sendRecords = sendRecords;
+            this.startMs = startMs;
+        }
+
+        @Override
+        public void onCompletion(RecordMetadata metadata, Exception exception) {
+            long now = Time.SYSTEM.milliseconds();
+            long durationMs = now - startMs;
+            sendRecords.recordDuration(durationMs);
+            if (exception != null) {
+                log.error("SendRecordsCallback: error", exception);
+            }
+        }
+    }
+
+    /**
+     * Convert a per-second rate into a per-period count, rounding down but never below 1.
+     */
+    private int perSecToPerPeriod(float perSec, long periodMs) {
+        float period = ((float) periodMs) / 1000.0f;
+        float perPeriod = perSec * period;
+        perPeriod = Math.max(1.0f, perPeriod);
+        return (int) perPeriod;
+    }
+
+    /**
+     * A subclass of Throttle which flushes the Producer right before the throttle injects a delay.
+     * This avoids including throttling latency in latency measurements.
+     */
+    private static class SendRecordsThrottle extends Throttle {
+        private final KafkaProducer<?, ?> producer;
+
+        SendRecordsThrottle(int maxPerPeriod, KafkaProducer<?, ?> producer) {
+            super(maxPerPeriod, THROTTLE_PERIOD_MS);
+            this.producer = producer;
+        }
+
+        @Override
+        protected synchronized void delay(long amount) throws InterruptedException {
+            long startMs = time().milliseconds();
+            producer.flush();
+            long endMs = time().milliseconds();
+            // Time spent flushing counts against the requested delay.
+            long delta = endMs - startMs;
+            super.delay(amount - delta);
+        }
+    }
+
+    /**
+     * Sends maxMessages rounds of one record to each active topic, calling the throttle once
+     * per round, and then publishes a final status.
+     */
+    public class SendRecords implements Callable<Void> {
+        private final Histogram histogram;
+
+        private final Future<?> statusUpdaterFuture;
+
+        private final KafkaProducer<byte[], byte[]> producer;
+
+        private final Throttle throttle;
+
+        SendRecords() {
+            this.histogram = new Histogram(5000);
+            int perPeriod = perSecToPerPeriod(spec.targetMessagesPerSec(), THROTTLE_PERIOD_MS);
+            Properties props = new Properties();
+            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, spec.bootstrapServers());
+            for (Map.Entry<String, String> entry : spec.producerConf().entrySet()) {
+                props.setProperty(entry.getKey(), entry.getValue());
+            }
+            this.producer = new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer());
+            this.throttle = new SendRecordsThrottle(perPeriod, producer);
+            // Schedule the periodic status updater last, so that a failure to build the
+            // producer does not leave an orphaned periodic task running on the executor.
+            this.statusUpdaterFuture = executor.scheduleWithFixedDelay(
+                new StatusUpdater(histogram), 1, 1, TimeUnit.MINUTES);
+        }
+
+        @Override
+        public Void call() throws Exception {
+            long startTimeMs = Time.SYSTEM.milliseconds();
+            try {
+                byte[] key = new byte[MESSAGE_SIZE];
+                byte[] value = new byte[MESSAGE_SIZE];
+                Future<RecordMetadata> future = null;
+                try {
+                    for (int m = 0; m < spec.maxMessages(); m++) {
+                        for (int i = 0; i < spec.activeTopics(); i++) {
+                            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topicIndexToName(i), key, value);
+                            future = producer.send(record, new SendRecordsCallback(this, Time.SYSTEM.milliseconds()));
+                        }
+                        throttle.increment();
+                    }
+                } finally {
+                    try {
+                        // Wait for the most recent send to complete.
+                        if (future != null) {
+                            future.get();
+                        }
+                    } finally {
+                        // Release the producer even if waiting on the last send failed or was interrupted.
+                        producer.close();
+                    }
+                }
+            } catch (Exception e) {
+                abort("SendRecords", e);
+            } finally {
+                statusUpdaterFuture.cancel(false);
+                new StatusUpdater(histogram).run();
+                long curTimeMs = Time.SYSTEM.milliseconds();
+                log.info("Sent {} total record(s) in {} ms. status: {}",
+                    histogram.summarize().numSamples(), curTimeMs - startTimeMs, status.get());
+            }
+            doneFuture.complete("");
+            return null;
+        }
+
+        void recordDuration(long durationMs) {
+            histogram.add(durationMs);
+        }
+    }
+
+    /**
+     * Publishes a JSON-serialized StatusData summary of the latency histogram to the status reference.
+     */
+    public class StatusUpdater implements Runnable {
+        private final Histogram histogram;
+        private final float[] percentiles;
+
+        StatusUpdater(Histogram histogram) {
+            this.histogram = histogram;
+            // These fractions line up positionally with StatusData's p50, p90 and p99 fields.
+            this.percentiles = new float[3];
+            this.percentiles[0] = 0.50f;
+            this.percentiles[1] = 0.90f;
+            this.percentiles[2] = 0.99f;
+        }
+
+        @Override
+        public void run() {
+            try {
+                Histogram.Summary summary = histogram.summarize(percentiles);
+                StatusData statusData = new StatusData(summary.numSamples(), summary.average(),
+                    summary.percentiles().get(0).value(),
+                    summary.percentiles().get(1).value(),
+                    summary.percentiles().get(2).value());
+                String statusDataString = JsonUtil.toJsonString(statusData);
+                status.set(statusDataString);
+            } catch (Exception e) {
+                abort("StatusUpdater", e);
+            }
+        }
+    }
+
+    public static class StatusData {
+        private final long totalSent;
+        private final float averageLatencyMs;
+        private final int p50LatencyMs;
+        private final int p90LatencyMs;
+        private final int p99LatencyMs;
+
+        @JsonCreator
+        StatusData(@JsonProperty("totalSent") long totalSent,
+                   @JsonProperty("averageLatencyMs") float averageLatencyMs,
+                   @JsonProperty("p50LatencyMs") int p50latencyMs,
+                   @JsonProperty("p90LatencyMs") int p90latencyMs,
+                   @JsonProperty("p99LatencyMs") int p99latencyMs) {
+            this.totalSent = totalSent;
+            this.averageLatencyMs = averageLatencyMs;
+            this.p50LatencyMs = p50latencyMs;
+            this.p90LatencyMs = p90latencyMs;
+            this.p99LatencyMs = p99latencyMs;
+        }
+
+        @JsonProperty
+        public long totalSent() {
+            return totalSent;
+        }
+
+        @JsonProperty
+        public float averageLatencyMs() {
+            return averageLatencyMs;
+        }
+
+        @JsonProperty
+        public int p50LatencyMs() {
+            return p50LatencyMs;
+        }
+
+        @JsonProperty
+        public int p90LatencyMs() {
+            return p90LatencyMs;
+        }
+
+        @JsonProperty
+        public int p99LatencyMs() {
+            return p99LatencyMs;
+        }
+    }
+
+    @Override
+    public void stop(Platform platform) throws Exception {
+        if (!running.compareAndSet(true, false)) {
+            throw new IllegalStateException("ProduceBenchWorker is not running.");
+        }
+        log.info("{}: Deactivating ProduceBenchWorker.", id);
+        doneFuture.complete("");
+        executor.shutdownNow();
+        executor.awaitTermination(1, TimeUnit.DAYS);
+        this.executor = null;
+        this.status = null;
+        this.doneFuture = null;
+    }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/58877a0d/tools/src/main/java/org/apache/kafka/trogdor/workload/Throttle.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/Throttle.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/Throttle.java
new file mode 100644
index 0000000..41f9d02
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/Throttle.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.workload;
+
+import org.apache.kafka.common.utils.Time;
+
+/**
+ * A fixed-window rate limiter: at most {@code maxPerPeriod} calls to {@link #increment()}
+ * succeed within each aligned window {@code [k * periodMs, (k + 1) * periodMs)}.
+ * Callers that exceed the budget block until the next window begins.
+ */
+public class Throttle {
+    private final int maxPerPeriod;
+    private final int periodMs;
+    // Number of increments already granted in the window identified by prevPeriod.
+    private int count;
+    // Index (now / periodMs) of the window whose budget count refers to.
+    private long prevPeriod;
+
+    /**
+     * @param maxPerPeriod  maximum number of increments allowed per window; must be positive.
+     * @param periodMs      window length in milliseconds; must be positive.
+     * @throws IllegalArgumentException if either argument is not positive.
+     */
+    Throttle(int maxPerPeriod, int periodMs) {
+        if (maxPerPeriod <= 0) {
+            // With a zero budget, increment() would reset count to 0 every window and then
+            // wait again forever, so fail fast instead of hanging the caller.
+            throw new IllegalArgumentException("maxPerPeriod must be positive, but was " + maxPerPeriod);
+        }
+        if (periodMs <= 0) {
+            // Zero would divide by zero in increment(); negative values produce meaningless windows.
+            throw new IllegalArgumentException("periodMs must be positive, but was " + periodMs);
+        }
+        this.maxPerPeriod = maxPerPeriod;
+        this.periodMs = periodMs;
+        // Start with an exhausted budget and an impossible window index, so the first
+        // increment() opens the current window and resets the budget.
+        this.count = maxPerPeriod;
+        this.prevPeriod = -1;
+    }
+
+    /**
+     * Consume one unit of the current window's budget, blocking until one is available.
+     *
+     * @return true if the caller had to wait at least once; false otherwise.
+     * @throws InterruptedException if interrupted while waiting.
+     */
+    public synchronized boolean increment() throws InterruptedException {
+        boolean throttled = false;
+        while (true) {
+            if (count < maxPerPeriod) {
+                count++;
+                return throttled;
+            }
+            long now = time().milliseconds();
+            long curPeriod = now / periodMs;
+            if (curPeriod <= prevPeriod) {
+                // Budget for this window is used up: wait until the next window boundary,
+                // then loop to re-check (the wait may also end early).
+                long nextPeriodMs = (curPeriod + 1) * periodMs;
+                delay(nextPeriodMs - now);
+                throttled = true;
+            } else {
+                // A new window has started: reset the budget.
+                prevPeriod = curPeriod;
+                count = 0;
+            }
+        }
+    }
+
+    /** Clock used to determine the current window; overridable for tests. */
+    protected Time time() {
+        return Time.SYSTEM;
+    }
+
+    /**
+     * Wait up to {@code amount} ms on this object's monitor. Object#wait releases the monitor
+     * while waiting and may return early; increment() tolerates that by re-checking in its loop.
+     */
+    protected synchronized void delay(long amount) throws InterruptedException {
+        if (amount > 0) {
+            wait(amount);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/58877a0d/tools/src/test/java/org/apache/kafka/trogdor/workload/HistogramTest.java
----------------------------------------------------------------------
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/workload/HistogramTest.java b/tools/src/test/java/org/apache/kafka/trogdor/workload/HistogramTest.java
new file mode 100644
index 0000000..5193f77
--- /dev/null
+++ b/tools/src/test/java/org/apache/kafka/trogdor/workload/HistogramTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.workload;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link Histogram}'s summary statistics: average, sample count and percentiles.
+ */
+public class HistogramTest {
+    // Shared empty percentile request for assertions that only inspect average/numSamples.
+    // A zero-length array cannot be mutated, so sharing it between calls is safe.
+    private static final float[] NO_PERCENTILES = new float[0];
+
+    /** Build a histogram with the given maxValue and record each of the given values once. */
+    private static Histogram createHistogram(int maxValue, int... values) {
+        Histogram histogram = new Histogram(maxValue);
+        for (int value : values) {
+            histogram.add(value);
+        }
+        return histogram;
+    }
+
+    @Test
+    public void testHistogramAverage() {
+        Histogram empty = createHistogram(1);
+        Assert.assertEquals(0, (int) empty.summarize(NO_PERCENTILES).average());
+
+        // (1+2+3+4+5+6+1) / 7 = 3.14..., truncated to 3 by the int cast.
+        Histogram histogram = createHistogram(70, 1, 2, 3, 4, 5, 6, 1);
+        Assert.assertEquals(3, (int) histogram.summarize(NO_PERCENTILES).average());
+
+        // (22 + 60) / 8 = 10.25, truncated to 10.
+        histogram.add(60);
+        Assert.assertEquals(10, (int) histogram.summarize(NO_PERCENTILES).average());
+    }
+
+    @Test
+    public void testHistogramSamples() {
+        Histogram empty = createHistogram(100);
+        Assert.assertEquals(0, empty.summarize(NO_PERCENTILES).numSamples());
+
+        // 150 exceeds maxValue but is still counted as a sample.
+        Histogram histogram = createHistogram(100, 4, 8, 2, 4, 1, 100, 150);
+        Assert.assertEquals(7, histogram.summarize(NO_PERCENTILES).numSamples());
+
+        histogram.add(60);
+        Assert.assertEquals(8, histogram.summarize(NO_PERCENTILES).numSamples());
+    }
+
+    @Test
+    public void testHistogramPercentiles() {
+        // Sorted samples: 1, 2, 3, 4, 5, 6, 80, 90.
+        Histogram histogram = createHistogram(100, 1, 2, 3, 4, 5, 6, 80, 90);
+        float[] percentiles = new float[] {0.5f, 0.90f, 0.99f, 1f};
+        Histogram.Summary summary = histogram.summarize(percentiles);
+        Assert.assertEquals(8, summary.numSamples());
+        Assert.assertEquals(4, summary.percentiles().get(0).value());
+        Assert.assertEquals(80, summary.percentiles().get(1).value());
+        Assert.assertEquals(80, summary.percentiles().get(2).value());
+        Assert.assertEquals(90, summary.percentiles().get(3).value());
+
+        // Three extra 30s shift the median: sorted 1..6, 30, 30, 30, 80, 90.
+        histogram.add(30);
+        histogram.add(30);
+        histogram.add(30);
+        summary = histogram.summarize(new float[] {0.5f});
+        Assert.assertEquals(11, summary.numSamples());
+        Assert.assertEquals(5, summary.percentiles().get(0).value());
+
+        // An empty histogram reports 0 for any percentile.
+        Histogram empty = createHistogram(100);
+        summary = empty.summarize(new float[] {0.5f});
+        Assert.assertEquals(0, summary.percentiles().get(0).value());
+
+        histogram = createHistogram(1000);
+        histogram.add(100);
+        histogram.add(200);
+        summary = histogram.summarize(new float[] {0f, 0.5f, 1.0f});
+        Assert.assertEquals(0, summary.percentiles().get(0).value());
+        Assert.assertEquals(100, summary.percentiles().get(1).value());
+        Assert.assertEquals(200, summary.percentiles().get(2).value());
+    }
+}
+
http://git-wip-us.apache.org/repos/asf/kafka/blob/58877a0d/tools/src/test/java/org/apache/kafka/trogdor/workload/ThrottleTest.java
----------------------------------------------------------------------
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/workload/ThrottleTest.java b/tools/src/test/java/org/apache/kafka/trogdor/workload/ThrottleTest.java
new file mode 100644
index 0000000..1644545
--- /dev/null
+++ b/tools/src/test/java/org/apache/kafka/trogdor/workload/ThrottleTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.workload;
+
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link Throttle}, driven by a MockTime clock so no real sleeping occurs.
+ */
+public class ThrottleTest {
+    // Window length handed to Throttle by ThrottleMock.
+    private static final int PERIOD_MS = 100;
+
+    /**
+     * ThrottleMock is a subclass of Throttle that uses a MockTime object. It calls
+     * MockTime#sleep instead of Object#wait, so a "delay" simply advances the mock clock.
+     */
+    private static class ThrottleMock extends Throttle {
+        final MockTime time;
+
+        /**
+         * @param time          mock clock used for both reading time and delaying.
+         * @param maxPerPeriod  increments allowed per PERIOD_MS window (not per second).
+         */
+        ThrottleMock(MockTime time, int maxPerPeriod) {
+            super(maxPerPeriod, PERIOD_MS);
+            this.time = time;
+        }
+
+        @Override
+        protected Time time() {
+            return time;
+        }
+
+        @Override
+        protected synchronized void delay(long amount) throws InterruptedException {
+            time.sleep(amount);
+        }
+    }
+
+    @Test
+    public void testThrottle() throws Exception {
+        MockTime time = new MockTime(0, 0, 0);
+        ThrottleMock throttle = new ThrottleMock(time, 3);
+
+        // Window [0, 100): the first three increments pass without waiting.
+        Assert.assertFalse(throttle.increment());
+        Assert.assertEquals(0, time.milliseconds());
+        Assert.assertFalse(throttle.increment());
+        Assert.assertEquals(0, time.milliseconds());
+        Assert.assertFalse(throttle.increment());
+        Assert.assertEquals(0, time.milliseconds());
+
+        // The fourth exhausts the budget and waits until the next window starts at 100.
+        Assert.assertTrue(throttle.increment());
+        Assert.assertEquals(100, time.milliseconds());
+
+        // Still in window [100, 200) with one unit used: two more pass, the next waits until 200.
+        time.sleep(50);
+        Assert.assertFalse(throttle.increment());
+        Assert.assertEquals(150, time.milliseconds());
+        Assert.assertFalse(throttle.increment());
+        Assert.assertEquals(150, time.milliseconds());
+        Assert.assertTrue(throttle.increment());
+        Assert.assertEquals(200, time.milliseconds());
+    }
+}
+