You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2022/01/06 05:18:49 UTC

[kafka] branch trunk updated: KAFKA-13553: Add PAPI Window and Session store tests for IQv2 (#11650)

This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b424553  KAFKA-13553: Add PAPI Window and Session store tests for IQv2 (#11650)
b424553 is described below

commit b424553101c56547beafab2ae39f16671fc05c9e
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Wed Jan 5 23:16:33 2022 -0600

    KAFKA-13553: Add PAPI Window and Session store tests for IQv2 (#11650)
    
    During some recent reviews, @mjsax pointed out that StateStore layers
    are constructed differently the stores are added via the PAPI vs. the DSL.
    
    This PR adds PAPI construction for Window and Session stores to the
    IQv2StoreIntegrationTest so that we can ensure IQv2 works on every
    possible state store.
    
    Reviewer: Guozhang Wang <gu...@apache.org>
---
 .../integration/IQv2StoreIntegrationTest.java      | 129 +++++++++++++++++++--
 1 file changed, 121 insertions(+), 8 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
index 7629430..11f1b9e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
@@ -36,6 +36,7 @@ import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.SessionWindows;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.SessionWindow;
 import org.apache.kafka.streams.processor.api.ContextualProcessor;
 import org.apache.kafka.streams.processor.api.ProcessorSupplier;
 import org.apache.kafka.streams.processor.api.Record;
@@ -59,6 +60,7 @@ import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.StoreSupplier;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
 import org.apache.kafka.streams.state.WindowStore;
@@ -67,7 +69,6 @@ import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.AfterClass;
-import org.junit.AssumptionViolatedException;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -452,11 +453,11 @@ public class IQv2StoreIntegrationTest {
         } else if (Objects.equals(kind, "DSL") && supplier instanceof WindowBytesStoreSupplier) {
             setUpWindowDSLTopology((WindowBytesStoreSupplier) supplier, builder);
         } else if (Objects.equals(kind, "PAPI") && supplier instanceof WindowBytesStoreSupplier) {
-            throw new AssumptionViolatedException("Case not implemented yet");
+            setUpWindowPAPITopology((WindowBytesStoreSupplier) supplier, builder);
         } else if (Objects.equals(kind, "DSL") && supplier instanceof SessionBytesStoreSupplier) {
             setUpSessionDSLTopology((SessionBytesStoreSupplier) supplier, builder);
         } else if (Objects.equals(kind, "PAPI") && supplier instanceof SessionBytesStoreSupplier) {
-            throw new AssumptionViolatedException("Case not implemented yet");
+            setUpSessionPAPITopology((SessionBytesStoreSupplier) supplier, builder);
         } else {
             throw new AssertionError("Store supplier is an unrecognized type.");
         }
@@ -626,6 +627,118 @@ public class IQv2StoreIntegrationTest {
 
     }
 
+    private void setUpWindowPAPITopology(final WindowBytesStoreSupplier supplier,
+                                         final StreamsBuilder builder) {
+        final StoreBuilder<?> windowStoreStoreBuilder;
+        final ProcessorSupplier<Integer, Integer, Void, Void> processorSupplier;
+        if (storeToTest.timestamped()) {
+            windowStoreStoreBuilder = Stores.timestampedWindowStoreBuilder(
+                supplier,
+                Serdes.Integer(),
+                Serdes.Integer()
+            );
+            processorSupplier = () -> new ContextualProcessor<Integer, Integer, Void, Void>() {
+                @Override
+                public void process(final Record<Integer, Integer> record) {
+                    final TimestampedWindowStore<Integer, Integer> stateStore =
+                        context().getStateStore(windowStoreStoreBuilder.name());
+                    stateStore.put(
+                        record.key(),
+                        ValueAndTimestamp.make(
+                            record.value(), record.timestamp()
+                        ),
+                        WINDOW_START
+                    );
+                }
+            };
+        } else {
+            windowStoreStoreBuilder = Stores.windowStoreBuilder(
+                supplier,
+                Serdes.Integer(),
+                Serdes.Integer()
+            );
+            processorSupplier =
+                () -> new ContextualProcessor<Integer, Integer, Void, Void>() {
+                    @Override
+                    public void process(final Record<Integer, Integer> record) {
+                        final WindowStore<Integer, Integer> stateStore =
+                            context().getStateStore(windowStoreStoreBuilder.name());
+                        stateStore.put(record.key(), record.value(), WINDOW_START);
+                    }
+                };
+        }
+        if (cache) {
+            windowStoreStoreBuilder.withCachingEnabled();
+        } else {
+            windowStoreStoreBuilder.withCachingDisabled();
+        }
+        if (log) {
+            windowStoreStoreBuilder.withLoggingEnabled(Collections.emptyMap());
+        } else {
+            windowStoreStoreBuilder.withLoggingDisabled();
+        }
+        if (storeToTest.global()) {
+            builder.addGlobalStore(
+                windowStoreStoreBuilder,
+                INPUT_TOPIC_NAME,
+                Consumed.with(Serdes.Integer(), Serdes.Integer()),
+                processorSupplier
+            );
+        } else {
+            builder.addStateStore(windowStoreStoreBuilder);
+            builder
+                .stream(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer()))
+                .process(processorSupplier, windowStoreStoreBuilder.name());
+        }
+
+    }
+
+    private void setUpSessionPAPITopology(final SessionBytesStoreSupplier supplier,
+                                          final StreamsBuilder builder) {
+        final StoreBuilder<?> sessionStoreStoreBuilder;
+        final ProcessorSupplier<Integer, Integer, Void, Void> processorSupplier;
+        sessionStoreStoreBuilder = Stores.sessionStoreBuilder(
+            supplier,
+            Serdes.Integer(),
+            Serdes.Integer()
+        );
+        processorSupplier = () -> new ContextualProcessor<Integer, Integer, Void, Void>() {
+            @Override
+            public void process(final Record<Integer, Integer> record) {
+                final SessionStore<Integer, Integer> stateStore =
+                    context().getStateStore(sessionStoreStoreBuilder.name());
+                stateStore.put(
+                    new Windowed<>(record.key(), new SessionWindow(WINDOW_START, WINDOW_START)),
+                    record.value()
+                );
+            }
+        };
+        if (cache) {
+            sessionStoreStoreBuilder.withCachingEnabled();
+        } else {
+            sessionStoreStoreBuilder.withCachingDisabled();
+        }
+        if (log) {
+            sessionStoreStoreBuilder.withLoggingEnabled(Collections.emptyMap());
+        } else {
+            sessionStoreStoreBuilder.withLoggingDisabled();
+        }
+        if (storeToTest.global()) {
+            builder.addGlobalStore(
+                sessionStoreStoreBuilder,
+                INPUT_TOPIC_NAME,
+                Consumed.with(Serdes.Integer(), Serdes.Integer()),
+                processorSupplier
+            );
+        } else {
+            builder.addStateStore(sessionStoreStoreBuilder);
+            builder
+                .stream(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer()))
+                .process(processorSupplier, sessionStoreStoreBuilder.name());
+        }
+
+    }
+
 
     @After
     public void afterTest() {
@@ -800,13 +913,13 @@ public class IQv2StoreIntegrationTest {
                     throw new AssertionError(queryResult.toString());
                 }
                 assertThat(partitionResult.getFailureReason(), is(FailureReason.UNKNOWN_QUERY_TYPE));
-                assertThat(partitionResult.getFailureMessage(), is(
+                assertThat(partitionResult.getFailureMessage(), matchesPattern(
                     "This store"
-                        + " (class org.apache.kafka.streams.state.internals.MeteredTimestampedWindowStore)"
+                        + " \\(class org.apache.kafka.streams.state.internals.Metered.*WindowStore\\)"
                         + " doesn't know how to execute the given query"
-                        + " (WindowRangeQuery{key=Optional[2], timeFrom=Optional.empty, timeTo=Optional.empty})"
-                        + " because WindowStores only supports WindowRangeQuery.withWindowStartRange."
-                        + " Contact the store maintainer if you need support for a new query type."
+                        + " \\(WindowRangeQuery\\{key=Optional\\[2], timeFrom=Optional.empty, timeTo=Optional.empty}\\)"
+                        + " because WindowStores only supports WindowRangeQuery.withWindowStartRange\\."
+                        + " Contact the store maintainer if you need support for a new query type\\."
                 ));
             }
         }