Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/05/06 08:58:41 UTC

[GitHub] [flink] wuchong commented on a change in pull request #11950: [FLINK-17030] Add primary key syntax to DDL

wuchong commented on a change in pull request #11950:
URL: https://github.com/apache/flink/pull/11950#discussion_r420583768



##########
File path: flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/constraint/SqlConstraintEnforcement.java
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl.constraint;
+
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.parser.SqlParserPos;
+
+/** Enumeration of SQL constraint enforcement. */
+public enum SqlConstraintEnforcement {
+	ENFORCED("ENFORCED"),
+	NOT_ENFORCED("NOT ENFORCED");
+
+	private String digest;

Review comment:
       Add a blank line after this, and make `digest` final.
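   
    For example, just a sketch of what I mean:
   
    ```
    /** Enumeration of SQL constraint enforcement. */
    public enum SqlConstraintEnforcement {
    	ENFORCED("ENFORCED"),
    	NOT_ENFORCED("NOT ENFORCED");
    
    	private final String digest;
    
    	SqlConstraintEnforcement(String digest) {
    		this.digest = digest;
    	}
    }
    ```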

##########
File path: flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlTableColumn.java
##########
@@ -101,6 +109,10 @@ public void setType(SqlDataTypeSpec type) {
 		this.type = type;
 	}
 
+	public SqlTableConstraint getConstraint() {

Review comment:
       Return `Optional<SqlTableConstraint>`, just like what we do for the comment.
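   
    Something like this (just a sketch, assuming the backing field is named `constraint`):
   
    ```
    public Optional<SqlTableConstraint> getConstraint() {
    	return Optional.ofNullable(constraint);
    }
    ```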

##########
File path: flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableDropConstraint.java
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+
+import java.util.List;
+
+/**
+ * ALTER TABLE DROP CONSTRAINT constraint_name.

Review comment:
       `ALTER TABLE <table_name> DROP CONSTRAINT <constraint_name>` ?

##########
File path: flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableAddConstraint.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
+
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+
+import java.util.List;
+
+/**
+ * ALTER TABLE ADD [CONSTRAINT constraint_name]

Review comment:
       `ALTER TABLE <table_name> ADD ...` ?

##########
File path: flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/constraint/SqlTableConstraint.java
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl.constraint;
+
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+/**
+ * Table constraint of a table definition.
+ *
+ * <p>Syntax from
+ * SQL-2011 IWD 9075-2:201?(E) 11.3 &lt;table definition&gt;:
+ *
+ * <pre>
+ * &lt;table constraint definition&gt; ::=
+ *   [ &lt;constraint name definition&gt; ] &lt;table constraint&gt;
+ *       [ &lt;constraint characteristics&gt; ]
+ *
+ * &lt;table constraint&gt; ::=
+ *     &lt;unique constraint definition&gt;
+ *
+ * &lt;unique constraint definition&gt; ::=
+ *     &lt;unique specification&gt; &lt;left paren&gt; &lt;unique column list&gt; &lt;right paren&gt;
+ *
+ * &lt;unique specification&gt; ::=
+ *     UNIQUE
+ *   | PRIMARY KEY
+ * </pre>
+ */
+public class SqlTableConstraint extends SqlCall {
+	/** Use this operator only if you don't have a better one. */
+	private static final SqlOperator OPERATOR =
+			new SqlSpecialOperator("SqlTableConstraint", SqlKind.OTHER);
+
+	private final SqlIdentifier constraintName;
+	private final SqlLiteral uniqueSpec;
+	private final SqlNodeList columns;
+	private final SqlLiteral enforcement;
+	// Whether this is a table constraint, currently it is only used for SQL unparse.
+	private final boolean isTableConstraint;
+
+	/**
+	 * Creates a table constraint node.
+	 *
+	 * @param constraintName Constraint name
+	 * @param uniqueSpec     Unique specification
+	 * @param columns        Column list on which the constraint enforces
+	 *                       or null if this is a column constraint
+	 * @param enforcement    Whether the constraint is enforced
+	 * @param isTableConstraint Whether this is a table constraint
+	 * @param pos            Parser position
+	 */
+	public SqlTableConstraint(
+			@Nullable SqlIdentifier constraintName,
+			SqlLiteral uniqueSpec,
+			@Nullable SqlNodeList columns,
+			@Nullable SqlLiteral enforcement,
+			boolean isTableConstraint,
+			SqlParserPos pos) {
+		super(pos);
+		this.constraintName = constraintName;
+		this.uniqueSpec = uniqueSpec;
+		this.columns = columns;
+		this.enforcement = enforcement;
+		this.isTableConstraint = isTableConstraint;
+	}
+
+	@Override
+	public SqlOperator getOperator() {
+		return OPERATOR;
+	}
+
+	/** Returns whether the constraint is UNIQUE. */
+	public boolean isUnique() {
+		return this.uniqueSpec.getValueAs(SqlUniqueSpec.class) == SqlUniqueSpec.UNIQUE;
+	}
+
+	/** Returns whether the constraint is PRIMARY KEY. */
+	public boolean isPrimaryKey() {
+		return this.uniqueSpec.getValueAs(SqlUniqueSpec.class) == SqlUniqueSpec.PRIMARY_KEY;
+	}
+
+	/** Returns whether the constraint is enforced. */
+	public boolean isEnforced() {
+		// Default is enforced.
+		return this.enforcement == null
+				|| this.enforcement.getValueAs(SqlConstraintEnforcement.class)
+					== SqlConstraintEnforcement.ENFORCED;
+	}
+
+	public String getConstraintName() {

Review comment:
       Return `Optional<String>` to avoid a nullable result.
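   
    For example (a sketch; unparsing the identifier with `getSimple()` here is an assumption):
   
    ```
    public Optional<String> getConstraintName() {
    	return Optional.ofNullable(constraintName)
    			.map(SqlIdentifier::getSimple);
    }
    ```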

##########
File path: flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
##########
@@ -164,23 +168,35 @@ public void validate() throws SqlValidateException {
 			validator.addColumn(column);
 		}
 
-		for (SqlNode primaryKeyNode : this.primaryKeyList) {
-			String primaryKey = ((SqlIdentifier) primaryKeyNode).getSimple();
-			if (!validator.contains(primaryKey)) {
-				throw new SqlValidateException(
-					primaryKeyNode.getParserPosition(),
-					"Primary key [" + primaryKey + "] not defined in columns, at " +
-						primaryKeyNode.getParserPosition());
+		// Validate table constraints.
+		boolean pkDefined = false;
+		Set<String> constraintNames = new HashSet<>();
+		for (SqlTableConstraint constraint : getFullConstraints()) {
+			String constraintName = constraint.getConstraintName();
+			// Validate constraint name should be unique.
+			if (constraintName != null && !constraintNames.add(constraintName)) {
+				throw new SqlValidateException(constraint.getParserPosition(),
+						String.format("Duplicate definition for constraint [%s]", constraintName));
 			}
-		}
-
-		for (SqlNodeList uniqueKeys: this.uniqueKeysList) {
-			for (SqlNode uniqueKeyNode : uniqueKeys) {
-				String uniqueKey = ((SqlIdentifier) uniqueKeyNode).getSimple();
-				if (!validator.contains(uniqueKey)) {
-					throw new SqlValidateException(
-							uniqueKeyNode.getParserPosition(),
-							"Unique key [" + uniqueKey + "] not defined in columns, at " + uniqueKeyNode.getParserPosition());
+			// Validate primary key definition should be unique.
+			if (constraint.isPrimaryKey()) {
+				if (pkDefined) {
+					throw new SqlValidateException(constraint.getParserPosition(),
+							"Duplicate primary key definition");
+				} else {
+					pkDefined = true;
+				}
+			}
+			// Validate the key field exists.
+			if (constraint.isTableConstraint()) {
+				for (SqlNode column : constraint.getColumns()) {

Review comment:
       I think primary key and unique constraints can't be defined on computed columns; could you add validation and tests for this?
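   
    A rough sketch of what the validation could look like (`isComputedColumn` is a hypothetical helper on the column validator):
   
    ```
    for (SqlNode column : constraint.getColumns()) {
    	String columnName = ((SqlIdentifier) column).getSimple();
    	// Key columns must be physical columns, not computed columns.
    	if (validator.isComputedColumn(columnName)) { // hypothetical helper
    		throw new SqlValidateException(column.getParserPosition(),
    				String.format("Could not create a PRIMARY KEY or UNIQUE constraint on computed column [%s]", columnName));
    	}
    }
    ```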

##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableAddConstraintOperation.java
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.ddl;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.constraints.UniqueConstraint;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+
+/** Operation of "ALTER TABLE ADD [CONSTRAINT constraintName] ..." clause. **/
+public class AlterTableAddConstraintOperation extends AlterTableOperation {

Review comment:
       I think the `AlterTableOperation` should simply describe the SQL statement, not materialize the alter changes. What I mean is that this class should only contain the `tableIdentifier` and the `newConstraint`.
   
    The same applies to `AlterTableDropConstraintOperation`.
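   
    Roughly like this (just a sketch; it assumes `AlterTableOperation` already keeps the `tableIdentifier`):
   
    ```
    /** Operation to describe an "ALTER TABLE ADD [CONSTRAINT constraintName] ..." clause. */
    public class AlterTableAddConstraintOperation extends AlterTableOperation {
    	private final UniqueConstraint newConstraint;
    
    	public AlterTableAddConstraintOperation(
    			ObjectIdentifier tableIdentifier,
    			UniqueConstraint newConstraint) {
    		super(tableIdentifier);
    		this.newConstraint = newConstraint;
    	}
    
    	public UniqueConstraint getNewConstraint() {
    		return newConstraint;
    	}
    }
    ```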

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java
##########
@@ -71,4 +72,45 @@ public static TableSchema checkNoGeneratedColumns(TableSchema schema) {
 		}
 		return schema;
 	}
+
+	/**
+	 * Creates a builder with given table schema.
+	 *
+	 * @param oriSchema Original schema
+	 * @return the builder with all the information from the given schema
+	 */
+	public static TableSchema.Builder builderWithGivenSchema(TableSchema oriSchema) {
+		// Copy fields.
+		TableSchema.Builder builder = TableSchema.builder()
+				.fields(oriSchema.getFieldNames(), oriSchema.getFieldDataTypes());
+		// Copy watermark specification.
+		for (WatermarkSpec wms : oriSchema.getWatermarkSpecs()) {
+			builder.watermark(
+					wms.getRowtimeAttribute(),
+					wms.getWatermarkExpr(),
+					wms.getWatermarkExprOutputType());
+		}
+		// Copy primary key constraint.
+		oriSchema.getPrimaryKey()
+				.map(pk -> builder.primaryKey(pk.getName(),
+						pk.getColumns().toArray(new String[0])));
+		return builder;
+	}
+
+	/**
+	 * Creates a new schema but drop the constraints.
+	 */
+	public static TableSchema copySchemaWithoutConstraint(TableSchema oriSchema) {

Review comment:
       Maybe a simpler method name is `dropConstraint(TableSchema)`?

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
##########
@@ -660,9 +720,30 @@ private TableSchema createTableSchema(SqlCreateTable sqlCreateTable) {
 			builder.watermark(rowtimeAttribute, getQuotedSqlString(validated), exprDataType);
 		});
 
+		// Set up table and column constraints into the schema.
+		for (SqlTableConstraint constraint : sqlCreateTable.getFullConstraints()) {
+			final String constraintName = constraint.getConstraintName();
+			if (constraint.isPrimaryKey()) {
+				if (constraintName != null) {
+					builder.primaryKey(constraintName, constraint.getColumnNames());
+				} else {
+					builder.primaryKey(constraint.getColumnNames());
+				}
+			}
+		}
 		return builder.build();
 	}
 
+	private void validateTableConstraint(SqlTableConstraint constraint) {
+		if (constraint.isUnique()) {
+			throw new ValidationException("UNIQUE constraint needs to implement");

Review comment:
       Please throw an `UnsupportedOperationException` with the message `UNIQUE constraint is not supported yet.`
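   
    I.e. something like:
   
    ```
    throw new UnsupportedOperationException("UNIQUE constraint is not supported yet.");
    ```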

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java
##########
@@ -71,4 +72,45 @@ public static TableSchema checkNoGeneratedColumns(TableSchema schema) {
 		}
 		return schema;
 	}
+
+	/**
+	 * Creates a builder with given table schema.
+	 *
+	 * @param oriSchema Original schema
+	 * @return the builder with all the information from the given schema
+	 */
+	public static TableSchema.Builder builderWithGivenSchema(TableSchema oriSchema) {
+		// Copy fields.
+		TableSchema.Builder builder = TableSchema.builder()
+				.fields(oriSchema.getFieldNames(), oriSchema.getFieldDataTypes());
+		// Copy watermark specification.
+		for (WatermarkSpec wms : oriSchema.getWatermarkSpecs()) {
+			builder.watermark(
+					wms.getRowtimeAttribute(),
+					wms.getWatermarkExpr(),
+					wms.getWatermarkExprOutputType());
+		}
+		// Copy primary key constraint.
+		oriSchema.getPrimaryKey()
+				.map(pk -> builder.primaryKey(pk.getName(),
+						pk.getColumns().toArray(new String[0])));
+		return builder;
+	}
+
+	/**
+	 * Creates a new schema but drop the constraints.
+	 */
+	public static TableSchema copySchemaWithoutConstraint(TableSchema oriSchema) {
+		// Copy fields.
+		TableSchema.Builder builder = TableSchema.builder()
+				.fields(oriSchema.getFieldNames(), oriSchema.getFieldDataTypes());

Review comment:
       This loses computed column information. 
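   
    A sketch that keeps the computed column expressions (assuming `TableColumn#isGenerated()` and `getExpr()` here):
   
    ```
    // Copy fields, preserving computed column expressions.
    TableSchema.Builder builder = TableSchema.builder();
    for (TableColumn column : oriSchema.getTableColumns()) {
    	if (column.isGenerated()) {
    		builder.field(column.getName(), column.getType(), column.getExpr().get());
    	} else {
    		builder.field(column.getName(), column.getType());
    	}
    }
    ```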
   

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java
##########
@@ -71,4 +72,45 @@ public static TableSchema checkNoGeneratedColumns(TableSchema schema) {
 		}
 		return schema;
 	}
+
+	/**
+	 * Creates a builder with given table schema.
+	 *
+	 * @param oriSchema Original schema
+	 * @return the builder with all the information from the given schema
+	 */
+	public static TableSchema.Builder builderWithGivenSchema(TableSchema oriSchema) {
+		// Copy fields.
+		TableSchema.Builder builder = TableSchema.builder()
+				.fields(oriSchema.getFieldNames(), oriSchema.getFieldDataTypes());

Review comment:
       This loses computed column information. 

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
##########
@@ -279,8 +278,61 @@ private Operation convertAlterTable(SqlAlterTable sqlAlterTable) {
 				throw new ValidationException(String.format("Table %s doesn't exist or is a temporary table.",
 					tableIdentifier.toString()));
 			}
+		} else if (sqlAlterTable instanceof SqlAlterTableAddConstraint) {
+			Optional<CatalogManager.TableLookupResult> optionalCatalogTable =
+					catalogManager.getTable(tableIdentifier);
+			if (optionalCatalogTable.isPresent() && !optionalCatalogTable.get().isTemporary()) {
+				SqlTableConstraint constraint = ((SqlAlterTableAddConstraint) sqlAlterTable)
+						.getConstraint();
+				validateTableConstraint(constraint);
+				CatalogTable oriCatalogTable = (CatalogTable) optionalCatalogTable.get().getTable();
+				TableSchema.Builder builder = TableSchemaUtils
+						.builderWithGivenSchema(oriCatalogTable.getSchema());
+				if (constraint.getConstraintName() != null) {
+					builder.primaryKey(constraint.getConstraintName(), constraint.getColumnNames());

Review comment:
       If we are going to set NOT NULL in `builder.primaryKey`, then we should check that the key fields are NOT NULL before calling the method, because FLIP-84 says:
   
    > In alter statements, if a user says to create a primary key on a nullable column, the operation should fail.
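   
    A sketch of the check, right before the `builder.primaryKey(...)` call:
   
    ```
    TableSchema oriSchema = oriCatalogTable.getSchema();
    for (String keyName : constraint.getColumnNames()) {
    	DataType fieldType = oriSchema.getFieldDataType(keyName)
    			.orElseThrow(() -> new ValidationException(
    					String.format("Primary key column '%s' is not defined in the schema.", keyName)));
    	if (fieldType.getLogicalType().isNullable()) {
    		throw new ValidationException(
    				String.format("Could not create a PRIMARY KEY on nullable column '%s'.", keyName));
    	}
    }
    ```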

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
##########
@@ -660,9 +720,30 @@ private TableSchema createTableSchema(SqlCreateTable sqlCreateTable) {
 			builder.watermark(rowtimeAttribute, getQuotedSqlString(validated), exprDataType);
 		});
 
+		// Set up table and column constraints into the schema.
+		for (SqlTableConstraint constraint : sqlCreateTable.getFullConstraints()) {
+			final String constraintName = constraint.getConstraintName();
+			if (constraint.isPrimaryKey()) {
+				if (constraintName != null) {
+					builder.primaryKey(constraintName, constraint.getColumnNames());
+				} else {
+					builder.primaryKey(constraint.getColumnNames());
+				}
+			}
+		}
 		return builder.build();
 	}
 
+	private void validateTableConstraint(SqlTableConstraint constraint) {
+		if (constraint.isUnique()) {
+			throw new ValidationException("UNIQUE constraint needs to implement");
+		}
+		if (constraint.isEnforced()) {
+			throw new ValidationException("ENFORCED mode needs to implement,"

Review comment:
       `Flink doesn't support ENFORCED mode for PRIMARY KEY constraint. ENFORCED/NOT ENFORCED controls if the constraint checks are performed on the incoming/outgoing data. Flink does not own the data, therefore the only supported mode is NOT ENFORCED mode.`

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java
##########
@@ -71,4 +72,45 @@ public static TableSchema checkNoGeneratedColumns(TableSchema schema) {
 		}
 		return schema;
 	}
+
+	/**
+	 * Creates a builder with given table schema.
+	 *
+	 * @param oriSchema Original schema
+	 * @return the builder with all the information from the given schema
+	 */
+	public static TableSchema.Builder builderWithGivenSchema(TableSchema oriSchema) {

Review comment:
       Please add tests for this method and `copySchemaWithoutConstraint`, to make sure they stay up to date when `TableSchema` changes.
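   
    Something along these lines (a sketch for a `TableSchemaUtilsTest`; the schema is illustrative):
   
    ```
    @Test
    public void testBuilderWithGivenSchema() {
    	TableSchema oriSchema = TableSchema.builder()
    			.field("a", DataTypes.INT().notNull())
    			.field("b", DataTypes.STRING())
    			.field("c", DataTypes.INT(), "a + 1")
    			.field("t", DataTypes.TIMESTAMP(3))
    			.watermark("t", "t", DataTypes.TIMESTAMP(3))
    			.primaryKey("ct1", new String[] {"a"})
    			.build();
    	TableSchema newSchema = TableSchemaUtils.builderWithGivenSchema(oriSchema).build();
    	assertEquals(oriSchema, newSchema);
    }
    ```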

##########
File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
##########
@@ -919,6 +919,20 @@ class CatalogTableITCase(isStreamingMode: Boolean) extends AbstractTestBase {
       .getTable(new ObjectPath(tableEnv.getCurrentDatabase, "t2"))
       .getProperties
     assertEquals(expectedProperties, properties)
+    val currentCatalog = tableEnv.getCurrentCatalog
+    val currentDB = tableEnv.getCurrentDatabase
+    tableEnv.sqlUpdate("alter table t2 add constraint ct1 primary key(a) not enforced")

Review comment:
       Shouldn't this fail? `a` is nullable.
   
   Btw, please use `tableEnv.executeSql(...)`.

##########
File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
##########
@@ -803,7 +803,7 @@ class CatalogTableITCase(isStreamingMode: Boolean) extends AbstractTestBase {
     val ddl1 =
       """
         |create table t1(
-        |  a bigint,
+        |  a bigint not null,

Review comment:
       Why change this? I guess you want to change the nullability in `testAlterTable`?

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
##########
@@ -612,10 +668,14 @@ private TableSchema createTableSchema(SqlCreateTable sqlCreateTable) {
 			if (node instanceof SqlTableColumn) {
 				SqlTableColumn column = (SqlTableColumn) node;
 				RelDataType relType = column.getType()
-					.deriveType(validator, column.getType().getNullable());
+					.deriveType(validator);
+				RelDataType colType = validator.getTypeFactory()
+						.createTypeWithNullability(
+								relType,
+								sqlCreateTable.isColumnNullable(column));

Review comment:
       I think we should modify the nullability in `schemaBuilder.primaryKey(...)` itself.
    Otherwise, the `TableSchema` is an error-prone structure, and we would have to do the same thing for the descriptor API in the future.
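   
    I.e. something like this inside the builder (a rough sketch; the internal field list names are assumptions):
   
    ```
    public Builder primaryKey(String constraintName, String... columns) {
    	// Implicitly make every primary key column NOT NULL, so that
    	// all call sites get the correct nullability for free.
    	List<String> columnList = Arrays.asList(columns);
    	for (int i = 0; i < fieldNames.size(); i++) {
    		if (columnList.contains(fieldNames.get(i))) {
    			fieldDataTypes.set(i, fieldDataTypes.get(i).notNull());
    		}
    	}
    	this.primaryKey = UniqueConstraint.primaryKey(constraintName, columnList);
    	return this;
    }
    ```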

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
##########
@@ -603,6 +655,10 @@ private Operation convertSqlQuery(SqlNode node) {
 	 * @return TableSchema
 	 */
 	private TableSchema createTableSchema(SqlCreateTable sqlCreateTable) {
+		// Unique key and enforced mode are not supported yet.
+		sqlCreateTable.getFullConstraints()
+				.forEach(this::validateTableConstraint);

Review comment:
       Could we move this validation to the beginning of `convertCreateTable` to make it more visible to developers? 

##########
File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
##########
@@ -271,23 +275,78 @@ public void testCreateTable() {
 				DataTypes.VARCHAR(Integer.MAX_VALUE)});
 	}
 
-	@Test(expected = SqlConversionException.class)
-	public void testCreateTableWithPkUniqueKeys() {
-		FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
-		final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
+	@Test
+	public void testCreateTableWithPrimaryKey() {
 		final String sql = "CREATE TABLE tbl1 (\n" +
 			"  a bigint,\n" +
 			"  b varchar, \n" +
 			"  c int, \n" +
 			"  d varchar, \n" +
-			"  primary key(a), \n" +
-			"  unique(a, b) \n" +
-			")\n" +
-			"  PARTITIONED BY (a, d)\n" +
-			"  with (\n" +
-			"    'connector' = 'kafka', \n" +
-			"    'kafka.topic' = 'log.test'\n" +
+			"  constraint ct1 primary key(a, b) not enforced\n" +
+			") with (\n" +
+			"  'connector' = 'kafka', \n" +
+			"  'kafka.topic' = 'log.test'\n" +
 			")\n";
+		FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
+		final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
+		Operation operation = parse(sql, planner, parser);
+		assert operation instanceof CreateTableOperation;
+		CreateTableOperation op = (CreateTableOperation) operation;
+		CatalogTable catalogTable = op.getCatalogTable();
+		TableSchema tableSchema = catalogTable.getSchema();
+		assertThat(
+				tableSchema
+						.getPrimaryKey()
+						.map(UniqueConstraint::asSummaryString)
+						.orElse("fakeVal"),
+				is("CONSTRAINT ct1 PRIMARY KEY (a, b)"));
+		assertArrayEquals(tableSchema.getFieldNames(),
+				new String[] {"a", "b", "c", "d"});
+		assertArrayEquals(tableSchema.getFieldDataTypes(),
+				new DataType[]{
+						DataTypes.BIGINT().notNull(),
+						DataTypes.STRING().notNull(),
+						DataTypes.INT(),
+						DataTypes.STRING()});

Review comment:
       ditto

##########
File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
##########
@@ -271,23 +275,78 @@ public void testCreateTable() {
 				DataTypes.VARCHAR(Integer.MAX_VALUE)});
 	}
 
-	@Test(expected = SqlConversionException.class)
-	public void testCreateTableWithPkUniqueKeys() {
-		FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
-		final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
+	@Test
+	public void testCreateTableWithPrimaryKey() {
 		final String sql = "CREATE TABLE tbl1 (\n" +
 			"  a bigint,\n" +
 			"  b varchar, \n" +
 			"  c int, \n" +
 			"  d varchar, \n" +
-			"  primary key(a), \n" +
-			"  unique(a, b) \n" +
-			")\n" +
-			"  PARTITIONED BY (a, d)\n" +
-			"  with (\n" +
-			"    'connector' = 'kafka', \n" +
-			"    'kafka.topic' = 'log.test'\n" +
+			"  constraint ct1 primary key(a, b) not enforced\n" +
+			") with (\n" +
+			"  'connector' = 'kafka', \n" +
+			"  'kafka.topic' = 'log.test'\n" +
 			")\n";
+		FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
+		final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
+		Operation operation = parse(sql, planner, parser);
+		assert operation instanceof CreateTableOperation;
+		CreateTableOperation op = (CreateTableOperation) operation;
+		CatalogTable catalogTable = op.getCatalogTable();
+		TableSchema tableSchema = catalogTable.getSchema();
+		assertThat(
+				tableSchema
+						.getPrimaryKey()
+						.map(UniqueConstraint::asSummaryString)
+						.orElse("fakeVal"),
+				is("CONSTRAINT ct1 PRIMARY KEY (a, b)"));
+		assertArrayEquals(tableSchema.getFieldNames(),
+				new String[] {"a", "b", "c", "d"});

Review comment:
       Swap these two parameters: the first parameter should be `expected`, the second should be `actual`.
   
   ```
   assertArrayEquals(new String[] {"a", "b", "c", "d"}, tableSchema.getFieldNames());
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org