Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/06/10 10:09:02 UTC

[GitHub] [kafka] mjsax commented on a change in pull request #10861: KAFKA-12909: disable spurious left/outer stream-stream join fix for old JoinWindows API

mjsax commented on a change in pull request #10861:
URL: https://github.com/apache/kafka/pull/10861#discussion_r649031614



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##########
@@ -67,7 +67,7 @@
  * @see KStream#outerJoin(KStream, ValueJoiner, JoinWindows, StreamJoined)
  * @see TimestampExtractor
  */
-public final class JoinWindows extends Windows<Window> {
+public class JoinWindows extends Windows<Window> {

Review comment:
       Need to drop `final` here to be able to add a `JoinWindowsInternal` subclass that can access the newly added flag.
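
A minimal sketch of why `final` has to go, using simplified stand-in classes rather than the real Kafka ones. The names `SketchJoinWindows`, `SketchJoinWindowsInternal`, and `spuriousResultFixEnabled()` are hypothetical; the sketch only assumes that the internal class is a subclass that reads the `protected` flag.

```java
// Simplified stand-ins, NOT the real Kafka classes.
class SketchJoinWindows {                 // non-final, so it can be subclassed
    protected final boolean enableSpuriousResultFix;

    SketchJoinWindows(final boolean enableSpuriousResultFix) {
        this.enableSpuriousResultFix = enableSpuriousResultFix;
    }

    // Copy constructor, mirroring the protected one added in the PR.
    protected SketchJoinWindows(final SketchJoinWindows other) {
        this.enableSpuriousResultFix = other.enableSpuriousResultFix;
    }
}

// Hypothetical internal wrapper: reads the protected flag so that the
// public class does not need a public getter for it.
class SketchJoinWindowsInternal extends SketchJoinWindows {
    SketchJoinWindowsInternal(final SketchJoinWindows windows) {
        super(windows);
    }

    boolean spuriousResultFixEnabled() {
        return enableSpuriousResultFix;
    }
}
```

With a `final` base class the `extends` clause would not compile, so the flag would need a public accessor instead.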

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -57,18 +58,22 @@
 
     KStreamKStreamJoin(final boolean isLeftSide,
                        final String otherWindowName,
-                       final long joinBeforeMs,
-                       final long joinAfterMs,
-                       final long joinGraceMs,
+                       final JoinWindowsInternal windows,
                        final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends R> joiner,
                        final boolean outer,
                        final Optional<String> outerJoinWindowName,
                        final KStreamImplJoin.MaxObservedStreamTime maxObservedStreamTime) {
         this.isLeftSide = isLeftSide;
         this.otherWindowName = otherWindowName;
-        this.joinBeforeMs = joinBeforeMs;
-        this.joinAfterMs = joinAfterMs;
-        this.joinGraceMs = joinGraceMs;
+        if (isLeftSide) {

Review comment:
       This was done by the caller before, i.e., `KStreamImplJoin` from above. As we only pass one parameter now, we need to flip before/after here if necessary.
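
The hunk above is cut off at `if (isLeftSide) {`, so the sketch below is only an assumption about what such a flip looks like: the left side uses the window bounds as given, and the other side swaps them. `SketchJoinBounds` is a hypothetical stand-in, not the PR's code.

```java
// Hypothetical stand-in for the before/after handling in the join processor.
final class SketchJoinBounds {
    final long joinBeforeMs;
    final long joinAfterMs;

    SketchJoinBounds(final boolean isLeftSide, final long beforeMs, final long afterMs) {
        if (isLeftSide) {
            joinBeforeMs = beforeMs;
            joinAfterMs = afterMs;
        } else {
            // Seen from the other stream, "before" and "after" trade places.
            joinBeforeMs = afterMs;
            joinAfterMs = beforeMs;
        }
    }
}
```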

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
##########
@@ -88,7 +90,22 @@ public void testLeftJoinWithInvalidSpuriousResultFixFlag() {
     }
 
     @Test
-    public void testLeftJoinWithSpuriousResultFixDisabled() {
+    public void testLeftJoinWithSpuriousResultFixDisabledViaFeatureFlag() {
+        runLeftJoinWithoutSpuriousResultFix(
+            JoinWindows.ofTimeDifferenceAndGrace(ofMillis(100L), ofHours(24L)),
+            false
+        );
+    }
+    @Test
+    public void testLeftJoinWithSpuriousResultFixDisabledOldApi() {

Review comment:
       I duplicated this test to verify that both the feature flag and the old API disable this fix. Thus, the usage of the old API in this method should not be changed by the KIP-633 PR.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##########
@@ -150,12 +150,11 @@ public long get() {
         // Time shared between joins to keep track of the maximum stream time
         final MaxObservedStreamTime maxObservedStreamTime = new MaxObservedStreamTime();
 
+        final JoinWindowsInternal internalWindows = new JoinWindowsInternal(windows);
         final KStreamKStreamJoin<K1, R, V1, V2> joinThis = new KStreamKStreamJoin<>(
             true,
             otherWindowStore.name(),
-            windows.beforeMs,
-            windows.afterMs,
-            windows.gracePeriodMs(),
+            internalWindows,

Review comment:
       Easier to pass one parameter instead of 4

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##########
@@ -76,18 +76,37 @@
 
     private final long graceMs;
 
+    protected final boolean enableSpuriousResultFix;

Review comment:
       This is the new flag. We set it to `false` if the old methods are used, and to `true` for the new methods from KIP-633.
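
A rough sketch of that rule with a simplified stand-in class (`SketchFlaggedWindows` is hypothetical; only the factory names `of` and `ofTimeDifferenceAndGrace` and the old 24h default grace come from this PR):

```java
import java.time.Duration;

// Simplified stand-in, NOT the real JoinWindows.
final class SketchFlaggedWindows {
    final long timeDifferenceMs;
    final long graceMs;
    final boolean enableSpuriousResultFix;

    private SketchFlaggedWindows(final long timeDifferenceMs,
                                 final long graceMs,
                                 final boolean enableSpuriousResultFix) {
        this.timeDifferenceMs = timeDifferenceMs;
        this.graceMs = graceMs;
        this.enableSpuriousResultFix = enableSpuriousResultFix;
    }

    // Stand-in for the old API: 24h default grace, fix disabled.
    static SketchFlaggedWindows of(final Duration timeDifference) {
        return new SketchFlaggedWindows(
            timeDifference.toMillis(), Duration.ofHours(24).toMillis(), false);
    }

    // Stand-in for the KIP-633 API: explicit grace, fix enabled.
    static SketchFlaggedWindows ofTimeDifferenceAndGrace(final Duration timeDifference,
                                                         final Duration afterWindowEnd) {
        return new SketchFlaggedWindows(
            timeDifference.toMillis(), afterWindowEnd.toMillis(), true);
    }
}
```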

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##########
@@ -114,7 +133,7 @@ public static JoinWindows of(final Duration timeDifference) throws IllegalArgume
     public JoinWindows before(final Duration timeDifference) throws IllegalArgumentException {
         final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference");
         final long timeDifferenceMs = validateMillisecondDuration(timeDifference, msgPrefix);
-        return new JoinWindows(timeDifferenceMs, afterMs, DEFAULT_GRACE_PERIOD_MS);
+        return new JoinWindows(timeDifferenceMs, afterMs, graceMs, enableSpuriousResultFix);

Review comment:
       Side fix: `before()` used to reset grace to the 24h default (not sure why -- seems to be a bug).
   
   Same for `after()` below.
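
To make the grace reset concrete, here is a small sketch with a hypothetical stand-in class (`SketchWindows` and both method names are invented for illustration). It contrasts rebuilding the windows with the default grace against carrying the configured grace over.

```java
// Simplified stand-in, NOT the real JoinWindows.
final class SketchWindows {
    static final long DEFAULT_GRACE_PERIOD_MS = 24 * 60 * 60 * 1000L; // 24h

    final long beforeMs;
    final long afterMs;
    final long graceMs;

    SketchWindows(final long beforeMs, final long afterMs, final long graceMs) {
        this.beforeMs = beforeMs;
        this.afterMs = afterMs;
        this.graceMs = graceMs;
    }

    // Old behaviour: a previously configured grace period is silently lost.
    SketchWindows beforeResettingGrace(final long timeDifferenceMs) {
        return new SketchWindows(timeDifferenceMs, afterMs, DEFAULT_GRACE_PERIOD_MS);
    }

    // Fixed behaviour: the configured grace period is carried over.
    SketchWindows beforeKeepingGrace(final long timeDifferenceMs) {
        return new SketchWindows(timeDifferenceMs, afterMs, graceMs);
    }
}
```

With the resetting variant, the result depends on whether grace was configured before or after the call to `before()`, which is why it looks like a bug.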

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -82,20 +87,23 @@
 
     private class KStreamKStreamJoinProcessor extends AbstractProcessor<K, V1> {
         private WindowStore<K, V2> otherWindowStore;
-        private StreamsMetricsImpl metrics;

Review comment:
       Side cleanup

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamStreamJoinNode.java
##########
@@ -36,15 +36,13 @@
  * Too much information to generalize, so Stream-Stream joins are represented by a specific node.
  */
 public class StreamStreamJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K, V1, V2, VR> {
-    private static final Properties EMPTY_PROPERTIES = new Properties();

Review comment:
       unused.

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplValueJoinerWithKeyTest.java
##########
@@ -84,53 +84,56 @@ public void setup() {
     @Test
     public void shouldIncludeKeyInStreamSteamJoinResults() {
         leftStream.join(
-                rightStream,

Review comment:
       Just some cleanup. The actual change is above.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamStreamJoinNode.java
##########
@@ -102,7 +102,14 @@ public void writeToTopology(final InternalTopologyBuilder topologyBuilder, final
         topologyBuilder.addStateStore(thisWindowStoreBuilder, thisWindowedStreamProcessorName, otherProcessorName);
         topologyBuilder.addStateStore(otherWindowStoreBuilder, otherWindowedStreamProcessorName, thisProcessorName);
 
-        if (props == null || StreamsConfig.InternalConfig.getBoolean(new HashMap(props), ENABLE_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX, true)) {
+        if (enableSpuriousResultFix &&

Review comment:
       We evaluate the feature flag twice. This is the second time. So we add the new flag, too.
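
A sketch of how the two switches combine, assuming the fix is active only when both the windows flag and the internal config agree. `SketchFixGate`, its method, and the config key are hypothetical, and the parsing is much simpler than `StreamsConfig.InternalConfig.getBoolean`; only the default of `true` is taken from the diff.

```java
import java.util.Map;

// Hypothetical helper, NOT part of the PR.
final class SketchFixGate {
    // Placeholder key; the real constant is ENABLE_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX.
    static final String FLAG = "sketch.enable.spurious.results.fix";

    static boolean fixEnabled(final boolean enableSpuriousResultFix,
                              final Map<String, Object> configs) {
        final Object raw = configs.get(FLAG);
        // A missing config entry defaults to true, as in the diff above.
        final boolean configFlag = raw == null || Boolean.parseBoolean(raw.toString());
        // Both the API-level flag and the config flag must allow the fix.
        return enableSpuriousResultFix && configFlag;
    }
}
```

Because the check happens in two places, both call sites need the same conjunction; otherwise the store could be added to the topology but never used, or the reverse.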

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
##########
@@ -63,8 +64,6 @@
 import static org.junit.Assert.assertTrue;
 
 public class KStreamKStreamJoinTest {
-    private final static KeyValueTimestamp<?, ?>[] EMPTY = new KeyValueTimestamp[0];

Review comment:
       unnecessary

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -82,20 +87,23 @@
 
     private class KStreamKStreamJoinProcessor extends AbstractProcessor<K, V1> {
         private WindowStore<K, V2> otherWindowStore;
-        private StreamsMetricsImpl metrics;
         private Sensor droppedRecordsSensor;
         private Optional<WindowStore<KeyAndJoinSide<K>, LeftOrRightValue>> outerJoinWindowStore = Optional.empty();
 
-        @SuppressWarnings("unchecked")
         @Override
         public void init(final ProcessorContext context) {
             super.init(context);
-            metrics = (StreamsMetricsImpl) context.metrics();
+            final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics();
             droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
             otherWindowStore = context.getStateStore(otherWindowName);
 
-            if (StreamsConfig.InternalConfig.getBoolean(context().appConfigs(), ENABLE_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX, true)) {
-                outerJoinWindowStore = outerJoinWindowName.map(name -> context.getStateStore(name));
+            if (enableSpuriousResultFix

Review comment:
       We evaluate the feature flag twice. This is the first time. So we add the new flag, too.

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##########
@@ -76,18 +76,37 @@
 
     private final long graceMs;
 
+    protected final boolean enableSpuriousResultFix;
+
+    protected JoinWindows(final JoinWindows joinWindows) {
+        beforeMs = joinWindows.beforeMs;
+        afterMs = joinWindows.afterMs;
+        graceMs = joinWindows.graceMs;
+        enableSpuriousResultFix = joinWindows.enableSpuriousResultFix;
+    }
+
     private JoinWindows(final long beforeMs,
                         final long afterMs,
-                        final long graceMs) {
+                        final long graceMs,
+                        final boolean enableSpuriousResultFix) {
         if (beforeMs + afterMs < 0) {
             throw new IllegalArgumentException("Window interval (ie, beforeMs+afterMs) must not be negative.");
         }
         this.afterMs = afterMs;
         this.beforeMs = beforeMs;
         this.graceMs = graceMs;
+        this.enableSpuriousResultFix = enableSpuriousResultFix;
     }
 
-    /**
+    public static JoinWindows ofTimeDifferenceAndGrace(final Duration timeDifference, final Duration afterWindowEnd) {

Review comment:
       Kept the overlap with the KIP-633 PR to a minimum. The key is that this and the method below must set the new flag to `true`.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
##########
@@ -138,9 +143,12 @@ public void testInnerRepartitioned() {
         );
 
         leftStream.map(MockMapper.noOpKeyValueMapper())
-                .join(rightStream.flatMap(MockMapper.noOpFlatKeyValueMapper())
-                                 .selectKey(MockMapper.selectKeyKeyValueMapper()),
-                       valueJoiner, JoinWindows.of(ofSeconds(10))).to(OUTPUT_TOPIC);
+            .join(
+                rightStream.flatMap(MockMapper.noOpFlatKeyValueMapper())
+                    .selectKey(MockMapper.selectKeyKeyValueMapper()),
+                valueJoiner,
+                JoinWindows.ofTimeDifferenceAndGrace(ofSeconds(10), ofHours(24))

Review comment:
       Need to switch to the new API to keep the fix enabled. Otherwise the test would break.
   
   Just set grace to 24h, which was the old default, to ensure nothing breaks. Similar in other tests below.

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
##########
@@ -396,7 +416,7 @@ public void testJoin() {
             for (int i = 0; i < 2; i++) {
                 inputTopic1.pipeInput(expectedKeys[i], "A" + expectedKeys[i]);
             }
-            processor.checkAndClearProcessResult(EMPTY);
+            processor.checkAndClearProcessResult();

Review comment:
       We can pass nothing to do a check for "no result"
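
This works because a varargs parameter receives an empty array when no arguments are passed. A minimal sketch with a hypothetical recording processor (`SketchRecordingProcessor` is a stand-in that uses plain strings, not the real `MockProcessor`):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a test processor that records its output.
final class SketchRecordingProcessor {
    private final List<String> processed = new ArrayList<>();

    void process(final String record) {
        processed.add(record);
    }

    // With no arguments, `expected` is an empty array, so the call
    // asserts that nothing has been recorded since the last check.
    void checkAndClearProcessResult(final String... expected) {
        if (!processed.equals(Arrays.asList(expected))) {
            throw new AssertionError(
                "expected " + Arrays.toString(expected) + " but got " + processed);
        }
        processed.clear();
    }
}
```

So a dedicated empty-array constant is not needed just to express "no result".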

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
##########
@@ -203,68 +202,79 @@ public void shouldEnableLoggingWithCustomConfigOnStreamJoined() {
     @Test
     public void shouldThrowExceptionThisStoreSupplierRetentionDoNotMatchWindowsSizeAndGrace() {
         // Case where retention of thisJoinStore doesn't match JoinWindows
-        final WindowBytesStoreSupplier thisStoreSupplier = buildWindowBytesStoreSupplier("in-memory-join-store", 500, 100, true);

Review comment:
       Did some side cleanup... Will highlight actual changes.

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
##########
@@ -852,25 +942,26 @@ private void testUpperWindowBound(final int[] expectedKeys,
         // push a dummy record to produce all left-join non-joined items
         time += 301L;
         inputTopic1.pipeInput(0, "dummy", time);
-
-        processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "C0+null", 1101),
-            new KeyValueTimestamp<>(0, "D0+null", 1102),
-            new KeyValueTimestamp<>(1, "D1+null", 1102),
-            new KeyValueTimestamp<>(0, "E0+null", 1103),
-            new KeyValueTimestamp<>(1, "E1+null", 1103),
-            new KeyValueTimestamp<>(2, "E2+null", 1103),
-            new KeyValueTimestamp<>(0, "F0+null", 1104),
-            new KeyValueTimestamp<>(1, "F1+null", 1104),
-            new KeyValueTimestamp<>(2, "F2+null", 1104),
-            new KeyValueTimestamp<>(3, "F3+null", 1104));
+        processor.checkAndClearProcessResult(
+            new KeyValueTimestamp<>(0, "C0+null", 1101L),
+            new KeyValueTimestamp<>(0, "D0+null", 1102L),
+            new KeyValueTimestamp<>(1, "D1+null", 1102L),
+            new KeyValueTimestamp<>(0, "E0+null", 1103L),
+            new KeyValueTimestamp<>(1, "E1+null", 1103L),
+            new KeyValueTimestamp<>(2, "E2+null", 1103L),
+            new KeyValueTimestamp<>(0, "F0+null", 1104L),
+            new KeyValueTimestamp<>(1, "F1+null", 1104L),
+            new KeyValueTimestamp<>(2, "F2+null", 1104L),
+            new KeyValueTimestamp<>(3, "F3+null", 1104L)
+        );
     }
 
     private void testLowerWindowBound(final int[] expectedKeys,
                                       final TopologyTestDriver driver,
                                       final MockProcessor<Integer, String> processor) {
         long time;
         final TestInputTopic<Integer, String> inputTopic1 = driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer());
-        final TestInputTopic<Integer, String> inputTopic2 = driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer());

Review comment:
       unused

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
##########
@@ -483,8 +513,9 @@ public void testOuterJoin() {
         joined = stream1.outerJoin(
             stream2,
             MockValueJoiner.TOSTRING_JOINER,
-            JoinWindows.of(ofMillis(100)),
-            StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
+            JoinWindows.ofTimeDifferenceAndGrace(ofMillis(100L), ofHours(24L)),

Review comment:
       Switch to new API. (Not all tests in this class need the switch for this fix. Leave it for the KIP-633 PR to update the others when the old API gets deprecated.)

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
##########
@@ -74,8 +75,9 @@ public void testLeftJoinWithInvalidSpuriousResultFixFlag() {
         joined = stream1.leftJoin(
             stream2,
             MockValueJoiner.TOSTRING_JOINER,
-            JoinWindows.of(ofMillis(100)),
-            StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
+            JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100L)),

Review comment:
       Switch to new API (similar below for all other tests in this class)

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java
##########
@@ -72,8 +72,9 @@ public void testOuterJoinWithInvalidSpuriousResultFixFlag() {
         joined = stream1.outerJoin(
             stream2,
             MockValueJoiner.TOSTRING_JOINER,
-            JoinWindows.of(ofMillis(100)),
-            StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
+            JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100L)),

Review comment:
       Same as left-join-test above.
   
   Switch to new API for all cases, and duplicate the "disable fix" test using feature flag and old API.

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
##########
@@ -138,13 +158,30 @@ public void testLeftJoinWithSpuriousResultFixDisabled() {
             for (int i = 0; i < 2; i++) {
                 inputTopic2.pipeInput(expectedKeys[i], "a" + expectedKeys[i]);
             }
-            processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "A0+a0", 0),
-                new KeyValueTimestamp<>(1, "A1+a1", 0));
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(0, "A0+a0", 0L),
+                new KeyValueTimestamp<>(1, "A1+a1", 0L)
+            );
         }
     }
 
     @Test
-    public void testLeftJoinDuplicatesWithFixDisabled() {
+    public void testLeftJoinDuplicatesWithSpuriousResultFixDisabledFeatureFlag() {
+        testLeftJoinDuplicatesSpuriousResultFix(
+            JoinWindows.ofTimeDifferenceAndGrace(ofMillis(100L), ofMillis(10L)),
+            false
+        );
+    }
+    @Test
+    public void testLeftJoinDuplicatesWithSpuriousResultFixDisabledOldApi() {
+        testLeftJoinDuplicatesSpuriousResultFix(
+            JoinWindows.of(ofMillis(100L)).grace(ofMillis(10L)),

Review comment:
       As above.



