Posted to commits@kafka.apache.org by gu...@apache.org on 2021/02/24 04:42:30 UTC
[kafka] branch trunk updated: KAFKA-12323: Set timestamp in record context when punctuate (#10170)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new f75efb9 KAFKA-12323: Set timestamp in record context when punctuate (#10170)
f75efb9 is described below
commit f75efb96fae99a22eb54b5d0ef4e23b28fe8cd2d
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Tue Feb 23 20:41:02 2021 -0800
KAFKA-12323: Set timestamp in record context when punctuate (#10170)
We need to preserve the timestamp when punctuating so that downstream operators can retain it via the context.
Reviewers: Matthias J. Sax <ma...@confluent.io>
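For context, a minimal sketch (hypothetical processors, not part of this commit) of the pattern the fix targets: a punctuator forwards a synthetic record, and with this change the downstream processor can read the punctuation timestamp back from the context instead of hitting an unset record context.

import java.time.Duration;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;

class ForwardingPunctuator extends AbstractProcessor<String, String> {
    @Override
    public void init(final ProcessorContext context) {
        super.init(context);
        // every 100ms of wall-clock time, forward a synthetic record downstream;
        // with this fix, the punctuation timestamp is set in the record context first
        context.schedule(Duration.ofMillis(100L), PunctuationType.WALL_CLOCK_TIME,
            timestamp -> context.forward("key", "value"));
    }

    @Override
    public void process(final String key, final String value) { }
}

class TimestampReader extends AbstractProcessor<String, String> {
    @Override
    public void process(final String key, final String value) {
        // for records forwarded from the punctuator above, this now returns
        // the punctuation timestamp (wall-clock time or stream time)
        final long ts = context().timestamp();
    }
}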
---
.../streams/processor/internals/StreamTask.java | 29 +++++---
.../processor/internals/StreamThreadTest.java | 85 +++++++++++++++++++++-
2 files changed, 99 insertions(+), 15 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 36dc02a..aefa3ac 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -24,6 +24,7 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
@@ -688,17 +689,14 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
log.trace("Start processing one record [{}]", record);
- updateProcessorContext(
- currNode,
- wallClockTime,
- new ProcessorRecordContext(
- record.timestamp,
- record.offset(),
- record.partition(),
- record.topic(),
- record.headers()
- )
+ final ProcessorRecordContext recordContext = new ProcessorRecordContext(
+ record.timestamp,
+ record.offset(),
+ record.partition(),
+ record.topic(),
+ record.headers()
);
+ updateProcessorContext(currNode, wallClockTime, recordContext);
maybeRecordE2ELatency(record.timestamp, wallClockTime, currNode.name());
final Record<Object, Object> toProcess = new Record<>(
@@ -792,7 +790,16 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
throw new IllegalStateException(String.format("%sCurrent node is not null", logPrefix));
}
- updateProcessorContext(node, time.milliseconds(), null);
+ // when punctuating, we need to preserve the timestamp (this can be either system time or event time)
+ // while the other record context fields are set to dummy values: null topic, -1 partition, -1 offset and empty headers
+ final ProcessorRecordContext recordContext = new ProcessorRecordContext(
+ timestamp,
+ -1L,
+ -1,
+ null,
+ new RecordHeaders()
+ );
+ updateProcessorContext(node, time.milliseconds(), recordContext);
if (log.isTraceEnabled()) {
log.trace("Punctuating processor {} with timestamp {} and punctuation type {}", node.name(), timestamp, type);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index bfc32d5..e2b1549 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -48,6 +48,7 @@ import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
@@ -1805,7 +1806,7 @@ public class StreamThreadTest {
final List<Long> punctuatedStreamTime = new ArrayList<>();
final List<Long> punctuatedWallClockTime = new ArrayList<>();
final org.apache.kafka.streams.processor.ProcessorSupplier<Object, Object> punctuateProcessor =
- () -> new org.apache.kafka.streams.processor.Processor<Object, Object>() {
+ () -> new org.apache.kafka.streams.processor.AbstractProcessor<Object, Object>() {
@Override
public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
context.schedule(Duration.ofMillis(100L), PunctuationType.STREAM_TIME, punctuatedStreamTime::add);
@@ -1814,9 +1815,6 @@ public class StreamThreadTest {
@Override
public void process(final Object key, final Object value) {}
-
- @Override
- public void close() {}
};
internalStreamsBuilder.stream(Collections.singleton(topic1), consumed).process(punctuateProcessor);
@@ -1875,6 +1873,85 @@ public class StreamThreadTest {
}
@Test
+ public void shouldPunctuateWithTimestampPreservedInProcessorContext() {
+ final org.apache.kafka.streams.kstream.TransformerSupplier<Object, Object, KeyValue<Object, Object>> punctuateProcessor =
+ () -> new org.apache.kafka.streams.kstream.Transformer<Object, Object, KeyValue<Object, Object>>() {
+ @Override
+ public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
+ context.schedule(Duration.ofMillis(100L), PunctuationType.WALL_CLOCK_TIME, timestamp -> context.forward("key", "value"));
+ context.schedule(Duration.ofMillis(100L), PunctuationType.STREAM_TIME, timestamp -> context.forward("key", "value"));
+ }
+
+ @Override
+ public KeyValue<Object, Object> transform(final Object key, final Object value) {
+ return null;
+ }
+
+ @Override
+ public void close() {}
+ };
+
+ final List<Long> peekedContextTime = new ArrayList<>();
+ final org.apache.kafka.streams.processor.ProcessorSupplier<Object, Object> peekProcessor =
+ () -> new org.apache.kafka.streams.processor.AbstractProcessor<Object, Object>() {
+ @Override
+ public void process(final Object key, final Object value) {
+ peekedContextTime.add(context.timestamp());
+ }
+ };
+
+ internalStreamsBuilder.stream(Collections.singleton(topic1), consumed)
+ .transform(punctuateProcessor)
+ .process(peekProcessor);
+ internalStreamsBuilder.buildAndOptimizeTopology();
+
+ final long currTime = mockTime.milliseconds();
+ final StreamThread thread = createStreamThread(CLIENT_ID, config, false);
+
+ thread.setState(StreamThread.State.STARTING);
+ thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
+ final List<TopicPartition> assignedPartitions = new ArrayList<>();
+
+ final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
+
+ // assign single partition
+ assignedPartitions.add(t1p1);
+ activeTasks.put(task1, Collections.singleton(t1p1));
+
+ thread.taskManager().handleAssignment(activeTasks, emptyMap());
+
+ clientSupplier.consumer.assign(assignedPartitions);
+ clientSupplier.consumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L));
+ thread.rebalanceListener().onPartitionsAssigned(assignedPartitions);
+
+ thread.runOnce();
+ assertEquals(0, peekedContextTime.size());
+
+ mockTime.sleep(100L);
+ thread.runOnce();
+
+ assertEquals(1, peekedContextTime.size());
+ assertEquals(currTime + 100L, peekedContextTime.get(0).longValue());
+
+ clientSupplier.consumer.addRecord(new ConsumerRecord<>(
+ topic1,
+ 1,
+ 0L,
+ 100L,
+ TimestampType.CREATE_TIME,
+ ConsumerRecord.NULL_CHECKSUM,
+ "K".getBytes().length,
+ "V".getBytes().length,
+ "K".getBytes(),
+ "V".getBytes()));
+
+ thread.runOnce();
+
+ assertEquals(2, peekedContextTime.size());
+ assertEquals(0L, peekedContextTime.get(1).longValue());
+ }
+
+ @Test
public void shouldAlwaysUpdateTasksMetadataAfterChangingState() {
final StreamThread thread = createStreamThread(CLIENT_ID, config, false);
ThreadMetadata metadata = thread.threadMetadata();