You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2019/03/09 03:24:36 UTC
[kafka] branch trunk updated: KAFKA-8065: restore original input record timestamp in forward() (#6393)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 09f1009 KAFKA-8065: restore original input record timestamp in forward() (#6393)
09f1009 is described below
commit 09f1009d246b82475bbf018d187fff5a3e035539
Author: Matthias J. Sax <mj...@apache.org>
AuthorDate: Fri Mar 8 19:24:26 2019 -0800
KAFKA-8065: restore original input record timestamp in forward() (#6393)
Reviewers: Bill Bejeck <bi...@confluent.io>, John Roesler <jo...@confluent.io>, Guozhang Wang <gu...@confluent.io>
---
.../processor/internals/ProcessorContextImpl.java | 12 +++--
.../processor/internals/ProcessorTopologyTest.java | 55 ++++++++++++++++++++++
2 files changed, 63 insertions(+), 4 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 2afd5e9..c10ea09 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -154,12 +154,15 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
public <K, V> void forward(final K key,
final V value,
final To to) {
- toInternal.update(to);
- if (toInternal.hasTimestamp()) {
- recordContext.setTimestamp(toInternal.timestamp());
- }
final ProcessorNode previousNode = currentNode();
+ final long currentTimestamp = recordContext.timestamp;
+
try {
+ toInternal.update(to);
+ if (toInternal.hasTimestamp()) {
+ recordContext.setTimestamp(toInternal.timestamp());
+ }
+
final String sendTo = toInternal.child();
if (sendTo == null) {
final List<ProcessorNode<K, V>> children = (List<ProcessorNode<K, V>>) currentNode().children();
@@ -175,6 +178,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
forward(child, key, value);
}
} finally {
+ recordContext.timestamp = currentTimestamp;
setCurrentNode(previousNode);
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index 1e3fad3..76252c1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -349,6 +349,32 @@ public class ProcessorTopologyTest {
}
@Test
+ public void shouldConsiderModifiedTimeStampsForMultipleProcessors() {
+ final int partition = 10;
+ driver = new TopologyTestDriver(createMultiProcessorTimestampTopology(partition), props);
+
+ driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1", 10L));
+ assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1", partition, 10L);
+ assertNextOutputRecord(OUTPUT_TOPIC_2, "key1", "value1", partition, 20L);
+ assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1", partition, 15L);
+ assertNextOutputRecord(OUTPUT_TOPIC_2, "key1", "value1", partition, 20L);
+ assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1", partition, 12L);
+ assertNextOutputRecord(OUTPUT_TOPIC_2, "key1", "value1", partition, 22L);
+ assertNoOutputRecord(OUTPUT_TOPIC_1);
+ assertNoOutputRecord(OUTPUT_TOPIC_2);
+
+ driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key2", "value2", 20L));
+ assertNextOutputRecord(OUTPUT_TOPIC_1, "key2", "value2", partition, 20L);
+ assertNextOutputRecord(OUTPUT_TOPIC_2, "key2", "value2", partition, 30L);
+ assertNextOutputRecord(OUTPUT_TOPIC_1, "key2", "value2", partition, 25L);
+ assertNextOutputRecord(OUTPUT_TOPIC_2, "key2", "value2", partition, 30L);
+ assertNextOutputRecord(OUTPUT_TOPIC_1, "key2", "value2", partition, 22L);
+ assertNextOutputRecord(OUTPUT_TOPIC_2, "key2", "value2", partition, 32L);
+ assertNoOutputRecord(OUTPUT_TOPIC_1);
+ assertNoOutputRecord(OUTPUT_TOPIC_2);
+ }
+
+ @Test
public void shouldConsiderHeaders() {
final int partition = 10;
driver = new TopologyTestDriver(createSimpleTopology(partition), props);
@@ -489,6 +515,16 @@ public class ProcessorTopologyTest {
.addSink("sink", OUTPUT_TOPIC_1, constantPartitioner(partition), "processor");
}
+ private Topology createMultiProcessorTimestampTopology(final int partition) {
+ return topology
+ .addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+ .addProcessor("processor", define(new FanOutTimestampProcessor("child1", "child2")), "source")
+ .addProcessor("child1", define(new ForwardingProcessor()), "processor")
+ .addProcessor("child2", define(new TimestampProcessor()), "processor")
+ .addSink("sink1", OUTPUT_TOPIC_1, constantPartitioner(partition), "child1")
+ .addSink("sink2", OUTPUT_TOPIC_2, constantPartitioner(partition), "child2");
+ }
+
private Topology createMultiplexingTopology() {
return topology
.addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
@@ -582,6 +618,25 @@ public class ProcessorTopologyTest {
}
}
+ protected static class FanOutTimestampProcessor extends AbstractProcessor<String, String> {
+ private final String firstChild;
+ private final String secondChild;
+
+ FanOutTimestampProcessor(final String firstChild,
+ final String secondChild) {
+ this.firstChild = firstChild;
+ this.secondChild = secondChild;
+ }
+
+ @Override
+ public void process(final String key, final String value) {
+ context().forward(key, value);
+ context().forward(key, value, To.child(firstChild).withTimestamp(context().timestamp() + 5));
+ context().forward(key, value, To.child(secondChild));
+ context().forward(key, value, To.all().withTimestamp(context().timestamp() + 2));
+ }
+ }
+
protected static class AddHeaderProcessor extends AbstractProcessor<String, String> {
@Override
public void process(final String key, final String value) {