You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/04/04 23:57:21 UTC

kafka git commit: HOTFIX: set timestamp in SinkNode

Repository: kafka
Updated Branches:
  refs/heads/trunk 45c585b4f -> 31e263e82


HOTFIX: set timestamp in SinkNode

guozhangwang
Sets the timestamp on the records produced by SinkNode. This forces the producer record's timestamp to be the same as the context's timestamp.

Author: Yasuhiro Matsuda <ya...@confluent.io>

Reviewers: Guozhang Wang <wa...@gmail.com>

Closes #1137 from ymatsuda/set_timestamp_in_sinknode


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/31e263e8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/31e263e8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/31e263e8

Branch: refs/heads/trunk
Commit: 31e263e8294e94de3d2c44d2ab3a827ab904e247
Parents: 45c585b
Author: Yasuhiro Matsuda <ya...@confluent.io>
Authored: Mon Apr 4 14:57:15 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon Apr 4 14:57:15 2016 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/streams/processor/internals/SinkNode.java     | 2 +-
 streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java | 1 +
 2 files changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/31e263e8/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
index ffc72fd..31a558b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
@@ -57,7 +57,7 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> {
     public void process(K key, V value) {
         // send to all the registered topics
         RecordCollector collector = ((RecordCollector.Supplier) context).recordCollector();
-        collector.send(new ProducerRecord<>(topic, key, value), keySerializer, valSerializer, partitioner);
+        collector.send(new ProducerRecord<>(topic, null, context.timestamp(), key, value), keySerializer, valSerializer, partitioner);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/31e263e8/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index 05713c1..0c56c26 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -57,6 +57,7 @@ public class KStreamTestDriver {
         this.topology = builder.build("X", null);
         this.stateDir = stateDir;
         this.context = new MockProcessorContext(this, stateDir, keySerde, valSerde, new MockRecordCollector());
+        this.context.setTime(0L);
 
         for (StateStoreSupplier stateStoreSupplier : topology.stateStoreSuppliers()) {
             StateStore store = stateStoreSupplier.get();