You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2019/03/09 03:45:06 UTC

[kafka] branch 2.1 updated: KAFKA-8065: restore original input record timestamp in forward() (#6393)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 8699561  KAFKA-8065: restore original input record timestamp in forward() (#6393)
8699561 is described below

commit 8699561e740480a1213f88bed7eb59cb23de29c5
Author: Matthias J. Sax <mj...@apache.org>
AuthorDate: Fri Mar 8 19:24:26 2019 -0800

    KAFKA-8065: restore original input record timestamp in forward() (#6393)
    
    Reviewers: Bill Bejeck <bi...@confluent.io>, John Roesler <jo...@confluent.io>, Guozhang Wang <gu...@confluent.io>
---
 .../processor/internals/ProcessorContextImpl.java  | 16 +++++--
 .../processor/internals/ProcessorTopologyTest.java | 55 ++++++++++++++++++++++
 2 files changed, 66 insertions(+), 5 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 0ed82de..fa78d01 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -106,13 +106,18 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
 
     @SuppressWarnings("unchecked")
     @Override
-    public <K, V> void forward(final K key, final V value, final To to) {
-        toInternal.update(to);
-        if (toInternal.hasTimestamp()) {
-            recordContext.setTimestamp(toInternal.timestamp());
-        }
+    public <K, V> void forward(final K key,
+                               final V value,
+                               final To to) {
         final ProcessorNode previousNode = currentNode();
+        final long currentTimestamp = recordContext.timestamp;
+
         try {
+            toInternal.update(to);
+            if (toInternal.hasTimestamp()) {
+                recordContext.setTimestamp(toInternal.timestamp());
+            }
+
             final List<ProcessorNode<K, V>> children = (List<ProcessorNode<K, V>>) currentNode().children();
             final String sendTo = toInternal.child();
             if (sendTo != null) {
@@ -133,6 +138,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
                 }
             }
         } finally {
+            recordContext.timestamp = currentTimestamp;
             setCurrentNode(previousNode);
         }
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index 587cae2..b27d2b2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -345,6 +345,32 @@ public class ProcessorTopologyTest {
     }
 
     @Test
+    public void shouldConsiderModifiedTimeStampsForMultipleProcessors() {
+        final int partition = 10;
+        driver = new TopologyTestDriver(createMultiProcessorTimestampTopology(partition), props);
+
+        driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1", 10L));
+        assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1", partition, 10L);
+        assertNextOutputRecord(OUTPUT_TOPIC_2, "key1", "value1", partition, 20L);
+        assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1", partition, 15L);
+        assertNextOutputRecord(OUTPUT_TOPIC_2, "key1", "value1", partition, 20L);
+        assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1", partition, 12L);
+        assertNextOutputRecord(OUTPUT_TOPIC_2, "key1", "value1", partition, 22L);
+        assertNoOutputRecord(OUTPUT_TOPIC_1);
+        assertNoOutputRecord(OUTPUT_TOPIC_2);
+
+        driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key2", "value2", 20L));
+        assertNextOutputRecord(OUTPUT_TOPIC_1, "key2", "value2", partition, 20L);
+        assertNextOutputRecord(OUTPUT_TOPIC_2, "key2", "value2", partition, 30L);
+        assertNextOutputRecord(OUTPUT_TOPIC_1, "key2", "value2", partition, 25L);
+        assertNextOutputRecord(OUTPUT_TOPIC_2, "key2", "value2", partition, 30L);
+        assertNextOutputRecord(OUTPUT_TOPIC_1, "key2", "value2", partition, 22L);
+        assertNextOutputRecord(OUTPUT_TOPIC_2, "key2", "value2", partition, 32L);
+        assertNoOutputRecord(OUTPUT_TOPIC_1);
+        assertNoOutputRecord(OUTPUT_TOPIC_2);
+    }
+
+    @Test
     public void shouldConsiderHeaders() {
         final int partition = 10;
         driver = new TopologyTestDriver(createSimpleTopology(partition), props);
@@ -438,6 +464,16 @@ public class ProcessorTopologyTest {
             .addSink("sink", OUTPUT_TOPIC_1, constantPartitioner(partition), "processor");
     }
 
+    private Topology createMultiProcessorTimestampTopology(final int partition) {
+        return topology
+            .addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+            .addProcessor("processor", define(new FanOutTimestampProcessor("child1", "child2")), "source")
+            .addProcessor("child1", define(new ForwardingProcessor()), "processor")
+            .addProcessor("child2", define(new TimestampProcessor()), "processor")
+            .addSink("sink1", OUTPUT_TOPIC_1, constantPartitioner(partition), "child1")
+            .addSink("sink2", OUTPUT_TOPIC_2, constantPartitioner(partition), "child2");
+    }
+
     private Topology createMultiplexingTopology() {
         return topology
             .addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
@@ -531,6 +567,25 @@ public class ProcessorTopologyTest {
         }
     }
 
+    protected static class FanOutTimestampProcessor extends AbstractProcessor<String, String> {
+        private final String firstChild;
+        private final String secondChild;
+
+        FanOutTimestampProcessor(final String firstChild,
+                                 final String secondChild) {
+            this.firstChild = firstChild;
+            this.secondChild = secondChild;
+        }
+
+        @Override
+        public void process(final String key, final String value) {
+            context().forward(key, value);
+            context().forward(key, value, To.child(firstChild).withTimestamp(context().timestamp() + 5));
+            context().forward(key, value, To.child(secondChild));
+            context().forward(key, value, To.all().withTimestamp(context().timestamp() + 2));
+        }
+    }
+
     protected static class AddHeaderProcessor extends AbstractProcessor<String, String> {
         @Override
         public void process(final String key, final String value) {