You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/04/19 23:14:36 UTC
[kafka] branch trunk updated: MINOR: add window store range query
in simple benchmark (#4894)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1f523d9 MINOR: add window store range query in simple benchmark (#4894)
1f523d9 is described below
commit 1f523d9d72ad3008664146e1d2a5b40861b06072
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Thu Apr 19 16:14:32 2018 -0700
MINOR: add window store range query in simple benchmark (#4894)
There are a couple of minor additions in this PR:
1. Add a new test for the window store, which performs a range query upon receiving each record.
2. In the non-windowed state store case, add a get call before the put call.
3. Enable caching by default to be consistent with other Join / Aggregate cases, where caching is enabled by default.
Reviewers: Bill Bejeck <bi...@confluent.io>, Matthias J. Sax <ma...@confluent.io>
---
.../apache/kafka/streams/perf/SimpleBenchmark.java | 58 +++++++++++++++++++++-
.../streams/streams_simple_benchmark_test.py | 4 +-
2 files changed, 59 insertions(+), 3 deletions(-)
diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
index 9dd97c6..d956f27 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
@@ -42,13 +42,16 @@ import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.WindowStore;
import java.io.IOException;
import java.util.ArrayList;
@@ -187,6 +190,9 @@ public class SimpleBenchmark {
case "streamprocesswithstatestore":
processStreamWithStateStore(SOURCE_TOPIC_ONE);
break;
+ case "streamprocesswithwindowstore":
+ processStreamWithWindowStore(SOURCE_TOPIC_ONE);
+ break;
case "streamtablejoin":
streamTableJoin(SOURCE_TOPIC_ONE, SOURCE_TOPIC_TWO);
break;
@@ -426,7 +432,7 @@ public class SimpleBenchmark {
final StreamsBuilder builder = new StreamsBuilder();
final StoreBuilder<KeyValueStore<Integer, byte[]>> storeBuilder
= Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("store"), INTEGER_SERDE, BYTE_SERDE);
- builder.addStateStore(storeBuilder);
+ builder.addStateStore(storeBuilder.withCachingEnabled());
final KStream<Integer, byte[]> source = builder.stream(topic);
@@ -445,6 +451,56 @@ public class SimpleBenchmark {
@Override
public void process(final Integer key, final byte[] value) {
+ store.get(key);
+ store.put(key, value);
+ }
+ };
+ }
+ }, "store");
+
+ final KafkaStreams streams = createKafkaStreamsWithExceptionHandler(builder, props);
+ runGenericBenchmark(streams, "Streams Stateful Performance [records/latency/rec-sec/MB-sec joined]: ", latch);
+ }
+
+ private void processStreamWithWindowStore(final String topic) {
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ setStreamProperties("simple-benchmark-streams-with-store");
+
+ final StreamsBuilder builder = new StreamsBuilder();
+ final StoreBuilder<WindowStore<Integer, byte[]>> storeBuilder
+ = Stores.windowStoreBuilder(Stores.persistentWindowStore("store",
+ AGGREGATE_WINDOW_SIZE * 3,
+ 3,
+ AGGREGATE_WINDOW_SIZE,
+ false),
+ INTEGER_SERDE, BYTE_SERDE);
+ builder.addStateStore(storeBuilder.withCachingEnabled());
+
+ final KStream<Integer, byte[]> source = builder.stream(topic);
+
+ source.peek(new CountDownAction(latch)).process(new ProcessorSupplier<Integer, byte[]>() {
+ @Override
+ public Processor<Integer, byte[]> get() {
+ return new AbstractProcessor<Integer, byte[]>() {
+ WindowStore<Integer, byte[]> store;
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void init(final ProcessorContext context) {
+ super.init(context);
+ store = (WindowStore<Integer, byte[]>) context.getStateStore("store");
+ }
+
+ @Override
+ public void process(final Integer key, final byte[] value) {
+ final long timestamp = context().timestamp();
+ final KeyValueIterator<Windowed<Integer>, byte[]> iter = store.fetch(key - 10, key + 10, timestamp - 1000L, timestamp + 1000L);
+ while (iter.hasNext()) {
+ iter.next();
+ }
+ iter.close();
+
store.put(key, value);
}
};
diff --git a/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py b/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
index 0f1d33f..6e6d676 100644
--- a/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
+++ b/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
@@ -23,8 +23,8 @@ from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService
from kafkatest.version import DEV_BRANCH
-STREAMS_SIMPLE_TESTS = ["streamprocess", "streamprocesswithsink", "streamprocesswithstatestore"]
-STREAMS_COUNT_TESTS = ["streamcount", "streamcountwindowed", "streamprocesswithstatestore"]
+STREAMS_SIMPLE_TESTS = ["streamprocess", "streamprocesswithsink", "streamprocesswithstatestore", "streamprocesswithwindowstore"]
+STREAMS_COUNT_TESTS = ["streamcount", "streamcountwindowed"]
STREAMS_JOIN_TESTS = ["streamtablejoin", "streamstreamjoin", "tabletablejoin"]
NON_STREAMS_TESTS = ["consume", "consumeproduce"]
--
To stop receiving notification emails like this one, please contact
guozhang@apache.org.