You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2019/08/02 15:18:59 UTC
[flink] branch release-1.9 updated: [FLINK-13335][sql-parser] Bring
the SQL CREATE TABLE DDL closer to FLIP-37
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.9 by this push:
new e29bf2a [FLINK-13335][sql-parser] Bring the SQL CREATE TABLE DDL closer to FLIP-37
e29bf2a is described below
commit e29bf2ac6b4da32d4234b38f7445444a01471048
Author: yuzhao.cyz <yu...@alibaba-inc.com>
AuthorDate: Fri Jul 26 22:37:53 2019 +0800
[FLINK-13335][sql-parser] Bring the SQL CREATE TABLE DDL closer to FLIP-37
This brings the SQL DDL closer to FLIP-37. However, there are a couple of
known limitations.
Currently unsupported features:
- INTERVAL
- ROW with comments
- ANY
- NULL
- NOT NULL/NULL for top-level types
- ignoring collation/charset
- VARCHAR without length (=VARCHAR(1))
- TIMESTAMP WITH TIME ZONE
- user-defined types
- data types in non-DDL parts (e.g. CAST(f AS STRING))
This closes #9243.
---
docs/dev/table/sql.md | 2 +-
.../src/main/codegen/data/Parser.tdd | 27 +-
.../src/main/codegen/includes/parserImpls.ftl | 294 +++++++++++++-
.../flink/sql/parser/FlinkSqlDataTypeSpec.java | 325 +++++++++++++++
.../type/{SqlMapType.java => SqlBytesType.java} | 31 +-
.../apache/flink/sql/parser/type/SqlMapType.java | 6 +-
.../type/{SqlMapType.java => SqlMultisetType.java} | 28 +-
.../apache/flink/sql/parser/type/SqlRowType.java | 34 +-
.../type/{SqlMapType.java => SqlStringType.java} | 31 +-
.../apache/flink/sql/parser/type/SqlTimeType.java | 73 ++++
.../flink/sql/parser/type/SqlTimestampType.java | 73 ++++
.../java/org/apache/flink/sql/parser/Fixture.java | 115 ++++++
.../flink/sql/parser/FlinkDDLDataTypeTest.java | 446 +++++++++++++++++++++
.../flink/sql/parser/FlinkSqlParserImplTest.java | 35 +-
.../runtime/stream/sql/WindowAggregateITCase.scala | 10 +-
.../runtime/stream/TimeAttributesITCase.scala | 2 +-
16 files changed, 1394 insertions(+), 138 deletions(-)
diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md
index 3155915..e607716 100644
--- a/docs/dev/table/sql.md
+++ b/docs/dev/table/sql.md
@@ -1085,7 +1085,7 @@ Although not every SQL feature is implemented yet, some string combinations are
{% highlight sql %}
-A, ABS, ABSOLUTE, ACTION, ADA, ADD, ADMIN, AFTER, ALL, ALLOCATE, ALLOW, ALTER, ALWAYS, AND, ANY, ARE, ARRAY, AS, ASC, ASENSITIVE, ASSERTION, ASSIGNMENT, ASYMMETRIC, AT, ATOMIC, ATTRIBUTE, ATTRIBUTES, AUTHORIZATION, AVG, BEFORE, BEGIN, BERNOULLI, BETWEEN, BIGINT, BINARY, BIT, BLOB, BOOLEAN, BOTH, BREADTH, BY, C, CALL, CALLED, CARDINALITY, CASCADE, CASCADED, CASE, CAST, CATALOG, CATALOG_NAME, CEIL, CEILING, CENTURY, CHAIN, CHAR, CHARACTER, CHARACTERISTICS, CHARACTERS, CHARACTER_LENGTH, CHA [...]
+A, ABS, ABSOLUTE, ACTION, ADA, ADD, ADMIN, AFTER, ALL, ALLOCATE, ALLOW, ALTER, ALWAYS, AND, ANY, ARE, ARRAY, AS, ASC, ASENSITIVE, ASSERTION, ASSIGNMENT, ASYMMETRIC, AT, ATOMIC, ATTRIBUTE, ATTRIBUTES, AUTHORIZATION, AVG, BEFORE, BEGIN, BERNOULLI, BETWEEN, BIGINT, BINARY, BIT, BLOB, BOOLEAN, BOTH, BREADTH, BY, BYTES, C, CALL, CALLED, CARDINALITY, CASCADE, CASCADED, CASE, CAST, CATALOG, CATALOG_NAME, CEIL, CEILING, CENTURY, CHAIN, CHAR, CHARACTER, CHARACTERISTICS, CHARACTERS, CHARACTER_LENG [...]
{% endhighlight %}
diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
index 77a8e58..5cefc93 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
+++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
@@ -31,10 +31,16 @@
"org.apache.flink.sql.parser.dml.RichSqlInsert",
"org.apache.flink.sql.parser.dml.RichSqlInsertKeyword",
"org.apache.flink.sql.parser.type.SqlArrayType",
+ "org.apache.flink.sql.parser.type.SqlBytesType",
"org.apache.flink.sql.parser.type.SqlMapType",
+ "org.apache.flink.sql.parser.type.SqlMultisetType",
"org.apache.flink.sql.parser.type.SqlRowType",
+ "org.apache.flink.sql.parser.type.SqlStringType",
+ "org.apache.flink.sql.parser.type.SqlTimeType",
+ "org.apache.flink.sql.parser.type.SqlTimestampType",
"org.apache.flink.sql.parser.utils.SqlTimeUnit",
"org.apache.flink.sql.parser.validate.FlinkSqlConformance",
+ "org.apache.flink.sql.parser.FlinkSqlDataTypeSpec",
"org.apache.flink.sql.parser.SqlProperty",
"org.apache.calcite.sql.SqlDrop",
"org.apache.calcite.sql.SqlCreate",
@@ -53,7 +59,9 @@
"FROM_SOURCE",
"BOUNDED",
"DELAY",
- "OVERWRITE"
+ "OVERWRITE",
+ "STRING",
+ "BYTES"
]
# List of keywords from "keywords" section that are not reserved.
@@ -384,13 +392,24 @@
literalParserMethods: [
]
- # List of methods for parsing custom data types.
+ # List of methods for parsing ddl supported data types.
# Return type of method implementation should be "SqlIdentifier".
# Example: SqlParseTimeStampZ().
- dataTypeParserMethods: [
+ flinkDataTypeParserMethods: [
"SqlArrayType()",
+ "SqlMultisetType()",
"SqlMapType()",
- "SqlRowType()"
+ "SqlRowType()",
+ "SqlStringType()",
+ "SqlBytesType()",
+ "SqlTimestampType()",
+ "SqlTimeType()"
+ ]
+
+ # List of methods for parsing custom data types.
+ # Return type of method implementation should be "SqlIdentifier".
+ # Example: SqlParseTimeStampZ().
+ dataTypeParserMethods: [
]
# List of methods for parsing builtin function calls.
diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
index 95621fe..ae66846 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -15,6 +15,20 @@
// limitations under the License.
-->
+/**
+* Parse an optional NULL / NOT NULL clause; the type is nullable when neither is given.
+*/
+boolean NullableOpt() :
+{
+}
+{
+ <NULL> { return true; }
+|
+ <NOT> <NULL> { return false; }
+|
+ { return true; }
+}
+
void TableColumn(TableCreationContext context) :
{
}
@@ -55,14 +69,8 @@ void TableColumn2(List<SqlNode> list) :
}
{
name = SimpleIdentifier()
- type = DataType()
- (
- <NULL> { type = type.withNullable(true); }
- |
- <NOT> <NULL> { type = type.withNullable(false); }
- |
- { type = type.withNullable(true); }
- )
+ <#-- #FlinkDataType already takes care of the nullable attribute. -->
+ type = FlinkDataType()
[ <COMMENT> <QUOTED_STRING> {
String p = SqlParserUtil.parseString(token.image);
comment = SqlLiteral.createCharString(p, getPos());
@@ -367,53 +375,295 @@ SqlDrop SqlDropView(Span s, boolean replace) :
}
}
+SqlIdentifier FlinkCollectionsTypeName() :
+{
+}
+{
+ LOOKAHEAD(2)
+ <MULTISET> {
+ return new SqlIdentifier(SqlTypeName.MULTISET.name(), getPos());
+ }
+|
+ <ARRAY> {
+ return new SqlIdentifier(SqlTypeName.ARRAY.name(), getPos());
+ }
+}
+
+SqlIdentifier FlinkTypeName() :
+{
+ final SqlTypeName sqlTypeName;
+ final SqlIdentifier typeName;
+ final Span s = Span.of();
+}
+{
+ (
+<#-- additional types are included here -->
+<#-- put custom data types in front of Calcite core data types -->
+<#list parser.flinkDataTypeParserMethods as method>
+ <#if (method?index > 0)>
+ |
+ </#if>
+ LOOKAHEAD(2)
+ typeName = ${method}
+</#list>
+ |
+ LOOKAHEAD(2)
+ sqlTypeName = SqlTypeName(s) {
+ typeName = new SqlIdentifier(sqlTypeName.name(), s.end(this));
+ }
+ |
+ LOOKAHEAD(2)
+ typeName = FlinkCollectionsTypeName()
+ |
+ typeName = CompoundIdentifier() {
+ throw new ParseException("UDT in DDL is not supported yet.");
+ }
+ )
+ {
+ return typeName;
+ }
+}
+
+/**
+* Parse a Flink data type with nullable options, NULL -> nullable, NOT NULL -> not nullable.
+* Default to be nullable.
+*/
+SqlDataTypeSpec FlinkDataType() :
+{
+ final SqlIdentifier typeName;
+ SqlIdentifier collectionTypeName = null;
+ int scale = -1;
+ int precision = -1;
+ String charSetName = null;
+ final Span s;
+ boolean nullable = true;
+ boolean elementNullable = true;
+}
+{
+ typeName = FlinkTypeName() {
+ s = span();
+ }
+ [
+ <LPAREN>
+ precision = UnsignedIntLiteral()
+ [
+ <COMMA>
+ scale = UnsignedIntLiteral()
+ ]
+ <RPAREN>
+ ]
+ elementNullable = NullableOpt()
+ [
+ collectionTypeName = FlinkCollectionsTypeName()
+ nullable = NullableOpt()
+ ]
+ {
+ if (null != collectionTypeName) {
+ return new FlinkSqlDataTypeSpec(
+ collectionTypeName,
+ typeName,
+ precision,
+ scale,
+ charSetName,
+ nullable,
+ elementNullable,
+ s.end(collectionTypeName));
+ }
+ nullable = elementNullable;
+ return new FlinkSqlDataTypeSpec(typeName,
+ precision,
+ scale,
+ charSetName,
+ null,
+ nullable,
+ elementNullable,
+ s.end(this));
+ }
+}
+
+SqlIdentifier SqlStringType() :
+{
+}
+{
+ <STRING> { return new SqlStringType(getPos()); }
+}
+
+SqlIdentifier SqlBytesType() :
+{
+}
+{
+ <BYTES> { return new SqlBytesType(getPos()); }
+}
+
+boolean WithLocalTimeZone() :
+{
+}
+{
+ <WITHOUT> <TIME> <ZONE> { return false; }
+|
+ <WITH>
+ (
+ <LOCAL> <TIME> <ZONE> { return true; }
+ |
+ <TIME> <ZONE> {
+ throw new ParseException("'WITH TIME ZONE' is not supported yet, options: " +
+ "'WITHOUT TIME ZONE', 'WITH LOCAL TIME ZONE'.");
+ }
+ )
+|
+ { return false; }
+}
+
+SqlIdentifier SqlTimeType() :
+{
+ int precision = -1;
+ boolean withLocalTimeZone = false;
+}
+{
+ <TIME>
+ (
+ <LPAREN> precision = UnsignedIntLiteral() <RPAREN>
+ |
+ { precision = -1; }
+ )
+ withLocalTimeZone = WithLocalTimeZone()
+ { return new SqlTimeType(getPos(), precision, withLocalTimeZone); }
+}
+
+SqlIdentifier SqlTimestampType() :
+{
+ int precision = -1;
+ boolean withLocalTimeZone = false;
+}
+{
+ <TIMESTAMP>
+ (
+ <LPAREN> precision = UnsignedIntLiteral() <RPAREN>
+ |
+ { precision = -1; }
+ )
+ withLocalTimeZone = WithLocalTimeZone()
+ { return new SqlTimestampType(getPos(), precision, withLocalTimeZone); }
+}
+
SqlIdentifier SqlArrayType() :
{
SqlParserPos pos;
SqlDataTypeSpec elementType;
+ boolean nullable = true;
}
{
<ARRAY> { pos = getPos(); }
- <LT> elementType = DataType()
+ <LT>
+ elementType = FlinkDataType()
<GT>
{
return new SqlArrayType(pos, elementType);
}
}
+SqlIdentifier SqlMultisetType() :
+{
+ SqlParserPos pos;
+ SqlDataTypeSpec elementType;
+ boolean nullable = true;
+}
+{
+ <MULTISET> { pos = getPos(); }
+ <LT>
+ elementType = FlinkDataType()
+ <GT>
+ {
+ return new SqlMultisetType(pos, elementType);
+ }
+}
+
SqlIdentifier SqlMapType() :
{
SqlDataTypeSpec keyType;
SqlDataTypeSpec valType;
+ boolean nullable = true;
}
{
<MAP>
- <LT> keyType = DataType()
- <COMMA> valType = DataType()
+ <LT>
+ keyType = FlinkDataType()
+ <COMMA>
+ valType = FlinkDataType()
<GT>
{
return new SqlMapType(getPos(), keyType, valType);
}
}
+/**
+* Parse a "name1 type1 ['comment1'], name2 type2 ['comment2'] ..." list; the quoted comment is optional.
+*/
+void FieldNameTypeCommaList(
+ List<SqlIdentifier> fieldNames,
+ List<SqlDataTypeSpec> fieldTypes,
+ List<SqlCharStringLiteral> comments) :
+{
+ SqlIdentifier fName;
+ SqlDataTypeSpec fType;
+}
+{
+ [
+ fName = SimpleIdentifier()
+ fType = FlinkDataType()
+ {
+ fieldNames.add(fName);
+ fieldTypes.add(fType);
+ }
+ (
+ <QUOTED_STRING> {
+ String p = SqlParserUtil.parseString(token.image);
+ comments.add(SqlLiteral.createCharString(p, getPos()));
+ }
+ |
+ { comments.add(null); }
+ )
+ ]
+ (
+ <COMMA>
+ fName = SimpleIdentifier()
+ fType = FlinkDataType()
+ {
+ fieldNames.add(fName);
+ fieldTypes.add(fType);
+ }
+ (
+ <QUOTED_STRING> {
+ String p = SqlParserUtil.parseString(token.image);
+ comments.add(SqlLiteral.createCharString(p, getPos()));
+ }
+ |
+ { comments.add(null); }
+ )
+ )*
+}
+
+/**
+* Parse a ROW type. Both Row(name1 type1, name2 type2) and Row<name1 type1, name2 type2> are supported.
+* Every field type can be followed by `NULL` or `NOT NULL` to indicate whether the field is nullable,
+* e.g. Row(f0 int not null, f1 varchar null).
+*/
SqlIdentifier SqlRowType() :
{
- SqlParserPos pos;
List<SqlIdentifier> fieldNames = new ArrayList<SqlIdentifier>();
List<SqlDataTypeSpec> fieldTypes = new ArrayList<SqlDataTypeSpec>();
+ List<SqlCharStringLiteral> comments = new ArrayList<SqlCharStringLiteral>();
}
{
- <ROW> { pos = getPos(); SqlIdentifier fName; SqlDataTypeSpec fType;}
- <LT>
- fName = SimpleIdentifier() <COLON> fType = DataType()
- { fieldNames.add(fName); fieldTypes.add(fType); }
+ <ROW>
(
- <COMMA>
- fName = SimpleIdentifier() <COLON> fType = DataType()
- { fieldNames.add(fName); fieldTypes.add(fType); }
- )*
- <GT>
+ <NE>
+ |
+ <LT> FieldNameTypeCommaList(fieldNames, fieldTypes, comments) <GT>
+ |
+ <LPAREN> FieldNameTypeCommaList(fieldNames, fieldTypes, comments) <RPAREN>
+ )
{
- return new SqlRowType(pos, fieldNames, fieldTypes);
+ return new SqlRowType(getPos(), fieldNames, fieldTypes, comments);
}
}
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/FlinkSqlDataTypeSpec.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/FlinkSqlDataTypeSpec.java
new file mode 100644
index 0000000..a3797d7
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/FlinkSqlDataTypeSpec.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser;
+
+import org.apache.flink.sql.parser.type.ExtendedSqlType;
+import org.apache.flink.sql.parser.type.SqlArrayType;
+import org.apache.flink.sql.parser.type.SqlBytesType;
+import org.apache.flink.sql.parser.type.SqlMapType;
+import org.apache.flink.sql.parser.type.SqlMultisetType;
+import org.apache.flink.sql.parser.type.SqlRowType;
+import org.apache.flink.sql.parser.type.SqlStringType;
+import org.apache.flink.sql.parser.type.SqlTimeType;
+import org.apache.flink.sql.parser.type.SqlTimestampType;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.sql.SqlCollation;
+import org.apache.calcite.sql.SqlDataTypeSpec;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlUtil;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.type.SqlTypeUtil;
+import org.apache.calcite.util.Util;
+
+import java.nio.charset.Charset;
+import java.util.Objects;
+import java.util.TimeZone;
+import java.util.stream.Collectors;
+
+/**
+ * Represents a SQL data type specification in a parse tree.
+ *
+ * <p>A <code>SqlDataTypeSpec</code> is immutable; once created, you cannot
+ * change any of the fields.</p>
+ *
+ * <p>This class is an extension to {@link SqlDataTypeSpec}, we support
+ * complex type expressions like:</p>
+ *
+ * <blockquote><code>ROW(<br>
+ * foo NUMBER(5, 2) NOT NULL,<br>
+ * rec ROW(b BOOLEAN, i MyUDT NOT NULL))</code></blockquote>
+ *
+ * <p>Once <a href="https://issues.apache.org/jira/browse/CALCITE-3213">CALCITE-3213</a>
+ * is resolved, we can remove this class.
+ */
+public class FlinkSqlDataTypeSpec extends SqlDataTypeSpec {
+ // Whether the element type is nullable when this type is a collection type.
+ // Currently, the collection types are ARRAY and MULTISET.
+ private Boolean elementNullable;
+
+ public FlinkSqlDataTypeSpec(
+ SqlIdentifier collectionsTypeName,
+ SqlIdentifier typeName,
+ int precision,
+ int scale,
+ String charSetName,
+ Boolean nullable,
+ Boolean elementNullable,
+ SqlParserPos pos) {
+ super(collectionsTypeName, typeName, precision, scale,
+ charSetName, null, nullable, pos);
+ this.elementNullable = elementNullable;
+ }
+
+ public FlinkSqlDataTypeSpec(
+ SqlIdentifier collectionsTypeName,
+ SqlIdentifier typeName,
+ int precision,
+ int scale,
+ String charSetName,
+ TimeZone timeZone,
+ Boolean nullable,
+ Boolean elementNullable,
+ SqlParserPos pos) {
+ super(collectionsTypeName, typeName, precision, scale,
+ charSetName, timeZone, nullable, pos);
+ this.elementNullable = elementNullable;
+ }
+
+ public FlinkSqlDataTypeSpec(
+ SqlIdentifier typeName,
+ int precision,
+ int scale,
+ String charSetName,
+ TimeZone timeZone,
+ Boolean nullable,
+ Boolean elementNullable,
+ SqlParserPos pos) {
+ super(null, typeName, precision, scale,
+ charSetName, timeZone, nullable, pos);
+ this.elementNullable = elementNullable;
+ }
+
+ @Override
+ public SqlNode clone(SqlParserPos pos) {
+ return (getCollectionsTypeName() != null)
+ ? new FlinkSqlDataTypeSpec(getCollectionsTypeName(), getTypeName(), getPrecision(),
+ getScale(), getCharSetName(), getNullable(), this.elementNullable, pos)
+ : new FlinkSqlDataTypeSpec(getTypeName(), getPrecision(), getScale(),
+ getCharSetName(), getTimeZone(), getNullable(), this.elementNullable, pos);
+ }
+
+ /** Returns a copy of this data type specification with a given
+ * nullability. */
+ @Override
+ public SqlDataTypeSpec withNullable(Boolean nullable) {
+ if (Objects.equals(nullable, this.getNullable())) {
+ return this;
+ }
+ return new FlinkSqlDataTypeSpec(getCollectionsTypeName(), getTypeName(),
+ getPrecision(), getScale(), getCharSetName(), getTimeZone(), nullable,
+ this.elementNullable, getParserPosition());
+ }
+
+ @Override
+ public RelDataType deriveType(RelDataTypeFactory typeFactory) {
+ // Default to be nullable.
+ return this.deriveType(typeFactory, true);
+ }
+
+ @Override
+ public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+ final SqlIdentifier typeName = getTypeName();
+ String name = typeName.getSimple();
+ if (typeName instanceof ExtendedSqlType) {
+ typeName.unparse(writer, leftPrec, rightPrec);
+ } else if (SqlTypeName.get(name) != null) {
+ SqlTypeName sqlTypeName = SqlTypeName.get(name);
+ writer.keyword(name);
+ if (sqlTypeName.allowsPrec() && this.getPrecision() >= 0) {
+ SqlWriter.Frame frame = writer.startList(SqlWriter.FrameTypeEnum.FUN_CALL, "(", ")");
+ writer.print(this.getPrecision());
+ if (sqlTypeName.allowsScale() && this.getScale() >= 0) {
+ writer.sep(",", true);
+ writer.print(this.getScale());
+ }
+
+ writer.endList(frame);
+ }
+
+ if (this.getCharSetName() != null) {
+ writer.keyword("CHARACTER SET");
+ writer.identifier(this.getCharSetName(), false);
+ }
+
+ if (this.getCollectionsTypeName() != null) {
+ // Fix up nullable attribute if this is a collection type.
+ if (elementNullable != null && !elementNullable) {
+ writer.keyword("NOT NULL");
+ }
+ writer.keyword(this.getCollectionsTypeName().getSimple());
+ }
+ } else if (name.startsWith("_")) {
+ writer.keyword(name.substring(1));
+ } else {
+ this.getTypeName().unparse(writer, leftPrec, rightPrec);
+ }
+ if (getNullable() != null && !getNullable()) {
+ writer.keyword("NOT NULL");
+ }
+ }
+
+ @Override
+ public RelDataType deriveType(RelDataTypeFactory typeFactory, boolean nullable) {
+ final SqlIdentifier typeName = getTypeName();
+ if (!typeName.isSimple()) {
+ return null;
+ }
+ final String name = typeName.getSimple();
+ final SqlTypeName sqlTypeName = SqlTypeName.get(name);
+ // Try to get Flink custom data type first.
+ RelDataType type = getExtendedType(typeFactory, typeName);
+ if (type == null) {
+ if (sqlTypeName == null) {
+ return null;
+ } else {
+ // NOTE jvs 15-Jan-2009: earlier validation is supposed to
+ // have caught these, which is why it's OK for them
+ // to be assertions rather than user-level exceptions.
+ final int precision = getPrecision();
+ final int scale = getScale();
+ if ((precision >= 0) && (scale >= 0)) {
+ assert sqlTypeName.allowsPrecScale(true, true);
+ type = typeFactory.createSqlType(sqlTypeName, precision, scale);
+ } else if (precision >= 0) {
+ assert sqlTypeName.allowsPrecNoScale();
+ type = typeFactory.createSqlType(sqlTypeName, precision);
+ } else {
+ assert sqlTypeName.allowsNoPrecNoScale();
+ type = typeFactory.createSqlType(sqlTypeName);
+ }
+ }
+ }
+
+ if (SqlTypeUtil.inCharFamily(type)) {
+ // Applying Syntax rule 10 from SQL:99 spec section 6.22 "If TD is a
+ // fixed-length, variable-length or large object character string,
+ // then the collating sequence of the result of the <cast
+ // specification> is the default collating sequence for the
+ // character repertoire of TD and the result of the <cast
+ // specification> has the Coercible coercibility characteristic."
+ SqlCollation collation = SqlCollation.COERCIBLE;
+
+ Charset charset;
+ final String charSetName = getCharSetName();
+ if (null == charSetName) {
+ charset = typeFactory.getDefaultCharset();
+ } else {
+ String javaCharSetName =
+ Objects.requireNonNull(
+ SqlUtil.translateCharacterSetName(charSetName), charSetName);
+ charset = Charset.forName(javaCharSetName);
+ }
+ type =
+ typeFactory.createTypeWithCharsetAndCollation(
+ type,
+ charset,
+ collation);
+ }
+
+ final SqlIdentifier collectionsTypeName = getCollectionsTypeName();
+ if (null != collectionsTypeName) {
+ // Fix the nullability of the element type first.
+ boolean elementNullable = true;
+ if (this.elementNullable != null) {
+ elementNullable = this.elementNullable;
+ }
+ type = typeFactory.createTypeWithNullability(type, elementNullable);
+
+ final String collectionName = collectionsTypeName.getSimple();
+ final SqlTypeName collectionsSqlTypeName =
+ Objects.requireNonNull(SqlTypeName.get(collectionName),
+ collectionName);
+
+ switch (collectionsSqlTypeName) {
+ case MULTISET:
+ type = typeFactory.createMultisetType(type, -1);
+ break;
+ case ARRAY:
+ type = typeFactory.createArrayType(type, -1);
+ break;
+ default:
+ throw Util.unexpected(collectionsSqlTypeName);
+ }
+ }
+
+ // Fix the nullability of this type.
+ if (this.getNullable() != null) {
+ nullable = this.getNullable();
+ }
+ type = typeFactory.createTypeWithNullability(type, nullable);
+
+ return type;
+ }
+
+ private RelDataType getExtendedType(RelDataTypeFactory typeFactory, SqlIdentifier typeName) {
+ // quick check.
+ if (!(typeName instanceof ExtendedSqlType)) {
+ return null;
+ }
+ if (typeName instanceof SqlBytesType) {
+ return typeFactory.createSqlType(SqlTypeName.VARBINARY, Integer.MAX_VALUE);
+ } else if (typeName instanceof SqlStringType) {
+ return typeFactory.createSqlType(SqlTypeName.VARCHAR, Integer.MAX_VALUE);
+ } else if (typeName instanceof SqlArrayType) {
+ final SqlArrayType arrayType = (SqlArrayType) typeName;
+ return typeFactory.createArrayType(arrayType.getElementType()
+ .deriveType(typeFactory), -1);
+ } else if (typeName instanceof SqlMultisetType) {
+ final SqlMultisetType multiSetType = (SqlMultisetType) typeName;
+ return typeFactory.createMultisetType(multiSetType.getElementType()
+ .deriveType(typeFactory), -1);
+ } else if (typeName instanceof SqlMapType) {
+ final SqlMapType mapType = (SqlMapType) typeName;
+ return typeFactory.createMapType(
+ mapType.getKeyType().deriveType(typeFactory),
+ mapType.getValType().deriveType(typeFactory));
+ } else if (typeName instanceof SqlRowType) {
+ final SqlRowType rowType = (SqlRowType) typeName;
+ return typeFactory.createStructType(
+ rowType.getFieldTypes().stream().map(ft -> ft.deriveType(typeFactory))
+ .collect(Collectors.toList()),
+ rowType.getFieldNames().stream().map(SqlIdentifier::getSimple)
+ .collect(Collectors.toList()));
+ } else if (typeName instanceof SqlTimeType) {
+ final SqlTimeType zonedTimeType = (SqlTimeType) typeName;
+ if (zonedTimeType.getPrecision() >= 0) {
+ return typeFactory.createSqlType(zonedTimeType.getSqlTypeName(),
+ zonedTimeType.getPrecision());
+ } else {
+ // Use default precision.
+ return typeFactory.createSqlType(zonedTimeType.getSqlTypeName());
+ }
+ } else if (typeName instanceof SqlTimestampType) {
+ final SqlTimestampType zonedTimestampType = (SqlTimestampType) typeName;
+ if (zonedTimestampType.getPrecision() >= 0) {
+ return typeFactory.createSqlType(zonedTimestampType.getSqlTypeName(),
+ zonedTimestampType.getPrecision());
+ } else {
+ // Use default precision.
+ return typeFactory.createSqlType(zonedTimestampType.getSqlTypeName());
+ }
+ }
+ return null;
+ }
+}
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlMapType.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlBytesType.java
similarity index 55%
copy from flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlMapType.java
copy to flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlBytesType.java
index 588c993..dfc2d1f 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlMapType.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlBytesType.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,40 +18,21 @@
package org.apache.flink.sql.parser.type;
-import org.apache.calcite.sql.SqlDataTypeSpec;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlWriter;
import org.apache.calcite.sql.parser.SqlParserPos;
-import org.apache.calcite.sql.type.SqlTypeName;
/**
- * Parse column of Map type.
+ * Parse type "BYTES" which is a synonym of VARBINARY(INT_MAX).
*/
-public class SqlMapType extends SqlIdentifier implements ExtendedSqlType {
+public class SqlBytesType extends SqlIdentifier implements ExtendedSqlType {
- private final SqlDataTypeSpec keyType;
- private final SqlDataTypeSpec valType;
-
- public SqlMapType(SqlParserPos pos, SqlDataTypeSpec keyType, SqlDataTypeSpec valType) {
- super(SqlTypeName.MAP.getName(), pos);
- this.keyType = keyType;
- this.valType = valType;
- }
-
- public SqlDataTypeSpec getKeyType() {
- return keyType;
- }
-
- public SqlDataTypeSpec getValType() {
- return valType;
+ public SqlBytesType(SqlParserPos pos) {
+ super("BYTES", pos);
}
@Override
public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
- writer.keyword("MAP<");
- ExtendedSqlType.unparseType(keyType, writer, leftPrec, rightPrec);
- writer.sep(",");
- ExtendedSqlType.unparseType(valType, writer, leftPrec, rightPrec);
- writer.keyword(">");
+ writer.keyword("BYTES");
}
}
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlMapType.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlMapType.java
index 588c993..fd071c0 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlMapType.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlMapType.java
@@ -48,10 +48,12 @@ public class SqlMapType extends SqlIdentifier implements ExtendedSqlType {
@Override
public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
- writer.keyword("MAP<");
+ writer.keyword("MAP");
+ SqlWriter.Frame frame = writer.startList(SqlWriter.FrameTypeEnum.FUN_CALL, "<", ">");
+ writer.sep(",");
ExtendedSqlType.unparseType(keyType, writer, leftPrec, rightPrec);
writer.sep(",");
ExtendedSqlType.unparseType(valType, writer, leftPrec, rightPrec);
- writer.keyword(">");
+ writer.endList(frame);
}
}
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlMapType.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlMultisetType.java
similarity index 64%
copy from flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlMapType.java
copy to flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlMultisetType.java
index 588c993..3b6f19c 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlMapType.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlMultisetType.java
@@ -25,33 +25,25 @@ import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.sql.type.SqlTypeName;
/**
- * Parse column of Map type.
+ * Parse column of MULTISET type.
*/
-public class SqlMapType extends SqlIdentifier implements ExtendedSqlType {
+public class SqlMultisetType extends SqlIdentifier implements ExtendedSqlType {
- private final SqlDataTypeSpec keyType;
- private final SqlDataTypeSpec valType;
+ private final SqlDataTypeSpec elementType;
- public SqlMapType(SqlParserPos pos, SqlDataTypeSpec keyType, SqlDataTypeSpec valType) {
- super(SqlTypeName.MAP.getName(), pos);
- this.keyType = keyType;
- this.valType = valType;
+ public SqlMultisetType(SqlParserPos pos, SqlDataTypeSpec elementType) {
+ super(SqlTypeName.MULTISET.getName(), pos);
+ this.elementType = elementType;
}
- public SqlDataTypeSpec getKeyType() {
- return keyType;
- }
-
- public SqlDataTypeSpec getValType() {
- return valType;
+ public SqlDataTypeSpec getElementType() {
+ return elementType;
}
@Override
public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
- writer.keyword("MAP<");
- ExtendedSqlType.unparseType(keyType, writer, leftPrec, rightPrec);
- writer.sep(",");
- ExtendedSqlType.unparseType(valType, writer, leftPrec, rightPrec);
+ writer.keyword("MULTISET<");
+ ExtendedSqlType.unparseType(this.elementType, writer, leftPrec, rightPrec);
writer.keyword(">");
}
}
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlRowType.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlRowType.java
index e77529e..886125c 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlRowType.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlRowType.java
@@ -18,6 +18,7 @@
package org.apache.flink.sql.parser.type;
+import org.apache.calcite.sql.SqlCharStringLiteral;
import org.apache.calcite.sql.SqlDataTypeSpec;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlWriter;
@@ -34,13 +35,16 @@ public class SqlRowType extends SqlIdentifier implements ExtendedSqlType {
private final List<SqlIdentifier> fieldNames;
private final List<SqlDataTypeSpec> fieldTypes;
+ private final List<SqlCharStringLiteral> comments;
public SqlRowType(SqlParserPos pos,
List<SqlIdentifier> fieldNames,
- List<SqlDataTypeSpec> fieldTypes) {
+ List<SqlDataTypeSpec> fieldTypes,
+ List<SqlCharStringLiteral> comments) {
super(SqlTypeName.ROW.getName(), pos);
this.fieldNames = fieldNames;
this.fieldTypes = fieldTypes;
+ this.comments = comments;
}
public List<SqlIdentifier> getFieldNames() {
@@ -51,6 +55,10 @@ public class SqlRowType extends SqlIdentifier implements ExtendedSqlType {
return fieldTypes;
}
+ public List<SqlCharStringLiteral> getComments() {
+ return comments;
+ }
+
public int getArity() {
return fieldNames.size();
}
@@ -65,14 +73,22 @@ public class SqlRowType extends SqlIdentifier implements ExtendedSqlType {
@Override
public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
- writer.keyword("ROW");
- SqlWriter.Frame frame = writer.startList(SqlWriter.FrameTypeEnum.FUN_CALL, "<", ">");
- for (Pair<SqlIdentifier, SqlDataTypeSpec> p : Pair.zip(this.fieldNames, this.fieldTypes)) {
- writer.sep(",", false);
- p.left.unparse(writer, 0, 0);
- writer.sep(":");
- ExtendedSqlType.unparseType(p.right, writer, leftPrec, rightPrec);
+ writer.print("ROW");
+ if (getFieldNames().size() == 0) {
+ writer.print("<>");
+ } else {
+ SqlWriter.Frame frame = writer.startList(SqlWriter.FrameTypeEnum.FUN_CALL, "<", ">");
+ int i = 0;
+ for (Pair<SqlIdentifier, SqlDataTypeSpec> p : Pair.zip(this.fieldNames, this.fieldTypes)) {
+ writer.sep(",", false);
+ p.left.unparse(writer, 0, 0);
+ ExtendedSqlType.unparseType(p.right, writer, leftPrec, rightPrec);
+ if (comments.get(i) != null) {
+ comments.get(i).unparse(writer, leftPrec, rightPrec);
+ }
+ i += 1;
+ }
+ writer.endList(frame);
}
- writer.endList(frame);
}
}
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlMapType.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlStringType.java
similarity index 55%
copy from flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlMapType.java
copy to flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlStringType.java
index 588c993..a134b13 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlMapType.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlStringType.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,40 +18,21 @@
package org.apache.flink.sql.parser.type;
-import org.apache.calcite.sql.SqlDataTypeSpec;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlWriter;
import org.apache.calcite.sql.parser.SqlParserPos;
-import org.apache.calcite.sql.type.SqlTypeName;
/**
- * Parse column of Map type.
+ * Parse type "STRING" which is a synonym of VARCHAR(INT_MAX).
*/
-public class SqlMapType extends SqlIdentifier implements ExtendedSqlType {
+public class SqlStringType extends SqlIdentifier implements ExtendedSqlType {
- private final SqlDataTypeSpec keyType;
- private final SqlDataTypeSpec valType;
-
- public SqlMapType(SqlParserPos pos, SqlDataTypeSpec keyType, SqlDataTypeSpec valType) {
- super(SqlTypeName.MAP.getName(), pos);
- this.keyType = keyType;
- this.valType = valType;
- }
-
- public SqlDataTypeSpec getKeyType() {
- return keyType;
- }
-
- public SqlDataTypeSpec getValType() {
- return valType;
+ public SqlStringType(SqlParserPos pos) {
+ super("STRING", pos);
}
@Override
public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
- writer.keyword("MAP<");
- ExtendedSqlType.unparseType(keyType, writer, leftPrec, rightPrec);
- writer.sep(",");
- ExtendedSqlType.unparseType(valType, writer, leftPrec, rightPrec);
- writer.keyword(">");
+ writer.keyword("STRING");
}
}
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlTimeType.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlTimeType.java
new file mode 100644
index 0000000..23855ce
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlTimeType.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.type;
+
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * Parse type "TIME WITHOUT TIME ZONE", "TIME(3) WITHOUT TIME ZONE", "TIME WITH LOCAL TIME ZONE",
+ * or "TIME(3) WITH LOCAL TIME ZONE".
+ */
+public class SqlTimeType extends SqlIdentifier implements ExtendedSqlType {
+ private final int precision;
+ private final boolean withLocalTimeZone;
+
+ public SqlTimeType(SqlParserPos pos, int precision, boolean withLocalTimeZone) {
+ super(getTypeName(withLocalTimeZone), pos);
+ this.precision = precision;
+ this.withLocalTimeZone = withLocalTimeZone;
+ }
+
+ private static String getTypeName(boolean withLocalTimeZone) {
+ if (withLocalTimeZone) {
+ return SqlTypeName.TIME_WITH_LOCAL_TIME_ZONE.name();
+ } else {
+ return SqlTypeName.TIME.name();
+ }
+ }
+
+ public SqlTypeName getSqlTypeName() {
+ if (withLocalTimeZone) {
+ return SqlTypeName.TIME_WITH_LOCAL_TIME_ZONE;
+ } else {
+ return SqlTypeName.TIME;
+ }
+ }
+
+ public int getPrecision() {
+ return this.precision;
+ }
+
+ @Override
+ public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+ writer.keyword(SqlTypeName.TIME.name());
+ if (this.precision >= 0) {
+ final SqlWriter.Frame frame =
+ writer.startList(SqlWriter.FrameTypeEnum.FUN_CALL, "(", ")");
+ writer.print(precision);
+ writer.endList(frame);
+ }
+ if (this.withLocalTimeZone) {
+ writer.keyword("WITH LOCAL TIME ZONE");
+ }
+ }
+}
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlTimestampType.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlTimestampType.java
new file mode 100644
index 0000000..09e2d08
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlTimestampType.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.type;
+
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * Parse type "TIMESTAMP WITHOUT TIME ZONE", "TIMESTAMP(3) WITHOUT TIME ZONE",
+ * "TIMESTAMP WITH LOCAL TIME ZONE", or "TIMESTAMP(3) WITH LOCAL TIME ZONE".
+ */
+public class SqlTimestampType extends SqlIdentifier implements ExtendedSqlType {
+ private final int precision;
+ private final boolean withLocalTimeZone;
+
+ public SqlTimestampType(SqlParserPos pos, int precision, boolean withLocalTimeZone) {
+ super(getTypeName(withLocalTimeZone), pos);
+ this.precision = precision;
+ this.withLocalTimeZone = withLocalTimeZone;
+ }
+
+ private static String getTypeName(boolean withLocalTimeZone) {
+ if (withLocalTimeZone) {
+ return SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE.name();
+ } else {
+ return SqlTypeName.TIMESTAMP.name();
+ }
+ }
+
+ public SqlTypeName getSqlTypeName() {
+ if (withLocalTimeZone) {
+ return SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE;
+ } else {
+ return SqlTypeName.TIMESTAMP;
+ }
+ }
+
+ public int getPrecision() {
+ return this.precision;
+ }
+
+ @Override
+ public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+ writer.keyword(SqlTypeName.TIMESTAMP.name());
+ if (this.precision >= 0) {
+ final SqlWriter.Frame frame =
+ writer.startList(SqlWriter.FrameTypeEnum.FUN_CALL, "(", ")");
+ writer.print(precision);
+ writer.endList(frame);
+ }
+ if (this.withLocalTimeZone) {
+ writer.keyword("WITH LOCAL TIME ZONE");
+ }
+ }
+}
diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/Fixture.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/Fixture.java
new file mode 100644
index 0000000..671958e
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/Fixture.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+import java.util.List;
+
+/**
+ * Types used during tests.
+ */
+public class Fixture {
+ private final RelDataTypeFactory typeFactory;
+
+ final RelDataType char1Type;
+ final RelDataType char33Type;
+ final RelDataType varcharType;
+ final RelDataType varchar33Type;
+ final RelDataType booleanType;
+ final RelDataType binaryType;
+ final RelDataType binary33Type;
+ final RelDataType varbinaryType;
+ final RelDataType varbinary33Type;
+ final RelDataType decimalType;
+ final RelDataType decimalP10S0Type;
+ final RelDataType decimalP10S3Type;
+ final RelDataType tinyintType;
+ final RelDataType smallintType;
+ final RelDataType intType;
+ final RelDataType bigintType;
+ final RelDataType floatType;
+ final RelDataType doubleType;
+ final RelDataType dateType;
+ final RelDataType timeType;
+ final RelDataType time3Type;
+ final RelDataType timestampType;
+ final RelDataType timestamp3Type;
+ final RelDataType timestampWithLocalTimeZoneType;
+ final RelDataType timestamp3WithLocalTimeZoneType;
+ final RelDataType nullType;
+
+ Fixture(RelDataTypeFactory typeFactory) {
+ this.typeFactory = typeFactory;
+ this.char1Type = typeFactory.createSqlType(SqlTypeName.CHAR);
+ this.char33Type = typeFactory.createSqlType(SqlTypeName.CHAR, 33);
+ this.varcharType = typeFactory.createSqlType(SqlTypeName.VARCHAR);
+ this.varchar33Type = typeFactory.createSqlType(SqlTypeName.VARCHAR, 33);
+ this.booleanType = typeFactory.createSqlType(SqlTypeName.BOOLEAN);
+ this.binaryType = typeFactory.createSqlType(SqlTypeName.BINARY);
+ this.binary33Type = typeFactory.createSqlType(SqlTypeName.BINARY, 33);
+ this.varbinaryType = typeFactory.createSqlType(SqlTypeName.VARBINARY);
+ this.varbinary33Type = typeFactory.createSqlType(SqlTypeName.VARBINARY, 33);
+ this.decimalType = typeFactory.createSqlType(SqlTypeName.DECIMAL);
+ this.decimalP10S0Type = typeFactory.createSqlType(SqlTypeName.DECIMAL, 10);
+ this.decimalP10S3Type = typeFactory.createSqlType(SqlTypeName.DECIMAL, 10, 3);
+ this.tinyintType = typeFactory.createSqlType(SqlTypeName.TINYINT);
+ this.smallintType = typeFactory.createSqlType(SqlTypeName.SMALLINT);
+ this.intType = typeFactory.createSqlType(SqlTypeName.INTEGER);
+ this.bigintType = typeFactory.createSqlType(SqlTypeName.BIGINT);
+ this.floatType = typeFactory.createSqlType(SqlTypeName.FLOAT);
+ this.doubleType = typeFactory.createSqlType(SqlTypeName.DOUBLE);
+ this.dateType = typeFactory.createSqlType(SqlTypeName.DATE);
+ this.timeType = typeFactory.createSqlType(SqlTypeName.TIME);
+ this.time3Type = typeFactory.createSqlType(SqlTypeName.TIME, 3);
+ this.timestampType = typeFactory.createSqlType(SqlTypeName.TIMESTAMP);
+ this.timestamp3Type = typeFactory.createSqlType(SqlTypeName.TIMESTAMP, 3);
+ this.timestampWithLocalTimeZoneType =
+ typeFactory.createSqlType(SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE);
+ this.timestamp3WithLocalTimeZoneType =
+ typeFactory.createSqlType(SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE, 3);
+ this.nullType = typeFactory.createSqlType(SqlTypeName.NULL);
+ }
+
+ public RelDataType createSqlType(SqlTypeName sqlTypeName, int precision) {
+ return typeFactory.createSqlType(sqlTypeName, precision);
+ }
+
+ public RelDataType createArrayType(RelDataType elementType) {
+ return typeFactory.createArrayType(elementType, -1);
+ }
+
+ public RelDataType createMultisetType(RelDataType elementType) {
+ return typeFactory.createMultisetType(elementType, -1);
+ }
+
+ public RelDataType createMapType(RelDataType keyType, RelDataType valType) {
+ return typeFactory.createMapType(keyType, valType);
+ }
+
+ public RelDataType createStructType(List<RelDataType> keyTypes, List<String> names) {
+ return typeFactory.createStructType(keyTypes, names);
+ }
+
+ public RelDataType nullable(RelDataType type) {
+ return typeFactory.createTypeWithNullability(type, true);
+ }
+}
diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkDDLDataTypeTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkDDLDataTypeTest.java
new file mode 100644
index 0000000..4b4499f
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkDDLDataTypeTest.java
@@ -0,0 +1,446 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser;
+
+import org.apache.flink.sql.parser.ddl.SqlCreateTable;
+import org.apache.flink.sql.parser.ddl.SqlTableColumn;
+import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl;
+import org.apache.flink.sql.parser.validate.FlinkSqlConformance;
+
+import org.apache.calcite.avatica.util.Casing;
+import org.apache.calcite.avatica.util.Quoting;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.sql.SqlDataTypeSpec;
+import org.apache.calcite.sql.SqlDialect;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.dialect.CalciteSqlDialect;
+import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.calcite.sql.parser.SqlParser;
+import org.apache.calcite.sql.parser.SqlParserTest;
+import org.apache.calcite.sql.parser.SqlParserUtil;
+import org.apache.calcite.sql.pretty.SqlPrettyWriter;
+import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.test.SqlValidatorTestCase;
+import org.apache.calcite.util.Util;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for all the supported Flink DDL data types.
+ */
+@RunWith(Parameterized.class)
+public class FlinkDDLDataTypeTest {
+ private FlinkSqlConformance conformance = FlinkSqlConformance.DEFAULT;
+ private static final RelDataTypeFactory TYPE_FACTORY =
+ new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
+ private static final Fixture FIXTURE = new Fixture(TYPE_FACTORY);
+ private static final String DDL_FORMAT = "create table t1 (\n" +
+ " f0 %s\n" +
+ ") with (\n" +
+ " k1 = 'v1'\n" +
+ ")";
+
+ @Parameterized.Parameters(name = "{index}: {0}")
+ public static List<TestItem> testData() {
+ return Arrays.asList(
+ createTestItem("CHAR", nullable(FIXTURE.char1Type), "CHAR"),
+ createTestItem("CHAR NOT NULL", FIXTURE.char1Type, "CHAR NOT NULL"),
+ createTestItem("CHAR NOT \t\nNULL", FIXTURE.char1Type, "CHAR NOT NULL"),
+ createTestItem("char not null", FIXTURE.char1Type, "CHAR NOT NULL"),
+ createTestItem("CHAR NULL", nullable(FIXTURE.char1Type), "CHAR"),
+ createTestItem("CHAR(33)", nullable(FIXTURE.char33Type), "CHAR(33)"),
+ createTestItem("VARCHAR", nullable(FIXTURE.varcharType), "VARCHAR"),
+ createTestItem("VARCHAR(33)", nullable(FIXTURE.varchar33Type), "VARCHAR(33)"),
+ createTestItem("STRING",
+ nullable(FIXTURE.createSqlType(SqlTypeName.VARCHAR, Integer.MAX_VALUE)), "STRING"),
+ createTestItem("BOOLEAN", nullable(FIXTURE.booleanType), "BOOLEAN"),
+ createTestItem("BINARY", nullable(FIXTURE.binaryType), "BINARY"),
+ createTestItem("BINARY(33)", nullable(FIXTURE.binary33Type), "BINARY(33)"),
+ createTestItem("VARBINARY", nullable(FIXTURE.varbinaryType), "VARBINARY"),
+ createTestItem("VARBINARY(33)", nullable(FIXTURE.varbinary33Type),
+ "VARBINARY(33)"),
+ createTestItem("BYTES",
+ nullable(FIXTURE.createSqlType(SqlTypeName.VARBINARY, Integer.MAX_VALUE)),
+ "BYTES"),
+ createTestItem("DECIMAL", nullable(FIXTURE.decimalType), "DECIMAL"),
+ createTestItem("DEC", nullable(FIXTURE.decimalType), "DECIMAL"),
+ createTestItem("NUMERIC", nullable(FIXTURE.decimalType), "DECIMAL"),
+ createTestItem("DECIMAL(10)", nullable(FIXTURE.decimalP10S0Type), "DECIMAL(10)"),
+ createTestItem("DEC(10)", nullable(FIXTURE.decimalP10S0Type), "DECIMAL(10)"),
+ createTestItem("NUMERIC(10)", nullable(FIXTURE.decimalP10S0Type), "DECIMAL(10)"),
+ createTestItem("DECIMAL(10, 3)", nullable(FIXTURE.decimalP10S3Type),
+ "DECIMAL(10, 3)"),
+ createTestItem("DEC(10, 3)", nullable(FIXTURE.decimalP10S3Type),
+ "DECIMAL(10, 3)"),
+ createTestItem("NUMERIC(10, 3)", nullable(FIXTURE.decimalP10S3Type),
+ "DECIMAL(10, 3)"),
+ createTestItem("TINYINT", nullable(FIXTURE.tinyintType), "TINYINT"),
+ createTestItem("SMALLINT", nullable(FIXTURE.smallintType), "SMALLINT"),
+ createTestItem("INTEGER", nullable(FIXTURE.intType), "INTEGER"),
+ createTestItem("INT", nullable(FIXTURE.intType), "INTEGER"),
+ createTestItem("BIGINT", nullable(FIXTURE.bigintType), "BIGINT"),
+ createTestItem("FLOAT", nullable(FIXTURE.floatType), "FLOAT"),
+ createTestItem("DOUBLE", nullable(FIXTURE.doubleType), "DOUBLE"),
+ createTestItem("DOUBLE PRECISION", nullable(FIXTURE.doubleType), "DOUBLE"),
+ createTestItem("DATE", nullable(FIXTURE.dateType), "DATE"),
+ createTestItem("TIME", nullable(FIXTURE.timeType), "TIME"),
+ createTestItem("TIME WITHOUT TIME ZONE", nullable(FIXTURE.timeType), "TIME"),
+ createTestItem("TIME(3)", nullable(FIXTURE.time3Type), "TIME(3)"),
+ createTestItem("TIME(3) WITHOUT TIME ZONE", nullable(FIXTURE.time3Type),
+ "TIME(3)"),
+ createTestItem("TIMESTAMP", nullable(FIXTURE.timestampType), "TIMESTAMP"),
+ createTestItem("TIMESTAMP WITHOUT TIME ZONE", nullable(FIXTURE.timestampType),
+ "TIMESTAMP"),
+ createTestItem("TIMESTAMP(3)", nullable(FIXTURE.timestamp3Type), "TIMESTAMP(3)"),
+ createTestItem("TIMESTAMP(3) WITHOUT TIME ZONE",
+ nullable(FIXTURE.timestamp3Type), "TIMESTAMP(3)"),
+ createTestItem("TIMESTAMP WITH LOCAL TIME ZONE",
+ nullable(FIXTURE.timestampWithLocalTimeZoneType),
+ "TIMESTAMP WITH LOCAL TIME ZONE"),
+ createTestItem("TIMESTAMP(3) WITH LOCAL TIME ZONE",
+ nullable(FIXTURE.timestamp3WithLocalTimeZoneType),
+ "TIMESTAMP(3) WITH LOCAL TIME ZONE"),
+ createTestItem("ARRAY<TIMESTAMP(3) WITH LOCAL TIME ZONE>",
+ nullable(FIXTURE.createArrayType(nullable(FIXTURE.timestamp3WithLocalTimeZoneType))),
+ "ARRAY< TIMESTAMP(3) WITH LOCAL TIME ZONE >"),
+ createTestItem("ARRAY<INT NOT NULL>",
+ nullable(FIXTURE.createArrayType(FIXTURE.intType)),
+ "ARRAY< INTEGER NOT NULL >"),
+ createTestItem("INT ARRAY",
+ nullable(FIXTURE.createArrayType(nullable(FIXTURE.intType))),
+ "INTEGER ARRAY"),
+ createTestItem("INT NOT NULL ARRAY",
+ nullable(FIXTURE.createArrayType(FIXTURE.intType)),
+ "INTEGER NOT NULL ARRAY"),
+ createTestItem("INT ARRAY NOT NULL",
+ FIXTURE.createArrayType(nullable(FIXTURE.intType)),
+ "INTEGER ARRAY NOT NULL"),
+ createTestItem("MULTISET<INT NOT NULL>",
+ nullable(FIXTURE.createMultisetType(FIXTURE.intType)),
+ "MULTISET< INTEGER NOT NULL >"),
+ createTestItem("INT MULTISET",
+ nullable(FIXTURE.createMultisetType(nullable(FIXTURE.intType))),
+ "INTEGER MULTISET"),
+ createTestItem("INT NOT NULL MULTISET",
+ nullable(FIXTURE.createMultisetType(FIXTURE.intType)),
+ "INTEGER NOT NULL MULTISET"),
+ createTestItem("INT MULTISET NOT NULL",
+ FIXTURE.createMultisetType(nullable(FIXTURE.intType)),
+ "INTEGER MULTISET NOT NULL"),
+ createTestItem("MAP<BIGINT, BOOLEAN>",
+ nullable(FIXTURE.createMapType(
+ nullable(FIXTURE.bigintType),
+ nullable(FIXTURE.booleanType))),
+ "MAP< BIGINT, BOOLEAN >"),
+ createTestItem("ROW<f0 INT NOT NULL, f1 BOOLEAN>",
+ nullable(FIXTURE.createStructType(
+ Arrays.asList(FIXTURE.intType, nullable(FIXTURE.booleanType)),
+ Arrays.asList("f0", "f1"))),
+ "ROW< `f0` INTEGER NOT NULL, `f1` BOOLEAN >"),
+ createTestItem("ROW(f0 INT NOT NULL, f1 BOOLEAN)",
+ nullable(FIXTURE.createStructType(
+ Arrays.asList(FIXTURE.intType, nullable(FIXTURE.booleanType)),
+ Arrays.asList("f0", "f1"))),
+ "ROW< `f0` INTEGER NOT NULL, `f1` BOOLEAN >"),
+ createTestItem("ROW<`f0` INT>",
+ nullable(FIXTURE.createStructType(
+ Collections.singletonList(nullable(FIXTURE.intType)),
+ Collections.singletonList("f0"))),
+ "ROW< `f0` INTEGER >"),
+ createTestItem("ROW(`f0` INT)",
+ nullable(FIXTURE.createStructType(
+ Collections.singletonList(nullable(FIXTURE.intType)),
+ Collections.singletonList("f0"))),
+ "ROW< `f0` INTEGER >"),
+ createTestItem("ROW<>",
+ nullable(FIXTURE.createStructType(
+ Collections.emptyList(),
+ Collections.emptyList())),
+ "ROW<>"),
+ createTestItem("ROW()",
+ nullable(FIXTURE.createStructType(
+ Collections.emptyList(),
+ Collections.emptyList())),
+ "ROW<>"),
+ createTestItem("ROW<f0 INT NOT NULL 'This is a comment.', "
+ + "f1 BOOLEAN 'This as well.'>",
+ nullable(FIXTURE.createStructType(
+ Arrays.asList(FIXTURE.intType, nullable(FIXTURE.booleanType)),
+ Arrays.asList("f0", "f1"))),
+ "ROW< `f0` INTEGER NOT NULL 'This is a comment.', "
+ + "`f1` BOOLEAN 'This as well.' >"),
+
+ // test parse throws error.
+ createTestItem("TIMESTAMP WITH TIME ZONE",
+ "'WITH TIME ZONE' is not supported yet, options: "
+ + "'WITHOUT TIME ZONE', 'WITH LOCAL TIME ZONE'."),
+ createTestItem("TIMESTAMP(3) WITH TIME ZONE",
+ "'WITH TIME ZONE' is not supported yet, options: "
+ + "'WITHOUT TIME ZONE', 'WITH LOCAL TIME ZONE'."),
+ createTestItem("^NULL^",
+ "(?s).*Encountered \"NULL\" at line 2, column 6..*"),
+ createTestItem("cat.db.MyType",
+ "(?s).*UDT in DDL is not supported yet..*"),
+ createTestItem("`db`.`MyType`",
+ "(?s).*UDT in DDL is not supported yet..*"),
+ createTestItem("MyType",
+ "(?s).*UDT in DDL is not supported yet..*"),
+ createTestItem("ARRAY<MyType>",
+ "(?s).*UDT in DDL is not supported yet..*"),
+ createTestItem("ROW<f0 MyType, f1 `c`.`d`.`t`>",
+ "(?s).*UDT in DDL is not supported yet..*"),
+ createTestItem("^INTERVAL^ YEAR",
+ "(?s).*Encountered \"INTERVAL\" at line 2, column 6..*"),
+ createTestItem("ANY(^'unknown.class'^, '')",
+ "(?s).*Encountered \"\\\\'unknown.class\\\\'\" at line 2, column 10.\n.*"
+ + "Was expecting:\n"
+ + " <UNSIGNED_INTEGER_LITERAL> ...\n"
+ + ".*"));
+ }
+
+ private static TestItem createTestItem(Object... args) {
+ assert args.length >= 2;
+ final String testExpr = (String) args[0];
+ TestItem testItem = TestItem.fromTestExpr(testExpr);
+ if (args[1] instanceof String) {
+ testItem.withExpectedError((String) args[1]);
+ } else if (args[1] instanceof RelDataType) {
+ testItem.withExpectedType((RelDataType) args[1]);
+ }
+ if (args.length == 3) {
+ testItem.withExpectedUnparsed((String) args[2]);
+ }
+ return testItem;
+ }
+
+ @Parameterized.Parameter
+ public TestItem testItem;
+
+ @Test
+ public void testDataTypeParsing() {
+ if (testItem.expectedType != null) {
+ checkType(testItem.testExpr, testItem.expectedType);
+ }
+ }
+
+ @Test
+ public void testThrowsError() {
+ if (testItem.expectedError != null) {
+ checkFails(testItem.testExpr, testItem.expectedError);
+ }
+ }
+
+ @Test
+ public void testDataTypeUnparsing() {
+ if (testItem.expectedUnparsed != null) {
+ checkUnparseTo(testItem.testExpr, testItem.expectedUnparsed);
+ }
+ }
+
+ private static RelDataType nullable(RelDataType type) {
+ return FIXTURE.nullable(type);
+ }
+
+ private void checkType(String typeExpr, RelDataType expectedType) {
+ this.sql(String.format(DDL_FORMAT, typeExpr)).checkType(expectedType);
+ }
+
+ private void checkFails(String typeExpr, String expectedMsgPattern) {
+ sql(String.format(DDL_FORMAT, typeExpr)).fails(expectedMsgPattern);
+ }
+
+ private void checkUnparseTo(String typeExpr, String expectedUnparsed) {
+ sql(String.format(DDL_FORMAT, typeExpr)).unparsedTo(expectedUnparsed);
+ }
+
+ private Tester getTester() {
+ return new TesterImpl();
+ }
+
+ private Sql sql(String sql) {
+ return new Sql(sql);
+ }
+
+ //~ Inner Classes ----------------------------------------------------------
+
+ private static class TestItem {
+ private final String testExpr;
+ @Nullable
+ private RelDataType expectedType;
+ @Nullable
+ private String expectedError;
+ @Nullable
+ private String expectedUnparsed;
+
+ private TestItem(String testExpr) {
+ this.testExpr = testExpr;
+ }
+
+ static TestItem fromTestExpr(String testExpr) {
+ return new TestItem(testExpr);
+ }
+
+ TestItem withExpectedType(RelDataType expectedType) {
+ this.expectedType = expectedType;
+ return this;
+ }
+
+ TestItem withExpectedError(String expectedError) {
+ this.expectedError = expectedError;
+ return this;
+ }
+
+ TestItem withExpectedUnparsed(String expectedUnparsed) {
+ this.expectedUnparsed = expectedUnparsed;
+ return this;
+ }
+
+ @Override
+ public String toString() {
+ return this.testExpr;
+ }
+ }
+
+ private class Sql {
+ private final String sql;
+
+ Sql(String sql) {
+ this.sql = sql;
+ }
+
+ public Sql checkType(RelDataType type) {
+ getTester().checkType(this.sql, type);
+ return this;
+ }
+
+ public Sql fails(String expectedMsgPattern) {
+ getTester().checkFails(this.sql, expectedMsgPattern);
+ return this;
+ }
+
+ public Sql unparsedTo(String expectedUnparsed) {
+ getTester().checkUnparsed(this.sql, expectedUnparsed);
+ return this;
+ }
+ }
+
+ /**
+ * Callback to control how test actions are performed.
+ */
+ protected interface Tester {
+ void checkType(String sql, RelDataType type);
+
+ void checkFails(String sql, String expectedMsgPattern);
+
+ void checkUnparsed(String sql, String expectedUnparsed);
+ }
+
+ /**
+ * Default implementation of {@link SqlParserTest.Tester}.
+ */
+ protected class TesterImpl implements Tester {
+ private SqlParser getSqlParser(String sql) {
+ return SqlParser.create(sql,
+ SqlParser.configBuilder()
+ .setParserFactory(FlinkSqlParserImpl.FACTORY)
+ .setQuoting(Quoting.BACK_TICK)
+ .setUnquotedCasing(Casing.UNCHANGED)
+ .setQuotedCasing(Casing.UNCHANGED)
+ .setConformance(conformance)
+ .build());
+ }
+
+ private SqlDialect getSqlDialect() {
+ return new CalciteSqlDialect(SqlDialect.EMPTY_CONTEXT
+ .withQuotedCasing(Casing.UNCHANGED)
+ .withConformance(conformance)
+ .withUnquotedCasing(Casing.UNCHANGED)
+ .withIdentifierQuoteString("`"));
+ }
+
+ public void checkType(String sql, RelDataType type) {
+ final SqlNode sqlNode = parseStmtAndHandleEx(sql);
+ assert sqlNode instanceof SqlCreateTable;
+ final SqlCreateTable sqlCreateTable = (SqlCreateTable) sqlNode;
+ SqlNodeList columns = sqlCreateTable.getColumnList();
+ assert columns.size() == 1;
+ RelDataType columnType = ((SqlTableColumn) columns.get(0)).getType()
+ .deriveType(TYPE_FACTORY);
+ assertEquals(type, columnType);
+ }
+
+ private SqlNode parseStmtAndHandleEx(String sql) {
+ final SqlNode sqlNode;
+ try {
+ sqlNode = getSqlParser(sql).parseStmt();
+ } catch (SqlParseException e) {
+ throw new RuntimeException("Error while parsing SQL: " + sql, e);
+ }
+ return sqlNode;
+ }
+
+ public void checkFails(
+ String sql,
+ String expectedMsgPattern) {
+ SqlParserUtil.StringAndPos sap = SqlParserUtil.findPos(sql);
+ Throwable thrown = null;
+ try {
+ final SqlNode sqlNode;
+ sqlNode = getSqlParser(sap.sql).parseStmt();
+ Util.discard(sqlNode);
+ } catch (Throwable ex) {
+ thrown = ex;
+ }
+
+ checkEx(expectedMsgPattern, sap, thrown);
+ }
+
+ public void checkUnparsed(String sql, String expectedUnparsed) {
+ final SqlNode sqlNode = parseStmtAndHandleEx(sql);
+ assert sqlNode instanceof SqlCreateTable;
+ final SqlCreateTable sqlCreateTable = (SqlCreateTable) sqlNode;
+ SqlNodeList columns = sqlCreateTable.getColumnList();
+ assert columns.size() == 1;
+ SqlDataTypeSpec dataTypeSpec = ((SqlTableColumn) columns.get(0)).getType();
+ SqlWriter sqlWriter = new SqlPrettyWriter(getSqlDialect(), false);
+ dataTypeSpec.unparse(sqlWriter, 0, 0);
+ assertEquals(expectedUnparsed, sqlWriter.toSqlString().getSql());
+ }
+
+ private void checkEx(String expectedMsgPattern,
+ SqlParserUtil.StringAndPos sap,
+ Throwable thrown) {
+ SqlValidatorTestCase.checkEx(thrown, expectedMsgPattern, sap);
+ }
+ }
+}
diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
index f045817..cc892ab 100644
--- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
+++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
@@ -366,7 +366,8 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
check("CREATE TABLE tbl1 (\n" +
" a ARRAY<bigint>, \n" +
" b MAP<int, varchar>,\n" +
- " c ROW<cc0:int, cc1: float, cc2: varchar>,\n" +
+ " c ROW<cc0 int, cc1 float, cc2 varchar>,\n" +
+ " d MULTISET<varchar>,\n" +
" PRIMARY KEY (a, b) \n" +
") with (\n" +
" x = 'y', \n" +
@@ -374,28 +375,8 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
")\n", "CREATE TABLE `TBL1` (\n" +
" `A` ARRAY< BIGINT >,\n" +
" `B` MAP< INTEGER, VARCHAR >,\n" +
- " `C` ROW< `CC0` : INTEGER, `CC1` : FLOAT, `CC2` : VARCHAR >,\n" +
- " PRIMARY KEY (`A`, `B`)\n" +
- ") WITH (\n" +
- " `X` = 'y',\n" +
- " `ASD` = 'data'\n" +
- ")");
- }
-
- @Test
- public void testCreateTableWithDecimalType() {
- check("CREATE TABLE tbl1 (\n" +
- " a decimal, \n" +
- " b decimal(10, 0),\n" +
- " c decimal(38, 38),\n" +
- " PRIMARY KEY (a, b) \n" +
- ") with (\n" +
- " x = 'y', \n" +
- " asd = 'data'\n" +
- ")\n", "CREATE TABLE `TBL1` (\n" +
- " `A` DECIMAL,\n" +
- " `B` DECIMAL(10, 0),\n" +
- " `C` DECIMAL(38, 38),\n" +
+ " `C` ROW< `CC0` INTEGER, `CC1` FLOAT, `CC2` VARCHAR >,\n" +
+ " `D` MULTISET< VARCHAR >,\n" +
" PRIMARY KEY (`A`, `B`)\n" +
") WITH (\n" +
" `X` = 'y',\n" +
@@ -408,7 +389,8 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
check("CREATE TABLE tbl1 (\n" +
" a ARRAY<ARRAY<bigint>>, \n" +
" b MAP<MAP<int, varchar>, ARRAY<varchar>>,\n" +
- " c ROW<cc0:ARRAY<int>, cc1: float, cc2: varchar>,\n" +
+ " c ROW<cc0 ARRAY<int>, cc1 float, cc2 varchar>,\n" +
+ " d MULTISET<ARRAY<int>>,\n" +
" PRIMARY KEY (a, b) \n" +
") with (\n" +
" x = 'y', \n" +
@@ -416,7 +398,8 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
")\n", "CREATE TABLE `TBL1` (\n" +
" `A` ARRAY< ARRAY< BIGINT > >,\n" +
" `B` MAP< MAP< INTEGER, VARCHAR >, ARRAY< VARCHAR > >,\n" +
- " `C` ROW< `CC0` : ARRAY< INTEGER >, `CC1` : FLOAT, `CC2` : VARCHAR >,\n" +
+ " `C` ROW< `CC0` ARRAY< INTEGER >, `CC1` FLOAT, `CC2` VARCHAR >,\n" +
+ " `D` MULTISET< ARRAY< INTEGER > >,\n" +
" PRIMARY KEY (`A`, `B`)\n" +
") WITH (\n" +
" `X` = 'y',\n" +
@@ -437,7 +420,7 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
")\n", "(?s).*Encountered \"\\(\" at line 4, column 14.\n" +
"Was expecting one of:\n" +
" \"AS\" ...\n" +
- " \"CHARACTER\" ...\n" +
+ " \"ARRAY\" ...\n" +
".*");
}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala
index 132755f..077fdfd 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala
@@ -70,7 +70,7 @@ class WindowAggregateITCase(mode: StateBackendMode)
val sql =
"""
|SELECT
- | string,
+ | `string`,
| HOP_START(rowtime, INTERVAL '0.004' SECOND, INTERVAL '0.005' SECOND),
| HOP_ROWTIME(rowtime, INTERVAL '0.004' SECOND, INTERVAL '0.005' SECOND),
| COUNT(1),
@@ -79,7 +79,7 @@ class WindowAggregateITCase(mode: StateBackendMode)
| COUNT(DISTINCT `float`),
| concat_distinct_agg(name)
|FROM T1
- |GROUP BY string, HOP(rowtime, INTERVAL '0.004' SECOND, INTERVAL '0.005' SECOND)
+ |GROUP BY `string`, HOP(rowtime, INTERVAL '0.004' SECOND, INTERVAL '0.005' SECOND)
""".stripMargin
val sink = new TestingAppendSink
@@ -122,7 +122,7 @@ class WindowAggregateITCase(mode: StateBackendMode)
val sql =
"""
|SELECT
- | string,
+ | `string`,
| SESSION_START(rowtime, INTERVAL '0.005' SECOND),
| SESSION_ROWTIME(rowtime, INTERVAL '0.005' SECOND),
| COUNT(1),
@@ -131,7 +131,7 @@ class WindowAggregateITCase(mode: StateBackendMode)
| SUM(`int`),
| COUNT(DISTINCT name)
|FROM T1
- |GROUP BY string, SESSION(rowtime, INTERVAL '0.005' SECOND)
+ |GROUP BY `string`, SESSION(rowtime, INTERVAL '0.005' SECOND)
""".stripMargin
val sink = new TestingAppendSink
@@ -171,7 +171,7 @@ class WindowAggregateITCase(mode: StateBackendMode)
val sql =
"""
|SELECT
- | string,
+ | `string`,
| TUMBLE_START(rowtime, INTERVAL '0.005' SECOND) as w_start,
| TUMBLE_END(rowtime, INTERVAL '0.005' SECOND),
| COUNT(DISTINCT `long`),
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
index 01a55f1..b332818 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
@@ -512,7 +512,7 @@ class TimeAttributesITCase extends AbstractTestBase {
.assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
val table = stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string)
tEnv.registerTable("T1", table)
- val querySql = "select rowtime as ts, string as msg from T1"
+ val querySql = "select rowtime as ts, `string` as msg from T1"
val results = tEnv.sqlQuery(querySql).toAppendStream[Pojo1]
results.addSink(new StreamITCase.StringSink[Pojo1])