You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2019/03/09 03:45:06 UTC
[kafka] branch 2.1 updated: KAFKA-8065: restore original input record timestamp in forward() (#6393)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.1 by this push:
new 8699561 KAFKA-8065: restore original input record timestamp in forward() (#6393)
8699561 is described below
commit 8699561e740480a1213f88bed7eb59cb23de29c5
Author: Matthias J. Sax <mj...@apache.org>
AuthorDate: Fri Mar 8 19:24:26 2019 -0800
KAFKA-8065: restore original input record timestamp in forward() (#6393)
Reviewers: Bill Bejeck <bi...@confluent.io>, John Roesler <jo...@confluent.io>, Guozhang Wang <gu...@confluent.io>
---
.../processor/internals/ProcessorContextImpl.java | 16 +++++--
.../processor/internals/ProcessorTopologyTest.java | 55 ++++++++++++++++++++++
2 files changed, 66 insertions(+), 5 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 0ed82de..fa78d01 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -106,13 +106,18 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
@SuppressWarnings("unchecked")
@Override
- public <K, V> void forward(final K key, final V value, final To to) {
- toInternal.update(to);
- if (toInternal.hasTimestamp()) {
- recordContext.setTimestamp(toInternal.timestamp());
- }
+ public <K, V> void forward(final K key,
+ final V value,
+ final To to) {
final ProcessorNode previousNode = currentNode();
+ final long currentTimestamp = recordContext.timestamp;
+
try {
+ toInternal.update(to);
+ if (toInternal.hasTimestamp()) {
+ recordContext.setTimestamp(toInternal.timestamp());
+ }
+
final List<ProcessorNode<K, V>> children = (List<ProcessorNode<K, V>>) currentNode().children();
final String sendTo = toInternal.child();
if (sendTo != null) {
@@ -133,6 +138,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
}
}
} finally {
+ recordContext.timestamp = currentTimestamp;
setCurrentNode(previousNode);
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index 587cae2..b27d2b2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -345,6 +345,32 @@ public class ProcessorTopologyTest {
}
@Test
+ public void shouldConsiderModifiedTimeStampsForMultipleProcessors() {
+ final int partition = 10;
+ driver = new TopologyTestDriver(createMultiProcessorTimestampTopology(partition), props);
+
+ driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1", 10L));
+ assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1", partition, 10L);
+ assertNextOutputRecord(OUTPUT_TOPIC_2, "key1", "value1", partition, 20L);
+ assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1", partition, 15L);
+ assertNextOutputRecord(OUTPUT_TOPIC_2, "key1", "value1", partition, 20L);
+ assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1", partition, 12L);
+ assertNextOutputRecord(OUTPUT_TOPIC_2, "key1", "value1", partition, 22L);
+ assertNoOutputRecord(OUTPUT_TOPIC_1);
+ assertNoOutputRecord(OUTPUT_TOPIC_2);
+
+ driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key2", "value2", 20L));
+ assertNextOutputRecord(OUTPUT_TOPIC_1, "key2", "value2", partition, 20L);
+ assertNextOutputRecord(OUTPUT_TOPIC_2, "key2", "value2", partition, 30L);
+ assertNextOutputRecord(OUTPUT_TOPIC_1, "key2", "value2", partition, 25L);
+ assertNextOutputRecord(OUTPUT_TOPIC_2, "key2", "value2", partition, 30L);
+ assertNextOutputRecord(OUTPUT_TOPIC_1, "key2", "value2", partition, 22L);
+ assertNextOutputRecord(OUTPUT_TOPIC_2, "key2", "value2", partition, 32L);
+ assertNoOutputRecord(OUTPUT_TOPIC_1);
+ assertNoOutputRecord(OUTPUT_TOPIC_2);
+ }
+
+ @Test
public void shouldConsiderHeaders() {
final int partition = 10;
driver = new TopologyTestDriver(createSimpleTopology(partition), props);
@@ -438,6 +464,16 @@ public class ProcessorTopologyTest {
.addSink("sink", OUTPUT_TOPIC_1, constantPartitioner(partition), "processor");
}
+ private Topology createMultiProcessorTimestampTopology(final int partition) {
+ return topology
+ .addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+ .addProcessor("processor", define(new FanOutTimestampProcessor("child1", "child2")), "source")
+ .addProcessor("child1", define(new ForwardingProcessor()), "processor")
+ .addProcessor("child2", define(new TimestampProcessor()), "processor")
+ .addSink("sink1", OUTPUT_TOPIC_1, constantPartitioner(partition), "child1")
+ .addSink("sink2", OUTPUT_TOPIC_2, constantPartitioner(partition), "child2");
+ }
+
private Topology createMultiplexingTopology() {
return topology
.addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
@@ -531,6 +567,25 @@ public class ProcessorTopologyTest {
}
}
+ protected static class FanOutTimestampProcessor extends AbstractProcessor<String, String> {
+ private final String firstChild;
+ private final String secondChild;
+
+ FanOutTimestampProcessor(final String firstChild,
+ final String secondChild) {
+ this.firstChild = firstChild;
+ this.secondChild = secondChild;
+ }
+
+ @Override
+ public void process(final String key, final String value) {
+ context().forward(key, value);
+ context().forward(key, value, To.child(firstChild).withTimestamp(context().timestamp() + 5));
+ context().forward(key, value, To.child(secondChild));
+ context().forward(key, value, To.all().withTimestamp(context().timestamp() + 2));
+ }
+ }
+
protected static class AddHeaderProcessor extends AbstractProcessor<String, String> {
@Override
public void process(final String key, final String value) {