You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text version.
Posted to commits@flink.apache.org by ji...@apache.org on 2018/12/05 09:25:16 UTC

[flink] branch release-1.7 updated: [FLINK-11045][table] Set correct UserCodeClassLoader for RuntimeUDFContext in CollectionExecutor

This is an automated email from the ASF dual-hosted git repository.

jincheng pushed a commit to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.7 by this push:
     new 6523028  [FLINK-11045][table] Set correct UserCodeClassLoader for RuntimeUDFContext in CollectionExecutor
6523028 is described below

commit 652302863a5f1b5f07b9cd80692d93bf9e449181
Author: hequn8128 <ch...@gmail.com>
AuthorDate: Sun Dec 2 19:05:49 2018 +0800

    [FLINK-11045][table] Set correct UserCodeClassLoader for RuntimeUDFContext in CollectionExecutor
    
    This closes #7213
---
 .../api/common/operators/CollectionExecutor.java     | 20 ++++++++++----------
 1 file changed, 10 insertions(+), 10 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
index 55f3df7..3a0b84c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
@@ -83,7 +83,7 @@ public class CollectionExecutor {
 	
 	private final Map<String, Aggregator<?>> aggregators;
 	
-	private final ClassLoader classLoader;
+	private final ClassLoader userCodeClassLoader;
 	
 	private final ExecutionConfig executionConfig;
 
@@ -99,7 +99,7 @@ public class CollectionExecutor {
 		this.previousAggregates = new HashMap<String, Value>();
 		this.aggregators = new HashMap<String, Aggregator<?>>();
 		this.cachedFiles = new HashMap<String, Future<Path>>();
-		this.classLoader = getClass().getClassLoader();
+		this.userCodeClassLoader = Thread.currentThread().getContextClassLoader();
 	}
 	
 	// --------------------------------------------------------------------------------------------
@@ -191,8 +191,8 @@ public class CollectionExecutor {
 		MetricGroup metrics = new UnregisteredMetricsGroup();
 			
 		if (RichOutputFormat.class.isAssignableFrom(typedSink.getUserCodeWrapper().getUserCodeClass())) {
-			ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators, metrics) :
-					new IterationRuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators, metrics);
+			ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, userCodeClassLoader, executionConfig, cachedFiles, accumulators, metrics) :
+					new IterationRuntimeUDFContext(taskInfo, userCodeClassLoader, executionConfig, cachedFiles, accumulators, metrics);
 		} else {
 			ctx = null;
 		}
@@ -211,8 +211,8 @@ public class CollectionExecutor {
 
 		MetricGroup metrics = new UnregisteredMetricsGroup();
 		if (RichInputFormat.class.isAssignableFrom(typedSource.getUserCodeWrapper().getUserCodeClass())) {
-			ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators, metrics) :
-					new IterationRuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators, metrics);
+			ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, userCodeClassLoader, executionConfig, cachedFiles, accumulators, metrics) :
+					new IterationRuntimeUDFContext(taskInfo, userCodeClassLoader, executionConfig, cachedFiles, accumulators, metrics);
 		} else {
 			ctx = null;
 		}
@@ -237,8 +237,8 @@ public class CollectionExecutor {
 
 		MetricGroup metrics = new UnregisteredMetricsGroup();
 		if (RichFunction.class.isAssignableFrom(typedOp.getUserCodeWrapper().getUserCodeClass())) {
-			ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators, metrics) :
-					new IterationRuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators, metrics);
+			ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, userCodeClassLoader, executionConfig, cachedFiles, accumulators, metrics) :
+					new IterationRuntimeUDFContext(taskInfo, userCodeClassLoader, executionConfig, cachedFiles, accumulators, metrics);
 			
 			for (Map.Entry<String, Operator<?>> bcInputs : operator.getBroadcastInputs().entrySet()) {
 				List<?> bcData = execute(bcInputs.getValue());
@@ -278,8 +278,8 @@ public class CollectionExecutor {
 		MetricGroup metrics = new UnregisteredMetricsGroup();
 	
 		if (RichFunction.class.isAssignableFrom(typedOp.getUserCodeWrapper().getUserCodeClass())) {
-			ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators, metrics) :
-				new IterationRuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators, metrics);
+			ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, userCodeClassLoader, executionConfig, cachedFiles, accumulators, metrics) :
+				new IterationRuntimeUDFContext(taskInfo, userCodeClassLoader, executionConfig, cachedFiles, accumulators, metrics);
 			
 			for (Map.Entry<String, Operator<?>> bcInputs : operator.getBroadcastInputs().entrySet()) {
 				List<?> bcData = execute(bcInputs.getValue());