You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ji...@apache.org on 2018/12/05 09:25:16 UTC
[flink] branch release-1.7 updated: [FLINK-11045][table] Set correct UserCodeClassLoader for RuntimeUDFContext in CollectionExecutor
This is an automated email from the ASF dual-hosted git repository.
jincheng pushed a commit to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.7 by this push:
new 6523028 [FLINK-11045][table] Set correct UserCodeClassLoader for RuntimeUDFContext in CollectionExecutor
6523028 is described below
commit 652302863a5f1b5f07b9cd80692d93bf9e449181
Author: hequn8128 <ch...@gmail.com>
AuthorDate: Sun Dec 2 19:05:49 2018 +0800
[FLINK-11045][table] Set correct UserCodeClassLoader for RuntimeUDFContext in CollectionExecutor
This closes #7213
---
.../api/common/operators/CollectionExecutor.java | 20 ++++++++++----------
1 file changed, 10 insertions(+), 10 deletions(-)
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
index 55f3df7..3a0b84c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
@@ -83,7 +83,7 @@ public class CollectionExecutor {
private final Map<String, Aggregator<?>> aggregators;
- private final ClassLoader classLoader;
+ private final ClassLoader userCodeClassLoader;
private final ExecutionConfig executionConfig;
@@ -99,7 +99,7 @@ public class CollectionExecutor {
this.previousAggregates = new HashMap<String, Value>();
this.aggregators = new HashMap<String, Aggregator<?>>();
this.cachedFiles = new HashMap<String, Future<Path>>();
- this.classLoader = getClass().getClassLoader();
+ this.userCodeClassLoader = Thread.currentThread().getContextClassLoader();
}
// --------------------------------------------------------------------------------------------
@@ -191,8 +191,8 @@ public class CollectionExecutor {
MetricGroup metrics = new UnregisteredMetricsGroup();
if (RichOutputFormat.class.isAssignableFrom(typedSink.getUserCodeWrapper().getUserCodeClass())) {
- ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators, metrics) :
- new IterationRuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators, metrics);
+ ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, userCodeClassLoader, executionConfig, cachedFiles, accumulators, metrics) :
+ new IterationRuntimeUDFContext(taskInfo, userCodeClassLoader, executionConfig, cachedFiles, accumulators, metrics);
} else {
ctx = null;
}
@@ -211,8 +211,8 @@ public class CollectionExecutor {
MetricGroup metrics = new UnregisteredMetricsGroup();
if (RichInputFormat.class.isAssignableFrom(typedSource.getUserCodeWrapper().getUserCodeClass())) {
- ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators, metrics) :
- new IterationRuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators, metrics);
+ ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, userCodeClassLoader, executionConfig, cachedFiles, accumulators, metrics) :
+ new IterationRuntimeUDFContext(taskInfo, userCodeClassLoader, executionConfig, cachedFiles, accumulators, metrics);
} else {
ctx = null;
}
@@ -237,8 +237,8 @@ public class CollectionExecutor {
MetricGroup metrics = new UnregisteredMetricsGroup();
if (RichFunction.class.isAssignableFrom(typedOp.getUserCodeWrapper().getUserCodeClass())) {
- ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators, metrics) :
- new IterationRuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators, metrics);
+ ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, userCodeClassLoader, executionConfig, cachedFiles, accumulators, metrics) :
+ new IterationRuntimeUDFContext(taskInfo, userCodeClassLoader, executionConfig, cachedFiles, accumulators, metrics);
for (Map.Entry<String, Operator<?>> bcInputs : operator.getBroadcastInputs().entrySet()) {
List<?> bcData = execute(bcInputs.getValue());
@@ -278,8 +278,8 @@ public class CollectionExecutor {
MetricGroup metrics = new UnregisteredMetricsGroup();
if (RichFunction.class.isAssignableFrom(typedOp.getUserCodeWrapper().getUserCodeClass())) {
- ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators, metrics) :
- new IterationRuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators, metrics);
+ ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, userCodeClassLoader, executionConfig, cachedFiles, accumulators, metrics) :
+ new IterationRuntimeUDFContext(taskInfo, userCodeClassLoader, executionConfig, cachedFiles, accumulators, metrics);
for (Map.Entry<String, Operator<?>> bcInputs : operator.getBroadcastInputs().entrySet()) {
List<?> bcData = execute(bcInputs.getValue());