You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2017/06/08 01:35:37 UTC
[48/50] beam git commit: Shutdown Flink Streaming Pipeline when
reaching +Inf watermark
Shutdown Flink Streaming Pipeline when reaching +Inf watermark
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9c83ffe0
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9c83ffe0
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9c83ffe0
Branch: refs/heads/DSL_SQL
Commit: 9c83ffe0cdc6636d2187bf9439a73a3b45756d50
Parents: caecac3
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Mon Jun 5 12:19:00 2017 +0200
Committer: Ismaël Mejía <ie...@apache.org>
Committed: Wed Jun 7 23:13:52 2017 +0200
----------------------------------------------------------------------
.../wrappers/streaming/io/UnboundedSourceWrapper.java | 5 +++++
1 file changed, 5 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/9c83ffe0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
index 6055a43..e75072a 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
@@ -31,6 +31,7 @@ import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
@@ -436,6 +437,10 @@ public class UnboundedSourceWrapper<
}
}
context.emitWatermark(new Watermark(watermarkMillis));
+
+ if (watermarkMillis >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
+ this.isRunning = false;
+ }
}
setNextWatermarkTimer(this.runtimeContext);
}