You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by bl...@apache.org on 2019/10/23 21:48:34 UTC

[flink] branch master updated: [FLINK-14416][table] Add Module interface and ModuleManager

This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 518f16c  [FLINK-14416][table] Add Module interface and ModuleManager
518f16c is described below

commit 518f16c33ee39959670f31ebf662ce1eb241b1a7
Author: bowen.li <bo...@gmail.com>
AuthorDate: Wed Oct 16 17:05:42 2019 -0700

    [FLINK-14416][table] Add Module interface and ModuleManager
    
    Add Module interface and ModuleManager to Flink SQL.
    
    This closes #9937.
---
 flink-python/pyflink/table/table_environment.py    |   9 ++
 .../table/tests/test_environment_completeness.py   |   9 +-
 .../client/gateway/local/ExecutionContext.java     |   6 +-
 .../table/api/java/BatchTableEnvironment.java      |   6 +-
 .../java/internal/StreamTableEnvironmentImpl.java  |   9 +-
 .../internal/StreamTableEnvironmentImplTest.java   |   5 +-
 .../apache/flink/table/api/TableEnvironment.java   |  27 +++++
 .../table/api/internal/TableEnvironmentImpl.java   |  26 ++++-
 .../flink/table/catalog/FunctionCatalog.java       |   5 +-
 .../apache/flink/table/module/ModuleManager.java   | 118 +++++++++++++++++++++
 .../flink/table/catalog/FunctionCatalogTest.java   |   4 +-
 .../flink/table/utils/TableEnvironmentMock.java    |  12 ++-
 .../table/api/scala/BatchTableEnvironment.scala    |   7 +-
 .../internal/StreamTableEnvironmentImpl.scala      |   9 +-
 .../internal/StreamTableEnvironmentImplTest.scala  |   8 +-
 .../java/org/apache/flink/table/module/Module.java |  55 ++++++++++
 .../exceptions/ModuleAlreadyExistException.java    |  28 +++++
 .../module/exceptions/ModuleNotFoundException.java |  28 +++++
 .../table/sqlexec/SqlToOperationConverterTest.java |   4 +-
 .../metadata/AggCallSelectivityEstimatorTest.scala |   7 +-
 .../plan/metadata/FlinkRelMdHandlerTestBase.scala  |   6 +-
 .../plan/metadata/SelectivityEstimatorTest.scala   |   7 +-
 .../planner/plan/utils/RexNodeExtractorTest.scala  |   8 +-
 .../flink/table/planner/utils/TableTestBase.scala  |  17 ++-
 .../table/api/internal/BatchTableEnvImpl.scala     |   6 +-
 .../flink/table/api/internal/TableEnvImpl.scala    |  21 +++-
 .../java/internal/BatchTableEnvironmentImpl.scala  |   7 +-
 .../scala/internal/BatchTableEnvironmentImpl.scala |   7 +-
 .../PushFilterIntoTableSourceScanRule.scala        |   3 +-
 .../table/sqlexec/SqlToOperationConverterTest.java |   4 +-
 .../api/stream/StreamTableEnvironmentTest.scala    |   8 +-
 .../flink/table/api/stream/sql/AggregateTest.scala |   6 +-
 .../flink/table/plan/RexProgramExtractorTest.scala |   5 +-
 .../flink/table/utils/MockTableEnvironment.scala   |   9 +-
 .../apache/flink/table/utils/TableTestBase.scala   |  13 ++-
 35 files changed, 447 insertions(+), 62 deletions(-)

diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py
index e4af611..6c88d9e 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -209,6 +209,15 @@ class TableEnvironment(object):
         j_catalog_name_array = self._j_tenv.listCatalogs()
         return [item for item in j_catalog_name_array]
 
+    def list_modules(self):
+        """
+        Gets the names of all modules registered in this environment.
+
+        :return: List of module names.
+        """
+        j_module_name_array = self._j_tenv.listModules()
+        return [item for item in j_module_name_array]
+
     def list_databases(self):
         """
         Gets the names of all databases in the current catalog.
diff --git a/flink-python/pyflink/table/tests/test_environment_completeness.py b/flink-python/pyflink/table/tests/test_environment_completeness.py
index 8945989..d50bebb 100644
--- a/flink-python/pyflink/table/tests/test_environment_completeness.py
+++ b/flink-python/pyflink/table/tests/test_environment_completeness.py
@@ -41,7 +41,14 @@ class EnvironmentAPICompletenessTests(PythonAPICompletenessTestCase, unittest.Te
         # registerCatalog, getCatalog and listTables should be supported when catalog supported in
         # python. getCompletionHints has been deprecated. It will be removed in the next release.
         # TODO add TableEnvironment#create method with EnvironmentSettings as a parameter
-        return {'registerCatalog', 'getCatalog', 'listTables', 'getCompletionHints', 'create'}
+        return {
+            'registerCatalog',
+            'getCatalog',
+            'listTables',
+            'getCompletionHints',
+            'create',
+            'loadModule',
+            'unloadModule'}
 
 
 if __name__ == '__main__':
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
index 73c9cce..023efbc 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
@@ -77,6 +77,7 @@ import org.apache.flink.table.functions.FunctionService;
 import org.apache.flink.table.functions.ScalarFunction;
 import org.apache.flink.table.functions.TableFunction;
 import org.apache.flink.table.functions.UserDefinedFunction;
+import org.apache.flink.table.module.ModuleManager;
 import org.apache.flink.table.planner.delegation.ExecutorBase;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.table.sources.TableSource;
@@ -293,8 +294,8 @@ public class ExecutionContext<T> {
 		final CatalogManager catalogManager = new CatalogManager(
 			settings.getBuiltInCatalogName(),
 			new GenericInMemoryCatalog(settings.getBuiltInCatalogName(), settings.getBuiltInDatabaseName()));
-
-		final FunctionCatalog functionCatalog = new FunctionCatalog(catalogManager);
+		final ModuleManager moduleManager = new ModuleManager();
+		final FunctionCatalog functionCatalog = new FunctionCatalog(catalogManager, moduleManager);
 
 		final Map<String, String> plannerProperties = settings.toPlannerProperties();
 		final Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
@@ -302,6 +303,7 @@ public class ExecutionContext<T> {
 
 		return new StreamTableEnvironmentImpl(
 			catalogManager,
+			moduleManager,
 			functionCatalog,
 			config,
 			env,
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/BatchTableEnvironment.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/BatchTableEnvironment.java
index 7b7692d..38d0063 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/BatchTableEnvironment.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/BatchTableEnvironment.java
@@ -33,6 +33,7 @@ import org.apache.flink.table.descriptors.BatchTableDescriptor;
 import org.apache.flink.table.descriptors.ConnectorDescriptor;
 import org.apache.flink.table.functions.AggregateFunction;
 import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.module.ModuleManager;
 import org.apache.flink.table.sinks.TableSink;
 
 import java.lang.reflect.Constructor;
@@ -327,13 +328,14 @@ public interface BatchTableEnvironment extends TableEnvironment {
 	static BatchTableEnvironment create(ExecutionEnvironment executionEnvironment, TableConfig tableConfig) {
 		try {
 			Class<?> clazz = Class.forName("org.apache.flink.table.api.java.internal.BatchTableEnvironmentImpl");
-			Constructor con = clazz.getConstructor(ExecutionEnvironment.class, TableConfig.class, CatalogManager.class);
+			Constructor con = clazz.getConstructor(ExecutionEnvironment.class, TableConfig.class, CatalogManager.class, ModuleManager.class);
 			String defaultCatalog = "default_catalog";
 			CatalogManager catalogManager = new CatalogManager(
 				defaultCatalog,
 				new GenericInMemoryCatalog(defaultCatalog, "default_database")
 			);
-			return (BatchTableEnvironment) con.newInstance(executionEnvironment, tableConfig, catalogManager);
+			ModuleManager moduleManager = new ModuleManager();
+			return (BatchTableEnvironment) con.newInstance(executionEnvironment, tableConfig, catalogManager, moduleManager);
 		} catch (Throwable t) {
 			throw new TableException("Create BatchTableEnvironment failed.", t);
 		}
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java
index 06b20f5..4addf4f 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java
@@ -53,6 +53,7 @@ import org.apache.flink.table.functions.AggregateFunction;
 import org.apache.flink.table.functions.TableAggregateFunction;
 import org.apache.flink.table.functions.TableFunction;
 import org.apache.flink.table.functions.UserFunctionsTypeHelper;
+import org.apache.flink.table.module.ModuleManager;
 import org.apache.flink.table.operations.JavaDataStreamQueryOperation;
 import org.apache.flink.table.operations.OutputConversionModifyOperation;
 import org.apache.flink.table.sources.TableSource;
@@ -80,13 +81,14 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl imple
 
 	public StreamTableEnvironmentImpl(
 			CatalogManager catalogManager,
+			ModuleManager moduleManager,
 			FunctionCatalog functionCatalog,
 			TableConfig tableConfig,
 			StreamExecutionEnvironment executionEnvironment,
 			Planner planner,
 			Executor executor,
 			boolean isStreamingMode) {
-		super(catalogManager, tableConfig, executor, functionCatalog, planner, isStreamingMode);
+		super(catalogManager, moduleManager, tableConfig, executor, functionCatalog, planner, isStreamingMode);
 		this.executionEnvironment = executionEnvironment;
 	}
 
@@ -104,7 +106,9 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl imple
 			settings.getBuiltInCatalogName(),
 			new GenericInMemoryCatalog(settings.getBuiltInCatalogName(), settings.getBuiltInDatabaseName()));
 
-		FunctionCatalog functionCatalog = new FunctionCatalog(catalogManager);
+		ModuleManager moduleManager = new ModuleManager();
+
+		FunctionCatalog functionCatalog = new FunctionCatalog(catalogManager, moduleManager);
 
 		Map<String, String> executorProperties = settings.toExecutorProperties();
 		Executor executor = lookupExecutor(executorProperties, executionEnvironment);
@@ -115,6 +119,7 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl imple
 
 		return new StreamTableEnvironmentImpl(
 			catalogManager,
+			moduleManager,
 			functionCatalog,
 			tableConfig,
 			executionEnvironment,
diff --git a/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImplTest.java b/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImplTest.java
index 1348ec2..6b1724d 100644
--- a/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImplTest.java
+++ b/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImplTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.table.catalog.FunctionCatalog;
 import org.apache.flink.table.catalog.GenericInMemoryCatalog;
 import org.apache.flink.table.delegation.Executor;
 import org.apache.flink.table.delegation.Planner;
+import org.apache.flink.table.module.ModuleManager;
 import org.apache.flink.table.operations.ModifyOperation;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.types.Row;
@@ -92,9 +93,11 @@ public class StreamTableEnvironmentImplTest {
 			StreamExecutionEnvironment env,
 			DataStreamSource<Integer> elements) {
 		CatalogManager catalogManager = new CatalogManager("cat", new GenericInMemoryCatalog("cat", "db"));
+		ModuleManager moduleManager = new ModuleManager();
 		return new StreamTableEnvironmentImpl(
 			catalogManager,
-			new FunctionCatalog(catalogManager),
+			moduleManager,
+			new FunctionCatalog(catalogManager, moduleManager),
 			new TableConfig(),
 			env,
 			new TestPlanner(elements.getTransformation()),
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
index 23589b3..344e7ff 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
@@ -27,6 +27,9 @@ import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.descriptors.ConnectTableDescriptor;
 import org.apache.flink.table.descriptors.ConnectorDescriptor;
 import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.module.Module;
+import org.apache.flink.table.module.exceptions.ModuleAlreadyExistException;
+import org.apache.flink.table.module.exceptions.ModuleNotFoundException;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.table.sources.TableSource;
 
@@ -104,6 +107,23 @@ public interface TableEnvironment {
 	Optional<Catalog> getCatalog(String catalogName);
 
 	/**
+	 * Loads a {@link Module} under a unique name. Modules will be kept in the loaded order.
+	 *
+	 * @param moduleName name of the {@link Module}
+	 * @param module the module instance
+	 * @throws ModuleAlreadyExistException thrown when there is already a module with the same name
+	 */
+	void loadModule(String moduleName, Module module) throws ModuleAlreadyExistException;
+
+	/**
+	 * Unloads a {@link Module} with given name.
+	 *
+	 * @param moduleName name of the {@link Module}
+	 * @throws ModuleNotFoundException thrown when there is no module with the given name
+	 */
+	void unloadModule(String moduleName) throws ModuleNotFoundException;
+
+	/**
 	 * Registers a {@link ScalarFunction} under a unique name. Replaces already existing
 	 * user-defined functions under this name.
 	 */
@@ -238,6 +258,13 @@ public interface TableEnvironment {
 	String[] listCatalogs();
 
 	/**
+	 * Gets an array of names of all modules in this environment in the loaded order.
+	 *
+	 * @return A list of the names of all modules in the loaded order.
+	 */
+	String[] listModules();
+
+	/**
 	 * Gets the names of all databases registered in the current catalog.
 	 *
 	 * @return A list of the names of all registered databases in the current catalog.
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 12e874e..d13dea3 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -48,6 +48,10 @@ import org.apache.flink.table.descriptors.StreamTableDescriptor;
 import org.apache.flink.table.expressions.TableReferenceExpression;
 import org.apache.flink.table.factories.ComponentFactoryService;
 import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.module.Module;
+import org.apache.flink.table.module.ModuleManager;
+import org.apache.flink.table.module.exceptions.ModuleAlreadyExistException;
+import org.apache.flink.table.module.exceptions.ModuleNotFoundException;
 import org.apache.flink.table.operations.CatalogQueryOperation;
 import org.apache.flink.table.operations.CatalogSinkModifyOperation;
 import org.apache.flink.table.operations.ModifyOperation;
@@ -80,6 +84,7 @@ public class TableEnvironmentImpl implements TableEnvironment {
 	// and this should always be true. This avoids too many hard code.
 	private static final boolean IS_STREAM_TABLE = true;
 	private final CatalogManager catalogManager;
+	private final ModuleManager moduleManager;
 	private final OperationTreeBuilder operationTreeBuilder;
 	private final List<ModifyOperation> bufferedModifyOperations = new ArrayList<>();
 
@@ -90,12 +95,14 @@ public class TableEnvironmentImpl implements TableEnvironment {
 
 	protected TableEnvironmentImpl(
 			CatalogManager catalogManager,
+			ModuleManager moduleManager,
 			TableConfig tableConfig,
 			Executor executor,
 			FunctionCatalog functionCatalog,
 			Planner planner,
 			boolean isStreamingMode) {
 		this.catalogManager = catalogManager;
+		this.moduleManager = moduleManager;
 		this.execEnv = executor;
 
 		this.tableConfig = tableConfig;
@@ -118,7 +125,8 @@ public class TableEnvironmentImpl implements TableEnvironment {
 			settings.getBuiltInCatalogName(),
 			new GenericInMemoryCatalog(settings.getBuiltInCatalogName(), settings.getBuiltInDatabaseName()));
 
-		FunctionCatalog functionCatalog = new FunctionCatalog(catalogManager);
+		ModuleManager moduleManager = new ModuleManager();
+		FunctionCatalog functionCatalog = new FunctionCatalog(catalogManager, moduleManager);
 
 		Map<String, String> executorProperties = settings.toExecutorProperties();
 		Executor executor = ComponentFactoryService.find(ExecutorFactory.class, executorProperties)
@@ -131,6 +139,7 @@ public class TableEnvironmentImpl implements TableEnvironment {
 
 		return new TableEnvironmentImpl(
 			catalogManager,
+			moduleManager,
 			tableConfig,
 			executor,
 			functionCatalog,
@@ -162,6 +171,16 @@ public class TableEnvironmentImpl implements TableEnvironment {
 	}
 
 	@Override
+	public void loadModule(String moduleName, Module module) throws ModuleAlreadyExistException {
+		moduleManager.loadModule(moduleName, module);
+	}
+
+	@Override
+	public void unloadModule(String moduleName) throws ModuleNotFoundException {
+		moduleManager.unloadModule(moduleName);
+	}
+
+	@Override
 	public void registerFunction(String name, ScalarFunction function) {
 		functionCatalog.registerTempSystemScalarFunction(
 			name,
@@ -230,6 +249,11 @@ public class TableEnvironmentImpl implements TableEnvironment {
 	}
 
 	@Override
+	public String[] listModules() {
+		return moduleManager.listModules().toArray(new String[0]);
+	}
+
+	@Override
 	public String[] listDatabases() {
 		return catalogManager.getCatalog(catalogManager.getCurrentCatalog())
 			.get()
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
index 6808216..61f1154 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
@@ -38,6 +38,7 @@ import org.apache.flink.table.functions.TableFunction;
 import org.apache.flink.table.functions.TableFunctionDefinition;
 import org.apache.flink.table.functions.UserDefinedAggregateFunction;
 import org.apache.flink.table.functions.UserFunctionsTypeHelper;
+import org.apache.flink.table.module.ModuleManager;
 import org.apache.flink.util.Preconditions;
 
 import java.util.HashSet;
@@ -57,6 +58,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 public class FunctionCatalog implements FunctionLookup {
 
 	private final CatalogManager catalogManager;
+	private final ModuleManager moduleManager;
 
 	private final Map<String, FunctionDefinition> tempSystemFunctions = new LinkedHashMap<>();
 	private final Map<ObjectIdentifier, FunctionDefinition> tempCatalogFunctions = new LinkedHashMap<>();
@@ -66,8 +68,9 @@ public class FunctionCatalog implements FunctionLookup {
 	 */
 	private PlannerTypeInferenceUtil plannerTypeInferenceUtil;
 
-	public FunctionCatalog(CatalogManager catalogManager) {
+	public FunctionCatalog(CatalogManager catalogManager, ModuleManager moduleManager) {
 		this.catalogManager = checkNotNull(catalogManager);
+		this.moduleManager = checkNotNull(moduleManager);
 	}
 
 	public void setPlannerTypeInferenceUtil(PlannerTypeInferenceUtil plannerTypeInferenceUtil) {
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/module/ModuleManager.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/module/ModuleManager.java
new file mode 100644
index 0000000..9216237
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/module/ModuleManager.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.module;
+
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.module.exceptions.ModuleAlreadyExistException;
+import org.apache.flink.table.module.exceptions.ModuleNotFoundException;
+import org.apache.flink.util.StringUtils;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Responsible for loading/unloading modules, managing their life cycles, and resolving module objects.
+ */
+public class ModuleManager {
+	private LinkedHashMap<String, Module> modules;
+
+	public ModuleManager() {
+		this.modules = new LinkedHashMap<>();
+
+		// TODO: Add Core module to modules
+	}
+
+	/**
+	 * Load a module under a unique name. Modules will be kept in the loaded order, and new module
+	 * will be added to the end.
+	 *
+	 * @param name name of the module
+	 * @param module the module instance
+	 * @throws ModuleAlreadyExistException thrown when there is already a module with the same name
+	 */
+	public void loadModule(String name, Module module) throws ModuleAlreadyExistException {
+		checkArgument(!StringUtils.isNullOrWhitespaceOnly(name), "name cannot be null or empty string");
+		checkNotNull(module, "module cannot be null");
+
+		if (!modules.containsKey(name)) {
+			modules.put(name, module);
+		} else {
+			throw new ModuleAlreadyExistException(name);
+		}
+	}
+
+	/**
+	 * Unload a module with given name.
+	 *
+	 * @param name name of the module
+	 * @throws ModuleNotFoundException thrown when there is no module with the given name
+	 */
+	public void unloadModule(String name) throws ModuleNotFoundException {
+		if (modules.containsKey(name)) {
+			modules.remove(name);
+		} else {
+			throw new ModuleNotFoundException(name);
+		}
+	}
+
+	/**
+	 * Get names of all modules loaded.
+	 *
+	 * @return a list of names of modules loaded
+	 */
+	public List<String> listModules() {
+		return new ArrayList<>(modules.keySet());
+	}
+
+	/**
+	 * Get names of all functions from all modules.
+	 *
+	 * @return a set of names of registered modules.
+	 */
+	public Set<String> listFunctions() {
+		return modules.values().stream()
+				.map(m -> m.listFunctions())
+				.flatMap(n -> n.stream())
+				.collect(Collectors.toSet());
+	}
+
+	/**
+	 * Get an optional of {@link FunctionDefinition} by a given name.
+	 * Function will be resolved to modules in the loaded order, and the first match will be returned.
+	 * If no match is found in all modules, return an optional.
+	 *
+	 * @param name name of the function
+	 * @return an optional of {@link FunctionDefinition}
+	 */
+	public Optional<FunctionDefinition> getFunctionDefinition(String name) {
+		Optional<Module> module = modules.values().stream()
+			.filter(p -> p.listFunctions().stream().anyMatch(e -> e.equals(name)))
+			.findFirst();
+
+		return module.isPresent() ? module.get().getFunctionDefinition(name) : Optional.empty();
+	}
+
+}
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/FunctionCatalogTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/FunctionCatalogTest.java
index fb11cbc..5f79dd5 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/FunctionCatalogTest.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/FunctionCatalogTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.catalog;
 
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.module.ModuleManager;
 
 import org.junit.Test;
 
@@ -37,7 +38,8 @@ public class FunctionCatalogTest {
 	@Test
 	public void testGetBuiltInFunctions() {
 		FunctionCatalog functionCatalog = new FunctionCatalog(
-			new CatalogManager("test", new GenericInMemoryCatalog("test")));
+			new CatalogManager("test", new GenericInMemoryCatalog("test")),
+			new ModuleManager());
 
 		Set<String> actual = new HashSet<>();
 		Collections.addAll(actual, functionCatalog.getFunctions());
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableEnvironmentMock.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableEnvironmentMock.java
index dc8b992..fa2c58c 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableEnvironmentMock.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableEnvironmentMock.java
@@ -25,6 +25,7 @@ import org.apache.flink.table.api.internal.TableEnvironmentImpl;
 import org.apache.flink.table.catalog.CatalogManager;
 import org.apache.flink.table.catalog.FunctionCatalog;
 import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.table.module.ModuleManager;
 
 /**
  * Mocking {@link TableEnvironment} for tests.
@@ -41,12 +42,13 @@ public class TableEnvironmentMock extends TableEnvironmentImpl {
 
 	protected TableEnvironmentMock(
 			CatalogManager catalogManager,
+			ModuleManager moduleManager,
 			TableConfig tableConfig,
 			ExecutorMock executor,
 			FunctionCatalog functionCatalog,
 			PlannerMock planner,
 			boolean isStreamingMode) {
-		super(catalogManager, tableConfig, executor, functionCatalog, planner, isStreamingMode);
+		super(catalogManager, moduleManager, tableConfig, executor, functionCatalog, planner, isStreamingMode);
 
 		this.catalogManager = catalogManager;
 		this.executor = executor;
@@ -64,11 +66,13 @@ public class TableEnvironmentMock extends TableEnvironmentImpl {
 
 	private static TableEnvironmentMock getInstance(boolean isStreamingMode) {
 		final CatalogManager catalogManager = createCatalogManager();
+		final ModuleManager moduleManager = new ModuleManager();
 		return new TableEnvironmentMock(
 			catalogManager,
+			moduleManager,
 			createTableConfig(),
 			createExecutor(),
-			createFunctionCatalog(catalogManager),
+			createFunctionCatalog(catalogManager, moduleManager),
 			createPlanner(),
 			isStreamingMode);
 	}
@@ -89,8 +93,8 @@ public class TableEnvironmentMock extends TableEnvironmentImpl {
 		return new ExecutorMock();
 	}
 
-	private static FunctionCatalog createFunctionCatalog(CatalogManager catalogManager) {
-		return new FunctionCatalog(catalogManager);
+	private static FunctionCatalog createFunctionCatalog(CatalogManager catalogManager, ModuleManager moduleManager) {
+		return new FunctionCatalog(catalogManager, moduleManager);
 	}
 
 	private static PlannerMock createPlanner() {
diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/BatchTableEnvironment.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/BatchTableEnvironment.scala
index 516e6b1..c57b035 100644
--- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/BatchTableEnvironment.scala
+++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/BatchTableEnvironment.scala
@@ -25,6 +25,7 @@ import org.apache.flink.table.catalog.{CatalogManager, GenericInMemoryCatalog}
 import org.apache.flink.table.descriptors.{BatchTableDescriptor, ConnectorDescriptor}
 import org.apache.flink.table.expressions.Expression
 import org.apache.flink.table.functions.{AggregateFunction, TableFunction}
+import org.apache.flink.table.module.ModuleManager
 import org.apache.flink.table.sinks.TableSink
 
 /**
@@ -294,7 +295,8 @@ object BatchTableEnvironment {
         .getConstructor(
           classOf[ExecutionEnvironment],
           classOf[TableConfig],
-          classOf[CatalogManager])
+          classOf[CatalogManager],
+          classOf[ModuleManager])
       val builtInCatalog = "default_catalog"
       val catalogManager = new CatalogManager(
         "default_catalog",
@@ -302,7 +304,8 @@ object BatchTableEnvironment {
           builtInCatalog,
           "default_database")
       )
-      const.newInstance(executionEnvironment, tableConfig, catalogManager)
+      val moduleManager = new ModuleManager
+      const.newInstance(executionEnvironment, tableConfig, catalogManager, moduleManager)
         .asInstanceOf[BatchTableEnvironment]
     } catch {
       case t: Throwable => throw new TableException("Create BatchTableEnvironment failed.", t)
diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala
index 1384be0..97da798 100644
--- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala
+++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala
@@ -35,14 +35,15 @@ import org.apache.flink.table.descriptors.{ConnectorDescriptor, StreamTableDescr
 import org.apache.flink.table.expressions.Expression
 import org.apache.flink.table.factories.ComponentFactoryService
 import org.apache.flink.table.functions.{AggregateFunction, TableAggregateFunction, TableFunction, UserFunctionsTypeHelper}
+import org.apache.flink.table.module.ModuleManager
 import org.apache.flink.table.operations.{OutputConversionModifyOperation, ScalaDataStreamQueryOperation}
 import org.apache.flink.table.sources.{TableSource, TableSourceValidation}
 import org.apache.flink.table.types.utils.TypeConversions
 import org.apache.flink.table.typeutils.FieldInfoUtils
-
 import java.util
 import java.util.{Collections, List => JList, Map => JMap}
 
+
 import _root_.scala.collection.JavaConverters._
 
 /**
@@ -52,6 +53,7 @@ import _root_.scala.collection.JavaConverters._
 @Internal
 class StreamTableEnvironmentImpl (
     catalogManager: CatalogManager,
+    moduleManager: ModuleManager,
     functionCatalog: FunctionCatalog,
     config: TableConfig,
     scalaExecutionEnvironment: StreamExecutionEnvironment,
@@ -60,6 +62,7 @@ class StreamTableEnvironmentImpl (
     isStreaming: Boolean)
   extends TableEnvironmentImpl(
     catalogManager,
+    moduleManager,
     config,
     executor,
     functionCatalog,
@@ -278,7 +281,8 @@ object StreamTableEnvironmentImpl {
       settings.getBuiltInCatalogName,
       new GenericInMemoryCatalog(settings.getBuiltInCatalogName, settings.getBuiltInDatabaseName))
 
-    val functionCatalog = new FunctionCatalog(catalogManager)
+    val moduleManager = new ModuleManager
+    val functionCatalog = new FunctionCatalog(catalogManager, moduleManager)
 
     val executorProperties = settings.toExecutorProperties
     val executor = lookupExecutor(executorProperties, executionEnvironment)
@@ -294,6 +298,7 @@ object StreamTableEnvironmentImpl {
 
     new StreamTableEnvironmentImpl(
       catalogManager,
+      moduleManager,
       functionCatalog,
       tableConfig,
       executionEnvironment,
diff --git a/flink-table/flink-table-api-scala-bridge/src/test/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImplTest.scala b/flink-table/flink-table-api-scala-bridge/src/test/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImplTest.scala
index 91b37d0..cf36ed2 100644
--- a/flink-table/flink-table-api-scala-bridge/src/test/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImplTest.scala
+++ b/flink-table/flink-table-api-scala-bridge/src/test/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImplTest.scala
@@ -26,13 +26,13 @@ import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, GenericI
 import org.apache.flink.table.delegation.{Executor, Planner}
 import org.apache.flink.table.operations.{ModifyOperation, Operation}
 import org.apache.flink.types.Row
-
 import org.hamcrest.CoreMatchers.equalTo
 import org.junit.Assert.assertThat
 import org.junit.Test
-
 import java.util.{Collections, List => JList}
 
+import org.apache.flink.table.module.ModuleManager
+
 /**
  * Tests for [[StreamTableEnvironmentImpl]].
  */
@@ -83,9 +83,11 @@ class StreamTableEnvironmentImplTest {
     val catalogManager = new CatalogManager(
       "cat",
       new GenericInMemoryCatalog("cat", "db"))
+    val moduleManager = new ModuleManager
     new StreamTableEnvironmentImpl(
       catalogManager,
-      new FunctionCatalog(catalogManager),
+      moduleManager,
+      new FunctionCatalog(catalogManager, moduleManager),
       new TableConfig,
       env,
       new TestPlanner(elements.javaStream.getTransformation),
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/module/Module.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/module/Module.java
new file mode 100644
index 0000000..eccbe70
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/module/Module.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.module;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.functions.FunctionDefinition;
+
+import java.util.Collections;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * Modules define a set of metadata, including functions, user defined types, operators, rules, etc.
+ * Metadata from modules are regarded as built-in or system metadata that users can take advantages of.
+ */
+@PublicEvolving
+public interface Module {
+
+	/**
+	 * List names of all functions in this module.
+	 *
+	 * @return a set of function names
+	 */
+	default Set<String> listFunctions() {
+		return Collections.emptySet();
+	}
+
+	/**
+	 * Get an optional of {@link FunctionDefinition} by a give name.
+	 *
+	 * @param name name of the {@link FunctionDefinition}.
+	 * @return an optional function definition
+	 */
+	default Optional<FunctionDefinition> getFunctionDefinition(String name) {
+		return Optional.empty();
+	}
+
+	// user defined types, operators, rules, etc
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/module/exceptions/ModuleAlreadyExistException.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/module/exceptions/ModuleAlreadyExistException.java
new file mode 100644
index 0000000..1f27ba6
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/module/exceptions/ModuleAlreadyExistException.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.module.exceptions;
+
+/**
+ * Exception for trying to load a module that already exists.
+ */
+public class ModuleAlreadyExistException extends Exception {
+	public ModuleAlreadyExistException(String moduleName) {
+		super(String.format("Module %s already exists.", moduleName));
+	}
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/module/exceptions/ModuleNotFoundException.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/module/exceptions/ModuleNotFoundException.java
new file mode 100644
index 0000000..bf575cd
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/module/exceptions/ModuleNotFoundException.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.module.exceptions;
+
+/**
+ * Exception for operating on a module that does not exists.
+ */
+public class ModuleNotFoundException extends Exception {
+	public ModuleNotFoundException(String moduleName) {
+		super(String.format("Module %s does not exist.", moduleName));
+	}
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
index 1acdd88..9f87275 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
 import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.module.ModuleManager;
 import org.apache.flink.table.operations.CatalogSinkModifyOperation;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.ddl.CreateTableOperation;
@@ -71,7 +72,8 @@ public class SqlToOperationConverterTest {
 		"default");
 	private final CatalogManager catalogManager =
 		new CatalogManager("builtin", catalog);
-	private final FunctionCatalog functionCatalog = new FunctionCatalog(catalogManager);
+	private final ModuleManager moduleManager = new ModuleManager();
+	private final FunctionCatalog functionCatalog = new FunctionCatalog(catalogManager, moduleManager);
 	private final PlannerContext plannerContext =
 		new PlannerContext(tableConfig,
 			functionCatalog,
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/AggCallSelectivityEstimatorTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/AggCallSelectivityEstimatorTest.scala
index 92905e7..554d51b 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/AggCallSelectivityEstimatorTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/AggCallSelectivityEstimatorTest.scala
@@ -26,7 +26,6 @@ import org.apache.flink.table.planner.plan.schema._
 import org.apache.flink.table.planner.plan.stats.FlinkStatistic
 import org.apache.flink.table.planner.{JDouble, JLong}
 import org.apache.flink.util.Preconditions
-
 import com.google.common.collect.ImmutableList
 import org.apache.calcite.plan.{AbstractRelOptPlanner, RelOptCluster}
 import org.apache.calcite.rel.`type`.RelDataType
@@ -46,9 +45,10 @@ import org.junit.{Before, BeforeClass, Test}
 import org.powermock.api.mockito.PowerMockito._
 import org.powermock.core.classloader.annotations.PrepareForTest
 import org.powermock.modules.junit4.PowerMockRunner
-
 import java.math.BigDecimal
 
+import org.apache.flink.table.module.ModuleManager
+
 import scala.collection.JavaConversions._
 
 /**
@@ -83,7 +83,8 @@ class AggCallSelectivityEstimatorTest {
     val cluster = mock(classOf[RelOptCluster])
     val planner = mock(classOf[AbstractRelOptPlanner])
     val catalogManager = mock(classOf[CatalogManager])
-    val functionCatalog = new FunctionCatalog(catalogManager)
+    val moduleManager = mock(classOf[ModuleManager])
+    val functionCatalog = new FunctionCatalog(catalogManager, moduleManager)
     val context = new FlinkContextImpl(new TableConfig, functionCatalog)
     when(tableScan, "getCluster").thenReturn(cluster)
     when(cluster, "getRexBuilder").thenReturn(rexBuilder)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
index dd6d64d..3802d44 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
@@ -67,10 +67,11 @@ import org.apache.calcite.sql.fun.{SqlCountAggFunction, SqlStdOperatorTable}
 import org.apache.calcite.sql.parser.SqlParserPos
 import org.apache.calcite.util.{DateString, ImmutableBitSet, ImmutableIntList, TimeString, TimestampString}
 import org.junit.{Before, BeforeClass}
-
 import java.math.BigDecimal
 import java.util
 
+import org.apache.flink.table.module.ModuleManager
+
 import scala.collection.JavaConversions._
 
 class FlinkRelMdHandlerTestBase {
@@ -82,13 +83,14 @@ class FlinkRelMdHandlerTestBase {
   val builtinDatabase = "default_database"
   val catalogManager = new CatalogManager(
     builtinCatalog, new GenericInMemoryCatalog(builtinCatalog, builtinDatabase))
+  val moduleManager = new ModuleManager
 
   // TODO batch RelNode and stream RelNode should have different PlannerContext
   //  and RelOptCluster due to they have different trait definitions.
   val plannerContext: PlannerContext =
     new PlannerContext(
       tableConfig,
-      new FunctionCatalog(catalogManager),
+      new FunctionCatalog(catalogManager, moduleManager),
       CalciteSchema.from(rootSchema),
       util.Arrays.asList(
         ConventionTraitDef.INSTANCE,
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/SelectivityEstimatorTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/SelectivityEstimatorTest.scala
index 99de5a7..f16ae27 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/SelectivityEstimatorTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/SelectivityEstimatorTest.scala
@@ -26,7 +26,6 @@ import org.apache.flink.table.planner.plan.schema._
 import org.apache.flink.table.planner.plan.stats.FlinkStatistic
 import org.apache.flink.table.planner.{JDouble, JLong}
 import org.apache.flink.util.Preconditions
-
 import org.apache.calcite.plan.{AbstractRelOptPlanner, RelOptCluster}
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.TableScan
@@ -43,9 +42,10 @@ import org.junit.{Before, BeforeClass, Test}
 import org.powermock.api.mockito.PowerMockito._
 import org.powermock.core.classloader.annotations.PrepareForTest
 import org.powermock.modules.junit4.PowerMockRunner
-
 import java.math.BigDecimal
 
+import org.apache.flink.table.module.ModuleManager
+
 import scala.collection.JavaConverters._
 
 /**
@@ -85,7 +85,8 @@ class SelectivityEstimatorTest {
     val cluster = mock(classOf[RelOptCluster])
     val planner = mock(classOf[AbstractRelOptPlanner])
     val catalogManager = mock(classOf[CatalogManager])
-    val functionCatalog = new FunctionCatalog(catalogManager)
+    val moduleManager = mock(classOf[ModuleManager])
+    val functionCatalog = new FunctionCatalog(catalogManager, moduleManager)
     val context: FlinkContext = new FlinkContextImpl(tableConfig, functionCatalog)
     when(tableScan, "getCluster").thenReturn(cluster)
     when(cluster, "getRexBuilder").thenReturn(rexBuilder)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala
index f0289e4..efe2f6e 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala
@@ -31,7 +31,6 @@ import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable
 import org.apache.flink.table.planner.functions.utils.ScalarSqlFunction
 import org.apache.flink.table.planner.plan.utils.InputTypeBuilder.inputOf
 import org.apache.flink.table.planner.utils.{DateTimeTestUtil, IntSumAggFunction}
-
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rex.{RexBuilder, RexNode}
 import org.apache.calcite.sql.SqlPostfixOperator
@@ -42,11 +41,12 @@ import org.apache.calcite.util.{DateString, TimeString, TimestampString}
 import org.hamcrest.CoreMatchers.is
 import org.junit.Assert.{assertArrayEquals, assertEquals, assertThat, assertTrue}
 import org.junit.Test
-
 import java.math.BigDecimal
 import java.sql.Timestamp
 import java.util.{TimeZone, List => JList}
 
+import org.apache.flink.table.module.ModuleManager
+
 import scala.collection.JavaConverters._
 
 /**
@@ -56,8 +56,8 @@ class RexNodeExtractorTest extends RexNodeTestBase {
   val defaultCatalog = "default_catalog"
   val catalogManager = new CatalogManager(
     defaultCatalog, new GenericInMemoryCatalog(defaultCatalog, "default_database"))
-
-  private val functionCatalog = new FunctionCatalog(catalogManager)
+  val moduleManager = new ModuleManager
+  private val functionCatalog = new FunctionCatalog(catalogManager, moduleManager)
 
   private val expressionBridge: ExpressionBridge[PlannerExpression] =
     new ExpressionBridge[PlannerExpression](
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
index a452717..4ded0fc 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
@@ -57,16 +57,16 @@ import org.apache.flink.table.types.logical.LogicalType
 import org.apache.flink.table.types.utils.TypeConversions
 import org.apache.flink.table.typeutils.FieldInfoUtils
 import org.apache.flink.types.Row
-
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.sql.SqlExplainLevel
 import org.apache.commons.lang3.SystemUtils
 import org.junit.Assert.{assertEquals, assertTrue}
 import org.junit.Rule
 import org.junit.rules.{ExpectedException, TestName}
-
 import _root_.java.util
 
+import org.apache.flink.table.module.ModuleManager
+
 import _root_.scala.collection.JavaConversions._
 import _root_.scala.io.Source
 
@@ -887,6 +887,7 @@ class TestTableSource(override val isBounded: Boolean, schema: TableSchema)
 
 class TestingTableEnvironment private(
     catalogManager: CatalogManager,
+    moduleManager: ModuleManager,
     tableConfig: TableConfig,
     executor: Executor,
     functionCatalog: FunctionCatalog,
@@ -894,6 +895,7 @@ class TestingTableEnvironment private(
     isStreamingMode: Boolean)
   extends TableEnvironmentImpl(
     catalogManager,
+    moduleManager,
     tableConfig,
     executor,
     functionCatalog,
@@ -1027,7 +1029,8 @@ object TestingTableEnvironment {
           new GenericInMemoryCatalog(
             settings.getBuiltInCatalogName, settings.getBuiltInDatabaseName))
     }
-    val functionCatalog = new FunctionCatalog(catalogMgr)
+    val moduleManager = new ModuleManager
+    val functionCatalog = new FunctionCatalog(catalogMgr, moduleManager)
     val plannerProperties = settings.toPlannerProperties
     val executorProperties = settings.toExecutorProperties
     val executor = ComponentFactoryService.find(classOf[ExecutorFactory],
@@ -1036,7 +1039,13 @@ object TestingTableEnvironment {
       .create(plannerProperties, executor, tableConfig, functionCatalog, catalogMgr)
       .asInstanceOf[PlannerBase]
     new TestingTableEnvironment(
-      catalogMgr, tableConfig, executor, functionCatalog, planner, settings.isStreamingMode)
+      catalogMgr,
+      moduleManager,
+      tableConfig,
+      executor,
+      functionCatalog,
+      planner,
+      settings.isStreamingMode)
   }
 }
 
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
index a4ed523..5ec2bba 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
@@ -34,6 +34,7 @@ import org.apache.flink.table.explain.PlanJsonParser
 import org.apache.flink.table.expressions.utils.ApiExpressionDefaultVisitor
 import org.apache.flink.table.expressions.{Expression, UnresolvedCallExpression}
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions.TIME_ATTRIBUTES
+import org.apache.flink.table.module.ModuleManager
 import org.apache.flink.table.operations.DataSetQueryOperation
 import org.apache.flink.table.plan.BatchOptimizer
 import org.apache.flink.table.plan.nodes.dataset.DataSetRel
@@ -59,8 +60,9 @@ import _root_.scala.collection.JavaConversions._
 abstract class BatchTableEnvImpl(
     private[flink] val execEnv: ExecutionEnvironment,
     config: TableConfig,
-    catalogManager: CatalogManager)
-  extends TableEnvImpl(config, catalogManager) {
+    catalogManager: CatalogManager,
+    moduleManager: ModuleManager)
+  extends TableEnvImpl(config, catalogManager, moduleManager) {
 
   private[flink] val optimizer = new BatchOptimizer(
     () => config.getPlannerConfig.unwrap(classOf[CalciteConfig]).orElse(CalciteConfig.DEFAULT),
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index a00ea35..e028032 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -29,6 +29,7 @@ import org.apache.flink.table.expressions._
 import org.apache.flink.table.expressions.resolver.lookups.TableReferenceLookup
 import org.apache.flink.table.factories.{TableFactoryService, TableFactoryUtil, TableSinkFactory}
 import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, TableFunction, UserDefinedAggregateFunction, _}
+import org.apache.flink.table.module.{Module, ModuleManager}
 import org.apache.flink.table.operations.ddl.CreateTableOperation
 import org.apache.flink.table.operations.utils.OperationTreeBuilder
 import org.apache.flink.table.operations.{CatalogQueryOperation, PlannerQueryOperation, TableSourceQueryOperation, _}
@@ -37,12 +38,10 @@ import org.apache.flink.table.sinks.{OverwritableTableSink, PartitionableTableSi
 import org.apache.flink.table.sources.TableSource
 import org.apache.flink.table.sqlexec.SqlToOperationConverter
 import org.apache.flink.table.util.JavaScalaConversionUtil
-
 import org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema
 import org.apache.calcite.sql._
 import org.apache.calcite.sql.parser.SqlParser
 import org.apache.calcite.tools.FrameworkConfig
-
 import _root_.java.util.{Optional, HashMap => JHashMap, Map => JMap}
 
 import _root_.scala.collection.JavaConversions._
@@ -55,11 +54,13 @@ import _root_.scala.collection.JavaConverters._
   */
 abstract class TableEnvImpl(
     val config: TableConfig,
-    private val catalogManager: CatalogManager)
+    private val catalogManager: CatalogManager,
+    private val moduleManager: ModuleManager)
   extends TableEnvironment {
 
   // Table API/SQL function catalog
-  private[flink] val functionCatalog: FunctionCatalog = new FunctionCatalog(catalogManager)
+  private[flink] val functionCatalog: FunctionCatalog =
+    new FunctionCatalog(catalogManager, moduleManager)
 
   // temporary utility until we don't use planner expressions anymore
   functionCatalog.setPlannerTypeInferenceUtil(PlannerTypeInferenceUtilImpl.INSTANCE)
@@ -156,6 +157,14 @@ abstract class TableEnvImpl(
     catalogManager.getCatalog(catalogName)
   }
 
+  override def loadModule(moduleName: String, module: Module): Unit = {
+    moduleManager.loadModule(moduleName, module)
+  }
+
+  override def unloadModule(moduleName: String): Unit = {
+    moduleManager.unloadModule(moduleName)
+  }
+
   override def getCurrentCatalog: String = {
     catalogManager.getCurrentCatalog
   }
@@ -329,6 +338,10 @@ abstract class TableEnvImpl(
       .map(t => new CatalogQueryOperation(objectIdentifier, t.getSchema))
   }
 
+  override def listModules(): Array[String] = {
+    moduleManager.listModules().asScala.toArray
+  }
+
   override def listCatalogs(): Array[String] = {
     catalogManager.getCatalogs.asScala.toArray
   }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/java/internal/BatchTableEnvironmentImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/java/internal/BatchTableEnvironmentImpl.scala
index 7358721..971ba37 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/java/internal/BatchTableEnvironmentImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/java/internal/BatchTableEnvironmentImpl.scala
@@ -26,6 +26,7 @@ import org.apache.flink.table.api.java.BatchTableEnvironment
 import org.apache.flink.table.catalog.CatalogManager
 import org.apache.flink.table.expressions.ExpressionParser
 import org.apache.flink.table.functions.{AggregateFunction, TableFunction}
+import org.apache.flink.table.module.ModuleManager
 
 import _root_.scala.collection.JavaConverters._
 
@@ -39,11 +40,13 @@ import _root_.scala.collection.JavaConverters._
 class BatchTableEnvironmentImpl(
     execEnv: ExecutionEnvironment,
     config: TableConfig,
-    catalogManager: CatalogManager)
+    catalogManager: CatalogManager,
+    moduleManager: ModuleManager)
   extends BatchTableEnvImpl(
     execEnv,
     config,
-    catalogManager)
+    catalogManager,
+    moduleManager)
   with org.apache.flink.table.api.java.BatchTableEnvironment {
 
   override def fromDataSet[T](dataSet: DataSet[T]): Table = {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/internal/BatchTableEnvironmentImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/internal/BatchTableEnvironmentImpl.scala
index 8e13426..7324a4b 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/internal/BatchTableEnvironmentImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/internal/BatchTableEnvironmentImpl.scala
@@ -25,6 +25,7 @@ import org.apache.flink.table.api.scala.BatchTableEnvironment
 import org.apache.flink.table.catalog.CatalogManager
 import org.apache.flink.table.expressions.Expression
 import org.apache.flink.table.functions.{AggregateFunction, TableFunction}
+import org.apache.flink.table.module.ModuleManager
 
 import _root_.scala.reflect.ClassTag
 
@@ -38,11 +39,13 @@ import _root_.scala.reflect.ClassTag
 class BatchTableEnvironmentImpl(
     execEnv: ExecutionEnvironment,
     config: TableConfig,
-    catalogManager: CatalogManager)
+    catalogManager: CatalogManager,
+    moduleManager: ModuleManager)
   extends BatchTableEnvImpl(
     execEnv.getJavaEnv,
     config,
-    catalogManager)
+    catalogManager,
+    moduleManager)
   with org.apache.flink.table.api.scala.BatchTableEnvironment {
 
   override def fromDataSet[T](dataSet: DataSet[T]): Table = {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/logical/PushFilterIntoTableSourceScanRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/logical/PushFilterIntoTableSourceScanRule.scala
index de1d0e0..334fcb3 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/logical/PushFilterIntoTableSourceScanRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/logical/PushFilterIntoTableSourceScanRule.scala
@@ -26,6 +26,7 @@ import org.apache.calcite.rex.RexProgram
 import org.apache.flink.table.api.TableException
 import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, GenericInMemoryCatalog}
 import org.apache.flink.table.expressions.{Expression, PlannerExpression}
+import org.apache.flink.table.module.ModuleManager
 import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, FlinkLogicalTableSourceScan}
 import org.apache.flink.table.plan.util.RexProgramExtractor
 import org.apache.flink.table.sources.FilterableTableSource
@@ -73,7 +74,7 @@ class PushFilterIntoTableSourceScanRule extends RelOptRule(
       RexProgramExtractor.extractConjunctiveConditions(
         program,
         call.builder().getRexBuilder,
-        new FunctionCatalog(catalogManager))
+        new FunctionCatalog(catalogManager, new ModuleManager))
     if (predicates.isEmpty) {
       // no condition can be translated to expression
       return
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
index 0424109..79f5470 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
@@ -40,6 +40,7 @@ import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
 import org.apache.flink.table.expressions.ExpressionBridge;
 import org.apache.flink.table.expressions.PlannerExpressionConverter;
+import org.apache.flink.table.module.ModuleManager;
 import org.apache.flink.table.operations.CatalogSinkModifyOperation;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.ddl.CreateTableOperation;
@@ -74,7 +75,8 @@ public class SqlToOperationConverterTest {
 		"default");
 	private final CatalogManager catalogManager =
 		new CatalogManager("builtin", catalog);
-	private final FunctionCatalog functionCatalog = new FunctionCatalog(catalogManager);
+	private final ModuleManager moduleManager = new ModuleManager();
+	private final FunctionCatalog functionCatalog = new FunctionCatalog(catalogManager, moduleManager);
 	private final PlanningConfigurationBuilder planningConfigurationBuilder =
 		new PlanningConfigurationBuilder(tableConfig,
 			functionCatalog,
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
index 11e43cf..a3450f4 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
@@ -36,12 +36,12 @@ import org.apache.flink.table.runtime.utils.StreamTestData
 import org.apache.flink.table.utils.TableTestBase
 import org.apache.flink.table.utils.TableTestUtil.{binaryNode, streamTableNode, term, unaryNode}
 import org.apache.flink.types.Row
-
 import org.junit.Test
 import org.mockito.Mockito.{mock, when}
-
 import java.lang.{Integer => JInt, Long => JLong}
 
+import org.apache.flink.table.module.ModuleManager
+
 class StreamTableEnvironmentTest extends TableTestBase {
 
   @Test
@@ -206,11 +206,13 @@ class StreamTableEnvironmentTest extends TableTestBase {
     val manager: CatalogManager = new CatalogManager(
       "default_catalog",
       new GenericInMemoryCatalog("default_catalog", "default_database"))
+    val moduleManager: ModuleManager = new ModuleManager
     val executor: StreamExecutor = new StreamExecutor(jStreamExecEnv)
-    val functionCatalog = new FunctionCatalog(manager)
+    val functionCatalog = new FunctionCatalog(manager, moduleManager)
     val streamPlanner = new StreamPlanner(executor, config, functionCatalog, manager)
     val jTEnv = new JStreamTableEnvironmentImpl(
       manager,
+      moduleManager,
       functionCatalog,
       config,
       jStreamExecEnv,
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/AggregateTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/AggregateTest.scala
index c917267..1bd89f1 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/AggregateTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/AggregateTest.scala
@@ -29,6 +29,7 @@ import org.apache.flink.table.api.{TableConfig, Types}
 import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, GenericInMemoryCatalog}
 import org.apache.flink.table.delegation.{Executor, Planner}
 import org.apache.flink.table.functions.{AggregateFunction, AggregateFunctionDefinition}
+import org.apache.flink.table.module.ModuleManager
 import org.apache.flink.table.utils.TableTestUtil.{streamTableNode, term, unaryNode}
 import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
 import org.apache.flink.types.Row
@@ -69,10 +70,11 @@ class AggregateTest extends TableTestBase {
     val defaultCatalog = "default_catalog"
     val catalogManager = new CatalogManager(
       defaultCatalog, new GenericInMemoryCatalog(defaultCatalog, "default_database"))
-
-    val functionCatalog = new FunctionCatalog(catalogManager)
+    val moduleManager = new ModuleManager
+    val functionCatalog = new FunctionCatalog(catalogManager, moduleManager)
     val tablEnv = new StreamTableEnvironmentImpl(
       catalogManager,
+      moduleManager,
       functionCatalog,
       new TableConfig,
       Mockito.mock(classOf[StreamExecutionEnvironment]),
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/RexProgramExtractorTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/RexProgramExtractorTest.scala
index b752b76..d109726 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/RexProgramExtractorTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/RexProgramExtractorTest.scala
@@ -30,6 +30,7 @@ import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.calcite.util.{DateString, TimeString, TimestampString}
 import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, GenericInMemoryCatalog}
 import org.apache.flink.table.expressions._
+import org.apache.flink.table.module.ModuleManager
 import org.apache.flink.table.plan.util.{RexNodeToExpressionConverter, RexProgramExtractor}
 import org.apache.flink.table.utils.InputTypeBuilder.inputOf
 import org.hamcrest.CoreMatchers.is
@@ -42,7 +43,9 @@ import scala.collection.mutable
 class RexProgramExtractorTest extends RexProgramTestBase {
 
   private val functionCatalog: FunctionCatalog = new FunctionCatalog(
-    new CatalogManager("default_catalog", new GenericInMemoryCatalog("default_catalog")))
+    new CatalogManager("default_catalog", new GenericInMemoryCatalog("default_catalog")),
+    new ModuleManager
+  )
   private val expressionBridge: ExpressionBridge[PlannerExpression] =
     new ExpressionBridge[PlannerExpression](
       functionCatalog,
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
index 6104724..b0f11f3 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
@@ -26,9 +26,10 @@ import org.apache.flink.table.descriptors.{ConnectTableDescriptor, ConnectorDesc
 import org.apache.flink.table.functions.ScalarFunction
 import org.apache.flink.table.sinks.TableSink
 import org.apache.flink.table.sources.TableSource
-
 import java.util.Optional
 
+import org.apache.flink.table.module.Module
+
 class MockTableEnvironment extends TableEnvironment {
 
   override def fromTableSource(source: TableSource[_]): Table = ???
@@ -52,6 +53,8 @@ class MockTableEnvironment extends TableEnvironment {
 
   override def listCatalogs(): Array[String] = ???
 
+  override def listModules(): Array[String] = ???
+
   override def listDatabases(): Array[String] = ???
 
   override def listTables(): Array[String] = ???
@@ -94,4 +97,8 @@ class MockTableEnvironment extends TableEnvironment {
     sinkPathContinued: String*): Unit = ???
 
   override def execute(jobName: String): JobExecutionResult = ???
+
+  override def loadModule(moduleName: String, module: Module): Unit = ???
+
+  override def unloadModule(moduleName: String): Unit = ???
 }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
index 6e0c3a3..5ebc79e 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
@@ -37,9 +37,9 @@ import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, Tabl
 import org.apache.flink.table.operations.{DataSetQueryOperation, JavaDataStreamQueryOperation, ScalaDataStreamQueryOperation}
 import org.apache.flink.table.planner.StreamPlanner
 import org.apache.flink.table.utils.TableTestUtil.createCatalogManager
-
 import org.apache.calcite.plan.RelOptUtil
 import org.apache.calcite.rel.RelNode
+import org.apache.flink.table.module.ModuleManager
 import org.junit.Assert.assertEquals
 import org.junit.rules.ExpectedException
 import org.junit.{ComparisonFailure, Rule}
@@ -231,12 +231,14 @@ case class BatchTableTestUtil(
   val javaTableEnv = new JavaBatchTableEnvironmentImpl(
     javaEnv,
     new TableConfig,
-    catalogManager.getOrElse(createCatalogManager()))
+    catalogManager.getOrElse(createCatalogManager()),
+    new ModuleManager)
   val env = new ExecutionEnvironment(javaEnv)
   val tableEnv = new ScalaBatchTableEnvironmentImpl(
     env,
     new TableConfig,
-    catalogManager.getOrElse(createCatalogManager()))
+    catalogManager.getOrElse(createCatalogManager()),
+    new ModuleManager)
 
   def addTable[T: TypeInformation](
       name: String,
@@ -328,12 +330,14 @@ case class StreamTableTestUtil(
 
   private val tableConfig = new TableConfig
   private val manager: CatalogManager = catalogManager.getOrElse(createCatalogManager())
+  private val moduleManager: ModuleManager = new ModuleManager
   private val executor: StreamExecutor = new StreamExecutor(javaEnv)
-  private val functionCatalog = new FunctionCatalog(manager)
+  private val functionCatalog = new FunctionCatalog(manager, moduleManager)
   private val streamPlanner = new StreamPlanner(executor, tableConfig, functionCatalog, manager)
 
   val javaTableEnv = new JavaStreamTableEnvironmentImpl(
     manager,
+    moduleManager,
     functionCatalog,
     tableConfig,
     javaEnv,
@@ -344,6 +348,7 @@ case class StreamTableTestUtil(
   val env = new StreamExecutionEnvironment(javaEnv)
   val tableEnv = new ScalaStreamTableEnvironmentImpl(
     manager,
+    moduleManager,
     functionCatalog,
     tableConfig,
     env,