Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/12/22 18:45:35 UTC

[GitHub] [kafka] vvcephei opened a new pull request #11624: MINOR: add PAPI KV store tests for IQv2

vvcephei opened a new pull request #11624:
URL: https://github.com/apache/kafka/pull/11624


   During some recent reviews, @mjsax pointed out that StateStore layers
   are constructed differently depending on whether the stores are added via the PAPI or the DSL.
   
   This PR adds PAPI construction to the IQv2StoreIntegrationTest so that
   we can ensure IQv2 works on every possible state store.
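   
   For context, here is a minimal, hedged sketch (not code from this PR; the store name,
   topic name, and processor are illustrative) of the two ways a key-value store gets wired
   into a topology. In the DSL the store is materialized as part of a table, while in the
   PAPI it is built and attached to a processor explicitly, which is why the resulting
   StateStore layering can differ:
   
   ```java
   import org.apache.kafka.common.serialization.Serdes;
   import org.apache.kafka.streams.StreamsBuilder;
   import org.apache.kafka.streams.Topology;
   import org.apache.kafka.streams.kstream.Consumed;
   import org.apache.kafka.streams.kstream.Materialized;
   import org.apache.kafka.streams.processor.api.ContextualProcessor;
   import org.apache.kafka.streams.processor.api.Record;
   import org.apache.kafka.streams.state.KeyValueStore;
   import org.apache.kafka.streams.state.Stores;
   
   public class StoreWiringSketch {
   
       // DSL: the store is declared via Materialized and layered by the DSL itself.
       static StreamsBuilder dslTopology() {
           final StreamsBuilder builder = new StreamsBuilder();
           builder.table(
               "input-topic",
               Consumed.with(Serdes.Integer(), Serdes.Integer()),
               Materialized.<Integer, Integer>as(Stores.persistentKeyValueStore("kv-store"))
           );
           return builder;
       }
   
       // PAPI: the store is built explicitly and connected to a processor that writes to it.
       static Topology papiTopology() {
           final Topology topology = new Topology();
           topology.addSource(
               "source",
               Serdes.Integer().deserializer(),
               Serdes.Integer().deserializer(),
               "input-topic"
           );
           topology.addProcessor(
               "copy-to-store",
               () -> new ContextualProcessor<Integer, Integer, Void, Void>() {
                   @Override
                   public void process(final Record<Integer, Integer> record) {
                       final KeyValueStore<Integer, Integer> store =
                           context().getStateStore("kv-store");
                       store.put(record.key(), record.value());
                   }
               },
               "source"
           );
           topology.addStateStore(
               Stores.keyValueStoreBuilder(
                   Stores.persistentKeyValueStore("kv-store"),
                   Serdes.Integer(),
                   Serdes.Integer()
               ),
               "copy-to-store"
           );
           return topology;
       }
   }
   ```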
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #11624: MINOR: add PAPI KV store tests for IQv2

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #11624:
URL: https://github.com/apache/kafka/pull/11624#discussion_r774097786



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
##########
@@ -287,31 +306,42 @@ public boolean keyValue() {
         }
     }
 
-    @Parameterized.Parameters(name = "cache={0}, log={1}, supplier={2}")
+    @Parameterized.Parameters(name = "cache={0}, log={1}, supplier={2}, kind={3}")
     public static Collection<Object[]> data() {
+        LOG.info("Generating test cases according to random seed: {}", SEED);
         final List<Object[]> values = new ArrayList<>();
         for (final boolean cacheEnabled : Arrays.asList(true, false)) {
             for (final boolean logEnabled : Arrays.asList(true, false)) {
                 for (final StoresToTest toTest : StoresToTest.values()) {
-                    values.add(new Object[]{cacheEnabled, logEnabled, toTest.name()});
+                    for (final String kind : Arrays.asList("DSL", "PAPI")) {
+                        values.add(new Object[]{cacheEnabled, logEnabled, toTest.name(), kind});
+                    }
                 }
             }
         }
+        // Randomizing the test cases in case some orderings interfere with each other.
+        // If you wish to reproduce a randomized order, copy the logged SEED and substitute
+        // it for the constant at the top of the file. This will cause exactly the same sequence
+        // of pseudorandom values to be generated.
+        Collections.shuffle(values, RANDOM);

Review comment:
       I added this because it occurred to me that there's a small chance the tests only pass in the order we generate them. Shuffling the cases ensures that every possible ordering will eventually be run. The price we pay is that an ordering which causes a failure will make the test look flaky. For that reason, we initialize the Random with a seed and log it. As the comment states, you can re-run the exact same ordering by setting SEED to the logged value.
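       
       To make the reproduction step concrete, here is a minimal, self-contained sketch
       (not the test itself; the class and variable names are illustrative) showing how a
       logged seed makes a shuffle repeatable:
       
       ```java
       import java.util.ArrayList;
       import java.util.Collections;
       import java.util.List;
       import java.util.Random;
       
       public class SeededShuffleExample {
           // Normally a fresh seed per run; paste a previously logged seed here to reproduce an ordering.
           private static final long SEED = new Random().nextLong();
           private static final Random RANDOM = new Random(SEED);
       
           public static void main(final String[] args) {
               System.out.println("Shuffling with seed: " + SEED);
               final List<Integer> cases = new ArrayList<>();
               for (int i = 0; i < 10; i++) {
                   cases.add(i);
               }
               // The same seed always produces the same shuffled order.
               Collections.shuffle(cases, RANDOM);
               System.out.println(cases);
           }
       }
       ```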







[GitHub] [kafka] guozhangwang commented on pull request #11624: KAFKA-13553: add PAPI KV store tests for IQv2

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #11624:
URL: https://github.com/apache/kafka/pull/11624#issuecomment-1005074050


   Hey @vvcephei, please feel free to merge after addressing the comments and resolving the conflicts.





[GitHub] [kafka] patrickstuedi commented on pull request #11624: MINOR: add PAPI KV store tests for IQv2

Posted by GitBox <gi...@apache.org>.
patrickstuedi commented on pull request #11624:
URL: https://github.com/apache/kafka/pull/11624#issuecomment-1000184098


   Looks good to me, thanks @vvcephei!





[GitHub] [kafka] vvcephei commented on a change in pull request #11624: MINOR: add PAPI KV store tests for IQv2

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #11624:
URL: https://github.com/apache/kafka/pull/11624#discussion_r774098263



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
##########
@@ -365,109 +395,200 @@ public void beforeTest() {
         final Properties streamsConfig = streamsConfiguration(
             cache,
             log,
-            storeToTest.name()
+            storeToTest.name(),
+            kind
         );
 
         final StreamsBuilder builder = new StreamsBuilder();
-        if (supplier instanceof KeyValueBytesStoreSupplier) {
-            final Materialized<Integer, Integer, KeyValueStore<Bytes, byte[]>> materialized =
-                Materialized.as((KeyValueBytesStoreSupplier) supplier);
+        if (Objects.equals(kind, "DSL") && supplier instanceof KeyValueBytesStoreSupplier) {
+            setUpKeyValueDSLTopology((KeyValueBytesStoreSupplier) supplier, builder);
+        } else if (Objects.equals(kind, "PAPI") && supplier instanceof KeyValueBytesStoreSupplier) {
+            setUpKeyValuePAPITopology((KeyValueBytesStoreSupplier) supplier, builder);
+        } else if (Objects.equals(kind, "DSL") && supplier instanceof WindowBytesStoreSupplier) {
+            setUpWindowDSLTopology((WindowBytesStoreSupplier) supplier, builder);
+        } else if (Objects.equals(kind, "PAPI") && supplier instanceof WindowBytesStoreSupplier) {
+            throw new AssumptionViolatedException("Case not implemented yet");

Review comment:
       This should be filled in once there are window tests. cc @patrickstuedi 







[GitHub] [kafka] vvcephei commented on a change in pull request #11624: KAFKA-13553: add PAPI KV store tests for IQv2

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #11624:
URL: https://github.com/apache/kafka/pull/11624#discussion_r778456441



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
##########
@@ -365,109 +395,200 @@ public void beforeTest() {
         final Properties streamsConfig = streamsConfiguration(
             cache,
             log,
-            storeToTest.name()
+            storeToTest.name(),
+            kind
         );
 
         final StreamsBuilder builder = new StreamsBuilder();
-        if (supplier instanceof KeyValueBytesStoreSupplier) {
-            final Materialized<Integer, Integer, KeyValueStore<Bytes, byte[]>> materialized =
-                Materialized.as((KeyValueBytesStoreSupplier) supplier);
+        if (Objects.equals(kind, "DSL") && supplier instanceof KeyValueBytesStoreSupplier) {
+            setUpKeyValueDSLTopology((KeyValueBytesStoreSupplier) supplier, builder);
+        } else if (Objects.equals(kind, "PAPI") && supplier instanceof KeyValueBytesStoreSupplier) {
+            setUpKeyValuePAPITopology((KeyValueBytesStoreSupplier) supplier, builder);
+        } else if (Objects.equals(kind, "DSL") && supplier instanceof WindowBytesStoreSupplier) {
+            setUpWindowDSLTopology((WindowBytesStoreSupplier) supplier, builder);
+        } else if (Objects.equals(kind, "PAPI") && supplier instanceof WindowBytesStoreSupplier) {
+            throw new AssumptionViolatedException("Case not implemented yet");
+        } else if (Objects.equals(kind, "DSL") && supplier instanceof SessionBytesStoreSupplier) {
+            setUpSessionDSLTopology((SessionBytesStoreSupplier) supplier, builder);
+        } else if (Objects.equals(kind, "PAPI") && supplier instanceof SessionBytesStoreSupplier) {
+            throw new AssumptionViolatedException("Case not implemented yet");
+        } else {
+            throw new AssertionError("Store supplier is an unrecognized type.");
+        }
 
-            if (cache) {
-                materialized.withCachingEnabled();
-            } else {
-                materialized.withCachingDisabled();
-            }
+        // Don't need to wait for running, since tests can use iqv2 to wait until they
+        // get a valid response.
 
-            if (log) {
-                materialized.withLoggingEnabled(Collections.emptyMap());
-            } else {
-                materialized.withCachingDisabled();
-            }
+        kafkaStreams =
+            IntegrationTestUtils.getStartedStreams(
+                streamsConfig,
+                builder,
+                true
+            );
+    }
 
-            if (storeToTest.global()) {
-                builder.globalTable(
-                    INPUT_TOPIC_NAME,
-                    Consumed.with(Serdes.Integer(), Serdes.Integer()),
-                    materialized
-                );
-            } else {
-                builder.table(
-                    INPUT_TOPIC_NAME,
-                    Consumed.with(Serdes.Integer(), Serdes.Integer()),
-                    materialized
-                );
-            }
-        } else if (supplier instanceof WindowBytesStoreSupplier) {
-            final Materialized<Integer, Integer, WindowStore<Bytes, byte[]>> materialized =
-                Materialized.as((WindowBytesStoreSupplier) supplier);
+    private void setUpSessionDSLTopology(final SessionBytesStoreSupplier supplier,
+                                         final StreamsBuilder builder) {
+        final Materialized<Integer, Integer, SessionStore<Bytes, byte[]>> materialized =
+            Materialized.as(supplier);
 
-            if (cache) {
-                materialized.withCachingEnabled();
-            } else {
-                materialized.withCachingDisabled();
-            }
+        if (cache) {
+            materialized.withCachingEnabled();
+        } else {
+            materialized.withCachingDisabled();
+        }
 
-            if (log) {
-                materialized.withLoggingEnabled(Collections.emptyMap());
-            } else {
-                materialized.withCachingDisabled();
-            }
+        if (log) {
+            materialized.withLoggingEnabled(Collections.emptyMap());
+        } else {
+            materialized.withCachingDisabled();

Review comment:
       Oops! Thanks for the catch!







[GitHub] [kafka] vvcephei commented on a change in pull request #11624: MINOR: add PAPI KV store tests for IQv2

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #11624:
URL: https://github.com/apache/kafka/pull/11624#discussion_r774098354



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
##########
@@ -365,109 +395,200 @@ public void beforeTest() {
         final Properties streamsConfig = streamsConfiguration(
             cache,
             log,
-            storeToTest.name()
+            storeToTest.name(),
+            kind
         );
 
         final StreamsBuilder builder = new StreamsBuilder();
-        if (supplier instanceof KeyValueBytesStoreSupplier) {
-            final Materialized<Integer, Integer, KeyValueStore<Bytes, byte[]>> materialized =
-                Materialized.as((KeyValueBytesStoreSupplier) supplier);
+        if (Objects.equals(kind, "DSL") && supplier instanceof KeyValueBytesStoreSupplier) {
+            setUpKeyValueDSLTopology((KeyValueBytesStoreSupplier) supplier, builder);
+        } else if (Objects.equals(kind, "PAPI") && supplier instanceof KeyValueBytesStoreSupplier) {
+            setUpKeyValuePAPITopology((KeyValueBytesStoreSupplier) supplier, builder);
+        } else if (Objects.equals(kind, "DSL") && supplier instanceof WindowBytesStoreSupplier) {
+            setUpWindowDSLTopology((WindowBytesStoreSupplier) supplier, builder);
+        } else if (Objects.equals(kind, "PAPI") && supplier instanceof WindowBytesStoreSupplier) {
+            throw new AssumptionViolatedException("Case not implemented yet");
+        } else if (Objects.equals(kind, "DSL") && supplier instanceof SessionBytesStoreSupplier) {
+            setUpSessionDSLTopology((SessionBytesStoreSupplier) supplier, builder);
+        } else if (Objects.equals(kind, "PAPI") && supplier instanceof SessionBytesStoreSupplier) {
+            throw new AssumptionViolatedException("Case not implemented yet");

Review comment:
       This should be filled in once there are session tests. cc @patrickstuedi 







[GitHub] [kafka] vvcephei merged pull request #11624: KAFKA-13553: add PAPI KV store tests for IQv2

Posted by GitBox <gi...@apache.org>.
vvcephei merged pull request #11624:
URL: https://github.com/apache/kafka/pull/11624


   





[GitHub] [kafka] vvcephei commented on pull request #11624: KAFKA-13553: add PAPI KV store tests for IQv2

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #11624:
URL: https://github.com/apache/kafka/pull/11624#issuecomment-1005250957


   Thanks, @guozhangwang!





[GitHub] [kafka] guozhangwang commented on a change in pull request #11624: KAFKA-13553: add PAPI KV store tests for IQv2

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #11624:
URL: https://github.com/apache/kafka/pull/11624#discussion_r778303747



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
##########
@@ -365,109 +395,200 @@ public void beforeTest() {
         final Properties streamsConfig = streamsConfiguration(
             cache,
             log,
-            storeToTest.name()
+            storeToTest.name(),
+            kind
         );
 
         final StreamsBuilder builder = new StreamsBuilder();
-        if (supplier instanceof KeyValueBytesStoreSupplier) {
-            final Materialized<Integer, Integer, KeyValueStore<Bytes, byte[]>> materialized =
-                Materialized.as((KeyValueBytesStoreSupplier) supplier);
+        if (Objects.equals(kind, "DSL") && supplier instanceof KeyValueBytesStoreSupplier) {
+            setUpKeyValueDSLTopology((KeyValueBytesStoreSupplier) supplier, builder);
+        } else if (Objects.equals(kind, "PAPI") && supplier instanceof KeyValueBytesStoreSupplier) {
+            setUpKeyValuePAPITopology((KeyValueBytesStoreSupplier) supplier, builder);
+        } else if (Objects.equals(kind, "DSL") && supplier instanceof WindowBytesStoreSupplier) {
+            setUpWindowDSLTopology((WindowBytesStoreSupplier) supplier, builder);
+        } else if (Objects.equals(kind, "PAPI") && supplier instanceof WindowBytesStoreSupplier) {
+            throw new AssumptionViolatedException("Case not implemented yet");
+        } else if (Objects.equals(kind, "DSL") && supplier instanceof SessionBytesStoreSupplier) {
+            setUpSessionDSLTopology((SessionBytesStoreSupplier) supplier, builder);
+        } else if (Objects.equals(kind, "PAPI") && supplier instanceof SessionBytesStoreSupplier) {
+            throw new AssumptionViolatedException("Case not implemented yet");
+        } else {
+            throw new AssertionError("Store supplier is an unrecognized type.");
+        }
 
-            if (cache) {
-                materialized.withCachingEnabled();
-            } else {
-                materialized.withCachingDisabled();
-            }
+        // Don't need to wait for running, since tests can use iqv2 to wait until they
+        // get a valid response.
 
-            if (log) {
-                materialized.withLoggingEnabled(Collections.emptyMap());
-            } else {
-                materialized.withCachingDisabled();
-            }
+        kafkaStreams =
+            IntegrationTestUtils.getStartedStreams(
+                streamsConfig,
+                builder,
+                true
+            );
+    }
 
-            if (storeToTest.global()) {
-                builder.globalTable(
-                    INPUT_TOPIC_NAME,
-                    Consumed.with(Serdes.Integer(), Serdes.Integer()),
-                    materialized
-                );
-            } else {
-                builder.table(
-                    INPUT_TOPIC_NAME,
-                    Consumed.with(Serdes.Integer(), Serdes.Integer()),
-                    materialized
-                );
-            }
-        } else if (supplier instanceof WindowBytesStoreSupplier) {
-            final Materialized<Integer, Integer, WindowStore<Bytes, byte[]>> materialized =
-                Materialized.as((WindowBytesStoreSupplier) supplier);
+    private void setUpSessionDSLTopology(final SessionBytesStoreSupplier supplier,
+                                         final StreamsBuilder builder) {
+        final Materialized<Integer, Integer, SessionStore<Bytes, byte[]>> materialized =
+            Materialized.as(supplier);
 
-            if (cache) {
-                materialized.withCachingEnabled();
-            } else {
-                materialized.withCachingDisabled();
-            }
+        if (cache) {
+            materialized.withCachingEnabled();
+        } else {
+            materialized.withCachingDisabled();
+        }
 
-            if (log) {
-                materialized.withLoggingEnabled(Collections.emptyMap());
-            } else {
-                materialized.withCachingDisabled();
-            }
+        if (log) {
+            materialized.withLoggingEnabled(Collections.emptyMap());
+        } else {
+            materialized.withCachingDisabled();

Review comment:
       withLoggingDisabled?
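       
       Presumably the intended fix is to toggle logging rather than caching in the else
       branch, along these lines (a hedged sketch, not the committed change):
       
       ```java
       if (log) {
           materialized.withLoggingEnabled(Collections.emptyMap());
       } else {
           materialized.withLoggingDisabled();
       }
       ```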

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
##########
@@ -365,109 +395,200 @@ public void beforeTest() {
         final Properties streamsConfig = streamsConfiguration(
             cache,
             log,
-            storeToTest.name()
+            storeToTest.name(),
+            kind
         );
 
         final StreamsBuilder builder = new StreamsBuilder();
-        if (supplier instanceof KeyValueBytesStoreSupplier) {
-            final Materialized<Integer, Integer, KeyValueStore<Bytes, byte[]>> materialized =
-                Materialized.as((KeyValueBytesStoreSupplier) supplier);
+        if (Objects.equals(kind, "DSL") && supplier instanceof KeyValueBytesStoreSupplier) {
+            setUpKeyValueDSLTopology((KeyValueBytesStoreSupplier) supplier, builder);
+        } else if (Objects.equals(kind, "PAPI") && supplier instanceof KeyValueBytesStoreSupplier) {
+            setUpKeyValuePAPITopology((KeyValueBytesStoreSupplier) supplier, builder);
+        } else if (Objects.equals(kind, "DSL") && supplier instanceof WindowBytesStoreSupplier) {
+            setUpWindowDSLTopology((WindowBytesStoreSupplier) supplier, builder);
+        } else if (Objects.equals(kind, "PAPI") && supplier instanceof WindowBytesStoreSupplier) {
+            throw new AssumptionViolatedException("Case not implemented yet");
+        } else if (Objects.equals(kind, "DSL") && supplier instanceof SessionBytesStoreSupplier) {
+            setUpSessionDSLTopology((SessionBytesStoreSupplier) supplier, builder);
+        } else if (Objects.equals(kind, "PAPI") && supplier instanceof SessionBytesStoreSupplier) {
+            throw new AssumptionViolatedException("Case not implemented yet");
+        } else {
+            throw new AssertionError("Store supplier is an unrecognized type.");
+        }
 
-            if (cache) {
-                materialized.withCachingEnabled();
-            } else {
-                materialized.withCachingDisabled();
-            }
+        // Don't need to wait for running, since tests can use iqv2 to wait until they
+        // get a valid response.
 
-            if (log) {
-                materialized.withLoggingEnabled(Collections.emptyMap());
-            } else {
-                materialized.withCachingDisabled();
-            }
+        kafkaStreams =
+            IntegrationTestUtils.getStartedStreams(
+                streamsConfig,
+                builder,
+                true
+            );
+    }
 
-            if (storeToTest.global()) {
-                builder.globalTable(
-                    INPUT_TOPIC_NAME,
-                    Consumed.with(Serdes.Integer(), Serdes.Integer()),
-                    materialized
-                );
-            } else {
-                builder.table(
-                    INPUT_TOPIC_NAME,
-                    Consumed.with(Serdes.Integer(), Serdes.Integer()),
-                    materialized
-                );
-            }
-        } else if (supplier instanceof WindowBytesStoreSupplier) {
-            final Materialized<Integer, Integer, WindowStore<Bytes, byte[]>> materialized =
-                Materialized.as((WindowBytesStoreSupplier) supplier);
+    private void setUpSessionDSLTopology(final SessionBytesStoreSupplier supplier,
+                                         final StreamsBuilder builder) {
+        final Materialized<Integer, Integer, SessionStore<Bytes, byte[]>> materialized =
+            Materialized.as(supplier);
 
-            if (cache) {
-                materialized.withCachingEnabled();
-            } else {
-                materialized.withCachingDisabled();
-            }
+        if (cache) {
+            materialized.withCachingEnabled();
+        } else {
+            materialized.withCachingDisabled();
+        }
 
-            if (log) {
-                materialized.withLoggingEnabled(Collections.emptyMap());
-            } else {
-                materialized.withCachingDisabled();
-            }
+        if (log) {
+            materialized.withLoggingEnabled(Collections.emptyMap());
+        } else {
+            materialized.withCachingDisabled();
+        }
 
-            builder
-                .stream(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer()))
-                .groupByKey()
-                .windowedBy(TimeWindows.ofSizeWithNoGrace(WINDOW_SIZE))
-                .aggregate(
-                    () -> 0,
-                    (key, value, aggregate) -> aggregate + value,
-                    materialized
-                );
-        } else if (supplier instanceof SessionBytesStoreSupplier) {
-            final Materialized<Integer, Integer, SessionStore<Bytes, byte[]>> materialized =
-                Materialized.as((SessionBytesStoreSupplier) supplier);
+        builder
+            .stream(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer()))
+            .groupByKey()
+            .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(WINDOW_SIZE))
+            .aggregate(
+                () -> 0,
+                (key, value, aggregate) -> aggregate + value,
+                (aggKey, aggOne, aggTwo) -> aggOne + aggTwo,
+                materialized
+            );
+    }
 
-            if (cache) {
-                materialized.withCachingEnabled();
-            } else {
-                materialized.withCachingDisabled();
-            }
+    private void setUpWindowDSLTopology(final WindowBytesStoreSupplier supplier,
+                                        final StreamsBuilder builder) {
+        final Materialized<Integer, Integer, WindowStore<Bytes, byte[]>> materialized =
+            Materialized.as(supplier);
 
-            if (log) {
-                materialized.withLoggingEnabled(Collections.emptyMap());
-            } else {
-                materialized.withCachingDisabled();
-            }
+        if (cache) {
+            materialized.withCachingEnabled();
+        } else {
+            materialized.withCachingDisabled();
+        }
 
-            builder
-                .stream(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer()))
-                .groupByKey()
-                .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(WINDOW_SIZE))
-                .aggregate(
-                    () -> 0,
-                    (key, value, aggregate) -> aggregate + value,
-                    (aggKey, aggOne, aggTwo) -> aggOne + aggTwo,
-                    materialized
-                );
+        if (log) {
+            materialized.withLoggingEnabled(Collections.emptyMap());
         } else {
-            throw new AssertionError("Store supplier is an unrecognized type.");
+            materialized.withCachingDisabled();

Review comment:
       withLoggingDisabled?

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
##########
@@ -365,109 +395,200 @@ public void beforeTest() {
         final Properties streamsConfig = streamsConfiguration(
             cache,
             log,
-            storeToTest.name()
+            storeToTest.name(),
+            kind
         );
 
         final StreamsBuilder builder = new StreamsBuilder();
-        if (supplier instanceof KeyValueBytesStoreSupplier) {
-            final Materialized<Integer, Integer, KeyValueStore<Bytes, byte[]>> materialized =
-                Materialized.as((KeyValueBytesStoreSupplier) supplier);
+        if (Objects.equals(kind, "DSL") && supplier instanceof KeyValueBytesStoreSupplier) {
+            setUpKeyValueDSLTopology((KeyValueBytesStoreSupplier) supplier, builder);
+        } else if (Objects.equals(kind, "PAPI") && supplier instanceof KeyValueBytesStoreSupplier) {
+            setUpKeyValuePAPITopology((KeyValueBytesStoreSupplier) supplier, builder);
+        } else if (Objects.equals(kind, "DSL") && supplier instanceof WindowBytesStoreSupplier) {
+            setUpWindowDSLTopology((WindowBytesStoreSupplier) supplier, builder);
+        } else if (Objects.equals(kind, "PAPI") && supplier instanceof WindowBytesStoreSupplier) {
+            throw new AssumptionViolatedException("Case not implemented yet");
+        } else if (Objects.equals(kind, "DSL") && supplier instanceof SessionBytesStoreSupplier) {
+            setUpSessionDSLTopology((SessionBytesStoreSupplier) supplier, builder);
+        } else if (Objects.equals(kind, "PAPI") && supplier instanceof SessionBytesStoreSupplier) {
+            throw new AssumptionViolatedException("Case not implemented yet");
+        } else {
+            throw new AssertionError("Store supplier is an unrecognized type.");
+        }
 
-            if (cache) {
-                materialized.withCachingEnabled();
-            } else {
-                materialized.withCachingDisabled();
-            }
+        // Don't need to wait for running, since tests can use iqv2 to wait until they
+        // get a valid response.
 
-            if (log) {
-                materialized.withLoggingEnabled(Collections.emptyMap());
-            } else {
-                materialized.withCachingDisabled();
-            }
+        kafkaStreams =
+            IntegrationTestUtils.getStartedStreams(
+                streamsConfig,
+                builder,
+                true
+            );
+    }
 
-            if (storeToTest.global()) {
-                builder.globalTable(
-                    INPUT_TOPIC_NAME,
-                    Consumed.with(Serdes.Integer(), Serdes.Integer()),
-                    materialized
-                );
-            } else {
-                builder.table(
-                    INPUT_TOPIC_NAME,
-                    Consumed.with(Serdes.Integer(), Serdes.Integer()),
-                    materialized
-                );
-            }
-        } else if (supplier instanceof WindowBytesStoreSupplier) {
-            final Materialized<Integer, Integer, WindowStore<Bytes, byte[]>> materialized =
-                Materialized.as((WindowBytesStoreSupplier) supplier);
+    private void setUpSessionDSLTopology(final SessionBytesStoreSupplier supplier,
+                                         final StreamsBuilder builder) {
+        final Materialized<Integer, Integer, SessionStore<Bytes, byte[]>> materialized =
+            Materialized.as(supplier);
 
-            if (cache) {
-                materialized.withCachingEnabled();
-            } else {
-                materialized.withCachingDisabled();
-            }
+        if (cache) {
+            materialized.withCachingEnabled();
+        } else {
+            materialized.withCachingDisabled();
+        }
 
-            if (log) {
-                materialized.withLoggingEnabled(Collections.emptyMap());
-            } else {
-                materialized.withCachingDisabled();
-            }
+        if (log) {
+            materialized.withLoggingEnabled(Collections.emptyMap());
+        } else {
+            materialized.withCachingDisabled();
+        }
 
-            builder
-                .stream(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer()))
-                .groupByKey()
-                .windowedBy(TimeWindows.ofSizeWithNoGrace(WINDOW_SIZE))
-                .aggregate(
-                    () -> 0,
-                    (key, value, aggregate) -> aggregate + value,
-                    materialized
-                );
-        } else if (supplier instanceof SessionBytesStoreSupplier) {
-            final Materialized<Integer, Integer, SessionStore<Bytes, byte[]>> materialized =
-                Materialized.as((SessionBytesStoreSupplier) supplier);
+        builder
+            .stream(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer()))
+            .groupByKey()
+            .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(WINDOW_SIZE))
+            .aggregate(
+                () -> 0,
+                (key, value, aggregate) -> aggregate + value,
+                (aggKey, aggOne, aggTwo) -> aggOne + aggTwo,
+                materialized
+            );
+    }
 
-            if (cache) {
-                materialized.withCachingEnabled();
-            } else {
-                materialized.withCachingDisabled();
-            }
+    private void setUpWindowDSLTopology(final WindowBytesStoreSupplier supplier,
+                                        final StreamsBuilder builder) {
+        final Materialized<Integer, Integer, WindowStore<Bytes, byte[]>> materialized =
+            Materialized.as(supplier);
 
-            if (log) {
-                materialized.withLoggingEnabled(Collections.emptyMap());
-            } else {
-                materialized.withCachingDisabled();
-            }
+        if (cache) {
+            materialized.withCachingEnabled();
+        } else {
+            materialized.withCachingDisabled();
+        }
 
-            builder
-                .stream(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer()))
-                .groupByKey()
-                .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(WINDOW_SIZE))
-                .aggregate(
-                    () -> 0,
-                    (key, value, aggregate) -> aggregate + value,
-                    (aggKey, aggOne, aggTwo) -> aggOne + aggTwo,
-                    materialized
-                );
+        if (log) {
+            materialized.withLoggingEnabled(Collections.emptyMap());
         } else {
-            throw new AssertionError("Store supplier is an unrecognized type.");
+            materialized.withCachingDisabled();
         }
 
-        // Don't need to wait for running, since tests can use iqv2 to wait until they
-        // get a valid response.
+        builder
+            .stream(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer()))
+            .groupByKey()
+            .windowedBy(TimeWindows.ofSizeWithNoGrace(WINDOW_SIZE))
+            .aggregate(
+                () -> 0,
+                (key, value, aggregate) -> aggregate + value,
+                materialized
+            );
+    }
 
-        kafkaStreams =
-            IntegrationTestUtils.getStartedStreams(
-                streamsConfig,
-                builder,
-                true
+    private void setUpKeyValueDSLTopology(final KeyValueBytesStoreSupplier supplier,
+                                          final StreamsBuilder builder) {
+        final Materialized<Integer, Integer, KeyValueStore<Bytes, byte[]>> materialized =
+            Materialized.as(supplier);
+
+        if (cache) {
+            materialized.withCachingEnabled();
+        } else {
+            materialized.withCachingDisabled();
+        }
+
+        if (log) {
+            materialized.withLoggingEnabled(Collections.emptyMap());
+        } else {
+            materialized.withCachingDisabled();
+        }
+
+        if (storeToTest.global()) {
+            builder.globalTable(
+                INPUT_TOPIC_NAME,
+                Consumed.with(Serdes.Integer(), Serdes.Integer()),
+                materialized
+            );
+        } else {
+            builder.table(
+                INPUT_TOPIC_NAME,
+                Consumed.with(Serdes.Integer(), Serdes.Integer()),
+                materialized
+            );
+        }
+    }
+
+    private void setUpKeyValuePAPITopology(final KeyValueBytesStoreSupplier supplier,
+                                           final StreamsBuilder builder) {
+        final StoreBuilder<?> keyValueStoreStoreBuilder;
+        final ProcessorSupplier<Integer, Integer, Void, Void> processorSupplier;
+        if (storeToTest.timestamped()) {
+            keyValueStoreStoreBuilder = Stores.timestampedKeyValueStoreBuilder(
+                supplier,
+                Serdes.Integer(),
+                Serdes.Integer()
+            );
+            processorSupplier = () -> new ContextualProcessor<Integer, Integer, Void, Void>() {
+                @Override
+                public void process(final Record<Integer, Integer> record) {
+                    final TimestampedKeyValueStore<Integer, Integer> stateStore =
+                        context().getStateStore(keyValueStoreStoreBuilder.name());
+                    stateStore.put(
+                        record.key(),
+                        ValueAndTimestamp.make(
+                            record.value(), record.timestamp()
+                        )
+                    );
+                }
+            };
+        } else {
+            keyValueStoreStoreBuilder = Stores.keyValueStoreBuilder(
+                supplier,
+                Serdes.Integer(),
+                Serdes.Integer()
+            );
+            processorSupplier =
+                () -> new ContextualProcessor<Integer, Integer, Void, Void>() {
+                    @Override
+                    public void process(final Record<Integer, Integer> record) {
+                        final KeyValueStore<Integer, Integer> stateStore =
+                            context().getStateStore(keyValueStoreStoreBuilder.name());
+                        stateStore.put(record.key(), record.value());
+                    }
+                };
+        }
+        if (cache) {
+            keyValueStoreStoreBuilder.withCachingEnabled();
+        } else {
+            keyValueStoreStoreBuilder.withCachingDisabled();
+        }
+        if (log) {
+            keyValueStoreStoreBuilder.withLoggingEnabled(Collections.emptyMap());
+        } else {
+            keyValueStoreStoreBuilder.withCachingDisabled();

Review comment:
       Ditto.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
##########
@@ -365,109 +395,200 @@ public void beforeTest() {
         final Properties streamsConfig = streamsConfiguration(
             cache,
             log,
-            storeToTest.name()
+            storeToTest.name(),
+            kind
         );
 
         final StreamsBuilder builder = new StreamsBuilder();
-        if (supplier instanceof KeyValueBytesStoreSupplier) {
-            final Materialized<Integer, Integer, KeyValueStore<Bytes, byte[]>> materialized =
-                Materialized.as((KeyValueBytesStoreSupplier) supplier);
+        if (Objects.equals(kind, "DSL") && supplier instanceof KeyValueBytesStoreSupplier) {
+            setUpKeyValueDSLTopology((KeyValueBytesStoreSupplier) supplier, builder);
+        } else if (Objects.equals(kind, "PAPI") && supplier instanceof KeyValueBytesStoreSupplier) {
+            setUpKeyValuePAPITopology((KeyValueBytesStoreSupplier) supplier, builder);
+        } else if (Objects.equals(kind, "DSL") && supplier instanceof WindowBytesStoreSupplier) {
+            setUpWindowDSLTopology((WindowBytesStoreSupplier) supplier, builder);
+        } else if (Objects.equals(kind, "PAPI") && supplier instanceof WindowBytesStoreSupplier) {
+            throw new AssumptionViolatedException("Case not implemented yet");
+        } else if (Objects.equals(kind, "DSL") && supplier instanceof SessionBytesStoreSupplier) {
+            setUpSessionDSLTopology((SessionBytesStoreSupplier) supplier, builder);
+        } else if (Objects.equals(kind, "PAPI") && supplier instanceof SessionBytesStoreSupplier) {
+            throw new AssumptionViolatedException("Case not implemented yet");
+        } else {
+            throw new AssertionError("Store supplier is an unrecognized type.");
+        }
 
-            if (cache) {
-                materialized.withCachingEnabled();
-            } else {
-                materialized.withCachingDisabled();
-            }
+        // Don't need to wait for running, since tests can use iqv2 to wait until they
+        // get a valid response.
 
-            if (log) {
-                materialized.withLoggingEnabled(Collections.emptyMap());
-            } else {
-                materialized.withCachingDisabled();
-            }
+        kafkaStreams =
+            IntegrationTestUtils.getStartedStreams(
+                streamsConfig,
+                builder,
+                true
+            );
+    }
 
-            if (storeToTest.global()) {
-                builder.globalTable(
-                    INPUT_TOPIC_NAME,
-                    Consumed.with(Serdes.Integer(), Serdes.Integer()),
-                    materialized
-                );
-            } else {
-                builder.table(
-                    INPUT_TOPIC_NAME,
-                    Consumed.with(Serdes.Integer(), Serdes.Integer()),
-                    materialized
-                );
-            }
-        } else if (supplier instanceof WindowBytesStoreSupplier) {
-            final Materialized<Integer, Integer, WindowStore<Bytes, byte[]>> materialized =
-                Materialized.as((WindowBytesStoreSupplier) supplier);
+    private void setUpSessionDSLTopology(final SessionBytesStoreSupplier supplier,
+                                         final StreamsBuilder builder) {
+        final Materialized<Integer, Integer, SessionStore<Bytes, byte[]>> materialized =
+            Materialized.as(supplier);
 
-            if (cache) {
-                materialized.withCachingEnabled();
-            } else {
-                materialized.withCachingDisabled();
-            }
+        if (cache) {
+            materialized.withCachingEnabled();
+        } else {
+            materialized.withCachingDisabled();
+        }
 
-            if (log) {
-                materialized.withLoggingEnabled(Collections.emptyMap());
-            } else {
-                materialized.withCachingDisabled();
-            }
+        if (log) {
+            materialized.withLoggingEnabled(Collections.emptyMap());
+        } else {
+            materialized.withCachingDisabled();
+        }
 
-            builder
-                .stream(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer()))
-                .groupByKey()
-                .windowedBy(TimeWindows.ofSizeWithNoGrace(WINDOW_SIZE))
-                .aggregate(
-                    () -> 0,
-                    (key, value, aggregate) -> aggregate + value,
-                    materialized
-                );
-        } else if (supplier instanceof SessionBytesStoreSupplier) {
-            final Materialized<Integer, Integer, SessionStore<Bytes, byte[]>> materialized =
-                Materialized.as((SessionBytesStoreSupplier) supplier);
+        builder
+            .stream(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer()))
+            .groupByKey()
+            .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(WINDOW_SIZE))
+            .aggregate(
+                () -> 0,
+                (key, value, aggregate) -> aggregate + value,
+                (aggKey, aggOne, aggTwo) -> aggOne + aggTwo,
+                materialized
+            );
+    }
 
-            if (cache) {
-                materialized.withCachingEnabled();
-            } else {
-                materialized.withCachingDisabled();
-            }
+    private void setUpWindowDSLTopology(final WindowBytesStoreSupplier supplier,
+                                        final StreamsBuilder builder) {
+        final Materialized<Integer, Integer, WindowStore<Bytes, byte[]>> materialized =
+            Materialized.as(supplier);
 
-            if (log) {
-                materialized.withLoggingEnabled(Collections.emptyMap());
-            } else {
-                materialized.withCachingDisabled();
-            }
+        if (cache) {
+            materialized.withCachingEnabled();
+        } else {
+            materialized.withCachingDisabled();
+        }
 
-            builder
-                .stream(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer()))
-                .groupByKey()
-                .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(WINDOW_SIZE))
-                .aggregate(
-                    () -> 0,
-                    (key, value, aggregate) -> aggregate + value,
-                    (aggKey, aggOne, aggTwo) -> aggOne + aggTwo,
-                    materialized
-                );
+        if (log) {
+            materialized.withLoggingEnabled(Collections.emptyMap());
         } else {
-            throw new AssertionError("Store supplier is an unrecognized type.");
+            materialized.withCachingDisabled();
         }
 
-        // Don't need to wait for running, since tests can use iqv2 to wait until they
-        // get a valid response.
+        builder
+            .stream(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer()))
+            .groupByKey()
+            .windowedBy(TimeWindows.ofSizeWithNoGrace(WINDOW_SIZE))
+            .aggregate(
+                () -> 0,
+                (key, value, aggregate) -> aggregate + value,
+                materialized
+            );
+    }
 
-        kafkaStreams =
-            IntegrationTestUtils.getStartedStreams(
-                streamsConfig,
-                builder,
-                true
+    private void setUpKeyValueDSLTopology(final KeyValueBytesStoreSupplier supplier,
+                                          final StreamsBuilder builder) {
+        final Materialized<Integer, Integer, KeyValueStore<Bytes, byte[]>> materialized =
+            Materialized.as(supplier);
+
+        if (cache) {
+            materialized.withCachingEnabled();
+        } else {
+            materialized.withCachingDisabled();
+        }
+
+        if (log) {
+            materialized.withLoggingEnabled(Collections.emptyMap());
+        } else {
+            materialized.withCachingDisabled();

Review comment:
       Ditto.







[GitHub] [kafka] vvcephei commented on a change in pull request #11624: MINOR: add PAPI KV store tests for IQv2

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #11624:
URL: https://github.com/apache/kafka/pull/11624#discussion_r774098877



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
##########
@@ -365,109 +395,200 @@ public void beforeTest() {
         final Properties streamsConfig = streamsConfiguration(
             cache,
             log,
-            storeToTest.name()
+            storeToTest.name(),
+            kind
         );
 
         final StreamsBuilder builder = new StreamsBuilder();
-        if (supplier instanceof KeyValueBytesStoreSupplier) {
-            final Materialized<Integer, Integer, KeyValueStore<Bytes, byte[]>> materialized =
-                Materialized.as((KeyValueBytesStoreSupplier) supplier);
+        if (Objects.equals(kind, "DSL") && supplier instanceof KeyValueBytesStoreSupplier) {
+            setUpKeyValueDSLTopology((KeyValueBytesStoreSupplier) supplier, builder);

Review comment:
       This method was already too long to comfortably read. Doubling its length by adding PAPI was out of the question, so I extracted some methods.







[GitHub] [kafka] vvcephei commented on pull request #11624: KAFKA-13553: add PAPI KV store tests for IQv2

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #11624:
URL: https://github.com/apache/kafka/pull/11624#issuecomment-1006246786


   Unrelated test failure:
   ```
   Build / JDK 17 and Scala 2.13 / testTopicIdUpgradeAfterReassigningPartitions() – kafka.controller.ControllerIntegrationTest
   ```

