You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/12/10 00:17:39 UTC

kafka git commit: KAFKA-4393: Improve invalid/negative TS handling

Repository: kafka
Updated Branches:
  refs/heads/trunk 7f8edbc8e -> 9bed8fbcf


KAFKA-4393: Improve invalid/negative TS handling

Author: Matthias J. Sax <ma...@confluent.io>

Reviewers: Michael G. Noll, Eno Thereska, Damian Guy, Guozhang Wang

Closes #2117 from mjsax/kafka-4393-improveInvalidTsHandling


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9bed8fbc
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9bed8fbc
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9bed8fbc

Branch: refs/heads/trunk
Commit: 9bed8fbcfc52ced719f2dcafa3f30cbfd5e6bd57
Parents: 7f8edbc
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Fri Dec 9 16:17:36 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Fri Dec 9 16:17:36 2016 -0800

----------------------------------------------------------------------
 checkstyle/import-control.xml                   |  2 +-
 docs/upgrade.html                               | 13 ++++
 .../pageview/JsonTimestampExtractor.java        |  2 +-
 .../org/apache/kafka/streams/StreamsConfig.java |  4 +-
 .../ConsumerRecordTimestampExtractor.java       | 40 ----------
 .../ExtractRecordMetadataTimestamp.java         | 77 ++++++++++++++++++++
 .../processor/FailOnInvalidTimestamp.java       | 68 +++++++++++++++++
 .../processor/LogAndSkipOnInvalidTimestamp.java | 69 ++++++++++++++++++
 .../streams/processor/TimestampExtractor.java   | 12 +--
 .../UsePreviousTimeOnInvalidTimestamp.java      | 70 ++++++++++++++++++
 .../processor/WallclockTimestampExtractor.java  | 16 ++--
 .../processor/internals/RecordQueue.java        |  9 ++-
 .../streams/processor/internals/SinkNode.java   |  5 +-
 .../streams/processor/internals/StreamTask.java | 15 ++--
 .../processor/internals/StreamThread.java       |  8 +-
 .../processor/FailOnInvalidTimestampTest.java   | 36 +++++++++
 .../LogAndSkipOnInvalidTimestampTest.java       | 56 ++++++++++++++
 .../processor/TimestampExtractorTest.java       | 48 ++++++++++++
 .../UsePreviousTimeOnInvalidTimestampTest.java  | 45 ++++++++++++
 .../WallclockTimestampExtractorTest.java        | 62 ++++++++++++++++
 .../internals/ProcessorTopologyTest.java        | 10 +--
 .../processor/internals/RecordQueueTest.java    | 22 ++++++
 .../smoketest/TestTimestampExtractor.java       |  2 +-
 .../kafka/test/MockTimestampExtractor.java      |  2 +-
 24 files changed, 617 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9bed8fbc/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 58525ad..8eebdb5 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -27,6 +27,7 @@
   <allow pkg="javax.management" />
   <allow pkg="org.slf4j" />
   <allow pkg="org.junit" />
+  <allow pkg="org.hamcrest" />
   <allow pkg="org.easymock" />
   <allow pkg="org.powermock" />
   <allow pkg="java.security" />
@@ -151,7 +152,6 @@
       <allow pkg="scala" />
       <allow pkg="scala.collection" />
       <allow pkg="org.I0Itec.zkclient" />
-      <allow pkg="org.hamcrest" />
     </subpackage>
 
     <subpackage name="state">

http://git-wip-us.apache.org/repos/asf/kafka/blob/9bed8fbc/docs/upgrade.html
----------------------------------------------------------------------
diff --git a/docs/upgrade.html b/docs/upgrade.html
index c63487d..06b53da 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -26,6 +26,19 @@ can upgrade the brokers one at a time: shut down the broker, update the code, an
         Please refer to the <code>KafkaConsumer</code> Javadoc for a more in-depth explanation of this change.</li>
 </ul>
 
+<h4><a id="upgrade_10_2" href="#upgrade_10_2">Upgrading from 0.8.x, 0.9.x, 0.10.0.X, or 0.10.1.X to 0.10.2.0</a></h4>
+
+<p><b>For a rolling upgrade:</b></p>
+
+<ol>
+  <li>Upgrading a Kafka Streams Applications:
+    <ul>
+      <li>You need to recompile your code. Just swapping the jar file will not work and will break your appliation.</li>
+      <li>If you use a custom timestamp extractor, you will need to update this code, because the <code>TimestampExtractor</code> interface got changed.</li>
+    </ul>
+  </li>
+</ol>
+
 <h4><a id="upgrade_10_1" href="#upgrade_10_1">Upgrading from 0.8.x, 0.9.x or 0.10.0.X to 0.10.1.0</a></h4>
 0.10.1.0 has wire protocol changes. By following the recommended rolling upgrade plan below, you guarantee no downtime during the upgrade.
 However, please notice the <a href="#upgrade_10_1_breaking">Potential breaking changes in 0.10.1.0</a> before upgrade.

http://git-wip-us.apache.org/repos/asf/kafka/blob/9bed8fbc/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java
index 63e8377..918cd65 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java
@@ -28,7 +28,7 @@ import org.apache.kafka.streams.processor.TimestampExtractor;
 public class JsonTimestampExtractor implements TimestampExtractor {
 
     @Override
-    public long extract(ConsumerRecord<Object, Object> record) {
+    public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
         if (record.value() instanceof PageViewTypedDemo.PageView) {
             return ((PageViewTypedDemo.PageView) record.value()).timestamp;
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9bed8fbc/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 5ba4383..53f49ec 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -28,7 +28,7 @@ import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.errors.StreamsException;
-import org.apache.kafka.streams.processor.ConsumerRecordTimestampExtractor;
+import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
 import org.apache.kafka.streams.processor.DefaultPartitionGrouper;
 import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor;
 import org.apache.kafka.streams.processor.internals.StreamThread;
@@ -172,7 +172,7 @@ public class StreamsConfig extends AbstractConfig {
                                         REPLICATION_FACTOR_DOC)
                                 .define(TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
                                         Type.CLASS,
-                                        ConsumerRecordTimestampExtractor.class.getName(),
+                                        FailOnInvalidTimestamp.class.getName(),
                                         Importance.MEDIUM,
                                         TIMESTAMP_EXTRACTOR_CLASS_DOC)
                                 .define(PARTITION_GROUPER_CLASS_CONFIG,

http://git-wip-us.apache.org/repos/asf/kafka/blob/9bed8fbc/streams/src/main/java/org/apache/kafka/streams/processor/ConsumerRecordTimestampExtractor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ConsumerRecordTimestampExtractor.java b/streams/src/main/java/org/apache/kafka/streams/processor/ConsumerRecordTimestampExtractor.java
deleted file mode 100644
index 0d3424e..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/ConsumerRecordTimestampExtractor.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-
-/**
- * Retrieves built-in timestamps from Kafka messages (introduced in KIP-32: Add timestamps to Kafka message).
- *
- * Here, "built-in" refers to the fact that compatible Kafka producer clients automatically and
- * transparently embed such timestamps into messages they sent to Kafka, which can then be retrieved
- * via this timestamp extractor.
- *
- * If <i>CreateTime</i> is used to define the built-in timestamps, using this extractor effectively provide
- * <i>event-time</i> semantics. If <i>LogAppendTime</i> is used to define the built-in timestamps, using
- * this extractor effectively provides <i>ingestion-time</i> semantics.
- *
- * If you need <i>processing-time</i> semantics, use {@link WallclockTimestampExtractor}.
- */
-public class ConsumerRecordTimestampExtractor implements TimestampExtractor {
-    @Override
-    public long extract(ConsumerRecord<Object, Object> record) {
-        return record.timestamp();
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9bed8fbc/streams/src/main/java/org/apache/kafka/streams/processor/ExtractRecordMetadataTimestamp.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ExtractRecordMetadataTimestamp.java b/streams/src/main/java/org/apache/kafka/streams/processor/ExtractRecordMetadataTimestamp.java
new file mode 100644
index 0000000..cbe024e
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/ExtractRecordMetadataTimestamp.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+/**
+ * Retrieves embedded metadata timestamps from Kafka messages.
+ * If a record has a negative (invalid) timestamp value, an error handler method is called.
+ * <p>
+ * Embedded metadata timestamp was introduced in "KIP-32: Add timestamps to Kafka message" for the new
+ * 0.10+ Kafka message format.
+ * <p>
+ * Here, "embedded metadata" refers to the fact that compatible Kafka producer clients automatically and
+ * transparently embed such timestamps into message metadata they send to Kafka, which can then be retrieved
+ * via this timestamp extractor.
+ * <p>
+ * If the embedded metadata timestamp represents <i>CreateTime</i> (cf. Kafka broker setting
+ * {@code message.timestamp.type} and Kafka topic setting {@code log.message.timestamp.type}),
+ * this extractor effectively provides <i>event-time</i> semantics.
+ * If <i>LogAppendTime</i> is used as broker/topic setting to define the embedded metadata timestamps,
+ * using this extractor effectively provides <i>ingestion-time</i> semantics.
+ * <p>
+ * If you need <i>processing-time</i> semantics, use {@link WallclockTimestampExtractor}.
+ *
+ * @see FailOnInvalidTimestamp
+ * @see LogAndSkipOnInvalidTimestamp
+ * @see UsePreviousTimeOnInvalidTimestamp
+ * @see WallclockTimestampExtractor
+ */
+abstract class ExtractRecordMetadataTimestamp implements TimestampExtractor {
+
+    /**
+     * Extracts the embedded metadata timestamp from the given {@link ConsumerRecord}.
+     *
+     * @param record a data record
+     * @param previousTimestamp the latest extracted valid timestamp of the current record's partition\u02d9 (could be -1 if unknown)
+     * @return the embedded metadata timestamp of the given {@link ConsumerRecord}
+     */
+    @Override
+    public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
+        final long timestamp = record.timestamp();
+
+        if (timestamp < 0) {
+            return onInvalidTimestamp(record, timestamp, previousTimestamp);
+        }
+
+        return timestamp;
+    }
+
+    /**
+     * Called if no valid timestamp is embedded in the record meta data.
+     *
+     * @param record a data record
+     * @param recordTimestamp the timestamp extractor from the record
+     * @param previousTimestamp the latest extracted valid timestamp of the current record's partition\u02d9 (could be -1 if unknown)
+     * @return a new timestamp for the record (if negative, record will not be processed but dropped silently)
+     */
+    public abstract long onInvalidTimestamp(final ConsumerRecord<Object, Object> record,
+                                            final long recordTimestamp,
+                                            final long previousTimestamp);
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9bed8fbc/streams/src/main/java/org/apache/kafka/streams/processor/FailOnInvalidTimestamp.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/FailOnInvalidTimestamp.java b/streams/src/main/java/org/apache/kafka/streams/processor/FailOnInvalidTimestamp.java
new file mode 100644
index 0000000..d7f64a2
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/FailOnInvalidTimestamp.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.streams.errors.StreamsException;
+
+/**
+ * Retrieves embedded metadata timestamps from Kafka messages.
+ * If a record has a negative (invalid) timestamp value, this extractor raises an exception.
+ * <p>
+ * Embedded metadata timestamp was introduced in "KIP-32: Add timestamps to Kafka message" for the new
+ * 0.10+ Kafka message format.
+ * <p>
+ * Here, "embedded metadata" refers to the fact that compatible Kafka producer clients automatically and
+ * transparently embed such timestamps into message metadata they send to Kafka, which can then be retrieved
+ * via this timestamp extractor.
+ * <p>
+ * If the embedded metadata timestamp represents <i>CreateTime</i> (cf. Kafka broker setting
+ * {@code message.timestamp.type} and Kafka topic setting {@code log.message.timestamp.type}),
+ * this extractor effectively provides <i>event-time</i> semantics.
+ * If <i>LogAppendTime</i> is used as broker/topic setting to define the embedded metadata timestamps,
+ * using this extractor effectively provides <i>ingestion-time</i> semantics.
+ * <p>
+ * If you need <i>processing-time</i> semantics, use {@link WallclockTimestampExtractor}.
+ *
+ * @see LogAndSkipOnInvalidTimestamp
+ * @see UsePreviousTimeOnInvalidTimestamp
+ * @see WallclockTimestampExtractor
+ */
+public class FailOnInvalidTimestamp extends ExtractRecordMetadataTimestamp {
+
+    /**
+     * Raises an exception on every call.
+     *
+     * @param record a data record
+     * @param recordTimestamp the timestamp extractor from the record
+     * @param previousTimestamp the latest extracted valid timestamp of the current record's partition\u02d9 (could be -1 if unknown)
+     * @return nothing; always raises an exception
+     * @throws StreamsException on every invocation
+     */
+    @Override
+    public long onInvalidTimestamp(final ConsumerRecord<Object, Object> record,
+                                   final long recordTimestamp,
+                                   final long previousTimestamp)
+            throws StreamsException {
+        throw new StreamsException("Input record " + record + " has invalid (negative) timestamp. " +
+            "Possibly because a pre-0.10 producer client was used to write this record to Kafka without embedding a timestamp, " +
+            "or because the input topic was created before upgrading the Kafka cluster to 0.10+. " +
+            "Use a different TimestampExtractor to process this data.");
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9bed8fbc/streams/src/main/java/org/apache/kafka/streams/processor/LogAndSkipOnInvalidTimestamp.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/LogAndSkipOnInvalidTimestamp.java b/streams/src/main/java/org/apache/kafka/streams/processor/LogAndSkipOnInvalidTimestamp.java
new file mode 100644
index 0000000..f24fd15
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/LogAndSkipOnInvalidTimestamp.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Retrieves embedded metadata timestamps from Kafka messages.
+ * If a record has a negative (invalid) timestamp value the timestamp is returned as-is;
+ * in addition, a WARN message is logged in your application.
+ * Returning the timestamp as-is results in dropping the record, i.e., the record will not be processed.
+ * <p>
+ * Embedded metadata timestamp was introduced in "KIP-32: Add timestamps to Kafka message" for the new
+ * 0.10+ Kafka message format.
+ * <p>
+ * Here, "embedded metadata" refers to the fact that compatible Kafka producer clients automatically and
+ * transparently embed such timestamps into message metadata they send to Kafka, which can then be retrieved
+ * via this timestamp extractor.
+ * <p>
+ * If the embedded metadata timestamp represents <i>CreateTime</i> (cf. Kafka broker setting
+ * {@code message.timestamp.type} and Kafka topic setting {@code log.message.timestamp.type}),
+ * this extractor effectively provides <i>event-time</i> semantics.
+ * If <i>LogAppendTime</i> is used as broker/topic setting to define the embedded metadata timestamps,
+ * using this extractor effectively provides <i>ingestion-time</i> semantics.
+ * <p>
+ * If you need <i>processing-time</i> semantics, use {@link WallclockTimestampExtractor}.
+ *
+ * @see FailOnInvalidTimestamp
+ * @see UsePreviousTimeOnInvalidTimestamp
+ * @see WallclockTimestampExtractor
+ */
+public class LogAndSkipOnInvalidTimestamp extends ExtractRecordMetadataTimestamp {
+    private static final Logger log = LoggerFactory.getLogger(LogAndSkipOnInvalidTimestamp.class);
+
+    /**
+     * Writes a log WARN message when the extracted timestamp is invalid (negative) but returns the invalid timestamp as-is,
+     * which ultimately causes the record to be skipped and not to be processed.
+     *
+     * @param record a data record
+     * @param recordTimestamp the timestamp extractor from the record
+     * @param previousTimestamp the latest extracted valid timestamp of the current record's partition\u02d9 (could be -1 if unknown)
+     * @return the originally extracted timestamp of the record
+     */
+    @Override
+    public long onInvalidTimestamp(final ConsumerRecord<Object, Object> record,
+                                   final long recordTimestamp,
+                                   final long previousTimestamp) {
+        log.warn("Input record {} will be dropped because it has an invalid (negative) timestamp.", record);
+        return recordTimestamp;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9bed8fbc/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java b/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java
index c55518b..0de96ba 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java
@@ -27,17 +27,19 @@ import org.apache.kafka.streams.kstream.KTable;
 public interface TimestampExtractor {
 
     /**
-     * Extracts a timestamp from a record.
+     * Extracts a timestamp from a record. The timestamp must be positive to be considered a valid timestamp.
+     * Returning a negative timestamp will cause the record not to be processed but rather silently skipped.
      * <p>
      * The extracted timestamp MUST represent the milliseconds since midnight, January 1, 1970 UTC.
-     *
+     * <p>
      * It is important to note that this timestamp may become the message timestamp for any messages sent to changelogs updated by {@link KTable}s
      * and joins. The message timestamp is used for log retention and log rolling, so using nonsensical values may result in
      * excessive log rolling and therefore broker performance degradation.
      *
      *
-     * @param record  a data record
-     * @return        the timestamp of the record
+     * @param record a data record
+     * @param previousTimestamp the latest extracted valid timestamp of the current record's partition\u02d9 (could be -1 if unknown)
+     * @return the timestamp of the record
      */
-    long extract(ConsumerRecord<Object, Object> record);
+    long extract(ConsumerRecord<Object, Object> record, long previousTimestamp);
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9bed8fbc/streams/src/main/java/org/apache/kafka/streams/processor/UsePreviousTimeOnInvalidTimestamp.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/UsePreviousTimeOnInvalidTimestamp.java b/streams/src/main/java/org/apache/kafka/streams/processor/UsePreviousTimeOnInvalidTimestamp.java
new file mode 100644
index 0000000..7718b5c
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/UsePreviousTimeOnInvalidTimestamp.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.streams.errors.StreamsException;
+
+/**
+ * Retrieves embedded metadata timestamps from Kafka messages.
+ * If a record has a negative (invalid) timestamp, a new timestamp will be inferred from the current stream-time.
+ * <p></p>
+ * Embedded metadata timestamp was introduced in "KIP-32: Add timestamps to Kafka message" for the new
+ * 0.10+ Kafka message format.
+ * <p>
+ * Here, "embedded metadata" refers to the fact that compatible Kafka producer clients automatically and
+ * transparently embed such timestamps into message metadata they send to Kafka, which can then be retrieved
+ * via this timestamp extractor.
+ * <p>
+ * If the embedded metadata timestamp represents <i>CreateTime</i> (cf. Kafka broker setting
+ * {@code message.timestamp.type} and Kafka topic setting {@code log.message.timestamp.type}),
+ * this extractor effectively provides <i>event-time</i> semantics.
+ * If <i>LogAppendTime</i> is used as broker/topic setting to define the embedded metadata timestamps,
+ * using this extractor effectively provides <i>ingestion-time</i> semantics.
+ * <p>
+ * If you need <i>processing-time</i> semantics, use {@link WallclockTimestampExtractor}.
+ *
+ * @see FailOnInvalidTimestamp
+ * @see LogAndSkipOnInvalidTimestamp
+ * @see WallclockTimestampExtractor
+ */
+public class UsePreviousTimeOnInvalidTimestamp extends ExtractRecordMetadataTimestamp {
+
+    /**
+     * Returns the current stream-time as new timestamp for the record.
+     *
+     * @param record a data record
+     * @param recordTimestamp the timestamp extractor from the record
+     * @param previousTimestamp the latest extracted valid timestamp of the current record's partition\u02d9 (could be -1 if unknown)
+     * @return the provided latest extracted valid timestamp as new timestamp for the record
+     * @throws StreamsException if latest extracted valid timestamp is unknown
+     */
+    @Override
+    public long onInvalidTimestamp(final ConsumerRecord<Object, Object> record,
+                                   final long recordTimestamp,
+                                   final long previousTimestamp)
+            throws StreamsException {
+        if (previousTimestamp < 0) {
+            throw new StreamsException("Could not infer new timestamp for input record " + record
+                    + " because latest extracted valid timestamp is unknown.");
+        }
+        return previousTimestamp;
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9bed8fbc/streams/src/main/java/org/apache/kafka/streams/processor/WallclockTimestampExtractor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/WallclockTimestampExtractor.java b/streams/src/main/java/org/apache/kafka/streams/processor/WallclockTimestampExtractor.java
index 305573b..6df9481 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/WallclockTimestampExtractor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/WallclockTimestampExtractor.java
@@ -21,22 +21,26 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 
 /**
  * Retrieves current wall clock timestamps as {@link System#currentTimeMillis()}.
- *
+ * <p>
  * Using this extractor effectively provides <i>processing-time</i> semantics.
- *
- * If you need <i>event-time</i> semantics, use {@link ConsumerRecordTimestampExtractor} with
+ * <p>
+ * If you need <i>event-time</i> semantics, use {@link FailOnInvalidTimestamp} with
  * built-in <i>CreateTime</i> or <i>LogAppendTime</i> timestamp (see KIP-32: Add timestamps to Kafka message for details).
+ *
+ * @see FailOnInvalidTimestamp
+ * @see LogAndSkipOnInvalidTimestamp
  */
 public class WallclockTimestampExtractor implements TimestampExtractor {
 
     /**
      * Return the current wall clock time as timestamp.
      *
-     * @param record  a data record
-     * @return        the current wall clock time, expressed in milliseconds since midnight, January 1, 1970 UTC
+     * @param record a data record
+     * @param previousTimestamp the latest extracted valid timestamp of the current record's partition\u02d9 (could be -1 if unknown)
+     * @return the current wall clock time, expressed in milliseconds since midnight, January 1, 1970 UTC
      */
     @Override
-    public long extract(ConsumerRecord<Object, Object> record) {
+    public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
         return System.currentTimeMillis();
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9bed8fbc/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
index 44ef146..a40b9ff 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
@@ -102,13 +102,14 @@ public class RecordQueue {
                                                                          rawRecord.checksum(),
                                                                          rawRecord.serializedKeySize(),
                                                                          rawRecord.serializedValueSize(), key, value);
-            long timestamp = timestampExtractor.extract(record);
+            long timestamp = timestampExtractor.extract(record, timeTracker.get());
 
             log.trace("Source node {} extracted timestamp {} for record {} when adding to buffered queue", source.name(), timestamp, record);
 
-            // validate that timestamp must be non-negative
-            if (timestamp < 0)
-                throw new StreamsException("Extracted timestamp value is negative, which is not allowed.");
+            // drop message if TS is invalid, i.e., negative
+            if (timestamp < 0) {
+                continue;
+            }
 
             StampedRecord stampedRecord = new StampedRecord(record, timestamp);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9bed8fbc/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
index 2f20cdb..e7f32b4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
@@ -73,10 +73,7 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> {
 
         final long timestamp = context.timestamp();
         if (timestamp < 0) {
-            throw new StreamsException("A record consumed from an input topic has invalid (negative) timestamp, " +
-                "possibly because a pre-0.10 producer client was used to write this record to Kafka without embedding a timestamp, " +
-                "or because the input topic was created before upgrading the Kafka cluster to 0.10+. " +
-                "Use a different TimestampExtractor to process this data.");
+            throw new StreamsException("Invalid (negative) timestamp of " + timestamp + " for output record <" + key + ":" + value + ">.");
         }
 
         try {

http://git-wip-us.apache.org/repos/asf/kafka/blob/9bed8fbc/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index a40e1be..ae374ce 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -123,22 +123,27 @@ public class StreamTask extends AbstractTask implements Punctuator {
     }
 
     /**
-     * Adds records to queues
+     * Adds records to queues. If a record has an invalid (i.e., negative) timestamp, the record is skipped
+     * and not added to the queue for processing
      *
      * @param partition the partition
      * @param records  the records
+     * @returns the number of added records
      */
     @SuppressWarnings("unchecked")
-    public void addRecords(TopicPartition partition, Iterable<ConsumerRecord<byte[], byte[]>> records) {
-        int queueSize = partitionGroup.addRawRecords(partition, records);
+    public int addRecords(TopicPartition partition, Iterable<ConsumerRecord<byte[], byte[]>> records) {
+        final int oldQueueSize = partitionGroup.numBuffered();
+        final int newQueueSize = partitionGroup.addRawRecords(partition, records);
 
-        log.trace("{} Added records into the buffered queue of partition {}, new queue size is {}", logPrefix, partition, queueSize);
+        log.trace("{} Added records into the buffered queue of partition {}, new queue size is {}", logPrefix, partition, newQueueSize);
 
         // if after adding these records, its partition queue's buffered size has been
         // increased beyond the threshold, we can then pause the consumption for this partition
-        if (queueSize > this.maxBufferedSize) {
+        if (newQueueSize > this.maxBufferedSize) {
             consumer.pause(singleton(partition));
         }
+
+        return newQueueSize - oldQueueSize;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/9bed8fbc/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 5e82829..96e9963 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -549,10 +549,12 @@ public class StreamThread extends Thread {
                     throw new StreamsException(logPrefix + " Failed to rebalance", rebalanceException);
 
                 if (!records.isEmpty()) {
+                    int numAddedRecords = 0;
                     for (TopicPartition partition : records.partitions()) {
                         StreamTask task = activeTasksByPartition.get(partition);
-                        task.addRecords(partition, records.records(partition));
+                        numAddedRecords += task.addRecords(partition, records.records(partition));
                     }
+                    sensors.skippedRecordsSensor.record(records.count() - numAddedRecords, timerStartedMs);
                     polledRecords = true;
                 } else {
                     polledRecords = false;
@@ -1020,6 +1022,7 @@ public class StreamThread extends Thread {
         final Sensor punctuateTimeSensor;
         final Sensor taskCreationSensor;
         final Sensor taskDestructionSensor;
+        final Sensor skippedRecordsSensor;
 
         public StreamsMetricsImpl(Metrics metrics) {
             this.metrics = metrics;
@@ -1052,6 +1055,9 @@ public class StreamThread extends Thread {
 
             this.taskDestructionSensor = metrics.sensor(sensorNamePrefix + ".task-destruction");
             this.taskDestructionSensor.add(metrics.metricName("task-destruction-rate", metricGrpName, "The average per-second number of destructed tasks", metricTags), new Rate(new Count()));
+
+            this.skippedRecordsSensor = metrics.sensor(sensorNamePrefix + ".skipped-records");
+            this.skippedRecordsSensor.add(metrics.metricName("skipped-records-count", metricGrpName, "The average per-second number of skipped records.", metricTags), new Rate(new Count()));
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/9bed8fbc/streams/src/test/java/org/apache/kafka/streams/processor/FailOnInvalidTimestampTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/FailOnInvalidTimestampTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/FailOnInvalidTimestampTest.java
new file mode 100644
index 0000000..738e956
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/FailOnInvalidTimestampTest.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.junit.Test;
+
+public class FailOnInvalidTimestampTest extends TimestampExtractorTest {
+
+    @Test
+    public void extractMetadataTimestamp() {
+        testExtractMetadataTimestamp(new FailOnInvalidTimestamp());
+    }
+
+    @Test(expected = StreamsException.class)
+    public void failOnInvalidTimestamp() {
+        final TimestampExtractor extractor = new FailOnInvalidTimestamp();
+        extractor.extract(new ConsumerRecord<>("anyTopic", 0, 0, null, null), 42);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9bed8fbc/streams/src/test/java/org/apache/kafka/streams/processor/LogAndSkipOnInvalidTimestampTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/LogAndSkipOnInvalidTimestampTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/LogAndSkipOnInvalidTimestampTest.java
new file mode 100644
index 0000000..92d8709
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/LogAndSkipOnInvalidTimestampTest.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.record.TimestampType;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+public class LogAndSkipOnInvalidTimestampTest extends TimestampExtractorTest {
+
+    @Test
+    public void extractMetadataTimestamp() {
+        testExtractMetadataTimestamp(new LogAndSkipOnInvalidTimestamp());
+    }
+
+    @Test
+    public void logAndSkipOnInvalidTimestamp() {
+        final long invalidMetadataTimestamp = -42;
+
+        final TimestampExtractor extractor = new LogAndSkipOnInvalidTimestamp();
+        final long timestamp = extractor.extract(
+            new ConsumerRecord<>(
+                "anyTopic",
+                0,
+                0,
+                invalidMetadataTimestamp,
+                TimestampType.NO_TIMESTAMP_TYPE,
+                0,
+                0,
+                0,
+                null,
+                null),
+            0
+        );
+
+        assertThat(timestamp, is(invalidMetadataTimestamp));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9bed8fbc/streams/src/test/java/org/apache/kafka/streams/processor/TimestampExtractorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TimestampExtractorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TimestampExtractorTest.java
new file mode 100644
index 0000000..93e0b5b
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TimestampExtractorTest.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.record.TimestampType;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+class TimestampExtractorTest {
+
+    void testExtractMetadataTimestamp(TimestampExtractor extractor) {
+        final long metadataTimestamp = 42;
+
+        final long timestamp = extractor.extract(
+            new ConsumerRecord<>(
+                "anyTopic",
+                0,
+                0,
+                metadataTimestamp,
+                TimestampType.NO_TIMESTAMP_TYPE,
+                0,
+                0,
+                0,
+                null,
+                null),
+            0
+        );
+
+        assertThat(timestamp, is(metadataTimestamp));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9bed8fbc/streams/src/test/java/org/apache/kafka/streams/processor/UsePreviousTimeOnInvalidTimestampTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/UsePreviousTimeOnInvalidTimestampTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/UsePreviousTimeOnInvalidTimestampTest.java
new file mode 100644
index 0000000..09617fa
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/UsePreviousTimeOnInvalidTimestampTest.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+public class UsePreviousTimeOnInvalidTimestampTest extends TimestampExtractorTest {
+
+    @Test
+    public void extractMetadataTimestamp() {
+        testExtractMetadataTimestamp(new UsePreviousTimeOnInvalidTimestamp());
+    }
+
+    @Test
+    public void usePreviousTimeOnInvalidTimestamp() {
+        final long previousTime = 42;
+
+        final TimestampExtractor extractor = new UsePreviousTimeOnInvalidTimestamp();
+        final long timestamp = extractor.extract(
+            new ConsumerRecord<>("anyTopic", 0, 0, null, null),
+            previousTime
+        );
+
+        assertThat(timestamp, is(previousTime));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9bed8fbc/streams/src/test/java/org/apache/kafka/streams/processor/WallclockTimestampExtractorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/WallclockTimestampExtractorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/WallclockTimestampExtractorTest.java
new file mode 100644
index 0000000..b7b49bb
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/WallclockTimestampExtractorTest.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+public class WallclockTimestampExtractorTest {
+
+    @Test
+    public void extractSystemTimestamp() {
+        final TimestampExtractor extractor = new WallclockTimestampExtractor();
+
+        final long before = System.currentTimeMillis();
+        final long timestamp = extractor.extract(new ConsumerRecord<>("anyTopic", 0, 0, null, null), 42);
+        final long after = System.currentTimeMillis();
+
+        assertThat(timestamp, is(new InBetween(before, after)));
+    }
+
+    private static class InBetween extends BaseMatcher<Long> {
+        private final long before;
+        private final long after;
+
+        public InBetween(long before, long after) {
+            this.before = before;
+            this.after = after;
+        }
+
+        @Override
+        public boolean matches(Object item) {
+            final long timestamp = (Long) item;
+            return before <= timestamp && timestamp <= after;
+        }
+
+        @Override
+        public void describeMismatch(Object item, Description mismatchDescription) {}
+
+        @Override
+        public void describeTo(Description description) {}
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9bed8fbc/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index a146316..d907506 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -17,10 +17,6 @@
 
 package org.apache.kafka.streams.processor.internals;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.Deserializer;
@@ -49,6 +45,10 @@ import org.junit.Test;
 import java.io.File;
 import java.util.Properties;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
 public class ProcessorTopologyTest {
 
     private static final Serializer<String> STRING_SERIALIZER = new StringSerializer();
@@ -404,7 +404,7 @@ public class ProcessorTopologyTest {
 
     public static class CustomTimestampExtractor implements TimestampExtractor {
         @Override
-        public long extract(ConsumerRecord<Object, Object> record) {
+        public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
             return timestamp;
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9bed8fbc/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
index f30e0e6..e0ee3ce 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
@@ -29,6 +29,8 @@ import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
+import org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.apache.kafka.test.MockSourceNode;
 import org.apache.kafka.test.MockTimestampExtractor;
@@ -136,4 +138,24 @@ public class RecordQueueTest {
 
         queue.addRawRecords(records, timestampExtractor);
     }
+
+    @Test(expected = StreamsException.class)
+    public void shouldThrowOnNegativeTimestamp() {
+        final byte[] value = Serdes.Long().serializer().serialize("foo", 1L);
+        final List<ConsumerRecord<byte[], byte[]>> records = Collections.singletonList(
+                new ConsumerRecord<>("topic", 1, 1, -1L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue));
+
+        queue.addRawRecords(records, new FailOnInvalidTimestamp());
+    }
+
+    @Test
+    public void shouldDropOnNegativeTimestamp() {
+        final byte[] value = Serdes.Long().serializer().serialize("foo", 1L);
+        final List<ConsumerRecord<byte[], byte[]>> records = Collections.singletonList(
+                new ConsumerRecord<>("topic", 1, 1, -1L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue));
+
+        queue.addRawRecords(records, new LogAndSkipOnInvalidTimestamp());
+
+        assertEquals(0, queue.size());
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9bed8fbc/streams/src/test/java/org/apache/kafka/streams/smoketest/TestTimestampExtractor.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/TestTimestampExtractor.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/TestTimestampExtractor.java
index 04e264c..0cab7f5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/smoketest/TestTimestampExtractor.java
+++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/TestTimestampExtractor.java
@@ -25,7 +25,7 @@ public class TestTimestampExtractor implements TimestampExtractor {
     private final long base = SmokeTestUtil.START_TIME;
 
     @Override
-    public long extract(ConsumerRecord<Object, Object> record) {
+    public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
         switch (record.topic()) {
             case "data":
                 return base + (Integer) record.value();

http://git-wip-us.apache.org/repos/asf/kafka/blob/9bed8fbc/streams/src/test/java/org/apache/kafka/test/MockTimestampExtractor.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockTimestampExtractor.java b/streams/src/test/java/org/apache/kafka/test/MockTimestampExtractor.java
index 274e7b5..2b24578 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockTimestampExtractor.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockTimestampExtractor.java
@@ -24,7 +24,7 @@ import org.apache.kafka.streams.processor.TimestampExtractor;
 public class MockTimestampExtractor implements TimestampExtractor {
 
     @Override
-    public long extract(ConsumerRecord<Object, Object> record) {
+    public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
         return record.offset();
     }
 }