You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2020/07/10 07:09:30 UTC

[flink] branch master updated (f2b6608 -> 69ef4b7)

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from f2b6608  [FLINK-18434] Fix getter methods of AbstractJdbcCatalog
     new 8158421  [hotfix] Extend ClassLoaderUtils with a way to add resources
     new abf3091  [FLINK-18419] Make user ClassLoader available in TableEnvironment
     new 69ef4b7  [FLINK-18419] Create catalog in TableEnvironment using user ClassLoader

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/flink/testutils/ClassLoaderUtils.java   | 17 +++++++
 .../client/gateway/local/ExecutionContext.java     |  9 ++--
 .../java/internal/StreamTableEnvironmentImpl.java  | 16 +++++--
 .../internal/StreamTableEnvironmentImplTest.java   |  3 +-
 .../table/api/internal/TableEnvironmentImpl.java   | 38 ++++++++++-----
 .../operations/ddl/CreateCatalogOperation.java     | 12 ++---
 .../flink/table/utils/TableEnvironmentMock.java    | 10 +++-
 .../internal/StreamTableEnvironmentImpl.scala      |  9 ++--
 .../internal/StreamTableEnvironmentImplTest.scala  |  3 +-
 flink-table/flink-table-planner-blink/pom.xml      |  8 ++++
 .../operations/SqlToOperationConverter.java        |  8 +---
 .../flink/table/planner/catalog/CatalogITCase.java | 54 ++++++++++++++++++++++
 .../flink/table/planner/utils/TableTestBase.scala  |  9 ++--
 .../api/stream/StreamTableEnvironmentTest.scala    |  3 +-
 .../flink/table/api/stream/sql/AggregateTest.scala |  3 +-
 .../apache/flink/table/utils/TableTestBase.scala   |  6 ++-
 16 files changed, 165 insertions(+), 43 deletions(-)


[flink] 03/03: [FLINK-18419] Create catalog in TableEnvironment using user ClassLoader

Posted by dw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 69ef4b7cffb446cca655f7c426a49d9fbb3a8cc3
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Fri Jul 3 16:31:23 2020 +0200

    [FLINK-18419] Create catalog in TableEnvironment using user ClassLoader
---
 .../table/api/internal/TableEnvironmentImpl.java   | 30 ++++++++----
 .../operations/ddl/CreateCatalogOperation.java     | 12 ++---
 flink-table/flink-table-planner-blink/pom.xml      |  8 ++++
 .../operations/SqlToOperationConverter.java        |  8 +---
 .../flink/table/planner/catalog/CatalogITCase.java | 54 ++++++++++++++++++++++
 5 files changed, 90 insertions(+), 22 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 954edb5..e853d28 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -71,7 +71,9 @@ import org.apache.flink.table.descriptors.ConnectorDescriptor;
 import org.apache.flink.table.descriptors.StreamTableDescriptor;
 import org.apache.flink.table.expressions.ApiExpressionUtils;
 import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.factories.CatalogFactory;
 import org.apache.flink.table.factories.ComponentFactoryService;
+import org.apache.flink.table.factories.TableFactoryService;
 import org.apache.flink.table.functions.ScalarFunction;
 import org.apache.flink.table.functions.UserDefinedFunction;
 import org.apache.flink.table.functions.UserDefinedFunctionHelper;
@@ -983,15 +985,7 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
 		} else if (operation instanceof AlterCatalogFunctionOperation) {
 			return alterCatalogFunction((AlterCatalogFunctionOperation) operation);
 		} else if (operation instanceof CreateCatalogOperation) {
-			CreateCatalogOperation createCatalogOperation = (CreateCatalogOperation) operation;
-			String exMsg = getDDLOpExecuteErrorMsg(createCatalogOperation.asSummaryString());
-			try {
-				catalogManager.registerCatalog(
-						createCatalogOperation.getCatalogName(), createCatalogOperation.getCatalog());
-				return TableResultImpl.TABLE_RESULT_OK;
-			} catch (CatalogException e) {
-				throw new ValidationException(exMsg, e);
-			}
+			return createCatalog((CreateCatalogOperation) operation);
 		} else if (operation instanceof DropCatalogOperation) {
 			DropCatalogOperation dropCatalogOperation = (DropCatalogOperation) operation;
 			String exMsg = getDDLOpExecuteErrorMsg(dropCatalogOperation.asSummaryString());
@@ -1047,6 +1041,24 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
 		}
 	}
 
+	private TableResult createCatalog(CreateCatalogOperation operation) {
+		String exMsg = getDDLOpExecuteErrorMsg(operation.asSummaryString());
+		try {
+			String catalogName = operation.getCatalogName();
+			Map<String, String> properties = operation.getProperties();
+			final CatalogFactory factory = TableFactoryService.find(
+				CatalogFactory.class,
+				properties,
+				userClassLoader);
+
+			Catalog catalog = factory.createCatalog(catalogName, properties);
+			catalogManager.registerCatalog(catalogName, catalog);
+			return TableResultImpl.TABLE_RESULT_OK;
+		} catch (CatalogException e) {
+			throw new ValidationException(exMsg, e);
+		}
+	}
+
 	private TableResult buildShowResult(String columnName, String[] objects) {
 		return buildResult(
 			new String[]{columnName},
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateCatalogOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateCatalogOperation.java
index 1d04a20..c81ee1d 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateCatalogOperation.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateCatalogOperation.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.table.operations.ddl;
 
-import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.OperationUtils;
 
@@ -33,25 +32,26 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class CreateCatalogOperation implements CreateOperation {
 	private final String catalogName;
-	private final Catalog catalog;
+	private final Map<String, String> properties;
 
-	public CreateCatalogOperation(String catalogName, Catalog catalog) {
+	public CreateCatalogOperation(String catalogName, Map<String, String> properties) {
 		this.catalogName = checkNotNull(catalogName);
-		this.catalog = checkNotNull(catalog);
+		this.properties = checkNotNull(properties);
 	}
 
 	public String getCatalogName() {
 		return catalogName;
 	}
 
-	public Catalog getCatalog() {
-		return catalog;
+	public Map<String, String> getProperties() {
+		return Collections.unmodifiableMap(properties);
 	}
 
 	@Override
 	public String asSummaryString() {
 		Map<String, Object> params = new LinkedHashMap<>();
 		params.put("catalogName", catalogName);
+		params.put("properties", properties);
 
 		return OperationUtils.formatWithChildren(
 			"CREATE CATALOG",
diff --git a/flink-table/flink-table-planner-blink/pom.xml b/flink-table/flink-table-planner-blink/pom.xml
index 506dfd2..b6977af 100644
--- a/flink-table/flink-table-planner-blink/pom.xml
+++ b/flink-table/flink-table-planner-blink/pom.xml
@@ -223,6 +223,14 @@ under the License.
 			<scope>test</scope>
 		</dependency>
 
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
 		<!-- SuccessException used in TestValuesTableFactory -->
 		<dependency>
 			<groupId>org.apache.flink</groupId>
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
index 386cb9a..be54497 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
@@ -74,8 +74,6 @@ import org.apache.flink.table.catalog.FunctionLanguage;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.UnresolvedIdentifier;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
-import org.apache.flink.table.factories.CatalogFactory;
-import org.apache.flink.table.factories.TableFactoryService;
 import org.apache.flink.table.operations.CatalogSinkModifyOperation;
 import org.apache.flink.table.operations.DescribeTableOperation;
 import org.apache.flink.table.operations.ExplainOperation;
@@ -553,11 +551,7 @@ public class SqlToOperationConverter {
 		sqlCreateCatalog.getPropertyList().getList().forEach(p ->
 			properties.put(((SqlTableOption) p).getKeyString(), ((SqlTableOption) p).getValueString()));
 
-		final CatalogFactory factory =
-			TableFactoryService.find(CatalogFactory.class, properties, this.getClass().getClassLoader());
-
-		Catalog catalog = factory.createCatalog(catalogName, properties);
-		return new CreateCatalogOperation(catalogName, catalog);
+		return new CreateCatalogOperation(catalogName, properties);
 	}
 
 	/** Convert DROP CATALOG statement. */
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/catalog/CatalogITCase.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/catalog/CatalogITCase.java
index 4e8e95b..8a46c1e 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/catalog/CatalogITCase.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/catalog/CatalogITCase.java
@@ -23,8 +23,14 @@ import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.testutils.ClassLoaderUtils;
+import org.apache.flink.util.TemporaryClassLoaderContext;
 
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.net.URLClassLoader;
 
 import static org.apache.flink.table.descriptors.GenericInMemoryCatalogValidator.CATALOG_TYPE_VALUE_GENERIC_IN_MEMORY;
 import static org.junit.Assert.assertFalse;
@@ -35,6 +41,9 @@ import static org.junit.Assert.assertTrue;
  */
 public class CatalogITCase {
 
+	@Rule
+	public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
 	@Test
 	public void testCreateCatalog() {
 		String name = "c1";
@@ -61,9 +70,54 @@ public class CatalogITCase {
 		assertFalse(tableEnv.getCatalog(name).isPresent());
 	}
 
+	@Test
+	public void testCreateCatalogFromUserClassLoader() throws Exception {
+		final String className = "UserCatalogFactory";
+		URLClassLoader classLoader = ClassLoaderUtils.withRoot(temporaryFolder.newFolder())
+			.addResource("META-INF/services/org.apache.flink.table.factories.TableFactory", "UserCatalogFactory")
+			.addClass(
+				className,
+				"import org.apache.flink.table.catalog.GenericInMemoryCatalog;\n" +
+					"import org.apache.flink.table.factories.CatalogFactory;\n" +
+					"import java.util.Collections;\n" +
+					"import org.apache.flink.table.catalog.Catalog;\n" +
+					"import java.util.HashMap;\n" +
+					"import java.util.List;\n" +
+					"import java.util.Map;\n" +
+					"\tpublic class UserCatalogFactory implements CatalogFactory {\n" +
+					"\t\t@Override\n" +
+					"\t\tpublic Catalog createCatalog(\n" +
+					"\t\t\t\tString name,\n" +
+					"\t\t\t\tMap<String, String> properties) {\n" +
+					"\t\t\treturn new GenericInMemoryCatalog(name);\n" +
+					"\t\t}\n" +
+					"\n" +
+					"\t\t@Override\n" +
+					"\t\tpublic Map<String, String> requiredContext() {\n" +
+					"\t\t\tHashMap<String, String> hashMap = new HashMap<>();\n" +
+					"\t\t\thashMap.put(\"type\", \"userCatalog\");\n" +
+					"\t\t\treturn hashMap;\n" +
+					"\t\t}\n" +
+					"\n" +
+					"\t\t@Override\n" +
+					"\t\tpublic List<String> supportedProperties() {\n" +
+					"\t\t\treturn Collections.emptyList();\n" +
+					"\t\t}\n" +
+					"\t}"
+			).build();
+
+		try (TemporaryClassLoaderContext context = TemporaryClassLoaderContext.of(classLoader)) {
+			TableEnvironment tableEnvironment = getTableEnvironment();
+			tableEnvironment.executeSql("CREATE CATALOG cat WITH ('type'='userCatalog')");
+
+			assertTrue(tableEnvironment.getCatalog("cat").isPresent());
+		}
+	}
+
 	private TableEnvironment getTableEnvironment() {
 		EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		return StreamTableEnvironment.create(env, settings);
 	}
+
 }


[flink] 02/03: [FLINK-18419] Make user ClassLoader available in TableEnvironment

Posted by dw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit abf3091e9da92660490e600b7605f3e2d2d53ad4
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Fri Jul 3 16:30:34 2020 +0200

    [FLINK-18419] Make user ClassLoader available in TableEnvironment
---
 .../table/client/gateway/local/ExecutionContext.java     |  9 ++++++---
 .../bridge/java/internal/StreamTableEnvironmentImpl.java | 16 +++++++++++++---
 .../java/internal/StreamTableEnvironmentImplTest.java    |  3 ++-
 .../flink/table/api/internal/TableEnvironmentImpl.java   |  8 ++++++--
 .../apache/flink/table/utils/TableEnvironmentMock.java   | 10 +++++++++-
 .../scala/internal/StreamTableEnvironmentImpl.scala      |  9 ++++++---
 .../scala/internal/StreamTableEnvironmentImplTest.scala  |  3 ++-
 .../apache/flink/table/planner/utils/TableTestBase.scala |  9 ++++++---
 .../table/api/stream/StreamTableEnvironmentTest.scala    |  3 ++-
 .../flink/table/api/stream/sql/AggregateTest.scala       |  3 ++-
 .../org/apache/flink/table/utils/TableTestBase.scala     |  6 ++++--
 11 files changed, 58 insertions(+), 21 deletions(-)

diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
index 0a444eb2..60e7736 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
@@ -424,7 +424,8 @@ public class ExecutionContext<ClusterID> {
 			Executor executor,
 			CatalogManager catalogManager,
 			ModuleManager moduleManager,
-			FunctionCatalog functionCatalog) {
+			FunctionCatalog functionCatalog,
+			ClassLoader userClassLoader) {
 
 		final Map<String, String> plannerProperties = settings.toPlannerProperties();
 		final Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
@@ -438,7 +439,8 @@ public class ExecutionContext<ClusterID> {
 			env,
 			planner,
 			executor,
-			settings.isStreamingMode());
+			settings.isStreamingMode(),
+			userClassLoader);
 	}
 
 	private static Executor lookupExecutor(
@@ -599,7 +601,8 @@ public class ExecutionContext<ClusterID> {
 					executor,
 					catalogManager,
 					moduleManager,
-					functionCatalog);
+					functionCatalog,
+					classLoader);
 		} else if (environment.getExecution().isBatchPlanner()) {
 			streamExecEnv = null;
 			execEnv = ExecutionEnvironment.getExecutionEnvironment();
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java
index 15877ad..05c2f20 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java
@@ -89,8 +89,17 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl imple
 			StreamExecutionEnvironment executionEnvironment,
 			Planner planner,
 			Executor executor,
-			boolean isStreamingMode) {
-		super(catalogManager, moduleManager, tableConfig, executor, functionCatalog, planner, isStreamingMode);
+			boolean isStreamingMode,
+			ClassLoader userClassLoader) {
+		super(
+			catalogManager,
+			moduleManager,
+			tableConfig,
+			executor,
+			functionCatalog,
+			planner,
+			isStreamingMode,
+			userClassLoader);
 		this.executionEnvironment = executionEnvironment;
 	}
 
@@ -137,7 +146,8 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl imple
 			executionEnvironment,
 			planner,
 			executor,
-			settings.isStreamingMode()
+			settings.isStreamingMode(),
+			classLoader
 		);
 	}
 
diff --git a/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImplTest.java b/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImplTest.java
index 954fc45..7fcbf80 100644
--- a/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImplTest.java
+++ b/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImplTest.java
@@ -101,7 +101,8 @@ public class StreamTableEnvironmentImplTest {
 			env,
 			new TestPlanner(elements.getTransformation()),
 			new ExecutorMock(),
-			true
+			true,
+			this.getClass().getClassLoader()
 		);
 	}
 
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 05c578b..954edb5 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -164,6 +164,7 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
 	protected final Planner planner;
 	protected final Parser parser;
 	private final boolean isStreamingMode;
+	private final ClassLoader userClassLoader;
 	private static final String UNSUPPORTED_QUERY_IN_SQL_UPDATE_MSG =
 			"Unsupported SQL query! sqlUpdate() only accepts a single SQL statement of type " +
 			"INSERT, CREATE TABLE, DROP TABLE, ALTER TABLE, USE CATALOG, USE [CATALOG.]DATABASE, " +
@@ -197,7 +198,8 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
 			Executor executor,
 			FunctionCatalog functionCatalog,
 			Planner planner,
-			boolean isStreamingMode) {
+			boolean isStreamingMode,
+			ClassLoader userClassLoader) {
 		this.catalogManager = catalogManager;
 		this.catalogManager.setCatalogTableSchemaResolver(
 				new CatalogTableSchemaResolver(planner.getParser(), isStreamingMode));
@@ -210,6 +212,7 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
 		this.planner = planner;
 		this.parser = planner.getParser();
 		this.isStreamingMode = isStreamingMode;
+		this.userClassLoader = userClassLoader;
 		this.operationTreeBuilder = OperationTreeBuilder.create(
 			tableConfig,
 			functionCatalog.asLookup(parser::parseIdentifier),
@@ -272,7 +275,8 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
 			executor,
 			functionCatalog,
 			planner,
-			settings.isStreamingMode()
+			settings.isStreamingMode(),
+			classLoader
 		);
 	}
 
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableEnvironmentMock.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableEnvironmentMock.java
index dcbec07..5d0f021 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableEnvironmentMock.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableEnvironmentMock.java
@@ -46,7 +46,15 @@ public class TableEnvironmentMock extends TableEnvironmentImpl {
 			FunctionCatalog functionCatalog,
 			PlannerMock planner,
 			boolean isStreamingMode) {
-		super(catalogManager, moduleManager, tableConfig, executor, functionCatalog, planner, isStreamingMode);
+		super(
+			catalogManager,
+			moduleManager,
+			tableConfig,
+			executor,
+			functionCatalog,
+			planner,
+			isStreamingMode,
+			TableEnvironmentMock.class.getClassLoader());
 
 		this.catalogManager = catalogManager;
 		this.executor = executor;
diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala
index 7c99e37..317cb6a 100644
--- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala
+++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala
@@ -58,7 +58,8 @@ class StreamTableEnvironmentImpl (
     scalaExecutionEnvironment: StreamExecutionEnvironment,
     planner: Planner,
     executor: Executor,
-    isStreaming: Boolean)
+    isStreaming: Boolean,
+    userClassLoader: ClassLoader)
   extends TableEnvironmentImpl(
     catalogManager,
     moduleManager,
@@ -66,7 +67,8 @@ class StreamTableEnvironmentImpl (
     executor,
     functionCatalog,
     planner,
-    isStreaming)
+    isStreaming,
+    userClassLoader)
   with org.apache.flink.table.api.bridge.scala.StreamTableEnvironment {
 
   override def fromDataStream[T](dataStream: DataStream[T]): Table = {
@@ -299,7 +301,8 @@ object StreamTableEnvironmentImpl {
       executionEnvironment,
       planner,
       executor,
-      settings.isStreamingMode
+      settings.isStreamingMode,
+      classLoader
     )
   }
 
diff --git a/flink-table/flink-table-api-scala-bridge/src/test/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImplTest.scala b/flink-table/flink-table-api-scala-bridge/src/test/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImplTest.scala
index 0f84d9b..7f67ebb 100644
--- a/flink-table/flink-table-api-scala-bridge/src/test/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImplTest.scala
+++ b/flink-table/flink-table-api-scala-bridge/src/test/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImplTest.scala
@@ -91,7 +91,8 @@ class StreamTableEnvironmentImplTest {
       env,
       new TestPlanner(elements.javaStream.getTransformation),
       new ExecutorMock,
-      true)
+      true,
+      this.getClass.getClassLoader)
   }
 
   private class TestPlanner(transformation: Transformation[_]) extends PlannerMock {
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
index 05b54c7..c942911 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
@@ -959,7 +959,8 @@ class TestingTableEnvironment private(
     executor: Executor,
     functionCatalog: FunctionCatalog,
     planner: PlannerBase,
-    isStreamingMode: Boolean)
+    isStreamingMode: Boolean,
+    userClassLoader: ClassLoader)
   extends TableEnvironmentImpl(
     catalogManager,
     moduleManager,
@@ -967,7 +968,8 @@ class TestingTableEnvironment private(
     executor,
     functionCatalog,
     planner,
-    isStreamingMode) {
+    isStreamingMode,
+    userClassLoader) {
 
   // just for testing, remove this method while
   // `<T, ACC> void registerFunction(String name, AggregateFunction<T, ACC> aggregateFunction);`
@@ -1118,7 +1120,8 @@ object TestingTableEnvironment {
       executor,
       functionCatalog,
       planner,
-      settings.isStreamingMode)
+      settings.isStreamingMode,
+      classLoader)
   }
 }
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
index 5ef1158..406e6b4 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
@@ -425,7 +425,8 @@ class StreamTableEnvironmentTest extends TableTestBase {
       jStreamExecEnv,
       streamPlanner,
       executor,
-      true)
+      true,
+      Thread.currentThread().getContextClassLoader)
 
     val sType = new TupleTypeInfo(Types.LONG, Types.INT, Types.STRING, Types.INT, Types.LONG)
       .asInstanceOf[TupleTypeInfo[JTuple5[JLong, JInt, String, JInt, JLong]]]
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/AggregateTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/AggregateTest.scala
index f7da2d6..835a26f 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/AggregateTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/AggregateTest.scala
@@ -79,7 +79,8 @@ class AggregateTest extends TableTestBase {
       Mockito.mock(classOf[StreamExecutionEnvironment]),
       new PlannerMock,
       Mockito.mock(classOf[Executor]),
-      true
+      true,
+      Thread.currentThread().getContextClassLoader
     )
 
     tablEnv.registerFunction("udag", new MyAgg)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
index 44e0bb9..562a240 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
@@ -347,7 +347,8 @@ case class StreamTableTestUtil(
     javaEnv,
     streamPlanner,
     executor,
-    true)
+    true,
+    Thread.currentThread().getContextClassLoader)
 
   val env = new StreamExecutionEnvironment(javaEnv)
   val tableEnv = new ScalaStreamTableEnvironmentImpl(
@@ -358,7 +359,8 @@ case class StreamTableTestUtil(
     env,
     streamPlanner,
     executor,
-    true)
+    true,
+    Thread.currentThread().getContextClassLoader)
 
   def addTable[T: TypeInformation](
       name: String,


[flink] 01/03: [hotfix] Extend ClassLoaderUtils with a way to add resources

Posted by dw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8158421c988247220b25af2b06b3488d90d3eb3f
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Fri Jul 3 16:16:49 2020 +0200

    [hotfix] Extend ClassLoaderUtils with a way to add resources
---
 .../org/apache/flink/testutils/ClassLoaderUtils.java    | 17 +++++++++++++++++
 1 file changed, 17 insertions(+)

diff --git a/flink-core/src/test/java/org/apache/flink/testutils/ClassLoaderUtils.java b/flink-core/src/test/java/org/apache/flink/testutils/ClassLoaderUtils.java
index 1b9add5..e223b95 100644
--- a/flink-core/src/test/java/org/apache/flink/testutils/ClassLoaderUtils.java
+++ b/flink-core/src/test/java/org/apache/flink/testutils/ClassLoaderUtils.java
@@ -64,6 +64,7 @@ public class ClassLoaderUtils {
 
 	private static File writeSourceFile(File root, String filename, String source) throws IOException {
 		File file = new File(root, filename);
+		file.getParentFile().mkdirs();
 		FileWriter fileWriter = new FileWriter(file);
 
 		fileWriter.write(source);
@@ -102,10 +103,22 @@ public class ClassLoaderUtils {
 
 		private final File root;
 		private final Map<String, String> classes;
+		private final Map<String, String> resources;
 
 		private ClassLoaderBuilder(File root) {
 			this.root = root;
 			this.classes = new HashMap<>();
+			this.resources = new HashMap<>();
+		}
+
+		public ClassLoaderBuilder addResource(String targetPath, String resource){
+			String oldValue = resources.putIfAbsent(targetPath, resource);
+
+			if (oldValue != null) {
+				throw new RuntimeException(String.format("Resource with path %s already registered.", resource));
+			}
+
+			return this;
 		}
 
 		public ClassLoaderBuilder addClass(String className, String source) {
@@ -123,6 +136,10 @@ public class ClassLoaderUtils {
 				writeAndCompile(root, createFileName(classInfo.getKey()), classInfo.getValue());
 			}
 
+			for (Map.Entry<String, String> resource : resources.entrySet()) {
+				writeSourceFile(root, resource.getKey(), resource.getValue());
+			}
+
 			return createClassLoader(root);
 		}