You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2022/01/05 21:48:10 UTC

[kafka] 01/01: MINOR: Add PAPI Window and Session store tests for IQv2

This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch iqv2-add-window-session-papi-tests
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 99d36b5e1116c5a20e4e8b59e17a15f4f3eff8bf
Author: John Roesler <vv...@apache.org>
AuthorDate: Wed Jan 5 15:46:45 2022 -0600

    MINOR: Add PAPI Window and Session store tests for IQv2
---
 .../integration/IQv2StoreIntegrationTest.java      | 128 +++++++++++++++++++--
 1 file changed, 121 insertions(+), 7 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
index ed6efaf..718ad15 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
@@ -36,6 +36,7 @@ import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.SessionWindows;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.SessionWindow;
 import org.apache.kafka.streams.processor.api.ContextualProcessor;
 import org.apache.kafka.streams.processor.api.ProcessorSupplier;
 import org.apache.kafka.streams.processor.api.Record;
@@ -59,6 +60,7 @@ import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.StoreSupplier;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
 import org.apache.kafka.streams.state.WindowStore;
@@ -452,11 +454,11 @@ public class IQv2StoreIntegrationTest {
         } else if (Objects.equals(kind, "DSL") && supplier instanceof WindowBytesStoreSupplier) {
             setUpWindowDSLTopology((WindowBytesStoreSupplier) supplier, builder);
         } else if (Objects.equals(kind, "PAPI") && supplier instanceof WindowBytesStoreSupplier) {
-            throw new AssumptionViolatedException("Case not implemented yet");
+            setUpWindowPAPITopology((WindowBytesStoreSupplier) supplier, builder);
         } else if (Objects.equals(kind, "DSL") && supplier instanceof SessionBytesStoreSupplier) {
             setUpSessionDSLTopology((SessionBytesStoreSupplier) supplier, builder);
         } else if (Objects.equals(kind, "PAPI") && supplier instanceof SessionBytesStoreSupplier) {
-            throw new AssumptionViolatedException("Case not implemented yet");
+            setUpSessionPAPITopology((SessionBytesStoreSupplier) supplier, builder);
         } else {
             throw new AssertionError("Store supplier is an unrecognized type.");
         }
@@ -626,6 +628,118 @@ public class IQv2StoreIntegrationTest {
 
     }
 
+    private void setUpWindowPAPITopology(final WindowBytesStoreSupplier supplier,
+                                         final StreamsBuilder builder) {
+        final StoreBuilder<?> windowStoreStoreBuilder;
+        final ProcessorSupplier<Integer, Integer, Void, Void> processorSupplier;
+        if (storeToTest.timestamped()) {
+            windowStoreStoreBuilder = Stores.timestampedWindowStoreBuilder(
+                supplier,
+                Serdes.Integer(),
+                Serdes.Integer()
+            );
+            processorSupplier = () -> new ContextualProcessor<Integer, Integer, Void, Void>() {
+                @Override
+                public void process(final Record<Integer, Integer> record) {
+                    final TimestampedWindowStore<Integer, Integer> stateStore =
+                        context().getStateStore(windowStoreStoreBuilder.name());
+                    stateStore.put(
+                        record.key(),
+                        ValueAndTimestamp.make(
+                            record.value(), record.timestamp()
+                        ),
+                        WINDOW_START
+                    );
+                }
+            };
+        } else {
+            windowStoreStoreBuilder = Stores.windowStoreBuilder(
+                supplier,
+                Serdes.Integer(),
+                Serdes.Integer()
+            );
+            processorSupplier =
+                () -> new ContextualProcessor<Integer, Integer, Void, Void>() {
+                    @Override
+                    public void process(final Record<Integer, Integer> record) {
+                        final WindowStore<Integer, Integer> stateStore =
+                            context().getStateStore(windowStoreStoreBuilder.name());
+                        stateStore.put(record.key(), record.value(), WINDOW_START);
+                    }
+                };
+        }
+        if (cache) {
+            windowStoreStoreBuilder.withCachingEnabled();
+        } else {
+            windowStoreStoreBuilder.withCachingDisabled();
+        }
+        if (log) {
+            windowStoreStoreBuilder.withLoggingEnabled(Collections.emptyMap());
+        } else {
+            windowStoreStoreBuilder.withCachingDisabled();
+        }
+        if (storeToTest.global()) {
+            builder.addGlobalStore(
+                windowStoreStoreBuilder,
+                INPUT_TOPIC_NAME,
+                Consumed.with(Serdes.Integer(), Serdes.Integer()),
+                processorSupplier
+            );
+        } else {
+            builder.addStateStore(windowStoreStoreBuilder);
+            builder
+                .stream(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer()))
+                .process(processorSupplier, windowStoreStoreBuilder.name());
+        }
+
+    }
+
+    private void setUpSessionPAPITopology(final SessionBytesStoreSupplier supplier,
+                                          final StreamsBuilder builder) {
+        final StoreBuilder<?> sessionStoreStoreBuilder;
+        final ProcessorSupplier<Integer, Integer, Void, Void> processorSupplier;
+        sessionStoreStoreBuilder = Stores.sessionStoreBuilder(
+            supplier,
+            Serdes.Integer(),
+            Serdes.Integer()
+        );
+        processorSupplier = () -> new ContextualProcessor<Integer, Integer, Void, Void>() {
+            @Override
+            public void process(final Record<Integer, Integer> record) {
+                final SessionStore<Integer, Integer> stateStore =
+                    context().getStateStore(sessionStoreStoreBuilder.name());
+                stateStore.put(
+                    new Windowed<>(record.key(), new SessionWindow(WINDOW_START, WINDOW_START)),
+                    record.value()
+                );
+            }
+        };
+        if (cache) {
+            sessionStoreStoreBuilder.withCachingEnabled();
+        } else {
+            sessionStoreStoreBuilder.withCachingDisabled();
+        }
+        if (log) {
+            sessionStoreStoreBuilder.withLoggingEnabled(Collections.emptyMap());
+        } else {
+            sessionStoreStoreBuilder.withCachingDisabled();
+        }
+        if (storeToTest.global()) {
+            builder.addGlobalStore(
+                sessionStoreStoreBuilder,
+                INPUT_TOPIC_NAME,
+                Consumed.with(Serdes.Integer(), Serdes.Integer()),
+                processorSupplier
+            );
+        } else {
+            builder.addStateStore(sessionStoreStoreBuilder);
+            builder
+                .stream(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer()))
+                .process(processorSupplier, sessionStoreStoreBuilder.name());
+        }
+
+    }
+
 
     @After
     public void afterTest() {
@@ -800,13 +914,13 @@ public class IQv2StoreIntegrationTest {
                     throw new AssertionError(queryResult.toString());
                 }
                 assertThat(partitionResult.getFailureReason(), is(FailureReason.UNKNOWN_QUERY_TYPE));
-                assertThat(partitionResult.getFailureMessage(), is(
+                assertThat(partitionResult.getFailureMessage(), matchesPattern(
                     "This store"
-                        + " (class org.apache.kafka.streams.state.internals.MeteredTimestampedWindowStore)"
+                        + " \\(class org.apache.kafka.streams.state.internals.Metered.*WindowStore\\)"
                         + " doesn't know how to execute the given query"
-                        + " (WindowRangeQuery{key=Optional[2], timeFrom=Optional.empty, timeTo=Optional.empty})"
-                        + " because WindowStores only supports WindowRangeQuery.withWindowStartRange."
-                        + " Contact the store maintainer if you need support for a new query type."
+                        + " \\(WindowRangeQuery\\{key=Optional\\[2], timeFrom=Optional.empty, timeTo=Optional.empty}\\)"
+                        + " because WindowStores only supports WindowRangeQuery.withWindowStartRange\\."
+                        + " Contact the store maintainer if you need support for a new query type\\."
                 ));
             }
         }