You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2017/06/21 10:47:12 UTC

kafka git commit: MINOR: add Yahoo benchmark to nightly runs

Repository: kafka
Updated Branches:
  refs/heads/trunk cae5977ed -> 55a90938a


MINOR: add Yahoo benchmark to nightly runs

Author: Eno Thereska <en...@gmail.com>

Reviewers: Damian Guy <da...@gmail.com>

Closes #3289 from enothereska/yahoo-benchmark


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/55a90938
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/55a90938
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/55a90938

Branch: refs/heads/trunk
Commit: 55a90938a12d8928289a30588bbad6c959c48674
Parents: cae5977
Author: Eno Thereska <en...@gmail.com>
Authored: Wed Jun 21 11:46:59 2017 +0100
Committer: Damian Guy <da...@gmail.com>
Committed: Wed Jun 21 11:46:59 2017 +0100

----------------------------------------------------------------------
 checkstyle/import-control.xml                   |   4 +
 .../kafka/streams/perf/SimpleBenchmark.java     |  92 +++--
 .../kafka/streams/perf/YahooBenchmark.java      | 361 +++++++++++++++++++
 .../streams/streams_simple_benchmark_test.py    |  14 +-
 .../services/performance/streams_performance.py |   5 +-
 tests/kafkatest/services/streams.py             |   7 +-
 6 files changed, 438 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/55a90938/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index c0c92af..1579a1c 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -186,6 +186,10 @@
       <allow pkg="com.fasterxml.jackson.databind" />
       <allow pkg="org.apache.kafka.connect.json" />
     </subpackage>
+
+    <subpackage name="perf">
+      <allow pkg="com.fasterxml.jackson.databind" />
+    </subpackage>
     
     <subpackage name="integration">
       <allow pkg="kafka.admin" />

http://git-wip-us.apache.org/repos/asf/kafka/blob/55a90938/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
index 2983df5..ec3ea0e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
@@ -55,6 +55,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.Properties;
 import java.util.Random;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * Class that provides support for a series of benchmarks. It is usually driven by
@@ -73,17 +74,20 @@ import java.util.concurrent.TimeUnit;
  */
 public class SimpleBenchmark {
 
-    private final String kafka;
+    final String kafka;
     private final File stateDir;
-    private final Boolean loadPhase;
-    private final String testName;
-    private static final String ALL_TESTS = "all";
+    final Boolean loadPhase;
+    final String testName;
+    final int numThreads;
+    static final String ALL_TESTS = "all";
     private static final String SOURCE_TOPIC = "simpleBenchmarkSourceTopic";
     private static final String SINK_TOPIC = "simpleBenchmarkSinkTopic";
 
     private static final String COUNT_TOPIC = "countTopic";
     private static final String JOIN_TOPIC_1_PREFIX = "joinSourceTopic1";
     private static final String JOIN_TOPIC_2_PREFIX = "joinSourceTopic2";
+    private static final String YAHOO_CAMPAIGNS_TOPIC = "yahooCampaigns";
+    private static final String YAHOO_EVENTS_TOPIC = "yahooEvents";
     private static final ValueJoiner VALUE_JOINER = new ValueJoiner<byte[], byte[], byte[]>() {
         @Override
         public byte[] apply(final byte[] value1, final byte[] value2) {
@@ -101,9 +105,9 @@ public class SimpleBenchmark {
         }
     };
 
-    private static int numRecords;
-    private static int processedRecords = 0;
-    private static long processedBytes = 0;
+    int numRecords;
+    final AtomicInteger processedRecords = new AtomicInteger(0);
+    long processedBytes = 0;
     private static final int VALUE_SIZE = 100;
     private static final long POLL_MS = 500L;
     private static final long COMMIT_INTERVAL_MS = 30000L;
@@ -113,12 +117,15 @@ public class SimpleBenchmark {
     private static final Serde<byte[]> BYTE_SERDE = Serdes.ByteArray();
     private static final Serde<Integer> INTEGER_SERDE = Serdes.Integer();
 
-    public SimpleBenchmark(final File stateDir, final String kafka, final Boolean loadPhase, final String testName) {
+    public SimpleBenchmark(final File stateDir, final String kafka, final Boolean loadPhase,
+                           final String testName, final int numRecords, final int numThreads) {
         super();
         this.stateDir = stateDir;
         this.kafka = kafka;
         this.loadPhase = loadPhase;
         this.testName = testName;
+        this.numRecords = numRecords;
+        this.numThreads = numThreads;
     }
 
     private void run() throws Exception {
@@ -175,6 +182,9 @@ public class SimpleBenchmark {
             case "ktablektablejoin":
                 kTableKTableJoin(JOIN_TOPIC_1_PREFIX + "KTableKTable", JOIN_TOPIC_2_PREFIX + "KTableKTable");
                 break;
+            case "yahoo":
+                yahooBenchmark(YAHOO_CAMPAIGNS_TOPIC, YAHOO_EVENTS_TOPIC);
+                break;
             default:
                 throw new Exception("Unknown test name " + testName);
 
@@ -184,14 +194,13 @@ public class SimpleBenchmark {
     public static void main(String[] args) throws Exception {
         String kafka = args.length > 0 ? args[0] : "localhost:9092";
         String stateDirStr = args.length > 1 ? args[1] : TestUtils.tempDirectory().getAbsolutePath();
-        numRecords = args.length > 2 ? Integer.parseInt(args[2]) : 10000000;
+        int numRecords = args.length > 2 ? Integer.parseInt(args[2]) : 10000000;
         boolean loadPhase = args.length > 3 ? Boolean.parseBoolean(args[3]) : false;
         String testName = args.length > 4 ? args[4].toLowerCase(Locale.ROOT) : ALL_TESTS;
+        int numThreads = args.length > 5 ? Integer.parseInt(args[5]) : 1;
 
         final File stateDir = new File(stateDirStr);
         stateDir.mkdir();
-        final File rocksdbDir = new File(stateDir, "rocksdb-test");
-        rocksdbDir.mkdir();
 
         // Note: this output is needed for automated tests and must not be removed
         System.out.println("StreamsTest instance started");
@@ -200,17 +209,18 @@ public class SimpleBenchmark {
         System.out.println("numRecords=" + numRecords);
         System.out.println("loadPhase=" + loadPhase);
         System.out.println("testName=" + testName);
+        System.out.println("numThreads=" + numThreads);
 
-        SimpleBenchmark benchmark = new SimpleBenchmark(stateDir, kafka, loadPhase, testName);
+        SimpleBenchmark benchmark = new SimpleBenchmark(stateDir, kafka, loadPhase, testName, numRecords, numThreads);
         benchmark.run();
     }
 
-    private Properties setStreamProperties(final String applicationId) {
+    public Properties setStreamProperties(final String applicationId) {
         Properties props = new Properties();
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
         props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString());
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
-        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
+        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numThreads);
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         // the socket buffer needs to be large, especially when running in AWS with
         // high latency. if running locally the default is fine.
@@ -244,8 +254,7 @@ public class SimpleBenchmark {
 
     private boolean maybeSetupPhase(final String topic, final String clientId,
                                     final boolean skipIfAllTests) throws Exception {
-        processedRecords = 0;
-        processedBytes = 0;
+        resetStats();
         // initialize topics
         if (loadPhase) {
             if (skipIfAllTests) {
@@ -264,6 +273,12 @@ public class SimpleBenchmark {
         return false;
     }
 
+    void resetStats() {
+        processedRecords.set(0);
+        processedBytes = 0;
+    }
+
+
     private KafkaStreams createCountStreams(Properties streamConfig, String topic, final CountDownLatch latch) {
         final KStreamBuilder builder = new KStreamBuilder();
         final KStream<Integer, byte[]> input = builder.stream(topic);
@@ -274,6 +289,13 @@ public class SimpleBenchmark {
         return new KafkaStreams(builder, streamConfig);
     }
 
+
+    private void yahooBenchmark(final String campaignsTopic, final String eventsTopic) throws Exception {
+        YahooBenchmark benchmark = new YahooBenchmark(this, campaignsTopic, eventsTopic);
+
+        benchmark.run();
+    }
+
     /**
      * Measure the performance of a simple aggregate like count.
      * Counts the occurrence of numbers (note that normally people count words, this
@@ -351,15 +373,15 @@ public class SimpleBenchmark {
         runGenericBenchmark(streams, "Streams KTableKTable LeftJoin Performance [records/latency/rec-sec/MB-sec joined]: ", latch);
     }
 
-    private void printResults(final String nameOfBenchmark, final long latency) {
+    void printResults(final String nameOfBenchmark, final long latency) {
         System.out.println(nameOfBenchmark +
-            processedRecords + "/" +
+            processedRecords.get() + "/" +
             latency + "/" +
-            recordsPerSec(latency, processedRecords) + "/" +
+            recordsPerSec(latency, processedRecords.get()) + "/" +
             megabytesPerSec(latency, processedBytes));
     }
 
-    private void runGenericBenchmark(final KafkaStreams streams, final String nameOfBenchmark, final CountDownLatch latch) {
+    void runGenericBenchmark(final KafkaStreams streams, final String nameOfBenchmark, final CountDownLatch latch) {
         streams.start();
 
         long startTime = System.currentTimeMillis();
@@ -459,6 +481,7 @@ public class SimpleBenchmark {
     public void produce(String topic) throws Exception {
         // loading phase does not make sense for producer
         if (loadPhase) {
+            resetStats();
             return;
         }
         produce(topic, VALUE_SIZE, "simple-benchmark-produce", numRecords, true, numRecords, true);
@@ -477,8 +500,7 @@ public class SimpleBenchmark {
     private void produce(String topic, int valueSizeBytes, String clientId, int numRecords, boolean sequential,
                          int upperRange, boolean printStats) throws Exception {
 
-        processedRecords = 0;
-        processedBytes = 0;
+
         if (sequential) {
             if (upperRange < numRecords) throw new Exception("UpperRange must be >= numRecords");
         }
@@ -504,7 +526,7 @@ public class SimpleBenchmark {
             producer.send(new ProducerRecord<>(topic, key, value));
             if (sequential) key++;
             else key = rand.nextInt(upperRange);
-            processedRecords++;
+            processedRecords.getAndIncrement();
             processedBytes += value.length + Integer.SIZE;
         }
         producer.close();
@@ -536,20 +558,20 @@ public class SimpleBenchmark {
         while (true) {
             ConsumerRecords<Integer, byte[]> records = consumer.poll(POLL_MS);
             if (records.isEmpty()) {
-                if (processedRecords == numRecords)
+                if (processedRecords.get() == numRecords)
                     break;
             } else {
                 for (ConsumerRecord<Integer, byte[]> record : records) {
-                    processedRecords++;
+                    processedRecords.getAndIncrement();
                     processedBytes += record.value().length + Integer.SIZE;
                     Integer recKey = record.key();
                     if (key == null || key < recKey)
                         key = recKey;
-                    if (processedRecords == numRecords)
+                    if (processedRecords.get() == numRecords)
                         break;
                 }
             }
-            if (processedRecords == numRecords)
+            if (processedRecords.get() == numRecords)
                 break;
         }
 
@@ -577,9 +599,9 @@ public class SimpleBenchmark {
 
                     @Override
                     public void process(Integer key, byte[] value) {
-                        processedRecords++;
+                        processedRecords.getAndIncrement();
                         processedBytes += value.length + Integer.SIZE;
-                        if (processedRecords == numRecords) {
+                        if (processedRecords.get() == numRecords) {
                             latch.countDown();
                         }
                     }
@@ -616,9 +638,9 @@ public class SimpleBenchmark {
 
                     @Override
                     public void process(Integer key, byte[] value) {
-                        processedRecords++;
+                        processedRecords.getAndIncrement();
                         processedBytes += value.length + Integer.SIZE;
-                        if (processedRecords == numRecords) {
+                        if (processedRecords.get() == numRecords) {
                             latch.countDown();
                         }
                     }
@@ -644,7 +666,7 @@ public class SimpleBenchmark {
         }
         @Override
         public void apply(Integer key, V value) {
-            processedRecords++;
+            processedRecords.getAndIncrement();
             if (value instanceof byte[]) {
                 processedBytes += ((byte[]) value).length + Integer.SIZE;
             } else if (value instanceof Long) {
@@ -652,7 +674,7 @@ public class SimpleBenchmark {
             } else {
                 System.err.println("Unknown value type in CountDownAction");
             }
-            if (processedRecords == numRecords) {
+            if (processedRecords.get() == numRecords) {
                 this.latch.countDown();
             }
         }
@@ -724,9 +746,9 @@ public class SimpleBenchmark {
                     @Override
                     public void process(Integer key, byte[] value) {
                         store.put(key, value);
-                        processedRecords++;
+                        processedRecords.getAndIncrement();
                         processedBytes += value.length + Integer.SIZE;
-                        if (processedRecords == numRecords) {
+                        if (processedRecords.get() == numRecords) {
                             latch.countDown();
                         }
                     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/55a90938/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
new file mode 100644
index 0000000..56c578f
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.perf;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.Predicate;
+import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.kstream.ValueMapper;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.UUID;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.concurrent.CountDownLatch;
+
+
+/**
+ * A basic DSL and data generation that emulates the behavior of the Yahoo Benchmark
+ * https://yahooeng.tumblr.com/post/135321837876/benchmarking-streaming-computation-engines-at
+ * Thanks to Michael Armbrust for providing the initial code for this benchmark in his blog:
+ * https://databricks.com/blog/2017/06/06/simple-super-fast-streaming-engine-apache-spark.html
+ */
+public class YahooBenchmark {
+    private final SimpleBenchmark parent;
+    private final String campaignsTopic;
+    private final String eventsTopic;
+
+    static class ProjectedEvent {
+        /* attributes need to be public for serializer to work */
+        /* main attributes */
+        public String eventType;
+        public String adID;
+
+        /* other attributes */
+        public long eventTime;
+        public String userID = UUID.randomUUID().toString(); // not used
+        public String pageID = UUID.randomUUID().toString(); // not used
+        public String addType = "banner78";  // not used
+        public String ipAddress = "1.2.3.4"; // not used
+    }
+
+    static class CampaignAd {
+        /* attributes need to be public for serializer to work */
+        public String adID;
+        public String campaignID;
+    }
+
+    public YahooBenchmark(final SimpleBenchmark parent, final String campaignsTopic, final String eventsTopic) {
+        this.parent = parent;
+        this.campaignsTopic = campaignsTopic;
+        this.eventsTopic = eventsTopic;
+    }
+
+    // just for Yahoo benchmark
+    private boolean maybeSetupPhaseCampaigns(final String topic, final String clientId,
+                                             final boolean skipIfAllTests,
+                                             final int numCampaigns, final int adsPerCampaign,
+                                             final List<String> ads) throws Exception {
+        parent.resetStats();
+        // initialize topics
+        if (parent.loadPhase) {
+            if (skipIfAllTests) {
+                // if we run all tests, the produce test will have already loaded the data
+                if (parent.testName.equals(SimpleBenchmark.ALL_TESTS)) {
+                    // Skipping loading phase since previously loaded
+                    return true;
+                }
+            }
+            System.out.println("Initializing topic " + topic);
+
+            Properties props = new Properties();
+            props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
+            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, parent.kafka);
+            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+
+            KafkaProducer<String, String> producer = new KafkaProducer<>(props);
+            for (int c = 0; c < numCampaigns; c++) {
+                String campaignID = UUID.randomUUID().toString();
+                for (int a = 0; a < adsPerCampaign; a++) {
+                    String adId = UUID.randomUUID().toString();
+                    String concat = adId + ":" + campaignID;
+                    producer.send(new ProducerRecord<>(topic, adId, concat));
+                    ads.add(adId);
+                    parent.processedRecords.getAndIncrement();
+                    parent.processedBytes += concat.length() + adId.length();
+                }
+            }
+            return true;
+        }
+        return false;
+    }
+
+    // just for Yahoo benchmark
+    private boolean maybeSetupPhaseEvents(final String topic, final String clientId,
+                                          final boolean skipIfAllTests, final int numRecords,
+                                          final List<String> ads) throws Exception {
+        parent.resetStats();
+        String[] eventTypes = new String[]{"view", "click", "purchase"};
+        Random rand = new Random();
+        // initialize topics
+        if (parent.loadPhase) {
+            if (skipIfAllTests) {
+                // if we run all tests, the produce test will have already loaded the data
+                if (parent.testName.equals(SimpleBenchmark.ALL_TESTS)) {
+                    // Skipping loading phase since previously loaded
+                    return true;
+                }
+            }
+            System.out.println("Initializing topic " + topic);
+
+            Properties props = new Properties();
+            props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
+            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, parent.kafka);
+            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+
+            KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);
+
+            long startTime = System.currentTimeMillis();
+
+            ProjectedEvent event = new ProjectedEvent();
+
+            Map<String, Object> serdeProps = new HashMap<>();
+            final Serializer<ProjectedEvent> projectedEventSerializer = new JsonPOJOSerializer<>();
+            serdeProps.put("JsonPOJOClass", ProjectedEvent.class);
+            projectedEventSerializer.configure(serdeProps, false);
+
+            for (int i = 0; i < numRecords; i++) {
+                event.eventType = eventTypes[rand.nextInt(eventTypes.length - 1)];
+                event.adID = ads.get(rand.nextInt(ads.size() - 1));
+                event.eventTime = System.currentTimeMillis();
+                byte[] value = projectedEventSerializer.serialize(topic, event);
+                producer.send(new ProducerRecord<>(topic, event.adID, value));
+                parent.processedRecords.getAndIncrement();
+                parent.processedBytes += value.length + event.adID.length();
+            }
+            producer.close();
+
+            long endTime = System.currentTimeMillis();
+
+
+            parent.printResults("Producer Performance [records/latency/rec-sec/MB-sec write]: ", endTime - startTime);
+            return true;
+        }
+        return false;
+    }
+
+
+    public void run() throws Exception {
+        int numCampaigns = 100;
+        int adsPerCampaign = 10;
+
+        List<String> ads = new ArrayList<>(numCampaigns * adsPerCampaign);
+        if (maybeSetupPhaseCampaigns(campaignsTopic, "simple-benchmark-produce-campaigns", false,
+            numCampaigns, adsPerCampaign, ads)) {
+            maybeSetupPhaseEvents(eventsTopic, "simple-benchmark-produce-events", false,
+                parent.numRecords, ads);
+            return;
+        }
+
+        CountDownLatch latch = new CountDownLatch(1);
+        Properties props = parent.setStreamProperties("simple-benchmark-yahoo" + new Random().nextInt());
+
+        final KafkaStreams streams = createYahooBenchmarkStreams(props, campaignsTopic, eventsTopic, latch, parent.numRecords);
+        parent.runGenericBenchmark(streams, "Streams Yahoo Performance [records/latency/rec-sec/MB-sec counted]: ", latch);
+
+    }
+    // Note: these are also in the streams example package, eventually use 1 file
+    private class JsonPOJOSerializer<T> implements Serializer<T> {
+        private final ObjectMapper objectMapper = new ObjectMapper();
+
+        /**
+         * Default constructor needed by Kafka
+         */
+        public JsonPOJOSerializer() {
+        }
+
+        @Override
+        public void configure(Map<String, ?> props, boolean isKey) {
+        }
+
+        @Override
+        public byte[] serialize(String topic, T data) {
+            if (data == null)
+                return null;
+
+            try {
+                return objectMapper.writeValueAsBytes(data);
+            } catch (Exception e) {
+                throw new SerializationException("Error serializing JSON message", e);
+            }
+        }
+
+        @Override
+        public void close() {
+        }
+
+    }
+
+    // Note: these are also in the streams example package, eventually use 1 file
+    private class JsonPOJODeserializer<T> implements Deserializer<T> {
+        private ObjectMapper objectMapper = new ObjectMapper();
+
+        private Class<T> tClass;
+
+        /**
+         * Default constructor needed by Kafka
+         */
+        public JsonPOJODeserializer() {
+        }
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void configure(Map<String, ?> props, boolean isKey) {
+            tClass = (Class<T>) props.get("JsonPOJOClass");
+        }
+
+        @Override
+        public T deserialize(String topic, byte[] bytes) {
+            if (bytes == null)
+                return null;
+
+            T data;
+            try {
+                data = objectMapper.readValue(bytes, tClass);
+            } catch (Exception e) {
+                throw new SerializationException(e);
+            }
+
+            return data;
+        }
+
+        @Override
+        public void close() {
+
+        }
+    }
+
+
+    private KafkaStreams createYahooBenchmarkStreams(final Properties streamConfig, final String campaignsTopic, final String eventsTopic,
+                                                     final CountDownLatch latch, final int numRecords) {
+        Map<String, Object> serdeProps = new HashMap<>();
+        final Serializer<ProjectedEvent> projectedEventSerializer = new JsonPOJOSerializer<>();
+        serdeProps.put("JsonPOJOClass", ProjectedEvent.class);
+        projectedEventSerializer.configure(serdeProps, false);
+        final Deserializer<ProjectedEvent> projectedEventDeserializer = new JsonPOJODeserializer<>();
+        serdeProps.put("JsonPOJOClass", ProjectedEvent.class);
+        projectedEventDeserializer.configure(serdeProps, false);
+
+        final KStreamBuilder builder = new KStreamBuilder();
+        final KStream<String, ProjectedEvent> kEvents = builder.stream(Serdes.String(),
+            Serdes.serdeFrom(projectedEventSerializer, projectedEventDeserializer), eventsTopic);
+        final KTable<String, String> kCampaigns = builder.table(Serdes.String(), Serdes.String(),
+            campaignsTopic, "campaign-state");
+
+
+        KStream<String, ProjectedEvent> filteredEvents = kEvents
+            // use peek to quit when the last element is processed
+            .peek(new ForeachAction<String, ProjectedEvent>() {
+                @Override
+                public void apply(String key, ProjectedEvent value) {
+                    parent.processedRecords.getAndIncrement();
+                    if (parent.processedRecords.get() % 1000000 == 0) {
+                        System.out.println("Processed " + parent.processedRecords.get());
+                    }
+                    if (parent.processedRecords.get() >= numRecords) {
+                        latch.countDown();
+                    }
+                }
+            })
+            // only keep "view" events
+            .filter(new Predicate<String, ProjectedEvent>() {
+                @Override
+                public boolean test(final String key, final ProjectedEvent value) {
+                    return value.eventType.equals("view");
+                }
+            })
+            // select just a few of the columns
+            .mapValues(new ValueMapper<ProjectedEvent, ProjectedEvent>() {
+                @Override
+                public ProjectedEvent apply(ProjectedEvent value) {
+                    ProjectedEvent event = new ProjectedEvent();
+                    event.adID = value.adID;
+                    event.eventTime = value.eventTime;
+                    event.eventType = value.eventType;
+                    return event;
+                }
+            });
+
+        // deserialize the ad ID and campaign ID from the stored value in Kafka
+        KTable<String, CampaignAd> deserCampaigns = kCampaigns.mapValues(new ValueMapper<String, CampaignAd>() {
+            @Override
+            public CampaignAd apply(String value) {
+                String[] parts = value.split(":");
+                CampaignAd cAdd = new CampaignAd();
+                cAdd.adID = parts[0];
+                cAdd.campaignID = parts[1];
+                return cAdd;
+            }
+        });
+
+        // join the events with the campaigns
+        KStream<String, String> joined = filteredEvents.join(deserCampaigns,
+            new ValueJoiner<ProjectedEvent, CampaignAd, String>() {
+                @Override
+                public String apply(ProjectedEvent value1, CampaignAd value2) {
+                    return value2.campaignID;
+                }
+            }, Serdes.String(), Serdes.serdeFrom(projectedEventSerializer, projectedEventDeserializer));
+
+
+        // key by campaign rather than by ad as original
+        KStream<String, String> keyedByCampaign = joined
+            .selectKey(new KeyValueMapper<String, String, String>() {
+                @Override
+                public String apply(String key, String value) {
+                    return value;
+                }
+            });
+
+        // calculate windowed counts
+        keyedByCampaign
+            .groupByKey(Serdes.String(), Serdes.String())
+            .count(TimeWindows.of(10 * 1000), "time-windows");
+
+        return new KafkaStreams(builder, streamConfig);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/55a90938/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py b/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
index 6df3cab..258b7c0 100644
--- a/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
+++ b/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
@@ -32,10 +32,10 @@ class StreamsSimpleBenchmarkTest(Test):
         super(StreamsSimpleBenchmarkTest, self).__init__(test_context)
         self.num_records = 20000000L
         self.replication = 1
-
+        self.num_threads = 1
 
     @cluster(num_nodes=9)
-    @matrix(test=["produce", "consume", "count", "processstream", "processstreamwithsink", "processstreamwithstatestore", "processstreamwithcachedstatestore", "kstreamktablejoin", "kstreamkstreamjoin", "ktablektablejoin"], scale=[1, 3])
+    @matrix(test=["produce", "consume", "count", "processstream", "processstreamwithsink", "processstreamwithstatestore", "processstreamwithcachedstatestore", "kstreamktablejoin", "kstreamkstreamjoin", "ktablektablejoin", "yahoo"], scale=[1, 3])
     def test_simple_benchmark(self, test, scale):
         """
         Run simple Kafka Streams benchmark
@@ -58,7 +58,9 @@ class StreamsSimpleBenchmarkTest(Test):
             'joinSourceTopic1KStreamKTable' : { 'partitions': scale, 'replication-factor': self.replication },
             'joinSourceTopic2KStreamKTable' : { 'partitions': scale, 'replication-factor': self.replication },
             'joinSourceTopic1KTableKTable' : { 'partitions': scale, 'replication-factor': self.replication },
-            'joinSourceTopic2KTableKTable' : { 'partitions': scale, 'replication-factor': self.replication }
+            'joinSourceTopic2KTableKTable' : { 'partitions': scale, 'replication-factor': self.replication },
+            'yahooCampaigns' : { 'partitions': 20, 'replication-factor': self.replication },
+            'yahooEvents' : { 'partitions': 20, 'replication-factor': self.replication }
         })
         self.kafka.log_level = "INFO"
         self.kafka.start()
@@ -67,7 +69,8 @@ class StreamsSimpleBenchmarkTest(Test):
         # LOAD PHASE
         ################
         self.load_driver = StreamsSimpleBenchmarkService(self.test_context, self.kafka,
-                                                         self.num_records * scale, "true", test)
+                                                         self.num_records * scale, "true", test,
+                                                         self.num_threads)
         self.load_driver.start()
         self.load_driver.wait()
         self.load_driver.stop()
@@ -77,7 +80,8 @@ class StreamsSimpleBenchmarkTest(Test):
         ################
         for num in range(0, scale):
             self.driver[num] = StreamsSimpleBenchmarkService(self.test_context, self.kafka,
-                                                             self.num_records/(scale), "false", test)
+                                                             self.num_records/(scale), "false", test,
+                                                             self.num_threads)
             self.driver[num].start()
 
         #######################

http://git-wip-us.apache.org/repos/asf/kafka/blob/55a90938/tests/kafkatest/services/performance/streams_performance.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/performance/streams_performance.py b/tests/kafkatest/services/performance/streams_performance.py
index 8cedb51..94f7249 100644
--- a/tests/kafkatest/services/performance/streams_performance.py
+++ b/tests/kafkatest/services/performance/streams_performance.py
@@ -22,13 +22,14 @@ from kafkatest.services.streams import StreamsTestBaseService
 class StreamsSimpleBenchmarkService(StreamsTestBaseService):
     """Base class for simple Kafka Streams benchmark"""
 
-    def __init__(self, test_context, kafka, numrecs, load_phase, test_name):
+    def __init__(self, test_context, kafka, numrecs, load_phase, test_name, num_threads):
         super(StreamsSimpleBenchmarkService, self).__init__(test_context,
                                                             kafka,
                                                             "org.apache.kafka.streams.perf.SimpleBenchmark",
                                                             numrecs,
                                                             load_phase,
-                                                            test_name)
+                                                            test_name,
+                                                            num_threads)
 
     def collect_data(self, node, tag = None):
         # Collect the data and return it to the framework

http://git-wip-us.apache.org/repos/asf/kafka/blob/55a90938/tests/kafkatest/services/streams.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py
index 867e3f5..f6b6dd4 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -45,13 +45,14 @@ class StreamsTestBaseService(KafkaPathResolverMixin, Service):
             "collect_default": True},
     }
 
-    def __init__(self, test_context, kafka, streams_class_name, user_test_args, user_test_args1=None, user_test_args2=None):
+    def __init__(self, test_context, kafka, streams_class_name, user_test_args, user_test_args1=None, user_test_args2=None, user_test_args3=None):
         super(StreamsTestBaseService, self).__init__(test_context, 1)
         self.kafka = kafka
         self.args = {'streams_class_name': streams_class_name,
                      'user_test_args': user_test_args,
                      'user_test_args1': user_test_args1,
-                     'user_test_args2': user_test_args2}
+                     'user_test_args2': user_test_args2,
+                     'user_test_args3': user_test_args3}
         self.log_level = "DEBUG"
 
     @property
@@ -122,7 +123,7 @@ class StreamsTestBaseService(KafkaPathResolverMixin, Service):
         cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \
               "INCLUDE_TEST_JARS=true %(kafka_run_class)s %(streams_class_name)s " \
               " %(kafka)s %(state_dir)s %(user_test_args)s %(user_test_args1)s %(user_test_args2)s" \
-              " & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args
+              " %(user_test_args3)s & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args
 
         return cmd