You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/04/11 08:09:48 UTC

kafka git commit: HOTFIX: special handling first ever triggered punctuate

Repository: kafka
Updated Branches:
  refs/heads/trunk 7c2798986 -> c76b6e6d9


HOTFIX: special handling first ever triggered punctuate

Author: Guozhang Wang <wa...@gmail.com>

Reviewers: Anna Povzner <an...@confluent.io>

Closes #1208 from guozhangwang/KPunctuate


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c76b6e6d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c76b6e6d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c76b6e6d

Branch: refs/heads/trunk
Commit: c76b6e6d9bad2278076054f5175a2b053383388f
Parents: 7c27989
Author: Guozhang Wang <wa...@gmail.com>
Authored: Sun Apr 10 23:09:43 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Sun Apr 10 23:09:43 2016 -0700

----------------------------------------------------------------------
 .../streams/processor/internals/PunctuationQueue.java  |  2 +-
 .../processor/internals/PunctuationSchedule.java       | 13 +++++++++----
 .../streams/processor/internals/StreamTaskTest.java    | 10 +++++-----
 3 files changed, 15 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c76b6e6d/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationQueue.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationQueue.java
index d7d7eee..824e20a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationQueue.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationQueue.java
@@ -43,7 +43,7 @@ public class PunctuationQueue {
                 PunctuationSchedule sched = top;
                 pq.poll();
                 punctuator.punctuate(sched.node(), timestamp);
-                pq.add(sched.next());
+                pq.add(sched.next(timestamp));
                 punctuated = true;
 
                 top = pq.peek();

http://git-wip-us.apache.org/repos/asf/kafka/blob/c76b6e6d/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java
index 758cfb0..98919d2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java
@@ -22,11 +22,11 @@ public class PunctuationSchedule extends Stamped<ProcessorNode> {
     final long interval;
 
     public PunctuationSchedule(ProcessorNode node, long interval) {
-        this(node, 0, interval);
+        this(node, 0L, interval);
     }
 
     public PunctuationSchedule(ProcessorNode node, long time, long interval) {
-        super(node, time + interval);
+        super(node, time);
         this.interval = interval;
     }
 
@@ -34,8 +34,13 @@ public class PunctuationSchedule extends Stamped<ProcessorNode> {
         return value;
     }
 
-    public PunctuationSchedule next() {
-        return new PunctuationSchedule(value, timestamp, interval);
+    public PunctuationSchedule next(long currTimestamp) {
+        // we need to specially handle the case when it is triggered for the first time (i.e. the
+        // timestamp is still the initial 0L) by rescheduling based on the currTimestamp
+        if (timestamp == 0L)
+            return new PunctuationSchedule(value, currTimestamp + interval, interval);
+        else
+            return new PunctuationSchedule(value, timestamp + interval, interval);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c76b6e6d/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index dd48947..6014c36 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -226,15 +226,15 @@ public class StreamTaskTest {
             StreamTask task = new StreamTask(new TaskId(0, 0), "applicationId", partitions, topology, consumer, producer, restoreStateConsumer, config, null);
 
             task.addRecords(partition1, records(
-                    new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
                     new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
-                    new ConsumerRecord<>(partition1.topic(), partition1.partition(), 30, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)
+                    new ConsumerRecord<>(partition1.topic(), partition1.partition(), 30, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+                    new ConsumerRecord<>(partition1.topic(), partition1.partition(), 40, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)
             ));
 
             task.addRecords(partition2, records(
-                    new ConsumerRecord<>(partition2.topic(), partition2.partition(), 15, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
                     new ConsumerRecord<>(partition2.topic(), partition2.partition(), 25, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
-                    new ConsumerRecord<>(partition2.topic(), partition2.partition(), 35, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)
+                    new ConsumerRecord<>(partition2.topic(), partition2.partition(), 35, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+                    new ConsumerRecord<>(partition2.topic(), partition2.partition(), 45, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)
             ));
 
             assertTrue(task.maybePunctuate());
@@ -275,7 +275,7 @@ public class StreamTaskTest {
 
             assertFalse(task.maybePunctuate());
 
-            processor.supplier.checkAndClearPunctuateResult(10L, 20L, 30L);
+            processor.supplier.checkAndClearPunctuateResult(20L, 30L, 40L);
 
             task.close();