You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/12/03 06:13:37 UTC
[kafka] branch 2.1 updated: MINOR: improve
QueryableStateIntegrationTest (#5987)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.1 by this push:
new 975aeda MINOR: improve QueryableStateIntegrationTest (#5987)
975aeda is described below
commit 975aeda906218b95aaa8ab8a1ea130f6980df21b
Author: Matthias J. Sax <mj...@apache.org>
AuthorDate: Sun Dec 2 22:11:04 2018 -0800
MINOR: improve QueryableStateIntegrationTest (#5987)
Fix test Comparators plus Java8 cleanup
Reviewers: Guozhang Wang <wa...@gmail.com>
---
.../integration/QueryableStateIntegrationTest.java | 321 ++++++++-------------
1 file changed, 114 insertions(+), 207 deletions(-)
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index 76eec71..06014a1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -28,7 +28,6 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KafkaStreamsTest;
import org.apache.kafka.streams.KeyValue;
@@ -37,24 +36,21 @@ import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.Produced;
-import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;
-import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.StreamsMetadata;
-import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockMapper;
@@ -71,7 +67,6 @@ import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -80,6 +75,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;
@@ -128,7 +124,7 @@ public class QueryableStateIntegrationTest {
private Comparator<KeyValue<String, Long>> stringLongComparator;
private static int testNo = 0;
- private void createTopics() throws InterruptedException {
+ private void createTopics() throws Exception {
streamOne = streamOne + "-" + testNo;
streamConcurrent = streamConcurrent + "-" + testNo;
streamThree = streamThree + "-" + testNo;
@@ -152,7 +148,9 @@ public class QueryableStateIntegrationTest {
List<String> input = new ArrayList<>();
final ClassLoader classLoader = getClass().getClassLoader();
final String fileName = "QueryableStateIntegrationTest" + File.separator + "inputValues.txt";
- try (final BufferedReader reader = new BufferedReader(new FileReader(classLoader.getResource(fileName).getFile()))) {
+ try (final BufferedReader reader = new BufferedReader(
+ new FileReader(Objects.requireNonNull(classLoader.getResource(fileName)).getFile()))) {
+
for (String line = reader.readLine(); line != null; line = reader.readLine()) {
input.add(line);
}
@@ -194,23 +192,8 @@ public class QueryableStateIntegrationTest {
// override this to make the rebalances happen quickly
streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
-
- stringComparator = new Comparator<KeyValue<String, String>>() {
-
- @Override
- public int compare(final KeyValue<String, String> o1,
- final KeyValue<String, String> o2) {
- return o1.key.compareTo(o2.key);
- }
- };
- stringLongComparator = new Comparator<KeyValue<String, Long>>() {
-
- @Override
- public int compare(final KeyValue<String, Long> o1,
- final KeyValue<String, Long> o2) {
- return o1.key.compareTo(o2.key);
- }
- };
+ stringComparator = Comparator.comparing((KeyValue<String, String> o) -> o.key).thenComparing(o -> o.value);
+ stringLongComparator = Comparator.comparing((KeyValue<String, Long> o) -> o.key).thenComparingLong(o -> o.value);
inputValues = getInputValues();
inputValuesKeys = new HashSet<>();
for (final String sentence : inputValues) {
@@ -221,14 +204,13 @@ public class QueryableStateIntegrationTest {
}
@After
- public void shutdown() throws IOException {
+ public void shutdown() throws Exception {
if (kafkaStreams != null) {
kafkaStreams.close(ofSeconds(30));
}
IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
}
-
/**
* Creates a typical word count topology
*/
@@ -243,30 +225,20 @@ public class QueryableStateIntegrationTest {
final KStream<String, String> textLines = builder.stream(inputTopic, Consumed.with(stringSerde, stringSerde));
final KGroupedStream<String, String> groupedByWord = textLines
- .flatMapValues(new ValueMapper<String, Iterable<String>>() {
- @Override
- public Iterable<String> apply(final String value) {
- return Arrays.asList(value.split("\\W+"));
- }
- })
- .groupBy(MockMapper.<String, String>selectValueMapper());
+ .flatMapValues((ValueMapper<String, Iterable<String>>) value -> Arrays.asList(value.split("\\W+")))
+ .groupBy(MockMapper.selectValueMapper());
// Create a State Store for the all time word count
groupedByWord
- .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as(storeName + "-" + inputTopic))
+ .count(Materialized.as(storeName + "-" + inputTopic))
.toStream()
.to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
// Create a Windowed State Store that contains the word count for every 1 minute
groupedByWord
.windowedBy(TimeWindows.of(ofMillis(WINDOW_SIZE)))
- .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as(windowStoreName + "-" + inputTopic))
- .toStream(new KeyValueMapper<Windowed<String>, Long, String>() {
- @Override
- public String apply(final Windowed<String> key, final Long value) {
- return key.key();
- }
- })
+ .count(Materialized.as(windowStoreName + "-" + inputTopic))
+ .toStream((key, value) -> key.key())
.to(windowOutputTopic, Produced.with(Serdes.String(), Serdes.Long()));
return new KafkaStreams(builder.build(), streamsConfiguration);
@@ -292,7 +264,6 @@ public class QueryableStateIntegrationTest {
@Override
public void run() {
myStream.start();
-
}
public void close() {
@@ -315,72 +286,68 @@ public class QueryableStateIntegrationTest {
}
}
- private void verifyAllKVKeys(final StreamRunnable[] streamRunnables, final KafkaStreams streams,
+ private void verifyAllKVKeys(final StreamRunnable[] streamRunnables,
+ final KafkaStreams streams,
final KafkaStreamsTest.StateListenerStub stateListenerStub,
- final Set<String> keys, final String storeName) throws InterruptedException {
+ final Set<String> keys,
+ final String storeName) throws Exception {
for (final String key : keys) {
- TestUtils.waitForCondition(new TestCondition() {
- @Override
- public boolean conditionMet() {
- try {
- final StreamsMetadata metadata = streams.metadataForKey(storeName, key, new StringSerializer());
-
- if (metadata == null || metadata.equals(StreamsMetadata.NOT_AVAILABLE)) {
- return false;
- }
- final int index = metadata.hostInfo().port();
- final KafkaStreams streamsWithKey = streamRunnables[index].getStream();
- final ReadOnlyKeyValueStore<String, Long> store = streamsWithKey.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
-
- return store != null && store.get(key) != null;
- } catch (final IllegalStateException e) {
- // Kafka Streams instance may have closed but rebalance hasn't happened
- return false;
- } catch (final InvalidStateStoreException e) {
- // there must have been at least one rebalance state
- assertTrue(stateListenerStub.mapStates.get(KafkaStreams.State.REBALANCING) >= 1);
+ TestUtils.waitForCondition(() -> {
+ try {
+ final StreamsMetadata metadata = streams.metadataForKey(storeName, key, new StringSerializer());
+
+ if (metadata == null || metadata.equals(StreamsMetadata.NOT_AVAILABLE)) {
return false;
}
+ final int index = metadata.hostInfo().port();
+ final KafkaStreams streamsWithKey = streamRunnables[index].getStream();
+ final ReadOnlyKeyValueStore<String, Long> store = streamsWithKey.store(storeName, QueryableStoreTypes.keyValueStore());
+ return store != null && store.get(key) != null;
+ } catch (final IllegalStateException e) {
+ // Kafka Streams instance may have closed but rebalance hasn't happened
+ return false;
+ } catch (final InvalidStateStoreException e) {
+ // there must have been at least one rebalance state
+ assertTrue(stateListenerStub.mapStates.get(KafkaStreams.State.REBALANCING) >= 1);
+ return false;
}
}, 120000, "waiting for metadata, store and value to be non null");
}
}
-
- private void verifyAllWindowedKeys(final StreamRunnable[] streamRunnables, final KafkaStreams streams,
+ private void verifyAllWindowedKeys(final StreamRunnable[] streamRunnables,
+ final KafkaStreams streams,
final KafkaStreamsTest.StateListenerStub stateListenerStub,
- final Set<String> keys, final String storeName,
- final Long from, final Long to) throws InterruptedException {
+ final Set<String> keys,
+ final String storeName,
+ final Long from,
+ final Long to) throws Exception {
for (final String key : keys) {
- TestUtils.waitForCondition(new TestCondition() {
- @Override
- public boolean conditionMet() {
- try {
- final StreamsMetadata metadata = streams.metadataForKey(storeName, key, new StringSerializer());
- if (metadata == null || metadata.equals(StreamsMetadata.NOT_AVAILABLE)) {
- return false;
- }
- final int index = metadata.hostInfo().port();
- final KafkaStreams streamsWithKey = streamRunnables[index].getStream();
- final ReadOnlyWindowStore<String, Long> store = streamsWithKey.store(storeName, QueryableStoreTypes.<String, Long>windowStore());
- return store != null && store.fetch(key, ofEpochMilli(from), ofEpochMilli(to)) != null;
- } catch (final IllegalStateException e) {
- // Kafka Streams instance may have closed but rebalance hasn't happened
- return false;
- } catch (final InvalidStateStoreException e) {
- // there must have been at least one rebalance state
- assertTrue(stateListenerStub.mapStates.get(KafkaStreams.State.REBALANCING) >= 1);
+ TestUtils.waitForCondition(() -> {
+ try {
+ final StreamsMetadata metadata = streams.metadataForKey(storeName, key, new StringSerializer());
+ if (metadata == null || metadata.equals(StreamsMetadata.NOT_AVAILABLE)) {
return false;
}
-
+ final int index = metadata.hostInfo().port();
+ final KafkaStreams streamsWithKey = streamRunnables[index].getStream();
+ final ReadOnlyWindowStore<String, Long> store = streamsWithKey.store(storeName, QueryableStoreTypes.windowStore());
+ return store != null && store.fetch(key, ofEpochMilli(from), ofEpochMilli(to)) != null;
+ } catch (final IllegalStateException e) {
+ // Kafka Streams instance may have closed but rebalance hasn't happened
+ return false;
+ } catch (final InvalidStateStoreException e) {
+ // there must have been at least one rebalance state
+ assertTrue(stateListenerStub.mapStates.get(KafkaStreams.State.REBALANCING) >= 1);
+ return false;
}
}, 120000, "waiting for metadata, store and value to be non null");
}
}
@Test
- public void queryOnRebalance() throws InterruptedException {
+ public void queryOnRebalance() throws Exception {
final int numThreads = STREAM_TWO_PARTITIONS;
final StreamRunnable[] streamRunnables = new StreamRunnable[numThreads];
final Thread[] streamThreads = new Thread[numThreads];
@@ -388,7 +355,6 @@ public class QueryableStateIntegrationTest {
final ProducerRunnable producerRunnable = new ProducerRunnable(streamThree, inputValues, 1);
producerRunnable.run();
-
// create stream threads
final String storeName = "word-count-store";
final String windowStoreName = "windowed-word-count-store";
@@ -434,7 +400,7 @@ public class QueryableStateIntegrationTest {
}
@Test
- public void concurrentAccesses() throws InterruptedException {
+ public void concurrentAccesses() throws Exception {
final int numIterations = 500000;
final String storeName = "word-count-store";
final String windowStoreName = "windowed-word-count-store";
@@ -451,16 +417,15 @@ public class QueryableStateIntegrationTest {
waitUntilAtLeastNumRecordProcessed(outputTopicConcurrentWindowed, numberOfWordsPerIteration);
final ReadOnlyKeyValueStore<String, Long>
- keyValueStore = kafkaStreams.store(storeName + "-" + streamConcurrent, QueryableStoreTypes.<String, Long>keyValueStore());
+ keyValueStore = kafkaStreams.store(storeName + "-" + streamConcurrent, QueryableStoreTypes.keyValueStore());
final ReadOnlyWindowStore<String, Long> windowStore =
- kafkaStreams.store(windowStoreName + "-" + streamConcurrent, QueryableStoreTypes.<String, Long>windowStore());
-
+ kafkaStreams.store(windowStoreName + "-" + streamConcurrent, QueryableStoreTypes.windowStore());
final Map<String, Long> expectedWindowState = new HashMap<>();
final Map<String, Long> expectedCount = new HashMap<>();
while (producerRunnable.getCurrIteration() < numIterations) {
- verifyGreaterOrEqual(inputValuesKeys.toArray(new String[inputValuesKeys.size()]), expectedWindowState,
+ verifyGreaterOrEqual(inputValuesKeys.toArray(new String[0]), expectedWindowState,
expectedCount, windowStore, keyValueStore, true);
}
} finally {
@@ -505,15 +470,10 @@ public class QueryableStateIntegrationTest {
LongSerializer.class,
new Properties()),
mockTime);
- final Predicate<String, Long> filterPredicate = new Predicate<String, Long>() {
- @Override
- public boolean test(final String key, final Long value) {
- return key.contains("kafka");
- }
- };
+ final Predicate<String, Long> filterPredicate = (key, value) -> key.contains("kafka");
final KTable<String, Long> t1 = builder.table(streamOne);
- final KTable<String, Long> t2 = t1.filter(filterPredicate, Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("queryFilter"));
- t1.filterNot(filterPredicate, Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("queryFilterNot"));
+ final KTable<String, Long> t2 = t1.filter(filterPredicate, Materialized.as("queryFilter"));
+ t1.filterNot(filterPredicate, Materialized.as("queryFilterNot"));
t2.toStream().to(outputTopic);
kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
@@ -522,9 +482,9 @@ public class QueryableStateIntegrationTest {
waitUntilAtLeastNumRecordProcessed(outputTopic, 2);
final ReadOnlyKeyValueStore<String, Long>
- myFilterStore = kafkaStreams.store("queryFilter", QueryableStoreTypes.<String, Long>keyValueStore());
+ myFilterStore = kafkaStreams.store("queryFilter", QueryableStoreTypes.keyValueStore());
final ReadOnlyKeyValueStore<String, Long>
- myFilterNotStore = kafkaStreams.store("queryFilterNot", QueryableStoreTypes.<String, Long>keyValueStore());
+ myFilterNotStore = kafkaStreams.store("queryFilterNot", QueryableStoreTypes.keyValueStore());
for (final KeyValue<String, Long> expectedEntry : expectedBatch1) {
TestUtils.waitForCondition(() -> expectedEntry.value.equals(myFilterStore.get(expectedEntry.key)),
@@ -575,12 +535,7 @@ public class QueryableStateIntegrationTest {
mockTime);
final KTable<String, String> t1 = builder.table(streamOne);
- t1.mapValues(new ValueMapper<String, Long>() {
- @Override
- public Long apply(final String value) {
- return Long.valueOf(value);
- }
- }, Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("queryMapValues").withValueSerde(Serdes.Long()))
+ t1.mapValues((ValueMapper<String, Long>) Long::valueOf, Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("queryMapValues").withValueSerde(Serdes.Long()))
.toStream()
.to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
@@ -591,7 +546,7 @@ public class QueryableStateIntegrationTest {
final ReadOnlyKeyValueStore<String, Long>
myMapStore = kafkaStreams.store("queryMapValues",
- QueryableStoreTypes.<String, Long>keyValueStore());
+ QueryableStoreTypes.keyValueStore());
for (final KeyValue<String, String> batchEntry : batch1) {
assertEquals(Long.valueOf(batchEntry.value), myMapStore.get(batchEntry.key));
}
@@ -623,20 +578,10 @@ public class QueryableStateIntegrationTest {
new Properties()),
mockTime);
- final Predicate<String, String> filterPredicate = new Predicate<String, String>() {
- @Override
- public boolean test(final String key, final String value) {
- return key.contains("kafka");
- }
- };
+ final Predicate<String, String> filterPredicate = (key, value) -> key.contains("kafka");
final KTable<String, String> t1 = builder.table(streamOne);
- final KTable<String, String> t2 = t1.filter(filterPredicate, Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("queryFilter"));
- final KTable<String, Long> t3 = t2.mapValues(new ValueMapper<String, Long>() {
- @Override
- public Long apply(final String value) {
- return Long.valueOf(value);
- }
- }, Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("queryMapValues").withValueSerde(Serdes.Long()));
+ final KTable<String, String> t2 = t1.filter(filterPredicate, Materialized.as("queryFilter"));
+ final KTable<String, Long> t3 = t2.mapValues((ValueMapper<String, Long>) Long::valueOf, Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("queryMapValues").withValueSerde(Serdes.Long()));
t3.toStream().to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
@@ -646,7 +591,7 @@ public class QueryableStateIntegrationTest {
final ReadOnlyKeyValueStore<String, Long>
myMapStore = kafkaStreams.store("queryMapValues",
- QueryableStoreTypes.<String, Long>keyValueStore());
+ QueryableStoreTypes.keyValueStore());
for (final KeyValue<String, Long> expectedEntry : expectedBatch1) {
assertEquals(myMapStore.get(expectedEntry.key), expectedEntry.value);
}
@@ -671,7 +616,6 @@ public class QueryableStateIntegrationTest {
new KeyValue<>(keys[3], "go"),
new KeyValue<>(keys[4], "kafka")));
-
final Set<KeyValue<String, Long>> expectedCount = new TreeSet<>(stringLongComparator);
for (final String key : keys) {
expectedCount.add(new KeyValue<>(key, 1L));
@@ -692,24 +636,24 @@ public class QueryableStateIntegrationTest {
// Non Windowed
final String storeName = "my-count";
s1.groupByKey()
- .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as(storeName))
+ .count(Materialized.as(storeName))
.toStream()
.to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
final String windowStoreName = "windowed-count";
s1.groupByKey()
.windowedBy(TimeWindows.of(ofMillis(WINDOW_SIZE)))
- .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as(windowStoreName));
+ .count(Materialized.as(windowStoreName));
kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
kafkaStreams.start();
waitUntilAtLeastNumRecordProcessed(outputTopic, 1);
final ReadOnlyKeyValueStore<String, Long>
- myCount = kafkaStreams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+ myCount = kafkaStreams.store(storeName, QueryableStoreTypes.keyValueStore());
final ReadOnlyWindowStore<String, Long> windowStore =
- kafkaStreams.store(windowStoreName, QueryableStoreTypes.<String, Long>windowStore());
+ kafkaStreams.store(windowStoreName, QueryableStoreTypes.windowStore());
verifyCanGetByKey(keys,
expectedCount,
expectedCount,
@@ -725,7 +669,7 @@ public class QueryableStateIntegrationTest {
final KStream<String, String> stream = builder.stream(streamThree);
final String storeName = "count-by-key";
- stream.groupByKey().count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as(storeName));
+ stream.groupByKey().count(Materialized.as(storeName));
kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
kafkaStreams.start();
@@ -743,14 +687,9 @@ public class QueryableStateIntegrationTest {
final int maxWaitMs = 30000;
TestUtils.waitForCondition(new WaitForStore(storeName), maxWaitMs, "waiting for store " + storeName);
- final ReadOnlyKeyValueStore<String, Long> store = kafkaStreams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+ final ReadOnlyKeyValueStore<String, Long> store = kafkaStreams.store(storeName, QueryableStoreTypes.keyValueStore());
- TestUtils.waitForCondition(new TestCondition() {
- @Override
- public boolean conditionMet() {
- return new Long(8).equals(store.get("hello"));
- }
- }, maxWaitMs, "wait for count to be 8");
+ TestUtils.waitForCondition(() -> new Long(8).equals(store.get("hello")), maxWaitMs, "wait for count to be 8");
// close stream
kafkaStreams.close();
@@ -760,15 +699,12 @@ public class QueryableStateIntegrationTest {
kafkaStreams.start();
// make sure we never get any value other than 8 for hello
- TestUtils.waitForCondition(new TestCondition() {
- @Override
- public boolean conditionMet() {
- try {
- assertEquals(Long.valueOf(8L), kafkaStreams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore()).get("hello"));
- return true;
- } catch (final InvalidStateStoreException ise) {
- return false;
- }
+ TestUtils.waitForCondition(() -> {
+ try {
+ assertEquals(Long.valueOf(8L), kafkaStreams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore()).get("hello"));
+ return true;
+ } catch (final InvalidStateStoreException ise) {
+ return false;
}
}, maxWaitMs, "waiting for store " + storeName);
@@ -780,6 +716,7 @@ public class QueryableStateIntegrationTest {
WaitForStore(final String storeName) {
this.storeName = storeName;
}
+
@Override
public boolean conditionMet() {
try {
@@ -801,28 +738,20 @@ public class QueryableStateIntegrationTest {
final KStream<String, String> input = builder.stream(streamOne);
input
.groupByKey()
- .reduce(new Reducer<String>() {
- @Override
- public String apply(final String value1, final String value2) {
- if (value1.length() > 1) {
- if (beforeFailure.compareAndSet(true, false)) {
- throw new RuntimeException("Injected test exception");
- }
+ .reduce((value1, value2) -> {
+ if (value1.length() > 1) {
+ if (beforeFailure.compareAndSet(true, false)) {
+ throw new RuntimeException("Injected test exception");
}
- return value1 + value2;
}
- }, Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(storeName))
+ return value1 + value2;
+ }, Materialized.as(storeName))
.toStream()
.to(outputTopic);
streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
- kafkaStreams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
- @Override
- public void uncaughtException(final Thread t, final Throwable e) {
- failed.set(true);
- }
- });
+ kafkaStreams.setUncaughtExceptionHandler((t, e) -> failed.set(true));
kafkaStreams.start();
IntegrationTestUtils.produceKeyValuesSynchronously(
@@ -838,14 +767,12 @@ public class QueryableStateIntegrationTest {
final int maxWaitMs = 30000;
TestUtils.waitForCondition(new WaitForStore(storeName), maxWaitMs, "waiting for store " + storeName);
- final ReadOnlyKeyValueStore<String, String> store = kafkaStreams.store(storeName, QueryableStoreTypes.<String, String>keyValueStore());
+ final ReadOnlyKeyValueStore<String, String> store = kafkaStreams.store(storeName, QueryableStoreTypes.keyValueStore());
- TestUtils.waitForCondition(new TestCondition() {
- @Override
- public boolean conditionMet() {
- return "12".equals(store.get("a")) && "34".equals(store.get("b"));
- }
- }, maxWaitMs, "wait for agg to be <a,12> and <b,34>");
+ TestUtils.waitForCondition(
+ () -> "12".equals(store.get("a")) && "34".equals(store.get("b")),
+ maxWaitMs,
+ "wait for agg to be <a,12> and <b,34>");
IntegrationTestUtils.produceKeyValuesSynchronously(
streamOne,
@@ -857,44 +784,31 @@ public class QueryableStateIntegrationTest {
new Properties()),
mockTime);
- TestUtils.waitForCondition(new TestCondition() {
- @Override
- public boolean conditionMet() {
- return failed.get();
- }
- }, 30000, "wait for thread to fail");
+ TestUtils.waitForCondition(failed::get, 30000, "wait for thread to fail");
TestUtils.waitForCondition(new WaitForStore(storeName), maxWaitMs, "waiting for store " + storeName);
- final ReadOnlyKeyValueStore<String, String> store2 = kafkaStreams.store(storeName, QueryableStoreTypes.<String, String>keyValueStore());
+ final ReadOnlyKeyValueStore<String, String> store2 = kafkaStreams.store(storeName, QueryableStoreTypes.keyValueStore());
try {
- TestUtils.waitForCondition(new TestCondition() {
- @Override
- public boolean conditionMet() {
- return
- ("125".equals(store2.get("a"))
- || "1225".equals(store2.get("a"))
- || "12125".equals(store2.get("a")))
- &&
- ("34".equals(store2.get("b"))
- || "344".equals(store2.get("b"))
- || "3434".equals(store2.get("b")));
- }
- }, maxWaitMs, "wait for agg to be <a,125>||<a,1225>||<a,12125> and <b,34>||<b,344>||<b,3434>");
+ TestUtils.waitForCondition(() ->
+ ("125".equals(store2.get("a"))
+ || "1225".equals(store2.get("a"))
+ || "12125".equals(store2.get("a")))
+ &&
+ ("34".equals(store2.get("b"))
+ || "344".equals(store2.get("b"))
+ || "3434".equals(store2.get("b"))), maxWaitMs, "wait for agg to be <a,125>||<a,1225>||<a,12125> and <b,34>||<b,344>||<b,3434>");
} catch (final Throwable t) {
throw new RuntimeException("Store content is a: " + store2.get("a") + "; b: " + store2.get("b"), t);
}
-
}
private void verifyRangeAndAll(final Set<KeyValue<String, Long>> expectedCount,
final ReadOnlyKeyValueStore<String, Long> myCount) {
final Set<KeyValue<String, Long>> countRangeResults = new TreeSet<>(stringLongComparator);
final Set<KeyValue<String, Long>> countAllResults = new TreeSet<>(stringLongComparator);
- final Set<KeyValue<String, Long>>
- expectedRangeResults =
- new TreeSet<>(stringLongComparator);
+ final Set<KeyValue<String, Long>> expectedRangeResults = new TreeSet<>(stringLongComparator);
expectedRangeResults.addAll(Arrays.asList(
new KeyValue<>("hello", 1L),
@@ -923,8 +837,7 @@ public class QueryableStateIntegrationTest {
final Set<KeyValue<String, Long>> expectedWindowState,
final Set<KeyValue<String, Long>> expectedCount,
final ReadOnlyWindowStore<String, Long> windowStore,
- final ReadOnlyKeyValueStore<String, Long> myCount)
- throws InterruptedException {
+ final ReadOnlyKeyValueStore<String, Long> myCount) throws Exception {
final Set<KeyValue<String, Long>> windowState = new TreeSet<>(stringLongComparator);
final Set<KeyValue<String, Long>> countState = new TreeSet<>(stringLongComparator);
@@ -1001,15 +914,13 @@ public class QueryableStateIntegrationTest {
}
- private void waitUntilAtLeastNumRecordProcessed(final String topic, final int numRecs) throws InterruptedException {
+ private void waitUntilAtLeastNumRecordProcessed(final String topic, final int numRecs) throws Exception {
final Properties config = new Properties();
config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "queryable-state-consumer");
config.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- config.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
- StringDeserializer.class.getName());
- config.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
- LongDeserializer.class.getName());
+ config.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ config.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
IntegrationTestUtils.waitUntilMinValuesRecordsReceived(
config,
topic,
@@ -1019,7 +930,6 @@ public class QueryableStateIntegrationTest {
private Set<KeyValue<String, Long>> fetch(final ReadOnlyWindowStore<String, Long> store,
final String key) {
-
final WindowStoreIterator<Long> fetch = store.fetch(key, ofEpochMilli(0), ofEpochMilli(System.currentTimeMillis()));
if (fetch.hasNext()) {
final KeyValue<Long, Long> next = fetch.next();
@@ -1030,7 +940,6 @@ public class QueryableStateIntegrationTest {
private Map<String, Long> fetchMap(final ReadOnlyWindowStore<String, Long> store,
final String key) {
-
final WindowStoreIterator<Long> fetch = store.fetch(key, ofEpochMilli(0), ofEpochMilli(System.currentTimeMillis()));
if (fetch.hasNext()) {
final KeyValue<Long, Long> next = fetch.next();
@@ -1039,7 +948,6 @@ public class QueryableStateIntegrationTest {
return Collections.emptyMap();
}
-
/**
* A class that periodically produces records in a separate thread
*/
@@ -1077,11 +985,11 @@ public class QueryableStateIntegrationTest {
producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
try (final KafkaProducer<String, String> producer =
- new KafkaProducer<>(producerConfig, new StringSerializer(), new StringSerializer())) {
+ new KafkaProducer<>(producerConfig, new StringSerializer(), new StringSerializer())) {
while (getCurrIteration() < numIterations && !shutdown) {
for (final String value : inputValues) {
- producer.send(new ProducerRecord<String, String>(topic, value));
+ producer.send(new ProducerRecord<>(topic, value));
}
incrementIteration();
}
@@ -1089,5 +997,4 @@ public class QueryableStateIntegrationTest {
}
}
-
}