You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by bl...@apache.org on 2019/10/15 21:04:47 UTC
[flink] branch master updated: [FLINK-14216][table] introduce temp
system functions and temp functions to FunctionCatalog
This is an automated email from the ASF dual-hosted git repository.
bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new b24e4bc [FLINK-14216][table] introduce temp system functions and temp functions to FunctionCatalog
b24e4bc is described below
commit b24e4bcb4cca7e4a9d14ae3b3fff2d2678517863
Author: bowen.li <bo...@gmail.com>
AuthorDate: Mon Sep 30 14:44:49 2019 -0700
[FLINK-14216][table] introduce temp system functions and temp functions to FunctionCatalog
adapt existing APIs to the introduction of temporary system and temp functions according to FLIP-57.
This closes #9822.
---
.../java/internal/StreamTableEnvironmentImpl.java | 6 +-
.../table/api/internal/TableEnvironmentImpl.java | 2 +-
.../flink/table/catalog/FunctionCatalog.java | 97 ++++++++++++++++++----
.../internal/StreamTableEnvironmentImpl.scala | 6 +-
.../planner/plan/utils/RexNodeExtractorTest.scala | 2 +-
.../flink/table/planner/utils/TableTestBase.scala | 4 +-
.../flink/table/api/internal/TableEnvImpl.scala | 6 +-
7 files changed, 96 insertions(+), 27 deletions(-)
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java
index cf2ace7..06b20f5 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java
@@ -147,7 +147,7 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl imple
public <T> void registerFunction(String name, TableFunction<T> tableFunction) {
TypeInformation<T> typeInfo = UserFunctionsTypeHelper.getReturnTypeOfTableFunction(tableFunction);
- functionCatalog.registerTableFunction(
+ functionCatalog.registerTempSystemTableFunction(
name,
tableFunction,
typeInfo
@@ -160,7 +160,7 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl imple
TypeInformation<ACC> accTypeInfo = UserFunctionsTypeHelper
.getAccumulatorTypeOfAggregateFunction(aggregateFunction);
- functionCatalog.registerAggregateFunction(
+ functionCatalog.registerTempSystemAggregateFunction(
name,
aggregateFunction,
typeInfo,
@@ -175,7 +175,7 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl imple
TypeInformation<ACC> accTypeInfo = UserFunctionsTypeHelper
.getAccumulatorTypeOfAggregateFunction(tableAggregateFunction);
- functionCatalog.registerAggregateFunction(
+ functionCatalog.registerTempSystemAggregateFunction(
name,
tableAggregateFunction,
typeInfo,
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 88aa167..12e874e 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -163,7 +163,7 @@ public class TableEnvironmentImpl implements TableEnvironment {
@Override
public void registerFunction(String name, ScalarFunction function) {
- functionCatalog.registerScalarFunction(
+ functionCatalog.registerTempSystemScalarFunction(
name,
function);
}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
index 423b5f3..43280b4 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
@@ -58,9 +58,8 @@ public class FunctionCatalog implements FunctionLookup {
private final CatalogManager catalogManager;
- // For simplicity, currently hold registered Flink functions in memory here
- // TODO: should move to catalog
- private final Map<String, FunctionDefinition> userFunctions = new LinkedHashMap<>();
+ private final Map<String, FunctionDefinition> tempSystemFunctions = new LinkedHashMap<>();
+ private final Map<ObjectIdentifier, FunctionDefinition> tempCatalogFunctions = new LinkedHashMap<>();
/**
* Temporary utility until the new type inference is fully functional. It needs to be set by the planner.
@@ -75,15 +74,15 @@ public class FunctionCatalog implements FunctionLookup {
this.plannerTypeInferenceUtil = plannerTypeInferenceUtil;
}
- public void registerScalarFunction(String name, ScalarFunction function) {
+ public void registerTempSystemScalarFunction(String name, ScalarFunction function) {
UserFunctionsTypeHelper.validateInstantiation(function.getClass());
- registerFunction(
+ registerTempSystemFunction(
name,
new ScalarFunctionDefinition(name, function)
);
}
- public <T> void registerTableFunction(
+ public <T> void registerTempSystemTableFunction(
String name,
TableFunction<T> function,
TypeInformation<T> resultType) {
@@ -92,7 +91,7 @@ public class FunctionCatalog implements FunctionLookup {
// check if class could be instantiated
UserFunctionsTypeHelper.validateInstantiation(function.getClass());
- registerFunction(
+ registerTempSystemFunction(
name,
new TableFunctionDefinition(
name,
@@ -101,7 +100,7 @@ public class FunctionCatalog implements FunctionLookup {
);
}
- public <T, ACC> void registerAggregateFunction(
+ public <T, ACC> void registerTempSystemAggregateFunction(
String name,
UserDefinedAggregateFunction<T, ACC> function,
TypeInformation<T> resultType,
@@ -128,12 +127,71 @@ public class FunctionCatalog implements FunctionLookup {
throw new TableException("Unknown function class: " + function.getClass());
}
- registerFunction(
+ registerTempSystemFunction(
name,
definition
);
}
+ public void registerTempCatalogScalarFunction(ObjectIdentifier oi, ScalarFunction function) {
+ UserFunctionsTypeHelper.validateInstantiation(function.getClass());
+ registerTempCatalogFunction(
+ oi,
+ new ScalarFunctionDefinition(oi.getObjectName(), function)
+ );
+ }
+
+ public <T> void registerTempCatalogTableFunction(
+ ObjectIdentifier oi,
+ TableFunction<T> function,
+ TypeInformation<T> resultType) {
+ // check if class not Scala object
+ UserFunctionsTypeHelper.validateNotSingleton(function.getClass());
+ // check if class could be instantiated
+ UserFunctionsTypeHelper.validateInstantiation(function.getClass());
+
+ registerTempCatalogFunction(
+ oi,
+ new TableFunctionDefinition(
+ oi.getObjectName(),
+ function,
+ resultType)
+ );
+ }
+
+ public <T, ACC> void registerTempCatalogAggregateFunction(
+ ObjectIdentifier oi,
+ UserDefinedAggregateFunction<T, ACC> function,
+ TypeInformation<T> resultType,
+ TypeInformation<ACC> accType) {
+ // check if class not Scala object
+ UserFunctionsTypeHelper.validateNotSingleton(function.getClass());
+ // check if class could be instantiated
+ UserFunctionsTypeHelper.validateInstantiation(function.getClass());
+
+ final FunctionDefinition definition;
+ if (function instanceof AggregateFunction) {
+ definition = new AggregateFunctionDefinition(
+ oi.getObjectName(),
+ (AggregateFunction<?, ?>) function,
+ resultType,
+ accType);
+ } else if (function instanceof TableAggregateFunction) {
+ definition = new TableAggregateFunctionDefinition(
+ oi.getObjectName(),
+ (TableAggregateFunction<?, ?>) function,
+ resultType,
+ accType);
+ } else {
+ throw new TableException("Unknown function class: " + function.getClass());
+ }
+
+ registerTempCatalogFunction(
+ oi,
+ definition
+ );
+ }
+
public String[] getUserDefinedFunctions() {
return getUserDefinedFunctionNames().toArray(new String[0]);
}
@@ -165,7 +223,7 @@ public class FunctionCatalog implements FunctionLookup {
// Get functions registered in memory
result.addAll(
- userFunctions.values().stream()
+ tempSystemFunctions.values().stream()
.map(FunctionDefinition::toString)
.collect(Collectors.toSet()));
@@ -204,7 +262,7 @@ public class FunctionCatalog implements FunctionLookup {
}
// If no corresponding function is found in catalog, check in-memory functions
- userCandidate = userFunctions.get(functionName);
+ userCandidate = tempSystemFunctions.get(functionName);
final Optional<FunctionDefinition> foundDefinition;
if (userCandidate != null) {
@@ -240,13 +298,24 @@ public class FunctionCatalog implements FunctionLookup {
return plannerTypeInferenceUtil;
}
- private void registerFunction(String name, FunctionDefinition functionDefinition) {
- // TODO: should register to catalog
- userFunctions.put(normalizeName(name), functionDefinition);
+ private void registerTempSystemFunction(String name, FunctionDefinition functionDefinition) {
+ tempSystemFunctions.put(normalizeName(name), functionDefinition);
+ }
+
+ private void registerTempCatalogFunction(ObjectIdentifier oi, FunctionDefinition functionDefinition) {
+ tempCatalogFunctions.put(normalizeObjectIdentifier(oi), functionDefinition);
}
@VisibleForTesting
static String normalizeName(String name) {
return name.toUpperCase();
}
+
+ @VisibleForTesting
+ static ObjectIdentifier normalizeObjectIdentifier(ObjectIdentifier oi) {
+ return ObjectIdentifier.of(
+ oi.getCatalogName(),
+ oi.getDatabaseName(),
+ oi.getObjectName().toUpperCase());
+ }
}
diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala
index c45d324..1384be0 100644
--- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala
+++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala
@@ -137,7 +137,7 @@ class StreamTableEnvironmentImpl (
override def registerFunction[T: TypeInformation](name: String, tf: TableFunction[T]): Unit = {
val typeInfo = UserFunctionsTypeHelper
.getReturnTypeOfTableFunction(tf, implicitly[TypeInformation[T]])
- functionCatalog.registerTableFunction(
+ functionCatalog.registerTempSystemTableFunction(
name,
tf,
typeInfo
@@ -152,7 +152,7 @@ class StreamTableEnvironmentImpl (
.getReturnTypeOfAggregateFunction(f, implicitly[TypeInformation[T]])
val accTypeInfo = UserFunctionsTypeHelper
.getAccumulatorTypeOfAggregateFunction(f, implicitly[TypeInformation[ACC]])
- functionCatalog.registerAggregateFunction(
+ functionCatalog.registerTempSystemAggregateFunction(
name,
f,
typeInfo,
@@ -168,7 +168,7 @@ class StreamTableEnvironmentImpl (
.getReturnTypeOfAggregateFunction(f, implicitly[TypeInformation[T]])
val accTypeInfo = UserFunctionsTypeHelper
.getAccumulatorTypeOfAggregateFunction(f, implicitly[TypeInformation[ACC]])
- functionCatalog.registerAggregateFunction(
+ functionCatalog.registerTempSystemAggregateFunction(
name,
f,
typeInfo,
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala
index a49f06f..f0289e4 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala
@@ -696,7 +696,7 @@ class RexNodeExtractorTest extends RexNodeTestBase {
@Test
def testExtractWithUdf(): Unit = {
- functionCatalog.registerScalarFunction("myUdf", Func1)
+ functionCatalog.registerTempSystemScalarFunction("myUdf", Func1)
// amount
val t0 = rexBuilder.makeInputRef(allFieldTypes.get(2), 2)
// my_udf(amount)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
index 0201d77..a452717 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
@@ -912,7 +912,7 @@ class TestingTableEnvironment private(
def registerFunction[T: TypeInformation](name: String, tf: TableFunction[T]): Unit = {
val typeInfo = UserFunctionsTypeHelper
.getReturnTypeOfTableFunction(tf, implicitly[TypeInformation[T]])
- functionCatalog.registerTableFunction(
+ functionCatalog.registerTempSystemTableFunction(
name,
tf,
typeInfo
@@ -944,7 +944,7 @@ class TestingTableEnvironment private(
.getReturnTypeOfAggregateFunction(f, implicitly[TypeInformation[T]])
val accTypeInfo = UserFunctionsTypeHelper
.getAccumulatorTypeOfAggregateFunction(f, implicitly[TypeInformation[ACC]])
- functionCatalog.registerAggregateFunction(
+ functionCatalog.registerTempSystemAggregateFunction(
name,
f,
typeInfo,
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index 0e00268..a00ea35 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -99,7 +99,7 @@ abstract class TableEnvImpl(
private def isBatchTable: Boolean = !isStreamingMode
override def registerFunction(name: String, function: ScalarFunction): Unit = {
- functionCatalog.registerScalarFunction(
+ functionCatalog.registerTempSystemScalarFunction(
name,
function)
}
@@ -117,7 +117,7 @@ abstract class TableEnvImpl(
function,
implicitly[TypeInformation[T]])
- functionCatalog.registerTableFunction(
+ functionCatalog.registerTempSystemTableFunction(
name,
function,
resultTypeInfo)
@@ -141,7 +141,7 @@ abstract class TableEnvImpl(
function,
implicitly[TypeInformation[ACC]])
- functionCatalog.registerAggregateFunction(
+ functionCatalog.registerTempSystemAggregateFunction(
name,
function,
resultTypeInfo,