Posted to commits@kafka.apache.org by gu...@apache.org on 2021/02/24 04:44:06 UTC

[kafka] branch 2.8 updated: KAFKA-12323: Set timestamp in record context when punctuate (#10170)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.8 by this push:
     new 14f2d22  KAFKA-12323: Set timestamp in record context when punctuate (#10170)
14f2d22 is described below

commit 14f2d22ef74d65ea8b2843ab63566e4b70a34d2f
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Tue Feb 23 20:41:02 2021 -0800

    KAFKA-12323: Set timestamp in record context when punctuate (#10170)
    
    We need to preserve the timestamp when punctuating so that downstream operators can retain it via the record context.
    
    Reviewers: Matthias J. Sax <ma...@confluent.io>
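    
    A minimal sketch of the pattern in question, assuming the 2.8-era
    Transformer / old Processor APIs (the topic, class, and variable names
    below are illustrative, not from the commit): a punctuator forwards a
    record downstream, and the downstream processor reads the timestamp off
    its ProcessorContext. With a null record context during punctuation,
    that timestamp() call had nothing to read; with this change it returns
    the punctuation time.
    
    import java.time.Duration;
    
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.kstream.Transformer;
    import org.apache.kafka.streams.kstream.TransformerSupplier;
    import org.apache.kafka.streams.processor.AbstractProcessor;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.processor.ProcessorSupplier;
    import org.apache.kafka.streams.processor.PunctuationType;
    
    public class PunctuateForwardSketch {
    
        public static Topology build() {
            // punctuator that forwards a synthetic record every 100ms of wall-clock time
            final TransformerSupplier<String, String, KeyValue<String, String>> punctuating =
                () -> new Transformer<String, String, KeyValue<String, String>>() {
                    private ProcessorContext context;
    
                    @Override
                    public void init(final ProcessorContext processorContext) {
                        this.context = processorContext;
                        processorContext.schedule(
                            Duration.ofMillis(100L),
                            PunctuationType.WALL_CLOCK_TIME,
                            timestamp -> this.context.forward("key", "value"));
                    }
    
                    @Override
                    public KeyValue<String, String> transform(final String key, final String value) {
                        return KeyValue.pair(key, value); // pass input records through
                    }
    
                    @Override
                    public void close() {}
                };
    
            // downstream processor reading the timestamp off the processor context;
            // with this fix it sees the punctuation timestamp for forwarded records
            final ProcessorSupplier<String, String> timestampPeek =
                () -> new AbstractProcessor<String, String>() {
                    @Override
                    public void process(final String key, final String value) {
                        System.out.println("context timestamp: " + context().timestamp());
                    }
                };
    
            final StreamsBuilder builder = new StreamsBuilder();
            builder.<String, String>stream("input-topic")
                .transform(punctuating)
                .process(timestampPeek);
            return builder.build();
        }
    }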
---
 .../streams/processor/internals/StreamTask.java    | 29 +++++---
 .../processor/internals/StreamThreadTest.java      | 85 +++++++++++++++++++++-
 2 files changed, 99 insertions(+), 15 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 02f2d9f..a2e103c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -23,6 +23,7 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
@@ -697,17 +698,14 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
 
             log.trace("Start processing one record [{}]", record);
 
-            updateProcessorContext(
-                currNode,
-                wallClockTime,
-                new ProcessorRecordContext(
-                    record.timestamp,
-                    record.offset(),
-                    record.partition(),
-                    record.topic(),
-                    record.headers()
-                )
+            final ProcessorRecordContext recordContext = new ProcessorRecordContext(
+                record.timestamp,
+                record.offset(),
+                record.partition(),
+                record.topic(),
+                record.headers()
             );
+            updateProcessorContext(currNode, wallClockTime, recordContext);
 
             maybeRecordE2ELatency(record.timestamp, wallClockTime, currNode.name());
             final Record<Object, Object> toProcess = new Record<>(
@@ -801,7 +799,16 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
             throw new IllegalStateException(String.format("%sCurrent node is not null", logPrefix));
         }
 
-        updateProcessorContext(node, time.milliseconds(), null);
+        // when punctuating, we need to preserve the timestamp (this can be either system time or event time)
+        // while the other record context fields are set to dummy values: null topic, -1 partition, -1 offset, and empty headers
+        final ProcessorRecordContext recordContext = new ProcessorRecordContext(
+            timestamp,
+            -1L,
+            -1,
+            null,
+            new RecordHeaders()
+        );
+        updateProcessorContext(node, time.milliseconds(), recordContext);
 
         if (log.isTraceEnabled()) {
             log.trace("Punctuating processor {} with timestamp {} and punctuation type {}", node.name(), timestamp, type);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index bfc32d5..e2b1549 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -48,6 +48,7 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
 import org.apache.kafka.streams.errors.StreamsException;
@@ -1805,7 +1806,7 @@ public class StreamThreadTest {
         final List<Long> punctuatedStreamTime = new ArrayList<>();
         final List<Long> punctuatedWallClockTime = new ArrayList<>();
         final org.apache.kafka.streams.processor.ProcessorSupplier<Object, Object> punctuateProcessor =
-            () -> new org.apache.kafka.streams.processor.Processor<Object, Object>() {
+            () -> new org.apache.kafka.streams.processor.AbstractProcessor<Object, Object>() {
                 @Override
                 public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
                     context.schedule(Duration.ofMillis(100L), PunctuationType.STREAM_TIME, punctuatedStreamTime::add);
@@ -1814,9 +1815,6 @@ public class StreamThreadTest {
 
                 @Override
                 public void process(final Object key, final Object value) {}
-
-                @Override
-                public void close() {}
             };
 
         internalStreamsBuilder.stream(Collections.singleton(topic1), consumed).process(punctuateProcessor);
@@ -1875,6 +1873,85 @@ public class StreamThreadTest {
     }
 
     @Test
+    public void shouldPunctuateWithTimestampPreservedInProcessorContext() {
+        final org.apache.kafka.streams.kstream.TransformerSupplier<Object, Object, KeyValue<Object, Object>> punctuateProcessor =
+            () -> new org.apache.kafka.streams.kstream.Transformer<Object, Object, KeyValue<Object, Object>>() {
+                @Override
+                public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
+                    context.schedule(Duration.ofMillis(100L), PunctuationType.WALL_CLOCK_TIME, timestamp -> context.forward("key", "value"));
+                    context.schedule(Duration.ofMillis(100L), PunctuationType.STREAM_TIME, timestamp -> context.forward("key", "value"));
+                }
+
+                @Override
+                public KeyValue<Object, Object> transform(final Object key, final Object value) {
+                    return null;
+                }
+
+                @Override
+                public void close() {}
+            };
+
+        final List<Long> peekedContextTime = new ArrayList<>();
+        final org.apache.kafka.streams.processor.ProcessorSupplier<Object, Object> peekProcessor =
+            () -> new org.apache.kafka.streams.processor.AbstractProcessor<Object, Object>() {
+                @Override
+                public void process(final Object key, final Object value) {
+                    peekedContextTime.add(context.timestamp());
+                }
+            };
+
+        internalStreamsBuilder.stream(Collections.singleton(topic1), consumed)
+            .transform(punctuateProcessor)
+            .process(peekProcessor);
+        internalStreamsBuilder.buildAndOptimizeTopology();
+
+        final long currTime = mockTime.milliseconds();
+        final StreamThread thread = createStreamThread(CLIENT_ID, config, false);
+
+        thread.setState(StreamThread.State.STARTING);
+        thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
+        final List<TopicPartition> assignedPartitions = new ArrayList<>();
+
+        final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
+
+        // assign single partition
+        assignedPartitions.add(t1p1);
+        activeTasks.put(task1, Collections.singleton(t1p1));
+
+        thread.taskManager().handleAssignment(activeTasks, emptyMap());
+
+        clientSupplier.consumer.assign(assignedPartitions);
+        clientSupplier.consumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L));
+        thread.rebalanceListener().onPartitionsAssigned(assignedPartitions);
+
+        thread.runOnce();
+        assertEquals(0, peekedContextTime.size());
+
+        mockTime.sleep(100L);
+        thread.runOnce();
+
+        assertEquals(1, peekedContextTime.size());
+        assertEquals(currTime + 100L, peekedContextTime.get(0).longValue());
+
+        clientSupplier.consumer.addRecord(new ConsumerRecord<>(
+            topic1,
+            1,
+            0L,
+            100L,
+            TimestampType.CREATE_TIME,
+            ConsumerRecord.NULL_CHECKSUM,
+            "K".getBytes().length,
+            "V".getBytes().length,
+            "K".getBytes(),
+            "V".getBytes()));
+
+        thread.runOnce();
+
+        assertEquals(2, peekedContextTime.size());
+        assertEquals(100L, peekedContextTime.get(1).longValue());
+    }
+
+    @Test
     public void shouldAlwaysUpdateTasksMetadataAfterChangingState() {
         final StreamThread thread = createStreamThread(CLIENT_ID, config, false);
         ThreadMetadata metadata = thread.threadMetadata();