You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2016/07/13 14:11:18 UTC

[1/2] incubator-beam git commit: Allow for custom timestamp/watermark function in FlinkPipelineRunner

Repository: incubator-beam
Updated Branches:
  refs/heads/master ee1a3bcfb -> 1a7cd4112


Allow for custom timestamp/watermark function in FlinkPipelineRunner

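Added a new "of" overload and a matching constructor to UnboundedFlinkSource
that accept a custom AssignerWithPeriodicWatermarks, so sources can assign
event-time timestamps and watermarks (the default remains ingestion time).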


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9000d95d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9000d95d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9000d95d

Branch: refs/heads/master
Commit: 9000d95d2b34ab45f799aedb140710986ff19452
Parents: ee1a3bc
Author: David Desberg <da...@uber.com>
Authored: Mon Jul 11 12:24:18 2016 -0700
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Jul 13 14:15:54 2016 +0200

----------------------------------------------------------------------
 .../FlinkStreamingTransformTranslators.java     | 10 ++++++---
 .../streaming/io/UnboundedFlinkSource.java      | 23 ++++++++++++++++++++
 2 files changed, 30 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9000d95d/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
index 5d04068..fa6b387 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
@@ -70,7 +70,9 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.datastream.KeyedStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.IngestionTimeExtractor;
+import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.util.Collector;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
@@ -252,6 +254,8 @@ public class FlinkStreamingTransformTranslators {
       if (transform.getSource().getClass().equals(UnboundedFlinkSource.class)) {
         @SuppressWarnings("unchecked")
         UnboundedFlinkSource<T> flinkSourceFunction = (UnboundedFlinkSource<T>) transform.getSource();
+        final AssignerWithPeriodicWatermarks<T> flinkAssigner = flinkSourceFunction.getFlinkTimestampAssigner();
+
         DataStream<T> flinkSource = context.getExecutionEnvironment()
             .addSource(flinkSourceFunction.getFlinkSource());
 
@@ -260,17 +264,17 @@ public class FlinkStreamingTransformTranslators {
               context.getExecutionEnvironment().getConfig()));
 
         source = flinkSource
+            .assignTimestampsAndWatermarks(flinkAssigner)
             .flatMap(new FlatMapFunction<T, WindowedValue<T>>() {
               @Override
               public void flatMap(T s, Collector<WindowedValue<T>> collector) throws Exception {
                 collector.collect(
                     WindowedValue.of(
                         s,
-                        Instant.now(),
+                        new Instant(flinkAssigner.extractTimestamp(s, -1)),
                         GlobalWindow.INSTANCE,
                         PaneInfo.NO_FIRING));
-              }
-            }).assignTimestampsAndWatermarks(new IngestionTimeExtractor<WindowedValue<T>>());
+              }});
       } else {
         try {
           transform.getSource();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9000d95d/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java
index 94b73ce..716ca30 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java
@@ -23,6 +23,8 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
 
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.functions.IngestionTimeExtractor;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 
 import java.util.List;
@@ -40,14 +42,26 @@ public class UnboundedFlinkSource<T> extends UnboundedSource<T, UnboundedSource.
   /** Coder set during translation */
   private Coder<T> coder;
 
+  /** Timestamp / watermark assigner for source; defaults to ingestion time */
+  private AssignerWithPeriodicWatermarks<T> flinkTimestampAssigner = new IngestionTimeExtractor<T>();
+
   public UnboundedFlinkSource(SourceFunction<T> source) {
     flinkSource = checkNotNull(source);
   }
 
+  public UnboundedFlinkSource(SourceFunction<T> source, AssignerWithPeriodicWatermarks<T> timestampAssigner) {
+    flinkSource = checkNotNull(source);
+    flinkTimestampAssigner = checkNotNull(timestampAssigner);
+  }
+
   public SourceFunction<T> getFlinkSource() {
     return this.flinkSource;
   }
 
+  public AssignerWithPeriodicWatermarks<T> getFlinkTimestampAssigner() {
+    return flinkTimestampAssigner;
+  }
+
   @Override
   public List<? extends UnboundedSource<T, UnboundedSource.CheckpointMark>> generateInitialSplits(int desiredNumSplits, PipelineOptions options) throws Exception {
     throw new RuntimeException("Flink Sources are supported only when running with the FlinkRunner.");
@@ -79,6 +93,10 @@ public class UnboundedFlinkSource<T> extends UnboundedSource<T, UnboundedSource.
     this.coder = coder;
   }
 
+  public void setFlinkTimestampAssigner(AssignerWithPeriodicWatermarks<T> flinkTimestampAssigner) {
+    this.flinkTimestampAssigner = flinkTimestampAssigner;
+  }
+
   /**
    * Creates a new unbounded source from a Flink source.
    * @param flinkSource The Flink source function
@@ -88,4 +106,9 @@ public class UnboundedFlinkSource<T> extends UnboundedSource<T, UnboundedSource.
   public static <T> UnboundedSource<T, UnboundedSource.CheckpointMark> of(SourceFunction<T> flinkSource) {
     return new UnboundedFlinkSource<>(flinkSource);
   }
+
+  public static <T> UnboundedSource<T, UnboundedSource.CheckpointMark> of(
+          SourceFunction<T> flinkSource, AssignerWithPeriodicWatermarks<T> flinkTimestampAssigner) {
+    return new UnboundedFlinkSource<>(flinkSource, flinkTimestampAssigner);
+  }
 }


[2/2] incubator-beam git commit: This closes #630

Posted by al...@apache.org.
This closes #630


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1a7cd411
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1a7cd411
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1a7cd411

Branch: refs/heads/master
Commit: 1a7cd4112aab68d05dc59700883f63a082ea38e0
Parents: ee1a3bc 9000d95
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Jul 13 14:16:02 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Jul 13 14:16:02 2016 +0200

----------------------------------------------------------------------
 .../FlinkStreamingTransformTranslators.java     | 10 ++++++---
 .../streaming/io/UnboundedFlinkSource.java      | 23 ++++++++++++++++++++
 2 files changed, 30 insertions(+), 3 deletions(-)
----------------------------------------------------------------------