You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2017/04/25 15:43:08 UTC

[1/2] beam git commit: [BEAM-1812] Add externalized checkpoint configuration to FlinkPipelineOptions

Repository: beam
Updated Branches:
  refs/heads/master 3178f07b9 -> 6aed130cc


[BEAM-1812] Add externalized checkpoint configuration to FlinkPipelineOptions


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/63327dd3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/63327dd3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/63327dd3

Branch: refs/heads/master
Commit: 63327dd3878c7b7a1891d53b64d999f40565948d
Parents: 3178f07
Author: Jins George <ji...@aeris.net>
Authored: Tue Apr 4 16:05:57 2017 -0700
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Apr 25 17:33:00 2017 +0200

----------------------------------------------------------------------
 .../flink/FlinkPipelineExecutionEnvironment.java       |  8 ++++++++
 .../beam/runners/flink/FlinkPipelineOptions.java       | 13 +++++++++++++
 .../apache/beam/runners/flink/PipelineOptionsTest.java |  9 +++++++++
 3 files changed, 30 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/63327dd3/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
index ba00036..7765a00 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.java.CollectionEnvironment;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -227,6 +228,13 @@ class FlinkPipelineExecutionEnvironment {
         throw new IllegalArgumentException("The checkpoint interval must be positive");
       }
       flinkStreamEnv.enableCheckpointing(checkpointInterval);
+      boolean externalizedCheckpoint = options.isExternalizedCheckpointsEnabled();
+      boolean retainOnCancellation = options.getRetainExternalizedCheckpointsOnCancellation();
+      if (externalizedCheckpoint) {
+        flinkStreamEnv.getCheckpointConfig().enableExternalizedCheckpoints(
+            retainOnCancellation ? ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
+                : ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
+      }
     }
 
     // State backend

http://git-wip-us.apache.org/repos/asf/beam/blob/63327dd3/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
index b769a6f..764fa5f 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
@@ -103,4 +103,17 @@ public interface FlinkPipelineOptions
   Boolean getEnableMetrics();
   void setEnableMetrics(Boolean enableMetrics);
 
+  /**
+   * Enables or disables externalized checkpoints.
+   */
+  @Description("Enables or disables externalized checkpoints. "
+      + "Works in conjunction with CheckpointingInterval")
+  @Default.Boolean(false)
+  Boolean isExternalizedCheckpointsEnabled();
+  void setExternalizedCheckpointsEnabled(Boolean externalCheckpoints);
+
+  @Description("Sets the behavior of externalized checkpoints on cancellation.")
+  @Default.Boolean(false)
+  Boolean getRetainExternalizedCheckpointsOnCancellation();
+  void setRetainExternalizedCheckpointsOnCancellation(Boolean retainOnCancellation);
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/63327dd3/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
index 9bc2c3d..23740a1 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
@@ -180,6 +180,15 @@ public class PipelineOptionsTest {
 
   }
 
+  @Test
+  public void testExternalizedCheckpointsConfigs() {
+    String[] args = new String[] { "--externalizedCheckpointsEnabled=true",
+        "--retainExternalizedCheckpointsOnCancellation=false" };
+    final FlinkPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
+        .as(FlinkPipelineOptions.class);
+    assertEquals(options.isExternalizedCheckpointsEnabled(), true);
+    assertEquals(options.getRetainExternalizedCheckpointsOnCancellation(), false);
+  }
 
   private static class TestDoFn extends DoFn<String, String> {
 


[2/2] beam git commit: This closes #2442

Posted by al...@apache.org.
This closes #2442


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6aed130c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6aed130c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6aed130c

Branch: refs/heads/master
Commit: 6aed130cc293cfa30e5ec3eca09d1145eea908a2
Parents: 3178f07 63327dd
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Tue Apr 25 17:42:26 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Apr 25 17:42:26 2017 +0200

----------------------------------------------------------------------
 .../flink/FlinkPipelineExecutionEnvironment.java       |  8 ++++++++
 .../beam/runners/flink/FlinkPipelineOptions.java       | 13 +++++++++++++
 .../apache/beam/runners/flink/PipelineOptionsTest.java |  9 +++++++++
 3 files changed, 30 insertions(+)
----------------------------------------------------------------------