You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2017/06/21 10:47:12 UTC
kafka git commit: MINOR: add Yahoo benchmark to nightly runs
Repository: kafka
Updated Branches:
refs/heads/trunk cae5977ed -> 55a90938a
MINOR: add Yahoo benchmark to nightly runs
Author: Eno Thereska <en...@gmail.com>
Reviewers: Damian Guy <da...@gmail.com>
Closes #3289 from enothereska/yahoo-benchmark
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/55a90938
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/55a90938
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/55a90938
Branch: refs/heads/trunk
Commit: 55a90938a12d8928289a30588bbad6c959c48674
Parents: cae5977
Author: Eno Thereska <en...@gmail.com>
Authored: Wed Jun 21 11:46:59 2017 +0100
Committer: Damian Guy <da...@gmail.com>
Committed: Wed Jun 21 11:46:59 2017 +0100
----------------------------------------------------------------------
checkstyle/import-control.xml | 4 +
.../kafka/streams/perf/SimpleBenchmark.java | 92 +++--
.../kafka/streams/perf/YahooBenchmark.java | 361 +++++++++++++++++++
.../streams/streams_simple_benchmark_test.py | 14 +-
.../services/performance/streams_performance.py | 5 +-
tests/kafkatest/services/streams.py | 7 +-
6 files changed, 438 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/55a90938/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index c0c92af..1579a1c 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -186,6 +186,10 @@
<allow pkg="com.fasterxml.jackson.databind" />
<allow pkg="org.apache.kafka.connect.json" />
</subpackage>
+
+ <subpackage name="perf">
+ <allow pkg="com.fasterxml.jackson.databind" />
+ </subpackage>
<subpackage name="integration">
<allow pkg="kafka.admin" />
http://git-wip-us.apache.org/repos/asf/kafka/blob/55a90938/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
index 2983df5..ec3ea0e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
@@ -55,6 +55,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* Class that provides support for a series of benchmarks. It is usually driven by
@@ -73,17 +74,20 @@ import java.util.concurrent.TimeUnit;
*/
public class SimpleBenchmark {
- private final String kafka;
+ final String kafka;
private final File stateDir;
- private final Boolean loadPhase;
- private final String testName;
- private static final String ALL_TESTS = "all";
+ final Boolean loadPhase;
+ final String testName;
+ final int numThreads;
+ static final String ALL_TESTS = "all";
private static final String SOURCE_TOPIC = "simpleBenchmarkSourceTopic";
private static final String SINK_TOPIC = "simpleBenchmarkSinkTopic";
private static final String COUNT_TOPIC = "countTopic";
private static final String JOIN_TOPIC_1_PREFIX = "joinSourceTopic1";
private static final String JOIN_TOPIC_2_PREFIX = "joinSourceTopic2";
+ private static final String YAHOO_CAMPAIGNS_TOPIC = "yahooCampaigns";
+ private static final String YAHOO_EVENTS_TOPIC = "yahooEvents";
private static final ValueJoiner VALUE_JOINER = new ValueJoiner<byte[], byte[], byte[]>() {
@Override
public byte[] apply(final byte[] value1, final byte[] value2) {
@@ -101,9 +105,9 @@ public class SimpleBenchmark {
}
};
- private static int numRecords;
- private static int processedRecords = 0;
- private static long processedBytes = 0;
+ int numRecords;
+ final AtomicInteger processedRecords = new AtomicInteger(0);
+ long processedBytes = 0;
private static final int VALUE_SIZE = 100;
private static final long POLL_MS = 500L;
private static final long COMMIT_INTERVAL_MS = 30000L;
@@ -113,12 +117,15 @@ public class SimpleBenchmark {
private static final Serde<byte[]> BYTE_SERDE = Serdes.ByteArray();
private static final Serde<Integer> INTEGER_SERDE = Serdes.Integer();
- public SimpleBenchmark(final File stateDir, final String kafka, final Boolean loadPhase, final String testName) {
+ public SimpleBenchmark(final File stateDir, final String kafka, final Boolean loadPhase,
+ final String testName, final int numRecords, final int numThreads) {
super();
this.stateDir = stateDir;
this.kafka = kafka;
this.loadPhase = loadPhase;
this.testName = testName;
+ this.numRecords = numRecords;
+ this.numThreads = numThreads;
}
private void run() throws Exception {
@@ -175,6 +182,9 @@ public class SimpleBenchmark {
case "ktablektablejoin":
kTableKTableJoin(JOIN_TOPIC_1_PREFIX + "KTableKTable", JOIN_TOPIC_2_PREFIX + "KTableKTable");
break;
+ case "yahoo":
+ yahooBenchmark(YAHOO_CAMPAIGNS_TOPIC, YAHOO_EVENTS_TOPIC);
+ break;
default:
throw new Exception("Unknown test name " + testName);
@@ -184,14 +194,13 @@ public class SimpleBenchmark {
public static void main(String[] args) throws Exception {
String kafka = args.length > 0 ? args[0] : "localhost:9092";
String stateDirStr = args.length > 1 ? args[1] : TestUtils.tempDirectory().getAbsolutePath();
- numRecords = args.length > 2 ? Integer.parseInt(args[2]) : 10000000;
+ int numRecords = args.length > 2 ? Integer.parseInt(args[2]) : 10000000;
boolean loadPhase = args.length > 3 ? Boolean.parseBoolean(args[3]) : false;
String testName = args.length > 4 ? args[4].toLowerCase(Locale.ROOT) : ALL_TESTS;
+ int numThreads = args.length > 5 ? Integer.parseInt(args[5]) : 1;
final File stateDir = new File(stateDirStr);
stateDir.mkdir();
- final File rocksdbDir = new File(stateDir, "rocksdb-test");
- rocksdbDir.mkdir();
// Note: this output is needed for automated tests and must not be removed
System.out.println("StreamsTest instance started");
@@ -200,17 +209,18 @@ public class SimpleBenchmark {
System.out.println("numRecords=" + numRecords);
System.out.println("loadPhase=" + loadPhase);
System.out.println("testName=" + testName);
+ System.out.println("numThreads=" + numThreads);
- SimpleBenchmark benchmark = new SimpleBenchmark(stateDir, kafka, loadPhase, testName);
+ SimpleBenchmark benchmark = new SimpleBenchmark(stateDir, kafka, loadPhase, testName, numRecords, numThreads);
benchmark.run();
}
- private Properties setStreamProperties(final String applicationId) {
+ public Properties setStreamProperties(final String applicationId) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString());
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
- props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
+ props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numThreads);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// the socket buffer needs to be large, especially when running in AWS with
// high latency. if running locally the default is fine.
@@ -244,8 +254,7 @@ public class SimpleBenchmark {
private boolean maybeSetupPhase(final String topic, final String clientId,
final boolean skipIfAllTests) throws Exception {
- processedRecords = 0;
- processedBytes = 0;
+ resetStats();
// initialize topics
if (loadPhase) {
if (skipIfAllTests) {
@@ -264,6 +273,12 @@ public class SimpleBenchmark {
return false;
}
+    /**
+     * Zeroes the per-run statistics (byte total and record counter) so that the next
+     * benchmark phase starts counting from scratch.
+     */
+    void resetStats() {
+        processedBytes = 0;
+        processedRecords.set(0);
+    }
+
+
private KafkaStreams createCountStreams(Properties streamConfig, String topic, final CountDownLatch latch) {
final KStreamBuilder builder = new KStreamBuilder();
final KStream<Integer, byte[]> input = builder.stream(topic);
@@ -274,6 +289,13 @@ public class SimpleBenchmark {
return new KafkaStreams(builder, streamConfig);
}
+
+    /**
+     * Runs the Yahoo streaming benchmark against the given topics, using this benchmark
+     * instance as the parent that supplies configuration and statistics.
+     *
+     * @param campaignsTopic topic holding the ad-to-campaign mapping
+     * @param eventsTopic    topic holding the ad events
+     * @throws Exception if the delegated benchmark run fails
+     */
+    private void yahooBenchmark(final String campaignsTopic, final String eventsTopic) throws Exception {
+        new YahooBenchmark(this, campaignsTopic, eventsTopic).run();
+    }
+
/**
* Measure the performance of a simple aggregate like count.
* Counts the occurrence of numbers (note that normally people count words, this
@@ -351,15 +373,15 @@ public class SimpleBenchmark {
runGenericBenchmark(streams, "Streams KTableKTable LeftJoin Performance [records/latency/rec-sec/MB-sec joined]: ", latch);
}
- private void printResults(final String nameOfBenchmark, final long latency) {
+ void printResults(final String nameOfBenchmark, final long latency) {
System.out.println(nameOfBenchmark +
- processedRecords + "/" +
+ processedRecords.get() + "/" +
latency + "/" +
- recordsPerSec(latency, processedRecords) + "/" +
+ recordsPerSec(latency, processedRecords.get()) + "/" +
megabytesPerSec(latency, processedBytes));
}
- private void runGenericBenchmark(final KafkaStreams streams, final String nameOfBenchmark, final CountDownLatch latch) {
+ void runGenericBenchmark(final KafkaStreams streams, final String nameOfBenchmark, final CountDownLatch latch) {
streams.start();
long startTime = System.currentTimeMillis();
@@ -459,6 +481,7 @@ public class SimpleBenchmark {
public void produce(String topic) throws Exception {
// loading phase does not make sense for producer
if (loadPhase) {
+ resetStats();
return;
}
produce(topic, VALUE_SIZE, "simple-benchmark-produce", numRecords, true, numRecords, true);
@@ -477,8 +500,7 @@ public class SimpleBenchmark {
private void produce(String topic, int valueSizeBytes, String clientId, int numRecords, boolean sequential,
int upperRange, boolean printStats) throws Exception {
- processedRecords = 0;
- processedBytes = 0;
+
if (sequential) {
if (upperRange < numRecords) throw new Exception("UpperRange must be >= numRecords");
}
@@ -504,7 +526,7 @@ public class SimpleBenchmark {
producer.send(new ProducerRecord<>(topic, key, value));
if (sequential) key++;
else key = rand.nextInt(upperRange);
- processedRecords++;
+ processedRecords.getAndIncrement();
processedBytes += value.length + Integer.SIZE;
}
producer.close();
@@ -536,20 +558,20 @@ public class SimpleBenchmark {
while (true) {
ConsumerRecords<Integer, byte[]> records = consumer.poll(POLL_MS);
if (records.isEmpty()) {
- if (processedRecords == numRecords)
+ if (processedRecords.get() == numRecords)
break;
} else {
for (ConsumerRecord<Integer, byte[]> record : records) {
- processedRecords++;
+ processedRecords.getAndIncrement();
processedBytes += record.value().length + Integer.SIZE;
Integer recKey = record.key();
if (key == null || key < recKey)
key = recKey;
- if (processedRecords == numRecords)
+ if (processedRecords.get() == numRecords)
break;
}
}
- if (processedRecords == numRecords)
+ if (processedRecords.get() == numRecords)
break;
}
@@ -577,9 +599,9 @@ public class SimpleBenchmark {
@Override
public void process(Integer key, byte[] value) {
- processedRecords++;
+ processedRecords.getAndIncrement();
processedBytes += value.length + Integer.SIZE;
- if (processedRecords == numRecords) {
+ if (processedRecords.get() == numRecords) {
latch.countDown();
}
}
@@ -616,9 +638,9 @@ public class SimpleBenchmark {
@Override
public void process(Integer key, byte[] value) {
- processedRecords++;
+ processedRecords.getAndIncrement();
processedBytes += value.length + Integer.SIZE;
- if (processedRecords == numRecords) {
+ if (processedRecords.get() == numRecords) {
latch.countDown();
}
}
@@ -644,7 +666,7 @@ public class SimpleBenchmark {
}
@Override
public void apply(Integer key, V value) {
- processedRecords++;
+ processedRecords.getAndIncrement();
if (value instanceof byte[]) {
processedBytes += ((byte[]) value).length + Integer.SIZE;
} else if (value instanceof Long) {
@@ -652,7 +674,7 @@ public class SimpleBenchmark {
} else {
System.err.println("Unknown value type in CountDownAction");
}
- if (processedRecords == numRecords) {
+ if (processedRecords.get() == numRecords) {
this.latch.countDown();
}
}
@@ -724,9 +746,9 @@ public class SimpleBenchmark {
@Override
public void process(Integer key, byte[] value) {
store.put(key, value);
- processedRecords++;
+ processedRecords.getAndIncrement();
processedBytes += value.length + Integer.SIZE;
- if (processedRecords == numRecords) {
+ if (processedRecords.get() == numRecords) {
latch.countDown();
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/55a90938/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
new file mode 100644
index 0000000..56c578f
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.perf;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.Predicate;
+import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.kstream.ValueMapper;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.UUID;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.concurrent.CountDownLatch;
+
+
+/**
+ * A basic DSL and data generation that emulates the behavior of the Yahoo Benchmark
+ * https://yahooeng.tumblr.com/post/135321837876/benchmarking-streaming-computation-engines-at
+ * Thanks to Michael Armbrust for providing the initial code for this benchmark in his blog:
+ * https://databricks.com/blog/2017/06/06/simple-super-fast-streaming-engine-apache-spark.html
+ */
+public class YahooBenchmark {
+ private final SimpleBenchmark parent;
+ private final String campaignsTopic;
+ private final String eventsTopic;
+
+ static class ProjectedEvent {
+ /* JSON payload of one ad event; attributes need to be public for serializer to work */
+ /* main attributes */
+ public String eventType; // the topology's filter keeps only events whose type is "view"
+ public String adID; // also used as the record key when events are produced
+
+ /* other attributes */
+ public long eventTime; // set from System.currentTimeMillis() when the event is produced
+ public String userID = UUID.randomUUID().toString(); // not used; a new random UUID per instance
+ public String pageID = UUID.randomUUID().toString(); // not used; a new random UUID per instance
+ public String addType = "banner78"; // not used
+ public String ipAddress = "1.2.3.4"; // not used
+ }
+
+ static class CampaignAd {
+ /* parsed form of a campaigns-topic value; attributes need to be public for serializer to work */
+ public String adID; // first ":"-separated part of the campaigns-topic value
+ public String campaignID; // second ":"-separated part; this is what the stream-table join emits
+ }
+
+    /**
+     * Creates a Yahoo benchmark that borrows its configuration and counters from {@code parent}.
+     *
+     * @param parent         the driving benchmark (broker address, load-phase flag, statistics)
+     * @param campaignsTopic topic holding the ad-to-campaign mapping
+     * @param eventsTopic    topic holding the ad events
+     */
+    public YahooBenchmark(final SimpleBenchmark parent, final String campaignsTopic, final String eventsTopic) {
+        this.eventsTopic = eventsTopic;
+        this.campaignsTopic = campaignsTopic;
+        this.parent = parent;
+    }
+
+    /**
+     * Load-phase helper used only by the Yahoo benchmark: populates the campaigns topic.
+     * <p>
+     * The parent's statistics are always reset first. When the parent is in its load phase,
+     * one record per ad is produced with the ad id as key and {@code "<adId>:<campaignId>"}
+     * as value, and every generated ad id is appended to {@code ads}. The producer is closed
+     * before returning so that all buffered records are flushed and its resources released.
+     *
+     * @param topic          campaigns topic to write to
+     * @param clientId       producer client id
+     * @param skipIfAllTests if true and the parent runs all tests, nothing is produced
+     * @param numCampaigns   number of campaigns to generate
+     * @param adsPerCampaign number of ads generated per campaign
+     * @param ads            output list that receives every generated ad id
+     * @return true if the parent is in its load phase (whether or not data was produced),
+     *         false otherwise
+     * @throws Exception if the producer cannot be created or used
+     */
+    private boolean maybeSetupPhaseCampaigns(final String topic, final String clientId,
+                                             final boolean skipIfAllTests,
+                                             final int numCampaigns, final int adsPerCampaign,
+                                             final List<String> ads) throws Exception {
+        parent.resetStats();
+        // initialize topics
+        if (parent.loadPhase) {
+            if (skipIfAllTests) {
+                // if we run all tests, the produce test will have already loaded the data
+                if (parent.testName.equals(SimpleBenchmark.ALL_TESTS)) {
+                    // Skipping loading phase since previously loaded
+                    return true;
+                }
+            }
+            System.out.println("Initializing topic " + topic);
+
+            Properties props = new Properties();
+            props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
+            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, parent.kafka);
+            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+
+            // try-with-resources: close() waits for outstanding sends, so the campaigns are
+            // durably written before the events are generated against them
+            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
+                for (int c = 0; c < numCampaigns; c++) {
+                    String campaignID = UUID.randomUUID().toString();
+                    for (int a = 0; a < adsPerCampaign; a++) {
+                        String adId = UUID.randomUUID().toString();
+                        String concat = adId + ":" + campaignID;
+                        producer.send(new ProducerRecord<>(topic, adId, concat));
+                        ads.add(adId);
+                        parent.processedRecords.getAndIncrement();
+                        parent.processedBytes += concat.length() + adId.length();
+                    }
+                }
+            }
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Load-phase helper used only by the Yahoo benchmark: populates the events topic.
+     * <p>
+     * The parent's statistics are always reset first. When the parent is in its load phase,
+     * {@code numRecords} JSON-serialized {@link ProjectedEvent}s are produced, each with a
+     * uniformly chosen event type ("view", "click" or "purchase") and a uniformly chosen ad id
+     * from {@code ads}; producer throughput is then printed via the parent.
+     *
+     * @param topic          events topic to write to
+     * @param clientId       producer client id
+     * @param skipIfAllTests if true and the parent runs all tests, nothing is produced
+     * @param numRecords     number of events to generate
+     * @param ads            ad ids to draw from; must be non-empty when events are generated
+     * @return true if the parent is in its load phase (whether or not data was produced),
+     *         false otherwise
+     * @throws IllegalArgumentException if events must be generated but {@code ads} is empty
+     * @throws Exception if the producer cannot be created or used
+     */
+    private boolean maybeSetupPhaseEvents(final String topic, final String clientId,
+                                          final boolean skipIfAllTests, final int numRecords,
+                                          final List<String> ads) throws Exception {
+        parent.resetStats();
+        String[] eventTypes = new String[]{"view", "click", "purchase"};
+        Random rand = new Random();
+        // initialize topics
+        if (parent.loadPhase) {
+            if (skipIfAllTests) {
+                // if we run all tests, the produce test will have already loaded the data
+                if (parent.testName.equals(SimpleBenchmark.ALL_TESTS)) {
+                    // Skipping loading phase since previously loaded
+                    return true;
+                }
+            }
+            if (numRecords > 0 && ads.isEmpty()) {
+                throw new IllegalArgumentException("Cannot generate events for topic " + topic + " without any ads");
+            }
+            System.out.println("Initializing topic " + topic);
+
+            Properties props = new Properties();
+            props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
+            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, parent.kafka);
+            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+
+            long startTime;
+            // try-with-resources guarantees the producer is closed even if serialization fails
+            try (KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props)) {
+                startTime = System.currentTimeMillis();
+
+                // a single event instance is reused; only the three main fields vary per record
+                ProjectedEvent event = new ProjectedEvent();
+
+                Map<String, Object> serdeProps = new HashMap<>();
+                final Serializer<ProjectedEvent> projectedEventSerializer = new JsonPOJOSerializer<>();
+                serdeProps.put("JsonPOJOClass", ProjectedEvent.class);
+                projectedEventSerializer.configure(serdeProps, false);
+
+                for (int i = 0; i < numRecords; i++) {
+                    // Random.nextInt(bound) excludes the bound itself, so pass the full size;
+                    // subtracting one would never pick the last event type or the last ad
+                    event.eventType = eventTypes[rand.nextInt(eventTypes.length)];
+                    event.adID = ads.get(rand.nextInt(ads.size()));
+                    event.eventTime = System.currentTimeMillis();
+                    byte[] value = projectedEventSerializer.serialize(topic, event);
+                    producer.send(new ProducerRecord<>(topic, event.adID, value));
+                    parent.processedRecords.getAndIncrement();
+                    parent.processedBytes += value.length + event.adID.length();
+                }
+            }
+
+            // measured after the producer is closed, so the latency includes flushing
+            long endTime = System.currentTimeMillis();
+
+            parent.printResults("Producer Performance [records/latency/rec-sec/MB-sec write]: ", endTime - startTime);
+            return true;
+        }
+        return false;
+    }
+
+
+    /**
+     * Entry point of the Yahoo benchmark.
+     * <p>
+     * If the campaigns setup reports that the parent is in its load phase, the events setup
+     * is run as well and nothing else happens. Otherwise the streams topology is built under
+     * a randomly suffixed application id and handed to the parent's generic benchmark runner.
+     *
+     * @throws Exception if loading the topics or running the topology fails
+     */
+    public void run() throws Exception {
+        final int campaignCount = 100;
+        final int adsInEachCampaign = 10;
+        final List<String> adIds = new ArrayList<>(campaignCount * adsInEachCampaign);
+
+        final boolean loading = maybeSetupPhaseCampaigns(campaignsTopic, "simple-benchmark-produce-campaigns", false,
+            campaignCount, adsInEachCampaign, adIds);
+        if (loading) {
+            maybeSetupPhaseEvents(eventsTopic, "simple-benchmark-produce-events", false, parent.numRecords, adIds);
+        } else {
+            final CountDownLatch done = new CountDownLatch(1);
+            final String applicationId = "simple-benchmark-yahoo" + new Random().nextInt();
+            final Properties streamConfig = parent.setStreamProperties(applicationId);
+            final KafkaStreams topology =
+                createYahooBenchmarkStreams(streamConfig, campaignsTopic, eventsTopic, done, parent.numRecords);
+            parent.runGenericBenchmark(topology, "Streams Yahoo Performance [records/latency/rec-sec/MB-sec counted]: ", done);
+        }
+    }
+    // Note: these are also in the streams example package, eventually use 1 file
+    /**
+     * Serializer that writes any object as JSON bytes using a Jackson {@link ObjectMapper}.
+     * <p>
+     * Declared static so that it carries no hidden reference to the enclosing benchmark and
+     * really does have a no-argument constructor.
+     *
+     * @param <T> type of the objects being serialized
+     */
+    private static class JsonPOJOSerializer<T> implements Serializer<T> {
+        private final ObjectMapper objectMapper = new ObjectMapper();
+
+        /**
+         * Default constructor needed by Kafka
+         */
+        public JsonPOJOSerializer() {
+        }
+
+        /** No configuration is needed; the supplied properties are ignored. */
+        @Override
+        public void configure(Map<String, ?> props, boolean isKey) {
+        }
+
+        /**
+         * @param topic topic the record is destined for (not used)
+         * @param data  object to serialize, may be null
+         * @return the JSON bytes, or null when {@code data} is null
+         * @throws SerializationException if Jackson fails to write the object
+         */
+        @Override
+        public byte[] serialize(String topic, T data) {
+            if (data == null)
+                return null;
+
+            try {
+                return objectMapper.writeValueAsBytes(data);
+            } catch (Exception e) {
+                throw new SerializationException("Error serializing JSON message", e);
+            }
+        }
+
+        /** Nothing to release. */
+        @Override
+        public void close() {
+        }
+
+    }
+
+ // Note: these are also in the streams example package, eventuall use 1 file
+ private class JsonPOJODeserializer<T> implements Deserializer<T> {
+ private ObjectMapper objectMapper = new ObjectMapper();
+
+ private Class<T> tClass;
+
+ /**
+ * Default constructor needed by Kafka
+ */
+ public JsonPOJODeserializer() {
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void configure(Map<String, ?> props, boolean isKey) {
+ tClass = (Class<T>) props.get("JsonPOJOClass");
+ }
+
+ @Override
+ public T deserialize(String topic, byte[] bytes) {
+ if (bytes == null)
+ return null;
+
+ T data;
+ try {
+ data = objectMapper.readValue(bytes, tClass);
+ } catch (Exception e) {
+ throw new SerializationException(e);
+ }
+
+ return data;
+ }
+
+ @Override
+ public void close() {
+
+ }
+ }
+
+
+    /**
+     * Builds the Yahoo benchmark topology: count events, keep only "view" events, project
+     * them, join them with the campaigns table on ad id, re-key by campaign id and count
+     * per campaign in 10-second windows.
+     *
+     * @param streamConfig   streams configuration for the returned instance
+     * @param campaignsTopic topic read as a table of {@code adId -> "<adId>:<campaignId>"}
+     * @param eventsTopic    topic read as a stream of JSON {@link ProjectedEvent}s keyed by ad id
+     * @param latch          counted down once at least {@code numRecords} events were seen
+     * @param numRecords     number of events after which the benchmark is considered done
+     * @return a not-yet-started {@link KafkaStreams} instance for the topology
+     */
+    private KafkaStreams createYahooBenchmarkStreams(final Properties streamConfig, final String campaignsTopic, final String eventsTopic,
+                                                     final CountDownLatch latch, final int numRecords) {
+        // both the serializer and the deserializer are configured from the same properties
+        Map<String, Object> serdeProps = new HashMap<>();
+        serdeProps.put("JsonPOJOClass", ProjectedEvent.class);
+        final Serializer<ProjectedEvent> projectedEventSerializer = new JsonPOJOSerializer<>();
+        projectedEventSerializer.configure(serdeProps, false);
+        final Deserializer<ProjectedEvent> projectedEventDeserializer = new JsonPOJODeserializer<>();
+        projectedEventDeserializer.configure(serdeProps, false);
+
+        final KStreamBuilder builder = new KStreamBuilder();
+        final KStream<String, ProjectedEvent> kEvents = builder.stream(Serdes.String(),
+            Serdes.serdeFrom(projectedEventSerializer, projectedEventDeserializer), eventsTopic);
+        final KTable<String, String> kCampaigns = builder.table(Serdes.String(), Serdes.String(),
+            campaignsTopic, "campaign-state");
+
+
+        KStream<String, ProjectedEvent> filteredEvents = kEvents
+            // use peek to quit when the last element is processed
+            .peek(new ForeachAction<String, ProjectedEvent>() {
+                @Override
+                public void apply(String key, ProjectedEvent value) {
+                    // read the counter once, atomically with the increment, so that with several
+                    // stream threads each progress milestone is seen by exactly one thread
+                    final int processed = parent.processedRecords.incrementAndGet();
+                    if (processed % 1000000 == 0) {
+                        System.out.println("Processed " + processed);
+                    }
+                    if (processed >= numRecords) {
+                        latch.countDown();
+                    }
+                }
+            })
+            // only keep "view" events
+            .filter(new Predicate<String, ProjectedEvent>() {
+                @Override
+                public boolean test(final String key, final ProjectedEvent value) {
+                    // constant-first comparison: an event without a type is simply dropped
+                    return "view".equals(value.eventType);
+                }
+            })
+            // select just a few of the columns
+            .mapValues(new ValueMapper<ProjectedEvent, ProjectedEvent>() {
+                @Override
+                public ProjectedEvent apply(ProjectedEvent value) {
+                    ProjectedEvent event = new ProjectedEvent();
+                    event.adID = value.adID;
+                    event.eventTime = value.eventTime;
+                    event.eventType = value.eventType;
+                    return event;
+                }
+            });
+
+        // deserialize the ad ID and campaign ID from the stored value in Kafka
+        KTable<String, CampaignAd> deserCampaigns = kCampaigns.mapValues(new ValueMapper<String, CampaignAd>() {
+            @Override
+            public CampaignAd apply(String value) {
+                String[] parts = value.split(":");
+                CampaignAd cAdd = new CampaignAd();
+                cAdd.adID = parts[0];
+                cAdd.campaignID = parts[1];
+                return cAdd;
+            }
+        });
+
+        // join the events with the campaigns
+        KStream<String, String> joined = filteredEvents.join(deserCampaigns,
+            new ValueJoiner<ProjectedEvent, CampaignAd, String>() {
+                @Override
+                public String apply(ProjectedEvent value1, CampaignAd value2) {
+                    return value2.campaignID;
+                }
+            }, Serdes.String(), Serdes.serdeFrom(projectedEventSerializer, projectedEventDeserializer));
+
+
+        // key by campaign rather than by ad as original
+        KStream<String, String> keyedByCampaign = joined
+            .selectKey(new KeyValueMapper<String, String, String>() {
+                @Override
+                public String apply(String key, String value) {
+                    return value;
+                }
+            });
+
+        // calculate windowed counts
+        keyedByCampaign
+            .groupByKey(Serdes.String(), Serdes.String())
+            .count(TimeWindows.of(10 * 1000), "time-windows");
+
+        return new KafkaStreams(builder, streamConfig);
+    }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/55a90938/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py b/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
index 6df3cab..258b7c0 100644
--- a/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
+++ b/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
@@ -32,10 +32,10 @@ class StreamsSimpleBenchmarkTest(Test):
super(StreamsSimpleBenchmarkTest, self).__init__(test_context)
self.num_records = 20000000L
self.replication = 1
-
+ self.num_threads = 1
@cluster(num_nodes=9)
- @matrix(test=["produce", "consume", "count", "processstream", "processstreamwithsink", "processstreamwithstatestore", "processstreamwithcachedstatestore", "kstreamktablejoin", "kstreamkstreamjoin", "ktablektablejoin"], scale=[1, 3])
+ @matrix(test=["produce", "consume", "count", "processstream", "processstreamwithsink", "processstreamwithstatestore", "processstreamwithcachedstatestore", "kstreamktablejoin", "kstreamkstreamjoin", "ktablektablejoin", "yahoo"], scale=[1, 3])
def test_simple_benchmark(self, test, scale):
"""
Run simple Kafka Streams benchmark
@@ -58,7 +58,9 @@ class StreamsSimpleBenchmarkTest(Test):
'joinSourceTopic1KStreamKTable' : { 'partitions': scale, 'replication-factor': self.replication },
'joinSourceTopic2KStreamKTable' : { 'partitions': scale, 'replication-factor': self.replication },
'joinSourceTopic1KTableKTable' : { 'partitions': scale, 'replication-factor': self.replication },
- 'joinSourceTopic2KTableKTable' : { 'partitions': scale, 'replication-factor': self.replication }
+ 'joinSourceTopic2KTableKTable' : { 'partitions': scale, 'replication-factor': self.replication },
+ 'yahooCampaigns' : { 'partitions': 20, 'replication-factor': self.replication },
+ 'yahooEvents' : { 'partitions': 20, 'replication-factor': self.replication }
})
self.kafka.log_level = "INFO"
self.kafka.start()
@@ -67,7 +69,8 @@ class StreamsSimpleBenchmarkTest(Test):
# LOAD PHASE
################
self.load_driver = StreamsSimpleBenchmarkService(self.test_context, self.kafka,
- self.num_records * scale, "true", test)
+ self.num_records * scale, "true", test,
+ self.num_threads)
self.load_driver.start()
self.load_driver.wait()
self.load_driver.stop()
@@ -77,7 +80,8 @@ class StreamsSimpleBenchmarkTest(Test):
################
for num in range(0, scale):
self.driver[num] = StreamsSimpleBenchmarkService(self.test_context, self.kafka,
- self.num_records/(scale), "false", test)
+ self.num_records/(scale), "false", test,
+ self.num_threads)
self.driver[num].start()
#######################
http://git-wip-us.apache.org/repos/asf/kafka/blob/55a90938/tests/kafkatest/services/performance/streams_performance.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/performance/streams_performance.py b/tests/kafkatest/services/performance/streams_performance.py
index 8cedb51..94f7249 100644
--- a/tests/kafkatest/services/performance/streams_performance.py
+++ b/tests/kafkatest/services/performance/streams_performance.py
@@ -22,13 +22,14 @@ from kafkatest.services.streams import StreamsTestBaseService
class StreamsSimpleBenchmarkService(StreamsTestBaseService):
"""Base class for simple Kafka Streams benchmark"""
- def __init__(self, test_context, kafka, numrecs, load_phase, test_name):
+ def __init__(self, test_context, kafka, numrecs, load_phase, test_name, num_threads):
super(StreamsSimpleBenchmarkService, self).__init__(test_context,
kafka,
"org.apache.kafka.streams.perf.SimpleBenchmark",
numrecs,
load_phase,
- test_name)
+ test_name,
+ num_threads)
def collect_data(self, node, tag = None):
# Collect the data and return it to the framework
http://git-wip-us.apache.org/repos/asf/kafka/blob/55a90938/tests/kafkatest/services/streams.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py
index 867e3f5..f6b6dd4 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -45,13 +45,14 @@ class StreamsTestBaseService(KafkaPathResolverMixin, Service):
"collect_default": True},
}
- def __init__(self, test_context, kafka, streams_class_name, user_test_args, user_test_args1=None, user_test_args2=None):
+ def __init__(self, test_context, kafka, streams_class_name, user_test_args, user_test_args1=None, user_test_args2=None, user_test_args3=None):
super(StreamsTestBaseService, self).__init__(test_context, 1)
self.kafka = kafka
self.args = {'streams_class_name': streams_class_name,
'user_test_args': user_test_args,
'user_test_args1': user_test_args1,
- 'user_test_args2': user_test_args2}
+ 'user_test_args2': user_test_args2,
+ 'user_test_args3': user_test_args3}
self.log_level = "DEBUG"
@property
@@ -122,7 +123,7 @@ class StreamsTestBaseService(KafkaPathResolverMixin, Service):
cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \
"INCLUDE_TEST_JARS=true %(kafka_run_class)s %(streams_class_name)s " \
" %(kafka)s %(state_dir)s %(user_test_args)s %(user_test_args1)s %(user_test_args2)s" \
- " & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args
+ " %(user_test_args3)s & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args
return cmd