You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2020/07/10 07:30:14 UTC
[flink] 04/04: [FLINK-18419] Create catalog in TableEnvironment
using user ClassLoader
This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 1aa92e897e7d71a1063b87ee08496edac6e4b633
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Fri Jul 3 16:31:23 2020 +0200
[FLINK-18419] Create catalog in TableEnvironment using user ClassLoader
---
.../table/api/internal/TableEnvironmentImpl.java | 30 ++++++++----
.../operations/ddl/CreateCatalogOperation.java | 12 ++---
flink-table/flink-table-planner-blink/pom.xml | 8 ++++
.../operations/SqlToOperationConverter.java | 8 +---
.../flink/table/planner/catalog/CatalogITCase.java | 54 ++++++++++++++++++++++
5 files changed, 90 insertions(+), 22 deletions(-)
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 99306a0..d4f43db 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -72,7 +72,9 @@ import org.apache.flink.table.descriptors.ConnectorDescriptor;
import org.apache.flink.table.descriptors.StreamTableDescriptor;
import org.apache.flink.table.expressions.ApiExpressionUtils;
import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.factories.CatalogFactory;
import org.apache.flink.table.factories.ComponentFactoryService;
+import org.apache.flink.table.factories.TableFactoryService;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.UserDefinedFunction;
import org.apache.flink.table.functions.UserDefinedFunctionHelper;
@@ -1014,15 +1016,7 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
} else if (operation instanceof AlterCatalogFunctionOperation) {
return alterCatalogFunction((AlterCatalogFunctionOperation) operation);
} else if (operation instanceof CreateCatalogOperation) {
- CreateCatalogOperation createCatalogOperation = (CreateCatalogOperation) operation;
- String exMsg = getDDLOpExecuteErrorMsg(createCatalogOperation.asSummaryString());
- try {
- catalogManager.registerCatalog(
- createCatalogOperation.getCatalogName(), createCatalogOperation.getCatalog());
- return TableResultImpl.TABLE_RESULT_OK;
- } catch (CatalogException e) {
- throw new ValidationException(exMsg, e);
- }
+ return createCatalog((CreateCatalogOperation) operation);
} else if (operation instanceof DropCatalogOperation) {
DropCatalogOperation dropCatalogOperation = (DropCatalogOperation) operation;
String exMsg = getDDLOpExecuteErrorMsg(dropCatalogOperation.asSummaryString());
@@ -1078,6 +1072,24 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
}
}
+ private TableResult createCatalog(CreateCatalogOperation operation) {
+ String exMsg = getDDLOpExecuteErrorMsg(operation.asSummaryString());
+ try {
+ String catalogName = operation.getCatalogName();
+ Map<String, String> properties = operation.getProperties();
+ final CatalogFactory factory = TableFactoryService.find(
+ CatalogFactory.class,
+ properties,
+ userClassLoader);
+
+ Catalog catalog = factory.createCatalog(catalogName, properties);
+ catalogManager.registerCatalog(catalogName, catalog);
+ return TableResultImpl.TABLE_RESULT_OK;
+ } catch (CatalogException e) {
+ throw new ValidationException(exMsg, e);
+ }
+ }
+
private TableResult buildShowResult(String columnName, String[] objects) {
return buildResult(
new String[]{columnName},
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateCatalogOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateCatalogOperation.java
index 1d04a20..c81ee1d 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateCatalogOperation.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateCatalogOperation.java
@@ -18,7 +18,6 @@
package org.apache.flink.table.operations.ddl;
-import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.OperationUtils;
@@ -33,25 +32,26 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
*/
public class CreateCatalogOperation implements CreateOperation {
private final String catalogName;
- private final Catalog catalog;
+ private final Map<String, String> properties;
- public CreateCatalogOperation(String catalogName, Catalog catalog) {
+ public CreateCatalogOperation(String catalogName, Map<String, String> properties) {
this.catalogName = checkNotNull(catalogName);
- this.catalog = checkNotNull(catalog);
+ this.properties = checkNotNull(properties);
}
public String getCatalogName() {
return catalogName;
}
- public Catalog getCatalog() {
- return catalog;
+ public Map<String, String> getProperties() {
+ return Collections.unmodifiableMap(properties);
}
@Override
public String asSummaryString() {
Map<String, Object> params = new LinkedHashMap<>();
params.put("catalogName", catalogName);
+ params.put("properties", properties);
return OperationUtils.formatWithChildren(
"CREATE CATALOG",
diff --git a/flink-table/flink-table-planner-blink/pom.xml b/flink-table/flink-table-planner-blink/pom.xml
index a7cfb3a..085eba5 100644
--- a/flink-table/flink-table-planner-blink/pom.xml
+++ b/flink-table/flink-table-planner-blink/pom.xml
@@ -223,6 +223,14 @@ under the License.
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
<!-- SuccessException used in TestValuesTableFactory -->
<dependency>
<groupId>org.apache.flink</groupId>
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
index 386cb9a..be54497 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
@@ -74,8 +74,6 @@ import org.apache.flink.table.catalog.FunctionLanguage;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.UnresolvedIdentifier;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
-import org.apache.flink.table.factories.CatalogFactory;
-import org.apache.flink.table.factories.TableFactoryService;
import org.apache.flink.table.operations.CatalogSinkModifyOperation;
import org.apache.flink.table.operations.DescribeTableOperation;
import org.apache.flink.table.operations.ExplainOperation;
@@ -553,11 +551,7 @@ public class SqlToOperationConverter {
sqlCreateCatalog.getPropertyList().getList().forEach(p ->
properties.put(((SqlTableOption) p).getKeyString(), ((SqlTableOption) p).getValueString()));
- final CatalogFactory factory =
- TableFactoryService.find(CatalogFactory.class, properties, this.getClass().getClassLoader());
-
- Catalog catalog = factory.createCatalog(catalogName, properties);
- return new CreateCatalogOperation(catalogName, catalog);
+ return new CreateCatalogOperation(catalogName, properties);
}
/** Convert DROP CATALOG statement. */
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/catalog/CatalogITCase.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/catalog/CatalogITCase.java
index 4e8e95b..8a46c1e 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/catalog/CatalogITCase.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/catalog/CatalogITCase.java
@@ -23,8 +23,14 @@ import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.testutils.ClassLoaderUtils;
+import org.apache.flink.util.TemporaryClassLoaderContext;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.net.URLClassLoader;
import static org.apache.flink.table.descriptors.GenericInMemoryCatalogValidator.CATALOG_TYPE_VALUE_GENERIC_IN_MEMORY;
import static org.junit.Assert.assertFalse;
@@ -35,6 +41,9 @@ import static org.junit.Assert.assertTrue;
*/
public class CatalogITCase {
+ @Rule
+ public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
@Test
public void testCreateCatalog() {
String name = "c1";
@@ -61,9 +70,54 @@ public class CatalogITCase {
assertFalse(tableEnv.getCatalog(name).isPresent());
}
+ @Test
+ public void testCreateCatalogFromUserClassLoader() throws Exception {
+ final String className = "UserCatalogFactory";
+ URLClassLoader classLoader = ClassLoaderUtils.withRoot(temporaryFolder.newFolder())
+ .addResource("META-INF/services/org.apache.flink.table.factories.TableFactory", "UserCatalogFactory")
+ .addClass(
+ className,
+ "import org.apache.flink.table.catalog.GenericInMemoryCatalog;\n" +
+ "import org.apache.flink.table.factories.CatalogFactory;\n" +
+ "import java.util.Collections;\n" +
+ "import org.apache.flink.table.catalog.Catalog;\n" +
+ "import java.util.HashMap;\n" +
+ "import java.util.List;\n" +
+ "import java.util.Map;\n" +
+ "\tpublic class UserCatalogFactory implements CatalogFactory {\n" +
+ "\t\t@Override\n" +
+ "\t\tpublic Catalog createCatalog(\n" +
+ "\t\t\t\tString name,\n" +
+ "\t\t\t\tMap<String, String> properties) {\n" +
+ "\t\t\treturn new GenericInMemoryCatalog(name);\n" +
+ "\t\t}\n" +
+ "\n" +
+ "\t\t@Override\n" +
+ "\t\tpublic Map<String, String> requiredContext() {\n" +
+ "\t\t\tHashMap<String, String> hashMap = new HashMap<>();\n" +
+ "\t\t\thashMap.put(\"type\", \"userCatalog\");\n" +
+ "\t\t\treturn hashMap;\n" +
+ "\t\t}\n" +
+ "\n" +
+ "\t\t@Override\n" +
+ "\t\tpublic List<String> supportedProperties() {\n" +
+ "\t\t\treturn Collections.emptyList();\n" +
+ "\t\t}\n" +
+ "\t}"
+ ).build();
+
+ try (TemporaryClassLoaderContext context = TemporaryClassLoaderContext.of(classLoader)) {
+ TableEnvironment tableEnvironment = getTableEnvironment();
+ tableEnvironment.executeSql("CREATE CATALOG cat WITH ('type'='userCatalog')");
+
+ assertTrue(tableEnvironment.getCatalog("cat").isPresent());
+ }
+ }
+
private TableEnvironment getTableEnvironment() {
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
return StreamTableEnvironment.create(env, settings);
}
+
}