You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by GitBox <gi...@apache.org> on 2019/01/14 20:13:15 UTC
[beam] Diff for: [GitHub] asfgit merged pull request #7284: [BEAM-6234] Make
failOnCheckpointingErrors setting available in FlinkPipelineOptions
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
index 2d0754631b36..461c168614b1 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
@@ -231,6 +231,9 @@ static StreamExecutionEnvironment createStreamExecutionEnvironment(
.getCheckpointConfig()
.setMinPauseBetweenCheckpoints(minPauseBetweenCheckpoints);
}
+
+ boolean failOnCheckpointingErrors = options.getFailOnCheckpointingErrors();
+ flinkStreamEnv.getCheckpointConfig().setFailOnCheckpointingErrors(failOnCheckpointingErrors);
} else {
// https://issues.apache.org/jira/browse/FLINK-2491
// Checkpointing is disabled, we can allow shutting down sources when they're done
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
index 4bb0c2a211e0..cc94793f3d0f 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
@@ -108,6 +108,15 @@
void setMinPauseBetweenCheckpoints(Long minPauseInterval);
+ @Description(
+ "Sets the expected behaviour for tasks in case that they encounter an error in their "
+ + "checkpointing procedure. If this is set to true, the task will fail on checkpointing error. "
+ + "If this is set to false, the task will only decline a the checkpoint and continue running. ")
+ @Default.Boolean(true)
+ Boolean getFailOnCheckpointingErrors();
+
+ void setFailOnCheckpointingErrors(Boolean failOnCheckpointingErrors);
+
@Description(
"Sets the number of times that failed tasks are re-executed. "
+ "A value of zero effectively disables fault tolerance. A value of -1 indicates "
With regards,
Apache Git Services