You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by bl...@apache.org on 2019/10/23 21:48:34 UTC
[flink] branch master updated: [FLINK-14416][table] Add Module
interface and ModuleManager
This is an automated email from the ASF dual-hosted git repository.
bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 518f16c [FLINK-14416][table] Add Module interface and ModuleManager
518f16c is described below
commit 518f16c33ee39959670f31ebf662ce1eb241b1a7
Author: bowen.li <bo...@gmail.com>
AuthorDate: Wed Oct 16 17:05:42 2019 -0700
[FLINK-14416][table] Add Module interface and ModuleManager
Add Module interface and ModuleManager to Flink SQL.
This closes #9937.
---
flink-python/pyflink/table/table_environment.py | 9 ++
.../table/tests/test_environment_completeness.py | 9 +-
.../client/gateway/local/ExecutionContext.java | 6 +-
.../table/api/java/BatchTableEnvironment.java | 6 +-
.../java/internal/StreamTableEnvironmentImpl.java | 9 +-
.../internal/StreamTableEnvironmentImplTest.java | 5 +-
.../apache/flink/table/api/TableEnvironment.java | 27 +++++
.../table/api/internal/TableEnvironmentImpl.java | 26 ++++-
.../flink/table/catalog/FunctionCatalog.java | 5 +-
.../apache/flink/table/module/ModuleManager.java | 118 +++++++++++++++++++++
.../flink/table/catalog/FunctionCatalogTest.java | 4 +-
.../flink/table/utils/TableEnvironmentMock.java | 12 ++-
.../table/api/scala/BatchTableEnvironment.scala | 7 +-
.../internal/StreamTableEnvironmentImpl.scala | 9 +-
.../internal/StreamTableEnvironmentImplTest.scala | 8 +-
.../java/org/apache/flink/table/module/Module.java | 55 ++++++++++
.../exceptions/ModuleAlreadyExistException.java | 28 +++++
.../module/exceptions/ModuleNotFoundException.java | 28 +++++
.../table/sqlexec/SqlToOperationConverterTest.java | 4 +-
.../metadata/AggCallSelectivityEstimatorTest.scala | 7 +-
.../plan/metadata/FlinkRelMdHandlerTestBase.scala | 6 +-
.../plan/metadata/SelectivityEstimatorTest.scala | 7 +-
.../planner/plan/utils/RexNodeExtractorTest.scala | 8 +-
.../flink/table/planner/utils/TableTestBase.scala | 17 ++-
.../table/api/internal/BatchTableEnvImpl.scala | 6 +-
.../flink/table/api/internal/TableEnvImpl.scala | 21 +++-
.../java/internal/BatchTableEnvironmentImpl.scala | 7 +-
.../scala/internal/BatchTableEnvironmentImpl.scala | 7 +-
.../PushFilterIntoTableSourceScanRule.scala | 3 +-
.../table/sqlexec/SqlToOperationConverterTest.java | 4 +-
.../api/stream/StreamTableEnvironmentTest.scala | 8 +-
.../flink/table/api/stream/sql/AggregateTest.scala | 6 +-
.../flink/table/plan/RexProgramExtractorTest.scala | 5 +-
.../flink/table/utils/MockTableEnvironment.scala | 9 +-
.../apache/flink/table/utils/TableTestBase.scala | 13 ++-
35 files changed, 447 insertions(+), 62 deletions(-)
diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py
index e4af611..6c88d9e 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -209,6 +209,15 @@ class TableEnvironment(object):
j_catalog_name_array = self._j_tenv.listCatalogs()
return [item for item in j_catalog_name_array]
+ def list_modules(self):
+ """
+ Gets the names of all modules registered in this environment.
+
+ :return: List of module names.
+ """
+ j_module_name_array = self._j_tenv.listModules()
+ return [item for item in j_module_name_array]
+
def list_databases(self):
"""
Gets the names of all databases in the current catalog.
diff --git a/flink-python/pyflink/table/tests/test_environment_completeness.py b/flink-python/pyflink/table/tests/test_environment_completeness.py
index 8945989..d50bebb 100644
--- a/flink-python/pyflink/table/tests/test_environment_completeness.py
+++ b/flink-python/pyflink/table/tests/test_environment_completeness.py
@@ -41,7 +41,14 @@ class EnvironmentAPICompletenessTests(PythonAPICompletenessTestCase, unittest.Te
# registerCatalog, getCatalog and listTables should be supported when catalog supported in
# python. getCompletionHints has been deprecated. It will be removed in the next release.
# TODO add TableEnvironment#create method with EnvironmentSettings as a parameter
- return {'registerCatalog', 'getCatalog', 'listTables', 'getCompletionHints', 'create'}
+ return {
+ 'registerCatalog',
+ 'getCatalog',
+ 'listTables',
+ 'getCompletionHints',
+ 'create',
+ 'loadModule',
+ 'unloadModule'}
if __name__ == '__main__':
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
index 73c9cce..023efbc 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
@@ -77,6 +77,7 @@ import org.apache.flink.table.functions.FunctionService;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.functions.UserDefinedFunction;
+import org.apache.flink.table.module.ModuleManager;
import org.apache.flink.table.planner.delegation.ExecutorBase;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.sources.TableSource;
@@ -293,8 +294,8 @@ public class ExecutionContext<T> {
final CatalogManager catalogManager = new CatalogManager(
settings.getBuiltInCatalogName(),
new GenericInMemoryCatalog(settings.getBuiltInCatalogName(), settings.getBuiltInDatabaseName()));
-
- final FunctionCatalog functionCatalog = new FunctionCatalog(catalogManager);
+ final ModuleManager moduleManager = new ModuleManager();
+ final FunctionCatalog functionCatalog = new FunctionCatalog(catalogManager, moduleManager);
final Map<String, String> plannerProperties = settings.toPlannerProperties();
final Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
@@ -302,6 +303,7 @@ public class ExecutionContext<T> {
return new StreamTableEnvironmentImpl(
catalogManager,
+ moduleManager,
functionCatalog,
config,
env,
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/BatchTableEnvironment.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/BatchTableEnvironment.java
index 7b7692d..38d0063 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/BatchTableEnvironment.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/BatchTableEnvironment.java
@@ -33,6 +33,7 @@ import org.apache.flink.table.descriptors.BatchTableDescriptor;
import org.apache.flink.table.descriptors.ConnectorDescriptor;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.module.ModuleManager;
import org.apache.flink.table.sinks.TableSink;
import java.lang.reflect.Constructor;
@@ -327,13 +328,14 @@ public interface BatchTableEnvironment extends TableEnvironment {
static BatchTableEnvironment create(ExecutionEnvironment executionEnvironment, TableConfig tableConfig) {
try {
Class<?> clazz = Class.forName("org.apache.flink.table.api.java.internal.BatchTableEnvironmentImpl");
- Constructor con = clazz.getConstructor(ExecutionEnvironment.class, TableConfig.class, CatalogManager.class);
+ Constructor con = clazz.getConstructor(ExecutionEnvironment.class, TableConfig.class, CatalogManager.class, ModuleManager.class);
String defaultCatalog = "default_catalog";
CatalogManager catalogManager = new CatalogManager(
defaultCatalog,
new GenericInMemoryCatalog(defaultCatalog, "default_database")
);
- return (BatchTableEnvironment) con.newInstance(executionEnvironment, tableConfig, catalogManager);
+ ModuleManager moduleManager = new ModuleManager();
+ return (BatchTableEnvironment) con.newInstance(executionEnvironment, tableConfig, catalogManager, moduleManager);
} catch (Throwable t) {
throw new TableException("Create BatchTableEnvironment failed.", t);
}
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java
index 06b20f5..4addf4f 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java
@@ -53,6 +53,7 @@ import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.functions.UserFunctionsTypeHelper;
+import org.apache.flink.table.module.ModuleManager;
import org.apache.flink.table.operations.JavaDataStreamQueryOperation;
import org.apache.flink.table.operations.OutputConversionModifyOperation;
import org.apache.flink.table.sources.TableSource;
@@ -80,13 +81,14 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl imple
public StreamTableEnvironmentImpl(
CatalogManager catalogManager,
+ ModuleManager moduleManager,
FunctionCatalog functionCatalog,
TableConfig tableConfig,
StreamExecutionEnvironment executionEnvironment,
Planner planner,
Executor executor,
boolean isStreamingMode) {
- super(catalogManager, tableConfig, executor, functionCatalog, planner, isStreamingMode);
+ super(catalogManager, moduleManager, tableConfig, executor, functionCatalog, planner, isStreamingMode);
this.executionEnvironment = executionEnvironment;
}
@@ -104,7 +106,9 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl imple
settings.getBuiltInCatalogName(),
new GenericInMemoryCatalog(settings.getBuiltInCatalogName(), settings.getBuiltInDatabaseName()));
- FunctionCatalog functionCatalog = new FunctionCatalog(catalogManager);
+ ModuleManager moduleManager = new ModuleManager();
+
+ FunctionCatalog functionCatalog = new FunctionCatalog(catalogManager, moduleManager);
Map<String, String> executorProperties = settings.toExecutorProperties();
Executor executor = lookupExecutor(executorProperties, executionEnvironment);
@@ -115,6 +119,7 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl imple
return new StreamTableEnvironmentImpl(
catalogManager,
+ moduleManager,
functionCatalog,
tableConfig,
executionEnvironment,
diff --git a/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImplTest.java b/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImplTest.java
index 1348ec2..6b1724d 100644
--- a/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImplTest.java
+++ b/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImplTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.table.catalog.FunctionCatalog;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.delegation.Executor;
import org.apache.flink.table.delegation.Planner;
+import org.apache.flink.table.module.ModuleManager;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.types.Row;
@@ -92,9 +93,11 @@ public class StreamTableEnvironmentImplTest {
StreamExecutionEnvironment env,
DataStreamSource<Integer> elements) {
CatalogManager catalogManager = new CatalogManager("cat", new GenericInMemoryCatalog("cat", "db"));
+ ModuleManager moduleManager = new ModuleManager();
return new StreamTableEnvironmentImpl(
catalogManager,
- new FunctionCatalog(catalogManager),
+ moduleManager,
+ new FunctionCatalog(catalogManager, moduleManager),
new TableConfig(),
env,
new TestPlanner(elements.getTransformation()),
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
index 23589b3..344e7ff 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
@@ -27,6 +27,9 @@ import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.descriptors.ConnectTableDescriptor;
import org.apache.flink.table.descriptors.ConnectorDescriptor;
import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.module.Module;
+import org.apache.flink.table.module.exceptions.ModuleAlreadyExistException;
+import org.apache.flink.table.module.exceptions.ModuleNotFoundException;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.sources.TableSource;
@@ -104,6 +107,23 @@ public interface TableEnvironment {
Optional<Catalog> getCatalog(String catalogName);
/**
+ * Loads a {@link Module} under a unique name. Modules will be kept in the loaded order.
+ *
+ * @param moduleName name of the {@link Module}
+ * @param module the module instance
+ * @throws ModuleAlreadyExistException thrown when there is already a module with the same name
+ */
+ void loadModule(String moduleName, Module module) throws ModuleAlreadyExistException;
+
+ /**
+ * Unloads a {@link Module} with given name.
+ *
+ * @param moduleName name of the {@link Module}
+ * @throws ModuleNotFoundException thrown when there is no module with the given name
+ */
+ void unloadModule(String moduleName) throws ModuleNotFoundException;
+
+ /**
* Registers a {@link ScalarFunction} under a unique name. Replaces already existing
* user-defined functions under this name.
*/
@@ -238,6 +258,13 @@ public interface TableEnvironment {
String[] listCatalogs();
/**
+ * Gets an array of names of all modules in this environment in the loaded order.
+ *
+ * @return A list of the names of all modules in the loaded order.
+ */
+ String[] listModules();
+
+ /**
* Gets the names of all databases registered in the current catalog.
*
* @return A list of the names of all registered databases in the current catalog.
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 12e874e..d13dea3 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -48,6 +48,10 @@ import org.apache.flink.table.descriptors.StreamTableDescriptor;
import org.apache.flink.table.expressions.TableReferenceExpression;
import org.apache.flink.table.factories.ComponentFactoryService;
import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.module.Module;
+import org.apache.flink.table.module.ModuleManager;
+import org.apache.flink.table.module.exceptions.ModuleAlreadyExistException;
+import org.apache.flink.table.module.exceptions.ModuleNotFoundException;
import org.apache.flink.table.operations.CatalogQueryOperation;
import org.apache.flink.table.operations.CatalogSinkModifyOperation;
import org.apache.flink.table.operations.ModifyOperation;
@@ -80,6 +84,7 @@ public class TableEnvironmentImpl implements TableEnvironment {
// and this should always be true. This avoids too many hard code.
private static final boolean IS_STREAM_TABLE = true;
private final CatalogManager catalogManager;
+ private final ModuleManager moduleManager;
private final OperationTreeBuilder operationTreeBuilder;
private final List<ModifyOperation> bufferedModifyOperations = new ArrayList<>();
@@ -90,12 +95,14 @@ public class TableEnvironmentImpl implements TableEnvironment {
protected TableEnvironmentImpl(
CatalogManager catalogManager,
+ ModuleManager moduleManager,
TableConfig tableConfig,
Executor executor,
FunctionCatalog functionCatalog,
Planner planner,
boolean isStreamingMode) {
this.catalogManager = catalogManager;
+ this.moduleManager = moduleManager;
this.execEnv = executor;
this.tableConfig = tableConfig;
@@ -118,7 +125,8 @@ public class TableEnvironmentImpl implements TableEnvironment {
settings.getBuiltInCatalogName(),
new GenericInMemoryCatalog(settings.getBuiltInCatalogName(), settings.getBuiltInDatabaseName()));
- FunctionCatalog functionCatalog = new FunctionCatalog(catalogManager);
+ ModuleManager moduleManager = new ModuleManager();
+ FunctionCatalog functionCatalog = new FunctionCatalog(catalogManager, moduleManager);
Map<String, String> executorProperties = settings.toExecutorProperties();
Executor executor = ComponentFactoryService.find(ExecutorFactory.class, executorProperties)
@@ -131,6 +139,7 @@ public class TableEnvironmentImpl implements TableEnvironment {
return new TableEnvironmentImpl(
catalogManager,
+ moduleManager,
tableConfig,
executor,
functionCatalog,
@@ -162,6 +171,16 @@ public class TableEnvironmentImpl implements TableEnvironment {
}
@Override
+ public void loadModule(String moduleName, Module module) throws ModuleAlreadyExistException {
+ moduleManager.loadModule(moduleName, module);
+ }
+
+ @Override
+ public void unloadModule(String moduleName) throws ModuleNotFoundException {
+ moduleManager.unloadModule(moduleName);
+ }
+
+ @Override
public void registerFunction(String name, ScalarFunction function) {
functionCatalog.registerTempSystemScalarFunction(
name,
@@ -230,6 +249,11 @@ public class TableEnvironmentImpl implements TableEnvironment {
}
@Override
+ public String[] listModules() {
+ return moduleManager.listModules().toArray(new String[0]);
+ }
+
+ @Override
public String[] listDatabases() {
return catalogManager.getCatalog(catalogManager.getCurrentCatalog())
.get()
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
index 6808216..61f1154 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
@@ -38,6 +38,7 @@ import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.functions.TableFunctionDefinition;
import org.apache.flink.table.functions.UserDefinedAggregateFunction;
import org.apache.flink.table.functions.UserFunctionsTypeHelper;
+import org.apache.flink.table.module.ModuleManager;
import org.apache.flink.util.Preconditions;
import java.util.HashSet;
@@ -57,6 +58,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
public class FunctionCatalog implements FunctionLookup {
private final CatalogManager catalogManager;
+ private final ModuleManager moduleManager;
private final Map<String, FunctionDefinition> tempSystemFunctions = new LinkedHashMap<>();
private final Map<ObjectIdentifier, FunctionDefinition> tempCatalogFunctions = new LinkedHashMap<>();
@@ -66,8 +68,9 @@ public class FunctionCatalog implements FunctionLookup {
*/
private PlannerTypeInferenceUtil plannerTypeInferenceUtil;
- public FunctionCatalog(CatalogManager catalogManager) {
+ public FunctionCatalog(CatalogManager catalogManager, ModuleManager moduleManager) {
this.catalogManager = checkNotNull(catalogManager);
+ this.moduleManager = checkNotNull(moduleManager);
}
public void setPlannerTypeInferenceUtil(PlannerTypeInferenceUtil plannerTypeInferenceUtil) {
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/module/ModuleManager.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/module/ModuleManager.java
new file mode 100644
index 0000000..9216237
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/module/ModuleManager.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.module;
+
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.module.exceptions.ModuleAlreadyExistException;
+import org.apache.flink.table.module.exceptions.ModuleNotFoundException;
+import org.apache.flink.util.StringUtils;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Responsible for loading/unloading modules, managing their life cycles, and resolving module objects.
+ */
+public class ModuleManager {
+ private LinkedHashMap<String, Module> modules;
+
+ public ModuleManager() {
+ this.modules = new LinkedHashMap<>();
+
+ // TODO: Add Core module to modules
+ }
+
+ /**
+ * Load a module under a unique name. Modules will be kept in the loaded order, and a new module
+ * will be added to the end.
+ *
+ * @param name name of the module
+ * @param module the module instance
+ * @throws ModuleAlreadyExistException thrown when there is already a module with the same name
+ */
+ public void loadModule(String name, Module module) throws ModuleAlreadyExistException {
+ checkArgument(!StringUtils.isNullOrWhitespaceOnly(name), "name cannot be null or empty string");
+ checkNotNull(module, "module cannot be null");
+
+ if (!modules.containsKey(name)) {
+ modules.put(name, module);
+ } else {
+ throw new ModuleAlreadyExistException(name);
+ }
+ }
+
+ /**
+ * Unload a module with given name.
+ *
+ * @param name name of the module
+ * @throws ModuleNotFoundException thrown when there is no module with the given name
+ */
+ public void unloadModule(String name) throws ModuleNotFoundException {
+ if (modules.containsKey(name)) {
+ modules.remove(name);
+ } else {
+ throw new ModuleNotFoundException(name);
+ }
+ }
+
+ /**
+ * Get names of all modules loaded.
+ *
+ * @return a list of names of modules loaded
+ */
+ public List<String> listModules() {
+ return new ArrayList<>(modules.keySet());
+ }
+
+ /**
+ * Get names of all functions from all modules.
+ *
+ * @return a set of names of functions from all loaded modules.
+ */
+ public Set<String> listFunctions() {
+ return modules.values().stream()
+ .map(m -> m.listFunctions())
+ .flatMap(n -> n.stream())
+ .collect(Collectors.toSet());
+ }
+
+ /**
+ * Get an optional of {@link FunctionDefinition} by a given name.
+ * Function will be resolved to modules in the loaded order, and the first match will be returned.
+ * If no match is found in all modules, return an empty optional.
+ *
+ * @param name name of the function
+ * @return an optional of {@link FunctionDefinition}
+ */
+ public Optional<FunctionDefinition> getFunctionDefinition(String name) {
+ Optional<Module> module = modules.values().stream()
+ .filter(p -> p.listFunctions().stream().anyMatch(e -> e.equals(name)))
+ .findFirst();
+
+ return module.isPresent() ? module.get().getFunctionDefinition(name) : Optional.empty();
+ }
+
+}
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/FunctionCatalogTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/FunctionCatalogTest.java
index fb11cbc..5f79dd5 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/FunctionCatalogTest.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/FunctionCatalogTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.catalog;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.module.ModuleManager;
import org.junit.Test;
@@ -37,7 +38,8 @@ public class FunctionCatalogTest {
@Test
public void testGetBuiltInFunctions() {
FunctionCatalog functionCatalog = new FunctionCatalog(
- new CatalogManager("test", new GenericInMemoryCatalog("test")));
+ new CatalogManager("test", new GenericInMemoryCatalog("test")),
+ new ModuleManager());
Set<String> actual = new HashSet<>();
Collections.addAll(actual, functionCatalog.getFunctions());
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableEnvironmentMock.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableEnvironmentMock.java
index dc8b992..fa2c58c 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableEnvironmentMock.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableEnvironmentMock.java
@@ -25,6 +25,7 @@ import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.FunctionCatalog;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.table.module.ModuleManager;
/**
* Mocking {@link TableEnvironment} for tests.
@@ -41,12 +42,13 @@ public class TableEnvironmentMock extends TableEnvironmentImpl {
protected TableEnvironmentMock(
CatalogManager catalogManager,
+ ModuleManager moduleManager,
TableConfig tableConfig,
ExecutorMock executor,
FunctionCatalog functionCatalog,
PlannerMock planner,
boolean isStreamingMode) {
- super(catalogManager, tableConfig, executor, functionCatalog, planner, isStreamingMode);
+ super(catalogManager, moduleManager, tableConfig, executor, functionCatalog, planner, isStreamingMode);
this.catalogManager = catalogManager;
this.executor = executor;
@@ -64,11 +66,13 @@ public class TableEnvironmentMock extends TableEnvironmentImpl {
private static TableEnvironmentMock getInstance(boolean isStreamingMode) {
final CatalogManager catalogManager = createCatalogManager();
+ final ModuleManager moduleManager = new ModuleManager();
return new TableEnvironmentMock(
catalogManager,
+ moduleManager,
createTableConfig(),
createExecutor(),
- createFunctionCatalog(catalogManager),
+ createFunctionCatalog(catalogManager, moduleManager),
createPlanner(),
isStreamingMode);
}
@@ -89,8 +93,8 @@ public class TableEnvironmentMock extends TableEnvironmentImpl {
return new ExecutorMock();
}
- private static FunctionCatalog createFunctionCatalog(CatalogManager catalogManager) {
- return new FunctionCatalog(catalogManager);
+ private static FunctionCatalog createFunctionCatalog(CatalogManager catalogManager, ModuleManager moduleManager) {
+ return new FunctionCatalog(catalogManager, moduleManager);
}
private static PlannerMock createPlanner() {
diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/BatchTableEnvironment.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/BatchTableEnvironment.scala
index 516e6b1..c57b035 100644
--- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/BatchTableEnvironment.scala
+++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/BatchTableEnvironment.scala
@@ -25,6 +25,7 @@ import org.apache.flink.table.catalog.{CatalogManager, GenericInMemoryCatalog}
import org.apache.flink.table.descriptors.{BatchTableDescriptor, ConnectorDescriptor}
import org.apache.flink.table.expressions.Expression
import org.apache.flink.table.functions.{AggregateFunction, TableFunction}
+import org.apache.flink.table.module.ModuleManager
import org.apache.flink.table.sinks.TableSink
/**
@@ -294,7 +295,8 @@ object BatchTableEnvironment {
.getConstructor(
classOf[ExecutionEnvironment],
classOf[TableConfig],
- classOf[CatalogManager])
+ classOf[CatalogManager],
+ classOf[ModuleManager])
val builtInCatalog = "default_catalog"
val catalogManager = new CatalogManager(
"default_catalog",
@@ -302,7 +304,8 @@ object BatchTableEnvironment {
builtInCatalog,
"default_database")
)
- const.newInstance(executionEnvironment, tableConfig, catalogManager)
+ val moduleManager = new ModuleManager
+ const.newInstance(executionEnvironment, tableConfig, catalogManager, moduleManager)
.asInstanceOf[BatchTableEnvironment]
} catch {
case t: Throwable => throw new TableException("Create BatchTableEnvironment failed.", t)
diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala
index 1384be0..97da798 100644
--- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala
+++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala
@@ -35,14 +35,15 @@ import org.apache.flink.table.descriptors.{ConnectorDescriptor, StreamTableDescr
import org.apache.flink.table.expressions.Expression
import org.apache.flink.table.factories.ComponentFactoryService
import org.apache.flink.table.functions.{AggregateFunction, TableAggregateFunction, TableFunction, UserFunctionsTypeHelper}
+import org.apache.flink.table.module.ModuleManager
import org.apache.flink.table.operations.{OutputConversionModifyOperation, ScalaDataStreamQueryOperation}
import org.apache.flink.table.sources.{TableSource, TableSourceValidation}
import org.apache.flink.table.types.utils.TypeConversions
import org.apache.flink.table.typeutils.FieldInfoUtils
-
import java.util
import java.util.{Collections, List => JList, Map => JMap}
+
import _root_.scala.collection.JavaConverters._
/**
@@ -52,6 +53,7 @@ import _root_.scala.collection.JavaConverters._
@Internal
class StreamTableEnvironmentImpl (
catalogManager: CatalogManager,
+ moduleManager: ModuleManager,
functionCatalog: FunctionCatalog,
config: TableConfig,
scalaExecutionEnvironment: StreamExecutionEnvironment,
@@ -60,6 +62,7 @@ class StreamTableEnvironmentImpl (
isStreaming: Boolean)
extends TableEnvironmentImpl(
catalogManager,
+ moduleManager,
config,
executor,
functionCatalog,
@@ -278,7 +281,8 @@ object StreamTableEnvironmentImpl {
settings.getBuiltInCatalogName,
new GenericInMemoryCatalog(settings.getBuiltInCatalogName, settings.getBuiltInDatabaseName))
- val functionCatalog = new FunctionCatalog(catalogManager)
+ val moduleManager = new ModuleManager
+ val functionCatalog = new FunctionCatalog(catalogManager, moduleManager)
val executorProperties = settings.toExecutorProperties
val executor = lookupExecutor(executorProperties, executionEnvironment)
@@ -294,6 +298,7 @@ object StreamTableEnvironmentImpl {
new StreamTableEnvironmentImpl(
catalogManager,
+ moduleManager,
functionCatalog,
tableConfig,
executionEnvironment,
diff --git a/flink-table/flink-table-api-scala-bridge/src/test/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImplTest.scala b/flink-table/flink-table-api-scala-bridge/src/test/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImplTest.scala
index 91b37d0..cf36ed2 100644
--- a/flink-table/flink-table-api-scala-bridge/src/test/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImplTest.scala
+++ b/flink-table/flink-table-api-scala-bridge/src/test/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImplTest.scala
@@ -26,13 +26,13 @@ import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, GenericI
import org.apache.flink.table.delegation.{Executor, Planner}
import org.apache.flink.table.operations.{ModifyOperation, Operation}
import org.apache.flink.types.Row
-
import org.hamcrest.CoreMatchers.equalTo
import org.junit.Assert.assertThat
import org.junit.Test
-
import java.util.{Collections, List => JList}
+import org.apache.flink.table.module.ModuleManager
+
/**
* Tests for [[StreamTableEnvironmentImpl]].
*/
@@ -83,9 +83,11 @@ class StreamTableEnvironmentImplTest {
val catalogManager = new CatalogManager(
"cat",
new GenericInMemoryCatalog("cat", "db"))
+ val moduleManager = new ModuleManager
new StreamTableEnvironmentImpl(
catalogManager,
- new FunctionCatalog(catalogManager),
+ moduleManager,
+ new FunctionCatalog(catalogManager, moduleManager),
new TableConfig,
env,
new TestPlanner(elements.javaStream.getTransformation),
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/module/Module.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/module/Module.java
new file mode 100644
index 0000000..eccbe70
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/module/Module.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.module;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.functions.FunctionDefinition;
+
+import java.util.Collections;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * Modules define a set of metadata, including functions, user defined types, operators, rules, etc.
+ * Metadata from modules is regarded as built-in or system metadata that users can take advantage of.
+ */
+@PublicEvolving
+public interface Module {
+
+ /**
+ * List names of all functions in this module.
+ *
+ * @return a set of function names
+ */
+ default Set<String> listFunctions() {
+ return Collections.emptySet();
+ }
+
+ /**
+ * Get an optional of {@link FunctionDefinition} by a given name.
+ *
+ * @param name name of the {@link FunctionDefinition}.
+ * @return an optional function definition
+ */
+ default Optional<FunctionDefinition> getFunctionDefinition(String name) {
+ return Optional.empty();
+ }
+
+ // user defined types, operators, rules, etc
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/module/exceptions/ModuleAlreadyExistException.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/module/exceptions/ModuleAlreadyExistException.java
new file mode 100644
index 0000000..1f27ba6
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/module/exceptions/ModuleAlreadyExistException.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.module.exceptions;
+
+/**
+ * Exception for trying to load a module that already exists.
+ */
+public class ModuleAlreadyExistException extends Exception {
+ public ModuleAlreadyExistException(String moduleName) {
+ super(String.format("Module %s already exists.", moduleName));
+ }
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/module/exceptions/ModuleNotFoundException.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/module/exceptions/ModuleNotFoundException.java
new file mode 100644
index 0000000..bf575cd
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/module/exceptions/ModuleNotFoundException.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.module.exceptions;
+
+/**
+ * Exception for operating on a module that does not exist.
+ */
+public class ModuleNotFoundException extends Exception {
+ public ModuleNotFoundException(String moduleName) {
+ super(String.format("Module %s does not exist.", moduleName));
+ }
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
index 1acdd88..9f87275 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.module.ModuleManager;
import org.apache.flink.table.operations.CatalogSinkModifyOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.ddl.CreateTableOperation;
@@ -71,7 +72,8 @@ public class SqlToOperationConverterTest {
"default");
private final CatalogManager catalogManager =
new CatalogManager("builtin", catalog);
- private final FunctionCatalog functionCatalog = new FunctionCatalog(catalogManager);
+ private final ModuleManager moduleManager = new ModuleManager();
+ private final FunctionCatalog functionCatalog = new FunctionCatalog(catalogManager, moduleManager);
private final PlannerContext plannerContext =
new PlannerContext(tableConfig,
functionCatalog,
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/AggCallSelectivityEstimatorTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/AggCallSelectivityEstimatorTest.scala
index 92905e7..554d51b 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/AggCallSelectivityEstimatorTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/AggCallSelectivityEstimatorTest.scala
@@ -26,7 +26,6 @@ import org.apache.flink.table.planner.plan.schema._
import org.apache.flink.table.planner.plan.stats.FlinkStatistic
import org.apache.flink.table.planner.{JDouble, JLong}
import org.apache.flink.util.Preconditions
-
import com.google.common.collect.ImmutableList
import org.apache.calcite.plan.{AbstractRelOptPlanner, RelOptCluster}
import org.apache.calcite.rel.`type`.RelDataType
@@ -46,9 +45,10 @@ import org.junit.{Before, BeforeClass, Test}
import org.powermock.api.mockito.PowerMockito._
import org.powermock.core.classloader.annotations.PrepareForTest
import org.powermock.modules.junit4.PowerMockRunner
-
import java.math.BigDecimal
+import org.apache.flink.table.module.ModuleManager
+
import scala.collection.JavaConversions._
/**
@@ -83,7 +83,8 @@ class AggCallSelectivityEstimatorTest {
val cluster = mock(classOf[RelOptCluster])
val planner = mock(classOf[AbstractRelOptPlanner])
val catalogManager = mock(classOf[CatalogManager])
- val functionCatalog = new FunctionCatalog(catalogManager)
+ val moduleManager = mock(classOf[ModuleManager])
+ val functionCatalog = new FunctionCatalog(catalogManager, moduleManager)
val context = new FlinkContextImpl(new TableConfig, functionCatalog)
when(tableScan, "getCluster").thenReturn(cluster)
when(cluster, "getRexBuilder").thenReturn(rexBuilder)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
index dd6d64d..3802d44 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
@@ -67,10 +67,11 @@ import org.apache.calcite.sql.fun.{SqlCountAggFunction, SqlStdOperatorTable}
import org.apache.calcite.sql.parser.SqlParserPos
import org.apache.calcite.util.{DateString, ImmutableBitSet, ImmutableIntList, TimeString, TimestampString}
import org.junit.{Before, BeforeClass}
-
import java.math.BigDecimal
import java.util
+import org.apache.flink.table.module.ModuleManager
+
import scala.collection.JavaConversions._
class FlinkRelMdHandlerTestBase {
@@ -82,13 +83,14 @@ class FlinkRelMdHandlerTestBase {
val builtinDatabase = "default_database"
val catalogManager = new CatalogManager(
builtinCatalog, new GenericInMemoryCatalog(builtinCatalog, builtinDatabase))
+ val moduleManager = new ModuleManager
// TODO batch RelNode and stream RelNode should have different PlannerContext
// and RelOptCluster due to they have different trait definitions.
val plannerContext: PlannerContext =
new PlannerContext(
tableConfig,
- new FunctionCatalog(catalogManager),
+ new FunctionCatalog(catalogManager, moduleManager),
CalciteSchema.from(rootSchema),
util.Arrays.asList(
ConventionTraitDef.INSTANCE,
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/SelectivityEstimatorTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/SelectivityEstimatorTest.scala
index 99de5a7..f16ae27 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/SelectivityEstimatorTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/SelectivityEstimatorTest.scala
@@ -26,7 +26,6 @@ import org.apache.flink.table.planner.plan.schema._
import org.apache.flink.table.planner.plan.stats.FlinkStatistic
import org.apache.flink.table.planner.{JDouble, JLong}
import org.apache.flink.util.Preconditions
-
import org.apache.calcite.plan.{AbstractRelOptPlanner, RelOptCluster}
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.core.TableScan
@@ -43,9 +42,10 @@ import org.junit.{Before, BeforeClass, Test}
import org.powermock.api.mockito.PowerMockito._
import org.powermock.core.classloader.annotations.PrepareForTest
import org.powermock.modules.junit4.PowerMockRunner
-
import java.math.BigDecimal
+import org.apache.flink.table.module.ModuleManager
+
import scala.collection.JavaConverters._
/**
@@ -85,7 +85,8 @@ class SelectivityEstimatorTest {
val cluster = mock(classOf[RelOptCluster])
val planner = mock(classOf[AbstractRelOptPlanner])
val catalogManager = mock(classOf[CatalogManager])
- val functionCatalog = new FunctionCatalog(catalogManager)
+ val moduleManager = mock(classOf[ModuleManager])
+ val functionCatalog = new FunctionCatalog(catalogManager, moduleManager)
val context: FlinkContext = new FlinkContextImpl(tableConfig, functionCatalog)
when(tableScan, "getCluster").thenReturn(cluster)
when(cluster, "getRexBuilder").thenReturn(rexBuilder)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala
index f0289e4..efe2f6e 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala
@@ -31,7 +31,6 @@ import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable
import org.apache.flink.table.planner.functions.utils.ScalarSqlFunction
import org.apache.flink.table.planner.plan.utils.InputTypeBuilder.inputOf
import org.apache.flink.table.planner.utils.{DateTimeTestUtil, IntSumAggFunction}
-
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rex.{RexBuilder, RexNode}
import org.apache.calcite.sql.SqlPostfixOperator
@@ -42,11 +41,12 @@ import org.apache.calcite.util.{DateString, TimeString, TimestampString}
import org.hamcrest.CoreMatchers.is
import org.junit.Assert.{assertArrayEquals, assertEquals, assertThat, assertTrue}
import org.junit.Test
-
import java.math.BigDecimal
import java.sql.Timestamp
import java.util.{TimeZone, List => JList}
+import org.apache.flink.table.module.ModuleManager
+
import scala.collection.JavaConverters._
/**
@@ -56,8 +56,8 @@ class RexNodeExtractorTest extends RexNodeTestBase {
val defaultCatalog = "default_catalog"
val catalogManager = new CatalogManager(
defaultCatalog, new GenericInMemoryCatalog(defaultCatalog, "default_database"))
-
- private val functionCatalog = new FunctionCatalog(catalogManager)
+ val moduleManager = new ModuleManager
+ private val functionCatalog = new FunctionCatalog(catalogManager, moduleManager)
private val expressionBridge: ExpressionBridge[PlannerExpression] =
new ExpressionBridge[PlannerExpression](
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
index a452717..4ded0fc 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
@@ -57,16 +57,16 @@ import org.apache.flink.table.types.logical.LogicalType
import org.apache.flink.table.types.utils.TypeConversions
import org.apache.flink.table.typeutils.FieldInfoUtils
import org.apache.flink.types.Row
-
import org.apache.calcite.rel.RelNode
import org.apache.calcite.sql.SqlExplainLevel
import org.apache.commons.lang3.SystemUtils
import org.junit.Assert.{assertEquals, assertTrue}
import org.junit.Rule
import org.junit.rules.{ExpectedException, TestName}
-
import _root_.java.util
+import org.apache.flink.table.module.ModuleManager
+
import _root_.scala.collection.JavaConversions._
import _root_.scala.io.Source
@@ -887,6 +887,7 @@ class TestTableSource(override val isBounded: Boolean, schema: TableSchema)
class TestingTableEnvironment private(
catalogManager: CatalogManager,
+ moduleManager: ModuleManager,
tableConfig: TableConfig,
executor: Executor,
functionCatalog: FunctionCatalog,
@@ -894,6 +895,7 @@ class TestingTableEnvironment private(
isStreamingMode: Boolean)
extends TableEnvironmentImpl(
catalogManager,
+ moduleManager,
tableConfig,
executor,
functionCatalog,
@@ -1027,7 +1029,8 @@ object TestingTableEnvironment {
new GenericInMemoryCatalog(
settings.getBuiltInCatalogName, settings.getBuiltInDatabaseName))
}
- val functionCatalog = new FunctionCatalog(catalogMgr)
+ val moduleManager = new ModuleManager
+ val functionCatalog = new FunctionCatalog(catalogMgr, moduleManager)
val plannerProperties = settings.toPlannerProperties
val executorProperties = settings.toExecutorProperties
val executor = ComponentFactoryService.find(classOf[ExecutorFactory],
@@ -1036,7 +1039,13 @@ object TestingTableEnvironment {
.create(plannerProperties, executor, tableConfig, functionCatalog, catalogMgr)
.asInstanceOf[PlannerBase]
new TestingTableEnvironment(
- catalogMgr, tableConfig, executor, functionCatalog, planner, settings.isStreamingMode)
+ catalogMgr,
+ moduleManager,
+ tableConfig,
+ executor,
+ functionCatalog,
+ planner,
+ settings.isStreamingMode)
}
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
index a4ed523..5ec2bba 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
@@ -34,6 +34,7 @@ import org.apache.flink.table.explain.PlanJsonParser
import org.apache.flink.table.expressions.utils.ApiExpressionDefaultVisitor
import org.apache.flink.table.expressions.{Expression, UnresolvedCallExpression}
import org.apache.flink.table.functions.BuiltInFunctionDefinitions.TIME_ATTRIBUTES
+import org.apache.flink.table.module.ModuleManager
import org.apache.flink.table.operations.DataSetQueryOperation
import org.apache.flink.table.plan.BatchOptimizer
import org.apache.flink.table.plan.nodes.dataset.DataSetRel
@@ -59,8 +60,9 @@ import _root_.scala.collection.JavaConversions._
abstract class BatchTableEnvImpl(
private[flink] val execEnv: ExecutionEnvironment,
config: TableConfig,
- catalogManager: CatalogManager)
- extends TableEnvImpl(config, catalogManager) {
+ catalogManager: CatalogManager,
+ moduleManager: ModuleManager)
+ extends TableEnvImpl(config, catalogManager, moduleManager) {
private[flink] val optimizer = new BatchOptimizer(
() => config.getPlannerConfig.unwrap(classOf[CalciteConfig]).orElse(CalciteConfig.DEFAULT),
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index a00ea35..e028032 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -29,6 +29,7 @@ import org.apache.flink.table.expressions._
import org.apache.flink.table.expressions.resolver.lookups.TableReferenceLookup
import org.apache.flink.table.factories.{TableFactoryService, TableFactoryUtil, TableSinkFactory}
import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, TableFunction, UserDefinedAggregateFunction, _}
+import org.apache.flink.table.module.{Module, ModuleManager}
import org.apache.flink.table.operations.ddl.CreateTableOperation
import org.apache.flink.table.operations.utils.OperationTreeBuilder
import org.apache.flink.table.operations.{CatalogQueryOperation, PlannerQueryOperation, TableSourceQueryOperation, _}
@@ -37,12 +38,10 @@ import org.apache.flink.table.sinks.{OverwritableTableSink, PartitionableTableSi
import org.apache.flink.table.sources.TableSource
import org.apache.flink.table.sqlexec.SqlToOperationConverter
import org.apache.flink.table.util.JavaScalaConversionUtil
-
import org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema
import org.apache.calcite.sql._
import org.apache.calcite.sql.parser.SqlParser
import org.apache.calcite.tools.FrameworkConfig
-
import _root_.java.util.{Optional, HashMap => JHashMap, Map => JMap}
import _root_.scala.collection.JavaConversions._
@@ -55,11 +54,13 @@ import _root_.scala.collection.JavaConverters._
*/
abstract class TableEnvImpl(
val config: TableConfig,
- private val catalogManager: CatalogManager)
+ private val catalogManager: CatalogManager,
+ private val moduleManager: ModuleManager)
extends TableEnvironment {
// Table API/SQL function catalog
- private[flink] val functionCatalog: FunctionCatalog = new FunctionCatalog(catalogManager)
+ private[flink] val functionCatalog: FunctionCatalog =
+ new FunctionCatalog(catalogManager, moduleManager)
// temporary utility until we don't use planner expressions anymore
functionCatalog.setPlannerTypeInferenceUtil(PlannerTypeInferenceUtilImpl.INSTANCE)
@@ -156,6 +157,14 @@ abstract class TableEnvImpl(
catalogManager.getCatalog(catalogName)
}
+ override def loadModule(moduleName: String, module: Module): Unit = {
+ moduleManager.loadModule(moduleName, module)
+ }
+
+ override def unloadModule(moduleName: String): Unit = {
+ moduleManager.unloadModule(moduleName)
+ }
+
override def getCurrentCatalog: String = {
catalogManager.getCurrentCatalog
}
@@ -329,6 +338,10 @@ abstract class TableEnvImpl(
.map(t => new CatalogQueryOperation(objectIdentifier, t.getSchema))
}
+ override def listModules(): Array[String] = {
+ moduleManager.listModules().asScala.toArray
+ }
+
override def listCatalogs(): Array[String] = {
catalogManager.getCatalogs.asScala.toArray
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/java/internal/BatchTableEnvironmentImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/java/internal/BatchTableEnvironmentImpl.scala
index 7358721..971ba37 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/java/internal/BatchTableEnvironmentImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/java/internal/BatchTableEnvironmentImpl.scala
@@ -26,6 +26,7 @@ import org.apache.flink.table.api.java.BatchTableEnvironment
import org.apache.flink.table.catalog.CatalogManager
import org.apache.flink.table.expressions.ExpressionParser
import org.apache.flink.table.functions.{AggregateFunction, TableFunction}
+import org.apache.flink.table.module.ModuleManager
import _root_.scala.collection.JavaConverters._
@@ -39,11 +40,13 @@ import _root_.scala.collection.JavaConverters._
class BatchTableEnvironmentImpl(
execEnv: ExecutionEnvironment,
config: TableConfig,
- catalogManager: CatalogManager)
+ catalogManager: CatalogManager,
+ moduleManager: ModuleManager)
extends BatchTableEnvImpl(
execEnv,
config,
- catalogManager)
+ catalogManager,
+ moduleManager)
with org.apache.flink.table.api.java.BatchTableEnvironment {
override def fromDataSet[T](dataSet: DataSet[T]): Table = {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/internal/BatchTableEnvironmentImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/internal/BatchTableEnvironmentImpl.scala
index 8e13426..7324a4b 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/internal/BatchTableEnvironmentImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/internal/BatchTableEnvironmentImpl.scala
@@ -25,6 +25,7 @@ import org.apache.flink.table.api.scala.BatchTableEnvironment
import org.apache.flink.table.catalog.CatalogManager
import org.apache.flink.table.expressions.Expression
import org.apache.flink.table.functions.{AggregateFunction, TableFunction}
+import org.apache.flink.table.module.ModuleManager
import _root_.scala.reflect.ClassTag
@@ -38,11 +39,13 @@ import _root_.scala.reflect.ClassTag
class BatchTableEnvironmentImpl(
execEnv: ExecutionEnvironment,
config: TableConfig,
- catalogManager: CatalogManager)
+ catalogManager: CatalogManager,
+ moduleManager: ModuleManager)
extends BatchTableEnvImpl(
execEnv.getJavaEnv,
config,
- catalogManager)
+ catalogManager,
+ moduleManager)
with org.apache.flink.table.api.scala.BatchTableEnvironment {
override def fromDataSet[T](dataSet: DataSet[T]): Table = {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/logical/PushFilterIntoTableSourceScanRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/logical/PushFilterIntoTableSourceScanRule.scala
index de1d0e0..334fcb3 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/logical/PushFilterIntoTableSourceScanRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/logical/PushFilterIntoTableSourceScanRule.scala
@@ -26,6 +26,7 @@ import org.apache.calcite.rex.RexProgram
import org.apache.flink.table.api.TableException
import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, GenericInMemoryCatalog}
import org.apache.flink.table.expressions.{Expression, PlannerExpression}
+import org.apache.flink.table.module.ModuleManager
import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, FlinkLogicalTableSourceScan}
import org.apache.flink.table.plan.util.RexProgramExtractor
import org.apache.flink.table.sources.FilterableTableSource
@@ -73,7 +74,7 @@ class PushFilterIntoTableSourceScanRule extends RelOptRule(
RexProgramExtractor.extractConjunctiveConditions(
program,
call.builder().getRexBuilder,
- new FunctionCatalog(catalogManager))
+ new FunctionCatalog(catalogManager, new ModuleManager))
if (predicates.isEmpty) {
// no condition can be translated to expression
return
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
index 0424109..79f5470 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
@@ -40,6 +40,7 @@ import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.expressions.ExpressionBridge;
import org.apache.flink.table.expressions.PlannerExpressionConverter;
+import org.apache.flink.table.module.ModuleManager;
import org.apache.flink.table.operations.CatalogSinkModifyOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.ddl.CreateTableOperation;
@@ -74,7 +75,8 @@ public class SqlToOperationConverterTest {
"default");
private final CatalogManager catalogManager =
new CatalogManager("builtin", catalog);
- private final FunctionCatalog functionCatalog = new FunctionCatalog(catalogManager);
+ private final ModuleManager moduleManager = new ModuleManager();
+ private final FunctionCatalog functionCatalog = new FunctionCatalog(catalogManager, moduleManager);
private final PlanningConfigurationBuilder planningConfigurationBuilder =
new PlanningConfigurationBuilder(tableConfig,
functionCatalog,
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
index 11e43cf..a3450f4 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
@@ -36,12 +36,12 @@ import org.apache.flink.table.runtime.utils.StreamTestData
import org.apache.flink.table.utils.TableTestBase
import org.apache.flink.table.utils.TableTestUtil.{binaryNode, streamTableNode, term, unaryNode}
import org.apache.flink.types.Row
-
import org.junit.Test
import org.mockito.Mockito.{mock, when}
-
import java.lang.{Integer => JInt, Long => JLong}
+import org.apache.flink.table.module.ModuleManager
+
class StreamTableEnvironmentTest extends TableTestBase {
@Test
@@ -206,11 +206,13 @@ class StreamTableEnvironmentTest extends TableTestBase {
val manager: CatalogManager = new CatalogManager(
"default_catalog",
new GenericInMemoryCatalog("default_catalog", "default_database"))
+ val moduleManager: ModuleManager = new ModuleManager
val executor: StreamExecutor = new StreamExecutor(jStreamExecEnv)
- val functionCatalog = new FunctionCatalog(manager)
+ val functionCatalog = new FunctionCatalog(manager, moduleManager)
val streamPlanner = new StreamPlanner(executor, config, functionCatalog, manager)
val jTEnv = new JStreamTableEnvironmentImpl(
manager,
+ moduleManager,
functionCatalog,
config,
jStreamExecEnv,
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/AggregateTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/AggregateTest.scala
index c917267..1bd89f1 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/AggregateTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/AggregateTest.scala
@@ -29,6 +29,7 @@ import org.apache.flink.table.api.{TableConfig, Types}
import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, GenericInMemoryCatalog}
import org.apache.flink.table.delegation.{Executor, Planner}
import org.apache.flink.table.functions.{AggregateFunction, AggregateFunctionDefinition}
+import org.apache.flink.table.module.ModuleManager
import org.apache.flink.table.utils.TableTestUtil.{streamTableNode, term, unaryNode}
import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
import org.apache.flink.types.Row
@@ -69,10 +70,11 @@ class AggregateTest extends TableTestBase {
val defaultCatalog = "default_catalog"
val catalogManager = new CatalogManager(
defaultCatalog, new GenericInMemoryCatalog(defaultCatalog, "default_database"))
-
- val functionCatalog = new FunctionCatalog(catalogManager)
+ val moduleManager = new ModuleManager
+ val functionCatalog = new FunctionCatalog(catalogManager, moduleManager)
val tablEnv = new StreamTableEnvironmentImpl(
catalogManager,
+ moduleManager,
functionCatalog,
new TableConfig,
Mockito.mock(classOf[StreamExecutionEnvironment]),
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/RexProgramExtractorTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/RexProgramExtractorTest.scala
index b752b76..d109726 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/RexProgramExtractorTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/RexProgramExtractorTest.scala
@@ -30,6 +30,7 @@ import org.apache.calcite.sql.fun.SqlStdOperatorTable
import org.apache.calcite.util.{DateString, TimeString, TimestampString}
import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, GenericInMemoryCatalog}
import org.apache.flink.table.expressions._
+import org.apache.flink.table.module.ModuleManager
import org.apache.flink.table.plan.util.{RexNodeToExpressionConverter, RexProgramExtractor}
import org.apache.flink.table.utils.InputTypeBuilder.inputOf
import org.hamcrest.CoreMatchers.is
@@ -42,7 +43,9 @@ import scala.collection.mutable
class RexProgramExtractorTest extends RexProgramTestBase {
private val functionCatalog: FunctionCatalog = new FunctionCatalog(
- new CatalogManager("default_catalog", new GenericInMemoryCatalog("default_catalog")))
+ new CatalogManager("default_catalog", new GenericInMemoryCatalog("default_catalog")),
+ new ModuleManager
+ )
private val expressionBridge: ExpressionBridge[PlannerExpression] =
new ExpressionBridge[PlannerExpression](
functionCatalog,
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
index 6104724..b0f11f3 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
@@ -26,9 +26,10 @@ import org.apache.flink.table.descriptors.{ConnectTableDescriptor, ConnectorDesc
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.table.sinks.TableSink
import org.apache.flink.table.sources.TableSource
-
import java.util.Optional
+import org.apache.flink.table.module.Module
+
class MockTableEnvironment extends TableEnvironment {
override def fromTableSource(source: TableSource[_]): Table = ???
@@ -52,6 +53,8 @@ class MockTableEnvironment extends TableEnvironment {
override def listCatalogs(): Array[String] = ???
+ override def listModules(): Array[String] = ???
+
override def listDatabases(): Array[String] = ???
override def listTables(): Array[String] = ???
@@ -94,4 +97,8 @@ class MockTableEnvironment extends TableEnvironment {
sinkPathContinued: String*): Unit = ???
override def execute(jobName: String): JobExecutionResult = ???
+
+ override def loadModule(moduleName: String, module: Module): Unit = ???
+
+ override def unloadModule(moduleName: String): Unit = ???
}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
index 6e0c3a3..5ebc79e 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
@@ -37,9 +37,9 @@ import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, Tabl
import org.apache.flink.table.operations.{DataSetQueryOperation, JavaDataStreamQueryOperation, ScalaDataStreamQueryOperation}
import org.apache.flink.table.planner.StreamPlanner
import org.apache.flink.table.utils.TableTestUtil.createCatalogManager
-
import org.apache.calcite.plan.RelOptUtil
import org.apache.calcite.rel.RelNode
+import org.apache.flink.table.module.ModuleManager
import org.junit.Assert.assertEquals
import org.junit.rules.ExpectedException
import org.junit.{ComparisonFailure, Rule}
@@ -231,12 +231,14 @@ case class BatchTableTestUtil(
val javaTableEnv = new JavaBatchTableEnvironmentImpl(
javaEnv,
new TableConfig,
- catalogManager.getOrElse(createCatalogManager()))
+ catalogManager.getOrElse(createCatalogManager()),
+ new ModuleManager)
val env = new ExecutionEnvironment(javaEnv)
val tableEnv = new ScalaBatchTableEnvironmentImpl(
env,
new TableConfig,
- catalogManager.getOrElse(createCatalogManager()))
+ catalogManager.getOrElse(createCatalogManager()),
+ new ModuleManager)
def addTable[T: TypeInformation](
name: String,
@@ -328,12 +330,14 @@ case class StreamTableTestUtil(
private val tableConfig = new TableConfig
private val manager: CatalogManager = catalogManager.getOrElse(createCatalogManager())
+ private val moduleManager: ModuleManager = new ModuleManager
private val executor: StreamExecutor = new StreamExecutor(javaEnv)
- private val functionCatalog = new FunctionCatalog(manager)
+ private val functionCatalog = new FunctionCatalog(manager, moduleManager)
private val streamPlanner = new StreamPlanner(executor, tableConfig, functionCatalog, manager)
val javaTableEnv = new JavaStreamTableEnvironmentImpl(
manager,
+ moduleManager,
functionCatalog,
tableConfig,
javaEnv,
@@ -344,6 +348,7 @@ case class StreamTableTestUtil(
val env = new StreamExecutionEnvironment(javaEnv)
val tableEnv = new ScalaStreamTableEnvironmentImpl(
manager,
+ moduleManager,
functionCatalog,
tableConfig,
env,