You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2023/04/24 20:03:29 UTC

[kafka] branch 3.4 updated: KAFKA-14862: Outer stream-stream join does not output all results with multiple input partitions (#13592)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.4 by this push:
     new 754365a032a KAFKA-14862: Outer stream-stream join does not output all results with multiple input partitions (#13592)
754365a032a is described below

commit 754365a032adf96e42603d0c18e433d2efa73549
Author: Matthias J. Sax <ma...@confluent.io>
AuthorDate: Mon Apr 24 12:40:25 2023 -0700

    KAFKA-14862: Outer stream-stream join does not output all results with multiple input partitions (#13592)
    
    The stream-stream outer join uses a "shared time tracker" to track stream-time progress of the left and right inputs in a single place. However, this time tracker is incorrectly shared across tasks.
    
    This PR introduces a supplier that creates one "shared time tracker" object per task; that object is shared only between the task's left and right join processors.
    
    Reviewers: Victoria Xia <vi...@confluent.io>, Bruno Cadonna <br...@confluent.io>, Walker Carlson <wc...@confluent.io>
---
 .../streams/kstream/internals/KStreamImplJoin.java |  21 ++-
 .../kstream/internals/KStreamKStreamJoin.java      |  15 +-
 .../kstream/internals/KStreamKStreamSelfJoin.java  |   9 +-
 .../integration/KStreamKStreamIntegrationTest.java | 185 +++++++++++++++++++++
 4 files changed, 216 insertions(+), 14 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
index b2405db7317..f4ad9ac682f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
@@ -30,6 +30,7 @@ import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode;
 import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
 import org.apache.kafka.streams.kstream.internals.graph.StreamStreamJoinNode;
 import org.apache.kafka.streams.kstream.internals.graph.WindowedStreamProcessorNode;
+import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.internals.TimestampedKeyAndJoinSide;
 import org.apache.kafka.streams.state.StoreBuilder;
@@ -43,6 +44,7 @@ import org.apache.kafka.streams.state.internals.LeftOrRightValueSerde;
 
 import java.time.Duration;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Optional;
@@ -57,6 +59,18 @@ class KStreamImplJoin {
     private final boolean leftOuter;
     private final boolean rightOuter;
 
+    static class TimeTrackerSupplier { // one TimeTracker per task; tasks may run on different threads, hence synchronized
+        private final Map<TaskId, TimeTracker> tracker = new HashMap<>(); // guarded by "this"
+
+        public synchronized TimeTracker get(final TaskId taskId) { // left and right processors of a task get the same instance
+            return tracker.computeIfAbsent(taskId, taskId1 -> new TimeTracker());
+        }
+
+        public synchronized void remove(final TaskId taskId) { // no-op if the task's tracker was already removed
+            tracker.remove(taskId);
+        }
+    }
+
     static class TimeTracker {
         private long emitIntervalMs = 1000L;
         long streamTime = ConsumerRecord.NO_TIMESTAMP;
@@ -159,7 +173,7 @@ class KStreamImplJoin {
         }
 
         // Time-shared between joins to keep track of the maximum stream time
-        final TimeTracker sharedTimeTracker = new TimeTracker();
+        final TimeTrackerSupplier sharedTimeTrackerSupplier = new TimeTrackerSupplier();
 
         final JoinWindowsInternal internalWindows = new JoinWindowsInternal(windows);
         final KStreamKStreamJoin<K, V1, V2, VOut> joinThis = new KStreamKStreamJoin<>(
@@ -169,7 +183,7 @@ class KStreamImplJoin {
             joiner,
             leftOuter,
             outerJoinWindowStore.map(StoreBuilder::name),
-            sharedTimeTracker
+            sharedTimeTrackerSupplier
         );
 
         final KStreamKStreamJoin<K, V2, V1, VOut> joinOther = new KStreamKStreamJoin<>(
@@ -179,14 +193,13 @@ class KStreamImplJoin {
             AbstractStream.reverseJoinerWithKey(joiner),
             rightOuter,
             outerJoinWindowStore.map(StoreBuilder::name),
-            sharedTimeTracker
+            sharedTimeTrackerSupplier
         );
 
         final KStreamKStreamSelfJoin<K, V1, V2, VOut> selfJoin = new KStreamKStreamSelfJoin<>(
             thisWindowStore.name(),
             internalWindows,
             joiner,
-            sharedTimeTracker,
             windows.size() + windows.gracePeriodMs()
         );
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
index 397a86d24ad..067dd50f0cb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
 import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTracker;
+import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTrackerSupplier;
 import org.apache.kafka.streams.processor.api.ContextualProcessor;
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
@@ -57,7 +58,7 @@ class KStreamKStreamJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K,
     private final Optional<String> outerJoinWindowName;
     private final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends VOut> joiner;
 
-    private final TimeTracker sharedTimeTracker;
+    private final TimeTrackerSupplier sharedTimeTrackerSupplier;
 
     KStreamKStreamJoin(final boolean isLeftSide,
                        final String otherWindowName,
@@ -65,7 +66,7 @@ class KStreamKStreamJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K,
                        final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends VOut> joiner,
                        final boolean outer,
                        final Optional<String> outerJoinWindowName,
-                       final TimeTracker sharedTimeTracker) {
+                       final TimeTrackerSupplier sharedTimeTrackerSupplier) {
         this.isLeftSide = isLeftSide;
         this.otherWindowName = otherWindowName;
         if (isLeftSide) {
@@ -82,7 +83,7 @@ class KStreamKStreamJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K,
         this.joiner = joiner;
         this.outer = outer;
         this.outerJoinWindowName = outerJoinWindowName;
-        this.sharedTimeTracker = sharedTimeTracker;
+        this.sharedTimeTrackerSupplier = sharedTimeTrackerSupplier;
     }
 
     @Override
@@ -95,6 +96,7 @@ class KStreamKStreamJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K,
         private Sensor droppedRecordsSensor;
         private Optional<KeyValueStore<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>> outerJoinStore = Optional.empty();
         private InternalProcessorContext<K, VOut> internalProcessorContext;
+        private TimeTracker sharedTimeTracker;
 
         @Override
         public void init(final ProcessorContext<K, VOut> context) {
@@ -104,6 +106,7 @@ class KStreamKStreamJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K,
             final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics();
             droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
             otherWindowStore = context.getStateStore(otherWindowName);
+            sharedTimeTracker = sharedTimeTrackerSupplier.get(context.taskId());
 
             if (enableSpuriousResultFix) {
                 outerJoinStore = outerJoinWindowName.map(context::getStateStore);
@@ -124,7 +127,6 @@ class KStreamKStreamJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K,
             if (StreamStreamJoinUtil.skipRecord(record, LOG, droppedRecordsSensor, context())) {
                 return;
             }
-
             boolean needOuterJoin = outer;
 
             final long inputRecordTimestamp = record.timestamp();
@@ -262,5 +264,10 @@ class KStreamKStreamJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K,
                 }
             }
         }
+
+        @Override
+        public void close() {
+            sharedTimeTrackerSupplier.remove(context().taskId()); // drop this task's tracker; harmless if the other join side already did
+        }
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamSelfJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamSelfJoin.java
index a6af2a4e082..64d8b45c039 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamSelfJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamSelfJoin.java
@@ -44,13 +44,10 @@ class KStreamKStreamSelfJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1
     private final long retentionPeriod;
     private final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends VOut> joinerThis;
 
-    private final TimeTracker sharedTimeTracker;
-
     KStreamKStreamSelfJoin(
         final String windowName,
         final JoinWindowsInternal windows,
         final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends VOut> joinerThis,
-        final TimeTracker sharedTimeTracker,
         final long retentionPeriod) {
 
         this.windowName = windowName;
@@ -59,7 +56,6 @@ class KStreamKStreamSelfJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1
         this.joinOtherBeforeMs = windows.afterMs;
         this.joinOtherAfterMs = windows.beforeMs;
         this.joinerThis = joinerThis;
-        this.sharedTimeTracker = sharedTimeTracker;
         this.retentionPeriod = retentionPeriod;
     }
 
@@ -69,6 +65,7 @@ class KStreamKStreamSelfJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1
     }
 
     private class KStreamKStreamSelfJoinProcessor extends ContextualProcessor<K, V1, K, VOut> {
+        private final TimeTracker timeTracker = new TimeTracker();
         private WindowStore<K, V2> windowStore;
         private Sensor droppedRecordsSensor;
 
@@ -95,9 +92,9 @@ class KStreamKStreamSelfJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1
             final Record selfRecord = record
                 .withValue(joinerThis.apply(record.key(), record.value(), (V2) record.value()))
                 .withTimestamp(inputRecordTimestamp);
-            sharedTimeTracker.advanceStreamTime(inputRecordTimestamp);
+            timeTracker.advanceStreamTime(inputRecordTimestamp);
             // We emit the self record only if it isn't expired.
-            final boolean emitSelfRecord = inputRecordTimestamp > sharedTimeTracker.streamTime - retentionPeriod + 1;
+            final boolean emitSelfRecord = inputRecordTimestamp > timeTracker.streamTime - retentionPeriod + 1;
 
             // Join current record with other
             try (final WindowStoreIterator<V2> iter = windowStore.fetch(record.key(), timeFrom, timeTo)) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKStreamIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKStreamIntegrationTest.java
new file mode 100644
index 00000000000..7e60441a0f2
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKStreamIntegrationTest.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.integration;
+
+import kafka.utils.MockTime;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+import static java.time.Duration.ofSeconds;
+import static java.util.Arrays.asList;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.IsEqual.equalTo;
+
+@Timeout(600)
+@Tag("integration")
+public class KStreamKStreamIntegrationTest {
+    private final static int NUM_BROKERS = 1;
+
+    public final static EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+    private final static MockTime MOCK_TIME = CLUSTER.time;
+    private final static String LEFT_STREAM = "leftStream";
+    private final static String RIGHT_STREAM = "rightStream";
+    private final static String OUTPUT = "output";
+    private Properties streamsConfig;
+    private KafkaStreams streams;
+    private final static Properties CONSUMER_CONFIG = new Properties();
+    private final static Properties PRODUCER_CONFIG = new Properties();
+
+    @BeforeAll
+    public static void startCluster() throws Exception {
+        CLUSTER.start();
+
+        // Use multiple partitions so keys are spread over several partitions (and thus several tasks).
+        CLUSTER.createTopic(LEFT_STREAM, 4, 1);
+        CLUSTER.createTopic(RIGHT_STREAM, 4, 1);
+        CLUSTER.createTopic(OUTPUT, 4, 1);
+
+        CONSUMER_CONFIG.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        CONSUMER_CONFIG.put(ConsumerConfig.GROUP_ID_CONFIG, "result-consumer");
+        CONSUMER_CONFIG.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+
+        PRODUCER_CONFIG.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        PRODUCER_CONFIG.put(ProducerConfig.ACKS_CONFIG, "all");
+        PRODUCER_CONFIG.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        PRODUCER_CONFIG.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+    }
+
+    @AfterAll
+    public static void closeCluster() {
+        CLUSTER.stop();
+    }
+
+    @BeforeEach
+    public void before(final TestInfo testInfo) throws IOException {
+        final String stateDirBasePath = TestUtils.tempDirectory().getPath();
+        final String safeTestName = safeUniqueTestName(getClass(), testInfo);
+        streamsConfig = getStreamsConfig(safeTestName);
+        streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, stateDirBasePath);
+    }
+
+    @AfterEach
+    public void after() throws IOException {
+        if (streams != null) {
+            streams.close();
+            streams = null;
+        }
+        IntegrationTestUtils.purgeLocalStreamsState(streamsConfig);
+    }
+
+    @Test
+    public void shouldOuterJoin() throws Exception {
+        final Set<KeyValue<String, String>> expected = new HashSet<>();
+        expected.add(new KeyValue<>("Key-1", "value1=left-1a,value2=null"));
+        expected.add(new KeyValue<>("Key-2", "value1=left-2a,value2=null"));
+        expected.add(new KeyValue<>("Key-3", "value1=left-3a,value2=null"));
+        expected.add(new KeyValue<>("Key-4", "value1=left-4a,value2=null"));
+        // Nothing is produced to RIGHT_STREAM, so every left1 record must be emitted with value2=null.
+        verifyKStreamKStreamOuterJoin(expected);
+    }
+
+    private void verifyKStreamKStreamOuterJoin(final Set<KeyValue<String, String>> expectedResult) throws Exception {
+        streams = prepareTopology(streamsConfig);
+
+        startApplicationAndWaitUntilRunning(Collections.singletonList(streams), ofSeconds(120));
+
+        final List<KeyValue<String, String>> left1 = asList(
+                new KeyValue<>("Key-1", "left-1a"),
+                new KeyValue<>("Key-2", "left-2a"),
+                new KeyValue<>("Key-3", "left-3a"),
+                new KeyValue<>("Key-4", "left-4a")
+        );
+
+        final List<KeyValue<String, String>> left2 = asList(
+                new KeyValue<>("Key-1", "left-1b"),
+                new KeyValue<>("Key-2", "left-2b"),
+                new KeyValue<>("Key-3", "left-3b"),
+                new KeyValue<>("Key-4", "left-4b")
+        );
+
+        IntegrationTestUtils.produceKeyValuesSynchronously(LEFT_STREAM, left1, PRODUCER_CONFIG, MOCK_TIME);
+        MOCK_TIME.sleep(10000); // left2 is produced with MOCK_TIME later, presumably pushing stream time past left1's window
+        IntegrationTestUtils.produceKeyValuesSynchronously(LEFT_STREAM, left2, PRODUCER_CONFIG, MOCK_TIME);
+
+        final Set<KeyValue<String, String>> result = new HashSet<>(waitUntilMinKeyValueRecordsReceived(
+            CONSUMER_CONFIG,
+            OUTPUT,
+            expectedResult.size()));
+
+        assertThat(result, equalTo(expectedResult));
+    }
+
+    private Properties getStreamsConfig(final String testName) {
+        final Properties config = new Properties();
+        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "KStream-KStream-join" + testName);
+        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
+        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
+        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
+
+        return config;
+    }
+
+    private static KafkaStreams prepareTopology(final Properties config) {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KStream<String, String> stream1 = builder.stream(LEFT_STREAM);
+        final KStream<String, String> stream2 = builder.stream(RIGHT_STREAM);
+
+        final ValueJoiner<String, String, String> joiner = (value1, value2) -> "value1=" + value1 + ",value2=" + value2;
+
+        stream1.outerJoin(stream2, joiner, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(10))).to(OUTPUT);
+
+        return new KafkaStreams(builder.build(config), config);
+    }
+
+}
\ No newline at end of file