You are viewing a plain-text version of this content. The canonical link to it was not preserved in this version.
Posted to commits@beam.apache.org by al...@apache.org on 2016/07/13 14:11:18 UTC
[1/2] incubator-beam git commit: Allow for custom timestamp/watermark function in FlinkPipelineRunner
Repository: incubator-beam
Updated Branches:
refs/heads/master ee1a3bcfb -> 1a7cd4112
Allow for custom timestamp/watermark function in FlinkPipelineRunner
Added a new "of" overload and constructor to UnboundedFlinkSource to
allow event-time timestamping
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9000d95d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9000d95d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9000d95d
Branch: refs/heads/master
Commit: 9000d95d2b34ab45f799aedb140710986ff19452
Parents: ee1a3bc
Author: David Desberg <da...@uber.com>
Authored: Mon Jul 11 12:24:18 2016 -0700
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Jul 13 14:15:54 2016 +0200
----------------------------------------------------------------------
.../FlinkStreamingTransformTranslators.java | 10 ++++++---
.../streaming/io/UnboundedFlinkSource.java | 23 ++++++++++++++++++++
2 files changed, 30 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9000d95d/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
index 5d04068..fa6b387 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
@@ -70,7 +70,9 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.IngestionTimeExtractor;
+import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.util.Collector;
import org.joda.time.Instant;
import org.slf4j.Logger;
@@ -252,6 +254,8 @@ public class FlinkStreamingTransformTranslators {
if (transform.getSource().getClass().equals(UnboundedFlinkSource.class)) {
@SuppressWarnings("unchecked")
UnboundedFlinkSource<T> flinkSourceFunction = (UnboundedFlinkSource<T>) transform.getSource();
+ final AssignerWithPeriodicWatermarks<T> flinkAssigner = flinkSourceFunction.getFlinkTimestampAssigner();
+
DataStream<T> flinkSource = context.getExecutionEnvironment()
.addSource(flinkSourceFunction.getFlinkSource());
@@ -260,17 +264,17 @@ public class FlinkStreamingTransformTranslators {
context.getExecutionEnvironment().getConfig()));
source = flinkSource
+ .assignTimestampsAndWatermarks(flinkAssigner)
.flatMap(new FlatMapFunction<T, WindowedValue<T>>() {
@Override
public void flatMap(T s, Collector<WindowedValue<T>> collector) throws Exception {
collector.collect(
WindowedValue.of(
s,
- Instant.now(),
+ new Instant(flinkAssigner.extractTimestamp(s, -1)),
GlobalWindow.INSTANCE,
PaneInfo.NO_FIRING));
- }
- }).assignTimestampsAndWatermarks(new IngestionTimeExtractor<WindowedValue<T>>());
+ }});
} else {
try {
transform.getSource();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9000d95d/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java
index 94b73ce..716ca30 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java
@@ -23,6 +23,8 @@ import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.functions.IngestionTimeExtractor;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.List;
@@ -40,14 +42,26 @@ public class UnboundedFlinkSource<T> extends UnboundedSource<T, UnboundedSource.
/** Coder set during translation */
private Coder<T> coder;
+ /** Timestamp / watermark assigner for source; defaults to ingestion time */
+ private AssignerWithPeriodicWatermarks<T> flinkTimestampAssigner = new IngestionTimeExtractor<T>();
+
public UnboundedFlinkSource(SourceFunction<T> source) {
flinkSource = checkNotNull(source);
}
+ public UnboundedFlinkSource(SourceFunction<T> source, AssignerWithPeriodicWatermarks<T> timestampAssigner) {
+ flinkSource = checkNotNull(source);
+ flinkTimestampAssigner = checkNotNull(timestampAssigner);
+ }
+
public SourceFunction<T> getFlinkSource() {
return this.flinkSource;
}
+ public AssignerWithPeriodicWatermarks<T> getFlinkTimestampAssigner() {
+ return flinkTimestampAssigner;
+ }
+
@Override
public List<? extends UnboundedSource<T, UnboundedSource.CheckpointMark>> generateInitialSplits(int desiredNumSplits, PipelineOptions options) throws Exception {
throw new RuntimeException("Flink Sources are supported only when running with the FlinkRunner.");
@@ -79,6 +93,10 @@ public class UnboundedFlinkSource<T> extends UnboundedSource<T, UnboundedSource.
this.coder = coder;
}
+ public void setFlinkTimestampAssigner(AssignerWithPeriodicWatermarks<T> flinkTimestampAssigner) {
+ this.flinkTimestampAssigner = flinkTimestampAssigner;
+ }
+
/**
* Creates a new unbounded source from a Flink source.
* @param flinkSource The Flink source function
@@ -88,4 +106,9 @@ public class UnboundedFlinkSource<T> extends UnboundedSource<T, UnboundedSource.
public static <T> UnboundedSource<T, UnboundedSource.CheckpointMark> of(SourceFunction<T> flinkSource) {
return new UnboundedFlinkSource<>(flinkSource);
}
+
+ public static <T> UnboundedSource<T, UnboundedSource.CheckpointMark> of(
+ SourceFunction<T> flinkSource, AssignerWithPeriodicWatermarks<T> flinkTimestampAssigner) {
+ return new UnboundedFlinkSource<>(flinkSource, flinkTimestampAssigner);
+ }
}
[2/2] incubator-beam git commit: This closes #630
Posted by al...@apache.org.
This closes #630
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1a7cd411
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1a7cd411
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1a7cd411
Branch: refs/heads/master
Commit: 1a7cd4112aab68d05dc59700883f63a082ea38e0
Parents: ee1a3bc 9000d95
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Jul 13 14:16:02 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Jul 13 14:16:02 2016 +0200
----------------------------------------------------------------------
.../FlinkStreamingTransformTranslators.java | 10 ++++++---
.../streaming/io/UnboundedFlinkSource.java | 23 ++++++++++++++++++++
2 files changed, 30 insertions(+), 3 deletions(-)
----------------------------------------------------------------------