Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/06/22 21:43:52 UTC

[GitHub] [kafka] mjsax opened a new pull request #10917: KAFKA-10847: improve throughput of stream-stream join with spurious left/outer join fix

mjsax opened a new pull request #10917:
URL: https://github.com/apache/kafka/pull/10917


   The fix to avoid spurious left/outer stream-stream join results showed
   very low throughput with RocksDB, due to the excessive creation of iterators.
   Instead of trying to emit left/outer stream-stream join results for every
   input record, this PR tracks the lower timestamp bound of
   left/outer join candidates and only tries to emit them (and create an
   iterator) if they are potentially old enough.
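A minimal Java sketch of the idea described above, assuming a plain in-memory list in place of the RocksDB-backed outer-join window store. The class `LazyOuterJoinEmitter`, its fields, and the `scans` counter are hypothetical illustrations, not Kafka Streams internals. A cheap comparison against the tracked lower bound decides whether the expensive scan (standing in for opening a `store.all()` iterator) is needed at all.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch, not Kafka Streams code: a list stands in for the outer-join
// WindowStore, and `scans` counts how often a store.all() iterator would be opened.
class LazyOuterJoinEmitter {
    private final long afterMs;
    private final long graceMs;
    private final List<Long> candidates = new ArrayList<>();
    private long streamTime = Long.MIN_VALUE;
    private long minTime = Long.MAX_VALUE; // lower bound of buffered candidate timestamps
    int scans = 0;
    final List<Long> emitted = new ArrayList<>();

    LazyOuterJoinEmitter(final long afterMs, final long graceMs) {
        this.afterMs = afterMs;
        this.graceMs = graceMs;
    }

    // Buffers a left/outer join candidate and advances stream time.
    void bufferCandidate(final long timestamp) {
        candidates.add(timestamp);
        minTime = Math.min(minTime, timestamp);
        streamTime = Math.max(streamTime, timestamp);
        maybeEmit();
    }

    private void maybeEmit() {
        // Cheap check: if even the oldest candidate has not expired, skip the scan.
        if (minTime >= streamTime - afterMs - graceMs) {
            return;
        }
        scans++;
        minTime = Long.MAX_VALUE; // recomputed from the candidates that survive the scan
        final Iterator<Long> it = candidates.iterator();
        while (it.hasNext()) {
            final long ts = it.next();
            if (ts + afterMs + graceMs < streamTime) {
                emitted.add(ts); // would be forwarded as a non-joined left/outer result
                it.remove();
            } else {
                minTime = Math.min(minTime, ts);
            }
        }
    }
}
```

With afterMs = 10 and grace = 5, buffering candidates at 1, 5 and 7 triggers no scan; a candidate at 50 triggers one scan that emits 1, 5 and 7, and a following candidate at 52 skips the scan again because the new lower bound (50) cannot have expired yet.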
   
   Call for review @guozhangwang @spena @vcrfxia


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] showuon commented on a change in pull request #10917: KAFKA-10847: improve throughput of stream-stream join with spurious left/outer join fix

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #10917:
URL: https://github.com/apache/kafka/pull/10917#discussion_r659525237



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -186,12 +190,17 @@ public void process(final K key, final V1 value) {
 
         @SuppressWarnings("unchecked")
         private void emitNonJoinedOuterRecords(final WindowStore<KeyAndJoinSide<K>, LeftOrRightValue> store) {
+            if (minTime.minTime >= maxObservedStreamTime.get() - joinAfterMs - joinBeforeMs - joinGraceMs) {
+                return;
+            }
+
             try (final KeyValueIterator<Windowed<KeyAndJoinSide<K>>, LeftOrRightValue> it = store.all()) {
                 while (it.hasNext()) {
                     final KeyValue<Windowed<KeyAndJoinSide<K>>, LeftOrRightValue> record = it.next();
 
                     final Windowed<KeyAndJoinSide<K>> windowedKey = record.key;
                     final LeftOrRightValue value = record.value;
+                    minTime.minTime = windowedKey.window().start();

Review comment:
       Nice explanation! +1 to resetting it to `Long.MAX_VALUE`.







[GitHub] [kafka] mjsax commented on a change in pull request #10917: KAFKA-10847: improve throughput of stream-stream join with spurious left/outer join fix

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10917:
URL: https://github.com/apache/kafka/pull/10917#discussion_r660334734



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -186,12 +190,17 @@ public void process(final K key, final V1 value) {
 
         @SuppressWarnings("unchecked")
         private void emitNonJoinedOuterRecords(final WindowStore<KeyAndJoinSide<K>, LeftOrRightValue> store) {
+            if (minTime.minTime >= maxObservedStreamTime.get() - joinAfterMs - joinBeforeMs - joinGraceMs) {

Review comment:
       It's a performance question. Without `joinBeforeMs` we only get 3K rec/sec throughput in our benchmarks, while we get 12K rec/sec throughput with `joinBeforeMs`.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -186,12 +190,17 @@ public void process(final K key, final V1 value) {
 
         @SuppressWarnings("unchecked")
         private void emitNonJoinedOuterRecords(final WindowStore<KeyAndJoinSide<K>, LeftOrRightValue> store) {
+            if (minTime.minTime >= maxObservedStreamTime.get() - joinAfterMs - joinBeforeMs - joinGraceMs) {
+                return;
+            }
+
             try (final KeyValueIterator<Windowed<KeyAndJoinSide<K>>, LeftOrRightValue> it = store.all()) {
                 while (it.hasNext()) {
                     final KeyValue<Windowed<KeyAndJoinSide<K>>, LeftOrRightValue> record = it.next();
 
                     final Windowed<KeyAndJoinSide<K>> windowedKey = record.key;
                     final LeftOrRightValue value = record.value;
+                    minTime.minTime = windowedKey.window().start();

Review comment:
       Well, updating `minTime` does not cost us anything because it's a plain in-memory operation (also no additional serde costs).







[GitHub] [kafka] mjsax commented on a change in pull request #10917: KAFKA-10847: improve throughput of stream-stream join with spurious left/outer join fix

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10917:
URL: https://github.com/apache/kafka/pull/10917#discussion_r661900822



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -183,15 +195,27 @@ public void process(final K key, final V1 value) {
 
         @SuppressWarnings("unchecked")
         private void emitNonJoinedOuterRecords(final WindowStore<KeyAndJoinSide<K>, LeftOrRightValue> store) {
+            if (sharedTimeTracker.minTime >= sharedTimeTracker.streamTime - joinAfterMs - joinGraceMs) {
+                return;
+            }
+            if (context.currentSystemTimeMs() < sharedTimeTracker.nextTimeToEmit) {

Review comment:
       I think the result is still deterministic, as the result will always contain the same records; only the output order might change, which is not an issue IMHO.










[GitHub] [kafka] guozhangwang commented on a change in pull request #10917: KAFKA-10847: improve throughput of stream-stream join with spurious left/outer join fix

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #10917:
URL: https://github.com/apache/kafka/pull/10917#discussion_r659908813



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -186,12 +190,17 @@ public void process(final K key, final V1 value) {
 
         @SuppressWarnings("unchecked")
         private void emitNonJoinedOuterRecords(final WindowStore<KeyAndJoinSide<K>, LeftOrRightValue> store) {
+            if (minTime.minTime >= maxObservedStreamTime.get() - joinAfterMs - joinBeforeMs - joinGraceMs) {
+                return;
+            }
+
             try (final KeyValueIterator<Windowed<KeyAndJoinSide<K>>, LeftOrRightValue> it = store.all()) {
                 while (it.hasNext()) {
                     final KeyValue<Windowed<KeyAndJoinSide<K>>, LeftOrRightValue> record = it.next();
 
                     final Windowed<KeyAndJoinSide<K>> windowedKey = record.key;
                     final LeftOrRightValue value = record.value;
+                    minTime.minTime = windowedKey.window().start();

Review comment:
       Instead of trying to update minTime on each record, could we just set it once in line 207 below, plus setting it to MAX if we've exhausted all records (as @spena indicated above)?

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##########
@@ -69,6 +69,10 @@ public long get() {
         }
     }
 
+    static class MinTime {

Review comment:
       Could we merge this and MaxObservedStreamTime into a single class to be shared among operator nodes?
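A rough sketch of what such a merged class could look like, assuming both processors of one join are handed the same instance. The later revision of the diff reads `sharedTimeTracker.minTime` and `sharedTimeTracker.streamTime`, but the class body and the `JoinSide` helper below are hypothetical.

```java
// Hypothetical sketch of a merged tracker; only the field names follow the later
// diff (`sharedTimeTracker.minTime`, `sharedTimeTracker.streamTime`).
class SharedTimeTracker {
    long streamTime = Long.MIN_VALUE; // max timestamp observed by either join side
    long minTime = Long.MAX_VALUE;    // lower bound of buffered left/outer candidates

    void advanceStreamTime(final long timestamp) {
        streamTime = Math.max(streamTime, timestamp);
    }

    void updateMinTime(final long timestamp) {
        minTime = Math.min(minTime, timestamp);
    }
}

// Hypothetical stand-in for one side of the join; both sides get the same tracker.
class JoinSide {
    private final SharedTimeTracker tracker;

    JoinSide(final SharedTimeTracker tracker) {
        this.tracker = tracker;
    }

    void process(final long timestamp, final boolean bufferedAsOuterCandidate) {
        tracker.advanceStreamTime(timestamp);
        if (bufferedAsOuterCandidate) {
            tracker.updateMinTime(timestamp);
        }
    }
}
```

Sharing one object means a record on either side advances the stream time that the other side uses in its emit check, without a second holder class per operator.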










[GitHub] [kafka] mjsax commented on a change in pull request #10917: KAFKA-10847: improve throughput of stream-stream join with spurious left/outer join fix

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10917:
URL: https://github.com/apache/kafka/pull/10917#discussion_r661100306



##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##########
@@ -938,6 +938,9 @@
         // Private API used to disable the fix on left/outer joins (https://issues.apache.org/jira/browse/KAFKA-10847)
         public static final String ENABLE_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX = "__enable.kstreams.outer.join.spurious.results.fix__";
 
+        // Private API used to control the emit latency for left/outer join results (https://issues.apache.org/jira/browse/KAFKA-10847)
+        public static final String EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX = "emit.interval.ms.kstreams.outer.join.spurious.results.fix__";

Review comment:
       Good catch!







[GitHub] [kafka] mjsax merged pull request #10917: KAFKA-10847: improve throughput of stream-stream join with spurious left/outer join fix

Posted by GitBox <gi...@apache.org>.
mjsax merged pull request #10917:
URL: https://github.com/apache/kafka/pull/10917


   











[GitHub] [kafka] guozhangwang commented on a change in pull request #10917: KAFKA-10847: improve throughput of stream-stream join with spurious left/outer join fix

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #10917:
URL: https://github.com/apache/kafka/pull/10917#discussion_r660844998



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -186,12 +190,17 @@ public void process(final K key, final V1 value) {
 
         @SuppressWarnings("unchecked")
         private void emitNonJoinedOuterRecords(final WindowStore<KeyAndJoinSide<K>, LeftOrRightValue> store) {
+            if (minTime.minTime >= maxObservedStreamTime.get() - joinAfterMs - joinBeforeMs - joinGraceMs) {

Review comment:
       If we want to treat it as a performance/emit-latency trade-off, then maybe it's better not to piggy-back on `beforeMs` but to use a separate value that is independent of the window size?







[GitHub] [kafka] mjsax commented on a change in pull request #10917: KAFKA-10847: improve throughput of stream-stream join with spurious left/outer join fix

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10917:
URL: https://github.com/apache/kafka/pull/10917#discussion_r661027703



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -186,12 +190,17 @@ public void process(final K key, final V1 value) {
 
         @SuppressWarnings("unchecked")
         private void emitNonJoinedOuterRecords(final WindowStore<KeyAndJoinSide<K>, LeftOrRightValue> store) {
+            if (minTime.minTime >= maxObservedStreamTime.get() - joinAfterMs - joinBeforeMs - joinGraceMs) {

Review comment:
       That's a good point, and I was concerned about it, too. Using `beforeMs` might increase the latency too much, especially for large windows.
   
   It might actually be best to just do processing-time periodic emits, to avoid calling `store.all()` too often.
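A minimal sketch of processing-time gating, assuming a `LongSupplier` in place of the processor context's wall clock (the later diff calls `context.currentSystemTimeMs()`); `WallClockEmitGate` and its fields are hypothetical names. The scan is allowed at most once per emit interval of wall-clock time, no matter how many records arrive in between.

```java
import java.util.function.LongSupplier;

// Hypothetical sketch: the LongSupplier stands in for the processor context's
// wall clock; emitIntervalMs would come from an internal emit-interval config.
class WallClockEmitGate {
    private final LongSupplier wallClock;
    private final long emitIntervalMs;
    private long nextTimeToEmit;

    WallClockEmitGate(final LongSupplier wallClock, final long emitIntervalMs) {
        this.wallClock = wallClock;
        this.emitIntervalMs = emitIntervalMs;
        this.nextTimeToEmit = wallClock.getAsLong(); // first check may scan immediately
    }

    // Returns true at most once per emitIntervalMs of wall-clock time.
    boolean shouldScan() {
        final long now = wallClock.getAsLong();
        if (now < nextTimeToEmit) {
            return false; // too soon: skip store.all() for this record
        }
        nextTimeToEmit = now + emitIntervalMs;
        return true;
    }
}
```

Because the gate depends on wall-clock time, replaying the same input can split the emitted left/outer results across scans differently; the set of records should stay the same, but their output order may differ.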







[GitHub] [kafka] showuon commented on a change in pull request #10917: KAFKA-10847: improve throughput of stream-stream join with spurious left/outer join fix

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #10917:
URL: https://github.com/apache/kafka/pull/10917#discussion_r659521269



##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java
##########
@@ -460,23 +463,25 @@ public void testOrdering() {
             // push two items to the primary stream; the other window is empty; this should not produce any item yet
             // w1 = {}
             // w2 = {}
-            // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0) }
+            // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 101), 1:A2 (ts: 200) }
             // --> w2 = {}
             inputTopic1.pipeInput(0, "A0", 0L);
-            inputTopic1.pipeInput(1, "A1", 100L);
+            inputTopic1.pipeInput(1, "A1", 101L);
+            inputTopic1.pipeInput(1, "A2", 200L);
             processor.checkAndClearProcessResult();
 
             // push one item to the other window that has a join; this should produce non-joined records with a closed window first, then
             // the joined records
             // by the time they were produced before
-            // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0) }
+            // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 101), 1:A2 (ts: (200: }

Review comment:
       typo: `1:A2 (ts: 200)`

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -186,12 +190,17 @@ public void process(final K key, final V1 value) {
 
         @SuppressWarnings("unchecked")
         private void emitNonJoinedOuterRecords(final WindowStore<KeyAndJoinSide<K>, LeftOrRightValue> store) {
+            if (minTime.minTime >= maxObservedStreamTime.get() - joinAfterMs - joinBeforeMs - joinGraceMs) {
+                return;
+            }
+
             try (final KeyValueIterator<Windowed<KeyAndJoinSide<K>>, LeftOrRightValue> it = store.all()) {
                 while (it.hasNext()) {
                     final KeyValue<Windowed<KeyAndJoinSide<K>>, LeftOrRightValue> record = it.next();
 
                     final Windowed<KeyAndJoinSide<K>> windowedKey = record.key;
                     final LeftOrRightValue value = record.value;
+                    minTime.minTime = windowedKey.window().start();

Review comment:
       Nice explanation! +1 to resetting it to `Long.MAX_VALUE`.







[GitHub] [kafka] showuon commented on a change in pull request #10917: KAFKA-10847: improve throughput of stream-stream join with spurious left/outer join fix

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #10917:
URL: https://github.com/apache/kafka/pull/10917#discussion_r661064592



##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##########
@@ -938,6 +938,9 @@
         // Private API used to disable the fix on left/outer joins (https://issues.apache.org/jira/browse/KAFKA-10847)
         public static final String ENABLE_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX = "__enable.kstreams.outer.join.spurious.results.fix__";
 
+        // Private API used to control the emit latency for left/outer join results (https://issues.apache.org/jira/browse/KAFKA-10847)
+        public static final String EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX = "emit.interval.ms.kstreams.outer.join.spurious.results.fix__";

Review comment:
       Should we add underscores in front of the config name, like the ones above? i.e. `__emit.interval.ms.kstreams.outer.join.spurious.results.fix__`







[GitHub] [kafka] spena commented on a change in pull request #10917: KAFKA-10847: improve throughput of stream-stream join with spurious left/outer join fix

Posted by GitBox <gi...@apache.org>.
spena commented on a change in pull request #10917:
URL: https://github.com/apache/kafka/pull/10917#discussion_r661642934



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -183,15 +195,27 @@ public void process(final K key, final V1 value) {
 
         @SuppressWarnings("unchecked")
         private void emitNonJoinedOuterRecords(final WindowStore<KeyAndJoinSide<K>, LeftOrRightValue> store) {
+            if (sharedTimeTracker.minTime >= sharedTimeTracker.streamTime - joinAfterMs - joinGraceMs) {
+                return;
+            }
+            if (context.currentSystemTimeMs() < sharedTimeTracker.nextTimeToEmit) {

Review comment:
       Could you add a comment why we're using the system time instead of stream time to emit the results? I think I know why, but it would be good to have it here for future understanding.







[GitHub] [kafka] guozhangwang commented on a change in pull request #10917: KAFKA-10847: improve throughput of stream-stream join with spurious left/outer join fix

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #10917:
URL: https://github.com/apache/kafka/pull/10917#discussion_r661055137



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -183,15 +195,27 @@ public void process(final K key, final V1 value) {
 
         @SuppressWarnings("unchecked")
         private void emitNonJoinedOuterRecords(final WindowStore<KeyAndJoinSide<K>, LeftOrRightValue> store) {
+            if (sharedTimeTracker.minTime >= sharedTimeTracker.streamTime - joinAfterMs - joinGraceMs) {

Review comment:
       nit: add some explanations on these two conditions?







[GitHub] [kafka] spena commented on a change in pull request #10917: KAFKA-10847: improve throughput of stream-stream join with spurious left/outer join fix

Posted by GitBox <gi...@apache.org>.
spena commented on a change in pull request #10917:
URL: https://github.com/apache/kafka/pull/10917#discussion_r658928595



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -186,12 +190,17 @@ public void process(final K key, final V1 value) {
 
         @SuppressWarnings("unchecked")
         private void emitNonJoinedOuterRecords(final WindowStore<KeyAndJoinSide<K>, LeftOrRightValue> store) {
+            if (minTime.minTime >= maxObservedStreamTime.get() - joinAfterMs - joinBeforeMs - joinGraceMs) {

Review comment:
       Why add both joinAfterMs and joinBeforeMs? A record expires when `window().start() + joinAfterMs + joinGraceMs` is lower than maxStreamTime. For instance, say we have a record in the shared state store with time = 1. Now a new record arrives with time = 17.
   
   ```
   inputRecordTime = 17
   maxObservedTime = 17
   minTime = 1
   window = 10  (beforeMs = 10, afterMs = 10)
   grace = 5
   ```
   
   Isn't record 1 supposed to expire and be emitted, because 1 + 10 (afterMs) + 5 (grace) = 16, which is lower than maxStreamTime?
   
   With the condition you have, the minTime registered is 1, so `(1 >= 17 - 10 - 10 - 5)` is true, and thus the method returns and does not emit record 1 until another 10 ms have passed.
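The arithmetic of this example can be checked with a short Java snippet; the constants come from the comment above, and `ExpiryConditionCheck` is a hypothetical helper. With `joinBeforeMs` in the guard, record 1 is only scanned for once stream time reaches 27 instead of 17, i.e. 10 ms later than the expiry rule requires.

```java
// Values copied from the example above; the helper itself is hypothetical.
class ExpiryConditionCheck {
    static final long BEFORE_MS = 10L;
    static final long AFTER_MS = 10L;
    static final long GRACE_MS = 5L;

    // Guard as written in this revision of the diff: true means "skip the scan".
    static boolean skipWithBeforeMs(final long minTime, final long streamTime) {
        return minTime >= streamTime - AFTER_MS - BEFORE_MS - GRACE_MS;
    }

    // Guard derived from the expiry rule start + afterMs + grace < streamTime.
    static boolean skipWithoutBeforeMs(final long minTime, final long streamTime) {
        return minTime >= streamTime - AFTER_MS - GRACE_MS;
    }

    public static void main(final String[] args) {
        System.out.println(skipWithBeforeMs(1L, 17L));    // true:  1 >= -8, record 1 not emitted
        System.out.println(skipWithoutBeforeMs(1L, 17L)); // false: 1 >= 2 fails, record 1 emitted
        System.out.println(skipWithBeforeMs(1L, 27L));    // false: 1 >= 2 fails, emitted 10 ms later
    }
}
```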
   

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -186,12 +190,17 @@ public void process(final K key, final V1 value) {
 
         @SuppressWarnings("unchecked")
         private void emitNonJoinedOuterRecords(final WindowStore<KeyAndJoinSide<K>, LeftOrRightValue> store) {
+            if (minTime.minTime >= maxObservedStreamTime.get() - joinAfterMs - joinBeforeMs - joinGraceMs) {
+                return;
+            }
+
             try (final KeyValueIterator<Windowed<KeyAndJoinSide<K>>, LeftOrRightValue> it = store.all()) {
                 while (it.hasNext()) {
                     final KeyValue<Windowed<KeyAndJoinSide<K>>, LeftOrRightValue> record = it.next();
 
                     final Windowed<KeyAndJoinSide<K>> windowedKey = record.key;
                     final LeftOrRightValue value = record.value;
+                    minTime.minTime = windowedKey.window().start();

Review comment:
       There's one extra thing to do. We should set `minTime = MAX` before opening the `store.all()` iterator so that the minimum is reset in case there are no records available in the iterator. Here is an example I ran:
   
   I have a few records in the shared state store (1, 5, 7). Then a new record arrives that expires all three records, for instance record 50. For each record, minTime will be set to 1, then 5, then 7.
   
   Now, for every new record after 50 that is still part of the window, the condition at the beginning of this method `minTime >= maxStreamTime - ...` will be false, thus opening the iterator again and again.
   
   If we reset minTime to MAX, then the next time the iterator will be opened, but no records will be available, so minTime will stay at MAX. And future records that do not expire will not open the iterator because `minTime (MAX) >= maxObservedTime - ...`
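The scenario can be replayed with a small Java sketch, assuming afterMs = 10, grace = 5, candidates 1, 5 and 7 in a time-ordered buffer, and (for simplicity) that the later records 51 to 53 are not buffered themselves; `MinTimeResetDemo` is hypothetical. As in the diff, `minTime` is overwritten with the timestamp of each visited record; the only difference between the two runs is whether it is first reset to `Long.MAX_VALUE`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical replay of the scenario above (afterMs = 10, grace = 5).
class MinTimeResetDemo {
    static final long AFTER_MS = 10L;
    static final long GRACE_MS = 5L;

    // Scans a time-ordered buffer, removing expired records; returns the new minTime.
    // As in the diff, minTime is overwritten with each visited record's timestamp.
    static long scan(final List<Long> buffer, final long streamTime,
                     final long currentMinTime, final boolean resetFirst) {
        long minTime = resetFirst ? Long.MAX_VALUE : currentMinTime;
        while (!buffer.isEmpty()) {
            final long ts = buffer.get(0);
            minTime = ts;
            if (ts + AFTER_MS + GRACE_MS >= streamTime) {
                break; // not expired; later records are newer, so stop here
            }
            buffer.remove(0); // "emit" the expired record
        }
        return minTime;
    }

    // Counts iterator openings for records 50..53; assumes those records are not buffered.
    static int countScans(final boolean resetFirst) {
        final List<Long> buffer = new ArrayList<>(List.of(1L, 5L, 7L));
        long minTime = 1L;
        int scans = 0;
        for (final long streamTime : new long[] {50L, 51L, 52L, 53L}) {
            if (minTime >= streamTime - AFTER_MS - GRACE_MS) {
                continue; // guard at the top of emitNonJoinedOuterRecords()
            }
            scans++;
            minTime = scan(buffer, streamTime, minTime, resetFirst);
        }
        return scans;
    }
}
```

Without the reset, every record from 50 to 53 reopens the iterator (4 scans) because `minTime` stays stuck at 7; with it, one extra empty scan at 51 sets `minTime` to `Long.MAX_VALUE` and the later records skip the scan (2 scans).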







[GitHub] [kafka] spena commented on a change in pull request #10917: KAFKA-10847: improve throughput of stream-stream join with spurious left/outer join fix

Posted by GitBox <gi...@apache.org>.
spena commented on a change in pull request #10917:
URL: https://github.com/apache/kafka/pull/10917#discussion_r661606393



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -96,11 +98,20 @@ public void init(final org.apache.kafka.streams.processor.ProcessorContext conte
 
             if (enableSpuriousResultFix
                 && StreamsConfig.InternalConfig.getBoolean(
-                    context().appConfigs(),
+                    context.appConfigs(),
                     ENABLE_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX,
                     true
                 )) {
                 outerJoinWindowStore = outerJoinWindowName.map(context::getStateStore);
+                sharedTimeTracker.nextTimeToEmit = context.currentSystemTimeMs();
+
+                final Object emitIntervalConfig = context.appConfigs()
+                    .get(InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX);
+                if (emitIntervalConfig instanceof Number) {
+                    sharedTimeTracker.setEmitInterval(((Number) emitIntervalConfig).longValue());
+                } else if (emitIntervalConfig instanceof String) {
+                    sharedTimeTracker.setEmitInterval(Long.parseLong((String) emitIntervalConfig));
+                }

Review comment:
       Wouldn't it be good to move this code into `StreamsConfig.InternalConfig`? I did that for `getBoolean` so I could reuse it in other places. This is a good candidate for internal configs.
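A possible shape for such a helper, assuming it would live next to `getBoolean` in `StreamsConfig.InternalConfig`; the class name `InternalConfigSketch`, the method `getLong`, and the choice to throw on unsupported types are hypothetical. It accepts the same `Number` and `String` values as the inline code in the diff.

```java
import java.util.Map;

// Hypothetical helper in the spirit of InternalConfig.getBoolean(); the class and
// method names are illustrative, not Kafka's actual API.
final class InternalConfigSketch {
    private InternalConfigSketch() {
    }

    static long getLong(final Map<String, Object> configs, final String key, final long defaultValue) {
        final Object value = configs.get(key);
        if (value == null) {
            return defaultValue; // config not set: fall back to the caller's default
        }
        if (value instanceof Number) {
            return ((Number) value).longValue();
        }
        if (value instanceof String) {
            return Long.parseLong(((String) value).trim());
        }
        throw new IllegalArgumentException(
            "Config " + key + " must be a number or a numeric string, but was " + value.getClass().getName());
    }
}
```

The emit-interval parsing in `init()` would then shrink to a single call with the desired default, and other internal configs could reuse the same helper.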







[GitHub] [kafka] spena commented on a change in pull request #10917: KAFKA-10847: improve throughput of stream-stream join with spurious left/outer join fix

Posted by GitBox <gi...@apache.org>.
spena commented on a change in pull request #10917:
URL: https://github.com/apache/kafka/pull/10917#discussion_r661715122



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -183,15 +195,27 @@ public void process(final K key, final V1 value) {
 
         @SuppressWarnings("unchecked")
         private void emitNonJoinedOuterRecords(final WindowStore<KeyAndJoinSide<K>, LeftOrRightValue> store) {
+            if (sharedTimeTracker.minTime >= sharedTimeTracker.streamTime - joinAfterMs - joinGraceMs) {
+                return;
+            }
+            if (context.currentSystemTimeMs() < sharedTimeTracker.nextTimeToEmit) {

Review comment:
       Btw, this won't produce deterministic results, right? If I re-run the join from the beginning, I might get the expired records only after 50 ms of wall-clock time has passed, not based on stream time. Is that desirable?



