You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/10/16 18:08:43 UTC
[16/24] flink git commit: [hotfix] [streaming] Initialize StreamingRuntimeContext to rich functions early
[hotfix] [streaming] Initialize StreamingRuntimeContext to rich functions early
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f2d50386
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f2d50386
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f2d50386
Branch: refs/heads/master
Commit: f2d50386c8ad3c2faa492cfbe7c7f75dbbc49035
Parents: 69dfc40
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Oct 9 18:20:53 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Oct 16 15:26:10 2015 +0200
----------------------------------------------------------------------
.../api/operators/AbstractUdfStreamOperator.java | 12 +++++++++++-
1 file changed, 11 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f2d50386/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
index 7a1bea4..a991fd3 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
@@ -26,7 +26,10 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.state.StateBackend;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
import static java.util.Objects.requireNonNull;
@@ -69,11 +72,18 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function> extends
// operator life cycle
// ------------------------------------------------------------------------
+
+ @Override
+ public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
+ super.setup(containingTask, config, output);
+
+ FunctionUtils.setFunctionRuntimeContext(userFunction, getRuntimeContext());
+ }
+
@Override
public void open() throws Exception {
super.open();
- FunctionUtils.setFunctionRuntimeContext(userFunction, getRuntimeContext());
FunctionUtils.openFunction(userFunction, new Configuration());
}