You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2020/06/05 02:32:45 UTC

[flink] branch release-1.11 updated: [FLINK-17717][sql-parser] Throws for DDL create temporary system func with composite identifier

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new edfe844  [FLINK-17717][sql-parser] Throws for DDL create temporary system func with composite identifier
edfe844 is described below

commit edfe844db645b7dc1451503662ee86123473fb1e
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Fri Jun 5 10:26:47 2020 +0800

    [FLINK-17717][sql-parser] Throws for DDL create temporary system func with composite identifier
    
    
    This closes #12352
---
 .../table/client/cli/SqlCommandParserTest.java     |   4 +-
 .../src/main/codegen/includes/parserImpls.ftl      | 100 +++++++++++----------
 .../flink/sql/parser/FlinkSqlParserImplTest.java   |  11 ++-
 .../table/api/internal/TableEnvironmentImpl.java   |  29 ++----
 .../planner/runtime/stream/sql/FunctionITCase.java |  17 ++--
 .../flink/table/api/TableEnvironmentTest.scala     |   4 +-
 .../table/runtime/stream/sql/FunctionITCase.java   |  19 ++--
 .../api/batch/BatchTableEnvironmentTest.scala      |   4 +-
 8 files changed, 93 insertions(+), 95 deletions(-)

diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/SqlCommandParserTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/SqlCommandParserTest.java
index 9b7c2d0..e9e3bf6 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/SqlCommandParserTest.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/SqlCommandParserTest.java
@@ -206,9 +206,9 @@ public class SqlCommandParserTest {
 				TestItem.validSql("CREATE TEMPORARY FUNCTION catalog1.db1.func1 as 'class_name' LANGUAGE JAVA",
 						SqlCommand.CREATE_FUNCTION,
 						"CREATE TEMPORARY FUNCTION catalog1.db1.func1 as 'class_name' LANGUAGE JAVA"),
-				TestItem.validSql("CREATE TEMPORARY SYSTEM FUNCTION catalog1.db1.func1 as 'class_name' LANGUAGE JAVA",
+				TestItem.validSql("CREATE TEMPORARY SYSTEM FUNCTION func1 as 'class_name' LANGUAGE JAVA",
 						SqlCommand.CREATE_FUNCTION,
-						"CREATE TEMPORARY SYSTEM FUNCTION catalog1.db1.func1 as 'class_name' LANGUAGE JAVA"),
+						"CREATE TEMPORARY SYSTEM FUNCTION func1 as 'class_name' LANGUAGE JAVA"),
 				// drop function xx
 				TestItem.invalidSql("DROP FUNCTION "),
 				TestItem.invalidSql("DROP FUNCTIONS "),
diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
index ca07daa..249ba13 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -16,6 +16,36 @@
 -->
 
 /**
+ * Parses an "IF EXISTS" option; returns false when the clause is absent.
+ */
+boolean IfExistsOpt() :
+{
+}
+{
+    (
+        LOOKAHEAD(2)
+        <IF> <EXISTS> { return true; }
+    |
+        { return false; }
+    )
+}
+
+/**
+ * Parses an "IF NOT EXISTS" option; returns false when the clause is absent.
+ */
+boolean IfNotExistsOpt() :
+{
+}
+{
+    (
+        LOOKAHEAD(3)
+        <IF> <NOT> <EXISTS> { return true; }
+    |
+        { return false; }
+    )
+}
+
+/**
 * Parse a "Show Catalogs" metadata query command.
 */
 SqlShowCatalogs SqlShowCatalogs() :
@@ -87,11 +117,7 @@ SqlDrop SqlDropCatalog(Span s, boolean replace) :
 {
     <CATALOG>
 
-    (
-        <IF> <EXISTS> { ifExists = true; }
-    |
-        { ifExists = false; }
-    )
+    ifExists = IfExistsOpt()
 
     catalogName = CompoundIdentifier()
 
@@ -142,10 +168,9 @@ SqlCreate SqlCreateDatabase(Span s, boolean replace) :
 }
 {
     <DATABASE> { startPos = getPos(); }
-    [
-        LOOKAHEAD(3)
-        <IF> <NOT> <EXISTS> { ifNotExists = true; }
-    ]
+
+    ifNotExists = IfNotExistsOpt()
+
     databaseName = CompoundIdentifier()
     [ <COMMENT> <QUOTED_STRING>
         {
@@ -193,17 +218,13 @@ SqlDrop SqlDropDatabase(Span s, boolean replace) :
 {
     <DATABASE>
 
-    (
-        <IF> <EXISTS> { ifExists = true; }
-    |
-        { ifExists = false; }
-    )
+    ifExists = IfExistsOpt()
 
     databaseName = CompoundIdentifier()
     [
-                <RESTRICT> { cascade = false; }
-        |
-                <CASCADE>  { cascade = true; }
+        <RESTRICT> { cascade = false; }
+    |
+        <CASCADE>  { cascade = true; }
     ]
 
     {
@@ -236,18 +257,16 @@ SqlCreate SqlCreateFunction(Span s, boolean replace, boolean isTemporary) :
     boolean isSystemFunction = false;
 }
 {
-    [
-        <SYSTEM> {isSystemFunction = true;}
-    ]
-
-    <FUNCTION>
-
-    [
-        LOOKAHEAD(3)
-        <IF> <NOT> <EXISTS> { ifNotExists = true; }
-    ]
-
-    functionIdentifier = CompoundIdentifier()
+    (
+        <SYSTEM> <FUNCTION>
+        ifNotExists = IfNotExistsOpt()
+        functionIdentifier = SimpleIdentifier()
+        {  isSystemFunction = true; }
+    |
+        <FUNCTION>
+        ifNotExists = IfNotExistsOpt()
+        functionIdentifier = CompoundIdentifier()
+    )
 
     <AS> <QUOTED_STRING> {
         String p = SqlParserUtil.parseString(token.image);
@@ -281,7 +300,7 @@ SqlDrop SqlDropFunction(Span s, boolean replace, boolean isTemporary) :
 
     <FUNCTION>
 
-    [ LOOKAHEAD(2) <IF> <EXISTS> { ifExists = true; } ]
+    ifExists = IfExistsOpt()
 
     functionIdentifier = CompoundIdentifier()
 
@@ -309,7 +328,7 @@ SqlAlterFunction SqlAlterFunction() :
 
     <FUNCTION> { startPos = getPos(); }
 
-    [ LOOKAHEAD(2) <IF> <EXISTS> { ifExists = true; } ]
+    ifExists = IfExistsOpt()
 
     functionIdentifier = CompoundIdentifier()
 
@@ -813,11 +832,7 @@ SqlDrop SqlDropTable(Span s, boolean replace, boolean isTemporary) :
 {
     <TABLE>
 
-    (
-        <IF> <EXISTS> { ifExists = true; }
-    |
-        { ifExists = false; }
-    )
+    ifExists = IfExistsOpt()
 
     tableName = CompoundIdentifier()
 
@@ -936,10 +951,8 @@ SqlCreate SqlCreateView(Span s, boolean replace, boolean isTemporary) : {
 {
     <VIEW>
 
-    [
-        LOOKAHEAD(3)
-        <IF> <NOT> <EXISTS> { ifNotExists = true; }
-    ]
+    ifNotExists = IfNotExistsOpt()
+
     viewName = CompoundIdentifier()
     [
         fieldList = ParenthesizedSimpleIdentifierList()
@@ -964,11 +977,8 @@ SqlDrop SqlDropView(Span s, boolean replace, boolean isTemporary) :
 {
     <VIEW>
 
-    (
-        <IF> <EXISTS> { ifExists = true; }
-    |
-        { ifExists = false; }
-    )
+    ifExists = IfExistsOpt()
+
     viewName = CompoundIdentifier()
     {
         return new SqlDropView(s.pos(), viewName, ifExists, isTemporary);
diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
index b319f47..7b01d43 100644
--- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
+++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
@@ -947,9 +947,6 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 		sql("create temporary function catalog1.db1.function1 as 'org.apache.fink.function.function1'")
 				.ok("CREATE TEMPORARY FUNCTION `CATALOG1`.`DB1`.`FUNCTION1` AS 'org.apache.fink.function.function1'");
 
-		sql("create temporary system function catalog1.db1.function1 as 'org.apache.fink.function.function1'")
-				.ok("CREATE TEMPORARY SYSTEM FUNCTION `CATALOG1`.`DB1`.`FUNCTION1` AS 'org.apache.fink.function.function1'");
-
 		sql("create temporary function db1.function1 as 'org.apache.fink.function.function1'")
 				.ok("CREATE TEMPORARY FUNCTION `DB1`.`FUNCTION1` AS 'org.apache.fink.function.function1'");
 
@@ -964,6 +961,14 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 
 		sql("create temporary system function  function1 as 'org.apache.fink.function.function1' language scala")
 				.ok("CREATE TEMPORARY SYSTEM FUNCTION `FUNCTION1` AS 'org.apache.fink.function.function1' LANGUAGE SCALA");
+
+		// Temporary system function always belongs to the system and current session.
+		sql("create temporary system function catalog1^.^db1.function1 as 'org.apache.fink.function.function1'")
+				.fails("(?s).*Encountered \".\" at.*");
+
+		// TODO: FLINK-17957: Forbidden syntax "CREATE SYSTEM FUNCTION" for sql parser
+		sql("create system function function1 as 'org.apache.fink.function.function1'")
+				.ok("CREATE SYSTEM FUNCTION `FUNCTION1` AS 'org.apache.fink.function.function1'");
 	}
 
 	@Test
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 82f83f1..3d48655 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -1306,18 +1306,10 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
 		String exMsg = getDDLOpExecuteErrorMsg(createCatalogFunctionOperation.asSummaryString());
 		try {
 			if (createCatalogFunctionOperation.isTemporary()) {
-				boolean exist = functionCatalog.hasTemporaryCatalogFunction(
-					createCatalogFunctionOperation.getFunctionIdentifier());
-				if (!exist) {
-					functionCatalog.registerTemporaryCatalogFunction(
+				functionCatalog.registerTemporaryCatalogFunction(
 						UnresolvedIdentifier.of(createCatalogFunctionOperation.getFunctionIdentifier().toList()),
 						createCatalogFunctionOperation.getCatalogFunction(),
-						false);
-				} else if (!createCatalogFunctionOperation.isIgnoreIfExists()) {
-					throw new ValidationException(
-						String.format("Temporary catalog function %s is already defined",
-						createCatalogFunctionOperation.getFunctionIdentifier().asSerializableString()));
-				}
+						createCatalogFunctionOperation.isIgnoreIfExists());
 			} else {
 				Catalog catalog = getCatalogOrThrowException(
 					createCatalogFunctionOperation.getFunctionIdentifier().getCatalogName());
@@ -1389,18 +1381,11 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
 	private TableResult createSystemFunction(CreateTempSystemFunctionOperation operation) {
 		String exMsg = getDDLOpExecuteErrorMsg(operation.asSummaryString());
 		try {
-				boolean exist = functionCatalog.hasTemporarySystemFunction(operation.getFunctionName());
-				if (!exist) {
-					functionCatalog.registerTemporarySystemFunction(
-						operation.getFunctionName(),
-						operation.getFunctionClass(),
-						operation.getFunctionLanguage(),
-						false);
-				} else if (!operation.isIgnoreIfExists()) {
-					throw new ValidationException(
-						String.format("Temporary system function %s is already defined",
-							operation.getFunctionName()));
-				}
+			functionCatalog.registerTemporarySystemFunction(
+					operation.getFunctionName(),
+					operation.getFunctionClass(),
+					operation.getFunctionLanguage(),
+					operation.isIgnoreIfExists());
 			return TableResultImpl.TABLE_RESULT_OK;
 		} catch (ValidationException e) {
 			throw e;
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java
index c8929ac..f2a40ac 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java
@@ -153,9 +153,8 @@ public class FunctionITCase extends StreamingTestBase {
 			tEnv().executeSql(ddl1);
 		} catch (Exception e) {
 			assertTrue(e instanceof ValidationException);
-			assertEquals(e.getMessage(),
-				"Temporary catalog function `default_catalog`.`default_database`.`f4`" +
-					" is already defined");
+			assertEquals("Could not register temporary catalog function. A function 'default_catalog.default_database.f4' does already exist.",
+					e.getMessage());
 		}
 
 		tEnv().executeSql(ddl3);
@@ -164,21 +163,21 @@ public class FunctionITCase extends StreamingTestBase {
 			tEnv().executeSql(ddl3);
 		} catch (Exception e) {
 			assertTrue(e instanceof ValidationException);
-			assertEquals(e.getMessage(),
-				"Temporary catalog function `default_catalog`.`default_database`.`f4`" +
-					" doesn't exist");
+			assertEquals("Temporary catalog function `default_catalog`.`default_database`.`f4`" +
+					" doesn't exist",
+					e.getMessage());
 		}
 	}
 
 	@Test
 	public void testCreateTemporarySystemFunction() {
-		String ddl1 = "create temporary system function default_catalog.default_database.f5" +
+		String ddl1 = "create temporary system function f5" +
 			" as '" + TEST_FUNCTION + "'";
 
-		String ddl2 = "create temporary system function if not exists default_catalog.default_database.f5" +
+		String ddl2 = "create temporary system function if not exists f5" +
 			" as '" + TEST_FUNCTION + "'";
 
-		String ddl3 = "drop temporary system function default_catalog.default_database.f5";
+		String ddl3 = "drop temporary system function f5";
 
 		tEnv().executeSql(ddl1);
 		tEnv().executeSql(ddl2);
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
index e6ec3be..ccc2b55 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
@@ -294,11 +294,11 @@ class TableEnvironmentTest {
       .functionExists(ObjectPath.fromString("default_database.f1")))
 
     val tableResult4 = tableEnv.executeSql(
-      s"CREATE TEMPORARY SYSTEM FUNCTION default_database.f2 AS '$funcName'")
+      s"CREATE TEMPORARY SYSTEM FUNCTION f2 AS '$funcName'")
     assertEquals(ResultKind.SUCCESS, tableResult4.getResultKind)
     assertTrue(tableEnv.listUserDefinedFunctions().contains("f2"))
 
-    val tableResult5 = tableEnv.executeSql("DROP TEMPORARY SYSTEM FUNCTION default_database.f2")
+    val tableResult5 = tableEnv.executeSql("DROP TEMPORARY SYSTEM FUNCTION f2")
     assertEquals(ResultKind.SUCCESS, tableResult5.getResultKind)
     assertFalse(tableEnv.listUserDefinedFunctions().contains("f2"))
   }
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/stream/sql/FunctionITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/stream/sql/FunctionITCase.java
index f466472..e1e8396 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/stream/sql/FunctionITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/stream/sql/FunctionITCase.java
@@ -143,9 +143,8 @@ public class FunctionITCase extends AbstractTestBase {
 			tableEnv.sqlUpdate(ddl1);
 		} catch (Exception e) {
 			assertTrue(e instanceof ValidationException);
-			assertEquals(e.getMessage(),
-				"Temporary catalog function `default_catalog`.`default_database`.`f4`" +
-					" is already defined");
+			assertEquals("Could not register temporary catalog function. A function 'default_catalog.default_database.f4' does already exist.",
+					e.getMessage());
 		}
 
 		tableEnv.sqlUpdate(ddl3);
@@ -154,22 +153,22 @@ public class FunctionITCase extends AbstractTestBase {
 			tableEnv.sqlUpdate(ddl3);
 		} catch (Exception e) {
 			assertTrue(e instanceof ValidationException);
-			assertEquals(e.getMessage(),
-				"Temporary catalog function `default_catalog`.`default_database`.`f4`" +
-					" doesn't exist");
+			assertEquals("Temporary catalog function `default_catalog`.`default_database`.`f4`" +
+					" doesn't exist",
+					e.getMessage());
 		}
 	}
 
 	@Test
 	public void testCreateTemporarySystemFunction() {
 		TableEnvironment tableEnv = getTableEnvironment();
-		String ddl1 = "create temporary system function default_catalog.default_database.f5" +
+		String ddl1 = "create temporary system function f5" +
 			" as '" + TEST_FUNCTION + "'";
 
-		String ddl2 = "create temporary system function if not exists default_catalog.default_database.f5" +
-			" as 'org.apache.flink.table.functions.CatalogFunctionTestBase$TestUDF'";
+		String ddl2 = "create temporary system function if not exists f5" +
+			" as 'org.apache.flink.table.runtime.stream.sql.FunctionITCase$TestUDF'";
 
-		String ddl3 = "drop temporary system function default_catalog.default_database.f5";
+		String ddl3 = "drop temporary system function f5";
 
 		tableEnv.sqlUpdate(ddl1);
 		tableEnv.sqlUpdate(ddl2);
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala
index d56af89..65b34ec 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala
@@ -176,12 +176,12 @@ class BatchTableEnvironmentTest extends TableTestBase {
       .functionExists(ObjectPath.fromString("default_database.f1")))
 
     val tableResult4 = util.tableEnv.executeSql(
-      s"CREATE TEMPORARY SYSTEM FUNCTION default_database.f2 AS '$funcName'")
+      s"CREATE TEMPORARY SYSTEM FUNCTION f2 AS '$funcName'")
     assertEquals(ResultKind.SUCCESS, tableResult4.getResultKind)
     assertTrue(util.tableEnv.listUserDefinedFunctions().contains("f2"))
 
     val tableResult5 = util.tableEnv.executeSql(
-      "DROP TEMPORARY SYSTEM FUNCTION default_database.f2")
+      "DROP TEMPORARY SYSTEM FUNCTION f2")
     assertEquals(ResultKind.SUCCESS, tableResult5.getResultKind)
     assertFalse(util.tableEnv.listUserDefinedFunctions().contains("f2"))
   }