Posted to commits@flink.apache.org by ja...@apache.org on 2020/09/15 03:25:21 UTC

[flink] branch master updated: [FLINK-19092][sql-parser] Support to parse comment on computed column

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new b873564  [FLINK-19092][sql-parser] Support to parse comment on computed column
b873564 is described below

commit b873564357612bbea788332632955fc256105d35
Author: SteNicholas <pr...@163.com>
AuthorDate: Tue Sep 15 11:23:22 2020 +0800

    [FLINK-19092][sql-parser] Support to parse comment on computed column
    
    This closes #13352
---
 .../src/main/codegen/includes/parserImpls.ftl      | 17 ++++---
 .../flink/sql/parser/ddl/SqlCreateTable.java       | 41 ++++++-----------
 .../flink/sql/parser/ddl/SqlTableColumn.java       | 52 +++++++++++++++++-----
 .../flink/sql/parser/FlinkSqlParserImplTest.java   | 35 +++++++++++++++
 .../planner/operations/MergeTableLikeUtil.java     | 17 +++----
 .../planner/operations/MergeTableLikeUtilTest.java |  8 ++--
 .../table/sqlexec/SqlToOperationConverter.java     |  2 +-
 7 files changed, 115 insertions(+), 57 deletions(-)
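
For orientation, the feature added here is an optional COMMENT clause on a
computed column in CREATE TABLE; previously the grammar rejected a comment
after the computed expression. Below is a minimal, hedged sketch of exercising
the parser directly through Calcite's SqlParser. The parser configuration is an
assumption chosen for illustration only -- the project's own coverage goes
through the SqlParserTest harness in FlinkSqlParserImplTest, as the diff below
shows.

    import org.apache.calcite.sql.SqlNode;
    import org.apache.calcite.sql.parser.SqlParseException;
    import org.apache.calcite.sql.parser.SqlParser;
    import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl;
    import org.apache.flink.sql.parser.validate.FlinkSqlConformance;

    public class ParseComputedColumnComment {
        public static void main(String[] args) throws SqlParseException {
            // DDL with a comment on the computed column `g` -- the syntax this commit adds.
            String ddl = "CREATE TABLE tbl1 (\n"
                + "  a BIGINT COMMENT 'physical column',\n"
                + "  g AS 2 * (a + 1) COMMENT 'computed column'\n"
                + ") WITH (\n"
                + "  'connector' = 'kafka'\n"
                + ")";

            // Illustrative configuration: Flink's generated parser factory plus
            // its default conformance; other lexical settings are left at
            // Calcite's defaults here.
            SqlParser parser = SqlParser.create(ddl, SqlParser.configBuilder()
                .setParserFactory(FlinkSqlParserImpl.FACTORY)
                .setConformance(FlinkSqlConformance.DEFAULT)
                .build());

            // With this patch the COMMENT after the computed expression parses and
            // is kept on the resulting SqlTableColumn node.
            SqlNode createTable = parser.parseStmt();
            System.out.println(createTable);
        }
    }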

diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
index ae73717..0432b56 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -515,17 +515,24 @@ void Watermark(TableCreationContext context) :
 
 void ComputedColumn(TableCreationContext context) :
 {
-    SqlNode identifier;
+    SqlIdentifier identifier;
     SqlNode expr;
     SqlParserPos pos;
+    SqlCharStringLiteral comment = null;
 }
 {
     identifier = SimpleIdentifier() {pos = getPos();}
     <AS>
-    expr = Expression(ExprContext.ACCEPT_NON_QUERY) {
-        expr = SqlStdOperatorTable.AS.createCall(Span.of(identifier, expr).pos(), expr, identifier);
-        context.columnList.add(expr);
-    }
+    expr = Expression(ExprContext.ACCEPT_NON_QUERY)
+    [ <COMMENT> <QUOTED_STRING> {
+            String p = SqlParserUtil.parseString(token.image);
+            comment = SqlLiteral.createCharString(p, getPos());
+        }
+     ]
+     {
+            SqlTableColumn computedColumn = new SqlTableColumn(identifier, expr, comment, getPos());
+            context.columnList.add(computedColumn);
+      }
 }
 
 void TableColumn2(List<SqlNode> list) :
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
index 35931a6..7f04d30 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
@@ -22,8 +22,6 @@ import org.apache.flink.sql.parser.ExtendedSqlNode;
 import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
 import org.apache.flink.sql.parser.error.SqlValidateException;
 
-import org.apache.calcite.sql.SqlBasicCall;
-import org.apache.calcite.sql.SqlCall;
 import org.apache.calcite.sql.SqlCharStringLiteral;
 import org.apache.calcite.sql.SqlCreate;
 import org.apache.calcite.sql.SqlDataTypeSpec;
@@ -172,12 +170,10 @@ public class SqlCreateTable extends SqlCreate implements ExtendedSqlNode {
 				.collect(Collectors.toSet());
 
 			for (SqlNode column : columnList) {
-				if (column instanceof SqlTableColumn) {
-					SqlTableColumn tableColumn = (SqlTableColumn) column;
-					if (primaryKeyColumns.contains(tableColumn.getName().getSimple())) {
-						SqlDataTypeSpec notNullType = tableColumn.getType().withNullable(false);
-						tableColumn.setType(notNullType);
-					}
+				SqlTableColumn tableColumn = (SqlTableColumn) column;
+				if (!tableColumn.isGenerated() && primaryKeyColumns.contains(tableColumn.getName().getSimple())) {
+					SqlDataTypeSpec notNullType = tableColumn.getType().withNullable(false);
+					tableColumn.setType(notNullType);
 				}
 			}
 		}
@@ -189,7 +185,7 @@ public class SqlCreateTable extends SqlCreate implements ExtendedSqlNode {
 
 	public boolean containsComputedColumn() {
 		for (SqlNode column : columnList) {
-			if (column instanceof SqlBasicCall) {
+			if (((SqlTableColumn) column).isGenerated()) {
 				return true;
 			}
 		}
@@ -200,9 +196,9 @@ public class SqlCreateTable extends SqlCreate implements ExtendedSqlNode {
 	public List<SqlTableConstraint> getFullConstraints() {
 		List<SqlTableConstraint> ret = new ArrayList<>();
 		this.columnList.forEach(column -> {
-			if (column instanceof SqlTableColumn) {
-				((SqlTableColumn) column).getConstraint()
-						.map(ret::add);
+			SqlTableColumn tableColumn = (SqlTableColumn) column;
+			if (!tableColumn.isGenerated()) {
+				tableColumn.getConstraint().map(ret::add);
 			}
 		});
 		ret.addAll(this.tableConstraints);
@@ -236,12 +232,12 @@ public class SqlCreateTable extends SqlCreate implements ExtendedSqlNode {
 		writer.startList("", "");
 		for (SqlNode column : columnList) {
 			writer.sep(",");
-			if (column instanceof SqlTableColumn) {
-				SqlTableColumn tableColumn = (SqlTableColumn) column;
-				tableColumn.getName().unparse(writer, 0, 0);
-			} else {
-				column.unparse(writer, 0, 0);
+			SqlTableColumn tableColumn = (SqlTableColumn) column;
+			if (tableColumn.isGenerated()) {
+				tableColumn.getExpr().get().unparse(writer, 0, 0);
+				writer.keyword("AS");
 			}
+			tableColumn.getName().unparse(writer, 0, 0);
 		}
 
 		return writer.toString();
@@ -264,16 +260,7 @@ public class SqlCreateTable extends SqlCreate implements ExtendedSqlNode {
 		SqlWriter.Frame frame = writer.startList(SqlWriter.FrameTypeEnum.create("sds"), "(", ")");
 		for (SqlNode column : columnList) {
 			printIndent(writer);
-			if (column instanceof SqlBasicCall) {
-				SqlCall call = (SqlCall) column;
-				SqlCall newCall = call.getOperator().createCall(
-					SqlParserPos.ZERO,
-					call.operand(1),
-					call.operand(0));
-				newCall.unparse(writer, leftPrec, rightPrec);
-			} else {
-				column.unparse(writer, leftPrec, rightPrec);
-			}
+			column.unparse(writer, leftPrec, rightPrec);
 		}
 		if (tableConstraints.size() > 0) {
 			for (SqlTableConstraint constraint : tableConstraints) {
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlTableColumn.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlTableColumn.java
index 1d108dd..c8d4d72 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlTableColumn.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlTableColumn.java
@@ -47,10 +47,12 @@ public class SqlTableColumn extends SqlCall {
 		new SqlSpecialOperator("COLUMN_DECL", SqlKind.COLUMN_DECL);
 
 	private SqlIdentifier name;
-	private SqlDataTypeSpec type;
 
+	private SqlDataTypeSpec type;
 	private SqlTableConstraint constraint;
 
+	private SqlNode expr;
+
 	private SqlCharStringLiteral comment;
 
 	public SqlTableColumn(SqlIdentifier name,
@@ -65,6 +67,16 @@ public class SqlTableColumn extends SqlCall {
 		this.comment = comment;
 	}
 
+	public SqlTableColumn(SqlIdentifier name,
+			SqlNode expr,
+			@Nullable SqlCharStringLiteral comment,
+			SqlParserPos pos) {
+		super(pos);
+		this.name = requireNonNull(name, "Column name should not be null");
+		this.expr = requireNonNull(expr, "Column expression should not be null");
+		this.comment = comment;
+	}
+
 	@Override
 	public SqlOperator getOperator() {
 		return OPERATOR;
@@ -72,20 +84,27 @@ public class SqlTableColumn extends SqlCall {
 
 	@Override
 	public List<SqlNode> getOperandList() {
-		return ImmutableNullableList.of(name, type, comment);
+		return isGenerated() ?
+			ImmutableNullableList.of(name, expr, comment) :
+			ImmutableNullableList.of(name, type, comment);
 	}
 
 	@Override
 	public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
 		this.name.unparse(writer, leftPrec, rightPrec);
-		writer.print(" ");
-		this.type.unparse(writer, leftPrec, rightPrec);
-		if (!this.type.getNullable()) {
-			// Default is nullable.
-			writer.keyword("NOT NULL");
-		}
-		if (this.constraint != null) {
-			this.constraint.unparse(writer, leftPrec, rightPrec);
+		if (isGenerated()) {
+			writer.keyword("AS");
+			this.expr.unparse(writer, leftPrec, rightPrec);
+		} else {
+			writer.print(" ");
+			this.type.unparse(writer, leftPrec, rightPrec);
+			if (!this.type.getNullable()) {
+				// Default is nullable.
+				writer.keyword("NOT NULL");
+			}
+			if (this.constraint != null) {
+				this.constraint.unparse(writer, leftPrec, rightPrec);
+			}
 		}
 		if (this.comment != null) {
 			writer.print(" COMMENT ");
@@ -93,6 +112,15 @@ public class SqlTableColumn extends SqlCall {
 		}
 	}
 
+	/**
+	 * Returns if this column is a computed column that is generated from an expression.
+	 *
+	 * @return true if this column is generated
+	 */
+	public boolean isGenerated() {
+		return expr != null;
+	}
+
 	public SqlIdentifier getName() {
 		return name;
 	}
@@ -109,6 +137,10 @@ public class SqlTableColumn extends SqlCall {
 		this.type = type;
 	}
 
+	public Optional<SqlNode> getExpr() {
+		return Optional.ofNullable(expr);
+	}
+
 	public Optional<SqlTableConstraint> getConstraint() {
 		return Optional.ofNullable(constraint);
 	}
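
On the AST side, a computed column is now represented by SqlTableColumn itself
(expr set, type left null) instead of a SqlBasicCall around the AS operator,
and callers branch on isGenerated(). A small sketch of building such a node by
hand, mirroring the tableColumn(...) helper updated in MergeTableLikeUtilTest
later in this diff; the identifiers and comment text are made up for
illustration:

    import org.apache.calcite.sql.SqlCharStringLiteral;
    import org.apache.calcite.sql.SqlIdentifier;
    import org.apache.calcite.sql.SqlLiteral;
    import org.apache.calcite.sql.SqlNode;
    import org.apache.calcite.sql.fun.SqlStdOperatorTable;
    import org.apache.calcite.sql.parser.SqlParserPos;
    import org.apache.flink.sql.parser.ddl.SqlTableColumn;

    public class ComputedColumnNodeSketch {
        public static void main(String[] args) {
            // Roughly the column declaration "g AS a + 1 COMMENT 'plus one'".
            SqlIdentifier name = new SqlIdentifier("g", SqlParserPos.ZERO);
            SqlNode expr = SqlStdOperatorTable.PLUS.createCall(
                SqlParserPos.ZERO,
                new SqlIdentifier("a", SqlParserPos.ZERO),
                SqlLiteral.createExactNumeric("1", SqlParserPos.ZERO));
            SqlCharStringLiteral comment =
                SqlLiteral.createCharString("plus one", SqlParserPos.ZERO);

            // New constructor added in this commit: name + expression + optional comment.
            SqlTableColumn column = new SqlTableColumn(name, expr, comment, SqlParserPos.ZERO);

            System.out.println(column.isGenerated());              // true, because expr != null
            System.out.println(column.getExpr().isPresent());      // true
            System.out.println(column.getConstraint().isPresent()); // false, only physical columns carry constraints
        }
    }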
diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
index 76bddcd..713fde3 100644
--- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
+++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
@@ -318,6 +318,41 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 	}
 
 	@Test
+	public void testCreateTableWithCommentOnComputedColumn() {
+		final String sql = "CREATE TABLE tbl1 (\n" +
+			"  a bigint comment 'test column comment AAA.',\n" +
+			"  h varchar, \n" +
+			"  g as 2 * (a + 1) comment 'test computed column.', \n" +
+			"  ts as toTimestamp(b, 'yyyy-MM-dd HH:mm:ss'), \n" +
+			"  b varchar,\n" +
+			"  proc as PROCTIME(), \n" +
+			"  PRIMARY KEY (a, b)\n" +
+			")\n" +
+			"comment 'test table comment ABC.'\n" +
+			"PARTITIONED BY (a, h)\n" +
+			"  with (\n" +
+			"    'connector' = 'kafka', \n" +
+			"    'kafka.topic' = 'log.test'\n" +
+			")\n";
+		final String expected = "CREATE TABLE `TBL1` (\n" +
+			"  `A`  BIGINT  COMMENT 'test column comment AAA.',\n" +
+			"  `H`  VARCHAR,\n" +
+			"  `G` AS (2 * (`A` + 1))  COMMENT 'test computed column.',\n" +
+			"  `TS` AS `TOTIMESTAMP`(`B`, 'yyyy-MM-dd HH:mm:ss'),\n" +
+			"  `B`  VARCHAR,\n" +
+			"  `PROC` AS `PROCTIME`(),\n" +
+			"  PRIMARY KEY (`A`, `B`)\n" +
+			")\n" +
+			"COMMENT 'test table comment ABC.'\n" +
+			"PARTITIONED BY (`A`, `H`)\n" +
+			"WITH (\n" +
+			"  'connector' = 'kafka',\n" +
+			"  'kafka.topic' = 'log.test'\n" +
+			")";
+		sql(sql).ok(expected);
+	}
+
+	@Test
 	public void testTableConstraints() {
 		final String sql = "CREATE TABLE tbl1 (\n" +
 			"  a bigint,\n" +
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/MergeTableLikeUtil.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/MergeTableLikeUtil.java
index fb9b1c2..6c0f27db 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/MergeTableLikeUtil.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/MergeTableLikeUtil.java
@@ -37,7 +37,6 @@ import org.apache.flink.table.types.utils.TypeConversions;
 
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeField;
-import org.apache.calcite.sql.SqlBasicCall;
 import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.validate.SqlValidator;
@@ -372,12 +371,10 @@ class MergeTableLikeUtil {
 			collectPhysicalFieldsTypes(derivedColumns);
 
 			for (SqlNode derivedColumn : derivedColumns) {
-
-				boolean isComputed = !(derivedColumn instanceof SqlTableColumn);
+				final SqlTableColumn tableColumn = (SqlTableColumn) derivedColumn;
 				final TableColumn column;
-				if (isComputed) {
-					SqlBasicCall call = (SqlBasicCall) derivedColumn;
-					String fieldName = call.operand(1).toString();
+				if (tableColumn.isGenerated()) {
+					String fieldName = tableColumn.getName().toString();
 					if (columns.containsKey(fieldName)) {
 						if (!columns.get(fieldName).isGenerated()) {
 							throw new ValidationException(String.format(
@@ -395,7 +392,7 @@ class MergeTableLikeUtil {
 					}
 
 					SqlNode validatedExpr = sqlValidator.validateParameterizedExpression(
-						call.operand(0),
+						tableColumn.getExpr().get(),
 						physicalFieldNamesToTypes);
 					final RelDataType validatedType = sqlValidator.getValidatedNodeType(validatedExpr);
 					column = TableColumn.of(
@@ -404,7 +401,7 @@ class MergeTableLikeUtil {
 						escapeExpressions.apply(validatedExpr));
 					computedFieldNamesToTypes.put(fieldName, validatedType);
 				} else {
-					String name = ((SqlTableColumn) derivedColumn).getName().getSimple();
+					String name = tableColumn.getName().getSimple();
 					LogicalType logicalType = FlinkTypeFactory.toLogicalType(physicalFieldNamesToTypes.get(name));
 					column = TableColumn.of(name, TypeConversions.fromLogicalToDataType(logicalType));
 				}
@@ -414,8 +411,8 @@ class MergeTableLikeUtil {
 
 		private void collectPhysicalFieldsTypes(List<SqlNode> derivedColumns) {
 			for (SqlNode derivedColumn : derivedColumns) {
-				if (derivedColumn instanceof SqlTableColumn) {
-					SqlTableColumn column = (SqlTableColumn) derivedColumn;
+				SqlTableColumn column = (SqlTableColumn) derivedColumn;
+				if (!column.isGenerated()) {
 					String name = column.getName().getSimple();
 					if (columns.containsKey(name)) {
 						throw new ValidationException(String.format(
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/MergeTableLikeUtilTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/MergeTableLikeUtilTest.java
index 6473432..b03a10d 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/MergeTableLikeUtilTest.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/MergeTableLikeUtilTest.java
@@ -36,7 +36,6 @@ import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
 
 import org.apache.calcite.avatica.util.TimeUnit;
-import org.apache.calcite.sql.SqlAsOperator;
 import org.apache.calcite.sql.SqlBasicCall;
 import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlIntervalQualifier;
@@ -712,9 +711,10 @@ public class MergeTableLikeUtilTest {
 	}
 
 	private SqlNode tableColumn(String name, SqlNode expression) {
-		return new SqlBasicCall(
-			new SqlAsOperator(),
-			new SqlNode[]{expression, identifier(name)},
+		return new SqlTableColumn (
+			identifier(name),
+			expression,
+			null,
 			SqlParserPos.ZERO
 		);
 	}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
index 9d3ab4d..2a35460 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
@@ -630,7 +630,7 @@ public class SqlToOperationConverter {
 		TableSchema.Builder builder = new TableSchema.Builder();
 		// collect the physical table schema first.
 		final List<SqlNode> physicalColumns = columnList.getList().stream()
-			.filter(n -> n instanceof SqlTableColumn).collect(Collectors.toList());
+			.filter(n -> !((SqlTableColumn) n).isGenerated()).collect(Collectors.toList());
 		for (SqlNode node : physicalColumns) {
 			SqlTableColumn column = (SqlTableColumn) node;
 			final RelDataType relType = column.getType()