You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@beam.apache.org by pe...@apache.org on 2017/06/04 08:19:54 UTC

[2/3] beam git commit: Flink runner: specify CheckpointingMode through PipelineOptions.

Flink runner: specify CheckpointingMode through PipelineOptions.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b8035ae7
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b8035ae7
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b8035ae7

Branch: refs/heads/master
Commit: b8035ae7ad226bd0261a70fb8e0041e0f07e6dfe
Parents: 1866a01
Author: Pei He <pe...@apache.org>
Authored: Sat May 27 14:41:26 2017 +0800
Committer: Pei He <he...@alibaba-inc.com>
Committed: Sun Jun 4 16:18:36 2017 +0800

----------------------------------------------------------------------
 .../beam/runners/flink/FlinkPipelineExecutionEnvironment.java  | 2 +-
 .../org/apache/beam/runners/flink/FlinkPipelineOptions.java    | 6 ++++++
 2 files changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/b8035ae7/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
index 7765a00..98f7c5a 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
@@ -227,7 +227,7 @@ class FlinkPipelineExecutionEnvironment {
       if (checkpointInterval < 1) {
         throw new IllegalArgumentException("The checkpoint interval must be positive");
       }
-      flinkStreamEnv.enableCheckpointing(checkpointInterval);
+      flinkStreamEnv.enableCheckpointing(checkpointInterval, options.getCheckpointingMode());
       boolean externalizedCheckpoint = options.isExternalizedCheckpointsEnabled();
       boolean retainOnCancellation = options.getRetainExternalizedCheckpointsOnCancellation();
       if (externalizedCheckpoint) {

http://git-wip-us.apache.org/repos/asf/beam/blob/b8035ae7/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
index 764fa5f..ee07abb 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
@@ -26,6 +26,7 @@ import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.StreamingOptions;
 import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.streaming.api.CheckpointingMode;
 
 /**
  * Options which can be used to configure a Flink PipelineRunner.
@@ -70,6 +71,11 @@ public interface FlinkPipelineOptions
   Long getCheckpointingInterval();
   void setCheckpointingInterval(Long interval);
 
+  @Description("The checkpointing mode that defines consistency guarantee.")
+  @Default.Enum("AT_LEAST_ONCE")
+  CheckpointingMode getCheckpointingMode();
+  void setCheckpointingMode(CheckpointingMode mode);
+
   @Description("Sets the number of times that failed tasks are re-executed. "
       + "A value of zero effectively disables fault tolerance. A value of -1 indicates "
       + "that the system default value (as defined in the configuration) should be used.")