You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/10/16 18:08:43 UTC

[16/24] flink git commit: [hotfix] [streaming] Initialize StreamingRuntimeContext to rich functions early

[hotfix] [streaming] Initialize StreamingRuntimeContext to rich functions early


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f2d50386
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f2d50386
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f2d50386

Branch: refs/heads/master
Commit: f2d50386c8ad3c2faa492cfbe7c7f75dbbc49035
Parents: 69dfc40
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Oct 9 18:20:53 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Oct 16 15:26:10 2015 +0200

----------------------------------------------------------------------
 .../api/operators/AbstractUdfStreamOperator.java        | 12 +++++++++++-
 1 file changed, 11 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f2d50386/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
index 7a1bea4..a991fd3 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
@@ -26,7 +26,10 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.state.StateBackend;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
 
 import static java.util.Objects.requireNonNull;
@@ -69,11 +72,18 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function> extends
 	//  operator life cycle
 	// ------------------------------------------------------------------------
 
+
+	@Override
+	public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
+		super.setup(containingTask, config, output);
+		
+		FunctionUtils.setFunctionRuntimeContext(userFunction, getRuntimeContext());
+	}
+
 	@Override
 	public void open() throws Exception {
 		super.open();
 		
-		FunctionUtils.setFunctionRuntimeContext(userFunction, getRuntimeContext());
 		FunctionUtils.openFunction(userFunction, new Configuration());
 	}