You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2020/05/11 15:38:34 UTC
[flink] branch master updated: [FLINK-17523] Add call expression
with a class of UDF as a parameter
This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 3561adf [FLINK-17523] Add call expression with a class of UDF as a parameter
3561adf is described below
commit 3561adf03deb88bff540773b5a2037c8576c09f8
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Mon May 11 08:46:03 2020 +0200
[FLINK-17523] Add call expression with a class of UDF as a parameter
---
.../org/apache/flink/table/api/Expressions.java | 12 +++++++++++
.../java/org/apache/flink/table/api/Table.java | 24 ++++++++--------------
.../apache/flink/table/api/WindowGroupedTable.java | 6 ++----
.../resolver/ExpressionResolverTest.java | 14 +++++++++++++
.../org/apache/flink/table/api/expressionDsl.scala | 9 ++++++++
5 files changed, 45 insertions(+), 20 deletions(-)
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java
index 6071764..37f53d4 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java
@@ -27,6 +27,7 @@ import org.apache.flink.table.expressions.TimePointUnit;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.functions.UserDefinedFunction;
+import org.apache.flink.table.functions.UserDefinedFunctionHelper;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.table.types.utils.ValueDataTypeConverter;
@@ -526,6 +527,17 @@ public final class Expressions {
return apiCall(function, arguments);
}
+ /**
+ * A call to an unregistered, inline function.
+ *
+ * <p>For functions that have been registered before and are identified by a name, use
+ * {@link #call(String, Object...)}.
+ */
+ public static ApiExpression call(Class<? extends UserDefinedFunction> function, Object... arguments) {
+ final UserDefinedFunction functionInstance = UserDefinedFunctionHelper.instantiateFunction(function);
+ return apiCall(functionInstance, arguments);
+ }
+
private static ApiExpression apiCall(FunctionDefinition functionDefinition, Object... args) {
List<Expression> arguments =
Stream.of(args)
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
index d1239e6..4d8219f 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
@@ -595,8 +595,7 @@ public interface Table {
* }
* }
*
- * TableFunction<String> split = new MySplitUDTF();
- * table.joinLateral(call(split, $("c")).as("s"))
+ * table.joinLateral(call(MySplitUDTF.class, $("c")).as("s"))
* .select($("a"), $("b"), $("c"), $("s"));
* }
* </pre>
@@ -659,8 +658,7 @@ public interface Table {
* }
* }
*
- * TableFunction<String> split = new MySplitUDTF();
- * table.joinLateral(call(split, $("c")).as("s"), $("a").isEqual($("s")))
+ * table.joinLateral(call(MySplitUDTF.class, $("c")).as("s"), $("a").isEqual($("s")))
* .select($("a"), $("b"), $("c"), $("s"));
* }
* </pre>
@@ -725,8 +723,7 @@ public interface Table {
* }
* }
*
- * TableFunction<String> split = new MySplitUDTF();
- * table.leftOuterJoinLateral(call(split, $("c")).as("s"))
+ * table.leftOuterJoinLateral(call(MySplitUDTF.class, $("c")).as("s"))
* .select($("a"), $("b"), $("c"), $("s"));
* }
* </pre>
@@ -791,8 +788,7 @@ public interface Table {
* }
* }
*
- * TableFunction<String> split = new MySplitUDTF();
- * table.leftOuterJoinLateral(call(split, $("c")).as("s"), $("a").isEqual($("s")))
+ * table.leftOuterJoinLateral(call(MySplitUDTF.class, $("c")).as("s"), $("a").isEqual($("s")))
* .select($("a"), $("b"), $("c"), $("s"));
* }
* </pre>
@@ -1267,8 +1263,7 @@ public interface Table {
*
* <pre>
* {@code
- * ScalarFunction func = new MyMapFunction();
- * tab.map(call(func, $("c")))
+ * tab.map(call(MyMapFunction.class, $("c")))
* }
* </pre>
*
@@ -1309,8 +1304,7 @@ public interface Table {
*
* <pre>
* {@code
- * TableFunction func = new MyFlatMapFunction();
- * tab.flatMap(call(func, $("c")))
+ * tab.flatMap(call(MyFlatMapFunction.class, $("c")))
* }
* </pre>
*
@@ -1354,8 +1348,7 @@ public interface Table {
*
* <pre>
* {@code
- * AggregateFunction aggFunc = new MyAggregateFunction();
- * tab.aggregate(call(aggFunc, $("a"), $("b")).as("f0", "f1", "f2"))
+ * tab.aggregate(call(MyAggregateFunction.class, $("a"), $("b")).as("f0", "f1", "f2"))
* .select($("f0"), $("f1"));
* }
* </pre>
@@ -1399,8 +1392,7 @@ public interface Table {
*
* <pre>
* {@code
- * TableAggregateFunction tableAggFunc = new MyTableAggregateFunction();
- * tab.flatAggregate(call(tableAggFunc, $("a"), $("b")).as("x", "y", "z"))
+ * tab.flatAggregate(call(MyTableAggregateFunction.class, $("a"), $("b")).as("x", "y", "z"))
* .select($("x"), $("y"), $("z"));
* }
* </pre>
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/WindowGroupedTable.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/WindowGroupedTable.java
index 409c7b6..0dbfc01 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/WindowGroupedTable.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/WindowGroupedTable.java
@@ -95,8 +95,7 @@ public interface WindowGroupedTable {
*
* <pre>
* {@code
- * AggregateFunction aggFunc = new MyAggregateFunction();
- * windowGroupedTable.aggregate(call(aggFunc, $("a"), $("b")).as("x", "y", "z"))
+ * windowGroupedTable.aggregate(call(MyAggregateFunction.class, $("a"), $("b")).as("x", "y", "z"))
* .select($("key"), $("window").start(), $("x"), $("y"), $("z"));
* }
* </pre>
@@ -142,8 +141,7 @@ public interface WindowGroupedTable {
*
* <pre>
* {@code
- * TableAggregateFunction tableAggFunc = new MyTableAggregateFunction();
- * windowGroupedTable.flatAggregate(call(tableAggFunc, $("a"), $("b")).as("x", "y", "z"))
+ * windowGroupedTable.flatAggregate(call(MyTableAggregateFunction.class, $("a"), $("b")).as("x", "y", "z"))
* .select($("key"), $("window").start(), $("x"), $("y"), $("z"));
* }
* </pre>
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/expressions/resolver/ExpressionResolverTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/expressions/resolver/ExpressionResolverTest.java
index f303e44..af5735f 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/expressions/resolver/ExpressionResolverTest.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/expressions/resolver/ExpressionResolverTest.java
@@ -175,6 +175,20 @@ public class ExpressionResolverTest {
DataTypes.INT().notNull().bridgedTo(int.class)
)),
+ TestSpec.test("Inline function call via a class")
+ .inputSchemas(
+ TableSchema.builder()
+ .field("f0", DataTypes.INT())
+ .build()
+ )
+ .select(call(ScalarFunc.class, 1, $("f0")))
+ .equalTo(
+ new CallExpression(
+ new ScalarFunc(),
+ Arrays.asList(valueLiteral(1), new FieldReferenceExpression("f0", DataTypes.INT(), 0, 0)),
+ DataTypes.INT().notNull().bridgedTo(int.class)
+ )),
+
TestSpec.test("Lookup catalog function call")
.inputSchemas(
TableSchema.builder()
diff --git a/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/expressionDsl.scala b/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/expressionDsl.scala
index 2a7577a..a1239a9 100644
--- a/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/expressionDsl.scala
+++ b/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/expressionDsl.scala
@@ -506,6 +506,15 @@ trait ImplicitExpressionConversions {
function,
params: _*)
+ /**
+ * A call to an unregistered, inline function. For functions that have been registered before and
+ * are identified by a name, use [[call(String, Object...)]].
+ */
+ def call(function: Class[_ <: UserDefinedFunction], params: Expression*): Expression =
+ Expressions.call(
+ function,
+ params: _*)
+
// ----------------------------------------------------------------------------------------------
// Implicit expressions in prefix notation
// ----------------------------------------------------------------------------------------------