Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/04/01 18:43:36 UTC

[GitHub] [kafka] spena opened a new pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

spena opened a new pull request #10462:
URL: https://github.com/apache/kafka/pull/10462


   Fixes the issue described in https://issues.apache.org/jira/browse/KAFKA-10847.
   
   Note:
   - DO NOT review commit #1. This commit is being reviewed in https://github.com/apache/kafka/pull/10331
   
   To fix the problem above, the left/outer stream-stream join processor buffers non-joined records until their join window closes. A record is then not emitted as a non-joined result if a join partner arrives within the join window. If the window of a record closes and no join was found, the record is emitted (joined with `null`) and processed by the downstream processor.
   
   A new time-ordered window store temporarily holds records that have no join partner yet, and keeps the record keys ordered by time. `KStreamKStreamJoin` holds a reference to this store. For every non-joined record seen, the processor writes it to the store without forwarding it. When a join is found, the processor deletes the joined record from the store so that it is not emitted later.
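
   Below is a minimal in-memory sketch of this put/delete behavior, assuming string keys and values. `NonJoinedBuffer` and its methods are hypothetical names. A `TreeMap` keyed by timestamp stands in for the time-ordered window store; it is not that store's real API.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for the time-ordered outer-join store.
// Outer map: timestamp -> entries; inner map: "L:"/"R:" side prefix + key -> value.
final class NonJoinedBuffer {
    private final TreeMap<Long, Map<String, String>> byTime = new TreeMap<>();

    private static String sideKey(final boolean isLeft, final String key) {
        return (isLeft ? "L:" : "R:") + key;
    }

    // A record found no join partner: hold it instead of forwarding it.
    void putNonJoined(final boolean isLeft, final String key, final long ts, final String value) {
        byTime.computeIfAbsent(ts, t -> new TreeMap<>()).put(sideKey(isLeft, key), value);
    }

    // A record joined with a buffered record from the other side:
    // remove that buffered record so it is never emitted as non-joined.
    void deleteJoined(final boolean bufferedIsLeft, final String key, final long bufferedTs) {
        final Map<String, String> atTime = byTime.get(bufferedTs);
        if (atTime == null) {
            return;
        }
        atTime.remove(sideKey(bufferedIsLeft, key));
        if (atTime.isEmpty()) {
            byTime.remove(bufferedTs);
        }
    }

    boolean contains(final boolean isLeft, final String key, final long ts) {
        final Map<String, String> atTime = byTime.get(ts);
        return atTime != null && atTime.containsKey(sideKey(isLeft, key));
    }
}
```

   For example, a left record `A@10` with no match is buffered with `putNonJoined(true, "A", 10, ...)`. When a right record for `A` later joins it, `deleteJoined(true, "A", 10)` removes the buffered entry.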
   
   Records that were never joined by the end of the window plus grace period are emitted to the next processor in the topology. I use stream time, rather than wall-clock time, to check for expiry so that results are deterministic. `KStreamKStreamJoin` checks for expired records and emits them every time the join processor processes a new record.
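
   The expiry check can be sketched as follows. The sketch assumes a record buffered at time `ts` expires once `ts + joinAfterMs + graceMs` is strictly less than the stream time. The class, the method names, and that exact boundary are assumptions for illustration and are not taken from the PR.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: emit buffered non-joined records whose window + grace has closed,
// driven only by stream time so the output does not depend on wall-clock time.
final class ExpiryEmitter {
    private final TreeMap<Long, List<String>> buffered = new TreeMap<>();
    private final long joinAfterMs;
    private final long graceMs;

    ExpiryEmitter(final long joinAfterMs, final long graceMs) {
        this.joinAfterMs = joinAfterMs;
        this.graceMs = graceMs;
    }

    void buffer(final long ts, final String key) {
        buffered.computeIfAbsent(ts, t -> new ArrayList<>()).add(key);
    }

    // Returns (and removes) expired keys in timestamp order.
    List<String> emitExpired(final long streamTime) {
        final List<String> emitted = new ArrayList<>();
        final Iterator<Map.Entry<Long, List<String>>> it = buffered.entrySet().iterator();
        while (it.hasNext()) {
            final Map.Entry<Long, List<String>> entry = it.next();
            // Entries are time-ordered, so the first still-open window means all later ones are open too.
            if (entry.getKey() + joinAfterMs + graceMs >= streamTime) {
                break;
            }
            emitted.addAll(entry.getValue());
            it.remove();
        }
        return emitted;
    }
}
```

   Because the store is time-ordered, the scan can stop at the first record whose window is still open instead of visiting every entry.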
   
   The new state store is shared by the left and right join nodes, so it serializes record keys as a combined `<joinSide-recordKey>` key. This combined key makes it possible to delete a record that belongs to the other side of the join once a join is found. Two new serdes are added for this: `KeyAndJoinSideSerde`, which serializes the key together with a boolean that specifies the side where the key was found, and `ValueOrOtherValueSerde`, which serializes either V1 or V2 depending on that side.
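
   One possible layout for such a combined key is a one-byte side flag followed by the serialized key. The sketch below assumes UTF-8 string keys and that byte layout. `SideKeyCodec` is a hypothetical helper, not `KeyAndJoinSideSerde` itself.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical combined-key codec: [1 byte join side (1 = left, 0 = right)][UTF-8 key bytes].
final class SideKeyCodec {
    private SideKeyCodec() {
    }

    static byte[] serialize(final boolean isLeftSide, final String key) {
        final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(1 + keyBytes.length)
            .put((byte) (isLeftSide ? 1 : 0))
            .put(keyBytes)
            .array();
    }

    static boolean isLeftSide(final byte[] data) {
        return data[0] == 1;
    }

    static String key(final byte[] data) {
        // Skip the side byte and decode the remaining bytes as the original key.
        return new String(data, 1, data.length - 1, StandardCharsets.UTF_8);
    }
}
```

   The side flag is part of the stored key. As a result, the same record key buffered from the left and from the right maps to two distinct store entries, and deleting the other side's entry leaves this side's entry untouched.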
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r614537097



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -93,23 +136,118 @@ public void process(final K key, final V1 value) {
             }
 
             boolean needOuterJoin = outer;
+            boolean joinFound = false;
 
             final long inputRecordTimestamp = context().timestamp();
             final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs);
             final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs);
 
+            maxObservedStreamTime.advance(inputRecordTimestamp);
+
             try (final WindowStoreIterator<V2> iter = otherWindow.fetch(key, timeFrom, timeTo)) {
                 while (iter.hasNext()) {
                     needOuterJoin = false;
+                    joinFound = true;
                     final KeyValue<Long, V2> otherRecord = iter.next();
+                    final long otherRecordTimestamp = otherRecord.key;

Review comment:
       Yes, it's a bug in the current implementation...





[GitHub] [kafka] mjsax commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r609948744



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -38,20 +48,36 @@
     private final String otherWindowName;
     private final long joinBeforeMs;
     private final long joinAfterMs;
+    private final long joinGraceMs;
 
     private final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends R> joiner;
     private final boolean outer;
+    private final Optional<String> outerJoinWindowName;
+    private final boolean thisJoin;
+
+    // Observed time is AtomicLong because this time is shared between the left and right side processor nodes. However,
+    // this time is not updated in parallel, so we can call get() several times without worrying about getting different
+    // times.
+    private final AtomicLong maxObservedStreamTime;

Review comment:
       Two comments:
   - even if it is shared between two processors, both are executed in the same thread, so it does not seem necessary to use `AtomicLong`
   - do we need to maintain it manually? Could we use `context.streamTime()` instead?
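
   Here is a minimal sketch of the first point. It assumes both join-side processors of a task are invoked from the same thread, in which case a plain `long` in a shared holder is enough. `SharedStreamTime` and `JoinSide` are hypothetical names.

```java
// Hypothetical holder shared by the left and right join processors of one task.
// Both processors are called from the same thread, so a plain field is sufficient.
final class SharedStreamTime {
    private long maxObserved = Long.MIN_VALUE; // no record seen yet

    void advance(final long recordTs) {
        maxObserved = Math.max(maxObserved, recordTs);
    }

    long get() {
        return maxObserved;
    }
}

final class JoinSide {
    private final SharedStreamTime streamTime;

    JoinSide(final SharedStreamTime streamTime) {
        this.streamTime = streamTime;
    }

    // Returns true if this record is not behind the max time observed on either side.
    boolean process(final long recordTs) {
        streamTime.advance(recordTs);
        return recordTs == streamTime.get();
    }
}
```

   The second point would remove even this holder, if the processor context already exposes the same stream time.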





[GitHub] [kafka] spena commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

Posted by GitBox <gi...@apache.org>.
spena commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r611915606



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -102,14 +127,98 @@ public void process(final K key, final V1 value) {
                 while (iter.hasNext()) {
                     needOuterJoin = false;
                     final KeyValue<Long, V2> otherRecord = iter.next();
+                    final long otherRecordTimestamp = otherRecord.key;
                     context().forward(
                         key,
                         joiner.apply(key, value, otherRecord.value),
-                        To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecord.key)));
+                        To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecordTimestamp)));
+
+                    outerJoinWindowStore.ifPresent(store -> {
+                        // Delete the other joined key from the outer non-joined store now to prevent
+                        // further processing
+                        final KeyAndJoinSide<K> otherJoinKey = KeyAndJoinSide.make(!thisJoin, key);
+                        if (store.fetch(otherJoinKey, otherRecordTimestamp) != null) {
+                            store.put(otherJoinKey, null, otherRecordTimestamp);
+                        }
+                    });
                 }
 
                 if (needOuterJoin) {
-                    context().forward(key, joiner.apply(key, value, null));
+                    // The maxStreamTime contains the max time observed in both sides of the join.
+                    // Having access to the time observed in the other join side fixes the following
+                    // problem:
+                    //
+                    // Say we have a window size of 5 seconds
+                    //  1. A non-joined record with time T10 is seen in the left-topic (maxLeftStreamTime: 10)
+                    //     The record is not processed yet, and is added to the outer-join store
+                    //  2. A non-joined record with time T2 is seen in the right-topic (maxRightStreamTime: 2)
+                    //     The record is not processed yet, and is added to the outer-join store
+                    //  3. A joined record with time T11 is seen in the left-topic (maxLeftStreamTime: 11)
+                    //     It is time to look at the expired records. T10 and T2 should be emitted, but
+                    //     because T2 was late, then it is not fetched by the window store, so it is not processed
+                    //
+                    // See KStreamKStreamLeftJoinTest.testLowerWindowBound() tests
+                    //
+                    // the condition below allows us to process the late record without the need
+                    // to hold it in the temporary outer store
+                    if (timeTo < maxStreamTime) {
+                        context().forward(key, joiner.apply(key, value, null));
+                    } else {
+                        outerJoinWindowStore.ifPresent(store -> store.put(
+                            KeyAndJoinSide.make(thisJoin, key),
+                            makeValueOrOtherValue(thisJoin, value),
+                            inputRecordTimestamp));
+                    }
+                }
+
+                outerJoinWindowStore.ifPresent(store -> {
+                    // only emit left/outer non-joined if the stream time has advanced (inputRecordTime = maxStreamTime)
+                    // if the current record is late, then there is no need to check for expired records
+                    if (inputRecordTimestamp == maxStreamTime) {
+                        maybeEmitOuterExpiryRecords(store, maxStreamTime);
+                    }
+                });
+            }
+        }
+
+        private ValueOrOtherValue makeValueOrOtherValue(final boolean thisJoin, final V1 value) {

Review comment:
       Done
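
   The scenario in the code comment above can be replayed with a small sketch of the `timeTo < maxStreamTime` check. It assumes `joinAfterMs = 5`, matching the comment's 5-second window, in arbitrary time units. `OuterJoinDecision` is a hypothetical name.

```java
// Hypothetical sketch of the `timeTo < maxStreamTime` check from the diff above.
final class OuterJoinDecision {
    enum Action { EMIT_NULL_JOIN_NOW, BUFFER_UNTIL_WINDOW_CLOSES }

    private OuterJoinDecision() {
    }

    static Action decide(final long recordTs, final long joinAfterMs, final long maxStreamTime) {
        // Upper bound of this record's join window, clamped at 0 as in the diff.
        final long timeTo = Math.max(0L, recordTs + joinAfterMs);
        // Window end already behind the shared stream time: emit the null-joined result
        // directly instead of buffering it.
        return timeTo < maxStreamTime ? Action.EMIT_NULL_JOIN_NOW : Action.BUFFER_UNTIL_WINDOW_CLOSES;
    }
}
```

   With stream time at 10, the T10 record is buffered because 15 is not below 10. The late T2 record has an upper bound of 7, which is already behind, so it is emitted directly instead of being stored where a later fetch would miss it.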





[GitHub] [kafka] spena commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

Posted by GitBox <gi...@apache.org>.
spena commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r614303123



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -93,23 +136,118 @@ public void process(final K key, final V1 value) {
             }
 
             boolean needOuterJoin = outer;
+            boolean joinFound = false;
 
             final long inputRecordTimestamp = context().timestamp();
             final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs);
             final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs);
 
+            maxObservedStreamTime.advance(inputRecordTimestamp);
+
             try (final WindowStoreIterator<V2> iter = otherWindow.fetch(key, timeFrom, timeTo)) {
                 while (iter.hasNext()) {
                     needOuterJoin = false;
+                    joinFound = true;
                     final KeyValue<Long, V2> otherRecord = iter.next();
+                    final long otherRecordTimestamp = otherRecord.key;
+
+                    // Emit expired records before the joined record to keep time ordering
+                    emitExpiredNonJoinedOuterRecordsExcept(key, otherRecordTimestamp);
+
                     context().forward(
                         key,
                         joiner.apply(key, value, otherRecord.value),
-                        To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecord.key)));
+                        To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecordTimestamp)));
+                }
+
+                // Emit all expired records before adding a new non-joined record to the store. Otherwise,
+                // the put() call will advance the stream time, which causes records out of the retention
+                // period to be deleted, thus not being emitted later.
+                if (!joinFound && inputRecordTimestamp == maxObservedStreamTime.get()) {
+                    emitExpiredNonJoinedOuterRecords();
                 }
 
                 if (needOuterJoin) {
-                    context().forward(key, joiner.apply(key, value, null));
+                    // The maxStreamTime contains the max time observed in both sides of the join.
+                    // Having access to the time observed in the other join side fixes the following
+                    // problem:
+                    //
+                    // Say we have a window size of 5 seconds
+                    //  1. A non-joined record with time T10 is seen in the left-topic (maxLeftStreamTime: 10)
+                    //     The record is not processed yet, and is added to the outer-join store
+                    //  2. A non-joined record with time T2 is seen in the right-topic (maxRightStreamTime: 2)
+                    //     The record is not processed yet, and is added to the outer-join store
+                    //  3. A joined record with time T11 is seen in the left-topic (maxLeftStreamTime: 11)
+                    //     It is time to look at the expired records. T10 and T2 should be emitted, but
+                    //     because T2 was late, then it is not fetched by the window store, so it is not processed
+                    //
+                    // See KStreamKStreamLeftJoinTest.testLowerWindowBound() tests
+                    //
+                    // the condition below allows us to process the late record without the need
+                    // to hold it in the temporary outer store
+                    if (!outerJoinWindowStore.isPresent() || timeTo < maxObservedStreamTime.get()) {
+                        context().forward(key, joiner.apply(key, value, null));
+                    } else {
+                        outerJoinWindowStore.ifPresent(store -> store.put(
+                            KeyAndJoinSide.make(isLeftJoin, key),
+                            LeftOrRightValue.make(isLeftJoin, value),
+                            inputRecordTimestamp));
+                    }
+                }
+            }
+        }
+
+        private void emitExpiredNonJoinedOuterRecords() {
+            outerJoinWindowStore.ifPresent(store ->
+                emitExpiredNonJoinedOuterRecords(store, recordWindowHasClosed));
+        }
+
+        private void emitExpiredNonJoinedOuterRecordsExcept(final K key, final long timestamp) {
+            outerJoinWindowStore.ifPresent(store -> {
+                final KeyAndJoinSide<K> keyAndJoinSide = KeyAndJoinSide.make(!isLeftJoin, key);
+
+                // Emit all expired records except the just found non-joined key. We need
+                // to emit all expired records before calling put(), otherwise the internal
+                // stream time will advance and may cause records out of the retention period to
+                // be deleted.
+                emitExpiredNonJoinedOuterRecords(store,
+                    recordWindowHasClosed
+                        .and(k -> !k.key().equals(keyAndJoinSide))
+                        .and(k -> k.window().start() != timestamp));
+
+                if (store.fetch(keyAndJoinSide, timestamp) != null) {
+                    // Delete the record. The previous emit call may not have removed this record
+                    // if the record window has not closed.
+                    store.put(keyAndJoinSide, null, timestamp);
+                }
+            });
+        }
+
+        @SuppressWarnings("unchecked")
+        private void emitExpiredNonJoinedOuterRecords(final WindowStore<KeyAndJoinSide<K>, LeftOrRightValue> store, final Predicate<Windowed<KeyAndJoinSide<K>>> emitCondition) {
+            try (final KeyValueIterator<Windowed<KeyAndJoinSide<K>>, LeftOrRightValue> it = store.all()) {
+                while (it.hasNext()) {
+                    final KeyValue<Windowed<KeyAndJoinSide<K>>, LeftOrRightValue> e = it.next();
+
+                    // Skip next records if the emit condition is false
+                    if (!emitCondition.test(e.key)) {
+                        break;
+                    }
+
+                    final K key = e.key.key().getKey();
+                    final To timestamp = To.all().withTimestamp(e.key.window().start());
+
+                    final R nullJoinedValue;
+                    if (isLeftJoin) {

Review comment:
       Yes. The `ReverseJoiner` does not work well in the shared-store scenario. I need to cast the value and otherValue based on which join side we are on.
   
   For instance, I get an error if I remove the `(V1)` and `(V2)` casts. With the casts, I need to specify which value goes where: on the left side it is `(V1) record.value.getLeftValue()`, and on the right side it is `(V2) record.value.getLeftValue()`.
   ```
   nullJoinedValue = joiner.apply(key,
                           (V1) record.value.getLeftValue(),
                           (V2) record.value.getRightValue());
   ```
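
   The following is an alternative sketch, not what the PR does. Here the buffered value is typed by left/right input rather than this/other, so the emit path always calls the joiner as `(left, right)` and needs no per-side casts. `SideValue` and `NullJoin` are hypothetical names.

```java
import java.util.function.BiFunction;

// Hypothetical buffered value typed by input side (L = left type, R = right type),
// independent of which processor wrote it.
final class SideValue<L, R> {
    final L left;
    final R right;

    private SideValue(final L left, final R right) {
        this.left = left;
        this.right = right;
    }

    static <L, R> SideValue<L, R> ofLeft(final L value) {
        return new SideValue<>(value, null);
    }

    static <L, R> SideValue<L, R> ofRight(final R value) {
        return new SideValue<>(null, value);
    }
}

final class NullJoin {
    private NullJoin() {
    }

    // The joiner always receives (left, right); the side that never found a match is null.
    static <L, R, O> O nullJoined(final SideValue<L, R> v,
                                  final BiFunction<? super L, ? super R, ? extends O> joiner) {
        return joiner.apply(v.left, v.right);
    }
}
```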





[GitHub] [kafka] spena commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

Posted by GitBox <gi...@apache.org>.
spena commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r610893784



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -161,7 +181,7 @@ public void process(final K key, final V1 value) {
                     //
                     // the condition below allows us to process the late record without the need
                     // to hold it in the temporary outer store
-                    if (timeTo < maxStreamTime) {
+                    if (internalOuterJoinFixDisabled || timeTo < maxStreamTime) {

Review comment:
       Done





[GitHub] [kafka] spena commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

Posted by GitBox <gi...@apache.org>.
spena commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r610044912



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -38,20 +48,36 @@
     private final String otherWindowName;
     private final long joinBeforeMs;
     private final long joinAfterMs;
+    private final long joinGraceMs;
 
     private final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends R> joiner;
     private final boolean outer;
+    private final Optional<String> outerJoinWindowName;
+    private final boolean thisJoin;
+
+    // Observed time is AtomicLong because this time is shared between the left and right side processor nodes. However,
+    // this time is not updated in parallel, so we can call get() several times without worrying about getting different
+    // times.
+    private final AtomicLong maxObservedStreamTime;

Review comment:
       I think `context.currentStreamTimeMs()` should work. I wasn't aware I could get the new stream time from it. I don't see any problems, as I only need the stream time to expire the records.





[GitHub] [kafka] guozhangwang commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r608846573



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/ValueOrOtherValue.java
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.util.Objects;
+
+/**
+ * This class is used in combination of {@link KeyAndJoinSide}. The {@link KeyAndJoinSide} class
+ * combines a key with a boolean value that specifies if the key is found in the left side of a
+ * join or on the right side. This {@link ValueOrOtherValue} object contains either the V1 value,
+ * which is found in the left topic, or V2 value if it is found in the right topic.
+ */
+public class ValueOrOtherValue<V1, V2> {
+    private final V1 thisValue;
+    private final V2 otherValue;
+
+    private ValueOrOtherValue(final V1 thisValue, final V2 otherValue) {
+        this.thisValue = thisValue;

Review comment:
       Could we add an assertion that at most one of these two fields is non-null?
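
   Below is a sketch of the requested check, with a two-argument factory standing in for a caller such as a deserializer that fills both slots. `EitherSideValue` is a hypothetical stand-in for `ValueOrOtherValue`. It throws instead of using `assert` so the check also holds when JVM assertions are disabled.

```java
import java.util.Objects;

// Hypothetical stand-in for ValueOrOtherValue<V1, V2>: holds the value of at most one join side.
final class EitherSideValue<V1, V2> {
    private final V1 thisValue;
    private final V2 otherValue;

    private EitherSideValue(final V1 thisValue, final V2 otherValue) {
        // Invariant requested in the review: at most one side may be non-null.
        if (thisValue != null && otherValue != null) {
            throw new IllegalArgumentException("only one of thisValue/otherValue may be non-null");
        }
        this.thisValue = thisValue;
        this.otherValue = otherValue;
    }

    static <V1, V2> EitherSideValue<V1, V2> ofThis(final V1 value) {
        return new EitherSideValue<>(Objects.requireNonNull(value), null);
    }

    static <V1, V2> EitherSideValue<V1, V2> ofOther(final V2 value) {
        return new EitherSideValue<>(null, Objects.requireNonNull(value));
    }

    // e.g. for a (hypothetical) deserializer that decodes both slots
    static <V1, V2> EitherSideValue<V1, V2> of(final V1 thisValue, final V2 otherValue) {
        return new EitherSideValue<>(thisValue, otherValue);
    }

    V1 thisValue() {
        return thisValue;
    }

    V2 otherValue() {
        return otherValue;
    }
}
```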





[GitHub] [kafka] spena commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

Posted by GitBox <gi...@apache.org>.
spena commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r610891253



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -38,20 +45,32 @@
     private final String otherWindowName;
     private final long joinBeforeMs;
     private final long joinAfterMs;
+    private final long joinGraceMs;
 
     private final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends R> joiner;
     private final boolean outer;
+    private final Optional<String> outerJoinWindowName;
+    private final AtomicLong maxObservedStreamTime;
+    private final boolean thisJoin;

Review comment:
       Are you suggesting using two bool variables?





[GitHub] [kafka] guozhangwang commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r622657322



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -93,23 +136,118 @@ public void process(final K key, final V1 value) {
             }
 
             boolean needOuterJoin = outer;
+            boolean joinFound = false;
 
             final long inputRecordTimestamp = context().timestamp();
             final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs);
             final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs);
 
+            maxObservedStreamTime.advance(inputRecordTimestamp);
+
             try (final WindowStoreIterator<V2> iter = otherWindow.fetch(key, timeFrom, timeTo)) {
                 while (iter.hasNext()) {
                     needOuterJoin = false;
+                    joinFound = true;
                     final KeyValue<Long, V2> otherRecord = iter.next();
+                    final long otherRecordTimestamp = otherRecord.key;
+
+                    // Emit expired records before the joined record to keep time ordering
+                    emitExpiredNonJoinedOuterRecordsExcept(key, otherRecordTimestamp);
+
                     context().forward(
                         key,
                         joiner.apply(key, value, otherRecord.value),
-                        To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecord.key)));
+                        To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecordTimestamp)));
+                }
+
+                // Emit all expired records before adding a new non-joined record to the store. Otherwise,
+                // the put() call will advance the stream time, which causes records out of the retention
+                // period to be deleted, thus not being emitted later.
+                if (!joinFound && inputRecordTimestamp == maxObservedStreamTime.get()) {
+                    emitExpiredNonJoinedOuterRecords();
                 }
 
                 if (needOuterJoin) {
-                    context().forward(key, joiner.apply(key, value, null));
+                    // The maxStreamTime contains the max time observed in both sides of the join.
+                    // Having access to the time observed in the other join side fixes the following
+                    // problem:
+                    //
+                    // Say we have a window size of 5 seconds
+                    //  1. A non-joined record with time T10 is seen in the left-topic (maxLeftStreamTime: 10)
+                    //     The record is not processed yet, and is added to the outer-join store
+                    //  2. A non-joined record with time T2 is seen in the right-topic (maxRightStreamTime: 2)
+                    //     The record is not processed yet, and is added to the outer-join store
+                    //  3. A joined record with time T11 is seen in the left-topic (maxLeftStreamTime: 11)
+                    //     It is time to look at the expired records. T10 and T2 should be emitted, but
+                    //     because T2 was late, then it is not fetched by the window store, so it is not processed
+                    //
+                    // See KStreamKStreamLeftJoinTest.testLowerWindowBound() tests
+                    //
+                    // the condition below allows us to process the late record without the need
+                    // to hold it in the temporary outer store
+                    if (!outerJoinWindowStore.isPresent() || timeTo < maxObservedStreamTime.get()) {

Review comment:
       @spena it seems there are a few different conditions we can consider here:
   
   1) record time < stream time - window length - grace length: the record is too late. We should drop it up front and also record the `droppedRecordsSensorOrExpiredWindowRecordDropSensor`.
   
   2) record time >= stream time - window length - grace length, but < stream time: the record is still late, but joinable. Since stream time would not advance, we would not have to check for and emit non-joined records; we would just try to join this record with the other window. Note that, as @mjsax said, for each returned matching record we also need to check whether the other record's time >= stream time - window length - grace length.
   
   3) record time > stream time: we would first try to emit non-joined records, and then try to join this record.
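
   The three cases can be summarized as a classification function. This is a hypothetical sketch. `streamTime` is the stream time before the record is processed, and `windowMs` and `graceMs` are the "window length" and "grace length" from the comment. The comment leaves `record time == stream time` open, so the sketch groups it with case 3 as an assumption.

```java
// Hypothetical helper following the three cases in the comment above.
final class LatenessCase {
    enum Case { TOO_LATE_DROP, LATE_BUT_JOINABLE, ADVANCES_STREAM_TIME }

    private LatenessCase() {
    }

    static Case classify(final long recordTs, final long streamTime, final long windowMs, final long graceMs) {
        if (recordTs < streamTime - windowMs - graceMs) {
            return Case.TOO_LATE_DROP;         // case 1: drop and record the drop sensor
        }
        if (recordTs < streamTime) {
            return Case.LATE_BUT_JOINABLE;     // case 2: join only, no non-joined emission
        }
        return Case.ADVANCES_STREAM_TIME;      // case 3 (equality included by assumption)
    }

    // The extra check on each matching record from the other window.
    static boolean otherRecordJoinable(final long otherTs, final long streamTime,
                                       final long windowMs, final long graceMs) {
        return otherTs >= streamTime - windowMs - graceMs;
    }
}
```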

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -93,23 +136,118 @@ public void process(final K key, final V1 value) {
             }
 
             boolean needOuterJoin = outer;
+            boolean joinFound = false;
 
             final long inputRecordTimestamp = context().timestamp();
             final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs);
             final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs);
 
+            maxObservedStreamTime.advance(inputRecordTimestamp);
+

Review comment:
       @spena @mjsax I left another comment below regarding the time. Please LMK if you think that makes sense.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -93,23 +136,118 @@ public void process(final K key, final V1 value) {
             }
 
             boolean needOuterJoin = outer;
+            boolean joinFound = false;
 
             final long inputRecordTimestamp = context().timestamp();
             final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs);
             final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs);
 
+            maxObservedStreamTime.advance(inputRecordTimestamp);
+
             try (final WindowStoreIterator<V2> iter = otherWindow.fetch(key, timeFrom, timeTo)) {
                 while (iter.hasNext()) {
                     needOuterJoin = false;
+                    joinFound = true;
                     final KeyValue<Long, V2> otherRecord = iter.next();
+                    final long otherRecordTimestamp = otherRecord.key;

Review comment:
       @spena just ping to make sure you get this on the follow-up PR.





[GitHub] [kafka] mjsax commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r616268402



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -93,23 +136,118 @@ public void process(final K key, final V1 value) {
             }
 
             boolean needOuterJoin = outer;
+            boolean joinFound = false;
 
             final long inputRecordTimestamp = context().timestamp();
             final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs);
             final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs);
 
+            maxObservedStreamTime.advance(inputRecordTimestamp);
+
             try (final WindowStoreIterator<V2> iter = otherWindow.fetch(key, timeFrom, timeTo)) {
                 while (iter.hasNext()) {
                     needOuterJoin = false;
+                    joinFound = true;
                     final KeyValue<Long, V2> otherRecord = iter.next();
+                    final long otherRecordTimestamp = otherRecord.key;

Review comment:
       It seems this comment has not been addressed yet. Or do you not want to include this additional fix in this PR?







[GitHub] [kafka] spena commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

Posted by GitBox <gi...@apache.org>.
spena commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r610890964



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##########
@@ -211,6 +246,66 @@ private void assertUniqueStoreNames(final WindowBytesStoreSupplier supplier,
         return builder;
     }
 
+    @SuppressWarnings("unchecked")
+    private static <K, V1, V2> StoreBuilder<WindowStore<KeyAndJoinSide<K>, ValueOrOtherValue<V1, V2>>> outerJoinWindowStoreBuilder(final String storeName,
+                                                                                                                                   final JoinWindows windows,
+                                                                                                                                   final StreamJoinedInternal<K, V1, V2> streamJoinedInternal) {
+        final StoreBuilder<WindowStore<KeyAndJoinSide<K>, ValueOrOtherValue<V1, V2>>> builder = new TimeOrderedWindowStoreBuilder<KeyAndJoinSide<K>, ValueOrOtherValue<V1, V2>>(
+            persistentTimeOrderedWindowStore(
+                storeName + "-store",
+                Duration.ofMillis(windows.size() + windows.gracePeriodMs()),
+                Duration.ofMillis(windows.size())
+            ),
+            new KeyAndJoinSideSerde<>(streamJoinedInternal.keySerde()),
+            new ValueOrOtherValueSerde(streamJoinedInternal.valueSerde(), streamJoinedInternal.otherValueSerde()),

Review comment:
       Are you referring to renaming ValueOrOtherValueSerde -> LeftOrRightValueSerde? If so, done.







[GitHub] [kafka] mjsax commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r616266276



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValue.java
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.util.Objects;
+
+/**
+ * This class is used in combination with {@link KeyAndJoinSide}. The {@link KeyAndJoinSide} class
+ * combines a key with a boolean value that specifies if the key is found in the left side of a
+ * join or on the right side. This {@link LeftOrRightValue} object contains either the V1 value,
+ * which is found in the left topic, or V2 value if it is found in the right topic.
+ */
+public class LeftOrRightValue<V1, V2> {
+    private final V1 leftValue;
+    private final V2 rightValue;
+
+    private LeftOrRightValue(final V1 leftValue, final V2 rightValue) {
+        if (leftValue != null && rightValue != null) {

Review comment:
       We do filter out both `null` keys and `null` values: `null` keys because we cannot really compute the join, and `null` values because a `null` would be a delete in RocksDB, so we cannot store a `null` value to begin with.
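A minimal sketch of the filtering rule described above. It assumes a plain `HashMap` as a stand-in for RocksDB's "putting `null` means delete" behavior; `TombstoneStore` and `NullFilter` are hypothetical names for illustration, not Kafka Streams classes.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a store with RocksDB-like semantics:
// writing a null value is treated as a delete (tombstone), so a null
// value can never be stored and read back.
final class TombstoneStore<K, V> {
    private final Map<K, V> data = new HashMap<>();

    void put(final K key, final V value) {
        if (value == null) {
            data.remove(key);   // null value == delete
        } else {
            data.put(key, value);
        }
    }

    V get(final K key) {
        return data.get(key);
    }
}

final class NullFilter {
    // A null key cannot be joined (there is nothing to match on), and a null
    // value cannot be buffered (null means delete), so both are dropped
    // before any join logic runs.
    static <K, V> boolean shouldSkip(final K key, final V value) {
        return key == null || value == null;
    }
}
```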







[GitHub] [kafka] spena commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

Posted by GitBox <gi...@apache.org>.
spena commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r610754025



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -102,14 +127,98 @@ public void process(final K key, final V1 value) {
                 while (iter.hasNext()) {
                     needOuterJoin = false;
                     final KeyValue<Long, V2> otherRecord = iter.next();
+                    final long otherRecordTimestamp = otherRecord.key;
                     context().forward(
                         key,
                         joiner.apply(key, value, otherRecord.value),
-                        To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecord.key)));
+                        To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecordTimestamp)));
+
+                    outerJoinWindowStore.ifPresent(store -> {
+                        // Delete the other joined key from the outer non-joined store now to prevent
+                        // further processing
+                        final KeyAndJoinSide<K> otherJoinKey = KeyAndJoinSide.make(!thisJoin, key);
+                        if (store.fetch(otherJoinKey, otherRecordTimestamp) != null) {
+                            store.put(otherJoinKey, null, otherRecordTimestamp);
+                        }
+                    });
                 }
 
                 if (needOuterJoin) {
-                    context().forward(key, joiner.apply(key, value, null));
+                    // The maxStreamTime contains the max time observed in both sides of the join.
+                    // Having access to the time observed in the other join side fixes the following
+                    // problem:
+                    //
+                    // Say we have a window size of 5 seconds
+                    //  1. A non-joined record with time T10 is seen in the left-topic (maxLeftStreamTime: 10)
+                    //     The record is not processed yet, and is added to the outer-join store
+                    //  2. A non-joined record with time T2 is seen in the right-topic (maxRightStreamTime: 2)
+                    //     The record is not processed yet, and is added to the outer-join store
+                    //  3. A joined record with time T11 is seen in the left-topic (maxLeftStreamTime: 11)
+                    //     It is time to look at the expired records. T10 and T2 should be emitted, but
+                    //     because T2 was late, then it is not fetched by the window store, so it is not processed
+                    //
+                    // See KStreamKStreamLeftJoinTest.testLowerWindowBound() tests
+                    //
+                    // the condition below allows us to process the late record without the need
+                    // to hold it in the temporary outer store
+                    if (timeTo < maxStreamTime) {
+                        context().forward(key, joiner.apply(key, value, null));
+                    } else {
+                        outerJoinWindowStore.ifPresent(store -> store.put(
+                            KeyAndJoinSide.make(thisJoin, key),
+                            makeValueOrOtherValue(thisJoin, value),
+                            inputRecordTimestamp));
+                    }
+                }
+
+                outerJoinWindowStore.ifPresent(store -> {
+                    // only emit left/outer non-joined if the stream time has advanced (inputRecordTime = maxStreamTime)
+                    // if the current record is late, then there is no need to check for expired records
+                    if (inputRecordTimestamp == maxStreamTime) {
+                        maybeEmitOuterExpiryRecords(store, maxStreamTime);
+                    }
+                });
+            }
+        }
+
+        private ValueOrOtherValue makeValueOrOtherValue(final boolean thisJoin, final V1 value) {
+            return thisJoin
+                ? ValueOrOtherValue.makeValue(value)
+                : ValueOrOtherValue.makeOtherValue(value);
+        }
+
+        @SuppressWarnings("unchecked")
+        private void maybeEmitOuterExpiryRecords(final WindowStore<KeyAndJoinSide<K>, ValueOrOtherValue> store, final long maxStreamTime) {
+            try (final KeyValueIterator<Windowed<KeyAndJoinSide<K>>, ValueOrOtherValue> it = store.all()) {
+                while (it.hasNext()) {
+                    final KeyValue<Windowed<KeyAndJoinSide<K>>, ValueOrOtherValue> e = it.next();
+
+                    // Skip next records if the oldest record has not expired yet
+                    if (e.key.window().end() + joinGraceMs >= maxStreamTime) {
+                        break;
+                    }
+
+                    final K key = e.key.key().getKey();
+
+                    // Emit the record by joining with a null value. But the order varies depending whether
+                    // this join is using a reverse joiner or not. Also whether the returned record from the
+                    // outer window store is a V1 or V2 value.
+                    if (thisJoin) {
+                        if (e.key.key().isThisJoin()) {
+                            context().forward(key, joiner.apply(key, (V1) e.value.getThisValue(), null));
+                        } else {
+                            context().forward(key, joiner.apply(key, null, (V2) e.value.getOtherValue()));
+                        }
+                    } else {
+                        if (e.key.key().isThisJoin()) {
+                            context().forward(key, joiner.apply(key, null, (V2) e.value.getThisValue()));
+                        } else {
+                            context().forward(key, joiner.apply(key, (V1) e.value.getOtherValue(), null));
+                        }
+                    }
+
+                    // Delete the key from the outer window store now that it has been emitted
+                    store.put(e.key.key(), null, e.key.window().start());

Review comment:
       I'm not sure this is possible. If I put or delete a future record that the iterator hasn't seen yet, then the iterator will reflect that change if I keep iterating until the deleted or new record appears. This is how RocksDB iterators work, isn't it? When asked for the next record, the iterator only reads one record from the store. If subsequent records are deleted or newly added, the iterator will pick up the update on a later next() call.
   
   Or what test case do you have in mind? 
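One way to sidestep the question entirely is to not rely on what the iterator observes: collect the expired entries during the scan and delete them only after the scan is done. Plain RocksDB iterators are documented to read from an implicit snapshot taken when the iterator is created, and Kafka Streams may put a caching layer in front of the store, so assuming that deletes become visible mid-iteration may not hold. A minimal sketch, assuming an in-memory `TreeMap` in place of the RocksDB-backed time-ordered store and a window end of `timestamp + windowSizeMs` (mirroring `e.key.window().end()` in the diff); `OuterJoinBuffer` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory stand-in for the time-ordered outer-join buffer:
// buffered record keys are grouped by timestamp, so iteration is in time order.
final class OuterJoinBuffer {
    private final TreeMap<Long, List<String>> byTime = new TreeMap<>();
    private final long windowSizeMs;
    private final long graceMs;

    OuterJoinBuffer(final long windowSizeMs, final long graceMs) {
        this.windowSizeMs = windowSizeMs;
        this.graceMs = graceMs;
    }

    void put(final long timestamp, final String key) {
        byTime.computeIfAbsent(timestamp, t -> new ArrayList<>()).add(key);
    }

    // Emits every buffered record whose window end + grace is strictly before
    // stream time. Expired timestamps are collected first and removed only
    // after the scan, so no entry is deleted while it is being iterated.
    List<String> emitExpired(final long maxStreamTime) {
        final List<Long> expiredTimes = new ArrayList<>();
        final List<String> emitted = new ArrayList<>();
        for (final Map.Entry<Long, List<String>> e : byTime.entrySet()) {
            final long windowEnd = e.getKey() + windowSizeMs;
            if (windowEnd + graceMs >= maxStreamTime) {
                break; // time-ordered: no later entry can be expired either
            }
            emitted.addAll(e.getValue());
            expiredTimes.add(e.getKey());
        }
        for (final Long t : expiredTimes) {
            byTime.remove(t);
        }
        return emitted;
    }

    int size() {
        return byTime.values().stream().mapToInt(List::size).sum();
    }
}
```

The early `break` relies on the store being ordered by time, which is the property the PR's time-ordered window store is meant to provide.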







[GitHub] [kafka] guozhangwang commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r610209134



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -38,20 +48,36 @@
     private final String otherWindowName;
     private final long joinBeforeMs;
     private final long joinAfterMs;
+    private final long joinGraceMs;
 
     private final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends R> joiner;
     private final boolean outer;
+    private final Optional<String> outerJoinWindowName;
+    private final boolean thisJoin;
+
+    // Observed time is an AtomicLong because this time is shared between the left and right processor nodes. However,
+    // this time is not updated in parallel, so we can call get() several times without worrying about getting different
+    // times.
+    private final AtomicLong maxObservedStreamTime;

Review comment:
       I personally was on the side of always using task stream time everywhere, but more people feel that we should use processor stream time :P Anyway, all I'm trying to say is that we need to make an educated decision here. Whether we conclude that 1) we rely on task time here but still use processor time in the other expiration logic, 2) we rely on processor time in all logic, or 3) we rely on task time in all logic, we should have a good rationale for whichever we choose.
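For reference, the field under discussion shares one observed stream time between the left and right join processors, and that time only ever moves forward. A minimal sketch of such a shared, monotonic max-tracker, assuming `AtomicLong#accumulateAndGet` with `Math::max`. `SharedStreamTime` is a hypothetical name; the PR's own `maxObservedStreamTime.advance(...)` implementation is not shown in this excerpt.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical shared stream-time tracker. A single instance would be handed
// to both the left and the right join processor, so each side sees the
// maximum timestamp observed on either side.
final class SharedStreamTime {
    private final AtomicLong maxObserved = new AtomicLong(Long.MIN_VALUE);

    // Advances stream time monotonically; late (smaller) timestamps leave it unchanged.
    long advance(final long timestamp) {
        return maxObserved.accumulateAndGet(timestamp, Math::max);
    }

    long get() {
        return maxObserved.get();
    }
}
```

With this shape, a late record (for example T2 arriving after T10) does not move stream time backwards, which is what makes "stream time" usable as an expiry clock.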







[GitHub] [kafka] guozhangwang commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r608833831



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/KeyAndJoinSide.java
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.KeyValue;
+
+import java.util.Objects;
+
+/**
+ * Combines a key from a {@link KeyValue} with a boolean value referencing if the key is
+ * part of the left join (true) or right join (false). This class is only useful when a state
+ * store needs to be shared between left and right processors, and each processor needs to
+ * access the key of the other processor.
+ */
+public class KeyAndJoinSide<K> {
+    private final K key;
+    private final boolean thisJoin;

Review comment:
       nit: instead of using this / other, maybe we can use left / right to be more aligned with join terminology.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/KeyAndJoinSideDeserializer.java
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.Objects;
+
+public class KeyAndJoinSideDeserializer<K> implements Deserializer<KeyAndJoinSide<K>> {
+    private final Deserializer<K> keyDeserializer;
+
+    KeyAndJoinSideDeserializer(final Deserializer<K> keyDeserializer) {
+        this.keyDeserializer = Objects.requireNonNull(keyDeserializer, "keyDeserializer is null");
+    }
+
+    @Override
+    public void configure(final Map<String, ?> configs, final boolean isKey) {
+        keyDeserializer.configure(configs, isKey);
+    }
+
+    @Override
+    public KeyAndJoinSide<K> deserialize(final String topic, final byte[] data) {
+        final boolean bool = data[0] == 1 ? true : false;
+        final K key = keyDeserializer.deserialize(topic, rawKey(data));
+
+        return KeyAndJoinSide.make(bool, key);
+    }
+
+    static byte[] rawKey(final byte[] data) {
+        final int rawValueLength = data.length - 1;
+
+        return ByteBuffer

Review comment:
       Some earlier benchmarks showed that `System.arraycopy(data, 1, newBytes, 0, rawValueLength);` generally performs better than `ByteBuffer`.
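A minimal sketch of `rawKey` rewritten with `System.arraycopy`, assuming the layout used in this PR: one leading join-side flag byte followed by the serialized key. `RawBytes` is a hypothetical holder class for the example.

```java
final class RawBytes {
    // Strips the leading join-side flag byte and returns the remaining payload.
    // System.arraycopy performs a single bulk copy instead of going through a ByteBuffer.
    static byte[] rawKey(final byte[] data) {
        final int rawValueLength = data.length - 1;
        final byte[] rawKey = new byte[rawValueLength];
        System.arraycopy(data, 1, rawKey, 0, rawValueLength);
        return rawKey;
    }
}
```

`java.util.Arrays.copyOfRange(data, 1, data.length)` expresses the same copy in one call, if that reads better.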

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/ValueOrOtherValue.java
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.util.Objects;
+
+/**
+ * This class is used in combination with {@link KeyAndJoinSide}. The {@link KeyAndJoinSide} class
+ * combines a key with a boolean value that specifies if the key is found in the left side of a
+ * join or on the right side. This {@link ValueOrOtherValue} object contains either the V1 value,
+ * which is found in the left topic, or V2 value if it is found in the right topic.
+ */
+public class ValueOrOtherValue<V1, V2> {
+    private final V1 thisValue;
+    private final V2 otherValue;
+
+    private ValueOrOtherValue(final V1 thisValue, final V2 otherValue) {
+        this.thisValue = thisValue;

Review comment:
       Could we add an assertion that exactly one of these two fields is non-null?
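A minimal sketch of the requested invariant, using the left / right naming suggested elsewhere in this review. `LeftOrRightSketch` and its factory methods are hypothetical names; the check `(left == null) == (right == null)` rejects both the "both set" and the "both null" cases in one expression.

```java
// Hypothetical sketch: holds exactly one of a left (V1) or right (V2) value.
final class LeftOrRightSketch<V1, V2> {
    private final V1 leftValue;
    private final V2 rightValue;

    private LeftOrRightSketch(final V1 leftValue, final V2 rightValue) {
        // Invariant: exactly one side is non-null.
        if ((leftValue == null) == (rightValue == null)) {
            throw new IllegalArgumentException(
                "exactly one of leftValue and rightValue must be non-null");
        }
        this.leftValue = leftValue;
        this.rightValue = rightValue;
    }

    static <V1, V2> LeftOrRightSketch<V1, V2> makeLeftValue(final V1 leftValue) {
        return new LeftOrRightSketch<>(leftValue, null);
    }

    static <V1, V2> LeftOrRightSketch<V1, V2> makeRightValue(final V2 rightValue) {
        return new LeftOrRightSketch<>(null, rightValue);
    }

    V1 getLeftValue() {
        return leftValue;
    }

    V2 getRightValue() {
        return rightValue;
    }
}
```

Because the factories pass `null` for the other side, calling either factory with a `null` argument fails fast, which matches the earlier point that `null` values are filtered before they ever reach the store.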

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/ValueOrOtherValue.java
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.util.Objects;
+
+/**
+ * This class is used in combination with {@link KeyAndJoinSide}. The {@link KeyAndJoinSide} class
+ * combines a key with a boolean value that specifies if the key is found in the left side of a
+ * join or on the right side. This {@link ValueOrOtherValue} object contains either the V1 value,
+ * which is found in the left topic, or V2 value if it is found in the right topic.
+ */
+public class ValueOrOtherValue<V1, V2> {
+    private final V1 thisValue;
+    private final V2 otherValue;
+
+    private ValueOrOtherValue(final V1 thisValue, final V2 otherValue) {
+        this.thisValue = thisValue;
+        this.otherValue = otherValue;
+    }
+
+    /**
+     * Create a new {@link ValueOrOtherValue} instance with the V1 value as {@code thisValue} and
+     * V2 value as null.
+     *
+     * @param thisValue the V1 value
+     * @param <V1>      the type of the value
+     * @return a new {@link ValueOrOtherValue} instance
+     */
+    public static <V1, V2> ValueOrOtherValue<V1, V2> makeValue(final V1 thisValue) {
+        return new ValueOrOtherValue<>(thisValue, null);
+    }
+
+    /**
+     * Create a new {@link ValueOrOtherValue} instance with the V2 value as {@code otherValue} and
+     * V1 value as null.
+     *
+     * @param otherValue the V2 value
+     * @param <V2>       the type of the value
+     * @return a new {@link ValueOrOtherValue} instance
+     */
+    public static <V1, V2> ValueOrOtherValue<V1, V2> makeOtherValue(final V2 otherValue) {
+        return new ValueOrOtherValue<>(null, otherValue);
+    }
+
+    public V1 getThisValue() {
+        return thisValue;
+    }
+
+    public V2 getOtherValue() {
+        return otherValue;
+    }
+
+    @Override
+    public String toString() {
+        return "<" + thisValue + "," + otherValue + ">";

Review comment:
       Ditto: it would be better to print this as `left/right: value`.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/KeyAndJoinSide.java
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.KeyValue;
+
+import java.util.Objects;
+
+/**
+ * Combines a key from a {@link KeyValue} with a boolean value referencing if the key is
+ * part of the left join (true) or right join (false). This class is only useful when a state
+ * store needs to be shared between left and right processors, and each processor needs to
+ * access the key of the other processor.
+ */
+public class KeyAndJoinSide<K> {
+    private final K key;
+    private final boolean thisJoin;
+
+    private KeyAndJoinSide(final boolean thisJoin, final K key) {
+        this.key = Objects.requireNonNull(key, "key is null");
+        this.thisJoin = thisJoin;
+    }
+
+    /**
+     * Create a new {@link KeyAndJoinSide} instance if the provided {@code key} is not {@code null}.
+     *
+     * @param thisJoin True if the key is part of the left topic (referenced as thisJoin in {@code KStreamImplJoin})
+     * @param key      the key
+     * @param <K>      the type of the key
+     * @return a new {@link KeyAndJoinSide} instance if the provided {@code key} is not {@code null}
+     */
+    public static <K> KeyAndJoinSide<K> make(final boolean thisJoin, final K key) {
+        return new KeyAndJoinSide<>(thisJoin, key);
+    }
+
+    public boolean isThisJoin() {
+        return thisJoin;
+    }
+
+    public K getKey() {
+        return key;
+    }
+
+    @Override
+    public String toString() {
+        return "<" + thisJoin + "," + key + ">";

Review comment:
       If we go with `left / right`, then we can update here as well to print left/right instead of `true/false`, i.e.: `<left/right -> key>`.
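A minimal sketch of the suggested `toString` format with a left / right side instead of a `thisJoin` boolean. `KeyAndJoinSideSketch` and its `leftSide` field are hypothetical names for illustration.

```java
import java.util.Objects;

// Hypothetical sketch of a key tagged with the join side it came from.
final class KeyAndJoinSideSketch<K> {
    private final K key;
    private final boolean leftSide;

    KeyAndJoinSideSketch(final boolean leftSide, final K key) {
        this.key = Objects.requireNonNull(key, "key is null");
        this.leftSide = leftSide;
    }

    @Override
    public String toString() {
        // Print the side by name instead of true/false, e.g. "<left -> k1>".
        return "<" + (leftSide ? "left" : "right") + " -> " + key + ">";
    }
}
```

The value class could follow the same pattern and print `left: value` or `right: value`.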

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/ValueOrOtherValueDeserializer.java
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.Objects;
+
+public class ValueOrOtherValueDeserializer<V1, V2> implements Deserializer<ValueOrOtherValue<V1, V2>> {
+    public final Deserializer<V1> thisDeserializer;
+    public final Deserializer<V2> otherDeserializer;
+
+    public ValueOrOtherValueDeserializer(final Deserializer<V1> thisDeserializer, final Deserializer<V2> otherDeserializer) {
+        this.thisDeserializer = Objects.requireNonNull(thisDeserializer);
+        this.otherDeserializer = Objects.requireNonNull(otherDeserializer);
+    }
+
+    @Override
+    public void configure(final Map<String, ?> configs,
+                          final boolean isKey) {
+        thisDeserializer.configure(configs, isKey);
+        otherDeserializer.configure(configs, isKey);
+    }
+
+    @Override
+    public ValueOrOtherValue<V1, V2> deserialize(final String topic, final byte[] joinedValues) {
+        if (joinedValues == null || joinedValues.length == 0) {
+            return null;
+        }
+
+        final boolean thisJoin = joinedValues[0] == 1 ? true : false;
+        return thisJoin
+            ? ValueOrOtherValue.makeValue(thisDeserializer.deserialize(topic, rawValue(joinedValues)))
+            : ValueOrOtherValue.makeOtherValue(otherDeserializer.deserialize(topic, rawValue(joinedValues)));
+    }
+
+    static byte[] rawValue(final byte[] joinedValues) {
+        final int rawValueLength = joinedValues.length - 1;
+
+        return ByteBuffer

Review comment:
       Ditto: use array copy, since ByteBuffer#put uses a while-loop of `public abstract ByteBuffer put(byte b)` which is less efficient.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/ValueOrOtherValue.java
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.util.Objects;
+
+/**
+ * This class is used in combination with {@link KeyAndJoinSide}. The {@link KeyAndJoinSide} class
+ * combines a key with a boolean value that specifies if the key is found in the left side of a
+ * join or on the right side. This {@link ValueOrOtherValue} object contains either the V1 value,
+ * which is found in the left topic, or V2 value if it is found in the right topic.
+ */
+public class ValueOrOtherValue<V1, V2> {

Review comment:
       Ditto: use `left / right value` terminology, i.e. maybe we can rename this class as `LeftOrRightValue`

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/ValueOrOtherValueSerializer.java
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serializer;
+
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+/**
+ * Serializes a {@link ValueOrOtherValue}. The serialized bytes start with a byte that indicates
+ * whether the value is V1 or V2.
+ */
+public class ValueOrOtherValueSerializer<V1, V2> implements Serializer<ValueOrOtherValue<V1, V2>> {
+    private final Serializer<V1> thisSerializer;
+    private final Serializer<V2> otherSerializer;
+
+    public ValueOrOtherValueSerializer(final Serializer<V1> thisSerializer, final Serializer<V2> otherSerializer) {
+        this.thisSerializer = Objects.requireNonNull(thisSerializer);
+        this.otherSerializer = Objects.requireNonNull(otherSerializer);
+    }
+
+    @Override
+    public byte[] serialize(final String topic, final ValueOrOtherValue<V1, V2> data) {
+        if (data == null) {
+            return null;
+        }
+
+        final byte[] rawThisValue = (data.getThisValue() != null) ? thisSerializer.serialize(topic, data.getThisValue()) : null;

Review comment:
       We can simplify this a bit: since we know only one field will be non-null, we can just declare a single `rawValue` here, and then lines 46-54 can also be simplified.
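A minimal sketch of the simplification, assuming the byte layout read by `ValueOrOtherValueDeserializer` in this PR (flag byte `1` for this/left, otherwise other/right, followed by the raw value). To keep the example self-contained, the inner serializers are modeled as `java.util.function.Function<V, byte[]>` rather than Kafka's `Serializer`; `LeftOrRightBytes` is a hypothetical name.

```java
import java.util.function.Function;

// Hypothetical sketch of serializing a value that is either left (V1) or right (V2).
final class LeftOrRightBytes {
    static final byte LEFT = 1;
    static final byte RIGHT = 0;

    // Exactly one of leftValue/rightValue is expected to be non-null, so a
    // single rawValue plus a one-byte side flag is enough.
    static <V1, V2> byte[] serialize(final V1 leftValue,
                                     final V2 rightValue,
                                     final Function<V1, byte[]> leftSerializer,
                                     final Function<V2, byte[]> rightSerializer) {
        final boolean isLeft = leftValue != null;
        final byte[] rawValue = isLeft
            ? leftSerializer.apply(leftValue)
            : rightSerializer.apply(rightValue);

        final byte[] out = new byte[1 + rawValue.length];
        out[0] = isLeft ? LEFT : RIGHT;
        System.arraycopy(rawValue, 0, out, 1, rawValue.length);
        return out;
    }
}
```

Only one branch ever serializes, so there is no second nullable byte array to check when assembling the output.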

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/KeyAndJoinSideDeserializer.java
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.Objects;
+
+public class KeyAndJoinSideDeserializer<K> implements Deserializer<KeyAndJoinSide<K>> {
+    private final Deserializer<K> keyDeserializer;
+
+    KeyAndJoinSideDeserializer(final Deserializer<K> keyDeserializer) {
+        this.keyDeserializer = Objects.requireNonNull(keyDeserializer, "keyDeserializer is null");
+    }
+
+    @Override
+    public void configure(final Map<String, ?> configs, final boolean isKey) {
+        keyDeserializer.configure(configs, isKey);
+    }
+
+    @Override
+    public KeyAndJoinSide<K> deserialize(final String topic, final byte[] data) {
+        final boolean bool = data[0] == 1 ? true : false;
+        final K key = keyDeserializer.deserialize(topic, rawKey(data));
+
+        return KeyAndJoinSide.make(bool, key);
+    }
+
+    static byte[] rawKey(final byte[] data) {
+        final int rawValueLength = data.length - 1;

Review comment:
       nit: rawKeyLength?
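       For illustration, a sketch of the byte layout the deserializer above implies (one join-side flag byte followed by the raw key bytes), using the `rawKeyLength` name from the nit and dropping the redundant `? true : false`. `KeyAndJoinSideCodec` and `isLeftSide` are hypothetical names, and treating `1` as the left side is an assumption:

```java
import java.nio.ByteBuffer;

// Hypothetical codec for the <joinSide-recordKey> layout: [flag byte][raw key bytes].
final class KeyAndJoinSideCodec {
    static byte[] serialize(final boolean isLeftSide, final byte[] rawKey) {
        return ByteBuffer.allocate(1 + rawKey.length)
            .put((byte) (isLeftSide ? 1 : 0))
            .put(rawKey)
            .array();
    }

    // The comparison already yields a boolean; no ternary needed.
    static boolean isLeftSide(final byte[] data) {
        return data[0] == 1;
    }

    // Everything after the flag byte is the serialized key.
    static byte[] rawKey(final byte[] data) {
        final int rawKeyLength = data.length - 1;
        final byte[] rawKey = new byte[rawKeyLength];
        System.arraycopy(data, 1, rawKey, 0, rawKeyLength);
        return rawKey;
    }
}
```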




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r610113314



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -38,20 +48,36 @@
     private final String otherWindowName;
     private final long joinBeforeMs;
     private final long joinAfterMs;
+    private final long joinGraceMs;
 
     private final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends R> joiner;
     private final boolean outer;
+    private final Optional<String> outerJoinWindowName;
+    private final boolean thisJoin;
+
+    // Observed time is AtomicLong because this time is shared between the left and right side processor nodes. However,
+    // this time is not updated in parallel, so we can call get() several times without worrying about getting different
+    // times.
+    private final AtomicLong maxObservedStreamTime;

Review comment:
       Agreed, but I am wondering whether we should (or want to) switch to the new `context.streamTime()` everywhere. It's a totally open question, but it might be worth exploring.







[GitHub] [kafka] guozhangwang commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r608887445



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -161,7 +181,7 @@ public void process(final K key, final V1 value) {
                     //
                     // the condition below allows us to process the late record without the need
                     // to hold it in the temporary outer store
-                    if (timeTo < maxStreamTime) {
+                    if (internalOuterJoinFixDisabled || timeTo < maxStreamTime) {

Review comment:
       We can refactor it as:
   
   ```
   if (!outerJoinWindowStore.isPresent() || timeTo < maxStreamTime) {
       context().forward(key, joiner.apply(key, value, null));
   } else {
   
   }
   ```
   
   Then `internalOuterJoinFixDisabled` can just be a local variable instead of a class field.
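   A self-contained sketch of the suggested shape, using hypothetical stand-ins: a `List<String>` plays the outer-join store and another list records what is forwarded downstream. Whether the fix is enabled is encoded purely by `Optional` presence, so no flag field is needed. The buffering in the `else` branch follows the PR description (non-joined records are written to the store); the suggested snippet leaves that branch empty:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for the non-joined branch of the join processor.
final class OuterJoinBranch {
    final List<String> forwarded = new ArrayList<>();

    void handleNonJoined(final String key, final String value, final long timeTo,
                         final long maxStreamTime, final Optional<List<String>> outerJoinStore) {
        if (!outerJoinStore.isPresent() || timeTo < maxStreamTime) {
            // Fix disabled (no store), or the record is already late: emit the null-join now.
            forwarded.add(key + ":" + value + "+null");
        } else {
            // Otherwise hold the record until its window closes or a join is found.
            outerJoinStore.get().add(key + ":" + value);
        }
    }
}
```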







[GitHub] [kafka] vvcephei commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r612589665



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##########
@@ -118,20 +132,40 @@
         final ProcessorGraphNode<K1, V2> otherWindowedStreamsNode = new ProcessorGraphNode<>(otherWindowStreamProcessorName, otherWindowStreamProcessorParams);
         builder.addGraphNode(otherGraphNode, otherWindowedStreamsNode);
 
+        Optional<StoreBuilder<WindowStore<KeyAndJoinSide<K1>, ValueOrOtherValue<V1, V2>>>> outerJoinWindowStore = Optional.empty();
+        if (leftOuter || rightOuter) {
+            final String outerJoinSuffix = "-shared-outer-join-store";
+            final String outerJoinStoreGeneratedName = builder.newProcessorName(KStreamImpl.OUTERSHARED_NAME);
+            final String outerJoinStoreName = userProvidedBaseStoreName == null ? outerJoinStoreGeneratedName : userProvidedBaseStoreName + outerJoinSuffix;
+
+            outerJoinWindowStore = Optional.of(outerJoinWindowStoreBuilder(outerJoinStoreName, windows, streamJoinedInternal));
+        }
+
+        // Time shared between joins to keep track of the maximum stream time

Review comment:
       It's extremely subtle, but we cannot use `context.streamTime()` because of the time-delay effects of upstream record caches. This was the cause of a severe bug in `suppress` that went undetected until after it was released.
   
   For example: if we have a record cache upstream of this join, it will delay the propagation of records (and their accompanying timestamps) by some amount of time `D`. Say we ingest a record with timestamp `T`. If we reference the context's stream time, our processor will think it is at time `T` when it is really at time `T - D`, leading it to behave incorrectly, for example by enforcing the grace period prematurely, which users will experience as data loss.
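   To make the distinction concrete, a hedged sketch of a join-owned stream-time tracker shared by both join sides. `SharedStreamTime` and `windowClosed` are hypothetical names (`advance` mirrors the `maxObservedStreamTime.advance(...)` call that appears later in this PR), and `-1` as the initial value is an assumption. The tracker only moves on timestamps the join has actually processed, so a cache that delays records by `D` cannot push it ahead:

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical tracker shared by the left and right join processors.
final class SharedStreamTime {
    // -1 as "nothing observed yet" is an assumption of this sketch.
    private final AtomicLong maxObserved = new AtomicLong(-1L);

    // Each side calls this with the timestamp of the record it is processing.
    long advance(final long recordTimestamp) {
        return maxObserved.accumulateAndGet(recordTimestamp, Math::max);
    }

    long get() {
        return maxObserved.get();
    }

    // A window counts as closed once stream time has passed its close time.
    static boolean windowClosed(final long windowCloseTime, final long streamTime) {
        return windowCloseTime < streamTime;
    }
}
```

   If the join has only seen `T - D = 90` while the task has already ingested `T = 100`, a window closing at `95` is still open under the join's own time but would already be treated as closed under the task's stream time.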







[GitHub] [kafka] spena commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

Posted by GitBox <gi...@apache.org>.
spena commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r611882455



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -102,14 +127,98 @@ public void process(final K key, final V1 value) {
                 while (iter.hasNext()) {
                     needOuterJoin = false;
                     final KeyValue<Long, V2> otherRecord = iter.next();
+                    final long otherRecordTimestamp = otherRecord.key;
                     context().forward(
                         key,
                         joiner.apply(key, value, otherRecord.value),
-                        To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecord.key)));
+                        To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecordTimestamp)));
+
+                    outerJoinWindowStore.ifPresent(store -> {
+                        // Delete the other joined key from the outer non-joined store now to prevent
+                        // further processing
+                        final KeyAndJoinSide<K> otherJoinKey = KeyAndJoinSide.make(!thisJoin, key);
+                        if (store.fetch(otherJoinKey, otherRecordTimestamp) != null) {

Review comment:
       This needs to be supported in Window stores. I might need to write a KIP for this.
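       A hedged, in-memory sketch of what this step needs from the store: a point lookup by (join side, key, timestamp) and a delete of that single entry. `NonJoinedBuffer` is a hypothetical stand-in and not the Kafka `WindowStore` API; whether a window store can support this efficiently is the open question above:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for the shared outer-join store.
final class NonJoinedBuffer<K, V> {
    // Composite key: (join side, record key, record timestamp).
    private static final class Slot {
        private final boolean leftSide;
        private final Object key;
        private final long timestamp;

        Slot(final boolean leftSide, final Object key, final long timestamp) {
            this.leftSide = leftSide;
            this.key = key;
            this.timestamp = timestamp;
        }

        @Override
        public boolean equals(final Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof Slot)) {
                return false;
            }
            final Slot other = (Slot) o;
            return leftSide == other.leftSide && timestamp == other.timestamp && Objects.equals(key, other.key);
        }

        @Override
        public int hashCode() {
            return Objects.hash(leftSide, key, timestamp);
        }
    }

    private final Map<Slot, V> entries = new HashMap<>();

    void put(final boolean leftSide, final K key, final long timestamp, final V value) {
        entries.put(new Slot(leftSide, key, timestamp), value);
    }

    V fetch(final boolean leftSide, final K key, final long timestamp) {
        return entries.get(new Slot(leftSide, key, timestamp));
    }

    // On a join, drop the other side's pending record so it is never emitted as a null-join.
    boolean deleteIfPresent(final boolean leftSide, final K key, final long timestamp) {
        return entries.remove(new Slot(leftSide, key, timestamp)) != null;
    }
}
```

       The assertions below replay the `A1`/`a1` step of `testImprovedLeftJoin`: the right-side record deletes the buffered left-side entry at the other record's timestamp.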







[GitHub] [kafka] spena commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

Posted by GitBox <gi...@apache.org>.
spena commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r610891590



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -92,6 +113,10 @@ public void process(final K key, final V1 value) {
                 return;
             }
 
+            // maxObservedStreamTime is updated and shared between left and right sides, so we can
+            // process a non-join record immediately if it is late
+            final long maxStreamTime = maxObservedStreamTime.updateAndGet(time -> Math.max(time, context().timestamp()));

Review comment:
       Done







[GitHub] [kafka] spena commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

Posted by GitBox <gi...@apache.org>.
spena commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r616762075



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##########
@@ -118,20 +142,47 @@
         final ProcessorGraphNode<K1, V2> otherWindowedStreamsNode = new ProcessorGraphNode<>(otherWindowStreamProcessorName, otherWindowStreamProcessorParams);
         builder.addGraphNode(otherGraphNode, otherWindowedStreamsNode);
 
+        Optional<StoreBuilder<WindowStore<KeyAndJoinSide<K1>, LeftOrRightValue<V1, V2>>>> outerJoinWindowStore = Optional.empty();
+        if (leftOuter || rightOuter) {

Review comment:
       I could not find a way to get the configs from KStreamImplJoin. I traced the configs being passed from the StreamTask down to the processor when it processes a record, so I ended up checking the flag at that point. I could refactor the code to pass the configs when constructing the joins, but that would require changes in several places, and I'm not sure whether it would introduce incompatibilities. Any ideas?







[GitHub] [kafka] spena commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

Posted by GitBox <gi...@apache.org>.
spena commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r614312924



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##########
@@ -211,6 +263,66 @@ private void assertUniqueStoreNames(final WindowBytesStoreSupplier supplier,
         return builder;
     }
 
+    @SuppressWarnings("unchecked")
+    private static <K, V1, V2> StoreBuilder<WindowStore<KeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>> outerJoinWindowStoreBuilder(final String storeName,

Review comment:
       Done

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -60,20 +82,41 @@
     }
 
     private class KStreamKStreamJoinProcessor extends AbstractProcessor<K, V1> {
+        private static final boolean DISABLE_OUTER_JOIN_SPURIOUS_RESULTS_FIX_DEFAULT = false;

Review comment:
       Done

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -60,20 +82,41 @@
     }
 
     private class KStreamKStreamJoinProcessor extends AbstractProcessor<K, V1> {
+        private static final boolean DISABLE_OUTER_JOIN_SPURIOUS_RESULTS_FIX_DEFAULT = false;
+
+        private final Predicate<Windowed<KeyAndJoinSide<K>>> recordWindowHasClosed =
+            windowedKey -> windowedKey.window().start() + joinAfterMs + joinGraceMs < maxObservedStreamTime.get();
 
         private WindowStore<K, V2> otherWindow;
         private StreamsMetricsImpl metrics;
         private Sensor droppedRecordsSensor;
+        private Optional<WindowStore<KeyAndJoinSide<K>, LeftOrRightValue>> outerJoinWindowStore = Optional.empty();
 
         @SuppressWarnings("unchecked")
         @Override
         public void init(final ProcessorContext context) {
             super.init(context);
             metrics = (StreamsMetricsImpl) context.metrics();
             droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
-            otherWindow = (WindowStore<K, V2>) context.getStateStore(otherWindowName);
+            otherWindow = context.getStateStore(otherWindowName);
+
+            if (!internalOuterJoinFixDisabled(context.appConfigs())) {
+                outerJoinWindowStore = outerJoinWindowName.map(name -> context.getStateStore(name));
+            }
         }
 
+        private boolean internalOuterJoinFixDisabled(final Map<String, Object> configs) {
+            final Object value = configs.get(StreamsConfig.InternalConfig.INTERNAL_DISABLE_OUTER_JOIN_SPURIOUS_RESULTS_FIX);
+            if (value == null) {
+                return DISABLE_OUTER_JOIN_SPURIOUS_RESULTS_FIX_DEFAULT;

Review comment:
       Done
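       A worked check of the `recordWindowHasClosed` inequality as a hypothetical helper. It assumes each non-joined record is buffered with its own timestamp as the window start. With `JoinWindows.of(ofMillis(100)).grace(ofMillis(0))` as in `testImprovedLeftJoin`, `joinAfterMs = 100` and `joinGraceMs = 0`, so a record at `t = 1` counts as closed once stream time exceeds `101`:

```java
// Hypothetical helper with the same inequality as recordWindowHasClosed:
//   windowStart + joinAfterMs + joinGraceMs < maxObservedStreamTime
final class JoinWindowClose {
    static boolean hasClosed(final long windowStart, final long joinAfterMs,
                             final long joinGraceMs, final long maxObservedStreamTime) {
        return windowStart + joinAfterMs + joinGraceMs < maxObservedStreamTime;
    }
}
```

       When the dummy record at `401` advances stream time, buffered records at `1` and `3` satisfy the inequality, consistent with the `A0+null` and `A0-0+null` results the test expects. The comparison is strict, so a record at `301` would still be open at `401`.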

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -60,20 +82,41 @@
     }
 
     private class KStreamKStreamJoinProcessor extends AbstractProcessor<K, V1> {
+        private static final boolean DISABLE_OUTER_JOIN_SPURIOUS_RESULTS_FIX_DEFAULT = false;
+
+        private final Predicate<Windowed<KeyAndJoinSide<K>>> recordWindowHasClosed =
+            windowedKey -> windowedKey.window().start() + joinAfterMs + joinGraceMs < maxObservedStreamTime.get();
 
         private WindowStore<K, V2> otherWindow;
         private StreamsMetricsImpl metrics;
         private Sensor droppedRecordsSensor;
+        private Optional<WindowStore<KeyAndJoinSide<K>, LeftOrRightValue>> outerJoinWindowStore = Optional.empty();
 
         @SuppressWarnings("unchecked")
         @Override
         public void init(final ProcessorContext context) {
             super.init(context);
             metrics = (StreamsMetricsImpl) context.metrics();
             droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
-            otherWindow = (WindowStore<K, V2>) context.getStateStore(otherWindowName);
+            otherWindow = context.getStateStore(otherWindowName);
+
+            if (!internalOuterJoinFixDisabled(context.appConfigs())) {
+                outerJoinWindowStore = outerJoinWindowName.map(name -> context.getStateStore(name));
+            }
         }
 
+        private boolean internalOuterJoinFixDisabled(final Map<String, Object> configs) {
+            final Object value = configs.get(StreamsConfig.InternalConfig.INTERNAL_DISABLE_OUTER_JOIN_SPURIOUS_RESULTS_FIX);
+            if (value == null) {
+                return DISABLE_OUTER_JOIN_SPURIOUS_RESULTS_FIX_DEFAULT;
+            }
+
+            if (value instanceof Boolean) {
+                return (Boolean) value;
+            } else {

Review comment:
       Done
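       The `else` branch is cut off in this excerpt, so the following is only one plausible completion, not what the PR does: accept a `Boolean` or a `"true"`/`"false"` string, and reject anything else instead of silently falling back to the default. `OuterJoinFixFlag` and its `KEY` value are hypothetical; the real key is `StreamsConfig.InternalConfig.INTERNAL_DISABLE_OUTER_JOIN_SPURIOUS_RESULTS_FIX`:

```java
import java.util.Map;

// Hypothetical parser for the internal "disable outer-join fix" flag.
final class OuterJoinFixFlag {
    // Placeholder key for this sketch; not the real config name.
    static final String KEY = "hypothetical.disable.outer.join.fix";
    static final boolean DEFAULT = false;

    static boolean fixDisabled(final Map<String, Object> configs) {
        final Object value = configs.get(KEY);
        if (value == null) {
            return DEFAULT; // flag not set: the fix stays enabled
        }
        if (value instanceof Boolean) {
            return (Boolean) value;
        }
        if (value instanceof String) {
            // Strict parsing (assumption): Boolean.parseBoolean would turn typos into false.
            final String text = ((String) value).trim();
            if (text.equalsIgnoreCase("true")) {
                return true;
            }
            if (text.equalsIgnoreCase("false")) {
                return false;
            }
        }
        throw new IllegalArgumentException("Invalid value for " + KEY + ": " + value);
    }
}
```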

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -60,20 +82,41 @@
     }
 
     private class KStreamKStreamJoinProcessor extends AbstractProcessor<K, V1> {
+        private static final boolean DISABLE_OUTER_JOIN_SPURIOUS_RESULTS_FIX_DEFAULT = false;
+
+        private final Predicate<Windowed<KeyAndJoinSide<K>>> recordWindowHasClosed =
+            windowedKey -> windowedKey.window().start() + joinAfterMs + joinGraceMs < maxObservedStreamTime.get();
 
         private WindowStore<K, V2> otherWindow;
         private StreamsMetricsImpl metrics;
         private Sensor droppedRecordsSensor;
+        private Optional<WindowStore<KeyAndJoinSide<K>, LeftOrRightValue>> outerJoinWindowStore = Optional.empty();
 
         @SuppressWarnings("unchecked")
         @Override
         public void init(final ProcessorContext context) {
             super.init(context);
             metrics = (StreamsMetricsImpl) context.metrics();
             droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
-            otherWindow = (WindowStore<K, V2>) context.getStateStore(otherWindowName);
+            otherWindow = context.getStateStore(otherWindowName);
+
+            if (!internalOuterJoinFixDisabled(context.appConfigs())) {

Review comment:
       Done

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -93,23 +136,118 @@ public void process(final K key, final V1 value) {
             }
 
             boolean needOuterJoin = outer;
+            boolean joinFound = false;
 
             final long inputRecordTimestamp = context().timestamp();
             final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs);
             final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs);
 
+            maxObservedStreamTime.advance(inputRecordTimestamp);
+
             try (final WindowStoreIterator<V2> iter = otherWindow.fetch(key, timeFrom, timeTo)) {
                 while (iter.hasNext()) {
                     needOuterJoin = false;
+                    joinFound = true;
                     final KeyValue<Long, V2> otherRecord = iter.next();
+                    final long otherRecordTimestamp = otherRecord.key;
+
+                    // Emit expired records before the joined record to keep time ordering
+                    emitExpiredNonJoinedOuterRecordsExcept(key, otherRecordTimestamp);
+
                     context().forward(
                         key,
                         joiner.apply(key, value, otherRecord.value),
-                        To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecord.key)));
+                        To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecordTimestamp)));
+                }
+
+                // Emit all expired records before adding a new non-joined record to the store. Otherwise,
+                // the put() call will advance the stream time, which causes records out of the retention
+                // period to be deleted, thus not being emitted later.
+                if (!joinFound && inputRecordTimestamp == maxObservedStreamTime.get()) {
+                    emitExpiredNonJoinedOuterRecords();
                 }
 
                 if (needOuterJoin) {
-                    context().forward(key, joiner.apply(key, value, null));
+                    // The maxStreamTime contains the max time observed in both sides of the join.
+                    // Having access to the time observed in the other join side fixes the following
+                    // problem:
+                    //
+                    // Say we have a window size of 5 seconds
+                    //  1. A non-joined record with time T10 is seen in the left-topic (maxLeftStreamTime: 10)
+                    //     The record is not processed yet, and is added to the outer-join store
+                    //  2. A non-joined record with time T2 is seen in the right-topic (maxRightStreamTime: 2)
+                    //     The record is not processed yet, and is added to the outer-join store
+                    //  3. A joined record with time T11 is seen in the left-topic (maxLeftStreamTime: 11)
+                    //     It is time to look at the expired records. T10 and T2 should be emitted, but
+                    //     because T2 was late, then it is not fetched by the window store, so it is not processed
+                    //
+                    // See KStreamKStreamLeftJoinTest.testLowerWindowBound() tests
+                    //
+                    // the condition below allows us to process the late record without the need

Review comment:
       Done
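       A hedged sketch of the emit-on-expiry idea in the comments above: non-joined records sit in a time-ordered buffer and, once the shared stream time passes `timestamp + joinAfterMs + joinGraceMs`, are emitted oldest first. `ExpiringOuterBuffer` is a hypothetical stand-in for the shared outer-join store, and applying the joiner with a `null` other value is left out. Because iteration is by timestamp, a late right-side record (`T2` in the comment) is emitted as soon as its window has closed, even though it arrived after `T10`:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Consumer;

// Hypothetical time-ordered buffer of non-joined records.
final class ExpiringOuterBuffer {
    private final TreeMap<Long, List<String>> byTime = new TreeMap<>();
    private final long joinAfterMs;
    private final long joinGraceMs;

    ExpiringOuterBuffer(final long joinAfterMs, final long joinGraceMs) {
        this.joinAfterMs = joinAfterMs;
        this.joinGraceMs = joinGraceMs;
    }

    void buffer(final long timestamp, final String value) {
        byTime.computeIfAbsent(timestamp, t -> new ArrayList<>()).add(value);
    }

    // Emits and removes, oldest first, every record whose window has closed.
    // Stops at the first open window: all later entries have larger timestamps.
    void emitExpired(final long maxObservedStreamTime, final Consumer<String> downstream) {
        final Iterator<Map.Entry<Long, List<String>>> it = byTime.entrySet().iterator();
        while (it.hasNext()) {
            final Map.Entry<Long, List<String>> entry = it.next();
            if (entry.getKey() + joinAfterMs + joinGraceMs >= maxObservedStreamTime) {
                break;
            }
            entry.getValue().forEach(downstream);
            it.remove();
        }
    }
}
```

       In a real window store, `put` can advance the store's own notion of time and drop entries past retention, which is why the PR flushes expired records before writing a new one; this in-memory sketch has no retention, so it does not model that effect.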







[GitHub] [kafka] mjsax commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r614538486



##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
##########
@@ -486,6 +486,327 @@ public void testJoin() {
         }
     }
 
+    @Test
+    public void testImprovedLeftJoin() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KStream<Integer, String> stream1;
+        final KStream<Integer, String> stream2;
+        final KStream<Integer, String> joined;
+        final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+
+        stream1 = builder.stream(topic1, consumed);
+        stream2 = builder.stream(topic2, consumed);
+        joined = stream1.leftJoin(
+            stream2,
+            MockValueJoiner.TOSTRING_JOINER,
+            JoinWindows.of(ofMillis(100)).grace(ofMillis(0)),
+            StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
+        joined.process(supplier);
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<Integer, String> inputTopic1 =
+                driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+            final TestInputTopic<Integer, String> inputTopic2 =
+                driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+            final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+
+            /*
+             * Verifies left non-joined records are emitted by a record processed in the left topic
+             */
+
+            long windowStart = 0;
+
+            // No joins detected; No null-joins emitted
+            inputTopic1.pipeInput(0, "A0", windowStart + 1);
+            inputTopic1.pipeInput(1, "A1", windowStart + 2);
+            inputTopic1.pipeInput(0, "A0-0", windowStart + 3);
+            processor.checkAndClearProcessResult();
+
+            // Join detected; No null-joins emitted
+            inputTopic2.pipeInput(1, "a1", windowStart + 3);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(1, "A1+a1", windowStart + 3));
+
+            // Dummy record in left topic will emit expired non-joined records from the left topic
+            inputTopic1.pipeInput(2, "dummy", windowStart + 401);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(0, "A0+null", windowStart + 1),
+                new KeyValueTimestamp<>(0, "A0-0+null", windowStart + 3));
+
+            // Flush internal non-joined state store by joining the dummy record
+            inputTopic2.pipeInput(2, "dummy", windowStart + 401);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(2, "dummy+dummy", windowStart + 401));
+
+            /*
+             * Verifies left non-joined records are emitted by a record processed in the right topic
+             */
+
+            windowStart = windowStart + 301;
+
+            // No joins detected; No null-joins emitted
+            inputTopic1.pipeInput(0, "A0", windowStart + 1);
+            inputTopic1.pipeInput(1, "A1", windowStart + 2);
+            inputTopic1.pipeInput(0, "A0-0", windowStart + 3);
+            processor.checkAndClearProcessResult();
+
+            // Join detected; No null-joins emitted
+            inputTopic2.pipeInput(1, "a1", windowStart + 3);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(1, "A1+a1", windowStart + 3));
+
+            // Dummy record in right topic will emit expired non-joined records from the left topic
+            inputTopic2.pipeInput(2, "dummy", windowStart + 401);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(0, "A0+null", windowStart + 1),
+                new KeyValueTimestamp<>(0, "A0-0+null", windowStart + 3));
+
+            // Flush internal non-joined state store by joining the dummy record
+            inputTopic1.pipeInput(2, "dummy", windowStart + 402);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(2, "dummy+dummy", windowStart + 402));
+
+            /*
+             * Verifies right non-joined records are not emitted by record processed in the left topic
+             */
+
+            windowStart = windowStart + 401;
+
+            // No joins detected; No null-joins emitted
+            inputTopic2.pipeInput(0, "A0", windowStart + 1);
+            inputTopic2.pipeInput(1, "A1", windowStart + 2);
+            inputTopic2.pipeInput(0, "A0-0", windowStart + 3);
+            processor.checkAndClearProcessResult();
+
+            // Join detected; No null-joins emitted
+            inputTopic1.pipeInput(1, "a1", windowStart + 3);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(1, "a1+A1", windowStart + 3));
+
+            // Dummy record in left topic will not emit records
+            inputTopic1.pipeInput(2, "dummy", windowStart + 401);
+            processor.checkAndClearProcessResult();
+
+            // Process the dummy joined record
+            inputTopic2.pipeInput(2, "dummy", windowStart + 402);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(2, "dummy+dummy", windowStart + 402));
+
+            /*
+             * Verifies right non-joined records are not emitted by record processed in the right topic
+             */
+
+            windowStart = windowStart + 401;
+
+            // No joins detected; No null-joins emitted
+            inputTopic2.pipeInput(0, "A0", windowStart + 1);
+            inputTopic2.pipeInput(1, "A1", windowStart + 2);
+            inputTopic2.pipeInput(0, "A0-0", windowStart + 3);
+            processor.checkAndClearProcessResult();
+
+            // Join detected; No null-joins emitted
+            inputTopic1.pipeInput(1, "a1", windowStart + 3);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(1, "a1+A1", windowStart + 3));
+
+            // Dummy record in right topic will not emit records
+            inputTopic2.pipeInput(2, "dummy", windowStart + 401);
+            processor.checkAndClearProcessResult();
+
+            // Process the dummy joined record
+            inputTopic1.pipeInput(2, "dummy", windowStart + 402);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(2, "dummy+dummy", windowStart + 402));
+        }
+    }
+
+    @Test
+    public void testImprovedFullOuterJoin() {

Review comment:
       I just realized that we don't have a `KStreamKStreamOuterJoinTest` class; we should add one and move this test there.







[GitHub] [kafka] guozhangwang commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r610054139



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -38,20 +48,36 @@
     private final String otherWindowName;
     private final long joinBeforeMs;
     private final long joinAfterMs;
+    private final long joinGraceMs;
 
     private final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends R> joiner;
     private final boolean outer;
+    private final Optional<String> outerJoinWindowName;
+    private final boolean thisJoin;
+
+    // Observed time is AtomicLong because this time is shared between the left and right side processor nodes. However,
+    // this time is not updated in parallel, so we can call get() several times without worrying about getting different
+    // times.
+    private final AtomicLong maxObservedStreamTime;

Review comment:
       We had a similar debate about using per-task vs. per-processor stream time for expiration. I'm concerned that if we go with task stream time here, the behavior would be inconsistent with other scenarios.







[GitHub] [kafka] guozhangwang commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r608905403



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -133,34 +141,37 @@ public void process(final K key, final V1 value) {
                 return;
             }
 
-            // maxObservedStreamTime is updated and shared between left and right sides, so we can
-            // process a non-join record immediately if it is late
-            final long maxStreamTime = maxObservedStreamTime.updateAndGet(time -> Math.max(time, context().timestamp()));
-
             boolean needOuterJoin = outer;
+            boolean joinFound = false;
 
             final long inputRecordTimestamp = context().timestamp();
             final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs);
             final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs);
 
+            // maxObservedStreamTime is updated and shared between left and right join sides
+            maxObservedStreamTime.updateAndGet(t -> Math.max(t, inputRecordTimestamp));
+
             try (final WindowStoreIterator<V2> iter = otherWindow.fetch(key, timeFrom, timeTo)) {
                 while (iter.hasNext()) {
                     needOuterJoin = false;
+                    joinFound = true;
                     final KeyValue<Long, V2> otherRecord = iter.next();
                     final long otherRecordTimestamp = otherRecord.key;
+
+                    // Emit expired records before the joined record to keep time ordering

Review comment:
       cc @mjsax as well, LMK WDYT.







[GitHub] [kafka] guozhangwang merged pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

Posted by GitBox <gi...@apache.org>.
guozhangwang merged pull request #10462:
URL: https://github.com/apache/kafka/pull/10462


   





[GitHub] [kafka] spena commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

Posted by GitBox <gi...@apache.org>.
spena commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r611915696



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -102,14 +127,98 @@ public void process(final K key, final V1 value) {
                 while (iter.hasNext()) {
                     needOuterJoin = false;
                     final KeyValue<Long, V2> otherRecord = iter.next();
+                    final long otherRecordTimestamp = otherRecord.key;
                     context().forward(
                         key,
                         joiner.apply(key, value, otherRecord.value),
-                        To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecord.key)));
+                        To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecordTimestamp)));
+
+                    outerJoinWindowStore.ifPresent(store -> {
+                        // Delete the other joined key from the outer non-joined store now to prevent
+                        // further processing
+                        final KeyAndJoinSide<K> otherJoinKey = KeyAndJoinSide.make(!thisJoin, key);
+                        if (store.fetch(otherJoinKey, otherRecordTimestamp) != null) {
+                            store.put(otherJoinKey, null, otherRecordTimestamp);
+                        }
+                    });
                 }
 
                 if (needOuterJoin) {
-                    context().forward(key, joiner.apply(key, value, null));
+                    // The maxStreamTime contains the max time observed in both sides of the join.
+                    // Having access to the time observed in the other join side fixes the following
+                    // problem:
+                    //
+                    // Say we have a window size of 5 seconds
+                    //  1. A non-joined record with time T10 is seen in the left-topic (maxLeftStreamTime: 10)
+                    //     The record is not processed yet, and is added to the outer-join store
+                    //  2. A non-joined record with time T2 is seen in the right-topic (maxRightStreamTime: 2)
+                    //     The record is not processed yet, and is added to the outer-join store
+                    //  3. A joined record with time T11 is seen in the left-topic (maxLeftStreamTime: 11)
+                    //     It is time to look at the expired records. T10 and T2 should be emitted, but
+                    //     because T2 was late, then it is not fetched by the window store, so it is not processed
+                    //
+                    // See KStreamKStreamLeftJoinTest.testLowerWindowBound() tests
+                    //
+                    // the condition below allows us to process the late record without the need
+                    // to hold it in the temporary outer store
+                    if (timeTo < maxStreamTime) {
+                        context().forward(key, joiner.apply(key, value, null));
+                    } else {
+                        outerJoinWindowStore.ifPresent(store -> store.put(
+                            KeyAndJoinSide.make(thisJoin, key),
+                            makeValueOrOtherValue(thisJoin, value),
+                            inputRecordTimestamp));
+                    }
+                }
+
+                outerJoinWindowStore.ifPresent(store -> {
+                    // only emit left/outer non-joined if the stream time has advanced (inputRecordTime = maxStreamTime)
+                    // if the current record is late, then there is no need to check for expired records
+                    if (inputRecordTimestamp == maxStreamTime) {
+                        maybeEmitOuterExpiryRecords(store, maxStreamTime);
+                    }
+                });
+            }
+        }
+
+        private ValueOrOtherValue makeValueOrOtherValue(final boolean thisJoin, final V1 value) {
+            return thisJoin
+                ? ValueOrOtherValue.makeValue(value)
+                : ValueOrOtherValue.makeOtherValue(value);
+        }
+
+        @SuppressWarnings("unchecked")
+        private void maybeEmitOuterExpiryRecords(final WindowStore<KeyAndJoinSide<K>, ValueOrOtherValue> store, final long maxStreamTime) {
+            try (final KeyValueIterator<Windowed<KeyAndJoinSide<K>>, ValueOrOtherValue> it = store.all()) {
+                while (it.hasNext()) {
+                    final KeyValue<Windowed<KeyAndJoinSide<K>>, ValueOrOtherValue> e = it.next();
+
+                    // Skip next records if the oldest record has not expired yet
+                    if (e.key.window().end() + joinGraceMs >= maxStreamTime) {
+                        break;
+                    }
+
+                    final K key = e.key.key().getKey();
+
+                    // Emit the record by joining with a null value. But the order varies depending whether
+                    // this join is using a reverse joiner or not. Also whether the returned record from the
+                    // outer window store is a V1 or V2 value.
+                    if (thisJoin) {

Review comment:
       Done







[GitHub] [kafka] guozhangwang commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r612827412



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/KeyAndJoinSide.java
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.KeyValue;
+
+import java.util.Objects;
+
+/**
+ * Combines a key from a {@link KeyValue} with a boolean value referencing if the key is
+ * part of the left join (true) or right join (false). This class is only useful when a state
+ * store needs to be shared between left and right processors, and each processor needs to
+ * access the key of the other processor.
+ */
+public class KeyAndJoinSide<K> {
+    private final K key;
+    private final boolean leftJoin;

Review comment:
       nit: the term `leftJoin` may be a bit confusing, as it could mean the join type; maybe just calling it `leftSide` would be better? Ditto for the fields in `LeftOrRightValue`, and also `isLeftJoin` -> `isLeftSide` in the impl join classes.
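   To show how the suggested naming reads, here is a sketch of the key wrapper with a `leftSide` field. The `make`/`isLeftSide`/`getKey` names are illustrative, not the merged Kafka code. Rejecting a null key is an assumption, consistent with null keys being filtered before the join.

   ```java
   import java.util.Objects;

   // Sketch of the key wrapper using the suggested "leftSide" naming.
   final class KeyAndJoinSide<K> {
       private final K key;
       private final boolean leftSide; // true: key came from the left side of the join

       private KeyAndJoinSide(final boolean leftSide, final K key) {
           this.key = Objects.requireNonNull(key, "key cannot be null");
           this.leftSide = leftSide;
       }

       static <K> KeyAndJoinSide<K> make(final boolean leftSide, final K key) {
           return new KeyAndJoinSide<>(leftSide, key);
       }

       boolean isLeftSide() {
           return leftSide;
       }

       K getKey() {
           return key;
       }

       // The same key on different sides must be distinct entries in the shared store.
       @Override
       public boolean equals(final Object o) {
           if (this == o) {
               return true;
           }
           if (!(o instanceof KeyAndJoinSide)) {
               return false;
           }
           final KeyAndJoinSide<?> that = (KeyAndJoinSide<?>) o;
           return leftSide == that.leftSide && Objects.equals(key, that.key);
       }

       @Override
       public int hashCode() {
           return Objects.hash(leftSide, key);
       }

       @Override
       public String toString() {
           return "<" + (leftSide ? "left" : "right") + "," + key + ">";
       }
   }
   ```

   With this naming, a call such as `KeyAndJoinSide.make(!isLeftSide, key)` reads directly as "the same key on the other side", with no reference to the join type.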

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValue.java
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.util.Objects;
+
+/**
+ * This class is used in combination of {@link KeyAndJoinSide}. The {@link KeyAndJoinSide} class
+ * combines a key with a boolean value that specifies if the key is found in the left side of a
+ * join or on the right side. This {@link LeftOrRightValue} object contains either the V1 value,
+ * which is found in the left topic, or V2 value if it is found in the right topic.
+ */
+public class LeftOrRightValue<V1, V2> {
+    private final V1 leftValue;
+    private final V2 rightValue;
+
+    private LeftOrRightValue(final V1 leftValue, final V2 rightValue) {
+        this.leftValue = leftValue;

Review comment:
       I still feel it is better to add a check here to make sure that at most one of `leftValue`/`rightValue` is non-null.
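   A minimal sketch of the suggested check follows. The factory names `makeLeftValue`/`makeRightValue` are illustrative assumptions, not necessarily what the PR uses.

   ```java
   // Sketch: the private constructor rejects the case where both sides are set.
   final class LeftOrRightValue<V1, V2> {
       private final V1 leftValue;
       private final V2 rightValue;

       private LeftOrRightValue(final V1 leftValue, final V2 rightValue) {
           if (leftValue != null && rightValue != null) {
               throw new IllegalArgumentException("At most one of leftValue and rightValue may be non-null");
           }
           this.leftValue = leftValue;
           this.rightValue = rightValue;
       }

       // Value from the left topic (V1); the right slot stays empty.
       static <V1, V2> LeftOrRightValue<V1, V2> makeLeftValue(final V1 leftValue) {
           return new LeftOrRightValue<>(leftValue, null);
       }

       // Value from the right topic (V2); the left slot stays empty.
       static <V1, V2> LeftOrRightValue<V1, V2> makeRightValue(final V2 rightValue) {
           return new LeftOrRightValue<>(null, rightValue);
       }

       V1 getLeftValue() {
           return leftValue;
       }

       V2 getRightValue() {
           return rightValue;
       }
   }
   ```

   With only these two factories, the check is defensive. It would fire only if a future factory or caller passed both values.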







[GitHub] [kafka] mjsax commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r616266276



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValue.java
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.util.Objects;
+
+/**
+ * This class is used in combination of {@link KeyAndJoinSide}. The {@link KeyAndJoinSide} class
+ * combines a key with a boolean value that specifies if the key is found in the left side of a
+ * join or on the right side. This {@link LeftOrRightValue} object contains either the V1 value,
+ * which is found in the left topic, or V2 value if it is found in the right topic.
+ */
+public class LeftOrRightValue<V1, V2> {
+    private final V1 leftValue;
+    private final V2 rightValue;
+
+    private LeftOrRightValue(final V1 leftValue, final V2 rightValue) {
+        if (leftValue != null && rightValue != null) {

Review comment:
       We do filter both `null` keys and `null` values: `null` keys because we cannot really compute the join, and `null` values because a `null` would be a delete in RocksDB, so we cannot store a `null` value to begin with.
   
   Thus, it's not possible that both values are `null`.
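   The argument rests on two facts. First, null-key and null-value records are dropped before they reach the join. Second, a null value cannot be stored, because `put(key, null)` is treated as a delete; the diff itself deletes from the outer store this way with `store.put(otherJoinKey, null, otherRecordTimestamp)`. The toy sketch below uses a `HashMap` instead of RocksDB, and `TombstoneStore`/`NonJoinedBuffer` are hypothetical names. It shows why a buffered value can never be null.

   ```java
   import java.util.HashMap;
   import java.util.Map;

   // Toy store (a HashMap, not RocksDB) that follows the same convention as the
   // window store in the diff: putting a null value deletes the key.
   final class TombstoneStore<K, V> {
       private final Map<K, V> data = new HashMap<>();

       void put(final K key, final V value) {
           if (value == null) {
               data.remove(key); // null acts as a delete, so a null value is never stored
           } else {
               data.put(key, value);
           }
       }

       V get(final K key) {
           return data.get(key);
       }

       int size() {
           return data.size();
       }
   }

   // Hypothetical buffer in front of the store that drops null keys and null values.
   final class NonJoinedBuffer<K, V> {
       private final TombstoneStore<K, V> store = new TombstoneStore<>();
       private int dropped = 0;

       boolean add(final K key, final V value) {
           if (key == null || value == null) {
               dropped++; // cannot join on a null key; cannot store a null value
               return false;
           }
           store.put(key, value);
           return true;
       }

       void delete(final K key) {
           store.put(key, null);
       }

       int buffered() {
           return store.size();
       }

       int dropped() {
           return dropped;
       }
   }
   ```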







[GitHub] [kafka] guozhangwang commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r610006224



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -133,34 +141,37 @@ public void process(final K key, final V1 value) {
                 return;
             }
 
-            // maxObservedStreamTime is updated and shared between left and right sides, so we can
-            // process a non-join record immediately if it is late
-            final long maxStreamTime = maxObservedStreamTime.updateAndGet(time -> Math.max(time, context().timestamp()));
-
             boolean needOuterJoin = outer;
+            boolean joinFound = false;
 
             final long inputRecordTimestamp = context().timestamp();
             final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs);
             final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs);
 
+            // maxObservedStreamTime is updated and shared between left and right join sides
+            maxObservedStreamTime.updateAndGet(t -> Math.max(t, inputRecordTimestamp));
+
             try (final WindowStoreIterator<V2> iter = otherWindow.fetch(key, timeFrom, timeTo)) {
                 while (iter.hasNext()) {
                     needOuterJoin = false;
+                    joinFound = true;
                     final KeyValue<Long, V2> otherRecord = iter.next();
                     final long otherRecordTimestamp = otherRecord.key;
+
+                    // Emit expired records before the joined record to keep time ordering

Review comment:
       Just to be more concrete here: based on the above analysis, I think we can move the expiration out of the loop for fetching records to join (https://github.com/apache/kafka/pull/10462/files#diff-6ca18143cc0226e6d1e4d5180ff81596a72d53639ca5184bf1238350265382a6R154).
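   For illustration, here is a heavily simplified, hypothetical join side (`SimplifiedOuterJoin` is not a Kafka class). Expiration runs once per input record, before the fetch loop, rather than once per fetched match. Non-joined records sit in a time-ordered buffer (a `TreeMap` standing in for the time-ordered window store), so expiration can stop at the first window that is still open. The sketch omits deleting the other side's buffered record on a join, as well as late-record handling.

   ```java
   import java.util.ArrayList;
   import java.util.Iterator;
   import java.util.List;
   import java.util.Map;
   import java.util.TreeMap;

   final class SimplifiedOuterJoin {
       private final long joinAfterMs;
       private final long graceMs;
       // Non-joined records keyed by timestamp, so iteration is oldest-first.
       private final TreeMap<Long, List<String>> nonJoined = new TreeMap<>();
       private final List<String> output = new ArrayList<>();
       private long streamTime = Long.MIN_VALUE;

       SimplifiedOuterJoin(final long joinAfterMs, final long graceMs) {
           this.joinAfterMs = joinAfterMs;
           this.graceMs = graceMs;
       }

       // `matches` stands in for the values fetched from the other side's window store.
       void process(final String value, final long ts, final List<String> matches) {
           streamTime = Math.max(streamTime, ts);
           // Expire once per input record, outside the fetch loop and before any join
           // results are forwarded, so older non-joined results come out first.
           if (ts == streamTime) {
               emitExpired();
           }
           for (final String match : matches) {
               output.add(value + "+" + match);
           }
           if (matches.isEmpty()) {
               nonJoined.computeIfAbsent(ts, t -> new ArrayList<>()).add(value);
           }
       }

       private void emitExpired() {
           final Iterator<Map.Entry<Long, List<String>>> it = nonJoined.entrySet().iterator();
           while (it.hasNext()) {
               final Map.Entry<Long, List<String>> entry = it.next();
               // Time-ordered buffer: stop at the first record whose window (plus grace) is still open.
               if (entry.getKey() + joinAfterMs + graceMs >= streamTime) {
                   break;
               }
               for (final String v : entry.getValue()) {
                   output.add(v + "+null");
               }
               it.remove(); // removing through the iterator is safe for TreeMap
           }
       }

       List<String> output() {
           return output;
       }
   }
   ```

   With `joinAfterMs = 5` and no grace, a non-joined `A` at 10 followed by a joined `B` at 16 produces `A+null` before `B+b`, which keeps the time ordering that the in-loop call was meant to preserve.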







[GitHub] [kafka] mjsax commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r609948744



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -38,20 +48,36 @@
     private final String otherWindowName;
     private final long joinBeforeMs;
     private final long joinAfterMs;
+    private final long joinGraceMs;
 
     private final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends R> joiner;
     private final boolean outer;
+    private final Optional<String> outerJoinWindowName;
+    private final boolean thisJoin;
+
+    // Observed time is AtomicLong because this time is shared between the left and right side processor nodes. However,
+    // this time is not updated in parallel, so we can call get() several times without worrying about getting different
+    // times.
+    private final AtomicLong maxObservedStreamTime;

Review comment:
       Do we need to maintain it manually? Could we use `context.streamTime()` instead?
   
   Note that `context.streamTime()` might be slightly different because we advance it for every input record. Thus, if there is a filter before the join, the join might not get all records, and thus its locally observed stream-time could differ from the task stream-time.
   
   It's a small semantic difference, and it's unclear to me whether we should prefer processor-local stream-time or task stream-time.
   
   cc @guozhangwang @vvcephei 
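   To make the difference concrete, here is a hypothetical simulation (`StreamTimeComparison` is not Kafka code). Task stream time advances on every input record of the task. A join behind a filter advances its local stream time only on records that pass the filter. The closing condition mirrors the `start + joinAfterMs + joinGraceMs < streamTime` predicate used elsewhere in this PR.

   ```java
   import java.util.function.Predicate;

   final class StreamTimeComparison {
       private final Predicate<String> filter;
       private long taskStreamTime = Long.MIN_VALUE;
       private long processorStreamTime = Long.MIN_VALUE;

       StreamTimeComparison(final Predicate<String> filter) {
           this.filter = filter;
       }

       void onInput(final String value, final long ts) {
           taskStreamTime = Math.max(taskStreamTime, ts); // every input record
           if (filter.test(value)) {
               processorStreamTime = Math.max(processorStreamTime, ts); // only records reaching the join
           }
       }

       long taskStreamTime() {
           return taskStreamTime;
       }

       long processorStreamTime() {
           return processorStreamTime;
       }

       // Whether a buffered record's window (plus grace) has closed under a given stream time.
       static boolean windowClosed(final long recordTs, final long joinAfterMs,
                                   final long graceMs, final long streamTime) {
           return recordTs + joinAfterMs + graceMs < streamTime;
       }
   }
   ```

   Suppose a record at 5 passes the filter and a record at 20 is filtered out. Task stream time is then 20, while the join's local stream time stays at 5. With `joinAfterMs = 5` and no grace, the buffered record at 5 counts as expired under task time but not under processor-local time.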







[GitHub] [kafka] guozhangwang commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r610756052



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -102,14 +127,98 @@ public void process(final K key, final V1 value) {
                 while (iter.hasNext()) {
                     needOuterJoin = false;
                     final KeyValue<Long, V2> otherRecord = iter.next();
+                    final long otherRecordTimestamp = otherRecord.key;
                     context().forward(
                         key,
                         joiner.apply(key, value, otherRecord.value),
-                        To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecord.key)));
+                        To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecordTimestamp)));
+
+                    outerJoinWindowStore.ifPresent(store -> {
+                        // Delete the other joined key from the outer non-joined store now to prevent
+                        // further processing
+                        final KeyAndJoinSide<K> otherJoinKey = KeyAndJoinSide.make(!thisJoin, key);
+                        if (store.fetch(otherJoinKey, otherRecordTimestamp) != null) {
+                            store.put(otherJoinKey, null, otherRecordTimestamp);
+                        }
+                    });
                 }
 
                 if (needOuterJoin) {
-                    context().forward(key, joiner.apply(key, value, null));
+                    // The maxStreamTime contains the max time observed in both sides of the join.
+                    // Having access to the time observed in the other join side fixes the following
+                    // problem:
+                    //
+                    // Say we have a window size of 5 seconds
+                    //  1. A non-joined record with time T10 is seen in the left-topic (maxLeftStreamTime: 10)
+                    //     The record is not processed yet, and is added to the outer-join store
+                    //  2. A non-joined record with time T2 is seen in the right-topic (maxRightStreamTime: 2)
+                    //     The record is not processed yet, and is added to the outer-join store
+                    //  3. A joined record with time T11 is seen in the left-topic (maxLeftStreamTime: 11)
+                    //     It is time to look at the expired records. T10 and T2 should be emitted, but
+                    //     because T2 was late, then it is not fetched by the window store, so it is not processed
+                    //
+                    // See KStreamKStreamLeftJoinTest.testLowerWindowBound() tests
+                    //
+                    // the condition below allows us to process the late record without the need
+                    // to hold it in the temporary outer store
+                    if (timeTo < maxStreamTime) {
+                        context().forward(key, joiner.apply(key, value, null));
+                    } else {
+                        outerJoinWindowStore.ifPresent(store -> store.put(
+                            KeyAndJoinSide.make(thisJoin, key),
+                            makeValueOrOtherValue(thisJoin, value),
+                            inputRecordTimestamp));
+                    }
+                }
+
+                outerJoinWindowStore.ifPresent(store -> {
+                    // only emit left/outer non-joined if the stream time has advanced (inputRecordTime = maxStreamTime)
+                    // if the current record is late, then there is no need to check for expired records
+                    if (inputRecordTimestamp == maxStreamTime) {
+                        maybeEmitOuterExpiryRecords(store, maxStreamTime);
+                    }
+                });
+            }
+        }
+
+        private ValueOrOtherValue makeValueOrOtherValue(final boolean thisJoin, final V1 value) {
+            return thisJoin
+                ? ValueOrOtherValue.makeValue(value)
+                : ValueOrOtherValue.makeOtherValue(value);
+        }
+
+        @SuppressWarnings("unchecked")
+        private void maybeEmitOuterExpiryRecords(final WindowStore<KeyAndJoinSide<K>, ValueOrOtherValue> store, final long maxStreamTime) {
+            try (final KeyValueIterator<Windowed<KeyAndJoinSide<K>>, ValueOrOtherValue> it = store.all()) {
+                while (it.hasNext()) {
+                    final KeyValue<Windowed<KeyAndJoinSide<K>>, ValueOrOtherValue> e = it.next();
+
+                    // Skip next records if the oldest record has not expired yet
+                    if (e.key.window().end() + joinGraceMs >= maxStreamTime) {
+                        break;
+                    }
+
+                    final K key = e.key.key().getKey();
+
+                    // Emit the record by joining with a null value. But the order varies depending whether
+                    // this join is using a reverse joiner or not. Also whether the returned record from the
+                    // outer window store is a V1 or V2 value.
+                    if (thisJoin) {
+                        if (e.key.key().isThisJoin()) {
+                            context().forward(key, joiner.apply(key, (V1) e.value.getThisValue(), null));
+                        } else {
+                            context().forward(key, joiner.apply(key, null, (V2) e.value.getOtherValue()));
+                        }
+                    } else {
+                        if (e.key.key().isThisJoin()) {
+                            context().forward(key, joiner.apply(key, null, (V2) e.value.getThisValue()));
+                        } else {
+                            context().forward(key, joiner.apply(key, (V1) e.value.getOtherValue(), null));
+                        }
+                    }
+
+                    // Delete the key from the outer window store now that it is emitted
+                    store.put(e.key.key(), null, e.key.window().start());

Review comment:
       Sorry, my bad: I misread the code and thought you were deleting records while inside the
   
   ```
   try (final KeyValueIterator<Windowed<KeyAndJoinSide<K>>, LeftOrRightValue> it = store.all()) {
   ```
   
   block. Please ignore this comment.







[GitHub] [kafka] guozhangwang commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r616218643



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -60,20 +83,44 @@
     }
 
     private class KStreamKStreamJoinProcessor extends AbstractProcessor<K, V1> {
+        private final Predicate<Windowed<KeyAndJoinSide<K>>> recordWindowHasClosed =

Review comment:
       nit: I think this function can just be inlined now?

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##########
@@ -118,20 +142,47 @@
         final ProcessorGraphNode<K1, V2> otherWindowedStreamsNode = new ProcessorGraphNode<>(otherWindowStreamProcessorName, otherWindowStreamProcessorParams);
         builder.addGraphNode(otherGraphNode, otherWindowedStreamsNode);
 
+        Optional<StoreBuilder<WindowStore<KeyAndJoinSide<K1>, LeftOrRightValue<V1, V2>>>> outerJoinWindowStore = Optional.empty();
+        if (leftOuter || rightOuter) {
+            final String outerJoinSuffix = leftOuter ? "-shared-left-outer-join" : "-shared-outer-join";

Review comment:
       I think @mjsax's comment about naming is just to align with the join type. Note that `leftOuter` would always be true once we pass `if (leftOuter || rightOuter)`, since `leftOuter == false && rightOuter == true` is not a possible case. Also, to align with the terms "-outer-this-join" / "-this-join", I think it would be:
   
   rightOuter ? "-outer-shared-join" : "-left-shared-join";
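   As a sketch of that suggestion, the helper below (`OuterJoinStoreNaming.outerJoinSuffix` is a hypothetical name) takes only the two suffix strings from the comment above. It makes explicit that `leftOuter` is always true on this path.

   ```java
   final class OuterJoinStoreNaming {
       private OuterJoinStoreNaming() { }

       // Called only for left or outer joins. An inner join has both flags false, and
       // leftOuter == false && rightOuter == true does not occur, so leftOuter must be true
       // and only rightOuter decides between the two names.
       static String outerJoinSuffix(final boolean leftOuter, final boolean rightOuter) {
           if (!leftOuter) {
               throw new IllegalArgumentException("expected a left or outer join");
           }
           return rightOuter ? "-outer-shared-join" : "-left-shared-join";
       }
   }
   ```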

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValue.java
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.util.Objects;
+
+/**
+ * This class is used in combination of {@link KeyAndJoinSide}. The {@link KeyAndJoinSide} class
+ * combines a key with a boolean value that specifies if the key is found in the left side of a
+ * join or on the right side. This {@link LeftOrRightValue} object contains either the V1 value,
+ * which is found in the left topic, or V2 value if it is found in the right topic.
+ */
+public class LeftOrRightValue<V1, V2> {
+    private final V1 leftValue;
+    private final V2 rightValue;
+
+    private LeftOrRightValue(final V1 leftValue, final V2 rightValue) {
+        if (leftValue != null && rightValue != null) {

Review comment:
       Hmm... I thought it could be possible that both sides are null? e.g. for a left value where the value itself is `null` (we do not filter null values in a stream at the moment, right? @mjsax).

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -93,23 +136,118 @@ public void process(final K key, final V1 value) {
             }
 
             boolean needOuterJoin = outer;
+            boolean joinFound = false;
 
             final long inputRecordTimestamp = context().timestamp();
             final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs);
             final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs);
 
+            maxObservedStreamTime.advance(inputRecordTimestamp);
+
             try (final WindowStoreIterator<V2> iter = otherWindow.fetch(key, timeFrom, timeTo)) {
                 while (iter.hasNext()) {
                     needOuterJoin = false;
+                    joinFound = true;
                     final KeyValue<Long, V2> otherRecord = iter.next();
+                    final long otherRecordTimestamp = otherRecord.key;
+
+                    // Emit expired records before the joined record to keep time ordering
+                    emitExpiredNonJoinedOuterRecordsExcept(key, otherRecordTimestamp);
+
                     context().forward(
                         key,
                         joiner.apply(key, value, otherRecord.value),
-                        To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecord.key)));
+                        To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecordTimestamp)));
+                }
+
+                // Emit all expired records before adding a new non-joined record to the store. Otherwise,
+                // the put() call will advance the stream time, which causes records out of the retention
+                // period to be deleted, thus not being emitted later.
+                if (!joinFound && inputRecordTimestamp == maxObservedStreamTime.get()) {
+                    emitExpiredNonJoinedOuterRecords();
                 }
 
                 if (needOuterJoin) {
-                    context().forward(key, joiner.apply(key, value, null));
+                    // The maxStreamTime contains the max time observed in both sides of the join.
+                    // Having access to the time observed in the other join side fixes the following
+                    // problem:
+                    //
+                    // Say we have a window size of 5 seconds
+                    //  1. A non-joined record with time T10 is seen in the left-topic (maxLeftStreamTime: 10)
+                    //     The record is not processed yet, and is added to the outer-join store
+                    //  2. A non-joined record with time T2 is seen in the right-topic (maxRightStreamTime: 2)
+                    //     The record is not processed yet, and is added to the outer-join store
+                    //  3. A joined record with time T11 is seen in the left-topic (maxLeftStreamTime: 11)
+                    //     It is time to look at the expired records. T10 and T2 should be emitted, but
+                    //     because T2 was late, then it is not fetched by the window store, so it is not processed
+                    //
+                    // See KStreamKStreamLeftJoinTest.testLowerWindowBound() tests
+                    //
+                    // the condition below allows us to process the late record without the need
+                    // to hold it in the temporary outer store
+                    if (!outerJoinWindowStore.isPresent() || timeTo < maxObservedStreamTime.get()) {

Review comment:
       I think the motivation is that if the current record's timestamp is too small (i.e. the record is too late), it should not be added to the book-keeping store but can be "expired" immediately. But the condition seems a bit off here: for the record to be "too late", its timestamp just needs to be smaller than the expiration boundary, which is observed-stream-time - join-after - grace-period, right?
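   The condition proposed above can be written as a small predicate. This is a minimal sketch, not the PR's code: `LateRecordCheck`, `expirationBoundary` and `isTooLate` are hypothetical names, and timestamps are assumed to be in milliseconds, like `joinAfterMs` and `joinGraceMs` in the diff.

```java
/**
 * Hypothetical helper illustrating the "too late" condition from the review: a non-joined
 * record whose timestamp is below the expiration boundary has a join window (plus grace)
 * that is already closed, so it could be emitted (joined with null) instead of buffered.
 */
final class LateRecordCheck {
    private LateRecordCheck() { }

    // observed-stream-time - join-after - grace-period
    static long expirationBoundary(final long maxObservedStreamTime,
                                   final long joinAfterMs,
                                   final long joinGraceMs) {
        return maxObservedStreamTime - joinAfterMs - joinGraceMs;
    }

    // Strictly below the boundary counts as "too late"; equal to it is still buffered.
    static boolean isTooLate(final long recordTimestamp,
                             final long maxObservedStreamTime,
                             final long joinAfterMs,
                             final long joinGraceMs) {
        return recordTimestamp < expirationBoundary(maxObservedStreamTime, joinAfterMs, joinGraceMs);
    }
}
```

   For example, with `joinAfterMs = 5`, `joinGraceMs = 0` and an observed stream time of 11, a record at time 2 is below the boundary 6 and could be emitted right away, while a record at time 10 would still be buffered. Assuming a buffered record's window starts at its own timestamp, this is the same inequality as the `recordWindowHasClosed` predicate in the diff, rearranged.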

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -60,20 +83,44 @@
     }
 
     private class KStreamKStreamJoinProcessor extends AbstractProcessor<K, V1> {
+        private final Predicate<Windowed<KeyAndJoinSide<K>>> recordWindowHasClosed =
+            windowedKey -> windowedKey.window().start() + joinAfterMs + joinGraceMs < maxObservedStreamTime.get();
 
         private WindowStore<K, V2> otherWindow;
         private StreamsMetricsImpl metrics;
         private Sensor droppedRecordsSensor;
+        private Optional<WindowStore<KeyAndJoinSide<K>, LeftOrRightValue>> outerJoinWindowStore = Optional.empty();
 
         @SuppressWarnings("unchecked")
         @Override
         public void init(final ProcessorContext context) {
             super.init(context);
             metrics = (StreamsMetricsImpl) context.metrics();
             droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
-            otherWindow = (WindowStore<K, V2>) context.getStateStore(otherWindowName);
+            otherWindow = context.getStateStore(otherWindowName);
+
+            if (internalOuterJoinFixEnabled(context.appConfigs())) {
+                outerJoinWindowStore = outerJoinWindowName.map(name -> context.getStateStore(name));
+            }
         }
 
+        private boolean internalOuterJoinFixEnabled(final Map<String, Object> configs) {

Review comment:
       See my other comment above: I think we should move this check to `KStreamImplJoin`, which decides whether or not to create the store builder; here we can then just rely on whether the passed-in `outerJoinWindowName` is empty to get the store (if it is not empty, the store must have been created).

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##########
@@ -118,20 +142,47 @@
         final ProcessorGraphNode<K1, V2> otherWindowedStreamsNode = new ProcessorGraphNode<>(otherWindowStreamProcessorName, otherWindowStreamProcessorParams);
         builder.addGraphNode(otherGraphNode, otherWindowedStreamsNode);
 
+        Optional<StoreBuilder<WindowStore<KeyAndJoinSide<K1>, LeftOrRightValue<V1, V2>>>> outerJoinWindowStore = Optional.empty();
+        if (leftOuter || rightOuter) {

Review comment:
       If the `INTERNAL_ENABLE_OUTER_JOIN_SPURIOUS_RESULTS_FIX` config is set to false, we would end up creating a store that is never used.
   
   So I think we also need to check this config here, during plan generation: if it is false, leave the storeBuilder as an empty Optional.
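   The two suggestions fit together: the plan decides once whether the shared store exists, and the processor only checks whether it was handed a store name. A minimal sketch with hypothetical names; reading the internal config is left out and passed in as `fixEnabled`, and the registered stores are modelled as a plain `Map` rather than a `ProcessorContext`.

```java
import java.util.Map;
import java.util.Optional;

/** Hypothetical sketch: the plan owns the config check, the processor only sees a name. */
final class OuterJoinStorePlanning {
    private OuterJoinStorePlanning() { }

    /** Plan generation: only name (and build) the shared store if it will actually be used. */
    static Optional<String> outerJoinStoreName(final boolean leftOuter,
                                               final boolean rightOuter,
                                               final boolean fixEnabled,
                                               final String storeName) {
        if ((leftOuter || rightOuter) && fixEnabled) {
            return Optional.of(storeName);
        }
        return Optional.empty();
    }

    /** Processor init: no config lookup; a present name implies the store was registered. */
    static <S> Optional<S> lookupStore(final Optional<String> storeName,
                                       final Map<String, S> registeredStores) {
        return storeName.map(registeredStores::get);
    }
}
```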

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -60,20 +83,44 @@
     }
 
     private class KStreamKStreamJoinProcessor extends AbstractProcessor<K, V1> {
+        private final Predicate<Windowed<KeyAndJoinSide<K>>> recordWindowHasClosed =
+            windowedKey -> windowedKey.window().start() + joinAfterMs + joinGraceMs < maxObservedStreamTime.get();
 
         private WindowStore<K, V2> otherWindow;
         private StreamsMetricsImpl metrics;
         private Sensor droppedRecordsSensor;
+        private Optional<WindowStore<KeyAndJoinSide<K>, LeftOrRightValue>> outerJoinWindowStore = Optional.empty();
 
         @SuppressWarnings("unchecked")
         @Override
         public void init(final ProcessorContext context) {
             super.init(context);
             metrics = (StreamsMetricsImpl) context.metrics();
             droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
-            otherWindow = (WindowStore<K, V2>) context.getStateStore(otherWindowName);
+            otherWindow = context.getStateStore(otherWindowName);

Review comment:
       nit: not introduced by this PR, but let's rename it to `otherWindowStore` for naming consistency.

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java
##########
@@ -0,0 +1,849 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.TopologyWrapper;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.StreamJoined;
+import org.apache.kafka.test.MockProcessor;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockValueJoiner;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+
+import static java.time.Duration.ofMillis;
+import static org.junit.Assert.assertEquals;
+
+public class KStreamKStreamOuterJoinTest {

Review comment:
       Thanks for the added tests! I'm wondering if we can add a case for both left/outer joins where the input streams have consecutive records with exactly the same key / timestamp --- i.e. they are "duplicates" --- and check that 1) we do not de-duplicate them, 2) when a join is not found in time, we expire / delete them together, with two emitted results, and 3) when a join is found in time, we emit two join results.
   
   Also, could we have a test case where a single record both causes expiration and produces join results, and check that the emitted records are still in order?
   
   I raised these only because I have not yet found these two cases in the tests; if there's already coverage, please ignore this comment.
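   The expected outcomes of these cases can be spelled out against a tiny in-memory model of a left join with the buffering fix. This is a hypothetical model, not Kafka Streams code and not the PR's tests: it assumes a symmetric join window, in-order input, no retention on the join windows, and renders results as `left+right` strings (`+null` for a non-joined record).

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical model: non-joined left records are held until window + grace closes. */
final class LeftJoinModel {
    private static final class Rec {
        final String key;
        final long ts;
        final String value;

        Rec(final String key, final long ts, final String value) {
            this.key = key;
            this.ts = ts;
            this.value = value;
        }
    }

    private final long windowMs;
    private final long graceMs;
    private final List<Rec> leftWindow = new ArrayList<>();
    private final List<Rec> rightWindow = new ArrayList<>();
    // Non-joined left records; duplicates stay separate entries and are never merged.
    private final List<Rec> nonJoinedLeft = new ArrayList<>();
    private long streamTime = Long.MIN_VALUE;
    final List<String> output = new ArrayList<>();

    LeftJoinModel(final long windowMs, final long graceMs) {
        this.windowMs = windowMs;
        this.graceMs = graceMs;
    }

    void processLeft(final String key, final long ts, final String value) {
        emitExpired(ts);
        final Rec left = new Rec(key, ts, value);
        leftWindow.add(left);
        boolean joined = false;
        for (final Rec right : rightWindow) {
            if (matches(left, right)) {
                output.add(left.value + "+" + right.value);
                joined = true;
            }
        }
        if (!joined) {
            nonJoinedLeft.add(left); // held back instead of emitting "value+null" now
        }
    }

    void processRight(final String key, final long ts, final String value) {
        emitExpired(ts);
        final Rec right = new Rec(key, ts, value);
        rightWindow.add(right);
        for (final Rec left : leftWindow) {
            if (matches(left, right)) {
                output.add(left.value + "+" + right.value);
                nonJoinedLeft.remove(left); // identity match: removes only this buffered record
            }
        }
    }

    private boolean matches(final Rec left, final Rec right) {
        return left.key.equals(right.key) && Math.abs(left.ts - right.ts) <= windowMs;
    }

    // Runs before the incoming record produces anything, so expired results come first.
    private void emitExpired(final long ts) {
        streamTime = Math.max(streamTime, ts);
        final Iterator<Rec> it = nonJoinedLeft.iterator();
        while (it.hasNext()) {
            final Rec buffered = it.next();
            if (buffered.ts + windowMs + graceMs < streamTime) {
                output.add(buffered.value + "+null");
                it.remove();
            }
        }
    }
}
```

   In the model, two duplicate left records at time 0 followed by an unrelated record at 200 (window 100, no grace) yield two `a0+null` results; a matching right record at 50 instead yields two `a0+x` results; and a right record that both closes an old window and joins a newer record emits the expired result first.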




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] spena commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

Posted by GitBox <gi...@apache.org>.
spena commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r616759894



##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java
##########
@@ -0,0 +1,849 @@
+public class KStreamKStreamOuterJoinTest {

Review comment:
       Most of them were already there. I just updated them to add the 3) scenario. 
   ```
   KStreamKStreamLeftJoinTest
   - testLeftJoinDuplicates()
   - testLeftJoinDuplicatesWithFixDisabled()
   - testOrdering()
   
   KStreamKStreamOuterJoinTest
   - testOuterJoinDuplicates()
   - testOuterJoinDuplicatesWithFixDisabled()
   - testOrdering()
   
   ```







[GitHub] [kafka] vvcephei commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r612591727



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##########
@@ -118,20 +132,40 @@
         final ProcessorGraphNode<K1, V2> otherWindowedStreamsNode = new ProcessorGraphNode<>(otherWindowStreamProcessorName, otherWindowStreamProcessorParams);
         builder.addGraphNode(otherGraphNode, otherWindowedStreamsNode);
 
+        Optional<StoreBuilder<WindowStore<KeyAndJoinSide<K1>, ValueOrOtherValue<V1, V2>>>> outerJoinWindowStore = Optional.empty();
+        if (leftOuter || rightOuter) {
+            final String outerJoinSuffix = "-shared-outer-join-store";
+            final String outerJoinStoreGeneratedName = builder.newProcessorName(KStreamImpl.OUTERSHARED_NAME);
+            final String outerJoinStoreName = userProvidedBaseStoreName == null ? outerJoinStoreGeneratedName : userProvidedBaseStoreName + outerJoinSuffix;
+
+            outerJoinWindowStore = Optional.of(outerJoinWindowStoreBuilder(outerJoinStoreName, windows, streamJoinedInternal));
+        }
+
+        // Time shared between joins to keep track of the maximum stream time

Review comment:
       @mjsax is correct that there is a bug that the processor-local stream time gets reset on rebalance/restart. It would be good to fix it, but with the current architecture, the only correct solution is to persist the processor-local stream time. Another approach we've discussed is to remove the time-delay effect of the record cache.







[GitHub] [kafka] guozhangwang commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r608860297



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##########
@@ -211,6 +246,66 @@ private void assertUniqueStoreNames(final WindowBytesStoreSupplier supplier,
         return builder;
     }
 
+    @SuppressWarnings("unchecked")
+    private static <K, V1, V2> StoreBuilder<WindowStore<KeyAndJoinSide<K>, ValueOrOtherValue<V1, V2>>> outerJoinWindowStoreBuilder(final String storeName,
+                                                                                                                                   final JoinWindows windows,
+                                                                                                                                   final StreamJoinedInternal<K, V1, V2> streamJoinedInternal) {
+        final StoreBuilder<WindowStore<KeyAndJoinSide<K>, ValueOrOtherValue<V1, V2>>> builder = new TimeOrderedWindowStoreBuilder<KeyAndJoinSide<K>, ValueOrOtherValue<V1, V2>>(
+            persistentTimeOrderedWindowStore(
+                storeName + "-store",
+                Duration.ofMillis(windows.size() + windows.gracePeriodMs()),
+                Duration.ofMillis(windows.size())
+            ),
+            new KeyAndJoinSideSerde<>(streamJoinedInternal.keySerde()),
+            new ValueOrOtherValueSerde(streamJoinedInternal.valueSerde(), streamJoinedInternal.otherValueSerde()),
+            Time.SYSTEM
+        );
+        if (streamJoinedInternal.loggingEnabled()) {
+            builder.withLoggingEnabled(streamJoinedInternal.logConfig());
+        } else {
+            builder.withLoggingDisabled();
+        }
+
+        return builder;
+    }
+
+    // This method has the same code as Stores.persistentWindowStore(). But TimeOrderedWindowStore is
+    // a non-public API, so we need to keep duplicate code until it becomes public.
+    private static WindowBytesStoreSupplier persistentTimeOrderedWindowStore(final String storeName,
+                                                                             final Duration retentionPeriod,
+                                                                             final Duration windowSize) {
+        Objects.requireNonNull(storeName, "name cannot be null");
+        final String rpMsgPrefix = prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
+        final long retentionMs = validateMillisecondDuration(retentionPeriod, rpMsgPrefix);
+        final String wsMsgPrefix = prepareMillisCheckFailMsgPrefix(windowSize, "windowSize");
+        final long windowSizeMs = validateMillisecondDuration(windowSize, wsMsgPrefix);
+
+        final long segmentInterval = Math.max(retentionMs / 2, 60_000L);
+
+        if (retentionMs < 0L) {
+            throw new IllegalArgumentException("retentionPeriod cannot be negative");
+        }
+        if (windowSizeMs < 0L) {
+            throw new IllegalArgumentException("windowSize cannot be negative");
+        }
+        if (segmentInterval < 1L) {
+            throw new IllegalArgumentException("segmentInterval cannot be zero or negative");
+        }
+        if (windowSizeMs > retentionMs) {
+            throw new IllegalArgumentException("The retention period of the window store "
+                + storeName + " must be no smaller than its window size. Got size=["
+                + windowSizeMs + "], retention=[" + retentionMs + "]");
+        }
+
+        return new RocksDbWindowBytesStoreSupplier(
+            storeName,
+            retentionMs,
+            segmentInterval,
+            windowSizeMs,
+            false,

Review comment:
       Hm, why don't we want to retain duplicates?
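   A small model of what a `retainDuplicates` flag changes may help frame this question. It is hypothetical: store keys are modelled as `key@timestamp` strings, a sequence number is appended when duplicates are retained, and treating a null put as a no-op in that mode is an assumption about the real store, worth verifying against its implementation.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of a window store with and without duplicate retention. */
final class DuplicateRetentionModel {
    private final boolean retainDuplicates;
    private final Map<String, String> entries = new LinkedHashMap<>(); // insertion order
    private int seqnum = 0;

    DuplicateRetentionModel(final boolean retainDuplicates) {
        this.retainDuplicates = retainDuplicates;
    }

    void put(final String key, final long timestamp, final String value) {
        final String baseKey = key + "@" + timestamp;
        if (value == null) {
            // Assumption: with duplicates retained, a null put cannot address one specific
            // duplicate and is ignored; without them, it deletes the single entry.
            if (!retainDuplicates) {
                entries.remove(baseKey);
            }
            return;
        }
        // Appending a sequence number keeps equal (key, timestamp) pairs from overwriting.
        final String storeKey = retainDuplicates ? baseKey + "#" + seqnum++ : baseKey;
        entries.put(storeKey, value);
    }

    List<String> values() {
        return new ArrayList<>(entries.values());
    }
}
```

   If that assumption holds, retaining duplicates would keep both duplicate non-joined records, but would also make the `store.put(otherJoinKey, null, otherRecordTimestamp)` delete in `KStreamKStreamJoin` ineffective, so the two requirements pull in different directions.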

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##########
@@ -118,20 +132,40 @@
         final ProcessorGraphNode<K1, V2> otherWindowedStreamsNode = new ProcessorGraphNode<>(otherWindowStreamProcessorName, otherWindowStreamProcessorParams);
         builder.addGraphNode(otherGraphNode, otherWindowedStreamsNode);
 
+        Optional<StoreBuilder<WindowStore<KeyAndJoinSide<K1>, ValueOrOtherValue<V1, V2>>>> outerJoinWindowStore = Optional.empty();
+        if (leftOuter || rightOuter) {
+            final String outerJoinSuffix = "-shared-outer-join-store";
+            final String outerJoinStoreGeneratedName = builder.newProcessorName(KStreamImpl.OUTERSHARED_NAME);

Review comment:
       Using `newProcessorName` would bump up the suffix index inside the builder by one, and hence shift all downstream processor names / store name suffixes by one, which would break compatibility. I'd suggest we not use that function to generate the name in any case; instead:
   
   1) if `userProvidedBaseStoreName` is provided, then use `userProvidedBaseStoreName + outerJoinSuffix`;
   2) otherwise, we piggy-back on the suffix index of the `joinThisGeneratedName`. E.g. if `joinThisGeneratedName` is `KSTREAM-OUTERTHIS-0000X` then this store name is `KSTREAM-OUTERSHARED-0000X` as well.
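   The naming rule above could be sketched as follows. The prefixes are passed in rather than read from `KStreamImpl`, and the sketch assumes generated names have the form prefix + zero-padded index, as in the `KSTREAM-OUTERTHIS-0000X` example; `OuterSharedStoreNaming` is a hypothetical name.

```java
/** Hypothetical sketch: derive the shared store name without touching the builder's counter. */
final class OuterSharedStoreNaming {
    private OuterSharedStoreNaming() { }

    static String outerJoinStoreName(final String userProvidedBaseStoreName,
                                     final String joinThisGeneratedName,
                                     final String thisPrefix,
                                     final String sharedPrefix) {
        // 1) a user-provided base name wins
        if (userProvidedBaseStoreName != null) {
            return userProvidedBaseStoreName + "-shared-outer-join-store";
        }
        // 2) otherwise reuse the index suffix of the already generated "this" name
        if (!joinThisGeneratedName.startsWith(thisPrefix)) {
            throw new IllegalArgumentException("Unexpected generated name: " + joinThisGeneratedName);
        }
        return sharedPrefix + joinThisGeneratedName.substring(thisPrefix.length());
    }
}
```

   For example, `outerJoinStoreName(null, "KSTREAM-OUTERTHIS-0000000004", "KSTREAM-OUTERTHIS-", "KSTREAM-OUTERSHARED-")` returns `KSTREAM-OUTERSHARED-0000000004`, so the names of all downstream processors and stores stay unchanged.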

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -102,14 +127,98 @@ public void process(final K key, final V1 value) {
                 while (iter.hasNext()) {
                     needOuterJoin = false;
                     final KeyValue<Long, V2> otherRecord = iter.next();
+                    final long otherRecordTimestamp = otherRecord.key;
                     context().forward(
                         key,
                         joiner.apply(key, value, otherRecord.value),
-                        To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecord.key)));
+                        To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecordTimestamp)));
+
+                    outerJoinWindowStore.ifPresent(store -> {
+                        // Delete the other joined key from the outer non-joined store now to prevent
+                        // further processing
+                        final KeyAndJoinSide<K> otherJoinKey = KeyAndJoinSide.make(!thisJoin, key);
+                        if (store.fetch(otherJoinKey, otherRecordTimestamp) != null) {
+                            store.put(otherJoinKey, null, otherRecordTimestamp);
+                        }
+                    });
                 }
 
                 if (needOuterJoin) {
-                    context().forward(key, joiner.apply(key, value, null));
+                    // The maxStreamTime contains the max time observed in both sides of the join.
+                    // Having access to the time observed in the other join side fixes the following
+                    // problem:
+                    //
+                    // Say we have a window size of 5 seconds
+                    //  1. A non-joined record with time T10 is seen in the left-topic (maxLeftStreamTime: 10)
+                    //     The record is not processed yet, and is added to the outer-join store
+                    //  2. A non-joined record with time T2 is seen in the right-topic (maxRightStreamTime: 2)
+                    //     The record is not processed yet, and is added to the outer-join store
+                    //  3. A joined record with time T11 is seen in the left-topic (maxLeftStreamTime: 11)
+                    //     It is time to look at the expired records. T10 and T2 should be emitted, but
+                    //     because T2 was late, then it is not fetched by the window store, so it is not processed
+                    //
+                    // See KStreamKStreamLeftJoinTest.testLowerWindowBound() tests
+                    //
+                    // the condition below allows us to process the late record without the need
+                    // to hold it in the temporary outer store
+                    if (timeTo < maxStreamTime) {
+                        context().forward(key, joiner.apply(key, value, null));
+                    } else {
+                        outerJoinWindowStore.ifPresent(store -> store.put(
+                            KeyAndJoinSide.make(thisJoin, key),
+                            makeValueOrOtherValue(thisJoin, value),
+                            inputRecordTimestamp));
+                    }
+                }
+
+                outerJoinWindowStore.ifPresent(store -> {
+                    // only emit left/outer non-joined if the stream time has advanced (inputRecordTime = maxStreamTime)
+                    // if the current record is late, then there is no need to check for expired records
+                    if (inputRecordTimestamp == maxStreamTime) {

Review comment:
       Not a comment to be addressed for this PR, but for future optimizations right after this PR: currently we are likely to trigger this function on almost every record, since stream time advances most of the time (this is the case in production), while the grace period is only considered inside `maybeEmitOuterExpiryRecords`. As a result, we may invoke RocksDB many times unnecessarily, only to find that the condition at line 198 is satisfied immediately.
   
   A possible heuristic: at line 198 below, before we break, we remember the difference as `previousObservedExpirationGap = e.key.window().end() + joinGraceMs - maxStreamTime`. We also remember the `maxStreamTime` at which `maybeEmitOuterExpiryRecords` was last triggered. Then here we would only trigger the function if `maxStreamTime - previousMaxStreamTimeWhenEmitTriggered >= previousObservedExpirationGap`. This way we would trigger this function, and hence seek to the starting position in RocksDB, much less often.
   
   cc @mjsax @vvcephei WDYT?
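   The proposed heuristic needs only two pieces of state. A minimal sketch with hypothetical names (`ExpiryScanThrottle`, `scanStopped`, `scanDrained`); it assumes the caller reports where each scan stopped, using the same `window().end() + joinGraceMs` quantity as the break condition in the diff.

```java
/**
 * Hypothetical bookkeeping for the suggested heuristic: remember how far the oldest
 * buffered record was from expiring at the last scan, and skip the store scan until
 * stream time has advanced by at least that gap.
 */
final class ExpiryScanThrottle {
    private boolean scannedBefore = false;
    private long lastScanStreamTime;
    private long observedExpirationGap; // 0 means "scan again on the next call"

    boolean shouldScan(final long maxStreamTime) {
        return !scannedBefore || maxStreamTime - lastScanStreamTime >= observedExpirationGap;
    }

    /** The scan stopped at a record that has not expired yet (the break in the loop). */
    void scanStopped(final long maxStreamTime, final long oldestWindowEnd, final long joinGraceMs) {
        scannedBefore = true;
        lastScanStreamTime = maxStreamTime;
        observedExpirationGap = oldestWindowEnd + joinGraceMs - maxStreamTime;
    }

    /** The scan emitted everything; there is nothing left to wait for. */
    void scanDrained(final long maxStreamTime) {
        scannedBefore = true;
        lastScanStreamTime = maxStreamTime;
        observedExpirationGap = 0L;
    }
}
```

   One caveat: the remembered gap is only a safe lower bound if records enter the store in time order; a record that is out of order, but not late enough to be emitted immediately, could expire before the gap elapses and would need to reset it.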

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -102,14 +127,98 @@ public void process(final K key, final V1 value) {
                 while (iter.hasNext()) {
                     needOuterJoin = false;
                     final KeyValue<Long, V2> otherRecord = iter.next();
+                    final long otherRecordTimestamp = otherRecord.key;
                     context().forward(
                         key,
                         joiner.apply(key, value, otherRecord.value),
-                        To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecord.key)));
+                        To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecordTimestamp)));
+
+                    outerJoinWindowStore.ifPresent(store -> {
+                        // Delete the other joined key from the outer non-joined store now to prevent
+                        // further processing
+                        final KeyAndJoinSide<K> otherJoinKey = KeyAndJoinSide.make(!thisJoin, key);
+                        if (store.fetch(otherJoinKey, otherRecordTimestamp) != null) {
+                            store.put(otherJoinKey, null, otherRecordTimestamp);
+                        }
+                    });
                 }
 
                 if (needOuterJoin) {
-                    context().forward(key, joiner.apply(key, value, null));
+                    // The maxStreamTime contains the max time observed in both sides of the join.
+                    // Having access to the time observed in the other join side fixes the following
+                    // problem:
+                    //
+                    // Say we have a window size of 5 seconds
+                    //  1. A non-joined record with time T10 is seen in the left-topic (maxLeftStreamTime: 10)
+                    //     The record is not processed yet, and is added to the outer-join store
+                    //  2. A non-joined record with time T2 is seen in the right-topic (maxRightStreamTime: 2)
+                    //     The record is not processed yet, and is added to the outer-join store
+                    //  3. A joined record with time T11 is seen in the left-topic (maxLeftStreamTime: 11)
+                    //     It is time to look at the expired records. T10 and T2 should be emitted, but
+                    //     because T2 was late, then it is not fetched by the window store, so it is not processed
+                    //
+                    // See KStreamKStreamLeftJoinTest.testLowerWindowBound() tests
+                    //
+                    // the condition below allows us to process the late record without the need
+                    // to hold it in the temporary outer store
+                    if (timeTo < maxStreamTime) {
+                        context().forward(key, joiner.apply(key, value, null));
+                    } else {
+                        outerJoinWindowStore.ifPresent(store -> store.put(
+                            KeyAndJoinSide.make(thisJoin, key),
+                            makeValueOrOtherValue(thisJoin, value),
+                            inputRecordTimestamp));
+                    }
+                }
+
+                outerJoinWindowStore.ifPresent(store -> {
+                    // only emit left/outer non-joined if the stream time has advanced (inputRecordTime = maxStreamTime)
+                    // if the current record is late, then there is no need to check for expired records
+                    if (inputRecordTimestamp == maxStreamTime) {
+                        maybeEmitOuterExpiryRecords(store, maxStreamTime);
+                    }
+                });
+            }
+        }
+
+        private ValueOrOtherValue makeValueOrOtherValue(final boolean thisJoin, final V1 value) {
+            return thisJoin
+                ? ValueOrOtherValue.makeValue(value)
+                : ValueOrOtherValue.makeOtherValue(value);
+        }
+
+        @SuppressWarnings("unchecked")
+        private void maybeEmitOuterExpiryRecords(final WindowStore<KeyAndJoinSide<K>, ValueOrOtherValue> store, final long maxStreamTime) {
+            try (final KeyValueIterator<Windowed<KeyAndJoinSide<K>>, ValueOrOtherValue> it = store.all()) {
+                while (it.hasNext()) {
+                    final KeyValue<Windowed<KeyAndJoinSide<K>>, ValueOrOtherValue> e = it.next();
+
+                    // Skip next records if the oldest record has not expired yet
+                    if (e.key.window().end() + joinGraceMs >= maxStreamTime) {
+                        break;
+                    }
+
+                    final K key = e.key.key().getKey();
+
+                    // Emit the record by joining with a null value. But the order varies depending whether
+                    // this join is using a reverse joiner or not. Also whether the returned record from the
+                    // outer window store is a V1 or V2 value.
+                    if (thisJoin) {
+                        if (e.key.key().isThisJoin()) {
+                            context().forward(key, joiner.apply(key, (V1) e.value.getThisValue(), null));
+                        } else {
+                            context().forward(key, joiner.apply(key, null, (V2) e.value.getOtherValue()));
+                        }
+                    } else {
+                        if (e.key.key().isThisJoin()) {
+                            context().forward(key, joiner.apply(key, null, (V2) e.value.getThisValue()));
+                        } else {
+                            context().forward(key, joiner.apply(key, (V1) e.value.getOtherValue(), null));
+                        }
+                    }
+
+                    // Delete the key from tne outer window store now it is emitted

Review comment:
       Typo in the comment: `tne` should be `the`.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##########
@@ -211,6 +246,66 @@ private void assertUniqueStoreNames(final WindowBytesStoreSupplier supplier,
         return builder;
     }
 
+    @SuppressWarnings("unchecked")
+    private static <K, V1, V2> StoreBuilder<WindowStore<KeyAndJoinSide<K>, ValueOrOtherValue<V1, V2>>> outerJoinWindowStoreBuilder(final String storeName,
+                                                                                                                                   final JoinWindows windows,
+                                                                                                                                   final StreamJoinedInternal<K, V1, V2> streamJoinedInternal) {
+        final StoreBuilder<WindowStore<KeyAndJoinSide<K>, ValueOrOtherValue<V1, V2>>> builder = new TimeOrderedWindowStoreBuilder<KeyAndJoinSide<K>, ValueOrOtherValue<V1, V2>>(
+            persistentTimeOrderedWindowStore(
+                storeName + "-store",
+                Duration.ofMillis(windows.size() + windows.gracePeriodMs()),
+                Duration.ofMillis(windows.size())
+            ),
+            new KeyAndJoinSideSerde<>(streamJoinedInternal.keySerde()),
+            new ValueOrOtherValueSerde(streamJoinedInternal.valueSerde(), streamJoinedInternal.otherValueSerde()),

Review comment:
       Note that we used `this/other` because we have two joiners / join stores, so the relation is reversed: from the left side's point of view, V1 is this and V2 is other; from the right side's point of view, V2 is this and V1 is other.
   
   However, here we only have one extra store, so we can simply name them left and right.
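   A value holder named by join side could look like the following. The class and method names are assumptions for illustration; the point is that the single shared store only needs to know which side a value came from, which reads the same from both processors.

```java
import java.util.Objects;

/**
 * Hypothetical value holder for the shared outer-join store, named by join side
 * instead of this/other. Exactly one of the two values is set.
 */
final class LeftOrRightValue<V1, V2> {
    private final V1 leftValue;
    private final V2 rightValue;

    private LeftOrRightValue(final V1 leftValue, final V2 rightValue) {
        this.leftValue = leftValue;
        this.rightValue = rightValue;
    }

    static <V1, V2> LeftOrRightValue<V1, V2> makeLeftValue(final V1 leftValue) {
        return new LeftOrRightValue<>(Objects.requireNonNull(leftValue, "leftValue"), null);
    }

    static <V1, V2> LeftOrRightValue<V1, V2> makeRightValue(final V2 rightValue) {
        return new LeftOrRightValue<>(null, Objects.requireNonNull(rightValue, "rightValue"));
    }

    boolean isLeft() {
        return leftValue != null;
    }

    V1 getLeftValue() {
        return leftValue;
    }

    V2 getRightValue() {
        return rightValue;
    }
}
```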

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -38,20 +45,32 @@
     private final String otherWindowName;
     private final long joinBeforeMs;
     private final long joinAfterMs;
+    private final long joinGraceMs;
 
     private final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends R> joiner;
     private final boolean outer;
+    private final Optional<String> outerJoinWindowName;
+    private final AtomicLong maxObservedStreamTime;
+    private final boolean thisJoin;

Review comment:
       Ditto here: we can rename it to leftJoin / rightJoin to indicate whether this joiner is for the left or the right side.
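   The same left/right naming could carry over to the composite store key described in the PR (a boolean join-side flag plus the record key). A minimal sketch: `String` keys encoded as UTF-8 stand in for the user's key serde, and the layout of one flag byte followed by the key bytes is an assumption, not necessarily what `KeyAndJoinSideSerde` writes.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical composite key: a leftSide flag (instead of thisJoin) plus the record key. */
final class KeyAndJoinSide {
    final boolean leftSide;
    final String key;

    KeyAndJoinSide(final boolean leftSide, final String key) {
        this.leftSide = leftSide;
        this.key = key;
    }

    // Assumed layout: [1 flag byte][UTF-8 key bytes]
    byte[] toBytes() {
        final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(1 + keyBytes.length)
            .put((byte) (leftSide ? 1 : 0))
            .put(keyBytes)
            .array();
    }

    static KeyAndJoinSide fromBytes(final byte[] bytes) {
        final boolean leftSide = bytes[0] == 1;
        final String key = new String(bytes, 1, bytes.length - 1, StandardCharsets.UTF_8);
        return new KeyAndJoinSide(leftSide, key);
    }
}
```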

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -92,6 +113,10 @@ public void process(final K key, final V1 value) {
                 return;
             }
 
+            // maxObservedStreamTime is updated and shared between left and right sides, so we can
+            // process a non-join record immediately if it is late
+            final long maxStreamTime = maxObservedStreamTime.updateAndGet(time -> Math.max(time, context().timestamp()));

Review comment:
       nit: we can move `inputRecordTimestamp` up and use it here.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -102,14 +127,98 @@ public void process(final K key, final V1 value) {
                 while (iter.hasNext()) {
                     needOuterJoin = false;
                     final KeyValue<Long, V2> otherRecord = iter.next();
+                    final long otherRecordTimestamp = otherRecord.key;
                     context().forward(
                         key,
                         joiner.apply(key, value, otherRecord.value),
-                        To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecord.key)));
+                        To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecordTimestamp)));
+
+                    outerJoinWindowStore.ifPresent(store -> {
+                        // Delete the other joined key from the outer non-joined store now to prevent
+                        // further processing
+                        final KeyAndJoinSide<K> otherJoinKey = KeyAndJoinSide.make(!thisJoin, key);
+                        if (store.fetch(otherJoinKey, otherRecordTimestamp) != null) {
+                            store.put(otherJoinKey, null, otherRecordTimestamp);
+                        }
+                    });
                 }
 
                 if (needOuterJoin) {
-                    context().forward(key, joiner.apply(key, value, null));
+                    // The maxStreamTime contains the max time observed in both sides of the join.
+                    // Having access to the time observed in the other join side fixes the following
+                    // problem:
+                    //
+                    // Say we have a window size of 5 seconds
+                    //  1. A non-joined record with time T10 is seen in the left-topic (maxLeftStreamTime: 10)
+                    //     The record is not processed yet, and is added to the outer-join store
+                    //  2. A non-joined record with time T2 is seen in the right-topic (maxRightStreamTime: 2)
+                    //     The record is not processed yet, and is added to the outer-join store
+                    //  3. A joined record with time T11 is seen in the left-topic (maxLeftStreamTime: 11)
+                    //     It is time to look at the expired records. T10 and T2 should be emitted, but
+                    //     because T2 was late, then it is not fetched by the window store, so it is not processed
+                    //
+                    // See KStreamKStreamLeftJoinTest.testLowerWindowBound() tests
+                    //
+                    // the condition below allows us to process the late record without the need
+                    // to hold it in the temporary outer store
+                    if (timeTo < maxStreamTime) {
+                        context().forward(key, joiner.apply(key, value, null));
+                    } else {
+                        outerJoinWindowStore.ifPresent(store -> store.put(
+                            KeyAndJoinSide.make(thisJoin, key),
+                            makeValueOrOtherValue(thisJoin, value),
+                            inputRecordTimestamp));
+                    }
+                }
+
+                outerJoinWindowStore.ifPresent(store -> {
+                    // only emit left/outer non-joined if the stream time has advanced (inputRecordTime = maxStreamTime)
+                    // if the current record is late, then there is no need to check for expired records
+                    if (inputRecordTimestamp == maxStreamTime) {
+                        maybeEmitOuterExpiryRecords(store, maxStreamTime);
+                    }
+                });
+            }
+        }
+
+        private ValueOrOtherValue makeValueOrOtherValue(final boolean thisJoin, final V1 value) {
+            return thisJoin
+                ? ValueOrOtherValue.makeValue(value)
+                : ValueOrOtherValue.makeOtherValue(value);
+        }
+
+        @SuppressWarnings("unchecked")
+        private void maybeEmitOuterExpiryRecords(final WindowStore<KeyAndJoinSide<K>, ValueOrOtherValue> store, final long maxStreamTime) {
+            try (final KeyValueIterator<Windowed<KeyAndJoinSide<K>>, ValueOrOtherValue> it = store.all()) {
+                while (it.hasNext()) {
+                    final KeyValue<Windowed<KeyAndJoinSide<K>>, ValueOrOtherValue> e = it.next();
+
+                    // Skip next records if the oldest record has not expired yet
+                    if (e.key.window().end() + joinGraceMs >= maxStreamTime) {
+                        break;
+                    }
+
+                    final K key = e.key.key().getKey();
+
+                    // Emit the record by joining with a null value. But the order varies depending whether
+                    // this join is using a reverse joiner or not. Also whether the returned record from the
+                    // outer window store is a V1 or V2 value.
+                    if (thisJoin) {

Review comment:
       Here, if we refactor to `left / right`, this logic can be simplified as well, since we would only care whether the deserialized key/value is left or right.
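A rough sketch of the simplified branch, assuming the emitting code can call the user's original (non-reversed) left/right joiner. `KeyedJoiner` is a hypothetical stand-in for `ValueJoinerWithKey`, and `Stored` is a hypothetical minimal left-or-right value; neither is the real Kafka type. The stored side alone decides which joiner argument is null, so the nested `thisJoin` / `isThisJoin()` branches collapse into one conditional.

```java
/** Hypothetical sketch: emitting a never-joined record with one left/right-aware joiner call. */
final class NonJoinedEmitter {

    /** Stand-in for ValueJoinerWithKey: (key, leftValue, rightValue) -> result. */
    @FunctionalInterface
    interface KeyedJoiner<K, V1, V2, R> {
        R apply(K key, V1 leftValue, V2 rightValue);
    }

    /** Minimal left-or-right value: exactly one of the two fields is non-null. */
    static final class Stored<V1, V2> {
        final V1 leftValue;
        final V2 rightValue;

        Stored(final V1 leftValue, final V2 rightValue) {
            this.leftValue = leftValue;
            this.rightValue = rightValue;
        }
    }

    private NonJoinedEmitter() { }

    /** The stored side alone decides which joiner argument is null. */
    static <K, V1, V2, R> R joinWithNull(final KeyedJoiner<K, V1, V2, R> joiner,
                                         final K key,
                                         final Stored<V1, V2> stored) {
        return stored.leftValue != null
            ? joiner.apply(key, stored.leftValue, null)
            : joiner.apply(key, null, stored.rightValue);
    }
}
```

With a `toString`-style joiner `(k, l, r) -> l + "+" + r`, a left-only record yields `A0+null` and a right-only record yields `null+A0`, the same shapes the outer-join tests expect.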

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##########
@@ -118,20 +132,40 @@
         final ProcessorGraphNode<K1, V2> otherWindowedStreamsNode = new ProcessorGraphNode<>(otherWindowStreamProcessorName, otherWindowStreamProcessorParams);
         builder.addGraphNode(otherGraphNode, otherWindowedStreamsNode);
 
+        Optional<StoreBuilder<WindowStore<KeyAndJoinSide<K1>, ValueOrOtherValue<V1, V2>>>> outerJoinWindowStore = Optional.empty();
+        if (leftOuter || rightOuter) {
+            final String outerJoinSuffix = "-shared-outer-join-store";
+            final String outerJoinStoreGeneratedName = builder.newProcessorName(KStreamImpl.OUTERSHARED_NAME);
+            final String outerJoinStoreName = userProvidedBaseStoreName == null ? outerJoinStoreGeneratedName : userProvidedBaseStoreName + outerJoinSuffix;
+
+            outerJoinWindowStore = Optional.of(outerJoinWindowStoreBuilder(outerJoinStoreName, windows, streamJoinedInternal));
+        }
+
+        // Time shared between joins to keep track of the maximum stream time

Review comment:
       Since this is only accessed by a single thread, using an AtomicLong feels like overkill. We could instead maintain a `long maxObservedStreamTime` in this class and pass an `ObservedStreamTime` interface to the two joiners, which just has a setter / getter to read and write that variable.
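A minimal sketch of that idea, taking the reviewer's premise that both join processors of a task run on the same thread. `ObservedStreamTime`, `advance`, `get` and `MaxObservedStreamTime` are hypothetical names, and the `Long.MIN_VALUE` starting value is an assumption. A small mutable holder object is needed rather than a captured local, because Java lambdas cannot reassign captured local variables.

```java
/** Hypothetical read/write view of the max stream time shared by both join processors. */
interface ObservedStreamTime {
    /** Records a newly observed timestamp; the stored value never moves backwards. */
    void advance(long timestamp);

    /** Returns the maximum timestamp observed so far on either side. */
    long get();
}

/** Plain long holder relying on single-threaded access, as the review notes (no AtomicLong). */
final class MaxObservedStreamTime implements ObservedStreamTime {
    private long maxObservedStreamTime = Long.MIN_VALUE; // assumption: "nothing observed yet"

    @Override
    public void advance(final long timestamp) {
        maxObservedStreamTime = Math.max(maxObservedStreamTime, timestamp);
    }

    @Override
    public long get() {
        return maxObservedStreamTime;
    }
}
```

The builder would create one instance and hand the same reference to the left and right processors, so a T10 seen on the left followed by a late T2 on the right leaves the shared value at 10, as in the scenario described in the diff comment.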

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -102,14 +127,98 @@ public void process(final K key, final V1 value) {
                 while (iter.hasNext()) {
                     needOuterJoin = false;
                     final KeyValue<Long, V2> otherRecord = iter.next();
+                    final long otherRecordTimestamp = otherRecord.key;
                     context().forward(
                         key,
                         joiner.apply(key, value, otherRecord.value),
-                        To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecord.key)));
+                        To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecordTimestamp)));
+
+                    outerJoinWindowStore.ifPresent(store -> {
+                        // Delete the other joined key from the outer non-joined store now to prevent
+                        // further processing
+                        final KeyAndJoinSide<K> otherJoinKey = KeyAndJoinSide.make(!thisJoin, key);
+                        if (store.fetch(otherJoinKey, otherRecordTimestamp) != null) {
+                            store.put(otherJoinKey, null, otherRecordTimestamp);
+                        }
+                    });
                 }
 
                 if (needOuterJoin) {
-                    context().forward(key, joiner.apply(key, value, null));
+                    // The maxStreamTime contains the max time observed in both sides of the join.
+                    // Having access to the time observed in the other join side fixes the following
+                    // problem:
+                    //
+                    // Say we have a window size of 5 seconds
+                    //  1. A non-joined record with time T10 is seen in the left-topic (maxLeftStreamTime: 10)
+                    //     The record is not processed yet, and is added to the outer-join store
+                    //  2. A non-joined record with time T2 is seen in the right-topic (maxRightStreamTime: 2)
+                    //     The record is not processed yet, and is added to the outer-join store
+                    //  3. A joined record with time T11 is seen in the left-topic (maxLeftStreamTime: 11)
+                    //     It is time to look at the expired records. T10 and T2 should be emitted, but
+                    //     because T2 was late, then it is not fetched by the window store, so it is not processed
+                    //
+                    // See KStreamKStreamLeftJoinTest.testLowerWindowBound() tests
+                    //
+                    // the condition below allows us to process the late record without the need
+                    // to hold it in the temporary outer store
+                    if (timeTo < maxStreamTime) {
+                        context().forward(key, joiner.apply(key, value, null));
+                    } else {
+                        outerJoinWindowStore.ifPresent(store -> store.put(
+                            KeyAndJoinSide.make(thisJoin, key),
+                            makeValueOrOtherValue(thisJoin, value),
+                            inputRecordTimestamp));
+                    }
+                }
+
+                outerJoinWindowStore.ifPresent(store -> {
+                    // only emit left/outer non-joined if the stream time has advanced (inputRecordTime = maxStreamTime)
+                    // if the current record is late, then there is no need to check for expired records
+                    if (inputRecordTimestamp == maxStreamTime) {
+                        maybeEmitOuterExpiryRecords(store, maxStreamTime);
+                    }
+                });
+            }
+        }
+
+        private ValueOrOtherValue makeValueOrOtherValue(final boolean thisJoin, final V1 value) {

Review comment:
       I think we can move this logic into ValueOrOtherValue as another static constructor.
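A sketch of the suggested static factory. The class below is a hypothetical, minimal stand-in for `ValueOrOtherValue`: `makeValue`, `makeOtherValue`, `getThisValue` and `getOtherValue` appear in the diff, while the fields, constructor and the new `make` method are assumptions. The unchecked casts mirror the raw `ValueOrOtherValue` type the patch already uses in `makeValueOrOtherValue`.

```java
/** Hypothetical, minimal stand-in for the patch's ValueOrOtherValue. */
final class ValueOrOtherValue<V1, V2> {
    private final V1 thisValue;
    private final V2 otherValue;

    private ValueOrOtherValue(final V1 thisValue, final V2 otherValue) {
        this.thisValue = thisValue;
        this.otherValue = otherValue;
    }

    static <V1, V2> ValueOrOtherValue<V1, V2> makeValue(final V1 value) {
        return new ValueOrOtherValue<>(value, null);
    }

    static <V1, V2> ValueOrOtherValue<V1, V2> makeOtherValue(final V2 value) {
        return new ValueOrOtherValue<>(null, value);
    }

    /**
     * Suggested extra factory: replaces the processor's private makeValueOrOtherValue
     * helper by choosing the slot from the processor's side flag.
     */
    @SuppressWarnings("unchecked")
    static <V1, V2> ValueOrOtherValue<V1, V2> make(final boolean thisJoin, final Object value) {
        return thisJoin
            ? ValueOrOtherValue.<V1, V2>makeValue((V1) value)
            : ValueOrOtherValue.<V1, V2>makeOtherValue((V2) value);
    }

    V1 getThisValue() {
        return thisValue;
    }

    V2 getOtherValue() {
        return otherValue;
    }
}
```

The processor would then call `ValueOrOtherValue.make(thisJoin, value)` directly instead of keeping its own helper.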

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -102,14 +127,98 @@ public void process(final K key, final V1 value) {
                 while (iter.hasNext()) {
                     needOuterJoin = false;
                     final KeyValue<Long, V2> otherRecord = iter.next();
+                    final long otherRecordTimestamp = otherRecord.key;
                     context().forward(
                         key,
                         joiner.apply(key, value, otherRecord.value),
-                        To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecord.key)));
+                        To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecordTimestamp)));
+
+                    outerJoinWindowStore.ifPresent(store -> {
+                        // Delete the other joined key from the outer non-joined store now to prevent
+                        // further processing
+                        final KeyAndJoinSide<K> otherJoinKey = KeyAndJoinSide.make(!thisJoin, key);
+                        if (store.fetch(otherJoinKey, otherRecordTimestamp) != null) {
+                            store.put(otherJoinKey, null, otherRecordTimestamp);
+                        }
+                    });
                 }
 
                 if (needOuterJoin) {
-                    context().forward(key, joiner.apply(key, value, null));
+                    // The maxStreamTime contains the max time observed in both sides of the join.
+                    // Having access to the time observed in the other join side fixes the following
+                    // problem:
+                    //
+                    // Say we have a window size of 5 seconds
+                    //  1. A non-joined record with time T10 is seen in the left-topic (maxLeftStreamTime: 10)
+                    //     The record is not processed yet, and is added to the outer-join store
+                    //  2. A non-joined record with time T2 is seen in the right-topic (maxRightStreamTime: 2)
+                    //     The record is not processed yet, and is added to the outer-join store
+                    //  3. A joined record with time T11 is seen in the left-topic (maxLeftStreamTime: 11)
+                    //     It is time to look at the expired records. T10 and T2 should be emitted, but
+                    //     because T2 was late, then it is not fetched by the window store, so it is not processed
+                    //
+                    // See KStreamKStreamLeftJoinTest.testLowerWindowBound() tests
+                    //
+                    // the condition below allows us to process the late record without the need
+                    // to hold it in the temporary outer store
+                    if (timeTo < maxStreamTime) {
+                        context().forward(key, joiner.apply(key, value, null));
+                    } else {
+                        outerJoinWindowStore.ifPresent(store -> store.put(
+                            KeyAndJoinSide.make(thisJoin, key),
+                            makeValueOrOtherValue(thisJoin, value),
+                            inputRecordTimestamp));
+                    }
+                }
+
+                outerJoinWindowStore.ifPresent(store -> {
+                    // only emit left/outer non-joined if the stream time has advanced (inputRecordTime = maxStreamTime)
+                    // if the current record is late, then there is no need to check for expired records
+                    if (inputRecordTimestamp == maxStreamTime) {
+                        maybeEmitOuterExpiryRecords(store, maxStreamTime);
+                    }
+                });
+            }
+        }
+
+        private ValueOrOtherValue makeValueOrOtherValue(final boolean thisJoin, final V1 value) {
+            return thisJoin
+                ? ValueOrOtherValue.makeValue(value)
+                : ValueOrOtherValue.makeOtherValue(value);
+        }
+
+        @SuppressWarnings("unchecked")
+        private void maybeEmitOuterExpiryRecords(final WindowStore<KeyAndJoinSide<K>, ValueOrOtherValue> store, final long maxStreamTime) {
+            try (final KeyValueIterator<Windowed<KeyAndJoinSide<K>>, ValueOrOtherValue> it = store.all()) {
+                while (it.hasNext()) {
+                    final KeyValue<Windowed<KeyAndJoinSide<K>>, ValueOrOtherValue> e = it.next();
+
+                    // Skip next records if the oldest record has not expired yet
+                    if (e.key.window().end() + joinGraceMs >= maxStreamTime) {
+                        break;
+                    }
+
+                    final K key = e.key.key().getKey();
+
+                    // Emit the record by joining with a null value. But the order varies depending whether
+                    // this join is using a reverse joiner or not. Also whether the returned record from the
+                    // outer window store is a V1 or V2 value.
+                    if (thisJoin) {
+                        if (e.key.key().isThisJoin()) {
+                            context().forward(key, joiner.apply(key, (V1) e.value.getThisValue(), null));
+                        } else {
+                            context().forward(key, joiner.apply(key, null, (V2) e.value.getOtherValue()));
+                        }
+                    } else {
+                        if (e.key.key().isThisJoin()) {
+                            context().forward(key, joiner.apply(key, null, (V2) e.value.getThisValue()));
+                        } else {
+                            context().forward(key, joiner.apply(key, (V1) e.value.getOtherValue(), null));
+                        }
+                    }
+
+                    // Delete the key from the outer window store now it is emitted
+                    store.put(e.key.key(), null, e.key.window().start());

Review comment:
       This reminds me of test coverage: maybe we should also test that store.put / delete can be called while the iterator is open, and check whether the put / deleted elements are reflected by the open iterator.
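To make the iterate-and-delete pattern concrete, here is a hypothetical in-memory model of `maybeEmitOuterExpiryRecords` that uses a `java.util.TreeMap` instead of the real window store. It assumes a buffered record's window end is its timestamp plus the window size, so ordering by timestamp is also ordering by window end, which is what makes the early `break` valid. In a `TreeMap`, structural changes made other than through the iterator cause the iterator's next `next()` call to throw `ConcurrentModificationException`, so the sketch deletes via `Iterator.remove()`. Whether the real store's iterator sees puts / deletes made while it is open is exactly what the proposed test should pin down.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical in-memory model of the time-ordered outer-join buffer (not the Kafka store). */
final class ExpiryBufferSketch {
    // record timestamp -> values buffered at that timestamp, kept in arrival order
    private final TreeMap<Long, List<String>> buffer = new TreeMap<>();
    private final long windowSizeMs;
    private final long graceMs;

    ExpiryBufferSketch(final long windowSizeMs, final long graceMs) {
        this.windowSizeMs = windowSizeMs;
        this.graceMs = graceMs;
    }

    void put(final long timestamp, final String value) {
        buffer.computeIfAbsent(timestamp, t -> new ArrayList<>()).add(value);
    }

    /** Emits and deletes, oldest first, every entry whose window end + grace is before maxStreamTime. */
    List<String> emitExpired(final long maxStreamTime) {
        final List<String> emitted = new ArrayList<>();
        final Iterator<Map.Entry<Long, List<String>>> it = buffer.entrySet().iterator();
        while (it.hasNext()) {
            final Map.Entry<Long, List<String>> entry = it.next();
            final long windowEnd = entry.getKey() + windowSizeMs; // assumption: end = start + size
            if (windowEnd + graceMs >= maxStreamTime) {
                break; // entries are time-ordered, so every later entry is still open too
            }
            emitted.addAll(entry.getValue());
            // Delete through the iterator; calling buffer.remove(...) here instead would make
            // the following it.next() throw ConcurrentModificationException.
            it.remove();
        }
        return emitted;
    }

    int size() {
        return buffer.size();
    }
}
```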

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -102,14 +127,98 @@ public void process(final K key, final V1 value) {
                 while (iter.hasNext()) {
                     needOuterJoin = false;
                     final KeyValue<Long, V2> otherRecord = iter.next();
+                    final long otherRecordTimestamp = otherRecord.key;
                     context().forward(
                         key,
                         joiner.apply(key, value, otherRecord.value),
-                        To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecord.key)));
+                        To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecordTimestamp)));
+
+                    outerJoinWindowStore.ifPresent(store -> {
+                        // Delete the other joined key from the outer non-joined store now to prevent
+                        // further processing
+                        final KeyAndJoinSide<K> otherJoinKey = KeyAndJoinSide.make(!thisJoin, key);
+                        if (store.fetch(otherJoinKey, otherRecordTimestamp) != null) {

Review comment:
       Before further optimization, we can use `store.putIfAbsent` for now.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r614538557



##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
##########
@@ -486,6 +486,327 @@ public void testJoin() {
         }
     }
 
+    @Test
+    public void testImprovedLeftJoin() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KStream<Integer, String> stream1;
+        final KStream<Integer, String> stream2;
+        final KStream<Integer, String> joined;
+        final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+
+        stream1 = builder.stream(topic1, consumed);
+        stream2 = builder.stream(topic2, consumed);
+        joined = stream1.leftJoin(
+            stream2,
+            MockValueJoiner.TOSTRING_JOINER,
+            JoinWindows.of(ofMillis(100)).grace(ofMillis(0)),
+            StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
+        joined.process(supplier);
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<Integer, String> inputTopic1 =
+                driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+            final TestInputTopic<Integer, String> inputTopic2 =
+                driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+            final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+
+            /*
+             * Verifies left non-joined records are emitted by a record processed in the left topic
+             */
+
+            long windowStart = 0;
+
+            // No joins detected; No null-joins emitted
+            inputTopic1.pipeInput(0, "A0", windowStart + 1);
+            inputTopic1.pipeInput(1, "A1", windowStart + 2);
+            inputTopic1.pipeInput(0, "A0-0", windowStart + 3);
+            processor.checkAndClearProcessResult();
+
+            // Join detected; No null-joins emitted
+            inputTopic2.pipeInput(1, "a1", windowStart + 3);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(1, "A1+a1", windowStart + 3));
+
+            // Dummy record in left topic will emit expired non-joined records from the left topic
+            inputTopic1.pipeInput(2, "dummy", windowStart + 401);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(0, "A0+null", windowStart + 1),
+                new KeyValueTimestamp<>(0, "A0-0+null", windowStart + 3));
+
+            // Flush internal non-joined state store by joining the dummy record
+            inputTopic2.pipeInput(2, "dummy", windowStart + 401);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(2, "dummy+dummy", windowStart + 401));
+
+            /*
+             * Verifies left non-joined records are emitted by a record processed in the right topic
+             */
+
+            windowStart = windowStart + 301;
+
+            // No joins detected; No null-joins emitted
+            inputTopic1.pipeInput(0, "A0", windowStart + 1);
+            inputTopic1.pipeInput(1, "A1", windowStart + 2);
+            inputTopic1.pipeInput(0, "A0-0", windowStart + 3);
+            processor.checkAndClearProcessResult();
+
+            // Join detected; No null-joins emitted
+            inputTopic2.pipeInput(1, "a1", windowStart + 3);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(1, "A1+a1", windowStart + 3));
+
+            // Dummy record in right topic will emit expired non-joined records from the left topic
+            inputTopic2.pipeInput(2, "dummy", windowStart + 401);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(0, "A0+null", windowStart + 1),
+                new KeyValueTimestamp<>(0, "A0-0+null", windowStart + 3));
+
+            // Flush internal non-joined state store by joining the dummy record
+            inputTopic1.pipeInput(2, "dummy", windowStart + 402);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(2, "dummy+dummy", windowStart + 402));
+
+            /*
+             * Verifies right non-joined records are not emitted by record processed in the left topic
+             */
+
+            windowStart = windowStart + 401;
+
+            // No joins detected; No null-joins emitted
+            inputTopic2.pipeInput(0, "A0", windowStart + 1);
+            inputTopic2.pipeInput(1, "A1", windowStart + 2);
+            inputTopic2.pipeInput(0, "A0-0", windowStart + 3);
+            processor.checkAndClearProcessResult();
+
+            // Join detected; No null-joins emitted
+            inputTopic1.pipeInput(1, "a1", windowStart + 3);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(1, "a1+A1", windowStart + 3));
+
+            // Dummy record in left topic will not emit records
+            inputTopic1.pipeInput(2, "dummy", windowStart + 401);
+            processor.checkAndClearProcessResult();
+
+            // Process the dummy joined record
+            inputTopic2.pipeInput(2, "dummy", windowStart + 402);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(2, "dummy+dummy", windowStart + 402));
+
+            /*
+             * Verifies right non-joined records are not emitted by record processed in the right topic
+             */
+
+            windowStart = windowStart + 401;
+
+            // No joins detected; No null-joins emitted
+            inputTopic2.pipeInput(0, "A0", windowStart + 1);
+            inputTopic2.pipeInput(1, "A1", windowStart + 2);
+            inputTopic2.pipeInput(0, "A0-0", windowStart + 3);
+            processor.checkAndClearProcessResult();
+
+            // Join detected; No null-joins emitted
+            inputTopic1.pipeInput(1, "a1", windowStart + 3);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(1, "a1+A1", windowStart + 3));
+
+            // Dummy record in right topic will not emit records
+            inputTopic2.pipeInput(2, "dummy", windowStart + 401);
+            processor.checkAndClearProcessResult();
+
+            // Process the dummy joined record
+            inputTopic1.pipeInput(2, "dummy", windowStart + 402);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(2, "dummy+dummy", windowStart + 402));
+        }
+    }
+
+    @Test
+    public void testImprovedFullOuterJoin() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KStream<Integer, String> stream1;
+        final KStream<Integer, String> stream2;
+        final KStream<Integer, String> joined;
+        final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+
+        stream1 = builder.stream(topic1, consumed);
+        stream2 = builder.stream(topic2, consumed);
+        joined = stream1.outerJoin(
+            stream2,
+            MockValueJoiner.TOSTRING_JOINER,
+            JoinWindows.of(ofMillis(100)).grace(ofMillis(0)),
+            StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
+        joined.process(supplier);
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<Integer, String> inputTopic1 =
+                driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+            final TestInputTopic<Integer, String> inputTopic2 =
+                driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+            final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+
+            /*
+             * Verifies left non-joined records are emitted by a record processed in the left topic
+             */
+
+            long windowStart = 0;
+
+            // No joins detected; No null-joins emitted
+            inputTopic1.pipeInput(0, "A0", windowStart + 1);
+            inputTopic1.pipeInput(1, "A1", windowStart + 2);
+            inputTopic1.pipeInput(0, "A0-0", windowStart + 3);
+            processor.checkAndClearProcessResult();
+
+            // Join detected; No null-joins emitted
+            inputTopic2.pipeInput(1, "a1", windowStart + 3);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(1, "A1+a1", windowStart + 3));
+
+            // Dummy record in left topic will emit expired non-joined records from the left topic
+            inputTopic1.pipeInput(2, "dummy", windowStart + 401);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(0, "A0+null", windowStart + 1),
+                new KeyValueTimestamp<>(0, "A0-0+null", windowStart + 3));
+
+            // Flush internal non-joined state store by joining the dummy record
+            inputTopic2.pipeInput(2, "dummy", windowStart + 401);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(2, "dummy+dummy", windowStart + 401));
+
+            /*
+             * Verifies left non-joined records are emitted by a record processed in the right topic
+             */
+
+            windowStart = windowStart + 401;
+
+            // No joins detected; No null-joins emitted
+            inputTopic1.pipeInput(0, "A0", windowStart + 1);
+            inputTopic1.pipeInput(1, "A1", windowStart + 2);
+            inputTopic1.pipeInput(0, "A0-0", windowStart + 3);
+            processor.checkAndClearProcessResult();
+
+            // Join detected; No null-joins emitted
+            inputTopic2.pipeInput(1, "a1", windowStart + 3);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(1, "A1+a1", windowStart + 3));
+
+            // Dummy record in right topic will emit expired non-joined records from the left topic
+            inputTopic2.pipeInput(2, "dummy", windowStart + 401);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(0, "A0+null", windowStart + 1),
+                new KeyValueTimestamp<>(0, "A0-0+null", windowStart + 3));
+
+            // Flush internal non-joined state store by joining the dummy record
+            inputTopic1.pipeInput(2, "dummy", windowStart + 402);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(2, "dummy+dummy", windowStart + 402));
+
+            /*
+             * Verifies right non-joined records are emitted by record processed in the left topic
+             */
+
+            windowStart = windowStart + 401;
+
+            // No joins detected; No null-joins emitted
+            inputTopic2.pipeInput(0, "A0", windowStart + 1);
+            inputTopic2.pipeInput(1, "A1", windowStart + 2);
+            inputTopic2.pipeInput(0, "A0-0", windowStart + 3);
+            processor.checkAndClearProcessResult();
+
+            // Join detected; No null-joins emitted
+            inputTopic1.pipeInput(1, "a1", windowStart + 3);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(1, "a1+A1", windowStart + 3));
+
+            // Dummy record in left topic will emit expired non-joined records from the right topic
+            inputTopic1.pipeInput(2, "dummy", windowStart + 401);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(0, "null+A0", windowStart + 1),
+                new KeyValueTimestamp<>(0, "null+A0-0", windowStart + 3));
+
+            // Process the dummy joined record
+            inputTopic2.pipeInput(2, "dummy", windowStart + 402);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(2, "dummy+dummy", windowStart + 402));
+
+            /*
+             * Verifies right non-joined records are emitted by record processed in the right topic
+             */
+
+            windowStart = windowStart + 301;
+
+            // No joins detected; No null-joins emitted
+            inputTopic2.pipeInput(0, "A0", windowStart + 1);
+            inputTopic2.pipeInput(1, "A1", windowStart + 2);
+            inputTopic2.pipeInput(0, "A0-0", windowStart + 3);
+            processor.checkAndClearProcessResult();
+
+            // Join detected; No null-joins emitted
+            inputTopic1.pipeInput(1, "a1", windowStart + 3);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(1, "a1+A1", windowStart + 3));
+
+            // Dummy record in right topic will emit expired non-joined records from the right topic
+            inputTopic2.pipeInput(2, "dummy", windowStart + 401);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(0, "null+A0", windowStart + 1),
+                new KeyValueTimestamp<>(0, "null+A0-0", windowStart + 3));
+
+            // Process the dummy joined record
+            inputTopic1.pipeInput(2, "dummy", windowStart + 402);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(2, "dummy+dummy", windowStart + 402));
+        }
+    }
+
+    @Test
+    public void testOuterJoinEmitsNonJoinedRecordsAfterWindowCloses() {

Review comment:
       As above.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] spena commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

Posted by GitBox <gi...@apache.org>.
spena commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r610891690



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -102,14 +127,98 @@ public void process(final K key, final V1 value) {
                 while (iter.hasNext()) {
                     needOuterJoin = false;
                     final KeyValue<Long, V2> otherRecord = iter.next();
+                    final long otherRecordTimestamp = otherRecord.key;
                     context().forward(
                         key,
                         joiner.apply(key, value, otherRecord.value),
-                        To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecord.key)));
+                        To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecordTimestamp)));
+
+                    outerJoinWindowStore.ifPresent(store -> {
+                        // Delete the other joined key from the outer non-joined store now to prevent
+                        // further processing
+                        final KeyAndJoinSide<K> otherJoinKey = KeyAndJoinSide.make(!thisJoin, key);
+                        if (store.fetch(otherJoinKey, otherRecordTimestamp) != null) {
+                            store.put(otherJoinKey, null, otherRecordTimestamp);
+                        }
+                    });
                 }
 
                 if (needOuterJoin) {
-                    context().forward(key, joiner.apply(key, value, null));
+                    // The maxStreamTime contains the max time observed in both sides of the join.
+                    // Having access to the time observed in the other join side fixes the following
+                    // problem:
+                    //
+                    // Say we have a window size of 5 seconds
+                    //  1. A non-joined record wth time T10 is seen in the left-topic (maxLeftStreamTime: 10)
+                    //     The record is not processed yet, and is added to the outer-join store
+                    //  2. A non-joined record with time T2 is seen in the right-topic (maxRightStreamTime: 2)
+                    //     The record is not processed yet, and is added to the outer-join store
+                    //  3. A joined record with time T11 is seen in the left-topic (maxLeftStreamTime: 11)
+                    //     It is time to look at the expired records. T10 and T2 should be emitted, but
+                    //     because T2 was late, then it is not fetched by the window store, so it is not processed
+                    //
+                    // See KStreamKStreamLeftJoinTest.testLowerWindowBound() tests
+                    //
+                    // the condition below allows us to process the late record without the need
+                    // to hold it in the temporary outer store
+                    if (timeTo < maxStreamTime) {
+                        context().forward(key, joiner.apply(key, value, null));
+                    } else {
+                        outerJoinWindowStore.ifPresent(store -> store.put(
+                            KeyAndJoinSide.make(thisJoin, key),
+                            makeValueOrOtherValue(thisJoin, value),
+                            inputRecordTimestamp));
+                    }
+                }
+
+                outerJoinWindowStore.ifPresent(store -> {
+                    // only emit left/outer non-joined if the stream time has advanced (inputRecordTime = maxStreamTime)
+                    // if the current record is late, then there is no need to check for expired records
+                    if (inputRecordTimestamp == maxStreamTime) {
+                        maybeEmitOuterExpiryRecords(store, maxStreamTime);
+                    }
+                });
+            }
+        }
+
+        private ValueOrOtherValue makeValueOrOtherValue(final boolean thisJoin, final V1 value) {
+            return thisJoin
+                ? ValueOrOtherValue.makeValue(value)
+                : ValueOrOtherValue.makeOtherValue(value);
+        }
+
+        @SuppressWarnings("unchecked")
+        private void maybeEmitOuterExpiryRecords(final WindowStore<KeyAndJoinSide<K>, ValueOrOtherValue> store, final long maxStreamTime) {
+            try (final KeyValueIterator<Windowed<KeyAndJoinSide<K>>, ValueOrOtherValue> it = store.all()) {
+                while (it.hasNext()) {
+                    final KeyValue<Windowed<KeyAndJoinSide<K>>, ValueOrOtherValue> e = it.next();
+
+                    // Skip next records if the oldest record has not expired yet
+                    if (e.key.window().end() + joinGraceMs >= maxStreamTime) {
+                        break;
+                    }
+
+                    final K key = e.key.key().getKey();
+
+                    // Emit the record by joining with a null value. But the order varies depending whether
+                    // this join is using a reverse joiner or not. Also whether the returned record from the
+                    // outer window store is a V1 or V2 value.
+                    if (thisJoin) {
+                        if (e.key.key().isThisJoin()) {
+                            context().forward(key, joiner.apply(key, (V1) e.value.getThisValue(), null));
+                        } else {
+                            context().forward(key, joiner.apply(key, null, (V2) e.value.getOtherValue()));
+                        }
+                    } else {
+                        if (e.key.key().isThisJoin()) {
+                            context().forward(key, joiner.apply(key, null, (V2) e.value.getThisValue()));
+                        } else {
+                            context().forward(key, joiner.apply(key, (V1) e.value.getOtherValue(), null));
+                        }
+                    }
+
+                    // Delete the key from tne outer window store now it is emitted

Review comment:
       Done







[GitHub] [kafka] mjsax commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r612094560



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##########
@@ -118,20 +142,47 @@
         final ProcessorGraphNode<K1, V2> otherWindowedStreamsNode = new ProcessorGraphNode<>(otherWindowStreamProcessorName, otherWindowStreamProcessorParams);
         builder.addGraphNode(otherGraphNode, otherWindowedStreamsNode);
 
+        Optional<StoreBuilder<WindowStore<KeyAndJoinSide<K1>, LeftOrRightValue<V1, V2>>>> outerJoinWindowStore = Optional.empty();
+        if (leftOuter || rightOuter) {
+            final String outerJoinSuffix = "-shared-outer-join-store";

Review comment:
       Should we use `-shared-left-join-store` and `-shared-right-join-store` for left/outer join?

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##########
@@ -211,6 +263,66 @@ private void assertUniqueStoreNames(final WindowBytesStoreSupplier supplier,
         return builder;
     }
 
+    @SuppressWarnings("unchecked")
+    private static <K, V1, V2> StoreBuilder<WindowStore<KeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>> outerJoinWindowStoreBuilder(final String storeName,
+                                                                                                                                  final JoinWindows windows,
+                                                                                                                                  final StreamJoinedInternal<K, V1, V2> streamJoinedInternal) {
+        final StoreBuilder<WindowStore<KeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>> builder = new TimeOrderedWindowStoreBuilder<KeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>(
+            persistentTimeOrderedWindowStore(
+                storeName + "-store",
+                Duration.ofMillis(windows.size() + windows.gracePeriodMs()),
+                Duration.ofMillis(windows.size())
+            ),
+            new KeyAndJoinSideSerde<>(streamJoinedInternal.keySerde()),
+            new LeftOrRightValueSerde(streamJoinedInternal.valueSerde(), streamJoinedInternal.otherValueSerde()),
+            Time.SYSTEM

Review comment:
       Should we pass a `Time` reference here to allow us to mock time in tests if necessary?
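   A minimal sketch of the suggested pattern, assuming the builder simply takes its time source as a constructor argument instead of hard-coding `Time.SYSTEM`. Kafka's own abstraction is `org.apache.kafka.common.utils.Time`; to keep the example self-contained, `java.time.Clock` stands in for it, and `BufferStoreBuilder` is a hypothetical class, not the PR's `TimeOrderedWindowStoreBuilder`.

```java
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneOffset;

public final class ClockInjectionSketch {
    // Hypothetical builder: the time source is injected rather than fixed to the system clock.
    public static final class BufferStoreBuilder {
        private final String storeName;
        private final Clock clock;

        public BufferStoreBuilder(final String storeName, final Clock clock) {
            this.storeName = storeName;
            this.clock = clock;
        }

        public String storeName() {
            return storeName;
        }

        // Anything that needs "now" asks the injected clock.
        public long nowMs() {
            return clock.millis();
        }
    }

    public static void main(final String[] args) {
        // Production wiring: real wall-clock time.
        final BufferStoreBuilder prod = new BufferStoreBuilder("outer-join-buffer", Clock.systemUTC());
        // Test wiring: a fixed clock makes time-dependent behavior deterministic.
        final BufferStoreBuilder test = new BufferStoreBuilder("outer-join-buffer",
            Clock.fixed(Instant.ofEpochMilli(42L), ZoneOffset.UTC));
        System.out.println(prod.storeName() + " " + test.nowMs()); // outer-join-buffer 42
    }
}
```

   In Kafka tests, the same role would be played by passing a mock implementation of `Time` instead of `Time.SYSTEM`.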

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -60,20 +82,41 @@
     }
 
     private class KStreamKStreamJoinProcessor extends AbstractProcessor<K, V1> {
+        private static final boolean DISABLE_OUTER_JOIN_SPURIOUS_RESULTS_FIX_DEFAULT = false;

Review comment:
       Seems overkill to introduce this constant? It's used only once, so maybe just return `false` directly where it's used?

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -93,23 +136,118 @@ public void process(final K key, final V1 value) {
             }
 
             boolean needOuterJoin = outer;
+            boolean joinFound = false;
 
             final long inputRecordTimestamp = context().timestamp();
             final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs);
             final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs);
 
+            maxObservedStreamTime.advance(inputRecordTimestamp);
+
             try (final WindowStoreIterator<V2> iter = otherWindow.fetch(key, timeFrom, timeTo)) {
                 while (iter.hasNext()) {
                     needOuterJoin = false;
+                    joinFound = true;
                     final KeyValue<Long, V2> otherRecord = iter.next();
+                    final long otherRecordTimestamp = otherRecord.key;
+
+                    // Emit expired records before the joined record to keep time ordering
+                    emitExpiredNonJoinedOuterRecordsExcept(key, otherRecordTimestamp);
+
                     context().forward(
                         key,
                         joiner.apply(key, value, otherRecord.value),
-                        To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecord.key)));
+                        To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecordTimestamp)));
+                }
+
+                // Emit all expired records before adding a new non-joined record to the store. Otherwise,
+                // the put() call will advance the stream time, which causes records out of the retention
+                // period to be deleted, thus not being emitted later.
+                if (!joinFound && inputRecordTimestamp == maxObservedStreamTime.get()) {
+                    emitExpiredNonJoinedOuterRecords();
                 }
 
                 if (needOuterJoin) {
-                    context().forward(key, joiner.apply(key, value, null));
+                    // The maxStreamTime contains the max time observed in both sides of the join.
+                    // Having access to the time observed in the other join side fixes the following
+                    // problem:
+                    //
+                    // Say we have a window size of 5 seconds
+                    //  1. A non-joined record wth time T10 is seen in the left-topic (maxLeftStreamTime: 10)
+                    //     The record is not processed yet, and is added to the outer-join store
+                    //  2. A non-joined record with time T2 is seen in the right-topic (maxRightStreamTime: 2)
+                    //     The record is not processed yet, and is added to the outer-join store
+                    //  3. A joined record with time T11 is seen in the left-topic (maxLeftStreamTime: 11)
+                    //     It is time to look at the expired records. T10 and T2 should be emitted, but
+                    //     because T2 was late, then it is not fetched by the window store, so it is not processed
+                    //
+                    // See KStreamKStreamLeftJoinTest.testLowerWindowBound() tests
+                    //
+                    // the condition below allows us to process the late record without the need
+                    // to hold it in the temporary outer store
+                    if (!outerJoinWindowStore.isPresent() || timeTo < maxObservedStreamTime.get()) {

Review comment:
       Not sure if I understand the second condition?

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -133,34 +141,37 @@ public void process(final K key, final V1 value) {
                 return;
             }
 
-            // maxObservedStreamTime is updated and shared between left and right sides, so we can
-            // process a non-join record immediately if it is late
-            final long maxStreamTime = maxObservedStreamTime.updateAndGet(time -> Math.max(time, context().timestamp()));
-
             boolean needOuterJoin = outer;
+            boolean joinFound = false;
 
             final long inputRecordTimestamp = context().timestamp();
             final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs);
             final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs);
 
+            // maxObservedStreamTime is updated and shared between left and right join sides
+            maxObservedStreamTime.updateAndGet(t -> Math.max(t, inputRecordTimestamp));
+
             try (final WindowStoreIterator<V2> iter = otherWindow.fetch(key, timeFrom, timeTo)) {
                 while (iter.hasNext()) {
                     needOuterJoin = false;
+                    joinFound = true;
                     final KeyValue<Long, V2> otherRecord = iter.next();
                     final long otherRecordTimestamp = otherRecord.key;
+
+                    // Emit expired records before the joined record to keep time ordering

Review comment:
       I agree (even if your train of thought seems a bit complex...) that we can expire left/outer join results outside of the loop blindly.
   
   To rephrase: if we process a record with timestamp T, there are two cases:
   
   In-order record:
   We can expire everything with a timestamp smaller than `T - windowSize - gracePeriod`.
   
   Out-of-order record:
   There is nothing to expire to begin with, because we could only expire up to `T - windowSize - gracePeriod`, but this value is smaller than `streamTime - windowSize - gracePeriod`, and thus we already expired everything up to the larger value previously.
   
   I.e., we don't need the method `emitExpiredNonJoinedOuterRecordsExcept` at all, and can move `emitExpiredNonJoinedOuterRecords();` before the while loop.
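   A minimal Java model of the two-case argument, not the PR's implementation: a `TreeMap` keyed by timestamp stands in for the time-ordered shared store, and `OuterJoinExpirySketch`, `advanceAndExpire`, and `bufferNonJoined` are hypothetical names. It assumes the bound `streamTime - windowSize - gracePeriod` from the comment and that late records are never buffered. Because the bound is derived from stream time, an out-of-order record leaves it unchanged and the expiry loop does nothing, so the call can sit before the join loop unconditionally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of the shared outer-join buffer; not the Kafka Streams implementation.
public class OuterJoinExpirySketch {
    private final long windowSizeMs;
    private final long graceMs;
    // Non-joined records keyed by timestamp, iterated oldest-first like a time-ordered store.
    private final TreeMap<Long, List<String>> nonJoined = new TreeMap<>();
    private final List<String> emitted = new ArrayList<>();
    private long streamTime = Long.MIN_VALUE;

    public OuterJoinExpirySketch(final long windowSizeMs, final long graceMs) {
        this.windowSizeMs = windowSizeMs;
        this.graceMs = graceMs;
    }

    // Advance stream time and emit expired records; called once per input record, BEFORE the join loop.
    public void advanceAndExpire(final long recordTimestamp) {
        streamTime = Math.max(streamTime, recordTimestamp);
        final long expiryBound = streamTime - windowSizeMs - graceMs;
        // For an out-of-order record streamTime is unchanged, so the bound is unchanged and
        // everything below it was already emitted: the loop body does not run.
        while (!nonJoined.isEmpty() && nonJoined.firstKey() < expiryBound) {
            final Map.Entry<Long, List<String>> oldest = nonJoined.pollFirstEntry();
            for (final String value : oldest.getValue()) {
                emitted.add(value + "+null@" + oldest.getKey());
            }
        }
    }

    // Buffer a record that found no join partner.
    public void bufferNonJoined(final String value, final long timestamp) {
        nonJoined.computeIfAbsent(timestamp, t -> new ArrayList<>()).add(value);
    }

    public List<String> emitted() {
        return emitted;
    }

    public static void main(final String[] args) {
        final OuterJoinExpirySketch sketch = new OuterJoinExpirySketch(5, 0);
        sketch.advanceAndExpire(10);
        sketch.bufferNonJoined("a10", 10);
        sketch.advanceAndExpire(7);           // out-of-order: bound stays 5, nothing emitted
        sketch.bufferNonJoined("b7", 7);
        sketch.advanceAndExpire(16);          // bound moves to 11: b7 and a10 expire, oldest first
        System.out.println(sketch.emitted()); // [b7+null@7, a10+null@10]
    }
}
```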

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##########
@@ -118,20 +142,47 @@
         final ProcessorGraphNode<K1, V2> otherWindowedStreamsNode = new ProcessorGraphNode<>(otherWindowStreamProcessorName, otherWindowStreamProcessorParams);
         builder.addGraphNode(otherGraphNode, otherWindowedStreamsNode);
 
+        Optional<StoreBuilder<WindowStore<KeyAndJoinSide<K1>, LeftOrRightValue<V1, V2>>>> outerJoinWindowStore = Optional.empty();
+        if (leftOuter || rightOuter) {
+            final String outerJoinSuffix = "-shared-outer-join-store";
+
+            // Get the suffix index of the joinThisGeneratedName to build the outer join store name.
+            final String outerJoinStoreGeneratedName = KStreamImpl.OUTERSHARED_NAME
+                + joinThisGeneratedName.substring(
+                    rightOuter
+                        ? KStreamImpl.OUTERTHIS_NAME.length()
+                        : KStreamImpl.JOINTHIS_NAME.length());

Review comment:
       Not sure I understand. Why use "outer" or "this" here? If the store is shared, neither one seems to make sense. Overall, naming of processors and stores is tricky. Can we actually add a corresponding test that compares the generated and expected `TopologyDescription` for this case?

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -60,20 +82,41 @@
     }
 
     private class KStreamKStreamJoinProcessor extends AbstractProcessor<K, V1> {
+        private static final boolean DISABLE_OUTER_JOIN_SPURIOUS_RESULTS_FIX_DEFAULT = false;
+
+        private final Predicate<Windowed<KeyAndJoinSide<K>>> recordWindowHasClosed =
+            windowedKey -> windowedKey.window().start() + joinAfterMs + joinGraceMs < maxObservedStreamTime.get();
 
         private WindowStore<K, V2> otherWindow;
         private StreamsMetricsImpl metrics;
         private Sensor droppedRecordsSensor;
+        private Optional<WindowStore<KeyAndJoinSide<K>, LeftOrRightValue>> outerJoinWindowStore = Optional.empty();
 
         @SuppressWarnings("unchecked")
         @Override
         public void init(final ProcessorContext context) {
             super.init(context);
             metrics = (StreamsMetricsImpl) context.metrics();
             droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
-            otherWindow = (WindowStore<K, V2>) context.getStateStore(otherWindowName);
+            otherWindow = context.getStateStore(otherWindowName);
+
+            if (!internalOuterJoinFixDisabled(context.appConfigs())) {
+                outerJoinWindowStore = outerJoinWindowName.map(name -> context.getStateStore(name));
+            }
         }
 
+        private boolean internalOuterJoinFixDisabled(final Map<String, Object> configs) {
+            final Object value = configs.get(StreamsConfig.InternalConfig.INTERNAL_DISABLE_OUTER_JOIN_SPURIOUS_RESULTS_FIX);
+            if (value == null) {
+                return DISABLE_OUTER_JOIN_SPURIOUS_RESULTS_FIX_DEFAULT;

Review comment:
       Why do we interpret "not set" as disabled? 

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -60,20 +82,41 @@
     }
 
     private class KStreamKStreamJoinProcessor extends AbstractProcessor<K, V1> {
+        private static final boolean DISABLE_OUTER_JOIN_SPURIOUS_RESULTS_FIX_DEFAULT = false;
+
+        private final Predicate<Windowed<KeyAndJoinSide<K>>> recordWindowHasClosed =
+            windowedKey -> windowedKey.window().start() + joinAfterMs + joinGraceMs < maxObservedStreamTime.get();
 
         private WindowStore<K, V2> otherWindow;
         private StreamsMetricsImpl metrics;
         private Sensor droppedRecordsSensor;
+        private Optional<WindowStore<KeyAndJoinSide<K>, LeftOrRightValue>> outerJoinWindowStore = Optional.empty();
 
         @SuppressWarnings("unchecked")
         @Override
         public void init(final ProcessorContext context) {
             super.init(context);
             metrics = (StreamsMetricsImpl) context.metrics();
             droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
-            otherWindow = (WindowStore<K, V2>) context.getStateStore(otherWindowName);
+            otherWindow = context.getStateStore(otherWindowName);
+
+            if (!internalOuterJoinFixDisabled(context.appConfigs())) {
+                outerJoinWindowStore = outerJoinWindowName.map(name -> context.getStateStore(name));
+            }
         }
 
+        private boolean internalOuterJoinFixDisabled(final Map<String, Object> configs) {
+            final Object value = configs.get(StreamsConfig.InternalConfig.INTERNAL_DISABLE_OUTER_JOIN_SPURIOUS_RESULTS_FIX);
+            if (value == null) {
+                return DISABLE_OUTER_JOIN_SPURIOUS_RESULTS_FIX_DEFAULT;
+            }
+
+            if (value instanceof Boolean) {
+                return (Boolean) value;
+            } else {

Review comment:
       Should we have another `instanceof String` branch and an `else throw` with an informative error message?
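   A sketch of what the suggested branches could look like, assuming the flag may arrive either as a `Boolean` or as a `String`. `parseBooleanFlag` and the key used below are hypothetical; the real config name and default are whatever `StreamsConfig.InternalConfig` defines. Strings are matched explicitly because `Boolean.parseBoolean` silently maps any non-`"true"` value (including typos) to `false`.

```java
import java.util.Map;

public final class InternalFlagParsingSketch {
    // Hypothetical helper; the key and default are parameters, not Kafka's actual values.
    public static boolean parseBooleanFlag(final Map<String, Object> configs,
                                           final String key,
                                           final boolean defaultValue) {
        final Object value = configs.get(key);
        if (value == null) {
            return defaultValue;            // not set: fall back to the default
        }
        if (value instanceof Boolean) {
            return (Boolean) value;
        }
        if (value instanceof String) {
            final String s = ((String) value).trim();
            // Only accept "true"/"false" (case-insensitive); reject anything else below.
            if (s.equalsIgnoreCase("true") || s.equalsIgnoreCase("false")) {
                return Boolean.parseBoolean(s);
            }
        }
        throw new IllegalArgumentException("Invalid value '" + value + "' for config '" + key
            + "'; expected a Boolean or the String \"true\"/\"false\"");
    }

    public static void main(final String[] args) {
        final String key = "hypothetical.enable.flag";
        System.out.println(parseBooleanFlag(Map.of(), key, true));             // true
        System.out.println(parseBooleanFlag(Map.of(key, "FALSE"), key, true)); // false
    }
}
```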

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -93,23 +136,118 @@ public void process(final K key, final V1 value) {
             }
 
             boolean needOuterJoin = outer;
+            boolean joinFound = false;
 
             final long inputRecordTimestamp = context().timestamp();
             final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs);
             final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs);
 
+            maxObservedStreamTime.advance(inputRecordTimestamp);
+

Review comment:
       Side improvement: I think we should skip late records directly and also record them in `TaskMetrics.droppedRecordsSensorOrExpiredWindowRecordDropSensor`.
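   A minimal sketch of the suggested early exit, assuming a record counts as late when its own window plus grace closed before the current stream time, i.e. the same shape as the `recordWindowHasClosed` predicate in the diff (`start + joinAfterMs + joinGraceMs < streamTime`). `LateRecordDropSketch` is hypothetical, a plain counter stands in for the dropped-records sensor, and the `processed` list is a placeholder for the window-store fetch and join.

```java
import java.util.ArrayList;
import java.util.List;

public final class LateRecordDropSketch {
    private final long joinAfterMs;
    private final long joinGraceMs;
    private long streamTime = Long.MIN_VALUE;
    private long droppedRecords = 0;                 // stand-in for the dropped-records sensor
    private final List<Long> processed = new ArrayList<>();

    public LateRecordDropSketch(final long joinAfterMs, final long joinGraceMs) {
        this.joinAfterMs = joinAfterMs;
        this.joinGraceMs = joinGraceMs;
    }

    public void process(final long recordTimestamp) {
        streamTime = Math.max(streamTime, recordTimestamp);
        if (recordTimestamp + joinAfterMs + joinGraceMs < streamTime) {
            droppedRecords++;                        // count the drop and return before touching any store
            return;
        }
        processed.add(recordTimestamp);              // placeholder for fetch + join + buffering
    }

    public long droppedRecords() {
        return droppedRecords;
    }

    public List<Long> processed() {
        return processed;
    }

    public static void main(final String[] args) {
        final LateRecordDropSketch sketch = new LateRecordDropSketch(5, 2);
        sketch.process(100);
        sketch.process(95);                          // 95 + 5 + 2 = 102 >= 100: processed
        sketch.process(92);                          // 92 + 5 + 2 = 99 < 100: dropped
        System.out.println(sketch.processed() + " dropped=" + sketch.droppedRecords()); // [100, 95] dropped=1
    }
}
```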

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -60,20 +82,41 @@
     }
 
     private class KStreamKStreamJoinProcessor extends AbstractProcessor<K, V1> {
+        private static final boolean DISABLE_OUTER_JOIN_SPURIOUS_RESULTS_FIX_DEFAULT = false;
+
+        private final Predicate<Windowed<KeyAndJoinSide<K>>> recordWindowHasClosed =
+            windowedKey -> windowedKey.window().start() + joinAfterMs + joinGraceMs < maxObservedStreamTime.get();
 
         private WindowStore<K, V2> otherWindow;
         private StreamsMetricsImpl metrics;
         private Sensor droppedRecordsSensor;
+        private Optional<WindowStore<KeyAndJoinSide<K>, LeftOrRightValue>> outerJoinWindowStore = Optional.empty();
 
         @SuppressWarnings("unchecked")
         @Override
         public void init(final ProcessorContext context) {
             super.init(context);
             metrics = (StreamsMetricsImpl) context.metrics();
             droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
-            otherWindow = (WindowStore<K, V2>) context.getStateStore(otherWindowName);
+            otherWindow = context.getStateStore(otherWindowName);
+
+            if (!internalOuterJoinFixDisabled(context.appConfigs())) {

Review comment:
       Instead of `!disabled`, should we reverse it and use `if (internalOuterJoinFixEnabled(...))`?

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##########
@@ -211,6 +263,66 @@ private void assertUniqueStoreNames(final WindowBytesStoreSupplier supplier,
         return builder;
     }
 
+    @SuppressWarnings("unchecked")
+    private static <K, V1, V2> StoreBuilder<WindowStore<KeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>> outerJoinWindowStoreBuilder(final String storeName,

Review comment:
       The method name is confusing. We are building the _shared_ store, as opposed to "this" and "other", which are the names we use for the two "main" stores of the left and right input processors.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -93,23 +136,118 @@ public void process(final K key, final V1 value) {
             }
 
             boolean needOuterJoin = outer;
+            boolean joinFound = false;
 
             final long inputRecordTimestamp = context().timestamp();
             final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs);
             final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs);
 
+            maxObservedStreamTime.advance(inputRecordTimestamp);
+
             try (final WindowStoreIterator<V2> iter = otherWindow.fetch(key, timeFrom, timeTo)) {
                 while (iter.hasNext()) {
                     needOuterJoin = false;
+                    joinFound = true;
                     final KeyValue<Long, V2> otherRecord = iter.next();
+                    final long otherRecordTimestamp = otherRecord.key;
+
+                    // Emit expired records before the joined record to keep time ordering
+                    emitExpiredNonJoinedOuterRecordsExcept(key, otherRecordTimestamp);
+
                     context().forward(
                         key,
                         joiner.apply(key, value, otherRecord.value),
-                        To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecord.key)));
+                        To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecordTimestamp)));
+                }
+
+                // Emit all expired records before adding a new non-joined record to the store. Otherwise,
+                // the put() call will advance the stream time, which causes records out of the retention
+                // period to be deleted, thus not being emitted later.
+                if (!joinFound && inputRecordTimestamp == maxObservedStreamTime.get()) {
+                    emitExpiredNonJoinedOuterRecords();
                 }
 
                 if (needOuterJoin) {
-                    context().forward(key, joiner.apply(key, value, null));
+                    // The maxStreamTime contains the max time observed in both sides of the join.
+                    // Having access to the time observed in the other join side fixes the following
+                    // problem:
+                    //
+                    // Say we have a window size of 5 seconds
+                    //  1. A non-joined record wth time T10 is seen in the left-topic (maxLeftStreamTime: 10)
+                    //     The record is not processed yet, and is added to the outer-join store
+                    //  2. A non-joined record with time T2 is seen in the right-topic (maxRightStreamTime: 2)
+                    //     The record is not processed yet, and is added to the outer-join store
+                    //  3. A joined record with time T11 is seen in the left-topic (maxLeftStreamTime: 11)
+                    //     It is time to look at the expired records. T10 and T2 should be emitted, but
+                    //     because T2 was late, then it is not fetched by the window store, so it is not processed
+                    //
+                    // See KStreamKStreamLeftJoinTest.testLowerWindowBound() tests
+                    //
+                    // the condition below allows us to process the late record without the need

Review comment:
       `late` -> `out-of-order` -- if it's _late_ it would be _after_ the grace period and would be dropped.
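   To make the terminology concrete, a hypothetical classifier, assuming `streamTime` is the maximum timestamp observed before the current record and that "late" uses the same bound as the `recordWindowHasClosed` predicate in the diff. Out-of-order records are behind stream time but still inside window plus grace, so they are processed; only late ones fall past the grace period.

```java
public final class RecordTimelinessSketch {
    public enum Timeliness { IN_ORDER, OUT_OF_ORDER, LATE }

    // Hypothetical classifier; not part of Kafka Streams.
    public static Timeliness classify(final long recordTs, final long streamTime,
                                      final long joinAfterMs, final long joinGraceMs) {
        if (recordTs >= streamTime) {
            return Timeliness.IN_ORDER;          // advances (or ties) stream time
        }
        if (recordTs + joinAfterMs + joinGraceMs < streamTime) {
            return Timeliness.LATE;              // window + grace already closed
        }
        return Timeliness.OUT_OF_ORDER;          // behind stream time, still within grace
    }

    public static void main(final String[] args) {
        System.out.println(classify(100, 100, 5, 2)); // IN_ORDER
        System.out.println(classify(95, 100, 5, 2));  // OUT_OF_ORDER (95 + 5 + 2 = 102 >= 100)
        System.out.println(classify(92, 100, 5, 2));  // LATE (92 + 5 + 2 = 99 < 100)
    }
}
```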

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -93,23 +136,118 @@ public void process(final K key, final V1 value) {
             }
 
             boolean needOuterJoin = outer;
+            boolean joinFound = false;
 
             final long inputRecordTimestamp = context().timestamp();
             final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs);
             final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs);
 
+            maxObservedStreamTime.advance(inputRecordTimestamp);
+
             try (final WindowStoreIterator<V2> iter = otherWindow.fetch(key, timeFrom, timeTo)) {
                 while (iter.hasNext()) {
                     needOuterJoin = false;
+                    joinFound = true;
                     final KeyValue<Long, V2> otherRecord = iter.next();
+                    final long otherRecordTimestamp = otherRecord.key;
+
+                    // Emit expired records before the joined record to keep time ordering
+                    emitExpiredNonJoinedOuterRecordsExcept(key, otherRecordTimestamp);
+
                     context().forward(
                         key,
                         joiner.apply(key, value, otherRecord.value),
-                        To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecord.key)));
+                        To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecordTimestamp)));
+                }
+
+                // Emit all expired records before adding a new non-joined record to the store. Otherwise,
+                // the put() call will advance the stream time, which causes records out of the retention
+                // period to be deleted, thus not being emitted later.
+                if (!joinFound && inputRecordTimestamp == maxObservedStreamTime.get()) {
+                    emitExpiredNonJoinedOuterRecords();
                 }
 
                 if (needOuterJoin) {
-                    context().forward(key, joiner.apply(key, value, null));
+                    // The maxStreamTime contains the max time observed in both sides of the join.
+                    // Having access to the time observed in the other join side fixes the following
+                    // problem:
+                    //
+                    // Say we have a window size of 5 seconds
+                    //  1. A non-joined record wth time T10 is seen in the left-topic (maxLeftStreamTime: 10)
+                    //     The record is not processed yet, and is added to the outer-join store
+                    //  2. A non-joined record with time T2 is seen in the right-topic (maxRightStreamTime: 2)
+                    //     The record is not processed yet, and is added to the outer-join store
+                    //  3. A joined record with time T11 is seen in the left-topic (maxLeftStreamTime: 11)
+                    //     It is time to look at the expired records. T10 and T2 should be emitted, but
+                    //     because T2 was late, then it is not fetched by the window store, so it is not processed
+                    //
+                    // See KStreamKStreamLeftJoinTest.testLowerWindowBound() tests
+                    //
+                    // the condition below allows us to process the late record without the need
+                    // to hold it in the temporary outer store
+                    if (!outerJoinWindowStore.isPresent() || timeTo < maxObservedStreamTime.get()) {
+                        context().forward(key, joiner.apply(key, value, null));
+                    } else {
+                        outerJoinWindowStore.ifPresent(store -> store.put(
+                            KeyAndJoinSide.make(isLeftJoin, key),
+                            LeftOrRightValue.make(isLeftJoin, value),
+                            inputRecordTimestamp));
+                    }
+                }
+            }
+        }
+
+        private void emitExpiredNonJoinedOuterRecords() {
+            outerJoinWindowStore.ifPresent(store ->
+                emitExpiredNonJoinedOuterRecords(store, recordWindowHasClosed));
+        }
+
+        private void emitExpiredNonJoinedOuterRecordsExcept(final K key, final long timestamp) {
+            outerJoinWindowStore.ifPresent(store -> {
+                final KeyAndJoinSide<K> keyAndJoinSide = KeyAndJoinSide.make(!isLeftJoin, key);
+
+                // Emit all expired records except the just found non-joined key. We need
+                // to emit all expired records before calling put(), otherwise the internal
+                // stream time will advance and may cause records out of the retention period to
+                // be deleted.
+                emitExpiredNonJoinedOuterRecords(store,
+                    recordWindowHasClosed
+                        .and(k -> !k.key().equals(keyAndJoinSide))
+                        .and(k -> k.window().start() != timestamp));
+
+                if (store.fetch(keyAndJoinSide, timestamp) != null) {
+                    // Delete the record. The previous emit call may not have removed this record
+                    // if the record window has not closed.
+                    store.put(keyAndJoinSide, null, timestamp);
+                }
+            });
+        }
+
+        @SuppressWarnings("unchecked")
+        private void emitExpiredNonJoinedOuterRecords(final WindowStore<KeyAndJoinSide<K>, LeftOrRightValue> store, final Predicate<Windowed<KeyAndJoinSide<K>>> emitCondition) {
+            try (final KeyValueIterator<Windowed<KeyAndJoinSide<K>>, LeftOrRightValue> it = store.all()) {
+                while (it.hasNext()) {
+                    final KeyValue<Windowed<KeyAndJoinSide<K>>, LeftOrRightValue> e = it.next();

Review comment:
       `e` is not a good variable name
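A minimal sketch of the same emit loop with a descriptive name for the iterated entry (`expiredRecord` instead of `e`). To stay self-contained it replaces the `WindowStore`/`KeyValueIterator` pair with a `TreeMap` keyed by timestamp, which is an assumption (unlike the real store, it keeps at most one record per timestamp); `ExpiredRecordEmitter` and `emitExpired` are hypothetical names. As in the diff, the scan relies on time ordering and stops at the first record whose emit condition is false.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Predicate;

// Hypothetical stand-in for the time-ordered outer-join store (timestamp -> key).
// Unlike the real store, a TreeMap keeps at most one record per timestamp.
class ExpiredRecordEmitter {
    private final TreeMap<Long, String> timeOrderedStore = new TreeMap<>();

    void put(final long timestamp, final String key) {
        timeOrderedStore.put(timestamp, key);
    }

    int size() {
        return timeOrderedStore.size();
    }

    // Emits and removes records from oldest to newest until the condition fails.
    List<String> emitExpired(final Predicate<Map.Entry<Long, String>> emitCondition) {
        final List<String> emitted = new ArrayList<>();
        final Iterator<Map.Entry<Long, String>> it = timeOrderedStore.entrySet().iterator();
        while (it.hasNext()) {
            final Map.Entry<Long, String> expiredRecord = it.next();
            // Records are sorted by time, so the first one that fails ends the scan.
            if (!emitCondition.test(expiredRecord)) {
                break;
            }
            emitted.add(expiredRecord.getValue() + "@" + expiredRecord.getKey());
            it.remove();
        }
        return emitted;
    }
}
```

One side effect of the `break`: a condition composed with `.and(...)` to exclude a single key (as `emitExpiredNonJoinedOuterRecordsExcept` does in this revision) also stops the scan at that key, so expired records stored after it are not emitted in that call.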

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -93,23 +136,118 @@ public void process(final K key, final V1 value) {
             }
 
             boolean needOuterJoin = outer;
+            boolean joinFound = false;
 
             final long inputRecordTimestamp = context().timestamp();
             final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs);
             final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs);
 
+            maxObservedStreamTime.advance(inputRecordTimestamp);
+
             try (final WindowStoreIterator<V2> iter = otherWindow.fetch(key, timeFrom, timeTo)) {
                 while (iter.hasNext()) {
                     needOuterJoin = false;
+                    joinFound = true;
                     final KeyValue<Long, V2> otherRecord = iter.next();
+                    final long otherRecordTimestamp = otherRecord.key;
+
+                    // Emit expired records before the joined record to keep time ordering
+                    emitExpiredNonJoinedOuterRecordsExcept(key, otherRecordTimestamp);
+
                     context().forward(
                         key,
                         joiner.apply(key, value, otherRecord.value),
-                        To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecord.key)));
+                        To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecordTimestamp)));
+                }
+
+                // Emit all expired records before adding a new non-joined record to the store. Otherwise,
+                // the put() call will advance the stream time, which causes records out of the retention
+                // period to be deleted, thus not being emitted later.
+                if (!joinFound && inputRecordTimestamp == maxObservedStreamTime.get()) {
+                    emitExpiredNonJoinedOuterRecords();
                 }
 
                 if (needOuterJoin) {
-                    context().forward(key, joiner.apply(key, value, null));
+                    // The maxStreamTime contains the max time observed in both sides of the join.
+                    // Having access to the time observed in the other join side fixes the following
+                    // problem:
+                    //
+                    // Say we have a window size of 5 seconds
+                    //  1. A non-joined record with time T10 is seen in the left-topic (maxLeftStreamTime: 10)
+                    //     The record is not processed yet, and is added to the outer-join store
+                    //  2. A non-joined record with time T2 is seen in the right-topic (maxRightStreamTime: 2)
+                    //     The record is not processed yet, and is added to the outer-join store
+                    //  3. A joined record with time T11 is seen in the left-topic (maxLeftStreamTime: 11)
+                    //     It is time to look at the expired records. T10 and T2 should be emitted, but
+                    //     because T2 was late, it is not fetched by the window store, so it is not processed
+                    //
+                    // See KStreamKStreamLeftJoinTest.testLowerWindowBound() tests
+                    //
+                    // The condition below allows us to process the late record without the need
+                    // to hold it in the temporary outer store
+                    if (!outerJoinWindowStore.isPresent() || timeTo < maxObservedStreamTime.get()) {
+                        context().forward(key, joiner.apply(key, value, null));
+                    } else {
+                        outerJoinWindowStore.ifPresent(store -> store.put(
+                            KeyAndJoinSide.make(isLeftJoin, key),
+                            LeftOrRightValue.make(isLeftJoin, value),
+                            inputRecordTimestamp));
+                    }
+                }
+            }
+        }
+
+        private void emitExpiredNonJoinedOuterRecords() {
+            outerJoinWindowStore.ifPresent(store ->
+                emitExpiredNonJoinedOuterRecords(store, recordWindowHasClosed));
+        }
+
+        private void emitExpiredNonJoinedOuterRecordsExcept(final K key, final long timestamp) {
+            outerJoinWindowStore.ifPresent(store -> {
+                final KeyAndJoinSide<K> keyAndJoinSide = KeyAndJoinSide.make(!isLeftJoin, key);
+
+                // Emit all expired records except the just found non-joined key. We need
+                // to emit all expired records before calling put(), otherwise the internal
+                // stream time will advance and may cause records out of the retention period to
+                // be deleted.
+                emitExpiredNonJoinedOuterRecords(store,
+                    recordWindowHasClosed
+                        .and(k -> !k.key().equals(keyAndJoinSide))
+                        .and(k -> k.window().start() != timestamp));
+
+                if (store.fetch(keyAndJoinSide, timestamp) != null) {
+                    // Delete the record. The previous emit call may not have removed this record
+                    // if the record window has not closed.
+                    store.put(keyAndJoinSide, null, timestamp);
+                }
+            });
+        }
+
+        @SuppressWarnings("unchecked")
+        private void emitExpiredNonJoinedOuterRecords(final WindowStore<KeyAndJoinSide<K>, LeftOrRightValue> store, final Predicate<Windowed<KeyAndJoinSide<K>>> emitCondition) {

Review comment:
       nit: line too long
   
   should be
   ```
   private void emitExpiredNonJoinedOuterRecords(final WindowStore<KeyAndJoinSide<K>, LeftOrRightValue> store,
                                                 final Predicate<Windowed<KeyAndJoinSide<K>>> emitCondition) {
   ```

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -93,23 +136,118 @@ public void process(final K key, final V1 value) {
             }
 
             boolean needOuterJoin = outer;
+            boolean joinFound = false;
 
             final long inputRecordTimestamp = context().timestamp();
             final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs);
             final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs);
 
+            maxObservedStreamTime.advance(inputRecordTimestamp);
+
             try (final WindowStoreIterator<V2> iter = otherWindow.fetch(key, timeFrom, timeTo)) {
                 while (iter.hasNext()) {
                     needOuterJoin = false;
+                    joinFound = true;
                     final KeyValue<Long, V2> otherRecord = iter.next();
+                    final long otherRecordTimestamp = otherRecord.key;

Review comment:
       Side improvement: at the moment, `windowStore` does not guarantee a strict retention time, i.e., even after the retention time has passed, it may still hold expired data and return it. Thus, we should add a check for `otherRecordTimestamp < stream-time - windowSize - gracePeriod` and drop the "other record" in that case to get a strict time bound (we don't need to report this in any metric).
   
   We should extend our tests accordingly.
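The suggested bound can be sketched as a small predicate applied to each record returned by `otherWindow.fetch(...)`. `StrictRetention`, `isExpired`, and `joinable` are hypothetical names; `streamTime` stands for whichever stream-time source the processor ends up using, and the formula is taken verbatim from the comment above.

```java
import java.util.ArrayList;
import java.util.List;

final class StrictRetention {
    private StrictRetention() { }

    // True if the other-side record lies outside the strict bound
    // stream-time - windowSize - gracePeriod, even though the store returned it.
    static boolean isExpired(final long otherRecordTimestamp,
                             final long streamTime,
                             final long windowSizeMs,
                             final long gracePeriodMs) {
        return otherRecordTimestamp < streamTime - windowSizeMs - gracePeriodMs;
    }

    // Keeps only the fetched timestamps that are still within the strict bound.
    static List<Long> joinable(final List<Long> fetchedTimestamps,
                               final long streamTime,
                               final long windowSizeMs,
                               final long gracePeriodMs) {
        final List<Long> result = new ArrayList<>();
        for (final long ts : fetchedTimestamps) {
            if (!isExpired(ts, streamTime, windowSizeMs, gracePeriodMs)) {
                result.add(ts);
            }
        }
        return result;
    }
}
```

A record exactly on the bound (here, `stream-time - windowSize - gracePeriod`) is still joined, because the comparison is strict.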

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValue.java
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.util.Objects;
+
+/**
+ * This class is used in combination with {@link KeyAndJoinSide}. The {@link KeyAndJoinSide} class
+ * combines a key with a boolean value that specifies whether the key is found on the left side of a
+ * join or on the right side. This {@link LeftOrRightValue} object contains either the V1 value,
+ * which is found in the left topic, or the V2 value if it is found in the right topic.
+ */
+public class LeftOrRightValue<V1, V2> {
+    private final V1 leftValue;
+    private final V2 rightValue;
+
+    private LeftOrRightValue(final V1 leftValue, final V2 rightValue) {
+        this.leftValue = leftValue;
+        this.rightValue = rightValue;
+    }
+
+    /**
+     * Create a new {@link LeftOrRightValue} instance with the V1 value as {@code leftValue} and
+     * V2 value as null.
+     *
+     * @param leftValue the left V1 value
+     * @param <V1>      the type of the value
+     * @return a new {@link LeftOrRightValue} instance
+     */
+    public static <V1, V2> LeftOrRightValue<V1, V2> makeLeftValue(final V1 leftValue) {
+        Objects.requireNonNull(leftValue, "leftValue is null");

Review comment:
       nit: `"leftValue cannot be null"` (similar below)
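For illustration, a trimmed-down sketch of the class with the suggested messages applied. This is not the PR's code: `makeRightValue` is assumed to be the counterpart the "similar below" refers to, and the getters are assumptions added only so the behavior can be checked.

```java
import java.util.Objects;

// Trimmed-down sketch, not the PR's class: holds exactly one of a left (V1)
// or right (V2) value and rejects nulls with the suggested messages.
final class LeftOrRightValue<V1, V2> {
    private final V1 leftValue;
    private final V2 rightValue;

    private LeftOrRightValue(final V1 leftValue, final V2 rightValue) {
        this.leftValue = leftValue;
        this.rightValue = rightValue;
    }

    static <V1, V2> LeftOrRightValue<V1, V2> makeLeftValue(final V1 leftValue) {
        Objects.requireNonNull(leftValue, "leftValue cannot be null");
        return new LeftOrRightValue<>(leftValue, null);
    }

    // Assumed counterpart for the right side ("similar below").
    static <V1, V2> LeftOrRightValue<V1, V2> makeRightValue(final V2 rightValue) {
        Objects.requireNonNull(rightValue, "rightValue cannot be null");
        return new LeftOrRightValue<>(null, rightValue);
    }

    V1 getLeftValue() {
        return leftValue;
    }

    V2 getRightValue() {
        return rightValue;
    }
}
```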

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -93,23 +136,118 @@ public void process(final K key, final V1 value) {
             }
 
             boolean needOuterJoin = outer;
+            boolean joinFound = false;
 
             final long inputRecordTimestamp = context().timestamp();
             final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs);
             final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs);
 
+            maxObservedStreamTime.advance(inputRecordTimestamp);
+
             try (final WindowStoreIterator<V2> iter = otherWindow.fetch(key, timeFrom, timeTo)) {
                 while (iter.hasNext()) {
                     needOuterJoin = false;
+                    joinFound = true;
                     final KeyValue<Long, V2> otherRecord = iter.next();
+                    final long otherRecordTimestamp = otherRecord.key;
+
+                    // Emit expired records before the joined record to keep time ordering
+                    emitExpiredNonJoinedOuterRecordsExcept(key, otherRecordTimestamp);
+
                     context().forward(
                         key,
                         joiner.apply(key, value, otherRecord.value),
-                        To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecord.key)));
+                        To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecordTimestamp)));
+                }
+
+                // Emit all expired records before adding a new non-joined record to the store. Otherwise,
+                // the put() call will advance the stream time, which causes records out of the retention
+                // period to be deleted, thus not being emitted later.
+                if (!joinFound && inputRecordTimestamp == maxObservedStreamTime.get()) {
+                    emitExpiredNonJoinedOuterRecords();
                 }
 
                 if (needOuterJoin) {
-                    context().forward(key, joiner.apply(key, value, null));
+                    // The maxStreamTime contains the max time observed in both sides of the join.
+                    // Having access to the time observed in the other join side fixes the following
+                    // problem:
+                    //
+                    // Say we have a window size of 5 seconds
+                    //  1. A non-joined record with time T10 is seen in the left-topic (maxLeftStreamTime: 10)
+                    //     The record is not processed yet, and is added to the outer-join store
+                    //  2. A non-joined record with time T2 is seen in the right-topic (maxRightStreamTime: 2)
+                    //     The record is not processed yet, and is added to the outer-join store
+                    //  3. A joined record with time T11 is seen in the left-topic (maxLeftStreamTime: 11)
+                    //     It is time to look at the expired records. T10 and T2 should be emitted, but
+                    //     because T2 was late, it is not fetched by the window store, so it is not processed
+                    //
+                    // See KStreamKStreamLeftJoinTest.testLowerWindowBound() tests
+                    //
+                    // The condition below allows us to process the late record without the need
+                    // to hold it in the temporary outer store
+                    if (!outerJoinWindowStore.isPresent() || timeTo < maxObservedStreamTime.get()) {
+                        context().forward(key, joiner.apply(key, value, null));
+                    } else {
+                        outerJoinWindowStore.ifPresent(store -> store.put(
+                            KeyAndJoinSide.make(isLeftJoin, key),
+                            LeftOrRightValue.make(isLeftJoin, value),
+                            inputRecordTimestamp));
+                    }
+                }
+            }
+        }
+
+        private void emitExpiredNonJoinedOuterRecords() {
+            outerJoinWindowStore.ifPresent(store ->
+                emitExpiredNonJoinedOuterRecords(store, recordWindowHasClosed));
+        }
+
+        private void emitExpiredNonJoinedOuterRecordsExcept(final K key, final long timestamp) {
+            outerJoinWindowStore.ifPresent(store -> {
+                final KeyAndJoinSide<K> keyAndJoinSide = KeyAndJoinSide.make(!isLeftJoin, key);
+
+                // Emit all expired records except the just found non-joined key. We need
+                // to emit all expired records before calling put(), otherwise the internal
+                // stream time will advance and may cause records out of the retention period to
+                // be deleted.
+                emitExpiredNonJoinedOuterRecords(store,
+                    recordWindowHasClosed
+                        .and(k -> !k.key().equals(keyAndJoinSide))
+                        .and(k -> k.window().start() != timestamp));
+
+                if (store.fetch(keyAndJoinSide, timestamp) != null) {
+                    // Delete the record. The previous emit call may not have removed this record
+                    // if the record window has not closed.
+                    store.put(keyAndJoinSide, null, timestamp);
+                }
+            });
+        }
+
+        @SuppressWarnings("unchecked")
+        private void emitExpiredNonJoinedOuterRecords(final WindowStore<KeyAndJoinSide<K>, LeftOrRightValue> store, final Predicate<Windowed<KeyAndJoinSide<K>>> emitCondition) {
+            try (final KeyValueIterator<Windowed<KeyAndJoinSide<K>>, LeftOrRightValue> it = store.all()) {
+                while (it.hasNext()) {
+                    final KeyValue<Windowed<KeyAndJoinSide<K>>, LeftOrRightValue> e = it.next();
+
+                    // Skip next records if the emit condition is false
+                    if (!emitCondition.test(e.key)) {
+                        break;
+                    }
+
+                    final K key = e.key.key().getKey();
+                    final To timestamp = To.all().withTimestamp(e.key.window().start());
+
+                    final R nullJoinedValue;
+                    if (isLeftJoin) {

Review comment:
       Do we need this (as we pass in a `ReversJoiner` in the "other" join processor anyway)?
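To make the question concrete, here is a minimal sketch of a reversing adapter, assuming the "other" processor is handed a joiner whose two value parameters are swapped relative to the user's joiner. `Joiner` is a hypothetical stand-in for `ValueJoinerWithKey`, and `ReverseJoinerSketch.reverse` is illustrative, not the Kafka helper. With such an adapter, each processor can pass its own value first and `null` for the missing side, and the user's joiner still sees the `null` in the correct position. That suggests the explicit `isLeftJoin` branch may be redundant, at least for records originating on the processor's own side.

```java
// Hypothetical stand-in for ValueJoinerWithKey: (key, value1, value2) -> result.
@FunctionalInterface
interface Joiner<K, V1, V2, R> {
    R apply(K key, V1 value1, V2 value2);
}

final class ReverseJoinerSketch {
    private ReverseJoinerSketch() { }

    // Swaps the two value arguments so the "other" processor can always pass
    // its own value first and still match the user's (left, right) order.
    static <K, V1, V2, R> Joiner<K, V2, V1, R> reverse(final Joiner<K, V1, V2, R> joiner) {
        return (key, value2, value1) -> joiner.apply(key, value1, value2);
    }
}
```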

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##########
@@ -118,20 +132,40 @@
         final ProcessorGraphNode<K1, V2> otherWindowedStreamsNode = new ProcessorGraphNode<>(otherWindowStreamProcessorName, otherWindowStreamProcessorParams);
         builder.addGraphNode(otherGraphNode, otherWindowedStreamsNode);
 
+        Optional<StoreBuilder<WindowStore<KeyAndJoinSide<K1>, ValueOrOtherValue<V1, V2>>>> outerJoinWindowStore = Optional.empty();
+        if (leftOuter || rightOuter) {
+            final String outerJoinSuffix = "-shared-outer-join-store";
+            final String outerJoinStoreGeneratedName = builder.newProcessorName(KStreamImpl.OUTERSHARED_NAME);
+            final String outerJoinStoreName = userProvidedBaseStoreName == null ? outerJoinStoreGeneratedName : userProvidedBaseStoreName + outerJoinSuffix;
+
+            outerJoinWindowStore = Optional.of(outerJoinWindowStoreBuilder(outerJoinStoreName, windows, streamJoinedInternal));
+        }
+
+        // Time shared between joins to keep track of the maximum stream time

Review comment:
       One disadvantage compared to using `context.streamTime()` would be that `MaxObservedStreamTime` would be reset to zero on rebalance/restart. (Or we need to add additional code to preserve it...)
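A minimal sketch of that trade-off: a shared in-memory tracker gives both join sides the same maximum, but a new instance created after a restart or rebalance starts from zero unless its value is restored from somewhere durable. The class body is an assumption; only the `advance()`/`get()` calls mirror the diff.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical shared tracker; both join processors hold the same instance.
final class MaxObservedStreamTime {
    // Lives only in memory, so a fresh instance (e.g. after a restart) starts at 0.
    private final AtomicLong maxTime = new AtomicLong(0L);

    // Moves the observed time forward, never backward.
    void advance(final long timestamp) {
        maxTime.accumulateAndGet(timestamp, Math::max);
    }

    long get() {
        return maxTime.get();
    }
}
```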

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/KeyAndJoinSide.java
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.KeyValue;
+
+import java.util.Objects;
+
+/**
+ * Combines a key from a {@link KeyValue} with a boolean value referencing if the key is
+ * part of the left join (true) or right join (false). This class is only useful when a state
+ * store needs to be shared between left and right processors, and each processor needs to
+ * access the key of the other processor.
+ */
+public class KeyAndJoinSide<K> {
+    private final K key;
+    private final boolean leftJoin;
+
+    private KeyAndJoinSide(final boolean leftJoin, final K key) {
+        this.key = Objects.requireNonNull(key, "key is null");

Review comment:
       nit: `"key cannot be null"`

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##########
@@ -211,6 +246,66 @@ private void assertUniqueStoreNames(final WindowBytesStoreSupplier supplier,
         return builder;
     }
 
+    @SuppressWarnings("unchecked")
+    private static <K, V1, V2> StoreBuilder<WindowStore<KeyAndJoinSide<K>, ValueOrOtherValue<V1, V2>>> outerJoinWindowStoreBuilder(final String storeName,
+                                                                                                                                   final JoinWindows windows,
+                                                                                                                                   final StreamJoinedInternal<K, V1, V2> streamJoinedInternal) {
+        final StoreBuilder<WindowStore<KeyAndJoinSide<K>, ValueOrOtherValue<V1, V2>>> builder = new TimeOrderedWindowStoreBuilder<KeyAndJoinSide<K>, ValueOrOtherValue<V1, V2>>(
+            persistentTimeOrderedWindowStore(
+                storeName + "-store",
+                Duration.ofMillis(windows.size() + windows.gracePeriodMs()),
+                Duration.ofMillis(windows.size())
+            ),
+            new KeyAndJoinSideSerde<>(streamJoinedInternal.keySerde()),
+            new ValueOrOtherValueSerde(streamJoinedInternal.valueSerde(), streamJoinedInternal.otherValueSerde()),
+            Time.SYSTEM
+        );
+        if (streamJoinedInternal.loggingEnabled()) {
+            builder.withLoggingEnabled(streamJoinedInternal.logConfig());
+        } else {
+            builder.withLoggingDisabled();
+        }
+
+        return builder;
+    }
+
+    // This method has the same code as Stores.persistentWindowStore(). But TimeOrderedWindowStore is
+    // a non-public API, so we need to keep duplicate code until it becomes public.
+    private static WindowBytesStoreSupplier persistentTimeOrderedWindowStore(final String storeName,
+                                                                             final Duration retentionPeriod,
+                                                                             final Duration windowSize) {
+        Objects.requireNonNull(storeName, "name cannot be null");
+        final String rpMsgPrefix = prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
+        final long retentionMs = validateMillisecondDuration(retentionPeriod, rpMsgPrefix);
+        final String wsMsgPrefix = prepareMillisCheckFailMsgPrefix(windowSize, "windowSize");
+        final long windowSizeMs = validateMillisecondDuration(windowSize, wsMsgPrefix);
+
+        final long segmentInterval = Math.max(retentionMs / 2, 60_000L);
+
+        if (retentionMs < 0L) {
+            throw new IllegalArgumentException("retentionPeriod cannot be negative");
+        }
+        if (windowSizeMs < 0L) {
+            throw new IllegalArgumentException("windowSize cannot be negative");
+        }
+        if (segmentInterval < 1L) {
+            throw new IllegalArgumentException("segmentInterval cannot be zero or negative");
+        }
+        if (windowSizeMs > retentionMs) {
+            throw new IllegalArgumentException("The retention period of the window store "
+                + storeName + " must be no smaller than its window size. Got size=["
+                + windowSizeMs + "], retention=[" + retentionMs + "]");
+        }
+
+        return new RocksDbWindowBytesStoreSupplier(
+            storeName,
+            retentionMs,
+            segmentInterval,
+            windowSizeMs,
+            false,

Review comment:
       For existing windowed stores with duplicates enabled, we never call `delete()` but rely only on retention-based deletion; thus, it's entirely possible that it was never supported.
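One way to picture why `delete()` may never have worked: a toy model (not the RocksDB code) assuming that, with duplicates enabled, every put is stored under the key extended by a fresh sequence number. `DuplicateRetainingStore` is hypothetical. Under that assumption, a later `put(key, null, timestamp)` addresses a new composite key and leaves the earlier entry untouched, which fits the observation that only retention-based deletion is relied on.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model, not the RocksDB implementation: with duplicates retained, each put is
// stored under (key, timestamp, seqnum) with a fresh seqnum.
final class DuplicateRetainingStore {
    private final Map<String, String> entries = new TreeMap<>();
    private int seqnum = 0;

    void put(final String key, final String value, final long timestamp) {
        seqnum++;
        final String storeKey = key + "@" + timestamp + "#" + seqnum;
        if (value == null) {
            // The tombstone addresses a brand-new storeKey, so nothing existing is removed.
            entries.remove(storeKey);
        } else {
            entries.put(storeKey, value);
        }
    }

    List<String> fetch(final String key, final long timestamp) {
        final String prefix = key + "@" + timestamp + "#";
        final List<String> values = new ArrayList<>();
        for (final Map.Entry<String, String> entry : entries.entrySet()) {
            if (entry.getKey().startsWith(prefix)) {
                values.add(entry.getValue());
            }
        }
        return values;
    }
}
```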




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] spena commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

Posted by GitBox <gi...@apache.org>.
spena commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r614312224



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##########
@@ -211,6 +263,66 @@ private void assertUniqueStoreNames(final WindowBytesStoreSupplier supplier,
         return builder;
     }
 
+    @SuppressWarnings("unchecked")
+    private static <K, V1, V2> StoreBuilder<WindowStore<KeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>> outerJoinWindowStoreBuilder(final String storeName,
+                                                                                                                                  final JoinWindows windows,
+                                                                                                                                  final StreamJoinedInternal<K, V1, V2> streamJoinedInternal) {
+        final StoreBuilder<WindowStore<KeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>> builder = new TimeOrderedWindowStoreBuilder<KeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>(
+            persistentTimeOrderedWindowStore(
+                storeName + "-store",
+                Duration.ofMillis(windows.size() + windows.gracePeriodMs()),
+                Duration.ofMillis(windows.size())
+            ),
+            new KeyAndJoinSideSerde<>(streamJoinedInternal.keySerde()),
+            new LeftOrRightValueSerde(streamJoinedInternal.valueSerde(), streamJoinedInternal.otherValueSerde()),
+            Time.SYSTEM

Review comment:
       It would require more changes just to allow that. The `KStreamImplJoin` constructor, which we could overload to pass a `Time` mock object, is only used by the `KStreamImplJoinImpl` class. The tests use `StreamsBuilder` to create the joins, which does not accept a `Time` object.
   
   Also, the `Stores` class, which is called by `KStreamImplJoin`, does not mock it either. Maybe because of the same code changes that would be required just for that?







[GitHub] [kafka] mjsax commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r609954693



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -133,34 +141,37 @@ public void process(final K key, final V1 value) {
                 return;
             }
 
-            // maxObservedStreamTime is updated and shared between left and right sides, so we can
-            // process a non-join record immediately if it is late
-            final long maxStreamTime = maxObservedStreamTime.updateAndGet(time -> Math.max(time, context().timestamp()));
-
             boolean needOuterJoin = outer;
+            boolean joinFound = false;
 
             final long inputRecordTimestamp = context().timestamp();
             final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs);
             final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs);
 
+            // maxObservedStreamTime is updated and shared between left and right join sides
+            maxObservedStreamTime.updateAndGet(t -> Math.max(t, inputRecordTimestamp));
+
             try (final WindowStoreIterator<V2> iter = otherWindow.fetch(key, timeFrom, timeTo)) {
                 while (iter.hasNext()) {
                     needOuterJoin = false;
+                    joinFound = true;
                     final KeyValue<Long, V2> otherRecord = iter.next();
                     final long otherRecordTimestamp = otherRecord.key;
+
+                    // Emit expired records before the joined record to keep time ordering

Review comment:
       I guess that's possible, but _if_ the join result is large, we could run into memory issues buffering all join results?
   
   Also, sorting could be expensive, and we can actually avoid it while still guaranteeing that results are emitted in timestamp order:
   - we know that left/outer join results would have the smallest timestamps, and thus we can emit those first (given that we use a timestamp-sorted store anyway, we just scan the store from old to new and emit them)
   - for the inner join result, we get the output sorted by timestamp, too, because for the join key, data is sorted in timestamp order in the store, too
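The ordering argument can be sketched as follows, under the comment's assumption that expired left/outer results are older than any inner result produced for the current record. `OrderedEmitSketch` and its parameters are hypothetical: a `SortedMap` stands in for the timestamp-sorted outer store, and the list of other-side timestamps stands in for the per-key fetch, which returns records in timestamp order. No buffering or sorting is needed; inner results keep their order because `Math.max(inputRecordTimestamp, otherTs)` is non-decreasing in `otherTs`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;

final class OrderedEmitSketch {
    private OrderedEmitSketch() { }

    static List<String> emitOrder(final SortedMap<Long, String> expiredOuter,
                                  final List<Long> otherTimestampsForKey,
                                  final long inputRecordTimestamp) {
        final List<String> emitted = new ArrayList<>();
        // 1. Expired left/outer results: scan the time-ordered store from old to new.
        for (final Map.Entry<Long, String> record : expiredOuter.entrySet()) {
            emitted.add("outer:" + record.getValue() + "@" + record.getKey());
        }
        // 2. Inner join results: the fetch is already ordered by timestamp, and
        //    max(input, other) keeps that order, so no sort is required.
        for (final long otherTs : otherTimestampsForKey) {
            emitted.add("inner@" + Math.max(inputRecordTimestamp, otherTs));
        }
        return emitted;
    }
}
```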







[GitHub] [kafka] mjsax commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r614538200



##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
##########
@@ -486,6 +486,327 @@ public void testJoin() {
         }
     }
 
+    @Test
+    public void testImprovedLeftJoin() {

Review comment:
       Should this test not be added to `KStreamKStreamLeftJoinTest.java` ?







[GitHub] [kafka] mjsax commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r609948744



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -38,20 +48,36 @@
     private final String otherWindowName;
     private final long joinBeforeMs;
     private final long joinAfterMs;
+    private final long joinGraceMs;
 
     private final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends R> joiner;
     private final boolean outer;
+    private final Optional<String> outerJoinWindowName;
+    private final boolean thisJoin;
+
+    // Observed time is AtomicLong because this time is shared between the left and right side processor nodes. However,
+    // this time is not updated in parallel, so we can call get() several times without worrying about getting different
+    // times.
+    private final AtomicLong maxObservedStreamTime;

Review comment:
       Do we need to maintain it manually? Could we use `context.streamTime()` instead?
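The hunk declares `maxObservedStreamTime` as an `AtomicLong`, while a later hunk in this thread calls `maxObservedStreamTime.advance(inputRecordTimestamp)`, which `AtomicLong` does not provide. A minimal sketch of a shared holder with such a method follows, assuming the left and right join processors are constructed with the same instance; `SharedStreamTime` is a hypothetical name, not a Kafka class.

```java
import java.util.concurrent.atomic.AtomicLong;

/**
 * Hypothetical holder for the maximum stream time observed by both the left
 * and the right join processors. Both processors receive the same instance,
 * so a record processed on either side advances the time seen by the other.
 */
final class SharedStreamTime {
    // Long.MIN_VALUE means "no record observed yet".
    private final AtomicLong maxObserved = new AtomicLong(Long.MIN_VALUE);

    /** Moves the tracked time forward to timestamp if it is newer; never moves it backwards. */
    long advance(final long timestamp) {
        return maxObserved.accumulateAndGet(timestamp, Math::max);
    }

    long get() {
        return maxObserved.get();
    }
}
```

Since the comment in the hunk notes that the time is never updated in parallel, the `AtomicLong` mainly serves as a mutable box shared by two objects; the atomic `accumulateAndGet` is simply a convenient way to keep the update monotonic.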




[GitHub] [kafka] spena commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

Posted by GitBox <gi...@apache.org>.
spena commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r614105673



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##########
@@ -118,20 +142,47 @@
         final ProcessorGraphNode<K1, V2> otherWindowedStreamsNode = new ProcessorGraphNode<>(otherWindowStreamProcessorName, otherWindowStreamProcessorParams);
         builder.addGraphNode(otherGraphNode, otherWindowedStreamsNode);
 
+        Optional<StoreBuilder<WindowStore<KeyAndJoinSide<K1>, LeftOrRightValue<V1, V2>>>> outerJoinWindowStore = Optional.empty();
+        if (leftOuter || rightOuter) {
+            final String outerJoinSuffix = "-shared-outer-join-store";
+
+            // Get the suffix index of the joinThisGeneratedName to build the outer join store name.
+            final String outerJoinStoreGeneratedName = KStreamImpl.OUTERSHARED_NAME
+                + joinThisGeneratedName.substring(
+                    rightOuter
+                        ? KStreamImpl.OUTERTHIS_NAME.length()
+                        : KStreamImpl.JOINTHIS_NAME.length());

Review comment:
       I initially generated a name with a new index for the shared store. However, it seemed this would cause incompatibilities in the topology, because the extra index increments the counter used for every name generated after it. Instead, I now take the index from one of the existing join stores. Why doesn't it make sense? Is there another way to get an index? Or do I really need to append an index to the end of the shared store name?
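The approach described above (reuse the numeric suffix of an already generated join name instead of drawing a new index) can be sketched as plain string handling. The three prefix values below are assumptions for illustration and should be checked against the real `KStreamImpl` constants; `SharedStoreNaming` is a hypothetical helper. The `rightOuter ? OUTERTHIS : JOINTHIS` choice mirrors the quoted hunk.

```java
/**
 * Hypothetical illustration: derive the shared outer-join store name from the
 * index of an existing generated name, so no new index is consumed.
 */
final class SharedStoreNaming {
    // Assumed prefix values; verify against KStreamImpl.
    static final String JOINTHIS_NAME = "KSTREAM-JOINTHIS-";
    static final String OUTERTHIS_NAME = "KSTREAM-OUTERTHIS-";
    static final String OUTERSHARED_NAME = "KSTREAM-OUTERSHARED-";

    static String sharedStoreName(final String joinThisGeneratedName, final boolean rightOuter) {
        // A full outer join names its "this" side with OUTERTHIS, a left join with JOINTHIS.
        final String prefix = rightOuter ? OUTERTHIS_NAME : JOINTHIS_NAME;
        if (!joinThisGeneratedName.startsWith(prefix)) {
            throw new IllegalArgumentException("Unexpected generated name: " + joinThisGeneratedName);
        }
        // Keep only the index part (e.g. "0000000004") and attach it to the shared prefix.
        return OUTERSHARED_NAME + joinThisGeneratedName.substring(prefix.length());
    }
}
```

Because the index is copied rather than generated, the names of all other processors and stores in the topology stay unchanged.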




[GitHub] [kafka] mjsax commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r614536735



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##########
@@ -211,6 +263,66 @@ private void assertUniqueStoreNames(final WindowBytesStoreSupplier supplier,
         return builder;
     }
 
+    @SuppressWarnings("unchecked")
+    private static <K, V1, V2> StoreBuilder<WindowStore<KeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>> outerJoinWindowStoreBuilder(final String storeName,
+                                                                                                                                  final JoinWindows windows,
+                                                                                                                                  final StreamJoinedInternal<K, V1, V2> streamJoinedInternal) {
+        final StoreBuilder<WindowStore<KeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>> builder = new TimeOrderedWindowStoreBuilder<KeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>(
+            persistentTimeOrderedWindowStore(
+                storeName + "-store",
+                Duration.ofMillis(windows.size() + windows.gracePeriodMs()),
+                Duration.ofMillis(windows.size())
+            ),
+            new KeyAndJoinSideSerde<>(streamJoinedInternal.keySerde()),
+            new LeftOrRightValueSerde(streamJoinedInternal.valueSerde(), streamJoinedInternal.otherValueSerde()),
+            Time.SYSTEM

Review comment:
       Ok. Maybe good enough as-is. (Fair point that we don't mock it in other stores either -- maybe there was never any demand to be able to mock it. As you said, we could change it as follow up if needed.)
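The builder in this hunk sizes the shared store from the join windows: retention is `windows.size() + windows.gracePeriodMs()` and the window size is `windows.size()`. The `persistentTimeOrderedWindowStore` helper in the same file then derives `segmentInterval = max(retention / 2, 60_000)` ms and rejects a window size larger than the retention. A small worked sketch of that arithmetic, with `OuterJoinStoreSizing` as a hypothetical name:

```java
/** Hypothetical worked example of the shared outer-join store sizing in this PR. */
final class OuterJoinStoreSizing {
    // Retention covers the whole join window plus the grace period.
    static long retentionMs(final long windowSizeMs, final long graceMs) {
        return windowSizeMs + graceMs;
    }

    // Segments are at least one minute long, otherwise half the retention.
    static long segmentIntervalMs(final long retentionMs) {
        return Math.max(retentionMs / 2, 60_000L);
    }

    // Same checks (minus the store name in the message) as the helper quoted later in this thread.
    static void validate(final long windowSizeMs, final long retentionMs) {
        if (retentionMs < 0L) {
            throw new IllegalArgumentException("retentionPeriod cannot be negative");
        }
        if (windowSizeMs < 0L) {
            throw new IllegalArgumentException("windowSize cannot be negative");
        }
        if (windowSizeMs > retentionMs) {
            throw new IllegalArgumentException("retention must be no smaller than window size");
        }
    }
}
```

For example, a 5 s window with no grace gives a retention of 5,000 ms and a 60,000 ms segment interval, while a 10 min window with 1 min grace gives 660,000 ms of retention and 330,000 ms segments.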




[GitHub] [kafka] spena commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

Posted by GitBox <gi...@apache.org>.
spena commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r616758960



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -60,20 +83,44 @@
     }
 
     private class KStreamKStreamJoinProcessor extends AbstractProcessor<K, V1> {
+        private final Predicate<Windowed<KeyAndJoinSide<K>>> recordWindowHasClosed =

Review comment:
       Done

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -60,20 +83,44 @@
     }
 
     private class KStreamKStreamJoinProcessor extends AbstractProcessor<K, V1> {
+        private final Predicate<Windowed<KeyAndJoinSide<K>>> recordWindowHasClosed =
+            windowedKey -> windowedKey.window().start() + joinAfterMs + joinGraceMs < maxObservedStreamTime.get();
 
         private WindowStore<K, V2> otherWindow;
         private StreamsMetricsImpl metrics;
         private Sensor droppedRecordsSensor;
+        private Optional<WindowStore<KeyAndJoinSide<K>, LeftOrRightValue>> outerJoinWindowStore = Optional.empty();
 
         @SuppressWarnings("unchecked")
         @Override
         public void init(final ProcessorContext context) {
             super.init(context);
             metrics = (StreamsMetricsImpl) context.metrics();
             droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
-            otherWindow = (WindowStore<K, V2>) context.getStateStore(otherWindowName);
+            otherWindow = context.getStateStore(otherWindowName);

Review comment:
       Done
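The `recordWindowHasClosed` predicate quoted in this hunk decides when a buffered record may be emitted: a record stored under window start `t` counts as closed once `t + joinAfterMs + joinGraceMs` is strictly less than the maximum observed stream time. A self-contained sketch of that check follows; `BufferedKey` is a hypothetical stand-in for `Windowed<KeyAndJoinSide<K>>`, and an `AtomicLong` plays the role of the shared stream time.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;

final class WindowClosedCheck {
    /** Hypothetical stand-in for a windowed buffer key: only the window start matters here. */
    static final class BufferedKey {
        final String key;
        final long windowStartMs;

        BufferedKey(final String key, final long windowStartMs) {
            this.key = key;
            this.windowStartMs = windowStartMs;
        }
    }

    /**
     * Same comparison as recordWindowHasClosed in the PR. The stream time is read
     * on every test() call, so the predicate follows the shared, advancing clock.
     */
    static Predicate<BufferedKey> recordWindowHasClosed(final long joinAfterMs,
                                                        final long joinGraceMs,
                                                        final AtomicLong streamTime) {
        return k -> k.windowStartMs + joinAfterMs + joinGraceMs < streamTime.get();
    }
}
```

Note the strict `<`: a record whose window ends exactly at the current stream time is still considered open.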




[GitHub] [kafka] spena commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

Posted by GitBox <gi...@apache.org>.
spena commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r614313362



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -93,23 +136,118 @@ public void process(final K key, final V1 value) {
             }
 
             boolean needOuterJoin = outer;
+            boolean joinFound = false;
 
             final long inputRecordTimestamp = context().timestamp();
             final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs);
             final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs);
 
+            maxObservedStreamTime.advance(inputRecordTimestamp);
+
             try (final WindowStoreIterator<V2> iter = otherWindow.fetch(key, timeFrom, timeTo)) {
                 while (iter.hasNext()) {
                     needOuterJoin = false;
+                    joinFound = true;
                     final KeyValue<Long, V2> otherRecord = iter.next();
+                    final long otherRecordTimestamp = otherRecord.key;
+
+                    // Emit expired records before the joined record to keep time ordering
+                    emitExpiredNonJoinedOuterRecordsExcept(key, otherRecordTimestamp);
+
                     context().forward(
                         key,
                         joiner.apply(key, value, otherRecord.value),
-                        To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecord.key)));
+                        To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecordTimestamp)));
+                }
+
+                // Emit all expired records before adding a new non-joined record to the store. Otherwise,
+                // the put() call will advance the stream time, which causes records out of the retention
+                // period to be deleted, thus not being emitted later.
+                if (!joinFound && inputRecordTimestamp == maxObservedStreamTime.get()) {
+                    emitExpiredNonJoinedOuterRecords();
                 }
 
                 if (needOuterJoin) {
-                    context().forward(key, joiner.apply(key, value, null));
+                    // The maxStreamTime contains the max time observed in both sides of the join.
+                    // Having access to the time observed in the other join side fixes the following
+                    // problem:
+                    //
+                    // Say we have a window size of 5 seconds
+                    //  1. A non-joined record with time T10 is seen in the left-topic (maxLeftStreamTime: 10)
+                    //     The record is not processed yet, and is added to the outer-join store
+                    //  2. A non-joined record with time T2 is seen in the right-topic (maxRightStreamTime: 2)
+                    //     The record is not processed yet, and is added to the outer-join store
+                    //  3. A joined record with time T11 is seen in the left-topic (maxLeftStreamTime: 11)
+                    //     It is time to look at the expired records. T10 and T2 should be emitted, but
+                    //     because T2 is late, it is not fetched by the window store, so it is not processed
+                    //
+                    // See KStreamKStreamLeftJoinTest.testLowerWindowBound() tests
+                    //
+                    // the condition below allows us to process the late record without the need
+                    // to hold it in the temporary outer store
+                    if (!outerJoinWindowStore.isPresent() || timeTo < maxObservedStreamTime.get()) {
+                        context().forward(key, joiner.apply(key, value, null));
+                    } else {
+                        outerJoinWindowStore.ifPresent(store -> store.put(
+                            KeyAndJoinSide.make(isLeftJoin, key),
+                            LeftOrRightValue.make(isLeftJoin, value),
+                            inputRecordTimestamp));
+                    }
+                }
+            }
+        }
+
+        private void emitExpiredNonJoinedOuterRecords() {
+            outerJoinWindowStore.ifPresent(store ->
+                emitExpiredNonJoinedOuterRecords(store, recordWindowHasClosed));
+        }
+
+        private void emitExpiredNonJoinedOuterRecordsExcept(final K key, final long timestamp) {
+            outerJoinWindowStore.ifPresent(store -> {
+                final KeyAndJoinSide<K> keyAndJoinSide = KeyAndJoinSide.make(!isLeftJoin, key);
+
+                // Emit all expired records except the just found non-joined key. We need
+                // to emit all expired records before calling put(), otherwise the internal
+                // stream time will advance and may cause records out of the retention period to
+                // be deleted.
+                emitExpiredNonJoinedOuterRecords(store,
+                    recordWindowHasClosed
+                        .and(k -> !k.key().equals(keyAndJoinSide))
+                        .and(k -> k.window().start() != timestamp));
+
+                if (store.fetch(keyAndJoinSide, timestamp) != null) {
+                    // Delete the record. The previous emit call may not have removed this record
+                    // if the record window has not closed.
+                    store.put(keyAndJoinSide, null, timestamp);
+                }
+            });
+        }
+
+        @SuppressWarnings("unchecked")
+        private void emitExpiredNonJoinedOuterRecords(final WindowStore<KeyAndJoinSide<K>, LeftOrRightValue> store, final Predicate<Windowed<KeyAndJoinSide<K>>> emitCondition) {

Review comment:
       Done
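The `process()` logic in this hunk can be modelled loosely without any Kafka classes. The sketch below assumes a left join only, no grace period, a `TreeMap` in place of the time-ordered outer-join store, and plain maps in place of the two join window stores; it also checks for expired entries on every input record, which simplifies the conditions used in the PR. `LeftJoinBufferModel` and its string output format are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Simplified model: non-joined left records wait until their window closes. */
final class LeftJoinBufferModel {
    private final long beforeMs;
    private final long afterMs;
    private long streamTime = Long.MIN_VALUE;
    // All records per side (stand-in for the join window stores): key -> timestamps.
    private final Map<String, List<Long>> leftSeen = new HashMap<>();
    private final Map<String, List<Long>> rightSeen = new HashMap<>();
    // Non-joined left records ordered by timestamp (stand-in for the shared outer-join store).
    private final TreeMap<Long, List<String>> pending = new TreeMap<>();
    final List<String> output = new ArrayList<>();

    LeftJoinBufferModel(final long beforeMs, final long afterMs) {
        this.beforeMs = beforeMs;
        this.afterMs = afterMs;
    }

    void processLeft(final String key, final long ts) {
        streamTime = Math.max(streamTime, ts);
        leftSeen.computeIfAbsent(key, k -> new ArrayList<>()).add(ts);
        final List<String> joins = new ArrayList<>();
        for (final long other : rightSeen.getOrDefault(key, Collections.emptyList())) {
            if (other >= ts - beforeMs && other <= ts + afterMs) {
                joins.add(key + "@" + Math.max(ts, other) + ":L+R");
            }
        }
        emitExpired();              // older non-joined records go out first
        output.addAll(joins);
        if (joins.isEmpty()) {
            if (ts + afterMs < streamTime) {
                // Late record whose window already closed: emit now instead of buffering.
                output.add(key + "@" + ts + ":L+null");
            } else {
                pending.computeIfAbsent(ts, t -> new ArrayList<>()).add(key);
            }
        }
    }

    void processRight(final String key, final long ts) {
        streamTime = Math.max(streamTime, ts);
        rightSeen.computeIfAbsent(key, k -> new ArrayList<>()).add(ts);
        final List<String> joins = new ArrayList<>();
        for (final long other : leftSeen.getOrDefault(key, Collections.emptyList())) {
            if (ts >= other - beforeMs && ts <= other + afterMs) {
                joins.add(key + "@" + Math.max(ts, other) + ":L+R");
                removePending(key, other); // joined: must never be emitted as L+null
            }
        }
        emitExpired();
        output.addAll(joins);
    }

    private void removePending(final String key, final long ts) {
        final List<String> keys = pending.get(ts);
        if (keys != null) {
            keys.remove(key);
            if (keys.isEmpty()) {
                pending.remove(ts);
            }
        }
    }

    // Emits buffered records whose window end (ts + afterMs) is strictly before stream time.
    private void emitExpired() {
        final Iterator<Map.Entry<Long, List<String>>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            final Map.Entry<Long, List<String>> e = it.next();
            if (e.getKey() + afterMs >= streamTime) {
                break; // time-ordered, so no later entry is expired either
            }
            for (final String key : e.getValue()) {
                output.add(key + "@" + e.getKey() + ":L+null");
            }
            it.remove();
        }
    }
}
```

With windows of 5 on each side, a left `A@10` followed by a right `A@12` yields only `A@12:L+R`; the spurious `A@10:L+null` that an eager left join would emit never appears.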

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -93,23 +136,118 @@ public void process(final K key, final V1 value) {
             }
 
             boolean needOuterJoin = outer;
+            boolean joinFound = false;
 
             final long inputRecordTimestamp = context().timestamp();
             final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs);
             final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs);
 
+            maxObservedStreamTime.advance(inputRecordTimestamp);
+
             try (final WindowStoreIterator<V2> iter = otherWindow.fetch(key, timeFrom, timeTo)) {
                 while (iter.hasNext()) {
                     needOuterJoin = false;
+                    joinFound = true;
                     final KeyValue<Long, V2> otherRecord = iter.next();
+                    final long otherRecordTimestamp = otherRecord.key;
+
+                    // Emit expired records before the joined record to keep time ordering
+                    emitExpiredNonJoinedOuterRecordsExcept(key, otherRecordTimestamp);
+
                     context().forward(
                         key,
                         joiner.apply(key, value, otherRecord.value),
-                        To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecord.key)));
+                        To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecordTimestamp)));
+                }
+
+                // Emit all expired records before adding a new non-joined record to the store. Otherwise,
+                // the put() call will advance the stream time, which causes records out of the retention
+                // period to be deleted, thus not being emitted later.
+                if (!joinFound && inputRecordTimestamp == maxObservedStreamTime.get()) {
+                    emitExpiredNonJoinedOuterRecords();
                 }
 
                 if (needOuterJoin) {
-                    context().forward(key, joiner.apply(key, value, null));
+                    // The maxStreamTime contains the max time observed in both sides of the join.
+                    // Having access to the time observed in the other join side fixes the following
+                    // problem:
+                    //
+                    // Say we have a window size of 5 seconds
+                    //  1. A non-joined record with time T10 is seen in the left-topic (maxLeftStreamTime: 10)
+                    //     The record is not processed yet, and is added to the outer-join store
+                    //  2. A non-joined record with time T2 is seen in the right-topic (maxRightStreamTime: 2)
+                    //     The record is not processed yet, and is added to the outer-join store
+                    //  3. A joined record with time T11 is seen in the left-topic (maxLeftStreamTime: 11)
+                    //     It is time to look at the expired records. T10 and T2 should be emitted, but
+                    //     because T2 is late, it is not fetched by the window store, so it is not processed
+                    //
+                    // See KStreamKStreamLeftJoinTest.testLowerWindowBound() tests
+                    //
+                    // the condition below allows us to process the late record without the need
+                    // to hold it in the temporary outer store
+                    if (!outerJoinWindowStore.isPresent() || timeTo < maxObservedStreamTime.get()) {
+                        context().forward(key, joiner.apply(key, value, null));
+                    } else {
+                        outerJoinWindowStore.ifPresent(store -> store.put(
+                            KeyAndJoinSide.make(isLeftJoin, key),
+                            LeftOrRightValue.make(isLeftJoin, value),
+                            inputRecordTimestamp));
+                    }
+                }
+            }
+        }
+
+        private void emitExpiredNonJoinedOuterRecords() {
+            outerJoinWindowStore.ifPresent(store ->
+                emitExpiredNonJoinedOuterRecords(store, recordWindowHasClosed));
+        }
+
+        private void emitExpiredNonJoinedOuterRecordsExcept(final K key, final long timestamp) {
+            outerJoinWindowStore.ifPresent(store -> {
+                final KeyAndJoinSide<K> keyAndJoinSide = KeyAndJoinSide.make(!isLeftJoin, key);
+
+                // Emit all expired records except the just found non-joined key. We need
+                // to emit all expired records before calling put(), otherwise the internal
+                // stream time will advance and may cause records out of the retention period to
+                // be deleted.
+                emitExpiredNonJoinedOuterRecords(store,
+                    recordWindowHasClosed
+                        .and(k -> !k.key().equals(keyAndJoinSide))
+                        .and(k -> k.window().start() != timestamp));
+
+                if (store.fetch(keyAndJoinSide, timestamp) != null) {
+                    // Delete the record. The previous emit call may not have removed this record
+                    // if the record window has not closed.
+                    store.put(keyAndJoinSide, null, timestamp);
+                }
+            });
+        }
+
+        @SuppressWarnings("unchecked")
+        private void emitExpiredNonJoinedOuterRecords(final WindowStore<KeyAndJoinSide<K>, LeftOrRightValue> store, final Predicate<Windowed<KeyAndJoinSide<K>>> emitCondition) {
+            try (final KeyValueIterator<Windowed<KeyAndJoinSide<K>>, LeftOrRightValue> it = store.all()) {
+                while (it.hasNext()) {
+                    final KeyValue<Windowed<KeyAndJoinSide<K>>, LeftOrRightValue> e = it.next();

Review comment:
       Done
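`emitExpiredNonJoinedOuterRecordsExcept` in this hunk builds its emit condition by chaining two exclusions onto `recordWindowHasClosed` with `Predicate.and`, then walks the whole store and emits whatever matches. A self-contained sketch of that shape, with a `List` standing in for the store and `ExpiryWithExclusion` as a hypothetical name, is below. Because each exclusion is a separate `and`, an entry is skipped if it has the joined key *or* the joined timestamp.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;

final class ExpiryWithExclusion {
    /** Hypothetical buffered entry: key plus window start. */
    static final class Entry {
        final String key;
        final long windowStart;

        Entry(final String key, final long windowStart) {
            this.key = key;
            this.windowStart = windowStart;
        }

        @Override
        public String toString() {
            return key + "@" + windowStart;
        }
    }

    /** Removes and returns, in store order, every entry matching emitCondition. */
    static List<Entry> emitMatching(final List<Entry> store, final Predicate<Entry> emitCondition) {
        final List<Entry> emitted = new ArrayList<>();
        final Iterator<Entry> it = store.iterator();
        while (it.hasNext()) {
            final Entry e = it.next();
            if (emitCondition.test(e)) {
                emitted.add(e);
                it.remove();
            }
        }
        return emitted;
    }

    /** Closed-window check with the same two chained exclusions as in the PR. */
    static Predicate<Entry> closedExcept(final long afterMs, final long graceMs, final long streamTime,
                                         final String joinedKey, final long joinedTs) {
        final Predicate<Entry> closed = e -> e.windowStart + afterMs + graceMs < streamTime;
        return closed
            .and(e -> !e.key.equals(joinedKey))
            .and(e -> e.windowStart != joinedTs);
    }
}
```

For a store holding `A@0, B@0, C@1, D@10`, a stream time of 12 and a join on `A@0`, only `C@1` is emitted: `A@0` matches the key, `B@0` matches the timestamp, and `D@10` is still open.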

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/KeyAndJoinSide.java
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.KeyValue;
+
+import java.util.Objects;
+
+/**
+ * Combines a key from a {@link KeyValue} with a boolean value referencing if the key is
+ * part of the left join (true) or right join (false). This class is only useful when a state
+ * store needs to be shared between left and right processors, and each processor needs to
+ * access the key of the other processor.
+ */
+public class KeyAndJoinSide<K> {
+    private final K key;
+    private final boolean leftJoin;
+
+    private KeyAndJoinSide(final boolean leftJoin, final K key) {
+        this.key = Objects.requireNonNull(key, "key is null");

Review comment:
       Done
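The PR description says `KeyAndJoinSideSerde` serializes a boolean that records the join side together with the key, and the processor compares buffered keys with `k.key().equals(keyAndJoinSide)`, so equality must cover both fields. The sketch below is hypothetical: it uses `String` keys and assumes a layout of one side byte (1 = left, 0 = right) followed by the UTF-8 key bytes. The real serde's byte layout may differ.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Objects;

/** Hypothetical key + join-side pair with value equality and a simple byte layout. */
final class KeyAndSide {
    final boolean leftSide;
    final String key;

    KeyAndSide(final boolean leftSide, final String key) {
        this.leftSide = leftSide;
        this.key = Objects.requireNonNull(key, "key is null");
    }

    // Assumed layout: [1 byte side][UTF-8 key bytes].
    byte[] serialize() {
        final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(1 + keyBytes.length)
            .put((byte) (leftSide ? 1 : 0))
            .put(keyBytes)
            .array();
    }

    static KeyAndSide deserialize(final byte[] data) {
        final boolean left = data[0] == 1;
        final String key = new String(Arrays.copyOfRange(data, 1, data.length), StandardCharsets.UTF_8);
        return new KeyAndSide(left, key);
    }

    @Override
    public boolean equals(final Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof KeyAndSide)) {
            return false;
        }
        final KeyAndSide that = (KeyAndSide) o;
        return leftSide == that.leftSide && key.equals(that.key);
    }

    @Override
    public int hashCode() {
        return Objects.hash(leftSide, key);
    }
}
```

Putting the side byte first means the same key from the left and the right stream maps to two distinct store keys, which is what lets one store serve both processors.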

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValue.java
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.util.Objects;
+
+/**
+ * This class is used in combination with {@link KeyAndJoinSide}. The {@link KeyAndJoinSide} class
+ * combines a key with a boolean value that specifies if the key is found in the left side of a
+ * join or on the right side. This {@link LeftOrRightValue} object contains either the V1 value,
+ * which is found in the left topic, or V2 value if it is found in the right topic.
+ */
+public class LeftOrRightValue<V1, V2> {
+    private final V1 leftValue;
+    private final V2 rightValue;
+
+    private LeftOrRightValue(final V1 leftValue, final V2 rightValue) {
+        this.leftValue = leftValue;
+        this.rightValue = rightValue;
+    }
+
+    /**
+     * Create a new {@link LeftOrRightValue} instance with the V1 value as {@code leftValue} and
+     * V2 value as null.
+     *
+     * @param leftValue the left V1 value
+     * @param <V1>      the type of the value
+     * @return a new {@link LeftOrRightValue} instance
+     */
+    public static <V1, V2> LeftOrRightValue<V1, V2> makeLeftValue(final V1 leftValue) {
+        Objects.requireNonNull(leftValue, "leftValue is null");

Review comment:
       Done
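`makeLeftValue` in this hunk rejects `null`, which keeps it unambiguous which side a stored value came from. Below is a hypothetical sketch of such an "exactly one side set" holder. The `joinWithMissingSide` helper is an assumption added for illustration: it passes the stored value to a joiner with `null` on the missing side, which is the shape of a non-joined outer-join result.

```java
import java.util.Objects;
import java.util.function.BiFunction;

/** Hypothetical holder in which exactly one of the two sides is non-null. */
final class LeftOrRightValueSketch<V1, V2> {
    private final V1 leftValue;
    private final V2 rightValue;

    private LeftOrRightValueSketch(final V1 leftValue, final V2 rightValue) {
        this.leftValue = leftValue;
        this.rightValue = rightValue;
    }

    /** Wraps a value from the left stream; null is rejected so the side stays unambiguous. */
    static <V1, V2> LeftOrRightValueSketch<V1, V2> makeLeftValue(final V1 leftValue) {
        return new LeftOrRightValueSketch<>(Objects.requireNonNull(leftValue, "leftValue is null"), null);
    }

    /** Wraps a value from the right stream; null is rejected for the same reason. */
    static <V1, V2> LeftOrRightValueSketch<V1, V2> makeRightValue(final V2 rightValue) {
        return new LeftOrRightValueSketch<>(null, Objects.requireNonNull(rightValue, "rightValue is null"));
    }

    boolean isLeft() {
        return leftValue != null;
    }

    V1 leftValue() {
        return leftValue;
    }

    V2 rightValue() {
        return rightValue;
    }

    /** Hypothetical helper: calls the joiner with null on the side that has no value. */
    <R> R joinWithMissingSide(final BiFunction<? super V1, ? super V2, ? extends R> joiner) {
        if (isLeft()) {
            return joiner.apply(leftValue, null);
        }
        return joiner.apply(null, rightValue);
    }
}
```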

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValue.java
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.util.Objects;
+
+/**
+ * This class is used in combination with {@link KeyAndJoinSide}. The {@link KeyAndJoinSide} class
+ * combines a key with a boolean value that specifies if the key is found in the left side of a
+ * join or on the right side. This {@link LeftOrRightValue} object contains either the V1 value,
+ * which is found in the left topic, or V2 value if it is found in the right topic.
+ */
+public class LeftOrRightValue<V1, V2> {
+    private final V1 leftValue;
+    private final V2 rightValue;
+
+    private LeftOrRightValue(final V1 leftValue, final V2 rightValue) {
+        this.leftValue = leftValue;

Review comment:
       Done

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/KeyAndJoinSide.java
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.KeyValue;
+
+import java.util.Objects;
+
+/**
+ * Combines a key from a {@link KeyValue} with a boolean value referencing if the key is
+ * part of the left join (true) or right join (false). This class is only useful when a state
+ * store needs to be shared between left and right processors, and each processor needs to
+ * access the key of the other processor.
+ */
+public class KeyAndJoinSide<K> {
+    private final K key;
+    private final boolean leftJoin;

Review comment:
       Done




[GitHub] [kafka] spena commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

Posted by GitBox <gi...@apache.org>.
spena commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r611878043



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##########
@@ -211,6 +246,66 @@ private void assertUniqueStoreNames(final WindowBytesStoreSupplier supplier,
         return builder;
     }
 
+    @SuppressWarnings("unchecked")
+    private static <K, V1, V2> StoreBuilder<WindowStore<KeyAndJoinSide<K>, ValueOrOtherValue<V1, V2>>> outerJoinWindowStoreBuilder(final String storeName,
+                                                                                                                                   final JoinWindows windows,
+                                                                                                                                   final StreamJoinedInternal<K, V1, V2> streamJoinedInternal) {
+        final StoreBuilder<WindowStore<KeyAndJoinSide<K>, ValueOrOtherValue<V1, V2>>> builder = new TimeOrderedWindowStoreBuilder<KeyAndJoinSide<K>, ValueOrOtherValue<V1, V2>>(
+            persistentTimeOrderedWindowStore(
+                storeName + "-store",
+                Duration.ofMillis(windows.size() + windows.gracePeriodMs()),
+                Duration.ofMillis(windows.size())
+            ),
+            new KeyAndJoinSideSerde<>(streamJoinedInternal.keySerde()),
+            new ValueOrOtherValueSerde(streamJoinedInternal.valueSerde(), streamJoinedInternal.otherValueSerde()),
+            Time.SYSTEM
+        );
+        if (streamJoinedInternal.loggingEnabled()) {
+            builder.withLoggingEnabled(streamJoinedInternal.logConfig());
+        } else {
+            builder.withLoggingDisabled();
+        }
+
+        return builder;
+    }
+
+    // This method has the same code as Stores.persistentWindowStore(). But TimeOrderedWindowStore is
+    // a non-public API, so we need to keep duplicate code until it becomes public.
+    private static WindowBytesStoreSupplier persistentTimeOrderedWindowStore(final String storeName,
+                                                                             final Duration retentionPeriod,
+                                                                             final Duration windowSize) {
+        Objects.requireNonNull(storeName, "name cannot be null");
+        final String rpMsgPrefix = prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
+        final long retentionMs = validateMillisecondDuration(retentionPeriod, rpMsgPrefix);
+        final String wsMsgPrefix = prepareMillisCheckFailMsgPrefix(windowSize, "windowSize");
+        final long windowSizeMs = validateMillisecondDuration(windowSize, wsMsgPrefix);
+
+        final long segmentInterval = Math.max(retentionMs / 2, 60_000L);
+
+        if (retentionMs < 0L) {
+            throw new IllegalArgumentException("retentionPeriod cannot be negative");
+        }
+        if (windowSizeMs < 0L) {
+            throw new IllegalArgumentException("windowSize cannot be negative");
+        }
+        if (segmentInterval < 1L) {
+            throw new IllegalArgumentException("segmentInterval cannot be zero or negative");
+        }
+        if (windowSizeMs > retentionMs) {
+            throw new IllegalArgumentException("The retention period of the window store "
+                + storeName + " must be no smaller than its window size. Got size=["
+                + windowSizeMs + "], retention=[" + retentionMs + "]");
+        }
+
+        return new RocksDbWindowBytesStoreSupplier(
+            storeName,
+            retentionMs,
+            segmentInterval,
+            windowSizeMs,
+            false,

Review comment:
       I had issues with duplicates and forgot to investigate them. I just did another round of investigation, but I still run into the problem: I cannot delete any key when duplicates are retained. This happens in any window store, not just the time-ordered window store.
   
   The problem I found is:
   
   1. Added two duplicates with key = 0 and time = 0
   ```
   # this adds a key with seqNum = 0
   put(0, "A0", 0) 
   # this adds a key with seqNum = 1
   put(0, "A0-0", 0)
   ```
   2. Delete key = 0 and time = 0
   ```
   # this attempts to delete with seqNum = 2, which does not exist
   put(0, null, 0)
   ```
   
   Initially I didn't think supporting duplicates was necessary, but I just wrote a test case with the old semantics, and duplicates are processed there, so I need to support them. Do you know whether deleting duplicates has always been unsupported, or am I missing some API or workaround?
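The failure mode described in the steps above can be reproduced with a toy model of a duplicate-retaining store in which every `put` gets a fresh sequence number appended to the stored key. `DuplicateRetainingStore` and its string composite key are hypothetical; this mimics only the behaviour described in the comment, not Kafka's RocksDB implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy window store that retains duplicates via a per-put sequence number. */
final class DuplicateRetainingStore {
    private final TreeMap<String, String> data = new TreeMap<>();
    private int seqNum = 0;

    private static String compositeKey(final int key, final long ts, final int seq) {
        return key + "|" + ts + "|" + seq;
    }

    void put(final int key, final String value, final long ts) {
        // Every put uses a new sequence number, so it never touches an existing entry.
        final String k = compositeKey(key, ts, seqNum++);
        if (value == null) {
            data.remove(k); // "deletes" only the brand-new, non-existent entry
        } else {
            data.put(k, value);
        }
    }

    List<String> fetch(final int key, final long ts) {
        final List<String> values = new ArrayList<>();
        final String prefix = key + "|" + ts + "|";
        for (final Map.Entry<String, String> e : data.tailMap(prefix).entrySet()) {
            if (!e.getKey().startsWith(prefix)) {
                break;
            }
            values.add(e.getValue());
        }
        return values;
    }
}
```

Running `put(0, "A0", 0)`, `put(0, "A0-0", 0)` and then `put(0, null, 0)` leaves both values in place, matching the behaviour reported above.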




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] spena commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

Posted by GitBox <gi...@apache.org>.
spena commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r614098535



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##########
@@ -118,20 +142,47 @@
         final ProcessorGraphNode<K1, V2> otherWindowedStreamsNode = new ProcessorGraphNode<>(otherWindowStreamProcessorName, otherWindowStreamProcessorParams);
         builder.addGraphNode(otherGraphNode, otherWindowedStreamsNode);
 
+        Optional<StoreBuilder<WindowStore<KeyAndJoinSide<K1>, LeftOrRightValue<V1, V2>>>> outerJoinWindowStore = Optional.empty();
+        if (leftOuter || rightOuter) {
+            final String outerJoinSuffix = "-shared-outer-join-store";

Review comment:
       There is only one store shared between the left and right joins, so I can only use one name.







[GitHub] [kafka] spena commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

Posted by GitBox <gi...@apache.org>.
spena commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r610890414



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/KeyAndJoinSide.java
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.KeyValue;
+
+import java.util.Objects;
+
+/**
+ * Combines a key from a {@link KeyValue} with a boolean value referencing if the key is
+ * part of the left join (true) or right join (false). This class is only useful when a state
+ * store needs to be shared between left and right processors, and each processor needs to
+ * access the key of the other processor.
+ */
+public class KeyAndJoinSide<K> {
+    private final K key;
+    private final boolean thisJoin;

Review comment:
       I changed it to leftJoin. But it seems you suggested adding two boolean variables, one for left and another for right?

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/KeyAndJoinSide.java
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.KeyValue;
+
+import java.util.Objects;
+
+/**
+ * Combines a key from a {@link KeyValue} with a boolean value referencing if the key is
+ * part of the left join (true) or right join (false). This class is only useful when a state
+ * store needs to be shared between left and right processors, and each processor needs to
+ * access the key of the other processor.
+ */
+public class KeyAndJoinSide<K> {
+    private final K key;
+    private final boolean thisJoin;
+
+    private KeyAndJoinSide(final boolean thisJoin, final K key) {
+        this.key = Objects.requireNonNull(key, "key is null");
+        this.thisJoin = thisJoin;
+    }
+
+    /**
+     * Create a new {@link KeyAndJoinSide} instance if the provided {@code key} is not {@code null}.
+     *
+     * @param thisJoin True if the key is part of the left topic (referenced as thisJoin in {@code KStreamImplJoin})
+     * @param key      the key
+     * @param <K>      the type of the key
+     * @return a new {@link KeyAndJoinSide} instance if the provided {@code key} is not {@code null}
+     */
+    public static <K> KeyAndJoinSide<K> make(final boolean thisJoin, final K key) {
+        return new KeyAndJoinSide<>(thisJoin, key);
+    }
+
+    public boolean isThisJoin() {
+        return thisJoin;
+    }
+
+    public K getKey() {
+        return key;
+    }
+
+    @Override
+    public String toString() {
+        return "<" + thisJoin + "," + key + ">";

Review comment:
       Done

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/KeyAndJoinSideDeserializer.java
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.Objects;
+
+public class KeyAndJoinSideDeserializer<K> implements Deserializer<KeyAndJoinSide<K>> {
+    private final Deserializer<K> keyDeserializer;
+
+    KeyAndJoinSideDeserializer(final Deserializer<K> keyDeserializer) {
+        this.keyDeserializer = Objects.requireNonNull(keyDeserializer, "keyDeserializer is null");
+    }
+
+    @Override
+    public void configure(final Map<String, ?> configs, final boolean isKey) {
+        keyDeserializer.configure(configs, isKey);
+    }
+
+    @Override
+    public KeyAndJoinSide<K> deserialize(final String topic, final byte[] data) {
+        final boolean bool = data[0] == 1 ? true : false;
+        final K key = keyDeserializer.deserialize(topic, rawKey(data));
+
+        return KeyAndJoinSide.make(bool, key);
+    }
+
+    static byte[] rawKey(final byte[] data) {
+        final int rawValueLength = data.length - 1;
+
+        return ByteBuffer

Review comment:
       Done

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/KeyAndJoinSideDeserializer.java
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.Objects;
+
+public class KeyAndJoinSideDeserializer<K> implements Deserializer<KeyAndJoinSide<K>> {
+    private final Deserializer<K> keyDeserializer;
+
+    KeyAndJoinSideDeserializer(final Deserializer<K> keyDeserializer) {
+        this.keyDeserializer = Objects.requireNonNull(keyDeserializer, "keyDeserializer is null");
+    }
+
+    @Override
+    public void configure(final Map<String, ?> configs, final boolean isKey) {
+        keyDeserializer.configure(configs, isKey);
+    }
+
+    @Override
+    public KeyAndJoinSide<K> deserialize(final String topic, final byte[] data) {
+        final boolean bool = data[0] == 1 ? true : false;
+        final K key = keyDeserializer.deserialize(topic, rawKey(data));
+
+        return KeyAndJoinSide.make(bool, key);
+    }
+
+    static byte[] rawKey(final byte[] data) {
+        final int rawValueLength = data.length - 1;

Review comment:
       Done
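   Since the quoted hunks are cut off, here is a minimal self-contained sketch of the byte layout the deserializer appears to expect: one leading byte for the join side (`1` meaning left, matching the `data[0] == 1` check) followed by the raw key bytes. `JoinSidePrefixSketch` and its methods are hypothetical, not the PR's classes.

   ```java
   import java.nio.ByteBuffer;

   // Hypothetical sketch of a "<joinSide-recordKey>" encoding: [1 byte side][raw key bytes].
   final class JoinSidePrefixSketch {
       static byte[] withJoinSide(final boolean leftSide, final byte[] rawKey) {
           return ByteBuffer.allocate(1 + rawKey.length)
               .put((byte) (leftSide ? 1 : 0))
               .put(rawKey)
               .array();
       }

       static boolean isLeftSide(final byte[] data) {
           // Equivalent to `data[0] == 1 ? true : false`; the ternary is redundant.
           return data[0] == 1;
       }

       static byte[] rawKey(final byte[] data) {
           final byte[] raw = new byte[data.length - 1];
           // wrap(array, offset, length) starts reading at offset 1, skipping the side byte.
           ByteBuffer.wrap(data, 1, data.length - 1).get(raw);
           return raw;
       }
   }
   ```

   Because the side flag is the first byte, entries for the two sides never collide in the shared store even when the record keys are equal.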

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/ValueOrOtherValue.java
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.util.Objects;
+
+/**
+ * This class is used in combination with {@link KeyAndJoinSide}. The {@link KeyAndJoinSide} class
+ * combines a key with a boolean value that specifies if the key is found in the left side of a
+ * join or on the right side. This {@link ValueOrOtherValue} object contains either the V1 value,
+ * which is found in the left topic, or V2 value if it is found in the right topic.
+ */
+public class ValueOrOtherValue<V1, V2> {

Review comment:
       Done

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/ValueOrOtherValue.java
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.util.Objects;
+
+/**
+ * This class is used in combination with {@link KeyAndJoinSide}. The {@link KeyAndJoinSide} class
+ * combines a key with a boolean value that specifies if the key is found in the left side of a
+ * join or on the right side. This {@link ValueOrOtherValue} object contains either the V1 value,
+ * which is found in the left topic, or V2 value if it is found in the right topic.
+ */
+public class ValueOrOtherValue<V1, V2> {
+    private final V1 thisValue;
+    private final V2 otherValue;
+
+    private ValueOrOtherValue(final V1 thisValue, final V2 otherValue) {
+        this.thisValue = thisValue;
+        this.otherValue = otherValue;
+    }
+
+    /**
+     * Create a new {@link ValueOrOtherValue} instance with the V1 value as {@code thisValue} and
+     * V2 value as null.
+     *
+     * @param thisValue the V1 value
+     * @param <V1>      the type of the value
+     * @return a new {@link ValueOrOtherValue} instance
+     */
+    public static <V1, V2> ValueOrOtherValue<V1, V2> makeValue(final V1 thisValue) {
+        return new ValueOrOtherValue<>(thisValue, null);
+    }
+
+    /**
+     * Create a new {@link ValueOrOtherValue} instance with the V2 value as {@code otherValue} and
+     * V1 value as null.
+     *
+     * @param otherValue the V2 value
+     * @param <V2>       the type of the value
+     * @return a new {@link ValueOrOtherValue} instance
+     */
+    public static <V1, V2> ValueOrOtherValue<V1, V2> makeOtherValue(final V2 otherValue) {
+        return new ValueOrOtherValue<>(null, otherValue);
+    }
+
+    public V1 getThisValue() {
+        return thisValue;
+    }
+
+    public V2 getOtherValue() {
+        return otherValue;
+    }
+
+    @Override
+    public String toString() {
+        return "<" + thisValue + "," + otherValue + ">";

Review comment:
       Done

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/ValueOrOtherValueDeserializer.java
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.Objects;
+
+public class ValueOrOtherValueDeserializer<V1, V2> implements Deserializer<ValueOrOtherValue<V1, V2>> {
+    public final Deserializer<V1> thisDeserializer;
+    public final Deserializer<V2> otherDeserializer;
+
+    public ValueOrOtherValueDeserializer(final Deserializer<V1> thisDeserializer, final Deserializer<V2> otherDeserializer) {
+        this.thisDeserializer = Objects.requireNonNull(thisDeserializer);
+        this.otherDeserializer = Objects.requireNonNull(otherDeserializer);
+    }
+
+    @Override
+    public void configure(final Map<String, ?> configs,
+                          final boolean isKey) {
+        thisDeserializer.configure(configs, isKey);
+        otherDeserializer.configure(configs, isKey);
+    }
+
+    @Override
+    public ValueOrOtherValue<V1, V2> deserialize(final String topic, final byte[] joinedValues) {
+        if (joinedValues == null || joinedValues.length == 0) {
+            return null;
+        }
+
+        final boolean thisJoin = joinedValues[0] == 1 ? true : false;
+        return thisJoin
+            ? ValueOrOtherValue.makeValue(thisDeserializer.deserialize(topic, rawValue(joinedValues)))
+            : ValueOrOtherValue.makeOtherValue(otherDeserializer.deserialize(topic, rawValue(joinedValues)));
+    }
+
+    static byte[] rawValue(final byte[] joinedValues) {
+        final int rawValueLength = joinedValues.length - 1;
+
+        return ByteBuffer

Review comment:
       Done

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/ValueOrOtherValue.java
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.util.Objects;
+
+/**
+ * This class is used in combination with {@link KeyAndJoinSide}. The {@link KeyAndJoinSide} class
+ * combines a key with a boolean value that specifies if the key is found in the left side of a
+ * join or on the right side. This {@link ValueOrOtherValue} object contains either the V1 value,
+ * which is found in the left topic, or V2 value if it is found in the right topic.
+ */
+public class ValueOrOtherValue<V1, V2> {
+    private final V1 thisValue;
+    private final V2 otherValue;
+
+    private ValueOrOtherValue(final V1 thisValue, final V2 otherValue) {
+        this.thisValue = thisValue;

Review comment:
       Done

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/ValueOrOtherValueSerializer.java
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serializer;
+
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+/**
+ * Serializes a {@link ValueOrOtherValue}. The serialized bytes start with a byte that indicates
+ * whether the value is V1 or V2.
+ */
+public class ValueOrOtherValueSerializer<V1, V2> implements Serializer<ValueOrOtherValue<V1, V2>> {
+    private final Serializer<V1> thisSerializer;
+    private final Serializer<V2> otherSerializer;
+
+    public ValueOrOtherValueSerializer(final Serializer<V1> thisSerializer, final Serializer<V2> otherSerializer) {
+        this.thisSerializer = Objects.requireNonNull(thisSerializer);
+        this.otherSerializer = Objects.requireNonNull(otherSerializer);
+    }
+
+    @Override
+    public byte[] serialize(final String topic, final ValueOrOtherValue<V1, V2> data) {
+        if (data == null) {
+            return null;
+        }
+
+        final byte[] rawThisValue = (data.getThisValue() != null) ? thisSerializer.serialize(topic, data.getThisValue()) : null;

Review comment:
       Done
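   A minimal sketch of the value side of the same idea: the serializer picks one of two encoders depending on which value is set and writes a one-byte tag, and the deserializer dispatches on that tag. To stay self-contained it uses `java.util.function.Function` encoders instead of Kafka's `Serializer`/`Deserializer`; `EitherValue` and `EitherValueCodec` are hypothetical stand-ins for `ValueOrOtherValue` and its serdes.

   ```java
   import java.nio.ByteBuffer;
   import java.nio.charset.StandardCharsets;
   import java.util.Arrays;
   import java.util.function.Function;

   // Hypothetical stand-in for ValueOrOtherValue: exactly one field is expected to be non-null.
   final class EitherValue<V1, V2> {
       final V1 thisValue;
       final V2 otherValue;

       private EitherValue(final V1 thisValue, final V2 otherValue) {
           this.thisValue = thisValue;
           this.otherValue = otherValue;
       }

       static <V1, V2> EitherValue<V1, V2> ofThis(final V1 value) {
           return new EitherValue<>(value, null);
       }

       static <V1, V2> EitherValue<V1, V2> ofOther(final V2 value) {
           return new EitherValue<>(null, value);
       }
   }

   // Hypothetical codec: [1 byte tag: 1 = V1, 0 = V2][raw value bytes].
   final class EitherValueCodec<V1, V2> {
       private final Function<V1, byte[]> thisEncoder;
       private final Function<V2, byte[]> otherEncoder;
       private final Function<byte[], V1> thisDecoder;
       private final Function<byte[], V2> otherDecoder;

       EitherValueCodec(final Function<V1, byte[]> thisEncoder, final Function<V2, byte[]> otherEncoder,
                        final Function<byte[], V1> thisDecoder, final Function<byte[], V2> otherDecoder) {
           this.thisEncoder = thisEncoder;
           this.otherEncoder = otherEncoder;
           this.thisDecoder = thisDecoder;
           this.otherDecoder = otherDecoder;
       }

       byte[] serialize(final EitherValue<V1, V2> data) {
           if (data == null) {
               return null;
           }
           final boolean isThis = data.thisValue != null;
           final byte[] raw = isThis ? thisEncoder.apply(data.thisValue) : otherEncoder.apply(data.otherValue);
           return ByteBuffer.allocate(1 + raw.length).put((byte) (isThis ? 1 : 0)).put(raw).array();
       }

       EitherValue<V1, V2> deserialize(final byte[] bytes) {
           if (bytes == null || bytes.length == 0) {
               return null;
           }
           final byte[] raw = Arrays.copyOfRange(bytes, 1, bytes.length);
           if (bytes[0] == 1) {
               return EitherValue.ofThis(thisDecoder.apply(raw));
           }
           return EitherValue.ofOther(otherDecoder.apply(raw));
       }

       // Example codec: V1 = String (UTF-8), V2 = Integer (4-byte big-endian).
       static EitherValueCodec<String, Integer> stringOrInt() {
           return new EitherValueCodec<>(
               s -> s.getBytes(StandardCharsets.UTF_8),
               i -> ByteBuffer.allocate(4).putInt(i).array(),
               b -> new String(b, StandardCharsets.UTF_8),
               b -> ByteBuffer.wrap(b).getInt());
       }
   }
   ```

   Tagging by null-ness assumes exactly one of the two values is set; if both were null, this sketch would hand `null` to the second encoder.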

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##########
@@ -118,20 +132,40 @@
         final ProcessorGraphNode<K1, V2> otherWindowedStreamsNode = new ProcessorGraphNode<>(otherWindowStreamProcessorName, otherWindowStreamProcessorParams);
         builder.addGraphNode(otherGraphNode, otherWindowedStreamsNode);
 
+        Optional<StoreBuilder<WindowStore<KeyAndJoinSide<K1>, ValueOrOtherValue<V1, V2>>>> outerJoinWindowStore = Optional.empty();
+        if (leftOuter || rightOuter) {
+            final String outerJoinSuffix = "-shared-outer-join-store";
+            final String outerJoinStoreGeneratedName = builder.newProcessorName(KStreamImpl.OUTERSHARED_NAME);

Review comment:
       Done







[GitHub] [kafka] mjsax commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r614534849



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##########
@@ -118,20 +142,47 @@
         final ProcessorGraphNode<K1, V2> otherWindowedStreamsNode = new ProcessorGraphNode<>(otherWindowStreamProcessorName, otherWindowStreamProcessorParams);
         builder.addGraphNode(otherGraphNode, otherWindowedStreamsNode);
 
+        Optional<StoreBuilder<WindowStore<KeyAndJoinSide<K1>, LeftOrRightValue<V1, V2>>>> outerJoinWindowStore = Optional.empty();
+        if (leftOuter || rightOuter) {
+            final String outerJoinSuffix = "-shared-outer-join-store";

Review comment:
       Sorry for the confusion. What I meant was: if it's a `leftJoin()`, we could use `-shared-left-join-store`, and if it's an `outerJoin()`, we could use `-shared-outer-join-store`?
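   A tiny sketch of that suggestion, assuming `leftJoin()` constructs `KStreamImplJoin` with `leftOuter = true, rightOuter = false` and `outerJoin()` with both set to `true`. `SharedStoreSuffixSketch` is a hypothetical helper, not code from the PR.

   ```java
   // Hypothetical helper: picks the shared-store suffix from the join flags.
   final class SharedStoreSuffixSketch {
       static String suffix(final boolean leftOuter, final boolean rightOuter) {
           if (!leftOuter && !rightOuter) {
               // Inner joins never buffer non-joined records, so there is no shared store to name.
               throw new IllegalArgumentException("inner join has no shared outer store");
           }
           return rightOuter ? "-shared-outer-join-store" : "-shared-left-join-store";
       }
   }
   ```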







[GitHub] [kafka] guozhangwang commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r612773044



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -102,14 +127,98 @@ public void process(final K key, final V1 value) {
                 while (iter.hasNext()) {
                     needOuterJoin = false;
                     final KeyValue<Long, V2> otherRecord = iter.next();
+                    final long otherRecordTimestamp = otherRecord.key;
                     context().forward(
                         key,
                         joiner.apply(key, value, otherRecord.value),
-                        To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecord.key)));
+                        To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecordTimestamp)));
+
+                    outerJoinWindowStore.ifPresent(store -> {
+                        // Delete the other joined key from the outer non-joined store now to prevent
+                        // further processing
+                        final KeyAndJoinSide<K> otherJoinKey = KeyAndJoinSide.make(!thisJoin, key);
+                        if (store.fetch(otherJoinKey, otherRecordTimestamp) != null) {

Review comment:
       Ah nvm then --- let's just keep it out of the scope of this ticket for now.







[GitHub] [kafka] mjsax commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r614536011



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##########
@@ -118,20 +142,47 @@
         final ProcessorGraphNode<K1, V2> otherWindowedStreamsNode = new ProcessorGraphNode<>(otherWindowStreamProcessorName, otherWindowStreamProcessorParams);
         builder.addGraphNode(otherGraphNode, otherWindowedStreamsNode);
 
+        Optional<StoreBuilder<WindowStore<KeyAndJoinSide<K1>, LeftOrRightValue<V1, V2>>>> outerJoinWindowStore = Optional.empty();
+        if (leftOuter || rightOuter) {
+            final String outerJoinSuffix = "-shared-outer-join-store";
+
+            // Get the suffix index of the joinThisGeneratedName to build the outer join store name.
+            final String outerJoinStoreGeneratedName = KStreamImpl.OUTERSHARED_NAME
+                + joinThisGeneratedName.substring(
+                    rightOuter
+                        ? KStreamImpl.OUTERTHIS_NAME.length()
+                        : KStreamImpl.JOINTHIS_NAME.length());

Review comment:
       I agree that we should not consume one more index, to avoid compatibility issues... Maybe the real question (just for my own understanding) is: what would the name be, i.e., could you give a concrete example (with and without the `Named` parameter)? That is also why I asked for a test using `TopologyDescription`; it makes it easier to wrap my head around.
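   A minimal sketch of the name derivation in the quoted hunk, for the generated-name case only (the `Named` case is not covered). The three prefix constants are assumed values modeled on `KStreamImpl`'s generated-name prefixes, and `OuterSharedNameSketch` is hypothetical. Taking the substring of the join's own generated name, rather than calling `builder.newProcessorName(...)`, is what avoids consuming an extra index.

   ```java
   // Hypothetical sketch; the prefix values below are assumptions, check KStreamImpl for the real constants.
   final class OuterSharedNameSketch {
       static final String JOINTHIS_NAME = "KSTREAM-JOINTHIS-";
       static final String OUTERTHIS_NAME = "KSTREAM-OUTERTHIS-";
       static final String OUTERSHARED_NAME = "KSTREAM-OUTERSHARED-";

       static String sharedStoreName(final String joinThisGeneratedName, final boolean rightOuter) {
           final String prefix = rightOuter ? OUTERTHIS_NAME : JOINTHIS_NAME;
           if (!joinThisGeneratedName.startsWith(prefix)) {
               throw new IllegalArgumentException("unexpected generated name: " + joinThisGeneratedName);
           }
           // Keep only the index suffix of the join's generated name, e.g. "0000000004".
           return OUTERSHARED_NAME + joinThisGeneratedName.substring(prefix.length());
       }
   }
   ```

   With a made-up index, `sharedStoreName("KSTREAM-JOINTHIS-0000000004", false)` (a `leftJoin()`) and `sharedStoreName("KSTREAM-OUTERTHIS-0000000004", true)` (an `outerJoin()`) would both yield `KSTREAM-OUTERSHARED-0000000004`.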







[GitHub] [kafka] spena commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

Posted by GitBox <gi...@apache.org>.
spena commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r614304330



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -93,23 +136,118 @@ public void process(final K key, final V1 value) {
             }
 
             boolean needOuterJoin = outer;
+            boolean joinFound = false;
 
             final long inputRecordTimestamp = context().timestamp();
             final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs);
             final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs);
 
+            maxObservedStreamTime.advance(inputRecordTimestamp);
+
             try (final WindowStoreIterator<V2> iter = otherWindow.fetch(key, timeFrom, timeTo)) {
                 while (iter.hasNext()) {
                     needOuterJoin = false;
+                    joinFound = true;
                     final KeyValue<Long, V2> otherRecord = iter.next();
+                    final long otherRecordTimestamp = otherRecord.key;

Review comment:
       Interesting. Is this a current bug with the old join semantics?







[GitHub] [kafka] guozhangwang commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r609989025



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -133,34 +141,37 @@ public void process(final K key, final V1 value) {
                 return;
             }
 
-            // maxObservedStreamTime is updated and shared between left and right sides, so we can
-            // process a non-join record immediately if it is late
-            final long maxStreamTime = maxObservedStreamTime.updateAndGet(time -> Math.max(time, context().timestamp()));
-
             boolean needOuterJoin = outer;
+            boolean joinFound = false;
 
             final long inputRecordTimestamp = context().timestamp();
             final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs);
             final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs);
 
+            // maxObservedStreamTime is updated and shared between left and right join sides
+            maxObservedStreamTime.updateAndGet(t -> Math.max(t, inputRecordTimestamp));
+
             try (final WindowStoreIterator<V2> iter = otherWindow.fetch(key, timeFrom, timeTo)) {
                 while (iter.hasNext()) {
                     needOuterJoin = false;
+                    joinFound = true;
                     final KeyValue<Long, V2> otherRecord = iter.next();
                     final long otherRecordTimestamp = otherRecord.key;
+
+                    // Emit expired records before the joined record to keep time ordering

Review comment:
       The tricky part, as the current PR demonstrates, is that if we do the expiration first, we may get records that match the currently processed record, and those need to be excluded from deletion/emission before the join.
   
   I think it is still possible to simplify the current logic without naive buffering, because:
   
   1) The currently processed record's timestamp T is no larger than the updated max stream time T';
   2) The smallest timestamp of any record matching the currently processed record is (T - window-size);
   3) The largest timestamp among the expired records is (T' - window-size - grace-period), where grace-period >= 0.
   
   In other words, if the currently processed record's timestamp T is smaller than T' (i.e. it is a late record and hence did not advance the stream time), then all records within [T - window-size, T' - window-size - grace-period] (assuming T - window-size < T' - window-size - grace-period) would already have been expired and emitted, and hence won't be found and matched. If the currently processed record's timestamp T == T' (i.e. it is not a late record), then T - window-size is always >= T' - window-size - grace-period, which means that all joined records' timestamps should be later than the expired timestamps.
   
   That means that if we do the expiration first based on (T' - window-size - grace-period), the newly expired records' timestamps should all be smaller than the timestamp of any record joined with the currently processed record afterwards. Hence it is safe to simply expire them all without the `except` logic.
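   The inequalities in points 1) to 3) can be checked mechanically with a small sketch. It only restates the argument (it does not model the store), takes window-size as the "before" bound of the join window, and `ExpiryOrderingSketch` is hypothetical.

   ```java
   // Hypothetical sketch restating points 2) and 3) as functions.
   final class ExpiryOrderingSketch {
       // Point 2: smallest timestamp that can match a record with timestamp t.
       static long earliestMatch(final long t, final long windowSizeMs) {
           return t - windowSizeMs;
       }

       // Point 3: largest timestamp already expired once stream time reaches streamTime.
       static long expiryBound(final long streamTime, final long windowSizeMs, final long graceMs) {
           return streamTime - windowSizeMs - graceMs;
       }

       // Non-late case (T == T'): the earliest match is never below the expiry bound for any grace >= 0.
       static boolean nonLateCaseHolds(final long maxTime, final long maxWindow, final long maxGrace) {
           for (long t = 0; t <= maxTime; t++) {
               for (long w = 0; w <= maxWindow; w++) {
                   for (long g = 0; g <= maxGrace; g++) {
                       if (earliestMatch(t, w) < expiryBound(t, w, g)) {
                           return false;
                       }
                   }
               }
           }
           return true;
       }
   }
   ```

   As a worked example with T' = 100, window-size = 10 and grace-period = 5, the expiry bound is 85. A non-late record at T = 100 can only match from 90 onwards, while a late record at T = 92 could match from 82, but anything in [82, 85] is at or below the bound and was already expired. With grace-period = 0 and T == T' the two bounds coincide, so whether a record exactly at the bound is expired or matched depends on whether the expiry check is strict.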




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r616269613



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -93,23 +136,118 @@ public void process(final K key, final V1 value) {
             }
 
             boolean needOuterJoin = outer;
+            boolean joinFound = false;
 
             final long inputRecordTimestamp = context().timestamp();
             final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs);
             final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs);
 
+            maxObservedStreamTime.advance(inputRecordTimestamp);
+
             try (final WindowStoreIterator<V2> iter = otherWindow.fetch(key, timeFrom, timeTo)) {
                 while (iter.hasNext()) {
                     needOuterJoin = false;
+                    joinFound = true;
                     final KeyValue<Long, V2> otherRecord = iter.next();
+                    final long otherRecordTimestamp = otherRecord.key;
+
+                    // Emit expired records before the joined record to keep time ordering
+                    emitExpiredNonJoinedOuterRecordsExcept(key, otherRecordTimestamp);
+
                     context().forward(
                         key,
                         joiner.apply(key, value, otherRecord.value),
-                        To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecord.key)));
+                        To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecordTimestamp)));
+                }
+
+                // Emit all expired records before adding a new non-joined record to the store. Otherwise,
+                // the put() call will advance the stream time, which causes records out of the retention
+                // period to be deleted, thus not being emitted later.
+                if (!joinFound && inputRecordTimestamp == maxObservedStreamTime.get()) {
+                    emitExpiredNonJoinedOuterRecords();
                 }
 
                 if (needOuterJoin) {
-                    context().forward(key, joiner.apply(key, value, null));
+                    // The maxStreamTime contains the max time observed in both sides of the join.
+                    // Having access to the time observed in the other join side fixes the following
+                    // problem:
+                    //
+                    // Say we have a window size of 5 seconds
+                    //  1. A non-joined record with time T10 is seen in the left-topic (maxLeftStreamTime: 10)
+                    //     The record is not processed yet, and is added to the outer-join store
+                    //  2. A non-joined record with time T2 is seen in the right-topic (maxRightStreamTime: 2)
+                    //     The record is not processed yet, and is added to the outer-join store
+                    //  3. A joined record with time T11 is seen in the left-topic (maxLeftStreamTime: 11)
+                    //     It is time to look at the expired records. T10 and T2 should be emitted, but
+                    //     because T2 was late, then it is not fetched by the window store, so it is not processed
+                    //
+                    // See KStreamKStreamLeftJoinTest.testLowerWindowBound() tests
+                    //
+                    // the condition below allows us to process the late record without the need
+                    // to hold it in the temporary outer store
+                    if (!outerJoinWindowStore.isPresent() || timeTo < maxObservedStreamTime.get()) {

Review comment:
       Well, while we should have a check like this, it seems it should go to the top of this method, next to the key/value `null` check? We should also add a corresponding `lateRecordDropSensor` (cf `KStreamWindowAggregate.java`).
   
   We can also `return` from `process()` early: for a late record, we know that stream-time does not advance, and thus we don't need to emit anything downstream.
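   A minimal, self-contained model of this suggestion follows. It is a hypothetical sketch, not the processor in the PR: the lateness condition `timestamp + joinAfterMs + joinGraceMs < maxObservedStreamTime` is an assumption, and a plain counter stands in for a real `lateRecordDropSensor`.

```java
// Hypothetical model: reject late records at the top of process(), next to the null check,
// count them in place of a real lateRecordDropSensor, and return early.
final class LateRecordCheckSketch {
    private final long joinAfterMs;
    private final long joinGraceMs;
    private long maxObservedStreamTime = Long.MIN_VALUE; // no record seen yet
    int droppedLateRecords = 0; // stands in for a Sensor
    int processedRecords = 0;

    LateRecordCheckSketch(final long joinAfterMs, final long joinGraceMs) {
        this.joinAfterMs = joinAfterMs;
        this.joinGraceMs = joinGraceMs;
    }

    void process(final String key, final String value, final long timestamp) {
        if (key == null || value == null) {
            return; // existing null key/value handling (simplified)
        }
        // Assumed lateness condition: the record's window plus grace has already closed.
        if (timestamp + joinAfterMs + joinGraceMs < maxObservedStreamTime) {
            droppedLateRecords++;
            return; // a late record cannot advance stream time, so nothing new can expire
        }
        maxObservedStreamTime = Math.max(maxObservedStreamTime, timestamp);
        processedRecords++;
        // ... join lookup, buffering of non-joined records, and expiry would follow here
    }
}
```

   Putting the check before any store access keeps the late path cheap: it touches neither the other window store nor the shared outer-join store.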







[GitHub] [kafka] guozhangwang commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r608896190



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -133,34 +141,37 @@ public void process(final K key, final V1 value) {
                 return;
             }
 
-            // maxObservedStreamTime is updated and shared between left and right sides, so we can
-            // process a non-join record immediately if it is late
-            final long maxStreamTime = maxObservedStreamTime.updateAndGet(time -> Math.max(time, context().timestamp()));
-
             boolean needOuterJoin = outer;
+            boolean joinFound = false;
 
             final long inputRecordTimestamp = context().timestamp();
             final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs);
             final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs);
 
+            // maxObservedStreamTime is updated and shared between left and right join sides
+            maxObservedStreamTime.updateAndGet(t -> Math.max(t, inputRecordTimestamp));
+
             try (final WindowStoreIterator<V2> iter = otherWindow.fetch(key, timeFrom, timeTo)) {
                 while (iter.hasNext()) {
                     needOuterJoin = false;
+                    joinFound = true;
                     final KeyValue<Long, V2> otherRecord = iter.next();
                     final long otherRecordTimestamp = otherRecord.key;
+
+                    // Emit expired records before the joined record to keep time ordering

Review comment:
       I think we can refactor the logic here as follows:
   
   0) Suppose the received record's timestamp is T1, the current stream time is T2 >= T1, and we found one or more matching records from the other side, with timestamps T1' <= T2' <= T3' etc. The joined records would have the timestamps T1'' = max(T1, T1'), T2'' = max(T1, T2'), etc., where T1'' <= T2'' <= ...
   
   1) After we get all the joined records, we do not call `context.forward()` yet, but just cache them locally.
   
   2) We then range query the expired records store and generate the (null-)joined records, also deleting those records from the store; again we do not call `context.forward()` yet, but just cache them locally.
   
   3) We merge-sort these two timestamp-sorted lists, and then call `context.forward()` on the sorted join results to emit them.
   
   This way we do not need the following complex logic.
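   Step 3 is a standard two-way merge of lists that are each already sorted by timestamp. The sketch below is illustrative only: `MergeEmitSketch` and `Result` are hypothetical names, and in the real processor each merged element would be passed to `context.forward()` instead of being collected in a list. Breaking ties in favor of the expired (non-joined) results is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of step 3: merge the cached joined results and the cached
// expired (non-joined) results, each already sorted by timestamp, into one sorted list.
final class MergeEmitSketch {

    // Minimal result holder; stands in for a key/value/timestamp triple to forward.
    static final class Result {
        final String value;
        final long timestamp;

        Result(final String value, final long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    static List<Result> mergeByTimestamp(final List<Result> joined, final List<Result> expired) {
        final List<Result> out = new ArrayList<>(joined.size() + expired.size());
        int i = 0;
        int j = 0;
        while (i < joined.size() && j < expired.size()) {
            // On equal timestamps, emit the expired result first (assumed tie-break).
            if (expired.get(j).timestamp <= joined.get(i).timestamp) {
                out.add(expired.get(j++));
            } else {
                out.add(joined.get(i++));
            }
        }
        while (i < joined.size()) {
            out.add(joined.get(i++));
        }
        while (j < expired.size()) {
            out.add(expired.get(j++));
        }
        return out;
    }
}
```

   Because both inputs are already sorted, the merge is linear in the number of results and needs no extra sort.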







[GitHub] [kafka] mjsax commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r614538992



##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
##########
@@ -486,6 +486,327 @@ public void testJoin() {
         }
     }
 
+    @Test
+    public void testImprovedLeftJoin() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KStream<Integer, String> stream1;
+        final KStream<Integer, String> stream2;
+        final KStream<Integer, String> joined;
+        final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+
+        stream1 = builder.stream(topic1, consumed);
+        stream2 = builder.stream(topic2, consumed);
+        joined = stream1.leftJoin(
+            stream2,
+            MockValueJoiner.TOSTRING_JOINER,
+            JoinWindows.of(ofMillis(100)).grace(ofMillis(0)),
+            StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
+        joined.process(supplier);
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<Integer, String> inputTopic1 =
+                driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+            final TestInputTopic<Integer, String> inputTopic2 =
+                driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+            final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+
+            /*
+             * Verifies left non-joined records are emitted by a record processed in the left topic
+             */
+
+            long windowStart = 0;
+
+            // No joins detected; No null-joins emitted
+            inputTopic1.pipeInput(0, "A0", windowStart + 1);
+            inputTopic1.pipeInput(1, "A1", windowStart + 2);
+            inputTopic1.pipeInput(0, "A0-0", windowStart + 3);
+            processor.checkAndClearProcessResult();
+
+            // Join detected; No null-joins emitted
+            inputTopic2.pipeInput(1, "a1", windowStart + 3);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(1, "A1+a1", windowStart + 3));
+
+            // Dummy record in left topic will emit expired non-joined records from the left topic
+            inputTopic1.pipeInput(2, "dummy", windowStart + 401);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(0, "A0+null", windowStart + 1),
+                new KeyValueTimestamp<>(0, "A0-0+null", windowStart + 3));
+
+            // Flush internal non-joined state store by joining the dummy record
+            inputTopic2.pipeInput(2, "dummy", windowStart + 401);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(2, "dummy+dummy", windowStart + 401));
+
+            /*
+             * Verifies left non-joined records are emitted by a record processed in the right topic
+             */
+
+            windowStart = windowStart + 301;
+
+            // No joins detected; No null-joins emitted
+            inputTopic1.pipeInput(0, "A0", windowStart + 1);
+            inputTopic1.pipeInput(1, "A1", windowStart + 2);
+            inputTopic1.pipeInput(0, "A0-0", windowStart + 3);
+            processor.checkAndClearProcessResult();
+
+            // Join detected; No null-joins emitted
+            inputTopic2.pipeInput(1, "a1", windowStart + 3);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(1, "A1+a1", windowStart + 3));
+
+            // Dummy record in right topic will emit expired non-joined records from the left topic
+            inputTopic2.pipeInput(2, "dummy", windowStart + 401);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(0, "A0+null", windowStart + 1),
+                new KeyValueTimestamp<>(0, "A0-0+null", windowStart + 3));
+
+            // Flush internal non-joined state store by joining the dummy record
+            inputTopic1.pipeInput(2, "dummy", windowStart + 402);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(2, "dummy+dummy", windowStart + 402));
+
+            /*
+             * Verifies right non-joined records are not emitted by record processed in the left topic
+             */
+
+            windowStart = windowStart + 401;
+
+            // No joins detected; No null-joins emitted
+            inputTopic2.pipeInput(0, "A0", windowStart + 1);
+            inputTopic2.pipeInput(1, "A1", windowStart + 2);
+            inputTopic2.pipeInput(0, "A0-0", windowStart + 3);
+            processor.checkAndClearProcessResult();
+
+            // Join detected; No null-joins emitted
+            inputTopic1.pipeInput(1, "a1", windowStart + 3);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(1, "a1+A1", windowStart + 3));
+
+            // Dummy record in left topic will not emit records
+            inputTopic1.pipeInput(2, "dummy", windowStart + 401);
+            processor.checkAndClearProcessResult();
+
+            // Process the dummy joined record
+            inputTopic2.pipeInput(2, "dummy", windowStart + 402);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(2, "dummy+dummy", windowStart + 402));
+
+            /*
+             * Verifies right non-joined records are not emitted by record processed in the right topic
+             */
+
+            windowStart = windowStart + 401;
+
+            // No joins detected; No null-joins emitted
+            inputTopic2.pipeInput(0, "A0", windowStart + 1);
+            inputTopic2.pipeInput(1, "A1", windowStart + 2);
+            inputTopic2.pipeInput(0, "A0-0", windowStart + 3);
+            processor.checkAndClearProcessResult();
+
+            // Join detected; No null-joins emitted
+            inputTopic1.pipeInput(1, "a1", windowStart + 3);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(1, "a1+A1", windowStart + 3));
+
+            // Dummy record in right topic will not emit records
+            inputTopic2.pipeInput(2, "dummy", windowStart + 401);
+            processor.checkAndClearProcessResult();
+
+            // Process the dummy joined record
+            inputTopic1.pipeInput(2, "dummy", windowStart + 402);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(2, "dummy+dummy", windowStart + 402));
+        }
+    }
+
+    @Test
+    public void testImprovedFullOuterJoin() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KStream<Integer, String> stream1;
+        final KStream<Integer, String> stream2;
+        final KStream<Integer, String> joined;
+        final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+
+        stream1 = builder.stream(topic1, consumed);
+        stream2 = builder.stream(topic2, consumed);
+        joined = stream1.outerJoin(
+            stream2,
+            MockValueJoiner.TOSTRING_JOINER,
+            JoinWindows.of(ofMillis(100)).grace(ofMillis(0)),
+            StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
+        joined.process(supplier);
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<Integer, String> inputTopic1 =
+                driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+            final TestInputTopic<Integer, String> inputTopic2 =
+                driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+            final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+
+            /*
+             * Verifies left non-joined records are emitted by a record processed in the left topic
+             */
+
+            long windowStart = 0;
+
+            // No joins detected; No null-joins emitted
+            inputTopic1.pipeInput(0, "A0", windowStart + 1);
+            inputTopic1.pipeInput(1, "A1", windowStart + 2);
+            inputTopic1.pipeInput(0, "A0-0", windowStart + 3);
+            processor.checkAndClearProcessResult();
+
+            // Join detected; No null-joins emitted
+            inputTopic2.pipeInput(1, "a1", windowStart + 3);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(1, "A1+a1", windowStart + 3));
+
+            // Dummy record in left topic will emit expired non-joined records from the left topic
+            inputTopic1.pipeInput(2, "dummy", windowStart + 401);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(0, "A0+null", windowStart + 1),
+                new KeyValueTimestamp<>(0, "A0-0+null", windowStart + 3));
+
+            // Flush internal non-joined state store by joining the dummy record
+            inputTopic2.pipeInput(2, "dummy", windowStart + 401);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(2, "dummy+dummy", windowStart + 401));
+
+            /*
+             * Verifies left non-joined records are emitted by a record processed in the right topic
+             */
+
+            windowStart = windowStart + 401;
+
+            // No joins detected; No null-joins emitted
+            inputTopic1.pipeInput(0, "A0", windowStart + 1);
+            inputTopic1.pipeInput(1, "A1", windowStart + 2);
+            inputTopic1.pipeInput(0, "A0-0", windowStart + 3);
+            processor.checkAndClearProcessResult();
+
+            // Join detected; No null-joins emitted
+            inputTopic2.pipeInput(1, "a1", windowStart + 3);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(1, "A1+a1", windowStart + 3));
+
+            // Dummy record in right topic will emit expired non-joined records from the left topic
+            inputTopic2.pipeInput(2, "dummy", windowStart + 401);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(0, "A0+null", windowStart + 1),
+                new KeyValueTimestamp<>(0, "A0-0+null", windowStart + 3));
+
+            // Flush internal non-joined state store by joining the dummy record
+            inputTopic1.pipeInput(2, "dummy", windowStart + 402);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(2, "dummy+dummy", windowStart + 402));
+
+            /*
+             * Verifies right non-joined records are emitted by record processed in the left topic
+             */
+
+            windowStart = windowStart + 401;
+
+            // No joins detected; No null-joins emitted
+            inputTopic2.pipeInput(0, "A0", windowStart + 1);
+            inputTopic2.pipeInput(1, "A1", windowStart + 2);
+            inputTopic2.pipeInput(0, "A0-0", windowStart + 3);
+            processor.checkAndClearProcessResult();
+
+            // Join detected; No null-joins emitted
+            inputTopic1.pipeInput(1, "a1", windowStart + 3);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(1, "a1+A1", windowStart + 3));
+
+            // Dummy record in left topic will emit expired non-joined records from the right topic
+            inputTopic1.pipeInput(2, "dummy", windowStart + 401);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(0, "null+A0", windowStart + 1),
+                new KeyValueTimestamp<>(0, "null+A0-0", windowStart + 3));
+
+            // Process the dummy joined record
+            inputTopic2.pipeInput(2, "dummy", windowStart + 402);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(2, "dummy+dummy", windowStart + 402));
+
+            /*
+             * Verifies right non-joined records are emitted by record processed in the right topic
+             */
+
+            windowStart = windowStart + 301;
+
+            // No joins detected; No null-joins emitted
+            inputTopic2.pipeInput(0, "A0", windowStart + 1);
+            inputTopic2.pipeInput(1, "A1", windowStart + 2);
+            inputTopic2.pipeInput(0, "A0-0", windowStart + 3);
+            processor.checkAndClearProcessResult();
+
+            // Join detected; No null-joins emitted
+            inputTopic1.pipeInput(1, "a1", windowStart + 3);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(1, "a1+A1", windowStart + 3));
+
+            // Dummy record in right topic will emit expired non-joined records from the right topic
+            inputTopic2.pipeInput(2, "dummy", windowStart + 401);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(0, "null+A0", windowStart + 1),
+                new KeyValueTimestamp<>(0, "null+A0-0", windowStart + 3));
+
+            // Process the dummy joined record
+            inputTopic1.pipeInput(2, "dummy", windowStart + 402);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(2, "dummy+dummy", windowStart + 402));
+        }
+    }
+
+    @Test
+    public void testOuterJoinEmitsNonJoinedRecordsAfterWindowCloses() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KStream<Integer, String> stream1;
+        final KStream<Integer, String> stream2;
+        final KStream<Integer, String> joined;
+        final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
+
+        stream1 = builder.stream(topic1, consumed);
+        stream2 = builder.stream(topic2, consumed);
+        joined = stream1.outerJoin(
+            stream2,
+            MockValueJoiner.TOSTRING_JOINER,
+            JoinWindows.of(ofMillis(100)).grace(ofMillis(0)),
+            StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
+        joined.process(supplier);
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<Integer, String> inputTopic1 =
+                driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+            final TestInputTopic<Integer, String> inputTopic2 =
+                driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+            final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
+
+            inputTopic1.pipeInput(0, "A0", 1);
+            inputTopic1.pipeInput(1, "A1", 2);
+
+            // the window hasn't ended, so no record would be processed yet
+            processor.checkAndClearProcessResult();
+
+            inputTopic2.pipeInput(1, "a1", 3);
+
+            // after a join is found, only the joined records are processed
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(1, "A1+a1", 3));
+
+            // time to process the non-joined records
+            // a dummy record is sent to advance the stream time and cause them to be emitted
+            inputTopic1.pipeInput(2, "A2", 400);
+
+            // all non-joined of the previous window are emitted
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(0, "A0+null", 1));
+        }
+    }
+
     @Test
     public void testOuterJoin() {

Review comment:
       We should move this test into `KStreamKStreamOuterJoinTest` class
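   The scenario in `testOuterJoinEmitsNonJoinedRecordsAfterWindowCloses` can be reproduced with a small in-memory model of a full outer join that holds back non-joined records until their window plus grace closes. This is a hypothetical sketch, not the Kafka Streams implementation: it assumes a symmetric window, uses plain lists instead of window stores, and emits joined results before expired ones.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical in-memory model of a full outer stream-stream join that buffers
// non-joined records until their window (plus grace) closes. Not the Kafka Streams API.
final class OuterJoinBufferSketch {
    static final class Rec {
        final boolean left;
        final int key;
        final String value;
        final long ts;

        Rec(final boolean left, final int key, final String value, final long ts) {
            this.left = left;
            this.key = key;
            this.value = value;
            this.ts = ts;
        }
    }

    private final long windowMs;
    private final long graceMs;
    private long streamTime = Long.MIN_VALUE; // shared by both join sides
    private final List<Rec> leftStore = new ArrayList<>();
    private final List<Rec> rightStore = new ArrayList<>();
    private final List<Rec> nonJoined = new ArrayList<>(); // stands in for the shared outer-join store

    OuterJoinBufferSketch(final long windowMs, final long graceMs) {
        this.windowMs = windowMs;
        this.graceMs = graceMs;
    }

    // Processes one record and returns the emitted results as "key:value@timestamp".
    List<String> process(final boolean left, final int key, final String value, final long ts) {
        streamTime = Math.max(streamTime, ts);
        final List<String> out = new ArrayList<>();
        final Rec rec = new Rec(left, key, value, ts);
        boolean joinFound = false;
        for (final Rec o : left ? rightStore : leftStore) {
            if (o.key == key && Math.abs(o.ts - ts) <= windowMs) {
                joinFound = true;
                nonJoined.remove(o); // a matched record must never be emitted as non-joined
                final String joined = left ? value + "+" + o.value : o.value + "+" + value;
                out.add(key + ":" + joined + "@" + Math.max(ts, o.ts));
            }
        }
        (left ? leftStore : rightStore).add(rec);
        if (!joinFound) {
            nonJoined.add(rec); // hold back instead of emitting a spurious "+null" result
        }
        // Emit buffered records whose window plus grace has closed, in timestamp order.
        final List<Rec> expired = new ArrayList<>();
        for (final Rec r : nonJoined) {
            if (r.ts + windowMs + graceMs < streamTime) {
                expired.add(r);
            }
        }
        expired.sort(Comparator.comparingLong(r -> r.ts));
        for (final Rec r : expired) {
            nonJoined.remove(r);
            out.add(r.key + ":" + (r.left ? r.value + "+null" : "null+" + r.value) + "@" + r.ts);
        }
        return out;
    }
}
```

   Feeding it the same inputs as the test (A0@1 and A1@2 on the left, a1@3 on the right, then A2@400 on the left) yields only `1:A1+a1@3` after the right record, and only `0:A0+null@1` after the record at 400.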







[GitHub] [kafka] spena commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

Posted by GitBox <gi...@apache.org>.
spena commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r610891116



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##########
@@ -118,20 +132,40 @@
         final ProcessorGraphNode<K1, V2> otherWindowedStreamsNode = new ProcessorGraphNode<>(otherWindowStreamProcessorName, otherWindowStreamProcessorParams);
         builder.addGraphNode(otherGraphNode, otherWindowedStreamsNode);
 
+        Optional<StoreBuilder<WindowStore<KeyAndJoinSide<K1>, ValueOrOtherValue<V1, V2>>>> outerJoinWindowStore = Optional.empty();
+        if (leftOuter || rightOuter) {
+            final String outerJoinSuffix = "-shared-outer-join-store";
+            final String outerJoinStoreGeneratedName = builder.newProcessorName(KStreamImpl.OUTERSHARED_NAME);
+            final String outerJoinStoreName = userProvidedBaseStoreName == null ? outerJoinStoreGeneratedName : userProvidedBaseStoreName + outerJoinSuffix;
+
+            outerJoinWindowStore = Optional.of(outerJoinWindowStoreBuilder(outerJoinStoreName, windows, streamJoinedInternal));
+        }
+
+        // Time shared between joins to keep track of the maximum stream time

Review comment:
       Done
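   The diff context above introduces a stream time shared by the left and right join processors. One way to sketch such a tracker is below. `SharedStreamTimeSketch` is a hypothetical name, and using an `AtomicLong` mirrors the `updateAndGet(...)` call quoted in another revision in this thread rather than any final design. The idea is that a single instance is created when the join is built and handed to both processors.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stream-time tracker shared by the left and right join processors.
final class SharedStreamTimeSketch {
    // Long.MIN_VALUE means "no record observed yet".
    private final AtomicLong maxObserved = new AtomicLong(Long.MIN_VALUE);

    // Advances stream time; a late (smaller) timestamp never moves it backwards.
    long advance(final long recordTimestamp) {
        return maxObserved.updateAndGet(t -> Math.max(t, recordTimestamp));
    }

    long get() {
        return maxObserved.get();
    }
}
```

   Because both sides advance the same instance, a late record on one side (for example T2 arriving after T10 was seen on the other side) is compared against the maximum time observed across the whole join, which is what the `testLowerWindowBound()` scenario described in the code comment relies on.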







[GitHub] [kafka] spena commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

Posted by GitBox <gi...@apache.org>.
spena commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r614114447



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -60,20 +82,41 @@
     }
 
     private class KStreamKStreamJoinProcessor extends AbstractProcessor<K, V1> {
+        private static final boolean DISABLE_OUTER_JOIN_SPURIOUS_RESULTS_FIX_DEFAULT = false;
+
+        private final Predicate<Windowed<KeyAndJoinSide<K>>> recordWindowHasClosed =
+            windowedKey -> windowedKey.window().start() + joinAfterMs + joinGraceMs < maxObservedStreamTime.get();
 
         private WindowStore<K, V2> otherWindow;
         private StreamsMetricsImpl metrics;
         private Sensor droppedRecordsSensor;
+        private Optional<WindowStore<KeyAndJoinSide<K>, LeftOrRightValue>> outerJoinWindowStore = Optional.empty();
 
         @SuppressWarnings("unchecked")
         @Override
         public void init(final ProcessorContext context) {
             super.init(context);
             metrics = (StreamsMetricsImpl) context.metrics();
             droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
-            otherWindow = (WindowStore<K, V2>) context.getStateStore(otherWindowName);
+            otherWindow = context.getStateStore(otherWindowName);
+
+            if (!internalOuterJoinFixDisabled(context.appConfigs())) {
+                outerJoinWindowStore = outerJoinWindowName.map(name -> context.getStateStore(name));
+            }
         }
 
+        private boolean internalOuterJoinFixDisabled(final Map<String, Object> configs) {
+            final Object value = configs.get(StreamsConfig.InternalConfig.INTERNAL_DISABLE_OUTER_JOIN_SPURIOUS_RESULTS_FIX);
+            if (value == null) {
+                return DISABLE_OUTER_JOIN_SPURIOUS_RESULTS_FIX_DEFAULT;

Review comment:
       That constant name is confusing. I removed the `*_DEFAULT` constant and now return false when the config is not set. The idea is that the join fix is enabled (disabled = false) if the config is not set.
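   A sketch of the lookup described here, with the "not set means fix enabled" default written inline. The class name, the key parameter, and the handling of `String` values are assumptions for illustration; the real code reads `StreamsConfig.InternalConfig.INTERNAL_DISABLE_OUTER_JOIN_SPURIOUS_RESULTS_FIX`, and its exact value handling is not shown in this thread.

```java
import java.util.Map;

// Hypothetical sketch: read an internal boolean flag from the app configs,
// defaulting to "fix enabled" (disabled == false) when the key is absent.
final class OuterJoinFixConfigSketch {
    static boolean internalOuterJoinFixDisabled(final Map<String, Object> configs, final String key) {
        final Object value = configs.get(key);
        if (value == null) {
            return false; // not set: the spurious-results fix stays enabled
        }
        if (value instanceof Boolean) {
            return (Boolean) value;
        }
        if (value instanceof String) {
            // "true" (any case) disables the fix; any other string leaves it enabled
            return Boolean.parseBoolean((String) value);
        }
        throw new IllegalArgumentException(
            "Expected a boolean for " + key + " but got " + value.getClass().getName());
    }
}
```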







[GitHub] [kafka] mjsax commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r616264850



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##########
@@ -118,20 +142,47 @@
         final ProcessorGraphNode<K1, V2> otherWindowedStreamsNode = new ProcessorGraphNode<>(otherWindowStreamProcessorName, otherWindowStreamProcessorParams);
         builder.addGraphNode(otherGraphNode, otherWindowedStreamsNode);
 
+        Optional<StoreBuilder<WindowStore<KeyAndJoinSide<K1>, LeftOrRightValue<V1, V2>>>> outerJoinWindowStore = Optional.empty();
+        if (leftOuter || rightOuter) {
+            final String outerJoinSuffix = leftOuter ? "-shared-left-outer-join" : "-shared-outer-join";

Review comment:
       That was the intent of my comment, but if you look into the newly added tests in `TopologyTest.java` it might not matter too much, as we also have some "weird" naming in existing code -- and to stay backward compatible, we cannot really change the naming:
   
   ```
   inner-join: (store names)
    - KSTREAM-JOINTHIS-0000000004-store
    - KSTREAM-JOINOTHER-0000000005-store
   
   left-join: (store names)
    - KSTREAM-JOINTHIS-0000000004-store
    - KSTREAM-OUTEROTHER-0000000005-store
   
   (Ideally we should have named both KSTREAM-LEFTTHIS-0000000004-store and KSTREAM-LEFTOTHER-0000000005-store...)
   
   outer-join: (store names)
    - KSTREAM-OUTERTHIS-0000000004-store
    - KSTREAM-OUTEROTHER-0000000005-store
    ```
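   The two revisions of `KStreamImplJoin.java` quoted in this thread pick the shared store name from a user-provided base name (if any) plus a suffix that depends on whether the join is a left or a full outer join. The sketch below combines those two snippets. The combination is an assumption, `OuterJoinStoreNameSketch` is a hypothetical name, and the generated name passed in the usage is a made-up example.

```java
// Hypothetical sketch of the shared outer-join store naming, combining the
// userProvidedBaseStoreName check and the leftOuter suffix choice quoted in this thread.
final class OuterJoinStoreNameSketch {
    static String outerJoinStoreName(final String userProvidedBaseStoreName,
                                     final boolean leftOuter,
                                     final String generatedName) {
        final String suffix = leftOuter ? "-shared-left-outer-join" : "-shared-outer-join";
        // Without a user-provided base name, fall back to the generated processor-style name.
        return userProvidedBaseStoreName == null ? generatedName : userProvidedBaseStoreName + suffix;
    }
}
```

   As the store names listed above show, generated names are part of the compatibility surface, so whichever suffix is chosen for the new store effectively becomes permanent once released.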



