You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/11/18 00:53:25 UTC
kafka git commit: HOTFIX: corrupted commit
10cfc1628df024f7596d3af5c168fa90f59035ca
Repository: kafka
Updated Branches:
refs/heads/0.10.1 8deb05d56 -> f7795a525
HOTFIX: corrupted commit 10cfc1628df024f7596d3af5c168fa90f59035ca
Author: Matthias J. Sax <ma...@confluent.io>
Reviewers: Guozhang Wang <wa...@gmail.com>
Closes #2147 from mjsax/hotfixBadCommit
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f7795a52
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f7795a52
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f7795a52
Branch: refs/heads/0.10.1
Commit: f7795a525179699ec6702c976e58c1d59ba8eb3c
Parents: 8deb05d
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Thu Nov 17 16:53:21 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Nov 17 16:53:21 2016 -0800
----------------------------------------------------------------------
.../ConsumerRecordTimestampExtractor.java | 52 +++++---------------
1 file changed, 11 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7795a52/streams/src/main/java/org/apache/kafka/streams/processor/ConsumerRecordTimestampExtractor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ConsumerRecordTimestampExtractor.java b/streams/src/main/java/org/apache/kafka/streams/processor/ConsumerRecordTimestampExtractor.java
index 57a45f3..0d3424e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/ConsumerRecordTimestampExtractor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/ConsumerRecordTimestampExtractor.java
@@ -18,53 +18,23 @@
package org.apache.kafka.streams.processor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.streams.errors.StreamsException;
/**
- * Retrieves embedded metadata timestamps from Kafka messages.
- * Embedded metadata timestamp was introduced in "KIP-32: Add timestamps to Kafka message" for the new
- * 0.10+ Kafka message format.
- * <p>
- * Here, "embedded metadata" refers to the fact that compatible Kafka producer clients automatically and
- * transparently embed such timestamps into message metadata they send to Kafka, which can then be retrieved
+ * Retrieves built-in timestamps from Kafka messages (introduced in KIP-32: Add timestamps to Kafka message).
+ *
+ * Here, "built-in" refers to the fact that compatible Kafka producer clients automatically and
+ * transparently embed such timestamps into messages they send to Kafka, which can then be retrieved
* via this timestamp extractor.
- * <p>
- * If the embedded metadata timestamp represents <i>CreateTime</i> (cf. Kafka broker setting
- * {@code message.timestamp.type} and Kafka topic setting {@code log.message.timestamp.type}),
- * this extractor effectively provides <i>event-time</i> semantics.
- * If <i>LogAppendTime</i> is used as broker/topic setting to define the embedded metadata timestamps,
- * using this extractor effectively provides <i>ingestion-time</i> semantics.
- * <p>
- * If you need <i>processing-time</i> semantics, use {@link WallclockTimestampExtractor}.
- * <p>
- * If a record has a negative (invalid) timestamp value, this extractor raises an exception.
*
- * @see RobustConsumerRecordTimestampExtractor
- * @see InferringConsumerRecordTimestampExtractor
- * @see WallclockTimestampExtractor
+ * If <i>CreateTime</i> is used to define the built-in timestamps, using this extractor effectively provides
+ * <i>event-time</i> semantics. If <i>LogAppendTime</i> is used to define the built-in timestamps, using
+ * this extractor effectively provides <i>ingestion-time</i> semantics.
+ *
+ * If you need <i>processing-time</i> semantics, use {@link WallclockTimestampExtractor}.
*/
public class ConsumerRecordTimestampExtractor implements TimestampExtractor {
-
- /**
- * Extracts the embedded metadata timestamp from the given {@link ConsumerRecord}.
- *
- * @param record a data record
- * @param currentStreamsTime the current value of the internally tracked Streams time (could be -1 if unknown)
- * @return the embedded metadata timestamp of the given {@link ConsumerRecord}
- * @throws StreamsException if the embedded metadata timestamp is negative
- */
@Override
- public long extract(final ConsumerRecord<Object, Object> record, final long currentStreamsTime) {
- final long timestamp = record.timestamp();
-
- if (timestamp < 0) {
- throw new StreamsException("Input record " + record + " has invalid (negative) timestamp. " +
- "Possibly because a pre-0.10 producer client was used to write this record to Kafka without embedding a timestamp, " +
- "or because the input topic was created before upgrading the Kafka cluster to 0.10+. " +
- "Use a different TimestampExtractor to process this data.");
- }
-
- return timestamp;
+ public long extract(ConsumerRecord<Object, Object> record) {
+ return record.timestamp();
}
-
}