You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by GitBox <gi...@apache.org> on 2019/01/14 20:13:15 UTC

[beam] Diff for: [GitHub] asfgit merged pull request #7284: [BEAM-6234] Make failOnCheckpointingErrors setting available in FlinkPipelineOptions

diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
index 2d0754631b36..461c168614b1 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
@@ -231,6 +231,9 @@ static StreamExecutionEnvironment createStreamExecutionEnvironment(
             .getCheckpointConfig()
             .setMinPauseBetweenCheckpoints(minPauseBetweenCheckpoints);
       }
+
+      boolean failOnCheckpointingErrors = options.getFailOnCheckpointingErrors();
+      flinkStreamEnv.getCheckpointConfig().setFailOnCheckpointingErrors(failOnCheckpointingErrors);
     } else {
       // https://issues.apache.org/jira/browse/FLINK-2491
       // Checkpointing is disabled, we can allow shutting down sources when they're done
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
index 4bb0c2a211e0..cc94793f3d0f 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
@@ -108,6 +108,15 @@
 
   void setMinPauseBetweenCheckpoints(Long minPauseInterval);
 
+  @Description(
+      "Sets the expected behaviour for tasks in case that they encounter an error in their "
+          + "checkpointing procedure. If this is set to true, the task will fail on checkpointing error. "
+          + "If this is set to false, the task will only decline the checkpoint and continue running. ")
+  @Default.Boolean(true)
+  Boolean getFailOnCheckpointingErrors();
+
+  void setFailOnCheckpointingErrors(Boolean failOnCheckpointingErrors);
+
   @Description(
       "Sets the number of times that failed tasks are re-executed. "
           + "A value of zero effectively disables fault tolerance. A value of -1 indicates "


With regards,
Apache Git Services