You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2020/03/05 05:22:14 UTC
[flink] branch release-1.10 updated: [FLINK-16414][table] Fix SQL
validation failure when a UDAF/UDTF does not override getResultType
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.10 by this push:
new c73220c [FLINK-16414][table] Fix SQL validation failure when a UDAF/UDTF does not override getResultType
c73220c is described below
commit c73220cb196ccf648047d3dc8b838e1e1882b471
Author: zjuwangg <zj...@foxmail.com>
AuthorDate: Wed Mar 4 15:19:38 2020 +0800
[FLINK-16414][table] Fix SQL validation failure when a UDAF/UDTF does not override getResultType
This closes #11310
---
.../table/functions/FunctionDefinitionUtil.java | 10 ++--
.../functions/FunctionDefinitionUtilTest.java | 67 ++++++++++++++++++++--
2 files changed, 66 insertions(+), 11 deletions(-)
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/functions/FunctionDefinitionUtil.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/functions/FunctionDefinitionUtil.java
index da798b6..374b138 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/functions/FunctionDefinitionUtil.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/functions/FunctionDefinitionUtil.java
@@ -45,7 +45,7 @@ public class FunctionDefinitionUtil {
return new TableFunctionDefinition(
name,
t,
- t.getResultType()
+ UserDefinedFunctionHelper.getReturnTypeOfTableFunction(t)
);
} else if (udf instanceof AggregateFunction) {
AggregateFunction a = (AggregateFunction) udf;
@@ -53,8 +53,8 @@ public class FunctionDefinitionUtil {
return new AggregateFunctionDefinition(
name,
a,
- a.getAccumulatorType(),
- a.getResultType()
+ UserDefinedFunctionHelper.getAccumulatorTypeOfAggregateFunction(a),
+ UserDefinedFunctionHelper.getReturnTypeOfAggregateFunction(a)
);
} else if (udf instanceof TableAggregateFunction) {
TableAggregateFunction a = (TableAggregateFunction) udf;
@@ -62,8 +62,8 @@ public class FunctionDefinitionUtil {
return new TableAggregateFunctionDefinition(
name,
a,
- a.getAccumulatorType(),
- a.getResultType()
+ UserDefinedFunctionHelper.getAccumulatorTypeOfAggregateFunction(a),
+ UserDefinedFunctionHelper.getReturnTypeOfAggregateFunction(a)
);
} else {
throw new UnsupportedOperationException(
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/functions/FunctionDefinitionUtilTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/functions/FunctionDefinitionUtilTest.java
index b37ed8b..691734c 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/functions/FunctionDefinitionUtilTest.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/functions/FunctionDefinitionUtilTest.java
@@ -40,32 +40,56 @@ public class FunctionDefinitionUtilTest {
@Test
public void testTableFunction() {
- FunctionDefinition fd = FunctionDefinitionUtil.createFunctionDefinition(
+ FunctionDefinition fd1 = FunctionDefinitionUtil.createFunctionDefinition(
"test",
TestTableFunction.class.getName()
);
- assertTrue(((TableFunctionDefinition) fd).getTableFunction() instanceof TestTableFunction);
+ assertTrue(((TableFunctionDefinition) fd1).getTableFunction() instanceof TestTableFunction);
+
+ FunctionDefinition fd2 = FunctionDefinitionUtil.createFunctionDefinition(
+ "test",
+ TestTableFunctionWithoutResultType.class.getName()
+ );
+
+ assertTrue(((TableFunctionDefinition) fd2).getTableFunction() instanceof TestTableFunctionWithoutResultType);
}
@Test
public void testAggregateFunction() {
- FunctionDefinition fd = FunctionDefinitionUtil.createFunctionDefinition(
+ FunctionDefinition fd1 = FunctionDefinitionUtil.createFunctionDefinition(
"test",
TestAggFunction.class.getName()
);
- assertTrue(((AggregateFunctionDefinition) fd).getAggregateFunction() instanceof TestAggFunction);
+ assertTrue(((AggregateFunctionDefinition) fd1).getAggregateFunction() instanceof TestAggFunction);
+
+ FunctionDefinition fd2 = FunctionDefinitionUtil.createFunctionDefinition(
+ "test",
+ TestAggFunctionWithoutResultType.class.getName()
+ );
+
+ assertTrue(((AggregateFunctionDefinition) fd2).getAggregateFunction()
+ instanceof TestAggFunctionWithoutResultType);
+
}
@Test
public void testTableAggregateFunction() {
- FunctionDefinition fd = FunctionDefinitionUtil.createFunctionDefinition(
+ FunctionDefinition fd1 = FunctionDefinitionUtil.createFunctionDefinition(
"test",
TestTableAggFunction.class.getName()
);
- assertTrue(((TableAggregateFunctionDefinition) fd).getTableAggregateFunction() instanceof TestTableAggFunction);
+ assertTrue(((TableAggregateFunctionDefinition) fd1).getTableAggregateFunction() instanceof TestTableAggFunction);
+
+ FunctionDefinition fd2 = FunctionDefinitionUtil.createFunctionDefinition(
+ "test",
+ TestTableAggFunctionWithoutResultType.class.getName()
+ );
+
+ assertTrue(((TableAggregateFunctionDefinition) fd2).getTableAggregateFunction()
+ instanceof TestTableAggFunctionWithoutResultType);
}
/**
@@ -88,6 +112,12 @@ public class FunctionDefinitionUtilTest {
/**
* Test function.
*/
+ public static class TestTableFunctionWithoutResultType extends TableFunction<String> {
+ }
+
+ /**
+ * Test function.
+ */
public static class TestAggFunction extends AggregateFunction {
@Override
public Object createAccumulator() {
@@ -113,6 +143,21 @@ public class FunctionDefinitionUtilTest {
/**
* Test function.
*/
+ public static class TestAggFunctionWithoutResultType extends AggregateFunction<Long, Long> {
+ @Override
+ public Long createAccumulator() {
+ return null;
+ }
+
+ @Override
+ public Long getValue(Long accumulator) {
+ return null;
+ }
+ }
+
+ /**
+ * Test function.
+ */
public static class TestTableAggFunction extends TableAggregateFunction {
@Override
public Object createAccumulator() {
@@ -129,4 +174,14 @@ public class FunctionDefinitionUtilTest {
return TypeInformation.of(Object.class);
}
}
+
+ /**
+ * Test table aggregate function that overrides neither getResultType nor getAccumulatorType.
+ */
+ public static class TestTableAggFunctionWithoutResultType extends TableAggregateFunction<Long, Long> {
+ @Override
+ public Long createAccumulator() {
+ return null;
+ }
+ }
}