You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/02/14 02:46:35 UTC

[GitHub] [kafka] guozhangwang commented on a change in pull request #11705: KAFKA-9847: add config to set default store type

guozhangwang commented on a change in pull request #11705:
URL: https://github.com/apache/kafka/pull/11705#discussion_r805466422



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedCogroupedKStreamImpl.java
##########
@@ -121,11 +121,20 @@
                     + " grace=[" + sessionWindows.gracePeriodMs() + "],"
                     + " retention=[" + retentionPeriod + "]");
             }
-            supplier = Stores.persistentSessionStore(
-                materialized.storeName(),
-                Duration.ofMillis(retentionPeriod)
-            );
+
+            if (materialized.storeType().equals(Materialized.StoreType.IN_MEMORY)) {

Review comment:
       Could we use `switch-case` instead of `if-else` in case we add more store types in the future? Ditto elsewhere.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
##########
@@ -56,6 +69,10 @@ public String storeName() {
         return storeName;
     }
 
+    public StoreType storeType() {
+        return storeType == null ? StoreType.ROCKS_DB : storeType;

Review comment:
       The `storeType` should never be null since it's always initialized at construction time right?

##########
File path: streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
##########
@@ -1262,6 +1336,381 @@ public void timeWindowAnonymousMaterializedCountShouldPreserveTopologyStructure(
                 "      <-- KSTREAM-SOURCE-0000000000\n\n",
             describe.toString()
         );
+
+        assertThat(topology.internalTopologyBuilder.setApplicationId("test").buildTopology().hasPersistentLocalStore(), is(true));
+    }
+
+    @Test
+    public void timeWindowAnonymousStoreTypeMaterializedCountShouldPreserveTopologyStructure() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        builder.stream("input-topic")
+            .groupByKey()
+            .windowedBy(TimeWindows.of(ofMillis(1)))
+            .count(Materialized.as(Materialized.StoreType.IN_MEMORY));
+        final Topology topology = builder.build();
+        final TopologyDescription describe = topology.describe();
+        assertEquals(
+            "Topologies:\n" +
+                "   Sub-topology: 0\n" +
+                "    Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])\n" +
+                "      --> KSTREAM-AGGREGATE-0000000003\n" +
+                "    Processor: KSTREAM-AGGREGATE-0000000003 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000002])\n" +
+                "      --> none\n" +
+                "      <-- KSTREAM-SOURCE-0000000000\n\n",
+            describe.toString()
+        );
+
+        assertThat(topology.internalTopologyBuilder.setApplicationId("test").buildTopology().hasPersistentLocalStore(), is(false));
+    }
+
+    @Test
+    public void timeWindowZeroArgCountWithTopologyConfigShouldPreserveTopologyStructure() {
+        // override the default store into in-memory
+        final StreamsBuilder builder = new StreamsBuilder(overrideDefaultStore(StreamsConfig.IN_MEMORY));
+        builder.stream("input-topic")
+            .groupByKey()
+            .windowedBy(TimeWindows.of(ofMillis(1)))
+            .count();
+        final Topology topology = builder.build();
+        final TopologyDescription describe = topology.describe();
+        assertEquals(
+            "Topology: my-topology:\n" +
+                "   Sub-topology: 0\n" +
+                "    Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])\n" +
+                "      --> KSTREAM-AGGREGATE-0000000002\n" +
+                "    Processor: KSTREAM-AGGREGATE-0000000002 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000001])\n" +
+                "      --> none\n" +
+                "      <-- KSTREAM-SOURCE-0000000000\n\n",
+            describe.toString()
+        );
+
+        assertThat(topology.internalTopologyBuilder.setApplicationId("test").buildTopology().hasPersistentLocalStore(), is(false));
+    }
+
+    @Test
+    public void slidingWindowZeroArgCountShouldPreserveTopologyStructure() {

Review comment:
       Thanks for adding coverage for sliding windows.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/TopologyConfig.java
##########
@@ -153,6 +166,17 @@ public TopologyConfig(final String topologyName, final StreamsConfig globalAppCo
         } else {
             deserializationExceptionHandlerSupplier = () -> globalAppConfigs.getConfiguredInstance(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, DeserializationExceptionHandler.class);
         }
+
+        // Override the default store type config no matter it's a named topology or not since it should apply to all topologies
+        storeType = getString(DEFAULT_DSL_STORE_CONFIG);
+        log.info("Topology {} is overriding {} to {}", topologyName, DEFAULT_DSL_STORE_CONFIG, storeType);
+    }
+
+    public Materialized.StoreType parseStoreType() {
+        if (storeType.equals(IN_MEMORY)) {

Review comment:
       Ditto here for using `switch case`.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImpl.java
##########
@@ -113,14 +114,23 @@
                     + "]");
             }
 
-            supplier = Stores.persistentTimestampedWindowStore(
-                materialized.storeName(),
-                Duration.ofMillis(retentionPeriod),
-                Duration.ofMillis(windows.timeDifferenceMs()),
-                false
-            );
-
+            if (materialized.storeType().equals(Materialized.StoreType.IN_MEMORY)) {
+                supplier = Stores.inMemoryWindowStore(

Review comment:
       Just double checking if `inMemoryWindowStore` maps to timestamped-window store too?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org