Posted to commits@kafka.apache.org by gu...@apache.org on 2018/04/15 17:15:35 UTC

[kafka] branch trunk updated: KAFKA-6611, PART II: Improve Streams SimpleBenchmark (#4854)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0dc7f0e  KAFKA-6611, PART II: Improve Streams SimpleBenchmark (#4854)
0dc7f0e is described below

commit 0dc7f0e66f849346049f6f90cee05a2796735d14
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Sun Apr 15 10:15:31 2018 -0700

    KAFKA-6611, PART II: Improve Streams SimpleBenchmark (#4854)
    
    SimpleBenchmark:
    
    1.a Do not rely on manually collecting num.records / bytes via atomic integers.
    1.b Rely on config files for num.threads, bootstrap.servers, etc.
    1.c Add parameters for key skewness and value size (a sketch of the skewed key generator follows this list).
    1.d Refactor the tests for loading phase, adding tumbling-windowed count.
    1.e For consume / consumeproduce, collect metrics on the consumer instead.
    1.f Force-stop the test after 3 minutes; this is based on empirical numbers for 10M records.
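
    For context, the new keySkew parameter drives a Zipf-style key generator
    (added to SimpleBenchmark.java further down in this diff). The following is
    a minimal editorial sketch of that sampling scheme, not the committed code;
    in particular, ranks here start at 1 to avoid a divide-by-zero:

        import java.util.Random;

        // Rejection-sampled Zipf keys over [0, size): skew 0.0 means uniform,
        // larger skew concentrates probability on the low-ranked keys.
        public class ZipfSketch {
            private final Random rand = new Random();
            private final int size;
            private final double skew;
            private double bottom = 0.0d;

            ZipfSketch(final int size, final double skew) {
                this.size = size;
                this.skew = skew;
                for (int i = 1; i <= size; i++) {
                    bottom += 1.0d / Math.pow(i, skew); // normalization constant
                }
            }

            int next() {
                if (skew == 0.0d) {
                    return rand.nextInt(size); // evenly distributed keys
                }
                while (true) {
                    final int rank = rand.nextInt(size) + 1; // ranks 1..size
                    final double frequency = (1.0d / Math.pow(rank, skew)) / bottom;
                    if (rand.nextDouble() < frequency) {
                        return rank - 1; // map back to a 0-based key
                    }
                }
            }

            public static void main(final String[] args) {
                final ZipfSketch keys = new ZipfSketch(10000, 1.0d);
                System.out.println(keys.next());
            }
        }

    The committed ZipfGenerator below follows the same rejection-sampling
    shape, seeded from System.currentTimeMillis().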
    
    Other tests: use config for kafka bootstrap servers.
    
    streams_simple_benchmark.py: only use scale 1 for the system test; remove yahoo from the benchmark tests.
    
    Note that the JMX-based metrics are more accurate than the manually collected metrics.
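
    As an editorial sketch of where those JMX-backed numbers come from: the
    consumer exposes the same metrics programmatically through
    KafkaConsumer#metrics(), so a harness can read, for example, the fetch
    manager's records-consumed-rate instead of counting records by hand (the
    bootstrap address and group id below are illustrative):

        import java.util.Map;
        import java.util.Properties;

        import org.apache.kafka.clients.consumer.ConsumerConfig;
        import org.apache.kafka.clients.consumer.KafkaConsumer;
        import org.apache.kafka.common.Metric;
        import org.apache.kafka.common.MetricName;
        import org.apache.kafka.common.serialization.ByteArrayDeserializer;

        public class ConsumerMetricsSketch {
            public static void main(final String[] args) {
                final Properties props = new Properties();
                props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative
                props.put(ConsumerConfig.GROUP_ID_CONFIG, "metrics-sketch");          // illustrative
                props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
                props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);

                try (final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                    // ... assign/seek and poll as the benchmark does, then read the
                    // client metrics registry that also backs the consumer's JMX MBeans
                    for (final Map.Entry<MetricName, ? extends Metric> entry : consumer.metrics().entrySet()) {
                        if ("records-consumed-rate".equals(entry.getKey().name())) {
                            System.out.println(entry.getKey().group() + "/" + entry.getKey().name()
                                    + " = " + entry.getValue().metricValue());
                        }
                    }
                }
            }
        }

    By default the clients register a JMX reporter over this same registry,
    which is what the system test samples.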
    
    Reviewers: John Roesler <jo...@confluent.io>, Bill Bejeck <bi...@confluent.io>, Matthias J. Sax <ma...@confluent.io>
---
 .../org/apache/kafka/clients/NetworkClient.java    |   1 +
 .../kafka/clients/producer/internals/Sender.java   |   9 +-
 .../apache/kafka/streams/perf/SimpleBenchmark.java | 923 +++++++++------------
 .../apache/kafka/streams/perf/YahooBenchmark.java  | 121 ++-
 .../streams/tests/BrokerCompatibilityTest.java     |  18 +-
 .../apache/kafka/streams/tests/EosTestClient.java  |  10 +-
 .../kafka/streams/tests/SmokeTestClient.java       |  43 +-
 .../kafka/streams/tests/SmokeTestDriver.java       |   9 +-
 .../tests/StreamsBrokerDownResilienceTest.java     |  22 +-
 .../apache/kafka/streams/tests/StreamsEosTest.java |  21 +-
 .../tests/StreamsRepeatingIntegerKeyProducer.java  |  21 +-
 .../kafka/streams/tests/StreamsSmokeTest.java      |  23 +-
 .../streams/tests/StreamsStandByReplicaTest.java   |  29 +-
 .../streams/streams_simple_benchmark_test.py       | 114 ++-
 .../services/performance/streams_performance.py    |  52 +-
 tests/kafkatest/services/streams.py                |  21 +-
 tests/kafkatest/services/streams_property.py       |  26 +
 17 files changed, 738 insertions(+), 725 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 3a9cbc4..fc4745d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -832,6 +832,7 @@ public class NetworkClient implements KafkaClient {
             long waitForMetadataFetch = this.metadataFetchInProgress ? requestTimeoutMs : 0;
 
             long metadataTimeout = Math.max(timeToNextMetadataUpdate, waitForMetadataFetch);
+
             if (metadataTimeout > 0) {
                 return metadataTimeout;
             }
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 0514c99..6a10826 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -252,6 +252,9 @@ public class Sender implements Runnable {
             // and request metadata update, since there are messages to send to the topic.
             for (String topic : result.unknownLeaderTopics)
                 this.metadata.add(topic);
+
+            log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}", result.unknownLeaderTopics);
+
             this.metadata.requestUpdate();
         }
 
@@ -557,9 +560,13 @@ public class Sender implements Runnable {
                 failBatch(batch, response, exception, batch.attempts() < this.retries);
             }
             if (error.exception() instanceof InvalidMetadataException) {
-                if (error.exception() instanceof UnknownTopicOrPartitionException)
+                if (error.exception() instanceof UnknownTopicOrPartitionException) {
                     log.warn("Received unknown topic or partition error in produce request on partition {}. The " +
                             "topic/partition may not exist or the user may not have Describe access to it", batch.topicPartition);
+                } else {
+                    log.warn("Received invalid metadata error in produce request on partition {} due to {}. Going " +
+                            "to request metadata update now", batch.topicPartition, error.exception().toString());
+                }
                 metadata.requestUpdate();
             }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
index c66d78b..faab52e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
@@ -40,6 +40,7 @@ import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
@@ -48,9 +49,7 @@ import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
-import org.apache.kafka.test.TestUtils;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -59,7 +58,6 @@ import java.util.Properties;
 import java.util.Random;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * Class that provides support for a series of benchmarks. It is usually driven by
@@ -77,114 +75,126 @@ import java.util.concurrent.atomic.AtomicInteger;
  * is still running "consume"
  */
 public class SimpleBenchmark {
+    private static final String LOADING_PRODUCER_CLIENT_ID = "simple-benchmark-loading-producer";
 
-    final String kafka;
-    final Boolean loadPhase;
-    final String testName;
-    final int numThreads;
-    final Properties props;
-    static final String ALL_TESTS = "all";
-    private static final String SOURCE_TOPIC = "simpleBenchmarkSourceTopic";
+    private static final String SOURCE_TOPIC_ONE = "simpleBenchmarkSourceTopic1";
+    private static final String SOURCE_TOPIC_TWO = "simpleBenchmarkSourceTopic2";
     private static final String SINK_TOPIC = "simpleBenchmarkSinkTopic";
 
-    private static final String COUNT_TOPIC = "countTopic";
-    private static final String JOIN_TOPIC_1_PREFIX = "joinSourceTopic1";
-    private static final String JOIN_TOPIC_2_PREFIX = "joinSourceTopic2";
     private static final String YAHOO_CAMPAIGNS_TOPIC = "yahooCampaigns";
     private static final String YAHOO_EVENTS_TOPIC = "yahooEvents";
-    private static final ValueJoiner VALUE_JOINER = new ValueJoiner<byte[], byte[], byte[]>() {
+
+    private static final ValueJoiner<byte[], byte[], byte[]> VALUE_JOINER = new ValueJoiner<byte[], byte[], byte[]>() {
         @Override
         public byte[] apply(final byte[] value1, final byte[] value2) {
-            if (value1 == null && value2 == null)
-                return new byte[VALUE_SIZE];
-            if (value1 == null && value2 != null)
-                return value2;
-            if (value1 != null && value2 == null)
+            // dumb joiner, in order to have as little join overhead as possible
+            if (value1 != null) {
                 return value1;
-
-            byte[] tmp = new byte[value1.length + value2.length];
-            System.arraycopy(value1, 0, tmp, 0, value1.length);
-            System.arraycopy(value2, 0, tmp, value1.length, value2.length);
-            return tmp;
+            } else if (value2 != null) {
+                return value2;
+            } else {
+                return new byte[100];
+            }
         }
     };
 
-    int numRecords;
-    final AtomicInteger processedRecords = new AtomicInteger(0);
-    long processedBytes = 0;
-    private static final int VALUE_SIZE = 100;
+    private static final Serde<byte[]> BYTE_SERDE = Serdes.ByteArray();
+    private static final Serde<Integer> INTEGER_SERDE = Serdes.Integer();
+
+    long processedBytes = 0L;
+    int processedRecords = 0;
+
     private static final long POLL_MS = 500L;
     private static final long COMMIT_INTERVAL_MS = 30000L;
     private static final int MAX_POLL_RECORDS = 1000;
-    private static final int SOCKET_SIZE_BYTES = 1 * 1024 * 1024;
 
-    private static final Serde<byte[]> BYTE_SERDE = Serdes.ByteArray();
-    private static final Serde<Integer> INTEGER_SERDE = Serdes.Integer();
+    /* ----------- benchmark variables that are hard-coded ----------- */
+
+    private static final int KEY_SPACE_SIZE = 10000;
+
+    private static final long STREAM_STREAM_JOIN_WINDOW = 10000L;
+
+    private static final long AGGREGATE_WINDOW_SIZE = 1000L;
+
+    private static final long AGGREGATE_WINDOW_ADVANCE = 500L;
+
+    private static final int SOCKET_SIZE_BYTES = 1024 * 1024;
+
+    // the following numbers are based on empirical results and should only
+    // be considered for updates when perf results have significantly changed
+
+    // with at least 10 million records, we run for at most 3 minutes
+    private static final int MAX_WAIT_MS = 3 * 60 * 1000;
 
-    public SimpleBenchmark(final Properties props, final String kafka, final Boolean loadPhase,
-                           final String testName, final int numRecords, final int numThreads) {
+    /* ----------- benchmark variables that can be specified ----------- */
+
+    final String testName;
+
+    final int numRecords;
+
+    final Properties props;
+
+    private final int valueSize;
+
+    private final double keySkew;
+
+    /* ----------- ----------------------------------------- ----------- */
+
+
+    private SimpleBenchmark(final Properties props,
+                            final String testName,
+                            final int numRecords,
+                            final double keySkew,
+                            final int valueSize) {
         super();
         this.props = props;
-        this.kafka = kafka;
-        this.loadPhase = loadPhase;
         this.testName = testName;
+        this.keySkew = keySkew;
+        this.valueSize = valueSize;
         this.numRecords = numRecords;
-        this.numThreads = numThreads;
     }
 
     private void run() {
         switch (testName) {
-            case ALL_TESTS:
-                // producer performance
-                produce(SOURCE_TOPIC);
-                // consumer performance
-                consume(SOURCE_TOPIC);
-                // simple stream performance source->process
-                processStream(SOURCE_TOPIC);
-                // simple stream performance source->sink
-                processStreamWithSink(SOURCE_TOPIC);
-                // simple stream performance source->store
-                processStreamWithStateStore(SOURCE_TOPIC);
-                // simple stream performance source->cache->store
-                processStreamWithCachedStateStore(SOURCE_TOPIC);
-                // simple aggregation
-                count(COUNT_TOPIC);
-                // simple streams performance KSTREAM-KTABLE join
-                kStreamKTableJoin(JOIN_TOPIC_1_PREFIX + "KStreamKTable", JOIN_TOPIC_2_PREFIX + "KStreamKTable");
-                // simple streams performance KSTREAM-KSTREAM join
-                kStreamKStreamJoin(JOIN_TOPIC_1_PREFIX + "KStreamKStream", JOIN_TOPIC_2_PREFIX + "KStreamKStream");
-                // simple streams performance KTABLE-KTABLE join
-                kTableKTableJoin(JOIN_TOPIC_1_PREFIX + "KTableKTable", JOIN_TOPIC_2_PREFIX + "KTableKTable");
+            // loading phases
+            case "load-one":
+                produce(LOADING_PRODUCER_CLIENT_ID, SOURCE_TOPIC_ONE, numRecords, keySkew, valueSize);
                 break;
-            case "produce":
-                produce(SOURCE_TOPIC);
+            case "load-two":
+                produce(LOADING_PRODUCER_CLIENT_ID, SOURCE_TOPIC_ONE, numRecords, keySkew, valueSize);
+                produce(LOADING_PRODUCER_CLIENT_ID, SOURCE_TOPIC_TWO, numRecords, keySkew, valueSize);
                 break;
+
+            // testing phases
             case "consume":
-                consume(SOURCE_TOPIC);
+                consume(SOURCE_TOPIC_ONE);
                 break;
-            case "count":
-                count(COUNT_TOPIC);
+            case "consumeproduce":
+                consumeAndProduce(SOURCE_TOPIC_ONE);
                 break;
-            case "processstream":
-                processStream(SOURCE_TOPIC);
+            case "streamcount":
+                countStreamsNonWindowed(SOURCE_TOPIC_ONE);
                 break;
-            case "processstreamwithsink":
-                processStreamWithSink(SOURCE_TOPIC);
+            case "streamcountwindowed":
+                countStreamsWindowed(SOURCE_TOPIC_ONE);
                 break;
-            case "processstreamwithstatestore":
-                processStreamWithStateStore(SOURCE_TOPIC);
+            case "streamprocess":
+                processStream(SOURCE_TOPIC_ONE);
                 break;
-            case "processstreamwithcachedstatestore":
-                processStreamWithCachedStateStore(SOURCE_TOPIC);
+            case "streamprocesswithsink":
+                processStreamWithSink(SOURCE_TOPIC_ONE);
                 break;
-            case "kstreamktablejoin":
-                kStreamKTableJoin(JOIN_TOPIC_1_PREFIX + "KStreamKTable", JOIN_TOPIC_2_PREFIX + "KStreamKTable");
+            case "streamprocesswithstatestore":
+                processStreamWithStateStore(SOURCE_TOPIC_ONE);
                 break;
-            case "kstreamkstreamjoin":
-                kStreamKStreamJoin(JOIN_TOPIC_1_PREFIX + "KStreamKStream", JOIN_TOPIC_2_PREFIX + "KStreamKStream");
+            case "streamtablejoin":
+                streamTableJoin(SOURCE_TOPIC_ONE, SOURCE_TOPIC_TWO);
                 break;
-            case "ktablektablejoin":
-                kTableKTableJoin(JOIN_TOPIC_1_PREFIX + "KTableKTable", JOIN_TOPIC_2_PREFIX + "KTableKTable");
+            case "streamstreamjoin":
+                streamStreamJoin(SOURCE_TOPIC_ONE, SOURCE_TOPIC_TWO);
+                break;
+            case "tabletablejoin":
+                tableTableJoin(SOURCE_TOPIC_ONE, SOURCE_TOPIC_TWO);
                 break;
             case "yahoo":
                 yahooBenchmark(YAHOO_CAMPAIGNS_TOPIC, YAHOO_EVENTS_TOPIC);
@@ -196,66 +206,71 @@ public class SimpleBenchmark {
     }
 
     public static void main(String[] args) throws IOException {
-        final String kafka = args.length > 0 ? args[0] : "localhost:9092";
-        final String propFileName = args.length > 1 ? args[1] : null;
-        final int numRecords = args.length > 2 ? Integer.parseInt(args[2]) : 10000000;
-        final boolean loadPhase = args.length > 3 ? Boolean.parseBoolean(args[3]) : false;
-        final String testName = args.length > 4 ? args[4].toLowerCase(Locale.ROOT) : ALL_TESTS;
-        final int numThreads = args.length > 5 ? Integer.parseInt(args[5]) : 1;
+        if (args.length < 5) {
+            System.err.println("Not enough parameters are provided; expecting propFileName, testName, numRecords, keySkew, valueSize");
+            System.exit(1);
+        }
+
+        String propFileName = args[0];
+        String testName = args[1].toLowerCase(Locale.ROOT);
+        int numRecords = Integer.parseInt(args[2]);
+        double keySkew = Double.parseDouble(args[3]); // 0d means even distribution
+        int valueSize = Integer.parseInt(args[4]);
 
         final Properties props = Utils.loadProps(propFileName);
+        final String kafka = props.getProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
 
-        final String stateDirStr;
-        if (props.containsKey(StreamsConfig.STATE_DIR_CONFIG)) {
-            stateDirStr = props.get(StreamsConfig.STATE_DIR_CONFIG).toString();
-        } else {
-            stateDirStr = TestUtils.tempDirectory().getAbsolutePath();
-            props.put(StreamsConfig.STATE_DIR_CONFIG, stateDirStr);
+        if (kafka == null) {
+            System.err.println("No bootstrap kafka servers specified in " + StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
+            System.exit(1);
         }
 
-        final File stateDir = new File(stateDirStr);
-        stateDir.mkdir();
-
         // Note: this output is needed for automated tests and must not be removed
         System.out.println("StreamsTest instance started");
-        System.out.println("kafka=" + kafka);
+
+        System.out.println("testName=" + testName);
         System.out.println("streamsProperties=" + props);
         System.out.println("numRecords=" + numRecords);
-        System.out.println("loadPhase=" + loadPhase);
-        System.out.println("testName=" + testName);
-        System.out.println("numThreads=" + numThreads);
+        System.out.println("keySkew=" + keySkew);
+        System.out.println("valueSize=" + valueSize);
+
+        final SimpleBenchmark benchmark = new SimpleBenchmark(props, testName, numRecords, keySkew, valueSize);
 
-        SimpleBenchmark benchmark = new SimpleBenchmark(props, kafka, loadPhase, testName, numRecords, numThreads);
         benchmark.run();
     }
 
     public void setStreamProperties(final String applicationId) {
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
         props.put(StreamsConfig.CLIENT_ID_CONFIG, "simple-benchmark");
-        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
-        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numThreads);
-        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        props.put(StreamsConfig.POLL_MS_CONFIG, POLL_MS);
+        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL_MS);
+        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
+        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass());
         // the socket buffer needs to be large, especially when running in AWS with
         // high latency. if running locally the default is fine.
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, SOCKET_SIZE_BYTES);
-        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
-        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass());
         props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, MAX_POLL_RECORDS);
-        props.put(StreamsConfig.POLL_MS_CONFIG, POLL_MS);
-        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL_MS);
+
+        // improve producer throughput
+        props.put(ProducerConfig.LINGER_MS_CONFIG, 5000);
+        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 128 * 1024);
+
         //TODO remove this config or set to smaller value when KIP-91 is merged
         props.put(StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG), 60000);
     }
 
     private Properties setProduceConsumeProperties(final String clientId) {
         Properties props = new Properties();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.props.getProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG));
         props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
-        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
-        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
-        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
         // the socket buffer needs to be large, especially when running in AWS with
         // high latency. if running locally the default is fine.
+        props.put(ProducerConfig.LINGER_MS_CONFIG, 5000);
+        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 128 * 1024);
         props.put(ProducerConfig.SEND_BUFFER_CONFIG, SOCKET_SIZE_BYTES);
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
         props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
@@ -266,521 +281,339 @@ public class SimpleBenchmark {
         return props;
     }
 
-    private boolean maybeSetupPhase(final String topic, final String clientId,
-                                    final boolean skipIfAllTests) {
-        resetStats();
-        // initialize topics
-        if (loadPhase) {
-            if (skipIfAllTests) {
-                // if we run all tests, the produce test will have already loaded the data
-                if (testName.equals(ALL_TESTS)) {
-                    // Skipping loading phase since previously loaded
-                    return true;
-                }
-            }
-            System.out.println("Initializing topic " + topic);
-            // WARNING: The keys must be sequential, i.e., unique, otherwise the logic for when this test
-            // stops will not work (in createCountStreams)
-            produce(topic, VALUE_SIZE, clientId, numRecords, true, numRecords, false);
-            return true;
-        }
-        return false;
-    }
-
     void resetStats() {
-        processedRecords.set(0);
-        processedBytes = 0;
-    }
-
-
-    private KafkaStreams createCountStreams(Properties streamConfig, String topic, final CountDownLatch latch) {
-        final StreamsBuilder builder = new StreamsBuilder();
-        final KStream<Integer, byte[]> input = builder.stream(topic);
-
-        input.groupByKey()
-            .count("tmpStoreName").foreach(new CountDownAction(latch));
-
-        return new KafkaStreams(builder.build(), streamConfig);
-    }
-
-
-    private void yahooBenchmark(final String campaignsTopic, final String eventsTopic) {
-        YahooBenchmark benchmark = new YahooBenchmark(this, campaignsTopic, eventsTopic);
-
-        benchmark.run();
-    }
-
-    /**
-     * Measure the performance of a simple aggregate like count.
-     * Counts the occurrence of numbers (note that normally people count words, this
-     * example counts numbers)
-     * @param countTopic Topic where numbers are stored
-     * @throws Exception
-     */
-    public void count(String countTopic) {
-        if (maybeSetupPhase(countTopic, "simple-benchmark-produce-count", false)) {
-            return;
-        }
-
-        CountDownLatch latch = new CountDownLatch(1);
-        setStreamProperties("simple-benchmark-count");
-        final KafkaStreams streams = createCountStreams(props, countTopic, latch);
-        runGenericBenchmark(streams, "Streams Count Performance [records/latency/rec-sec/MB-sec counted]: ", latch);
-    }
-
-    /**
-     * Measure the performance of a KStream-KTable left join. The setup is such that each
-     * KStream record joins to exactly one element in the KTable
-     */
-    public void kStreamKTableJoin(String kStreamTopic, String kTableTopic) {
-        if (maybeSetupPhase(kStreamTopic, "simple-benchmark-produce-kstream", false)) {
-            maybeSetupPhase(kTableTopic, "simple-benchmark-produce-ktable", false);
-            return;
-        }
-
-        CountDownLatch latch = new CountDownLatch(1);
-
-        // setup join
-        setStreamProperties("simple-benchmark-kstream-ktable-join");
-        final KafkaStreams streams = createKafkaStreamsKStreamKTableJoin(props, kStreamTopic, kTableTopic, latch);
-
-        // run benchmark
-        runGenericBenchmark(streams, "Streams KStreamKTable LeftJoin Performance [records/latency/rec-sec/MB-sec joined]: ", latch);
-    }
-
-    /**
-     * Measure the performance of a KStream-KStream left join. The setup is such that each
-     * KStream record joins to exactly one element in the other KStream
-     */
-    public void kStreamKStreamJoin(String kStreamTopic1, String kStreamTopic2) {
-        if (maybeSetupPhase(kStreamTopic1, "simple-benchmark-produce-kstream-topic1", false)) {
-            maybeSetupPhase(kStreamTopic2, "simple-benchmark-produce-kstream-topic2", false);
-            return;
-        }
-
-        CountDownLatch latch = new CountDownLatch(1);
-
-        // setup join
-        setStreamProperties("simple-benchmark-kstream-kstream-join");
-        final KafkaStreams streams = createKafkaStreamsKStreamKStreamJoin(props, kStreamTopic1, kStreamTopic2, latch);
-
-        // run benchmark
-        runGenericBenchmark(streams, "Streams KStreamKStream LeftJoin Performance [records/latency/rec-sec/MB-sec  joined]: ", latch);
+        processedRecords = 0;
+        processedBytes = 0L;
     }
 
     /**
-     * Measure the performance of a KTable-KTable left join. The setup is such that each
-     * KTable record joins to exactly one element in the other KTable
+     * Produce values to a topic
+     * @param clientId String specifying client ID
+     * @param topic Topic to produce to
+     * @param numRecords Number of records to produce
+     * @param keySkew Key zipf distribution skewness
+     * @param valueSize Size of value in bytes
      */
-    public void kTableKTableJoin(String kTableTopic1, String kTableTopic2) {
-        if (maybeSetupPhase(kTableTopic1, "simple-benchmark-produce-ktable-topic1", false)) {
-            maybeSetupPhase(kTableTopic2, "simple-benchmark-produce-ktable-topic2", false);
-            return;
-        }
-        CountDownLatch latch = new CountDownLatch(1);
-
-        // setup join
-        setStreamProperties("simple-benchmark-ktable-ktable-join");
-        final KafkaStreams streams = createKafkaStreamsKTableKTableJoin(props, kTableTopic1, kTableTopic2, latch);
-
-        // run benchmark
-        runGenericBenchmark(streams, "Streams KTableKTable LeftJoin Performance [records/latency/rec-sec/MB-sec joined]: ", latch);
-    }
-
-    void printResults(final String nameOfBenchmark, final long latency) {
-        System.out.println(nameOfBenchmark +
-            processedRecords.get() + "/" +
-            latency + "/" +
-            recordsPerSec(latency, processedRecords.get()) + "/" +
-            megabytesPerSec(latency, processedBytes));
-    }
-
-    void runGenericBenchmark(final KafkaStreams streams, final String nameOfBenchmark, final CountDownLatch latch) {
-        streams.start();
-
-        long startTime = System.currentTimeMillis();
-
-        while (latch.getCount() > 0) {
-            try {
-                latch.await();
-            } catch (InterruptedException ex) {
-                Thread.currentThread().interrupt();
+    private void produce(final String clientId,
+                         final String topic,
+                         final int numRecords,
+                         final double keySkew,
+                         final int valueSize) {
+        final Properties props = setProduceConsumeProperties(clientId);
+        final ZipfGenerator keyGen = new ZipfGenerator(KEY_SPACE_SIZE, keySkew);
+
+        try (final KafkaProducer<Integer, byte[]> producer = new KafkaProducer<>(props)) {
+            final byte[] value = new byte[valueSize];
+            // put some random values to increase entropy. Some devices
+            // like SSDs do compression and if the array is all zeros
+            // the performance will be too good.
+            new Random(System.currentTimeMillis()).nextBytes(value);
+
+            for (int i = 0; i < numRecords; i++) {
+                producer.send(new ProducerRecord<>(topic, keyGen.next(), value));
             }
         }
-        long endTime = System.currentTimeMillis();
-        printResults(nameOfBenchmark, endTime - startTime);
-
-        streams.close();
     }
 
-    private long startStreamsThread(final KafkaStreams streams, final CountDownLatch latch) {
-        Thread thread = new Thread() {
-            public void run() {
-                streams.start();
-            }
-        };
-        thread.start();
+    private void consumeAndProduce(final String topic) {
+        final Properties consumerProps = setProduceConsumeProperties("simple-benchmark-consumer");
+        final Properties producerProps = setProduceConsumeProperties("simple-benchmark-producer");
 
-        long startTime = System.currentTimeMillis();
+        final long startTime = System.currentTimeMillis();
+        try (final KafkaConsumer<Integer, byte[]> consumer = new KafkaConsumer<>(consumerProps);
+             final KafkaProducer<Integer, byte[]> producer = new KafkaProducer<>(producerProps)) {
+            final List<TopicPartition> partitions = getAllPartitions(consumer, topic);
 
-        while (latch.getCount() > 0) {
-            try {
-                latch.await();
-            } catch (InterruptedException ex) {
-                Thread.interrupted();
+            consumer.assign(partitions);
+            consumer.seekToBeginning(partitions);
+
+            while (true) {
+                final ConsumerRecords<Integer, byte[]> records = consumer.poll(POLL_MS);
+                if (records.isEmpty()) {
+                    if (processedRecords == numRecords) {
+                        break;
+                    }
+                } else {
+                    for (final ConsumerRecord<Integer, byte[]> record : records) {
+                        producer.send(new ProducerRecord<>(SINK_TOPIC, record.key(), record.value()));
+                        processedRecords++;
+                        processedBytes += record.value().length + Integer.SIZE;
+                        if (processedRecords == numRecords) {
+                            break;
+                        }
+                    }
+                }
+                if (processedRecords == numRecords) {
+                    break;
+                }
             }
         }
 
-        long endTime = System.currentTimeMillis();
-
-        streams.close();
-        try {
-            thread.join();
-        } catch (Exception ex) {
-            // ignore
-        }
+        final long endTime = System.currentTimeMillis();
 
-        return endTime - startTime;
+        printResults("ConsumerProducer Performance [records/latency/rec-sec/MB-sec read]: ", endTime - startTime);
     }
 
-    public void processStream(final String topic) {
-        if (maybeSetupPhase(topic, "simple-benchmark-process-stream-load", true)) {
-            return;
-        }
+    private void consume(final String topic) {
+        final Properties consumerProps = setProduceConsumeProperties("simple-benchmark-consumer");
 
-        CountDownLatch latch = new CountDownLatch(1);
+        final long startTime = System.currentTimeMillis();
 
-        final KafkaStreams streams = createKafkaStreams(topic, latch);
-        long latency = startStreamsThread(streams, latch);
+        try (final KafkaConsumer<Integer, byte[]> consumer = new KafkaConsumer<>(consumerProps)) {
+            final List<TopicPartition> partitions = getAllPartitions(consumer, topic);
 
-        printResults("Streams Performance [records/latency/rec-sec/MB-sec source]: ", latency);
-    }
+            consumer.assign(partitions);
+            consumer.seekToBeginning(partitions);
 
-    public void processStreamWithSink(String topic) {
-        if (maybeSetupPhase(topic, "simple-benchmark-process-stream-with-sink-load", true)) {
-            return;
+            while (true) {
+                final ConsumerRecords<Integer, byte[]> records = consumer.poll(POLL_MS);
+                if (records.isEmpty()) {
+                    if (processedRecords == numRecords) {
+                        break;
+                    }
+                } else {
+                    for (final ConsumerRecord<Integer, byte[]> record : records) {
+                        processedRecords++;
+                        processedBytes += record.value().length + Integer.SIZE;
+                        if (processedRecords == numRecords) {
+                            break;
+                        }
+                    }
+                }
+                if (processedRecords == numRecords) {
+                    break;
+                }
+            }
         }
 
-        CountDownLatch latch = new CountDownLatch(1);
-        final KafkaStreams streams = createKafkaStreamsWithSink(topic, latch);
-        long latency = startStreamsThread(streams, latch);
-
-        printResults("Streams Performance [records/latency/rec-sec/MB-sec source+sink]: ", latency);
+        final long endTime = System.currentTimeMillis();
 
+        printResults("Consumer Performance [records/latency/rec-sec/MB-sec read]: ", endTime - startTime);
     }
 
-    public void processStreamWithStateStore(String topic) {
-        if (maybeSetupPhase(topic, "simple-benchmark-process-stream-with-state-store-load", true)) {
-            return;
-        }
+    private void processStream(final String topic) {
+        final CountDownLatch latch = new CountDownLatch(1);
 
-        CountDownLatch latch = new CountDownLatch(1);
-        final KafkaStreams streams = createKafkaStreamsWithStateStore(topic, latch, false);
-        long latency = startStreamsThread(streams, latch);
-        printResults("Streams Performance [records/latency/rec-sec/MB-sec source+store]: ", latency);
+        setStreamProperties("simple-benchmark-streams-source");
 
-    }
+        final StreamsBuilder builder = new StreamsBuilder();
 
-    public void processStreamWithCachedStateStore(String topic) {
-        if (maybeSetupPhase(topic, "simple-benchmark-process-stream-with-cached-state-store-load", true)) {
-            return;
-        }
+        builder.stream(topic, Consumed.with(INTEGER_SERDE, BYTE_SERDE)).peek(new CountDownAction(latch));
 
-        CountDownLatch latch = new CountDownLatch(1);
-        final KafkaStreams streams = createKafkaStreamsWithStateStore(topic, latch, true);
-        long latency = startStreamsThread(streams, latch);
-        printResults("Streams Performance [records/latency/rec-sec/MB-sec source+cache+store]: ", latency);
+        final KafkaStreams streams = createKafkaStreamsWithExceptionHandler(builder, props);
+        runGenericBenchmark(streams, "Streams Source Performance [records/latency/rec-sec/MB-sec joined]: ", latch);
     }
 
-    public void produce(String topic) {
-        // loading phase does not make sense for producer
-        if (loadPhase) {
-            resetStats();
-            return;
-        }
-        produce(topic, VALUE_SIZE, "simple-benchmark-produce", numRecords, true, numRecords, true);
-    }
-    /**
-     * Produce values to a topic
-     * @param topic Topic to produce to
-     * @param valueSizeBytes Size of value in bytes
-     * @param clientId String specifying client ID
-     * @param numRecords Number of records to produce
-     * @param sequential if True, then keys are produced sequentially from 0 to upperRange. In this case upperRange must be >= numRecords.
-     *                   if False, then keys are produced randomly in range [0, upperRange)
-     * @param printStats if True, print stats on how long producing took. If False, don't print stats. False can be used
-     *                   when this produce step is part of another benchmark that produces its own stats
-     */
-    private void produce(String topic, int valueSizeBytes, String clientId, int numRecords, boolean sequential,
-                         int upperRange, boolean printStats) {
+    private void processStreamWithSink(final String topic) {
+        final CountDownLatch latch = new CountDownLatch(1);
 
+        setStreamProperties("simple-benchmark-streams-source-sink");
 
-        if (sequential) {
-            if (upperRange < numRecords) throw new IllegalArgumentException("UpperRange must be >= numRecords");
-        }
-        if (!sequential) {
-            System.out.println("WARNING: You are using non-sequential keys. If your tests' exit logic expects to see a final key, random keys may not work.");
-        }
-        Properties props = setProduceConsumeProperties(clientId);
-
-        int key = 0;
-        Random rand = new Random();
-        KafkaProducer<Integer, byte[]> producer = new KafkaProducer<>(props);
-
-        byte[] value = new byte[valueSizeBytes];
-        // put some random values to increase entropy. Some devices
-        // like SSDs do compression and if the array is all zeros
-        // the performance will be too good.
-        new Random().nextBytes(value);
-        long startTime = System.currentTimeMillis();
-
-        if (sequential) key = 0;
-        else key = rand.nextInt(upperRange);
-        for (int i = 0; i < numRecords; i++) {
-            producer.send(new ProducerRecord<>(topic, key, value));
-            if (sequential) key++;
-            else key = rand.nextInt(upperRange);
-            processedRecords.getAndIncrement();
-            processedBytes += value.length + Integer.SIZE;
-        }
-        producer.close();
+        final StreamsBuilder builder = new StreamsBuilder();
 
-        long endTime = System.currentTimeMillis();
+        final KStream<Integer, byte[]> source = builder.stream(topic);
+        source.peek(new CountDownAction(latch)).to(SINK_TOPIC);
 
-        if (printStats) {
-            printResults("Producer Performance [records/latency/rec-sec/MB-sec write]: ", endTime - startTime);
-        }
+        final KafkaStreams streams = createKafkaStreamsWithExceptionHandler(builder, props);
+        runGenericBenchmark(streams, "Streams SourceSink Performance [records/latency/rec-sec/MB-sec joined]: ", latch);
     }
 
-    public void consume(String topic) {
-        if (maybeSetupPhase(topic, "simple-benchmark-consumer-load", true)) {
-            return;
-        }
-
-        Properties props = setProduceConsumeProperties("simple-benchmark-consumer");
-
-        KafkaConsumer<Integer, byte[]> consumer = new KafkaConsumer<>(props);
-
-        List<TopicPartition> partitions = getAllPartitions(consumer, topic);
-        consumer.assign(partitions);
-        consumer.seekToBeginning(partitions);
-
-        Integer key = null;
-
-        long startTime = System.currentTimeMillis();
-
-        while (true) {
-            ConsumerRecords<Integer, byte[]> records = consumer.poll(POLL_MS);
-            if (records.isEmpty()) {
-                if (processedRecords.get() == numRecords)
-                    break;
-            } else {
-                for (ConsumerRecord<Integer, byte[]> record : records) {
-                    processedRecords.getAndIncrement();
-                    processedBytes += record.value().length + Integer.SIZE;
-                    Integer recKey = record.key();
-                    if (key == null || key < recKey)
-                        key = recKey;
-                    if (processedRecords.get() == numRecords)
-                        break;
-                }
-            }
-            if (processedRecords.get() == numRecords)
-                break;
-        }
-
-        long endTime = System.currentTimeMillis();
+    private void processStreamWithStateStore(final String topic) {
+        final CountDownLatch latch = new CountDownLatch(1);
 
-        consumer.close();
-        printResults("Consumer Performance [records/latency/rec-sec/MB-sec read]: ", endTime - startTime);
-    }
-
-    private KafkaStreams createKafkaStreams(String topic, final CountDownLatch latch) {
-        setStreamProperties("simple-benchmark-streams");
+        setStreamProperties("simple-benchmark-streams-with-store");
 
-        StreamsBuilder builder = new StreamsBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
+        final StoreBuilder<KeyValueStore<Integer, byte[]>> storeBuilder
+                = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("store"), INTEGER_SERDE, BYTE_SERDE);
+        builder.addStateStore(storeBuilder);
 
-        KStream<Integer, byte[]> source = builder.stream(topic, Consumed.with(INTEGER_SERDE, BYTE_SERDE));
+        final KStream<Integer, byte[]> source = builder.stream(topic);
 
-        source.process(new ProcessorSupplier<Integer, byte[]>() {
+        source.peek(new CountDownAction(latch)).process(new ProcessorSupplier<Integer, byte[]>() {
             @Override
             public Processor<Integer, byte[]> get() {
                 return new AbstractProcessor<Integer, byte[]>() {
+                    KeyValueStore<Integer, byte[]> store;
 
+                    @SuppressWarnings("unchecked")
                     @Override
-                    public void init(ProcessorContext context) {
+                    public void init(final ProcessorContext context) {
+                        store = (KeyValueStore<Integer, byte[]>) context.getStateStore("store");
                     }
 
                     @Override
-                    public void process(Integer key, byte[] value) {
-                        processedRecords.getAndIncrement();
-                        processedBytes += value.length + Integer.SIZE;
-                        if (processedRecords.get() == numRecords) {
-                            latch.countDown();
-                        }
+                    public void process(final Integer key, final byte[] value) {
+                        store.put(key, value);
                     }
 
                     @Override
-                    public void punctuate(long timestamp) {
-                    }
+                    public void punctuate(final long timestamp) {}
 
                     @Override
-                    public void close() {
-                    }
+                    public void close() {}
                 };
             }
-        });
+        }, "store");
 
-        return createKafkaStreamsWithExceptionHandler(builder, props);
+        final KafkaStreams streams = createKafkaStreamsWithExceptionHandler(builder, props);
+        runGenericBenchmark(streams, "Streams Stateful Performance [records/latency/rec-sec/MB-sec joined]: ", latch);
     }
 
-    private KafkaStreams createKafkaStreamsWithSink(String topic, final CountDownLatch latch) {
-        setStreamProperties("simple-benchmark-streams-with-sink");
+    /**
+     * Measure the performance of a simple aggregate like count.
+     * Counts the occurrence of numbers (note that normally people count words, this
+     * example counts numbers)
+     */
+    private void countStreamsNonWindowed(final String sourceTopic) {
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        setStreamProperties("simple-benchmark-nonwindowed-count");
 
-        StreamsBuilder builder = new StreamsBuilder();
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KStream<Integer, byte[]> input = builder.stream(sourceTopic);
 
-        KStream<Integer, byte[]> source = builder.stream(topic, Consumed.with(INTEGER_SERDE, BYTE_SERDE));
+        input.peek(new CountDownAction(latch))
+                .groupByKey()
+                .count();
 
-        source.to(INTEGER_SERDE, BYTE_SERDE, SINK_TOPIC);
-        source.process(new ProcessorSupplier<Integer, byte[]>() {
-            @Override
-            public Processor<Integer, byte[]> get() {
-                return new AbstractProcessor<Integer, byte[]>() {
-                    @Override
-                    public void init(ProcessorContext context) {
-                    }
+        final KafkaStreams streams = createKafkaStreamsWithExceptionHandler(builder, props);
+        runGenericBenchmark(streams, "Streams Count Performance [records/latency/rec-sec/MB-sec counted]: ", latch);
+    }
 
-                    @Override
-                    public void process(Integer key, byte[] value) {
-                        processedRecords.getAndIncrement();
-                        processedBytes += value.length + Integer.SIZE;
-                        if (processedRecords.get() == numRecords) {
-                            latch.countDown();
-                        }
-                    }
+    /**
+     * Measure the performance of a simple aggregate like count.
+     * Counts the occurrence of numbers (note that normally people count words, this
+     * example counts numbers)
+     */
+    private void countStreamsWindowed(final String sourceTopic) {
+        final CountDownLatch latch = new CountDownLatch(1);
 
-                    @Override
-                    public void punctuate(long timestamp) {
-                    }
+        setStreamProperties("simple-benchmark-windowed-count");
 
-                    @Override
-                    public void close() {
-                    }
-                };
-            }
-        });
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KStream<Integer, byte[]> input = builder.stream(sourceTopic);
 
-        return createKafkaStreamsWithExceptionHandler(builder, props);
-    }
+        input.peek(new CountDownAction(latch))
+                .groupByKey()
+                .windowedBy(TimeWindows.of(AGGREGATE_WINDOW_SIZE).advanceBy(AGGREGATE_WINDOW_ADVANCE))
+                .count();
 
-    private class CountDownAction<V> implements ForeachAction<Integer, V> {
-        private CountDownLatch latch;
-        CountDownAction(final CountDownLatch latch) {
-            this.latch = latch;
-        }
-        @Override
-        public void apply(Integer key, V value) {
-            processedRecords.getAndIncrement();
-            if (value instanceof byte[]) {
-                processedBytes += ((byte[]) value).length + Integer.SIZE;
-            } else if (value instanceof Long) {
-                processedBytes += Long.SIZE + Integer.SIZE;
-            } else {
-                System.err.println("Unknown value type in CountDownAction");
-            }
-            if (processedRecords.get() == numRecords) {
-                this.latch.countDown();
-            }
-        }
+        final KafkaStreams streams = createKafkaStreamsWithExceptionHandler(builder, props);
+        runGenericBenchmark(streams, "Streams Count Windowed Performance [records/latency/rec-sec/MB-sec counted]: ", latch);
     }
 
-    private KafkaStreams createKafkaStreamsKStreamKTableJoin(Properties streamConfig, String kStreamTopic,
-                                                             String kTableTopic, final CountDownLatch latch) {
+    /**
+     * Measure the performance of a KStream-KTable left join. The setup is such that each
+     * KStream record joins to exactly one element in the KTable
+     */
+    private void streamTableJoin(final String kStreamTopic, final String kTableTopic) {
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        setStreamProperties("simple-benchmark-stream-table-join");
+
         final StreamsBuilder builder = new StreamsBuilder();
 
-        final KStream<Long, byte[]> input1 = builder.stream(kStreamTopic);
-        final KTable<Long, byte[]> input2 = builder.table(kTableTopic);
+        final KStream<Integer, byte[]> input1 = builder.stream(kStreamTopic);
+        final KTable<Integer, byte[]> input2 = builder.table(kTableTopic);
 
         input1.leftJoin(input2, VALUE_JOINER).foreach(new CountDownAction(latch));
 
-        return createKafkaStreamsWithExceptionHandler(builder, streamConfig);
+        final KafkaStreams streams = createKafkaStreamsWithExceptionHandler(builder, props);
+
+        // run benchmark
+        runGenericBenchmark(streams, "Streams KStreamKTable LeftJoin Performance [records/latency/rec-sec/MB-sec joined]: ", latch);
     }
 
-    private KafkaStreams createKafkaStreamsKTableKTableJoin(Properties streamConfig, String kTableTopic1,
-                                                            String kTableTopic2, final CountDownLatch latch) {
+    /**
+     * Measure the performance of a KStream-KStream left join. The setup is such that each
+     * KStream record joins to exactly one element in the other KStream
+     */
+    private void streamStreamJoin(final String kStreamTopic1, final String kStreamTopic2) {
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        setStreamProperties("simple-benchmark-stream-stream-join");
+
         final StreamsBuilder builder = new StreamsBuilder();
 
-        final KTable<Long, byte[]> input1 = builder.table(kTableTopic1);
-        final KTable<Long, byte[]> input2 = builder.table(kTableTopic2);
+        final KStream<Integer, byte[]> input1 = builder.stream(kStreamTopic1);
+        final KStream<Integer, byte[]> input2 = builder.stream(kStreamTopic2);
 
-        input1.leftJoin(input2, VALUE_JOINER).foreach(new CountDownAction(latch));
+        input1.leftJoin(input2, VALUE_JOINER, JoinWindows.of(STREAM_STREAM_JOIN_WINDOW)).foreach(new CountDownAction(latch));
+
+        final KafkaStreams streams = createKafkaStreamsWithExceptionHandler(builder, props);
 
-        return createKafkaStreamsWithExceptionHandler(builder, streamConfig);
+        // run benchmark
+        runGenericBenchmark(streams, "Streams KStreamKStream LeftJoin Performance [records/latency/rec-sec/MB-sec  joined]: ", latch);
     }
 
-    private KafkaStreams createKafkaStreamsKStreamKStreamJoin(Properties streamConfig, String kStreamTopic1,
-                                                              String kStreamTopic2, final CountDownLatch latch) {
+    /**
+     * Measure the performance of a KTable-KTable left join. The setup is such that each
+     * KTable record joins to exactly one element in the other KTable
+     */
+    private void tableTableJoin(String kTableTopic1, String kTableTopic2) {
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        // setup join
+        setStreamProperties("simple-benchmark-table-table-join");
+
         final StreamsBuilder builder = new StreamsBuilder();
 
-        final KStream<Long, byte[]> input1 = builder.stream(kStreamTopic1);
-        final KStream<Long, byte[]> input2 = builder.stream(kStreamTopic2);
-        final long timeDifferenceMs = 10000L;
+        final KTable<Integer, byte[]> input1 = builder.table(kTableTopic1);
+        final KTable<Integer, byte[]> input2 = builder.table(kTableTopic2);
+
+        input1.leftJoin(input2, VALUE_JOINER).toStream().foreach(new CountDownAction(latch));
 
-        input1.leftJoin(input2, VALUE_JOINER, JoinWindows.of(timeDifferenceMs)).foreach(new CountDownAction(latch));
+        final KafkaStreams streams = createKafkaStreamsWithExceptionHandler(builder, props);
 
-        return createKafkaStreamsWithExceptionHandler(builder, streamConfig);
+        // run benchmark
+        runGenericBenchmark(streams, "Streams KTableKTable LeftJoin Performance [records/latency/rec-sec/MB-sec joined]: ", latch);
     }
 
-    private KafkaStreams createKafkaStreamsWithStateStore(String topic,
-                                                          final CountDownLatch latch,
-                                                          boolean enableCaching) {
-        setStreamProperties("simple-benchmark-streams-with-store" + enableCaching);
+    void printResults(final String nameOfBenchmark, final long latency) {
+        System.out.println(nameOfBenchmark +
+            processedRecords + "/" +
+            latency + "/" +
+            recordsPerSec(latency, processedRecords) + "/" +
+            megabytesPerSec(latency, processedBytes));
+    }
 
-        StreamsBuilder builder = new StreamsBuilder();
+    void runGenericBenchmark(final KafkaStreams streams, final String nameOfBenchmark, final CountDownLatch latch) {
+        streams.start();
 
-        final StoreBuilder<KeyValueStore<Integer, byte[]>> storeBuilder
-                = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("store"), Serdes.Integer(), Serdes.ByteArray());
-        if (enableCaching) {
-            builder.addStateStore(storeBuilder.withCachingEnabled());
-        } else {
-            builder.addStateStore(storeBuilder);
+        final long startTime = System.currentTimeMillis();
+        long endTime = startTime;
+
+        while (latch.getCount() > 0 && (endTime - startTime < MAX_WAIT_MS)) {
+            try {
+                latch.await(1000, TimeUnit.MILLISECONDS);
+            } catch (final InterruptedException ex) {
+                Thread.interrupted();
+            }
+
+            endTime = System.currentTimeMillis();
         }
-        KStream<Integer, byte[]> source = builder.stream(topic, Consumed.with(INTEGER_SERDE, BYTE_SERDE));
+        streams.close();
 
-        source.process(new ProcessorSupplier<Integer, byte[]>() {
-            @Override
-            public Processor<Integer, byte[]> get() {
-                return new AbstractProcessor<Integer, byte[]>() {
-                    KeyValueStore<Integer, byte[]> store;
+        printResults(nameOfBenchmark, endTime - startTime);
+    }
 
-                    @SuppressWarnings("unchecked")
-                    @Override
-                    public void init(ProcessorContext context) {
-                        store = (KeyValueStore<Integer, byte[]>) context.getStateStore("store");
-                    }
+    private class CountDownAction implements ForeachAction<Integer, byte[]> {
+        private final CountDownLatch latch;
 
-                    @Override
-                    public void process(Integer key, byte[] value) {
-                        store.put(key, value);
-                        processedRecords.getAndIncrement();
-                        processedBytes += value.length + Integer.SIZE;
-                        if (processedRecords.get() == numRecords) {
-                            latch.countDown();
-                        }
-                    }
+        CountDownAction(final CountDownLatch latch) {
+            this.latch = latch;
+        }
 
-                    @Override
-                    public void punctuate(long timestamp) {
-                    }
+        @Override
+        public void apply(final Integer key, final byte[] value) {
+            processedRecords++;
+            processedBytes += Integer.SIZE + value.length;
 
-                    @Override
-                    public void close() {
-                    }
-                };
+            if (processedRecords == numRecords) {
+                this.latch.countDown();
             }
-        }, "store");
-
-        return createKafkaStreamsWithExceptionHandler(builder, props);
+        }
     }
 
     private KafkaStreams createKafkaStreamsWithExceptionHandler(final StreamsBuilder builder, final Properties props) {
@@ -816,4 +649,48 @@ public class SimpleBenchmark {
         return partitions;
     }
 
+    private void yahooBenchmark(final String campaignsTopic, final String eventsTopic) {
+        final YahooBenchmark benchmark = new YahooBenchmark(this, campaignsTopic, eventsTopic);
+
+        benchmark.run();
+    }
+
+    private class ZipfGenerator {
+        final private Random rand = new Random(System.currentTimeMillis());
+        final private int size;
+        final private double skew;
+
+        private double bottom = 0.0d;
+
+        ZipfGenerator(final int size, final double skew) {
+            this.size = size;
+            this.skew = skew;
+
+            for (int i = 1; i < size; i++) {
+                this.bottom += 1.0d / Math.pow(i, this.skew);
+            }
+        }
+
+        int next() {
+            if (skew == 0.0d) {
+                return rand.nextInt(size);
+            } else {
+                int rank;
+                double dice;
+                double frequency;
+
+                rank = rand.nextInt(size) + 1;   // ranks are 1-based, avoiding pow(0, skew)
+                frequency = (1.0d / Math.pow(rank, this.skew)) / this.bottom;
+                dice = rand.nextDouble();
+
+                while (!(dice < frequency)) {
+                    rank = rand.nextInt(size) + 1;
+                    frequency = (1.0d / Math.pow(rank, this.skew)) / this.bottom;
+                    dice = rand.nextDouble();
+                }
+
+                return rank - 1;                 // map back to a zero-based key
+            }
+        }
+    }
 }
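
Note on the generator above: next() draws a candidate rank uniformly and accepts it with probability proportional to 1 / rank^skew, i.e. plain rejection sampling of a Zipf distribution, degenerating to a uniform draw when the skew is zero. A minimal sketch of how the load phase can use it to produce skewed keys (numKeys, keySkew, valueSize and producer are illustrative names, not part of the patch):

    // assumes the ZipfGenerator defined above, returning zero-based keys
    final ZipfGenerator keyGen = new ZipfGenerator(numKeys, keySkew);
    final byte[] value = new byte[valueSize];              // constant-size payload
    for (int i = 0; i < numRecords; i++) {
        final int key = keyGen.next();                     // hot keys repeat more often as skew grows
        producer.send(new ProducerRecord<>(topic, key, value));
    }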
diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
index f63a71f..9c77680 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
@@ -89,76 +89,59 @@ public class YahooBenchmark {
     }
 
     // just for Yahoo benchmark
-    private boolean maybeSetupPhaseCampaigns(final String topic, final String clientId,
+    private boolean maybeSetupPhaseCampaigns(final String topic,
+                                             final String clientId,
                                              final boolean skipIfAllTests,
-                                             final int numCampaigns, final int adsPerCampaign,
+                                             final int numCampaigns,
+                                             final int adsPerCampaign,
                                              final List<String> ads) {
         parent.resetStats();
         // initialize topics
-        if (parent.loadPhase) {
-            if (skipIfAllTests) {
-                // if we run all tests, the produce test will have already loaded the data
-                if (parent.testName.equals(SimpleBenchmark.ALL_TESTS)) {
-                    // Skipping loading phase since previously loaded
-                    return true;
-                }
-            }
-            System.out.println("Initializing topic " + topic);
+        System.out.println("Initializing topic " + topic);
 
-            Properties props = new Properties();
-            props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
-            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, parent.kafka);
-            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        Properties props = new Properties();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, parent.props.get(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG));
+        props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
 
-            KafkaProducer<String, String> producer = new KafkaProducer<>(props);
+        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
             for (int c = 0; c < numCampaigns; c++) {
-                String campaignID = UUID.randomUUID().toString();
+                final String campaignID = UUID.randomUUID().toString();
                 for (int a = 0; a < adsPerCampaign; a++) {
-                    String adId = UUID.randomUUID().toString();
-                    String concat = adId + ":" + campaignID;
+                    final String adId = UUID.randomUUID().toString();
+                    final String concat = adId + ":" + campaignID;
                     producer.send(new ProducerRecord<>(topic, adId, concat));
                     ads.add(adId);
-                    parent.processedRecords.getAndIncrement();
+                    parent.processedRecords++;
                     parent.processedBytes += concat.length() + adId.length();
                 }
             }
-            return true;
         }
-        return false;
+        return true;
     }
 
     // just for Yahoo benchmark
-    private boolean maybeSetupPhaseEvents(final String topic, final String clientId,
-                                          final boolean skipIfAllTests, final int numRecords,
-                                          final List<String> ads) {
+    private void maybeSetupPhaseEvents(final String topic,
+                                       final String clientId,
+                                       final int numRecords,
+                                       final List<String> ads) {
         parent.resetStats();
-        String[] eventTypes = new String[]{"view", "click", "purchase"};
-        Random rand = new Random();
-        // initialize topics
-        if (parent.loadPhase) {
-            if (skipIfAllTests) {
-                // if we run all tests, the produce test will have already loaded the data
-                if (parent.testName.equals(SimpleBenchmark.ALL_TESTS)) {
-                    // Skipping loading phase since previously loaded
-                    return true;
-                }
-            }
-            System.out.println("Initializing topic " + topic);
-
-            Properties props = new Properties();
-            props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
-            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, parent.kafka);
-            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+        final String[] eventTypes = new String[]{"view", "click", "purchase"};
+        final Random rand = new Random(System.currentTimeMillis());
+        System.out.println("Initializing topic " + topic);
 
-            KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);
+        final Properties props = new Properties();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, parent.props.get(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG));
+        props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
 
-            long startTime = System.currentTimeMillis();
+        final long startTime = System.currentTimeMillis();
 
-            ProjectedEvent event = new ProjectedEvent();
-
-            Map<String, Object> serdeProps = new HashMap<>();
+        try (KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props)) {
+            final ProjectedEvent event = new ProjectedEvent();
+            final Map<String, Object> serdeProps = new HashMap<>();
             final Serializer<ProjectedEvent> projectedEventSerializer = new JsonPOJOSerializer<>();
             serdeProps.put("JsonPOJOClass", ProjectedEvent.class);
             projectedEventSerializer.configure(serdeProps, false);
@@ -167,36 +150,28 @@ public class YahooBenchmark {
                event.eventType = eventTypes[rand.nextInt(eventTypes.length)];   // nextInt's bound is exclusive
                event.adID = ads.get(rand.nextInt(ads.size()));
                 event.eventTime = System.currentTimeMillis();
-                byte[] value = projectedEventSerializer.serialize(topic, event);
+                final byte[] value = projectedEventSerializer.serialize(topic, event);
                 producer.send(new ProducerRecord<>(topic, event.adID, value));
-                parent.processedRecords.getAndIncrement();
+                parent.processedRecords++;
                 parent.processedBytes += value.length + event.adID.length();
             }
-            producer.close();
-
-            long endTime = System.currentTimeMillis();
+        }
 
+        final long endTime = System.currentTimeMillis();
 
-            parent.printResults("Producer Performance [records/latency/rec-sec/MB-sec write]: ", endTime - startTime);
-            return true;
-        }
-        return false;
+        parent.printResults("Producer Performance [records/latency/rec-sec/MB-sec write]: ", endTime - startTime);
     }
 
 
     public void run() {
-        int numCampaigns = 100;
-        int adsPerCampaign = 10;
-
-        List<String> ads = new ArrayList<>(numCampaigns * adsPerCampaign);
-        if (maybeSetupPhaseCampaigns(campaignsTopic, "simple-benchmark-produce-campaigns", false,
-            numCampaigns, adsPerCampaign, ads)) {
-            maybeSetupPhaseEvents(eventsTopic, "simple-benchmark-produce-events", false,
-                parent.numRecords, ads);
-            return;
-        }
+        final int numCampaigns = 100;
+        final int adsPerCampaign = 10;
+
+        final List<String> ads = new ArrayList<>(numCampaigns * adsPerCampaign);
+        maybeSetupPhaseCampaigns(campaignsTopic, "simple-benchmark-produce-campaigns", false, numCampaigns, adsPerCampaign, ads);
+        maybeSetupPhaseEvents(eventsTopic, "simple-benchmark-produce-events", parent.numRecords, ads);
 
-        CountDownLatch latch = new CountDownLatch(1);
+        final CountDownLatch latch = new CountDownLatch(1);
         parent.setStreamProperties("simple-benchmark-yahoo" + new Random().nextInt());
         //TODO remove this config or set to smaller value when KIP-91 is merged
         parent.props.put(StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG), 60000);
@@ -299,11 +274,11 @@ public class YahooBenchmark {
             .peek(new ForeachAction<String, ProjectedEvent>() {
                 @Override
                 public void apply(String key, ProjectedEvent value) {
-                    parent.processedRecords.getAndIncrement();
-                    if (parent.processedRecords.get() % 1000000 == 0) {
-                        System.out.println("Processed " + parent.processedRecords.get());
+                    parent.processedRecords++;
+                    if (parent.processedRecords % 1000000 == 0) {
+                        System.out.println("Processed " + parent.processedRecords);
                     }
-                    if (parent.processedRecords.get() >= numRecords) {
+                    if (parent.processedRecords >= numRecords) {
                         latch.countDown();
                     }
                 }
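
The peek() above is also how the Streams benchmarks detect completion: a ForeachAction counts records on the hot path and opens a CountDownLatch at the target count, while the main thread polls the latch under a hard timeout (MAX_WAIT_MS) so a stalled run cannot hang forever, and then closes the client. A condensed sketch of that pattern, using a lambda in place of the anonymous classes in the patch:

    final CountDownLatch latch = new CountDownLatch(1);
    source.peek((key, value) -> {
        if (++processedRecords == numRecords) {
            latch.countDown();                             // signal the waiting main thread
        }
    });
    // main thread: bounded wait, mirroring the loop at the top of this diff
    final long startTime = System.currentTimeMillis();
    while (latch.getCount() > 0 && System.currentTimeMillis() - startTime < MAX_WAIT_MS) {
        try {
            latch.await(1000, TimeUnit.MILLISECONDS);
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    streams.close();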
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
index 37d0cb6..e897088 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
@@ -48,14 +48,24 @@ public class BrokerCompatibilityTest {
     private static final String SINK_TOPIC = "brokerCompatibilitySinkTopic";
 
     public static void main(final String[] args) throws IOException {
+        if (args.length < 2) {
+            System.err.println("BrokerCompatibilityTest are expecting two parameters: propFile, eosEnabled; but only see " + args.length + " parameter");
+            System.exit(1);
+        }
+
         System.out.println("StreamsTest instance started");
 
-        final String kafka = args.length > 0 ? args[0] : "localhost:9092";
-        final String propFileName = args.length > 1 ? args[1] : null;
-        final boolean eosEnabled = args.length > 2 ? Boolean.parseBoolean(args[2]) : false;
+        final String propFileName = args[0];
+        final boolean eosEnabled = Boolean.parseBoolean(args[1]);
 
         final Properties streamsProperties = Utils.loadProps(propFileName);
-        streamsProperties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        final String kafka = streamsProperties.getProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
+
+        if (kafka == null) {
+            System.err.println("No bootstrap kafka servers specified in " + StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
+            System.exit(1);
+        }
+
         streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-system-test-broker-compatibility");
         streamsProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         streamsProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
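
With this change the test entry points in this patch no longer take the broker list on the command line; each one loads it from the properties file and exits when the key is missing. The file handed to a test therefore needs at least the bootstrap servers, along these lines (values illustrative):

    # streams test properties, rendered by the ducktape service
    bootstrap.servers=worker1:9092,worker2:9092
    state.dir=/mnt/streams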
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
index ecc3b91..2d39d53 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
@@ -37,7 +37,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 public class EosTestClient extends SmokeTestUtil {
 
     static final String APP_ID = "EosTest";
-    private final String kafka;
     private final Properties properties;
     private final boolean withRepartitioning;
     private final AtomicBoolean notRunningCallbackReceived = new AtomicBoolean(false);
@@ -45,9 +44,8 @@ public class EosTestClient extends SmokeTestUtil {
     private KafkaStreams streams;
     private boolean uncaughtException;
 
-    EosTestClient(final String kafka, final Properties properties, final boolean withRepartitioning) {
+    EosTestClient(final Properties properties, final boolean withRepartitioning) {
         super();
-        this.kafka = kafka;
         this.properties = properties;
         this.withRepartitioning = withRepartitioning;
     }
@@ -79,7 +77,7 @@ public class EosTestClient extends SmokeTestUtil {
             if (streams == null) {
                 uncaughtException = false;
 
-                streams = createKafkaStreams(properties, kafka);
+                streams = createKafkaStreams(properties);
                 streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
                     @Override
                     public void uncaughtException(final Thread t, final Throwable e) {
@@ -112,10 +110,8 @@ public class EosTestClient extends SmokeTestUtil {
         }
     }
 
-    private KafkaStreams createKafkaStreams(final Properties props,
-                                            final String kafka) {
+    private KafkaStreams createKafkaStreams(final Properties props) {
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID);
-        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
         props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
         props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2);
         props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
index 2936c63..fdfe9e0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
@@ -44,20 +44,19 @@ import java.util.concurrent.TimeUnit;
 
 public class SmokeTestClient extends SmokeTestUtil {
 
-    private final String kafka;
     private final Properties streamsProperties;
-    private KafkaStreams streams;
+
     private Thread thread;
+    private KafkaStreams streams;
     private boolean uncaughtException = false;
 
-    public SmokeTestClient(final Properties streamsProperties, final String kafka) {
+    public SmokeTestClient(final Properties streamsProperties) {
         super();
-        this.kafka = kafka;
         this.streamsProperties = streamsProperties;
     }
 
     public void start() {
-        streams = createKafkaStreams(streamsProperties, kafka);
+        streams = createKafkaStreams(streamsProperties);
         streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
             @Override
             public void uncaughtException(final Thread t, final Throwable e) {
@@ -97,26 +96,26 @@ public class SmokeTestClient extends SmokeTestUtil {
         }
     }
 
-    private static Properties getStreamsConfig(final Properties props, final String kafka) {
-        final Properties config = new Properties(props);
-        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "SmokeTest");
-        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
-        config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
-        config.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2);
-        config.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 100);
-        config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
-        config.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
-        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        config.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
-        config.put(ProducerConfig.ACKS_CONFIG, "all");
+    private static Properties getStreamsConfig(final Properties props) {
+        final Properties fullProps = new Properties(props);
+        fullProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "SmokeTest");
+        fullProps.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
+        fullProps.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2);
+        fullProps.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 100);
+        fullProps.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
+        fullProps.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
+        fullProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        fullProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
+        fullProps.put(ProducerConfig.ACKS_CONFIG, "all");
+
         //TODO remove this config or set to smaller value when KIP-91 is merged
-        config.put(StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG), 80000);
+        fullProps.put(StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG), 80000);
 
-        config.putAll(props);
-        return config;
+        fullProps.putAll(props);
+        return fullProps;
     }
 
-    private static KafkaStreams createKafkaStreams(final Properties props, final String kafka) {
+    private static KafkaStreams createKafkaStreams(final Properties props) {
         final StreamsBuilder builder = new StreamsBuilder();
         final Consumed<String, Integer> stringIntConsumed = Consumed.with(stringSerde, intSerde);
         final KStream<String, Integer> source = builder.stream("data", stringIntConsumed);
@@ -252,7 +251,7 @@ public class SmokeTestClient extends SmokeTestUtil {
             .toStream()
             .to("tagg", Produced.with(stringSerde, longSerde));
 
-        final KafkaStreams streamsClient = new KafkaStreams(builder.build(), getStreamsConfig(props, kafka));
+        final KafkaStreams streamsClient = new KafkaStreams(builder.build(), getStreamsConfig(props));
         streamsClient.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
             @Override
             public void uncaughtException(final Thread t, final Throwable e) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
index fc7a26e..50330a0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
@@ -96,14 +96,15 @@ public class SmokeTestDriver extends SmokeTestUtil {
         };
 
         final Properties props = new Properties();
+        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
         props.put(StreamsConfig.STATE_DIR_CONFIG, createDir(stateDir, "1").getAbsolutePath());
-        SmokeTestClient streams1 = new SmokeTestClient(props, kafka);
+        SmokeTestClient streams1 = new SmokeTestClient(props);
         props.put(StreamsConfig.STATE_DIR_CONFIG, createDir(stateDir, "2").getAbsolutePath());
-        SmokeTestClient streams2 = new SmokeTestClient(props, kafka);
+        SmokeTestClient streams2 = new SmokeTestClient(props);
         props.put(StreamsConfig.STATE_DIR_CONFIG, createDir(stateDir, "3").getAbsolutePath());
-        SmokeTestClient streams3 = new SmokeTestClient(props, kafka);
+        SmokeTestClient streams3 = new SmokeTestClient(props);
         props.put(StreamsConfig.STATE_DIR_CONFIG, createDir(stateDir, "4").getAbsolutePath());
-        SmokeTestClient streams4 = new SmokeTestClient(props, kafka);
+        SmokeTestClient streams4 = new SmokeTestClient(props);
 
         System.out.println("starting the driver");
         driver.start();
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java
index 5219c95..8adf43a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java
@@ -45,17 +45,24 @@ public class StreamsBrokerDownResilienceTest {
     private static final String SINK_TOPIC = "streamsResilienceSink";
 
     public static void main(String[] args) throws IOException {
+        if (args.length < 2) {
+            System.err.println("StreamsBrokerDownResilienceTest are expecting two parameters: propFile, additionalConfigs; but only see " + args.length + " parameter");
+            System.exit(1);
+        }
 
         System.out.println("StreamsTest instance started");
 
-        final String kafka = args.length > 0 ? args[0] : "localhost:9092";
-        final String propFileName = args.length > 1 ? args[1] : null;
-        final String additionalConfigs = args.length > 2 ? args[2] : null;
-
-        final Serde<String> stringSerde = Serdes.String();
+        final String propFileName = args[0];
+        final String additionalConfigs = args[1];
 
         final Properties streamsProperties = Utils.loadProps(propFileName);
-        streamsProperties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        final String kafka = streamsProperties.getProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
+
+        if (kafka == null) {
+            System.err.println("No bootstrap kafka servers specified in " + StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
+            System.exit(1);
+        }
+
         streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-resilience");
         streamsProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
         streamsProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
@@ -79,7 +86,10 @@ public class StreamsBrokerDownResilienceTest {
 
             System.exit(1);
         }
+
         final StreamsBuilder builder = new StreamsBuilder();
+        final Serde<String> stringSerde = Serdes.String();
+
         builder.stream(Collections.singletonList(SOURCE_TOPIC_1), Consumed.with(stringSerde, stringSerde))
             .peek(new ForeachAction<String, String>() {
                 int messagesProcessed = 0;
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsEosTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsEosTest.java
index c5195cb..47a78bd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsEosTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsEosTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.tests;
 
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.StreamsConfig;
 
 import java.io.IOException;
 import java.util.Properties;
@@ -28,11 +29,21 @@ public class StreamsEosTest {
      *  command := "run" | "process" | "verify"
      */
     public static void main(final String[] args) throws IOException {
-        final String kafka = args[0];
-        final String propFileName = args.length > 1 ? args[1] : null;
-        final String command = args.length > 2 ? args[2] : null;
+        if (args.length < 2) {
+            System.err.println("StreamsEosTest are expecting two parameters: propFile, command; but only see " + args.length + " parameter");
+            System.exit(1);
+        }
+
+        final String propFileName = args[0];
+        final String command = args[1];
 
         final Properties streamsProperties = Utils.loadProps(propFileName);
+        final String kafka = streamsProperties.getProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
+
+        if (kafka == null) {
+            System.err.println("No bootstrap kafka servers specified in " + StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
+            System.exit(1);
+        }
 
         System.out.println("StreamsTest instance started");
         System.out.println("kafka=" + kafka);
@@ -49,10 +60,10 @@ public class StreamsEosTest {
                 EosTestDriver.generate(kafka);
                 break;
             case "process":
-                new EosTestClient(kafka, streamsProperties, false).start();
+                new EosTestClient(streamsProperties, false).start();
                 break;
             case "process-complex":
-                new EosTestClient(kafka, streamsProperties, true).start();
+                new EosTestClient(streamsProperties, true).start();
                 break;
             case "verify":
                 EosTestDriver.verify(kafka, false);
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsRepeatingIntegerKeyProducer.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsRepeatingIntegerKeyProducer.java
index 85ca077..15a9fa0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsRepeatingIntegerKeyProducer.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsRepeatingIntegerKeyProducer.java
@@ -25,7 +25,9 @@ import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.StreamsConfig;
 
+import java.io.IOException;
 import java.util.Map;
 import java.util.Properties;
 
@@ -39,11 +41,24 @@ public class StreamsRepeatingIntegerKeyProducer {
     private static volatile boolean keepProducing = true;
     private volatile static int messageCounter = 0;
 
-    public static void main(String[] args) {
+    public static void main(final String[] args) throws IOException {
+        if (args.length < 2) {
+            System.err.println("StreamsStandByReplicaTest are expecting two parameters: propFile, additionalConfigs; but only see " + args.length + " parameter");
+            System.exit(1);
+        }
+
         System.out.println("StreamsTest instance started");
 
-        final String kafka = args.length > 0 ? args[0] : "localhost:9092";
-        final String configString = args.length > 2 ? args[2] : null;
+        final String propFileName = args[0];
+        final String configString = args[1];
+
+        final Properties streamsProperties = Utils.loadProps(propFileName);
+        final String kafka = streamsProperties.getProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
+
+        if (kafka == null) {
+            System.err.println("No bootstrap kafka servers specified in " + StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
+            System.exit(1);
+        }
 
         final Map<String, String> configs = SystemTestUtil.parseConfigs(configString);
         System.out.println("Using provided configs " + configs);
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
index 41c3f6c..2409bd5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
@@ -17,7 +17,9 @@
 package org.apache.kafka.streams.tests;
 
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.StreamsConfig;
 
+import java.io.IOException;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
@@ -30,17 +32,26 @@ public class StreamsSmokeTest {
      *
      * @param args
      */
-    public static void main(final String[] args) throws Exception {
-        final String kafka = args[0];
-        final String propFileName = args.length > 1 ? args[1] : null;
-        final String command = args.length > 2 ? args[2] : null;
+    public static void main(final String[] args) throws InterruptedException, IOException {
+        if (args.length < 2) {
+            System.err.println("StreamsSmokeTest are expecting two parameters: propFile, command; but only see " + args.length + " parameter");
+            System.exit(1);
+        }
+
+        final String propFileName = args[0];
+        final String command = args[1];
-        final boolean disableAutoTerminate = args.length > 3;
+        final boolean disableAutoTerminate = args.length > 2;  // optional third argument; args no longer include the broker list
 
         final Properties streamsProperties = Utils.loadProps(propFileName);
+        final String kafka = streamsProperties.getProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
+
+        if (kafka == null) {
+            System.err.println("No bootstrap kafka servers specified in " + StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
+            System.exit(1);
+        }
 
         System.out.println("StreamsTest instance started (StreamsSmokeTest)");
         System.out.println("command=" + command);
-        System.out.println("kafka=" + kafka);
         System.out.println("props=" + streamsProperties);
         System.out.println("disableAutoTerminate=" + disableAutoTerminate);
 
@@ -61,7 +72,7 @@ public class StreamsSmokeTest {
                 break;
             case "process":
                 // this starts a KafkaStreams client
-                final SmokeTestClient client = new SmokeTestClient(streamsProperties, kafka);
+                final SmokeTestClient client = new SmokeTestClient(streamsProperties);
                 client.start();
                 break;
             case "close-deadlock-test":
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsStandByReplicaTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsStandByReplicaTest.java
index 1f44d61..ca18a4a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsStandByReplicaTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsStandByReplicaTest.java
@@ -43,18 +43,25 @@ import java.util.concurrent.TimeUnit;
 public class StreamsStandByReplicaTest {
 
     public static void main(final String[] args) throws IOException {
+        if (args.length < 2) {
+            System.err.println("StreamsStandByReplicaTest are expecting two parameters: propFile, additionalConfigs; but only see " + args.length + " parameter");
+            System.exit(1);
+        }
 
         System.out.println("StreamsTest instance started");
 
-        final String kafka = args.length > 0 ? args[0] : "localhost:9092";
-        final String propFileName = args.length > 1 ? args[1] : null;
-        final String additionalConfigs = args.length > 2 ? args[2] : null;
-
-        final Serde<String> stringSerde = Serdes.String();
+        final String propFileName = args[0];
+        final String additionalConfigs = args[1];
 
         final Properties streamsProperties = Utils.loadProps(propFileName);
+        final String kafka = streamsProperties.getProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
+
+        if (kafka == null) {
+            System.err.println("No bootstrap kafka servers specified in " + StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
+            System.exit(1);
+        }
+
         streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-standby-tasks");
-        streamsProperties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
         streamsProperties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
         streamsProperties.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
         streamsProperties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
@@ -95,11 +102,13 @@ public class StreamsStandByReplicaTest {
 
         final StreamsBuilder builder = new StreamsBuilder();
 
-        String inMemoryStoreName = "in-memory-store";
-        String persistentMemoryStoreName = "persistent-memory-store";
+        final String inMemoryStoreName = "in-memory-store";
+        final String persistentMemoryStoreName = "persistent-memory-store";
 
-        KeyValueBytesStoreSupplier inMemoryStoreSupplier = Stores.inMemoryKeyValueStore(inMemoryStoreName);
-        KeyValueBytesStoreSupplier persistentStoreSupplier = Stores.persistentKeyValueStore(persistentMemoryStoreName);
+        final KeyValueBytesStoreSupplier inMemoryStoreSupplier = Stores.inMemoryKeyValueStore(inMemoryStoreName);
+        final KeyValueBytesStoreSupplier persistentStoreSupplier = Stores.persistentKeyValueStore(persistentMemoryStoreName);
+
+        final Serde<String> stringSerde = Serdes.String();
 
         KStream<String, String> inputStream = builder.stream(sourceTopic, Consumed.with(stringSerde, stringSerde));
 
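All of the Java entry points above now share the same positional-argument convention, with the properties file first, which is what the ducktape services below assemble into the launch command. Run by hand, an invocation would look roughly like this (paths illustrative):

    INCLUDE_TEST_JARS=true bin/kafka-run-class.sh \
        org.apache.kafka.streams.tests.StreamsSmokeTest streams.properties process
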
diff --git a/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py b/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
index 06aec14..0f1d33f 100644
--- a/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
+++ b/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
@@ -23,6 +23,17 @@ from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.kafka import KafkaService
 from kafkatest.version import DEV_BRANCH
 
+STREAMS_SIMPLE_TESTS = ["streamprocess", "streamprocesswithsink", "streamprocesswithstatestore"]
+STREAMS_COUNT_TESTS = ["streamcount", "streamcountwindowed", "streamprocesswithstatestore"]
+STREAMS_JOIN_TESTS = ["streamtablejoin", "streamstreamjoin", "tabletablejoin"]
+NON_STREAMS_TESTS = ["consume", "consumeproduce"]
+
+ALL_TEST = "all"
+STREAMS_SIMPLE_TEST = "streams-simple"
+STREAMS_COUNT_TEST = "streams-count"
+STREAMS_JOIN_TEST = "streams-join"
+
+
 class StreamsSimpleBenchmarkTest(Test):
     """
     Simple benchmark of Kafka Streams.
@@ -30,19 +41,24 @@ class StreamsSimpleBenchmarkTest(Test):
 
     def __init__(self, test_context):
         super(StreamsSimpleBenchmarkTest, self).__init__(test_context)
+
+        # these values could be updated in ad-hoc benchmarks
+        self.key_skew = 0
+        self.value_size = 1024
         self.num_records = 10000000L
-        self.replication = 1
         self.num_threads = 1
 
-    @cluster(num_nodes=9)
-    @matrix(test=["count", "processstream", "processstreamwithsink", "processstreamwithstatestore", "processstreamwithcachedstatestore", "kstreamktablejoin", "kstreamkstreamjoin", "ktablektablejoin"], scale=[1, 3])
+        self.replication = 1
+
+    @cluster(num_nodes=12)
+    @matrix(test=["consume", "consumeproduce", "streams-simple", "streams-count", "streams-join"], scale=[1])
     def test_simple_benchmark(self, test, scale):
         """
         Run simple Kafka Streams benchmark
         """
         self.driver = [None] * (scale + 1)
-        node = [None] * (scale)
-        data = [None] * (scale)
+
+        self.final = {}
 
         #############
         # SETUP PHASE
@@ -50,63 +66,95 @@ class StreamsSimpleBenchmarkTest(Test):
         self.zk = ZookeeperService(self.test_context, num_nodes=1)
         self.zk.start()
         self.kafka = KafkaService(self.test_context, num_nodes=scale, zk=self.zk, version=DEV_BRANCH, topics={
-            'simpleBenchmarkSourceTopic' : { 'partitions': scale, 'replication-factor': self.replication },
-            'countTopic' : { 'partitions': scale, 'replication-factor': self.replication },
+            'simpleBenchmarkSourceTopic1' : { 'partitions': scale, 'replication-factor': self.replication },
+            'simpleBenchmarkSourceTopic2' : { 'partitions': scale, 'replication-factor': self.replication },
             'simpleBenchmarkSinkTopic' : { 'partitions': scale, 'replication-factor': self.replication },
-            'joinSourceTopic1KStreamKStream' : { 'partitions': scale, 'replication-factor': self.replication },
-            'joinSourceTopic2KStreamKStream' : { 'partitions': scale, 'replication-factor': self.replication },
-            'joinSourceTopic1KStreamKTable' : { 'partitions': scale, 'replication-factor': self.replication },
-            'joinSourceTopic2KStreamKTable' : { 'partitions': scale, 'replication-factor': self.replication },
-            'joinSourceTopic1KTableKTable' : { 'partitions': scale, 'replication-factor': self.replication },
-            'joinSourceTopic2KTableKTable' : { 'partitions': scale, 'replication-factor': self.replication },
             'yahooCampaigns' : { 'partitions': 20, 'replication-factor': self.replication },
             'yahooEvents' : { 'partitions': 20, 'replication-factor': self.replication }
         })
         self.kafka.log_level = "INFO"
         self.kafka.start()
- 
+
+
+        load_test = ""
+        if test == ALL_TEST:
+            load_test = "load-two"
+        if test in STREAMS_JOIN_TESTS or test == STREAMS_JOIN_TEST:
+            load_test = "load-two"
+        if test in STREAMS_COUNT_TESTS or test == STREAMS_COUNT_TEST:
+            load_test = "load-one"
+        if test in STREAMS_SIMPLE_TESTS or test == STREAMS_SIMPLE_TEST:
+            load_test = "load-one"
+        if test in NON_STREAMS_TESTS:
+            load_test = "load-one"
+
+
+
         ################
         # LOAD PHASE
         ################
-        self.load_driver = StreamsSimpleBenchmarkService(self.test_context, self.kafka,
-                                                         self.num_records * scale, "true", test,
-                                                         self.num_threads)
+        self.load_driver = StreamsSimpleBenchmarkService(self.test_context,
+                                                         self.kafka,
+                                                         load_test,
+                                                         self.num_threads,
+                                                         self.num_records,
+                                                         self.key_skew,
+                                                         self.value_size)
+
         self.load_driver.start()
-        self.load_driver.wait()
+        self.load_driver.wait(3600) # wait at most 60 minutes
         self.load_driver.stop()
 
+        if test == ALL_TEST:
+            for single_test in STREAMS_SIMPLE_TESTS + STREAMS_COUNT_TESTS + STREAMS_JOIN_TESTS:
+                self.run_test(single_test, scale)
+        elif test == STREAMS_SIMPLE_TEST:
+            for single_test in STREAMS_SIMPLE_TESTS:
+                self.run_test(single_test, scale)
+        elif test == STREAMS_COUNT_TEST:
+            for single_test in STREAMS_COUNT_TESTS:
+                self.run_test(single_test, scale)
+        elif test == STREAMS_JOIN_TEST:
+            for single_test in STREAMS_JOIN_TESTS:
+                self.run_test(single_test, scale)
+        else:
+            self.run_test(test, scale)
+
+        return self.final
 
+    def run_test(self, test, scale):
 
         ################
         # RUN PHASE
         ################
         for num in range(0, scale):
-            self.driver[num] = StreamsSimpleBenchmarkService(self.test_context, self.kafka,
-                                                             self.num_records/(scale), "false", test,
-                                                             self.num_threads)
+            self.driver[num] = StreamsSimpleBenchmarkService(self.test_context,
+                                                             self.kafka,
+                                                             test,
+                                                             self.num_threads,
+                                                             self.num_records,
+                                                             self.key_skew,
+                                                             self.value_size)
             self.driver[num].start()
 
         #######################
         # STOP + COLLECT PHASE
         #######################
-        for num in range(0, scale):    
-            self.driver[num].wait()    
+        data = [None] * (scale)
+
+        for num in range(0, scale):
+            self.driver[num].wait()
             self.driver[num].stop()
-            node[num] = self.driver[num].node
-            node[num].account.ssh("grep Performance %s" % self.driver[num].STDOUT_FILE, allow_fail=False)
-            data[num] = self.driver[num].collect_data(node[num], "" )
+            self.driver[num].node.account.ssh("grep Performance %s" % self.driver[num].STDOUT_FILE, allow_fail=False)
+            data[num] = self.driver[num].collect_data(self.driver[num].node, "")
             self.driver[num].read_jmx_output_all_nodes()
 
-
-        final = {}
         for num in range(0, scale):
             for key in data[num]:
-                final[key + str(num)] = data[num][key]
+                self.final[key + "-" + str(num)] = data[num][key]
 
             for key in sorted(self.driver[num].jmx_stats[0]):
                 self.logger.info("%s: %s" % (key, self.driver[num].jmx_stats[0][key]))
 
-            final["jmx-avg" + str(num)] = self.driver[num].average_jmx_value
-            final["jmx-max" + str(num)] = self.driver[num].maximum_jmx_value
-
-        return final
+            self.final[test + "-jmx-avg-" + str(num)] = self.driver[num].average_jmx_value
+            self.final[test + "-jmx-max-" + str(num)] = self.driver[num].maximum_jmx_value
diff --git a/tests/kafkatest/services/performance/streams_performance.py b/tests/kafkatest/services/performance/streams_performance.py
index 9f79181..e2dd15b 100644
--- a/tests/kafkatest/services/performance/streams_performance.py
+++ b/tests/kafkatest/services/performance/streams_performance.py
@@ -15,25 +15,28 @@
 
 from kafkatest.services.monitor.jmx import JmxMixin
 from kafkatest.services.streams import StreamsTestBaseService
+from kafkatest.services.kafka import KafkaConfig
+from kafkatest.services import streams_property
 
 #
 # Class used to start the simple Kafka Streams benchmark
 #
+
 class StreamsSimpleBenchmarkService(StreamsTestBaseService):
     """Base class for simple Kafka Streams benchmark"""
 
-    def __init__(self, test_context, kafka, numrecs, load_phase, test_name, num_threads):
+    def __init__(self, test_context, kafka, test_name, num_threads, num_recs_or_wait_ms, key_skew, value_size):
         super(StreamsSimpleBenchmarkService, self).__init__(test_context,
                                                             kafka,
                                                             "org.apache.kafka.streams.perf.SimpleBenchmark",
-                                                            numrecs,
-                                                            load_phase,
                                                             test_name,
-                                                            num_threads)
-
-        self.load_phase = load_phase
+                                                            num_recs_or_wait_ms,
+                                                            key_skew,
+                                                            value_size)
 
-        if self.load_phase == "false":
+        self.jmx_option = ""
+        if test_name.startswith('stream') or test_name.startswith('table'):
+            self.jmx_option = "stream-jmx"
             JmxMixin.__init__(self,
                               num_nodes=1,
                               jmx_object_names=['kafka.streams:type=stream-metrics,client-id=simple-benchmark-StreamThread-%d' %(i+1) for i in range(num_threads)],
@@ -45,13 +48,27 @@ class StreamsSimpleBenchmarkService(StreamsTestBaseService):
                                               'poll-rate'],
                               root=StreamsTestBaseService.PERSISTENT_ROOT)
 
-    def start_cmd(self, node):
-        cmd = super(StreamsSimpleBenchmarkService, self).start_cmd(node)
+        if test_name.startswith('consume'):
+            self.jmx_option = "consumer-jmx"
+            JmxMixin.__init__(self,
+                              num_nodes=1,
+                              jmx_object_names=['kafka.consumer:type=consumer-fetch-manager-metrics,client-id=simple-benchmark-consumer'],
+                              jmx_attributes=['records-consumed-rate'],
+                              root=StreamsTestBaseService.PERSISTENT_ROOT)
 
-        if self.load_phase == "false":
+        self.num_threads = num_threads
+
+    def prop_file(self):
+        cfg = KafkaConfig(**{streams_property.STATE_DIR: self.PERSISTENT_ROOT,
+                             streams_property.KAFKA_SERVERS: self.kafka.bootstrap_servers(),
+                             streams_property.NUM_THREADS: self.num_threads})
+        return cfg.render()
+
+
+    def start_cmd(self, node):
+        if self.jmx_option != "":
             args = self.args.copy()
             args['jmx_port'] = self.jmx_port
-            args['kafka'] = self.kafka.bootstrap_servers()
             args['config_file'] = self.CONFIG_FILE
             args['stdout'] = self.STDOUT_FILE
             args['stderr'] = self.STDERR_FILE
@@ -61,23 +78,24 @@ class StreamsSimpleBenchmarkService(StreamsTestBaseService):
 
             cmd = "( export JMX_PORT=%(jmx_port)s; export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \
                   "INCLUDE_TEST_JARS=true %(kafka_run_class)s %(streams_class_name)s " \
-                  " %(kafka)s %(config_file)s %(user_test_args)s %(user_test_args1)s %(user_test_args2)s" \
-                  " %(user_test_args3)s & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args
+                  " %(config_file)s %(user_test_args1)s %(user_test_args2)s %(user_test_args3)s" \
+                  " %(user_test_args4)s & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args
 
-        self.logger.info("Executing streams simple benchmark cmd: " + cmd)
+        else:
+            cmd = super(StreamsSimpleBenchmarkService, self).start_cmd(node)
 
         return cmd
 
     def start_node(self, node):
         super(StreamsSimpleBenchmarkService, self).start_node(node)
 
-        if self.load_phase == "false":
+        if self.jmx_option != "":
             self.start_jmx_tool(1, node)
 
-
     def clean_node(self, node):
-        if self.load_phase == "false":
+        if self.jmx_option != "":
             JmxMixin.clean_node(self, node)
+
         super(StreamsSimpleBenchmarkService, self).clean_node(node)
 
     def collect_data(self, node, tag = None):
diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py
index a5be816..796ca31 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -16,6 +16,8 @@
 import os.path
 import signal
 
+import streams_property
+
 from ducktape.services.service import Service
 from ducktape.utils.util import wait_until
 from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
@@ -23,7 +25,6 @@ from kafkatest.services.monitor.jmx import JmxMixin
 from kafkatest.services.kafka import KafkaConfig
 from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1
 
-STATE_DIR = "state.dir"
 
 class StreamsTestBaseService(KafkaPathResolverMixin, JmxMixin, Service):
     """Base class for Streams Test services providing some common settings and functionality"""
@@ -168,14 +169,14 @@ class StreamsTestBaseService(KafkaPathResolverMixin, JmxMixin, Service):
             "collect_default": True},
     }
 
-    def __init__(self, test_context, kafka, streams_class_name, user_test_args, user_test_args1=None, user_test_args2=None, user_test_args3=None):
+    def __init__(self, test_context, kafka, streams_class_name, user_test_args1, user_test_args2=None, user_test_args3=None, user_test_args4=None):
         Service.__init__(self, test_context, num_nodes=1)
         self.kafka = kafka
         self.args = {'streams_class_name': streams_class_name,
-                     'user_test_args': user_test_args,
                      'user_test_args1': user_test_args1,
                      'user_test_args2': user_test_args2,
-                     'user_test_args3': user_test_args3}
+                     'user_test_args3': user_test_args3,
+                     'user_test_args4': user_test_args4}
         self.log_level = "DEBUG"
 
     @property
@@ -236,7 +237,6 @@ class StreamsTestBaseService(KafkaPathResolverMixin, JmxMixin, Service):
 
     def start_cmd(self, node):
         args = self.args.copy()
-        args['kafka'] = self.kafka.bootstrap_servers()
         args['config_file'] = self.CONFIG_FILE
         args['stdout'] = self.STDOUT_FILE
         args['stderr'] = self.STDERR_FILE
@@ -246,15 +246,15 @@ class StreamsTestBaseService(KafkaPathResolverMixin, JmxMixin, Service):
 
         cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \
               "INCLUDE_TEST_JARS=true %(kafka_run_class)s %(streams_class_name)s " \
-              " %(kafka)s %(config_file)s %(user_test_args)s %(user_test_args1)s %(user_test_args2)s" \
-              " %(user_test_args3)s & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args
+              " %(config_file)s %(user_test_args1)s %(user_test_args2)s %(user_test_args3)s" \
+              " %(user_test_args4)s & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args
 
         self.logger.info("Executing streams cmd: " + cmd)
 
         return cmd
 
     def prop_file(self):
-        cfg = KafkaConfig(**{STATE_DIR: self.PERSISTENT_ROOT})
+        cfg = KafkaConfig(**{streams_property.STATE_DIR: self.PERSISTENT_ROOT, streams_property.KAFKA_SERVERS: self.kafka.bootstrap_servers()})
         return cfg.render()
 
     def start_node(self, node):
@@ -374,7 +374,6 @@ class StreamsBrokerDownResilienceService(StreamsTestBaseService):
 
     def start_cmd(self, node):
         args = self.args.copy()
-        args['kafka'] = self.kafka.bootstrap_servers(validate=False)
         args['config_file'] = self.CONFIG_FILE
         args['stdout'] = self.STDOUT_FILE
         args['stderr'] = self.STDERR_FILE
@@ -384,8 +383,8 @@ class StreamsBrokerDownResilienceService(StreamsTestBaseService):
 
         cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \
               "INCLUDE_TEST_JARS=true %(kafka_run_class)s %(streams_class_name)s " \
-              " %(kafka)s %(config_file)s %(user_test_args)s %(user_test_args1)s %(user_test_args2)s" \
-              " %(user_test_args3)s & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args
+              " %(config_file)s %(user_test_args1)s %(user_test_args2)s %(user_test_args3)s" \
+              " %(user_test_args4)s & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args
 
         self.logger.info("Executing: " + cmd)
 
diff --git a/tests/kafkatest/services/streams_property.py b/tests/kafkatest/services/streams_property.py
new file mode 100644
index 0000000..054ea64
--- /dev/null
+++ b/tests/kafkatest/services/streams_property.py
@@ -0,0 +1,26 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Define Streams configuration property names here.
+"""
+
+STATE_DIR = "state.dir"
+KAFKA_SERVERS = "bootstrap.servers"
+NUM_THREADS = "num.stream.threads"
+
+
+
+

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.