You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/09/15 03:25:21 UTC
[flink] branch master updated: [FLINK-19092][sql-parser] Support to parse comment on computed column
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new b873564 [FLINK-19092][sql-parser] Support to parse comment on computed column
b873564 is described below
commit b873564357612bbea788332632955fc256105d35
Author: SteNicholas <pr...@163.com>
AuthorDate: Tue Sep 15 11:23:22 2020 +0800
[FLINK-19092][sql-parser] Support to parse comment on computed column
This closes #13352
---
.../src/main/codegen/includes/parserImpls.ftl | 17 ++++---
.../flink/sql/parser/ddl/SqlCreateTable.java | 41 ++++++-----------
.../flink/sql/parser/ddl/SqlTableColumn.java | 52 +++++++++++++++++-----
.../flink/sql/parser/FlinkSqlParserImplTest.java | 35 +++++++++++++++
.../planner/operations/MergeTableLikeUtil.java | 17 +++----
.../planner/operations/MergeTableLikeUtilTest.java | 8 ++--
.../table/sqlexec/SqlToOperationConverter.java | 2 +-
7 files changed, 115 insertions(+), 57 deletions(-)
diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
index ae73717..0432b56 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -515,17 +515,24 @@ void Watermark(TableCreationContext context) :
void ComputedColumn(TableCreationContext context) :
{
- SqlNode identifier;
+ SqlIdentifier identifier;
SqlNode expr;
SqlParserPos pos;
+ SqlCharStringLiteral comment = null;
}
{
identifier = SimpleIdentifier() {pos = getPos();}
<AS>
- expr = Expression(ExprContext.ACCEPT_NON_QUERY) {
- expr = SqlStdOperatorTable.AS.createCall(Span.of(identifier, expr).pos(), expr, identifier);
- context.columnList.add(expr);
- }
+ expr = Expression(ExprContext.ACCEPT_NON_QUERY)
+ [ <COMMENT> <QUOTED_STRING> {
+ String p = SqlParserUtil.parseString(token.image);
+ comment = SqlLiteral.createCharString(p, getPos());
+ }
+ ]
+ {
+ SqlTableColumn computedColumn = new SqlTableColumn(identifier, expr, comment, getPos());
+ context.columnList.add(computedColumn);
+ }
}
void TableColumn2(List<SqlNode> list) :
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
index 35931a6..7f04d30 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
@@ -22,8 +22,6 @@ import org.apache.flink.sql.parser.ExtendedSqlNode;
import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
import org.apache.flink.sql.parser.error.SqlValidateException;
-import org.apache.calcite.sql.SqlBasicCall;
-import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlCharStringLiteral;
import org.apache.calcite.sql.SqlCreate;
import org.apache.calcite.sql.SqlDataTypeSpec;
@@ -172,12 +170,10 @@ public class SqlCreateTable extends SqlCreate implements ExtendedSqlNode {
.collect(Collectors.toSet());
for (SqlNode column : columnList) {
- if (column instanceof SqlTableColumn) {
- SqlTableColumn tableColumn = (SqlTableColumn) column;
- if (primaryKeyColumns.contains(tableColumn.getName().getSimple())) {
- SqlDataTypeSpec notNullType = tableColumn.getType().withNullable(false);
- tableColumn.setType(notNullType);
- }
+ SqlTableColumn tableColumn = (SqlTableColumn) column;
+ if (!tableColumn.isGenerated() && primaryKeyColumns.contains(tableColumn.getName().getSimple())) {
+ SqlDataTypeSpec notNullType = tableColumn.getType().withNullable(false);
+ tableColumn.setType(notNullType);
}
}
}
@@ -189,7 +185,7 @@ public class SqlCreateTable extends SqlCreate implements ExtendedSqlNode {
public boolean containsComputedColumn() {
for (SqlNode column : columnList) {
- if (column instanceof SqlBasicCall) {
+ if (((SqlTableColumn) column).isGenerated()) {
return true;
}
}
@@ -200,9 +196,9 @@ public class SqlCreateTable extends SqlCreate implements ExtendedSqlNode {
public List<SqlTableConstraint> getFullConstraints() {
List<SqlTableConstraint> ret = new ArrayList<>();
this.columnList.forEach(column -> {
- if (column instanceof SqlTableColumn) {
- ((SqlTableColumn) column).getConstraint()
- .map(ret::add);
+ SqlTableColumn tableColumn = (SqlTableColumn) column;
+ if (!tableColumn.isGenerated()) {
+ tableColumn.getConstraint().map(ret::add);
}
});
ret.addAll(this.tableConstraints);
@@ -236,12 +232,12 @@ public class SqlCreateTable extends SqlCreate implements ExtendedSqlNode {
writer.startList("", "");
for (SqlNode column : columnList) {
writer.sep(",");
- if (column instanceof SqlTableColumn) {
- SqlTableColumn tableColumn = (SqlTableColumn) column;
- tableColumn.getName().unparse(writer, 0, 0);
- } else {
- column.unparse(writer, 0, 0);
+ SqlTableColumn tableColumn = (SqlTableColumn) column;
+ if (tableColumn.isGenerated()) {
+ tableColumn.getExpr().get().unparse(writer, 0, 0);
+ writer.keyword("AS");
}
+ tableColumn.getName().unparse(writer, 0, 0);
}
return writer.toString();
@@ -264,16 +260,7 @@ public class SqlCreateTable extends SqlCreate implements ExtendedSqlNode {
SqlWriter.Frame frame = writer.startList(SqlWriter.FrameTypeEnum.create("sds"), "(", ")");
for (SqlNode column : columnList) {
printIndent(writer);
- if (column instanceof SqlBasicCall) {
- SqlCall call = (SqlCall) column;
- SqlCall newCall = call.getOperator().createCall(
- SqlParserPos.ZERO,
- call.operand(1),
- call.operand(0));
- newCall.unparse(writer, leftPrec, rightPrec);
- } else {
- column.unparse(writer, leftPrec, rightPrec);
- }
+ column.unparse(writer, leftPrec, rightPrec);
}
if (tableConstraints.size() > 0) {
for (SqlTableConstraint constraint : tableConstraints) {
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlTableColumn.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlTableColumn.java
index 1d108dd..c8d4d72 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlTableColumn.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlTableColumn.java
@@ -47,10 +47,12 @@ public class SqlTableColumn extends SqlCall {
new SqlSpecialOperator("COLUMN_DECL", SqlKind.COLUMN_DECL);
private SqlIdentifier name;
- private SqlDataTypeSpec type;
+ private SqlDataTypeSpec type;
private SqlTableConstraint constraint;
+ private SqlNode expr;
+
private SqlCharStringLiteral comment;
public SqlTableColumn(SqlIdentifier name,
@@ -65,6 +67,16 @@ public class SqlTableColumn extends SqlCall {
this.comment = comment;
}
+ public SqlTableColumn(SqlIdentifier name,
+ SqlNode expr,
+ @Nullable SqlCharStringLiteral comment,
+ SqlParserPos pos) {
+ super(pos);
+ this.name = requireNonNull(name, "Column name should not be null");
+ this.expr = requireNonNull(expr, "Column expression should not be null");
+ this.comment = comment;
+ }
+
@Override
public SqlOperator getOperator() {
return OPERATOR;
@@ -72,20 +84,27 @@ public class SqlTableColumn extends SqlCall {
@Override
public List<SqlNode> getOperandList() {
- return ImmutableNullableList.of(name, type, comment);
+ return isGenerated() ?
+ ImmutableNullableList.of(name, expr, comment) :
+ ImmutableNullableList.of(name, type, comment);
}
@Override
public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
this.name.unparse(writer, leftPrec, rightPrec);
- writer.print(" ");
- this.type.unparse(writer, leftPrec, rightPrec);
- if (!this.type.getNullable()) {
- // Default is nullable.
- writer.keyword("NOT NULL");
- }
- if (this.constraint != null) {
- this.constraint.unparse(writer, leftPrec, rightPrec);
+ if (isGenerated()) {
+ writer.keyword("AS");
+ this.expr.unparse(writer, leftPrec, rightPrec);
+ } else {
+ writer.print(" ");
+ this.type.unparse(writer, leftPrec, rightPrec);
+ if (!this.type.getNullable()) {
+ // Default is nullable.
+ writer.keyword("NOT NULL");
+ }
+ if (this.constraint != null) {
+ this.constraint.unparse(writer, leftPrec, rightPrec);
+ }
}
if (this.comment != null) {
writer.print(" COMMENT ");
@@ -93,6 +112,15 @@ public class SqlTableColumn extends SqlCall {
}
}
+ /**
+ * Returns if this column is a computed column that is generated from an expression.
+ *
+ * @return true if this column is generated
+ */
+ public boolean isGenerated() {
+ return expr != null;
+ }
+
public SqlIdentifier getName() {
return name;
}
@@ -109,6 +137,10 @@ public class SqlTableColumn extends SqlCall {
this.type = type;
}
+ public Optional<SqlNode> getExpr() {
+ return Optional.ofNullable(expr);
+ }
+
public Optional<SqlTableConstraint> getConstraint() {
return Optional.ofNullable(constraint);
}
diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
index 76bddcd..713fde3 100644
--- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
+++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
@@ -318,6 +318,41 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
}
@Test
+ public void testCreateTableWithCommentOnComputedColumn() {
+ final String sql = "CREATE TABLE tbl1 (\n" +
+ " a bigint comment 'test column comment AAA.',\n" +
+ " h varchar, \n" +
+ " g as 2 * (a + 1) comment 'test computed column.', \n" +
+ " ts as toTimestamp(b, 'yyyy-MM-dd HH:mm:ss'), \n" +
+ " b varchar,\n" +
+ " proc as PROCTIME(), \n" +
+ " PRIMARY KEY (a, b)\n" +
+ ")\n" +
+ "comment 'test table comment ABC.'\n" +
+ "PARTITIONED BY (a, h)\n" +
+ " with (\n" +
+ " 'connector' = 'kafka', \n" +
+ " 'kafka.topic' = 'log.test'\n" +
+ ")\n";
+ final String expected = "CREATE TABLE `TBL1` (\n" +
+ " `A` BIGINT COMMENT 'test column comment AAA.',\n" +
+ " `H` VARCHAR,\n" +
+ " `G` AS (2 * (`A` + 1)) COMMENT 'test computed column.',\n" +
+ " `TS` AS `TOTIMESTAMP`(`B`, 'yyyy-MM-dd HH:mm:ss'),\n" +
+ " `B` VARCHAR,\n" +
+ " `PROC` AS `PROCTIME`(),\n" +
+ " PRIMARY KEY (`A`, `B`)\n" +
+ ")\n" +
+ "COMMENT 'test table comment ABC.'\n" +
+ "PARTITIONED BY (`A`, `H`)\n" +
+ "WITH (\n" +
+ " 'connector' = 'kafka',\n" +
+ " 'kafka.topic' = 'log.test'\n" +
+ ")";
+ sql(sql).ok(expected);
+ }
+
+ @Test
public void testTableConstraints() {
final String sql = "CREATE TABLE tbl1 (\n" +
" a bigint,\n" +
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/MergeTableLikeUtil.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/MergeTableLikeUtil.java
index fb9b1c2..6c0f27db 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/MergeTableLikeUtil.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/MergeTableLikeUtil.java
@@ -37,7 +37,6 @@ import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
-import org.apache.calcite.sql.SqlBasicCall;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.validate.SqlValidator;
@@ -372,12 +371,10 @@ class MergeTableLikeUtil {
collectPhysicalFieldsTypes(derivedColumns);
for (SqlNode derivedColumn : derivedColumns) {
-
- boolean isComputed = !(derivedColumn instanceof SqlTableColumn);
+ final SqlTableColumn tableColumn = (SqlTableColumn) derivedColumn;
final TableColumn column;
- if (isComputed) {
- SqlBasicCall call = (SqlBasicCall) derivedColumn;
- String fieldName = call.operand(1).toString();
+ if (tableColumn.isGenerated()) {
+ String fieldName = tableColumn.getName().toString();
if (columns.containsKey(fieldName)) {
if (!columns.get(fieldName).isGenerated()) {
throw new ValidationException(String.format(
@@ -395,7 +392,7 @@ class MergeTableLikeUtil {
}
SqlNode validatedExpr = sqlValidator.validateParameterizedExpression(
- call.operand(0),
+ tableColumn.getExpr().get(),
physicalFieldNamesToTypes);
final RelDataType validatedType = sqlValidator.getValidatedNodeType(validatedExpr);
column = TableColumn.of(
@@ -404,7 +401,7 @@ class MergeTableLikeUtil {
escapeExpressions.apply(validatedExpr));
computedFieldNamesToTypes.put(fieldName, validatedType);
} else {
- String name = ((SqlTableColumn) derivedColumn).getName().getSimple();
+ String name = tableColumn.getName().getSimple();
LogicalType logicalType = FlinkTypeFactory.toLogicalType(physicalFieldNamesToTypes.get(name));
column = TableColumn.of(name, TypeConversions.fromLogicalToDataType(logicalType));
}
@@ -414,8 +411,8 @@ class MergeTableLikeUtil {
private void collectPhysicalFieldsTypes(List<SqlNode> derivedColumns) {
for (SqlNode derivedColumn : derivedColumns) {
- if (derivedColumn instanceof SqlTableColumn) {
- SqlTableColumn column = (SqlTableColumn) derivedColumn;
+ SqlTableColumn column = (SqlTableColumn) derivedColumn;
+ if (!column.isGenerated()) {
String name = column.getName().getSimple();
if (columns.containsKey(name)) {
throw new ValidationException(String.format(
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/MergeTableLikeUtilTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/MergeTableLikeUtilTest.java
index 6473432..b03a10d 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/MergeTableLikeUtilTest.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/MergeTableLikeUtilTest.java
@@ -36,7 +36,6 @@ import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.calcite.avatica.util.TimeUnit;
-import org.apache.calcite.sql.SqlAsOperator;
import org.apache.calcite.sql.SqlBasicCall;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlIntervalQualifier;
@@ -712,9 +711,10 @@ public class MergeTableLikeUtilTest {
}
private SqlNode tableColumn(String name, SqlNode expression) {
- return new SqlBasicCall(
- new SqlAsOperator(),
- new SqlNode[]{expression, identifier(name)},
+ return new SqlTableColumn (
+ identifier(name),
+ expression,
+ null,
SqlParserPos.ZERO
);
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
index 9d3ab4d..2a35460 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
@@ -630,7 +630,7 @@ public class SqlToOperationConverter {
TableSchema.Builder builder = new TableSchema.Builder();
// collect the physical table schema first.
final List<SqlNode> physicalColumns = columnList.getList().stream()
- .filter(n -> n instanceof SqlTableColumn).collect(Collectors.toList());
+ .filter(n -> !((SqlTableColumn) n).isGenerated()).collect(Collectors.toList());
for (SqlNode node : physicalColumns) {
SqlTableColumn column = (SqlTableColumn) node;
final RelDataType relType = column.getType()