Posted to commits@kafka.apache.org by vv...@apache.org on 2020/10/05 20:28:07 UTC
[kafka] branch 2.5 updated: MINOR: remove stream simple benchmark suite (#8353)
This is an automated email from the ASF dual-hosted git repository.
vvcephei pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.5 by this push:
new 25a1ed4 MINOR: remove stream simple benchmark suite (#8353)
25a1ed4 is described below
commit 25a1ed4cb5975f7531e3c65e5f5d8ab77f5f3a4b
Author: Boyang Chen <bo...@confluent.io>
AuthorDate: Tue Apr 14 09:49:03 2020 -0700
MINOR: remove stream simple benchmark suite (#8353)
Reviewers: Matthias J. Sax <ma...@confluent.io>
---
.../apache/kafka/streams/perf/SimpleBenchmark.java | 752 ---------------------
.../apache/kafka/streams/perf/YahooBenchmark.java | 306 ---------
tests/kafkatest/benchmarks/streams/__init__.py | 14 -
.../streams/streams_simple_benchmark_test.py | 164 -----
.../services/performance/streams_performance.py | 108 ---
5 files changed, 1344 deletions(-)
diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
deleted file mode 100644
index 4a14b87..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
+++ /dev/null
@@ -1,752 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.perf;
-
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.Consumed;
-import org.apache.kafka.streams.kstream.ForeachAction;
-import org.apache.kafka.streams.kstream.JoinWindows;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.TimeWindows;
-import org.apache.kafka.streams.kstream.ValueJoiner;
-import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.processor.AbstractProcessor;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.StoreBuilder;
-import org.apache.kafka.streams.state.Stores;
-import org.apache.kafka.streams.state.WindowStore;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Locale;
-import java.util.Properties;
-import java.util.Random;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import static java.time.Duration.ofMillis;
-import static java.time.Duration.ofSeconds;
-import static java.time.Instant.ofEpochMilli;
-
-/**
- * Class that provides support for a series of benchmarks. It is usually driven by
- * tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py.
- * If run manually through the main() function below, you must do the following:
- * 1. Have ZK and a Kafka broker set up
- * 2. Run the loading step first: SimpleBenchmark localhost:9092 /tmp/statedir numRecords true "all"
- * 3. Run the stream processing step second: SimpleBenchmark localhost:9092 /tmp/statedir numRecords false "all"
- * Note that what changed is the 4th parameter, from "true", indicating that this is a load phase, to "false",
- * indicating that this is a real run.
- *
- * Note that "all" is a convenience option when running this test locally and will not work when running the test
- * at scale (through tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py). That is due to the exact synchronization
- * needs of each test (e.g., you wouldn't want one instance to run "count" while another
- * is still running "consume").
- */
-public class SimpleBenchmark {
- private static final String LOADING_PRODUCER_CLIENT_ID = "simple-benchmark-loading-producer";
-
- private static final String SOURCE_TOPIC_ONE = "simpleBenchmarkSourceTopic1";
- private static final String SOURCE_TOPIC_TWO = "simpleBenchmarkSourceTopic2";
- private static final String SINK_TOPIC = "simpleBenchmarkSinkTopic";
-
- private static final String YAHOO_CAMPAIGNS_TOPIC = "yahooCampaigns";
- private static final String YAHOO_EVENTS_TOPIC = "yahooEvents";
-
- private static final ValueJoiner<byte[], byte[], byte[]> VALUE_JOINER = new ValueJoiner<byte[], byte[], byte[]>() {
- @Override
- public byte[] apply(final byte[] value1, final byte[] value2) {
- // dummy joiner in order to have as little join overhead as possible
- if (value1 != null) {
- return value1;
- } else if (value2 != null) {
- return value2;
- } else {
- return new byte[100];
- }
- }
- };
-
- private static final Serde<byte[]> BYTE_SERDE = Serdes.ByteArray();
- private static final Serde<Integer> INTEGER_SERDE = Serdes.Integer();
-
- long processedBytes = 0L;
- int processedRecords = 0;
-
- private static final long POLL_MS = 500L;
- private static final long COMMIT_INTERVAL_MS = 30000L;
- private static final int MAX_POLL_RECORDS = 1000;
-
- /* ----------- benchmark variables that are hard-coded ----------- */
-
- private static final int KEY_SPACE_SIZE = 10000;
-
- private static final long STREAM_STREAM_JOIN_WINDOW = 10000L;
-
- private static final long AGGREGATE_WINDOW_SIZE = 1000L;
-
- private static final long AGGREGATE_WINDOW_ADVANCE = 500L;
-
- private static final int SOCKET_SIZE_BYTES = 1024 * 1024;
-
- // the following numbers are based on empirical results and should only
- // be considered for updates when perf results have significantly changed
-
- // with at least 10 million records, we run for at most 3 minutes
- private static final int MAX_WAIT_MS = 3 * 60 * 1000;
-
- /* ----------- benchmark variables that can be specified ----------- */
-
- final String testName;
-
- final int numRecords;
-
- final Properties props;
-
- private final int valueSize;
-
- private final double keySkew;
-
- /* ----------- ----------------------------------------- ----------- */
-
-
- private SimpleBenchmark(final Properties props,
- final String testName,
- final int numRecords,
- final double keySkew,
- final int valueSize) {
- super();
- this.props = props;
- this.testName = testName;
- this.keySkew = keySkew;
- this.valueSize = valueSize;
- this.numRecords = numRecords;
- }
-
- private void run() {
- switch (testName) {
- // loading phases
- case "load-one":
- produce(LOADING_PRODUCER_CLIENT_ID, SOURCE_TOPIC_ONE, numRecords, keySkew, valueSize);
- break;
- case "load-two":
- produce(LOADING_PRODUCER_CLIENT_ID, SOURCE_TOPIC_ONE, numRecords, keySkew, valueSize);
- produce(LOADING_PRODUCER_CLIENT_ID, SOURCE_TOPIC_TWO, numRecords, keySkew, valueSize);
- break;
-
- // testing phases
- case "consume":
- consume(SOURCE_TOPIC_ONE);
- break;
- case "consumeproduce":
- consumeAndProduce(SOURCE_TOPIC_ONE);
- break;
- case "streamcount":
- countStreamsNonWindowed(SOURCE_TOPIC_ONE);
- break;
- case "streamcountwindowed":
- countStreamsWindowed(SOURCE_TOPIC_ONE);
- break;
- case "streamprocess":
- processStream(SOURCE_TOPIC_ONE);
- break;
- case "streamprocesswithsink":
- processStreamWithSink(SOURCE_TOPIC_ONE);
- break;
- case "streamprocesswithstatestore":
- processStreamWithStateStore(SOURCE_TOPIC_ONE);
- break;
- case "streamprocesswithwindowstore":
- processStreamWithWindowStore(SOURCE_TOPIC_ONE);
- break;
- case "streamtablejoin":
- streamTableJoin(SOURCE_TOPIC_ONE, SOURCE_TOPIC_TWO);
- break;
- case "streamstreamjoin":
- streamStreamJoin(SOURCE_TOPIC_ONE, SOURCE_TOPIC_TWO);
- break;
- case "tabletablejoin":
- tableTableJoin(SOURCE_TOPIC_ONE, SOURCE_TOPIC_TWO);
- break;
- case "yahoo":
- yahooBenchmark(YAHOO_CAMPAIGNS_TOPIC, YAHOO_EVENTS_TOPIC);
- break;
- default:
- throw new RuntimeException("Unknown test name " + testName);
-
- }
- }
-
- public static void main(final String[] args) throws IOException {
- if (args.length < 5) {
- System.err.println("Not enough parameters are provided; expecting propFileName, testName, numRecords, keySkew, valueSize");
- System.exit(1);
- }
-
- final String propFileName = args[0];
- final String testName = args[1].toLowerCase(Locale.ROOT);
- final int numRecords = Integer.parseInt(args[2]);
- final double keySkew = Double.parseDouble(args[3]); // 0d means even distribution
- final int valueSize = Integer.parseInt(args[4]);
-
- final Properties props = Utils.loadProps(propFileName);
- final String kafka = props.getProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
-
- if (kafka == null) {
- System.err.println("No bootstrap kafka servers specified in " + StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
- System.exit(1);
- }
-
- // Note: this output is needed for automated tests and must not be removed
- System.out.println("StreamsTest instance started");
-
- System.out.println("testName=" + testName);
- System.out.println("streamsProperties=" + props);
- System.out.println("numRecords=" + numRecords);
- System.out.println("keySkew=" + keySkew);
- System.out.println("valueSize=" + valueSize);
-
- final SimpleBenchmark benchmark = new SimpleBenchmark(props, testName, numRecords, keySkew, valueSize);
-
- benchmark.run();
- }
-
- public void setStreamProperties(final String applicationId) {
- props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
- props.put(StreamsConfig.CLIENT_ID_CONFIG, "simple-benchmark");
- props.put(StreamsConfig.POLL_MS_CONFIG, POLL_MS);
- props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL_MS);
- props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
- props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass());
- // the socket buffer needs to be large, especially when running in AWS with
- // high latency. If running locally, the default is fine.
- props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, SOCKET_SIZE_BYTES);
- props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, MAX_POLL_RECORDS);
-
- // improve producer throughput
- props.put(ProducerConfig.LINGER_MS_CONFIG, 5000);
- props.put(ProducerConfig.BATCH_SIZE_CONFIG, 128 * 1024);
- }
-
- private Properties setProduceConsumeProperties(final String clientId) {
- final Properties clientProps = new Properties();
- clientProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, props.getProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG));
- clientProps.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
- // the socket buffer needs to be large, especially when running in AWS with
- // high latency. If running locally, the default is fine.
- clientProps.put(ProducerConfig.LINGER_MS_CONFIG, 5000);
- clientProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 128 * 1024);
- clientProps.put(ProducerConfig.SEND_BUFFER_CONFIG, SOCKET_SIZE_BYTES);
- clientProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
- clientProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
- clientProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
- clientProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
- clientProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
- // the socket buffer needs to be large, especially when running in AWS with
- // high latency. If running locally, the default is fine.
- clientProps.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, SOCKET_SIZE_BYTES);
- clientProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, MAX_POLL_RECORDS);
- return clientProps;
- }
-
- void resetStats() {
- processedRecords = 0;
- processedBytes = 0L;
- }
-
- /**
- * Produce values to a topic
- * @param clientId String specifying client ID
- * @param topic Topic to produce to
- * @param numRecords Number of records to produce
- * @param keySkew Key zipf distribution skewness
- * @param valueSize Size of value in bytes
- */
- private void produce(final String clientId,
- final String topic,
- final int numRecords,
- final double keySkew,
- final int valueSize) {
- final Properties props = setProduceConsumeProperties(clientId);
- final ZipfGenerator keyGen = new ZipfGenerator(KEY_SPACE_SIZE, keySkew);
-
- try (final KafkaProducer<Integer, byte[]> producer = new KafkaProducer<>(props)) {
- final byte[] value = new byte[valueSize];
- // put some random values to increase entropy. Some devices
- // like SSDs do compression and if the array is all zeros
- // the performance will be too good.
- new Random(System.currentTimeMillis()).nextBytes(value);
-
- for (int i = 0; i < numRecords; i++) {
- producer.send(new ProducerRecord<>(topic, keyGen.next(), value));
- }
- }
- }
-
- private void consumeAndProduce(final String topic) {
- final Properties consumerProps = setProduceConsumeProperties("simple-benchmark-consumer");
- final Properties producerProps = setProduceConsumeProperties("simple-benchmark-producer");
-
- final long startTime = System.currentTimeMillis();
- try (final KafkaConsumer<Integer, byte[]> consumer = new KafkaConsumer<>(consumerProps);
- final KafkaProducer<Integer, byte[]> producer = new KafkaProducer<>(producerProps)) {
- final List<TopicPartition> partitions = getAllPartitions(consumer, topic);
-
- consumer.assign(partitions);
- consumer.seekToBeginning(partitions);
-
- while (true) {
- final ConsumerRecords<Integer, byte[]> records = consumer.poll(ofMillis(POLL_MS));
- if (records.isEmpty()) {
- if (processedRecords == numRecords) {
- break;
- }
- } else {
- for (final ConsumerRecord<Integer, byte[]> record : records) {
- producer.send(new ProducerRecord<>(SINK_TOPIC, record.key(), record.value()));
- processedRecords++;
- processedBytes += record.value().length + Integer.SIZE;
- if (processedRecords == numRecords) {
- break;
- }
- }
- }
- if (processedRecords == numRecords) {
- break;
- }
- }
- }
-
- final long endTime = System.currentTimeMillis();
-
- printResults("ConsumerProducer Performance [records/latency/rec-sec/MB-sec read]: ", endTime - startTime);
- }
-
- private void consume(final String topic) {
- final Properties consumerProps = setProduceConsumeProperties("simple-benchmark-consumer");
-
- final long startTime = System.currentTimeMillis();
-
- try (final KafkaConsumer<Integer, byte[]> consumer = new KafkaConsumer<>(consumerProps)) {
- final List<TopicPartition> partitions = getAllPartitions(consumer, topic);
-
- consumer.assign(partitions);
- consumer.seekToBeginning(partitions);
-
- while (true) {
- final ConsumerRecords<Integer, byte[]> records = consumer.poll(ofMillis(POLL_MS));
- if (records.isEmpty()) {
- if (processedRecords == numRecords) {
- break;
- }
- } else {
- for (final ConsumerRecord<Integer, byte[]> record : records) {
- processedRecords++;
- processedBytes += record.value().length + Integer.SIZE;
- if (processedRecords == numRecords) {
- break;
- }
- }
- }
- if (processedRecords == numRecords) {
- break;
- }
- }
- }
-
- final long endTime = System.currentTimeMillis();
-
- printResults("Consumer Performance [records/latency/rec-sec/MB-sec read]: ", endTime - startTime);
- }
-
- private void processStream(final String topic) {
- final CountDownLatch latch = new CountDownLatch(1);
-
- setStreamProperties("simple-benchmark-streams-source");
-
- final StreamsBuilder builder = new StreamsBuilder();
-
- builder.stream(topic, Consumed.with(INTEGER_SERDE, BYTE_SERDE)).peek(new CountDownAction(latch));
-
- final KafkaStreams streams = createKafkaStreamsWithExceptionHandler(builder, props);
- runGenericBenchmark(streams, "Streams Source Performance [records/latency/rec-sec/MB-sec joined]: ", latch);
- }
-
- private void processStreamWithSink(final String topic) {
- final CountDownLatch latch = new CountDownLatch(1);
-
- setStreamProperties("simple-benchmark-streams-source-sink");
-
- final StreamsBuilder builder = new StreamsBuilder();
-
- final KStream<Integer, byte[]> source = builder.stream(topic);
- source.peek(new CountDownAction(latch)).to(SINK_TOPIC);
-
- final KafkaStreams streams = createKafkaStreamsWithExceptionHandler(builder, props);
- runGenericBenchmark(streams, "Streams SourceSink Performance [records/latency/rec-sec/MB-sec joined]: ", latch);
- }
-
- private void processStreamWithStateStore(final String topic) {
- final CountDownLatch latch = new CountDownLatch(1);
-
- setStreamProperties("simple-benchmark-streams-with-store");
-
- final StreamsBuilder builder = new StreamsBuilder();
- final StoreBuilder<KeyValueStore<Integer, byte[]>> storeBuilder =
- Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("store"), INTEGER_SERDE, BYTE_SERDE);
- builder.addStateStore(storeBuilder.withCachingEnabled());
-
- final KStream<Integer, byte[]> source = builder.stream(topic);
-
- source.peek(new CountDownAction(latch)).process(new ProcessorSupplier<Integer, byte[]>() {
- @Override
- public Processor<Integer, byte[]> get() {
- return new AbstractProcessor<Integer, byte[]>() {
- KeyValueStore<Integer, byte[]> store;
-
- @SuppressWarnings("unchecked")
- @Override
- public void init(final ProcessorContext context) {
- super.init(context);
- store = (KeyValueStore<Integer, byte[]>) context.getStateStore("store");
- }
-
- @Override
- public void process(final Integer key, final byte[] value) {
- store.get(key);
- store.put(key, value);
- }
- };
- }
- }, "store");
-
- final KafkaStreams streams = createKafkaStreamsWithExceptionHandler(builder, props);
- runGenericBenchmark(streams, "Streams Stateful Performance [records/latency/rec-sec/MB-sec joined]: ", latch);
- }
-
- private void processStreamWithWindowStore(final String topic) {
- final CountDownLatch latch = new CountDownLatch(1);
-
- setStreamProperties("simple-benchmark-streams-with-store");
-
- final StreamsBuilder builder = new StreamsBuilder();
-
- final StoreBuilder<WindowStore<Integer, byte[]>> storeBuilder = Stores.windowStoreBuilder(
- Stores.persistentWindowStore(
- "store",
- ofMillis(AGGREGATE_WINDOW_SIZE * 3),
- ofMillis(AGGREGATE_WINDOW_SIZE),
- false
- ),
- INTEGER_SERDE,
- BYTE_SERDE
- );
- builder.addStateStore(storeBuilder.withCachingEnabled());
-
- final KStream<Integer, byte[]> source = builder.stream(topic);
-
- source.peek(new CountDownAction(latch)).process(new ProcessorSupplier<Integer, byte[]>() {
- @Override
- public Processor<Integer, byte[]> get() {
- return new AbstractProcessor<Integer, byte[]>() {
- WindowStore<Integer, byte[]> store;
-
- @SuppressWarnings("unchecked")
- @Override
- public void init(final ProcessorContext context) {
- super.init(context);
- store = (WindowStore<Integer, byte[]>) context.getStateStore("store");
- }
-
- @Override
- public void process(final Integer key, final byte[] value) {
- final long timestamp = context().timestamp();
- final KeyValueIterator<Windowed<Integer>, byte[]> iter = store.fetch(key - 10, key + 10, ofEpochMilli(timestamp - 1000L), ofEpochMilli(timestamp));
- while (iter.hasNext()) {
- iter.next();
- }
- iter.close();
-
- store.put(key, value, timestamp);
- }
- };
- }
- }, "store");
-
- final KafkaStreams streams = createKafkaStreamsWithExceptionHandler(builder, props);
- runGenericBenchmark(streams, "Streams Stateful Performance [records/latency/rec-sec/MB-sec joined]: ", latch);
- }
-
- /**
- * Measure the performance of a simple aggregate like count.
- * Counts the occurrences of numbers (note that people normally count words; this
- * example counts numbers).
- */
- private void countStreamsNonWindowed(final String sourceTopic) {
- final CountDownLatch latch = new CountDownLatch(1);
-
- setStreamProperties("simple-benchmark-nonwindowed-count");
-
- final StreamsBuilder builder = new StreamsBuilder();
- final KStream<Integer, byte[]> input = builder.stream(sourceTopic);
-
- input.peek(new CountDownAction(latch))
- .groupByKey()
- .count();
-
- final KafkaStreams streams = createKafkaStreamsWithExceptionHandler(builder, props);
- runGenericBenchmark(streams, "Streams Count Performance [records/latency/rec-sec/MB-sec counted]: ", latch);
- }
-
- /**
- * Measure the performance of a simple aggregate like count.
- * Counts the occurrences of numbers (note that people normally count words; this
- * example counts numbers).
- */
- private void countStreamsWindowed(final String sourceTopic) {
- final CountDownLatch latch = new CountDownLatch(1);
-
- setStreamProperties("simple-benchmark-windowed-count");
-
- final StreamsBuilder builder = new StreamsBuilder();
- final KStream<Integer, byte[]> input = builder.stream(sourceTopic);
-
- input.peek(new CountDownAction(latch))
- .groupByKey()
- .windowedBy(TimeWindows.of(ofMillis(AGGREGATE_WINDOW_SIZE)).advanceBy(ofMillis(AGGREGATE_WINDOW_ADVANCE)))
- .count();
-
- final KafkaStreams streams = createKafkaStreamsWithExceptionHandler(builder, props);
- runGenericBenchmark(streams, "Streams Count Windowed Performance [records/latency/rec-sec/MB-sec counted]: ", latch);
- }
-
- /**
- * Measure the performance of a KStream-KTable left join. The setup is such that each
- * KStream record joins to exactly one element in the KTable
- */
- private void streamTableJoin(final String kStreamTopic, final String kTableTopic) {
- final CountDownLatch latch = new CountDownLatch(1);
-
- setStreamProperties("simple-benchmark-stream-table-join");
-
- final StreamsBuilder builder = new StreamsBuilder();
-
- final KStream<Integer, byte[]> input1 = builder.stream(kStreamTopic);
- final KTable<Integer, byte[]> input2 = builder.table(kTableTopic);
-
- input1.leftJoin(input2, VALUE_JOINER).foreach(new CountDownAction(latch));
-
- final KafkaStreams streams = createKafkaStreamsWithExceptionHandler(builder, props);
-
- // run benchmark
- runGenericBenchmark(streams, "Streams KStreamKTable LeftJoin Performance [records/latency/rec-sec/MB-sec joined]: ", latch);
- }
-
- /**
- * Measure the performance of a KStream-KStream left join. The setup is such that each
- * KStream record joins to exactly one element in the other KStream
- */
- private void streamStreamJoin(final String kStreamTopic1, final String kStreamTopic2) {
- final CountDownLatch latch = new CountDownLatch(1);
-
- setStreamProperties("simple-benchmark-stream-stream-join");
-
- final StreamsBuilder builder = new StreamsBuilder();
-
- final KStream<Integer, byte[]> input1 = builder.stream(kStreamTopic1);
- final KStream<Integer, byte[]> input2 = builder.stream(kStreamTopic2);
-
- input1.leftJoin(input2, VALUE_JOINER, JoinWindows.of(ofMillis(STREAM_STREAM_JOIN_WINDOW))).foreach(new CountDownAction(latch));
-
- final KafkaStreams streams = createKafkaStreamsWithExceptionHandler(builder, props);
-
- // run benchmark
- runGenericBenchmark(streams, "Streams KStreamKStream LeftJoin Performance [records/latency/rec-sec/MB-sec joined]: ", latch);
- }
-
- /**
- * Measure the performance of a KTable-KTable left join. The setup is such that each
- * KTable record joins to exactly one element in the other KTable
- */
- private void tableTableJoin(final String kTableTopic1, final String kTableTopic2) {
- final CountDownLatch latch = new CountDownLatch(1);
-
- // setup join
- setStreamProperties("simple-benchmark-table-table-join");
-
- final StreamsBuilder builder = new StreamsBuilder();
-
- final KTable<Integer, byte[]> input1 = builder.table(kTableTopic1);
- final KTable<Integer, byte[]> input2 = builder.table(kTableTopic2);
-
- input1.leftJoin(input2, VALUE_JOINER).toStream().foreach(new CountDownAction(latch));
-
- final KafkaStreams streams = createKafkaStreamsWithExceptionHandler(builder, props);
-
- // run benchmark
- runGenericBenchmark(streams, "Streams KTableKTable LeftJoin Performance [records/latency/rec-sec/MB-sec joined]: ", latch);
- }
-
- void printResults(final String nameOfBenchmark, final long latency) {
- System.out.println(nameOfBenchmark +
- processedRecords + "/" +
- latency + "/" +
- recordsPerSec(latency, processedRecords) + "/" +
- megabytesPerSec(latency, processedBytes));
- }
-
- void runGenericBenchmark(final KafkaStreams streams, final String nameOfBenchmark, final CountDownLatch latch) {
- streams.start();
-
- final long startTime = System.currentTimeMillis();
- long endTime = startTime;
-
- while (latch.getCount() > 0 && (endTime - startTime < MAX_WAIT_MS)) {
- try {
- latch.await(1000, TimeUnit.MILLISECONDS);
- } catch (final InterruptedException ex) {
- Thread.interrupted();
- }
-
- endTime = System.currentTimeMillis();
- }
- streams.close();
-
- printResults(nameOfBenchmark, endTime - startTime);
- }
-
- private class CountDownAction implements ForeachAction<Integer, byte[]> {
- private final CountDownLatch latch;
-
- CountDownAction(final CountDownLatch latch) {
- this.latch = latch;
- }
-
- @Override
- public void apply(final Integer key, final byte[] value) {
- processedRecords++;
- processedBytes += Integer.SIZE + value.length;
-
- if (processedRecords == numRecords) {
- this.latch.countDown();
- }
- }
- }
-
- private KafkaStreams createKafkaStreamsWithExceptionHandler(final StreamsBuilder builder, final Properties props) {
- final KafkaStreams streamsClient = new KafkaStreams(builder.build(), props);
- streamsClient.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
- @Override
- public void uncaughtException(final Thread t, final Throwable e) {
- System.out.println("FATAL: An unexpected exception is encountered on thread " + t + ": " + e);
-
- streamsClient.close(ofSeconds(30));
- }
- });
-
- return streamsClient;
- }
-
- private double megabytesPerSec(final long time, final long processedBytes) {
- return (processedBytes / 1024.0 / 1024.0) / (time / 1000.0);
- }
-
- private double recordsPerSec(final long time, final int numRecords) {
- return numRecords / (time / 1000.0);
- }
-
- private List<TopicPartition> getAllPartitions(final KafkaConsumer<?, ?> consumer, final String... topics) {
- final ArrayList<TopicPartition> partitions = new ArrayList<>();
-
- for (final String topic : topics) {
- for (final PartitionInfo info : consumer.partitionsFor(topic)) {
- partitions.add(new TopicPartition(info.topic(), info.partition()));
- }
- }
- return partitions;
- }
-
- private void yahooBenchmark(final String campaignsTopic, final String eventsTopic) {
- final YahooBenchmark benchmark = new YahooBenchmark(this, campaignsTopic, eventsTopic);
-
- benchmark.run();
- }
-
- private class ZipfGenerator {
- final private Random rand = new Random(System.currentTimeMillis());
- final private int size;
- final private double skew;
-
- private double bottom = 0.0d;
-
- ZipfGenerator(final int size, final double skew) {
- this.size = size;
- this.skew = skew;
-
- for (int i = 1; i < size; i++) {
- this.bottom += 1.0d / Math.pow(i, this.skew);
- }
- }
-
- int next() {
- if (skew == 0.0d) {
- return rand.nextInt(size);
- } else {
- int rank;
- double dice;
- double frequency;
-
- rank = rand.nextInt(size);
- frequency = (1.0d / Math.pow(rank, this.skew)) / this.bottom;
- dice = rand.nextDouble();
-
- while (!(dice < frequency)) {
- rank = rand.nextInt(size);
- frequency = (1.0d / Math.pow(rank, this.skew)) / this.bottom;
- dice = rand.nextDouble();
- }
-
- return rank;
- }
- }
- }
-}
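Note: the ZipfGenerator nested at the end of the deleted SimpleBenchmark is what turns the keySkew argument into a key distribution over KEY_SPACE_SIZE (10000) keys, with a skew of 0d meaning an even distribution. A condensed standalone sketch of that rejection-sampling generator follows; the class name ZipfSketch is illustrative, and it draws ranks from [1, size] so the frequency term 1.0 / Math.pow(rank, skew) stays finite (the original could draw rank 0):

import java.util.Random;

// Standalone sketch of SimpleBenchmark's nested ZipfGenerator (see above).
// skew == 0.0 degenerates to a uniform draw, matching the benchmark's
// "0d means even distribution" convention for the keySkew argument.
public class ZipfSketch {
    private final Random rand = new Random(System.currentTimeMillis());
    private final int size;
    private final double skew;
    private double bottom = 0.0d;

    ZipfSketch(final int size, final double skew) {
        this.size = size;
        this.skew = skew;
        for (int i = 1; i <= size; i++) {
            bottom += 1.0d / Math.pow(i, skew);
        }
    }

    int next() {
        if (skew == 0.0d) {
            return rand.nextInt(size);
        }
        while (true) {
            // ranks start at 1 so the frequency term stays finite
            final int rank = 1 + rand.nextInt(size);
            final double frequency = (1.0d / Math.pow(rank, skew)) / bottom;
            if (rand.nextDouble() < frequency) {
                return rank;
            }
        }
    }

    public static void main(final String[] args) {
        final ZipfSketch keyGen = new ZipfSketch(10000, 1.0d);
        for (int i = 0; i < 5; i++) {
            System.out.println(keyGen.next());
        }
    }
}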
diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
deleted file mode 100644
index 2cab626..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
+++ /dev/null
@@ -1,306 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.perf;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.errors.SerializationException;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.Consumed;
-import org.apache.kafka.streams.kstream.Grouped;
-import org.apache.kafka.streams.kstream.Joined;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.kstream.TimeWindows;
-
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Random;
-import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
-
-
-/**
- * A basic DSL and data generation that emulates the behavior of the Yahoo Benchmark
- * https://yahooeng.tumblr.com/post/135321837876/benchmarking-streaming-computation-engines-at
- * Thanks to Michael Armbrust for providing the initial code for this benchmark in his blog:
- * https://databricks.com/blog/2017/06/06/simple-super-fast-streaming-engine-apache-spark.html
- */
-public class YahooBenchmark {
- private final SimpleBenchmark parent;
- private final String campaignsTopic;
- private final String eventsTopic;
-
- static class ProjectedEvent {
- /* attributes need to be public for serializer to work */
- /* main attributes */
- String eventType;
- String adID;
-
- /* other attributes */
- long eventTime;
- /* not used
- public String userID = UUID.randomUUID().toString();
- public String pageID = UUID.randomUUID().toString();
- public String addType = "banner78";
- public String ipAddress = "1.2.3.4";
- */
- }
-
- static class CampaignAd {
- /* attributes need to be public for serializer to work */
- String adID;
- String campaignID;
- }
-
- @SuppressWarnings("WeakerAccess")
- public YahooBenchmark(final SimpleBenchmark parent, final String campaignsTopic, final String eventsTopic) {
- this.parent = parent;
- this.campaignsTopic = campaignsTopic;
- this.eventsTopic = eventsTopic;
- }
-
- // just for Yahoo benchmark
- private boolean maybeSetupPhaseCampaigns(final String topic,
- final String clientId,
- final boolean skipIfAllTests,
- final int numCampaigns,
- final int adsPerCampaign,
- final List<String> ads) {
- parent.resetStats();
- // initialize topics
- System.out.println("Initializing topic " + topic);
-
- final Properties props = new Properties();
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, parent.props.get(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG));
- props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
- props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-
- try (final KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
- for (int c = 0; c < numCampaigns; c++) {
- final String campaignID = UUID.randomUUID().toString();
- for (int a = 0; a < adsPerCampaign; a++) {
- final String adId = UUID.randomUUID().toString();
- final String concat = adId + ":" + campaignID;
- producer.send(new ProducerRecord<>(topic, adId, concat));
- ads.add(adId);
- parent.processedRecords++;
- parent.processedBytes += concat.length() + adId.length();
- }
- }
- }
- return true;
- }
-
- // just for Yahoo benchmark
- private void maybeSetupPhaseEvents(final String topic,
- final String clientId,
- final int numRecords,
- final List<String> ads) {
- parent.resetStats();
- final String[] eventTypes = new String[]{"view", "click", "purchase"};
- final Random rand = new Random(System.currentTimeMillis());
- System.out.println("Initializing topic " + topic);
-
- final Properties props = new Properties();
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, parent.props.get(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG));
- props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
- props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
-
- final long startTime = System.currentTimeMillis();
-
- try (final KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props)) {
- final ProjectedEvent event = new ProjectedEvent();
- final Map<String, Object> serdeProps = new HashMap<>();
- final Serializer<ProjectedEvent> projectedEventSerializer = new JsonPOJOSerializer<>();
- serdeProps.put("JsonPOJOClass", ProjectedEvent.class);
- projectedEventSerializer.configure(serdeProps, false);
-
- for (int i = 0; i < numRecords; i++) {
- event.eventType = eventTypes[rand.nextInt(eventTypes.length)];
- event.adID = ads.get(rand.nextInt(ads.size()));
- event.eventTime = System.currentTimeMillis();
- final byte[] value = projectedEventSerializer.serialize(topic, event);
- producer.send(new ProducerRecord<>(topic, event.adID, value));
- parent.processedRecords++;
- parent.processedBytes += value.length + event.adID.length();
- }
- }
-
- final long endTime = System.currentTimeMillis();
-
- parent.printResults("Producer Performance [records/latency/rec-sec/MB-sec write]: ", endTime - startTime);
- }
-
-
- public void run() {
- final int numCampaigns = 100;
- final int adsPerCampaign = 10;
-
- final List<String> ads = new ArrayList<>(numCampaigns * adsPerCampaign);
- maybeSetupPhaseCampaigns(campaignsTopic, "simple-benchmark-produce-campaigns", false, numCampaigns, adsPerCampaign, ads);
- maybeSetupPhaseEvents(eventsTopic, "simple-benchmark-produce-events", parent.numRecords, ads);
-
- final CountDownLatch latch = new CountDownLatch(1);
- parent.setStreamProperties("simple-benchmark-yahoo" + new Random().nextInt());
-
- final KafkaStreams streams = createYahooBenchmarkStreams(parent.props, campaignsTopic, eventsTopic, latch, parent.numRecords);
- parent.runGenericBenchmark(streams, "Streams Yahoo Performance [records/latency/rec-sec/MB-sec counted]: ", latch);
-
- }
- // Note: these are also in the streams example package, eventually use 1 file
- private class JsonPOJOSerializer<T> implements Serializer<T> {
- private final ObjectMapper objectMapper = new ObjectMapper();
-
- /**
- * Default constructor needed by Kafka
- */
- @SuppressWarnings("WeakerAccess")
- public JsonPOJOSerializer() {}
-
- @Override
- public byte[] serialize(final String topic, final T data) {
- if (data == null) {
- return null;
- }
-
- try {
- return objectMapper.writeValueAsBytes(data);
- } catch (final Exception e) {
- throw new SerializationException("Error serializing JSON message", e);
- }
- }
- }
-
- // Note: these are also in the streams example package, eventually use 1 file
- private class JsonPOJODeserializer<T> implements Deserializer<T> {
- private final ObjectMapper objectMapper = new ObjectMapper();
-
- private Class<T> tClass;
-
- /**
- * Default constructor needed by Kafka
- */
- @SuppressWarnings("WeakerAccess")
- public JsonPOJODeserializer() {}
-
- @SuppressWarnings("unchecked")
- @Override
- public void configure(final Map<String, ?> props, final boolean isKey) {
- tClass = (Class<T>) props.get("JsonPOJOClass");
- }
-
- @Override
- public T deserialize(final String topic, final byte[] bytes) {
- if (bytes == null) {
- return null;
- }
-
- final T data;
- try {
- data = objectMapper.readValue(bytes, tClass);
- } catch (final Exception e) {
- throw new SerializationException(e);
- }
-
- return data;
- }
- }
-
- private KafkaStreams createYahooBenchmarkStreams(final Properties streamConfig, final String campaignsTopic, final String eventsTopic,
- final CountDownLatch latch, final int numRecords) {
- final Map<String, Object> serdeProps = new HashMap<>();
- final Serializer<ProjectedEvent> projectedEventSerializer = new JsonPOJOSerializer<>();
- serdeProps.put("JsonPOJOClass", ProjectedEvent.class);
- projectedEventSerializer.configure(serdeProps, false);
- final Deserializer<ProjectedEvent> projectedEventDeserializer = new JsonPOJODeserializer<>();
- serdeProps.put("JsonPOJOClass", ProjectedEvent.class);
- projectedEventDeserializer.configure(serdeProps, false);
-
- final StreamsBuilder builder = new StreamsBuilder();
- final KStream<String, ProjectedEvent> kEvents = builder.stream(eventsTopic,
- Consumed.with(Serdes.String(),
- Serdes.serdeFrom(projectedEventSerializer, projectedEventDeserializer)));
- final KTable<String, String> kCampaigns = builder.table(campaignsTopic, Consumed.with(Serdes.String(), Serdes.String()));
-
- final KStream<String, ProjectedEvent> filteredEvents = kEvents
- // use peek to signal when the last element has been processed
- .peek((key, value) -> {
- parent.processedRecords++;
- if (parent.processedRecords % 1000000 == 0) {
- System.out.println("Processed " + parent.processedRecords);
- }
- if (parent.processedRecords >= numRecords) {
- latch.countDown();
- }
- })
- // only keep "view" events
- .filter((key, value) -> value.eventType.equals("view"))
- // select just a few of the columns
- .mapValues(value -> {
- final ProjectedEvent event = new ProjectedEvent();
- event.adID = value.adID;
- event.eventTime = value.eventTime;
- event.eventType = value.eventType;
- return event;
- });
-
- // deserialize the ad ID and campaign ID from the stored value in Kafka
- final KTable<String, CampaignAd> deserCampaigns = kCampaigns.mapValues(value -> {
- final String[] parts = value.split(":");
- final CampaignAd cAdd = new CampaignAd();
- cAdd.adID = parts[0];
- cAdd.campaignID = parts[1];
- return cAdd;
- });
-
- // join the events with the campaigns
- final KStream<String, String> joined = filteredEvents.join(
- deserCampaigns,
- (value1, value2) -> value2.campaignID,
- Joined.with(Serdes.String(), Serdes.serdeFrom(projectedEventSerializer, projectedEventDeserializer), null)
- );
-
- // key by campaign rather than by ad, as in the original benchmark
- final KStream<String, String> keyedByCampaign = joined
- .selectKey((key, value) -> value);
-
- // calculate windowed counts
- keyedByCampaign
- .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
- .windowedBy(TimeWindows.of(Duration.ofMillis(10 * 1000)))
- .count(Materialized.as("time-windows"));
-
- return new KafkaStreams(builder.build(), streamConfig);
- }
-}
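Note: YahooBenchmark builds its JSON serde by configuring a JsonPOJOSerializer and JsonPOJODeserializer pair and joining them with Serdes.serdeFrom(). A minimal standalone sketch of that pattern, assuming the Kafka 2.x clients and Jackson are on the classpath (JsonSerdeSketch and jsonSerde are illustrative names, not part of the codebase):

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;

// Sketch: build a Jackson-backed Serde for any POJO class, as the deleted
// YahooBenchmark does by hand with its JsonPOJOSerializer/Deserializer pair.
public final class JsonSerdeSketch {
    public static <T> Serde<T> jsonSerde(final Class<T> clazz) {
        final ObjectMapper mapper = new ObjectMapper();
        final Serializer<T> serializer = (topic, data) -> {
            try {
                return data == null ? null : mapper.writeValueAsBytes(data);
            } catch (final Exception e) {
                throw new SerializationException("Error serializing JSON message", e);
            }
        };
        final Deserializer<T> deserializer = (topic, bytes) -> {
            try {
                return bytes == null ? null : mapper.readValue(bytes, clazz);
            } catch (final Exception e) {
                throw new SerializationException(e);
            }
        };
        return Serdes.serdeFrom(serializer, deserializer);
    }
}

With such a helper, Consumed.with(Serdes.String(), jsonSerde(ProjectedEvent.class)) would replace the hand-wired serializer/deserializer pair in createYahooBenchmarkStreams above.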
diff --git a/tests/kafkatest/benchmarks/streams/__init__.py b/tests/kafkatest/benchmarks/streams/__init__.py
deleted file mode 100644
index ec20143..0000000
--- a/tests/kafkatest/benchmarks/streams/__init__.py
+++ /dev/null
@@ -1,14 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
diff --git a/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py b/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
deleted file mode 100644
index 2f87f4a..0000000
--- a/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
+++ /dev/null
@@ -1,164 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ducktape.tests.test import Test
-from ducktape.mark.resource import cluster
-from ducktape.mark import parametrize, matrix
-from kafkatest.tests.kafka_test import KafkaTest
-
-from kafkatest.services.performance.streams_performance import StreamsSimpleBenchmarkService
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
-from kafkatest.version import DEV_BRANCH
-
-STREAMS_SIMPLE_TESTS = ["streamprocess", "streamprocesswithsink", "streamprocesswithstatestore", "streamprocesswithwindowstore"]
-STREAMS_COUNT_TESTS = ["streamcount", "streamcountwindowed"]
-STREAMS_JOIN_TESTS = ["streamtablejoin", "streamstreamjoin", "tabletablejoin"]
-NON_STREAMS_TESTS = ["consume", "consumeproduce"]
-
-ALL_TEST = "all"
-STREAMS_SIMPLE_TEST = "streams-simple"
-STREAMS_COUNT_TEST = "streams-count"
-STREAMS_JOIN_TEST = "streams-join"
-
-
-class StreamsSimpleBenchmarkTest(Test):
- """
- Simple benchmark of Kafka Streams.
- """
-
- def __init__(self, test_context):
- super(StreamsSimpleBenchmarkTest, self).__init__(test_context)
-
- # these values could be updated in ad-hoc benchmarks
- self.key_skew = 0
- self.value_size = 1024
- self.num_records = 10000000L
- self.num_threads = 1
-
- self.replication = 1
-
- @cluster(num_nodes=12)
- @matrix(test=["consume", "consumeproduce",
- "streamprocess", "streamprocesswithsink", "streamprocesswithstatestore", "streamprocesswithwindowstore",
- "streamcount", "streamcountwindowed",
- "streamtablejoin", "streamstreamjoin", "tabletablejoin"],
- scale=[1])
- def test_simple_benchmark(self, test, scale):
- """
- Run simple Kafka Streams benchmark
- """
- self.driver = [None] * (scale + 1)
-
- self.final = {}
-
- #############
- # SETUP PHASE
- #############
- self.zk = ZookeeperService(self.test_context, num_nodes=1)
- self.zk.start()
- self.kafka = KafkaService(self.test_context, num_nodes=scale, zk=self.zk, version=DEV_BRANCH, topics={
- 'simpleBenchmarkSourceTopic1' : { 'partitions': scale, 'replication-factor': self.replication },
- 'simpleBenchmarkSourceTopic2' : { 'partitions': scale, 'replication-factor': self.replication },
- 'simpleBenchmarkSinkTopic' : { 'partitions': scale, 'replication-factor': self.replication },
- 'yahooCampaigns' : { 'partitions': 20, 'replication-factor': self.replication },
- 'yahooEvents' : { 'partitions': 20, 'replication-factor': self.replication }
- })
- self.kafka.log_level = "INFO"
- self.kafka.start()
-
-
- load_test = ""
- if test == ALL_TEST:
- load_test = "load-two"
- if test in STREAMS_JOIN_TESTS or test == STREAMS_JOIN_TEST:
- load_test = "load-two"
- if test in STREAMS_COUNT_TESTS or test == STREAMS_COUNT_TEST:
- load_test = "load-one"
- if test in STREAMS_SIMPLE_TESTS or test == STREAMS_SIMPLE_TEST:
- load_test = "load-one"
- if test in NON_STREAMS_TESTS:
- load_test = "load-one"
-
-
-
- ################
- # LOAD PHASE
- ################
- self.load_driver = StreamsSimpleBenchmarkService(self.test_context,
- self.kafka,
- load_test,
- self.num_threads,
- self.num_records,
- self.key_skew,
- self.value_size)
-
- self.load_driver.start()
- self.load_driver.wait(3600) # wait at most 60 minutes
- self.load_driver.stop()
-
- if test == ALL_TEST:
- for single_test in STREAMS_SIMPLE_TESTS + STREAMS_COUNT_TESTS + STREAMS_JOIN_TESTS:
- self.execute(single_test, scale)
- elif test == STREAMS_SIMPLE_TEST:
- for single_test in STREAMS_SIMPLE_TESTS:
- self.execute(single_test, scale)
- elif test == STREAMS_COUNT_TEST:
- for single_test in STREAMS_COUNT_TESTS:
- self.execute(single_test, scale)
- elif test == STREAMS_JOIN_TEST:
- for single_test in STREAMS_JOIN_TESTS:
- self.execute(single_test, scale)
- else:
- self.execute(test, scale)
-
- return self.final
-
- def execute(self, test, scale):
-
- ################
- # RUN PHASE
- ################
- for num in range(0, scale):
- self.driver[num] = StreamsSimpleBenchmarkService(self.test_context,
- self.kafka,
- test,
- self.num_threads,
- self.num_records,
- self.key_skew,
- self.value_size)
- self.driver[num].start()
-
- #######################
- # STOP + COLLECT PHASE
- #######################
- data = [None] * (scale)
-
- for num in range(0, scale):
- self.driver[num].wait()
- self.driver[num].stop()
- self.driver[num].node.account.ssh("grep Performance %s" % self.driver[num].STDOUT_FILE, allow_fail=False)
- data[num] = self.driver[num].collect_data(self.driver[num].node, "")
- self.driver[num].read_jmx_output_all_nodes()
-
- for num in range(0, scale):
- for key in data[num]:
- self.final[key + "-" + str(num)] = data[num][key]
-
- for key in sorted(self.driver[num].jmx_stats[0]):
- self.logger.info("%s: %s" % (key, self.driver[num].jmx_stats[0][key]))
-
- self.final[test + "-jmx-avg-" + str(num)] = self.driver[num].average_jmx_value
- self.final[test + "-jmx-max-" + str(num)] = self.driver[num].maximum_jmx_value
diff --git a/tests/kafkatest/services/performance/streams_performance.py b/tests/kafkatest/services/performance/streams_performance.py
deleted file mode 100644
index 049c272..0000000
--- a/tests/kafkatest/services/performance/streams_performance.py
+++ /dev/null
@@ -1,108 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from kafkatest.services.monitor.jmx import JmxMixin
-from kafkatest.services.streams import StreamsTestBaseService
-from kafkatest.services.kafka import KafkaConfig
-from kafkatest.services import streams_property
-
-#
-# Class used to start the simple Kafka Streams benchmark
-#
-
-class StreamsSimpleBenchmarkService(StreamsTestBaseService):
- """Base class for simple Kafka Streams benchmark"""
-
- def __init__(self, test_context, kafka, test_name, num_threads, num_recs_or_wait_ms, key_skew, value_size):
- super(StreamsSimpleBenchmarkService, self).__init__(test_context,
- kafka,
- "org.apache.kafka.streams.perf.SimpleBenchmark",
- test_name,
- num_recs_or_wait_ms,
- key_skew,
- value_size)
-
- self.jmx_option = ""
- if test_name.startswith('stream') or test_name.startswith('table'):
- self.jmx_option = "stream-jmx"
- JmxMixin.__init__(self,
- num_nodes=1,
- jmx_object_names=['kafka.streams:type=stream-thread-metrics,thread-id=simple-benchmark-StreamThread-%d' %(i+1) for i in range(num_threads)],
- jmx_attributes=['process-latency-avg',
- 'process-rate',
- 'commit-latency-avg',
- 'commit-rate',
- 'poll-latency-avg',
- 'poll-rate'],
- root=StreamsTestBaseService.PERSISTENT_ROOT)
-
- if test_name.startswith('consume'):
- self.jmx_option = "consumer-jmx"
- JmxMixin.__init__(self,
- num_nodes=1,
- jmx_object_names=['kafka.consumer:type=consumer-fetch-manager-metrics,client-id=simple-benchmark-consumer'],
- jmx_attributes=['records-consumed-rate'],
- root=StreamsTestBaseService.PERSISTENT_ROOT)
-
- self.num_threads = num_threads
-
- def prop_file(self):
- cfg = KafkaConfig(**{streams_property.STATE_DIR: self.PERSISTENT_ROOT,
- streams_property.KAFKA_SERVERS: self.kafka.bootstrap_servers(),
- streams_property.NUM_THREADS: self.num_threads})
- return cfg.render()
-
-
- def start_cmd(self, node):
- if self.jmx_option != "":
- args = self.args.copy()
- args['jmx_port'] = self.jmx_port
- args['config_file'] = self.CONFIG_FILE
- args['stdout'] = self.STDOUT_FILE
- args['stderr'] = self.STDERR_FILE
- args['pidfile'] = self.PID_FILE
- args['log4j'] = self.LOG4J_CONFIG_FILE
- args['kafka_run_class'] = self.path.script("kafka-run-class.sh", node)
-
- cmd = "( export JMX_PORT=%(jmx_port)s; export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \
- "INCLUDE_TEST_JARS=true %(kafka_run_class)s %(streams_class_name)s " \
- " %(config_file)s %(user_test_args1)s %(user_test_args2)s %(user_test_args3)s" \
- " %(user_test_args4)s & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args
-
- else:
- cmd = super(StreamsSimpleBenchmarkService, self).start_cmd(node)
-
- return cmd
-
- def start_node(self, node):
- super(StreamsSimpleBenchmarkService, self).start_node(node)
-
- if self.jmx_option != "":
- self.start_jmx_tool(1, node)
-
- def clean_node(self, node):
- if self.jmx_option != "":
- JmxMixin.clean_node(self, node)
-
- super(StreamsSimpleBenchmarkService, self).clean_node(node)
-
- def collect_data(self, node, tag=""):
- # Collect the data and return it to the framework
- output = node.account.ssh_capture("grep Performance %s" % self.STDOUT_FILE)
- data = {}
- for line in output:
- parts = line.split(':')
- data[tag + parts[0]] = parts[1]
- return data
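Note: the "Performance" lines that collect_data() greps from stdout come from SimpleBenchmark.printResults(), which prints records, wall-clock latency in milliseconds, and the two derived rates. A runnable sketch of that arithmetic, with hypothetical sample numbers:

// Sketch of the rate math behind the "records/latency/rec-sec/MB-sec" output
// that the service's collect_data() parses; latency is wall-clock milliseconds.
public final class ThroughputSketch {
    static double recordsPerSec(final long timeMs, final int numRecords) {
        return numRecords / (timeMs / 1000.0);
    }

    static double megabytesPerSec(final long timeMs, final long processedBytes) {
        return (processedBytes / 1024.0 / 1024.0) / (timeMs / 1000.0);
    }

    public static void main(final String[] args) {
        // Hypothetical run: 10M records of ~1 KiB each processed in 60 seconds.
        final int records = 10_000_000;
        final long bytes = 10_000_000L * 1024L;
        System.out.printf("%.1f rec/s, %.1f MB/s%n",
            recordsPerSec(60_000L, records),
            megabytesPerSec(60_000L, bytes));
    }
}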