You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2022/12/28 20:33:15 UTC
[kafka] branch trunk updated: KAFKA-13817 Always sync nextTimeToEmit with wall clock (#12166)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 9c6c6bfa2bd KAFKA-13817 Always sync nextTimeToEmit with wall clock (#12166)
9c6c6bfa2bd is described below
commit 9c6c6bfa2bdf97d546ebb7721bc19d15c955b746
Author: Qing <l....@gmail.com>
AuthorDate: Wed Dec 28 20:32:54 2022 +0000
KAFKA-13817 Always sync nextTimeToEmit with wall clock (#12166)
Reviewers: Matthias J. Sax <ma...@confluent.io>, Hao Li <hl...@confluent.io>
---
.../kstream/internals/KStreamKStreamJoin.java | 7 +-
.../kstream/internals/KStreamKStreamJoinTest.java | 97 +++++++++++++++++++++-
2 files changed, 100 insertions(+), 4 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
index 397a86d24ad..d1cfa25e191 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
@@ -205,9 +205,10 @@ class KStreamKStreamJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K,
if (internalProcessorContext.currentSystemTimeMs() < sharedTimeTracker.nextTimeToEmit) {
return;
}
- if (sharedTimeTracker.nextTimeToEmit == 0) {
- sharedTimeTracker.nextTimeToEmit = internalProcessorContext.currentSystemTimeMs();
- }
+
+ // Ensure `nextTimeToEmit` is synced with `currentSystemTimeMs`; if we don't set it every time,
+ // they can get out of sync during clock drift.
+ sharedTimeTracker.nextTimeToEmit = internalProcessorContext.currentSystemTimeMs();
sharedTimeTracker.advanceNextTimeToEmit();
// reset to MAX_VALUE in case the store is empty
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
index 6864396d433..a5146a93333 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
@@ -31,17 +32,33 @@ import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.StreamJoined;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.InternalTopicConfig;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
+import org.apache.kafka.streams.state.internals.TimestampedKeyAndJoinSide;
+import org.apache.kafka.streams.state.internals.InMemoryKeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.internals.TimestampedKeyAndJoinSideSerde;
+import org.apache.kafka.streams.state.internals.WindowStoreBuilder;
+import org.apache.kafka.streams.state.internals.LeftOrRightValueSerde;
+import org.apache.kafka.streams.state.internals.LeftOrRightValue;
+import org.apache.kafka.streams.state.internals.InMemoryWindowBytesStoreSupplier;
import org.apache.kafka.test.MockApiProcessor;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
+import org.apache.kafka.test.MockInternalNewProcessorContext;
import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.GenericInMemoryKeyValueStore;
import org.junit.Test;
+import org.mockito.Mockito;
import java.time.Duration;
import java.time.Instant;
@@ -49,8 +66,15 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
import java.util.Properties;
import java.util.Set;
+import java.util.Optional;
+import java.util.Spliterator;
+import java.util.Spliterators;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
import static java.time.Duration.ofHours;
import static java.time.Duration.ofMillis;
@@ -333,6 +357,77 @@ public class KStreamKStreamJoinTest {
runJoin(streamJoined.withOtherStoreSupplier(otherStoreSupplier), joinWindows);
}
+ @Test
+ public void shouldThrottleEmitNonJoinedOuterRecordsEvenWhenClockDrift() {
+ // This test exercises emit throttling internal to KStreamKStreamJoin, so it wires up the low-level Processor API,
+ // the "other" window store, the "outer" key-value store, and a mock processor context by hand.
+ // NOTE(review): the iterators returned by outerStore.all() below are never closed; confirm whether they must be.
+ final KStreamImplJoin.TimeTracker tracker = new KStreamImplJoin.TimeTracker();
+ final KStreamKStreamJoin<String, String, String, String> join = new KStreamKStreamJoin<>(
+ false,
+ "other", // same name as the window store built and registered below
+ new JoinWindowsInternal(JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(1000))),
+ (key, v1, v2) -> v1 + v2,
+ true,
+ Optional.of("outer"), // same name as the key-value store built and registered below
+ tracker);
+ final Processor<String, String, String, String> joinProcessor = join.get();
+ final MockInternalNewProcessorContext<String, String> procCtx = new MockInternalNewProcessorContext<>();
+ final WindowStore<String, String> otherStore = new WindowStoreBuilder<>(
+ new InMemoryWindowBytesStoreSupplier(
+ "other",
+ 1000L,
+ 100,
+ false),
+ Serdes.String(),
+ Serdes.String(),
+ new MockTime()).build();
+
+ final KeyValueStore<TimestampedKeyAndJoinSide<String>, LeftOrRightValue<String, String>> outerStore = Mockito.spy(
+ new KeyValueStoreBuilder<>(
+ new InMemoryKeyValueBytesStoreSupplier("outer"),
+ new TimestampedKeyAndJoinSideSerde<>(Serdes.String()),
+ new LeftOrRightValueSerde<>(Serdes.String(), Serdes.String()),
+ new MockTime()
+ ).build());
+
+ final GenericInMemoryKeyValueStore<String, String> rootStore = new GenericInMemoryKeyValueStore<>("root");
+
+ otherStore.init((StateStoreContext) procCtx, rootStore);
+ procCtx.addStateStore(otherStore);
+
+ outerStore.init((StateStoreContext) procCtx, rootStore);
+ procCtx.addStateStore(outerStore);
+
+ joinProcessor.init(procCtx); // stores must be registered on procCtx before the processor is initialised
+
+ final Record<String, String> record1 = new Record<>("key1", "value1", 10000L);
+ final Record<String, String> record2 = new Record<>("key2", "value2", 13000L);
+ final Record<String, String> record3 = new Record<>("key3", "value3", 15000L);
+ final Record<String, String> record4 = new Record<>("key4", "value4", 17000L);
+
+ procCtx.setSystemTimeMs(1000L);
+ joinProcessor.process(record1);
+
+ procCtx.setSystemTimeMs(2100L);
+ joinProcessor.process(record2);
+
+ procCtx.setSystemTimeMs(2500L);
+ joinProcessor.process(record3);
+ // emission is throttled at this wall-clock time, so the older record is still in outerStore
+ assertEquals(2, iteratorToList(outerStore.all()).size());
+
+ procCtx.setSystemTimeMs(4000L);
+ joinProcessor.process(record4);
+ assertEquals(1, iteratorToList(outerStore.all()).size()); // throttle has lapsed, so emission ran again
+ }
+
+ private <T> List<T> iteratorToList(final Iterator<T> iterator) { // drains iterator into a List in iteration order; does not close it
+ return StreamSupport.stream(
+ Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false) // false = sequential stream
+ .collect(Collectors.toList());
+ }
+
private void runJoin(final StreamJoined<String, Integer, Integer> streamJoined,
final JoinWindows joinWindows) {
final StreamsBuilder builder = new StreamsBuilder();
@@ -1808,7 +1903,7 @@ public class KStreamKStreamJoinTest {
" <-- second-join-this-join, second-join-other-join\n" +
" Sink: KSTREAM-SINK-0000000021 (topic: out-two)\n" +
" <-- second-join-merge\n\n";
-
+
private final String expectedTopologyWithGeneratedRepartitionTopic = "Topologies:\n" +
" Sub-topology: 0\n" +
" Source: KSTREAM-SOURCE-0000000000 (topics: [topic])\n" +