You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/05/07 06:02:00 UTC

[GitHub] [flink] danny0405 commented on a change in pull request #11950: [FLINK-17030] Add primary key syntax to DDL

danny0405 commented on a change in pull request #11950:
URL: https://github.com/apache/flink/pull/11950#discussion_r421214451



##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
##########
@@ -279,8 +278,61 @@ private Operation convertAlterTable(SqlAlterTable sqlAlterTable) {
 				throw new ValidationException(String.format("Table %s doesn't exist or is a temporary table.",
 					tableIdentifier.toString()));
 			}
+		} else if (sqlAlterTable instanceof SqlAlterTableAddConstraint) {
+			Optional<CatalogManager.TableLookupResult> optionalCatalogTable =
+					catalogManager.getTable(tableIdentifier);
+			if (optionalCatalogTable.isPresent() && !optionalCatalogTable.get().isTemporary()) {
+				SqlTableConstraint constraint = ((SqlAlterTableAddConstraint) sqlAlterTable)
+						.getConstraint();
+				validateTableConstraint(constraint);
+				CatalogTable oriCatalogTable = (CatalogTable) optionalCatalogTable.get().getTable();
+				TableSchema.Builder builder = TableSchemaUtils
+						.builderWithGivenSchema(oriCatalogTable.getSchema());
+				if (constraint.getConstraintName() != null) {
+					builder.primaryKey(constraint.getConstraintName(), constraint.getColumnNames());

Review comment:
       The builder will finally validate the nullability.

##########
File path: flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
##########
@@ -164,23 +168,35 @@ public void validate() throws SqlValidateException {
 			validator.addColumn(column);
 		}
 
-		for (SqlNode primaryKeyNode : this.primaryKeyList) {
-			String primaryKey = ((SqlIdentifier) primaryKeyNode).getSimple();
-			if (!validator.contains(primaryKey)) {
-				throw new SqlValidateException(
-					primaryKeyNode.getParserPosition(),
-					"Primary key [" + primaryKey + "] not defined in columns, at " +
-						primaryKeyNode.getParserPosition());
+		// Validate table constraints.
+		boolean pkDefined = false;
+		Set<String> constraintNames = new HashSet<>();
+		for (SqlTableConstraint constraint : getFullConstraints()) {
+			String constraintName = constraint.getConstraintName();
+			// Validate constraint name should be unique.
+			if (constraintName != null && !constraintNames.add(constraintName)) {
+				throw new SqlValidateException(constraint.getParserPosition(),
+						String.format("Duplicate definition for constraint [%s]", constraintName));
 			}
-		}
-
-		for (SqlNodeList uniqueKeys: this.uniqueKeysList) {
-			for (SqlNode uniqueKeyNode : uniqueKeys) {
-				String uniqueKey = ((SqlIdentifier) uniqueKeyNode).getSimple();
-				if (!validator.contains(uniqueKey)) {
-					throw new SqlValidateException(
-							uniqueKeyNode.getParserPosition(),
-							"Unique key [" + uniqueKey + "] not defined in columns, at " + uniqueKeyNode.getParserPosition());
+			// Validate primary key definition should be unique.
+			if (constraint.isPrimaryKey()) {
+				if (pkDefined) {
+					throw new SqlValidateException(constraint.getParserPosition(),
+							"Duplicate primary key definition");
+				} else {
+					pkDefined = true;
+				}
+			}
+			// Validate the key field exists.
+			if (constraint.isTableConstraint()) {
+				for (SqlNode column : constraint.getColumns()) {

Review comment:
       primary key on computed column is not supported from the parser, i would add a test.

##########
File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
##########
@@ -919,6 +919,20 @@ class CatalogTableITCase(isStreamingMode: Boolean) extends AbstractTestBase {
       .getTable(new ObjectPath(tableEnv.getCurrentDatabase, "t2"))
       .getProperties
     assertEquals(expectedProperties, properties)
+    val currentCatalog = tableEnv.getCurrentCatalog
+    val currentDB = tableEnv.getCurrentDatabase
+    tableEnv.sqlUpdate("alter table t2 add constraint ct1 primary key(a) not enforced")

Review comment:
       It did fail. sqlUpdate is not deprecated, all the left tests use that, use a single executeSql seems weird.

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
##########
@@ -612,10 +668,14 @@ private TableSchema createTableSchema(SqlCreateTable sqlCreateTable) {
 			if (node instanceof SqlTableColumn) {
 				SqlTableColumn column = (SqlTableColumn) node;
 				RelDataType relType = column.getType()
-					.deriveType(validator, column.getType().getNullable());
+					.deriveType(validator);
+				RelDataType colType = validator.getTypeFactory()
+						.createTypeWithNullability(
+								relType,
+								sqlCreateTable.isColumnNullable(column));

Review comment:
       I have some different thoughts, we should make the schema builder strict and simple for nullability, that means we should not modify the nullability if there is already a definition there, the only thing builder should do is validation if the building is valid.
   
   Different cases have varadic requests for nullability or other constraints, so let the builder invoker to keep the nullability correct is more reasonable.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org