You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by bl...@apache.org on 2019/10/15 21:04:47 UTC

[flink] branch master updated: [FLINK-14216][table] introduce temp system functions and temp functions to FunctionCatalog

This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new b24e4bc  [FLINK-14216][table] introduce temp system functions and temp functions to FunctionCatalog
b24e4bc is described below

commit b24e4bcb4cca7e4a9d14ae3b3fff2d2678517863
Author: bowen.li <bo...@gmail.com>
AuthorDate: Mon Sep 30 14:44:49 2019 -0700

    [FLINK-14216][table] introduce temp system functions and temp functions to FunctionCatalog
    
    adapt existing APIs to the introduction of temporary system and temp functions according to FLIP-57.
    
    This closes #9822.
---
 .../java/internal/StreamTableEnvironmentImpl.java  |  6 +-
 .../table/api/internal/TableEnvironmentImpl.java   |  2 +-
 .../flink/table/catalog/FunctionCatalog.java       | 97 ++++++++++++++++++----
 .../internal/StreamTableEnvironmentImpl.scala      |  6 +-
 .../planner/plan/utils/RexNodeExtractorTest.scala  |  2 +-
 .../flink/table/planner/utils/TableTestBase.scala  |  4 +-
 .../flink/table/api/internal/TableEnvImpl.scala    |  6 +-
 7 files changed, 96 insertions(+), 27 deletions(-)

diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java
index cf2ace7..06b20f5 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java
@@ -147,7 +147,7 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl imple
 	public <T> void registerFunction(String name, TableFunction<T> tableFunction) {
 		TypeInformation<T> typeInfo = UserFunctionsTypeHelper.getReturnTypeOfTableFunction(tableFunction);
 
-		functionCatalog.registerTableFunction(
+		functionCatalog.registerTempSystemTableFunction(
 			name,
 			tableFunction,
 			typeInfo
@@ -160,7 +160,7 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl imple
 		TypeInformation<ACC> accTypeInfo = UserFunctionsTypeHelper
 			.getAccumulatorTypeOfAggregateFunction(aggregateFunction);
 
-		functionCatalog.registerAggregateFunction(
+		functionCatalog.registerTempSystemAggregateFunction(
 			name,
 			aggregateFunction,
 			typeInfo,
@@ -175,7 +175,7 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl imple
 		TypeInformation<ACC> accTypeInfo = UserFunctionsTypeHelper
 			.getAccumulatorTypeOfAggregateFunction(tableAggregateFunction);
 
-		functionCatalog.registerAggregateFunction(
+		functionCatalog.registerTempSystemAggregateFunction(
 			name,
 			tableAggregateFunction,
 			typeInfo,
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 88aa167..12e874e 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -163,7 +163,7 @@ public class TableEnvironmentImpl implements TableEnvironment {
 
 	@Override
 	public void registerFunction(String name, ScalarFunction function) {
-		functionCatalog.registerScalarFunction(
+		functionCatalog.registerTempSystemScalarFunction(
 			name,
 			function);
 	}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
index 423b5f3..43280b4 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
@@ -58,9 +58,8 @@ public class FunctionCatalog implements FunctionLookup {
 
 	private final CatalogManager catalogManager;
 
-	// For simplicity, currently hold registered Flink functions in memory here
-	// TODO: should move to catalog
-	private final Map<String, FunctionDefinition> userFunctions = new LinkedHashMap<>();
+	private final Map<String, FunctionDefinition> tempSystemFunctions = new LinkedHashMap<>();
+	private final Map<ObjectIdentifier, FunctionDefinition> tempCatalogFunctions = new LinkedHashMap<>();
 
 	/**
 	 * Temporary utility until the new type inference is fully functional. It needs to be set by the planner.
@@ -75,15 +74,15 @@ public class FunctionCatalog implements FunctionLookup {
 		this.plannerTypeInferenceUtil = plannerTypeInferenceUtil;
 	}
 
-	public void registerScalarFunction(String name, ScalarFunction function) {
+	public void registerTempSystemScalarFunction(String name, ScalarFunction function) {
 		UserFunctionsTypeHelper.validateInstantiation(function.getClass());
-		registerFunction(
+		registerTempSystemFunction(
 			name,
 			new ScalarFunctionDefinition(name, function)
 		);
 	}
 
-	public <T> void registerTableFunction(
+	public <T> void registerTempSystemTableFunction(
 			String name,
 			TableFunction<T> function,
 			TypeInformation<T> resultType) {
@@ -92,7 +91,7 @@ public class FunctionCatalog implements FunctionLookup {
 		// check if class could be instantiated
 		UserFunctionsTypeHelper.validateInstantiation(function.getClass());
 
-		registerFunction(
+		registerTempSystemFunction(
 			name,
 			new TableFunctionDefinition(
 				name,
@@ -101,7 +100,7 @@ public class FunctionCatalog implements FunctionLookup {
 		);
 	}
 
-	public <T, ACC> void registerAggregateFunction(
+	public <T, ACC> void registerTempSystemAggregateFunction(
 			String name,
 			UserDefinedAggregateFunction<T, ACC> function,
 			TypeInformation<T> resultType,
@@ -128,12 +127,71 @@ public class FunctionCatalog implements FunctionLookup {
 			throw new TableException("Unknown function class: " + function.getClass());
 		}
 
-		registerFunction(
+		registerTempSystemFunction(
 			name,
 			definition
 		);
 	}
 
+	public void registerTempCatalogScalarFunction(ObjectIdentifier oi, ScalarFunction function) {
+		UserFunctionsTypeHelper.validateInstantiation(function.getClass());
+		registerTempCatalogFunction(
+			oi,
+			new ScalarFunctionDefinition(oi.getObjectName(), function)
+		);
+	}
+
+	public <T> void registerTempCatalogTableFunction(
+			ObjectIdentifier oi,
+			TableFunction<T> function,
+			TypeInformation<T> resultType) {
+		// check if class not Scala object
+		UserFunctionsTypeHelper.validateNotSingleton(function.getClass());
+		// check if class could be instantiated
+		UserFunctionsTypeHelper.validateInstantiation(function.getClass());
+
+		registerTempCatalogFunction(
+			oi,
+			new TableFunctionDefinition(
+				oi.getObjectName(),
+				function,
+				resultType)
+		);
+	}
+
+	public <T, ACC> void registerTempCatalogAggregateFunction(
+			ObjectIdentifier oi,
+			UserDefinedAggregateFunction<T, ACC> function,
+			TypeInformation<T> resultType,
+			TypeInformation<ACC> accType) {
+		// check if class not Scala object
+		UserFunctionsTypeHelper.validateNotSingleton(function.getClass());
+		// check if class could be instantiated
+		UserFunctionsTypeHelper.validateInstantiation(function.getClass());
+
+		final FunctionDefinition definition;
+		if (function instanceof AggregateFunction) {
+			definition = new AggregateFunctionDefinition(
+				oi.getObjectName(),
+				(AggregateFunction<?, ?>) function,
+				resultType,
+				accType);
+		} else if (function instanceof TableAggregateFunction) {
+			definition = new TableAggregateFunctionDefinition(
+				oi.getObjectName(),
+				(TableAggregateFunction<?, ?>) function,
+				resultType,
+				accType);
+		} else {
+			throw new TableException("Unknown function class: " + function.getClass());
+		}
+
+		registerTempCatalogFunction(
+			oi,
+			definition
+		);
+	}
+
 	public String[] getUserDefinedFunctions() {
 		return getUserDefinedFunctionNames().toArray(new String[0]);
 	}
@@ -165,7 +223,7 @@ public class FunctionCatalog implements FunctionLookup {
 
 		// Get functions registered in memory
 		result.addAll(
-			userFunctions.values().stream()
+			tempSystemFunctions.values().stream()
 				.map(FunctionDefinition::toString)
 				.collect(Collectors.toSet()));
 
@@ -204,7 +262,7 @@ public class FunctionCatalog implements FunctionLookup {
 		}
 
 		// If no corresponding function is found in catalog, check in-memory functions
-		userCandidate = userFunctions.get(functionName);
+		userCandidate = tempSystemFunctions.get(functionName);
 
 		final Optional<FunctionDefinition> foundDefinition;
 		if (userCandidate != null) {
@@ -240,13 +298,24 @@ public class FunctionCatalog implements FunctionLookup {
 		return plannerTypeInferenceUtil;
 	}
 
-	private void registerFunction(String name, FunctionDefinition functionDefinition) {
-		// TODO: should register to catalog
-		userFunctions.put(normalizeName(name), functionDefinition);
+	private void registerTempSystemFunction(String name, FunctionDefinition functionDefinition) {
+		tempSystemFunctions.put(normalizeName(name), functionDefinition);
+	}
+
+	private void registerTempCatalogFunction(ObjectIdentifier oi, FunctionDefinition functionDefinition) {
+		tempCatalogFunctions.put(normalizeObjectIdentifier(oi), functionDefinition);
 	}
 
 	@VisibleForTesting
 	static String normalizeName(String name) {
 		return name.toUpperCase();
 	}
+
+	@VisibleForTesting
+	static ObjectIdentifier normalizeObjectIdentifier(ObjectIdentifier oi) {
+		return ObjectIdentifier.of(
+			oi.getCatalogName(),
+			oi.getDatabaseName(),
+			oi.getObjectName().toUpperCase());
+	}
 }
diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala
index c45d324..1384be0 100644
--- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala
+++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala
@@ -137,7 +137,7 @@ class StreamTableEnvironmentImpl (
   override def registerFunction[T: TypeInformation](name: String, tf: TableFunction[T]): Unit = {
     val typeInfo = UserFunctionsTypeHelper
       .getReturnTypeOfTableFunction(tf, implicitly[TypeInformation[T]])
-    functionCatalog.registerTableFunction(
+    functionCatalog.registerTempSystemTableFunction(
       name,
       tf,
       typeInfo
@@ -152,7 +152,7 @@ class StreamTableEnvironmentImpl (
       .getReturnTypeOfAggregateFunction(f, implicitly[TypeInformation[T]])
     val accTypeInfo = UserFunctionsTypeHelper
       .getAccumulatorTypeOfAggregateFunction(f, implicitly[TypeInformation[ACC]])
-    functionCatalog.registerAggregateFunction(
+    functionCatalog.registerTempSystemAggregateFunction(
       name,
       f,
       typeInfo,
@@ -168,7 +168,7 @@ class StreamTableEnvironmentImpl (
       .getReturnTypeOfAggregateFunction(f, implicitly[TypeInformation[T]])
     val accTypeInfo = UserFunctionsTypeHelper
       .getAccumulatorTypeOfAggregateFunction(f, implicitly[TypeInformation[ACC]])
-    functionCatalog.registerAggregateFunction(
+    functionCatalog.registerTempSystemAggregateFunction(
       name,
       f,
       typeInfo,
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala
index a49f06f..f0289e4 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala
@@ -696,7 +696,7 @@ class RexNodeExtractorTest extends RexNodeTestBase {
 
   @Test
   def testExtractWithUdf(): Unit = {
-    functionCatalog.registerScalarFunction("myUdf", Func1)
+    functionCatalog.registerTempSystemScalarFunction("myUdf", Func1)
     // amount
     val t0 = rexBuilder.makeInputRef(allFieldTypes.get(2), 2)
     // my_udf(amount)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
index 0201d77..a452717 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
@@ -912,7 +912,7 @@ class TestingTableEnvironment private(
   def registerFunction[T: TypeInformation](name: String, tf: TableFunction[T]): Unit = {
     val typeInfo = UserFunctionsTypeHelper
       .getReturnTypeOfTableFunction(tf, implicitly[TypeInformation[T]])
-    functionCatalog.registerTableFunction(
+    functionCatalog.registerTempSystemTableFunction(
       name,
       tf,
       typeInfo
@@ -944,7 +944,7 @@ class TestingTableEnvironment private(
       .getReturnTypeOfAggregateFunction(f, implicitly[TypeInformation[T]])
     val accTypeInfo = UserFunctionsTypeHelper
       .getAccumulatorTypeOfAggregateFunction(f, implicitly[TypeInformation[ACC]])
-    functionCatalog.registerAggregateFunction(
+    functionCatalog.registerTempSystemAggregateFunction(
       name,
       f,
       typeInfo,
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index 0e00268..a00ea35 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -99,7 +99,7 @@ abstract class TableEnvImpl(
   private def isBatchTable: Boolean = !isStreamingMode
 
   override def registerFunction(name: String, function: ScalarFunction): Unit = {
-    functionCatalog.registerScalarFunction(
+    functionCatalog.registerTempSystemScalarFunction(
       name,
       function)
   }
@@ -117,7 +117,7 @@ abstract class TableEnvImpl(
         function,
         implicitly[TypeInformation[T]])
 
-    functionCatalog.registerTableFunction(
+    functionCatalog.registerTempSystemTableFunction(
       name,
       function,
       resultTypeInfo)
@@ -141,7 +141,7 @@ abstract class TableEnvImpl(
       function,
       implicitly[TypeInformation[ACC]])
 
-    functionCatalog.registerAggregateFunction(
+    functionCatalog.registerTempSystemAggregateFunction(
       name,
       function,
       resultTypeInfo,