Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/04/29 19:31:05 UTC

[GitHub] [kafka] spena opened a new pull request #10613: KAFKA-10847: Set shared outer store to an in-memory store when in-memory stores are supplied

spena opened a new pull request #10613:
URL: https://github.com/apache/kafka/pull/10613


   When users supply in-memory stores for left/outer joins, the internal shared outer store must be switched to an in-memory store too. This allows users who want to keep all stores in memory to continue doing so.
   
   Tests?
   - Added unit tests to validate topology and left/outer joins work fine with an in-memory shared store.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a change in pull request #10613: KAFKA-10847: Set shared outer store to an in-memory store when in-memory stores are supplied

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #10613:
URL: https://github.com/apache/kafka/pull/10613#discussion_r624104819



##########
File path: streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
##########
@@ -863,6 +920,60 @@ public void streamStreamLeftJoinTopologyWithCustomStoresNames() {
             describe.toString());
     }
 
+    @Test
+    public void streamStreamLeftJoinTopologyWithCustomStoresSuppliers() {
+        final StreamsBuilder builder  = new StreamsBuilder();
+        final KStream<Integer, String> stream1;
+        final KStream<Integer, String> stream2;
+
+        stream1 = builder.stream("input-topic1");
+        stream2 = builder.stream("input-topic2");
+
+        final JoinWindows joinWindows = JoinWindows.of(ofMillis(100));
+
+        final WindowBytesStoreSupplier thisStoreSupplier = Stores.inMemoryWindowStore("in-memory-join-store",
+            Duration.ofMillis(joinWindows.size() + joinWindows.gracePeriodMs()),
+            Duration.ofMillis(joinWindows.size()), true);
+
+        final WindowBytesStoreSupplier otherStoreSupplier = Stores.inMemoryWindowStore("in-memory-join-store-other",
+            Duration.ofMillis(joinWindows.size() + joinWindows.gracePeriodMs()),
+            Duration.ofMillis(joinWindows.size()), true);
+
+        stream1.leftJoin(
+            stream2,
+            MockValueJoiner.TOSTRING_JOINER,
+            joinWindows,
+            StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())
+                .withThisStoreSupplier(thisStoreSupplier)
+                .withOtherStoreSupplier(otherStoreSupplier));
+
+        final TopologyDescription describe = builder.build().describe();
+
+        assertEquals(
+            "Topologies:\n" +
+                "   Sub-topology: 0\n" +
+                "    Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic1])\n" +
+                "      --> KSTREAM-WINDOWED-0000000002\n" +
+                "    Source: KSTREAM-SOURCE-0000000001 (topics: [input-topic2])\n" +
+                "      --> KSTREAM-WINDOWED-0000000003\n" +
+                "    Processor: KSTREAM-WINDOWED-0000000002 (stores: [in-memory-join-store])\n" +
+                "      --> KSTREAM-JOINTHIS-0000000004\n" +
+                "      <-- KSTREAM-SOURCE-0000000000\n" +
+                "    Processor: KSTREAM-WINDOWED-0000000003 (stores: [in-memory-join-store-other])\n" +
+                "      --> KSTREAM-OUTEROTHER-0000000005\n" +
+                "      <-- KSTREAM-SOURCE-0000000001\n" +
+                "    Processor: KSTREAM-JOINTHIS-0000000004 (stores: [in-memory-join-store-other, KSTREAM-OUTERSHARED-0000000004-memory-store])\n" +

Review comment:
       Hmm, this makes me think: is it worth "working around" this by moving the naming mechanism of the shared store into `sharedOuterJoinWindowStoreBuilder` above, so that it always follows the other two stores' naming patterns? As you can see here, if store suppliers are provided but store names are not, the existing stores use the customized names while the shared store still uses a system-generated name.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##########
@@ -155,7 +156,7 @@ public long get() {
 
             final String outerJoinStoreName = userProvidedBaseStoreName == null ? outerJoinStoreGeneratedName : userProvidedBaseStoreName + outerJoinSuffix;
 
-            outerJoinWindowStore = Optional.of(sharedOuterJoinWindowStoreBuilder(outerJoinStoreName, windows, streamJoinedInternal));
+            outerJoinWindowStore = Optional.of(sharedOuterJoinWindowStoreBuilder(outerJoinStoreName, windows, streamJoinedInternal, persistent));

Review comment:
       Just curious: is there a rationale for relying only on the left store's persistence to determine the shared store's persistence?

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##########
@@ -266,17 +267,36 @@ private void assertUniqueStoreNames(final WindowBytesStoreSupplier supplier,
     @SuppressWarnings("unchecked")
     private static <K, V1, V2> StoreBuilder<WindowStore<KeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>> sharedOuterJoinWindowStoreBuilder(final String storeName,
                                                                                                                                         final JoinWindows windows,
-                                                                                                                                        final StreamJoinedInternal<K, V1, V2> streamJoinedInternal) {
-        final StoreBuilder<WindowStore<KeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>> builder = new TimeOrderedWindowStoreBuilder<KeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>(
-            persistentTimeOrderedWindowStore(
-                storeName + "-store",
-                Duration.ofMillis(windows.size() + windows.gracePeriodMs()),
-                Duration.ofMillis(windows.size())
-            ),
-            new KeyAndJoinSideSerde<>(streamJoinedInternal.keySerde()),
-            new LeftOrRightValueSerde(streamJoinedInternal.valueSerde(), streamJoinedInternal.otherValueSerde()),
-            Time.SYSTEM
-        );
+                                                                                                                                        final StreamJoinedInternal<K, V1, V2> streamJoinedInternal,
+                                                                                                                                        final boolean persistent) {
+        final KeyAndJoinSideSerde keyAndJoinSideSerde = new KeyAndJoinSideSerde<>(streamJoinedInternal.keySerde());
+        final LeftOrRightValueSerde leftOrRightValueSerde = new LeftOrRightValueSerde(streamJoinedInternal.valueSerde(), streamJoinedInternal.otherValueSerde());
+
+        final StoreBuilder<WindowStore<KeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>> builder;
+        if (persistent) {
+            builder = new TimeOrderedWindowStoreBuilder<KeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>(
+                persistentTimeOrderedWindowStore(
+                    storeName + "-store",
+                    Duration.ofMillis(windows.size() + windows.gracePeriodMs()),
+                    Duration.ofMillis(windows.size())
+                ),
+                keyAndJoinSideSerde,
+                leftOrRightValueSerde,
+                Time.SYSTEM
+            );
+        } else {
+            builder = Stores.windowStoreBuilder(
+                Stores.inMemoryWindowStore(
+                    storeName + "-memory-store",

Review comment:
       I think we should use the same name here; the metrics-scope would differentiate rocksdb from in-memory, which is sufficient.







[GitHub] [kafka] spena commented on pull request #10613: KAFKA-10847: Set shared outer store to an in-memory store when in-memory stores are supplied

Posted by GitBox <gi...@apache.org>.
spena commented on pull request #10613:
URL: https://github.com/apache/kafka/pull/10613#issuecomment-832730649


   Thanks @guozhangwang, that seems to be the culprit of the performance problem. In fact, when the performance tests finish, the script takes some time to finish cleaning up; I assume it is taking time to free all the consumed memory.
   
   I will work on this fix in a follow-up PR, is that ok? I addressed all comments in this one, so it is ready to merge if you're ok with it.





[GitHub] [kafka] spena commented on a change in pull request #10613: KAFKA-10847: Set shared outer store to an in-memory store when in-memory stores are supplied

Posted by GitBox <gi...@apache.org>.
spena commented on a change in pull request #10613:
URL: https://github.com/apache/kafka/pull/10613#discussion_r625811787



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##########
@@ -155,7 +156,7 @@ public long get() {
 
             final String outerJoinStoreName = userProvidedBaseStoreName == null ? outerJoinStoreGeneratedName : userProvidedBaseStoreName + outerJoinSuffix;
 
-            outerJoinWindowStore = Optional.of(sharedOuterJoinWindowStoreBuilder(outerJoinStoreName, windows, streamJoinedInternal));
+            outerJoinWindowStore = Optional.of(sharedOuterJoinWindowStoreBuilder(outerJoinStoreName, windows, streamJoinedInternal, persistent));

Review comment:
       No reason at all. I just chose one side to decide whether to go in-memory or RocksDB. Left seems to be the only side that is always true, whether the join is a left or an outer join. I was thinking of maybe going in-memory if, in left joins, the left side is in-memory, or if, in outer joins, both the left and right sides are in-memory. What do you think?
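
A minimal sketch of that alternative rule, for illustration only. `SharedStorePersistence`, `JoinType`, and `sharedStorePersistent` are hypothetical names that do not appear in the PR, and the two booleans stand in for whatever each side's store supplier reports about its persistence:

```java
// Hypothetical sketch; not the code in KStreamImplJoin.
final class SharedStorePersistence {

    // Only the two join types that need a shared outer store.
    enum JoinType { LEFT, OUTER }

    /**
     * Returns true when the shared outer store should be persistent,
     * false when it can be kept in memory.
     */
    static boolean sharedStorePersistent(final JoinType joinType,
                                         final boolean leftPersistent,
                                         final boolean rightPersistent) {
        if (joinType == JoinType.LEFT) {
            // Left join: follow the left side only.
            return leftPersistent;
        }
        // Outer join: go in-memory only if BOTH sides are in-memory,
        // i.e. stay persistent if either side is persistent.
        return leftPersistent || rightPersistent;
    }

    public static void main(final String[] args) {
        System.out.println(sharedStorePersistent(JoinType.LEFT, false, true));
        System.out.println(sharedStorePersistent(JoinType.OUTER, false, true));
        System.out.println(sharedStorePersistent(JoinType.OUTER, false, false));
    }
}
```

The PR as reviewed uses the simpler rule and looks only at the left store; the sketch differs from it only in the outer-join branch.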







[GitHub] [kafka] guozhangwang commented on a change in pull request #10613: KAFKA-10847: Set shared outer store to an in-memory store when in-memory stores are supplied

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #10613:
URL: https://github.com/apache/kafka/pull/10613#discussion_r626253159



##########
File path: streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
##########
@@ -863,6 +920,60 @@ public void streamStreamLeftJoinTopologyWithCustomStoresNames() {
             describe.toString());
     }
 
+    @Test
+    public void streamStreamLeftJoinTopologyWithCustomStoresSuppliers() {
+        final StreamsBuilder builder  = new StreamsBuilder();
+        final KStream<Integer, String> stream1;
+        final KStream<Integer, String> stream2;
+
+        stream1 = builder.stream("input-topic1");
+        stream2 = builder.stream("input-topic2");
+
+        final JoinWindows joinWindows = JoinWindows.of(ofMillis(100));
+
+        final WindowBytesStoreSupplier thisStoreSupplier = Stores.inMemoryWindowStore("in-memory-join-store",
+            Duration.ofMillis(joinWindows.size() + joinWindows.gracePeriodMs()),
+            Duration.ofMillis(joinWindows.size()), true);
+
+        final WindowBytesStoreSupplier otherStoreSupplier = Stores.inMemoryWindowStore("in-memory-join-store-other",
+            Duration.ofMillis(joinWindows.size() + joinWindows.gracePeriodMs()),
+            Duration.ofMillis(joinWindows.size()), true);
+
+        stream1.leftJoin(
+            stream2,
+            MockValueJoiner.TOSTRING_JOINER,
+            joinWindows,
+            StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())
+                .withThisStoreSupplier(thisStoreSupplier)
+                .withOtherStoreSupplier(otherStoreSupplier));
+
+        final TopologyDescription describe = builder.build().describe();
+
+        assertEquals(
+            "Topologies:\n" +
+                "   Sub-topology: 0\n" +
+                "    Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic1])\n" +
+                "      --> KSTREAM-WINDOWED-0000000002\n" +
+                "    Source: KSTREAM-SOURCE-0000000001 (topics: [input-topic2])\n" +
+                "      --> KSTREAM-WINDOWED-0000000003\n" +
+                "    Processor: KSTREAM-WINDOWED-0000000002 (stores: [in-memory-join-store])\n" +
+                "      --> KSTREAM-JOINTHIS-0000000004\n" +
+                "      <-- KSTREAM-SOURCE-0000000000\n" +
+                "    Processor: KSTREAM-WINDOWED-0000000003 (stores: [in-memory-join-store-other])\n" +
+                "      --> KSTREAM-OUTEROTHER-0000000005\n" +
+                "      <-- KSTREAM-SOURCE-0000000001\n" +
+                "    Processor: KSTREAM-JOINTHIS-0000000004 (stores: [in-memory-join-store-other, KSTREAM-OUTERSHARED-0000000004-memory-store])\n" +

Review comment:
       Maybe we can also make it a bit simpler and just rely on the left side: if a supplier is given on the left side (which always provides a name), then we name the shared store `thisStoreSupplier.name() + outerJoinSuffix`; if not, we look at `streamJoinedInternal.storeName()`, with `outerJoinStoreGeneratedName` as the last resort.
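
The fallback order described above could be sketched roughly as follows. This is an illustration under assumptions, not the merged code: `SharedStoreNaming` and `sharedStoreName` are hypothetical names, plain strings stand in for `thisStoreSupplier.name()` and `streamJoinedInternal.storeName()`, and appending the suffix in the second branch is assumed from the existing `userProvidedBaseStoreName + outerJoinSuffix` expression in the diff:

```java
// Hypothetical sketch; plain strings replace the Kafka Streams types.
final class SharedStoreNaming {

    /**
     * Picks a base name for the shared outer store:
     * 1. the left ("this") supplier's name plus the suffix, if a supplier was given;
     * 2. otherwise the user-provided store name plus the suffix, if one was given;
     * 3. otherwise the system-generated name, unchanged.
     */
    static String sharedStoreName(final String thisSupplierName,
                                  final String userProvidedBaseStoreName,
                                  final String generatedName,
                                  final String outerJoinSuffix) {
        if (thisSupplierName != null) {
            return thisSupplierName + outerJoinSuffix;
        }
        if (userProvidedBaseStoreName != null) {
            return userProvidedBaseStoreName + outerJoinSuffix;
        }
        return generatedName;
    }

    public static void main(final String[] args) {
        // The suffix value here is only an example.
        final String suffix = "-left-shared-join";
        System.out.println(sharedStoreName("in-memory-join-store", null, "KSTREAM-OUTERSHARED-0000000004", suffix));
        System.out.println(sharedStoreName(null, "my-join", "KSTREAM-OUTERSHARED-0000000004", suffix));
        System.out.println(sharedStoreName(null, null, "KSTREAM-OUTERSHARED-0000000004", suffix));
    }
}
```

With this ordering the shared store picks up a customized name whenever the left store has one, which is the inconsistency the test output above exposes.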







[GitHub] [kafka] guozhangwang commented on pull request #10613: KAFKA-10847: Set shared outer store to an in-memory store when in-memory stores are supplied

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #10613:
URL: https://github.com/apache/kafka/pull/10613#issuecomment-832870539


   Merged to trunk, thanks @spena 





[GitHub] [kafka] guozhangwang merged pull request #10613: KAFKA-10847: Set shared outer store to an in-memory store when in-memory stores are supplied

Posted by GitBox <gi...@apache.org>.
guozhangwang merged pull request #10613:
URL: https://github.com/apache/kafka/pull/10613


   





[GitHub] [kafka] guozhangwang commented on a change in pull request #10613: KAFKA-10847: Set shared outer store to an in-memory store when in-memory stores are supplied

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #10613:
URL: https://github.com/apache/kafka/pull/10613#discussion_r626250519



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##########
@@ -155,7 +156,7 @@ public long get() {
 
             final String outerJoinStoreName = userProvidedBaseStoreName == null ? outerJoinStoreGeneratedName : userProvidedBaseStoreName + outerJoinSuffix;
 
-            outerJoinWindowStore = Optional.of(sharedOuterJoinWindowStoreBuilder(outerJoinStoreName, windows, streamJoinedInternal));
+            outerJoinWindowStore = Optional.of(sharedOuterJoinWindowStoreBuilder(outerJoinStoreName, windows, streamJoinedInternal, persistent));

Review comment:
       Maybe keeping it simple and always going with left is fine for now. I was just wondering if you had a specific reason for that :)







[GitHub] [kafka] spena commented on a change in pull request #10613: KAFKA-10847: Set shared outer store to an in-memory store when in-memory stores are supplied

Posted by GitBox <gi...@apache.org>.
spena commented on a change in pull request #10613:
URL: https://github.com/apache/kafka/pull/10613#discussion_r625833095



##########
File path: streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
##########
@@ -863,6 +920,60 @@ public void streamStreamLeftJoinTopologyWithCustomStoresNames() {
             describe.toString());
     }
 
+    @Test
+    public void streamStreamLeftJoinTopologyWithCustomStoresSuppliers() {
+        final StreamsBuilder builder  = new StreamsBuilder();
+        final KStream<Integer, String> stream1;
+        final KStream<Integer, String> stream2;
+
+        stream1 = builder.stream("input-topic1");
+        stream2 = builder.stream("input-topic2");
+
+        final JoinWindows joinWindows = JoinWindows.of(ofMillis(100));
+
+        final WindowBytesStoreSupplier thisStoreSupplier = Stores.inMemoryWindowStore("in-memory-join-store",
+            Duration.ofMillis(joinWindows.size() + joinWindows.gracePeriodMs()),
+            Duration.ofMillis(joinWindows.size()), true);
+
+        final WindowBytesStoreSupplier otherStoreSupplier = Stores.inMemoryWindowStore("in-memory-join-store-other",
+            Duration.ofMillis(joinWindows.size() + joinWindows.gracePeriodMs()),
+            Duration.ofMillis(joinWindows.size()), true);
+
+        stream1.leftJoin(
+            stream2,
+            MockValueJoiner.TOSTRING_JOINER,
+            joinWindows,
+            StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())
+                .withThisStoreSupplier(thisStoreSupplier)
+                .withOtherStoreSupplier(otherStoreSupplier));
+
+        final TopologyDescription describe = builder.build().describe();
+
+        assertEquals(
+            "Topologies:\n" +
+                "   Sub-topology: 0\n" +
+                "    Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic1])\n" +
+                "      --> KSTREAM-WINDOWED-0000000002\n" +
+                "    Source: KSTREAM-SOURCE-0000000001 (topics: [input-topic2])\n" +
+                "      --> KSTREAM-WINDOWED-0000000003\n" +
+                "    Processor: KSTREAM-WINDOWED-0000000002 (stores: [in-memory-join-store])\n" +
+                "      --> KSTREAM-JOINTHIS-0000000004\n" +
+                "      <-- KSTREAM-SOURCE-0000000000\n" +
+                "    Processor: KSTREAM-WINDOWED-0000000003 (stores: [in-memory-join-store-other])\n" +
+                "      --> KSTREAM-OUTEROTHER-0000000005\n" +
+                "      <-- KSTREAM-SOURCE-0000000001\n" +
+                "    Processor: KSTREAM-JOINTHIS-0000000004 (stores: [in-memory-join-store-other, KSTREAM-OUTERSHARED-0000000004-memory-store])\n" +

Review comment:
       I just moved the name builder inside the `sharedOuterJoinWindowStoreBuilder`. I did not create the name based on the store supplier, though. I would have to append a suffix and I wasn't sure if it would be too long. For instance, if the store supplier is named `in-memory-join-store`, then I would name the shared outer store as `in-memory-join-store-left-shared-join-store`. 
   
   Also, what name should I use for the full outer join? The `this` supplier's or the `other` supplier's? Say the suppliers are named `mem-left-store` and `mem-right-store`. Would I end up with `mem-left-store-outer-shared-join-store` or `mem-right-store-outer-shared-join-store`?
   
   What do you think? 



