You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/12/10 00:17:39 UTC
kafka git commit: KAFKA-4393: Improve invalid/negative TS handling
Repository: kafka
Updated Branches:
refs/heads/trunk 7f8edbc8e -> 9bed8fbcf
KAFKA-4393: Improve invalid/negative TS handling
Author: Matthias J. Sax <ma...@confluent.io>
Reviewers: Michael G. Noll, Eno Thereska, Damian Guy, Guozhang Wang
Closes #2117 from mjsax/kafka-4393-improveInvalidTsHandling
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9bed8fbc
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9bed8fbc
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9bed8fbc
Branch: refs/heads/trunk
Commit: 9bed8fbcfc52ced719f2dcafa3f30cbfd5e6bd57
Parents: 7f8edbc
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Fri Dec 9 16:17:36 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Fri Dec 9 16:17:36 2016 -0800
----------------------------------------------------------------------
checkstyle/import-control.xml | 2 +-
docs/upgrade.html | 13 ++++
.../pageview/JsonTimestampExtractor.java | 2 +-
.../org/apache/kafka/streams/StreamsConfig.java | 4 +-
.../ConsumerRecordTimestampExtractor.java | 40 ----------
.../ExtractRecordMetadataTimestamp.java | 77 ++++++++++++++++++++
.../processor/FailOnInvalidTimestamp.java | 68 +++++++++++++++++
.../processor/LogAndSkipOnInvalidTimestamp.java | 69 ++++++++++++++++++
.../streams/processor/TimestampExtractor.java | 12 +--
.../UsePreviousTimeOnInvalidTimestamp.java | 70 ++++++++++++++++++
.../processor/WallclockTimestampExtractor.java | 16 ++--
.../processor/internals/RecordQueue.java | 9 ++-
.../streams/processor/internals/SinkNode.java | 5 +-
.../streams/processor/internals/StreamTask.java | 15 ++--
.../processor/internals/StreamThread.java | 8 +-
.../processor/FailOnInvalidTimestampTest.java | 36 +++++++++
.../LogAndSkipOnInvalidTimestampTest.java | 56 ++++++++++++++
.../processor/TimestampExtractorTest.java | 48 ++++++++++++
.../UsePreviousTimeOnInvalidTimestampTest.java | 45 ++++++++++++
.../WallclockTimestampExtractorTest.java | 62 ++++++++++++++++
.../internals/ProcessorTopologyTest.java | 10 +--
.../processor/internals/RecordQueueTest.java | 22 ++++++
.../smoketest/TestTimestampExtractor.java | 2 +-
.../kafka/test/MockTimestampExtractor.java | 2 +-
24 files changed, 617 insertions(+), 76 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/9bed8fbc/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 58525ad..8eebdb5 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -27,6 +27,7 @@
<allow pkg="javax.management" />
<allow pkg="org.slf4j" />
<allow pkg="org.junit" />
+ <allow pkg="org.hamcrest" />
<allow pkg="org.easymock" />
<allow pkg="org.powermock" />
<allow pkg="java.security" />
@@ -151,7 +152,6 @@
<allow pkg="scala" />
<allow pkg="scala.collection" />
<allow pkg="org.I0Itec.zkclient" />
- <allow pkg="org.hamcrest" />
</subpackage>
<subpackage name="state">
http://git-wip-us.apache.org/repos/asf/kafka/blob/9bed8fbc/docs/upgrade.html
----------------------------------------------------------------------
diff --git a/docs/upgrade.html b/docs/upgrade.html
index c63487d..06b53da 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -26,6 +26,19 @@ can upgrade the brokers one at a time: shut down the broker, update the code, an
Please refer to the <code>KafkaConsumer</code> Javadoc for a more in-depth explanation of this change.</li>
</ul>
+<h4><a id="upgrade_10_2" href="#upgrade_10_2">Upgrading from 0.8.x, 0.9.x, 0.10.0.X, or 0.10.1.X to 0.10.2.0</a></h4>
+
+<p><b>For a rolling upgrade:</b></p>
+
+<ol>
+ <li>Upgrading a Kafka Streams Application:
+ <ul>
+ <li>You need to recompile your code. Just swapping the jar file will not work and will break your application.</li>
+ <li>If you use a custom timestamp extractor, you will need to update this code, because the <code>TimestampExtractor</code> interface was changed.</li>
+ </ul>
+ </li>
+</ol>
+
<h4><a id="upgrade_10_1" href="#upgrade_10_1">Upgrading from 0.8.x, 0.9.x or 0.10.0.X to 0.10.1.0</a></h4>
0.10.1.0 has wire protocol changes. By following the recommended rolling upgrade plan below, you guarantee no downtime during the upgrade.
However, please notice the <a href="#upgrade_10_1_breaking">Potential breaking changes in 0.10.1.0</a> before upgrade.
http://git-wip-us.apache.org/repos/asf/kafka/blob/9bed8fbc/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java
index 63e8377..918cd65 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java
@@ -28,7 +28,7 @@ import org.apache.kafka.streams.processor.TimestampExtractor;
public class JsonTimestampExtractor implements TimestampExtractor {
@Override
- public long extract(ConsumerRecord<Object, Object> record) {
+ public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
if (record.value() instanceof PageViewTypedDemo.PageView) {
return ((PageViewTypedDemo.PageView) record.value()).timestamp;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9bed8fbc/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 5ba4383..53f49ec 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -28,7 +28,7 @@ import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.errors.StreamsException;
-import org.apache.kafka.streams.processor.ConsumerRecordTimestampExtractor;
+import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
import org.apache.kafka.streams.processor.DefaultPartitionGrouper;
import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor;
import org.apache.kafka.streams.processor.internals.StreamThread;
@@ -172,7 +172,7 @@ public class StreamsConfig extends AbstractConfig {
REPLICATION_FACTOR_DOC)
.define(TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
Type.CLASS,
- ConsumerRecordTimestampExtractor.class.getName(),
+ FailOnInvalidTimestamp.class.getName(),
Importance.MEDIUM,
TIMESTAMP_EXTRACTOR_CLASS_DOC)
.define(PARTITION_GROUPER_CLASS_CONFIG,
http://git-wip-us.apache.org/repos/asf/kafka/blob/9bed8fbc/streams/src/main/java/org/apache/kafka/streams/processor/ConsumerRecordTimestampExtractor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ConsumerRecordTimestampExtractor.java b/streams/src/main/java/org/apache/kafka/streams/processor/ConsumerRecordTimestampExtractor.java
deleted file mode 100644
index 0d3424e..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/ConsumerRecordTimestampExtractor.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-
-/**
- * Retrieves built-in timestamps from Kafka messages (introduced in KIP-32: Add timestamps to Kafka message).
- *
- * Here, "built-in" refers to the fact that compatible Kafka producer clients automatically and
- * transparently embed such timestamps into messages they sent to Kafka, which can then be retrieved
- * via this timestamp extractor.
- *
- * If <i>CreateTime</i> is used to define the built-in timestamps, using this extractor effectively provide
- * <i>event-time</i> semantics. If <i>LogAppendTime</i> is used to define the built-in timestamps, using
- * this extractor effectively provides <i>ingestion-time</i> semantics.
- *
- * If you need <i>processing-time</i> semantics, use {@link WallclockTimestampExtractor}.
- */
-public class ConsumerRecordTimestampExtractor implements TimestampExtractor {
- @Override
- public long extract(ConsumerRecord<Object, Object> record) {
- return record.timestamp();
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9bed8fbc/streams/src/main/java/org/apache/kafka/streams/processor/ExtractRecordMetadataTimestamp.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ExtractRecordMetadataTimestamp.java b/streams/src/main/java/org/apache/kafka/streams/processor/ExtractRecordMetadataTimestamp.java
new file mode 100644
index 0000000..cbe024e
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/ExtractRecordMetadataTimestamp.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+/**
+ * Retrieves embedded metadata timestamps from Kafka messages.
+ * If a record has a negative (invalid) timestamp value, an error handler method is called.
+ * <p>
+ * Embedded metadata timestamp was introduced in "KIP-32: Add timestamps to Kafka message" for the new
+ * 0.10+ Kafka message format.
+ * <p>
+ * Here, "embedded metadata" refers to the fact that compatible Kafka producer clients automatically and
+ * transparently embed such timestamps into message metadata they send to Kafka, which can then be retrieved
+ * via this timestamp extractor.
+ * <p>
+ * If the embedded metadata timestamp represents <i>CreateTime</i> (cf. Kafka broker setting
+ * {@code log.message.timestamp.type} and Kafka topic setting {@code message.timestamp.type}),
+ * this extractor effectively provides <i>event-time</i> semantics.
+ * If <i>LogAppendTime</i> is used as broker/topic setting to define the embedded metadata timestamps,
+ * using this extractor effectively provides <i>ingestion-time</i> semantics.
+ * <p>
+ * If you need <i>processing-time</i> semantics, use {@link WallclockTimestampExtractor}.
+ *
+ * @see FailOnInvalidTimestamp
+ * @see LogAndSkipOnInvalidTimestamp
+ * @see UsePreviousTimeOnInvalidTimestamp
+ * @see WallclockTimestampExtractor
+ */
+abstract class ExtractRecordMetadataTimestamp implements TimestampExtractor {
+
+ /**
+ * Extracts the embedded metadata timestamp from the given {@link ConsumerRecord}.
+ *
+ * @param record a data record
+ * @param previousTimestamp the latest extracted valid timestamp of the current record's partition (could be -1 if unknown)
+ * @return the embedded metadata timestamp of the given {@link ConsumerRecord}
+ */
+ @Override
+ public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
+ final long timestamp = record.timestamp();
+
+ if (timestamp < 0) {
+ return onInvalidTimestamp(record, timestamp, previousTimestamp);
+ }
+
+ return timestamp;
+ }
+
+ /**
+ * Called if no valid timestamp is embedded in the record metadata.
+ *
+ * @param record a data record
+ * @param recordTimestamp the timestamp extracted from the record
+ * @param previousTimestamp the latest extracted valid timestamp of the current record's partition (could be -1 if unknown)
+ * @return a new timestamp for the record (if negative, record will not be processed but dropped silently)
+ */
+ public abstract long onInvalidTimestamp(final ConsumerRecord<Object, Object> record,
+ final long recordTimestamp,
+ final long previousTimestamp);
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9bed8fbc/streams/src/main/java/org/apache/kafka/streams/processor/FailOnInvalidTimestamp.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/FailOnInvalidTimestamp.java b/streams/src/main/java/org/apache/kafka/streams/processor/FailOnInvalidTimestamp.java
new file mode 100644
index 0000000..d7f64a2
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/FailOnInvalidTimestamp.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.streams.errors.StreamsException;
+
+/**
+ * Retrieves embedded metadata timestamps from Kafka messages.
+ * If a record has a negative (invalid) timestamp value, this extractor raises an exception.
+ * <p>
+ * Embedded metadata timestamp was introduced in "KIP-32: Add timestamps to Kafka message" for the new
+ * 0.10+ Kafka message format.
+ * <p>
+ * Here, "embedded metadata" refers to the fact that compatible Kafka producer clients automatically and
+ * transparently embed such timestamps into message metadata they send to Kafka, which can then be retrieved
+ * via this timestamp extractor.
+ * <p>
+ * If the embedded metadata timestamp represents <i>CreateTime</i> (cf. Kafka broker setting
+ * {@code log.message.timestamp.type} and Kafka topic setting {@code message.timestamp.type}),
+ * this extractor effectively provides <i>event-time</i> semantics.
+ * If <i>LogAppendTime</i> is used as broker/topic setting to define the embedded metadata timestamps,
+ * using this extractor effectively provides <i>ingestion-time</i> semantics.
+ * <p>
+ * If you need <i>processing-time</i> semantics, use {@link WallclockTimestampExtractor}.
+ *
+ * @see LogAndSkipOnInvalidTimestamp
+ * @see UsePreviousTimeOnInvalidTimestamp
+ * @see WallclockTimestampExtractor
+ */
+public class FailOnInvalidTimestamp extends ExtractRecordMetadataTimestamp {
+
+ /**
+ * Raises an exception on every call.
+ *
+ * @param record a data record
+ * @param recordTimestamp the timestamp extracted from the record
+ * @param previousTimestamp the latest extracted valid timestamp of the current record's partition (could be -1 if unknown)
+ * @return nothing; always raises an exception
+ * @throws StreamsException on every invocation
+ */
+ @Override
+ public long onInvalidTimestamp(final ConsumerRecord<Object, Object> record,
+ final long recordTimestamp,
+ final long previousTimestamp)
+ throws StreamsException {
+ throw new StreamsException("Input record " + record + " has invalid (negative) timestamp. " +
+ "Possibly because a pre-0.10 producer client was used to write this record to Kafka without embedding a timestamp, " +
+ "or because the input topic was created before upgrading the Kafka cluster to 0.10+. " +
+ "Use a different TimestampExtractor to process this data.");
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9bed8fbc/streams/src/main/java/org/apache/kafka/streams/processor/LogAndSkipOnInvalidTimestamp.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/LogAndSkipOnInvalidTimestamp.java b/streams/src/main/java/org/apache/kafka/streams/processor/LogAndSkipOnInvalidTimestamp.java
new file mode 100644
index 0000000..f24fd15
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/LogAndSkipOnInvalidTimestamp.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Retrieves embedded metadata timestamps from Kafka messages.
+ * If a record has a negative (invalid) timestamp value, the timestamp is returned as-is;
+ * in addition, a WARN message is logged in your application.
+ * Returning the timestamp as-is results in dropping the record, i.e., the record will not be processed.
+ * <p>
+ * Embedded metadata timestamp was introduced in "KIP-32: Add timestamps to Kafka message" for the new
+ * 0.10+ Kafka message format.
+ * <p>
+ * Here, "embedded metadata" refers to the fact that compatible Kafka producer clients automatically and
+ * transparently embed such timestamps into message metadata they send to Kafka, which can then be retrieved
+ * via this timestamp extractor.
+ * <p>
+ * If the embedded metadata timestamp represents <i>CreateTime</i> (cf. Kafka broker setting
+ * {@code log.message.timestamp.type} and Kafka topic setting {@code message.timestamp.type}),
+ * this extractor effectively provides <i>event-time</i> semantics.
+ * If <i>LogAppendTime</i> is used as broker/topic setting to define the embedded metadata timestamps,
+ * using this extractor effectively provides <i>ingestion-time</i> semantics.
+ * <p>
+ * If you need <i>processing-time</i> semantics, use {@link WallclockTimestampExtractor}.
+ *
+ * @see FailOnInvalidTimestamp
+ * @see UsePreviousTimeOnInvalidTimestamp
+ * @see WallclockTimestampExtractor
+ */
+public class LogAndSkipOnInvalidTimestamp extends ExtractRecordMetadataTimestamp {
+ private static final Logger log = LoggerFactory.getLogger(LogAndSkipOnInvalidTimestamp.class);
+
+ /**
+ * Writes a log WARN message when the extracted timestamp is invalid (negative) but returns the invalid timestamp as-is,
+ * which ultimately causes the record to be skipped and not to be processed.
+ *
+ * @param record a data record
+ * @param recordTimestamp the timestamp extracted from the record
+ * @param previousTimestamp the latest extracted valid timestamp of the current record's partition (could be -1 if unknown)
+ * @return the originally extracted timestamp of the record
+ */
+ @Override
+ public long onInvalidTimestamp(final ConsumerRecord<Object, Object> record,
+ final long recordTimestamp,
+ final long previousTimestamp) {
+ log.warn("Input record {} will be dropped because it has an invalid (negative) timestamp.", record);
+ return recordTimestamp;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9bed8fbc/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java b/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java
index c55518b..0de96ba 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java
@@ -27,17 +27,19 @@ import org.apache.kafka.streams.kstream.KTable;
public interface TimestampExtractor {
/**
- * Extracts a timestamp from a record.
+ * Extracts a timestamp from a record. The timestamp must be non-negative to be considered a valid timestamp.
+ * Returning a negative timestamp will cause the record not to be processed but rather silently skipped.
* <p>
* The extracted timestamp MUST represent the milliseconds since midnight, January 1, 1970 UTC.
- *
+ * <p>
* It is important to note that this timestamp may become the message timestamp for any messages sent to changelogs updated by {@link KTable}s
* and joins. The message timestamp is used for log retention and log rolling, so using nonsensical values may result in
* excessive log rolling and therefore broker performance degradation.
*
*
- * @param record a data record
- * @return the timestamp of the record
+ * @param record a data record
+ * @param previousTimestamp the latest extracted valid timestamp of the current record's partition (could be -1 if unknown)
+ * @return the timestamp of the record
*/
- long extract(ConsumerRecord<Object, Object> record);
+ long extract(ConsumerRecord<Object, Object> record, long previousTimestamp);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9bed8fbc/streams/src/main/java/org/apache/kafka/streams/processor/UsePreviousTimeOnInvalidTimestamp.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/UsePreviousTimeOnInvalidTimestamp.java b/streams/src/main/java/org/apache/kafka/streams/processor/UsePreviousTimeOnInvalidTimestamp.java
new file mode 100644
index 0000000..7718b5c
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/UsePreviousTimeOnInvalidTimestamp.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.streams.errors.StreamsException;
+
+/**
+ * Retrieves embedded metadata timestamps from Kafka messages.
+ * If a record has a negative (invalid) timestamp, a new timestamp will be inferred from the current stream-time.
+ * <p>
+ * Embedded metadata timestamp was introduced in "KIP-32: Add timestamps to Kafka message" for the new
+ * 0.10+ Kafka message format.
+ * <p>
+ * Here, "embedded metadata" refers to the fact that compatible Kafka producer clients automatically and
+ * transparently embed such timestamps into message metadata they send to Kafka, which can then be retrieved
+ * via this timestamp extractor.
+ * <p>
+ * If the embedded metadata timestamp represents <i>CreateTime</i> (cf. Kafka broker setting
+ * {@code log.message.timestamp.type} and Kafka topic setting {@code message.timestamp.type}),
+ * this extractor effectively provides <i>event-time</i> semantics.
+ * If <i>LogAppendTime</i> is used as broker/topic setting to define the embedded metadata timestamps,
+ * using this extractor effectively provides <i>ingestion-time</i> semantics.
+ * <p>
+ * If you need <i>processing-time</i> semantics, use {@link WallclockTimestampExtractor}.
+ *
+ * @see FailOnInvalidTimestamp
+ * @see LogAndSkipOnInvalidTimestamp
+ * @see WallclockTimestampExtractor
+ */
+public class UsePreviousTimeOnInvalidTimestamp extends ExtractRecordMetadataTimestamp {
+
+ /**
+ * Returns the current stream-time as new timestamp for the record.
+ *
+ * @param record a data record
+ * @param recordTimestamp the timestamp extracted from the record
+ * @param previousTimestamp the latest extracted valid timestamp of the current record's partition (could be -1 if unknown)
+ * @return the provided latest extracted valid timestamp as new timestamp for the record
+ * @throws StreamsException if latest extracted valid timestamp is unknown
+ */
+ @Override
+ public long onInvalidTimestamp(final ConsumerRecord<Object, Object> record,
+ final long recordTimestamp,
+ final long previousTimestamp)
+ throws StreamsException {
+ if (previousTimestamp < 0) {
+ throw new StreamsException("Could not infer new timestamp for input record " + record
+ + " because latest extracted valid timestamp is unknown.");
+ }
+ return previousTimestamp;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9bed8fbc/streams/src/main/java/org/apache/kafka/streams/processor/WallclockTimestampExtractor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/WallclockTimestampExtractor.java b/streams/src/main/java/org/apache/kafka/streams/processor/WallclockTimestampExtractor.java
index 305573b..6df9481 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/WallclockTimestampExtractor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/WallclockTimestampExtractor.java
@@ -21,22 +21,26 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
/**
* Retrieves current wall clock timestamps as {@link System#currentTimeMillis()}.
- *
+ * <p>
* Using this extractor effectively provides <i>processing-time</i> semantics.
- *
- * If you need <i>event-time</i> semantics, use {@link ConsumerRecordTimestampExtractor} with
+ * <p>
+ * If you need <i>event-time</i> semantics, use {@link FailOnInvalidTimestamp} with
* built-in <i>CreateTime</i> or <i>LogAppendTime</i> timestamp (see KIP-32: Add timestamps to Kafka message for details).
+ *
+ * @see FailOnInvalidTimestamp
+ * @see LogAndSkipOnInvalidTimestamp
*/
public class WallclockTimestampExtractor implements TimestampExtractor {
/**
* Return the current wall clock time as timestamp.
*
- * @param record a data record
- * @return the current wall clock time, expressed in milliseconds since midnight, January 1, 1970 UTC
+ * @param record a data record
+ * @param previousTimestamp the latest extracted valid timestamp of the current record's partition (could be -1 if unknown)
+ * @return the current wall clock time, expressed in milliseconds since midnight, January 1, 1970 UTC
*/
@Override
- public long extract(ConsumerRecord<Object, Object> record) {
+ public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
return System.currentTimeMillis();
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9bed8fbc/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
index 44ef146..a40b9ff 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
@@ -102,13 +102,14 @@ public class RecordQueue {
rawRecord.checksum(),
rawRecord.serializedKeySize(),
rawRecord.serializedValueSize(), key, value);
- long timestamp = timestampExtractor.extract(record);
+ long timestamp = timestampExtractor.extract(record, timeTracker.get());
log.trace("Source node {} extracted timestamp {} for record {} when adding to buffered queue", source.name(), timestamp, record);
- // validate that timestamp must be non-negative
- if (timestamp < 0)
- throw new StreamsException("Extracted timestamp value is negative, which is not allowed.");
+ // drop message if TS is invalid, i.e., negative
+ if (timestamp < 0) {
+ continue;
+ }
StampedRecord stampedRecord = new StampedRecord(record, timestamp);
http://git-wip-us.apache.org/repos/asf/kafka/blob/9bed8fbc/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
index 2f20cdb..e7f32b4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
@@ -73,10 +73,7 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> {
final long timestamp = context.timestamp();
if (timestamp < 0) {
- throw new StreamsException("A record consumed from an input topic has invalid (negative) timestamp, " +
- "possibly because a pre-0.10 producer client was used to write this record to Kafka without embedding a timestamp, " +
- "or because the input topic was created before upgrading the Kafka cluster to 0.10+. " +
- "Use a different TimestampExtractor to process this data.");
+ throw new StreamsException("Invalid (negative) timestamp of " + timestamp + " for output record <" + key + ":" + value + ">.");
}
try {
http://git-wip-us.apache.org/repos/asf/kafka/blob/9bed8fbc/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index a40e1be..ae374ce 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -123,22 +123,29 @@ public class StreamTask extends AbstractTask implements Punctuator {
}
/**
- * Adds records to queues
+ * Adds records to queues. If a record has an invalid (i.e., negative) timestamp, the record is skipped
+ * and not added to the queue for processing.
*
* @param partition the partition
* @param records the records
+ * @return the number of records actually added to the partition's queue (i.e., excluding skipped records)
*/
@SuppressWarnings("unchecked")
- public void addRecords(TopicPartition partition, Iterable<ConsumerRecord<byte[], byte[]>> records) {
- int queueSize = partitionGroup.addRawRecords(partition, records);
+ public int addRecords(TopicPartition partition, Iterable<ConsumerRecord<byte[], byte[]>> records) {
+ // addRawRecords() reports the size of this partition's queue only, so the "before" size must be taken
+ // from the same queue; the task-wide numBuffered() would make the difference below wrong
+ final int oldQueueSize = partitionGroup.numBuffered(partition);
+ final int newQueueSize = partitionGroup.addRawRecords(partition, records);
- log.trace("{} Added records into the buffered queue of partition {}, new queue size is {}", logPrefix, partition, queueSize);
+ log.trace("{} Added records into the buffered queue of partition {}, new queue size is {}", logPrefix, partition, newQueueSize);
// if after adding these records, its partition queue's buffered size has been
// increased beyond the threshold, we can then pause the consumption for this partition
- if (queueSize > this.maxBufferedSize) {
+ if (newQueueSize > this.maxBufferedSize) {
consumer.pause(singleton(partition));
}
+
+ return newQueueSize - oldQueueSize;
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/9bed8fbc/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 5e82829..96e9963 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -549,10 +549,12 @@ public class StreamThread extends Thread {
throw new StreamsException(logPrefix + " Failed to rebalance", rebalanceException);
if (!records.isEmpty()) {
+ int numAddedRecords = 0;
for (TopicPartition partition : records.partitions()) {
StreamTask task = activeTasksByPartition.get(partition);
- task.addRecords(partition, records.records(partition));
+ numAddedRecords += task.addRecords(partition, records.records(partition));
}
+ sensors.skippedRecordsSensor.record(records.count() - numAddedRecords, timerStartedMs);
polledRecords = true;
} else {
polledRecords = false;
@@ -1020,6 +1022,7 @@ public class StreamThread extends Thread {
final Sensor punctuateTimeSensor;
final Sensor taskCreationSensor;
final Sensor taskDestructionSensor;
+ final Sensor skippedRecordsSensor;
public StreamsMetricsImpl(Metrics metrics) {
this.metrics = metrics;
@@ -1052,6 +1055,9 @@ public class StreamThread extends Thread {
this.taskDestructionSensor = metrics.sensor(sensorNamePrefix + ".task-destruction");
this.taskDestructionSensor.add(metrics.metricName("task-destruction-rate", metricGrpName, "The average per-second number of destructed tasks", metricTags), new Rate(new Count()));
+
+ this.skippedRecordsSensor = metrics.sensor(sensorNamePrefix + ".skipped-records");
+ this.skippedRecordsSensor.add(metrics.metricName("skipped-records-count", metricGrpName, "The average per-second number of skipped records.", metricTags), new Rate(new Count()));
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/9bed8fbc/streams/src/test/java/org/apache/kafka/streams/processor/FailOnInvalidTimestampTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/FailOnInvalidTimestampTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/FailOnInvalidTimestampTest.java
new file mode 100644
index 0000000..738e956
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/FailOnInvalidTimestampTest.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.junit.Test;
+
+/**
+ * Tests for {@link FailOnInvalidTimestamp}: a valid metadata timestamp is returned unchanged,
+ * while a record with an invalid (negative) timestamp causes a {@link StreamsException}.
+ */
+public class FailOnInvalidTimestampTest extends TimestampExtractorTest {
+
+ @Test
+ public void extractMetadataTimestamp() {
+ testExtractMetadataTimestamp(new FailOnInvalidTimestamp());
+ }
+
+ @Test(expected = StreamsException.class)
+ public void failOnInvalidTimestamp() {
+ final long invalidMetadataTimestamp = -1;
+
+ final TimestampExtractor extractor = new FailOnInvalidTimestamp();
+ extractor.extract(
+ new ConsumerRecord<>(
+ "anyTopic",
+ 0,
+ 0,
+ invalidMetadataTimestamp,
+ TimestampType.NO_TIMESTAMP_TYPE,
+ 0,
+ 0,
+ 0,
+ null,
+ null),
+ 42
+ );
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9bed8fbc/streams/src/test/java/org/apache/kafka/streams/processor/LogAndSkipOnInvalidTimestampTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/LogAndSkipOnInvalidTimestampTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/LogAndSkipOnInvalidTimestampTest.java
new file mode 100644
index 0000000..92d8709
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/LogAndSkipOnInvalidTimestampTest.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.record.TimestampType;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/**
+ * Tests for {@link LogAndSkipOnInvalidTimestamp}.
+ */
+public class LogAndSkipOnInvalidTimestampTest extends TimestampExtractorTest {
+
+ /** A valid metadata timestamp must be passed through unchanged. */
+ @Test
+ public void extractMetadataTimestamp() {
+ final TimestampExtractor extractor = new LogAndSkipOnInvalidTimestamp();
+ testExtractMetadataTimestamp(extractor);
+ }
+
+ /**
+ * An invalid (negative) metadata timestamp must be returned as-is rather than replaced,
+ * so that the negative value still reaches the caller.
+ */
+ @Test
+ public void logAndSkipOnInvalidTimestamp() {
+ final long negativeTimestamp = -42;
+ final ConsumerRecord<Object, Object> record = new ConsumerRecord<>(
+ "anyTopic",
+ 0,
+ 0,
+ negativeTimestamp,
+ TimestampType.NO_TIMESTAMP_TYPE,
+ 0,
+ 0,
+ 0,
+ null,
+ null);
+
+ final long extracted = new LogAndSkipOnInvalidTimestamp().extract(record, 0);
+
+ assertThat(extracted, is(negativeTimestamp));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9bed8fbc/streams/src/test/java/org/apache/kafka/streams/processor/TimestampExtractorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TimestampExtractorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TimestampExtractorTest.java
new file mode 100644
index 0000000..93e0b5b
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TimestampExtractorTest.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.record.TimestampType;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/**
+ * Shared fixture for {@link TimestampExtractor} tests. It declares no test methods itself and is
+ * abstract so it is never instantiated or run on its own; concrete subclasses call the helper
+ * below from their own {@code @Test} methods.
+ */
+abstract class TimestampExtractorTest {
+
+ /**
+ * Asserts that {@code extractor} returns the record's metadata timestamp when that timestamp is valid.
+ *
+ * @param extractor the extractor under test
+ */
+ void testExtractMetadataTimestamp(final TimestampExtractor extractor) {
+ final long metadataTimestamp = 42;
+
+ // the timestamp is valid, so tag it with a real timestamp type rather than NO_TIMESTAMP_TYPE
+ final long timestamp = extractor.extract(
+ new ConsumerRecord<>(
+ "anyTopic",
+ 0,
+ 0,
+ metadataTimestamp,
+ TimestampType.CREATE_TIME,
+ 0,
+ 0,
+ 0,
+ null,
+ null),
+ 0
+ );
+
+ assertThat(timestamp, is(metadataTimestamp));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9bed8fbc/streams/src/test/java/org/apache/kafka/streams/processor/UsePreviousTimeOnInvalidTimestampTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/UsePreviousTimeOnInvalidTimestampTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/UsePreviousTimeOnInvalidTimestampTest.java
new file mode 100644
index 0000000..09617fa
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/UsePreviousTimeOnInvalidTimestampTest.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.record.TimestampType;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/**
+ * Tests for {@link UsePreviousTimeOnInvalidTimestamp}: a valid metadata timestamp is returned
+ * unchanged, while a record with an invalid (negative) timestamp gets the previous timestamp.
+ */
+public class UsePreviousTimeOnInvalidTimestampTest extends TimestampExtractorTest {
+
+ @Test
+ public void extractMetadataTimestamp() {
+ testExtractMetadataTimestamp(new UsePreviousTimeOnInvalidTimestamp());
+ }
+
+ @Test
+ public void usePreviousTimeOnInvalidTimestamp() {
+ final long invalidMetadataTimestamp = -1;
+ final long previousTime = 42;
+
+ final TimestampExtractor extractor = new UsePreviousTimeOnInvalidTimestamp();
+ final long timestamp = extractor.extract(
+ new ConsumerRecord<>(
+ "anyTopic",
+ 0,
+ 0,
+ invalidMetadataTimestamp,
+ TimestampType.NO_TIMESTAMP_TYPE,
+ 0,
+ 0,
+ 0,
+ null,
+ null),
+ previousTime
+ );
+
+ assertThat(timestamp, is(previousTime));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9bed8fbc/streams/src/test/java/org/apache/kafka/streams/processor/WallclockTimestampExtractorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/WallclockTimestampExtractorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/WallclockTimestampExtractorTest.java
new file mode 100644
index 0000000..b7b49bb
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/WallclockTimestampExtractorTest.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/**
+ * Tests for {@link WallclockTimestampExtractor}: the extracted timestamp must lie between the
+ * system-clock readings taken immediately before and after the call.
+ */
+public class WallclockTimestampExtractorTest {
+
+ @Test
+ public void extractSystemTimestamp() {
+ final TimestampExtractor extractor = new WallclockTimestampExtractor();
+
+ final long before = System.currentTimeMillis();
+ final long timestamp = extractor.extract(new ConsumerRecord<>("anyTopic", 0, 0, null, null), 42);
+ final long after = System.currentTimeMillis();
+
+ assertThat(timestamp, is(new InBetween(before, after)));
+ }
+
+ /**
+ * Matches a {@code Long} within the inclusive range {@code [before, after]}. {@link #describeTo}
+ * reports that range, and the inherited {@code BaseMatcher#describeMismatch} reports the actual
+ * value, so a failed assertion explains itself.
+ */
+ private static class InBetween extends BaseMatcher<Long> {
+ private final long before;
+ private final long after;
+
+ public InBetween(long before, long after) {
+ this.before = before;
+ this.after = after;
+ }
+
+ @Override
+ public boolean matches(Object item) {
+ // a null or non-Long value is a mismatch, not an NPE/ClassCastException
+ if (!(item instanceof Long)) {
+ return false;
+ }
+ final long timestamp = (Long) item;
+ return before <= timestamp && timestamp <= after;
+ }
+
+ @Override
+ public void describeTo(Description description) {
+ description.appendText("a timestamp between ")
+ .appendValue(before)
+ .appendText(" and ")
+ .appendValue(after)
+ .appendText(" (inclusive)");
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9bed8fbc/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index a146316..d907506 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -17,10 +17,6 @@
package org.apache.kafka.streams.processor.internals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Deserializer;
@@ -49,6 +45,10 @@ import org.junit.Test;
import java.io.File;
import java.util.Properties;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
public class ProcessorTopologyTest {
private static final Serializer<String> STRING_SERIALIZER = new StringSerializer();
@@ -404,7 +404,7 @@ public class ProcessorTopologyTest {
public static class CustomTimestampExtractor implements TimestampExtractor {
@Override
- public long extract(ConsumerRecord<Object, Object> record) {
+ public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) { // ignores both arguments and returns `timestamp`, defined outside this hunk (presumably a test constant — confirm)
return timestamp;
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9bed8fbc/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
index f30e0e6..e0ee3ce 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
@@ -29,6 +29,8 @@ import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
+import org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.test.MockSourceNode;
import org.apache.kafka.test.MockTimestampExtractor;
@@ -136,4 +138,24 @@ public class RecordQueueTest {
queue.addRawRecords(records, timestampExtractor);
}
+
+ /** Adding a record with a negative timestamp must throw when {@link FailOnInvalidTimestamp} is the extractor. */
+ @Test(expected = StreamsException.class)
+ public void shouldThrowOnNegativeTimestamp() {
+ final List<ConsumerRecord<byte[], byte[]>> records = Collections.singletonList(
+ new ConsumerRecord<>("topic", 1, 1, -1L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue));
+
+ queue.addRawRecords(records, new FailOnInvalidTimestamp());
+ }
+
+ /** A record with a negative timestamp must be dropped (not buffered) when {@link LogAndSkipOnInvalidTimestamp} is the extractor. */
+ @Test
+ public void shouldDropOnNegativeTimestamp() {
+ final List<ConsumerRecord<byte[], byte[]>> records = Collections.singletonList(
+ new ConsumerRecord<>("topic", 1, 1, -1L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue));
+
+ queue.addRawRecords(records, new LogAndSkipOnInvalidTimestamp());
+
+ assertEquals(0, queue.size());
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9bed8fbc/streams/src/test/java/org/apache/kafka/streams/smoketest/TestTimestampExtractor.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/TestTimestampExtractor.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/TestTimestampExtractor.java
index 04e264c..0cab7f5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/smoketest/TestTimestampExtractor.java
+++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/TestTimestampExtractor.java
@@ -25,7 +25,7 @@ public class TestTimestampExtractor implements TimestampExtractor {
private final long base = SmokeTestUtil.START_TIME;
@Override
- public long extract(ConsumerRecord<Object, Object> record) {
+ public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
switch (record.topic()) {
case "data":
return base + (Integer) record.value();
http://git-wip-us.apache.org/repos/asf/kafka/blob/9bed8fbc/streams/src/test/java/org/apache/kafka/test/MockTimestampExtractor.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockTimestampExtractor.java b/streams/src/test/java/org/apache/kafka/test/MockTimestampExtractor.java
index 274e7b5..2b24578 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockTimestampExtractor.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockTimestampExtractor.java
@@ -24,7 +24,7 @@ import org.apache.kafka.streams.processor.TimestampExtractor;
public class MockTimestampExtractor implements TimestampExtractor {
@Override
- public long extract(ConsumerRecord<Object, Object> record) {
+ public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) { // uses the record's offset as its timestamp; previousTimestamp is ignored
return record.offset();
}
}