You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2017/03/24 17:15:18 UTC
[1/2] beam git commit: append to #2135, add 1). fix issue of
NO_TIMESTAMP type in Kafka 0.10; 2). rename field to 'timestamp';
Repository: beam
Updated Branches:
refs/heads/master 741242732 -> 5c2da7dc2
append to #2135, add
1). fix issue of NO_TIMESTAMP type in Kafka 0.10;
2). rename field to 'timestamp';
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f10509e7
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f10509e7
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f10509e7
Branch: refs/heads/master
Commit: f10509e745ff234110bc50d16aba1cb6813036b6
Parents: 7412427
Author: mingmxu <mi...@ebay.com>
Authored: Fri Mar 17 13:18:17 2017 -0700
Committer: Davor Bonaci <da...@google.com>
Committed: Fri Mar 24 10:14:33 2017 -0700
----------------------------------------------------------------------
.../apache/beam/sdk/io/kafka/ConsumerSpEL.java | 43 +++++++++++++++++---
.../org/apache/beam/sdk/io/kafka/KafkaIO.java | 21 +++++++++-
.../apache/beam/sdk/io/kafka/KafkaRecord.java | 15 +++++--
.../beam/sdk/io/kafka/KafkaRecordCoder.java | 5 +++
4 files changed, 73 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/f10509e7/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java
index b92b6fa..8fe17c1 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java
@@ -17,10 +17,14 @@
*/
package org.apache.beam.sdk.io.kafka;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
import java.util.Collection;
-
import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.expression.Expression;
import org.springframework.expression.ExpressionParser;
import org.springframework.expression.spel.SpelParserConfiguration;
@@ -33,16 +37,28 @@ import org.springframework.expression.spel.support.StandardEvaluationContext;
* to eliminate the method definition differences.
*/
class ConsumerSpEL {
- SpelParserConfiguration config = new SpelParserConfiguration(true, true);
- ExpressionParser parser = new SpelExpressionParser(config);
+ private static final Logger LOG = LoggerFactory.getLogger(ConsumerSpEL.class);
+
+ private SpelParserConfiguration config = new SpelParserConfiguration(true, true);
+ private ExpressionParser parser = new SpelExpressionParser(config);
- Expression seek2endExpression =
+ private Expression seek2endExpression =
parser.parseExpression("#consumer.seekToEnd(#tp)");
- Expression assignExpression =
+ private Expression assignExpression =
parser.parseExpression("#consumer.assign(#tp)");
- public ConsumerSpEL() {}
+ private Method timestampMethod;
+ private boolean hasRecordTimestamp = false;
+
+ public ConsumerSpEL() {
+ try {
+ timestampMethod = ConsumerRecord.class.getMethod("timestamp", (Class<?>[]) null);
+ hasRecordTimestamp = timestampMethod.getReturnType().equals(Long.TYPE);
+ } catch (NoSuchMethodException | SecurityException e) {
+ LOG.debug("Timestamp for Kafka message is not available.");
+ }
+ }
public void evaluateSeek2End(Consumer consumer, TopicPartition topicPartitions) {
StandardEvaluationContext mapContext = new StandardEvaluationContext();
@@ -57,4 +73,19 @@ class ConsumerSpEL {
mapContext.setVariable("tp", topicPartitions);
assignExpression.getValue(mapContext);
}
+
+ public long getRecordTimestamp(ConsumerRecord<byte[], byte[]> rawRecord) {
+ long timestamp;
+ try {
+ //for Kafka 0.9, set to System.currentTimeMillis();
+ //for kafka 0.10, when NO_TIMESTAMP also set to System.currentTimeMillis();
+ if (!hasRecordTimestamp || (timestamp = (long) timestampMethod.invoke(rawRecord)) <= 0L) {
+ timestamp = System.currentTimeMillis();
+ }
+ } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
+ // Not expected. Method timestamp() is already checked.
+ throw new RuntimeException(e);
+ }
+ return timestamp;
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f10509e7/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 890fb2b..310392c 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -203,6 +203,14 @@ import org.slf4j.LoggerFactory;
* {@link ProducerConfig} for sink. E.g. if you would like to enable offset
* <em>auto commit</em> (for external monitoring or other purposes), you can set
* <tt>"group.id"</tt>, <tt>"enable.auto.commit"</tt>, etc.
+ *
+ * <h3>Event Timestamp and Watermark</h3>
+ * By default record timestamp and watermark are based on processing time in KafkaIO reader.
+ * This can be overridden by providing {@code WatermarkFn} with
+ * {@link Read#withWatermarkFn(SerializableFunction)}, and {@code TimestampFn} with
+ * {@link Read#withTimestampFn(SerializableFunction)}.<br>
+ * Note that {@link KafkaRecord#getTimestamp()} reflects timestamp provided by Kafka if any,
+ * otherwise it is set to processing time.
*/
public class KafkaIO {
/**
@@ -428,6 +436,7 @@ public class KafkaIO {
checkNotNull(getValueCoder(), "Value coder must be set");
}
+ @Override
public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
// Handles unbounded source to bounded conversion if maxNumRecords or maxReadTime is set.
Unbounded<KafkaRecord<K, V>> unbounded =
@@ -458,6 +467,7 @@ public class KafkaIO {
private static <KeyT, ValueT, OutT> SerializableFunction<KafkaRecord<KeyT, ValueT>, OutT>
unwrapKafkaAndThen(final SerializableFunction<KV<KeyT, ValueT>, OutT> fn) {
return new SerializableFunction<KafkaRecord<KeyT, ValueT>, OutT>() {
+ @Override
public OutT apply(KafkaRecord<KeyT, ValueT> record) {
return fn.apply(record.getKV());
}
@@ -499,6 +509,7 @@ public class KafkaIO {
private static final SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>
KAFKA_CONSUMER_FACTORY_FN =
new SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>() {
+ @Override
public Consumer<byte[], byte[]> apply(Map<String, Object> config) {
return new KafkaConsumer<>(config);
}
@@ -627,6 +638,7 @@ public class KafkaIO {
}
Collections.sort(partitions, new Comparator<TopicPartition>() {
+ @Override
public int compare(TopicPartition tp1, TopicPartition tp2) {
return ComparisonChain
.start()
@@ -750,6 +762,7 @@ public class KafkaIO {
/** watermark before any records have been read. */
private static Instant initialWatermark = new Instant(Long.MIN_VALUE);
+ @Override
public String toString() {
return name;
}
@@ -800,13 +813,14 @@ public class KafkaIO {
public UnboundedKafkaReader(
UnboundedKafkaSource<K, V> source,
@Nullable KafkaCheckpointMark checkpointMark) {
-
+ this.consumerSpEL = new ConsumerSpEL();
this.source = source;
this.name = "Reader-" + source.id;
List<TopicPartition> partitions = source.spec.getTopicPartitions();
partitionStates = ImmutableList.copyOf(Lists.transform(partitions,
new Function<TopicPartition, PartitionState>() {
+ @Override
public PartitionState apply(TopicPartition tp) {
return new PartitionState(tp, UNINITIALIZED_OFFSET);
}
@@ -886,7 +900,6 @@ public class KafkaIO {
@Override
public boolean start() throws IOException {
- this.consumerSpEL = new ConsumerSpEL();
Read<K, V> spec = source.spec;
consumer = spec.getConsumerFactoryFn().apply(spec.getConsumerConfig());
consumerSpEL.evaluateAssign(consumer, spec.getTopicPartitions());
@@ -909,6 +922,7 @@ public class KafkaIO {
// Note that consumer is not thread safe, should not be accessed out side consumerPollLoop().
consumerPollThread.submit(
new Runnable() {
+ @Override
public void run() {
consumerPollLoop();
}
@@ -929,6 +943,7 @@ public class KafkaIO {
offsetFetcherThread.scheduleAtFixedRate(
new Runnable() {
+ @Override
public void run() {
updateLatestOffsets();
}
@@ -986,6 +1001,7 @@ public class KafkaIO {
rawRecord.topic(),
rawRecord.partition(),
rawRecord.offset(),
+ consumerSpEL.getRecordTimestamp(rawRecord),
decode(rawRecord.key(), source.spec.getKeyCoder()),
decode(rawRecord.value(), source.spec.getValueCoder()));
@@ -1059,6 +1075,7 @@ public class KafkaIO {
return new KafkaCheckpointMark(ImmutableList.copyOf(// avoid lazy (consumedOffset can change)
Lists.transform(partitionStates,
new Function<PartitionState, PartitionMark>() {
+ @Override
public PartitionMark apply(PartitionState p) {
return new PartitionMark(p.topicPartition.topic(),
p.topicPartition.partition(),
http://git-wip-us.apache.org/repos/asf/beam/blob/f10509e7/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java
index fa202e1..e0e400e 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io.kafka;
import java.io.Serializable;
import java.util.Arrays;
+
import org.apache.beam.sdk.values.KV;
/**
@@ -31,25 +32,28 @@ public class KafkaRecord<K, V> implements Serializable {
private final int partition;
private final long offset;
private final KV<K, V> kv;
+ private final long timestamp;
public KafkaRecord(
String topic,
int partition,
long offset,
+ long timestamp,
K key,
V value) {
- this(topic, partition, offset, KV.of(key, value));
+ this(topic, partition, offset, timestamp, KV.of(key, value));
}
public KafkaRecord(
String topic,
int partition,
long offset,
+ long timestamp,
KV<K, V> kv) {
-
this.topic = topic;
this.partition = partition;
this.offset = offset;
+ this.timestamp = timestamp;
this.kv = kv;
}
@@ -69,9 +73,13 @@ public class KafkaRecord<K, V> implements Serializable {
return kv;
}
+ public long getTimestamp() {
+ return timestamp;
+ }
+
@Override
public int hashCode() {
- return Arrays.deepHashCode(new Object[]{topic, partition, offset, kv});
+ return Arrays.deepHashCode(new Object[]{topic, partition, offset, timestamp, kv});
}
@Override
@@ -82,6 +90,7 @@ public class KafkaRecord<K, V> implements Serializable {
return topic.equals(other.topic)
&& partition == other.partition
&& offset == other.offset
+ && timestamp == other.timestamp
&& kv.equals(other.kv);
} else {
return false;
http://git-wip-us.apache.org/repos/asf/beam/blob/f10509e7/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
index ea78f09..2043a4c 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
@@ -19,10 +19,12 @@ package org.apache.beam.sdk.io.kafka;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;
+
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.KvCoder;
@@ -66,6 +68,7 @@ public class KafkaRecordCoder<K, V> extends StandardCoder<KafkaRecord<K, V>> {
stringCoder.encode(value.getTopic(), outStream, nested);
intCoder.encode(value.getPartition(), outStream, nested);
longCoder.encode(value.getOffset(), outStream, nested);
+ longCoder.encode(value.getTimestamp(), outStream, nested);
kvCoder.encode(value.getKV(), outStream, context);
}
@@ -77,6 +80,7 @@ public class KafkaRecordCoder<K, V> extends StandardCoder<KafkaRecord<K, V>> {
stringCoder.decode(inStream, nested),
intCoder.decode(inStream, nested),
longCoder.decode(inStream, nested),
+ longCoder.decode(inStream, nested),
kvCoder.decode(inStream, context));
}
@@ -106,6 +110,7 @@ public class KafkaRecordCoder<K, V> extends StandardCoder<KafkaRecord<K, V>> {
value.getTopic(),
value.getPartition(),
value.getOffset(),
+ value.getTimestamp(),
(KV<Object, Object>) kvCoder.structuralValue(value.getKV()));
}
}
[2/2] beam git commit: This closes #2267
Posted by da...@apache.org.
This closes #2267
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5c2da7dc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5c2da7dc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5c2da7dc
Branch: refs/heads/master
Commit: 5c2da7dc2eb2a6a6ab7138ea4b37884d9327de7e
Parents: 7412427 f10509e
Author: Davor Bonaci <da...@google.com>
Authored: Fri Mar 24 10:15:07 2017 -0700
Committer: Davor Bonaci <da...@google.com>
Committed: Fri Mar 24 10:15:07 2017 -0700
----------------------------------------------------------------------
.../apache/beam/sdk/io/kafka/ConsumerSpEL.java | 43 +++++++++++++++++---
.../org/apache/beam/sdk/io/kafka/KafkaIO.java | 21 +++++++++-
.../apache/beam/sdk/io/kafka/KafkaRecord.java | 15 +++++--
.../beam/sdk/io/kafka/KafkaRecordCoder.java | 5 +++
4 files changed, 73 insertions(+), 11 deletions(-)
----------------------------------------------------------------------