You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2017/06/08 01:35:37 UTC

[48/50] beam git commit: Shutdown Flink Streaming Pipeline when reaching +Inf watermark

Shutdown Flink Streaming Pipeline when reaching +Inf watermark


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9c83ffe0
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9c83ffe0
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9c83ffe0

Branch: refs/heads/DSL_SQL
Commit: 9c83ffe0cdc6636d2187bf9439a73a3b45756d50
Parents: caecac3
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Mon Jun 5 12:19:00 2017 +0200
Committer: Ismaël Mejía <ie...@apache.org>
Committed: Wed Jun 7 23:13:52 2017 +0200

----------------------------------------------------------------------
 .../wrappers/streaming/io/UnboundedSourceWrapper.java           | 5 +++++
 1 file changed, 5 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/9c83ffe0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
index 6055a43..e75072a 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
@@ -31,6 +31,7 @@ import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -436,6 +437,10 @@ public class UnboundedSourceWrapper<
           }
         }
         context.emitWatermark(new Watermark(watermarkMillis));
+
+        if (watermarkMillis >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
+          this.isRunning = false;
+        }
       }
       setNextWatermarkTimer(this.runtimeContext);
     }