You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2017/04/25 15:43:08 UTC
[1/2] beam git commit: [BEAM-1812] Add externalized checkpoint configuration to FlinkPipelineOptions
Repository: beam
Updated Branches:
refs/heads/master 3178f07b9 -> 6aed130cc
[BEAM-1812] Add externalized checkpoint configuration to FlinkPipelineOptions
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/63327dd3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/63327dd3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/63327dd3
Branch: refs/heads/master
Commit: 63327dd3878c7b7a1891d53b64d999f40565948d
Parents: 3178f07
Author: Jins George <ji...@aeris.net>
Authored: Tue Apr 4 16:05:57 2017 -0700
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Apr 25 17:33:00 2017 +0200
----------------------------------------------------------------------
.../flink/FlinkPipelineExecutionEnvironment.java | 8 ++++++++
.../beam/runners/flink/FlinkPipelineOptions.java | 13 +++++++++++++
.../apache/beam/runners/flink/PipelineOptionsTest.java | 9 +++++++++
3 files changed, 30 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/63327dd3/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
index ba00036..7765a00 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.java.CollectionEnvironment;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -227,6 +228,13 @@ class FlinkPipelineExecutionEnvironment {
throw new IllegalArgumentException("The checkpoint interval must be positive");
}
flinkStreamEnv.enableCheckpointing(checkpointInterval);
+ boolean externalizedCheckpoint = options.isExternalizedCheckpointsEnabled();
+ boolean retainOnCancellation = options.getRetainExternalizedCheckpointsOnCancellation();
+ if (externalizedCheckpoint) {
+ flinkStreamEnv.getCheckpointConfig().enableExternalizedCheckpoints(
+ retainOnCancellation ? ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
+ : ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
+ }
}
// State backend
http://git-wip-us.apache.org/repos/asf/beam/blob/63327dd3/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
index b769a6f..764fa5f 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
@@ -103,4 +103,17 @@ public interface FlinkPipelineOptions
Boolean getEnableMetrics();
void setEnableMetrics(Boolean enableMetrics);
+ /**
+ * Enables or disables externalized checkpoints.
+ */
+ @Description("Enables or disables externalized checkpoints. "
+ + "Works in conjunction with CheckpointingInterval")
+ @Default.Boolean(false)
+ Boolean isExternalizedCheckpointsEnabled();
+ void setExternalizedCheckpointsEnabled(Boolean externalCheckpoints);
+
+ @Description("Sets the behavior of externalized checkpoints on cancellation.")
+ @Default.Boolean(false)
+ Boolean getRetainExternalizedCheckpointsOnCancellation();
+ void setRetainExternalizedCheckpointsOnCancellation(Boolean retainOnCancellation);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/63327dd3/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
index 9bc2c3d..23740a1 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
@@ -180,6 +180,15 @@ public class PipelineOptionsTest {
}
+ @Test
+ public void testExternalizedCheckpointsConfigs() {
+ String[] args = new String[] { "--externalizedCheckpointsEnabled=true",
+ "--retainExternalizedCheckpointsOnCancellation=false" };
+ final FlinkPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
+ .as(FlinkPipelineOptions.class);
+ assertEquals(options.isExternalizedCheckpointsEnabled(), true);
+ assertEquals(options.getRetainExternalizedCheckpointsOnCancellation(), false);
+ }
private static class TestDoFn extends DoFn<String, String> {
[2/2] beam git commit: This closes #2442
Posted by al...@apache.org.
This closes #2442
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6aed130c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6aed130c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6aed130c
Branch: refs/heads/master
Commit: 6aed130cc293cfa30e5ec3eca09d1145eea908a2
Parents: 3178f07 63327dd
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Tue Apr 25 17:42:26 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Apr 25 17:42:26 2017 +0200
----------------------------------------------------------------------
.../flink/FlinkPipelineExecutionEnvironment.java | 8 ++++++++
.../beam/runners/flink/FlinkPipelineOptions.java | 13 +++++++++++++
.../apache/beam/runners/flink/PipelineOptionsTest.java | 9 +++++++++
3 files changed, 30 insertions(+)
----------------------------------------------------------------------