You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/04/04 23:57:21 UTC
kafka git commit: HOTFIX: set timestamp in SinkNode
Repository: kafka
Updated Branches:
refs/heads/trunk 45c585b4f -> 31e263e82
HOTFIX: set timestamp in SinkNode
guozhangwang
Setting the timestamp in produced records in SinkNode. This forces the producer record's timestamp to be the same as the context's timestamp.
Author: Yasuhiro Matsuda <ya...@confluent.io>
Reviewers: Guozhang Wang <wa...@gmail.com>
Closes #1137 from ymatsuda/set_timestamp_in_sinknode
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/31e263e8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/31e263e8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/31e263e8
Branch: refs/heads/trunk
Commit: 31e263e8294e94de3d2c44d2ab3a827ab904e247
Parents: 45c585b
Author: Yasuhiro Matsuda <ya...@confluent.io>
Authored: Mon Apr 4 14:57:15 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon Apr 4 14:57:15 2016 -0700
----------------------------------------------------------------------
.../org/apache/kafka/streams/processor/internals/SinkNode.java | 2 +-
streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java | 1 +
2 files changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/31e263e8/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
index ffc72fd..31a558b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
@@ -57,7 +57,7 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> {
public void process(K key, V value) {
// send to all the registered topics
RecordCollector collector = ((RecordCollector.Supplier) context).recordCollector();
- collector.send(new ProducerRecord<>(topic, key, value), keySerializer, valSerializer, partitioner);
+ collector.send(new ProducerRecord<>(topic, null, context.timestamp(), key, value), keySerializer, valSerializer, partitioner);
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/31e263e8/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index 05713c1..0c56c26 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -57,6 +57,7 @@ public class KStreamTestDriver {
this.topology = builder.build("X", null);
this.stateDir = stateDir;
this.context = new MockProcessorContext(this, stateDir, keySerde, valSerde, new MockRecordCollector());
+ this.context.setTime(0L);
for (StateStoreSupplier stateStoreSupplier : topology.stateStoreSuppliers()) {
StateStore store = stateStoreSupplier.get();