You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2020/07/10 07:30:13 UTC
[flink] 03/04: [FLINK-18419] Make user ClassLoader available in TableEnvironment
This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 060af872130179697e073051f1a7c9563ef92851
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Fri Jul 3 16:30:34 2020 +0200
[FLINK-18419] Make user ClassLoader available in TableEnvironment
---
.../table/client/gateway/local/ExecutionContext.java | 9 ++++++---
.../bridge/java/internal/StreamTableEnvironmentImpl.java | 16 +++++++++++++---
.../java/internal/StreamTableEnvironmentImplTest.java | 3 ++-
.../flink/table/api/internal/TableEnvironmentImpl.java | 8 ++++++--
.../apache/flink/table/utils/TableEnvironmentMock.java | 10 +++++++++-
.../scala/internal/StreamTableEnvironmentImpl.scala | 9 ++++++---
.../scala/internal/StreamTableEnvironmentImplTest.scala | 3 ++-
.../apache/flink/table/planner/utils/TableTestBase.scala | 9 ++++++---
.../table/api/stream/StreamTableEnvironmentTest.scala | 3 ++-
.../flink/table/api/stream/sql/AggregateTest.scala | 3 ++-
.../org/apache/flink/table/utils/TableTestBase.scala | 6 ++++--
11 files changed, 58 insertions(+), 21 deletions(-)
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
index 0a444eb2..60e7736 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
@@ -424,7 +424,8 @@ public class ExecutionContext<ClusterID> {
Executor executor,
CatalogManager catalogManager,
ModuleManager moduleManager,
- FunctionCatalog functionCatalog) {
+ FunctionCatalog functionCatalog,
+ ClassLoader userClassLoader) {
final Map<String, String> plannerProperties = settings.toPlannerProperties();
final Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
@@ -438,7 +439,8 @@ public class ExecutionContext<ClusterID> {
env,
planner,
executor,
- settings.isStreamingMode());
+ settings.isStreamingMode(),
+ userClassLoader);
}
private static Executor lookupExecutor(
@@ -599,7 +601,8 @@ public class ExecutionContext<ClusterID> {
executor,
catalogManager,
moduleManager,
- functionCatalog);
+ functionCatalog,
+ classLoader);
} else if (environment.getExecution().isBatchPlanner()) {
streamExecEnv = null;
execEnv = ExecutionEnvironment.getExecutionEnvironment();
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java
index 15877ad..05c2f20 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java
@@ -89,8 +89,17 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl imple
StreamExecutionEnvironment executionEnvironment,
Planner planner,
Executor executor,
- boolean isStreamingMode) {
- super(catalogManager, moduleManager, tableConfig, executor, functionCatalog, planner, isStreamingMode);
+ boolean isStreamingMode,
+ ClassLoader userClassLoader) {
+ super(
+ catalogManager,
+ moduleManager,
+ tableConfig,
+ executor,
+ functionCatalog,
+ planner,
+ isStreamingMode,
+ userClassLoader);
this.executionEnvironment = executionEnvironment;
}
@@ -137,7 +146,8 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl imple
executionEnvironment,
planner,
executor,
- settings.isStreamingMode()
+ settings.isStreamingMode(),
+ classLoader
);
}
diff --git a/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImplTest.java b/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImplTest.java
index 954fc45..7fcbf80 100644
--- a/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImplTest.java
+++ b/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImplTest.java
@@ -101,7 +101,8 @@ public class StreamTableEnvironmentImplTest {
env,
new TestPlanner(elements.getTransformation()),
new ExecutorMock(),
- true
+ true,
+ this.getClass().getClassLoader()
);
}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 08409b9..99306a0 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -165,6 +165,7 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
protected final Planner planner;
protected final Parser parser;
private final boolean isStreamingMode;
+ private final ClassLoader userClassLoader;
private static final String UNSUPPORTED_QUERY_IN_SQL_UPDATE_MSG =
"Unsupported SQL query! sqlUpdate() only accepts a single SQL statement of type " +
"INSERT, CREATE TABLE, DROP TABLE, ALTER TABLE, USE CATALOG, USE [CATALOG.]DATABASE, " +
@@ -198,7 +199,8 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
Executor executor,
FunctionCatalog functionCatalog,
Planner planner,
- boolean isStreamingMode) {
+ boolean isStreamingMode,
+ ClassLoader userClassLoader) {
this.catalogManager = catalogManager;
this.catalogManager.setCatalogTableSchemaResolver(
new CatalogTableSchemaResolver(planner.getParser(), isStreamingMode));
@@ -211,6 +213,7 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
this.planner = planner;
this.parser = planner.getParser();
this.isStreamingMode = isStreamingMode;
+ this.userClassLoader = userClassLoader;
this.operationTreeBuilder = OperationTreeBuilder.create(
tableConfig,
functionCatalog.asLookup(parser::parseIdentifier),
@@ -273,7 +276,8 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
executor,
functionCatalog,
planner,
- settings.isStreamingMode()
+ settings.isStreamingMode(),
+ classLoader
);
}
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableEnvironmentMock.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableEnvironmentMock.java
index dcbec07..5d0f021 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableEnvironmentMock.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableEnvironmentMock.java
@@ -46,7 +46,15 @@ public class TableEnvironmentMock extends TableEnvironmentImpl {
FunctionCatalog functionCatalog,
PlannerMock planner,
boolean isStreamingMode) {
- super(catalogManager, moduleManager, tableConfig, executor, functionCatalog, planner, isStreamingMode);
+ super(
+ catalogManager,
+ moduleManager,
+ tableConfig,
+ executor,
+ functionCatalog,
+ planner,
+ isStreamingMode,
+ TableEnvironmentMock.class.getClassLoader());
this.catalogManager = catalogManager;
this.executor = executor;
diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala
index 7c99e37..317cb6a 100644
--- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala
+++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala
@@ -58,7 +58,8 @@ class StreamTableEnvironmentImpl (
scalaExecutionEnvironment: StreamExecutionEnvironment,
planner: Planner,
executor: Executor,
- isStreaming: Boolean)
+ isStreaming: Boolean,
+ userClassLoader: ClassLoader)
extends TableEnvironmentImpl(
catalogManager,
moduleManager,
@@ -66,7 +67,8 @@ class StreamTableEnvironmentImpl (
executor,
functionCatalog,
planner,
- isStreaming)
+ isStreaming,
+ userClassLoader)
with org.apache.flink.table.api.bridge.scala.StreamTableEnvironment {
override def fromDataStream[T](dataStream: DataStream[T]): Table = {
@@ -299,7 +301,8 @@ object StreamTableEnvironmentImpl {
executionEnvironment,
planner,
executor,
- settings.isStreamingMode
+ settings.isStreamingMode,
+ classLoader
)
}
diff --git a/flink-table/flink-table-api-scala-bridge/src/test/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImplTest.scala b/flink-table/flink-table-api-scala-bridge/src/test/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImplTest.scala
index 0f84d9b..7f67ebb 100644
--- a/flink-table/flink-table-api-scala-bridge/src/test/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImplTest.scala
+++ b/flink-table/flink-table-api-scala-bridge/src/test/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImplTest.scala
@@ -91,7 +91,8 @@ class StreamTableEnvironmentImplTest {
env,
new TestPlanner(elements.javaStream.getTransformation),
new ExecutorMock,
- true)
+ true,
+ this.getClass.getClassLoader)
}
private class TestPlanner(transformation: Transformation[_]) extends PlannerMock {
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
index 05b54c7..c942911 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
@@ -959,7 +959,8 @@ class TestingTableEnvironment private(
executor: Executor,
functionCatalog: FunctionCatalog,
planner: PlannerBase,
- isStreamingMode: Boolean)
+ isStreamingMode: Boolean,
+ userClassLoader: ClassLoader)
extends TableEnvironmentImpl(
catalogManager,
moduleManager,
@@ -967,7 +968,8 @@ class TestingTableEnvironment private(
executor,
functionCatalog,
planner,
- isStreamingMode) {
+ isStreamingMode,
+ userClassLoader) {
// just for testing, remove this method while
// `<T, ACC> void registerFunction(String name, AggregateFunction<T, ACC> aggregateFunction);`
@@ -1118,7 +1120,8 @@ object TestingTableEnvironment {
executor,
functionCatalog,
planner,
- settings.isStreamingMode)
+ settings.isStreamingMode,
+ classLoader)
}
}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
index 5ef1158..406e6b4 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
@@ -425,7 +425,8 @@ class StreamTableEnvironmentTest extends TableTestBase {
jStreamExecEnv,
streamPlanner,
executor,
- true)
+ true,
+ Thread.currentThread().getContextClassLoader)
val sType = new TupleTypeInfo(Types.LONG, Types.INT, Types.STRING, Types.INT, Types.LONG)
.asInstanceOf[TupleTypeInfo[JTuple5[JLong, JInt, String, JInt, JLong]]]
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/AggregateTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/AggregateTest.scala
index f7da2d6..835a26f 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/AggregateTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/AggregateTest.scala
@@ -79,7 +79,8 @@ class AggregateTest extends TableTestBase {
Mockito.mock(classOf[StreamExecutionEnvironment]),
new PlannerMock,
Mockito.mock(classOf[Executor]),
- true
+ true,
+ Thread.currentThread().getContextClassLoader
)
tablEnv.registerFunction("udag", new MyAgg)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
index 44e0bb9..562a240 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
@@ -347,7 +347,8 @@ case class StreamTableTestUtil(
javaEnv,
streamPlanner,
executor,
- true)
+ true,
+ Thread.currentThread().getContextClassLoader)
val env = new StreamExecutionEnvironment(javaEnv)
val tableEnv = new ScalaStreamTableEnvironmentImpl(
@@ -358,7 +359,8 @@ case class StreamTableTestUtil(
env,
streamPlanner,
executor,
- true)
+ true,
+ Thread.currentThread().getContextClassLoader)
def addTable[T: TypeInformation](
name: String,