You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2020/07/10 07:30:13 UTC

[flink] 03/04: [FLINK-18419] Make user ClassLoader available in TableEnvironment

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 060af872130179697e073051f1a7c9563ef92851
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Fri Jul 3 16:30:34 2020 +0200

    [FLINK-18419] Make user ClassLoader available in TableEnvironment
---
 .../table/client/gateway/local/ExecutionContext.java     |  9 ++++++---
 .../bridge/java/internal/StreamTableEnvironmentImpl.java | 16 +++++++++++++---
 .../java/internal/StreamTableEnvironmentImplTest.java    |  3 ++-
 .../flink/table/api/internal/TableEnvironmentImpl.java   |  8 ++++++--
 .../apache/flink/table/utils/TableEnvironmentMock.java   | 10 +++++++++-
 .../scala/internal/StreamTableEnvironmentImpl.scala      |  9 ++++++---
 .../scala/internal/StreamTableEnvironmentImplTest.scala  |  3 ++-
 .../apache/flink/table/planner/utils/TableTestBase.scala |  9 ++++++---
 .../table/api/stream/StreamTableEnvironmentTest.scala    |  3 ++-
 .../flink/table/api/stream/sql/AggregateTest.scala       |  3 ++-
 .../org/apache/flink/table/utils/TableTestBase.scala     |  6 ++++--
 11 files changed, 58 insertions(+), 21 deletions(-)

diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
index 0a444eb2..60e7736 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
@@ -424,7 +424,8 @@ public class ExecutionContext<ClusterID> {
 			Executor executor,
 			CatalogManager catalogManager,
 			ModuleManager moduleManager,
-			FunctionCatalog functionCatalog) {
+			FunctionCatalog functionCatalog,
+			ClassLoader userClassLoader) {
 
 		final Map<String, String> plannerProperties = settings.toPlannerProperties();
 		final Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
@@ -438,7 +439,8 @@ public class ExecutionContext<ClusterID> {
 			env,
 			planner,
 			executor,
-			settings.isStreamingMode());
+			settings.isStreamingMode(),
+			userClassLoader);
 	}
 
 	private static Executor lookupExecutor(
@@ -599,7 +601,8 @@ public class ExecutionContext<ClusterID> {
 					executor,
 					catalogManager,
 					moduleManager,
-					functionCatalog);
+					functionCatalog,
+					classLoader);
 		} else if (environment.getExecution().isBatchPlanner()) {
 			streamExecEnv = null;
 			execEnv = ExecutionEnvironment.getExecutionEnvironment();
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java
index 15877ad..05c2f20 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java
@@ -89,8 +89,17 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl imple
 			StreamExecutionEnvironment executionEnvironment,
 			Planner planner,
 			Executor executor,
-			boolean isStreamingMode) {
-		super(catalogManager, moduleManager, tableConfig, executor, functionCatalog, planner, isStreamingMode);
+			boolean isStreamingMode,
+			ClassLoader userClassLoader) {
+		super(
+			catalogManager,
+			moduleManager,
+			tableConfig,
+			executor,
+			functionCatalog,
+			planner,
+			isStreamingMode,
+			userClassLoader);
 		this.executionEnvironment = executionEnvironment;
 	}
 
@@ -137,7 +146,8 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl imple
 			executionEnvironment,
 			planner,
 			executor,
-			settings.isStreamingMode()
+			settings.isStreamingMode(),
+			classLoader
 		);
 	}
 
diff --git a/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImplTest.java b/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImplTest.java
index 954fc45..7fcbf80 100644
--- a/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImplTest.java
+++ b/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImplTest.java
@@ -101,7 +101,8 @@ public class StreamTableEnvironmentImplTest {
 			env,
 			new TestPlanner(elements.getTransformation()),
 			new ExecutorMock(),
-			true
+			true,
+			this.getClass().getClassLoader()
 		);
 	}
 
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 08409b9..99306a0 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -165,6 +165,7 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
 	protected final Planner planner;
 	protected final Parser parser;
 	private final boolean isStreamingMode;
+	private final ClassLoader userClassLoader;
 	private static final String UNSUPPORTED_QUERY_IN_SQL_UPDATE_MSG =
 			"Unsupported SQL query! sqlUpdate() only accepts a single SQL statement of type " +
 			"INSERT, CREATE TABLE, DROP TABLE, ALTER TABLE, USE CATALOG, USE [CATALOG.]DATABASE, " +
@@ -198,7 +199,8 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
 			Executor executor,
 			FunctionCatalog functionCatalog,
 			Planner planner,
-			boolean isStreamingMode) {
+			boolean isStreamingMode,
+			ClassLoader userClassLoader) {
 		this.catalogManager = catalogManager;
 		this.catalogManager.setCatalogTableSchemaResolver(
 				new CatalogTableSchemaResolver(planner.getParser(), isStreamingMode));
@@ -211,6 +213,7 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
 		this.planner = planner;
 		this.parser = planner.getParser();
 		this.isStreamingMode = isStreamingMode;
+		this.userClassLoader = userClassLoader;
 		this.operationTreeBuilder = OperationTreeBuilder.create(
 			tableConfig,
 			functionCatalog.asLookup(parser::parseIdentifier),
@@ -273,7 +276,8 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
 			executor,
 			functionCatalog,
 			planner,
-			settings.isStreamingMode()
+			settings.isStreamingMode(),
+			classLoader
 		);
 	}
 
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableEnvironmentMock.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableEnvironmentMock.java
index dcbec07..5d0f021 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableEnvironmentMock.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableEnvironmentMock.java
@@ -46,7 +46,15 @@ public class TableEnvironmentMock extends TableEnvironmentImpl {
 			FunctionCatalog functionCatalog,
 			PlannerMock planner,
 			boolean isStreamingMode) {
-		super(catalogManager, moduleManager, tableConfig, executor, functionCatalog, planner, isStreamingMode);
+		super(
+			catalogManager,
+			moduleManager,
+			tableConfig,
+			executor,
+			functionCatalog,
+			planner,
+			isStreamingMode,
+			TableEnvironmentMock.class.getClassLoader());
 
 		this.catalogManager = catalogManager;
 		this.executor = executor;
diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala
index 7c99e37..317cb6a 100644
--- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala
+++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala
@@ -58,7 +58,8 @@ class StreamTableEnvironmentImpl (
     scalaExecutionEnvironment: StreamExecutionEnvironment,
     planner: Planner,
     executor: Executor,
-    isStreaming: Boolean)
+    isStreaming: Boolean,
+    userClassLoader: ClassLoader)
   extends TableEnvironmentImpl(
     catalogManager,
     moduleManager,
@@ -66,7 +67,8 @@ class StreamTableEnvironmentImpl (
     executor,
     functionCatalog,
     planner,
-    isStreaming)
+    isStreaming,
+    userClassLoader)
   with org.apache.flink.table.api.bridge.scala.StreamTableEnvironment {
 
   override def fromDataStream[T](dataStream: DataStream[T]): Table = {
@@ -299,7 +301,8 @@ object StreamTableEnvironmentImpl {
       executionEnvironment,
       planner,
       executor,
-      settings.isStreamingMode
+      settings.isStreamingMode,
+      classLoader
     )
   }
 
diff --git a/flink-table/flink-table-api-scala-bridge/src/test/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImplTest.scala b/flink-table/flink-table-api-scala-bridge/src/test/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImplTest.scala
index 0f84d9b..7f67ebb 100644
--- a/flink-table/flink-table-api-scala-bridge/src/test/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImplTest.scala
+++ b/flink-table/flink-table-api-scala-bridge/src/test/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImplTest.scala
@@ -91,7 +91,8 @@ class StreamTableEnvironmentImplTest {
       env,
       new TestPlanner(elements.javaStream.getTransformation),
       new ExecutorMock,
-      true)
+      true,
+      this.getClass.getClassLoader)
   }
 
   private class TestPlanner(transformation: Transformation[_]) extends PlannerMock {
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
index 05b54c7..c942911 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
@@ -959,7 +959,8 @@ class TestingTableEnvironment private(
     executor: Executor,
     functionCatalog: FunctionCatalog,
     planner: PlannerBase,
-    isStreamingMode: Boolean)
+    isStreamingMode: Boolean,
+    userClassLoader: ClassLoader)
   extends TableEnvironmentImpl(
     catalogManager,
     moduleManager,
@@ -967,7 +968,8 @@ class TestingTableEnvironment private(
     executor,
     functionCatalog,
     planner,
-    isStreamingMode) {
+    isStreamingMode,
+    userClassLoader) {
 
   // just for testing, remove this method while
   // `<T, ACC> void registerFunction(String name, AggregateFunction<T, ACC> aggregateFunction);`
@@ -1118,7 +1120,8 @@ object TestingTableEnvironment {
       executor,
       functionCatalog,
       planner,
-      settings.isStreamingMode)
+      settings.isStreamingMode,
+      classLoader)
   }
 }
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
index 5ef1158..406e6b4 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
@@ -425,7 +425,8 @@ class StreamTableEnvironmentTest extends TableTestBase {
       jStreamExecEnv,
       streamPlanner,
       executor,
-      true)
+      true,
+      Thread.currentThread().getContextClassLoader)
 
     val sType = new TupleTypeInfo(Types.LONG, Types.INT, Types.STRING, Types.INT, Types.LONG)
       .asInstanceOf[TupleTypeInfo[JTuple5[JLong, JInt, String, JInt, JLong]]]
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/AggregateTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/AggregateTest.scala
index f7da2d6..835a26f 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/AggregateTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/AggregateTest.scala
@@ -79,7 +79,8 @@ class AggregateTest extends TableTestBase {
       Mockito.mock(classOf[StreamExecutionEnvironment]),
       new PlannerMock,
       Mockito.mock(classOf[Executor]),
-      true
+      true,
+      Thread.currentThread().getContextClassLoader
     )
 
     tablEnv.registerFunction("udag", new MyAgg)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
index 44e0bb9..562a240 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
@@ -347,7 +347,8 @@ case class StreamTableTestUtil(
     javaEnv,
     streamPlanner,
     executor,
-    true)
+    true,
+    Thread.currentThread().getContextClassLoader)
 
   val env = new StreamExecutionEnvironment(javaEnv)
   val tableEnv = new ScalaStreamTableEnvironmentImpl(
@@ -358,7 +359,8 @@ case class StreamTableTestUtil(
     env,
     streamPlanner,
     executor,
-    true)
+    true,
+    Thread.currentThread().getContextClassLoader)
 
   def addTable[T: TypeInformation](
       name: String,