You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pe...@apache.org on 2017/06/04 08:19:54 UTC
[2/3] beam git commit: Flink runner: specify CheckpointingMode through PipelineOptions.
Flink runner: specify CheckpointingMode through PipelineOptions.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b8035ae7
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b8035ae7
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b8035ae7
Branch: refs/heads/master
Commit: b8035ae7ad226bd0261a70fb8e0041e0f07e6dfe
Parents: 1866a01
Author: Pei He <pe...@apache.org>
Authored: Sat May 27 14:41:26 2017 +0800
Committer: Pei He <he...@alibaba-inc.com>
Committed: Sun Jun 4 16:18:36 2017 +0800
----------------------------------------------------------------------
.../beam/runners/flink/FlinkPipelineExecutionEnvironment.java | 2 +-
.../org/apache/beam/runners/flink/FlinkPipelineOptions.java | 6 ++++++
2 files changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/b8035ae7/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
index 7765a00..98f7c5a 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
@@ -227,7 +227,7 @@ class FlinkPipelineExecutionEnvironment {
if (checkpointInterval < 1) {
throw new IllegalArgumentException("The checkpoint interval must be positive");
}
- flinkStreamEnv.enableCheckpointing(checkpointInterval);
+ flinkStreamEnv.enableCheckpointing(checkpointInterval, options.getCheckpointingMode());
boolean externalizedCheckpoint = options.isExternalizedCheckpointsEnabled();
boolean retainOnCancellation = options.getRetainExternalizedCheckpointsOnCancellation();
if (externalizedCheckpoint) {
http://git-wip-us.apache.org/repos/asf/beam/blob/b8035ae7/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
index 764fa5f..ee07abb 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
@@ -26,6 +26,7 @@ import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.streaming.api.CheckpointingMode;
/**
* Options which can be used to configure a Flink PipelineRunner.
@@ -70,6 +71,11 @@ public interface FlinkPipelineOptions
Long getCheckpointingInterval();
void setCheckpointingInterval(Long interval);
+ @Description("The checkpointing mode that defines consistency guarantee.")
+ @Default.Enum("AT_LEAST_ONCE")
+ CheckpointingMode getCheckpointingMode();
+ void setCheckpointingMode(CheckpointingMode mode);
+
@Description("Sets the number of times that failed tasks are re-executed. "
+ "A value of zero effectively disables fault tolerance. A value of -1 indicates "
+ "that the system default value (as defined in the configuration) should be used.")