You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/05/17 18:25:54 UTC

kafka git commit: KAFKA-3716; Validate all timestamps are not negative

Repository: kafka
Updated Branches:
  refs/heads/trunk d54616bc3 -> 53fd22a76


KAFKA-3716; Validate all timestamps are not negative

Author: Guozhang Wang <wa...@gmail.com>

Reviewers: Eno Thereska, Ismael Juma

Closes #1393 from guozhangwang/K3716-check-non-negative-timestamps


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/53fd22a7
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/53fd22a7
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/53fd22a7

Branch: refs/heads/trunk
Commit: 53fd22a76613b309b7941a5b0c64f17523b39202
Parents: d54616b
Author: Guozhang Wang <wa...@gmail.com>
Authored: Tue May 17 11:25:49 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue May 17 11:25:49 2016 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/kafka/streams/kstream/Windows.java   | 6 +-----
 .../apache/kafka/streams/processor/internals/RecordQueue.java | 5 +++++
 .../apache/kafka/streams/processor/internals/StreamTask.java  | 7 ++++++-
 3 files changed, 12 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/53fd22a7/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
index 06cacb4..c64a80f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
@@ -17,9 +17,7 @@
 
 package org.apache.kafka.streams.kstream;
 
-
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * The window specification interface that can be extended for windowing operation in joins and aggregations.
@@ -32,8 +30,6 @@ public abstract class Windows<W extends Window> {
 
     private static final long DEFAULT_MAINTAIN_DURATION = 24 * 60 * 60 * 1000L;   // one day
 
-    private static final AtomicInteger NAME_INDEX = new AtomicInteger(0);
-
     protected String name;
 
     private long maintainDurationMs;
@@ -86,7 +82,7 @@ public abstract class Windows<W extends Window> {
     }
 
     /**
-     * Creates all windows that contain the provided timestamp.
+     * Creates all windows that contain the provided timestamp, indexed by non-negative window start timestamps.
      *
      * @param timestamp  the timestamp window should get created for
      * @return  a map of {@code windowStartTimestamp -> Window} entries

http://git-wip-us.apache.org/repos/asf/kafka/blob/53fd22a7/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
index 6911a45..7e5baf3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
@@ -20,6 +20,7 @@ package org.apache.kafka.streams.processor.internals;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 
 import java.util.ArrayDeque;
@@ -84,6 +85,10 @@ public class RecordQueue {
                                                                          rawRecord.serializedValueSize(), key, value);
             long timestamp = timestampExtractor.extract(record);
 
+            // validate that timestamp must be non-negative
+            if (timestamp < 0)
+                throw new StreamsException("Extracted timestamp value is negative, which is not allowed.");
+
             StampedRecord stampedRecord = new StampedRecord(record, timestamp);
 
             fifoQueue.addLast(stampedRecord);

http://git-wip-us.apache.org/repos/asf/kafka/blob/53fd22a7/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index d9efb6d..e7e24fb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -209,7 +209,12 @@ public class StreamTask extends AbstractTask implements Punctuator {
     public boolean maybePunctuate() {
         long timestamp = partitionGroup.timestamp();
 
-        return punctuationQueue.mayPunctuate(timestamp, this);
+        // if the timestamp is not known yet, meaning there is not enough data accumulated
+        // to reason stream partition time, then skip.
+        if (timestamp == TimestampTracker.NOT_KNOWN)
+            return false;
+        else
+            return punctuationQueue.mayPunctuate(timestamp, this);
     }
 
     /**