You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by bl...@apache.org on 2019/07/15 21:04:54 UTC

[flink] branch master updated: [FLINK-13024][table] integrate FunctionCatalog with CatalogManager

This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new d71c4ee  [FLINK-13024][table] integrate FunctionCatalog with CatalogManager
d71c4ee is described below

commit d71c4eed25cbd7d183cafb29e8bfbb17df904cbd
Author: bowen.li <bo...@gmail.com>
AuthorDate: Fri Jul 5 15:37:37 2019 -0700

    [FLINK-13024][table] integrate FunctionCatalog with CatalogManager
    
    This PR integrates FunctionCatalog with Catalog APIs.
    
    This closes #8920.
---
 .../batch/connectors/hive/HiveTableFactory.java    | 106 ++++++++++++++++++-
 .../java/internal/StreamTableEnvironmentImpl.java  |   5 +-
 .../table/api/internal/TableEnvironmentImpl.java   |   5 +-
 .../apache/flink/table/catalog/CatalogManager.java |  31 ++++--
 .../flink/table/catalog/FunctionCatalog.java       | 114 +++++++++++++++------
 .../internal/StreamTableEnvironmentImpl.scala      |   5 +-
 .../org/apache/flink/table/catalog/Catalog.java    |   4 +-
 .../table/factories/FunctionDefinitionFactory.java |  38 +++++++
 .../metadata/AggCallSelectivityEstimatorTest.scala |   7 +-
 .../plan/metadata/FlinkRelMdHandlerTestBase.scala  |  11 +-
 .../plan/metadata/SelectivityEstimatorTest.scala   |   7 +-
 .../table/plan/util/RexNodeExtractorTest.scala     |  17 +--
 .../apache/flink/table/util/TableTestBase.scala    |   3 +-
 .../flink/table/api/internal/TableEnvImpl.scala    |   5 +-
 .../PushFilterIntoTableSourceScanRule.scala        |   8 +-
 .../api/stream/StreamTableEnvironmentTest.scala    |   2 +-
 .../flink/table/api/stream/sql/AggregateTest.scala |  10 +-
 .../flink/table/plan/RexProgramExtractorTest.scala |   5 +-
 .../apache/flink/table/utils/TableTestBase.scala   |   3 +-
 19 files changed, 296 insertions(+), 90 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableFactory.java
index a22014a..9c8beaa 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableFactory.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableFactory.java
@@ -18,13 +18,28 @@
 
 package org.apache.flink.batch.connectors.hive;
 
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.catalog.CatalogFunction;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTableImpl;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.config.CatalogConfig;
+import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
+import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator;
+import org.apache.flink.table.factories.FunctionDefinitionFactory;
 import org.apache.flink.table.factories.TableFactoryUtil;
 import org.apache.flink.table.factories.TableSinkFactory;
 import org.apache.flink.table.factories.TableSourceFactory;
+import org.apache.flink.table.functions.AggregateFunctionDefinition;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.functions.ScalarFunctionDefinition;
+import org.apache.flink.table.functions.TableFunctionDefinition;
+import org.apache.flink.table.functions.hive.HiveFunctionWrapper;
+import org.apache.flink.table.functions.hive.HiveGenericUDAF;
+import org.apache.flink.table.functions.hive.HiveGenericUDF;
+import org.apache.flink.table.functions.hive.HiveGenericUDTF;
+import org.apache.flink.table.functions.hive.HiveSimpleUDF;
 import org.apache.flink.table.sinks.OutputFormatTableSink;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.table.sources.InputFormatTableSource;
@@ -33,7 +48,15 @@ import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.UDAF;
+import org.apache.hadoop.hive.ql.exec.UDF;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver2;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
 import org.apache.hadoop.mapred.JobConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.List;
 import java.util.Map;
@@ -41,13 +64,19 @@ import java.util.Map;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * A table factory implementation for tables stored in Hive catalog.
+ * A table factory implementation for Hive catalog.
  */
-public class HiveTableFactory implements TableSourceFactory<Row>, TableSinkFactory<Row> {
-	private HiveConf hiveConf;
+public class HiveTableFactory
+		implements TableSourceFactory<Row>, TableSinkFactory<Row>, FunctionDefinitionFactory {
+	private static final Logger LOG = LoggerFactory.getLogger(HiveTableFactory.class);
+
+	private final HiveConf hiveConf;
+	private final String hiveVersion;
 
 	public HiveTableFactory(HiveConf hiveConf) {
 		this.hiveConf = checkNotNull(hiveConf, "hiveConf cannot be null");
+
+		this.hiveVersion = new JobConf(hiveConf).get(HiveCatalogValidator.CATALOG_HIVE_VERSION, HiveShimLoader.getHiveVersion());
 	}
 
 	@Override
@@ -112,4 +141,75 @@ public class HiveTableFactory implements TableSourceFactory<Row>, TableSinkFacto
 		return new HiveTableSink(new JobConf(hiveConf), tablePath, table);
 	}
 
+	@Override
+	public FunctionDefinition createFunctionDefinition(String name, CatalogFunction catalogFunction) {
+		String functionClassName = catalogFunction.getClassName();
+
+		if (Boolean.valueOf(catalogFunction.getProperties().get(CatalogConfig.IS_GENERIC))) {
+			throw new TableException(
+				String.format("HiveFunctionDefinitionFactory does not support generic functions %s yet", name));
+		}
+
+		Class clazz;
+		try {
+			clazz = Thread.currentThread().getContextClassLoader().loadClass(functionClassName);
+
+			LOG.info("Successfully loaded Hive udf '{}' with class '{}'", name, functionClassName);
+		} catch (ClassNotFoundException e) {
+			throw new TableException(
+				String.format("Failed to initiate an instance of class %s.", functionClassName), e);
+		}
+
+		if (UDF.class.isAssignableFrom(clazz)) {
+			LOG.info("Transforming Hive function '{}' into a HiveSimpleUDF", name);
+
+			return new ScalarFunctionDefinition(
+				name,
+				new HiveSimpleUDF(new HiveFunctionWrapper<>(functionClassName))
+			);
+		} else if (GenericUDF.class.isAssignableFrom(clazz)) {
+			LOG.info("Transforming Hive function '{}' into a HiveGenericUDF", name);
+
+			return new ScalarFunctionDefinition(
+				name,
+				new HiveGenericUDF(new HiveFunctionWrapper<>(functionClassName))
+			);
+		} else if (GenericUDTF.class.isAssignableFrom(clazz)) {
+			LOG.info("Transforming Hive function '{}' into a HiveGenericUDTF", name);
+
+			HiveGenericUDTF udtf = new HiveGenericUDTF(new HiveFunctionWrapper<>(functionClassName));
+
+			return new TableFunctionDefinition(
+				name,
+				udtf,
+				GenericTypeInfo.of(Row.class)
+			);
+		} else if (GenericUDAFResolver2.class.isAssignableFrom(clazz) || UDAF.class.isAssignableFrom(clazz)) {
+			HiveGenericUDAF udaf;
+
+			if (GenericUDAFResolver2.class.isAssignableFrom(clazz)) {
+				LOG.info(
+					"Transforming Hive function '{}' into a HiveGenericUDAF with no UDAF bridging and Hive version %s",
+					name, hiveVersion);
+
+				udaf = new HiveGenericUDAF(new HiveFunctionWrapper<>(functionClassName), false, hiveVersion);
+			} else {
+				LOG.info(
+					"Transforming Hive function '{}' into a HiveGenericUDAF with UDAF bridging and Hive version %s",
+					name, hiveVersion);
+
+				udaf = new HiveGenericUDAF(new HiveFunctionWrapper<>(functionClassName), true, hiveVersion);
+			}
+
+			return new AggregateFunctionDefinition(
+				name,
+				udaf,
+				GenericTypeInfo.of(Object.class),
+				GenericTypeInfo.of(GenericUDAFEvaluator.AggregationBuffer.class)
+			);
+		} else {
+			throw new IllegalArgumentException(
+				String.format("HiveFunctionDefinitionFactory cannot initiate FunctionDefinition for class %s", functionClassName));
+		}
+	}
 }
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java
index 86cd203..4d0586c 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java
@@ -102,13 +102,12 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl imple
 			EnvironmentSettings settings,
 			TableConfig tableConfig) {
 
-		FunctionCatalog functionCatalog = new FunctionCatalog(
-			settings.getBuiltInCatalogName(),
-			settings.getBuiltInDatabaseName());
 		CatalogManager catalogManager = new CatalogManager(
 			settings.getBuiltInCatalogName(),
 			new GenericInMemoryCatalog(settings.getBuiltInCatalogName(), settings.getBuiltInDatabaseName()));
 
+		FunctionCatalog functionCatalog = new FunctionCatalog(catalogManager);
+
 		Map<String, String> executorProperties = settings.toExecutorProperties();
 		Executor executor = lookupExecutor(executorProperties, executionEnvironment);
 
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 988fa88..f2aea62 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -123,13 +123,12 @@ public class TableEnvironmentImpl implements TableEnvironment {
 
 	public static TableEnvironmentImpl create(EnvironmentSettings settings) {
 
-		FunctionCatalog functionCatalog = new FunctionCatalog(
-			settings.getBuiltInCatalogName(),
-			settings.getBuiltInDatabaseName());
 		CatalogManager catalogManager = new CatalogManager(
 			settings.getBuiltInCatalogName(),
 			new GenericInMemoryCatalog(settings.getBuiltInCatalogName(), settings.getBuiltInDatabaseName()));
 
+		FunctionCatalog functionCatalog = new FunctionCatalog(catalogManager);
+
 		Map<String, String> executorProperties = settings.toExecutorProperties();
 		Executor executor = ComponentFactoryService.find(ExecutorFactory.class, executorProperties)
 			.create(executorProperties);
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
index 0f83176..c5d0bc7 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
@@ -59,11 +59,14 @@ public class CatalogManager {
 	// TO BE REMOVED along with ExternalCatalog API
 	private Map<String, ExternalCatalog>  externalCatalogs;
 
-	// The name of the default catalog and schema
+	// The name of the current catalog and database
 	private String currentCatalogName;
 
 	private String currentDatabaseName;
 
+	// The name of the default catalog
+	private final String defaultCatalogName;
+
 	/**
 	 * Temporary solution to handle both {@link CatalogBaseTable} and
 	 * {@link ExternalCatalogTable} in a single call.
@@ -125,6 +128,7 @@ public class CatalogManager {
 		catalogs.put(defaultCatalogName, defaultCatalog);
 		this.currentCatalogName = defaultCatalogName;
 		this.currentDatabaseName = defaultCatalog.getDefaultDatabase();
+		this.defaultCatalogName = defaultCatalogName;
 	}
 
 	/**
@@ -215,9 +219,9 @@ public class CatalogManager {
 	}
 
 	/**
-	 * Gets the current default catalog that will be used when resolving table path.
+	 * Gets the current catalog that will be used when resolving table path.
 	 *
-	 * @return the current default catalog
+	 * @return the current catalog
 	 * @see CatalogManager#resolveTable(String...)
 	 */
 	public String getCurrentCatalog() {
@@ -225,9 +229,9 @@ public class CatalogManager {
 	}
 
 	/**
-	 * Sets the current default catalog name that will be used when resolving table path.
+	 * Sets the current catalog name that will be used when resolving table path.
 	 *
-	 * @param catalogName catalog name to set as current default catalog
+	 * @param catalogName catalog name to set as current catalog
 	 * @throws CatalogNotExistException thrown if the catalog doesn't exist
 	 * @see CatalogManager#resolveTable(String...)
 	 */
@@ -255,9 +259,9 @@ public class CatalogManager {
 	}
 
 	/**
-	 * Gets the current default database name that will be used when resolving table path.
+	 * Gets the current database name that will be used when resolving table path.
 	 *
-	 * @return the current default database
+	 * @return the current database
 	 * @see CatalogManager#resolveTable(String...)
 	 */
 	public String getCurrentDatabase() {
@@ -265,10 +269,10 @@ public class CatalogManager {
 	}
 
 	/**
-	 * Sets the current default database name that will be used when resolving a table path.
+	 * Sets the current database name that will be used when resolving a table path.
 	 * The database has to exist in the current catalog.
 	 *
-	 * @param databaseName database name to set as current default database name
+	 * @param databaseName database name to set as current database name
 	 * @throws CatalogException thrown if the database doesn't exist in the current catalog
 	 * @see CatalogManager#resolveTable(String...)
 	 * @see CatalogManager#setCurrentCatalog(String)
@@ -294,6 +298,15 @@ public class CatalogManager {
 	}
 
 	/**
+	 * Gets the default catalog name.
+	 *
+	 * @return the default catalog
+	 */
+	public String getDefaultCatalogName() {
+		return defaultCatalogName;
+	}
+
+	/**
 	 * Tries to resolve a table path to a {@link ResolvedTable}. The algorithm looks for requested table
 	 * in the following paths in that order:
 	 * <ol>
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
index 0d18ffe..af7f21f 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
@@ -21,7 +21,10 @@ package org.apache.flink.table.catalog;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
 import org.apache.flink.table.delegation.PlannerTypeInferenceUtil;
+import org.apache.flink.table.factories.FunctionDefinitionFactory;
 import org.apache.flink.table.functions.AggregateFunction;
 import org.apache.flink.table.functions.AggregateFunctionDefinition;
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
@@ -36,21 +39,26 @@ import org.apache.flink.table.functions.UserDefinedAggregateFunction;
 import org.apache.flink.table.functions.UserFunctionsTypeHelper;
 import org.apache.flink.util.Preconditions;
 
+import java.util.ArrayList;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * Simple function catalog to store {@link FunctionDefinition}s in memory.
+ * Simple function catalog to store {@link FunctionDefinition}s in catalogs.
  */
 @Internal
 public class FunctionCatalog implements FunctionLookup {
 
-	private final String defaultCatalogName;
-
-	private final String defaultDatabaseName;
+	private final CatalogManager catalogManager;
 
+	// For simplicity, currently hold registered Flink functions in memory here
+	// TODO: should move to catalog
 	private final Map<String, FunctionDefinition> userFunctions = new LinkedHashMap<>();
 
 	/**
@@ -58,11 +66,8 @@ public class FunctionCatalog implements FunctionLookup {
 	 */
 	private PlannerTypeInferenceUtil plannerTypeInferenceUtil;
 
-	public FunctionCatalog(
-			String defaultCatalogName,
-			String defaultDatabaseName) {
-		this.defaultCatalogName = defaultCatalogName;
-		this.defaultDatabaseName = defaultDatabaseName;
+	public FunctionCatalog(CatalogManager catalogManager) {
+		this.catalogManager = checkNotNull(catalogManager);
 	}
 
 	public void setPlannerTypeInferenceUtil(PlannerTypeInferenceUtil plannerTypeInferenceUtil) {
@@ -129,35 +134,81 @@ public class FunctionCatalog implements FunctionLookup {
 	}
 
 	public String[] getUserDefinedFunctions() {
-		return userFunctions.values().stream()
-			.map(FunctionDefinition::toString)
-			.toArray(String[]::new);
+		List<String> result = new ArrayList<>();
+
+		// Get functions in catalog
+		Catalog catalog = catalogManager.getCatalog(catalogManager.getCurrentCatalog()).get();
+		try {
+			result.addAll(catalog.listFunctions(catalogManager.getCurrentDatabase()));
+		} catch (DatabaseNotExistException e) {
+			// Ignore since there will always be a current database of the current catalog
+		}
+
+		// Get functions registered in memory
+		result.addAll(
+			userFunctions.values().stream()
+				.map(FunctionDefinition::toString)
+				.collect(Collectors.toList()));
+
+		return result.stream()
+			.collect(Collectors.toList())
+			.toArray(new String[0]);
 	}
 
 	@Override
 	public Optional<FunctionLookup.Result> lookupFunction(String name) {
-		final FunctionDefinition userCandidate = userFunctions.get(normalizeName(name));
-		final Optional<FunctionDefinition> foundDefinition;
-		if (userCandidate != null) {
-			foundDefinition = Optional.of(userCandidate);
-		} else {
+		String functionName = normalizeName(name);
 
-			// TODO once we connect this class with the Catalog APIs we need to make sure that
-			//  built-in functions are present in "root" built-in catalog. This allows to
-			//  overwrite built-in functions but also fallback to the "root" catalog. It should be
-			//  possible to disable the "root" catalog if that is desired.
+		FunctionDefinition userCandidate = null;
 
-			foundDefinition = BuiltInFunctionDefinitions.getDefinitions()
-				.stream()
-				.filter(f -> normalizeName(name).equals(normalizeName(f.getName())))
-				.findFirst()
-				.map(Function.identity());
-		}
+		Catalog catalog = catalogManager.getCatalog(catalogManager.getCurrentCatalog()).get();
 
-		return foundDefinition.map(definition -> new FunctionLookup.Result(
-			ObjectIdentifier.of(defaultCatalogName, defaultDatabaseName, name),
-			definition)
-		);
+		if (catalog.getTableFactory().isPresent() &&
+				catalog.getTableFactory().get() instanceof FunctionDefinitionFactory) {
+			try {
+				CatalogFunction catalogFunction = catalog.getFunction(
+					new ObjectPath(catalogManager.getCurrentDatabase(), functionName));
+
+				FunctionDefinitionFactory factory = (FunctionDefinitionFactory) catalog.getTableFactory().get();
+
+				userCandidate = factory.createFunctionDefinition(functionName, catalogFunction);
+			} catch (FunctionNotExistException e) {
+				// Ignore
+			}
+
+			return Optional.of(
+					new FunctionLookup.Result(
+						ObjectIdentifier.of(catalogManager.getCurrentCatalog(), catalogManager.getCurrentDatabase(), name),
+						userCandidate)
+				);
+		} else {
+			// Else, check in-memory functions
+			userCandidate = userFunctions.get(functionName);
+
+			final Optional<FunctionDefinition> foundDefinition;
+			if (userCandidate != null) {
+				foundDefinition = Optional.of(userCandidate);
+			} else {
+
+				// TODO once we connect this class with the Catalog APIs we need to make sure that
+				//  built-in functions are present in "root" built-in catalog. This allows to
+				//  overwrite built-in functions but also fallback to the "root" catalog. It should be
+				//  possible to disable the "root" catalog if that is desired.
+
+				foundDefinition = BuiltInFunctionDefinitions.getDefinitions()
+					.stream()
+					.filter(f -> functionName.equals(normalizeName(f.getName())))
+					.findFirst()
+					.map(Function.identity());
+			}
+
+			String defaultCatalogName = catalogManager.getDefaultCatalogName();
+
+			return foundDefinition.map(definition -> new FunctionLookup.Result(
+				ObjectIdentifier.of(defaultCatalogName, catalogManager.getCatalog(defaultCatalogName).get().getDefaultDatabase(), name),
+				definition)
+			);
+		}
 	}
 
 	@Override
@@ -169,6 +220,7 @@ public class FunctionCatalog implements FunctionLookup {
 	}
 
 	private void registerFunction(String name, FunctionDefinition functionDefinition) {
+		// TODO: should register to catalog
 		userFunctions.put(normalizeName(name), functionDefinition);
 	}
 
diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala
index c2b9831..1304c8c 100644
--- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala
+++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala
@@ -275,13 +275,12 @@ object StreamTableEnvironmentImpl {
       tableConfig: TableConfig)
     : StreamTableEnvironmentImpl = {
 
-    val functionCatalog = new FunctionCatalog(
-      settings.getBuiltInCatalogName,
-      settings.getBuiltInDatabaseName)
     val catalogManager = new CatalogManager(
       settings.getBuiltInCatalogName,
       new GenericInMemoryCatalog(settings.getBuiltInCatalogName, settings.getBuiltInDatabaseName))
 
+    val functionCatalog = new FunctionCatalog(catalogManager)
+
     val executorProperties = settings.toExecutorProperties
     val executor = lookupExecutor(executorProperties, executionEnvironment)
 
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
index dbb5c1b..8019ab0 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
@@ -47,8 +47,8 @@ import java.util.Optional;
 public interface Catalog {
 
 	/**
-	 * Get an optional {@link TableFactory} instance that's responsible for generating source/sink for tables
-	 * stored in this catalog.
+	 * Get an optional {@link TableFactory} instance that's responsible for generating table-related
+	 * instances stored in this catalog, instances such as source/sink and function definitions.
 	 *
 	 * @return an optional TableFactory instance
 	 */
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FunctionDefinitionFactory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FunctionDefinitionFactory.java
new file mode 100644
index 0000000..2e8c538
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FunctionDefinitionFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.functions.FunctionDefinition;
+
+/**
+ * A factory to create {@link FunctionDefinition}.
+ * See also {@link TableFactory} for more information.
+ */
+public interface FunctionDefinitionFactory extends TableFactory {
+
+	/**
+	 * Creates a {@link FunctionDefinition} from given {@link CatalogFunction}.
+	 *
+	 * @param name name of the {@link CatalogFunction}
+	 * @param catalogFunction the catalog function
+	 * @return a {@link FunctionDefinition}
+	 */
+	FunctionDefinition createFunctionDefinition(String name, CatalogFunction catalogFunction);
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/AggCallSelectivityEstimatorTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/AggCallSelectivityEstimatorTest.scala
index ea14fa9..b52d9e3 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/AggCallSelectivityEstimatorTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/AggCallSelectivityEstimatorTest.scala
@@ -20,12 +20,11 @@ package org.apache.flink.table.plan.metadata
 
 import org.apache.flink.table.api.TableConfig
 import org.apache.flink.table.calcite.{FlinkContextImpl, FlinkTypeFactory, FlinkTypeSystem}
-import org.apache.flink.table.catalog.FunctionCatalog
+import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog}
 import org.apache.flink.table.plan.schema._
 import org.apache.flink.table.plan.stats.{ColumnStats, FlinkStatistic, TableStats}
 import org.apache.flink.table.{JDouble, JLong}
 import org.apache.flink.util.Preconditions
-
 import com.google.common.collect.ImmutableList
 import org.apache.calcite.plan.{AbstractRelOptPlanner, RelOptCluster}
 import org.apache.calcite.rel.`type`.RelDataType
@@ -45,7 +44,6 @@ import org.junit.{Before, BeforeClass, Test}
 import org.powermock.api.mockito.PowerMockito._
 import org.powermock.core.classloader.annotations.PrepareForTest
 import org.powermock.modules.junit4.PowerMockRunner
-
 import java.math.BigDecimal
 
 import scala.collection.JavaConversions._
@@ -81,7 +79,8 @@ class AggCallSelectivityEstimatorTest {
     val tableScan = mock(classOf[TableScan])
     val cluster = mock(classOf[RelOptCluster])
     val planner = mock(classOf[AbstractRelOptPlanner])
-    val functionCatalog = new FunctionCatalog("default_catalog", "default_database")
+    val catalogManager = mock(classOf[CatalogManager])
+    val functionCatalog = new FunctionCatalog(catalogManager)
     val context = new FlinkContextImpl(new TableConfig, functionCatalog)
     when(tableScan, "getCluster").thenReturn(cluster)
     when(cluster, "getRexBuilder").thenReturn(rexBuilder)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdHandlerTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdHandlerTestBase.scala
index 8d7c348..c1627a6 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdHandlerTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdHandlerTestBase.scala
@@ -21,7 +21,7 @@ package org.apache.flink.table.plan.metadata
 import org.apache.flink.table.api.{TableConfig, TableException}
 import org.apache.flink.table.calcite.FlinkRelBuilder.PlannerNamedWindowProperty
 import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory}
-import org.apache.flink.table.catalog.FunctionCatalog
+import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, GenericInMemoryCatalog}
 import org.apache.flink.table.expressions.utils.ApiExpressionUtils.intervalOfMillis
 import org.apache.flink.table.expressions._
 import org.apache.flink.table.functions.aggfunctions.SumAggFunction.DoubleSumAggFunction
@@ -44,7 +44,6 @@ import org.apache.flink.table.runtime.rank.{ConstantRankRange, RankType, Variabl
 import org.apache.flink.table.types.AtomicDataType
 import org.apache.flink.table.types.logical.{BigIntType, DoubleType, IntType, LogicalType, TimestampKind, TimestampType, VarCharType}
 import org.apache.flink.table.util.CountAggFunction
-
 import com.google.common.collect.{ImmutableList, Lists}
 import org.apache.calcite.jdbc.CalciteSchema
 import org.apache.calcite.plan._
@@ -64,7 +63,6 @@ import org.apache.calcite.sql.fun.{SqlCountAggFunction, SqlStdOperatorTable}
 import org.apache.calcite.sql.parser.SqlParserPos
 import org.apache.calcite.util.{DateString, ImmutableBitSet, ImmutableIntList, TimeString, TimestampString}
 import org.junit.{Before, BeforeClass}
-
 import java.math.BigDecimal
 import java.util
 
@@ -74,12 +72,17 @@ class FlinkRelMdHandlerTestBase {
 
   val tableConfig = new TableConfig()
   val rootSchema: SchemaPlus = MetadataTestUtil.initRootSchema()
+
+  val defaultCatalog = "default_catalog"
+  val catalogManager = new CatalogManager(
+    defaultCatalog, new GenericInMemoryCatalog(defaultCatalog, "default_database"))
+
   // TODO batch RelNode and stream RelNode should have different PlannerContext
   //  and RelOptCluster due to they have different trait definitions.
   val plannerContext: PlannerContext =
     new PlannerContext(
       tableConfig,
-      new FunctionCatalog("default_catalog", "default_database"),
+      new FunctionCatalog(catalogManager),
       CalciteSchema.from(rootSchema),
       util.Arrays.asList(
         ConventionTraitDef.INSTANCE,
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/SelectivityEstimatorTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/SelectivityEstimatorTest.scala
index 8494ae1..2259d94 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/SelectivityEstimatorTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/SelectivityEstimatorTest.scala
@@ -20,12 +20,11 @@ package org.apache.flink.table.plan.metadata
 
 import org.apache.flink.table.api.TableConfig
 import org.apache.flink.table.calcite.{FlinkContext, FlinkContextImpl, FlinkTypeFactory, FlinkTypeSystem}
-import org.apache.flink.table.catalog.FunctionCatalog
+import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog}
 import org.apache.flink.table.plan.schema._
 import org.apache.flink.table.plan.stats.{ColumnStats, FlinkStatistic, TableStats}
 import org.apache.flink.table.{JDouble, JLong}
 import org.apache.flink.util.Preconditions
-
 import org.apache.calcite.plan.{AbstractRelOptPlanner, RelOptCluster}
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.TableScan
@@ -42,7 +41,6 @@ import org.junit.{Before, BeforeClass, Test}
 import org.powermock.api.mockito.PowerMockito._
 import org.powermock.core.classloader.annotations.PrepareForTest
 import org.powermock.modules.junit4.PowerMockRunner
-
 import java.math.BigDecimal
 
 import scala.collection.JavaConverters._
@@ -83,7 +81,8 @@ class SelectivityEstimatorTest {
     val tableScan = mock(classOf[TableScan])
     val cluster = mock(classOf[RelOptCluster])
     val planner = mock(classOf[AbstractRelOptPlanner])
-    val functionCatalog = new FunctionCatalog("default_catalog", "default_database")
+    val catalogManager = mock(classOf[CatalogManager])
+    val functionCatalog = new FunctionCatalog(catalogManager)
     val context: FlinkContext = new FlinkContextImpl(tableConfig, functionCatalog)
     when(tableScan, "getCluster").thenReturn(cluster)
     when(cluster, "getRexBuilder").thenReturn(rexBuilder)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/util/RexNodeExtractorTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/util/RexNodeExtractorTest.scala
index c963b1f..f8af378 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/util/RexNodeExtractorTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/util/RexNodeExtractorTest.scala
@@ -18,9 +18,6 @@
 
 package org.apache.flink.table.plan.util
 
-import org.apache.flink.table.api.{DataTypes, Types}
-import org.apache.flink.table.catalog.FunctionCatalog
-import org.apache.flink.table.expressions._
 import org.apache.flink.table.expressions.utils.ApiExpressionUtils.{unresolvedCall, unresolvedRef, valueLiteral}
 import org.apache.flink.table.expressions.utils.Func1
 import org.apache.flink.table.functions.AggregateFunctionDefinition
@@ -29,7 +26,6 @@ import org.apache.flink.table.functions.sql.FlinkSqlOperatorTable
 import org.apache.flink.table.functions.utils.ScalarSqlFunction
 import org.apache.flink.table.plan.util.InputTypeBuilder.inputOf
 import org.apache.flink.table.util.{DateTimeTestUtil, IntSumAggFunction}
-
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rex.{RexBuilder, RexNode}
 import org.apache.calcite.sql.SqlPostfixOperator
@@ -40,21 +36,26 @@ import org.apache.calcite.util.{DateString, TimeString, TimestampString}
 import org.hamcrest.CoreMatchers.is
 import org.junit.Assert.{assertArrayEquals, assertEquals, assertThat, assertTrue}
 import org.junit.Test
-
 import java.math.BigDecimal
 import java.sql.Timestamp
-import java.util.{List => JList}
-import java.sql.{Date, Time, Timestamp}
 import java.util.{TimeZone, List => JList}
 
+import org.apache.flink.api.common.typeinfo.Types
+import org.apache.flink.table.api.DataTypes
+import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, GenericInMemoryCatalog}
+import org.apache.flink.table.expressions.{EqualTo, Expression, ExpressionBridge, ExpressionParser, GreaterThan, Literal, PlannerExpression, PlannerExpressionConverter, Sum, UnresolvedFieldReference}
+
 import scala.collection.JavaConverters._
 
 /**
   * Test for [[RexNodeExtractor]].
   */
 class RexNodeExtractorTest extends RexNodeTestBase {
+  val defaultCatalog = "default_catalog"
+  val catalogManager = new CatalogManager(
+    defaultCatalog, new GenericInMemoryCatalog(defaultCatalog, "default_database"))
 
-  private val functionCatalog = new FunctionCatalog("default_catalog", "default_database")
+  private val functionCatalog = new FunctionCatalog(catalogManager)
 
   private val expressionBridge: ExpressionBridge[PlannerExpression] =
     new ExpressionBridge[PlannerExpression](
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
index 26a5268..38b5d12 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
@@ -985,8 +985,6 @@ object TestingTableEnvironment {
   def create(
       settings: EnvironmentSettings,
       catalogManager: Option[CatalogManager] = None): TestingTableEnvironment = {
-    val functionCatalog = new FunctionCatalog(
-      settings.getBuiltInCatalogName, settings.getBuiltInDatabaseName)
     val catalogMgr = catalogManager match {
       case Some(c) => c
       case _ =>
@@ -994,6 +992,7 @@ object TestingTableEnvironment {
           new GenericInMemoryCatalog(
             settings.getBuiltInCatalogName, settings.getBuiltInDatabaseName))
     }
+    val functionCatalog = new FunctionCatalog(catalogMgr)
     val plannerProperties = settings.toPlannerProperties
     val executorProperties = settings.toExecutorProperties
     val executor = ComponentFactoryService.find(classOf[ExecutorFactory],
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index 13d6c53..3452af6 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -64,9 +64,8 @@ abstract class TableEnvImpl(
   protected val builtinDatabaseName: String = catalogManager.getCurrentDatabase
 
   // Table API/SQL function catalog
-  private[flink] val functionCatalog: FunctionCatalog = new FunctionCatalog(
-    builtinCatalogName,
-    builtinDatabaseName)
+  private[flink] val functionCatalog: FunctionCatalog = new FunctionCatalog(catalogManager)
+
   // temporary utility until we don't use planner expressions anymore
   functionCatalog.setPlannerTypeInferenceUtil(PlannerTypeInferenceUtilImpl.INSTANCE)
 
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/logical/PushFilterIntoTableSourceScanRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/logical/PushFilterIntoTableSourceScanRule.scala
index 0dc9805..9c16135 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/logical/PushFilterIntoTableSourceScanRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/logical/PushFilterIntoTableSourceScanRule.scala
@@ -23,7 +23,7 @@ import java.util
 import org.apache.calcite.plan.RelOptRule.{none, operand}
 import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
 import org.apache.calcite.rex.RexProgram
-import org.apache.flink.table.catalog.FunctionCatalog
+import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, GenericInMemoryCatalog}
 import org.apache.flink.table.expressions.{Expression, PlannerExpression}
 import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, FlinkLogicalTableSourceScan}
 import org.apache.flink.table.plan.util.RexProgramExtractor
@@ -37,6 +37,10 @@ class PushFilterIntoTableSourceScanRule extends RelOptRule(
     operand(classOf[FlinkLogicalTableSourceScan], none)),
   "PushFilterIntoTableSourceScanRule") {
 
+  private val defaultCatalog = "default_catalog"
+  private val catalogManager = new CatalogManager(
+    defaultCatalog, new GenericInMemoryCatalog(defaultCatalog, "default_database"))
+
   override def matches(call: RelOptRuleCall): Boolean = {
     val calc: FlinkLogicalCalc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
     val scan: FlinkLogicalTableSourceScan = call.rel(1).asInstanceOf[FlinkLogicalTableSourceScan]
@@ -68,7 +72,7 @@ class PushFilterIntoTableSourceScanRule extends RelOptRule(
       RexProgramExtractor.extractConjunctiveConditions(
         program,
         call.builder().getRexBuilder,
-        new FunctionCatalog("default_catalog", "default_database"))
+        new FunctionCatalog(catalogManager))
     if (predicates.isEmpty) {
       // no condition can be translated to expression
       return
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
index 20e8ba6..11e43cf 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
@@ -207,7 +207,7 @@ class StreamTableEnvironmentTest extends TableTestBase {
       "default_catalog",
       new GenericInMemoryCatalog("default_catalog", "default_database"))
     val executor: StreamExecutor = new StreamExecutor(jStreamExecEnv)
-    val functionCatalog = new FunctionCatalog(manager.getCurrentCatalog, manager.getCurrentDatabase)
+    val functionCatalog = new FunctionCatalog(manager)
     val streamPlanner = new StreamPlanner(executor, config, functionCatalog, manager)
     val jTEnv = new JStreamTableEnvironmentImpl(
       manager,
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/AggregateTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/AggregateTest.scala
index b56919d..c917267 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/AggregateTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/AggregateTest.scala
@@ -26,7 +26,7 @@ import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl
 import org.apache.flink.table.api.{TableConfig, Types}
-import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog}
+import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, GenericInMemoryCatalog}
 import org.apache.flink.table.delegation.{Executor, Planner}
 import org.apache.flink.table.functions.{AggregateFunction, AggregateFunctionDefinition}
 import org.apache.flink.table.utils.TableTestUtil.{streamTableNode, term, unaryNode}
@@ -66,9 +66,13 @@ class AggregateTest extends TableTestBase {
 
   @Test
   def testUserDefinedAggregateFunctionWithScalaAccumulator(): Unit = {
-    val functionCatalog = new FunctionCatalog("cat", "db")
+    val defaultCatalog = "default_catalog"
+    val catalogManager = new CatalogManager(
+      defaultCatalog, new GenericInMemoryCatalog(defaultCatalog, "default_database"))
+
+    val functionCatalog = new FunctionCatalog(catalogManager)
     val tablEnv = new StreamTableEnvironmentImpl(
-      Mockito.mock(classOf[CatalogManager]),
+      catalogManager,
       functionCatalog,
       new TableConfig,
       Mockito.mock(classOf[StreamExecutionEnvironment]),
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/RexProgramExtractorTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/RexProgramExtractorTest.scala
index fde9743..b752b76 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/RexProgramExtractorTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/RexProgramExtractorTest.scala
@@ -28,7 +28,7 @@ import org.apache.calcite.sql.`type`.SqlTypeName
 import org.apache.calcite.sql.`type`.SqlTypeName.{BIGINT, INTEGER, VARCHAR}
 import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.calcite.util.{DateString, TimeString, TimestampString}
-import org.apache.flink.table.catalog.FunctionCatalog
+import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, GenericInMemoryCatalog}
 import org.apache.flink.table.expressions._
 import org.apache.flink.table.plan.util.{RexNodeToExpressionConverter, RexProgramExtractor}
 import org.apache.flink.table.utils.InputTypeBuilder.inputOf
@@ -42,8 +42,7 @@ import scala.collection.mutable
 class RexProgramExtractorTest extends RexProgramTestBase {
 
   private val functionCatalog: FunctionCatalog = new FunctionCatalog(
-    "default_catalog",
-    "default_database")
+    new CatalogManager("default_catalog", new GenericInMemoryCatalog("default_catalog")))
   private val expressionBridge: ExpressionBridge[PlannerExpression] =
     new ExpressionBridge[PlannerExpression](
       functionCatalog,
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
index e5810e0..6e0c3a3 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
@@ -329,8 +329,7 @@ case class StreamTableTestUtil(
   private val tableConfig = new TableConfig
   private val manager: CatalogManager = catalogManager.getOrElse(createCatalogManager())
   private val executor: StreamExecutor = new StreamExecutor(javaEnv)
-  private val functionCatalog =
-    new FunctionCatalog(manager.getCurrentCatalog, manager.getCurrentDatabase)
+  private val functionCatalog = new FunctionCatalog(manager)
   private val streamPlanner = new StreamPlanner(executor, tableConfig, functionCatalog, manager)
 
   val javaTableEnv = new JavaStreamTableEnvironmentImpl(