You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2017/11/28 22:10:04 UTC

kafka git commit: KAFKA-6255; Add ProduceBench to Trogdor

Repository: kafka
Updated Branches:
  refs/heads/trunk 1a1d923f2 -> 58877a0de


KAFKA-6255; Add ProduceBench to Trogdor

Author: Colin P. Mccabe <cm...@confluent.io>

Reviewers: Rajini Sivaram <ra...@googlemail.com>

Closes #4245 from cmccabe/KAFKA-6255


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/58877a0d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/58877a0d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/58877a0d

Branch: refs/heads/trunk
Commit: 58877a0deacd6c13436af83ef5be1a6f75a3ac4a
Parents: 1a1d923
Author: Colin P. Mccabe <cm...@confluent.io>
Authored: Tue Nov 28 22:09:55 2017 +0000
Committer: Rajini Sivaram <ra...@googlemail.com>
Committed: Tue Nov 28 22:09:55 2017 +0000

----------------------------------------------------------------------
 .../services/trogdor/produce_bench_workload.py  |  65 ++++
 .../kafkatest/tests/core/produce_bench_test.py  |  57 +++
 .../kafka/trogdor/coordinator/Coordinator.java  |   2 +-
 .../kafka/trogdor/coordinator/NodeManager.java  |   9 +-
 .../kafka/trogdor/workload/Histogram.java       | 207 +++++++++++
 .../workload/ProduceBenchController.java        |  37 ++
 .../trogdor/workload/ProduceBenchSpec.java      | 104 ++++++
 .../trogdor/workload/ProduceBenchWorker.java    | 358 +++++++++++++++++++
 .../apache/kafka/trogdor/workload/Throttle.java |  64 ++++
 .../kafka/trogdor/workload/HistogramTest.java   |  85 +++++
 .../kafka/trogdor/workload/ThrottleTest.java    |  70 ++++
 11 files changed, 1054 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/58877a0d/tests/kafkatest/services/trogdor/produce_bench_workload.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/trogdor/produce_bench_workload.py b/tests/kafkatest/services/trogdor/produce_bench_workload.py
new file mode 100644
index 0000000..9d1f005
--- /dev/null
+++ b/tests/kafkatest/services/trogdor/produce_bench_workload.py
@@ -0,0 +1,77 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from ducktape.services.service import Service
+from kafkatest.services.trogdor.task_spec import TaskSpec
+
+
+class ProduceBenchWorkloadSpec(TaskSpec):
+    """Task spec for the Trogdor ProduceBench workload."""
+
+    def __init__(self, start_ms, duration_ms, producer_node, bootstrap_servers,
+                 target_messages_per_sec, max_messages, producer_conf,
+                 total_topics, active_topics):
+        """Record the benchmark settings; start_ms and duration_ms are handed to TaskSpec."""
+        super(ProduceBenchWorkloadSpec, self).__init__(start_ms, duration_ms)
+        # Where the producer runs and which cluster it talks to.
+        self.producer_node = producer_node
+        self.bootstrap_servers = bootstrap_servers
+        # Extra producer configuration, passed through as-is.
+        self.producer_conf = producer_conf
+        # Load shape.
+        self.target_messages_per_sec = target_messages_per_sec
+        self.max_messages = max_messages
+        self.total_topics = total_topics
+        self.active_topics = active_topics
+
+    def message(self):
+        """Build the dict describing this spec, keyed by ProduceBenchSpec's property names."""
+        body = {"class": "org.apache.kafka.trogdor.workload.ProduceBenchSpec"}
+        body.update(
+            startMs=self.start_ms,
+            durationMs=self.duration_ms,
+            producerNode=self.producer_node,
+            bootstrapServers=self.bootstrap_servers,
+            targetMessagesPerSec=self.target_messages_per_sec,
+            maxMessages=self.max_messages,
+            producerConf=self.producer_conf,
+            totalTopics=self.total_topics,
+            activeTopics=self.active_topics)
+        return body
+
+
+class ProduceBenchWorkloadService(Service):
+    """Requests a single node to act as the producer; the node hooks below are no-ops."""
+
+    def __init__(self, context, kafka):
+        """Ask for one node, then capture the Kafka bootstrap servers and that node's hostname."""
+        super(ProduceBenchWorkloadService, self).__init__(context, num_nodes=1)
+        self.bootstrap_servers = kafka.bootstrap_servers(validate=False)
+        producer = self.nodes[0]
+        self.producer_node = producer.account.hostname
+
+    def free(self):
+        """Defer entirely to the base Service implementation."""
+        super(ProduceBenchWorkloadService, self).free()
+
+    def wait_node(self, node, timeout_sec=None):
+        """No-op."""
+
+    def stop_node(self, node):
+        """No-op."""
+
+    def clean_node(self, node):
+        """No-op."""

http://git-wip-us.apache.org/repos/asf/kafka/blob/58877a0d/tests/kafkatest/tests/core/produce_bench_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/produce_bench_test.py b/tests/kafkatest/tests/core/produce_bench_test.py
new file mode 100644
index 0000000..99df666
--- /dev/null
+++ b/tests/kafkatest/tests/core/produce_bench_test.py
@@ -0,0 +1,63 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+from ducktape.tests.test import Test
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.trogdor.produce_bench_workload import ProduceBenchWorkloadService, ProduceBenchWorkloadSpec
+from kafkatest.services.trogdor.task_spec import TaskSpec
+from kafkatest.services.trogdor.trogdor import TrogdorService
+from kafkatest.services.zookeeper import ZookeeperService
+
+
+class ProduceBenchTest(Test):
+    """Runs one ProduceBench task through Trogdor against a three-broker cluster."""
+
+    def __init__(self, test_context):
+        """:type test_context: ducktape.tests.test.TestContext"""
+        super(ProduceBenchTest, self).__init__(test_context)
+        self.zk = zookeeper = ZookeeperService(test_context, num_nodes=3)
+        self.kafka = brokers = KafkaService(test_context, num_nodes=3, zk=zookeeper)
+        self.workload_service = workload = ProduceBenchWorkloadService(test_context, brokers)
+        self.trogdor = TrogdorService(context=self.test_context,
+                                      client_services=[brokers, workload])
+
+    def setUp(self):
+        """Start Trogdor first, then ZooKeeper, then Kafka."""
+        for service in (self.trogdor, self.zk, self.kafka):
+            service.start()
+
+    def teardown(self):
+        """Stop Trogdor first, then Kafka, then ZooKeeper."""
+        for service in (self.trogdor, self.kafka, self.zk):
+            service.stop()
+
+    def test_produce_bench(self):
+        """Submit a single ProduceBench task, wait up to 360 seconds for it, then log all tasks."""
+        workload = self.workload_service
+        spec = ProduceBenchWorkloadSpec(start_ms=0,
+                                        duration_ms=TaskSpec.MAX_DURATION_MS,
+                                        producer_node=workload.producer_node,
+                                        bootstrap_servers=workload.bootstrap_servers,
+                                        target_messages_per_sec=1000,
+                                        max_messages=100000,
+                                        producer_conf={},
+                                        total_topics=10,
+                                        active_topics=2)
+        task = self.trogdor.create_task("workload1", spec)
+        task.wait_for_done(timeout_sec=360)
+        all_tasks = self.trogdor.tasks()
+        dumped = json.dumps(all_tasks, sort_keys=True, indent=2)
+        self.logger.info("TASKS: %s\n" % dumped)

http://git-wip-us.apache.org/repos/asf/kafka/blob/58877a0d/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java
index 8c26d8d..b2dc474 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java
@@ -64,7 +64,7 @@ public final class Coordinator {
      * Create a new Coordinator.
      *
      * @param platform      The platform object to use.
-     * @param time          The timekeeper to use for this Coordinator.
+     * @param scheduler     The scheduler to use for this Coordinator.
      * @param restServer    The REST server to use.
      * @param resource      The AgentRestResoure to use.
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/58877a0d/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java
index aea9617..fd871bc 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java
@@ -221,11 +221,14 @@ public final class NodeManager {
                         }
                     } else if (state instanceof WorkerDone) {
                         if (!(worker.state instanceof WorkerDone)) {
-                            String error = ((WorkerDone) state).error();
+                            WorkerDone workerDoneState =  (WorkerDone) state;
+                            String error = workerDoneState.error();
                             if (error.isEmpty()) {
-                                log.warn("{}: Worker {} finished with no error.", node.name(), id);
+                                log.info("{}: Worker {} finished with status '{}'",
+                                    node.name(), id, workerDoneState.status());
                             } else {
-                                log.warn("{}: Worker {} finished with error '{}'", node.name(), id, error);
+                                log.warn("{}: Worker {} finished with error '{}' and status '{}'",
+                                    node.name(), id, error, workerDoneState.status());
                             }
                             taskManager.handleWorkerCompletion(node.name(), worker.id, error);
                         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/58877a0d/tools/src/main/java/org/apache/kafka/trogdor/workload/Histogram.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/Histogram.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/Histogram.java
new file mode 100644
index 0000000..cee2b4a
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/Histogram.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.workload;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * A histogram that can easily find the average, median etc of a large number of samples in a
+ * restricted domain.
+ *
+ * Samples are non-negative integers.  Bucket {@code i} counts the samples equal to {@code i};
+ * the last bucket also absorbs every sample larger than the maximum value.  Updates and the
+ * snapshot taken by {@link #summarize(float[])} are synchronized on this instance.
+ */
+public class Histogram {
+    private final int[] counts;
+
+    /**
+     * Create a histogram with buckets for the values 0 through maxValue.
+     *
+     * @param maxValue  The largest value that gets its own bucket.  Must be non-negative and
+     *                  less than Integer.MAX_VALUE, so that maxValue + 1 buckets can be allocated.
+     */
+    public Histogram(int maxValue) {
+        if (maxValue < 0 || maxValue == Integer.MAX_VALUE) {
+            throw new IllegalArgumentException("invalid maxValue " + maxValue + ".");
+        }
+        this.counts = new int[maxValue + 1];
+    }
+
+    /**
+     * Add a new value to the histogram.
+     *
+     * Note that the value will be clipped to the maximum value available in the Histogram instance.
+     * So if the histogram has 100 buckets, inserting 101 will increment the last bucket.
+     * A bucket that has already reached Integer.MAX_VALUE is left unchanged.
+     *
+     * @param value  The sample to record.
+     * @throws RuntimeException if value is negative.
+     */
+    public void add(int value) {
+        if (value < 0) {
+            throw new RuntimeException("invalid negative value.");
+        }
+        int bucket = Math.min(value, counts.length - 1);
+        synchronized (this) {
+            // Saturate rather than wrap around to a negative count.
+            if (counts[bucket] < Integer.MAX_VALUE) {
+                counts[bucket]++;
+            }
+        }
+    }
+
+    /**
+     * Add a new value to the histogram.
+     *
+     * Note that the value will be clipped to the maximum value available in the Histogram instance.
+     * This method is provided for convenience, but handles the same numeric range as the method which
+     * takes an int.
+     *
+     * @param value  The sample to record.
+     * @throws RuntimeException if value is negative.
+     */
+    public void add(long value) {
+        if (value > Integer.MAX_VALUE) {
+            add(Integer.MAX_VALUE);
+        } else if (value < Integer.MIN_VALUE) {
+            add(Integer.MIN_VALUE);
+        } else {
+            add((int) value);
+        }
+    }
+
+    /**
+     * The result of summarizing a histogram.
+     */
+    public static class Summary {
+        /**
+         * The total number of samples.
+         */
+        private final long numSamples;
+
+        /**
+         * The average of all samples.
+         */
+        private final float average;
+
+        /**
+         * Percentile information.
+         *
+         * percentile(fraction=0.99) will have a value which is greater than or equal to 99%
+         * of the samples.  percentile(fraction=0.5) is the median sample.  And so forth.
+         */
+        private final List<PercentileSummary> percentiles;
+
+        Summary(long numSamples, float average, List<PercentileSummary> percentiles) {
+            this.numSamples = numSamples;
+            this.average = average;
+            this.percentiles = percentiles;
+        }
+
+        public long numSamples() {
+            return numSamples;
+        }
+
+        public float average() {
+            return average;
+        }
+
+        public List<PercentileSummary> percentiles() {
+            return percentiles;
+        }
+    }
+
+    /**
+     * Information about a percentile.
+     */
+    public static class PercentileSummary {
+        /**
+         * The fraction of samples which are less than or equal to the value of this percentile.
+         */
+        private final float fraction;
+
+        /**
+         * The value of this percentile.
+         */
+        private final int value;
+
+        PercentileSummary(float fraction, int value) {
+            this.fraction = fraction;
+            this.value = value;
+        }
+
+        public float fraction() {
+            return fraction;
+        }
+
+        public int value() {
+            return value;
+        }
+    }
+
+    /**
+     * Summarize the samples without computing any percentiles.
+     *
+     * @return  The number of samples and their average.
+     */
+    public Summary summarize() {
+        return summarize(new float[0]);
+    }
+
+    /**
+     * Summarize the samples seen so far.
+     *
+     * @param percentiles  The fractions to report, e.g. 0.5f for the median.  The array must be
+     *                     sorted in ascending order and every element must be between 0 and 1.
+     * @return             The number of samples, their average, and one PercentileSummary per
+     *                     requested fraction, in the same order.
+     * @throws RuntimeException if the percentiles array is unsorted or has an element out of range.
+     */
+    public Summary summarize(float[] percentiles) {
+        int[] countsCopy = new int[counts.length];
+        synchronized (this) {
+            System.arraycopy(counts, 0, countsCopy, 0, counts.length);
+        }
+        // Verify that the percentiles array is sorted and positive.
+        float prev = 0f;
+        for (int i = 0; i < percentiles.length; i++) {
+            if (percentiles[i] < prev) {
+                throw new RuntimeException("Invalid percentiles fraction array.  Bad element " +
+                    percentiles[i] + ".  The array must be sorted and non-negative.");
+            }
+            if (percentiles[i] > 1.0f) {
+                throw new RuntimeException("Invalid percentiles fraction array.  Bad element " +
+                    percentiles[i] + ".  Elements must be less than or equal to 1.");
+            }
+            // Remember this element so that the next one is compared against it.
+            prev = percentiles[i];
+        }
+        // Find out how many total samples we have, and what the average is.  The sum is kept
+        // in a double because a float stops absorbing small increments once it grows large.
+        long numSamples = 0;
+        double total = 0.0;
+        for (int i = 0; i < countsCopy.length; i++) {
+            long count = countsCopy[i];
+            numSamples += count;
+            total += (double) i * count;
+        }
+        float average = (numSamples == 0) ? 0.0f : (float) (total / numSamples);
+
+        List<PercentileSummary> percentileSummaries =
+            summarizePercentiles(countsCopy, percentiles, numSamples);
+        return new Summary(numSamples, average, percentileSummaries);
+    }
+
+    /**
+     * Find the value of each requested percentile.
+     *
+     * @param countsCopy   A snapshot of the bucket counts.
+     * @param percentiles  The sorted fractions to report.
+     * @param numSamples   The total number of samples in countsCopy.
+     * @return             One PercentileSummary per fraction, in the same order.
+     */
+    private List<PercentileSummary> summarizePercentiles(int[] countsCopy, float[] percentiles,
+                                                         long numSamples) {
+        if (percentiles.length == 0) {
+            return Collections.emptyList();
+        }
+        List<PercentileSummary> summaries = new ArrayList<>(percentiles.length);
+        int i = 0, j = 0;
+        // next is the number of samples that must have been seen to reach percentile j.
+        long seen = 0, next = (long) (numSamples * percentiles[0]);
+        while (true) {
+            if (i == countsCopy.length - 1) {
+                // The last bucket: every remaining percentile gets this value.
+                for (; j < percentiles.length; j++) {
+                    summaries.add(new PercentileSummary(percentiles[j], i));
+                }
+                return summaries;
+            }
+            seen += countsCopy[i];
+            while (seen >= next) {
+                summaries.add(new PercentileSummary(percentiles[j], i));
+                j++;
+                if (j == percentiles.length) {
+                    return summaries;
+                }
+                next = (long) (numSamples * percentiles[j]);
+            }
+            i++;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/58877a0d/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchController.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchController.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchController.java
new file mode 100644
index 0000000..c56a22a
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchController.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.workload;
+
+import org.apache.kafka.trogdor.common.Topology;
+import org.apache.kafka.trogdor.task.TaskController;
+
+import java.util.Collections;
+import java.util.Set;
+
+/**
+ * The controller for a ProduceBench task.  It confines the task to the single node named by
+ * {@link ProduceBenchSpec#producerNode()}.
+ */
+public class ProduceBenchController implements TaskController {
+    private final ProduceBenchSpec spec;
+
+    public ProduceBenchController(ProduceBenchSpec spec) {
+        this.spec = spec;
+    }
+
+    /**
+     * @return  A one-element set holding the spec's producer node; topology is not consulted.
+     */
+    @Override
+    public Set<String> targetNodes(Topology topology) {
+        String producerNode = spec.producerNode();
+        return Collections.singleton(producerNode);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/58877a0d/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java
new file mode 100644
index 0000000..3fb185e
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.workload;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.trogdor.task.TaskController;
+import org.apache.kafka.trogdor.task.TaskSpec;
+import org.apache.kafka.trogdor.task.TaskWorker;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * The specification for a benchmark that produces messages to a set of topics.
+ */
+public class ProduceBenchSpec extends TaskSpec {
+    private final String producerNode;
+    private final String bootstrapServers;
+    private final int targetMessagesPerSec;
+    private final int maxMessages;
+    private final Map<String, String> producerConf;
+    private final int totalTopics;
+    private final int activeTopics;
+
+    /**
+     * Create a ProduceBenchSpec.  Jackson uses this constructor to deserialize the JSON form.
+     *
+     * @param startMs               Passed to TaskSpec.
+     * @param durationMs            Passed to TaskSpec.
+     * @param producerNode          The name of the node that the task is targeted at.
+     * @param bootstrapServers      The Kafka bootstrap servers to connect to.
+     * @param targetMessagesPerSec  The target number of messages per second.
+     * @param maxMessages           The maximum number of messages.
+     * @param producerConf          Extra producer configuration.  May be null, in which case
+     *                              an empty map is used.
+     * @param totalTopics           The number of benchmark topics to create.
+     * @param activeTopics          The number of active topics.  The worker rejects 0 and
+     *                              values greater than totalTopics.
+     */
+    @JsonCreator
+    public ProduceBenchSpec(@JsonProperty("startMs") long startMs,
+                            @JsonProperty("durationMs") long durationMs,
+                            @JsonProperty("producerNode") String producerNode,
+                            @JsonProperty("bootstrapServers") String bootstrapServers,
+                            @JsonProperty("targetMessagesPerSec") int targetMessagesPerSec,
+                            @JsonProperty("maxMessages") int maxMessages,
+                            @JsonProperty("producerConf") Map<String, String> producerConf,
+                            @JsonProperty("totalTopics") int totalTopics,
+                            @JsonProperty("activeTopics") int activeTopics) {
+        super(startMs, durationMs);
+        this.producerNode = producerNode;
+        this.bootstrapServers = bootstrapServers;
+        this.targetMessagesPerSec = targetMessagesPerSec;
+        this.maxMessages = maxMessages;
+        // Jackson passes null when the JSON omits the property; substitute an empty map so
+        // that users of producerConf() need no null check.
+        this.producerConf = (producerConf == null) ?
+            Collections.<String, String>emptyMap() : producerConf;
+        this.totalTopics = totalTopics;
+        this.activeTopics = activeTopics;
+    }
+
+    @JsonProperty
+    public String producerNode() {
+        return producerNode;
+    }
+
+    @JsonProperty
+    public String bootstrapServers() {
+        return bootstrapServers;
+    }
+
+    @JsonProperty
+    public int targetMessagesPerSec() {
+        return targetMessagesPerSec;
+    }
+
+    @JsonProperty
+    public int maxMessages() {
+        return maxMessages;
+    }
+
+    @JsonProperty
+    public Map<String, String> producerConf() {
+        return producerConf;
+    }
+
+    @JsonProperty
+    public int totalTopics() {
+        return totalTopics;
+    }
+
+    @JsonProperty
+    public int activeTopics() {
+        return activeTopics;
+    }
+
+    @Override
+    public TaskController newController(String id) {
+        return new ProduceBenchController(this);
+    }
+
+    @Override
+    public TaskWorker newTaskWorker(String id) {
+        return new ProduceBenchWorker(id, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/58877a0d/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java
new file mode 100644
index 0000000..0d50e3e
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.workload;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.trogdor.common.JsonUtil;
+import org.apache.kafka.trogdor.common.Platform;
+import org.apache.kafka.trogdor.common.ThreadUtils;
+import org.apache.kafka.trogdor.task.TaskWorker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class ProduceBenchWorker implements TaskWorker {
+    private static final Logger log = LoggerFactory.getLogger(ProduceBenchWorker.class);
+
+    private static final short REPLICATION_FACTOR = 3;
+
+    private static final int MESSAGE_SIZE = 512;
+
+    private static final int THROTTLE_PERIOD_MS = 100;
+
+    private final String id;
+
+    private final ProduceBenchSpec spec;
+
+    private final AtomicBoolean running = new AtomicBoolean(false);
+
+    private ScheduledExecutorService executor;
+
+    private AtomicReference<String> status;
+
+    private KafkaFutureImpl<String> doneFuture;
+
+    public ProduceBenchWorker(String id, ProduceBenchSpec spec) {
+        this.id = id;
+        this.spec = spec;
+    }
+
+    @Override
+    public void start(Platform platform, AtomicReference<String> status,
+                      KafkaFutureImpl<String> doneFuture) throws Exception {
+        if (!running.compareAndSet(false, true)) {
+            throw new IllegalStateException("ProducerBenchWorker is already running.");
+        }
+        log.info("{}: Activating ProduceBenchWorker.", id);
+        this.executor = Executors.newScheduledThreadPool(1,
+            ThreadUtils.createThreadFactory("ProduceBenchWorkerThread%d", false));
+        this.status = status;
+        this.doneFuture = doneFuture;
+        executor.submit(new ValidateSpec());
+    }
+
+    private static String topicIndexToName(int topicIndex) {
+        return String.format("topic%05d", topicIndex);
+    }
+
+    private void abort(String what, Exception e) throws KafkaException {
+        log.warn(what + " caught an exception: ", e);
+        doneFuture.completeExceptionally(new KafkaException(what + " caught an exception.", e));
+        throw new KafkaException(e);
+    }
+
+    public class ValidateSpec implements Callable<Void> {
+        @Override
+        public Void call() throws Exception {
+            try {
+                if (spec.activeTopics() == 0) {
+                    throw new ConfigException("Can't have activeTopics == 0.");
+                }
+                if (spec.totalTopics() < spec.activeTopics()) {
+                    throw new ConfigException(String.format(
+                        "activeTopics was %d, but totalTopics was only %d.  activeTopics must " +
+                            "be less than or equal to totalTopics.", spec.activeTopics(), spec.totalTopics()));
+                }
+                executor.submit(new CreateBenchmarkTopics());
+            } catch (Exception e) {
+                abort("ValidateSpec", e);
+            }
+            return null;
+        }
+    }
+
+    public class CreateBenchmarkTopics implements Callable<Void> {
+        private final static int MAX_BATCH_SIZE = 10;
+
+        @Override
+        public Void call() throws Exception {
+            try {
+                Properties props = new Properties();
+                props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, spec.bootstrapServers());
+                try (AdminClient adminClient = AdminClient.create(props)) {
+                    List<String> topicsToCreate = new ArrayList<>();
+                    for (int i = 0; i < spec.totalTopics(); i++) {
+                        topicsToCreate.add(topicIndexToName(i));
+                    }
+                    log.info("Creating " + spec.totalTopics() + " topics...");
+                    List<Future<Void>> futures = new ArrayList<>();
+                    while (!topicsToCreate.isEmpty()) {
+                        List<NewTopic> newTopics = new ArrayList<>();
+                        for (int i = 0; (i < MAX_BATCH_SIZE) && !topicsToCreate.isEmpty(); i++) {
+                            String topic = topicsToCreate.remove(0);
+                            newTopics.add(new NewTopic(topic, 1, REPLICATION_FACTOR));
+                        }
+                        futures.add(adminClient.createTopics(newTopics).all());
+                    }
+                    for (Future<Void> future : futures) {
+                        future.get();
+                    }
+                    log.info("Successfully created " + spec.totalTopics() + " topics.");
+                }
+                executor.submit(new SendRecords());
+            } catch (Exception e) {
+                abort("CreateBenchmarkTopics", e);
+            }
+            return null;
+        }
+    }
+
+    /**
+     * Producer callback which reports how long a send took to the owning SendRecords task.
+     * The elapsed time is recorded whether or not the send succeeded; failures are also logged.
+     */
+    private static class SendRecordsCallback implements Callback {
+        private final SendRecords sendRecords;
+        private final long startMs;
+
+        /**
+         * @param sendRecords   The task which receives the measured duration.
+         * @param startMs       The time, in milliseconds, at which the send was started.
+         */
+        SendRecordsCallback(SendRecords sendRecords, long startMs) {
+            this.sendRecords = sendRecords;
+            this.startMs = startMs;
+        }
+
+        @Override
+        public void onCompletion(RecordMetadata metadata, Exception exception) {
+            sendRecords.recordDuration(Time.SYSTEM.milliseconds() - startMs);
+            if (exception == null) {
+                return;
+            }
+            log.error("SendRecordsCallback: error", exception);
+        }
+    }
+
+    /**
+     * Converts a per-second rate into a whole count per period.
+     *
+     * @param perSec    The rate per second.
+     * @param periodMs  The length of the period in milliseconds.
+     * @return          The rate scaled to the period, raised to a minimum of 1 and then
+     *                  truncated to an int.
+     */
+    private int perSecToPerPeriod(float perSec, long periodMs) {
+        float periodSeconds = ((float) periodMs) / 1000.0f;
+        float scaled = Math.max(1.0f, perSec * periodSeconds);
+        return (int) scaled;
+    }
+
+    /**
+     * A Throttle which flushes the Producer immediately before it injects a delay, so that
+     * throttling latency is not included in the latency measurements.  The time spent
+     * flushing is subtracted from the requested delay.
+     */
+    private static class SendRecordsThrottle extends Throttle {
+        private final KafkaProducer<?, ?> producer;
+
+        SendRecordsThrottle(int maxPerPeriod, KafkaProducer<?, ?> producer) {
+            super(maxPerPeriod, THROTTLE_PERIOD_MS);
+            this.producer = producer;
+        }
+
+        @Override
+        protected synchronized void delay(long amount) throws InterruptedException {
+            long flushStartMs = time().milliseconds();
+            producer.flush();
+            long flushDurationMs = time().milliseconds() - flushStartMs;
+            // The superclass ignores non-positive amounts, so a slow flush simply
+            // uses up the whole delay.
+            super.delay(amount - flushDurationMs);
+        }
+    }
+
+    /**
+     * Sends the benchmark records and publishes the final status.
+     *
+     * The task runs spec.maxMessages() rounds.  Each round sends one record to every
+     * active topic and then increments the throttle once.  Send latencies are collected
+     * in a histogram which a StatusUpdater summarizes once a minute and once more when
+     * the task finishes.
+     */
+    public class SendRecords implements Callable<Void> {
+        private final Histogram histogram;
+
+        private final Future<?> statusUpdaterFuture;
+
+        private final KafkaProducer<byte[], byte[]> producer;
+
+        private final Throttle throttle;
+
+        SendRecords() {
+            this.histogram = new Histogram(5000);
+            int perPeriod = perSecToPerPeriod(spec.targetMessagesPerSec(), THROTTLE_PERIOD_MS);
+            this.statusUpdaterFuture = executor.scheduleWithFixedDelay(
+                new StatusUpdater(histogram), 1, 1, TimeUnit.MINUTES);
+            Properties props = new Properties();
+            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, spec.bootstrapServers());
+            // Entries from the spec are applied last, so they can override the defaults above.
+            for (Map.Entry<String, String> entry : spec.producerConf().entrySet()) {
+                props.setProperty(entry.getKey(), entry.getValue());
+            }
+            this.producer = new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer());
+            this.throttle = new SendRecordsThrottle(perPeriod, producer);
+        }
+
+        @Override
+        public Void call() throws Exception {
+            long startTimeMs = Time.SYSTEM.milliseconds();
+            try {
+                byte[] key = new byte[MESSAGE_SIZE];
+                byte[] value = new byte[MESSAGE_SIZE];
+                Future<RecordMetadata> future = null;
+                try {
+                    for (int m = 0; m < spec.maxMessages(); m++) {
+                        for (int i = 0; i < spec.activeTopics(); i++) {
+                            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topicIndexToName(i), key, value);
+                            future = producer.send(record, new SendRecordsCallback(this, Time.SYSTEM.milliseconds()));
+                        }
+                        throttle.increment();
+                    }
+                } finally {
+                    try {
+                        // Wait for the most recently sent record, if there was one.
+                        if (future != null) {
+                            future.get();
+                        }
+                    } finally {
+                        // Close the producer even when waiting on the last send fails;
+                        // otherwise a failed or interrupted get() would leak it.
+                        producer.close();
+                    }
+                }
+            } catch (Exception e) {
+                abort("SendRecords", e);
+            } finally {
+                statusUpdaterFuture.cancel(false);
+                // Publish one last status which reflects everything that was recorded.
+                new StatusUpdater(histogram).run();
+                long curTimeMs = Time.SYSTEM.milliseconds();
+                log.info("Sent {} total record(s) in {} ms.  status: {}",
+                    histogram.summarize().numSamples(), curTimeMs - startTimeMs, status.get());
+            }
+            doneFuture.complete("");
+            return null;
+        }
+
+        /**
+         * Adds the duration of one send to the latency histogram.
+         *
+         * @param durationMs    The duration of the send in milliseconds.
+         */
+        void recordDuration(long durationMs) {
+            histogram.add(durationMs);
+        }
+    }
+
+    /**
+     * Summarizes the latency histogram and stores the result, as a JSON string, in the
+     * worker status.  Any exception is handed to abort.
+     */
+    public class StatusUpdater implements Runnable {
+        private final Histogram histogram;
+        private final float[] percentiles;
+
+        StatusUpdater(Histogram histogram) {
+            this.histogram = histogram;
+            // These must line up, in order, with the p50, p90 and p99 arguments of
+            // StatusData.  The second entry used to be 0.95f, which published the
+            // 95th percentile under the p90LatencyMs name.
+            this.percentiles = new float[] {0.50f, 0.90f, 0.99f};
+        }
+
+        @Override
+        public void run() {
+            try {
+                Histogram.Summary summary = histogram.summarize(percentiles);
+                StatusData statusData = new StatusData(summary.numSamples(), summary.average(),
+                    summary.percentiles().get(0).value(),
+                    summary.percentiles().get(1).value(),
+                    summary.percentiles().get(2).value());
+                String statusDataString = JsonUtil.toJsonString(statusData);
+                status.set(statusDataString);
+            } catch (Exception e) {
+                abort("StatusUpdater", e);
+            }
+        }
+    }
+
+    /**
+     * An immutable snapshot of the benchmark statistics which is serialized to JSON.
+     */
+    public static class StatusData {
+        private final long totalSent;
+        private final float averageLatencyMs;
+        private final int p50LatencyMs;
+        private final int p90LatencyMs;
+        private final int p99LatencyMs;
+
+        /**
+         * @param totalSent         The value of the totalSent property.
+         * @param averageLatencyMs  The value of the averageLatencyMs property.
+         * @param p50LatencyMs      The value of the p50LatencyMs property.
+         * @param p90LatencyMs      The value of the p90LatencyMs property.
+         * @param p99LatencyMs      The value of the p99LatencyMs property.
+         */
+        @JsonCreator
+        StatusData(@JsonProperty("totalSent") long totalSent,
+                   @JsonProperty("averageLatencyMs") float averageLatencyMs,
+                   @JsonProperty("p50LatencyMs") int p50LatencyMs,
+                   @JsonProperty("p90LatencyMs") int p90LatencyMs,
+                   @JsonProperty("p99LatencyMs") int p99LatencyMs) {
+            this.totalSent = totalSent;
+            this.averageLatencyMs = averageLatencyMs;
+            this.p50LatencyMs = p50LatencyMs;
+            this.p90LatencyMs = p90LatencyMs;
+            this.p99LatencyMs = p99LatencyMs;
+        }
+
+        /** @return the totalSent value given at construction. */
+        @JsonProperty
+        public long totalSent() {
+            return totalSent;
+        }
+
+        /** @return the averageLatencyMs value given at construction. */
+        @JsonProperty
+        public float averageLatencyMs() {
+            return averageLatencyMs;
+        }
+
+        /** @return the p50LatencyMs value given at construction. */
+        @JsonProperty
+        public int p50LatencyMs() {
+            return p50LatencyMs;
+        }
+
+        /** @return the p90LatencyMs value given at construction. */
+        @JsonProperty
+        public int p90LatencyMs() {
+            return p90LatencyMs;
+        }
+
+        /** @return the p99LatencyMs value given at construction. */
+        @JsonProperty
+        public int p99LatencyMs() {
+            return p99LatencyMs;
+        }
+    }
+
+    @Override
+    public void stop(Platform platform) throws Exception {
+        if (!running.compareAndSet(true, false)) {
+            throw new IllegalStateException("ProduceBenchWorker is not running.");
+        }
+        log.info("{}: Deactivating ProduceBenchWorker.", id);
+        doneFuture.complete("");
+        executor.shutdownNow();
+        executor.awaitTermination(1, TimeUnit.DAYS);
+        this.executor = null;
+        this.status = null;
+        this.doneFuture = null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/58877a0d/tools/src/main/java/org/apache/kafka/trogdor/workload/Throttle.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/Throttle.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/Throttle.java
new file mode 100644
index 0000000..41f9d02
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/Throttle.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.workload;
+
+import org.apache.kafka.common.utils.Time;
+
+/**
+ * Limits how many times increment() can return within each fixed-length period.
+ *
+ * Time is divided into periods of periodMs milliseconds, numbered by dividing the current
+ * time in milliseconds by periodMs.  Once maxPerPeriod increments have been counted in
+ * the current period, further calls wait until the next period begins.
+ */
+public class Throttle {
+    private final int maxPerPeriod;
+    private final int periodMs;
+    private int count;
+    private long prevPeriod;
+
+    /**
+     * @param maxPerPeriod  The maximum number of increments per period.  Must be positive.
+     * @param periodMs      The length of a period in milliseconds.  Must be positive.
+     *
+     * @throws IllegalArgumentException     If either argument is zero or negative.
+     */
+    Throttle(int maxPerPeriod, int periodMs) {
+        // A non-positive maximum would make increment() loop forever, and a zero period
+        // would make it divide by zero.
+        if (maxPerPeriod <= 0) {
+            throw new IllegalArgumentException("maxPerPeriod must be positive, but was " + maxPerPeriod);
+        }
+        if (periodMs <= 0) {
+            throw new IllegalArgumentException("periodMs must be positive, but was " + periodMs);
+        }
+        this.maxPerPeriod = maxPerPeriod;
+        this.periodMs = periodMs;
+        // Start out "full" with no previous period, so that the first call to increment()
+        // begins a new period at the current time.
+        this.count = maxPerPeriod;
+        this.prevPeriod = -1;
+    }
+
+    /**
+     * Counts one event, waiting first if the current period is already full.
+     *
+     * @return  True if this call had to delay at least once; false otherwise.
+     *
+     * @throws InterruptedException     If the delay is interrupted.
+     */
+    public synchronized boolean increment() throws InterruptedException {
+        boolean throttled = false;
+        while (true) {
+            if (count < maxPerPeriod) {
+                count++;
+                return throttled;
+            }
+            long now = time().milliseconds();
+            long curPeriod = now / periodMs;
+            if (curPeriod <= prevPeriod) {
+                // Still in the full period: delay until the next one starts.  The period
+                // is checked again after the delay, so returning early is harmless.
+                long nextPeriodMs = (curPeriod + 1) * periodMs;
+                delay(nextPeriodMs - now);
+                throttled = true;
+            } else {
+                prevPeriod = curPeriod;
+                count = 0;
+            }
+        }
+    }
+
+    /**
+     * @return  The clock used to determine the current period.  Subclasses may override it.
+     */
+    protected Time time() {
+        return Time.SYSTEM;
+    }
+
+    /**
+     * Waits for the given number of milliseconds.  Non-positive amounts return immediately.
+     *
+     * @param amount    The number of milliseconds to wait.
+     *
+     * @throws InterruptedException     If the wait is interrupted.
+     */
+    protected synchronized void delay(long amount) throws InterruptedException {
+        // Object#wait(0) would wait without a timeout, so only wait for positive amounts.
+        if (amount > 0) {
+            wait(amount);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/58877a0d/tools/src/test/java/org/apache/kafka/trogdor/workload/HistogramTest.java
----------------------------------------------------------------------
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/workload/HistogramTest.java b/tools/src/test/java/org/apache/kafka/trogdor/workload/HistogramTest.java
new file mode 100644
index 0000000..5193f77
--- /dev/null
+++ b/tools/src/test/java/org/apache/kafka/trogdor/workload/HistogramTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.workload;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests the average, sample count and percentile values reported by Histogram summaries.
+ */
+public class HistogramTest {
+    /** Passed to summarize when a test does not look at any percentile. */
+    private static final float[] NO_PERCENTILES = new float[0];
+
+    /**
+     * Builds a histogram with the given maximum value and adds each sample to it in order.
+     */
+    private static Histogram createHistogram(int maxValue, int... values) {
+        Histogram result = new Histogram(maxValue);
+        for (int sample : values) {
+            result.add(sample);
+        }
+        return result;
+    }
+
+    @Test
+    public void testHistogramAverage() {
+        Histogram empty = createHistogram(1);
+        Assert.assertEquals(0, (int) empty.summarize(NO_PERCENTILES).average());
+
+        Histogram histogram = createHistogram(70, 1, 2, 3, 4, 5, 6, 1);
+
+        // The samples sum to 22 over 7 entries.
+        Assert.assertEquals(3, (int) histogram.summarize(NO_PERCENTILES).average());
+        histogram.add(60);
+        // The samples now sum to 82 over 8 entries.
+        Assert.assertEquals(10, (int) histogram.summarize(NO_PERCENTILES).average());
+    }
+
+    @Test
+    public void testHistogramSamples() {
+        Histogram empty = createHistogram(100);
+        Assert.assertEquals(0, empty.summarize(NO_PERCENTILES).numSamples());
+        // One of these samples (150) is larger than the maximum value of 100.
+        Histogram histogram = createHistogram(100, 4, 8, 2, 4, 1, 100, 150);
+        Assert.assertEquals(7, histogram.summarize(NO_PERCENTILES).numSamples());
+        histogram.add(60);
+        Assert.assertEquals(8, histogram.summarize(NO_PERCENTILES).numSamples());
+    }
+
+    @Test
+    public void testHistogramPercentiles() {
+        Histogram histogram = createHistogram(100, 1, 2, 3, 4, 5, 6, 80, 90);
+        float[] percentiles = new float[] {0.5f, 0.90f, 0.99f, 1f};
+        Histogram.Summary summary = histogram.summarize(percentiles);
+        Assert.assertEquals(8, summary.numSamples());
+        Assert.assertEquals(4, summary.percentiles().get(0).value());
+        Assert.assertEquals(80, summary.percentiles().get(1).value());
+        Assert.assertEquals(80, summary.percentiles().get(2).value());
+        Assert.assertEquals(90, summary.percentiles().get(3).value());
+
+        // Three more samples move the median.
+        for (int i = 0; i < 3; i++) {
+            histogram.add(30);
+        }
+        summary = histogram.summarize(new float[] {0.5f});
+        Assert.assertEquals(11, summary.numSamples());
+        Assert.assertEquals(5, summary.percentiles().get(0).value());
+
+        // With no samples, the requested percentile is reported as 0.
+        Histogram empty = createHistogram(100);
+        summary = empty.summarize(new float[] {0.5f});
+        Assert.assertEquals(0, summary.percentiles().get(0).value());
+
+        histogram = createHistogram(1000);
+        histogram.add(100);
+        histogram.add(200);
+        summary = histogram.summarize(new float[] {0f, 0.5f, 1.0f});
+        Assert.assertEquals(0, summary.percentiles().get(0).value());
+        Assert.assertEquals(100, summary.percentiles().get(1).value());
+        Assert.assertEquals(200, summary.percentiles().get(2).value());
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/58877a0d/tools/src/test/java/org/apache/kafka/trogdor/workload/ThrottleTest.java
----------------------------------------------------------------------
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/workload/ThrottleTest.java b/tools/src/test/java/org/apache/kafka/trogdor/workload/ThrottleTest.java
new file mode 100644
index 0000000..1644545
--- /dev/null
+++ b/tools/src/test/java/org/apache/kafka/trogdor/workload/ThrottleTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.workload;
+
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ThrottleTest {
+    /**
+     * ThrottleMock is a subclass of Throttle that uses a MockTime object.  It calls
+     * MockTime#sleep instead of Object#wait.
+     */
+    private static class ThrottleMock extends Throttle {
+        final MockTime time;
+
+        ThrottleMock(MockTime time, int maxPerSec) {
+            super(maxPerSec, 100);
+            this.time = time;
+        }
+
+        @Override
+        protected Time time() {
+            return time;
+        }
+
+        @Override
+        protected synchronized void delay(long amount) throws InterruptedException {
+            time.sleep(amount);
+        }
+    }
+
+    @Test
+    public void testThrottle() throws Exception {
+        MockTime time = new MockTime(0, 0, 0);
+        ThrottleMock throttle = new ThrottleMock(time, 3);
+        Assert.assertFalse(throttle.increment());
+        Assert.assertEquals(0, time.milliseconds());
+        Assert.assertFalse(throttle.increment());
+        Assert.assertEquals(0, time.milliseconds());
+        Assert.assertFalse(throttle.increment());
+        Assert.assertEquals(0, time.milliseconds());
+        Assert.assertTrue(throttle.increment());
+        Assert.assertEquals(100, time.milliseconds());
+        time.sleep(50);
+        Assert.assertFalse(throttle.increment());
+        Assert.assertEquals(150, time.milliseconds());
+        Assert.assertFalse(throttle.increment());
+        Assert.assertEquals(150, time.milliseconds());
+        Assert.assertTrue(throttle.increment());
+        Assert.assertEquals(200, time.milliseconds());
+    }
+};
+