You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mx...@apache.org on 2016/09/09 14:03:53 UTC
[2/4] incubator-beam git commit: [BEAM-617][flink] introduce option
to set state backend
[BEAM-617][flink] introduce option to set state backend
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d4f85912
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d4f85912
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d4f85912
Branch: refs/heads/master
Commit: d4f85912effd2c04cac99d693a87bf6e2d597e9c
Parents: be689df
Author: Maximilian Michels <mx...@apache.org>
Authored: Tue Sep 6 16:25:32 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Fri Sep 9 15:59:48 2016 +0200
----------------------------------------------------------------------
.../runners/flink/FlinkPipelineExecutionEnvironment.java | 7 +++++++
.../apache/beam/runners/flink/FlinkPipelineOptions.java | 11 +++++++++++
2 files changed, 18 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d4f85912/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
index a5d33b4..391c3f2 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
@@ -29,6 +29,7 @@ import org.apache.beam.sdk.Pipeline;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.java.CollectionEnvironment;
import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
@@ -221,6 +222,12 @@ public class FlinkPipelineExecutionEnvironment {
flinkStreamEnv.enableCheckpointing(checkpointInterval);
}
+ // State backend
+ final AbstractStateBackend stateBackend = options.getStateBackend();
+ if (stateBackend != null) {
+ flinkStreamEnv.setStateBackend(stateBackend);
+ }
+
return flinkStreamEnv;
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d4f85912/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
index 1fb23ec..a067e76 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
@@ -25,6 +25,7 @@ import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.StreamingOptions;
+import org.apache.flink.runtime.state.AbstractStateBackend;
/**
* Options which can be used to configure a Flink PipelineRunner.
@@ -82,4 +83,14 @@ public interface FlinkPipelineOptions
Long getExecutionRetryDelay();
void setExecutionRetryDelay(Long delay);
+ /**
+ * Sets a state backend to store Beam's state during computation.
+ * Note: Only applicable when executing in streaming mode.
+ * @param stateBackend The state backend to use
+ */
+ @Description("Sets the state backend to use in streaming mode. "
+ + "Otherwise the default is read from the Flink config.")
+ void setStateBackend(AbstractStateBackend stateBackend);
+ AbstractStateBackend getStateBackend();
+
}