You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2020/03/05 05:22:14 UTC

[flink] branch release-1.10 updated: [FLINK-16414][table] Fix SQL validation failure when a UDAF/UDTF does not override getResultType

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.10 by this push:
     new c73220c  [FLINK-16414][table] Fix SQL validation failure when a UDAF/UDTF does not override getResultType
c73220c is described below

commit c73220cb196ccf648047d3dc8b838e1e1882b471
Author: zjuwangg <zj...@foxmail.com>
AuthorDate: Wed Mar 4 15:19:38 2020 +0800

    [FLINK-16414][table] Fix SQL validation failure when a UDAF/UDTF does not override getResultType
    
    This closes #11310
---
 .../table/functions/FunctionDefinitionUtil.java    | 10 ++--
 .../functions/FunctionDefinitionUtilTest.java      | 67 ++++++++++++++++++++--
 2 files changed, 66 insertions(+), 11 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/functions/FunctionDefinitionUtil.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/functions/FunctionDefinitionUtil.java
index da798b6..374b138 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/functions/FunctionDefinitionUtil.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/functions/FunctionDefinitionUtil.java
@@ -45,7 +45,7 @@ public class FunctionDefinitionUtil {
 			return new TableFunctionDefinition(
 				name,
 				t,
-				t.getResultType()
+				UserDefinedFunctionHelper.getReturnTypeOfTableFunction(t)
 			);
 		} else if (udf instanceof AggregateFunction) {
 			AggregateFunction a = (AggregateFunction) udf;
@@ -53,8 +53,8 @@ public class FunctionDefinitionUtil {
 			return new AggregateFunctionDefinition(
 				name,
 				a,
-				a.getAccumulatorType(),
-				a.getResultType()
+				UserDefinedFunctionHelper.getAccumulatorTypeOfAggregateFunction(a),
+				UserDefinedFunctionHelper.getReturnTypeOfAggregateFunction(a)
 			);
 		} else if (udf instanceof TableAggregateFunction) {
 			TableAggregateFunction a = (TableAggregateFunction) udf;
@@ -62,8 +62,8 @@ public class FunctionDefinitionUtil {
 			return new TableAggregateFunctionDefinition(
 				name,
 				a,
-				a.getAccumulatorType(),
-				a.getResultType()
+				UserDefinedFunctionHelper.getAccumulatorTypeOfAggregateFunction(a),
+				UserDefinedFunctionHelper.getReturnTypeOfAggregateFunction(a)
 			);
 		} else {
 			throw new UnsupportedOperationException(
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/functions/FunctionDefinitionUtilTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/functions/FunctionDefinitionUtilTest.java
index b37ed8b..691734c 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/functions/FunctionDefinitionUtilTest.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/functions/FunctionDefinitionUtilTest.java
@@ -40,32 +40,56 @@ public class FunctionDefinitionUtilTest {
 
 	@Test
 	public void testTableFunction() {
-		FunctionDefinition fd = FunctionDefinitionUtil.createFunctionDefinition(
+		FunctionDefinition fd1 = FunctionDefinitionUtil.createFunctionDefinition(
 			"test",
 			TestTableFunction.class.getName()
 		);
 
-		assertTrue(((TableFunctionDefinition) fd).getTableFunction() instanceof TestTableFunction);
+		assertTrue(((TableFunctionDefinition) fd1).getTableFunction() instanceof TestTableFunction);
+
+		FunctionDefinition fd2 = FunctionDefinitionUtil.createFunctionDefinition(
+				"test",
+				TestTableFunctionWithoutResultType.class.getName()
+		);
+
+		assertTrue(((TableFunctionDefinition) fd2).getTableFunction() instanceof TestTableFunctionWithoutResultType);
 	}
 
 	@Test
 	public void testAggregateFunction() {
-		FunctionDefinition fd = FunctionDefinitionUtil.createFunctionDefinition(
+		FunctionDefinition fd1 = FunctionDefinitionUtil.createFunctionDefinition(
 			"test",
 			TestAggFunction.class.getName()
 		);
 
-		assertTrue(((AggregateFunctionDefinition) fd).getAggregateFunction() instanceof TestAggFunction);
+		assertTrue(((AggregateFunctionDefinition) fd1).getAggregateFunction() instanceof TestAggFunction);
+
+		FunctionDefinition fd2 = FunctionDefinitionUtil.createFunctionDefinition(
+				"test",
+				TestAggFunctionWithoutResultType.class.getName()
+		);
+
+		assertTrue(((AggregateFunctionDefinition) fd2).getAggregateFunction()
+				instanceof TestAggFunctionWithoutResultType);
+
 	}
 
 	@Test
 	public void testTableAggregateFunction() {
-		FunctionDefinition fd = FunctionDefinitionUtil.createFunctionDefinition(
+		FunctionDefinition fd1 = FunctionDefinitionUtil.createFunctionDefinition(
 			"test",
 			TestTableAggFunction.class.getName()
 		);
 
-		assertTrue(((TableAggregateFunctionDefinition) fd).getTableAggregateFunction() instanceof TestTableAggFunction);
+		assertTrue(((TableAggregateFunctionDefinition) fd1).getTableAggregateFunction() instanceof TestTableAggFunction);
+
+		FunctionDefinition fd2 = FunctionDefinitionUtil.createFunctionDefinition(
+				"test",
+				TestTableAggFunctionWithoutResultType.class.getName()
+		);
+
+		assertTrue(((TableAggregateFunctionDefinition) fd2).getTableAggregateFunction()
+				instanceof TestTableAggFunctionWithoutResultType);
 	}
 
 	/**
@@ -88,6 +112,12 @@ public class FunctionDefinitionUtilTest {
 	/**
 	 * Test function.
 	 */
+	public static class TestTableFunctionWithoutResultType extends TableFunction<String> {
+	}
+
+	/**
+	 * Test function.
+	 */
 	public static class TestAggFunction extends AggregateFunction {
 		@Override
 		public Object createAccumulator() {
@@ -113,6 +143,21 @@ public class FunctionDefinitionUtilTest {
 	/**
 	 * Test function.
 	 */
+	public static class TestAggFunctionWithoutResultType extends AggregateFunction<Long, Long> {
+		@Override
+		public Long createAccumulator() {
+			return null;
+		}
+
+		@Override
+		public Long getValue(Long accumulator) {
+			return null;
+		}
+	}
+
+	/**
+	 * Test function.
+	 */
 	public static class TestTableAggFunction extends TableAggregateFunction {
 		@Override
 		public Object createAccumulator() {
@@ -129,4 +174,14 @@ public class FunctionDefinitionUtilTest {
 			return TypeInformation.of(Object.class);
 		}
 	}
+
+	/**
+	 * Test function.
+	 */
+	public static class TestTableAggFunctionWithoutResultType extends TableAggregateFunction<Long, Long> {
+		@Override
+		public Long createAccumulator() {
+			return null;
+		}
+	}
 }