You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by bl...@apache.org on 2019/10/19 01:24:25 UTC
[flink] branch master updated: [FLINK-14401][table][hive] create
DefaultFunctionDefinitionFactory to instantiate regular java class-based
udf
This is an automated email from the ASF dual-hosted git repository.
bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new d1b06ac [FLINK-14401][table][hive] create DefaultFunctionDefinitionFactory to instantiate regular java class-based udf
d1b06ac is described below
commit d1b06ac4e8ab033c699b656cb0f1d924b6bafc56
Author: bowen.li <bo...@gmail.com>
AuthorDate: Tue Oct 15 16:21:29 2019 -0700
[FLINK-14401][table][hive] create DefaultFunctionDefinitionFactory to instantiate regular java class-based udf
create FunctionDefinitionUtil to instantiate regular java class-based udf and add HiveFunctionDefinitionFactory to instantiate both flink and hive udf
This closes #9908.
---
.../flink/connectors/hive/HiveTableFactory.java | 103 +---------------
.../flink/table/catalog/hive/HiveCatalog.java | 7 ++
.../factories/HiveFunctionDefinitionFactory.java} | 100 ++-------------
.../table/tests/test_catalog_completeness.py | 2 +-
.../flink/table/catalog/FunctionCatalog.java | 24 ++--
.../table/functions/FunctionDefinitionUtil.java | 77 ++++++++++++
.../functions/FunctionDefinitionUtilTest.java | 135 +++++++++++++++++++++
.../org/apache/flink/table/catalog/Catalog.java | 12 +-
.../table/factories/FunctionDefinitionFactory.java | 3 +-
9 files changed, 253 insertions(+), 210 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableFactory.java
index 235919c..89f76da 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableFactory.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableFactory.java
@@ -18,29 +18,13 @@
package org.apache.flink.connectors.hive;
-import org.apache.flink.api.java.typeutils.GenericTypeInfo;
-import org.apache.flink.table.api.TableException;
-import org.apache.flink.table.catalog.CatalogFunction;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.config.CatalogConfig;
-import org.apache.flink.table.catalog.hive.client.HiveShim;
-import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
-import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator;
-import org.apache.flink.table.factories.FunctionDefinitionFactory;
import org.apache.flink.table.factories.TableFactoryUtil;
import org.apache.flink.table.factories.TableSinkFactory;
import org.apache.flink.table.factories.TableSourceFactory;
-import org.apache.flink.table.functions.AggregateFunctionDefinition;
-import org.apache.flink.table.functions.FunctionDefinition;
-import org.apache.flink.table.functions.ScalarFunctionDefinition;
-import org.apache.flink.table.functions.TableFunctionDefinition;
-import org.apache.flink.table.functions.hive.HiveFunctionWrapper;
-import org.apache.flink.table.functions.hive.HiveGenericUDAF;
-import org.apache.flink.table.functions.hive.HiveGenericUDF;
-import org.apache.flink.table.functions.hive.HiveGenericUDTF;
-import org.apache.flink.table.functions.hive.HiveSimpleUDF;
import org.apache.flink.table.sinks.OutputFormatTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.sources.InputFormatTableSource;
@@ -49,12 +33,6 @@ import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.UDAF;
-import org.apache.hadoop.hive.ql.exec.UDF;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver2;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.mapred.JobConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -68,20 +46,13 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* A table factory implementation for Hive catalog.
*/
public class HiveTableFactory
- implements TableSourceFactory<Row>, TableSinkFactory<Row>, FunctionDefinitionFactory {
+ implements TableSourceFactory<Row>, TableSinkFactory<Row> {
private static final Logger LOG = LoggerFactory.getLogger(HiveTableFactory.class);
private final HiveConf hiveConf;
- private final String hiveVersion;
- private final HiveShim hiveShim;
public HiveTableFactory(HiveConf hiveConf) {
this.hiveConf = checkNotNull(hiveConf, "hiveConf cannot be null");
-
- // this has to come from hiveConf, otherwise we may lose what user specifies in the yaml file
- this.hiveVersion = checkNotNull(hiveConf.get(HiveCatalogValidator.CATALOG_HIVE_VERSION),
- "Hive version is not defined");
- this.hiveShim = HiveShimLoader.loadHiveShim(hiveVersion);
}
@Override
@@ -145,76 +116,4 @@ public class HiveTableFactory
private OutputFormatTableSink<Row> createOutputFormatTableSink(ObjectPath tablePath, CatalogTable table) {
return new HiveTableSink(new JobConf(hiveConf), tablePath, table);
}
-
- @Override
- public FunctionDefinition createFunctionDefinition(String name, CatalogFunction catalogFunction) {
- String functionClassName = catalogFunction.getClassName();
-
- if (Boolean.valueOf(catalogFunction.getProperties().get(CatalogConfig.IS_GENERIC))) {
- throw new TableException(
- String.format("HiveFunctionDefinitionFactory does not support generic functions %s yet", name));
- }
-
- Class clazz;
- try {
- clazz = Thread.currentThread().getContextClassLoader().loadClass(functionClassName);
-
- LOG.info("Successfully loaded Hive udf '{}' with class '{}'", name, functionClassName);
- } catch (ClassNotFoundException e) {
- throw new TableException(
- String.format("Failed to initiate an instance of class %s.", functionClassName), e);
- }
-
- if (UDF.class.isAssignableFrom(clazz)) {
- LOG.info("Transforming Hive function '{}' into a HiveSimpleUDF", name);
-
- return new ScalarFunctionDefinition(
- name,
- new HiveSimpleUDF(new HiveFunctionWrapper<>(functionClassName), hiveShim)
- );
- } else if (GenericUDF.class.isAssignableFrom(clazz)) {
- LOG.info("Transforming Hive function '{}' into a HiveGenericUDF", name);
-
- return new ScalarFunctionDefinition(
- name,
- new HiveGenericUDF(new HiveFunctionWrapper<>(functionClassName), hiveShim)
- );
- } else if (GenericUDTF.class.isAssignableFrom(clazz)) {
- LOG.info("Transforming Hive function '{}' into a HiveGenericUDTF", name);
-
- HiveGenericUDTF udtf = new HiveGenericUDTF(new HiveFunctionWrapper<>(functionClassName), hiveShim);
-
- return new TableFunctionDefinition(
- name,
- udtf,
- GenericTypeInfo.of(Row.class)
- );
- } else if (GenericUDAFResolver2.class.isAssignableFrom(clazz) || UDAF.class.isAssignableFrom(clazz)) {
- HiveGenericUDAF udaf;
-
- if (GenericUDAFResolver2.class.isAssignableFrom(clazz)) {
- LOG.info(
- "Transforming Hive function '{}' into a HiveGenericUDAF with no UDAF bridging and Hive version %s",
- name, hiveVersion);
-
- udaf = new HiveGenericUDAF(new HiveFunctionWrapper<>(functionClassName), false, hiveVersion);
- } else {
- LOG.info(
- "Transforming Hive function '{}' into a HiveGenericUDAF with UDAF bridging and Hive version %s",
- name, hiveVersion);
-
- udaf = new HiveGenericUDAF(new HiveFunctionWrapper<>(functionClassName), true, hiveVersion);
- }
-
- return new AggregateFunctionDefinition(
- name,
- udaf,
- GenericTypeInfo.of(Object.class),
- GenericTypeInfo.of(GenericUDAFEvaluator.AggregationBuffer.class)
- );
- } else {
- throw new IllegalArgumentException(
- String.format("HiveFunctionDefinitionFactory cannot initiate FunctionDefinition for class %s", functionClassName));
- }
- }
}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index 7914c89..1e66551 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -56,11 +56,13 @@ import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
import org.apache.flink.table.catalog.hive.client.HiveShim;
import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator;
+import org.apache.flink.table.catalog.hive.factories.HiveFunctionDefinitionFactory;
import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
import org.apache.flink.table.catalog.hive.util.HiveStatsUtil;
import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.factories.FunctionDefinitionFactory;
import org.apache.flink.table.factories.TableFactory;
import org.apache.flink.util.StringUtils;
@@ -202,6 +204,11 @@ public class HiveCatalog extends AbstractCatalog {
return Optional.of(new HiveTableFactory(hiveConf));
}
+ @Override
+ public Optional<FunctionDefinitionFactory> getFunctionDefinitionFactory() {
+ return Optional.of(new HiveFunctionDefinitionFactory(hiveConf));
+ }
+
// ------ databases ------
@Override
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/factories/HiveFunctionDefinitionFactory.java
similarity index 62%
copy from flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableFactory.java
copy to flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/factories/HiveFunctionDefinitionFactory.java
index 235919c..3e32c7f 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableFactory.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/factories/HiveFunctionDefinitionFactory.java
@@ -16,24 +16,20 @@
* limitations under the License.
*/
-package org.apache.flink.connectors.hive;
+package org.apache.flink.table.catalog.hive.factories;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.connectors.hive.HiveTableFactory;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.catalog.CatalogFunction;
-import org.apache.flink.table.catalog.CatalogTable;
-import org.apache.flink.table.catalog.CatalogTableImpl;
-import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.config.CatalogConfig;
import org.apache.flink.table.catalog.hive.client.HiveShim;
import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator;
import org.apache.flink.table.factories.FunctionDefinitionFactory;
-import org.apache.flink.table.factories.TableFactoryUtil;
-import org.apache.flink.table.factories.TableSinkFactory;
-import org.apache.flink.table.factories.TableSourceFactory;
import org.apache.flink.table.functions.AggregateFunctionDefinition;
import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.functions.FunctionDefinitionUtil;
import org.apache.flink.table.functions.ScalarFunctionDefinition;
import org.apache.flink.table.functions.TableFunctionDefinition;
import org.apache.flink.table.functions.hive.HiveFunctionWrapper;
@@ -41,12 +37,7 @@ import org.apache.flink.table.functions.hive.HiveGenericUDAF;
import org.apache.flink.table.functions.hive.HiveGenericUDF;
import org.apache.flink.table.functions.hive.HiveGenericUDTF;
import org.apache.flink.table.functions.hive.HiveSimpleUDF;
-import org.apache.flink.table.sinks.OutputFormatTableSink;
-import org.apache.flink.table.sinks.TableSink;
-import org.apache.flink.table.sources.InputFormatTableSource;
-import org.apache.flink.table.sources.TableSource;
import org.apache.flink.types.Row;
-import org.apache.flink.util.Preconditions;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.UDAF;
@@ -55,106 +46,35 @@ import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver2;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
-import org.apache.hadoop.mapred.JobConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.List;
-import java.util.Map;
-
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
- * A table factory implementation for Hive catalog.
+ * A factory to instantiate Hive UDFs as Flink UDFs.
*/
-public class HiveTableFactory
- implements TableSourceFactory<Row>, TableSinkFactory<Row>, FunctionDefinitionFactory {
+public class HiveFunctionDefinitionFactory implements FunctionDefinitionFactory {
- private static final Logger LOG = LoggerFactory.getLogger(HiveTableFactory.class);
+ private static final Logger LOG = LoggerFactory.getLogger(HiveFunctionDefinitionFactory.class);
- private final HiveConf hiveConf;
private final String hiveVersion;
private final HiveShim hiveShim;
- public HiveTableFactory(HiveConf hiveConf) {
- this.hiveConf = checkNotNull(hiveConf, "hiveConf cannot be null");
-
+ public HiveFunctionDefinitionFactory(HiveConf hiveConf) {
// this has to come from hiveConf, otherwise we may lose what user specifies in the yaml file
this.hiveVersion = checkNotNull(hiveConf.get(HiveCatalogValidator.CATALOG_HIVE_VERSION),
- "Hive version is not defined");
+ "Hive version is not defined");
this.hiveShim = HiveShimLoader.loadHiveShim(hiveVersion);
}
@Override
- public Map<String, String> requiredContext() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public List<String> supportedProperties() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public TableSink<Row> createTableSink(Map<String, String> properties) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public TableSource<Row> createTableSource(Map<String, String> properties) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public TableSource<Row> createTableSource(ObjectPath tablePath, CatalogTable table) {
- Preconditions.checkNotNull(table);
- Preconditions.checkArgument(table instanceof CatalogTableImpl);
-
- boolean isGeneric = Boolean.valueOf(table.getProperties().get(CatalogConfig.IS_GENERIC));
-
- if (!isGeneric) {
- return createInputFormatTableSource(tablePath, table);
- } else {
- return TableFactoryUtil.findAndCreateTableSource(table);
- }
- }
-
- /**
- * Creates and configures a {@link org.apache.flink.table.sources.InputFormatTableSource} using the given {@link CatalogTable}.
- */
- private InputFormatTableSource<Row> createInputFormatTableSource(ObjectPath tablePath, CatalogTable table) {
- return new HiveTableSource(new JobConf(hiveConf), tablePath, table);
- }
-
- @Override
- public TableSink<Row> createTableSink(ObjectPath tablePath, CatalogTable table) {
- Preconditions.checkNotNull(table);
- Preconditions.checkArgument(table instanceof CatalogTableImpl);
-
- boolean isGeneric = Boolean.valueOf(table.getProperties().get(CatalogConfig.IS_GENERIC));
-
- if (!isGeneric) {
- return createOutputFormatTableSink(tablePath, table);
- } else {
- return TableFactoryUtil.findAndCreateTableSink(table);
- }
- }
-
- /**
- * Creates and configures a {@link org.apache.flink.table.sinks.OutputFormatTableSink} using the given {@link CatalogTable}.
- */
- private OutputFormatTableSink<Row> createOutputFormatTableSink(ObjectPath tablePath, CatalogTable table) {
- return new HiveTableSink(new JobConf(hiveConf), tablePath, table);
- }
-
- @Override
public FunctionDefinition createFunctionDefinition(String name, CatalogFunction catalogFunction) {
- String functionClassName = catalogFunction.getClassName();
-
if (Boolean.valueOf(catalogFunction.getProperties().get(CatalogConfig.IS_GENERIC))) {
- throw new TableException(
- String.format("HiveFunctionDefinitionFactory does not support generic functions %s yet", name));
+ // Generic functions are regular Flink UDFs: hand back the delegated definition here,
+ // otherwise execution falls through to the Hive-specific class checks below.
+ return FunctionDefinitionUtil.createFunctionDefinition(name, catalogFunction);
}
+ String functionClassName = catalogFunction.getClassName();
+
Class clazz;
try {
clazz = Thread.currentThread().getContextClassLoader().loadClass(functionClassName);
diff --git a/flink-python/pyflink/table/tests/test_catalog_completeness.py b/flink-python/pyflink/table/tests/test_catalog_completeness.py
index 9474c30..003c144 100644
--- a/flink-python/pyflink/table/tests/test_catalog_completeness.py
+++ b/flink-python/pyflink/table/tests/test_catalog_completeness.py
@@ -40,7 +40,7 @@ class CatalogAPICompletenessTests(PythonAPICompletenessTestCase, unittest.TestCa
@classmethod
def excluded_methods(cls):
# open/close are not needed in Python API as they are used internally
- return {'open', 'close', 'getTableFactory'}
+ return {'open', 'close', 'getTableFactory', 'getFunctionDefinitionFactory'}
class CatalogDatabaseAPICompletenessTests(PythonAPICompletenessTestCase, unittest.TestCase):
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
index 43280b4..6808216 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
@@ -25,11 +25,11 @@ import org.apache.flink.table.api.TableException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
import org.apache.flink.table.delegation.PlannerTypeInferenceUtil;
-import org.apache.flink.table.factories.FunctionDefinitionFactory;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.AggregateFunctionDefinition;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.functions.FunctionDefinitionUtil;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.ScalarFunctionDefinition;
import org.apache.flink.table.functions.TableAggregateFunction;
@@ -242,21 +242,17 @@ public class FunctionCatalog implements FunctionLookup {
CatalogFunction catalogFunction = catalog.getFunction(
new ObjectPath(catalogManager.getCurrentDatabase(), functionName));
- if (catalog.getTableFactory().isPresent() &&
- catalog.getTableFactory().get() instanceof FunctionDefinitionFactory) {
-
- FunctionDefinitionFactory factory = (FunctionDefinitionFactory) catalog.getTableFactory().get();
-
- userCandidate = factory.createFunctionDefinition(functionName, catalogFunction);
-
- return Optional.of(
- new FunctionLookup.Result(
- ObjectIdentifier.of(catalogManager.getCurrentCatalog(), catalogManager.getCurrentDatabase(), name),
- userCandidate)
- );
+ if (catalog.getFunctionDefinitionFactory().isPresent()) {
+ userCandidate = catalog.getFunctionDefinitionFactory().get().createFunctionDefinition(functionName, catalogFunction);
} else {
- // TODO: should go through function definition discover service
+ userCandidate = FunctionDefinitionUtil.createFunctionDefinition(functionName, catalogFunction);
}
+
+ return Optional.of(
+ new FunctionLookup.Result(
+ ObjectIdentifier.of(catalogManager.getCurrentCatalog(), catalogManager.getCurrentDatabase(), name),
+ userCandidate)
+ );
} catch (FunctionNotExistException e) {
// Ignore
}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/functions/FunctionDefinitionUtil.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/functions/FunctionDefinitionUtil.java
new file mode 100644
index 0000000..98cedaf
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/functions/FunctionDefinitionUtil.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions;
+
+import org.apache.flink.table.catalog.CatalogFunction;
+
+/**
+ * A util to instantiate {@link FunctionDefinition} in the default way, i.e. by reflectively
+ * creating the Java class named in a {@link CatalogFunction}.
+ */
+public class FunctionDefinitionUtil {
+
+	/**
+	 * Loads the class named by {@code catalogFunction.getClassName()} with the current thread's
+	 * context class loader, instantiates it via {@link Class#newInstance()} and wraps the instance
+	 * in the {@link FunctionDefinition} matching its function kind.
+	 *
+	 * @param name name given to the created function definition
+	 * @param catalogFunction catalog entry providing the class name of the function
+	 * @return definition wrapping the instantiated function
+	 * @throws IllegalStateException if the class cannot be loaded or instantiated
+	 * @throws UnsupportedOperationException if the instance is not a ScalarFunction, TableFunction,
+	 *     AggregateFunction, or TableAggregateFunction
+	 */
+	public static FunctionDefinition createFunctionDefinition(String name, CatalogFunction catalogFunction) {
+		// Currently only handles Java class-based functions
+		final String className = catalogFunction.getClassName();
+
+		final Object func;
+		try {
+			func = Thread.currentThread().getContextClassLoader().loadClass(className).newInstance();
+		} catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
+			// Keep the cause; without it the reason for the failure is lost to the caller.
+			throw new IllegalStateException(
+				String.format("Failed instantiating '%s'", className),
+				e
+			);
+		}
+
+		// Test the instance itself instead of casting to UserDefinedFunction up front, so that an
+		// unrelated class reaches the descriptive error below rather than a ClassCastException.
+		if (func instanceof ScalarFunction) {
+			return new ScalarFunctionDefinition(
+				name,
+				(ScalarFunction) func
+			);
+		} else if (func instanceof TableFunction) {
+			TableFunction<?> t = (TableFunction<?>) func;
+			return new TableFunctionDefinition(
+				name,
+				t,
+				t.getResultType()
+			);
+		} else if (func instanceof AggregateFunction) {
+			AggregateFunction<?, ?> a = (AggregateFunction<?, ?>) func;
+
+			// The definition takes the result type first and the accumulator type second.
+			return new AggregateFunctionDefinition(
+				name,
+				a,
+				a.getResultType(),
+				a.getAccumulatorType()
+			);
+		} else if (func instanceof TableAggregateFunction) {
+			TableAggregateFunction<?, ?> a = (TableAggregateFunction<?, ?>) func;
+
+			// NOTE(review): argument order assumed to mirror AggregateFunctionDefinition
+			// (result type, then accumulator type) - confirm against the constructor.
+			return new TableAggregateFunctionDefinition(
+				name,
+				a,
+				a.getResultType(),
+				a.getAccumulatorType()
+			);
+		} else {
+			throw new UnsupportedOperationException(
+				String.format("Function %s should be of ScalarFunction, TableFunction, AggregateFunction, or TableAggregateFunction", className)
+			);
+		}
+	}
+}
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/functions/FunctionDefinitionUtilTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/functions/FunctionDefinitionUtilTest.java
new file mode 100644
index 0000000..84cb81c
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/functions/FunctionDefinitionUtilTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.catalog.CatalogFunctionImpl;
+
+import org.junit.Test;
+
+import java.util.Collections;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test for {@link FunctionDefinitionUtil}.
+ */
+public class FunctionDefinitionUtilTest {
+
+	/**
+	 * Creates the definition named "test" for a function class registered without properties.
+	 */
+	private static FunctionDefinition definitionOf(Class<?> functionClass) {
+		return FunctionDefinitionUtil.createFunctionDefinition(
+			"test",
+			new CatalogFunctionImpl(functionClass.getName(), Collections.emptyMap())
+		);
+	}
+
+	@Test
+	public void testScalarFunction() {
+		ScalarFunctionDefinition definition = (ScalarFunctionDefinition) definitionOf(TestScalarFunction.class);
+
+		assertTrue(definition.getScalarFunction() instanceof TestScalarFunction);
+	}
+
+	@Test
+	public void testTableFunction() {
+		TableFunctionDefinition definition = (TableFunctionDefinition) definitionOf(TestTableFunction.class);
+
+		assertTrue(definition.getTableFunction() instanceof TestTableFunction);
+	}
+
+	@Test
+	public void testAggregateFunction() {
+		AggregateFunctionDefinition definition = (AggregateFunctionDefinition) definitionOf(TestAggFunction.class);
+
+		assertTrue(definition.getAggregateFunction() instanceof TestAggFunction);
+	}
+
+	@Test
+	public void testTableAggregateFunction() {
+		TableAggregateFunctionDefinition definition =
+			(TableAggregateFunctionDefinition) definitionOf(TestTableAggFunction.class);
+
+		assertTrue(definition.getTableAggregateFunction() instanceof TestTableAggFunction);
+	}
+
+	/**
+	 * Scalar function used as instantiation target.
+	 */
+	public static class TestScalarFunction extends ScalarFunction {
+
+	}
+
+	/**
+	 * Table function used as instantiation target.
+	 */
+	public static class TestTableFunction extends TableFunction {
+		@Override
+		public TypeInformation getResultType() {
+			return TypeInformation.of(Object.class);
+		}
+	}
+
+	/**
+	 * Aggregate function used as instantiation target.
+	 */
+	public static class TestAggFunction extends AggregateFunction {
+		@Override
+		public Object getValue(Object accumulator) {
+			return null;
+		}
+
+		@Override
+		public Object createAccumulator() {
+			return null;
+		}
+
+		@Override
+		public TypeInformation getAccumulatorType() {
+			return TypeInformation.of(Object.class);
+		}
+
+		@Override
+		public TypeInformation getResultType() {
+			return TypeInformation.of(Object.class);
+		}
+	}
+
+	/**
+	 * Table aggregate function used as instantiation target.
+	 */
+	public static class TestTableAggFunction extends TableAggregateFunction {
+		@Override
+		public Object createAccumulator() {
+			return null;
+		}
+
+		@Override
+		public TypeInformation getAccumulatorType() {
+			return TypeInformation.of(Object.class);
+		}
+
+		@Override
+		public TypeInformation getResultType() {
+			return TypeInformation.of(Object.class);
+		}
+	}
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
index 8019ab0..8f210b9 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
@@ -34,6 +34,7 @@ import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.factories.FunctionDefinitionFactory;
import org.apache.flink.table.factories.TableFactory;
import java.util.List;
@@ -48,7 +49,7 @@ public interface Catalog {
/**
* Get an optional {@link TableFactory} instance that's responsible for generating table-related
- * instances stored in this catalog, instances such as source/sink and function definitions.
+ * instances stored in this catalog, instances such as source/sink.
*
* @return an optional TableFactory instance
*/
@@ -57,6 +58,15 @@ public interface Catalog {
}
/**
+ * Get an optional {@link FunctionDefinitionFactory} instance that's responsible for instantiating function definitions.
+ *
+ * @return an optional FunctionDefinitionFactory instance
+ */
+ default Optional<FunctionDefinitionFactory> getFunctionDefinitionFactory() {
+ return Optional.empty();
+ }
+
+ /**
* Open the catalog. Used for any required preparation in initialization phase.
*
* @throws CatalogException in case of any runtime exception
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FunctionDefinitionFactory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FunctionDefinitionFactory.java
index 2e8c538..1e4d76b 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FunctionDefinitionFactory.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FunctionDefinitionFactory.java
@@ -23,9 +23,8 @@ import org.apache.flink.table.functions.FunctionDefinition;
/**
* A factory to create {@link FunctionDefinition}.
- * See also {@link TableFactory} for more information.
*/
-public interface FunctionDefinitionFactory extends TableFactory {
+public interface FunctionDefinitionFactory {
/**
* Creates a {@link FunctionDefinition} from given {@link CatalogFunction}.