You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2020/07/09 08:35:59 UTC
[flink] branch release-1.11 updated: [FLINK-18520][table] Fix unresolvable catalog table functions
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new bbfb81d [FLINK-18520][table] Fix unresolvable catalog table functions
bbfb81d is described below
commit bbfb81df5193c3551b2c17812321facddcfba586
Author: Timo Walther <tw...@apache.org>
AuthorDate: Wed Jul 8 15:08:43 2020 +0200
[FLINK-18520][table] Fix unresolvable catalog table functions
This closes #12857.
---
.../catalog/FunctionCatalogOperatorTable.java | 17 ++++++---
.../planner/runtime/stream/sql/FunctionITCase.java | 44 ++++++++++------------
2 files changed, 32 insertions(+), 29 deletions(-)
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/FunctionCatalogOperatorTable.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/FunctionCatalogOperatorTable.java
index 0bf8d9b..56d141c 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/FunctionCatalogOperatorTable.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/FunctionCatalogOperatorTable.java
@@ -153,7 +153,7 @@ public class FunctionCatalogOperatorTable implements SqlOperatorTable {
FunctionIdentifier identifier,
FunctionDefinition definition) {
- if (!verifyFunctionKind(category, definition)) {
+ if (!verifyFunctionKind(category, identifier, definition)) {
return Optional.empty();
}
@@ -196,16 +196,23 @@ public class FunctionCatalogOperatorTable implements SqlOperatorTable {
@SuppressWarnings("RedundantIfStatement")
private boolean verifyFunctionKind(
@Nullable SqlFunctionCategory category,
+ FunctionIdentifier identifier,
FunctionDefinition definition) {
// it would be nice to give a more meaningful exception when a scalar function is used instead
// of a table function and vice versa, but we can do that only once FLIP-51 is implemented
- if (definition.getKind() == FunctionKind.SCALAR &&
- (category == SqlFunctionCategory.USER_DEFINED_FUNCTION || category == SqlFunctionCategory.SYSTEM)) {
+ if (definition.getKind() == FunctionKind.SCALAR) {
+ if (category != null && category.isTableFunction()) {
+ throw new ValidationException(
+ String.format(
+ "Function '%s' cannot be used as a table function.",
+ identifier.asSummaryString()
+ )
+ );
+ }
return true;
- } else if (definition.getKind() == FunctionKind.TABLE &&
- (category == SqlFunctionCategory.USER_DEFINED_TABLE_FUNCTION || category == SqlFunctionCategory.SYSTEM)) {
+ } else if (definition.getKind() == FunctionKind.TABLE) {
return true;
}
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java
index f2a40ac..c374f1f 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java
@@ -446,7 +446,7 @@ public class FunctionITCase extends StreamingTestBase {
}
@Test
- public void testPrimitiveScalarFunction() throws Exception {
+ public void testPrimitiveScalarFunction() {
final List<Row> sourceData = Arrays.asList(
Row.of(1, 1L, "-"),
Row.of(2, 2L, "--"),
@@ -471,7 +471,7 @@ public class FunctionITCase extends StreamingTestBase {
}
@Test
- public void testNullScalarFunction() throws Exception {
+ public void testNullScalarFunction() {
final List<Row> sinkData = Collections.singletonList(
Row.of("Boolean", "String", "<<unknown>>", "String", "Object", "Boolean"));
@@ -496,7 +496,7 @@ public class FunctionITCase extends StreamingTestBase {
}
@Test
- public void testRowScalarFunction() throws Exception {
+ public void testRowScalarFunction() {
final List<Row> sourceData = Arrays.asList(
Row.of(1, Row.of(1, "1")),
Row.of(2, Row.of(2, "2")),
@@ -518,7 +518,7 @@ public class FunctionITCase extends StreamingTestBase {
}
@Test
- public void testComplexScalarFunction() throws Exception {
+ public void testComplexScalarFunction() {
final List<Row> sourceData = Arrays.asList(
Row.of(1, new byte[]{1, 2, 3}),
Row.of(2, new byte[]{2, 3, 4}),
@@ -588,7 +588,7 @@ public class FunctionITCase extends StreamingTestBase {
}
@Test
- public void testCustomScalarFunction() throws Exception {
+ public void testCustomScalarFunction() {
final List<Row> sourceData = Arrays.asList(
Row.of(1),
Row.of(2),
@@ -622,7 +622,7 @@ public class FunctionITCase extends StreamingTestBase {
}
@Test
- public void testRawLiteralScalarFunction() throws Exception {
+ public void testRawLiteralScalarFunction() {
final List<Row> sourceData = Arrays.asList(
Row.of(1, DayOfWeek.MONDAY),
Row.of(2, DayOfWeek.FRIDAY),
@@ -748,7 +748,7 @@ public class FunctionITCase extends StreamingTestBase {
}
@Test
- public void testRowTableFunction() throws Exception {
+ public void testRowTableFunction() {
final List<Row> sourceData = Arrays.asList(
Row.of("1,2,3"),
Row.of("2,3,4"),
@@ -801,7 +801,7 @@ public class FunctionITCase extends StreamingTestBase {
}
@Test
- public void testDynamicTableFunction() throws Exception {
+ public void testDynamicCatalogTableFunction() {
final Row[] sinkData = new Row[]{
Row.of("Test is a string"),
Row.of("42"),
@@ -812,7 +812,7 @@ public class FunctionITCase extends StreamingTestBase {
tEnv().executeSql("CREATE TABLE SinkTable(s STRING) WITH ('connector' = 'COLLECTION')");
- tEnv().createTemporarySystemFunction("DynamicTableFunction", DynamicTableFunction.class);
+ tEnv().createFunction("DynamicTableFunction", DynamicTableFunction.class);
execInsertSqlAndWaitResult(
"INSERT INTO SinkTable " +
"SELECT T1.s FROM TABLE(DynamicTableFunction('Test')) AS T1(s) " +
@@ -826,7 +826,7 @@ public class FunctionITCase extends StreamingTestBase {
@Test
public void testInvalidUseOfScalarFunction() {
- tEnv().executeSql("CREATE TABLE SinkTable(s STRING) WITH ('connector' = 'COLLECTION')");
+ tEnv().executeSql("CREATE TABLE SinkTable(s BIGINT NOT NULL) WITH ('connector' = 'COLLECTION')");
tEnv().createTemporarySystemFunction("PrimitiveScalarFunction", PrimitiveScalarFunction.class);
try {
@@ -839,7 +839,7 @@ public class FunctionITCase extends StreamingTestBase {
e,
hasMessage(
containsString(
- "No match found for function signature PrimitiveScalarFunction(<NUMERIC>, <NUMERIC>, <CHARACTER>)")));
+ "SQL validation failed. Function 'PrimitiveScalarFunction' cannot be used as a table function.")));
}
}
@@ -863,21 +863,17 @@ public class FunctionITCase extends StreamingTestBase {
@Test
public void testInvalidUseOfTableFunction() {
- tEnv().executeSql("CREATE TABLE SinkTable(s STRING) WITH ('connector' = 'COLLECTION')");
+ TestCollectionTableFactory.reset();
+
+ tEnv().executeSql("CREATE TABLE SinkTable(s ROW<s STRING, sa ARRAY<STRING> NOT NULL>) WITH ('connector' = 'COLLECTION')");
tEnv().createTemporarySystemFunction("RowTableFunction", RowTableFunction.class);
- try {
- tEnv().executeSql(
- "INSERT INTO SinkTable " +
- "SELECT RowTableFunction('test')");
- fail();
- } catch (ValidationException e) {
- assertThat(
- e,
- hasMessage(
- containsString(
- "No match found for function signature RowTableFunction(<CHARACTER>)")));
- }
+ tEnv().executeSql(
+ "INSERT INTO SinkTable " +
+ "SELECT RowTableFunction('test')");
+
+ // currently, calling a table function like a scalar function produces no result
+ assertThat(TestCollectionTableFactory.getResult(), equalTo(Collections.emptyList()));
}
// --------------------------------------------------------------------------------------------