You are viewing a plain-text version of this content; the hyperlink to its canonical version is not included in this plain-text copy.
Posted to commits@kafka.apache.org by mj...@apache.org on 2022/12/28 20:33:15 UTC

[kafka] branch trunk updated: KAFKA-13817 Always sync nextTimeToEmit with wall clock (#12166)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9c6c6bfa2bd KAFKA-13817 Always sync nextTimeToEmit with wall clock (#12166)
9c6c6bfa2bd is described below

commit 9c6c6bfa2bdf97d546ebb7721bc19d15c955b746
Author: Qing <l....@gmail.com>
AuthorDate: Wed Dec 28 20:32:54 2022 +0000

    KAFKA-13817 Always sync nextTimeToEmit with wall clock (#12166)
    
    Reviewers: Matthias J. Sax <ma...@confluent.io>, Hao Li <hl...@confluent.io>
---
 .../kstream/internals/KStreamKStreamJoin.java      |  7 +-
 .../kstream/internals/KStreamKStreamJoinTest.java  | 97 +++++++++++++++++++++-
 2 files changed, 100 insertions(+), 4 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
index 397a86d24ad..d1cfa25e191 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
@@ -205,9 +205,10 @@ class KStreamKStreamJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K,
             if (internalProcessorContext.currentSystemTimeMs() < sharedTimeTracker.nextTimeToEmit) {
                 return;
             }
-            if (sharedTimeTracker.nextTimeToEmit == 0) {
-                sharedTimeTracker.nextTimeToEmit = internalProcessorContext.currentSystemTimeMs();
-            }
+
+            // Ensure `nextTimeToEmit` is synced with `currentSystemTimeMs`, if we dont set it everytime,
+            // they can get out of sync during a clock drift
+            sharedTimeTracker.nextTimeToEmit = internalProcessorContext.currentSystemTimeMs();
             sharedTimeTracker.advanceNextTimeToEmit();
 
             // reset to MAX_VALUE in case the store is empty
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
index 6864396d433..a5146a93333 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.KeyValueTimestamp;
 import org.apache.kafka.streams.StreamsBuilder;
@@ -31,17 +32,33 @@ import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.StreamJoined;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.InternalTopicConfig;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.common.utils.LogCaptureAppender;
 import org.apache.kafka.streams.TestInputTopic;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
+import org.apache.kafka.streams.state.internals.TimestampedKeyAndJoinSide;
+import org.apache.kafka.streams.state.internals.InMemoryKeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.internals.TimestampedKeyAndJoinSideSerde;
+import org.apache.kafka.streams.state.internals.WindowStoreBuilder;
+import org.apache.kafka.streams.state.internals.LeftOrRightValueSerde;
+import org.apache.kafka.streams.state.internals.LeftOrRightValue;
+import org.apache.kafka.streams.state.internals.InMemoryWindowBytesStoreSupplier;
 import org.apache.kafka.test.MockApiProcessor;
 import org.apache.kafka.test.MockApiProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
+import org.apache.kafka.test.MockInternalNewProcessorContext;
 import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.GenericInMemoryKeyValueStore;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import java.time.Duration;
 import java.time.Instant;
@@ -49,8 +66,15 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Properties;
 import java.util.Set;
+import java.util.Optional;
+import java.util.Spliterator;
+import java.util.Spliterators;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 
 import static java.time.Duration.ofHours;
 import static java.time.Duration.ofMillis;
@@ -333,6 +357,77 @@ public class KStreamKStreamJoinTest {
         runJoin(streamJoined.withOtherStoreSupplier(otherStoreSupplier), joinWindows);
     }
 
+    @Test
+    public void shouldThrottleEmitNonJoinedOuterRecordsEvenWhenClockDrift() {
+        /**
+         * This test is testing something internal to [[KStreamKStreamJoin]], so we had to setup low-level api manually.
+         */
+        final KStreamImplJoin.TimeTracker tracker = new KStreamImplJoin.TimeTracker();
+        final KStreamKStreamJoin<String, String, String, String> join = new KStreamKStreamJoin<>(
+                false,
+                "other",
+                new JoinWindowsInternal(JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(1000))),
+                (key, v1, v2) -> v1 + v2,
+                true,
+                Optional.of("outer"),
+                tracker);
+        final Processor<String, String, String, String> joinProcessor = join.get();
+        final MockInternalNewProcessorContext<String, String> procCtx = new MockInternalNewProcessorContext<>();
+        final WindowStore<String, String> otherStore = new WindowStoreBuilder<>(
+                new InMemoryWindowBytesStoreSupplier(
+                        "other",
+                        1000L,
+                        100,
+                        false),
+                Serdes.String(),
+                Serdes.String(),
+                new MockTime()).build();
+
+        final KeyValueStore<TimestampedKeyAndJoinSide<String>, LeftOrRightValue<String, String>> outerStore = Mockito.spy(
+                new KeyValueStoreBuilder<>(
+                    new InMemoryKeyValueBytesStoreSupplier("outer"),
+                    new TimestampedKeyAndJoinSideSerde<>(Serdes.String()),
+                    new LeftOrRightValueSerde<>(Serdes.String(), Serdes.String()),
+                    new MockTime()
+                ).build());
+
+        final GenericInMemoryKeyValueStore<String, String> rootStore = new GenericInMemoryKeyValueStore<>("root");
+
+        otherStore.init((StateStoreContext) procCtx, rootStore);
+        procCtx.addStateStore(otherStore);
+
+        outerStore.init((StateStoreContext) procCtx, rootStore);
+        procCtx.addStateStore(outerStore);
+
+        joinProcessor.init(procCtx);
+
+        final Record<String, String> record1 = new Record<>("key1", "value1", 10000L);
+        final Record<String, String> record2 = new Record<>("key2", "value2", 13000L);
+        final Record<String, String> record3 = new Record<>("key3", "value3", 15000L);
+        final Record<String, String> record4 = new Record<>("key4", "value4", 17000L);
+
+        procCtx.setSystemTimeMs(1000L);
+        joinProcessor.process(record1);
+
+        procCtx.setSystemTimeMs(2100L);
+        joinProcessor.process(record2);
+
+        procCtx.setSystemTimeMs(2500L);
+        joinProcessor.process(record3);
+        // being throttled, so the older value still exists
+        assertEquals(2, iteratorToList(outerStore.all()).size());
+
+        procCtx.setSystemTimeMs(4000L);
+        joinProcessor.process(record4);
+        assertEquals(1, iteratorToList(outerStore.all()).size());
+    }
+
+    private <T> List<T> iteratorToList(final Iterator<T> iterator) {
+        return StreamSupport.stream(
+                        Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false)
+                .collect(Collectors.toList());
+    }
+
     private void runJoin(final StreamJoined<String, Integer, Integer> streamJoined,
                          final JoinWindows joinWindows) {
         final StreamsBuilder builder = new StreamsBuilder();
@@ -1808,7 +1903,7 @@ public class KStreamKStreamJoinTest {
             "      <-- second-join-this-join, second-join-other-join\n" +
             "    Sink: KSTREAM-SINK-0000000021 (topic: out-two)\n" +
             "      <-- second-join-merge\n\n";
-    
+
     private final String expectedTopologyWithGeneratedRepartitionTopic = "Topologies:\n" +
             "   Sub-topology: 0\n" +
             "    Source: KSTREAM-SOURCE-0000000000 (topics: [topic])\n" +