You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2020/05/11 15:38:34 UTC

[flink] branch master updated: [FLINK-17523] Add call expression with a class of UDF as a parameter

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 3561adf  [FLINK-17523] Add call expression with a class of UDF as a parameter
3561adf is described below

commit 3561adf03deb88bff540773b5a2037c8576c09f8
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Mon May 11 08:46:03 2020 +0200

    [FLINK-17523] Add call expression with a class of UDF as a parameter
---
 .../org/apache/flink/table/api/Expressions.java    | 12 +++++++++++
 .../java/org/apache/flink/table/api/Table.java     | 24 ++++++++--------------
 .../apache/flink/table/api/WindowGroupedTable.java |  6 ++----
 .../resolver/ExpressionResolverTest.java           | 14 +++++++++++++
 .../org/apache/flink/table/api/expressionDsl.scala |  9 ++++++++
 5 files changed, 45 insertions(+), 20 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java
index 6071764..37f53d4 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java
@@ -27,6 +27,7 @@ import org.apache.flink.table.expressions.TimePointUnit;
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
 import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.table.functions.UserDefinedFunction;
+import org.apache.flink.table.functions.UserDefinedFunctionHelper;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.utils.TypeConversions;
 import org.apache.flink.table.types.utils.ValueDataTypeConverter;
@@ -526,6 +527,17 @@ public final class Expressions {
 		return apiCall(function, arguments);
 	}
 
+	/**
+	 * A call to an unregistered, inline function.
+	 *
+	 * <p>For functions that have been registered before and are identified by a name, use
+	 * {@link #call(String, Object...)}.
+	 */
+	public static ApiExpression call(Class<? extends UserDefinedFunction> function, Object... arguments) {
+		final UserDefinedFunction functionInstance = UserDefinedFunctionHelper.instantiateFunction(function);
+		return apiCall(functionInstance, arguments);
+	}
+
 	private static ApiExpression apiCall(FunctionDefinition functionDefinition, Object... args) {
 		List<Expression> arguments =
 			Stream.of(args)
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
index d1239e6..4d8219f 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
@@ -595,8 +595,7 @@ public interface Table {
 	 *     }
 	 *   }
 	 *
-	 *   TableFunction<String> split = new MySplitUDTF();
-	 *   table.joinLateral(call(split, $("c")).as("s"))
+	 *   table.joinLateral(call(MySplitUDTF.class, $("c")).as("s"))
 	 *        .select($("a"), $("b"), $("c"), $("s"));
 	 * }
 	 * </pre>
@@ -659,8 +658,7 @@ public interface Table {
 	 *     }
 	 *   }
 	 *
-	 *   TableFunction<String> split = new MySplitUDTF();
-	 *   table.joinLateral(call(split, $("c")).as("s"), $("a").isEqual($("s")))
+	 *   table.joinLateral(call(MySplitUDTF.class, $("c")).as("s"), $("a").isEqual($("s")))
 	 *        .select($("a"), $("b"), $("c"), $("s"));
 	 * }
 	 * </pre>
@@ -725,8 +723,7 @@ public interface Table {
 	 *     }
 	 *   }
 	 *
-	 *   TableFunction<String> split = new MySplitUDTF();
-	 *   table.leftOuterJoinLateral(call(split, $("c")).as("s"))
+	 *   table.leftOuterJoinLateral(call(MySplitUDTF.class, $("c")).as("s"))
 	 *        .select($("a"), $("b"), $("c"), $("s"));
 	 * }
 	 * </pre>
@@ -791,8 +788,7 @@ public interface Table {
 	 *     }
 	 *   }
 	 *
-	 *   TableFunction<String> split = new MySplitUDTF();
-	 *   table.leftOuterJoinLateral(call(split, $("c")).as("s"), $("a").isEqual($("s")))
+	 *   table.leftOuterJoinLateral(call(MySplitUDTF.class, $("c")).as("s"), $("a").isEqual($("s")))
 	 *        .select($("a"), $("b"), $("c"), $("s"));
 	 * }
 	 * </pre>
@@ -1267,8 +1263,7 @@ public interface Table {
 	 *
 	 * <pre>
 	 * {@code
-	 *   ScalarFunction func = new MyMapFunction();
-	 *   tab.map(call(func, $("c")))
+	 *   tab.map(call(MyMapFunction.class, $("c")))
 	 * }
 	 * </pre>
 	 *
@@ -1309,8 +1304,7 @@ public interface Table {
 	 *
 	 * <pre>
 	 * {@code
-	 *   TableFunction func = new MyFlatMapFunction();
-	 *   tab.flatMap(call(func, $("c")))
+	 *   tab.flatMap(call(MyFlatMapFunction.class, $("c")))
 	 * }
 	 * </pre>
 	 *
@@ -1354,8 +1348,7 @@ public interface Table {
 	 *
 	 * <pre>
 	 * {@code
-	 *   AggregateFunction aggFunc = new MyAggregateFunction();
-	 *   tab.aggregate(call(aggFunc, $("a"), $("b")).as("f0", "f1", "f2"))
+	 *   tab.aggregate(call(MyAggregateFunction.class, $("a"), $("b")).as("f0", "f1", "f2"))
 	 *     .select($("f0"), $("f1"));
 	 * }
 	 * </pre>
@@ -1399,8 +1392,7 @@ public interface Table {
 	 *
 	 * <pre>
 	 * {@code
-	 *   TableAggregateFunction tableAggFunc = new MyTableAggregateFunction();
-	 *   tab.flatAggregate(call(tableAggFunc, $("a"), $("b")).as("x", "y", "z"))
+	 *   tab.flatAggregate(call(MyTableAggregateFunction.class, $("a"), $("b")).as("x", "y", "z"))
 	 *     .select($("x"), $("y"), $("z"));
 	 * }
 	 * </pre>
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/WindowGroupedTable.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/WindowGroupedTable.java
index 409c7b6..0dbfc01 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/WindowGroupedTable.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/WindowGroupedTable.java
@@ -95,8 +95,7 @@ public interface WindowGroupedTable {
 	 *
 	 * <pre>
 	 * {@code
-	 *   AggregateFunction aggFunc = new MyAggregateFunction();
-	 *   windowGroupedTable.aggregate(call(aggFunc, $("a"), $("b")).as("x", "y", "z"))
+	 *   windowGroupedTable.aggregate(call(MyAggregateFunction.class, $("a"), $("b")).as("x", "y", "z"))
 	 *     .select($("key"), $("window").start(), $("x"), $("y"), $("z"));
 	 * }
 	 * </pre>
@@ -142,8 +141,7 @@ public interface WindowGroupedTable {
 	 *
 	 * <pre>
 	 * {@code
-	 *   TableAggregateFunction tableAggFunc = new MyTableAggregateFunction();
-	 *   windowGroupedTable.flatAggregate(call(tableAggFunc, $("a"), $("b")).as("x", "y", "z"))
+	 *   windowGroupedTable.flatAggregate(call(MyTableAggregateFunction.class, $("a"), $("b")).as("x", "y", "z"))
 	 *     .select($("key"), $("window").start(), $("x"), $("y"), $("z"));
 	 * }
 	 * </pre>
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/expressions/resolver/ExpressionResolverTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/expressions/resolver/ExpressionResolverTest.java
index f303e44..af5735f 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/expressions/resolver/ExpressionResolverTest.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/expressions/resolver/ExpressionResolverTest.java
@@ -175,6 +175,20 @@ public class ExpressionResolverTest {
 						DataTypes.INT().notNull().bridgedTo(int.class)
 					)),
 
+			TestSpec.test("Inline function call via a class")
+				.inputSchemas(
+					TableSchema.builder()
+						.field("f0", DataTypes.INT())
+						.build()
+				)
+				.select(call(ScalarFunc.class, 1, $("f0")))
+				.equalTo(
+					new CallExpression(
+						new ScalarFunc(),
+						Arrays.asList(valueLiteral(1), new FieldReferenceExpression("f0", DataTypes.INT(), 0, 0)),
+						DataTypes.INT().notNull().bridgedTo(int.class)
+					)),
+
 			TestSpec.test("Lookup catalog function call")
 				.inputSchemas(
 					TableSchema.builder()
diff --git a/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/expressionDsl.scala b/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/expressionDsl.scala
index 2a7577a..a1239a9 100644
--- a/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/expressionDsl.scala
+++ b/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/expressionDsl.scala
@@ -506,6 +506,15 @@ trait ImplicitExpressionConversions {
     function,
     params: _*)
 
+  /**
+   * A call to an unregistered, inline function. For functions that have been registered before and
+   * are identified by a name, use [[call(String, Object...)]].
+   */
+  def call(function: Class[_ <: UserDefinedFunction], params: Expression*): Expression =
+    Expressions.call(
+      function,
+      params: _*)
+
   // ----------------------------------------------------------------------------------------------
   // Implicit expressions in prefix notation
   // ----------------------------------------------------------------------------------------------