You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by bl...@apache.org on 2019/07/15 21:04:54 UTC
[flink] branch master updated: [FLINK-13024][table] integrate
FunctionCatalog with CatalogManager
This is an automated email from the ASF dual-hosted git repository.
bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new d71c4ee [FLINK-13024][table] integrate FunctionCatalog with CatalogManager
d71c4ee is described below
commit d71c4eed25cbd7d183cafb29e8bfbb17df904cbd
Author: bowen.li <bo...@gmail.com>
AuthorDate: Fri Jul 5 15:37:37 2019 -0700
[FLINK-13024][table] integrate FunctionCatalog with CatalogManager
This PR integrates FunctionCatalog with Catalog APIs.
This closes #8920.
---
.../batch/connectors/hive/HiveTableFactory.java | 106 ++++++++++++++++++-
.../java/internal/StreamTableEnvironmentImpl.java | 5 +-
.../table/api/internal/TableEnvironmentImpl.java | 5 +-
.../apache/flink/table/catalog/CatalogManager.java | 31 ++++--
.../flink/table/catalog/FunctionCatalog.java | 114 +++++++++++++++------
.../internal/StreamTableEnvironmentImpl.scala | 5 +-
.../org/apache/flink/table/catalog/Catalog.java | 4 +-
.../table/factories/FunctionDefinitionFactory.java | 38 +++++++
.../metadata/AggCallSelectivityEstimatorTest.scala | 7 +-
.../plan/metadata/FlinkRelMdHandlerTestBase.scala | 11 +-
.../plan/metadata/SelectivityEstimatorTest.scala | 7 +-
.../table/plan/util/RexNodeExtractorTest.scala | 17 +--
.../apache/flink/table/util/TableTestBase.scala | 3 +-
.../flink/table/api/internal/TableEnvImpl.scala | 5 +-
.../PushFilterIntoTableSourceScanRule.scala | 8 +-
.../api/stream/StreamTableEnvironmentTest.scala | 2 +-
.../flink/table/api/stream/sql/AggregateTest.scala | 10 +-
.../flink/table/plan/RexProgramExtractorTest.scala | 5 +-
.../apache/flink/table/utils/TableTestBase.scala | 3 +-
19 files changed, 296 insertions(+), 90 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableFactory.java
index a22014a..9c8beaa 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableFactory.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableFactory.java
@@ -18,13 +18,28 @@
package org.apache.flink.batch.connectors.hive;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.catalog.CatalogFunction;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.config.CatalogConfig;
+import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
+import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator;
+import org.apache.flink.table.factories.FunctionDefinitionFactory;
import org.apache.flink.table.factories.TableFactoryUtil;
import org.apache.flink.table.factories.TableSinkFactory;
import org.apache.flink.table.factories.TableSourceFactory;
+import org.apache.flink.table.functions.AggregateFunctionDefinition;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.functions.ScalarFunctionDefinition;
+import org.apache.flink.table.functions.TableFunctionDefinition;
+import org.apache.flink.table.functions.hive.HiveFunctionWrapper;
+import org.apache.flink.table.functions.hive.HiveGenericUDAF;
+import org.apache.flink.table.functions.hive.HiveGenericUDF;
+import org.apache.flink.table.functions.hive.HiveGenericUDTF;
+import org.apache.flink.table.functions.hive.HiveSimpleUDF;
import org.apache.flink.table.sinks.OutputFormatTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.sources.InputFormatTableSource;
@@ -33,7 +48,15 @@ import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.UDAF;
+import org.apache.hadoop.hive.ql.exec.UDF;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver2;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.mapred.JobConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Map;
@@ -41,13 +64,19 @@ import java.util.Map;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
- * A table factory implementation for tables stored in Hive catalog.
+ * A table factory implementation for Hive catalog.
*/
-public class HiveTableFactory implements TableSourceFactory<Row>, TableSinkFactory<Row> {
- private HiveConf hiveConf;
+public class HiveTableFactory
+ implements TableSourceFactory<Row>, TableSinkFactory<Row>, FunctionDefinitionFactory {
+ private static final Logger LOG = LoggerFactory.getLogger(HiveTableFactory.class);
+
+ private final HiveConf hiveConf;
+ private final String hiveVersion;
public HiveTableFactory(HiveConf hiveConf) {
this.hiveConf = checkNotNull(hiveConf, "hiveConf cannot be null");
+
+ this.hiveVersion = new JobConf(hiveConf).get(HiveCatalogValidator.CATALOG_HIVE_VERSION, HiveShimLoader.getHiveVersion());
}
@Override
@@ -112,4 +141,75 @@ public class HiveTableFactory implements TableSourceFactory<Row>, TableSinkFacto
return new HiveTableSink(new JobConf(hiveConf), tablePath, table);
}
+ @Override
+ public FunctionDefinition createFunctionDefinition(String name, CatalogFunction catalogFunction) {
+ String functionClassName = catalogFunction.getClassName();
+ // Functions flagged as generic (CatalogConfig.IS_GENERIC) are not supported by this factory yet.
+ if (Boolean.valueOf(catalogFunction.getProperties().get(CatalogConfig.IS_GENERIC))) {
+ throw new TableException(
+ String.format("HiveFunctionDefinitionFactory does not support generic functions %s yet", name));
+ }
+ // Load the function class through the thread context class loader.
+ Class<?> clazz;
+ try {
+ clazz = Thread.currentThread().getContextClassLoader().loadClass(functionClassName);
+
+ LOG.info("Successfully loaded Hive udf '{}' with class '{}'", name, functionClassName);
+ } catch (ClassNotFoundException e) {
+ throw new TableException(
+ String.format("Failed to load class %s.", functionClassName), e);
+ }
+ // Wrap the class according to which Hive function base type it extends/implements.
+ if (UDF.class.isAssignableFrom(clazz)) {
+ LOG.info("Transforming Hive function '{}' into a HiveSimpleUDF", name);
+
+ return new ScalarFunctionDefinition(
+ name,
+ new HiveSimpleUDF(new HiveFunctionWrapper<>(functionClassName))
+ );
+ } else if (GenericUDF.class.isAssignableFrom(clazz)) {
+ LOG.info("Transforming Hive function '{}' into a HiveGenericUDF", name);
+
+ return new ScalarFunctionDefinition(
+ name,
+ new HiveGenericUDF(new HiveFunctionWrapper<>(functionClassName))
+ );
+ } else if (GenericUDTF.class.isAssignableFrom(clazz)) {
+ LOG.info("Transforming Hive function '{}' into a HiveGenericUDTF", name);
+
+ HiveGenericUDTF udtf = new HiveGenericUDTF(new HiveFunctionWrapper<>(functionClassName));
+
+ return new TableFunctionDefinition(
+ name,
+ udtf,
+ GenericTypeInfo.of(Row.class)
+ );
+ } else if (GenericUDAFResolver2.class.isAssignableFrom(clazz) || UDAF.class.isAssignableFrom(clazz)) {
+ HiveGenericUDAF udaf;
+ // GenericUDAFResolver2 classes are wrapped without UDAF bridging; legacy UDAF classes with it.
+ if (GenericUDAFResolver2.class.isAssignableFrom(clazz)) {
+ LOG.info(
+ "Transforming Hive function '{}' into a HiveGenericUDAF with no UDAF bridging and Hive version {}",
+ name, hiveVersion);
+
+ udaf = new HiveGenericUDAF(new HiveFunctionWrapper<>(functionClassName), false, hiveVersion);
+ } else {
+ LOG.info(
+ "Transforming Hive function '{}' into a HiveGenericUDAF with UDAF bridging and Hive version {}",
+ name, hiveVersion);
+
+ udaf = new HiveGenericUDAF(new HiveFunctionWrapper<>(functionClassName), true, hiveVersion);
+ }
+
+ return new AggregateFunctionDefinition(
+ name,
+ udaf,
+ GenericTypeInfo.of(Object.class),
+ GenericTypeInfo.of(GenericUDAFEvaluator.AggregationBuffer.class)
+ );
+ } else {
+ throw new IllegalArgumentException(
+ String.format("HiveFunctionDefinitionFactory cannot initiate FunctionDefinition for class %s", functionClassName));
+ }
+ }
}
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java
index 86cd203..4d0586c 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java
@@ -102,13 +102,12 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl imple
EnvironmentSettings settings,
TableConfig tableConfig) {
- FunctionCatalog functionCatalog = new FunctionCatalog(
- settings.getBuiltInCatalogName(),
- settings.getBuiltInDatabaseName());
CatalogManager catalogManager = new CatalogManager(
settings.getBuiltInCatalogName(),
new GenericInMemoryCatalog(settings.getBuiltInCatalogName(), settings.getBuiltInDatabaseName()));
+ FunctionCatalog functionCatalog = new FunctionCatalog(catalogManager);
+
Map<String, String> executorProperties = settings.toExecutorProperties();
Executor executor = lookupExecutor(executorProperties, executionEnvironment);
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 988fa88..f2aea62 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -123,13 +123,12 @@ public class TableEnvironmentImpl implements TableEnvironment {
public static TableEnvironmentImpl create(EnvironmentSettings settings) {
- FunctionCatalog functionCatalog = new FunctionCatalog(
- settings.getBuiltInCatalogName(),
- settings.getBuiltInDatabaseName());
CatalogManager catalogManager = new CatalogManager(
settings.getBuiltInCatalogName(),
new GenericInMemoryCatalog(settings.getBuiltInCatalogName(), settings.getBuiltInDatabaseName()));
+ FunctionCatalog functionCatalog = new FunctionCatalog(catalogManager);
+
Map<String, String> executorProperties = settings.toExecutorProperties();
Executor executor = ComponentFactoryService.find(ExecutorFactory.class, executorProperties)
.create(executorProperties);
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
index 0f83176..c5d0bc7 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
@@ -59,11 +59,14 @@ public class CatalogManager {
// TO BE REMOVED along with ExternalCatalog API
private Map<String, ExternalCatalog> externalCatalogs;
- // The name of the default catalog and schema
+ // The name of the current catalog and database
private String currentCatalogName;
private String currentDatabaseName;
+ // The name of the default catalog
+ private final String defaultCatalogName;
+
/**
* Temporary solution to handle both {@link CatalogBaseTable} and
* {@link ExternalCatalogTable} in a single call.
@@ -125,6 +128,7 @@ public class CatalogManager {
catalogs.put(defaultCatalogName, defaultCatalog);
this.currentCatalogName = defaultCatalogName;
this.currentDatabaseName = defaultCatalog.getDefaultDatabase();
+ this.defaultCatalogName = defaultCatalogName;
}
/**
@@ -215,9 +219,9 @@ public class CatalogManager {
}
/**
- * Gets the current default catalog that will be used when resolving table path.
+ * Gets the current catalog that will be used when resolving table path.
*
- * @return the current default catalog
+ * @return the current catalog
* @see CatalogManager#resolveTable(String...)
*/
public String getCurrentCatalog() {
@@ -225,9 +229,9 @@ public class CatalogManager {
}
/**
- * Sets the current default catalog name that will be used when resolving table path.
+ * Sets the current catalog name that will be used when resolving table path.
*
- * @param catalogName catalog name to set as current default catalog
+ * @param catalogName catalog name to set as current catalog
* @throws CatalogNotExistException thrown if the catalog doesn't exist
* @see CatalogManager#resolveTable(String...)
*/
@@ -255,9 +259,9 @@ public class CatalogManager {
}
/**
- * Gets the current default database name that will be used when resolving table path.
+ * Gets the current database name that will be used when resolving table path.
*
- * @return the current default database
+ * @return the current database
* @see CatalogManager#resolveTable(String...)
*/
public String getCurrentDatabase() {
@@ -265,10 +269,10 @@ public class CatalogManager {
}
/**
- * Sets the current default database name that will be used when resolving a table path.
+ * Sets the current database name that will be used when resolving a table path.
* The database has to exist in the current catalog.
*
- * @param databaseName database name to set as current default database name
+ * @param databaseName database name to set as current database name
* @throws CatalogException thrown if the database doesn't exist in the current catalog
* @see CatalogManager#resolveTable(String...)
* @see CatalogManager#setCurrentCatalog(String)
@@ -294,6 +298,15 @@ public class CatalogManager {
}
/**
+ * Gets the default catalog name.
+ *
+ * @return the default catalog
+ */
+ public String getDefaultCatalogName() {
+ return defaultCatalogName;
+ }
+
+ /**
* Tries to resolve a table path to a {@link ResolvedTable}. The algorithm looks for requested table
* in the following paths in that order:
* <ol>
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
index 0d18ffe..af7f21f 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
@@ -21,7 +21,10 @@ package org.apache.flink.table.catalog;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
import org.apache.flink.table.delegation.PlannerTypeInferenceUtil;
+import org.apache.flink.table.factories.FunctionDefinitionFactory;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.AggregateFunctionDefinition;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
@@ -36,21 +39,26 @@ import org.apache.flink.table.functions.UserDefinedAggregateFunction;
import org.apache.flink.table.functions.UserFunctionsTypeHelper;
import org.apache.flink.util.Preconditions;
+import java.util.ArrayList;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
/**
- * Simple function catalog to store {@link FunctionDefinition}s in memory.
+ * Simple function catalog to store {@link FunctionDefinition}s in catalogs.
*/
@Internal
public class FunctionCatalog implements FunctionLookup {
- private final String defaultCatalogName;
-
- private final String defaultDatabaseName;
+ private final CatalogManager catalogManager;
+ // For simplicity, currently hold registered Flink functions in memory here
+ // TODO: should move to catalog
private final Map<String, FunctionDefinition> userFunctions = new LinkedHashMap<>();
/**
@@ -58,11 +66,8 @@ public class FunctionCatalog implements FunctionLookup {
*/
private PlannerTypeInferenceUtil plannerTypeInferenceUtil;
- public FunctionCatalog(
- String defaultCatalogName,
- String defaultDatabaseName) {
- this.defaultCatalogName = defaultCatalogName;
- this.defaultDatabaseName = defaultDatabaseName;
+ public FunctionCatalog(CatalogManager catalogManager) {
+ this.catalogManager = checkNotNull(catalogManager);
}
public void setPlannerTypeInferenceUtil(PlannerTypeInferenceUtil plannerTypeInferenceUtil) {
@@ -129,35 +134,81 @@ public class FunctionCatalog implements FunctionLookup {
}
public String[] getUserDefinedFunctions() {
- return userFunctions.values().stream()
- .map(FunctionDefinition::toString)
- .toArray(String[]::new);
+ List<String> result = new ArrayList<>();
+
+ // Get functions in the current database of the current catalog. CatalogManager only
+ // switches to registered catalogs, so the Optional below is expected to be present.
+ Catalog catalog = catalogManager.getCatalog(catalogManager.getCurrentCatalog()).get();
+ try {
+ result.addAll(catalog.listFunctions(catalogManager.getCurrentDatabase()));
+ } catch (DatabaseNotExistException e) {
+ // Ignore since there will always be a current database of the current catalog
+ }
+
+ // Get functions registered in memory
+ result.addAll(
+ userFunctions.values().stream()
+ .map(FunctionDefinition::toString)
+ .collect(Collectors.toList()));
+
+ // result is already a fresh list, so convert it directly instead of copying it again
+ return result.toArray(new String[0]);
}
@Override
public Optional<FunctionLookup.Result> lookupFunction(String name) {
- final FunctionDefinition userCandidate = userFunctions.get(normalizeName(name));
- final Optional<FunctionDefinition> foundDefinition;
- if (userCandidate != null) {
- foundDefinition = Optional.of(userCandidate);
- } else {
+ String functionName = normalizeName(name);
- // TODO once we connect this class with the Catalog APIs we need to make sure that
- // built-in functions are present in "root" built-in catalog. This allows to
- // overwrite built-in functions but also fallback to the "root" catalog. It should be
- // possible to disable the "root" catalog if that is desired.
+ FunctionDefinition userCandidate = null;
- foundDefinition = BuiltInFunctionDefinitions.getDefinitions()
- .stream()
- .filter(f -> normalizeName(name).equals(normalizeName(f.getName())))
- .findFirst()
- .map(Function.identity());
- }
+ Catalog catalog = catalogManager.getCatalog(catalogManager.getCurrentCatalog()).get();
- return foundDefinition.map(definition -> new FunctionLookup.Result(
- ObjectIdentifier.of(defaultCatalogName, defaultDatabaseName, name),
- definition)
- );
+ // Functions stored in the current catalog take precedence, if its TableFactory can create them
+ if (catalog.getTableFactory().isPresent() &&
+ catalog.getTableFactory().get() instanceof FunctionDefinitionFactory) {
+ try {
+ CatalogFunction catalogFunction = catalog.getFunction(
+ new ObjectPath(catalogManager.getCurrentDatabase(), functionName));
+
+ FunctionDefinitionFactory factory = (FunctionDefinitionFactory) catalog.getTableFactory().get();
+
+ userCandidate = factory.createFunctionDefinition(functionName, catalogFunction);
+
+ return Optional.of(
+ new FunctionLookup.Result(
+ ObjectIdentifier.of(catalogManager.getCurrentCatalog(), catalogManager.getCurrentDatabase(), name),
+ userCandidate)
+ );
+ } catch (FunctionNotExistException e) {
+ // Not found in the current catalog: fall through to the in-memory and built-in functions
+ // instead of returning a result without a function definition.
+ }
+ }
+
+ // Check in-memory functions
+ userCandidate = userFunctions.get(functionName);
+
+ final Optional<FunctionDefinition> foundDefinition;
+ if (userCandidate != null) {
+ foundDefinition = Optional.of(userCandidate);
+ } else {
+ // TODO built-in functions should be present in a "root" built-in catalog. This allows to
+ // overwrite built-in functions but also fallback to the "root" catalog. It should be
+ // possible to disable the "root" catalog if that is desired.
+
+ foundDefinition = BuiltInFunctionDefinitions.getDefinitions()
+ .stream()
+ .filter(f -> functionName.equals(normalizeName(f.getName())))
+ .findFirst()
+ .map(Function.identity());
+ }
+
+ String defaultCatalogName = catalogManager.getDefaultCatalogName();
+
+ return foundDefinition.map(definition -> new FunctionLookup.Result(
+ ObjectIdentifier.of(defaultCatalogName, catalogManager.getCatalog(defaultCatalogName).get().getDefaultDatabase(), name),
+ definition)
+ );
}
@Override
@@ -169,6 +220,7 @@ public class FunctionCatalog implements FunctionLookup {
}
private void registerFunction(String name, FunctionDefinition functionDefinition) {
+ // TODO: should register to catalog
userFunctions.put(normalizeName(name), functionDefinition);
}
diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala
index c2b9831..1304c8c 100644
--- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala
+++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala
@@ -275,13 +275,12 @@ object StreamTableEnvironmentImpl {
tableConfig: TableConfig)
: StreamTableEnvironmentImpl = {
- val functionCatalog = new FunctionCatalog(
- settings.getBuiltInCatalogName,
- settings.getBuiltInDatabaseName)
val catalogManager = new CatalogManager(
settings.getBuiltInCatalogName,
new GenericInMemoryCatalog(settings.getBuiltInCatalogName, settings.getBuiltInDatabaseName))
+ val functionCatalog = new FunctionCatalog(catalogManager)
+
val executorProperties = settings.toExecutorProperties
val executor = lookupExecutor(executorProperties, executionEnvironment)
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
index dbb5c1b..8019ab0 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
@@ -47,8 +47,8 @@ import java.util.Optional;
public interface Catalog {
/**
- * Get an optional {@link TableFactory} instance that's responsible for generating source/sink for tables
- * stored in this catalog.
+ * Get an optional {@link TableFactory} instance that's responsible for generating table-related
+ * instances stored in this catalog, instances such as source/sink and function definitions.
*
* @return an optional TableFactory instance
*/
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FunctionDefinitionFactory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FunctionDefinitionFactory.java
new file mode 100644
index 0000000..2e8c538
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FunctionDefinitionFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.functions.FunctionDefinition;
+
+/**
+ * A factory to create {@link FunctionDefinition}.
+ * See also {@link TableFactory} for more information.
+ */
+public interface FunctionDefinitionFactory extends TableFactory {
+
+ /**
+ * Creates a {@link FunctionDefinition} from given {@link CatalogFunction}.
+ *
+ * @param name name of the {@link CatalogFunction}
+ * @param catalogFunction the catalog function
+ * @return a {@link FunctionDefinition}
+ */
+ FunctionDefinition createFunctionDefinition(String name, CatalogFunction catalogFunction);
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/AggCallSelectivityEstimatorTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/AggCallSelectivityEstimatorTest.scala
index ea14fa9..b52d9e3 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/AggCallSelectivityEstimatorTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/AggCallSelectivityEstimatorTest.scala
@@ -20,12 +20,11 @@ package org.apache.flink.table.plan.metadata
import org.apache.flink.table.api.TableConfig
import org.apache.flink.table.calcite.{FlinkContextImpl, FlinkTypeFactory, FlinkTypeSystem}
-import org.apache.flink.table.catalog.FunctionCatalog
+import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog}
import org.apache.flink.table.plan.schema._
import org.apache.flink.table.plan.stats.{ColumnStats, FlinkStatistic, TableStats}
import org.apache.flink.table.{JDouble, JLong}
import org.apache.flink.util.Preconditions
-
import com.google.common.collect.ImmutableList
import org.apache.calcite.plan.{AbstractRelOptPlanner, RelOptCluster}
import org.apache.calcite.rel.`type`.RelDataType
@@ -45,7 +44,6 @@ import org.junit.{Before, BeforeClass, Test}
import org.powermock.api.mockito.PowerMockito._
import org.powermock.core.classloader.annotations.PrepareForTest
import org.powermock.modules.junit4.PowerMockRunner
-
import java.math.BigDecimal
import scala.collection.JavaConversions._
@@ -81,7 +79,8 @@ class AggCallSelectivityEstimatorTest {
val tableScan = mock(classOf[TableScan])
val cluster = mock(classOf[RelOptCluster])
val planner = mock(classOf[AbstractRelOptPlanner])
- val functionCatalog = new FunctionCatalog("default_catalog", "default_database")
+ val catalogManager = mock(classOf[CatalogManager])
+ val functionCatalog = new FunctionCatalog(catalogManager)
val context = new FlinkContextImpl(new TableConfig, functionCatalog)
when(tableScan, "getCluster").thenReturn(cluster)
when(cluster, "getRexBuilder").thenReturn(rexBuilder)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdHandlerTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdHandlerTestBase.scala
index 8d7c348..c1627a6 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdHandlerTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdHandlerTestBase.scala
@@ -21,7 +21,7 @@ package org.apache.flink.table.plan.metadata
import org.apache.flink.table.api.{TableConfig, TableException}
import org.apache.flink.table.calcite.FlinkRelBuilder.PlannerNamedWindowProperty
import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory}
-import org.apache.flink.table.catalog.FunctionCatalog
+import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, GenericInMemoryCatalog}
import org.apache.flink.table.expressions.utils.ApiExpressionUtils.intervalOfMillis
import org.apache.flink.table.expressions._
import org.apache.flink.table.functions.aggfunctions.SumAggFunction.DoubleSumAggFunction
@@ -44,7 +44,6 @@ import org.apache.flink.table.runtime.rank.{ConstantRankRange, RankType, Variabl
import org.apache.flink.table.types.AtomicDataType
import org.apache.flink.table.types.logical.{BigIntType, DoubleType, IntType, LogicalType, TimestampKind, TimestampType, VarCharType}
import org.apache.flink.table.util.CountAggFunction
-
import com.google.common.collect.{ImmutableList, Lists}
import org.apache.calcite.jdbc.CalciteSchema
import org.apache.calcite.plan._
@@ -64,7 +63,6 @@ import org.apache.calcite.sql.fun.{SqlCountAggFunction, SqlStdOperatorTable}
import org.apache.calcite.sql.parser.SqlParserPos
import org.apache.calcite.util.{DateString, ImmutableBitSet, ImmutableIntList, TimeString, TimestampString}
import org.junit.{Before, BeforeClass}
-
import java.math.BigDecimal
import java.util
@@ -74,12 +72,17 @@ class FlinkRelMdHandlerTestBase {
val tableConfig = new TableConfig()
val rootSchema: SchemaPlus = MetadataTestUtil.initRootSchema()
+
+ val defaultCatalog = "default_catalog"
+ val catalogManager = new CatalogManager(
+ defaultCatalog, new GenericInMemoryCatalog(defaultCatalog, "default_database"))
+
// TODO batch RelNode and stream RelNode should have different PlannerContext
// and RelOptCluster due to they have different trait definitions.
val plannerContext: PlannerContext =
new PlannerContext(
tableConfig,
- new FunctionCatalog("default_catalog", "default_database"),
+ new FunctionCatalog(catalogManager),
CalciteSchema.from(rootSchema),
util.Arrays.asList(
ConventionTraitDef.INSTANCE,
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/SelectivityEstimatorTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/SelectivityEstimatorTest.scala
index 8494ae1..2259d94 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/SelectivityEstimatorTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/SelectivityEstimatorTest.scala
@@ -20,12 +20,11 @@ package org.apache.flink.table.plan.metadata
import org.apache.flink.table.api.TableConfig
import org.apache.flink.table.calcite.{FlinkContext, FlinkContextImpl, FlinkTypeFactory, FlinkTypeSystem}
-import org.apache.flink.table.catalog.FunctionCatalog
+import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog}
import org.apache.flink.table.plan.schema._
import org.apache.flink.table.plan.stats.{ColumnStats, FlinkStatistic, TableStats}
import org.apache.flink.table.{JDouble, JLong}
import org.apache.flink.util.Preconditions
-
import org.apache.calcite.plan.{AbstractRelOptPlanner, RelOptCluster}
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.core.TableScan
@@ -42,7 +41,6 @@ import org.junit.{Before, BeforeClass, Test}
import org.powermock.api.mockito.PowerMockito._
import org.powermock.core.classloader.annotations.PrepareForTest
import org.powermock.modules.junit4.PowerMockRunner
-
import java.math.BigDecimal
import scala.collection.JavaConverters._
@@ -83,7 +81,8 @@ class SelectivityEstimatorTest {
val tableScan = mock(classOf[TableScan])
val cluster = mock(classOf[RelOptCluster])
val planner = mock(classOf[AbstractRelOptPlanner])
- val functionCatalog = new FunctionCatalog("default_catalog", "default_database")
+ val catalogManager = mock(classOf[CatalogManager])
+ val functionCatalog = new FunctionCatalog(catalogManager)
val context: FlinkContext = new FlinkContextImpl(tableConfig, functionCatalog)
when(tableScan, "getCluster").thenReturn(cluster)
when(cluster, "getRexBuilder").thenReturn(rexBuilder)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/util/RexNodeExtractorTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/util/RexNodeExtractorTest.scala
index c963b1f..f8af378 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/util/RexNodeExtractorTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/util/RexNodeExtractorTest.scala
@@ -18,9 +18,6 @@
package org.apache.flink.table.plan.util
-import org.apache.flink.table.api.{DataTypes, Types}
-import org.apache.flink.table.catalog.FunctionCatalog
-import org.apache.flink.table.expressions._
import org.apache.flink.table.expressions.utils.ApiExpressionUtils.{unresolvedCall, unresolvedRef, valueLiteral}
import org.apache.flink.table.expressions.utils.Func1
import org.apache.flink.table.functions.AggregateFunctionDefinition
@@ -29,7 +26,6 @@ import org.apache.flink.table.functions.sql.FlinkSqlOperatorTable
import org.apache.flink.table.functions.utils.ScalarSqlFunction
import org.apache.flink.table.plan.util.InputTypeBuilder.inputOf
import org.apache.flink.table.util.{DateTimeTestUtil, IntSumAggFunction}
-
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rex.{RexBuilder, RexNode}
import org.apache.calcite.sql.SqlPostfixOperator
@@ -40,21 +36,26 @@ import org.apache.calcite.util.{DateString, TimeString, TimestampString}
import org.hamcrest.CoreMatchers.is
import org.junit.Assert.{assertArrayEquals, assertEquals, assertThat, assertTrue}
import org.junit.Test
-
import java.math.BigDecimal
import java.sql.Timestamp
-import java.util.{List => JList}
-import java.sql.{Date, Time, Timestamp}
import java.util.{TimeZone, List => JList}
+import org.apache.flink.api.common.typeinfo.Types
+import org.apache.flink.table.api.DataTypes
+import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, GenericInMemoryCatalog}
+import org.apache.flink.table.expressions.{EqualTo, Expression, ExpressionBridge, ExpressionParser, GreaterThan, Literal, PlannerExpression, PlannerExpressionConverter, Sum, UnresolvedFieldReference}
+
import scala.collection.JavaConverters._
/**
* Test for [[RexNodeExtractor]].
*/
class RexNodeExtractorTest extends RexNodeTestBase {
+ val defaultCatalog = "default_catalog"
+ val catalogManager = new CatalogManager(
+ defaultCatalog, new GenericInMemoryCatalog(defaultCatalog, "default_database"))
- private val functionCatalog = new FunctionCatalog("default_catalog", "default_database")
+ private val functionCatalog = new FunctionCatalog(catalogManager)
private val expressionBridge: ExpressionBridge[PlannerExpression] =
new ExpressionBridge[PlannerExpression](
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
index 26a5268..38b5d12 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
@@ -985,8 +985,6 @@ object TestingTableEnvironment {
def create(
settings: EnvironmentSettings,
catalogManager: Option[CatalogManager] = None): TestingTableEnvironment = {
- val functionCatalog = new FunctionCatalog(
- settings.getBuiltInCatalogName, settings.getBuiltInDatabaseName)
val catalogMgr = catalogManager match {
case Some(c) => c
case _ =>
@@ -994,6 +992,7 @@ object TestingTableEnvironment {
new GenericInMemoryCatalog(
settings.getBuiltInCatalogName, settings.getBuiltInDatabaseName))
}
+ val functionCatalog = new FunctionCatalog(catalogMgr)
val plannerProperties = settings.toPlannerProperties
val executorProperties = settings.toExecutorProperties
val executor = ComponentFactoryService.find(classOf[ExecutorFactory],
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index 13d6c53..3452af6 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -64,9 +64,8 @@ abstract class TableEnvImpl(
protected val builtinDatabaseName: String = catalogManager.getCurrentDatabase
// Table API/SQL function catalog
- private[flink] val functionCatalog: FunctionCatalog = new FunctionCatalog(
- builtinCatalogName,
- builtinDatabaseName)
+ private[flink] val functionCatalog: FunctionCatalog = new FunctionCatalog(catalogManager)
+
// temporary utility until we don't use planner expressions anymore
functionCatalog.setPlannerTypeInferenceUtil(PlannerTypeInferenceUtilImpl.INSTANCE)
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/logical/PushFilterIntoTableSourceScanRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/logical/PushFilterIntoTableSourceScanRule.scala
index 0dc9805..9c16135 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/logical/PushFilterIntoTableSourceScanRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/logical/PushFilterIntoTableSourceScanRule.scala
@@ -23,7 +23,7 @@ import java.util
import org.apache.calcite.plan.RelOptRule.{none, operand}
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
import org.apache.calcite.rex.RexProgram
-import org.apache.flink.table.catalog.FunctionCatalog
+import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, GenericInMemoryCatalog}
import org.apache.flink.table.expressions.{Expression, PlannerExpression}
import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, FlinkLogicalTableSourceScan}
import org.apache.flink.table.plan.util.RexProgramExtractor
@@ -37,6 +37,10 @@ class PushFilterIntoTableSourceScanRule extends RelOptRule(
operand(classOf[FlinkLogicalTableSourceScan], none)),
"PushFilterIntoTableSourceScanRule") {
+ private val defaultCatalog = "default_catalog"
+ private val catalogManager = new CatalogManager(
+ defaultCatalog, new GenericInMemoryCatalog(defaultCatalog, "default_database"))
+
override def matches(call: RelOptRuleCall): Boolean = {
val calc: FlinkLogicalCalc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
val scan: FlinkLogicalTableSourceScan = call.rel(1).asInstanceOf[FlinkLogicalTableSourceScan]
@@ -68,7 +72,7 @@ class PushFilterIntoTableSourceScanRule extends RelOptRule(
RexProgramExtractor.extractConjunctiveConditions(
program,
call.builder().getRexBuilder,
- new FunctionCatalog("default_catalog", "default_database"))
+ new FunctionCatalog(catalogManager))
if (predicates.isEmpty) {
// no condition can be translated to expression
return
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
index 20e8ba6..11e43cf 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
@@ -207,7 +207,7 @@ class StreamTableEnvironmentTest extends TableTestBase {
"default_catalog",
new GenericInMemoryCatalog("default_catalog", "default_database"))
val executor: StreamExecutor = new StreamExecutor(jStreamExecEnv)
- val functionCatalog = new FunctionCatalog(manager.getCurrentCatalog, manager.getCurrentDatabase)
+ val functionCatalog = new FunctionCatalog(manager)
val streamPlanner = new StreamPlanner(executor, config, functionCatalog, manager)
val jTEnv = new JStreamTableEnvironmentImpl(
manager,
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/AggregateTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/AggregateTest.scala
index b56919d..c917267 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/AggregateTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/AggregateTest.scala
@@ -26,7 +26,7 @@ import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl
import org.apache.flink.table.api.{TableConfig, Types}
-import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog}
+import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, GenericInMemoryCatalog}
import org.apache.flink.table.delegation.{Executor, Planner}
import org.apache.flink.table.functions.{AggregateFunction, AggregateFunctionDefinition}
import org.apache.flink.table.utils.TableTestUtil.{streamTableNode, term, unaryNode}
@@ -66,9 +66,13 @@ class AggregateTest extends TableTestBase {
@Test
def testUserDefinedAggregateFunctionWithScalaAccumulator(): Unit = {
- val functionCatalog = new FunctionCatalog("cat", "db")
+ val defaultCatalog = "default_catalog"
+ val catalogManager = new CatalogManager(
+ defaultCatalog, new GenericInMemoryCatalog(defaultCatalog, "default_database"))
+
+ val functionCatalog = new FunctionCatalog(catalogManager)
val tablEnv = new StreamTableEnvironmentImpl(
- Mockito.mock(classOf[CatalogManager]),
+ catalogManager,
functionCatalog,
new TableConfig,
Mockito.mock(classOf[StreamExecutionEnvironment]),
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/RexProgramExtractorTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/RexProgramExtractorTest.scala
index fde9743..b752b76 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/RexProgramExtractorTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/plan/RexProgramExtractorTest.scala
@@ -28,7 +28,7 @@ import org.apache.calcite.sql.`type`.SqlTypeName
import org.apache.calcite.sql.`type`.SqlTypeName.{BIGINT, INTEGER, VARCHAR}
import org.apache.calcite.sql.fun.SqlStdOperatorTable
import org.apache.calcite.util.{DateString, TimeString, TimestampString}
-import org.apache.flink.table.catalog.FunctionCatalog
+import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, GenericInMemoryCatalog}
import org.apache.flink.table.expressions._
import org.apache.flink.table.plan.util.{RexNodeToExpressionConverter, RexProgramExtractor}
import org.apache.flink.table.utils.InputTypeBuilder.inputOf
@@ -42,8 +42,7 @@ import scala.collection.mutable
class RexProgramExtractorTest extends RexProgramTestBase {
private val functionCatalog: FunctionCatalog = new FunctionCatalog(
- "default_catalog",
- "default_database")
+ new CatalogManager("default_catalog", new GenericInMemoryCatalog("default_catalog")))
private val expressionBridge: ExpressionBridge[PlannerExpression] =
new ExpressionBridge[PlannerExpression](
functionCatalog,
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
index e5810e0..6e0c3a3 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
@@ -329,8 +329,7 @@ case class StreamTableTestUtil(
private val tableConfig = new TableConfig
private val manager: CatalogManager = catalogManager.getOrElse(createCatalogManager())
private val executor: StreamExecutor = new StreamExecutor(javaEnv)
- private val functionCatalog =
- new FunctionCatalog(manager.getCurrentCatalog, manager.getCurrentDatabase)
+ private val functionCatalog = new FunctionCatalog(manager)
private val streamPlanner = new StreamPlanner(executor, tableConfig, functionCatalog, manager)
val javaTableEnv = new JavaStreamTableEnvironmentImpl(