You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2019/03/09 03:24:36 UTC

[kafka] branch trunk updated: KAFKA-8065: restore original input record timestamp in forward() (#6393)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 09f1009  KAFKA-8065: restore original input record timestamp in forward() (#6393)
09f1009 is described below

commit 09f1009d246b82475bbf018d187fff5a3e035539
Author: Matthias J. Sax <mj...@apache.org>
AuthorDate: Fri Mar 8 19:24:26 2019 -0800

    KAFKA-8065: restore original input record timestamp in forward() (#6393)
    
    Reviewers: Bill Bejeck <bi...@confluent.io>, John Roesler <jo...@confluent.io>, Guozhang Wang <gu...@confluent.io>
---
 .../processor/internals/ProcessorContextImpl.java  | 12 +++--
 .../processor/internals/ProcessorTopologyTest.java | 55 ++++++++++++++++++++++
 2 files changed, 63 insertions(+), 4 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 2afd5e9..c10ea09 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -154,12 +154,15 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
     public <K, V> void forward(final K key,
                                final V value,
                                final To to) {
-        toInternal.update(to);
-        if (toInternal.hasTimestamp()) {
-            recordContext.setTimestamp(toInternal.timestamp());
-        }
         final ProcessorNode previousNode = currentNode();
+        final long currentTimestamp = recordContext.timestamp;
+
         try {
+            toInternal.update(to);
+            if (toInternal.hasTimestamp()) {
+                recordContext.setTimestamp(toInternal.timestamp());
+            }
+
             final String sendTo = toInternal.child();
             if (sendTo == null) {
                 final List<ProcessorNode<K, V>> children = (List<ProcessorNode<K, V>>) currentNode().children();
@@ -175,6 +178,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
                 forward(child, key, value);
             }
         } finally {
+            recordContext.timestamp = currentTimestamp;
             setCurrentNode(previousNode);
         }
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index 1e3fad3..76252c1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -349,6 +349,32 @@ public class ProcessorTopologyTest {
     }
 
     @Test
+    public void shouldConsiderModifiedTimeStampsForMultipleProcessors() {
+        final int partition = 10;
+        driver = new TopologyTestDriver(createMultiProcessorTimestampTopology(partition), props);
+
+        driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1", 10L));
+        assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1", partition, 10L);
+        assertNextOutputRecord(OUTPUT_TOPIC_2, "key1", "value1", partition, 20L);
+        assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1", partition, 15L);
+        assertNextOutputRecord(OUTPUT_TOPIC_2, "key1", "value1", partition, 20L);
+        assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1", partition, 12L);
+        assertNextOutputRecord(OUTPUT_TOPIC_2, "key1", "value1", partition, 22L);
+        assertNoOutputRecord(OUTPUT_TOPIC_1);
+        assertNoOutputRecord(OUTPUT_TOPIC_2);
+
+        driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key2", "value2", 20L));
+        assertNextOutputRecord(OUTPUT_TOPIC_1, "key2", "value2", partition, 20L);
+        assertNextOutputRecord(OUTPUT_TOPIC_2, "key2", "value2", partition, 30L);
+        assertNextOutputRecord(OUTPUT_TOPIC_1, "key2", "value2", partition, 25L);
+        assertNextOutputRecord(OUTPUT_TOPIC_2, "key2", "value2", partition, 30L);
+        assertNextOutputRecord(OUTPUT_TOPIC_1, "key2", "value2", partition, 22L);
+        assertNextOutputRecord(OUTPUT_TOPIC_2, "key2", "value2", partition, 32L);
+        assertNoOutputRecord(OUTPUT_TOPIC_1);
+        assertNoOutputRecord(OUTPUT_TOPIC_2);
+    }
+
+    @Test
     public void shouldConsiderHeaders() {
         final int partition = 10;
         driver = new TopologyTestDriver(createSimpleTopology(partition), props);
@@ -489,6 +515,16 @@ public class ProcessorTopologyTest {
             .addSink("sink", OUTPUT_TOPIC_1, constantPartitioner(partition), "processor");
     }
 
+    private Topology createMultiProcessorTimestampTopology(final int partition) {
+        return topology
+            .addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+            .addProcessor("processor", define(new FanOutTimestampProcessor("child1", "child2")), "source")
+            .addProcessor("child1", define(new ForwardingProcessor()), "processor")
+            .addProcessor("child2", define(new TimestampProcessor()), "processor")
+            .addSink("sink1", OUTPUT_TOPIC_1, constantPartitioner(partition), "child1")
+            .addSink("sink2", OUTPUT_TOPIC_2, constantPartitioner(partition), "child2");
+    }
+
     private Topology createMultiplexingTopology() {
         return topology
             .addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
@@ -582,6 +618,25 @@ public class ProcessorTopologyTest {
         }
     }
 
+    protected static class FanOutTimestampProcessor extends AbstractProcessor<String, String> {
+        private final String firstChild;
+        private final String secondChild;
+
+        FanOutTimestampProcessor(final String firstChild,
+                                 final String secondChild) {
+            this.firstChild = firstChild;
+            this.secondChild = secondChild;
+        }
+
+        @Override
+        public void process(final String key, final String value) {
+            context().forward(key, value);
+            context().forward(key, value, To.child(firstChild).withTimestamp(context().timestamp() + 5));
+            context().forward(key, value, To.child(secondChild));
+            context().forward(key, value, To.all().withTimestamp(context().timestamp() + 2));
+        }
+    }
+
     protected static class AddHeaderProcessor extends AbstractProcessor<String, String> {
         @Override
         public void process(final String key, final String value) {