Posted to commits@kafka.apache.org by gu...@apache.org on 2017/04/24 16:50:18 UTC

kafka git commit: KAFKA-4755: Cherry-pick streams tests fixes from trunk

Repository: kafka
Updated Branches:
  refs/heads/0.10.2 495cdbde9 -> 7ba26ef14


KAFKA-4755: Cherry-pick streams tests fixes from trunk

This addresses some test instabilities in 0.10.2 that have already been fixed in trunk

Author: Eno Thereska <en...@gmail.com>

Reviewers: Guozhang Wang <wa...@gmail.com>

Closes #2867 from enothereska/minor-performance-cherrypick-0.10.2


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7ba26ef1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7ba26ef1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7ba26ef1

Branch: refs/heads/0.10.2
Commit: 7ba26ef14eab295a0216d076741a1f3893ddfbf0
Parents: 495cdbd
Author: Eno Thereska <en...@gmail.com>
Authored: Mon Apr 24 09:50:15 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon Apr 24 09:50:15 2017 -0700

----------------------------------------------------------------------
 .../kafka/streams/perf/SimpleBenchmark.java     | 515 ++++++++++++-------
 .../streams/streams_simple_benchmark_test.py    |  89 +++-
 .../services/performance/streams_performance.py |  10 +-
 tests/kafkatest/services/streams.py             |  10 +-
 4 files changed, 407 insertions(+), 217 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/7ba26ef1/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
index 7a10b79..55d9c68 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.kafka.streams.perf;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -51,18 +50,38 @@ import org.apache.kafka.test.TestUtils;
 import java.io.File;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Locale;
 import java.util.concurrent.CountDownLatch;
 import java.util.Properties;
 import java.util.Random;
+import java.util.concurrent.TimeUnit;
 
+/**
+ * Class that provides support for a series of benchmarks. It is usually driven by
+ * tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py.
+ * If run manually through the main() function below, you must do the following:
+ * 1. Have ZK and a Kafka broker set up
+ * 2. Run the loading step first: SimpleBenchmark localhost:9092 /tmp/statedir numRecords true "all"
+ * 3. Run the stream processing step second: SimpleBenchmark localhost:9092 /tmp/statedir numRecords false "all"
+ * Note that what changed is the 4th parameter, from "true", indicating that this is a load phase, to "false",
+ * indicating that this is a real run.
+ *
+ * Note that "all" is a convenience option when running this test locally and will not work when running the test
+ * at scale (through tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py). That is due to the exact synchronization
+ * needs of each test (e.g., you wouldn't want one instance to run "count" while another
+ * is still running "consume").
+ */
 public class SimpleBenchmark {
 
     private final String kafka;
     private final File stateDir;
-
+    private final Boolean loadPhase;
+    private final String testName;
+    private static final String ALL_TESTS = "all";
     private static final String SOURCE_TOPIC = "simpleBenchmarkSourceTopic";
     private static final String SINK_TOPIC = "simpleBenchmarkSinkTopic";
 
+    private static final String COUNT_TOPIC = "countTopic";
     private static final String JOIN_TOPIC_1_PREFIX = "joinSourceTopic1";
     private static final String JOIN_TOPIC_2_PREFIX = "joinSourceTopic2";
     private static final ValueJoiner VALUE_JOINER = new ValueJoiner<byte[], byte[], byte[]>() {
@@ -83,25 +102,90 @@ public class SimpleBenchmark {
     };
 
     private static int numRecords;
-    private static Integer endKey;
-    private static final int KEY_SIZE = 8;
+    private static int processedRecords = 0;
+    private static long processedBytes = 0;
     private static final int VALUE_SIZE = 100;
-    private static final int RECORD_SIZE = KEY_SIZE + VALUE_SIZE;
+    private static final long POLL_MS = 500L;
+    private static final int MAX_POLL_RECORDS = 1000;
+    private static final int SOCKET_SIZE_BYTES = 1 * 1024 * 1024;
 
     private static final Serde<byte[]> BYTE_SERDE = Serdes.ByteArray();
     private static final Serde<Integer> INTEGER_SERDE = Serdes.Integer();
 
-    public SimpleBenchmark(File stateDir, String kafka) {
+    public SimpleBenchmark(final File stateDir, final String kafka, final Boolean loadPhase, final String testName) {
         super();
         this.stateDir = stateDir;
         this.kafka = kafka;
+        this.loadPhase = loadPhase;
+        this.testName = testName;
+    }
+
+    private void run() throws Exception {
+        switch (testName) {
+            case ALL_TESTS:
+                // producer performance
+                produce(SOURCE_TOPIC);
+                // consumer performance
+                consume(SOURCE_TOPIC);
+                // simple stream performance source->process
+                processStream(SOURCE_TOPIC);
+                // simple stream performance source->sink
+                processStreamWithSink(SOURCE_TOPIC);
+                // simple stream performance source->store
+                processStreamWithStateStore(SOURCE_TOPIC);
+                // simple stream performance source->cache->store
+                processStreamWithCachedStateStore(SOURCE_TOPIC);
+                // simple aggregation
+                count(COUNT_TOPIC);
+                // simple streams performance KSTREAM-KTABLE join
+                kStreamKTableJoin(JOIN_TOPIC_1_PREFIX + "KStreamKTable", JOIN_TOPIC_2_PREFIX + "KStreamKTable");
+                // simple streams performance KSTREAM-KSTREAM join
+                kStreamKStreamJoin(JOIN_TOPIC_1_PREFIX + "KStreamKStream", JOIN_TOPIC_2_PREFIX + "KStreamKStream");
+                // simple streams performance KTABLE-KTABLE join
+                kTableKTableJoin(JOIN_TOPIC_1_PREFIX + "KTableKTable", JOIN_TOPIC_2_PREFIX + "KTableKTable");
+                break;
+            case "produce":
+                produce(SOURCE_TOPIC);
+                break;
+            case "consume":
+                consume(SOURCE_TOPIC);
+                break;
+            case "count":
+                count(COUNT_TOPIC);
+                break;
+            case "processstream":
+                processStream(SOURCE_TOPIC);
+                break;
+            case "processstreamwithsink":
+                processStreamWithSink(SOURCE_TOPIC);
+                break;
+            case "processstreamwithstatestore":
+                processStreamWithStateStore(SOURCE_TOPIC);
+                break;
+            case "processstreamwithcachedstatestore":
+                processStreamWithCachedStateStore(SOURCE_TOPIC);
+                break;
+            case "kstreamktablejoin":
+                kStreamKTableJoin(JOIN_TOPIC_1_PREFIX + "KStreamKTable", JOIN_TOPIC_2_PREFIX + "KStreamKTable");
+                break;
+            case "kstreamkstreamjoin":
+                kStreamKStreamJoin(JOIN_TOPIC_1_PREFIX + "KStreamKStream", JOIN_TOPIC_2_PREFIX + "KStreamKStream");
+                break;
+            case "ktablektablejoin":
+                kTableKTableJoin(JOIN_TOPIC_1_PREFIX + "KTableKTable", JOIN_TOPIC_2_PREFIX + "KTableKTable");
+                break;
+            default:
+                throw new Exception("Unknown test name " + testName);
+
+        }
     }
 
     public static void main(String[] args) throws Exception {
         String kafka = args.length > 0 ? args[0] : "localhost:9092";
         String stateDirStr = args.length > 1 ? args[1] : TestUtils.tempDirectory().getAbsolutePath();
         numRecords = args.length > 2 ? Integer.parseInt(args[2]) : 10000000;
-        endKey = numRecords - 1;
+        boolean loadPhase = args.length > 3 ? Boolean.parseBoolean(args[3]) : false;
+        String testName = args.length > 4 ? args[4].toLowerCase(Locale.ROOT) : ALL_TESTS;
 
         final File stateDir = new File(stateDirStr);
         stateDir.mkdir();
@@ -113,60 +197,117 @@ public class SimpleBenchmark {
         System.out.println("kafka=" + kafka);
         System.out.println("stateDir=" + stateDir);
         System.out.println("numRecords=" + numRecords);
+        System.out.println("loadPhase=" + loadPhase);
+        System.out.println("testName=" + testName);
+
+        SimpleBenchmark benchmark = new SimpleBenchmark(stateDir, kafka, loadPhase, testName);
+        benchmark.run();
+    }
 
-        SimpleBenchmark benchmark = new SimpleBenchmark(stateDir, kafka);
-
-        // producer performance
-        benchmark.produce(SOURCE_TOPIC, VALUE_SIZE, "simple-benchmark-produce", numRecords, true, numRecords, true);
-        // consumer performance
-        benchmark.consume(SOURCE_TOPIC);
-        // simple stream performance source->process
-        benchmark.processStream(SOURCE_TOPIC);
-        // simple stream performance source->sink
-        benchmark.processStreamWithSink(SOURCE_TOPIC);
-        // simple stream performance source->store
-        benchmark.processStreamWithStateStore(SOURCE_TOPIC);
-        // simple stream performance source->cache->store
-        benchmark.processStreamWithCachedStateStore(SOURCE_TOPIC);
-        // simple streams performance KSTREAM-KTABLE join
-        benchmark.kStreamKTableJoin(JOIN_TOPIC_1_PREFIX + "KStreamKTable", JOIN_TOPIC_2_PREFIX + "KStreamKTable");
-        // simple streams performance KSTREAM-KSTREAM join
-        benchmark.kStreamKStreamJoin(JOIN_TOPIC_1_PREFIX + "KStreamKStream", JOIN_TOPIC_2_PREFIX + "KStreamKStream");
-        // simple streams performance KTABLE-KTABLE join
-        benchmark.kTableKTableJoin(JOIN_TOPIC_1_PREFIX + "KTableKTable", JOIN_TOPIC_2_PREFIX + "KTableKTable");
-    }
-
-    private Properties setJoinProperties(final String applicationId) {
+    private Properties setStreamProperties(final String applicationId) {
         Properties props = new Properties();
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
         props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString());
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
         props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        // the socket buffer needs to be large, especially when running in AWS with
+        // high latency. if running locally the default is fine.
+        props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, SOCKET_SIZE_BYTES);
         props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
         props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass());
+        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, MAX_POLL_RECORDS);
+        props.put(StreamsConfig.POLL_MS_CONFIG, POLL_MS);
         return props;
     }
 
+    private Properties setProduceConsumeProperties(final String clientId) {
+        Properties props = new Properties();
+        props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+        // the socket buffer needs to be large, especially when running in AWS with
+        // high latency. if running locally the default is fine.
+        props.put(ProducerConfig.SEND_BUFFER_CONFIG, SOCKET_SIZE_BYTES);
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+        // the socket buffer needs to be large, especially when running in AWS with
+        // high latency. if running locally the default is fine.
+        props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, SOCKET_SIZE_BYTES);
+        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, MAX_POLL_RECORDS);
+        return props;
+    }
+
+    private boolean maybeSetupPhase(final String topic, final String clientId,
+                                    final boolean skipIfAllTests) throws Exception {
+        processedRecords = 0;
+        processedBytes = 0;
+        // initialize topics
+        if (loadPhase) {
+            if (skipIfAllTests) {
+                // if we run all tests, the produce test will have already loaded the data
+                if (testName.equals(ALL_TESTS)) {
+                    // Skipping loading phase since previously loaded
+                    return true;
+                }
+            }
+            System.out.println("Initializing topic " + topic);
+            // WARNING: The keys must be sequential, i.e., unique; otherwise the logic for deciding when this
+            // test stops (in createCountStreams) will not work
+            produce(topic, VALUE_SIZE, clientId, numRecords, true, numRecords, false);
+            return true;
+        }
+        return false;
+    }
+
+    private KafkaStreams createCountStreams(Properties streamConfig, String topic, final CountDownLatch latch) {
+        final KStreamBuilder builder = new KStreamBuilder();
+        final KStream<Integer, byte[]> input = builder.stream(topic);
+
+        input.groupByKey()
+            .count("tmpStoreName").foreach(new CountDownAction(latch));
+
+        return new KafkaStreams(builder, streamConfig);
+    }
+
+    /**
+     * Measure the performance of a simple aggregate like count.
+     * Counts the occurrences of numbers (normally one would count words; this
+     * example counts numbers).
+     * @param countTopic Topic where numbers are stored
+     * @throws Exception
+     */
+    public void count(String countTopic) throws Exception {
+        if (maybeSetupPhase(countTopic, "simple-benchmark-produce-count", false)) {
+            return;
+        }
+
+        CountDownLatch latch = new CountDownLatch(1);
+        Properties props = setStreamProperties("simple-benchmark-count");
+        final KafkaStreams streams = createCountStreams(props, countTopic, latch);
+        runGenericBenchmark(streams, "Streams Count Performance [records/latency/rec-sec/MB-sec counted]: ", latch);
+    }
+
     /**
      * Measure the performance of a KStream-KTable left join. The setup is such that each
      * KStream record joins to exactly one element in the KTable
      */
     public void kStreamKTableJoin(String kStreamTopic, String kTableTopic) throws Exception {
-        CountDownLatch latch = new CountDownLatch(numRecords);
+        if (maybeSetupPhase(kStreamTopic, "simple-benchmark-produce-kstream", false)) {
+            maybeSetupPhase(kTableTopic, "simple-benchmark-produce-ktable", false);
+            return;
+        }
 
-        // initialize topics
-        System.out.println("Initializing kStreamTopic " + kStreamTopic);
-        produce(kStreamTopic, VALUE_SIZE, "simple-benchmark-produce-kstream", numRecords, false, numRecords, false);
-        System.out.println("Initializing kTableTopic " + kTableTopic);
-        produce(kTableTopic, VALUE_SIZE, "simple-benchmark-produce-ktable", numRecords, true, numRecords, false);
+        CountDownLatch latch = new CountDownLatch(1);
 
         // setup join
-        Properties props = setJoinProperties("simple-benchmark-kstream-ktable-join");
+        Properties props = setStreamProperties("simple-benchmark-kstream-ktable-join");
         final KafkaStreams streams = createKafkaStreamsKStreamKTableJoin(props, kStreamTopic, kTableTopic, latch);
 
         // run benchmark
-        runJoinBenchmark(streams, "Streams KStreamKTable LeftJoin Performance [MB/s joined]: ", latch);
+        runGenericBenchmark(streams, "Streams KStreamKTable LeftJoin Performance [records/latency/rec-sec/MB-sec joined]: ", latch);
     }
 
     /**
@@ -174,20 +315,19 @@ public class SimpleBenchmark {
      * KStream record joins to exactly one element in the other KStream
      */
     public void kStreamKStreamJoin(String kStreamTopic1, String kStreamTopic2) throws Exception {
-        CountDownLatch latch = new CountDownLatch(numRecords);
+        if (maybeSetupPhase(kStreamTopic1, "simple-benchmark-produce-kstream-topic1", false)) {
+            maybeSetupPhase(kStreamTopic2, "simple-benchmark-produce-kstream-topic2", false);
+            return;
+        }
 
-        // initialize topics
-        System.out.println("Initializing kStreamTopic " + kStreamTopic1);
-        produce(kStreamTopic1, VALUE_SIZE, "simple-benchmark-produce-kstream-topic1", numRecords, true, numRecords, false);
-        System.out.println("Initializing kStreamTopic " + kStreamTopic2);
-        produce(kStreamTopic2, VALUE_SIZE, "simple-benchmark-produce-kstream-topic2", numRecords, true, numRecords, false);
+        CountDownLatch latch = new CountDownLatch(1);
 
         // setup join
-        Properties props = setJoinProperties("simple-benchmark-kstream-kstream-join");
+        Properties props = setStreamProperties("simple-benchmark-kstream-kstream-join");
         final KafkaStreams streams = createKafkaStreamsKStreamKStreamJoin(props, kStreamTopic1, kStreamTopic2, latch);
 
         // run benchmark
-        runJoinBenchmark(streams, "Streams KStreamKStream LeftJoin Performance [MB/s joined]: ", latch);
+        runGenericBenchmark(streams, "Streams KStreamKStream LeftJoin Performance [records/latency/rec-sec/MB-sec  joined]: ", latch);
     }
 
     /**
@@ -195,23 +335,29 @@ public class SimpleBenchmark {
      * KTable record joins to exactly one element in the other KTable
      */
     public void kTableKTableJoin(String kTableTopic1, String kTableTopic2) throws Exception {
-        CountDownLatch latch = new CountDownLatch(numRecords);
-
-        // initialize topics
-        System.out.println("Initializing kTableTopic " + kTableTopic1);
-        produce(kTableTopic1, VALUE_SIZE, "simple-benchmark-produce-ktable-topic1", numRecords, true, numRecords, false);
-        System.out.println("Initializing kTableTopic " + kTableTopic2);
-        produce(kTableTopic2, VALUE_SIZE, "simple-benchmark-produce-ktable-topic2", numRecords, true, numRecords, false);
+        if (maybeSetupPhase(kTableTopic1, "simple-benchmark-produce-ktable-topic1", false)) {
+            maybeSetupPhase(kTableTopic2, "simple-benchmark-produce-ktable-topic2", false);
+            return;
+        }
+        CountDownLatch latch = new CountDownLatch(1);
 
         // setup join
-        Properties props = setJoinProperties("simple-benchmark-ktable-ktable-join");
+        Properties props = setStreamProperties("simple-benchmark-ktable-ktable-join");
         final KafkaStreams streams = createKafkaStreamsKTableKTableJoin(props, kTableTopic1, kTableTopic2, latch);
 
         // run benchmark
-        runJoinBenchmark(streams, "Streams KTableKTable LeftJoin Performance [MB/s joined]: ", latch);
+        runGenericBenchmark(streams, "Streams KTableKTable LeftJoin Performance [records/latency/rec-sec/MB-sec joined]: ", latch);
     }
 
-    private void runJoinBenchmark(final KafkaStreams streams, final String nameOfBenchmark, final CountDownLatch latch) {
+    private void printResults(final String nameOfBenchmark, final long latency) {
+        System.out.println(nameOfBenchmark +
+            processedRecords + "/" +
+            latency + "/" +
+            recordsPerSec(latency, processedRecords) + "/" +
+            megabytesPerSec(latency, processedBytes));
+    }
+
+    private void runGenericBenchmark(final KafkaStreams streams, final String nameOfBenchmark, final CountDownLatch latch) {
         streams.start();
 
         long startTime = System.currentTimeMillis();
@@ -224,20 +370,12 @@ public class SimpleBenchmark {
             }
         }
         long endTime = System.currentTimeMillis();
-
-
-        System.out.println(nameOfBenchmark + megaBytePerSec(endTime - startTime, numRecords, KEY_SIZE + VALUE_SIZE));
+        printResults(nameOfBenchmark, endTime - startTime);
 
         streams.close();
     }
 
-
-
-    public void processStream(String topic) {
-        CountDownLatch latch = new CountDownLatch(1);
-
-        final KafkaStreams streams = createKafkaStreams(topic, stateDir, kafka, latch);
-
+    private long startStreamsThread(final KafkaStreams streams, final CountDownLatch latch) throws Exception {
         Thread thread = new Thread() {
             public void run() {
                 streams.start();
@@ -257,96 +395,72 @@ public class SimpleBenchmark {
 
         long endTime = System.currentTimeMillis();
 
-        System.out.println("Streams Performance [MB/sec read]: " + megaBytePerSec(endTime - startTime));
-
         streams.close();
         try {
             thread.join();
         } catch (Exception ex) {
             // ignore
         }
+
+        return endTime - startTime;
     }
 
-    public void processStreamWithSink(String topic) {
-        CountDownLatch latch = new CountDownLatch(1);
+    public void processStream(final String topic) throws Exception {
+        if (maybeSetupPhase(topic, "simple-benchmark-process-stream-load", true)) {
+            return;
+        }
 
-        final KafkaStreams streams = createKafkaStreamsWithSink(topic, stateDir, kafka, latch);
+        CountDownLatch latch = new CountDownLatch(1);
 
-        Thread thread = new Thread() {
-            public void run() {
-                streams.start();
-            }
-        };
-        thread.start();
+        final KafkaStreams streams = createKafkaStreams(topic, latch);
+        long latency = startStreamsThread(streams, latch);
 
-        long startTime = System.currentTimeMillis();
+        printResults("Streams Performance [records/latency/rec-sec/MB-sec source]: ", latency);
+    }
 
-        while (latch.getCount() > 0) {
-            try {
-                latch.await();
-            } catch (InterruptedException ex) {
-                Thread.interrupted();
-            }
+    public void processStreamWithSink(String topic) throws Exception {
+        if (maybeSetupPhase(topic, "simple-benchmark-process-stream-with-sink-load", true)) {
+            return;
         }
 
-        long endTime = System.currentTimeMillis();
+        CountDownLatch latch = new CountDownLatch(1);
+        final KafkaStreams streams = createKafkaStreamsWithSink(topic, latch);
+        long latency = startStreamsThread(streams, latch);
 
-        System.out.println("Streams Performance [MB/sec read+write]: " + megaBytePerSec(endTime - startTime));
+        printResults("Streams Performance [records/latency/rec-sec/MB-sec source+sink]: ", latency);
 
-        streams.close();
-        try {
-            thread.join();
-        } catch (Exception ex) {
-            // ignore
-        }
     }
 
-    private void internalProcessStreamWithStore(final KafkaStreams streams, final CountDownLatch latch,
-                                                final String message) {
-        Thread thread = new Thread() {
-            public void run() {
-                streams.start();
-            }
-        };
-        thread.start();
-
-        long startTime = System.currentTimeMillis();
-
-        while (latch.getCount() > 0) {
-            try {
-                latch.await();
-            } catch (InterruptedException ex) {
-                Thread.interrupted();
-            }
+    public void processStreamWithStateStore(String topic) throws Exception {
+        if (maybeSetupPhase(topic, "simple-benchmark-process-stream-with-state-store-load", true)) {
+            return;
         }
 
-        long endTime = System.currentTimeMillis();
-
-        System.out.println(message + megaBytePerSec(endTime - startTime));
-
-        streams.close();
-        try {
-            thread.join();
-        } catch (Exception ex) {
-            // ignore
-        }
-    }
-    public void processStreamWithStateStore(String topic) {
         CountDownLatch latch = new CountDownLatch(1);
-
-        final KafkaStreams streams = createKafkaStreamsWithStateStore(topic, stateDir, kafka, latch, false);
-        internalProcessStreamWithStore(streams, latch, "Streams Performance [MB/sec read+store]: ");
+        final KafkaStreams streams = createKafkaStreamsWithStateStore(topic, latch, false);
+        long latency = startStreamsThread(streams, latch);
+        printResults("Streams Performance [records/latency/rec-sec/MB-sec source+store]: ", latency);
 
     }
 
-    public void processStreamWithCachedStateStore(String topic) {
-        CountDownLatch latch = new CountDownLatch(1);
-
-        final KafkaStreams streams = createKafkaStreamsWithStateStore(topic, stateDir, kafka, latch, true);
+    public void processStreamWithCachedStateStore(String topic) throws Exception {
+        if (maybeSetupPhase(topic, "simple-benchmark-process-stream-with-cached-state-store-load", true)) {
+            return;
+        }
 
-        internalProcessStreamWithStore(streams, latch, "Streams Performance [MB/sec read+cache+store]: ");
+        CountDownLatch latch = new CountDownLatch(1);
+        final KafkaStreams streams = createKafkaStreamsWithStateStore(topic, latch, true);
+        long latency = startStreamsThread(streams, latch);
+        printResults("Streams Performance [records/latency/rec-sec/MB-sec source+cache+store]: ", latency);
     }
 
+    public void produce(String topic) throws Exception {
+        // loading phase does not make sense for producer
+        if (loadPhase) {
+            return;
+        }
+        produce(topic, VALUE_SIZE, "simple-benchmark-produce", numRecords, true, numRecords, true);
+    }
     /**
      * Produce values to a topic
      * @param topic Topic to produce to
@@ -358,22 +472,28 @@ public class SimpleBenchmark {
      * @param printStats if True, print stats on how long producing took. If False, don't print stats. False can be used
      *                   when this produce step is part of another benchmark that produces its own stats
      */
-    public void produce(String topic, int valueSizeBytes, String clientId, int numRecords, boolean sequential,
-                        int upperRange, boolean printStats) throws Exception {
+    private void produce(String topic, int valueSizeBytes, String clientId, int numRecords, boolean sequential,
+                         int upperRange, boolean printStats) throws Exception {
 
+        processedRecords = 0;
+        processedBytes = 0;
         if (sequential) {
             if (upperRange < numRecords) throw new Exception("UpperRange must be >= numRecords");
         }
-        Properties props = new Properties();
-        props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
-        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
-        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
-        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+        if (!sequential) {
+            System.out.println("WARNING: You are using non-sequential keys. If your tests' exit logic expects to see a final key, random keys may not work.");
+        }
+        Properties props = setProduceConsumeProperties(clientId);
+
         int key = 0;
         Random rand = new Random();
         KafkaProducer<Integer, byte[]> producer = new KafkaProducer<>(props);
 
         byte[] value = new byte[valueSizeBytes];
+        // Fill with random values to increase entropy: some devices,
+        // such as SSDs, compress data, and if the array were all zeros
+        // the measured performance would be unrealistically good.
+        new Random().nextBytes(value);
         long startTime = System.currentTimeMillis();
 
         if (sequential) key = 0;
@@ -382,22 +502,24 @@ public class SimpleBenchmark {
             producer.send(new ProducerRecord<>(topic, key, value));
             if (sequential) key++;
             else key = rand.nextInt(upperRange);
+            processedRecords++;
+            processedBytes += value.length + Integer.SIZE;
         }
         producer.close();
 
         long endTime = System.currentTimeMillis();
 
-        if (printStats)
-            System.out.println("Producer Performance [MB/sec write]: " + megaBytePerSec(endTime - startTime, numRecords, KEY_SIZE + valueSizeBytes));
+        if (printStats) {
+            printResults("Producer Performance [records/latency/rec-sec/MB-sec write]: ", endTime - startTime);
+        }
     }
 
-    public void consume(String topic) {
-        Properties props = new Properties();
-        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "simple-benchmark-consumer");
-        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
-        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
-        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
-        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+    public void consume(String topic) throws Exception {
+        if (maybeSetupPhase(topic, "simple-benchmark-consumer-load", true)) {
+            return;
+        }
+
+        Properties props = setProduceConsumeProperties("simple-benchmark-consumer");
 
         KafkaConsumer<Integer, byte[]> consumer = new KafkaConsumer<>(props);
 
@@ -410,33 +532,33 @@ public class SimpleBenchmark {
         long startTime = System.currentTimeMillis();
 
         while (true) {
-            ConsumerRecords<Integer, byte[]> records = consumer.poll(500);
+            ConsumerRecords<Integer, byte[]> records = consumer.poll(POLL_MS);
             if (records.isEmpty()) {
-                if (endKey.equals(key))
+                if (processedRecords == numRecords)
                     break;
             } else {
                 for (ConsumerRecord<Integer, byte[]> record : records) {
+                    processedRecords++;
+                    processedBytes += record.value().length + Integer.SIZE;
                     Integer recKey = record.key();
-
                     if (key == null || key < recKey)
                         key = recKey;
+                    if (processedRecords == numRecords)
+                        break;
                 }
             }
+            if (processedRecords == numRecords)
+                break;
         }
 
         long endTime = System.currentTimeMillis();
 
         consumer.close();
-        System.out.println("Consumer Performance [MB/sec read]: " + megaBytePerSec(endTime - startTime));
+        printResults("Consumer Performance [records/latency/rec-sec/MB-sec read]: ", endTime - startTime);
     }
 
-    private KafkaStreams createKafkaStreams(String topic, File stateDir, String kafka, final CountDownLatch latch) {
-        Properties props = new Properties();
-        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "simple-benchmark-streams");
-        props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString());
-        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
-        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
-        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+    private KafkaStreams createKafkaStreams(String topic, final CountDownLatch latch) {
+        Properties props = setStreamProperties("simple-benchmark-streams");
 
         KStreamBuilder builder = new KStreamBuilder();
 
@@ -453,7 +575,9 @@ public class SimpleBenchmark {
 
                     @Override
                     public void process(Integer key, byte[] value) {
-                        if (endKey.equals(key)) {
+                        processedRecords++;
+                        processedBytes += value.length + Integer.SIZE;
+                        if (processedRecords == numRecords) {
                             latch.countDown();
                         }
                     }
@@ -469,16 +593,11 @@ public class SimpleBenchmark {
             }
         });
 
-        return new KafkaStreams(builder, props);
+        return createKafkaStreamsWithExceptionHandler(builder, props);
     }
 
-    private KafkaStreams createKafkaStreamsWithSink(String topic, File stateDir, String kafka, final CountDownLatch latch) {
-        Properties props = new Properties();
-        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "simple-benchmark-streams-with-sink");
-        props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString());
-        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
-        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
-        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+    private KafkaStreams createKafkaStreamsWithSink(String topic, final CountDownLatch latch) {
+        final Properties props = setStreamProperties("simple-benchmark-streams-with-sink");
 
         KStreamBuilder builder = new KStreamBuilder();
 
@@ -495,7 +614,9 @@ public class SimpleBenchmark {
 
                     @Override
                     public void process(Integer key, byte[] value) {
-                        if (endKey.equals(key)) {
+                        processedRecords++;
+                        processedBytes += value.length + Integer.SIZE;
+                        if (processedRecords == numRecords) {
                             latch.countDown();
                         }
                     }
@@ -511,17 +632,27 @@ public class SimpleBenchmark {
             }
         });
 
-        return new KafkaStreams(builder, props);
+        return createKafkaStreamsWithExceptionHandler(builder, props);
     }
 
-    private class CountDownAction<K, V> implements ForeachAction<K, V> {
+    private class CountDownAction<V> implements ForeachAction<Integer, V> {
         private CountDownLatch latch;
         CountDownAction(final CountDownLatch latch) {
             this.latch = latch;
         }
         @Override
-        public void apply(K key, V value) {
-            this.latch.countDown();
+        public void apply(Integer key, V value) {
+            processedRecords++;
+            if (value instanceof byte[]) {
+                processedBytes += ((byte[]) value).length + Integer.SIZE;
+            } else if (value instanceof Long) {
+                processedBytes += Long.SIZE + Integer.SIZE;
+            } else {
+                System.err.println("Unknown value type in CountDownAction");
+            }
+            if (processedRecords == numRecords) {
+                this.latch.countDown();
+            }
         }
     }
 
@@ -534,7 +665,7 @@ public class SimpleBenchmark {
 
         input1.leftJoin(input2, VALUE_JOINER).foreach(new CountDownAction(latch));
 
-        return new KafkaStreams(builder, streamConfig);
+        return createKafkaStreamsWithExceptionHandler(builder, streamConfig);
     }
 
     private KafkaStreams createKafkaStreamsKTableKTableJoin(Properties streamConfig, String kTableTopic1,
@@ -546,7 +677,7 @@ public class SimpleBenchmark {
 
         input1.leftJoin(input2, VALUE_JOINER).foreach(new CountDownAction(latch));
 
-        return new KafkaStreams(builder, streamConfig);
+        return createKafkaStreamsWithExceptionHandler(builder, streamConfig);
     }
 
     private KafkaStreams createKafkaStreamsKStreamKStreamJoin(Properties streamConfig, String kStreamTopic1,
@@ -559,18 +690,13 @@ public class SimpleBenchmark {
 
         input1.leftJoin(input2, VALUE_JOINER, JoinWindows.of(timeDifferenceMs)).foreach(new CountDownAction(latch));
 
-        return new KafkaStreams(builder, streamConfig);
+        return createKafkaStreamsWithExceptionHandler(builder, streamConfig);
     }
 
-    private KafkaStreams createKafkaStreamsWithStateStore(String topic, File stateDir, String kafka,
+    private KafkaStreams createKafkaStreamsWithStateStore(String topic,
                                                           final CountDownLatch latch,
                                                           boolean enableCaching) {
-        Properties props = new Properties();
-        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "simple-benchmark-streams-with-store" + enableCaching);
-        props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString());
-        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
-        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
-        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        Properties props = setStreamProperties("simple-benchmark-streams-with-store" + enableCaching);
 
         KStreamBuilder builder = new KStreamBuilder();
 
@@ -596,8 +722,9 @@ public class SimpleBenchmark {
                     @Override
                     public void process(Integer key, byte[] value) {
                         store.put(key, value);
-
-                        if (endKey.equals(key)) {
+                        processedRecords++;
+                        processedBytes += value.length + Integer.SIZE;
+                        if (processedRecords == numRecords) {
                             latch.countDown();
                         }
                     }
@@ -613,15 +740,29 @@ public class SimpleBenchmark {
             }
         }, "store");
 
-        return new KafkaStreams(builder, props);
+        return createKafkaStreamsWithExceptionHandler(builder, props);
     }
 
-    private double megaBytePerSec(long time) {
-        return (double) (RECORD_SIZE * numRecords / 1024 / 1024) / ((double) time / 1000);
+    private KafkaStreams createKafkaStreamsWithExceptionHandler(final KStreamBuilder builder, final Properties props) {
+        final KafkaStreams streamsClient = new KafkaStreams(builder, props);
+        streamsClient.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
+            @Override
+            public void uncaughtException(Thread t, Throwable e) {
+                System.out.println("FATAL: An unexpected exception is encountered on thread " + t + ": " + e);
+
+                streamsClient.close(30, TimeUnit.SECONDS);
+            }
+        });
+
+        return streamsClient;
+    }
+
+    private double megabytesPerSec(long time, long processedBytes) {
+        return (processedBytes / 1024.0 / 1024.0) / (time / 1000.0);
     }
 
-    private double megaBytePerSec(long time, int numRecords, int recordSizeBytes) {
-        return (double) (recordSizeBytes * numRecords / 1024 / 1024) / ((double) time / 1000);
+    private double recordsPerSec(long time, int numRecords) {
+        return numRecords / (time / 1000.0);
     }
 
     private List<TopicPartition> getAllPartitions(KafkaConsumer<?, ?> consumer, String... topics) {
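
The common thread in the Java changes above is a latch-driven measurement
pattern: each processor or foreach action bumps processedRecords and
processedBytes per record, counts down a CountDownLatch once numRecords have
been seen, and the driver thread then reports records/latency/rec-sec/MB-sec.
Below is a minimal, self-contained sketch of that pattern; it is not code from
this commit, and the class name and the stand-in workload loop are
illustrative only.

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.atomic.AtomicLong;

    public class LatchBenchmarkSketch {
        static final int NUM_RECORDS = 1_000_000;
        static final AtomicLong processedRecords = new AtomicLong();
        static final AtomicLong processedBytes = new AtomicLong();

        // Called once per record by the pipeline under test.
        static void onRecord(byte[] value, CountDownLatch latch) {
            processedBytes.addAndGet(value.length);
            if (processedRecords.incrementAndGet() == NUM_RECORDS) {
                latch.countDown(); // release the measuring thread
            }
        }

        public static void main(String[] args) throws InterruptedException {
            CountDownLatch latch = new CountDownLatch(1);
            long start = System.currentTimeMillis();
            // Stand-in workload; in SimpleBenchmark this is a Kafka Streams
            // topology whose processors drive the counters instead.
            for (int i = 0; i < NUM_RECORDS; i++) {
                onRecord(new byte[100], latch);
            }
            latch.await();
            // Guard against a zero-millisecond run on fast machines.
            long latencyMs = Math.max(1, System.currentTimeMillis() - start);
            double recsPerSec = processedRecords.get() / (latencyMs / 1000.0);
            double mbPerSec = (processedBytes.get() / 1024.0 / 1024.0) / (latencyMs / 1000.0);
            System.out.println("Sketch Performance [records/latency/rec-sec/MB-sec]: "
                    + processedRecords.get() + "/" + latencyMs + "/"
                    + recsPerSec + "/" + mbPerSec);
        }
    }

Note that the real benchmark uses plain static int/long counters updated from
a single stream thread; the atomics here are just a conservative choice for a
standalone sketch.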

http://git-wip-us.apache.org/repos/asf/kafka/blob/7ba26ef1/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py b/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
index ebd69a6..a56def3 100644
--- a/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
+++ b/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
@@ -13,41 +13,86 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from ducktape.tests.test import Test
 from ducktape.mark.resource import cluster
-
+from ducktape.mark import parametrize, matrix
 from kafkatest.tests.kafka_test import KafkaTest
-from kafkatest.services.performance.streams_performance import StreamsSimpleBenchmarkService
 
+from kafkatest.services.performance.streams_performance import StreamsSimpleBenchmarkService
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+from kafkatest.version import DEV_BRANCH
 
-class StreamsSimpleBenchmarkTest(KafkaTest):
+class StreamsSimpleBenchmarkTest(Test):
     """
     Simple benchmark of Kafka Streams.
     """
 
     def __init__(self, test_context):
-        super(StreamsSimpleBenchmarkTest, self).__init__(test_context, num_zk=1, num_brokers=1,topics={
-            'simpleBenchmarkSourceTopic' : { 'partitions': 1, 'replication-factor': 1 },
-            'simpleBenchmarkSinkTopic' : { 'partitions': 1, 'replication-factor': 1 },
-            'joinSourceTopic1KStreamKStream' : { 'partitions': 1, 'replication-factor': 1 },
-            'joinSourceTopic2KStreamKStream' : { 'partitions': 1, 'replication-factor': 1 },
-            'joinSourceTopic1KStreamKTable' : { 'partitions': 1, 'replication-factor': 1 },
-            'joinSourceTopic2KStreamKTable' : { 'partitions': 1, 'replication-factor': 1 },
-            'joinSourceTopic1KTableKTable' : { 'partitions': 1, 'replication-factor': 1 },
-            'joinSourceTopic2KTableKTable' : { 'partitions': 1, 'replication-factor': 1 }
-        })
+        super(StreamsSimpleBenchmarkTest, self).__init__(test_context)
+        self.num_records = 1000000L
+        self.replication = 1
 
-        self.driver = StreamsSimpleBenchmarkService(test_context, self.kafka, 1000000L)
 
-    @cluster(num_nodes=3)
-    def test_simple_benchmark(self):
+    @cluster(num_nodes=9)
+    @matrix(test=["produce", "consume", "count", "processstream", "processstreamwithsink", "processstreamwithstatestore", "processstreamwithcachedstatestore", "kstreamktablejoin", "kstreamkstreamjoin", "ktablektablejoin"], scale=[1, 3])
+    def test_simple_benchmark(self, test, scale):
         """
         Run simple Kafka Streams benchmark
         """
+        self.driver = [None] * (scale + 1)
+        node = [None] * (scale)
+        data = [None] * (scale)
+
+        #############
+        # SETUP PHASE
+        #############
+        self.zk = ZookeeperService(self.test_context, num_nodes=1)
+        self.zk.start()
+        self.kafka = KafkaService(self.test_context, num_nodes=scale, zk=self.zk, version=DEV_BRANCH, topics={
+            'simpleBenchmarkSourceTopic' : { 'partitions': scale, 'replication-factor': self.replication },
+            'countTopic' : { 'partitions': scale, 'replication-factor': self.replication },
+            'simpleBenchmarkSinkTopic' : { 'partitions': scale, 'replication-factor': self.replication },
+            'joinSourceTopic1KStreamKStream' : { 'partitions': scale, 'replication-factor': self.replication },
+            'joinSourceTopic2KStreamKStream' : { 'partitions': scale, 'replication-factor': self.replication },
+            'joinSourceTopic1KStreamKTable' : { 'partitions': scale, 'replication-factor': self.replication },
+            'joinSourceTopic2KStreamKTable' : { 'partitions': scale, 'replication-factor': self.replication },
+            'joinSourceTopic1KTableKTable' : { 'partitions': scale, 'replication-factor': self.replication },
+            'joinSourceTopic2KTableKTable' : { 'partitions': scale, 'replication-factor': self.replication }
+        })
+        self.kafka.start()
+ 
+        ################
+        # LOAD PHASE
+        ################
+        self.load_driver = StreamsSimpleBenchmarkService(self.test_context, self.kafka,
+                                                         self.num_records * scale, "true", test)
+        self.load_driver.start()
+        self.load_driver.wait()
+        self.load_driver.stop()
+
+        ################
+        # RUN PHASE
+        ################
+        for num in range(0, scale):
+            self.driver[num] = StreamsSimpleBenchmarkService(self.test_context, self.kafka,
+                                                             self.num_records/(scale), "false", test)
+            self.driver[num].start()
 
-        self.driver.start()
-        self.driver.wait()
-        self.driver.stop()
-        node = self.driver.node
-        node.account.ssh("grep Performance %s" % self.driver.STDOUT_FILE, allow_fail=False)
+        #######################
+        # STOP + COLLECT PHASE
+        #######################
+        for num in range(0, scale):
+            self.driver[num].wait()
+            self.driver[num].stop()
+            node[num] = self.driver[num].node
+            node[num].account.ssh("grep Performance %s" % self.driver[num].STDOUT_FILE, allow_fail=False)
+            data[num] = self.driver[num].collect_data(node[num], "")
+
 
-        return self.driver.collect_data(node)
+        final = {}
+        for num in range(0, scale):
+            for key in data[num]:
+                final[key + str(num)] = data[num][key]
+
+        return final

http://git-wip-us.apache.org/repos/asf/kafka/blob/7ba26ef1/tests/kafkatest/services/performance/streams_performance.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/performance/streams_performance.py b/tests/kafkatest/services/performance/streams_performance.py
index 4ccc3b2..8cedb51 100644
--- a/tests/kafkatest/services/performance/streams_performance.py
+++ b/tests/kafkatest/services/performance/streams_performance.py
@@ -22,17 +22,19 @@ from kafkatest.services.streams import StreamsTestBaseService
 class StreamsSimpleBenchmarkService(StreamsTestBaseService):
     """Base class for simple Kafka Streams benchmark"""
 
-    def __init__(self, test_context, kafka, numrecs):
+    def __init__(self, test_context, kafka, numrecs, load_phase, test_name):
         super(StreamsSimpleBenchmarkService, self).__init__(test_context,
                                                             kafka,
                                                             "org.apache.kafka.streams.perf.SimpleBenchmark",
-                                                            numrecs)
+                                                            numrecs,
+                                                            load_phase,
+                                                            test_name)
 
-    def collect_data(self, node):
+    def collect_data(self, node, tag=""):
         # Collect the data and return it to the framework
         output = node.account.ssh_capture("grep Performance %s" % self.STDOUT_FILE)
         data = {}
         for line in output:
             parts = line.split(':')
-            data[parts[0]] = float(parts[1])
+            data[tag + parts[0]] = parts[1]
         return data

http://git-wip-us.apache.org/repos/asf/kafka/blob/7ba26ef1/tests/kafkatest/services/streams.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py
index 8bc4d9e..e7be947 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -45,11 +45,13 @@ class StreamsTestBaseService(KafkaPathResolverMixin, Service):
             "collect_default": True},
     }
 
-    def __init__(self, test_context, kafka, streams_class_name, user_test_args):
+    def __init__(self, test_context, kafka, streams_class_name, user_test_args, user_test_args1=None, user_test_args2=None):
         super(StreamsTestBaseService, self).__init__(test_context, 1)
         self.kafka = kafka
         self.args = {'streams_class_name': streams_class_name,
-                     'user_test_args': user_test_args}
+                     'user_test_args': user_test_args,
+                     'user_test_args1': user_test_args1,
+                     'user_test_args2': user_test_args2}
         self.log_level = "DEBUG"
 
     @property
@@ -75,7 +77,7 @@ class StreamsTestBaseService(KafkaPathResolverMixin, Service):
             node.account.signal(pid, sig, allow_fail=True)
         if clean_shutdown:
             for pid in pids:
-                wait_until(lambda: not node.account.alive(pid), timeout_sec=60, err_msg="Streams Test process on " + str(node.account) + " took too long to exit")
+                wait_until(lambda: not node.account.alive(pid), timeout_sec=120, err_msg="Streams Test process on " + str(node.account) + " took too long to exit")
 
         node.account.ssh("rm -f " + self.PID_FILE, allow_fail=False)
 
@@ -119,7 +121,7 @@ class StreamsTestBaseService(KafkaPathResolverMixin, Service):
 
         cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \
               "INCLUDE_TEST_JARS=true %(kafka_run_class)s %(streams_class_name)s " \
-              " %(kafka)s %(state_dir)s %(user_test_args)s" \
+              " %(kafka)s %(state_dir)s %(user_test_args)s %(user_test_args1)s %(user_test_args2)s" \
               " & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args
 
         return cmd