You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2022/01/05 21:48:09 UTC

[kafka] branch iqv2-add-window-session-papi-tests created (now 99d36b5)

This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a change to branch iqv2-add-window-session-papi-tests
in repository https://gitbox.apache.org/repos/asf/kafka.git.


      at 99d36b5  MINOR: Add PAPI Window and Session store tests for IQv2

This branch includes the following new commits:

     new 99d36b5  MINOR: Add PAPI Window and Session store tests for IQv2

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[kafka] 01/01: MINOR: Add PAPI Window and Session store tests for IQv2

Posted by vv...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch iqv2-add-window-session-papi-tests
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 99d36b5e1116c5a20e4e8b59e17a15f4f3eff8bf
Author: John Roesler <vv...@apache.org>
AuthorDate: Wed Jan 5 15:46:45 2022 -0600

    MINOR: Add PAPI Window and Session store tests for IQv2
---
 .../integration/IQv2StoreIntegrationTest.java      | 128 +++++++++++++++++++--
 1 file changed, 121 insertions(+), 7 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
index ed6efaf..718ad15 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
@@ -36,6 +36,7 @@ import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.SessionWindows;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.SessionWindow;
 import org.apache.kafka.streams.processor.api.ContextualProcessor;
 import org.apache.kafka.streams.processor.api.ProcessorSupplier;
 import org.apache.kafka.streams.processor.api.Record;
@@ -59,6 +60,7 @@ import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.StoreSupplier;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
 import org.apache.kafka.streams.state.WindowStore;
@@ -452,11 +454,11 @@ public class IQv2StoreIntegrationTest {
         } else if (Objects.equals(kind, "DSL") && supplier instanceof WindowBytesStoreSupplier) {
             setUpWindowDSLTopology((WindowBytesStoreSupplier) supplier, builder);
         } else if (Objects.equals(kind, "PAPI") && supplier instanceof WindowBytesStoreSupplier) {
-            throw new AssumptionViolatedException("Case not implemented yet");
+            setUpWindowPAPITopology((WindowBytesStoreSupplier) supplier, builder);
         } else if (Objects.equals(kind, "DSL") && supplier instanceof SessionBytesStoreSupplier) {
             setUpSessionDSLTopology((SessionBytesStoreSupplier) supplier, builder);
         } else if (Objects.equals(kind, "PAPI") && supplier instanceof SessionBytesStoreSupplier) {
-            throw new AssumptionViolatedException("Case not implemented yet");
+            setUpSessionPAPITopology((SessionBytesStoreSupplier) supplier, builder);
         } else {
             throw new AssertionError("Store supplier is an unrecognized type.");
         }
@@ -626,6 +628,118 @@ public class IQv2StoreIntegrationTest {
 
     }
 
+    /**
+     * Wires a PAPI topology that writes every input record into the window store under test
+     * (timestamped or plain, per {@code storeToTest.timestamped()}) at WINDOW_START, honoring
+     * the test's cache and log flags and registering the store as global or local state.
+     */
+    private void setUpWindowPAPITopology(final WindowBytesStoreSupplier supplier,
+                                         final StreamsBuilder builder) {
+        final StoreBuilder<?> windowStoreStoreBuilder;
+        final ProcessorSupplier<Integer, Integer, Void, Void> processorSupplier;
+        if (storeToTest.timestamped()) {
+            windowStoreStoreBuilder = Stores.timestampedWindowStoreBuilder(
+                supplier,
+                Serdes.Integer(),
+                Serdes.Integer()
+            );
+            processorSupplier = () -> new ContextualProcessor<Integer, Integer, Void, Void>() {
+                @Override
+                public void process(final Record<Integer, Integer> record) {
+                    final TimestampedWindowStore<Integer, Integer> stateStore =
+                        context().getStateStore(windowStoreStoreBuilder.name());
+                    final ValueAndTimestamp<Integer> valueAndTimestamp =
+                        ValueAndTimestamp.make(record.value(), record.timestamp());
+                    stateStore.put(record.key(), valueAndTimestamp, WINDOW_START);
+                }
+            };
+        } else {
+            windowStoreStoreBuilder = Stores.windowStoreBuilder(
+                supplier,
+                Serdes.Integer(),
+                Serdes.Integer()
+            );
+            processorSupplier = () -> new ContextualProcessor<Integer, Integer, Void, Void>() {
+                @Override
+                public void process(final Record<Integer, Integer> record) {
+                    final WindowStore<Integer, Integer> stateStore =
+                        context().getStateStore(windowStoreStoreBuilder.name());
+                    stateStore.put(record.key(), record.value(), WINDOW_START);
+                }
+            };
+        }
+        if (cache) {
+            windowStoreStoreBuilder.withCachingEnabled();
+        } else {
+            windowStoreStoreBuilder.withCachingDisabled();
+        }
+        if (log) {
+            windowStoreStoreBuilder.withLoggingEnabled(Collections.emptyMap());
+        } else {
+            // Disable changelogging (not caching) so the cache setting chosen above is left intact.
+            windowStoreStoreBuilder.withLoggingDisabled();
+        }
+        if (storeToTest.global()) {
+            builder.addGlobalStore(
+                windowStoreStoreBuilder,
+                INPUT_TOPIC_NAME,
+                Consumed.with(Serdes.Integer(), Serdes.Integer()),
+                processorSupplier
+            );
+        } else {
+            builder.addStateStore(windowStoreStoreBuilder);
+            builder
+                .stream(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer()))
+                .process(processorSupplier, windowStoreStoreBuilder.name());
+        }
+    }
+
+    private void setUpSessionPAPITopology(final SessionBytesStoreSupplier supplier,
+                                          final StreamsBuilder builder) {
+        final StoreBuilder<?> sessionStoreStoreBuilder;
+        final ProcessorSupplier<Integer, Integer, Void, Void> processorSupplier;
+        sessionStoreStoreBuilder = Stores.sessionStoreBuilder(
+            supplier,
+            Serdes.Integer(),
+            Serdes.Integer()
+        );
+        processorSupplier = () -> new ContextualProcessor<Integer, Integer, Void, Void>() {
+            @Override
+            public void process(final Record<Integer, Integer> record) {
+                final SessionStore<Integer, Integer> stateStore =
+                    context().getStateStore(sessionStoreStoreBuilder.name());
+                stateStore.put(
+                    new Windowed<>(record.key(), new SessionWindow(WINDOW_START, WINDOW_START)),
+                    record.value()
+                );
+            }
+        };
+        if (cache) {
+            sessionStoreStoreBuilder.withCachingEnabled();
+        } else {
+            sessionStoreStoreBuilder.withCachingDisabled();
+        }
+        if (log) {
+            sessionStoreStoreBuilder.withLoggingEnabled(Collections.emptyMap());
+        } else {
+            sessionStoreStoreBuilder.withCachingDisabled();
+        }
+        if (storeToTest.global()) {
+            builder.addGlobalStore(
+                sessionStoreStoreBuilder,
+                INPUT_TOPIC_NAME,
+                Consumed.with(Serdes.Integer(), Serdes.Integer()),
+                processorSupplier
+            );
+        } else {
+            builder.addStateStore(sessionStoreStoreBuilder);
+            builder
+                .stream(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer()))
+                .process(processorSupplier, sessionStoreStoreBuilder.name());
+        }
+
+    }
+
 
     @After
     public void afterTest() {
@@ -800,13 +914,13 @@ public class IQv2StoreIntegrationTest {
                     throw new AssertionError(queryResult.toString());
                 }
                 assertThat(partitionResult.getFailureReason(), is(FailureReason.UNKNOWN_QUERY_TYPE));
-                assertThat(partitionResult.getFailureMessage(), is(
+                assertThat(partitionResult.getFailureMessage(), matchesPattern(
                     "This store"
-                        + " (class org.apache.kafka.streams.state.internals.MeteredTimestampedWindowStore)"
+                        + " \\(class org.apache.kafka.streams.state.internals.Metered.*WindowStore\\)"
                         + " doesn't know how to execute the given query"
-                        + " (WindowRangeQuery{key=Optional[2], timeFrom=Optional.empty, timeTo=Optional.empty})"
-                        + " because WindowStores only supports WindowRangeQuery.withWindowStartRange."
-                        + " Contact the store maintainer if you need support for a new query type."
+                        + " \\(WindowRangeQuery\\{key=Optional\\[2], timeFrom=Optional.empty, timeTo=Optional.empty}\\)"
+                        + " because WindowStores only supports WindowRangeQuery.withWindowStartRange\\."
+                        + " Contact the store maintainer if you need support for a new query type\\."
                 ));
             }
         }