You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mx...@apache.org on 2016/09/09 14:03:53 UTC

[2/4] incubator-beam git commit: [BEAM-617][flink] introduce option to set state backend

[BEAM-617][flink] introduce option to set state backend


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d4f85912
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d4f85912
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d4f85912

Branch: refs/heads/master
Commit: d4f85912effd2c04cac99d693a87bf6e2d597e9c
Parents: be689df
Author: Maximilian Michels <mx...@apache.org>
Authored: Tue Sep 6 16:25:32 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Fri Sep 9 15:59:48 2016 +0200

----------------------------------------------------------------------
 .../runners/flink/FlinkPipelineExecutionEnvironment.java |  7 +++++++
 .../apache/beam/runners/flink/FlinkPipelineOptions.java  | 11 +++++++++++
 2 files changed, 18 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d4f85912/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
index a5d33b4..391c3f2 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
@@ -29,6 +29,7 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.java.CollectionEnvironment;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.slf4j.Logger;
@@ -221,6 +222,12 @@ public class FlinkPipelineExecutionEnvironment {
       flinkStreamEnv.enableCheckpointing(checkpointInterval);
     }
 
+    // Use the state backend from the pipeline options, if one was set
+    final AbstractStateBackend stateBackend = options.getStateBackend();
+    if (stateBackend != null) {
+      flinkStreamEnv.setStateBackend(stateBackend);
+    }
+
     return flinkStreamEnv;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d4f85912/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
index 1fb23ec..a067e76 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
@@ -25,6 +25,7 @@ import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.StreamingOptions;
+import org.apache.flink.runtime.state.AbstractStateBackend;
 
 /**
  * Options which can be used to configure a Flink PipelineRunner.
@@ -82,4 +83,14 @@ public interface FlinkPipelineOptions
   Long getExecutionRetryDelay();
   void setExecutionRetryDelay(Long delay);
 
+  /**
+   * Sets the state backend used to store Beam's state during computation.
+   * Note: Only applicable in streaming mode; if unset, the Flink config default is used.
+   * @param stateBackend The state backend to use
+   */
+  @Description("Sets the state backend to use in streaming mode. "
+      + "Otherwise the default is read from the Flink config.")
+  void setStateBackend(AbstractStateBackend stateBackend);
+  AbstractStateBackend getStateBackend();
+
 }