You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2020/06/05 02:32:45 UTC
[flink] branch release-1.11 updated: [FLINK-17717][sql-parser]
Throws for DDL create temporary system func with composite identifier
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new edfe844 [FLINK-17717][sql-parser] Throws for DDL create temporary system func with composite identifier
edfe844 is described below
commit edfe844db645b7dc1451503662ee86123473fb1e
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Fri Jun 5 10:26:47 2020 +0800
[FLINK-17717][sql-parser] Throws for DDL create temporary system func with composite identifier
This closes #12352
---
.../table/client/cli/SqlCommandParserTest.java | 4 +-
.../src/main/codegen/includes/parserImpls.ftl | 100 +++++++++++----------
.../flink/sql/parser/FlinkSqlParserImplTest.java | 11 ++-
.../table/api/internal/TableEnvironmentImpl.java | 29 ++----
.../planner/runtime/stream/sql/FunctionITCase.java | 17 ++--
.../flink/table/api/TableEnvironmentTest.scala | 4 +-
.../table/runtime/stream/sql/FunctionITCase.java | 19 ++--
.../api/batch/BatchTableEnvironmentTest.scala | 4 +-
8 files changed, 93 insertions(+), 95 deletions(-)
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/SqlCommandParserTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/SqlCommandParserTest.java
index 9b7c2d0..e9e3bf6 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/SqlCommandParserTest.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/SqlCommandParserTest.java
@@ -206,9 +206,9 @@ public class SqlCommandParserTest {
TestItem.validSql("CREATE TEMPORARY FUNCTION catalog1.db1.func1 as 'class_name' LANGUAGE JAVA",
SqlCommand.CREATE_FUNCTION,
"CREATE TEMPORARY FUNCTION catalog1.db1.func1 as 'class_name' LANGUAGE JAVA"),
- TestItem.validSql("CREATE TEMPORARY SYSTEM FUNCTION catalog1.db1.func1 as 'class_name' LANGUAGE JAVA",
+ TestItem.validSql("CREATE TEMPORARY SYSTEM FUNCTION func1 as 'class_name' LANGUAGE JAVA",
SqlCommand.CREATE_FUNCTION,
- "CREATE TEMPORARY SYSTEM FUNCTION catalog1.db1.func1 as 'class_name' LANGUAGE JAVA"),
+ "CREATE TEMPORARY SYSTEM FUNCTION func1 as 'class_name' LANGUAGE JAVA"),
// drop function xx
TestItem.invalidSql("DROP FUNCTION "),
TestItem.invalidSql("DROP FUNCTIONS "),
diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
index ca07daa..249ba13 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -16,6 +16,36 @@
-->
/**
+ * Parses a "IF EXISTS" option, default is false.
+ */
+boolean IfExistsOpt() :
+{
+}
+{
+ (
+ LOOKAHEAD(2)
+ <IF> <EXISTS> { return true; }
+ |
+ { return false; }
+ )
+}
+
+/**
+ * Parses a "IF NOT EXISTS" option, default is false.
+ */
+boolean IfNotExistsOpt() :
+{
+}
+{
+ (
+ LOOKAHEAD(3)
+ <IF> <NOT> <EXISTS> { return true; }
+ |
+ { return false; }
+ )
+}
+
+/**
* Parse a "Show Catalogs" metadata query command.
*/
SqlShowCatalogs SqlShowCatalogs() :
@@ -87,11 +117,7 @@ SqlDrop SqlDropCatalog(Span s, boolean replace) :
{
<CATALOG>
- (
- <IF> <EXISTS> { ifExists = true; }
- |
- { ifExists = false; }
- )
+ ifExists = IfExistsOpt()
catalogName = CompoundIdentifier()
@@ -142,10 +168,9 @@ SqlCreate SqlCreateDatabase(Span s, boolean replace) :
}
{
<DATABASE> { startPos = getPos(); }
- [
- LOOKAHEAD(3)
- <IF> <NOT> <EXISTS> { ifNotExists = true; }
- ]
+
+ ifNotExists = IfNotExistsOpt()
+
databaseName = CompoundIdentifier()
[ <COMMENT> <QUOTED_STRING>
{
@@ -193,17 +218,13 @@ SqlDrop SqlDropDatabase(Span s, boolean replace) :
{
<DATABASE>
- (
- <IF> <EXISTS> { ifExists = true; }
- |
- { ifExists = false; }
- )
+ ifExists = IfExistsOpt()
databaseName = CompoundIdentifier()
[
- <RESTRICT> { cascade = false; }
- |
- <CASCADE> { cascade = true; }
+ <RESTRICT> { cascade = false; }
+ |
+ <CASCADE> { cascade = true; }
]
{
@@ -236,18 +257,16 @@ SqlCreate SqlCreateFunction(Span s, boolean replace, boolean isTemporary) :
boolean isSystemFunction = false;
}
{
- [
- <SYSTEM> {isSystemFunction = true;}
- ]
-
- <FUNCTION>
-
- [
- LOOKAHEAD(3)
- <IF> <NOT> <EXISTS> { ifNotExists = true; }
- ]
-
- functionIdentifier = CompoundIdentifier()
+ (
+ <SYSTEM> <FUNCTION>
+ ifNotExists = IfNotExistsOpt()
+ functionIdentifier = SimpleIdentifier()
+ { isSystemFunction = true; }
+ |
+ <FUNCTION>
+ ifNotExists = IfNotExistsOpt()
+ functionIdentifier = CompoundIdentifier()
+ )
<AS> <QUOTED_STRING> {
String p = SqlParserUtil.parseString(token.image);
@@ -281,7 +300,7 @@ SqlDrop SqlDropFunction(Span s, boolean replace, boolean isTemporary) :
<FUNCTION>
- [ LOOKAHEAD(2) <IF> <EXISTS> { ifExists = true; } ]
+ ifExists = IfExistsOpt()
functionIdentifier = CompoundIdentifier()
@@ -309,7 +328,7 @@ SqlAlterFunction SqlAlterFunction() :
<FUNCTION> { startPos = getPos(); }
- [ LOOKAHEAD(2) <IF> <EXISTS> { ifExists = true; } ]
+ ifExists = IfExistsOpt()
functionIdentifier = CompoundIdentifier()
@@ -813,11 +832,7 @@ SqlDrop SqlDropTable(Span s, boolean replace, boolean isTemporary) :
{
<TABLE>
- (
- <IF> <EXISTS> { ifExists = true; }
- |
- { ifExists = false; }
- )
+ ifExists = IfExistsOpt()
tableName = CompoundIdentifier()
@@ -936,10 +951,8 @@ SqlCreate SqlCreateView(Span s, boolean replace, boolean isTemporary) : {
{
<VIEW>
- [
- LOOKAHEAD(3)
- <IF> <NOT> <EXISTS> { ifNotExists = true; }
- ]
+ ifNotExists = IfNotExistsOpt()
+
viewName = CompoundIdentifier()
[
fieldList = ParenthesizedSimpleIdentifierList()
@@ -964,11 +977,8 @@ SqlDrop SqlDropView(Span s, boolean replace, boolean isTemporary) :
{
<VIEW>
- (
- <IF> <EXISTS> { ifExists = true; }
- |
- { ifExists = false; }
- )
+ ifExists = IfExistsOpt()
+
viewName = CompoundIdentifier()
{
return new SqlDropView(s.pos(), viewName, ifExists, isTemporary);
diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
index b319f47..7b01d43 100644
--- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
+++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
@@ -947,9 +947,6 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
sql("create temporary function catalog1.db1.function1 as 'org.apache.fink.function.function1'")
.ok("CREATE TEMPORARY FUNCTION `CATALOG1`.`DB1`.`FUNCTION1` AS 'org.apache.fink.function.function1'");
- sql("create temporary system function catalog1.db1.function1 as 'org.apache.fink.function.function1'")
- .ok("CREATE TEMPORARY SYSTEM FUNCTION `CATALOG1`.`DB1`.`FUNCTION1` AS 'org.apache.fink.function.function1'");
-
sql("create temporary function db1.function1 as 'org.apache.fink.function.function1'")
.ok("CREATE TEMPORARY FUNCTION `DB1`.`FUNCTION1` AS 'org.apache.fink.function.function1'");
@@ -964,6 +961,14 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
sql("create temporary system function function1 as 'org.apache.fink.function.function1' language scala")
.ok("CREATE TEMPORARY SYSTEM FUNCTION `FUNCTION1` AS 'org.apache.fink.function.function1' LANGUAGE SCALA");
+
+ // Temporary system function always belongs to the system and current session.
+ sql("create temporary system function catalog1^.^db1.function1 as 'org.apache.fink.function.function1'")
+ .fails("(?s).*Encountered \".\" at.*");
+
+ // TODO: FLINK-17957: Forbidden syntax "CREATE SYSTEM FUNCTION" for sql parser
+ sql("create system function function1 as 'org.apache.fink.function.function1'")
+ .ok("CREATE SYSTEM FUNCTION `FUNCTION1` AS 'org.apache.fink.function.function1'");
}
@Test
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 82f83f1..3d48655 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -1306,18 +1306,10 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
String exMsg = getDDLOpExecuteErrorMsg(createCatalogFunctionOperation.asSummaryString());
try {
if (createCatalogFunctionOperation.isTemporary()) {
- boolean exist = functionCatalog.hasTemporaryCatalogFunction(
- createCatalogFunctionOperation.getFunctionIdentifier());
- if (!exist) {
- functionCatalog.registerTemporaryCatalogFunction(
+ functionCatalog.registerTemporaryCatalogFunction(
UnresolvedIdentifier.of(createCatalogFunctionOperation.getFunctionIdentifier().toList()),
createCatalogFunctionOperation.getCatalogFunction(),
- false);
- } else if (!createCatalogFunctionOperation.isIgnoreIfExists()) {
- throw new ValidationException(
- String.format("Temporary catalog function %s is already defined",
- createCatalogFunctionOperation.getFunctionIdentifier().asSerializableString()));
- }
+ createCatalogFunctionOperation.isIgnoreIfExists());
} else {
Catalog catalog = getCatalogOrThrowException(
createCatalogFunctionOperation.getFunctionIdentifier().getCatalogName());
@@ -1389,18 +1381,11 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
private TableResult createSystemFunction(CreateTempSystemFunctionOperation operation) {
String exMsg = getDDLOpExecuteErrorMsg(operation.asSummaryString());
try {
- boolean exist = functionCatalog.hasTemporarySystemFunction(operation.getFunctionName());
- if (!exist) {
- functionCatalog.registerTemporarySystemFunction(
- operation.getFunctionName(),
- operation.getFunctionClass(),
- operation.getFunctionLanguage(),
- false);
- } else if (!operation.isIgnoreIfExists()) {
- throw new ValidationException(
- String.format("Temporary system function %s is already defined",
- operation.getFunctionName()));
- }
+ functionCatalog.registerTemporarySystemFunction(
+ operation.getFunctionName(),
+ operation.getFunctionClass(),
+ operation.getFunctionLanguage(),
+ operation.isIgnoreIfExists());
return TableResultImpl.TABLE_RESULT_OK;
} catch (ValidationException e) {
throw e;
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java
index c8929ac..f2a40ac 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java
@@ -153,9 +153,8 @@ public class FunctionITCase extends StreamingTestBase {
tEnv().executeSql(ddl1);
} catch (Exception e) {
assertTrue(e instanceof ValidationException);
- assertEquals(e.getMessage(),
- "Temporary catalog function `default_catalog`.`default_database`.`f4`" +
- " is already defined");
+ assertEquals("Could not register temporary catalog function. A function 'default_catalog.default_database.f4' does already exist.",
+ e.getMessage());
}
tEnv().executeSql(ddl3);
@@ -164,21 +163,21 @@ public class FunctionITCase extends StreamingTestBase {
tEnv().executeSql(ddl3);
} catch (Exception e) {
assertTrue(e instanceof ValidationException);
- assertEquals(e.getMessage(),
- "Temporary catalog function `default_catalog`.`default_database`.`f4`" +
- " doesn't exist");
+ assertEquals("Temporary catalog function `default_catalog`.`default_database`.`f4`" +
+ " doesn't exist",
+ e.getMessage());
}
}
@Test
public void testCreateTemporarySystemFunction() {
- String ddl1 = "create temporary system function default_catalog.default_database.f5" +
+ String ddl1 = "create temporary system function f5" +
" as '" + TEST_FUNCTION + "'";
- String ddl2 = "create temporary system function if not exists default_catalog.default_database.f5" +
+ String ddl2 = "create temporary system function if not exists f5" +
" as '" + TEST_FUNCTION + "'";
- String ddl3 = "drop temporary system function default_catalog.default_database.f5";
+ String ddl3 = "drop temporary system function f5";
tEnv().executeSql(ddl1);
tEnv().executeSql(ddl2);
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
index e6ec3be..ccc2b55 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
@@ -294,11 +294,11 @@ class TableEnvironmentTest {
.functionExists(ObjectPath.fromString("default_database.f1")))
val tableResult4 = tableEnv.executeSql(
- s"CREATE TEMPORARY SYSTEM FUNCTION default_database.f2 AS '$funcName'")
+ s"CREATE TEMPORARY SYSTEM FUNCTION f2 AS '$funcName'")
assertEquals(ResultKind.SUCCESS, tableResult4.getResultKind)
assertTrue(tableEnv.listUserDefinedFunctions().contains("f2"))
- val tableResult5 = tableEnv.executeSql("DROP TEMPORARY SYSTEM FUNCTION default_database.f2")
+ val tableResult5 = tableEnv.executeSql("DROP TEMPORARY SYSTEM FUNCTION f2")
assertEquals(ResultKind.SUCCESS, tableResult5.getResultKind)
assertFalse(tableEnv.listUserDefinedFunctions().contains("f2"))
}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/stream/sql/FunctionITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/stream/sql/FunctionITCase.java
index f466472..e1e8396 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/stream/sql/FunctionITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/stream/sql/FunctionITCase.java
@@ -143,9 +143,8 @@ public class FunctionITCase extends AbstractTestBase {
tableEnv.sqlUpdate(ddl1);
} catch (Exception e) {
assertTrue(e instanceof ValidationException);
- assertEquals(e.getMessage(),
- "Temporary catalog function `default_catalog`.`default_database`.`f4`" +
- " is already defined");
+ assertEquals("Could not register temporary catalog function. A function 'default_catalog.default_database.f4' does already exist.",
+ e.getMessage());
}
tableEnv.sqlUpdate(ddl3);
@@ -154,22 +153,22 @@ public class FunctionITCase extends AbstractTestBase {
tableEnv.sqlUpdate(ddl3);
} catch (Exception e) {
assertTrue(e instanceof ValidationException);
- assertEquals(e.getMessage(),
- "Temporary catalog function `default_catalog`.`default_database`.`f4`" +
- " doesn't exist");
+ assertEquals("Temporary catalog function `default_catalog`.`default_database`.`f4`" +
+ " doesn't exist",
+ e.getMessage());
}
}
@Test
public void testCreateTemporarySystemFunction() {
TableEnvironment tableEnv = getTableEnvironment();
- String ddl1 = "create temporary system function default_catalog.default_database.f5" +
+ String ddl1 = "create temporary system function f5" +
" as '" + TEST_FUNCTION + "'";
- String ddl2 = "create temporary system function if not exists default_catalog.default_database.f5" +
- " as 'org.apache.flink.table.functions.CatalogFunctionTestBase$TestUDF'";
+ String ddl2 = "create temporary system function if not exists f5" +
+ " as 'org.apache.flink.table.runtime.stream.sql.FunctionITCase$TestUDF'";
- String ddl3 = "drop temporary system function default_catalog.default_database.f5";
+ String ddl3 = "drop temporary system function f5";
tableEnv.sqlUpdate(ddl1);
tableEnv.sqlUpdate(ddl2);
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala
index d56af89..65b34ec 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala
@@ -176,12 +176,12 @@ class BatchTableEnvironmentTest extends TableTestBase {
.functionExists(ObjectPath.fromString("default_database.f1")))
val tableResult4 = util.tableEnv.executeSql(
- s"CREATE TEMPORARY SYSTEM FUNCTION default_database.f2 AS '$funcName'")
+ s"CREATE TEMPORARY SYSTEM FUNCTION f2 AS '$funcName'")
assertEquals(ResultKind.SUCCESS, tableResult4.getResultKind)
assertTrue(util.tableEnv.listUserDefinedFunctions().contains("f2"))
val tableResult5 = util.tableEnv.executeSql(
- "DROP TEMPORARY SYSTEM FUNCTION default_database.f2")
+ "DROP TEMPORARY SYSTEM FUNCTION f2")
assertEquals(ResultKind.SUCCESS, tableResult5.getResultKind)
assertFalse(util.tableEnv.listUserDefinedFunctions().contains("f2"))
}