You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by bl...@apache.org on 2019/10/19 01:24:25 UTC

[flink] branch master updated: [FLINK-14401][table][hive] create DefaultFunctionDefinitionFactory to instantiate regular java class-based udf

This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new d1b06ac  [FLINK-14401][table][hive] create DefaultFunctionDefinitionFactory to instantiate regular java class-based udf
d1b06ac is described below

commit d1b06ac4e8ab033c699b656cb0f1d924b6bafc56
Author: bowen.li <bo...@gmail.com>
AuthorDate: Tue Oct 15 16:21:29 2019 -0700

    [FLINK-14401][table][hive] create DefaultFunctionDefinitionFactory to instantiate regular java class-based udf
    
    create FunctionDefinitionUtil to instantiate regular java class-based udf and add HiveFunctionDefinitionFactory to instantiate both flink and hive udf
    
    This closes #9908.
---
 .../flink/connectors/hive/HiveTableFactory.java    | 103 +---------------
 .../flink/table/catalog/hive/HiveCatalog.java      |   7 ++
 .../factories/HiveFunctionDefinitionFactory.java}  | 100 ++-------------
 .../table/tests/test_catalog_completeness.py       |   2 +-
 .../flink/table/catalog/FunctionCatalog.java       |  24 ++--
 .../table/functions/FunctionDefinitionUtil.java    |  77 ++++++++++++
 .../functions/FunctionDefinitionUtilTest.java      | 135 +++++++++++++++++++++
 .../org/apache/flink/table/catalog/Catalog.java    |  12 +-
 .../table/factories/FunctionDefinitionFactory.java |   3 +-
 9 files changed, 253 insertions(+), 210 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableFactory.java
index 235919c..89f76da 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableFactory.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableFactory.java
@@ -18,29 +18,13 @@
 
 package org.apache.flink.connectors.hive;
 
-import org.apache.flink.api.java.typeutils.GenericTypeInfo;
-import org.apache.flink.table.api.TableException;
-import org.apache.flink.table.catalog.CatalogFunction;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTableImpl;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.config.CatalogConfig;
-import org.apache.flink.table.catalog.hive.client.HiveShim;
-import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
-import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator;
-import org.apache.flink.table.factories.FunctionDefinitionFactory;
 import org.apache.flink.table.factories.TableFactoryUtil;
 import org.apache.flink.table.factories.TableSinkFactory;
 import org.apache.flink.table.factories.TableSourceFactory;
-import org.apache.flink.table.functions.AggregateFunctionDefinition;
-import org.apache.flink.table.functions.FunctionDefinition;
-import org.apache.flink.table.functions.ScalarFunctionDefinition;
-import org.apache.flink.table.functions.TableFunctionDefinition;
-import org.apache.flink.table.functions.hive.HiveFunctionWrapper;
-import org.apache.flink.table.functions.hive.HiveGenericUDAF;
-import org.apache.flink.table.functions.hive.HiveGenericUDF;
-import org.apache.flink.table.functions.hive.HiveGenericUDTF;
-import org.apache.flink.table.functions.hive.HiveSimpleUDF;
 import org.apache.flink.table.sinks.OutputFormatTableSink;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.table.sources.InputFormatTableSource;
@@ -49,12 +33,6 @@ import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.UDAF;
-import org.apache.hadoop.hive.ql.exec.UDF;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver2;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
 import org.apache.hadoop.mapred.JobConf;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -68,20 +46,13 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * A table factory implementation for Hive catalog.
  */
 public class HiveTableFactory
-		implements TableSourceFactory<Row>, TableSinkFactory<Row>, FunctionDefinitionFactory {
+		implements TableSourceFactory<Row>, TableSinkFactory<Row> {
 	private static final Logger LOG = LoggerFactory.getLogger(HiveTableFactory.class);
 
 	private final HiveConf hiveConf;
-	private final String hiveVersion;
-	private final HiveShim hiveShim;
 
 	public HiveTableFactory(HiveConf hiveConf) {
 		this.hiveConf = checkNotNull(hiveConf, "hiveConf cannot be null");
-
-		// this has to come from hiveConf, otherwise we may lose what user specifies in the yaml file
-		this.hiveVersion = checkNotNull(hiveConf.get(HiveCatalogValidator.CATALOG_HIVE_VERSION),
-				"Hive version is not defined");
-		this.hiveShim = HiveShimLoader.loadHiveShim(hiveVersion);
 	}
 
 	@Override
@@ -145,76 +116,4 @@ public class HiveTableFactory
 	private OutputFormatTableSink<Row> createOutputFormatTableSink(ObjectPath tablePath, CatalogTable table) {
 		return new HiveTableSink(new JobConf(hiveConf), tablePath, table);
 	}
-
-	@Override
-	public FunctionDefinition createFunctionDefinition(String name, CatalogFunction catalogFunction) {
-		String functionClassName = catalogFunction.getClassName();
-
-		if (Boolean.valueOf(catalogFunction.getProperties().get(CatalogConfig.IS_GENERIC))) {
-			throw new TableException(
-				String.format("HiveFunctionDefinitionFactory does not support generic functions %s yet", name));
-		}
-
-		Class clazz;
-		try {
-			clazz = Thread.currentThread().getContextClassLoader().loadClass(functionClassName);
-
-			LOG.info("Successfully loaded Hive udf '{}' with class '{}'", name, functionClassName);
-		} catch (ClassNotFoundException e) {
-			throw new TableException(
-				String.format("Failed to initiate an instance of class %s.", functionClassName), e);
-		}
-
-		if (UDF.class.isAssignableFrom(clazz)) {
-			LOG.info("Transforming Hive function '{}' into a HiveSimpleUDF", name);
-
-			return new ScalarFunctionDefinition(
-				name,
-				new HiveSimpleUDF(new HiveFunctionWrapper<>(functionClassName), hiveShim)
-			);
-		} else if (GenericUDF.class.isAssignableFrom(clazz)) {
-			LOG.info("Transforming Hive function '{}' into a HiveGenericUDF", name);
-
-			return new ScalarFunctionDefinition(
-				name,
-				new HiveGenericUDF(new HiveFunctionWrapper<>(functionClassName), hiveShim)
-			);
-		} else if (GenericUDTF.class.isAssignableFrom(clazz)) {
-			LOG.info("Transforming Hive function '{}' into a HiveGenericUDTF", name);
-
-			HiveGenericUDTF udtf = new HiveGenericUDTF(new HiveFunctionWrapper<>(functionClassName), hiveShim);
-
-			return new TableFunctionDefinition(
-				name,
-				udtf,
-				GenericTypeInfo.of(Row.class)
-			);
-		} else if (GenericUDAFResolver2.class.isAssignableFrom(clazz) || UDAF.class.isAssignableFrom(clazz)) {
-			HiveGenericUDAF udaf;
-
-			if (GenericUDAFResolver2.class.isAssignableFrom(clazz)) {
-				LOG.info(
-					"Transforming Hive function '{}' into a HiveGenericUDAF with no UDAF bridging and Hive version %s",
-					name, hiveVersion);
-
-				udaf = new HiveGenericUDAF(new HiveFunctionWrapper<>(functionClassName), false, hiveVersion);
-			} else {
-				LOG.info(
-					"Transforming Hive function '{}' into a HiveGenericUDAF with UDAF bridging and Hive version %s",
-					name, hiveVersion);
-
-				udaf = new HiveGenericUDAF(new HiveFunctionWrapper<>(functionClassName), true, hiveVersion);
-			}
-
-			return new AggregateFunctionDefinition(
-				name,
-				udaf,
-				GenericTypeInfo.of(Object.class),
-				GenericTypeInfo.of(GenericUDAFEvaluator.AggregationBuffer.class)
-			);
-		} else {
-			throw new IllegalArgumentException(
-				String.format("HiveFunctionDefinitionFactory cannot initiate FunctionDefinition for class %s", functionClassName));
-		}
-	}
 }
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index 7914c89..1e66551 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -56,11 +56,13 @@ import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
 import org.apache.flink.table.catalog.hive.client.HiveShim;
 import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
 import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator;
+import org.apache.flink.table.catalog.hive.factories.HiveFunctionDefinitionFactory;
 import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
 import org.apache.flink.table.catalog.hive.util.HiveStatsUtil;
 import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
 import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
 import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.factories.FunctionDefinitionFactory;
 import org.apache.flink.table.factories.TableFactory;
 import org.apache.flink.util.StringUtils;
 
@@ -202,6 +204,11 @@ public class HiveCatalog extends AbstractCatalog {
 		return Optional.of(new HiveTableFactory(hiveConf));
 	}
 
+	@Override
+	public Optional<FunctionDefinitionFactory> getFunctionDefinitionFactory() {
+		return Optional.of(new HiveFunctionDefinitionFactory(hiveConf));
+	}
+
 	// ------ databases ------
 
 	@Override
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/factories/HiveFunctionDefinitionFactory.java
similarity index 62%
copy from flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableFactory.java
copy to flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/factories/HiveFunctionDefinitionFactory.java
index 235919c..3e32c7f 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableFactory.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/factories/HiveFunctionDefinitionFactory.java
@@ -16,24 +16,20 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connectors.hive;
+package org.apache.flink.table.catalog.hive.factories;
 
 import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.connectors.hive.HiveTableFactory;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.catalog.CatalogFunction;
-import org.apache.flink.table.catalog.CatalogTable;
-import org.apache.flink.table.catalog.CatalogTableImpl;
-import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.config.CatalogConfig;
 import org.apache.flink.table.catalog.hive.client.HiveShim;
 import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
 import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator;
 import org.apache.flink.table.factories.FunctionDefinitionFactory;
-import org.apache.flink.table.factories.TableFactoryUtil;
-import org.apache.flink.table.factories.TableSinkFactory;
-import org.apache.flink.table.factories.TableSourceFactory;
 import org.apache.flink.table.functions.AggregateFunctionDefinition;
 import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.functions.FunctionDefinitionUtil;
 import org.apache.flink.table.functions.ScalarFunctionDefinition;
 import org.apache.flink.table.functions.TableFunctionDefinition;
 import org.apache.flink.table.functions.hive.HiveFunctionWrapper;
@@ -41,12 +37,7 @@ import org.apache.flink.table.functions.hive.HiveGenericUDAF;
 import org.apache.flink.table.functions.hive.HiveGenericUDF;
 import org.apache.flink.table.functions.hive.HiveGenericUDTF;
 import org.apache.flink.table.functions.hive.HiveSimpleUDF;
-import org.apache.flink.table.sinks.OutputFormatTableSink;
-import org.apache.flink.table.sinks.TableSink;
-import org.apache.flink.table.sources.InputFormatTableSource;
-import org.apache.flink.table.sources.TableSource;
 import org.apache.flink.types.Row;
-import org.apache.flink.util.Preconditions;
 
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.UDAF;
@@ -55,106 +46,35 @@ import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver2;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
-import org.apache.hadoop.mapred.JobConf;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.List;
-import java.util.Map;
-
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * A table factory implementation for Hive catalog.
+ * A factory to instantiate Hive UDFs as Flink UDFs.
  */
-public class HiveTableFactory
-		implements TableSourceFactory<Row>, TableSinkFactory<Row>, FunctionDefinitionFactory {
+public class HiveFunctionDefinitionFactory implements FunctionDefinitionFactory {
 	private static final Logger LOG = LoggerFactory.getLogger(HiveTableFactory.class);
 
-	private final HiveConf hiveConf;
 	private final String hiveVersion;
 	private final HiveShim hiveShim;
 
-	public HiveTableFactory(HiveConf hiveConf) {
-		this.hiveConf = checkNotNull(hiveConf, "hiveConf cannot be null");
-
+	public HiveFunctionDefinitionFactory(HiveConf hiveConf) {
 		// this has to come from hiveConf, otherwise we may lose what user specifies in the yaml file
 		this.hiveVersion = checkNotNull(hiveConf.get(HiveCatalogValidator.CATALOG_HIVE_VERSION),
-				"Hive version is not defined");
+			"Hive version is not defined");
 		this.hiveShim = HiveShimLoader.loadHiveShim(hiveVersion);
 	}
 
 	@Override
-	public Map<String, String> requiredContext() {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public List<String> supportedProperties() {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public TableSink<Row> createTableSink(Map<String, String> properties) {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public TableSource<Row> createTableSource(Map<String, String> properties) {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public TableSource<Row> createTableSource(ObjectPath tablePath, CatalogTable table) {
-		Preconditions.checkNotNull(table);
-		Preconditions.checkArgument(table instanceof CatalogTableImpl);
-
-		boolean isGeneric = Boolean.valueOf(table.getProperties().get(CatalogConfig.IS_GENERIC));
-
-		if (!isGeneric) {
-			return createInputFormatTableSource(tablePath, table);
-		} else {
-			return TableFactoryUtil.findAndCreateTableSource(table);
-		}
-	}
-
-	/**
-	 * Creates and configures a {@link org.apache.flink.table.sources.InputFormatTableSource} using the given {@link CatalogTable}.
-	 */
-	private InputFormatTableSource<Row> createInputFormatTableSource(ObjectPath tablePath, CatalogTable table) {
-		return new HiveTableSource(new JobConf(hiveConf), tablePath, table);
-	}
-
-	@Override
-	public TableSink<Row> createTableSink(ObjectPath tablePath, CatalogTable table) {
-		Preconditions.checkNotNull(table);
-		Preconditions.checkArgument(table instanceof CatalogTableImpl);
-
-		boolean isGeneric = Boolean.valueOf(table.getProperties().get(CatalogConfig.IS_GENERIC));
-
-		if (!isGeneric) {
-			return createOutputFormatTableSink(tablePath, table);
-		} else {
-			return TableFactoryUtil.findAndCreateTableSink(table);
-		}
-	}
-
-	/**
-	 * Creates and configures a {@link org.apache.flink.table.sinks.OutputFormatTableSink} using the given {@link CatalogTable}.
-	 */
-	private OutputFormatTableSink<Row> createOutputFormatTableSink(ObjectPath tablePath, CatalogTable table) {
-		return new HiveTableSink(new JobConf(hiveConf), tablePath, table);
-	}
-
-	@Override
 	public FunctionDefinition createFunctionDefinition(String name, CatalogFunction catalogFunction) {
-		String functionClassName = catalogFunction.getClassName();
-
 		if (Boolean.valueOf(catalogFunction.getProperties().get(CatalogConfig.IS_GENERIC))) {
-			throw new TableException(
-				String.format("HiveFunctionDefinitionFactory does not support generic functions %s yet", name));
+			return FunctionDefinitionUtil.createFunctionDefinition(name, catalogFunction); // stop here: do not fall through to the Hive UDF checks
 		}
 
+		String functionClassName = catalogFunction.getClassName();
+
 		Class clazz;
 		try {
 			clazz = Thread.currentThread().getContextClassLoader().loadClass(functionClassName);
diff --git a/flink-python/pyflink/table/tests/test_catalog_completeness.py b/flink-python/pyflink/table/tests/test_catalog_completeness.py
index 9474c30..003c144 100644
--- a/flink-python/pyflink/table/tests/test_catalog_completeness.py
+++ b/flink-python/pyflink/table/tests/test_catalog_completeness.py
@@ -40,7 +40,7 @@ class CatalogAPICompletenessTests(PythonAPICompletenessTestCase, unittest.TestCa
     @classmethod
     def excluded_methods(cls):
         # open/close are not needed in Python API as they are used internally
-        return {'open', 'close', 'getTableFactory'}
+        return {'open', 'close', 'getTableFactory', 'getFunctionDefinitionFactory'}
 
 
 class CatalogDatabaseAPICompletenessTests(PythonAPICompletenessTestCase, unittest.TestCase):
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
index 43280b4..6808216 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
@@ -25,11 +25,11 @@ import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
 import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
 import org.apache.flink.table.delegation.PlannerTypeInferenceUtil;
-import org.apache.flink.table.factories.FunctionDefinitionFactory;
 import org.apache.flink.table.functions.AggregateFunction;
 import org.apache.flink.table.functions.AggregateFunctionDefinition;
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
 import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.functions.FunctionDefinitionUtil;
 import org.apache.flink.table.functions.ScalarFunction;
 import org.apache.flink.table.functions.ScalarFunctionDefinition;
 import org.apache.flink.table.functions.TableAggregateFunction;
@@ -242,21 +242,17 @@ public class FunctionCatalog implements FunctionLookup {
 			CatalogFunction catalogFunction = catalog.getFunction(
 				new ObjectPath(catalogManager.getCurrentDatabase(), functionName));
 
-			if (catalog.getTableFactory().isPresent() &&
-				catalog.getTableFactory().get() instanceof FunctionDefinitionFactory) {
-
-				FunctionDefinitionFactory factory = (FunctionDefinitionFactory) catalog.getTableFactory().get();
-
-				userCandidate = factory.createFunctionDefinition(functionName, catalogFunction);
-
-				return Optional.of(
-					new FunctionLookup.Result(
-						ObjectIdentifier.of(catalogManager.getCurrentCatalog(), catalogManager.getCurrentDatabase(), name),
-						userCandidate)
-				);
+			if (catalog.getFunctionDefinitionFactory().isPresent()) {
+				userCandidate = catalog.getFunctionDefinitionFactory().get().createFunctionDefinition(functionName, catalogFunction);
 			} else {
-				// TODO: should go through function definition discover service
+				userCandidate = FunctionDefinitionUtil.createFunctionDefinition(functionName, catalogFunction);
 			}
+
+			return Optional.of(
+				new FunctionLookup.Result(
+					ObjectIdentifier.of(catalogManager.getCurrentCatalog(), catalogManager.getCurrentDatabase(), name),
+					userCandidate)
+			);
 		} catch (FunctionNotExistException e) {
 			// Ignore
 		}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/functions/FunctionDefinitionUtil.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/functions/FunctionDefinitionUtil.java
new file mode 100644
index 0000000..98cedaf
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/functions/FunctionDefinitionUtil.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions;
+
+import org.apache.flink.table.catalog.CatalogFunction;
+
+/**
+ * A util to instantiate {@link FunctionDefinition} in the default way.
+ */
+public class FunctionDefinitionUtil {
+
+	/**
+	 * Instantiates the Java class referenced by a {@link CatalogFunction} and wraps it in the matching definition.
+	 *
+	 * <p>The class is loaded via the context class loader of the calling thread and created with {@code newInstance()}.
+	 *
+	 * @param name name passed to the created function definition
+	 * @param catalogFunction catalog function whose class name identifies the class to instantiate
+	 * @return a scalar, table, aggregate or table aggregate function definition wrapping the new instance
+	 * @throws IllegalStateException if the class cannot be loaded or instantiated
+	 * @throws ClassCastException if the instantiated class is not a {@link UserDefinedFunction}
+	 * @throws UnsupportedOperationException if the function is none of the four supported kinds
+	 */
+	public static FunctionDefinition createFunctionDefinition(String name, CatalogFunction catalogFunction) {
+		// Currently only handles Java class-based functions
+		Object func;
+		try {
+			func = Thread.currentThread().getContextClassLoader().loadClass(catalogFunction.getClassName()).newInstance();
+		} catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
+			// keep the cause, otherwise the actual reason of the failure is lost
+			throw new IllegalStateException(
+				String.format("Failed instantiating '%s'", catalogFunction.getClassName()), e);
+		}
+
+		UserDefinedFunction udf = (UserDefinedFunction) func;
+
+		if (udf instanceof ScalarFunction) {
+			return new ScalarFunctionDefinition(name, (ScalarFunction) udf);
+		} else if (udf instanceof TableFunction) {
+			TableFunction t = (TableFunction) udf;
+			return new TableFunctionDefinition(name, t, t.getResultType());
+		} else if (udf instanceof AggregateFunction) {
+			AggregateFunction a = (AggregateFunction) udf;
+			// the definition takes the result type first and the accumulator type second
+			return new AggregateFunctionDefinition(
+				name,
+				a,
+				a.getResultType(),
+				a.getAccumulatorType()
+			);
+		} else if (udf instanceof TableAggregateFunction) {
+			TableAggregateFunction a = (TableAggregateFunction) udf;
+			// NOTE(review): assumed to take the same (result, accumulator) order as AggregateFunctionDefinition - confirm
+			return new TableAggregateFunctionDefinition(name, a, a.getResultType(), a.getAccumulatorType());
+		} else {
+			throw new UnsupportedOperationException(
+				String.format("Function %s should be of ScalarFunction, TableFunction, AggregateFunction, or TableAggregateFunction", catalogFunction.getClassName())
+			);
+		}
+	}
+}
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/functions/FunctionDefinitionUtilTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/functions/FunctionDefinitionUtilTest.java
new file mode 100644
index 0000000..84cb81c
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/functions/FunctionDefinitionUtilTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.catalog.CatalogFunctionImpl;
+
+import org.junit.Test;
+
+import java.util.Collections;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test for {@link FunctionDefinitionUtil}.
+ */
+public class FunctionDefinitionUtilTest {
+	@Test
+	public void testScalarFunction() {
+		FunctionDefinition fd = FunctionDefinitionUtil.createFunctionDefinition(
+			"test",
+				new CatalogFunctionImpl(TestScalarFunction.class.getName(), Collections.emptyMap())
+		);
+
+		assertTrue(((ScalarFunctionDefinition) fd).getScalarFunction() instanceof TestScalarFunction);
+	}
+
+	@Test
+	public void testTableFunction() {
+		FunctionDefinition fd = FunctionDefinitionUtil.createFunctionDefinition(
+			"test",
+			new CatalogFunctionImpl(TestTableFunction.class.getName(), Collections.emptyMap())
+		);
+
+		assertTrue(((TableFunctionDefinition) fd).getTableFunction() instanceof TestTableFunction);
+	}
+
+	@Test
+	public void testAggregateFunction() {
+		FunctionDefinition fd = FunctionDefinitionUtil.createFunctionDefinition(
+			"test",
+			new CatalogFunctionImpl(TestAggFunction.class.getName(), Collections.emptyMap())
+		);
+
+		assertTrue(((AggregateFunctionDefinition) fd).getAggregateFunction() instanceof TestAggFunction);
+	}
+
+	@Test
+	public void testTableAggregateFunction() {
+		FunctionDefinition fd = FunctionDefinitionUtil.createFunctionDefinition(
+			"test",
+			new CatalogFunctionImpl(TestTableAggFunction.class.getName(), Collections.emptyMap())
+		);
+
+		assertTrue(((TableAggregateFunctionDefinition) fd).getTableAggregateFunction() instanceof TestTableAggFunction);
+	}
+
+	/**
+	 * Empty {@link ScalarFunction} subclass; the test only checks that an instance of this type gets wrapped.
+	 */
+	public static class TestScalarFunction extends ScalarFunction {
+
+	}
+
+	/**
+	 * {@link TableFunction} stub that reports {@code Object} as its result type.
+	 */
+	public static class TestTableFunction extends TableFunction {
+		@Override
+		public TypeInformation getResultType() {
+			return TypeInformation.of(Object.class);
+		}
+	}
+
+	/**
+	 * {@link AggregateFunction} stub: null accumulator and value, {@code Object} result and accumulator types.
+	 */
+	public static class TestAggFunction extends AggregateFunction {
+		@Override
+		public Object createAccumulator() {
+			return null;
+		}
+
+		@Override
+		public TypeInformation getResultType() {
+			return TypeInformation.of(Object.class);
+		}
+
+		@Override
+		public TypeInformation getAccumulatorType() {
+			return TypeInformation.of(Object.class);
+		}
+
+		@Override
+		public Object getValue(Object accumulator) {
+			return null;
+		}
+	}
+
+	/**
+	 * {@link TableAggregateFunction} stub: null accumulator, {@code Object} result and accumulator types.
+	 */
+	public static class TestTableAggFunction extends TableAggregateFunction {
+		@Override
+		public Object createAccumulator() {
+			return null;
+		}
+
+		@Override
+		public TypeInformation getResultType() {
+			return TypeInformation.of(Object.class);
+		}
+
+		@Override
+		public TypeInformation getAccumulatorType() {
+			return TypeInformation.of(Object.class);
+		}
+	}
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
index 8019ab0..8f210b9 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
@@ -34,6 +34,7 @@ import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
 import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
 import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
 import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.factories.FunctionDefinitionFactory;
 import org.apache.flink.table.factories.TableFactory;
 
 import java.util.List;
@@ -48,7 +49,7 @@ public interface Catalog {
 
 	/**
 	 * Get an optional {@link TableFactory} instance that's responsible for generating table-related
-	 * instances stored in this catalog, instances such as source/sink and function definitions.
+	 * instances stored in this catalog, instances such as source/sink.
 	 *
 	 * @return an optional TableFactory instance
 	 */
@@ -57,6 +58,15 @@ public interface Catalog {
 	}
 
 	/**
+	 * Get an optional {@link FunctionDefinitionFactory} instance that's responsible for instantiating function definitions.
+	 *
+	 * @return an optional FunctionDefinitionFactory instance; this default implementation returns {@link Optional#empty()}
+	 */
+	default Optional<FunctionDefinitionFactory> getFunctionDefinitionFactory() {
+		return Optional.empty();
+	}
+
+	/**
 	 * Open the catalog. Used for any required preparation in initialization phase.
 	 *
 	 * @throws CatalogException in case of any runtime exception
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FunctionDefinitionFactory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FunctionDefinitionFactory.java
index 2e8c538..1e4d76b 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FunctionDefinitionFactory.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FunctionDefinitionFactory.java
@@ -23,9 +23,8 @@ import org.apache.flink.table.functions.FunctionDefinition;
 
 /**
  * A factory to create {@link FunctionDefinition}.
- * See also {@link TableFactory} for more information.
  */
-public interface FunctionDefinitionFactory extends TableFactory {
+public interface FunctionDefinitionFactory {
 
 	/**
 	 * Creates a {@link FunctionDefinition} from given {@link CatalogFunction}.