You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/04/11 08:09:48 UTC
kafka git commit: HOTFIX: special handling first ever triggered
punctuate
Repository: kafka
Updated Branches:
refs/heads/trunk 7c2798986 -> c76b6e6d9
HOTFIX: special handling first ever triggered punctuate
Author: Guozhang Wang <wa...@gmail.com>
Reviewers: Anna Povzner <an...@confluent.io>
Closes #1208 from guozhangwang/KPunctuate
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c76b6e6d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c76b6e6d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c76b6e6d
Branch: refs/heads/trunk
Commit: c76b6e6d9bad2278076054f5175a2b053383388f
Parents: 7c27989
Author: Guozhang Wang <wa...@gmail.com>
Authored: Sun Apr 10 23:09:43 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Sun Apr 10 23:09:43 2016 -0700
----------------------------------------------------------------------
.../streams/processor/internals/PunctuationQueue.java | 2 +-
.../processor/internals/PunctuationSchedule.java | 13 +++++++++----
.../streams/processor/internals/StreamTaskTest.java | 10 +++++-----
3 files changed, 15 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/c76b6e6d/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationQueue.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationQueue.java
index d7d7eee..824e20a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationQueue.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationQueue.java
@@ -43,7 +43,7 @@ public class PunctuationQueue {
PunctuationSchedule sched = top;
pq.poll();
punctuator.punctuate(sched.node(), timestamp);
- pq.add(sched.next());
+ pq.add(sched.next(timestamp));
punctuated = true;
top = pq.peek();
http://git-wip-us.apache.org/repos/asf/kafka/blob/c76b6e6d/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java
index 758cfb0..98919d2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java
@@ -22,11 +22,11 @@ public class PunctuationSchedule extends Stamped<ProcessorNode> {
final long interval;
public PunctuationSchedule(ProcessorNode node, long interval) {
- this(node, 0, interval);
+ this(node, 0L, interval);
}
public PunctuationSchedule(ProcessorNode node, long time, long interval) {
- super(node, time + interval);
+ super(node, time);
this.interval = interval;
}
@@ -34,8 +34,13 @@ public class PunctuationSchedule extends Stamped<ProcessorNode> {
return value;
}
- public PunctuationSchedule next() {
- return new PunctuationSchedule(value, timestamp, interval);
+ public PunctuationSchedule next(long currTimestamp) {
+ // we need to special handle the case when it is firstly triggered (i.e. the timestamp
+ // is equal to the interval) by reschedule based on the currTimestamp
+ if (timestamp == 0L)
+ return new PunctuationSchedule(value, currTimestamp + interval, interval);
+ else
+ return new PunctuationSchedule(value, timestamp + interval, interval);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c76b6e6d/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index dd48947..6014c36 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -226,15 +226,15 @@ public class StreamTaskTest {
StreamTask task = new StreamTask(new TaskId(0, 0), "applicationId", partitions, topology, consumer, producer, restoreStateConsumer, config, null);
task.addRecords(partition1, records(
- new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
- new ConsumerRecord<>(partition1.topic(), partition1.partition(), 30, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)
+ new ConsumerRecord<>(partition1.topic(), partition1.partition(), 30, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+ new ConsumerRecord<>(partition1.topic(), partition1.partition(), 40, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)
));
task.addRecords(partition2, records(
- new ConsumerRecord<>(partition2.topic(), partition2.partition(), 15, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
new ConsumerRecord<>(partition2.topic(), partition2.partition(), 25, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
- new ConsumerRecord<>(partition2.topic(), partition2.partition(), 35, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)
+ new ConsumerRecord<>(partition2.topic(), partition2.partition(), 35, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+ new ConsumerRecord<>(partition2.topic(), partition2.partition(), 45, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)
));
assertTrue(task.maybePunctuate());
@@ -275,7 +275,7 @@ public class StreamTaskTest {
assertFalse(task.maybePunctuate());
- processor.supplier.checkAndClearPunctuateResult(10L, 20L, 30L);
+ processor.supplier.checkAndClearPunctuateResult(20L, 30L, 40L);
task.close();