You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/03/31 02:15:16 UTC
[flink] 02/13: [FLINK-14338][sql-parser] Bump sql parser Calcite
dependency to 1.22.0
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 78499853bff8eccc2b174a883a7cfdc4125eb499
Author: yuzhao.cyz <yu...@gmail.com>
AuthorDate: Tue Mar 17 19:00:55 2020 +0800
[FLINK-14338][sql-parser] Bump sql parser Calcite dependency to 1.22.0
* Add Junit5 supports for tests
* Move classes under package calcite.sql to flink.sql.parser.type
* Remove ExtendedSqlBasicTypeNameSpec, use SqlAlienSystemTypeNameSpec instead
* In Parser.tdd, re-organize the imports order to alphabetical order, remove the useless tailing commas, extends nonReservedKeywordsToAdd
* Fix the JavaCC compile warnings
---
.../src/main/codegen/data/Parser.tdd | 201 ++++++-----
.../src/main/codegen/includes/parserImpls.ftl | 19 +-
.../java/org/apache/calcite/sql/package-info.java | 27 --
.../flink/sql/parser/ddl/SqlCreateTable.java | 18 +-
.../parser/type/ExtendedSqlBasicTypeNameSpec.java | 57 ---
.../parser/type}/ExtendedSqlRowTypeNameSpec.java | 8 +-
.../sql/parser/type}/SqlMapTypeNameSpec.java | 6 +-
.../sql/parser/validate/FlinkSqlConformance.java | 10 +
.../flink/sql/parser/FlinkDDLDataTypeTest.java | 4 +-
.../flink/sql/parser/FlinkSqlParserImplTest.java | 392 +++++++++++----------
10 files changed, 365 insertions(+), 377 deletions(-)
diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
index 1783a5e..14158c7 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
+++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
@@ -21,69 +21,70 @@
# List of additional classes and packages to import.
# Example. "org.apache.calcite.sql.*", "java.util.List".
+ # Please keep the import classes in alphabetical order if new class is added.
imports: [
- "org.apache.calcite.sql.ExtendedSqlRowTypeNameSpec",
- "org.apache.calcite.sql.SqlMapTypeNameSpec",
- "org.apache.flink.sql.parser.ddl.SqlCreateTable",
+ "org.apache.flink.sql.parser.ddl.SqlAlterDatabase"
+ "org.apache.flink.sql.parser.ddl.SqlAlterFunction"
+ "org.apache.flink.sql.parser.ddl.SqlAlterTable"
+ "org.apache.flink.sql.parser.ddl.SqlAlterTableProperties"
+ "org.apache.flink.sql.parser.ddl.SqlAlterTableRename"
+ "org.apache.flink.sql.parser.ddl.SqlCreateCatalog"
+ "org.apache.flink.sql.parser.ddl.SqlCreateDatabase"
+ "org.apache.flink.sql.parser.ddl.SqlCreateFunction"
+ "org.apache.flink.sql.parser.ddl.SqlCreateTable"
+ "org.apache.flink.sql.parser.ddl.SqlCreateTable.TableCreationContext"
+ "org.apache.flink.sql.parser.ddl.SqlCreateView"
+ "org.apache.flink.sql.parser.ddl.SqlDropDatabase"
+ "org.apache.flink.sql.parser.ddl.SqlDropFunction"
"org.apache.flink.sql.parser.ddl.SqlDropTable"
- "org.apache.flink.sql.parser.ddl.SqlCreateTable.TableCreationContext",
- "org.apache.flink.sql.parser.ddl.SqlCreateView",
- "org.apache.flink.sql.parser.ddl.SqlDropView",
- "org.apache.flink.sql.parser.ddl.SqlTableColumn",
- "org.apache.flink.sql.parser.ddl.SqlTableOption",
- "org.apache.flink.sql.parser.ddl.SqlWatermark",
- "org.apache.flink.sql.parser.ddl.SqlUseCatalog",
- "org.apache.flink.sql.parser.ddl.SqlUseDatabase",
- "org.apache.flink.sql.parser.ddl.SqlCreateDatabase",
- "org.apache.flink.sql.parser.ddl.SqlDropDatabase",
- "org.apache.flink.sql.parser.ddl.SqlAlterDatabase",
- "org.apache.flink.sql.parser.ddl.SqlAlterFunction",
- "org.apache.flink.sql.parser.ddl.SqlCreateFunction",
- "org.apache.flink.sql.parser.ddl.SqlDropFunction",
- "org.apache.flink.sql.parser.ddl.SqlAlterTable",
- "org.apache.flink.sql.parser.ddl.SqlAlterTableRename",
- "org.apache.flink.sql.parser.ddl.SqlAlterTableProperties",
- "org.apache.flink.sql.parser.ddl.SqlCreateCatalog",
- "org.apache.flink.sql.parser.dml.RichSqlInsert",
- "org.apache.flink.sql.parser.dml.RichSqlInsertKeyword",
- "org.apache.flink.sql.parser.dql.SqlShowCatalogs",
- "org.apache.flink.sql.parser.dql.SqlDescribeCatalog",
- "org.apache.flink.sql.parser.dql.SqlShowDatabases",
- "org.apache.flink.sql.parser.dql.SqlShowFunctions",
- "org.apache.flink.sql.parser.dql.SqlDescribeDatabase",
- "org.apache.flink.sql.parser.dql.SqlShowTables",
- "org.apache.flink.sql.parser.dql.SqlRichDescribeTable",
- "org.apache.flink.sql.parser.type.ExtendedSqlBasicTypeNameSpec",
- "org.apache.flink.sql.parser.type.ExtendedSqlCollectionTypeNameSpec",
- "org.apache.flink.sql.parser.utils.SqlTimeUnit",
- "org.apache.flink.sql.parser.utils.ParserResource",
- "org.apache.flink.sql.parser.validate.FlinkSqlConformance",
- "org.apache.flink.sql.parser.SqlProperty",
- "org.apache.calcite.sql.SqlDrop",
- "org.apache.calcite.sql.SqlCreate",
- "java.util.List",
+ "org.apache.flink.sql.parser.ddl.SqlDropView"
+ "org.apache.flink.sql.parser.ddl.SqlTableColumn"
+ "org.apache.flink.sql.parser.ddl.SqlTableOption"
+ "org.apache.flink.sql.parser.ddl.SqlUseCatalog"
+ "org.apache.flink.sql.parser.ddl.SqlUseDatabase"
+ "org.apache.flink.sql.parser.ddl.SqlWatermark"
+ "org.apache.flink.sql.parser.dml.RichSqlInsert"
+ "org.apache.flink.sql.parser.dml.RichSqlInsertKeyword"
+ "org.apache.flink.sql.parser.dql.SqlDescribeCatalog"
+ "org.apache.flink.sql.parser.dql.SqlDescribeDatabase"
+ "org.apache.flink.sql.parser.dql.SqlShowCatalogs"
+ "org.apache.flink.sql.parser.dql.SqlShowDatabases"
+ "org.apache.flink.sql.parser.dql.SqlShowFunctions"
+ "org.apache.flink.sql.parser.dql.SqlShowTables"
+ "org.apache.flink.sql.parser.dql.SqlRichDescribeTable"
+ "org.apache.flink.sql.parser.type.ExtendedSqlCollectionTypeNameSpec"
+ "org.apache.flink.sql.parser.type.ExtendedSqlRowTypeNameSpec"
+ "org.apache.flink.sql.parser.type.SqlMapTypeNameSpec"
+ "org.apache.flink.sql.parser.utils.SqlTimeUnit"
+ "org.apache.flink.sql.parser.utils.ParserResource"
+ "org.apache.flink.sql.parser.validate.FlinkSqlConformance"
+ "org.apache.flink.sql.parser.SqlProperty"
+ "org.apache.calcite.sql.SqlAlienSystemTypeNameSpec"
+ "org.apache.calcite.sql.SqlCreate"
+ "org.apache.calcite.sql.SqlDrop"
+ "java.util.List"
"java.util.ArrayList"
]
# List of new keywords. Example: "DATABASES", "TABLES". If the keyword is not a reserved
# keyword, please also add it to 'nonReservedKeywords' section.
keywords: [
- "COMMENT",
- "PARTITIONED",
- "IF",
- "WATERMARK",
- "OVERWRITE",
- "STRING",
- "BYTES",
- "RAW",
- "CATALOGS",
- "USE",
- "DATABASES",
- "FUNCTIONS",
- "EXTENDED",
- "SCALA",
- "TABLES",
+ "BYTES"
+ "CATALOGS"
+ "COMMENT"
+ "DATABASES"
+ "EXTENDED"
+ "FUNCTIONS"
+ "IF"
+ "OVERWRITE"
+ "PARTITIONED"
+ "RAW"
"RENAME"
+ "SCALA"
+ "STRING"
+ "TABLES"
+ "USE"
+ "WATERMARK"
]
# List of keywords from "keywords" section that are not reserved.
@@ -109,15 +110,14 @@
"C"
"CASCADE"
"CATALOG"
- "CATALOGS"
"CATALOG_NAME"
"CENTURY"
"CHAIN"
+ "CHARACTERISTICS"
+ "CHARACTERS"
"CHARACTER_SET_CATALOG"
"CHARACTER_SET_NAME"
"CHARACTER_SET_SCHEMA"
- "CHARACTERISTICS"
- "CHARACTERS"
"CLASS_ORIGIN"
"COBOL"
"COLLATION"
@@ -128,22 +128,22 @@
"COMMAND_FUNCTION"
"COMMAND_FUNCTION_CODE"
"COMMITTED"
- "CONDITION_NUMBER"
"CONDITIONAL"
+ "CONDITION_NUMBER"
"CONNECTION"
"CONNECTION_NAME"
"CONSTRAINT_CATALOG"
"CONSTRAINT_NAME"
- "CONSTRAINT_SCHEMA"
"CONSTRAINTS"
+ "CONSTRAINT_SCHEMA"
"CONSTRUCTOR"
"CONTINUE"
"CURSOR_NAME"
"DATA"
"DATABASE"
- "DATABASES"
"DATETIME_INTERVAL_CODE"
"DATETIME_INTERVAL_PRECISION"
+ "DAYS"
"DECADE"
"DEFAULTS"
"DEFERRABLE"
@@ -169,7 +169,6 @@
"EXCEPTION"
"EXCLUDE"
"EXCLUDING"
- "EXTENDED"
"FINAL"
"FIRST"
"FOLLOWING"
@@ -177,7 +176,6 @@
"FORTRAN"
"FOUND"
"FRAC_SECOND"
- "FUNCTIONS"
"G"
"GENERAL"
"GENERATED"
@@ -186,6 +184,7 @@
"GOTO"
"GRANTED"
"HIERARCHY"
+ "HOURS"
"IGNORE"
"IMMEDIATE"
"IMMEDIATELY"
@@ -198,8 +197,8 @@
"INSTANTIABLE"
"INVOKER"
"ISODOW"
- "ISOYEAR"
"ISOLATION"
+ "ISOYEAR"
"JAVA"
"JSON"
"K"
@@ -213,15 +212,18 @@
"LIBRARY"
"LOCATOR"
"M"
+ "MAP"
"MATCHED"
"MAXVALUE"
- "MICROSECOND"
"MESSAGE_LENGTH"
"MESSAGE_OCTET_LENGTH"
"MESSAGE_TEXT"
- "MILLISECOND"
+ "MICROSECOND"
"MILLENNIUM"
+ "MILLISECOND"
+ "MINUTES"
"MINVALUE"
+ "MONTHS"
"MORE_"
"MUMPS"
"NAME"
@@ -265,7 +267,6 @@
"QUARTER"
"READ"
"RELATIVE"
- "RENAME"
"REPEATABLE"
"REPLACE"
"RESPECT"
@@ -289,6 +290,7 @@
"SCOPE_CATALOGS"
"SCOPE_NAME"
"SCOPE_SCHEMA"
+ "SECONDS"
"SECTION"
"SECURITY"
"SELF"
@@ -329,8 +331,8 @@
"SQL_INTERVAL_YEAR"
"SQL_INTERVAL_YEAR_TO_MONTH"
"SQL_LONGVARBINARY"
- "SQL_LONGVARNCHAR"
"SQL_LONGVARCHAR"
+ "SQL_LONGVARNCHAR"
"SQL_NCHAR"
"SQL_NCLOB"
"SQL_NUMERIC"
@@ -358,7 +360,6 @@
"STYLE"
"SUBCLASS_ORIGIN"
"SUBSTITUTE"
- "TABLES"
"TABLE_NAME"
"TEMPORARY"
"TIES"
@@ -374,6 +375,7 @@
"TRIGGER_CATALOG"
"TRIGGER_NAME"
"TRIGGER_SCHEMA"
+ "TUMBLE"
"TYPE"
"UNBOUNDED"
"UNCOMMITTED"
@@ -381,45 +383,54 @@
"UNDER"
"UNNAMED"
"USAGE"
- "USE"
"USER_DEFINED_TYPE_CATALOG"
"USER_DEFINED_TYPE_CODE"
"USER_DEFINED_TYPE_NAME"
"USER_DEFINED_TYPE_SCHEMA"
- "UTF8"
"UTF16"
"UTF32"
+ "UTF8"
"VERSION"
"VIEW"
"WEEK"
- "WRAPPER"
"WORK"
+ "WRAPPER"
"WRITE"
"XML"
- "ZONE",
+ "YEARS"
+ "ZONE"
+ ]
+ # List of non-reserved keywords to add;
+ # items in this list become non-reserved
+ nonReservedKeywordsToAdd: [
# not in core, added in Flink
- "PARTITIONED",
- "IF",
+ "IF"
"OVERWRITE"
+ "PARTITIONED"
+ ]
+
+ # List of non-reserved keywords to remove;
+ # items in this list become reserved
+ nonReservedKeywordsToRemove: [
]
# List of methods for parsing custom SQL statements.
# Return type of method implementation should be 'SqlNode'.
# Example: SqlShowDatabases(), SqlShowTables().
statementParserMethods: [
- "RichSqlInsert()",
- "SqlShowCatalogs()",
- "SqlDescribeCatalog()",
- "SqlUseCatalog()",
- "SqlShowDatabases()",
- "SqlUseDatabase()",
- "SqlAlterDatabase()",
- "SqlDescribeDatabase()",
- "SqlAlterFunction()",
- "SqlShowFunctions()",
- "SqlShowTables()",
- "SqlRichDescribeTable()",
+ "RichSqlInsert()"
+ "SqlShowCatalogs()"
+ "SqlDescribeCatalog()"
+ "SqlUseCatalog()"
+ "SqlShowDatabases()"
+ "SqlUseDatabase()"
+ "SqlAlterDatabase()"
+ "SqlDescribeDatabase()"
+ "SqlAlterFunction()"
+ "SqlShowFunctions()"
+ "SqlShowTables()"
+ "SqlRichDescribeTable()"
"SqlAlterTable()"
]
@@ -433,9 +444,9 @@
# Return type of method implementation should be "SqlTypeNameSpec".
# Example: SqlParseTimeStampZ().
dataTypeParserMethods: [
- "ExtendedSqlBasicTypeName()",
- "CustomizedCollectionsTypeName()",
- "SqlMapTypeName()",
+ "ExtendedSqlBasicTypeName()"
+ "CustomizedCollectionsTypeName()"
+ "SqlMapTypeName()"
"ExtendedSqlRowTypeName()"
]
@@ -454,18 +465,18 @@
# List of methods for parsing extensions to "CREATE [OR REPLACE]" calls.
# Each must accept arguments "(SqlParserPos pos, boolean replace)".
createStatementParserMethods: [
- "SqlCreateCatalog",
- "SqlCreateTable",
- "SqlCreateView",
- "SqlCreateDatabase",
+ "SqlCreateCatalog"
+ "SqlCreateTable"
+ "SqlCreateView"
+ "SqlCreateDatabase"
"SqlCreateFunction"
]
# List of methods for parsing extensions to "DROP" calls.
# Each must accept arguments "(Span s)".
dropStatementParserMethods: [
- "SqlDropTable",
- "SqlDropView",
+ "SqlDropTable"
+ "SqlDropView"
"SqlDropDatabase"
"SqlDropFunction"
]
diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
index a93ef66..96a25a4 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -119,7 +119,10 @@ SqlCreate SqlCreateDatabase(Span s, boolean replace) :
}
{
<DATABASE> { startPos = getPos(); }
- [ <IF> <NOT> <EXISTS> { ifNotExists = true; } ]
+ [
+ LOOKAHEAD(3)
+ <IF> <NOT> <EXISTS> { ifNotExists = true; }
+ ]
databaseName = CompoundIdentifier()
[ <COMMENT> <QUOTED_STRING>
{
@@ -217,7 +220,10 @@ SqlCreate SqlCreateFunction(Span s, boolean replace) :
<FUNCTION>
- [ <IF> <NOT> <EXISTS> { ifNotExists = true; } ]
+ [
+ LOOKAHEAD(3)
+ <IF> <NOT> <EXISTS> { ifNotExists = true; }
+ ]
functionIdentifier = CompoundIdentifier()
@@ -248,12 +254,13 @@ SqlDrop SqlDropFunction(Span s, boolean replace) :
boolean isSystemFunction = false;
}
{
- [ <TEMPORARY> {isTemporary = true;}
+ [
+ <TEMPORARY> {isTemporary = true;}
[ <SYSTEM> { isSystemFunction = true; } ]
]
<FUNCTION>
- [ <IF> <EXISTS> { ifExists = true; } ]
+ [ LOOKAHEAD(2) <IF> <EXISTS> { ifExists = true; } ]
functionIdentifier = CompoundIdentifier()
@@ -281,7 +288,7 @@ SqlAlterFunction SqlAlterFunction() :
<FUNCTION> { startPos = getPos(); }
- [ <IF> <EXISTS> { ifExists = true; } ]
+ [ LOOKAHEAD(2) <IF> <EXISTS> { ifExists = true; } ]
functionIdentifier = CompoundIdentifier()
@@ -806,7 +813,7 @@ SqlTypeNameSpec ExtendedSqlBasicTypeName() :
}
)
{
- return new ExtendedSqlBasicTypeNameSpec(typeAlias, typeName, precision, getPos());
+ return new SqlAlienSystemTypeNameSpec(typeAlias, typeName, precision, getPos());
}
}
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/calcite/sql/package-info.java b/flink-table/flink-sql-parser/src/main/java/org/apache/calcite/sql/package-info.java
deleted file mode 100644
index 439912b..0000000
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/calcite/sql/package-info.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * This package is needed because the constructor of SqlTypeNameSpec is package scope,
- * we should merge this package into org.apache.flink.sql.parser.type when CALCITE-3360
- * is resolved.
- */
-@PackageMarker
-package org.apache.calcite.sql;
-
-import org.apache.calcite.avatica.util.PackageMarker;
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
index c4c7f11..19839a0 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
@@ -20,8 +20,8 @@ package org.apache.flink.sql.parser.ddl;
import org.apache.flink.sql.parser.ExtendedSqlNode;
import org.apache.flink.sql.parser.error.SqlValidateException;
+import org.apache.flink.sql.parser.type.ExtendedSqlRowTypeNameSpec;
-import org.apache.calcite.sql.ExtendedSqlRowTypeNameSpec;
import org.apache.calcite.sql.SqlBasicCall;
import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlCharStringLiteral;
@@ -69,10 +69,8 @@ public class SqlCreateTable extends SqlCreate implements ExtendedSqlNode {
private final SqlNodeList partitionKeyList;
- @Nullable
private final SqlWatermark watermark;
- @Nullable
private final SqlCharStringLiteral comment;
public SqlCreateTable(
@@ -83,8 +81,8 @@ public class SqlCreateTable extends SqlCreate implements ExtendedSqlNode {
List<SqlNodeList> uniqueKeysList,
SqlNodeList propertyList,
SqlNodeList partitionKeyList,
- SqlWatermark watermark,
- SqlCharStringLiteral comment) {
+ @Nullable SqlWatermark watermark,
+ @Nullable SqlCharStringLiteral comment) {
super(OPERATOR, pos, false, false);
this.tableName = requireNonNull(tableName, "tableName should not be null");
this.columnList = requireNonNull(columnList, "columnList should not be null");
@@ -219,10 +217,12 @@ public class SqlCreateTable extends SqlCreate implements ExtendedSqlNode {
* have been reversed.
*/
public String getColumnSqlString() {
- SqlPrettyWriter writer = new SqlPrettyWriter(AnsiSqlDialect.DEFAULT);
- writer.setAlwaysUseParentheses(true);
- writer.setSelectListItemsOnSeparateLines(false);
- writer.setIndentation(0);
+ SqlPrettyWriter writer = new SqlPrettyWriter(
+ SqlPrettyWriter.config()
+ .withDialect(AnsiSqlDialect.DEFAULT)
+ .withAlwaysUseParentheses(true)
+ .withSelectListItemsOnSeparateLines(false)
+ .withIndentation(0));
writer.startList("", "");
for (SqlNode column : columnList) {
writer.sep(",");
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/ExtendedSqlBasicTypeNameSpec.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/ExtendedSqlBasicTypeNameSpec.java
deleted file mode 100644
index ec81a0d..0000000
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/ExtendedSqlBasicTypeNameSpec.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.sql.parser.type;
-
-import org.apache.calcite.sql.SqlBasicTypeNameSpec;
-import org.apache.calcite.sql.SqlWriter;
-import org.apache.calcite.sql.parser.SqlParserPos;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * A sql type name specification of extended basic data type, it has a counterpart basic
- * sql type name but always represents as a special alias in Flink.
- *
- * <p>For example, STRING is synonym of VARCHAR(INT_MAX)
- * and BYTES is synonym of VARBINARY(INT_MAX).
- */
-public class ExtendedSqlBasicTypeNameSpec extends SqlBasicTypeNameSpec {
- // Type alias used for unparsing.
- private final String typeAlias;
-
- /**
- * Creates a {@code ExtendedSqlBuiltinTypeNameSpec} instance.
- *
- * @param typeName type name
- * @param precision type precision
- * @param pos parser position
- */
- public ExtendedSqlBasicTypeNameSpec(
- String typeAlias,
- SqlTypeName typeName,
- int precision,
- SqlParserPos pos) {
- super(typeName, precision, pos);
- this.typeAlias = typeAlias;
- }
-
- @Override
- public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
- writer.keyword(typeAlias);
- }
-}
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/calcite/sql/ExtendedSqlRowTypeNameSpec.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/ExtendedSqlRowTypeNameSpec.java
similarity index 93%
rename from flink-table/flink-sql-parser/src/main/java/org/apache/calcite/sql/ExtendedSqlRowTypeNameSpec.java
rename to flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/ExtendedSqlRowTypeNameSpec.java
index 42828d8..37e841b 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/calcite/sql/ExtendedSqlRowTypeNameSpec.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/ExtendedSqlRowTypeNameSpec.java
@@ -16,10 +16,16 @@
* limitations under the License.
*/
-package org.apache.calcite.sql;
+package org.apache.flink.sql.parser.type;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.sql.SqlCharStringLiteral;
+import org.apache.calcite.sql.SqlDataTypeSpec;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlRowTypeNameSpec;
+import org.apache.calcite.sql.SqlTypeNameSpec;
+import org.apache.calcite.sql.SqlWriter;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.sql.validate.SqlValidator;
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/calcite/sql/SqlMapTypeNameSpec.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlMapTypeNameSpec.java
similarity index 93%
rename from flink-table/flink-sql-parser/src/main/java/org/apache/calcite/sql/SqlMapTypeNameSpec.java
rename to flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlMapTypeNameSpec.java
index 0e22dbf..5725456 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/calcite/sql/SqlMapTypeNameSpec.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlMapTypeNameSpec.java
@@ -16,9 +16,13 @@
* limitations under the License.
*/
-package org.apache.calcite.sql;
+package org.apache.flink.sql.parser.type;
import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.sql.SqlDataTypeSpec;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlTypeNameSpec;
+import org.apache.calcite.sql.SqlWriter;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.sql.validate.SqlValidator;
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/validate/FlinkSqlConformance.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/validate/FlinkSqlConformance.java
index d53b050..cf32026 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/validate/FlinkSqlConformance.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/validate/FlinkSqlConformance.java
@@ -146,6 +146,16 @@ public enum FlinkSqlConformance implements SqlConformance {
return false;
}
+ @Override
+ public boolean allowPluralTimeUnits() {
+ return false;
+ }
+
+ @Override
+ public boolean allowQualifyingCommonColumn() {
+ return true;
+ }
+
/**
* Whether to allow "create table T(i int, j int) partitioned by (i)" grammar.
*/
diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkDDLDataTypeTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkDDLDataTypeTest.java
index f5edc4f..5e0e0e5 100644
--- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkDDLDataTypeTest.java
+++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkDDLDataTypeTest.java
@@ -44,6 +44,7 @@ import org.apache.calcite.sql.parser.SqlParserTest;
import org.apache.calcite.sql.parser.SqlParserUtil;
import org.apache.calcite.sql.pretty.SqlPrettyWriter;
import org.apache.calcite.sql.test.SqlTestFactory;
+import org.apache.calcite.sql.test.SqlTests;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.sql.validate.SqlConformance;
import org.apache.calcite.sql.validate.SqlConformanceEnum;
@@ -51,7 +52,6 @@ import org.apache.calcite.sql.validate.SqlValidator;
import org.apache.calcite.sql.validate.SqlValidatorCatalogReader;
import org.apache.calcite.sql.validate.SqlValidatorUtil;
import org.apache.calcite.test.MockSqlOperatorTable;
-import org.apache.calcite.test.SqlValidatorTestCase;
import org.apache.calcite.test.catalog.MockCatalogReaderSimple;
import org.apache.calcite.util.SourceStringReader;
import org.apache.calcite.util.Util;
@@ -439,7 +439,7 @@ public class FlinkDDLDataTypeTest {
private void checkEx(String expectedMsgPattern,
SqlParserUtil.StringAndPos sap,
Throwable thrown) {
- SqlValidatorTestCase.checkEx(thrown, expectedMsgPattern, sap);
+ SqlTests.checkEx(thrown, expectedMsgPattern, sap, SqlTests.Stage.VALIDATE);
}
}
diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
index 3057324..0509e91 100644
--- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
+++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
@@ -37,6 +37,7 @@ import org.junit.Before;
import org.junit.Test;
import java.io.Reader;
+import java.util.function.UnaryOperator;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
@@ -51,9 +52,10 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
return FlinkSqlParserImpl.FACTORY;
}
- protected SqlParser getSqlParser(Reader source) {
+ protected SqlParser getSqlParser(Reader source,
+ UnaryOperator<SqlParser.ConfigBuilder> transform) {
if (conformance0 == null) {
- return super.getSqlParser(source);
+ return super.getSqlParser(source, transform);
} else {
// overwrite the default sql conformance.
return SqlParser.create(source,
@@ -75,12 +77,12 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
@Test
public void testShowCatalogs() {
- check("show catalogs", "SHOW CATALOGS");
+ sql("show catalogs").ok("SHOW CATALOGS");
}
@Test
public void testDescribeCatalog() {
- check("describe catalog a", "DESCRIBE CATALOG `A`");
+ sql("describe catalog a").ok("DESCRIBE CATALOG `A`");
}
/**
@@ -92,7 +94,7 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
@Test
public void testUseCatalog() {
- check("use catalog a", "USE CATALOG `A`");
+ sql("use catalog a").ok("USE CATALOG `A`");
}
@Test
@@ -112,91 +114,94 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
@Test
public void testShowDataBases() {
- check("show databases", "SHOW DATABASES");
+ sql("show databases").ok("SHOW DATABASES");
}
@Test
public void testUseDataBase() {
- check("use default_db", "USE `DEFAULT_DB`");
- check("use defaultCatalog.default_db", "USE `DEFAULTCATALOG`.`DEFAULT_DB`");
+ sql("use default_db").ok("USE `DEFAULT_DB`");
+ sql("use defaultCatalog.default_db").ok("USE `DEFAULTCATALOG`.`DEFAULT_DB`");
}
@Test
public void testCreateDatabase() {
- check("create database db1", "CREATE DATABASE `DB1`");
- check("create database if not exists db1", "CREATE DATABASE IF NOT EXISTS `DB1`");
- check("create database catalog1.db1", "CREATE DATABASE `CATALOG1`.`DB1`");
- check("create database db1 comment 'test create database'",
- "CREATE DATABASE `DB1`\n" +
- "COMMENT 'test create database'");
- check("create database db1 comment 'test create database'" +
- "with ( 'key1' = 'value1', 'key2.a' = 'value2.a')",
- "CREATE DATABASE `DB1`\n" +
- "COMMENT 'test create database' WITH (\n" +
- " 'key1' = 'value1',\n" +
- " 'key2.a' = 'value2.a'\n" +
- ")");
+ sql("create database db1").ok("CREATE DATABASE `DB1`");
+ sql("create database if not exists db1").ok("CREATE DATABASE IF NOT EXISTS `DB1`");
+ sql("create database catalog1.db1").ok("CREATE DATABASE `CATALOG1`.`DB1`");
+ final String sql = "create database db1 comment 'test create database'";
+ final String expected = "CREATE DATABASE `DB1`\n"
+ + "COMMENT 'test create database'";
+ sql(sql).ok(expected);
+ final String sql1 = "create database db1 comment 'test create database'"
+ + "with ( 'key1' = 'value1', 'key2.a' = 'value2.a')";
+ final String expected1 = "CREATE DATABASE `DB1`\n"
+ + "COMMENT 'test create database' WITH (\n"
+ + " 'key1' = 'value1',\n"
+ + " 'key2.a' = 'value2.a'\n"
+ + ")";
+ sql(sql1).ok(expected1);
}
@Test
public void testDropDatabase() {
- check("drop database db1", "DROP DATABASE `DB1` RESTRICT");
- check("drop database catalog1.db1", "DROP DATABASE `CATALOG1`.`DB1` RESTRICT");
- check("drop database db1 RESTRICT", "DROP DATABASE `DB1` RESTRICT");
- check("drop database db1 CASCADE", "DROP DATABASE `DB1` CASCADE");
+ sql("drop database db1").ok("DROP DATABASE `DB1` RESTRICT");
+ sql("drop database catalog1.db1").ok("DROP DATABASE `CATALOG1`.`DB1` RESTRICT");
+ sql("drop database db1 RESTRICT").ok("DROP DATABASE `DB1` RESTRICT");
+ sql("drop database db1 CASCADE").ok("DROP DATABASE `DB1` CASCADE");
}
@Test
public void testAlterDatabase() {
- check("alter database db1 set ('key1' = 'value1','key2.a' = 'value2.a')",
- "ALTER DATABASE `DB1` SET (\n" +
- " 'key1' = 'value1',\n" +
- " 'key2.a' = 'value2.a'\n" +
- ")");
+ final String sql = "alter database db1 set ('key1' = 'value1','key2.a' = 'value2.a')";
+ final String expected = "ALTER DATABASE `DB1` SET (\n"
+ + " 'key1' = 'value1',\n"
+ + " 'key2.a' = 'value2.a'\n"
+ + ")";
+ sql(sql).ok(expected);
}
@Test
public void testDescribeDatabase() {
- check("describe database db1", "DESCRIBE DATABASE `DB1`");
- check("describe database catlog1.db1", "DESCRIBE DATABASE `CATLOG1`.`DB1`");
- check("describe database extended db1", "DESCRIBE DATABASE EXTENDED `DB1`");
+ sql("describe database db1").ok("DESCRIBE DATABASE `DB1`");
+ sql("describe database catlog1.db1").ok("DESCRIBE DATABASE `CATLOG1`.`DB1`");
+ sql("describe database extended db1").ok("DESCRIBE DATABASE EXTENDED `DB1`");
}
@Test
public void testAlterFunction() {
- check("alter function function1 as 'org.apache.fink.function.function1'",
- "ALTER FUNCTION `FUNCTION1` AS 'org.apache.fink.function.function1'");
+ sql("alter function function1 as 'org.apache.fink.function.function1'")
+ .ok("ALTER FUNCTION `FUNCTION1` AS 'org.apache.fink.function.function1'");
- check("alter temporary function function1 as 'org.apache.fink.function.function1'",
- "ALTER TEMPORARY FUNCTION `FUNCTION1` AS 'org.apache.fink.function.function1'");
+ sql("alter temporary function function1 as 'org.apache.fink.function.function1'")
+ .ok("ALTER TEMPORARY FUNCTION `FUNCTION1` AS 'org.apache.fink.function.function1'");
- check("alter temporary function function1 as 'org.apache.fink.function.function1' language scala",
- "ALTER TEMPORARY FUNCTION `FUNCTION1` AS 'org.apache.fink.function.function1' LANGUAGE SCALA");
+ sql("alter temporary function function1 as 'org.apache.fink.function.function1' language scala")
+ .ok("ALTER TEMPORARY FUNCTION `FUNCTION1` AS 'org.apache.fink.function.function1' LANGUAGE SCALA");
- check ("alter temporary system function function1 as 'org.apache.fink.function.function1'",
- "ALTER TEMPORARY SYSTEM FUNCTION `FUNCTION1` AS 'org.apache.fink.function.function1'");
+ sql("alter temporary system function function1 as 'org.apache.fink.function.function1'")
+ .ok("ALTER TEMPORARY SYSTEM FUNCTION `FUNCTION1` AS 'org.apache.fink.function.function1'");
- check("alter temporary system function function1 as 'org.apache.fink.function.function1' language java",
- "ALTER TEMPORARY SYSTEM FUNCTION `FUNCTION1` AS 'org.apache.fink.function.function1' LANGUAGE JAVA");
+ sql("alter temporary system function function1 as 'org.apache.fink.function.function1' language java")
+ .ok("ALTER TEMPORARY SYSTEM FUNCTION `FUNCTION1` AS 'org.apache.fink.function.function1' LANGUAGE JAVA");
}
@Test
public void testShowFuntions() {
- check("show functions", "SHOW FUNCTIONS");
- check("show functions db1", "SHOW FUNCTIONS `DB1`");
- check("show functions catalog1.db1", "SHOW FUNCTIONS `CATALOG1`.`DB1`");
+ sql("show functions").ok("SHOW FUNCTIONS");
+ sql("show functions db1").ok("SHOW FUNCTIONS `DB1`");
+ sql("show functions catalog1.db1").ok("SHOW FUNCTIONS `CATALOG1`.`DB1`");
}
@Test
public void testShowTables() {
- check("show tables", "SHOW TABLES");
+ sql("show tables").ok("SHOW TABLES");
}
@Test
public void testDescribeTable() {
- check("describe tbl", "DESCRIBE `TBL`");
- check("describe catlog1.db1.tbl", "DESCRIBE `CATLOG1`.`DB1`.`TBL`");
- check("describe extended db1", "DESCRIBE EXTENDED `DB1`");
+ sql("describe tbl").ok("DESCRIBE `TBL`");
+ sql("describe catlog1.db1.tbl").ok("DESCRIBE `CATLOG1`.`DB1`.`TBL`");
+ sql("describe extended db1").ok("DESCRIBE EXTENDED `DB1`");
}
/**
@@ -208,18 +213,19 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
@Test
public void testAlterTable() {
- check("alter table t1 rename to t2", "ALTER TABLE `T1` RENAME TO `T2`");
- check("alter table c1.d1.t1 rename to t2", "ALTER TABLE `C1`.`D1`.`T1` RENAME TO `T2`");
- check("alter table t1 set ('key1'='value1')",
- "ALTER TABLE `T1` SET (\n" +
- " 'key1' = 'value1'\n" +
- ")");
+ sql("alter table t1 rename to t2").ok("ALTER TABLE `T1` RENAME TO `T2`");
+ sql("alter table c1.d1.t1 rename to t2").ok("ALTER TABLE `C1`.`D1`.`T1` RENAME TO `T2`");
+ final String sql = "alter table t1 set ('key1'='value1')";
+ final String expected = "ALTER TABLE `T1` SET (\n"
+ + " 'key1' = 'value1'\n"
+ + ")";
+ sql(sql).ok(expected);
}
@Test
public void testCreateTable() {
conformance0 = FlinkSqlConformance.HIVE;
- check("CREATE TABLE tbl1 (\n" +
+ final String sql = "CREATE TABLE tbl1 (\n" +
" a bigint,\n" +
" h varchar, \n" +
" g as 2 * (a + 1), \n" +
@@ -232,8 +238,8 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
" with (\n" +
" 'connector' = 'kafka', \n" +
" 'kafka.topic' = 'log.test'\n" +
- ")\n",
- "CREATE TABLE `TBL1` (\n" +
+ ")\n";
+ final String expected = "CREATE TABLE `TBL1` (\n" +
" `A` BIGINT,\n" +
" `H` VARCHAR,\n" +
" `G` AS (2 * (`A` + 1)),\n" +
@@ -246,13 +252,14 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
"WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'kafka.topic' = 'log.test'\n" +
- ")");
+ ")";
+ sql(sql).ok(expected);
}
@Test
public void testCreateTableWithComment() {
conformance0 = FlinkSqlConformance.HIVE;
- check("CREATE TABLE tbl1 (\n" +
+ final String sql = "CREATE TABLE tbl1 (\n" +
" a bigint comment 'test column comment AAA.',\n" +
" h varchar, \n" +
" g as 2 * (a + 1), \n" +
@@ -266,8 +273,8 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
" with (\n" +
" 'connector' = 'kafka', \n" +
" 'kafka.topic' = 'log.test'\n" +
- ")\n",
- "CREATE TABLE `TBL1` (\n" +
+ ")\n";
+ final String expected = "CREATE TABLE `TBL1` (\n" +
" `A` BIGINT COMMENT 'test column comment AAA.',\n" +
" `H` VARCHAR,\n" +
" `G` AS (2 * (`A` + 1)),\n" +
@@ -281,13 +288,14 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
"WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'kafka.topic' = 'log.test'\n" +
- ")");
+ ")";
+ sql(sql).ok(expected);
}
@Test
public void testCreateTableWithPrimaryKeyAndUniqueKey() {
conformance0 = FlinkSqlConformance.HIVE;
- check("CREATE TABLE tbl1 (\n" +
+ final String sql = "CREATE TABLE tbl1 (\n" +
" a bigint comment 'test column comment AAA.',\n" +
" h varchar, \n" +
" g as 2 * (a + 1), \n" +
@@ -302,8 +310,8 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
" with (\n" +
" 'connector' = 'kafka', \n" +
" 'kafka.topic' = 'log.test'\n" +
- ")\n",
- "CREATE TABLE `TBL1` (\n" +
+ ")\n";
+ final String expected = "CREATE TABLE `TBL1` (\n" +
" `A` BIGINT COMMENT 'test column comment AAA.',\n" +
" `H` VARCHAR,\n" +
" `G` AS (2 * (`A` + 1)),\n" +
@@ -318,12 +326,13 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
"WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'kafka.topic' = 'log.test'\n" +
- ")");
+ ")";
+ sql(sql).ok(expected);
}
@Test
public void testCreateTableWithWatermark() {
- String sql = "CREATE TABLE tbl1 (\n" +
+ final String sql = "CREATE TABLE tbl1 (\n" +
" ts timestamp(3),\n" +
" id varchar, \n" +
" watermark FOR ts AS ts - interval '3' second\n" +
@@ -332,20 +341,20 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
" 'connector' = 'kafka', \n" +
" 'kafka.topic' = 'log.test'\n" +
")\n";
- check(sql,
- "CREATE TABLE `TBL1` (\n" +
+ final String expected = "CREATE TABLE `TBL1` (\n" +
" `TS` TIMESTAMP(3),\n" +
" `ID` VARCHAR,\n" +
" WATERMARK FOR `TS` AS (`TS` - INTERVAL '3' SECOND)\n" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'kafka.topic' = 'log.test'\n" +
- ")");
+ ")";
+ sql(sql).ok(expected);
}
@Test
public void testCreateTableWithWatermarkOnComputedColumn() {
- String sql = "CREATE TABLE tbl1 (\n" +
+ final String sql = "CREATE TABLE tbl1 (\n" +
" log_ts varchar,\n" +
" ts as to_timestamp(log_ts), \n" +
" WATERMARK FOR ts AS ts + interval '1' second\n" +
@@ -354,34 +363,35 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
" 'connector' = 'kafka', \n" +
" 'kafka.topic' = 'log.test'\n" +
")\n";
- check(sql,
- "CREATE TABLE `TBL1` (\n" +
+ final String expected = "CREATE TABLE `TBL1` (\n" +
" `LOG_TS` VARCHAR,\n" +
" `TS` AS `TO_TIMESTAMP`(`LOG_TS`),\n" +
" WATERMARK FOR `TS` AS (`TS` + INTERVAL '1' SECOND)\n" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'kafka.topic' = 'log.test'\n" +
- ")");
+ ")";
+ sql(sql).ok(expected);
}
@Test
public void testCreateTableWithWatermarkOnNestedField() {
- check("CREATE TABLE tbl1 (\n" +
+ final String sql = "CREATE TABLE tbl1 (\n" +
" f1 row<q1 bigint, q2 row<t1 timestamp, t2 varchar>, q3 boolean>,\n" +
" WATERMARK FOR f1.q2.t1 AS NOW()\n" +
")\n" +
" with (\n" +
" 'connector' = 'kafka', \n" +
" 'kafka.topic' = 'log.test'\n" +
- ")\n",
- "CREATE TABLE `TBL1` (\n" +
+ ")\n";
+ final String expected = "CREATE TABLE `TBL1` (\n" +
" `F1` ROW< `Q1` BIGINT, `Q2` ROW< `T1` TIMESTAMP, `T2` VARCHAR >, `Q3` BOOLEAN >,\n" +
" WATERMARK FOR `F1`.`Q2`.`T1` AS `NOW`()\n" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'kafka.topic' = 'log.test'\n" +
- ")");
+ ")";
+ sql(sql).ok(expected);
}
@Test
@@ -434,48 +444,52 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
@Test
public void testCreateTableWithComplexType() {
- check("CREATE TABLE tbl1 (\n" +
- " a ARRAY<bigint>, \n" +
- " b MAP<int, varchar>,\n" +
- " c ROW<cc0 int, cc1 float, cc2 varchar>,\n" +
- " d MULTISET<varchar>,\n" +
- " PRIMARY KEY (a, b) \n" +
- ") with (\n" +
- " 'x' = 'y', \n" +
- " 'asd' = 'data'\n" +
- ")\n", "CREATE TABLE `TBL1` (\n" +
- " `A` ARRAY< BIGINT >,\n" +
- " `B` MAP< INTEGER, VARCHAR >,\n" +
- " `C` ROW< `CC0` INTEGER, `CC1` FLOAT, `CC2` VARCHAR >,\n" +
- " `D` MULTISET< VARCHAR >,\n" +
- " PRIMARY KEY (`A`, `B`)\n" +
- ") WITH (\n" +
- " 'x' = 'y',\n" +
- " 'asd' = 'data'\n" +
- ")");
+ final String sql = "CREATE TABLE tbl1 (\n" +
+ " a ARRAY<bigint>, \n" +
+ " b MAP<int, varchar>,\n" +
+ " c ROW<cc0 int, cc1 float, cc2 varchar>,\n" +
+ " d MULTISET<varchar>,\n" +
+ " PRIMARY KEY (a, b) \n" +
+ ") with (\n" +
+ " 'x' = 'y', \n" +
+ " 'asd' = 'data'\n" +
+ ")\n";
+ final String expected = "CREATE TABLE `TBL1` (\n" +
+ " `A` ARRAY< BIGINT >,\n" +
+ " `B` MAP< INTEGER, VARCHAR >,\n" +
+ " `C` ROW< `CC0` INTEGER, `CC1` FLOAT, `CC2` VARCHAR >,\n" +
+ " `D` MULTISET< VARCHAR >,\n" +
+ " PRIMARY KEY (`A`, `B`)\n" +
+ ") WITH (\n" +
+ " 'x' = 'y',\n" +
+ " 'asd' = 'data'\n" +
+ ")";
+ sql(sql).ok(expected);
}
@Test
public void testCreateTableWithNestedComplexType() {
- check("CREATE TABLE tbl1 (\n" +
- " a ARRAY<ARRAY<bigint>>, \n" +
- " b MAP<MAP<int, varchar>, ARRAY<varchar>>,\n" +
- " c ROW<cc0 ARRAY<int>, cc1 float, cc2 varchar>,\n" +
- " d MULTISET<ARRAY<int>>,\n" +
- " PRIMARY KEY (a, b) \n" +
- ") with (\n" +
- " 'x' = 'y', \n" +
- " 'asd' = 'data'\n" +
- ")\n", "CREATE TABLE `TBL1` (\n" +
- " `A` ARRAY< ARRAY< BIGINT > >,\n" +
- " `B` MAP< MAP< INTEGER, VARCHAR >, ARRAY< VARCHAR > >,\n" +
- " `C` ROW< `CC0` ARRAY< INTEGER >, `CC1` FLOAT, `CC2` VARCHAR >,\n" +
- " `D` MULTISET< ARRAY< INTEGER > >,\n" +
- " PRIMARY KEY (`A`, `B`)\n" +
- ") WITH (\n" +
- " 'x' = 'y',\n" +
- " 'asd' = 'data'\n" +
- ")");
+ final String sql = "CREATE TABLE tbl1 (\n" +
+ " a ARRAY<ARRAY<bigint>>, \n" +
+ " b MAP<MAP<int, varchar>, ARRAY<varchar>>,\n" +
+ " c ROW<cc0 ARRAY<int>, cc1 float, cc2 varchar>,\n" +
+ " d MULTISET<ARRAY<int>>,\n" +
+ " PRIMARY KEY (a, b) \n" +
+ ") with (\n" +
+ " 'x' = 'y', \n" +
+ " 'asd' = 'data'\n" +
+ ")\n";
+ final String expected = "CREATE TABLE `TBL1` (\n" +
+ " `A` ARRAY< ARRAY< BIGINT > >,\n" +
+ " `B` MAP< MAP< INTEGER, VARCHAR >, ARRAY< VARCHAR > >,\n" +
+ " `C` ROW< `CC0` ARRAY< INTEGER >, `CC1` FLOAT, `CC2` VARCHAR >,\n" +
+ " `D` MULTISET< ARRAY< INTEGER > >,\n" +
+ " PRIMARY KEY (`A`, `B`)\n" +
+ ") WITH (\n" +
+ " 'x' = 'y',\n" +
+ " 'asd' = 'data'\n" +
+ ")";
+ sql(sql).ok(expected);
}
@Test
@@ -494,7 +508,7 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
" 'k1' = 'v1',\n" +
" 'k2' = 'v2'\n" +
")";
- check(sql, expected);
+ sql(sql).ok(expected);
}
@Test
@@ -530,7 +544,7 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
@Test
public void testColumnSqlString() {
- String sql = "CREATE TABLE sls_stream (\n" +
+ final String sql = "CREATE TABLE sls_stream (\n" +
" a bigint, \n" +
" f as a + 1, \n" +
" b varchar,\n" +
@@ -542,7 +556,7 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
" 'x' = 'y', \n" +
" 'asd' = 'data'\n" +
")\n";
- String expected = "`A`, (`A` + 1) AS `F`, `B`, "
+ final String expected = "`A`, (`A` + 1) AS `F`, `B`, "
+ "`TOTIMESTAMP`(`B`, 'yyyy-MM-dd HH:mm:ss') AS `TS`, "
+ "`PROCTIME`() AS `PROC`, `C`";
sql(sql).node(new ValidationMatcher()
@@ -552,7 +566,7 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
@Test
public void testCreateInvalidPartitionedTable() {
conformance0 = FlinkSqlConformance.HIVE;
- String sql = "create table sls_stream1(\n" +
+ final String sql = "create table sls_stream1(\n" +
" a bigint,\n" +
" b VARCHAR,\n" +
" PRIMARY KEY(a, b)\n" +
@@ -567,7 +581,7 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
@Test
public void testNotAllowedCreatePartition() {
conformance0 = FlinkSqlConformance.DEFAULT;
- String sql = "create table sls_stream1(\n" +
+ final String sql = "create table sls_stream1(\n" +
" a bigint,\n" +
" b VARCHAR\n" +
") PARTITIONED BY (a^)^ with ( 'x' = 'y', 'asd' = 'dada')";
@@ -576,7 +590,7 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
@Test
public void testCreateTableWithMinusInOptionKey() {
- String sql = "create table source_table(\n" +
+ final String sql = "create table source_table(\n" +
" a int,\n" +
" b bigint,\n" +
" c string\n" +
@@ -586,7 +600,7 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
" 'a.b-c-connector.e-f.g' = 'ada',\n" +
" 'a.b-c-d.e-1231.g' = 'ada',\n" +
" 'a.b-c-d.*' = 'adad')\n";
- String expected = "CREATE TABLE `SOURCE_TABLE` (\n" +
+ final String expected = "CREATE TABLE `SOURCE_TABLE` (\n" +
" `A` INTEGER,\n" +
" `B` BIGINT,\n" +
" `C` STRING\n" +
@@ -597,12 +611,12 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
" 'a.b-c-d.e-1231.g' = 'ada',\n" +
" 'a.b-c-d.*' = 'adad'\n" +
")";
- check(sql, expected);
+ sql(sql).ok(expected);
}
@Test
public void testCreateTableWithOptionKeyAsIdentifier() {
- String sql = "create table source_table(\n" +
+ final String sql = "create table source_table(\n" +
" a int,\n" +
" b bigint,\n" +
" c string\n" +
@@ -614,14 +628,16 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
@Test
public void testDropTable() {
- String sql = "DROP table catalog1.db1.tbl1";
- check(sql, "DROP TABLE `CATALOG1`.`DB1`.`TBL1`");
+ final String sql = "DROP table catalog1.db1.tbl1";
+ final String expected = "DROP TABLE `CATALOG1`.`DB1`.`TBL1`";
+ sql(sql).ok(expected);
}
@Test
public void testDropIfExists() {
- String sql = "DROP table IF EXISTS catalog1.db1.tbl1";
- check(sql, "DROP TABLE IF EXISTS `CATALOG1`.`DB1`.`TBL1`");
+ final String sql = "DROP table IF EXISTS catalog1.db1.tbl1";
+ final String expected = "DROP TABLE IF EXISTS `CATALOG1`.`DB1`.`TBL1`";
+ sql(sql).ok(expected);
}
@Test
@@ -658,7 +674,7 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
@Test
public void testInsertExtendedColumnAsStaticPartition1() {
- String expected = "INSERT INTO `EMPS` EXTEND (`Z` BOOLEAN) (`X`, `Y`)\n"
+ final String expected = "INSERT INTO `EMPS` EXTEND (`Z` BOOLEAN) (`X`, `Y`)\n"
+ "PARTITION (`Z` = 'ab')\n"
+ "(SELECT *\n"
+ "FROM `EMPS`)";
@@ -676,17 +692,19 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
@Test
public void testInsertOverwrite() {
// non-partitioned
- check("INSERT OVERWRITE myDB.myTbl SELECT * FROM src",
- "INSERT OVERWRITE `MYDB`.`MYTBL`\n"
+ final String sql = "INSERT OVERWRITE myDB.myTbl SELECT * FROM src";
+ final String expected = "INSERT OVERWRITE `MYDB`.`MYTBL`\n"
+ "(SELECT *\n"
- + "FROM `SRC`)");
+ + "FROM `SRC`)";
+ sql(sql).ok(expected);
// partitioned
- check("INSERT OVERWRITE myTbl PARTITION (p1='v1',p2='v2') SELECT * FROM src",
- "INSERT OVERWRITE `MYTBL`\n"
+ final String sql1 = "INSERT OVERWRITE myTbl PARTITION (p1='v1',p2='v2') SELECT * FROM src";
+ final String expected1 = "INSERT OVERWRITE `MYTBL`\n"
+ "PARTITION (`P1` = 'v1', `P2` = 'v2')\n"
+ "(SELECT *\n"
- + "FROM `SRC`)");
+ + "FROM `SRC`)";
+ sql(sql1).ok(expected1);
}
@Test
@@ -702,7 +720,7 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
"AS\n" +
"SELECT `COL1`\n" +
"FROM `TBL`";
- check(sql, expected);
+ sql(sql).ok(expected);
}
@Test
@@ -713,7 +731,7 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
"AS\n" +
"SELECT `COL1`\n" +
"FROM `TBL`";
- check(sql, expected);
+ sql(sql).ok(expected);
}
@Test
@@ -723,7 +741,7 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
"AS\n" +
"SELECT `COL3`, `COL4`\n" +
"FROM `TBL`";
- check(sql, expected);
+ sql(sql).ok(expected);
}
@Test
@@ -731,31 +749,41 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
final String sql = "create view v(^*^) COMMENT 'this is a view' as select col1 from tbl";
final String expected = "(?s).*Encountered \"\\*\" at line 1, column 15.*";
- checkFails(sql, expected);
+ sql(sql).fails(expected);
}
@Test
public void testDropView() {
final String sql = "DROP VIEW IF EXISTS view_name";
- check(sql, "DROP VIEW IF EXISTS `VIEW_NAME`");
+ final String expected = "DROP VIEW IF EXISTS `VIEW_NAME`";
+ sql(sql).ok(expected);
}
// Override the test because our ROW field type default is nullable,
// which is different with Calcite.
@Override
public void testCastAsRowType() {
- checkExp("cast(a as row(f0 int, f1 varchar))",
- "CAST(`A` AS ROW(`F0` INTEGER, `F1` VARCHAR))");
- checkExp("cast(a as row(f0 int not null, f1 varchar null))",
- "CAST(`A` AS ROW(`F0` INTEGER NOT NULL, `F1` VARCHAR))");
- checkExp("cast(a as row(f0 row(ff0 int not null, ff1 varchar null) null," +
- " f1 timestamp not null))",
- "CAST(`A` AS ROW(`F0` ROW(`FF0` INTEGER NOT NULL, `FF1` VARCHAR)," +
- " `F1` TIMESTAMP NOT NULL))");
- checkExp("cast(a as row(f0 bigint not null, f1 decimal null) array)",
- "CAST(`A` AS ROW(`F0` BIGINT NOT NULL, `F1` DECIMAL) ARRAY)");
- checkExp("cast(a as row(f0 varchar not null, f1 timestamp null) multiset)",
- "CAST(`A` AS ROW(`F0` VARCHAR NOT NULL, `F1` TIMESTAMP) MULTISET)");
+ final String expr = "cast(a as row(f0 int, f1 varchar))";
+ final String expected = "CAST(`A` AS ROW(`F0` INTEGER, `F1` VARCHAR))";
+ expr(expr).ok(expected);
+
+ final String expr1 = "cast(a as row(f0 int not null, f1 varchar null))";
+ final String expected1 = "CAST(`A` AS ROW(`F0` INTEGER NOT NULL, `F1` VARCHAR))";
+ expr(expr1).ok(expected1);
+
+ final String expr2 = "cast(a as row(f0 row(ff0 int not null, ff1 varchar null) null,"
+ + " f1 timestamp not null))";
+ final String expected2 = "CAST(`A` AS ROW(`F0` ROW(`FF0` INTEGER NOT NULL, `FF1` VARCHAR),"
+ + " `F1` TIMESTAMP NOT NULL))";
+ expr(expr2).ok(expected2);
+
+ final String expr3 = "cast(a as row(f0 bigint not null, f1 decimal null) array)";
+ final String expected3 = "CAST(`A` AS ROW(`F0` BIGINT NOT NULL, `F1` DECIMAL) ARRAY)";
+ expr(expr3).ok(expected3);
+
+ final String expr4 = "cast(a as row(f0 varchar not null, f1 timestamp null) multiset)";
+ final String expected4 = "CAST(`A` AS ROW(`F0` VARCHAR NOT NULL, `F1` TIMESTAMP) MULTISET)";
+ expr(expr4).ok(expected4);
}
@Test
@@ -772,44 +800,50 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
@Test
public void testCreateFunction() {
- check("create function catalog1.db1.function1 as 'org.apache.fink.function.function1'",
- "CREATE FUNCTION `CATALOG1`.`DB1`.`FUNCTION1` AS 'org.apache.fink.function.function1'");
+ sql("create function catalog1.db1.function1 as 'org.apache.fink.function.function1'")
+ .ok("CREATE FUNCTION `CATALOG1`.`DB1`.`FUNCTION1` AS 'org.apache.fink.function.function1'");
- check("create temporary function catalog1.db1.function1 as 'org.apache.fink.function.function1'",
- "CREATE TEMPORARY FUNCTION `CATALOG1`.`DB1`.`FUNCTION1` AS 'org.apache.fink.function.function1'");
+ sql("create temporary function catalog1.db1.function1 as 'org.apache.fink.function.function1'")
+ .ok("CREATE TEMPORARY FUNCTION `CATALOG1`.`DB1`.`FUNCTION1` AS 'org.apache.fink.function.function1'");
- check("create temporary system function catalog1.db1.function1 as 'org.apache.fink.function.function1'",
- "CREATE TEMPORARY SYSTEM FUNCTION `CATALOG1`.`DB1`.`FUNCTION1` AS 'org.apache.fink.function.function1'");
+ sql("create temporary system function catalog1.db1.function1 as 'org.apache.fink.function.function1'")
+ .ok("CREATE TEMPORARY SYSTEM FUNCTION `CATALOG1`.`DB1`.`FUNCTION1` AS 'org.apache.fink.function.function1'");
- check("create temporary function db1.function1 as 'org.apache.fink.function.function1'",
- "CREATE TEMPORARY FUNCTION `DB1`.`FUNCTION1` AS 'org.apache.fink.function.function1'");
+ sql("create temporary function db1.function1 as 'org.apache.fink.function.function1'")
+ .ok("CREATE TEMPORARY FUNCTION `DB1`.`FUNCTION1` AS 'org.apache.fink.function.function1'");
- check("create temporary function function1 as 'org.apache.fink.function.function1'",
- "CREATE TEMPORARY FUNCTION `FUNCTION1` AS 'org.apache.fink.function.function1'");
+ sql("create temporary function function1 as 'org.apache.fink.function.function1'")
+ .ok("CREATE TEMPORARY FUNCTION `FUNCTION1` AS 'org.apache.fink.function.function1'");
- check("create temporary function if not exists catalog1.db1.function1 as 'org.apache.fink.function.function1'",
- "CREATE TEMPORARY FUNCTION IF NOT EXISTS `CATALOG1`.`DB1`.`FUNCTION1` AS 'org.apache.fink.function.function1'");
+ sql("create temporary function if not exists catalog1.db1.function1 as 'org.apache.fink.function.function1'")
+ .ok("CREATE TEMPORARY FUNCTION IF NOT EXISTS `CATALOG1`.`DB1`.`FUNCTION1` AS 'org.apache.fink.function.function1'");
- check("create temporary function function1 as 'org.apache.fink.function.function1' language java",
- "CREATE TEMPORARY FUNCTION `FUNCTION1` AS 'org.apache.fink.function.function1' LANGUAGE JAVA");
+ sql("create temporary function function1 as 'org.apache.fink.function.function1' language java")
+ .ok("CREATE TEMPORARY FUNCTION `FUNCTION1` AS 'org.apache.fink.function.function1' LANGUAGE JAVA");
- check("create temporary system function function1 as 'org.apache.fink.function.function1' language scala",
- "CREATE TEMPORARY SYSTEM FUNCTION `FUNCTION1` AS 'org.apache.fink.function.function1' LANGUAGE SCALA");
+ sql("create temporary system function function1 as 'org.apache.fink.function.function1' language scala")
+ .ok("CREATE TEMPORARY SYSTEM FUNCTION `FUNCTION1` AS 'org.apache.fink.function.function1' LANGUAGE SCALA");
}
@Test
public void testDropTemporaryFunction() {
- check("drop temporary function catalog1.db1.function1",
- "DROP TEMPORARY FUNCTION `CATALOG1`.`DB1`.`FUNCTION1`");
+ sql("drop temporary function catalog1.db1.function1")
+ .ok("DROP TEMPORARY FUNCTION `CATALOG1`.`DB1`.`FUNCTION1`");
- check("drop temporary system function catalog1.db1.function1",
- "DROP TEMPORARY SYSTEM FUNCTION `CATALOG1`.`DB1`.`FUNCTION1`");
+ sql("drop temporary system function catalog1.db1.function1")
+ .ok("DROP TEMPORARY SYSTEM FUNCTION `CATALOG1`.`DB1`.`FUNCTION1`");
- check("drop temporary function if exists catalog1.db1.function1",
- "DROP TEMPORARY FUNCTION IF EXISTS `CATALOG1`.`DB1`.`FUNCTION1`");
+ sql("drop temporary function if exists catalog1.db1.function1")
+ .ok("DROP TEMPORARY FUNCTION IF EXISTS `CATALOG1`.`DB1`.`FUNCTION1`");
- check("drop temporary system function if exists catalog1.db1.function1",
- "DROP TEMPORARY SYSTEM FUNCTION IF EXISTS `CATALOG1`.`DB1`.`FUNCTION1`");
+ sql("drop temporary system function if exists catalog1.db1.function1")
+ .ok("DROP TEMPORARY SYSTEM FUNCTION IF EXISTS `CATALOG1`.`DB1`.`FUNCTION1`");
+ }
+
+ @Override
+ public void testTableHintsInInsert() {
+ // Override the superclass tests because Flink insert parse block
+ // is totally customized, and the hints are not supported yet.
}
/** Matcher that invokes the #validate() of the {@link ExtendedSqlNode} instance. **/