You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/11/27 13:33:21 UTC

[1/2] flink git commit: [FLINK-3085] [runtime] Initialize state backends as part of "invoke()"

Repository: flink
Updated Branches:
  refs/heads/master d359a974a -> e69d14521


[FLINK-3085] [runtime] Initialize state backends as part of "invoke()"


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e4a4effe
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e4a4effe
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e4a4effe

Branch: refs/heads/master
Commit: e4a4effe179a80516b84e2b590ab4d7c65efdeb1
Parents: d359a97
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Nov 27 12:40:02 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Nov 27 12:40:02 2015 +0100

----------------------------------------------------------------------
 .../apache/flink/streaming/runtime/tasks/StreamTask.java    | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e4a4effe/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index c310439..0a17ace 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -171,9 +171,6 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 			configuration = new StreamConfig(getTaskConfiguration());
 			accumulatorMap = accumulatorRegistry.getUserMap();
 
-			stateBackend = createStateBackend();
-			stateBackend.initializeForJob(getEnvironment());
-
 			headOperator = configuration.getStreamOperator(userClassLoader);
 			operatorChain = new OperatorChain<>(this, headOperator, accumulatorRegistry.getReadWriteReporter());
 
@@ -207,7 +204,11 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 		
 		boolean disposed = false;
 		try {
-			// first order of business is to ive operators back their state
+			// first order of business is to initialize the state backend and to
+			// give operators back their state
+			stateBackend = createStateBackend();
+			stateBackend.initializeForJob(getEnvironment());
+			
 			restoreStateLazy();
 			
 			// we need to make sure that any triggers scheduled in open() cannot be


[2/2] flink git commit: [hotfix] Properly set state backend from execution environment to stream graph

Posted by se...@apache.org.
[hotfix] Properly set state backend from execution environment to stream graph


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e69d1452
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e69d1452
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e69d1452

Branch: refs/heads/master
Commit: e69d14521117b34054aa643398febafefdc1834f
Parents: e4a4eff
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Nov 27 13:20:23 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Nov 27 13:20:23 2015 +0100

----------------------------------------------------------------------
 .../org/apache/flink/streaming/api/graph/StreamGraphGenerator.java  | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e69d1452/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index 4bd7a73..3eaa8cc 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -98,6 +98,7 @@ public class StreamGraphGenerator {
 	private StreamGraphGenerator(StreamExecutionEnvironment env) {
 		this.streamGraph = new StreamGraph(env);
 		this.streamGraph.setChaining(env.isChainingEnabled());
+		this.streamGraph.setStateBackend(env.getStateBackend());
 		this.env = env;
 		this.alreadyTransformed = new HashMap<>();
 	}