You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/04/24 16:50:18 UTC
kafka git commit: KAFKA-4755: Cherry-pick streams tests fixes from
trunk
Repository: kafka
Updated Branches:
refs/heads/0.10.2 495cdbde9 -> 7ba26ef14
KAFKA-4755: Cherry-pick streams tests fixes from trunk
This addresses some test instabilities in 0.10.2 that have already been fixed in trunk.
Author: Eno Thereska <en...@gmail.com>
Reviewers: Guozhang Wang <wa...@gmail.com>
Closes #2867 from enothereska/minor-performance-cherrypick-0.10.2
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7ba26ef1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7ba26ef1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7ba26ef1
Branch: refs/heads/0.10.2
Commit: 7ba26ef14eab295a0216d076741a1f3893ddfbf0
Parents: 495cdbd
Author: Eno Thereska <en...@gmail.com>
Authored: Mon Apr 24 09:50:15 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon Apr 24 09:50:15 2017 -0700
----------------------------------------------------------------------
.../kafka/streams/perf/SimpleBenchmark.java | 515 ++++++++++++-------
.../streams/streams_simple_benchmark_test.py | 89 +++-
.../services/performance/streams_performance.py | 10 +-
tests/kafkatest/services/streams.py | 10 +-
4 files changed, 407 insertions(+), 217 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/7ba26ef1/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
index 7a10b79..55d9c68 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
@@ -14,7 +14,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.kafka.streams.perf;
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -51,18 +50,38 @@ import org.apache.kafka.test.TestUtils;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
+import java.util.Locale;
import java.util.concurrent.CountDownLatch;
import java.util.Properties;
import java.util.Random;
+import java.util.concurrent.TimeUnit;
+/**
+ * Class that provides support for a series of benchmarks. It is usually driven by
+ * tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py.
+ * If run manually through the main() function below, you must do the following:
+ * 1. Have ZK and a Kafka broker set up
+ * 2. Run the loading step first: SimpleBenchmark localhost:9092 /tmp/statedir numRecords true "all"
+ * 3. Run the stream processing step second: SimpleBenchmark localhost:9092 /tmp/statedir numRecords false "all"
+ * Note that only the 4th parameter changed: from "true", indicating that this is a load phase, to "false", indicating
+ * that this is a real run.
+ *
+ * Note that "all" is a convenience option when running this test locally and will not work when running the test
+ * at scale (through tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py). That is due to exact synchronization
+ * needs for each test (e.g., you wouldn't want one instance to run "count" while another
+ * is still running "consume").
+ */
public class SimpleBenchmark {
private final String kafka;
private final File stateDir;
-
+ private final Boolean loadPhase;
+ private final String testName;
+ private static final String ALL_TESTS = "all";
private static final String SOURCE_TOPIC = "simpleBenchmarkSourceTopic";
private static final String SINK_TOPIC = "simpleBenchmarkSinkTopic";
+ private static final String COUNT_TOPIC = "countTopic";
private static final String JOIN_TOPIC_1_PREFIX = "joinSourceTopic1";
private static final String JOIN_TOPIC_2_PREFIX = "joinSourceTopic2";
private static final ValueJoiner VALUE_JOINER = new ValueJoiner<byte[], byte[], byte[]>() {
@@ -83,25 +102,90 @@ public class SimpleBenchmark {
};
private static int numRecords;
- private static Integer endKey;
- private static final int KEY_SIZE = 8;
+ private static int processedRecords = 0;
+ private static long processedBytes = 0;
private static final int VALUE_SIZE = 100;
- private static final int RECORD_SIZE = KEY_SIZE + VALUE_SIZE;
+ private static final long POLL_MS = 500L;
+ private static final int MAX_POLL_RECORDS = 1000;
+ private static final int SOCKET_SIZE_BYTES = 1 * 1024 * 1024;
private static final Serde<byte[]> BYTE_SERDE = Serdes.ByteArray();
private static final Serde<Integer> INTEGER_SERDE = Serdes.Integer();
- public SimpleBenchmark(File stateDir, String kafka) {
+ public SimpleBenchmark(final File stateDir, final String kafka, final Boolean loadPhase, final String testName) {
super();
this.stateDir = stateDir;
this.kafka = kafka;
+ this.loadPhase = loadPhase;
+ this.testName = testName;
+ }
+
+ private void run() throws Exception {
+ switch (testName) {
+ case ALL_TESTS:
+ // producer performance
+ produce(SOURCE_TOPIC);
+ // consumer performance
+ consume(SOURCE_TOPIC);
+ // simple stream performance source->process
+ processStream(SOURCE_TOPIC);
+ // simple stream performance source->sink
+ processStreamWithSink(SOURCE_TOPIC);
+ // simple stream performance source->store
+ processStreamWithStateStore(SOURCE_TOPIC);
+ // simple stream performance source->cache->store
+ processStreamWithCachedStateStore(SOURCE_TOPIC);
+ // simple aggregation
+ count(COUNT_TOPIC);
+ // simple streams performance KSTREAM-KTABLE join
+ kStreamKTableJoin(JOIN_TOPIC_1_PREFIX + "KStreamKTable", JOIN_TOPIC_2_PREFIX + "KStreamKTable");
+ // simple streams performance KSTREAM-KSTREAM join
+ kStreamKStreamJoin(JOIN_TOPIC_1_PREFIX + "KStreamKStream", JOIN_TOPIC_2_PREFIX + "KStreamKStream");
+ // simple streams performance KTABLE-KTABLE join
+ kTableKTableJoin(JOIN_TOPIC_1_PREFIX + "KTableKTable", JOIN_TOPIC_2_PREFIX + "KTableKTable");
+ break;
+ case "produce":
+ produce(SOURCE_TOPIC);
+ break;
+ case "consume":
+ consume(SOURCE_TOPIC);
+ break;
+ case "count":
+ count(COUNT_TOPIC);
+ break;
+ case "processstream":
+ processStream(SOURCE_TOPIC);
+ break;
+ case "processstreamwithsink":
+ processStreamWithSink(SOURCE_TOPIC);
+ break;
+ case "processstreamwithstatestore":
+ processStreamWithStateStore(SOURCE_TOPIC);
+ break;
+ case "processstreamwithcachedstatestore":
+ processStreamWithCachedStateStore(SOURCE_TOPIC);
+ break;
+ case "kstreamktablejoin":
+ kStreamKTableJoin(JOIN_TOPIC_1_PREFIX + "KStreamKTable", JOIN_TOPIC_2_PREFIX + "KStreamKTable");
+ break;
+ case "kstreamkstreamjoin":
+ kStreamKStreamJoin(JOIN_TOPIC_1_PREFIX + "KStreamKStream", JOIN_TOPIC_2_PREFIX + "KStreamKStream");
+ break;
+ case "ktablektablejoin":
+ kTableKTableJoin(JOIN_TOPIC_1_PREFIX + "KTableKTable", JOIN_TOPIC_2_PREFIX + "KTableKTable");
+ break;
+ default:
+ throw new Exception("Unknown test name " + testName);
+
+ }
}
public static void main(String[] args) throws Exception {
String kafka = args.length > 0 ? args[0] : "localhost:9092";
String stateDirStr = args.length > 1 ? args[1] : TestUtils.tempDirectory().getAbsolutePath();
numRecords = args.length > 2 ? Integer.parseInt(args[2]) : 10000000;
- endKey = numRecords - 1;
+ boolean loadPhase = args.length > 3 ? Boolean.parseBoolean(args[3]) : false;
+ String testName = args.length > 4 ? args[4].toLowerCase(Locale.ROOT) : ALL_TESTS;
final File stateDir = new File(stateDirStr);
stateDir.mkdir();
@@ -113,60 +197,117 @@ public class SimpleBenchmark {
System.out.println("kafka=" + kafka);
System.out.println("stateDir=" + stateDir);
System.out.println("numRecords=" + numRecords);
+ System.out.println("loadPhase=" + loadPhase);
+ System.out.println("testName=" + testName);
+
+ SimpleBenchmark benchmark = new SimpleBenchmark(stateDir, kafka, loadPhase, testName);
+ benchmark.run();
+ }
- SimpleBenchmark benchmark = new SimpleBenchmark(stateDir, kafka);
-
- // producer performance
- benchmark.produce(SOURCE_TOPIC, VALUE_SIZE, "simple-benchmark-produce", numRecords, true, numRecords, true);
- // consumer performance
- benchmark.consume(SOURCE_TOPIC);
- // simple stream performance source->process
- benchmark.processStream(SOURCE_TOPIC);
- // simple stream performance source->sink
- benchmark.processStreamWithSink(SOURCE_TOPIC);
- // simple stream performance source->store
- benchmark.processStreamWithStateStore(SOURCE_TOPIC);
- // simple stream performance source->cache->store
- benchmark.processStreamWithCachedStateStore(SOURCE_TOPIC);
- // simple streams performance KSTREAM-KTABLE join
- benchmark.kStreamKTableJoin(JOIN_TOPIC_1_PREFIX + "KStreamKTable", JOIN_TOPIC_2_PREFIX + "KStreamKTable");
- // simple streams performance KSTREAM-KSTREAM join
- benchmark.kStreamKStreamJoin(JOIN_TOPIC_1_PREFIX + "KStreamKStream", JOIN_TOPIC_2_PREFIX + "KStreamKStream");
- // simple streams performance KTABLE-KTABLE join
- benchmark.kTableKTableJoin(JOIN_TOPIC_1_PREFIX + "KTableKTable", JOIN_TOPIC_2_PREFIX + "KTableKTable");
- }
-
- private Properties setJoinProperties(final String applicationId) {
+ private Properties setStreamProperties(final String applicationId) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString());
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ // the socket buffer needs to be large, especially when running in AWS with
+ // high latency. if running locally the default is fine.
+ props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, SOCKET_SIZE_BYTES);
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass());
+ props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, MAX_POLL_RECORDS);
+ props.put(StreamsConfig.POLL_MS_CONFIG, POLL_MS);
return props;
}
+ private Properties setProduceConsumeProperties(final String clientId) {
+ Properties props = new Properties();
+ props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+ // the socket buffer needs to be large, especially when running in AWS with
+ // high latency. if running locally the default is fine.
+ props.put(ProducerConfig.SEND_BUFFER_CONFIG, SOCKET_SIZE_BYTES);
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+ // the socket buffer needs to be large, especially when running in AWS with
+ // high latency. if running locally the default is fine.
+ props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, SOCKET_SIZE_BYTES);
+ props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, MAX_POLL_RECORDS);
+ return props;
+ }
+
+ private boolean maybeSetupPhase(final String topic, final String clientId,
+ final boolean skipIfAllTests) throws Exception {
+ processedRecords = 0;
+ processedBytes = 0;
+ // initialize topics
+ if (loadPhase) {
+ if (skipIfAllTests) {
+ // if we run all tests, the produce test will have already loaded the data
+ if (testName.equals(ALL_TESTS)) {
+ // Skipping loading phase since previously loaded
+ return true;
+ }
+ }
+ System.out.println("Initializing topic " + topic);
+ // WARNING: The keys must be sequential (and therefore unique); otherwise the logic that decides when this test
+ // stops will not work (in createCountStreams)
+ produce(topic, VALUE_SIZE, clientId, numRecords, true, numRecords, false);
+ return true;
+ }
+ return false;
+ }
+
+ private KafkaStreams createCountStreams(Properties streamConfig, String topic, final CountDownLatch latch) {
+ final KStreamBuilder builder = new KStreamBuilder();
+ final KStream<Integer, byte[]> input = builder.stream(topic);
+
+ input.groupByKey()
+ .count("tmpStoreName").foreach(new CountDownAction(latch));
+
+ return new KafkaStreams(builder, streamConfig);
+ }
+
+ /**
+ * Measure the performance of a simple aggregate like count.
+ * Counts the occurrence of numbers (note that normally people count words, this
+ * example counts numbers)
+ * @param countTopic Topic where numbers are stored
+ * @throws Exception
+ */
+ public void count(String countTopic) throws Exception {
+ if (maybeSetupPhase(countTopic, "simple-benchmark-produce-count", false)) {
+ return;
+ }
+
+ CountDownLatch latch = new CountDownLatch(1);
+ Properties props = setStreamProperties("simple-benchmark-count");
+ final KafkaStreams streams = createCountStreams(props, countTopic, latch);
+ runGenericBenchmark(streams, "Streams Count Performance [records/latency/rec-sec/MB-sec counted]: ", latch);
+ }
+
/**
* Measure the performance of a KStream-KTable left join. The setup is such that each
* KStream record joins to exactly one element in the KTable
*/
public void kStreamKTableJoin(String kStreamTopic, String kTableTopic) throws Exception {
- CountDownLatch latch = new CountDownLatch(numRecords);
+ if (maybeSetupPhase(kStreamTopic, "simple-benchmark-produce-kstream", false)) {
+ maybeSetupPhase(kTableTopic, "simple-benchmark-produce-ktable", false);
+ return;
+ }
- // initialize topics
- System.out.println("Initializing kStreamTopic " + kStreamTopic);
- produce(kStreamTopic, VALUE_SIZE, "simple-benchmark-produce-kstream", numRecords, false, numRecords, false);
- System.out.println("Initializing kTableTopic " + kTableTopic);
- produce(kTableTopic, VALUE_SIZE, "simple-benchmark-produce-ktable", numRecords, true, numRecords, false);
+ CountDownLatch latch = new CountDownLatch(1);
// setup join
- Properties props = setJoinProperties("simple-benchmark-kstream-ktable-join");
+ Properties props = setStreamProperties("simple-benchmark-kstream-ktable-join");
final KafkaStreams streams = createKafkaStreamsKStreamKTableJoin(props, kStreamTopic, kTableTopic, latch);
// run benchmark
- runJoinBenchmark(streams, "Streams KStreamKTable LeftJoin Performance [MB/s joined]: ", latch);
+ runGenericBenchmark(streams, "Streams KStreamKTable LeftJoin Performance [records/latency/rec-sec/MB-sec joined]: ", latch);
}
/**
@@ -174,20 +315,19 @@ public class SimpleBenchmark {
* KStream record joins to exactly one element in the other KStream
*/
public void kStreamKStreamJoin(String kStreamTopic1, String kStreamTopic2) throws Exception {
- CountDownLatch latch = new CountDownLatch(numRecords);
+ if (maybeSetupPhase(kStreamTopic1, "simple-benchmark-produce-kstream-topic1", false)) {
+ maybeSetupPhase(kStreamTopic2, "simple-benchmark-produce-kstream-topic2", false);
+ return;
+ }
- // initialize topics
- System.out.println("Initializing kStreamTopic " + kStreamTopic1);
- produce(kStreamTopic1, VALUE_SIZE, "simple-benchmark-produce-kstream-topic1", numRecords, true, numRecords, false);
- System.out.println("Initializing kStreamTopic " + kStreamTopic2);
- produce(kStreamTopic2, VALUE_SIZE, "simple-benchmark-produce-kstream-topic2", numRecords, true, numRecords, false);
+ CountDownLatch latch = new CountDownLatch(1);
// setup join
- Properties props = setJoinProperties("simple-benchmark-kstream-kstream-join");
+ Properties props = setStreamProperties("simple-benchmark-kstream-kstream-join");
final KafkaStreams streams = createKafkaStreamsKStreamKStreamJoin(props, kStreamTopic1, kStreamTopic2, latch);
// run benchmark
- runJoinBenchmark(streams, "Streams KStreamKStream LeftJoin Performance [MB/s joined]: ", latch);
+ runGenericBenchmark(streams, "Streams KStreamKStream LeftJoin Performance [records/latency/rec-sec/MB-sec joined]: ", latch);
}
/**
@@ -195,23 +335,29 @@ public class SimpleBenchmark {
* KTable record joins to exactly one element in the other KTable
*/
public void kTableKTableJoin(String kTableTopic1, String kTableTopic2) throws Exception {
- CountDownLatch latch = new CountDownLatch(numRecords);
-
- // initialize topics
- System.out.println("Initializing kTableTopic " + kTableTopic1);
- produce(kTableTopic1, VALUE_SIZE, "simple-benchmark-produce-ktable-topic1", numRecords, true, numRecords, false);
- System.out.println("Initializing kTableTopic " + kTableTopic2);
- produce(kTableTopic2, VALUE_SIZE, "simple-benchmark-produce-ktable-topic2", numRecords, true, numRecords, false);
+ if (maybeSetupPhase(kTableTopic1, "simple-benchmark-produce-ktable-topic1", false)) {
+ maybeSetupPhase(kTableTopic2, "simple-benchmark-produce-ktable-topic2", false);
+ return;
+ }
+ CountDownLatch latch = new CountDownLatch(1);
// setup join
- Properties props = setJoinProperties("simple-benchmark-ktable-ktable-join");
+ Properties props = setStreamProperties("simple-benchmark-ktable-ktable-join");
final KafkaStreams streams = createKafkaStreamsKTableKTableJoin(props, kTableTopic1, kTableTopic2, latch);
// run benchmark
- runJoinBenchmark(streams, "Streams KTableKTable LeftJoin Performance [MB/s joined]: ", latch);
+ runGenericBenchmark(streams, "Streams KTableKTable LeftJoin Performance [records/latency/rec-sec/MB-sec joined]: ", latch);
}
- private void runJoinBenchmark(final KafkaStreams streams, final String nameOfBenchmark, final CountDownLatch latch) {
+ private void printResults(final String nameOfBenchmark, final long latency) {
+ System.out.println(nameOfBenchmark +
+ processedRecords + "/" +
+ latency + "/" +
+ recordsPerSec(latency, processedRecords) + "/" +
+ megabytesPerSec(latency, processedBytes));
+ }
+
+ private void runGenericBenchmark(final KafkaStreams streams, final String nameOfBenchmark, final CountDownLatch latch) {
streams.start();
long startTime = System.currentTimeMillis();
@@ -224,20 +370,12 @@ public class SimpleBenchmark {
}
}
long endTime = System.currentTimeMillis();
-
-
- System.out.println(nameOfBenchmark + megaBytePerSec(endTime - startTime, numRecords, KEY_SIZE + VALUE_SIZE));
+ printResults(nameOfBenchmark, endTime - startTime);
streams.close();
}
-
-
- public void processStream(String topic) {
- CountDownLatch latch = new CountDownLatch(1);
-
- final KafkaStreams streams = createKafkaStreams(topic, stateDir, kafka, latch);
-
+ private long startStreamsThread(final KafkaStreams streams, final CountDownLatch latch) throws Exception {
Thread thread = new Thread() {
public void run() {
streams.start();
@@ -257,96 +395,72 @@ public class SimpleBenchmark {
long endTime = System.currentTimeMillis();
- System.out.println("Streams Performance [MB/sec read]: " + megaBytePerSec(endTime - startTime));
-
streams.close();
try {
thread.join();
} catch (Exception ex) {
// ignore
}
+
+ return endTime - startTime;
}
- public void processStreamWithSink(String topic) {
- CountDownLatch latch = new CountDownLatch(1);
+ public void processStream(final String topic) throws Exception {
+ if (maybeSetupPhase(topic, "simple-benchmark-process-stream-load", true)) {
+ return;
+ }
- final KafkaStreams streams = createKafkaStreamsWithSink(topic, stateDir, kafka, latch);
+ CountDownLatch latch = new CountDownLatch(1);
- Thread thread = new Thread() {
- public void run() {
- streams.start();
- }
- };
- thread.start();
+ final KafkaStreams streams = createKafkaStreams(topic, latch);
+ long latency = startStreamsThread(streams, latch);
- long startTime = System.currentTimeMillis();
+ printResults("Streams Performance [records/latency/rec-sec/MB-sec source]: ", latency);
+ }
- while (latch.getCount() > 0) {
- try {
- latch.await();
- } catch (InterruptedException ex) {
- Thread.interrupted();
- }
+ public void processStreamWithSink(String topic) throws Exception {
+ if (maybeSetupPhase(topic, "simple-benchmark-process-stream-with-sink-load", true)) {
+ return;
}
- long endTime = System.currentTimeMillis();
+ CountDownLatch latch = new CountDownLatch(1);
+ final KafkaStreams streams = createKafkaStreamsWithSink(topic, latch);
+ long latency = startStreamsThread(streams, latch);
- System.out.println("Streams Performance [MB/sec read+write]: " + megaBytePerSec(endTime - startTime));
+ printResults("Streams Performance [records/latency/rec-sec/MB-sec source+sink]: ", latency);
- streams.close();
- try {
- thread.join();
- } catch (Exception ex) {
- // ignore
- }
}
- private void internalProcessStreamWithStore(final KafkaStreams streams, final CountDownLatch latch,
- final String message) {
- Thread thread = new Thread() {
- public void run() {
- streams.start();
- }
- };
- thread.start();
-
- long startTime = System.currentTimeMillis();
-
- while (latch.getCount() > 0) {
- try {
- latch.await();
- } catch (InterruptedException ex) {
- Thread.interrupted();
- }
+ public void processStreamWithStateStore(String topic) throws Exception {
+ if (maybeSetupPhase(topic, "simple-benchmark-process-stream-with-state-store-load", true)) {
+ return;
}
- long endTime = System.currentTimeMillis();
-
- System.out.println(message + megaBytePerSec(endTime - startTime));
-
- streams.close();
- try {
- thread.join();
- } catch (Exception ex) {
- // ignore
- }
- }
- public void processStreamWithStateStore(String topic) {
CountDownLatch latch = new CountDownLatch(1);
-
- final KafkaStreams streams = createKafkaStreamsWithStateStore(topic, stateDir, kafka, latch, false);
- internalProcessStreamWithStore(streams, latch, "Streams Performance [MB/sec read+store]: ");
+ final KafkaStreams streams = createKafkaStreamsWithStateStore(topic, latch, false);
+ long latency = startStreamsThread(streams, latch);
+ printResults("Streams Performance [records/latency/rec-sec/MB-sec source+store]: ", latency);
}
- public void processStreamWithCachedStateStore(String topic) {
- CountDownLatch latch = new CountDownLatch(1);
-
- final KafkaStreams streams = createKafkaStreamsWithStateStore(topic, stateDir, kafka, latch, true);
+ public void processStreamWithCachedStateStore(String topic) throws Exception {
+ if (maybeSetupPhase(topic, "simple-benchmark-process-stream-with-cached-state-store-load", true)) {
+ return;
+ }
- internalProcessStreamWithStore(streams, latch, "Streams Performance [MB/sec read+cache+store]: ");
+ CountDownLatch latch = new CountDownLatch(1);
+ final KafkaStreams streams = createKafkaStreamsWithStateStore(topic, latch, true);
+ long latency = startStreamsThread(streams, latch);
+ printResults("Streams Performance [records/latency/rec-sec/MB-sec source+cache+store]: ", latency);
}
+ public void produce(String topic) throws Exception {
+ // loading phase does not make sense for producer
+ if (loadPhase) {
+ return;
+ }
+ produce(topic, VALUE_SIZE, "simple-benchmark-produce", numRecords, true, numRecords, true);
+ }
/**
* Produce values to a topic
* @param topic Topic to produce to
@@ -358,22 +472,28 @@ public class SimpleBenchmark {
* @param printStats if True, print stats on how long producing took. If False, don't print stats. False can be used
* when this produce step is part of another benchmark that produces its own stats
*/
- public void produce(String topic, int valueSizeBytes, String clientId, int numRecords, boolean sequential,
- int upperRange, boolean printStats) throws Exception {
+ private void produce(String topic, int valueSizeBytes, String clientId, int numRecords, boolean sequential,
+ int upperRange, boolean printStats) throws Exception {
+ processedRecords = 0;
+ processedBytes = 0;
if (sequential) {
if (upperRange < numRecords) throw new Exception("UpperRange must be >= numRecords");
}
- Properties props = new Properties();
- props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
- props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+ if (!sequential) {
+ System.out.println("WARNING: You are using non-sequential keys. If your tests' exit logic expects to see a final key, random keys may not work.");
+ }
+ Properties props = setProduceConsumeProperties(clientId);
+
int key = 0;
Random rand = new Random();
KafkaProducer<Integer, byte[]> producer = new KafkaProducer<>(props);
byte[] value = new byte[valueSizeBytes];
+ // Fill the value with random bytes to increase entropy. Some devices,
+ // like SSDs, compress data, so if the array were all zeros the measured
+ // performance would be unrealistically good.
+ new Random().nextBytes(value);
long startTime = System.currentTimeMillis();
if (sequential) key = 0;
@@ -382,22 +502,24 @@ public class SimpleBenchmark {
producer.send(new ProducerRecord<>(topic, key, value));
if (sequential) key++;
else key = rand.nextInt(upperRange);
+ processedRecords++;
+ processedBytes += value.length + Integer.SIZE;
}
producer.close();
long endTime = System.currentTimeMillis();
- if (printStats)
- System.out.println("Producer Performance [MB/sec write]: " + megaBytePerSec(endTime - startTime, numRecords, KEY_SIZE + valueSizeBytes));
+ if (printStats) {
+ printResults("Producer Performance [records/latency/rec-sec/MB-sec write]: ", endTime - startTime);
+ }
}
- public void consume(String topic) {
- Properties props = new Properties();
- props.put(ConsumerConfig.CLIENT_ID_CONFIG, "simple-benchmark-consumer");
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
- props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
- props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+ public void consume(String topic) throws Exception {
+ if (maybeSetupPhase(topic, "simple-benchmark-consumer-load", true)) {
+ return;
+ }
+
+ Properties props = setProduceConsumeProperties("simple-benchmark-consumer");
KafkaConsumer<Integer, byte[]> consumer = new KafkaConsumer<>(props);
@@ -410,33 +532,33 @@ public class SimpleBenchmark {
long startTime = System.currentTimeMillis();
while (true) {
- ConsumerRecords<Integer, byte[]> records = consumer.poll(500);
+ ConsumerRecords<Integer, byte[]> records = consumer.poll(POLL_MS);
if (records.isEmpty()) {
- if (endKey.equals(key))
+ if (processedRecords == numRecords)
break;
} else {
for (ConsumerRecord<Integer, byte[]> record : records) {
+ processedRecords++;
+ processedBytes += record.value().length + Integer.SIZE;
Integer recKey = record.key();
-
if (key == null || key < recKey)
key = recKey;
+ if (processedRecords == numRecords)
+ break;
}
}
+ if (processedRecords == numRecords)
+ break;
}
long endTime = System.currentTimeMillis();
consumer.close();
- System.out.println("Consumer Performance [MB/sec read]: " + megaBytePerSec(endTime - startTime));
+ printResults("Consumer Performance [records/latency/rec-sec/MB-sec read]: ", endTime - startTime);
}
- private KafkaStreams createKafkaStreams(String topic, File stateDir, String kafka, final CountDownLatch latch) {
- Properties props = new Properties();
- props.put(StreamsConfig.APPLICATION_ID_CONFIG, "simple-benchmark-streams");
- props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString());
- props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
- props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
- props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ private KafkaStreams createKafkaStreams(String topic, final CountDownLatch latch) {
+ Properties props = setStreamProperties("simple-benchmark-streams");
KStreamBuilder builder = new KStreamBuilder();
@@ -453,7 +575,9 @@ public class SimpleBenchmark {
@Override
public void process(Integer key, byte[] value) {
- if (endKey.equals(key)) {
+ processedRecords++;
+ processedBytes += value.length + Integer.SIZE;
+ if (processedRecords == numRecords) {
latch.countDown();
}
}
@@ -469,16 +593,11 @@ public class SimpleBenchmark {
}
});
- return new KafkaStreams(builder, props);
+ return createKafkaStreamsWithExceptionHandler(builder, props);
}
- private KafkaStreams createKafkaStreamsWithSink(String topic, File stateDir, String kafka, final CountDownLatch latch) {
- Properties props = new Properties();
- props.put(StreamsConfig.APPLICATION_ID_CONFIG, "simple-benchmark-streams-with-sink");
- props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString());
- props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
- props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
- props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ private KafkaStreams createKafkaStreamsWithSink(String topic, final CountDownLatch latch) {
+ final Properties props = setStreamProperties("simple-benchmark-streams-with-sink");
KStreamBuilder builder = new KStreamBuilder();
@@ -495,7 +614,9 @@ public class SimpleBenchmark {
@Override
public void process(Integer key, byte[] value) {
- if (endKey.equals(key)) {
+ processedRecords++;
+ processedBytes += value.length + Integer.SIZE;
+ if (processedRecords == numRecords) {
latch.countDown();
}
}
@@ -511,17 +632,27 @@ public class SimpleBenchmark {
}
});
- return new KafkaStreams(builder, props);
+ return createKafkaStreamsWithExceptionHandler(builder, props);
}
- private class CountDownAction<K, V> implements ForeachAction<K, V> {
+ private class CountDownAction<V> implements ForeachAction<Integer, V> {
private CountDownLatch latch;
CountDownAction(final CountDownLatch latch) {
this.latch = latch;
}
@Override
- public void apply(K key, V value) {
- this.latch.countDown();
+ public void apply(Integer key, V value) {
+ processedRecords++;
+ if (value instanceof byte[]) {
+ processedBytes += ((byte[]) value).length + Integer.SIZE;
+ } else if (value instanceof Long) {
+ processedBytes += Long.SIZE + Integer.SIZE;
+ } else {
+ System.err.println("Unknown value type in CountDownAction");
+ }
+ if (processedRecords == numRecords) {
+ this.latch.countDown();
+ }
}
}
@@ -534,7 +665,7 @@ public class SimpleBenchmark {
input1.leftJoin(input2, VALUE_JOINER).foreach(new CountDownAction(latch));
- return new KafkaStreams(builder, streamConfig);
+ return createKafkaStreamsWithExceptionHandler(builder, streamConfig);
}
private KafkaStreams createKafkaStreamsKTableKTableJoin(Properties streamConfig, String kTableTopic1,
@@ -546,7 +677,7 @@ public class SimpleBenchmark {
input1.leftJoin(input2, VALUE_JOINER).foreach(new CountDownAction(latch));
- return new KafkaStreams(builder, streamConfig);
+ return createKafkaStreamsWithExceptionHandler(builder, streamConfig);
}
private KafkaStreams createKafkaStreamsKStreamKStreamJoin(Properties streamConfig, String kStreamTopic1,
@@ -559,18 +690,13 @@ public class SimpleBenchmark {
input1.leftJoin(input2, VALUE_JOINER, JoinWindows.of(timeDifferenceMs)).foreach(new CountDownAction(latch));
- return new KafkaStreams(builder, streamConfig);
+ return createKafkaStreamsWithExceptionHandler(builder, streamConfig);
}
- private KafkaStreams createKafkaStreamsWithStateStore(String topic, File stateDir, String kafka,
+ private KafkaStreams createKafkaStreamsWithStateStore(String topic,
final CountDownLatch latch,
boolean enableCaching) {
- Properties props = new Properties();
- props.put(StreamsConfig.APPLICATION_ID_CONFIG, "simple-benchmark-streams-with-store" + enableCaching);
- props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString());
- props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
- props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
- props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ Properties props = setStreamProperties("simple-benchmark-streams-with-store" + enableCaching);
KStreamBuilder builder = new KStreamBuilder();
@@ -596,8 +722,9 @@ public class SimpleBenchmark {
@Override
public void process(Integer key, byte[] value) {
store.put(key, value);
-
- if (endKey.equals(key)) {
+ processedRecords++; // count every record; the latch is released once numRecords have been seen
+ processedBytes += value.length + Integer.SIZE / Byte.SIZE; // Integer.SIZE is in bits; divide by Byte.SIZE so the Integer key is counted in bytes, like value.length
+ if (processedRecords == numRecords) {
latch.countDown();
}
}
@@ -613,15 +740,29 @@ public class SimpleBenchmark {
}
}
}, "store");
- return new KafkaStreams(builder, props);
+ return createKafkaStreamsWithExceptionHandler(builder, props);
}
- private double megaBytePerSec(long time) {
- return (double) (RECORD_SIZE * numRecords / 1024 / 1024) / ((double) time / 1000);
+ private KafkaStreams createKafkaStreamsWithExceptionHandler(final KStreamBuilder builder, final Properties props) { // builds the client and installs an uncaught-exception handler on it
+ final KafkaStreams streamsClient = new KafkaStreams(builder, props);
+ streamsClient.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
+ @Override
+ public void uncaughtException(Thread t, Throwable e) {
+ System.err.println("FATAL: An unexpected exception is encountered on thread " + t + ": " + e); // stderr, consistent with the other error output in this class
+ // NOTE(review): nothing here counts down the benchmark latch, so a caller awaiting it may block after a thread dies — confirm
+ streamsClient.close(30, TimeUnit.SECONDS);
+ }
+ });
+ // on a stream-thread failure the handler reports it and closes the client with a 30-second timeout
+ return streamsClient;
+ }
+ // throughput helpers: time is divided by 1000.0, so it is presumably in milliseconds
+ private double megabytesPerSec(long time, long processedBytes) {
+ return (processedBytes / 1024.0 / 1024.0) / (time / 1000.0);
}
- private double megaBytePerSec(long time, int numRecords, int recordSizeBytes) {
- return (double) (recordSizeBytes * numRecords / 1024 / 1024) / ((double) time / 1000);
+ private double recordsPerSec(long time, int numRecords) {
+ return numRecords / (time / 1000.0);
}
private List<TopicPartition> getAllPartitions(KafkaConsumer<?, ?> consumer, String... topics) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/7ba26ef1/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py b/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
index ebd69a6..a56def3 100644
--- a/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
+++ b/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
@@ -13,41 +13,86 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from ducktape.tests.test import Test
from ducktape.mark.resource import cluster
-
+from ducktape.mark import parametrize, matrix
from kafkatest.tests.kafka_test import KafkaTest
-from kafkatest.services.performance.streams_performance import StreamsSimpleBenchmarkService
+from kafkatest.services.performance.streams_performance import StreamsSimpleBenchmarkService
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+from kafkatest.version import DEV_BRANCH
-class StreamsSimpleBenchmarkTest(KafkaTest):
+class StreamsSimpleBenchmarkTest(Test):
"""
Simple benchmark of Kafka Streams.
"""
def __init__(self, test_context):
- super(StreamsSimpleBenchmarkTest, self).__init__(test_context, num_zk=1, num_brokers=1,topics={
- 'simpleBenchmarkSourceTopic' : { 'partitions': 1, 'replication-factor': 1 },
- 'simpleBenchmarkSinkTopic' : { 'partitions': 1, 'replication-factor': 1 },
- 'joinSourceTopic1KStreamKStream' : { 'partitions': 1, 'replication-factor': 1 },
- 'joinSourceTopic2KStreamKStream' : { 'partitions': 1, 'replication-factor': 1 },
- 'joinSourceTopic1KStreamKTable' : { 'partitions': 1, 'replication-factor': 1 },
- 'joinSourceTopic2KStreamKTable' : { 'partitions': 1, 'replication-factor': 1 },
- 'joinSourceTopic1KTableKTable' : { 'partitions': 1, 'replication-factor': 1 },
- 'joinSourceTopic2KTableKTable' : { 'partitions': 1, 'replication-factor': 1 }
- })
+ super(StreamsSimpleBenchmarkTest, self).__init__(test_context)
+ self.num_records = 1000000L # base record count; the load phase writes num_records * scale
+ self.replication = 1 # replication factor for every benchmark topic
- self.driver = StreamsSimpleBenchmarkService(test_context, self.kafka, 1000000L)
- @cluster(num_nodes=3)
- def test_simple_benchmark(self):
+ @cluster(num_nodes=9)
+ @matrix(test=["produce", "consume", "count", "processstream", "processstreamwithsink", "processstreamwithstatestore", "processstreamwithcachedstatestore", "kstreamktablejoin", "kstreamkstreamjoin", "ktablektablejoin"], scale=[1, 3])
+ def test_simple_benchmark(self, test, scale):
"""
Run simple Kafka Streams benchmark
"""
+ self.driver = [None] * scale # one slot per streams instance started in the RUN PHASE loop (was scale + 1, leaving an unused slot)
+ node = [None] * scale
+ data = [None] * scale
+
+ #############
+ # SETUP PHASE
+ #############
+ self.zk = ZookeeperService(self.test_context, num_nodes=1)
+ self.zk.start()
+ self.kafka = KafkaService(self.test_context, num_nodes=scale, zk=self.zk, version=DEV_BRANCH, topics={
+ 'simpleBenchmarkSourceTopic' : { 'partitions': scale, 'replication-factor': self.replication },
+ 'countTopic' : { 'partitions': scale, 'replication-factor': self.replication },
+ 'simpleBenchmarkSinkTopic' : { 'partitions': scale, 'replication-factor': self.replication },
+ 'joinSourceTopic1KStreamKStream' : { 'partitions': scale, 'replication-factor': self.replication },
+ 'joinSourceTopic2KStreamKStream' : { 'partitions': scale, 'replication-factor': self.replication },
+ 'joinSourceTopic1KStreamKTable' : { 'partitions': scale, 'replication-factor': self.replication },
+ 'joinSourceTopic2KStreamKTable' : { 'partitions': scale, 'replication-factor': self.replication },
+ 'joinSourceTopic1KTableKTable' : { 'partitions': scale, 'replication-factor': self.replication },
+ 'joinSourceTopic2KTableKTable' : { 'partitions': scale, 'replication-factor': self.replication }
+ })
+ self.kafka.start()
+
+ ################
+ # LOAD PHASE
+ ################
+ self.load_driver = StreamsSimpleBenchmarkService(self.test_context, self.kafka,
+ self.num_records * scale, "true", test) # load_phase="true": a single driver populates the input for `test`
+ self.load_driver.start()
+ self.load_driver.wait()
+ self.load_driver.stop()
+
+ ################
+ # RUN PHASE
+ ################
+ for num in range(0, scale):
+ self.driver[num] = StreamsSimpleBenchmarkService(self.test_context, self.kafka,
+ self.num_records/(scale), "false", test) # NOTE(review): load wrote num_records * scale, each instance is given num_records/scale (integer division) — confirm the intended split
+ self.driver[num].start()
- self.driver.start()
- self.driver.wait()
- self.driver.stop()
- node = self.driver.node
- node.account.ssh("grep Performance %s" % self.driver.STDOUT_FILE, allow_fail=False)
+ #######################
+ # STOP + COLLECT PHASE
+ #######################
+ for num in range(0, scale):
+ self.driver[num].wait()
+ self.driver[num].stop()
+ node[num] = self.driver[num].node
+ node[num].account.ssh("grep Performance %s" % self.driver[num].STDOUT_FILE, allow_fail=False) # grep exits non-zero without a Performance line; allow_fail=False should then fail the test
+ data[num] = self.driver[num].collect_data(node[num], "")
+
- return self.driver.collect_data(node)
+ final = {}
+ for num in range(0, scale):
+ for key in data[num]:
+ final[key + str(num)] = data[num][key] # suffix each metric with its instance index so instances do not overwrite each other
+
+ return final
http://git-wip-us.apache.org/repos/asf/kafka/blob/7ba26ef1/tests/kafkatest/services/performance/streams_performance.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/performance/streams_performance.py b/tests/kafkatest/services/performance/streams_performance.py
index 4ccc3b2..8cedb51 100644
--- a/tests/kafkatest/services/performance/streams_performance.py
+++ b/tests/kafkatest/services/performance/streams_performance.py
@@ -22,17 +22,19 @@ from kafkatest.services.streams import StreamsTestBaseService
class StreamsSimpleBenchmarkService(StreamsTestBaseService):
"""Base class for simple Kafka Streams benchmark"""
- def __init__(self, test_context, kafka, numrecs):
+ def __init__(self, test_context, kafka, numrecs, load_phase, test_name): # load_phase/test_name are forwarded as the extra command-line arguments
super(StreamsSimpleBenchmarkService, self).__init__(test_context,
kafka,
"org.apache.kafka.streams.perf.SimpleBenchmark",
- numrecs)
+ numrecs,
+ load_phase,
+ test_name)
- def collect_data(self, node):
+ def collect_data(self, node, tag=None): # tag: optional prefix for every metric name; None means no prefix
# Collect the data and return it to the framework
output = node.account.ssh_capture("grep Performance %s" % self.STDOUT_FILE)
data = {}
for line in output:
parts = line.split(':')
- data[parts[0]] = float(parts[1])
+ data[(tag or "") + parts[0]] = parts[1] # `tag or ""` avoids a TypeError (None + str) when tag is omitted
return data
http://git-wip-us.apache.org/repos/asf/kafka/blob/7ba26ef1/tests/kafkatest/services/streams.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py
index 8bc4d9e..e7be947 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -45,11 +45,13 @@ class StreamsTestBaseService(KafkaPathResolverMixin, Service):
"collect_default": True},
}
- def __init__(self, test_context, kafka, streams_class_name, user_test_args):
+ def __init__(self, test_context, kafka, streams_class_name, user_test_args, user_test_args1="", user_test_args2=""): # optional extras default to "" so the %(user_test_args1)s/%(user_test_args2)s placeholders in the start command render empty instead of the literal "None"
super(StreamsTestBaseService, self).__init__(test_context, 1)
self.kafka = kafka
self.args = {'streams_class_name': streams_class_name,
- 'user_test_args': user_test_args}
+ 'user_test_args': user_test_args,
+ 'user_test_args1': user_test_args1,
+ 'user_test_args2': user_test_args2}
self.log_level = "DEBUG"
@property
@@ -75,7 +77,7 @@ class StreamsTestBaseService(KafkaPathResolverMixin, Service):
node.account.signal(pid, sig, allow_fail=True)
if clean_shutdown:
for pid in pids:
- wait_until(lambda: not node.account.alive(pid), timeout_sec=60, err_msg="Streams Test process on " + str(node.account) + " took too long to exit")
+ wait_until(lambda: not node.account.alive(pid), timeout_sec=120, err_msg="Streams Test process on " + str(node.account) + " took too long to exit") # clean-shutdown wait raised from 60s to 120s
node.account.ssh("rm -f " + self.PID_FILE, allow_fail=False)
@@ -119,7 +121,7 @@ class StreamsTestBaseService(KafkaPathResolverMixin, Service):
cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \
"INCLUDE_TEST_JARS=true %(kafka_run_class)s %(streams_class_name)s " \
- " %(kafka)s %(state_dir)s %(user_test_args)s" \
+ " %(kafka)s %(state_dir)s %(user_test_args)s %(user_test_args1)s %(user_test_args2)s" \
" & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args
return cmd