You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2022/01/05 21:48:10 UTC
[kafka] 01/01: MINOR: Add PAPI Window and Session store tests for IQv2
This is an automated email from the ASF dual-hosted git repository.
vvcephei pushed a commit to branch iqv2-add-window-session-papi-tests
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 99d36b5e1116c5a20e4e8b59e17a15f4f3eff8bf
Author: John Roesler <vv...@apache.org>
AuthorDate: Wed Jan 5 15:46:45 2022 -0600
MINOR: Add PAPI Window and Session store tests for IQv2
---
.../integration/IQv2StoreIntegrationTest.java | 128 +++++++++++++++++++--
1 file changed, 121 insertions(+), 7 deletions(-)
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
index ed6efaf..718ad15 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
@@ -36,6 +36,7 @@ import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
@@ -59,6 +60,7 @@ import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.StoreSupplier;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowStore;
@@ -452,11 +454,11 @@ public class IQv2StoreIntegrationTest {
} else if (Objects.equals(kind, "DSL") && supplier instanceof WindowBytesStoreSupplier) {
setUpWindowDSLTopology((WindowBytesStoreSupplier) supplier, builder);
} else if (Objects.equals(kind, "PAPI") && supplier instanceof WindowBytesStoreSupplier) {
- throw new AssumptionViolatedException("Case not implemented yet");
+ setUpWindowPAPITopology((WindowBytesStoreSupplier) supplier, builder);
} else if (Objects.equals(kind, "DSL") && supplier instanceof SessionBytesStoreSupplier) {
setUpSessionDSLTopology((SessionBytesStoreSupplier) supplier, builder);
} else if (Objects.equals(kind, "PAPI") && supplier instanceof SessionBytesStoreSupplier) {
- throw new AssumptionViolatedException("Case not implemented yet");
+ setUpSessionPAPITopology((SessionBytesStoreSupplier) supplier, builder);
} else {
throw new AssertionError("Store supplier is an unrecognized type.");
}
@@ -626,6 +628,118 @@ public class IQv2StoreIntegrationTest {
}
+    /**
+     * Builds a Processor API topology that writes every record read from
+     * {@code INPUT_TOPIC_NAME} into a window store created from {@code supplier}.
+     *
+     * <p>The store is built as a timestamped window store when
+     * {@code storeToTest.timestamped()} is true and as a plain window store otherwise.
+     * Every record is stored under the fixed window start {@code WINDOW_START}.
+     * Caching and changelogging are switched on or off according to the test's
+     * {@code cache} and {@code log} flags. The store is registered as a global store
+     * when {@code storeToTest.global()} is true; otherwise it is added as a regular
+     * state store and connected to a processor on the input stream.
+     *
+     * @param supplier the window bytes store supplier under test
+     * @param builder  the builder the store and processor are added to
+     */
+    private void setUpWindowPAPITopology(final WindowBytesStoreSupplier supplier,
+                                         final StreamsBuilder builder) {
+        final StoreBuilder<?> windowStoreStoreBuilder;
+        final ProcessorSupplier<Integer, Integer, Void, Void> processorSupplier;
+        if (storeToTest.timestamped()) {
+            windowStoreStoreBuilder = Stores.timestampedWindowStoreBuilder(
+                supplier,
+                Serdes.Integer(),
+                Serdes.Integer()
+            );
+            processorSupplier = () -> new ContextualProcessor<Integer, Integer, Void, Void>() {
+                @Override
+                public void process(final Record<Integer, Integer> record) {
+                    final TimestampedWindowStore<Integer, Integer> stateStore =
+                        context().getStateStore(windowStoreStoreBuilder.name());
+                    // Timestamped stores hold the value together with the record timestamp.
+                    stateStore.put(
+                        record.key(),
+                        ValueAndTimestamp.make(
+                            record.value(), record.timestamp()
+                        ),
+                        WINDOW_START
+                    );
+                }
+            };
+        } else {
+            windowStoreStoreBuilder = Stores.windowStoreBuilder(
+                supplier,
+                Serdes.Integer(),
+                Serdes.Integer()
+            );
+            processorSupplier =
+                () -> new ContextualProcessor<Integer, Integer, Void, Void>() {
+                    @Override
+                    public void process(final Record<Integer, Integer> record) {
+                        final WindowStore<Integer, Integer> stateStore =
+                            context().getStateStore(windowStoreStoreBuilder.name());
+                        stateStore.put(record.key(), record.value(), WINDOW_START);
+                    }
+                };
+        }
+        if (cache) {
+            windowStoreStoreBuilder.withCachingEnabled();
+        } else {
+            windowStoreStoreBuilder.withCachingDisabled();
+        }
+        if (log) {
+            windowStoreStoreBuilder.withLoggingEnabled(Collections.emptyMap());
+        } else {
+            // Must disable logging here, not caching: calling withCachingDisabled()
+            // in this branch would leave logging untouched and override the cache flag.
+            windowStoreStoreBuilder.withLoggingDisabled();
+        }
+        if (storeToTest.global()) {
+            builder.addGlobalStore(
+                windowStoreStoreBuilder,
+                INPUT_TOPIC_NAME,
+                Consumed.with(Serdes.Integer(), Serdes.Integer()),
+                processorSupplier
+            );
+        } else {
+            builder.addStateStore(windowStoreStoreBuilder);
+            builder
+                .stream(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer()))
+                .process(processorSupplier, windowStoreStoreBuilder.name());
+        }
+
+    }
+
+ private void setUpSessionPAPITopology(final SessionBytesStoreSupplier supplier,
+ final StreamsBuilder builder) {
+ final StoreBuilder<?> sessionStoreStoreBuilder;
+ final ProcessorSupplier<Integer, Integer, Void, Void> processorSupplier;
+ sessionStoreStoreBuilder = Stores.sessionStoreBuilder(
+ supplier,
+ Serdes.Integer(),
+ Serdes.Integer()
+ );
+ processorSupplier = () -> new ContextualProcessor<Integer, Integer, Void, Void>() {
+ @Override
+ public void process(final Record<Integer, Integer> record) {
+ final SessionStore<Integer, Integer> stateStore =
+ context().getStateStore(sessionStoreStoreBuilder.name());
+ stateStore.put(
+ new Windowed<>(record.key(), new SessionWindow(WINDOW_START, WINDOW_START)),
+ record.value()
+ );
+ }
+ };
+ if (cache) {
+ sessionStoreStoreBuilder.withCachingEnabled();
+ } else {
+ sessionStoreStoreBuilder.withCachingDisabled();
+ }
+ if (log) {
+ sessionStoreStoreBuilder.withLoggingEnabled(Collections.emptyMap());
+ } else {
+ sessionStoreStoreBuilder.withCachingDisabled();
+ }
+ if (storeToTest.global()) {
+ builder.addGlobalStore(
+ sessionStoreStoreBuilder,
+ INPUT_TOPIC_NAME,
+ Consumed.with(Serdes.Integer(), Serdes.Integer()),
+ processorSupplier
+ );
+ } else {
+ builder.addStateStore(sessionStoreStoreBuilder);
+ builder
+ .stream(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer()))
+ .process(processorSupplier, sessionStoreStoreBuilder.name());
+ }
+
+ }
+
@After
public void afterTest() {
@@ -800,13 +914,13 @@ public class IQv2StoreIntegrationTest {
throw new AssertionError(queryResult.toString());
}
assertThat(partitionResult.getFailureReason(), is(FailureReason.UNKNOWN_QUERY_TYPE));
- assertThat(partitionResult.getFailureMessage(), is(
+ assertThat(partitionResult.getFailureMessage(), matchesPattern(
"This store"
- + " (class org.apache.kafka.streams.state.internals.MeteredTimestampedWindowStore)"
+ + " \\(class org.apache.kafka.streams.state.internals.Metered.*WindowStore\\)"
+ " doesn't know how to execute the given query"
- + " (WindowRangeQuery{key=Optional[2], timeFrom=Optional.empty, timeTo=Optional.empty})"
- + " because WindowStores only supports WindowRangeQuery.withWindowStartRange."
- + " Contact the store maintainer if you need support for a new query type."
+ + " \\(WindowRangeQuery\\{key=Optional\\[2], timeFrom=Optional.empty, timeTo=Optional.empty}\\)"
+ + " because WindowStores only supports WindowRangeQuery.withWindowStartRange\\."
+ + " Contact the store maintainer if you need support for a new query type\\."
));
}
}