You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/11/27 13:33:21 UTC
[1/2] flink git commit: [FLINK-3085] [runtime] Initialize state
backends as part of "invoke()"
Repository: flink
Updated Branches:
refs/heads/master d359a974a -> e69d14521
[FLINK-3085] [runtime] Initialize state backends as part of "invoke()"
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e4a4effe
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e4a4effe
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e4a4effe
Branch: refs/heads/master
Commit: e4a4effe179a80516b84e2b590ab4d7c65efdeb1
Parents: d359a97
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Nov 27 12:40:02 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Nov 27 12:40:02 2015 +0100
----------------------------------------------------------------------
.../apache/flink/streaming/runtime/tasks/StreamTask.java | 9 +++++----
1 file changed, 5 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e4a4effe/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index c310439..0a17ace 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -171,9 +171,6 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
configuration = new StreamConfig(getTaskConfiguration());
accumulatorMap = accumulatorRegistry.getUserMap();
- stateBackend = createStateBackend();
- stateBackend.initializeForJob(getEnvironment());
-
headOperator = configuration.getStreamOperator(userClassLoader);
operatorChain = new OperatorChain<>(this, headOperator, accumulatorRegistry.getReadWriteReporter());
@@ -207,7 +204,11 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
boolean disposed = false;
try {
- // first order of business is to ive operators back their state
+ // first order of business is to initialize the state backend and to
+ // give operators back their state
+ stateBackend = createStateBackend();
+ stateBackend.initializeForJob(getEnvironment());
+
restoreStateLazy();
// we need to make sure that any triggers scheduled in open() cannot be
[2/2] flink git commit: [hofix] Properly set state backend from
execution environment to stream graph
Posted by se...@apache.org.
[hofix] Properly set state backend from execution environment to stream graph
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e69d1452
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e69d1452
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e69d1452
Branch: refs/heads/master
Commit: e69d14521117b34054aa643398febafefdc1834f
Parents: e4a4eff
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Nov 27 13:20:23 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Nov 27 13:20:23 2015 +0100
----------------------------------------------------------------------
.../org/apache/flink/streaming/api/graph/StreamGraphGenerator.java | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e69d1452/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index 4bd7a73..3eaa8cc 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -98,6 +98,7 @@ public class StreamGraphGenerator {
private StreamGraphGenerator(StreamExecutionEnvironment env) {
this.streamGraph = new StreamGraph(env);
this.streamGraph.setChaining(env.isChainingEnabled());
+ this.streamGraph.setStateBackend(env.getStateBackend());
this.env = env;
this.alreadyTransformed = new HashMap<>();
}