You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@kafka.apache.org by mj...@apache.org on 2023/04/24 20:03:29 UTC
[kafka] branch 3.4 updated: KAFKA-14862: Outer stream-stream join does not output all results with multiple input partitions (#13592)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.4 by this push:
new 754365a032a KAFKA-14862: Outer stream-stream join does not output all results with multiple input partitions (#13592)
754365a032a is described below
commit 754365a032adf96e42603d0c18e433d2efa73549
Author: Matthias J. Sax <ma...@confluent.io>
AuthorDate: Mon Apr 24 12:40:25 2023 -0700
KAFKA-14862: Outer stream-stream join does not output all results with multiple input partitions (#13592)
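Stream-stream outer joins use a "shared time tracker" to track stream-time progress for the left and right inputs in a single place. This time tracker was incorrectly shared across all tasks instead of being scoped to a single task, so stream-time progress made by one task (input partition) could affect the outer-join results emitted by other tasks.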
This PR introduces a supplier that creates one "shared time tracker" object per task; each tracker is shared only between that task's left and right join processors.
Reviewers: Victoria Xia <vi...@confluent.io>, Bruno Cadonna <br...@confluent.io>, Walker Carlson <wc...@confluent.io>
---
.../streams/kstream/internals/KStreamImplJoin.java | 21 ++-
.../kstream/internals/KStreamKStreamJoin.java | 15 +-
.../kstream/internals/KStreamKStreamSelfJoin.java | 9 +-
.../integration/KStreamKStreamIntegrationTest.java | 185 +++++++++++++++++++++
4 files changed, 216 insertions(+), 14 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
index b2405db7317..f4ad9ac682f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
@@ -30,6 +30,7 @@ import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
import org.apache.kafka.streams.kstream.internals.graph.StreamStreamJoinNode;
import org.apache.kafka.streams.kstream.internals.graph.WindowedStreamProcessorNode;
+import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.internals.TimestampedKeyAndJoinSide;
import org.apache.kafka.streams.state.StoreBuilder;
@@ -43,6 +44,7 @@ import org.apache.kafka.streams.state.internals.LeftOrRightValueSerde;
import java.time.Duration;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
@@ -57,6 +59,18 @@ class KStreamImplJoin {
private final boolean leftOuter;
private final boolean rightOuter;
+ static class TimeTrackerSupplier {
+ private final Map<TaskId, TimeTracker> tracker = new HashMap<>();
+
+ public TimeTracker get(final TaskId taskId) {
+ return tracker.computeIfAbsent(taskId, taskId1 -> new TimeTracker());
+ }
+
+ public void remove(final TaskId taskId) {
+ tracker.remove(taskId);
+ }
+ }
+
static class TimeTracker {
private long emitIntervalMs = 1000L;
long streamTime = ConsumerRecord.NO_TIMESTAMP;
@@ -159,7 +173,7 @@ class KStreamImplJoin {
}
// Time-shared between joins to keep track of the maximum stream time
- final TimeTracker sharedTimeTracker = new TimeTracker();
+ final TimeTrackerSupplier sharedTimeTrackerSupplier = new TimeTrackerSupplier();
final JoinWindowsInternal internalWindows = new JoinWindowsInternal(windows);
final KStreamKStreamJoin<K, V1, V2, VOut> joinThis = new KStreamKStreamJoin<>(
@@ -169,7 +183,7 @@ class KStreamImplJoin {
joiner,
leftOuter,
outerJoinWindowStore.map(StoreBuilder::name),
- sharedTimeTracker
+ sharedTimeTrackerSupplier
);
final KStreamKStreamJoin<K, V2, V1, VOut> joinOther = new KStreamKStreamJoin<>(
@@ -179,14 +193,13 @@ class KStreamImplJoin {
AbstractStream.reverseJoinerWithKey(joiner),
rightOuter,
outerJoinWindowStore.map(StoreBuilder::name),
- sharedTimeTracker
+ sharedTimeTrackerSupplier
);
final KStreamKStreamSelfJoin<K, V1, V2, VOut> selfJoin = new KStreamKStreamSelfJoin<>(
thisWindowStore.name(),
internalWindows,
joiner,
- sharedTimeTracker,
windows.size() + windows.gracePeriodMs()
);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
index 397a86d24ad..067dd50f0cb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTracker;
+import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTrackerSupplier;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
@@ -57,7 +58,7 @@ class KStreamKStreamJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K,
private final Optional<String> outerJoinWindowName;
private final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends VOut> joiner;
- private final TimeTracker sharedTimeTracker;
+ private final TimeTrackerSupplier sharedTimeTrackerSupplier;
KStreamKStreamJoin(final boolean isLeftSide,
final String otherWindowName,
@@ -65,7 +66,7 @@ class KStreamKStreamJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K,
final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends VOut> joiner,
final boolean outer,
final Optional<String> outerJoinWindowName,
- final TimeTracker sharedTimeTracker) {
+ final TimeTrackerSupplier sharedTimeTrackerSupplier) {
this.isLeftSide = isLeftSide;
this.otherWindowName = otherWindowName;
if (isLeftSide) {
@@ -82,7 +83,7 @@ class KStreamKStreamJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K,
this.joiner = joiner;
this.outer = outer;
this.outerJoinWindowName = outerJoinWindowName;
- this.sharedTimeTracker = sharedTimeTracker;
+ this.sharedTimeTrackerSupplier = sharedTimeTrackerSupplier;
}
@Override
@@ -95,6 +96,7 @@ class KStreamKStreamJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K,
private Sensor droppedRecordsSensor;
private Optional<KeyValueStore<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>> outerJoinStore = Optional.empty();
private InternalProcessorContext<K, VOut> internalProcessorContext;
+ private TimeTracker sharedTimeTracker;
@Override
public void init(final ProcessorContext<K, VOut> context) {
@@ -104,6 +106,7 @@ class KStreamKStreamJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K,
final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics();
droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
otherWindowStore = context.getStateStore(otherWindowName);
+ sharedTimeTracker = sharedTimeTrackerSupplier.get(context.taskId());
if (enableSpuriousResultFix) {
outerJoinStore = outerJoinWindowName.map(context::getStateStore);
@@ -124,7 +127,6 @@ class KStreamKStreamJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K,
if (StreamStreamJoinUtil.skipRecord(record, LOG, droppedRecordsSensor, context())) {
return;
}
-
boolean needOuterJoin = outer;
final long inputRecordTimestamp = record.timestamp();
@@ -262,5 +264,10 @@ class KStreamKStreamJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K,
}
}
}
+
+ @Override
+ public void close() {
+ sharedTimeTrackerSupplier.remove(context().taskId());
+ }
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamSelfJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamSelfJoin.java
index a6af2a4e082..64d8b45c039 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamSelfJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamSelfJoin.java
@@ -44,13 +44,10 @@ class KStreamKStreamSelfJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1
private final long retentionPeriod;
private final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends VOut> joinerThis;
- private final TimeTracker sharedTimeTracker;
-
KStreamKStreamSelfJoin(
final String windowName,
final JoinWindowsInternal windows,
final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends VOut> joinerThis,
- final TimeTracker sharedTimeTracker,
final long retentionPeriod) {
this.windowName = windowName;
@@ -59,7 +56,6 @@ class KStreamKStreamSelfJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1
this.joinOtherBeforeMs = windows.afterMs;
this.joinOtherAfterMs = windows.beforeMs;
this.joinerThis = joinerThis;
- this.sharedTimeTracker = sharedTimeTracker;
this.retentionPeriod = retentionPeriod;
}
@@ -69,6 +65,7 @@ class KStreamKStreamSelfJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1
}
private class KStreamKStreamSelfJoinProcessor extends ContextualProcessor<K, V1, K, VOut> {
+ private final TimeTracker timeTracker = new TimeTracker();
private WindowStore<K, V2> windowStore;
private Sensor droppedRecordsSensor;
@@ -95,9 +92,9 @@ class KStreamKStreamSelfJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1
final Record selfRecord = record
.withValue(joinerThis.apply(record.key(), record.value(), (V2) record.value()))
.withTimestamp(inputRecordTimestamp);
- sharedTimeTracker.advanceStreamTime(inputRecordTimestamp);
+ timeTracker.advanceStreamTime(inputRecordTimestamp);
// We emit the self record only if it isn't expired.
- final boolean emitSelfRecord = inputRecordTimestamp > sharedTimeTracker.streamTime - retentionPeriod + 1;
+ final boolean emitSelfRecord = inputRecordTimestamp > timeTracker.streamTime - retentionPeriod + 1;
// Join current record with other
try (final WindowStoreIterator<V2> iter = windowStore.fetch(record.key(), timeFrom, timeTo)) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKStreamIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKStreamIntegrationTest.java
new file mode 100644
index 00000000000..7e60441a0f2
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKStreamIntegrationTest.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.integration;
+
+import kafka.utils.MockTime;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+import static java.time.Duration.ofSeconds;
+import static java.util.Arrays.asList;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.IsEqual.equalTo;
+
+@Timeout(600)
+@Tag("integration")
+public class KStreamKStreamIntegrationTest {
+ private final static int NUM_BROKERS = 1;
+
+ public final static EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+ private final static MockTime MOCK_TIME = CLUSTER.time;
+ private final static String LEFT_STREAM = "leftStream";
+ private final static String RIGHT_STREAM = "rightStream";
+ private final static String OUTPUT = "output";
+ private Properties streamsConfig;
+ private KafkaStreams streams;
+ private final static Properties CONSUMER_CONFIG = new Properties();
+ private final static Properties PRODUCER_CONFIG = new Properties();
+
+ @BeforeAll
+ public static void startCluster() throws Exception {
+ CLUSTER.start();
+
+ //Use multiple partitions to ensure distribution of keys.
+ CLUSTER.createTopic(LEFT_STREAM, 4, 1);
+ CLUSTER.createTopic(RIGHT_STREAM, 4, 1);
+ CLUSTER.createTopic(OUTPUT, 4, 1);
+
+ CONSUMER_CONFIG.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+ CONSUMER_CONFIG.put(ConsumerConfig.GROUP_ID_CONFIG, "result-consumer");
+ CONSUMER_CONFIG.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ }
+
+ @AfterAll
+ public static void closeCluster() {
+ CLUSTER.stop();
+ }
+
+ @BeforeEach
+ public void before(final TestInfo testInfo) throws IOException {
+ final String stateDirBasePath = TestUtils.tempDirectory().getPath();
+ final String safeTestName = safeUniqueTestName(getClass(), testInfo);
+ streamsConfig = getStreamsConfig(safeTestName);
+ streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, stateDirBasePath);
+ }
+
+ @AfterEach
+ public void after() throws IOException {
+ if (streams != null) {
+ streams.close();
+ streams = null;
+ }
+ IntegrationTestUtils.purgeLocalStreamsState(streamsConfig);
+ }
+
+ @Test
+ public void shouldOuterJoin() throws Exception {
+ final Set<KeyValue<String, String>> expected = new HashSet<>();
+ expected.add(new KeyValue<>("Key-1", "value1=left-1a,value2=null"));
+ expected.add(new KeyValue<>("Key-2", "value1=left-2a,value2=null"));
+ expected.add(new KeyValue<>("Key-3", "value1=left-3a,value2=null"));
+ expected.add(new KeyValue<>("Key-4", "value1=left-4a,value2=null"));
+
+ verifyKStreamKStreamOuterJoin(expected);
+ }
+
+ private void verifyKStreamKStreamOuterJoin(final Set<KeyValue<String, String>> expectedResult) throws Exception {
+ streams = prepareTopology(streamsConfig);
+
+ startApplicationAndWaitUntilRunning(Collections.singletonList(streams), ofSeconds(120));
+
+ PRODUCER_CONFIG.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+ PRODUCER_CONFIG.put(ProducerConfig.ACKS_CONFIG, "all");
+ PRODUCER_CONFIG.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ PRODUCER_CONFIG.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+
+ final List<KeyValue<String, String>> left1 = asList(
+ new KeyValue<>("Key-1", "left-1a"),
+ new KeyValue<>("Key-2", "left-2a"),
+ new KeyValue<>("Key-3", "left-3a"),
+ new KeyValue<>("Key-4", "left-4a")
+ );
+
+ final List<KeyValue<String, String>> left2 = asList(
+ new KeyValue<>("Key-1", "left-1b"),
+ new KeyValue<>("Key-2", "left-2b"),
+ new KeyValue<>("Key-3", "left-3b"),
+ new KeyValue<>("Key-4", "left-4b")
+ );
+
+ IntegrationTestUtils.produceKeyValuesSynchronously(LEFT_STREAM, left1, PRODUCER_CONFIG, MOCK_TIME);
+ MOCK_TIME.sleep(10000);
+ IntegrationTestUtils.produceKeyValuesSynchronously(LEFT_STREAM, left2, PRODUCER_CONFIG, MOCK_TIME);
+
+ final Set<KeyValue<String, String>> result = new HashSet<>(waitUntilMinKeyValueRecordsReceived(
+ CONSUMER_CONFIG,
+ OUTPUT,
+ expectedResult.size()));
+
+ assertThat(expectedResult, equalTo(result));
+ }
+
+ private Properties getStreamsConfig(final String testName) {
+ final Properties streamsConfig = new Properties();
+ streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "KStream-KStream-join" + testName);
+ streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+ streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
+ streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
+ streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
+
+ return streamsConfig;
+ }
+
+ private static KafkaStreams prepareTopology(final Properties streamsConfig) {
+ final StreamsBuilder builder = new StreamsBuilder();
+
+ final KStream<String, String> stream1 = builder.stream(LEFT_STREAM);
+ final KStream<String, String> stream2 = builder.stream(RIGHT_STREAM);
+
+ final ValueJoiner<String, String, String> joiner = (value1, value2) -> "value1=" + value1 + ",value2=" + value2;
+
+ stream1.outerJoin(stream2, joiner, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(10))).to(OUTPUT);
+
+ return new KafkaStreams(builder.build(streamsConfig), streamsConfig);
+ }
+
+}
\ No newline at end of file