Posted to commits@flink.apache.org by dw...@apache.org on 2020/07/10 07:30:14 UTC

[flink] 04/04: [FLINK-18419] Create catalog in TableEnvironment using user ClassLoader

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1aa92e897e7d71a1063b87ee08496edac6e4b633
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Fri Jul 3 16:31:23 2020 +0200

    [FLINK-18419] Create catalog in TableEnvironment using user ClassLoader
---
 .../table/api/internal/TableEnvironmentImpl.java   | 30 ++++++++----
 .../operations/ddl/CreateCatalogOperation.java     | 12 ++---
 flink-table/flink-table-planner-blink/pom.xml      |  8 ++++
 .../operations/SqlToOperationConverter.java        |  8 +---
 .../flink/table/planner/catalog/CatalogITCase.java | 54 ++++++++++++++++++++++
 5 files changed, 90 insertions(+), 22 deletions(-)

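For context, the user-facing path exercised by this change is the CREATE CATALOG
DDL. Below is a minimal sketch (not taken from the commit; class and catalog
names are hypothetical, 'generic_in_memory' is the built-in in-memory catalog
type also used by CatalogITCase):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class CreateCatalogSketch {
        public static void main(String[] args) {
            EnvironmentSettings settings =
                    EnvironmentSettings.newInstance().inStreamingMode().build();
            TableEnvironment tEnv = TableEnvironment.create(settings);

            // With this commit, the CatalogFactory matching 'type' is resolved
            // against the user ClassLoader in TableEnvironmentImpl#createCatalog,
            // instead of the planner's ClassLoader in SqlToOperationConverter.
            tEnv.executeSql("CREATE CATALOG my_catalog WITH ('type' = 'generic_in_memory')");
        }
    }
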
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 99306a0..d4f43db 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -72,7 +72,9 @@ import org.apache.flink.table.descriptors.ConnectorDescriptor;
 import org.apache.flink.table.descriptors.StreamTableDescriptor;
 import org.apache.flink.table.expressions.ApiExpressionUtils;
 import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.factories.CatalogFactory;
 import org.apache.flink.table.factories.ComponentFactoryService;
+import org.apache.flink.table.factories.TableFactoryService;
 import org.apache.flink.table.functions.ScalarFunction;
 import org.apache.flink.table.functions.UserDefinedFunction;
 import org.apache.flink.table.functions.UserDefinedFunctionHelper;
@@ -1014,15 +1016,7 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
 		} else if (operation instanceof AlterCatalogFunctionOperation) {
 			return alterCatalogFunction((AlterCatalogFunctionOperation) operation);
 		} else if (operation instanceof CreateCatalogOperation) {
-			CreateCatalogOperation createCatalogOperation = (CreateCatalogOperation) operation;
-			String exMsg = getDDLOpExecuteErrorMsg(createCatalogOperation.asSummaryString());
-			try {
-				catalogManager.registerCatalog(
-						createCatalogOperation.getCatalogName(), createCatalogOperation.getCatalog());
-				return TableResultImpl.TABLE_RESULT_OK;
-			} catch (CatalogException e) {
-				throw new ValidationException(exMsg, e);
-			}
+			return createCatalog((CreateCatalogOperation) operation);
 		} else if (operation instanceof DropCatalogOperation) {
 			DropCatalogOperation dropCatalogOperation = (DropCatalogOperation) operation;
 			String exMsg = getDDLOpExecuteErrorMsg(dropCatalogOperation.asSummaryString());
@@ -1078,6 +1072,24 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
 		}
 	}
 
+	private TableResult createCatalog(CreateCatalogOperation operation) {
+		String exMsg = getDDLOpExecuteErrorMsg(operation.asSummaryString());
+		try {
+			String catalogName = operation.getCatalogName();
+			Map<String, String> properties = operation.getProperties();
+			final CatalogFactory factory = TableFactoryService.find(
+				CatalogFactory.class,
+				properties,
+				userClassLoader);
+
+			Catalog catalog = factory.createCatalog(catalogName, properties);
+			catalogManager.registerCatalog(catalogName, catalog);
+			return TableResultImpl.TABLE_RESULT_OK;
+		} catch (CatalogException e) {
+			throw new ValidationException(exMsg, e);
+		}
+	}
+
 	private TableResult buildShowResult(String columnName, String[] objects) {
 		return buildResult(
 			new String[]{columnName},
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateCatalogOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateCatalogOperation.java
index 1d04a20..c81ee1d 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateCatalogOperation.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateCatalogOperation.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.table.operations.ddl;
 
-import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.OperationUtils;
 
@@ -33,25 +32,26 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class CreateCatalogOperation implements CreateOperation {
 	private final String catalogName;
-	private final Catalog catalog;
+	private final Map<String, String> properties;
 
-	public CreateCatalogOperation(String catalogName, Catalog catalog) {
+	public CreateCatalogOperation(String catalogName, Map<String, String> properties) {
 		this.catalogName = checkNotNull(catalogName);
-		this.catalog = checkNotNull(catalog);
+		this.properties = checkNotNull(properties);
 	}
 
 	public String getCatalogName() {
 		return catalogName;
 	}
 
-	public Catalog getCatalog() {
-		return catalog;
+	public Map<String, String> getProperties() {
+		return Collections.unmodifiableMap(properties);
 	}
 
 	@Override
 	public String asSummaryString() {
 		Map<String, Object> params = new LinkedHashMap<>();
 		params.put("catalogName", catalogName);
+		params.put("properties", properties);
 
 		return OperationUtils.formatWithChildren(
 			"CREATE CATALOG",
diff --git a/flink-table/flink-table-planner-blink/pom.xml b/flink-table/flink-table-planner-blink/pom.xml
index a7cfb3a..085eba5 100644
--- a/flink-table/flink-table-planner-blink/pom.xml
+++ b/flink-table/flink-table-planner-blink/pom.xml
@@ -223,6 +223,14 @@ under the License.
 			<scope>test</scope>
 		</dependency>
 
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
 		<!-- SuccessException used in TestValuesTableFactory -->
 		<dependency>
 			<groupId>org.apache.flink</groupId>
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
index 386cb9a..be54497 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
@@ -74,8 +74,6 @@ import org.apache.flink.table.catalog.FunctionLanguage;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.UnresolvedIdentifier;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
-import org.apache.flink.table.factories.CatalogFactory;
-import org.apache.flink.table.factories.TableFactoryService;
 import org.apache.flink.table.operations.CatalogSinkModifyOperation;
 import org.apache.flink.table.operations.DescribeTableOperation;
 import org.apache.flink.table.operations.ExplainOperation;
@@ -553,11 +551,7 @@ public class SqlToOperationConverter {
 		sqlCreateCatalog.getPropertyList().getList().forEach(p ->
 			properties.put(((SqlTableOption) p).getKeyString(), ((SqlTableOption) p).getValueString()));
 
-		final CatalogFactory factory =
-			TableFactoryService.find(CatalogFactory.class, properties, this.getClass().getClassLoader());
-
-		Catalog catalog = factory.createCatalog(catalogName, properties);
-		return new CreateCatalogOperation(catalogName, catalog);
+		return new CreateCatalogOperation(catalogName, properties);
 	}
 
 	/** Convert DROP CATALOG statement. */
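
With the factory lookup removed from the converter, CREATE CATALOG conversion no
longer touches any ClassLoader: the converter only collects the WITH properties
and defers catalog instantiation to TableEnvironmentImpl. A short sketch of
constructing the resulting operation (hypothetical catalog name and property
value, not code from the commit):

    import org.apache.flink.table.operations.ddl.CreateCatalogOperation;

    import java.util.HashMap;
    import java.util.Map;

    public class CreateCatalogOperationSketch {
        public static void main(String[] args) {
            Map<String, String> properties = new HashMap<>();
            properties.put("type", "generic_in_memory");

            // The Catalog instance itself is created later, in
            // TableEnvironmentImpl#createCatalog, via TableFactoryService and
            // the user ClassLoader.
            CreateCatalogOperation operation =
                    new CreateCatalogOperation("my_catalog", properties);
            System.out.println(operation.asSummaryString());
        }
    }
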
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/catalog/CatalogITCase.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/catalog/CatalogITCase.java
index 4e8e95b..8a46c1e 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/catalog/CatalogITCase.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/catalog/CatalogITCase.java
@@ -23,8 +23,14 @@ import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.testutils.ClassLoaderUtils;
+import org.apache.flink.util.TemporaryClassLoaderContext;
 
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.net.URLClassLoader;
 
 import static org.apache.flink.table.descriptors.GenericInMemoryCatalogValidator.CATALOG_TYPE_VALUE_GENERIC_IN_MEMORY;
 import static org.junit.Assert.assertFalse;
@@ -35,6 +41,9 @@ import static org.junit.Assert.assertTrue;
  */
 public class CatalogITCase {
 
+	@Rule
+	public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
 	@Test
 	public void testCreateCatalog() {
 		String name = "c1";
@@ -61,9 +70,54 @@ public class CatalogITCase {
 		assertFalse(tableEnv.getCatalog(name).isPresent());
 	}
 
+	@Test
+	public void testCreateCatalogFromUserClassLoader() throws Exception {
+		final String className = "UserCatalogFactory";
+		URLClassLoader classLoader = ClassLoaderUtils.withRoot(temporaryFolder.newFolder())
+			.addResource("META-INF/services/org.apache.flink.table.factories.TableFactory", "UserCatalogFactory")
+			.addClass(
+				className,
+				"import org.apache.flink.table.catalog.GenericInMemoryCatalog;\n" +
+					"import org.apache.flink.table.factories.CatalogFactory;\n" +
+					"import java.util.Collections;\n" +
+					"import org.apache.flink.table.catalog.Catalog;\n" +
+					"import java.util.HashMap;\n" +
+					"import java.util.List;\n" +
+					"import java.util.Map;\n" +
+					"\tpublic class UserCatalogFactory implements CatalogFactory {\n" +
+					"\t\t@Override\n" +
+					"\t\tpublic Catalog createCatalog(\n" +
+					"\t\t\t\tString name,\n" +
+					"\t\t\t\tMap<String, String> properties) {\n" +
+					"\t\t\treturn new GenericInMemoryCatalog(name);\n" +
+					"\t\t}\n" +
+					"\n" +
+					"\t\t@Override\n" +
+					"\t\tpublic Map<String, String> requiredContext() {\n" +
+					"\t\t\tHashMap<String, String> hashMap = new HashMap<>();\n" +
+					"\t\t\thashMap.put(\"type\", \"userCatalog\");\n" +
+					"\t\t\treturn hashMap;\n" +
+					"\t\t}\n" +
+					"\n" +
+					"\t\t@Override\n" +
+					"\t\tpublic List<String> supportedProperties() {\n" +
+					"\t\t\treturn Collections.emptyList();\n" +
+					"\t\t}\n" +
+					"\t}"
+			).build();
+
+		try (TemporaryClassLoaderContext context = TemporaryClassLoaderContext.of(classLoader)) {
+			TableEnvironment tableEnvironment = getTableEnvironment();
+			tableEnvironment.executeSql("CREATE CATALOG cat WITH ('type'='userCatalog')");
+
+			assertTrue(tableEnvironment.getCatalog("cat").isPresent());
+		}
+	}
+
 	private TableEnvironment getTableEnvironment() {
 		EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		return StreamTableEnvironment.create(env, settings);
 	}
+
 }