You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/11/18 00:53:25 UTC

kafka git commit: HOTFIX: corrupted commit 10cfc1628df024f7596d3af5c168fa90f59035ca

Repository: kafka
Updated Branches:
  refs/heads/0.10.1 8deb05d56 -> f7795a525


HOTFIX: corrupted commit 10cfc1628df024f7596d3af5c168fa90f59035ca

Author: Matthias J. Sax <ma...@confluent.io>

Reviewers: Guozhang Wang <wa...@gmail.com>

Closes #2147 from mjsax/hotfixBadCommit


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f7795a52
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f7795a52
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f7795a52

Branch: refs/heads/0.10.1
Commit: f7795a525179699ec6702c976e58c1d59ba8eb3c
Parents: 8deb05d
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Thu Nov 17 16:53:21 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Nov 17 16:53:21 2016 -0800

----------------------------------------------------------------------
 .../ConsumerRecordTimestampExtractor.java       | 52 +++++---------------
 1 file changed, 11 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f7795a52/streams/src/main/java/org/apache/kafka/streams/processor/ConsumerRecordTimestampExtractor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ConsumerRecordTimestampExtractor.java b/streams/src/main/java/org/apache/kafka/streams/processor/ConsumerRecordTimestampExtractor.java
index 57a45f3..0d3424e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/ConsumerRecordTimestampExtractor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/ConsumerRecordTimestampExtractor.java
@@ -18,53 +18,23 @@
 package org.apache.kafka.streams.processor;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.streams.errors.StreamsException;
 
 /**
- * Retrieves embedded metadata timestamps from Kafka messages.
- * Embedded metadata timestamp was introduced in "KIP-32: Add timestamps to Kafka message" for the new
- * 0.10+ Kafka message format.
- * <p>
- * Here, "embedded metadata" refers to the fact that compatible Kafka producer clients automatically and
- * transparently embed such timestamps into message metadata they send to Kafka, which can then be retrieved
+ * Retrieves built-in timestamps from Kafka messages (introduced in KIP-32: Add timestamps to Kafka message).
+ *
+ * Here, "built-in" refers to the fact that compatible Kafka producer clients automatically and
+ * transparently embed such timestamps into messages they sent to Kafka, which can then be retrieved
  * via this timestamp extractor.
- * <p>
- * If the embedded metadata timestamp represents <i>CreateTime</i> (cf. Kafka broker setting
- * {@code message.timestamp.type} and Kafka topic setting {@code log.message.timestamp.type}),
- * this extractor effectively provides <i>event-time</i> semantics.
- * If <i>LogAppendTime</i> is used as broker/topic setting to define the embedded metadata timestamps,
- * using this extractor effectively provides <i>ingestion-time</i> semantics.
- * <p>
- * If you need <i>processing-time</i> semantics, use {@link WallclockTimestampExtractor}.
- * <p>
- * If a record has a negative (invalid) timestamp value, this extractor raises an exception.
  *
- * @see RobustConsumerRecordTimestampExtractor
- * @see InferringConsumerRecordTimestampExtractor
- * @see WallclockTimestampExtractor
+ * If <i>CreateTime</i> is used to define the built-in timestamps, using this extractor effectively provide
+ * <i>event-time</i> semantics. If <i>LogAppendTime</i> is used to define the built-in timestamps, using
+ * this extractor effectively provides <i>ingestion-time</i> semantics.
+ *
+ * If you need <i>processing-time</i> semantics, use {@link WallclockTimestampExtractor}.
  */
 public class ConsumerRecordTimestampExtractor implements TimestampExtractor {
-
-    /**
-     * Extracts the embedded metadata timestamp from the given {@link ConsumerRecord}.
-     *
-     * @param record a data record
-     * @param currentStreamsTime the current value of the internally tracked Streams time (could be -1 if unknown)
-     * @return the embedded metadata timestamp of the given {@link ConsumerRecord}
-     * @throws StreamsException if the embedded metadata timestamp is negative
-     */
     @Override
-    public long extract(final ConsumerRecord<Object, Object> record, final long currentStreamsTime) {
-        final long timestamp = record.timestamp();
-
-        if (timestamp < 0) {
-            throw new StreamsException("Input record " + record + " has invalid (negative) timestamp. " +
-                    "Possibly because a pre-0.10 producer client was used to write this record to Kafka without embedding a timestamp, " +
-                    "or because the input topic was created before upgrading the Kafka cluster to 0.10+. " +
-                    "Use a different TimestampExtractor to process this data.");
-        }
-
-        return timestamp;
+    public long extract(ConsumerRecord<Object, Object> record) {
+        return record.timestamp();
     }
-
 }