You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2020/07/09 08:35:59 UTC

[flink] branch release-1.11 updated: [FLINK-18520][table] Fix unresolvable catalog table functions

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new bbfb81d  [FLINK-18520][table] Fix unresolvable catalog table functions
bbfb81d is described below

commit bbfb81df5193c3551b2c17812321facddcfba586
Author: Timo Walther <tw...@apache.org>
AuthorDate: Wed Jul 8 15:08:43 2020 +0200

    [FLINK-18520][table] Fix unresolvable catalog table functions
    
    This closes #12857.
---
 .../catalog/FunctionCatalogOperatorTable.java      | 17 ++++++---
 .../planner/runtime/stream/sql/FunctionITCase.java | 44 ++++++++++------------
 2 files changed, 32 insertions(+), 29 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/FunctionCatalogOperatorTable.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/FunctionCatalogOperatorTable.java
index 0bf8d9b..56d141c 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/FunctionCatalogOperatorTable.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/FunctionCatalogOperatorTable.java
@@ -153,7 +153,7 @@ public class FunctionCatalogOperatorTable implements SqlOperatorTable {
 			FunctionIdentifier identifier,
 			FunctionDefinition definition) {
 
-		if (!verifyFunctionKind(category, definition)) {
+		if (!verifyFunctionKind(category, identifier, definition)) {
 			return Optional.empty();
 		}
 
@@ -196,16 +196,23 @@ public class FunctionCatalogOperatorTable implements SqlOperatorTable {
 	@SuppressWarnings("RedundantIfStatement")
 	private boolean verifyFunctionKind(
 			@Nullable SqlFunctionCategory category,
+			FunctionIdentifier identifier,
 			FunctionDefinition definition) {
 
 		// it would be nice to give a more meaningful exception when a scalar function is used instead
 		// of a table function and vice versa, but we can do that only once FLIP-51 is implemented
 
-		if (definition.getKind() == FunctionKind.SCALAR &&
-				(category == SqlFunctionCategory.USER_DEFINED_FUNCTION || category == SqlFunctionCategory.SYSTEM)) {
+		if (definition.getKind() == FunctionKind.SCALAR) {
+			if (category != null && category.isTableFunction()) {
+				throw new ValidationException(
+					String.format(
+						"Function '%s' cannot be used as a table function.",
+						identifier.asSummaryString()
+					)
+				);
+			}
 			return true;
-		} else if (definition.getKind() == FunctionKind.TABLE &&
-				(category == SqlFunctionCategory.USER_DEFINED_TABLE_FUNCTION || category == SqlFunctionCategory.SYSTEM)) {
+		} else if (definition.getKind() == FunctionKind.TABLE) {
 			return true;
 		}
 
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java
index f2a40ac..c374f1f 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java
@@ -446,7 +446,7 @@ public class FunctionITCase extends StreamingTestBase {
 	}
 
 	@Test
-	public void testPrimitiveScalarFunction() throws Exception {
+	public void testPrimitiveScalarFunction() {
 		final List<Row> sourceData = Arrays.asList(
 			Row.of(1, 1L, "-"),
 			Row.of(2, 2L, "--"),
@@ -471,7 +471,7 @@ public class FunctionITCase extends StreamingTestBase {
 	}
 
 	@Test
-	public void testNullScalarFunction() throws Exception {
+	public void testNullScalarFunction() {
 		final List<Row> sinkData = Collections.singletonList(
 			Row.of("Boolean", "String", "<<unknown>>", "String", "Object", "Boolean"));
 
@@ -496,7 +496,7 @@ public class FunctionITCase extends StreamingTestBase {
 	}
 
 	@Test
-	public void testRowScalarFunction() throws Exception {
+	public void testRowScalarFunction() {
 		final List<Row> sourceData = Arrays.asList(
 			Row.of(1, Row.of(1, "1")),
 			Row.of(2, Row.of(2, "2")),
@@ -518,7 +518,7 @@ public class FunctionITCase extends StreamingTestBase {
 	}
 
 	@Test
-	public void testComplexScalarFunction() throws Exception {
+	public void testComplexScalarFunction() {
 		final List<Row> sourceData = Arrays.asList(
 			Row.of(1, new byte[]{1, 2, 3}),
 			Row.of(2, new byte[]{2, 3, 4}),
@@ -588,7 +588,7 @@ public class FunctionITCase extends StreamingTestBase {
 	}
 
 	@Test
-	public void testCustomScalarFunction() throws Exception {
+	public void testCustomScalarFunction() {
 		final List<Row> sourceData = Arrays.asList(
 			Row.of(1),
 			Row.of(2),
@@ -622,7 +622,7 @@ public class FunctionITCase extends StreamingTestBase {
 	}
 
 	@Test
-	public void testRawLiteralScalarFunction() throws Exception {
+	public void testRawLiteralScalarFunction() {
 		final List<Row> sourceData = Arrays.asList(
 			Row.of(1, DayOfWeek.MONDAY),
 			Row.of(2, DayOfWeek.FRIDAY),
@@ -748,7 +748,7 @@ public class FunctionITCase extends StreamingTestBase {
 	}
 
 	@Test
-	public void testRowTableFunction() throws Exception {
+	public void testRowTableFunction() {
 		final List<Row> sourceData = Arrays.asList(
 			Row.of("1,2,3"),
 			Row.of("2,3,4"),
@@ -801,7 +801,7 @@ public class FunctionITCase extends StreamingTestBase {
 	}
 
 	@Test
-	public void testDynamicTableFunction() throws Exception {
+	public void testDynamicCatalogTableFunction() {
 		final Row[] sinkData = new Row[]{
 			Row.of("Test is a string"),
 			Row.of("42"),
@@ -812,7 +812,7 @@ public class FunctionITCase extends StreamingTestBase {
 
 		tEnv().executeSql("CREATE TABLE SinkTable(s STRING) WITH ('connector' = 'COLLECTION')");
 
-		tEnv().createTemporarySystemFunction("DynamicTableFunction", DynamicTableFunction.class);
+		tEnv().createFunction("DynamicTableFunction", DynamicTableFunction.class);
 		execInsertSqlAndWaitResult(
 			"INSERT INTO SinkTable " +
 			"SELECT T1.s FROM TABLE(DynamicTableFunction('Test')) AS T1(s) " +
@@ -826,7 +826,7 @@ public class FunctionITCase extends StreamingTestBase {
 
 	@Test
 	public void testInvalidUseOfScalarFunction() {
-		tEnv().executeSql("CREATE TABLE SinkTable(s STRING) WITH ('connector' = 'COLLECTION')");
+		tEnv().executeSql("CREATE TABLE SinkTable(s BIGINT NOT NULL) WITH ('connector' = 'COLLECTION')");
 
 		tEnv().createTemporarySystemFunction("PrimitiveScalarFunction", PrimitiveScalarFunction.class);
 		try {
@@ -839,7 +839,7 @@ public class FunctionITCase extends StreamingTestBase {
 				e,
 				hasMessage(
 					containsString(
-						"No match found for function signature PrimitiveScalarFunction(<NUMERIC>, <NUMERIC>, <CHARACTER>)")));
+						"SQL validation failed. Function 'PrimitiveScalarFunction' cannot be used as a table function.")));
 		}
 	}
 
@@ -863,21 +863,17 @@ public class FunctionITCase extends StreamingTestBase {
 
 	@Test
 	public void testInvalidUseOfTableFunction() {
-		tEnv().executeSql("CREATE TABLE SinkTable(s STRING) WITH ('connector' = 'COLLECTION')");
+		TestCollectionTableFactory.reset();
+
+		tEnv().executeSql("CREATE TABLE SinkTable(s ROW<s STRING, sa ARRAY<STRING> NOT NULL>) WITH ('connector' = 'COLLECTION')");
 
 		tEnv().createTemporarySystemFunction("RowTableFunction", RowTableFunction.class);
-		try {
-			tEnv().executeSql(
-				"INSERT INTO SinkTable " +
-				"SELECT RowTableFunction('test')");
-			fail();
-		} catch (ValidationException e) {
-			assertThat(
-				e,
-				hasMessage(
-					containsString(
-						"No match found for function signature RowTableFunction(<CHARACTER>)")));
-		}
+		tEnv().executeSql(
+			"INSERT INTO SinkTable " +
+			"SELECT RowTableFunction('test')");
+
+		// currently, calling a table function like a scalar function produces no result
+		assertThat(TestCollectionTableFactory.getResult(), equalTo(Collections.emptyList()));
 	}
 
 	// --------------------------------------------------------------------------------------------