Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/06/10 09:49:53 UTC

[GitHub] [kafka] mjsax opened a new pull request #10861: KAFKA-12909: disable spurious left/outer stream-stream join fix for old JoinWindows API

mjsax opened a new pull request #10861:
URL: https://github.com/apache/kafka/pull/10861


   We changed the behavior of left/outer stream-stream join via KAFKA-10847.
   To avoid a breaking change during an upgrade, we need to disable this
   fix by default.
   
   We only enable the fix if users explicitly opt in by changing their
   code. We leverage KIP-633 (KAFKA-8613) that offers a new JoinWindows
   API with mandatory grace-period to enable the fix.
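A minimal sketch of the opt-in mechanism described above, assuming a heavily simplified model: `JoinWindowsOptInSketch` is a hypothetical stand-in for `JoinWindows`, not the real class. Only the factory names `of`, `ofTimeDifferenceAndGrace`, and `ofTimeDifferenceWithNoGrace` and the 24h default grace come from this thread. The old factory leaves the flag `false`, and the new KIP-633 factories set it to `true`.

```java
import java.time.Duration;

// Hypothetical, simplified model of JoinWindows (NOT the real Kafka class).
// It only shows how the choice of factory method could set the new flag.
final class JoinWindowsOptInSketch {
    // Old default grace period of 24h, as described in this review thread.
    static final long DEFAULT_GRACE_PERIOD_MS = Duration.ofHours(24).toMillis();

    final long beforeMs;
    final long afterMs;
    final long graceMs;
    final boolean enableSpuriousResultFix;

    private JoinWindowsOptInSketch(final long beforeMs,
                                   final long afterMs,
                                   final long graceMs,
                                   final boolean enableSpuriousResultFix) {
        this.beforeMs = beforeMs;
        this.afterMs = afterMs;
        this.graceMs = graceMs;
        this.enableSpuriousResultFix = enableSpuriousResultFix;
    }

    // Old API: keeps the old behavior, so the fix stays disabled.
    static JoinWindowsOptInSketch of(final Duration timeDifference) {
        final long ms = timeDifference.toMillis();
        return new JoinWindowsOptInSketch(ms, ms, DEFAULT_GRACE_PERIOD_MS, false);
    }

    // New API (KIP-633): mandatory grace period; using it opts in to the fix.
    static JoinWindowsOptInSketch ofTimeDifferenceAndGrace(final Duration timeDifference,
                                                           final Duration afterWindowEnd) {
        final long ms = timeDifference.toMillis();
        return new JoinWindowsOptInSketch(ms, ms, afterWindowEnd.toMillis(), true);
    }

    // New API (KIP-633) with zero grace; also opts in to the fix.
    static JoinWindowsOptInSketch ofTimeDifferenceWithNoGrace(final Duration timeDifference) {
        return ofTimeDifferenceAndGrace(timeDifference, Duration.ZERO);
    }
}
```

In this model, an application keeps the old behavior after an upgrade until someone edits the code to call one of the new factories. That code change is the explicit opt-in.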
   
   Call for review @guozhangwang @spena @ableegoldman @izzyacademy


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] izzyacademy commented on a change in pull request #10861: KAFKA-12909: disable spurious left/outer stream-stream join fix for old JoinWindows API

Posted by GitBox <gi...@apache.org>.
izzyacademy commented on a change in pull request #10861:
URL: https://github.com/apache/kafka/pull/10861#discussion_r652087049



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##########
@@ -76,18 +76,37 @@
 
     private final long graceMs;
 
+    protected final boolean enableSpuriousResultFix;

Review comment:
       Thanks for adding this note. This summary is very helpful to me in my implementation.







[GitHub] [kafka] guozhangwang commented on a change in pull request #10861: KAFKA-12909: disable spurious left/outer stream-stream join fix for old JoinWindows API

guozhangwang commented on a change in pull request #10861:
URL: https://github.com/apache/kafka/pull/10861#discussion_r651983648



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -82,20 +87,23 @@
 
     private class KStreamKStreamJoinProcessor extends AbstractProcessor<K, V1> {
         private WindowStore<K, V2> otherWindowStore;
-        private StreamsMetricsImpl metrics;
         private Sensor droppedRecordsSensor;
         private Optional<WindowStore<KeyAndJoinSide<K>, LeftOrRightValue>> outerJoinWindowStore = Optional.empty();
 
-        @SuppressWarnings("unchecked")
         @Override
         public void init(final ProcessorContext context) {
             super.init(context);
-            metrics = (StreamsMetricsImpl) context.metrics();
+            final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics();
             droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
             otherWindowStore = context.getStateStore(otherWindowName);
 
-            if (StreamsConfig.InternalConfig.getBoolean(context().appConfigs(), ENABLE_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX, true)) {
-                outerJoinWindowStore = outerJoinWindowName.map(name -> context.getStateStore(name));
+            if (enableSpuriousResultFix

Review comment:
       I'm thinking exactly the opposite :) If we have a bug that causes us to create a state store unnecessarily, checking the flag twice may actually mask it: we would end up creating the state store and then, on the second check, not retrieving it. The behavior would still be correct, so it would be hard for us to discover that we are creating state stores unnecessarily.
   
   If we have a bug and do not create state stores when needed, we would behave the old way, without the fix. The key point is that we have only one decision point, so whether that decision is correct or buggy, it surfaces quickly.
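A minimal sketch of the single-decision-point idea above, under stated assumptions: `SingleDecisionPointSketch`, its methods, and the store name are hypothetical, and a plain `Map` stands in for the stores a processor context would hand out. Only the build step reads the flag. The init step resolves whatever optional store name it was given and never re-checks the flag, so a wrong decision at build time shows up directly at runtime.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch (NOT Kafka code) of deciding in exactly one place
// whether the outer-join store exists.
final class SingleDecisionPointSketch {
    static final String OUTER_JOIN_STORE = "outer-join-store"; // hypothetical name

    // The single decision point: the only place that looks at the flag.
    static Optional<String> buildTopology(final boolean enableSpuriousResultFix,
                                          final Map<String, Object> registeredStores) {
        if (enableSpuriousResultFix) {
            registeredStores.put(OUTER_JOIN_STORE, new Object());
            return Optional.of(OUTER_JOIN_STORE);
        }
        return Optional.empty();
    }

    // Processor init: no second flag check, it just uses what it was given.
    static Optional<Object> initProcessor(final Optional<String> outerJoinWindowName,
                                          final Map<String, Object> registeredStores) {
        return outerJoinWindowName.map(registeredStores::get);
    }
}
```

This models the reviewer's suggestion only. As noted later in the thread, left/outer joins always receive a store name in the real code, so the runtime check alone could not catch an unnecessary store.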







[GitHub] [kafka] mjsax commented on pull request #10861: KAFKA-12909: disable spurious left/outer stream-stream join fix for old JoinWindows API

mjsax commented on pull request #10861:
URL: https://github.com/apache/kafka/pull/10861#issuecomment-861081018


   Replied to comments and rebased to resolve merge conflicts. \cc @guozhangwang 





[GitHub] [kafka] mjsax commented on a change in pull request #10861: KAFKA-12909: disable spurious left/outer stream-stream join fix for old JoinWindows API

mjsax commented on a change in pull request #10861:
URL: https://github.com/apache/kafka/pull/10861#discussion_r652222557



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##########
@@ -114,7 +133,7 @@ public static JoinWindows of(final Duration timeDifference) throws IllegalArgume
     public JoinWindows before(final Duration timeDifference) throws IllegalArgumentException {
         final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference");
         final long timeDifferenceMs = validateMillisecondDuration(timeDifference, msgPrefix);
-        return new JoinWindows(timeDifferenceMs, afterMs, DEFAULT_GRACE_PERIOD_MS);
+        return new JoinWindows(timeDifferenceMs, afterMs, graceMs, enableSpuriousResultFix);

Review comment:
       Sure. I don't see the connection to this change, though? Note that `before()` and `after()` are non-static methods, so they should not change or set the grace period. Only the static `of(size)` and the non-static `grace()` should change the grace period.
   ```
   JoinWindows.of(Duration.ofMillis(5000)).before(Duration.ofMillis(30)); // of() sets the default grace of 24h, so before() does not need to reset it to 24h; it can just inherit it
   JoinWindows.of(Duration.ofMillis(5000)).grace(Duration.ofMillis(40)).before(Duration.ofMillis(30)); // this should leave grace at 40ms; however, without this fix, before() would reset grace to the default of 24h, which is incorrect
   ```
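A minimal sketch of the fixed builder behavior, assuming a simplified model: `GraceInheritanceSketch` is hypothetical, not the real `JoinWindows`. Only `of()` and `grace()` set the grace period. `before()` and `after()` pass the current `graceMs` and flag through, which mirrors the changed `new JoinWindows(timeDifferenceMs, afterMs, graceMs, enableSpuriousResultFix)` line in the diff.

```java
import java.time.Duration;

// Hypothetical, simplified model (NOT the real Kafka class) of the fixed
// builder chain: before()/after() inherit grace instead of resetting it.
final class GraceInheritanceSketch {
    static final long DEFAULT_GRACE_PERIOD_MS = Duration.ofHours(24).toMillis();

    final long beforeMs;
    final long afterMs;
    final long graceMs;
    final boolean enableSpuriousResultFix;

    private GraceInheritanceSketch(final long beforeMs, final long afterMs,
                                   final long graceMs, final boolean enableSpuriousResultFix) {
        if (beforeMs + afterMs < 0) {
            throw new IllegalArgumentException("Window interval (ie, beforeMs+afterMs) must not be negative.");
        }
        this.beforeMs = beforeMs;
        this.afterMs = afterMs;
        this.graceMs = graceMs;
        this.enableSpuriousResultFix = enableSpuriousResultFix;
    }

    // Static factory: sets the default grace period (old API, fix disabled).
    static GraceInheritanceSketch of(final Duration timeDifference) {
        final long ms = timeDifference.toMillis();
        return new GraceInheritanceSketch(ms, ms, DEFAULT_GRACE_PERIOD_MS, false);
    }

    // The only non-static method that changes the grace period.
    GraceInheritanceSketch grace(final Duration afterWindowEnd) {
        return new GraceInheritanceSketch(beforeMs, afterMs, afterWindowEnd.toMillis(), enableSpuriousResultFix);
    }

    // Fixed: pass graceMs through instead of DEFAULT_GRACE_PERIOD_MS.
    GraceInheritanceSketch before(final Duration timeDifference) {
        return new GraceInheritanceSketch(timeDifference.toMillis(), afterMs, graceMs, enableSpuriousResultFix);
    }

    // Same fix for after().
    GraceInheritanceSketch after(final Duration timeDifference) {
        return new GraceInheritanceSketch(beforeMs, timeDifference.toMillis(), graceMs, enableSpuriousResultFix);
    }
}
```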







[GitHub] [kafka] mjsax commented on a change in pull request #10861: KAFKA-12909: disable spurious left/outer stream-stream join fix for old JoinWindows API

mjsax commented on a change in pull request #10861:
URL: https://github.com/apache/kafka/pull/10861#discussion_r649031614



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##########
@@ -67,7 +67,7 @@
  * @see KStream#outerJoin(KStream, ValueJoiner, JoinWindows, StreamJoined)
  * @see TimestampExtractor
  */
-public final class JoinWindows extends Windows<Window> {
+public class JoinWindows extends Windows<Window> {

Review comment:
       I need to make this class non-final so that a `JoinWindowsInternal` subclass can access the newly added flag.
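A minimal sketch of the subclass-accessor pattern, assuming simplified, hypothetical classes (`PublicWindowsSketch` and `InternalWindowsSketch` mirror `JoinWindows` and `JoinWindowsInternal` but are not the Kafka code). The public class keeps the flag `protected`, and an internal subclass exposes it. The copy constructor has to live in the parent because, as explained later in this thread, `final` fields can only be assigned by a constructor of the class that declares them. This also assumes the internal class sits in a different package, where only subclasses can read a `protected` member. In this single-file sketch, both classes share a package, so that restriction is not visible here.

```java
// Hypothetical sketch (NOT Kafka code) of exposing a protected flag via a subclass.
class PublicWindowsSketch {                      // non-final so it can be subclassed
    public final long beforeMs;
    public final long afterMs;
    private final long graceMs;
    protected final boolean enableSpuriousResultFix; // hidden from users

    PublicWindowsSketch(final long beforeMs, final long afterMs,
                        final long graceMs, final boolean enableSpuriousResultFix) {
        this.beforeMs = beforeMs;
        this.afterMs = afterMs;
        this.graceMs = graceMs;
        this.enableSpuriousResultFix = enableSpuriousResultFix;
    }

    // Copy constructor: must be declared here, because a subclass cannot
    // assign this class's final fields itself.
    protected PublicWindowsSketch(final PublicWindowsSketch other) {
        this(other.beforeMs, other.afterMs, other.graceMs, other.enableSpuriousResultFix);
    }

    public long gracePeriodMs() {
        return graceMs;
    }
}

// Internal wrapper that makes the flag readable by internal code.
final class InternalWindowsSketch extends PublicWindowsSketch {
    InternalWindowsSketch(final PublicWindowsSketch windows) {
        super(windows);
    }

    boolean spuriousResultFixEnabled() {
        return enableSpuriousResultFix;
    }
}
```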

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -57,18 +58,22 @@
 
     KStreamKStreamJoin(final boolean isLeftSide,
                        final String otherWindowName,
-                       final long joinBeforeMs,
-                       final long joinAfterMs,
-                       final long joinGraceMs,
+                       final JoinWindowsInternal windows,
                        final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends R> joiner,
                        final boolean outer,
                        final Optional<String> outerJoinWindowName,
                        final KStreamImplJoin.MaxObservedStreamTime maxObservedStreamTime) {
         this.isLeftSide = isLeftSide;
         this.otherWindowName = otherWindowName;
-        this.joinBeforeMs = joinBeforeMs;
-        this.joinAfterMs = joinAfterMs;
-        this.joinGraceMs = joinGraceMs;
+        if (isLeftSide) {

Review comment:
       Previously, the caller (i.e., `KStreamImplJoin`) did this. Because we now pass only one parameter, we need to do the flip here if necessary.
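A minimal sketch of the before/after flip, using a hypothetical `JoinSideBoundsSketch` class that is not part of Kafka. A record at timestamp `ts` on the left side joins other-side records in `[ts - before, ts + after]`. Seen from the right side, the window is mirrored, so `before` and `after` swap. The sketch ignores clamping and overflow handling.

```java
// Hypothetical sketch (NOT Kafka code) of per-side join window bounds.
final class JoinSideBoundsSketch {
    final long joinBeforeMs;
    final long joinAfterMs;

    JoinSideBoundsSketch(final boolean isLeftSide, final long beforeMs, final long afterMs) {
        if (isLeftSide) {
            joinBeforeMs = beforeMs;
            joinAfterMs = afterMs;
        } else {
            // From the right side's perspective, the window is mirrored.
            joinBeforeMs = afterMs;
            joinAfterMs = beforeMs;
        }
    }

    // Earliest other-side timestamp that can join with a record at ts.
    long lowerBound(final long ts) {
        return ts - joinBeforeMs;
    }

    // Latest other-side timestamp that can join with a record at ts.
    long upperBound(final long ts) {
        return ts + joinAfterMs;
    }
}
```

For example, with `before = 100` and `after = 50`, a left record at 1000 matches right records in `[900, 1050]`. A right record at 1050 matches left records in `[1000, 1150]`, so both sides agree that the pair (1000, 1050) joins.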

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
##########
@@ -88,7 +90,22 @@ public void testLeftJoinWithInvalidSpuriousResultFixFlag() {
     }
 
     @Test
-    public void testLeftJoinWithSpuriousResultFixDisabled() {
+    public void testLeftJoinWithSpuriousResultFixDisabledViaFeatureFlag() {
+        runLeftJoinWithoutSpuriousResultFix(
+            JoinWindows.ofTimeDifferenceAndGrace(ofMillis(100L), ofHours(24L)),
+            false
+        );
+    }
+    @Test
+    public void testLeftJoinWithSpuriousResultFixDisabledOldApi() {

Review comment:
       I duplicated this test to verify that both the feature flag and the old API disable this fix. Thus, the usage of the old API in this method should not be changed by the KIP-633 PR.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##########
@@ -150,12 +150,11 @@ public long get() {
         // Time shared between joins to keep track of the maximum stream time
         final MaxObservedStreamTime maxObservedStreamTime = new MaxObservedStreamTime();
 
+        final JoinWindowsInternal internalWindows = new JoinWindowsInternal(windows);
         final KStreamKStreamJoin<K1, R, V1, V2> joinThis = new KStreamKStreamJoin<>(
             true,
             otherWindowStore.name(),
-            windows.beforeMs,
-            windows.afterMs,
-            windows.gracePeriodMs(),
+            internalWindows,

Review comment:
       Easier to pass one parameter instead of 4

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##########
@@ -76,18 +76,37 @@
 
     private final long graceMs;
 
+    protected final boolean enableSpuriousResultFix;

Review comment:
       This is the new flag. We set it to `false` if the old methods are used, and to `true` for the new methods from KIP-633.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##########
@@ -114,7 +133,7 @@ public static JoinWindows of(final Duration timeDifference) throws IllegalArgume
     public JoinWindows before(final Duration timeDifference) throws IllegalArgumentException {
         final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference");
         final long timeDifferenceMs = validateMillisecondDuration(timeDifference, msgPrefix);
-        return new JoinWindows(timeDifferenceMs, afterMs, DEFAULT_GRACE_PERIOD_MS);
+        return new JoinWindows(timeDifferenceMs, afterMs, graceMs, enableSpuriousResultFix);

Review comment:
       Side fix: `before()` resets grace to 24h (not sure why -- seems to be a bug)
   
   same for `after()` below.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -82,20 +87,23 @@
 
     private class KStreamKStreamJoinProcessor extends AbstractProcessor<K, V1> {
         private WindowStore<K, V2> otherWindowStore;
-        private StreamsMetricsImpl metrics;

Review comment:
       Side cleanup

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamStreamJoinNode.java
##########
@@ -36,15 +36,13 @@
  * Too much information to generalize, so Stream-Stream joins are represented by a specific node.
  */
 public class StreamStreamJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K, V1, V2, VR> {
-    private static final Properties EMPTY_PROPERTIES = new Properties();

Review comment:
       unused.

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplValueJoinerWithKeyTest.java
##########
@@ -84,53 +84,56 @@ public void setup() {
     @Test
     public void shouldIncludeKeyInStreamSteamJoinResults() {
         leftStream.join(
-                rightStream,

Review comment:
       Just some cleanup. The actual change is above.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamStreamJoinNode.java
##########
@@ -102,7 +102,14 @@ public void writeToTopology(final InternalTopologyBuilder topologyBuilder, final
         topologyBuilder.addStateStore(thisWindowStoreBuilder, thisWindowedStreamProcessorName, otherProcessorName);
         topologyBuilder.addStateStore(otherWindowStoreBuilder, otherWindowedStreamProcessorName, thisProcessorName);
 
-        if (props == null || StreamsConfig.InternalConfig.getBoolean(new HashMap(props), ENABLE_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX, true)) {
+        if (enableSpuriousResultFix &&

Review comment:
       We evaluate the feature flag twice. This is the second time. So we add the new flag, too.
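A minimal sketch of how the two switches could combine, under loudly stated assumptions: the config key and parsing below are placeholders, not the real `StreamsConfig.InternalConfig` API. As in the removed code (`props == null || ...getBoolean(..., true)`), a missing config defaults to `true`, so the `JoinWindows` flag effectively decides. The `&&` short-circuits just like `enableSpuriousResultFix && ...` in the diff. This sketch rejects values that are neither a `Boolean` nor a "true"/"false" string. The real parsing may differ.

```java
import java.util.Map;

// Hypothetical sketch (NOT Kafka code) of combining the windows flag with
// the internal feature-flag config.
final class SpuriousFixDecisionSketch {
    static final String INTERNAL_FIX_FLAG = "hypothetical.internal.spurious.results.fix"; // placeholder key

    // Missing config (or null props) defaults to true, like the removed code.
    static boolean readConfigFlag(final Map<String, Object> appConfigs) {
        final Object value = appConfigs == null ? null : appConfigs.get(INTERNAL_FIX_FLAG);
        if (value == null) {
            return true;
        }
        if (value instanceof Boolean) {
            return (Boolean) value;
        }
        if (value instanceof String) {
            final String s = ((String) value).trim();
            if (s.equalsIgnoreCase("true")) {
                return true;
            }
            if (s.equalsIgnoreCase("false")) {
                return false;
            }
        }
        throw new IllegalArgumentException("Invalid value for " + INTERNAL_FIX_FLAG + ": " + value);
    }

    // Fix is active only if the new JoinWindows API was used AND the config allows it.
    static boolean fixEnabled(final boolean windowsFlag, final Map<String, Object> appConfigs) {
        return windowsFlag && readConfigFlag(appConfigs);
    }
}
```

This matches the two duplicated "disabled" tests. The old API (`windowsFlag == false`) disables the fix regardless of config, and the new API plus `false` in the config disables it as well.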

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
##########
@@ -63,8 +64,6 @@
 import static org.junit.Assert.assertTrue;
 
 public class KStreamKStreamJoinTest {
-    private final static KeyValueTimestamp<?, ?>[] EMPTY = new KeyValueTimestamp[0];

Review comment:
       unnecessary

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -82,20 +87,23 @@
 
     private class KStreamKStreamJoinProcessor extends AbstractProcessor<K, V1> {
         private WindowStore<K, V2> otherWindowStore;
-        private StreamsMetricsImpl metrics;
         private Sensor droppedRecordsSensor;
         private Optional<WindowStore<KeyAndJoinSide<K>, LeftOrRightValue>> outerJoinWindowStore = Optional.empty();
 
-        @SuppressWarnings("unchecked")
         @Override
         public void init(final ProcessorContext context) {
             super.init(context);
-            metrics = (StreamsMetricsImpl) context.metrics();
+            final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics();
             droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
             otherWindowStore = context.getStateStore(otherWindowName);
 
-            if (StreamsConfig.InternalConfig.getBoolean(context().appConfigs(), ENABLE_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX, true)) {
-                outerJoinWindowStore = outerJoinWindowName.map(name -> context.getStateStore(name));
+            if (enableSpuriousResultFix

Review comment:
       We evaluate the feature flag twice. This is the first time. So we add the new flag, too.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##########
@@ -76,18 +76,37 @@
 
     private final long graceMs;
 
+    protected final boolean enableSpuriousResultFix;
+
+    protected JoinWindows(final JoinWindows joinWindows) {
+        beforeMs = joinWindows.beforeMs;
+        afterMs = joinWindows.afterMs;
+        graceMs = joinWindows.graceMs;
+        enableSpuriousResultFix = joinWindows.enableSpuriousResultFix;
+    }
+
     private JoinWindows(final long beforeMs,
                         final long afterMs,
-                        final long graceMs) {
+                        final long graceMs,
+                        final boolean enableSpuriousResultFix) {
         if (beforeMs + afterMs < 0) {
             throw new IllegalArgumentException("Window interval (ie, beforeMs+afterMs) must not be negative.");
         }
         this.afterMs = afterMs;
         this.beforeMs = beforeMs;
         this.graceMs = graceMs;
+        this.enableSpuriousResultFix = enableSpuriousResultFix;
     }
 
-    /**
+    public static JoinWindows ofTimeDifferenceAndGrace(final Duration timeDifference, final Duration afterWindowEnd) {

Review comment:
       I kept the overlap with the KIP-633 PR to a minimum. The key point is that this method and the one below must set the new flag to `true`.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
##########
@@ -138,9 +143,12 @@ public void testInnerRepartitioned() {
         );
 
         leftStream.map(MockMapper.noOpKeyValueMapper())
-                .join(rightStream.flatMap(MockMapper.noOpFlatKeyValueMapper())
-                                 .selectKey(MockMapper.selectKeyKeyValueMapper()),
-                       valueJoiner, JoinWindows.of(ofSeconds(10))).to(OUTPUT_TOPIC);
+            .join(
+                rightStream.flatMap(MockMapper.noOpFlatKeyValueMapper())
+                    .selectKey(MockMapper.selectKeyKeyValueMapper()),
+                valueJoiner,
+                JoinWindows.ofTimeDifferenceAndGrace(ofSeconds(10), ofHours(24))

Review comment:
       We need to switch to the new API to keep the fix enabled; otherwise, the test would break.
   
   I just set grace to 24h, which was the old default, to ensure nothing breaks. The same applies to the other tests below.

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
##########
@@ -396,7 +416,7 @@ public void testJoin() {
             for (int i = 0; i < 2; i++) {
                 inputTopic1.pipeInput(expectedKeys[i], "A" + expectedKeys[i]);
             }
-            processor.checkAndClearProcessResult(EMPTY);
+            processor.checkAndClearProcessResult();

Review comment:
       We can pass no arguments to check for "no result".
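A minimal sketch of why passing no arguments works, assuming the real `checkAndClearProcessResult` takes a varargs parameter (which the old `EMPTY` array argument suggests). `ResultCheckerSketch` is a hypothetical stand-in, not `MockProcessor`. For a varargs method, a call with no arguments receives an empty array, just like passing an explicit empty array, so it asserts that nothing was produced.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a test helper like MockProcessor (NOT the real class).
final class ResultCheckerSketch<T> {
    private final List<T> processed = new ArrayList<>();

    void record(final T result) {
        processed.add(result);
    }

    // Varargs: a no-argument call compares against an empty list,
    // i.e., it asserts that no results were produced.
    @SafeVarargs
    final void checkAndClearProcessResult(final T... expected) {
        final List<T> expectedList = Arrays.asList(expected);
        if (!processed.equals(expectedList)) {
            throw new AssertionError("expected " + expectedList + " but got " + processed);
        }
        processed.clear();
    }
}
```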

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
##########
@@ -203,68 +202,79 @@ public void shouldEnableLoggingWithCustomConfigOnStreamJoined() {
     @Test
     public void shouldThrowExceptionThisStoreSupplierRetentionDoNotMatchWindowsSizeAndGrace() {
         // Case where retention of thisJoinStore doesn't match JoinWindows
-        final WindowBytesStoreSupplier thisStoreSupplier = buildWindowBytesStoreSupplier("in-memory-join-store", 500, 100, true);

Review comment:
       Did some side cleanup... Will highlight actual changes.

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
##########
@@ -852,25 +942,26 @@ private void testUpperWindowBound(final int[] expectedKeys,
         // push a dummy record to produce all left-join non-joined items
         time += 301L;
         inputTopic1.pipeInput(0, "dummy", time);
-
-        processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "C0+null", 1101),
-            new KeyValueTimestamp<>(0, "D0+null", 1102),
-            new KeyValueTimestamp<>(1, "D1+null", 1102),
-            new KeyValueTimestamp<>(0, "E0+null", 1103),
-            new KeyValueTimestamp<>(1, "E1+null", 1103),
-            new KeyValueTimestamp<>(2, "E2+null", 1103),
-            new KeyValueTimestamp<>(0, "F0+null", 1104),
-            new KeyValueTimestamp<>(1, "F1+null", 1104),
-            new KeyValueTimestamp<>(2, "F2+null", 1104),
-            new KeyValueTimestamp<>(3, "F3+null", 1104));
+        processor.checkAndClearProcessResult(
+            new KeyValueTimestamp<>(0, "C0+null", 1101L),
+            new KeyValueTimestamp<>(0, "D0+null", 1102L),
+            new KeyValueTimestamp<>(1, "D1+null", 1102L),
+            new KeyValueTimestamp<>(0, "E0+null", 1103L),
+            new KeyValueTimestamp<>(1, "E1+null", 1103L),
+            new KeyValueTimestamp<>(2, "E2+null", 1103L),
+            new KeyValueTimestamp<>(0, "F0+null", 1104L),
+            new KeyValueTimestamp<>(1, "F1+null", 1104L),
+            new KeyValueTimestamp<>(2, "F2+null", 1104L),
+            new KeyValueTimestamp<>(3, "F3+null", 1104L)
+        );
     }
 
     private void testLowerWindowBound(final int[] expectedKeys,
                                       final TopologyTestDriver driver,
                                       final MockProcessor<Integer, String> processor) {
         long time;
         final TestInputTopic<Integer, String> inputTopic1 = driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer());
-        final TestInputTopic<Integer, String> inputTopic2 = driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer());

Review comment:
       unused

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
##########
@@ -483,8 +513,9 @@ public void testOuterJoin() {
         joined = stream1.outerJoin(
             stream2,
             MockValueJoiner.TOSTRING_JOINER,
-            JoinWindows.of(ofMillis(100)),
-            StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
+            JoinWindows.ofTimeDifferenceAndGrace(ofMillis(100L), ofHours(24L)),

Review comment:
       Switch to the new API. (Not all tests in this class need to switch for this fix. I'll leave it to the KIP-633 PR to update the others when the old API gets deprecated.)

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
##########
@@ -74,8 +75,9 @@ public void testLeftJoinWithInvalidSpuriousResultFixFlag() {
         joined = stream1.leftJoin(
             stream2,
             MockValueJoiner.TOSTRING_JOINER,
-            JoinWindows.of(ofMillis(100)),
-            StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
+            JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100L)),

Review comment:
       Switch to new API (similar below for all other tests in this class)

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java
##########
@@ -72,8 +72,9 @@ public void testOuterJoinWithInvalidSpuriousResultFixFlag() {
         joined = stream1.outerJoin(
             stream2,
             MockValueJoiner.TOSTRING_JOINER,
-            JoinWindows.of(ofMillis(100)),
-            StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
+            JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100L)),

Review comment:
       Same as left-join-test above.
   
   Switch to new API for all cases, and duplicate the "disable fix" test using feature flag and old API.

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
##########
@@ -138,13 +158,30 @@ public void testLeftJoinWithSpuriousResultFixDisabled() {
             for (int i = 0; i < 2; i++) {
                 inputTopic2.pipeInput(expectedKeys[i], "a" + expectedKeys[i]);
             }
-            processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "A0+a0", 0),
-                new KeyValueTimestamp<>(1, "A1+a1", 0));
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(0, "A0+a0", 0L),
+                new KeyValueTimestamp<>(1, "A1+a1", 0L)
+            );
         }
     }
 
     @Test
-    public void testLeftJoinDuplicatesWithFixDisabled() {
+    public void testLeftJoinDuplicatesWithSpuriousResultFixDisabledFeatureFlag() {
+        testLeftJoinDuplicatesSpuriousResultFix(
+            JoinWindows.ofTimeDifferenceAndGrace(ofMillis(100L), ofMillis(10L)),
+            false
+        );
+    }
+    @Test
+    public void testLeftJoinDuplicatesWithSpuriousResultFixDisabledOldApi() {
+        testLeftJoinDuplicatesSpuriousResultFix(
+            JoinWindows.of(ofMillis(100L)).grace(ofMillis(10L)),

Review comment:
       As above.







[GitHub] [kafka] mjsax commented on a change in pull request #10861: KAFKA-12909: disable spurious left/outer stream-stream join fix for old JoinWindows API

mjsax commented on a change in pull request #10861:
URL: https://github.com/apache/kafka/pull/10861#discussion_r652355524



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -82,20 +87,23 @@
 
     private class KStreamKStreamJoinProcessor extends AbstractProcessor<K, V1> {
         private WindowStore<K, V2> otherWindowStore;
-        private StreamsMetricsImpl metrics;
         private Sensor droppedRecordsSensor;
         private Optional<WindowStore<KeyAndJoinSide<K>, LeftOrRightValue>> outerJoinWindowStore = Optional.empty();
 
-        @SuppressWarnings("unchecked")
         @Override
         public void init(final ProcessorContext context) {
             super.init(context);
-            metrics = (StreamsMetricsImpl) context.metrics();
+            final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics();
             droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
             otherWindowStore = context.getStateStore(otherWindowName);
 
-            if (StreamsConfig.InternalConfig.getBoolean(context().appConfigs(), ENABLE_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX, true)) {
-                outerJoinWindowStore = outerJoinWindowName.map(name -> context.getStateStore(name));
+            if (enableSpuriousResultFix

Review comment:
       After updating the code with the `else`, a few tests started to fail. The issue is that for left/outer joins we _always_ set the store name (even if the feature is disabled); only for inner joins do we get `Optional.empty()`. Thus, we cannot actually verify the `else` case (i.e., that we added a store even though we don't need it) at runtime. I guess we need to rely on the added unit tests to cover this case instead.







[GitHub] [kafka] mjsax commented on a change in pull request #10861: KAFKA-12909: disable spurious left/outer stream-stream join fix for old JoinWindows API

mjsax commented on a change in pull request #10861:
URL: https://github.com/apache/kafka/pull/10861#discussion_r651362625



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##########
@@ -76,18 +76,37 @@
 
     private final long graceMs;
 
+    protected final boolean enableSpuriousResultFix;
+
+    protected JoinWindows(final JoinWindows joinWindows) {

Review comment:
       Not really possible. Multiple issues:
    - we still need a protected constructor in `JoinWindows`
    - we need to make `graceMs` protected
    - blocker: we need to make the _public_ members `beforeMs` and `afterMs` non-final (and we cannot do this...)







[GitHub] [kafka] mjsax commented on a change in pull request #10861: KAFKA-12909: disable spurious left/outer stream-stream join fix for old JoinWindows API

mjsax commented on a change in pull request #10861:
URL: https://github.com/apache/kafka/pull/10861#discussion_r651363243



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -82,20 +87,23 @@
 
     private class KStreamKStreamJoinProcessor extends AbstractProcessor<K, V1> {
         private WindowStore<K, V2> otherWindowStore;
-        private StreamsMetricsImpl metrics;
         private Sensor droppedRecordsSensor;
         private Optional<WindowStore<KeyAndJoinSide<K>, LeftOrRightValue>> outerJoinWindowStore = Optional.empty();
 
-        @SuppressWarnings("unchecked")
         @Override
         public void init(final ProcessorContext context) {
             super.init(context);
-            metrics = (StreamsMetricsImpl) context.metrics();
+            final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics();
             droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
             otherWindowStore = context.getStateStore(otherWindowName);
 
-            if (StreamsConfig.InternalConfig.getBoolean(context().appConfigs(), ENABLE_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX, true)) {
-                outerJoinWindowStore = outerJoinWindowName.map(name -> context.getStateStore(name));
+            if (enableSpuriousResultFix

Review comment:
       I don't think that checking the condition twice is a real issue? Also, it seems better to do the check, because otherwise (if we had a bug and incorrectly got a `null` store back) we might mask the bug?







[GitHub] [kafka] izzyacademy commented on pull request #10861: KAFKA-12909: disable spurious left/outer stream-stream join fix for old JoinWindows API

izzyacademy commented on pull request #10861:
URL: https://github.com/apache/kafka/pull/10861#issuecomment-861778238


   @mjsax the changes look good to me. I will rebase my branch after your changes get merged into trunk. Thanks for looping me in.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #10861: KAFKA-12909: disable spurious left/outer stream-stream join fix for old JoinWindows API

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10861:
URL: https://github.com/apache/kafka/pull/10861#discussion_r652222557



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##########
@@ -114,7 +133,7 @@ public static JoinWindows of(final Duration timeDifference) throws IllegalArgume
     public JoinWindows before(final Duration timeDifference) throws IllegalArgumentException {
         final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference");
         final long timeDifferenceMs = validateMillisecondDuration(timeDifference, msgPrefix);
-        return new JoinWindows(timeDifferenceMs, afterMs, DEFAULT_GRACE_PERIOD_MS);
+        return new JoinWindows(timeDifferenceMs, afterMs, graceMs, enableSpuriousResultFix);

Review comment:
       Sure. Don't see the connection to this change thought? Note, that `before()` and `after()` are non-static method, and thus, they should not change/set the grace period. Only the static `of(size)` and non-static `grace()` should change grace period.
   ```
   JoinWindows.of(Duration.ofMillis(5000)).before(Duration.ofMillis(30)); // of() sets the default grace period of 24h, so before() does not need to reset it to 24h; it can just inherit it
   JoinWindows.of(Duration.ofMillis(5000)).grace(Duration.ofMillis(40)).before(Duration.ofMillis(30)); // this should leave grace at 40; however, without this fix, before() would reset grace to the default of 24h, which is incorrect
   ```
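To illustrate the semantics described above, here is a minimal sketch of an immutable window spec in which only the static factory and `grace()` choose the grace period, while `before()` and `after()` copy whatever grace period the instance already has. `WindowSpec` is a hypothetical, simplified class for illustration only; it is not the Kafka `JoinWindows` implementation.

```java
import java.time.Duration;

// Hypothetical, simplified stand-in for an immutable join-window spec.
// NOT the Kafka JoinWindows class; it only models the grace-period semantics.
final class WindowSpec {
    // Default grace period of 24 hours, applied only by the static factory.
    static final long DEFAULT_GRACE_PERIOD_MS = Duration.ofHours(24).toMillis();

    final long beforeMs;
    final long afterMs;
    final long graceMs;

    private WindowSpec(long beforeMs, long afterMs, long graceMs) {
        this.beforeMs = beforeMs;
        this.afterMs = afterMs;
        this.graceMs = graceMs;
    }

    // Static factory: sets symmetric bounds and the default grace period.
    static WindowSpec of(Duration timeDifference) {
        long ms = timeDifference.toMillis();
        return new WindowSpec(ms, ms, DEFAULT_GRACE_PERIOD_MS);
    }

    // Non-static modifiers return a new instance and inherit the current grace period.
    WindowSpec before(Duration timeDifference) {
        return new WindowSpec(timeDifference.toMillis(), afterMs, graceMs);
    }

    WindowSpec after(Duration timeDifference) {
        return new WindowSpec(beforeMs, timeDifference.toMillis(), graceMs);
    }

    // Only grace() changes the grace period of an existing spec.
    WindowSpec grace(Duration gracePeriod) {
        return new WindowSpec(beforeMs, afterMs, gracePeriod.toMillis());
    }
}
```

With this shape, `WindowSpec.of(Duration.ofMillis(5000)).grace(Duration.ofMillis(40)).before(Duration.ofMillis(30))` keeps a grace period of 40 ms, because `before()` never touches it.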







[GitHub] [kafka] guozhangwang commented on a change in pull request #10861: KAFKA-12909: disable spurious left/outer stream-stream join fix for old JoinWindows API

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #10861:
URL: https://github.com/apache/kafka/pull/10861#discussion_r650448957



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##########
@@ -76,18 +76,37 @@
 
     private final long graceMs;
 
+    protected final boolean enableSpuriousResultFix;
+
+    protected JoinWindows(final JoinWindows joinWindows) {

Review comment:
       How about moving this copy-constructor to JoinWindowsInternal?
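A rough sketch of the suggested layout, assuming the public class keeps its state in `protected` fields behind a `protected` constructor, while the internal subclass owns the copy constructor and exposes read accessors. `PublicWindows` and `InternalWindows` are hypothetical names that only mirror the roles of `JoinWindows` and `JoinWindowsInternal`; this is not the code from the PR.

```java
// Hypothetical public window-spec class; NOT the real Kafka JoinWindows.
class PublicWindows {
    protected final long beforeMs;
    protected final long afterMs;
    protected final long graceMs;
    protected final boolean enableSpuriousResultFix;

    protected PublicWindows(long beforeMs, long afterMs, long graceMs, boolean enableSpuriousResultFix) {
        this.beforeMs = beforeMs;
        this.afterMs = afterMs;
        this.graceMs = graceMs;
        this.enableSpuriousResultFix = enableSpuriousResultFix;
    }
}

// Hypothetical internal subclass that owns the copy constructor, as suggested in the review.
final class InternalWindows extends PublicWindows {
    InternalWindows(PublicWindows windows) {
        // Reading protected fields of another instance compiles here because both classes
        // share a package in this sketch; across packages, Java only permits this access
        // through a reference whose type is the subclass (or a subtype of it).
        super(windows.beforeMs, windows.afterMs, windows.graceMs, windows.enableSpuriousResultFix);
    }

    // Internal-only accessors, so the public class does not need to expose them.
    boolean spuriousResultFixEnabled() {
        return enableSpuriousResultFix;
    }

    long gracePeriodMs() {
        return graceMs;
    }
}
```

The design trade-off is where the copy logic lives: keeping it in the internal subclass keeps the public class's surface smaller, but it depends on the subclass being able to read the base class's state.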

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -82,20 +87,23 @@
 
     private class KStreamKStreamJoinProcessor extends AbstractProcessor<K, V1> {
         private WindowStore<K, V2> otherWindowStore;
-        private StreamsMetricsImpl metrics;
         private Sensor droppedRecordsSensor;
         private Optional<WindowStore<KeyAndJoinSide<K>, LeftOrRightValue>> outerJoinWindowStore = Optional.empty();
 
-        @SuppressWarnings("unchecked")
         @Override
         public void init(final ProcessorContext context) {
             super.init(context);
-            metrics = (StreamsMetricsImpl) context.metrics();
+            final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics();
             droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
             otherWindowStore = context.getStateStore(otherWindowName);
 
-            if (StreamsConfig.InternalConfig.getBoolean(context().appConfigs(), ENABLE_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX, true)) {
-                outerJoinWindowStore = outerJoinWindowName.map(name -> context.getStateStore(name));
+            if (enableSpuriousResultFix

Review comment:
       Thinking about this a bit more, to avoid evaluating the condition twice: since we already call `addStateStore` in the logical plan (i.e., in `StreamStreamJoinNode`) only if both conditions are met, could we just blindly try to get the store via `context.getStateStore()` here? It would return `null` if the store was not added during the logical plan and hence does not exist anyway. If it is `null`, we can still convert it to an `Optional.empty()`.
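The suggestion relies on `java.util.Optional.map` treating a `null` result as empty. A small sketch, assuming (as the comment does) that the store lookup returns `null` for a store that was never added; `FakeContext` and `OuterStoreLookup` are hypothetical stand-ins, not the Kafka `ProcessorContext` or join processor.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for a processor context; NOT the Kafka ProcessorContext.
// Assumption taken from the review comment: looking up an unknown store returns null.
final class FakeContext {
    private final Map<String, Object> stores = new HashMap<>();

    void addStore(String name, Object store) {
        stores.put(name, store);
    }

    Object getStateStore(String name) {
        return stores.get(name); // null if the store was never added
    }
}

final class OuterStoreLookup {
    // No separate check of the enable flag: if the store was not added to the topology,
    // the lookup returns null and Optional.map turns that into Optional.empty().
    static Optional<Object> outerJoinStore(FakeContext context, Optional<String> storeName) {
        return storeName.map(context::getStateStore);
    }
}
```

This only works if the real lookup actually returns `null` for a store that was not added; if it throws instead, the explicit condition check would still be needed.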







[GitHub] [kafka] izzyacademy commented on a change in pull request #10861: KAFKA-12909: disable spurious left/outer stream-stream join fix for old JoinWindows API

Posted by GitBox <gi...@apache.org>.
izzyacademy commented on a change in pull request #10861:
URL: https://github.com/apache/kafka/pull/10861#discussion_r652088281



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##########
@@ -150,12 +150,11 @@ public long get() {
         // Time shared between joins to keep track of the maximum stream time
         final MaxObservedStreamTime maxObservedStreamTime = new MaxObservedStreamTime();
 
+        final JoinWindowsInternal internalWindows = new JoinWindowsInternal(windows);
         final KStreamKStreamJoin<K1, R, V1, V2> joinThis = new KStreamKStreamJoin<>(
             true,
             otherWindowStore.name(),
-            windows.beforeMs,
-            windows.afterMs,
-            windows.gracePeriodMs(),
+            internalWindows,

Review comment:
       I like this.







[GitHub] [kafka] izzyacademy commented on a change in pull request #10861: KAFKA-12909: disable spurious left/outer stream-stream join fix for old JoinWindows API

Posted by GitBox <gi...@apache.org>.
izzyacademy commented on a change in pull request #10861:
URL: https://github.com/apache/kafka/pull/10861#discussion_r652087933



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##########
@@ -114,7 +133,7 @@ public static JoinWindows of(final Duration timeDifference) throws IllegalArgume
     public JoinWindows before(final Duration timeDifference) throws IllegalArgumentException {
         final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference");
         final long timeDifferenceMs = validateMillisecondDuration(timeDifference, msgPrefix);
-        return new JoinWindows(timeDifferenceMs, afterMs, DEFAULT_GRACE_PERIOD_MS);
+        return new JoinWindows(timeDifferenceMs, afterMs, graceMs, enableSpuriousResultFix);

Review comment:
       As per the feedback from @ableegoldman, the grace period in the old methods needs to stay at 24h. It is only in the new methods that we get to specify it as zero.
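To make the distinction concrete, a hypothetical sketch in which the old-style factory keeps the 24h default grace period and leaves the fix disabled, while the new-style factory requires an explicit grace period (which may be zero) and, as the opt-in described in this PR, enables the fix. `JoinSpec`, `legacyOf`, and `withGrace` are illustrative names only; they are not the Kafka Streams API.

```java
import java.time.Duration;

// Hypothetical window spec contrasting an old-style and a new-style factory.
// The names are illustrative only and do not exist in Kafka Streams.
final class JoinSpec {
    static final long DEFAULT_GRACE_PERIOD_MS = Duration.ofHours(24).toMillis();

    final long timeDifferenceMs;
    final long graceMs;
    final boolean enableSpuriousResultFix;

    private JoinSpec(long timeDifferenceMs, long graceMs, boolean enableSpuriousResultFix) {
        this.timeDifferenceMs = timeDifferenceMs;
        this.graceMs = graceMs;
        this.enableSpuriousResultFix = enableSpuriousResultFix;
    }

    // Old-style factory: keeps the 24h default grace and the old behavior,
    // so existing applications behave the same after an upgrade.
    static JoinSpec legacyOf(Duration timeDifference) {
        return new JoinSpec(timeDifference.toMillis(), DEFAULT_GRACE_PERIOD_MS, false);
    }

    // New-style factory: the caller must pass a grace period (possibly zero);
    // choosing this factory is the explicit opt-in that enables the fix.
    static JoinSpec withGrace(Duration timeDifference, Duration grace) {
        return new JoinSpec(timeDifference.toMillis(), grace.toMillis(), true);
    }
}
```

Tying the fix to the new factory means no existing call site changes behavior silently; users get the fix only by rewriting their code to the new API.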







[GitHub] [kafka] guozhangwang commented on pull request #10861: KAFKA-12909: disable spurious left/outer stream-stream join fix for old JoinWindows API

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #10861:
URL: https://github.com/apache/kafka/pull/10861#issuecomment-861886242


   Re-triggered unit tests. We can merge after it turns green.

