You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/05/17 18:25:54 UTC
kafka git commit: KAFKA-3716; Validate all timestamps are not negative
Repository: kafka
Updated Branches:
refs/heads/trunk d54616bc3 -> 53fd22a76
KAFKA-3716; Validate all timestamps are not negative
Author: Guozhang Wang <wa...@gmail.com>
Reviewers: Eno Thereska, Ismael Juma
Closes #1393 from guozhangwang/K3716-check-non-negative-timestamps
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/53fd22a7
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/53fd22a7
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/53fd22a7
Branch: refs/heads/trunk
Commit: 53fd22a76613b309b7941a5b0c64f17523b39202
Parents: d54616b
Author: Guozhang Wang <wa...@gmail.com>
Authored: Tue May 17 11:25:49 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue May 17 11:25:49 2016 -0700
----------------------------------------------------------------------
.../main/java/org/apache/kafka/streams/kstream/Windows.java | 6 +-----
.../apache/kafka/streams/processor/internals/RecordQueue.java | 5 +++++
.../apache/kafka/streams/processor/internals/StreamTask.java | 7 ++++++-
3 files changed, 12 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/53fd22a7/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
index 06cacb4..c64a80f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
@@ -17,9 +17,7 @@
package org.apache.kafka.streams.kstream;
-
import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
/**
* The window specification interface that can be extended for windowing operation in joins and aggregations.
@@ -32,8 +30,6 @@ public abstract class Windows<W extends Window> {
private static final long DEFAULT_MAINTAIN_DURATION = 24 * 60 * 60 * 1000L; // one day
- private static final AtomicInteger NAME_INDEX = new AtomicInteger(0);
-
protected String name;
private long maintainDurationMs;
@@ -86,7 +82,7 @@ public abstract class Windows<W extends Window> {
}
/**
- * Creates all windows that contain the provided timestamp.
+ * Creates all windows that contain the provided timestamp, indexed by non-negative window start timestamps.
*
* @param timestamp the timestamp window should get created for
* @return a map of {@code windowStartTimestamp -> Window} entries
http://git-wip-us.apache.org/repos/asf/kafka/blob/53fd22a7/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
index 6911a45..7e5baf3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
@@ -20,6 +20,7 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.TimestampExtractor;
import java.util.ArrayDeque;
@@ -84,6 +85,10 @@ public class RecordQueue {
rawRecord.serializedValueSize(), key, value);
long timestamp = timestampExtractor.extract(record);
+ // validate that timestamp must be non-negative
+ if (timestamp < 0)
+ throw new StreamsException("Extracted timestamp value is negative, which is not allowed.");
+
StampedRecord stampedRecord = new StampedRecord(record, timestamp);
fifoQueue.addLast(stampedRecord);
http://git-wip-us.apache.org/repos/asf/kafka/blob/53fd22a7/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index d9efb6d..e7e24fb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -209,7 +209,12 @@ public class StreamTask extends AbstractTask implements Punctuator {
public boolean maybePunctuate() {
long timestamp = partitionGroup.timestamp();
- return punctuationQueue.mayPunctuate(timestamp, this);
+ // if the timestamp is not known yet, meaning there is not enough data accumulated
+ // to reason stream partition time, then skip.
+ if (timestamp == TimestampTracker.NOT_KNOWN)
+ return false;
+ else
+ return punctuationQueue.mayPunctuate(timestamp, this);
}
/**