Posted to commits@flink.apache.org by ja...@apache.org on 2020/05/28 02:33:56 UTC

[flink] branch master updated: [FLINK-16021][table-common] Fix DescriptorProperties.putTableSchema does not include PRIMARY KEY

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 3a322e0  [FLINK-16021][table-common] Fix DescriptorProperties.putTableSchema does not include PRIMARY KEY
3a322e0 is described below

commit 3a322e0c4babf2f31223fb75ef61ec23f229011b
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Thu May 28 10:32:53 2020 +0800

    [FLINK-16021][table-common] Fix DescriptorProperties.putTableSchema does not include PRIMARY KEY
    
    This closes #12275
---
 .../ElasticsearchUpsertTableSinkFactoryBase.java   |  8 ++-
 .../flink/connector/hbase/HBaseTableFactory.java   |  8 ++-
 .../table/catalog/hive/HiveCatalogITCase.java      | 46 ++++++++++++++++
 .../jdbc/table/JdbcTableSourceSinkFactory.java     |  8 ++-
 .../kafka/KafkaTableSourceSinkFactoryBase.java     |  8 ++-
 .../gateway/utils/TestTableSinkFactoryBase.java    |  8 ++-
 .../gateway/utils/TestTableSourceFactoryBase.java  |  8 ++-
 .../org/apache/flink/table/descriptors/OldCsv.java |  4 +-
 .../flink/table/descriptors/SchemaValidator.java   | 10 +++-
 .../flink/table/sinks/CsvTableSinkFactoryBase.java | 14 ++---
 .../table/sources/CsvTableSourceFactoryBase.java   | 18 +++---
 .../flink/table/utils/TableSourceFactoryMock.java  | 10 +++-
 .../table/descriptors/DescriptorProperties.java    | 64 ++++++++++++++--------
 .../table/factories/TableFormatFactoryBase.java    |  7 ++-
 .../descriptors/DescriptorPropertiesTest.java      | 15 +++--
 .../planner/catalog/CatalogConstraintTest.java     |  9 +--
 .../flink/table/utils/InMemoryTableFactory.scala   |  7 ++-
 17 files changed, 177 insertions(+), 75 deletions(-)
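
As context for the diff below, a minimal round-trip sketch (not part of the patch itself) of what the fix changes in practice, using only DescriptorProperties and TableSchema calls that appear in this commit:

    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.api.TableSchema;
    import org.apache.flink.table.descriptors.DescriptorProperties;

    public class PrimaryKeyRoundTrip {
        public static void main(String[] args) {
            TableSchema schema = TableSchema.builder()
                .field("uuid", DataTypes.STRING().notNull())   // primary key columns must be NOT NULL
                .field("price", DataTypes.DECIMAL(10, 2))
                .primaryKey("ct1", new String[] {"uuid"})
                .build();

            DescriptorProperties properties = new DescriptorProperties(true);
            properties.putTableSchema("schema", schema);

            // In addition to schema.#.name / schema.#.data-type, the map now contains:
            //   schema.primary-key.name    -> ct1
            //   schema.primary-key.columns -> uuid
            System.out.println(properties.asMap());

            // The constraint survives the round trip.
            TableSchema restored = properties.getTableSchema("schema");
            System.out.println(restored.getPrimaryKey().isPresent()); // true
        }
    }

The two new keys, schema.primary-key.name and schema.primary-key.columns, show up in the expected map of DescriptorPropertiesTest further down, and getTableSchema rebuilds the UniqueConstraint from them.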

diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkFactoryBase.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkFactoryBase.java
index efbbb7e..a20cb33 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkFactoryBase.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkFactoryBase.java
@@ -53,7 +53,7 @@ import java.util.stream.Collectors;
 import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION;
 import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
 import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_VERSION;
-import static org.apache.flink.table.descriptors.DescriptorProperties.TABLE_SCHEMA_EXPR;
+import static org.apache.flink.table.descriptors.DescriptorProperties.EXPR;
 import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK;
 import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_ROWTIME;
 import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE;
@@ -149,13 +149,17 @@ public abstract class ElasticsearchUpsertTableSinkFactoryBase implements StreamT
 		properties.add(SCHEMA + ".#." + SCHEMA_TYPE);
 		properties.add(SCHEMA + ".#." + SCHEMA_NAME);
 		// computed column
-		properties.add(SCHEMA + ".#." + TABLE_SCHEMA_EXPR);
+		properties.add(SCHEMA + ".#." + EXPR);
 
 		// watermark
 		properties.add(SCHEMA + "." + WATERMARK + ".#."  + WATERMARK_ROWTIME);
 		properties.add(SCHEMA + "." + WATERMARK + ".#."  + WATERMARK_STRATEGY_EXPR);
 		properties.add(SCHEMA + "." + WATERMARK + ".#."  + WATERMARK_STRATEGY_DATA_TYPE);
 
+		// table constraint
+		properties.add(SCHEMA + "." + DescriptorProperties.PRIMARY_KEY_NAME);
+		properties.add(SCHEMA + "." + DescriptorProperties.PRIMARY_KEY_COLUMNS);
+
 		// format wildcard
 		properties.add(FORMAT + ".*");
 
diff --git a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/HBaseTableFactory.java b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/HBaseTableFactory.java
index 64227d3..f64f9b9 100644
--- a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/HBaseTableFactory.java
+++ b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/HBaseTableFactory.java
@@ -55,7 +55,7 @@ import java.util.Map;
 import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION;
 import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
 import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_VERSION;
-import static org.apache.flink.table.descriptors.DescriptorProperties.TABLE_SCHEMA_EXPR;
+import static org.apache.flink.table.descriptors.DescriptorProperties.EXPR;
 import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK;
 import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_ROWTIME;
 import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE;
@@ -192,13 +192,17 @@ public class HBaseTableFactory implements StreamTableSourceFactory<Row>, StreamT
 		properties.add(SCHEMA + ".#." + SCHEMA_TYPE);
 		properties.add(SCHEMA + ".#." + SCHEMA_NAME);
 		// computed column
-		properties.add(SCHEMA + ".#." + TABLE_SCHEMA_EXPR);
+		properties.add(SCHEMA + ".#." + EXPR);
 
 		// watermark
 		properties.add(SCHEMA + "." + WATERMARK + ".#."  + WATERMARK_ROWTIME);
 		properties.add(SCHEMA + "." + WATERMARK + ".#."  + WATERMARK_STRATEGY_EXPR);
 		properties.add(SCHEMA + "." + WATERMARK + ".#."  + WATERMARK_STRATEGY_DATA_TYPE);
 
+		// table constraint
+		properties.add(SCHEMA + "." + DescriptorProperties.PRIMARY_KEY_NAME);
+		properties.add(SCHEMA + "." + DescriptorProperties.PRIMARY_KEY_COLUMNS);
+
 		return properties;
 	}
 
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java
index 2817361..f6fa581 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java
@@ -27,9 +27,11 @@ import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.Types;
+import org.apache.flink.table.api.constraints.UniqueConstraint;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTableBuilder;
 import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
 import org.apache.flink.table.descriptors.FileSystem;
 import org.apache.flink.table.descriptors.FormatDescriptor;
 import org.apache.flink.table.descriptors.OldCsv;
@@ -60,12 +62,15 @@ import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Optional;
 
 import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 
 /**
@@ -310,6 +315,47 @@ public class HiveCatalogITCase {
 	}
 
 	@Test
+	public void testTableWithPrimaryKey() {
+		EnvironmentSettings.Builder builder = EnvironmentSettings.newInstance().useBlinkPlanner();
+		EnvironmentSettings settings = builder.build();
+		TableEnvironment tableEnv = TableEnvironment.create(settings);
+		tableEnv.getConfig().getConfiguration().setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+
+		tableEnv.registerCatalog("catalog1", hiveCatalog);
+		tableEnv.useCatalog("catalog1");
+
+		final String createTable = "CREATE TABLE pk_src (\n" +
+				"  uuid varchar(40) not null,\n" +
+				"  price DECIMAL(10, 2),\n" +
+				"  currency STRING,\n" +
+				"  ts6 TIMESTAMP(6),\n" +
+				"  ts AS CAST(ts6 AS TIMESTAMP(3)),\n" +
+				"  WATERMARK FOR ts AS ts,\n" +
+				"  constraint ct1 PRIMARY KEY(uuid) NOT ENFORCED)\n" +
+				"  WITH (\n" +
+				"    'connector.type' = 'filesystem'," +
+				"    'connector.path' = 'file://fakePath'," +
+				"    'format.type' = 'csv')";
+
+		tableEnv.executeSql(createTable);
+
+		TableSchema tableSchema = tableEnv.getCatalog(tableEnv.getCurrentCatalog())
+				.map(catalog -> {
+					try {
+						final ObjectPath tablePath = ObjectPath.fromString(catalog.getDefaultDatabase() + '.' + "pk_src");
+						return catalog.getTable(tablePath).getSchema();
+					} catch (TableNotExistException e) {
+						return null;
+					}
+				}).orElse(null);
+		assertNotNull(tableSchema);
+		assertEquals(
+				tableSchema.getPrimaryKey(),
+				Optional.of(UniqueConstraint.primaryKey("ct1", Collections.singletonList("uuid"))));
+		tableEnv.executeSql("DROP TABLE pk_src");
+	}
+
+	@Test
 	public void testNewTableFactory() {
 		TableEnvironment tEnv = TableEnvironment.create(
 				EnvironmentSettings.newInstance().inBatchMode().build());
diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceSinkFactory.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceSinkFactory.java
index 438779f..c74d0d9 100644
--- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceSinkFactory.java
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceSinkFactory.java
@@ -42,7 +42,7 @@ import java.util.Optional;
 
 import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION;
 import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
-import static org.apache.flink.table.descriptors.DescriptorProperties.TABLE_SCHEMA_EXPR;
+import static org.apache.flink.table.descriptors.DescriptorProperties.EXPR;
 import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK;
 import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_ROWTIME;
 import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE;
@@ -119,13 +119,17 @@ public class JdbcTableSourceSinkFactory implements
 		properties.add(SCHEMA + ".#." + SCHEMA_TYPE);
 		properties.add(SCHEMA + ".#." + SCHEMA_NAME);
 		// computed column
-		properties.add(SCHEMA + ".#." + TABLE_SCHEMA_EXPR);
+		properties.add(SCHEMA + ".#." + EXPR);
 
 		// watermark
 		properties.add(SCHEMA + "." + WATERMARK + ".#."  + WATERMARK_ROWTIME);
 		properties.add(SCHEMA + "." + WATERMARK + ".#."  + WATERMARK_STRATEGY_EXPR);
 		properties.add(SCHEMA + "." + WATERMARK + ".#."  + WATERMARK_STRATEGY_DATA_TYPE);
 
+		// table constraint
+		properties.add(SCHEMA + "." + DescriptorProperties.PRIMARY_KEY_NAME);
+		properties.add(SCHEMA + "." + DescriptorProperties.PRIMARY_KEY_COLUMNS);
+
 		return properties;
 	}
 
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java
index 072091a..9d79760 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java
@@ -52,7 +52,7 @@ import java.util.Properties;
 import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION;
 import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
 import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_VERSION;
-import static org.apache.flink.table.descriptors.DescriptorProperties.TABLE_SCHEMA_EXPR;
+import static org.apache.flink.table.descriptors.DescriptorProperties.EXPR;
 import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK;
 import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_ROWTIME;
 import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE;
@@ -131,7 +131,7 @@ public abstract class KafkaTableSourceSinkFactoryBase implements
 		properties.add(SCHEMA + ".#." + SCHEMA_NAME);
 		properties.add(SCHEMA + ".#." + SCHEMA_FROM);
 		// computed column
-		properties.add(SCHEMA + ".#." + TABLE_SCHEMA_EXPR);
+		properties.add(SCHEMA + ".#." + EXPR);
 
 		// time attributes
 		properties.add(SCHEMA + ".#." + SCHEMA_PROCTIME);
@@ -149,6 +149,10 @@ public abstract class KafkaTableSourceSinkFactoryBase implements
 		properties.add(SCHEMA + "." + WATERMARK + ".#."  + WATERMARK_STRATEGY_EXPR);
 		properties.add(SCHEMA + "." + WATERMARK + ".#."  + WATERMARK_STRATEGY_DATA_TYPE);
 
+		// table constraint
+		properties.add(SCHEMA + "." + DescriptorProperties.PRIMARY_KEY_NAME);
+		properties.add(SCHEMA + "." + DescriptorProperties.PRIMARY_KEY_COLUMNS);
+
 		// format wildcard
 		properties.add(FORMAT + ".*");
 
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSinkFactoryBase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSinkFactoryBase.java
index bc201c1..25ee6da 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSinkFactoryBase.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSinkFactoryBase.java
@@ -23,6 +23,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.Types;
+import org.apache.flink.table.descriptors.DescriptorProperties;
 import org.apache.flink.table.factories.StreamTableSinkFactory;
 import org.apache.flink.table.factories.TableSinkFactory;
 import org.apache.flink.table.sinks.AppendStreamTableSink;
@@ -37,7 +38,7 @@ import java.util.List;
 import java.util.Map;
 
 import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
-import static org.apache.flink.table.descriptors.DescriptorProperties.TABLE_SCHEMA_EXPR;
+import static org.apache.flink.table.descriptors.DescriptorProperties.EXPR;
 import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK;
 import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_ROWTIME;
 import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE;
@@ -80,7 +81,7 @@ public abstract class TestTableSinkFactoryBase implements StreamTableSinkFactory
 		properties.add(SCHEMA + ".#." + SCHEMA_DATA_TYPE);
 		properties.add(SCHEMA + ".#." + SCHEMA_TYPE);
 		properties.add(SCHEMA + ".#." + SCHEMA_NAME);
-		properties.add(SCHEMA + ".#." + TABLE_SCHEMA_EXPR);
+		properties.add(SCHEMA + ".#." + EXPR);
 		properties.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_TYPE);
 		properties.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_FROM);
 		properties.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_TYPE);
@@ -88,6 +89,9 @@ public abstract class TestTableSinkFactoryBase implements StreamTableSinkFactory
 		properties.add(SCHEMA + "." + WATERMARK + ".#."  + WATERMARK_ROWTIME);
 		properties.add(SCHEMA + "." + WATERMARK + ".#."  + WATERMARK_STRATEGY_EXPR);
 		properties.add(SCHEMA + "." + WATERMARK + ".#."  + WATERMARK_STRATEGY_DATA_TYPE);
+		// table constraint
+		properties.add(SCHEMA + "." + DescriptorProperties.PRIMARY_KEY_NAME);
+		properties.add(SCHEMA + "." + DescriptorProperties.PRIMARY_KEY_COLUMNS);
 
 		return properties;
 	}
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactoryBase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactoryBase.java
index 980cb74..38d733b 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactoryBase.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactoryBase.java
@@ -41,7 +41,7 @@ import java.util.Map;
 import java.util.Optional;
 
 import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
-import static org.apache.flink.table.descriptors.DescriptorProperties.TABLE_SCHEMA_EXPR;
+import static org.apache.flink.table.descriptors.DescriptorProperties.EXPR;
 import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK;
 import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_ROWTIME;
 import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE;
@@ -84,7 +84,7 @@ public abstract class TestTableSourceFactoryBase implements StreamTableSourceFac
 		properties.add(SCHEMA + ".#." + SCHEMA_DATA_TYPE);
 		properties.add(SCHEMA + ".#." + SCHEMA_TYPE);
 		properties.add(SCHEMA + ".#." + SCHEMA_NAME);
-		properties.add(SCHEMA + ".#." + TABLE_SCHEMA_EXPR);
+		properties.add(SCHEMA + ".#." + EXPR);
 		properties.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_TYPE);
 		properties.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_FROM);
 		properties.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_TYPE);
@@ -93,6 +93,10 @@ public abstract class TestTableSourceFactoryBase implements StreamTableSourceFac
 		properties.add(SCHEMA + "." + WATERMARK + ".#."  + WATERMARK_STRATEGY_EXPR);
 		properties.add(SCHEMA + "." + WATERMARK + ".#."  + WATERMARK_STRATEGY_DATA_TYPE);
 
+		// table constraint
+		properties.add(SCHEMA + "." + DescriptorProperties.PRIMARY_KEY_NAME);
+		properties.add(SCHEMA + "." + DescriptorProperties.PRIMARY_KEY_COLUMNS);
+
 		return properties;
 	}
 
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/descriptors/OldCsv.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/descriptors/OldCsv.java
index 3d06615..10695c2 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/descriptors/OldCsv.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/descriptors/OldCsv.java
@@ -279,8 +279,8 @@ public class OldCsv extends FormatDescriptor {
 			properties.putBoolean(FORMAT_DERIVE_SCHEMA, true);
 		} else {
 			List<String> subKeys = Arrays.asList(
-				DescriptorProperties.TABLE_SCHEMA_NAME,
-				DescriptorProperties.TABLE_SCHEMA_DATA_TYPE);
+				DescriptorProperties.NAME,
+				DescriptorProperties.DATA_TYPE);
 
 			List<List<String>> subValues = schema.entrySet().stream()
 				.map(e -> Arrays.asList(e.getKey(), e.getValue()))
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/descriptors/SchemaValidator.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/descriptors/SchemaValidator.java
index 2bb1dc0..bb0c0ab 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/descriptors/SchemaValidator.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/descriptors/SchemaValidator.java
@@ -40,7 +40,7 @@ import java.util.Map;
 import java.util.Optional;
 
 import static java.lang.String.format;
-import static org.apache.flink.table.descriptors.DescriptorProperties.TABLE_SCHEMA_EXPR;
+import static org.apache.flink.table.descriptors.DescriptorProperties.EXPR;
 import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK;
 import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_ROWTIME;
 import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE;
@@ -140,7 +140,7 @@ public class SchemaValidator implements DescriptorValidator {
 		keys.add(SCHEMA + ".#." + SCHEMA_NAME);
 		keys.add(SCHEMA + ".#." + SCHEMA_FROM);
 		// computed column
-		keys.add(SCHEMA + ".#." + TABLE_SCHEMA_EXPR);
+		keys.add(SCHEMA + ".#." + EXPR);
 
 		// time attributes
 		keys.add(SCHEMA + ".#." + SCHEMA_PROCTIME);
@@ -158,6 +158,10 @@ public class SchemaValidator implements DescriptorValidator {
 		keys.add(SCHEMA + "." + WATERMARK + ".#."  + WATERMARK_STRATEGY_EXPR);
 		keys.add(SCHEMA + "." + WATERMARK + ".#."  + WATERMARK_STRATEGY_DATA_TYPE);
 
+		// table constraint
+		keys.add(SCHEMA + "." + DescriptorProperties.PRIMARY_KEY_NAME);
+		keys.add(SCHEMA + "." + DescriptorProperties.PRIMARY_KEY_COLUMNS);
+
 		return keys;
 	}
 
@@ -292,7 +296,7 @@ public class SchemaValidator implements DescriptorValidator {
 				boolean isRowtime = properties
 					.containsKey(SCHEMA + "." + i + "." + ROWTIME_TIMESTAMPS_TYPE);
 				boolean isGeneratedColumn = properties
-					.containsKey(SCHEMA + "." + i + "." + TABLE_SCHEMA_EXPR);
+					.containsKey(SCHEMA + "." + i + "." + EXPR);
 				// remove proctime/rowtime from mapping
 				if (isProctime || isRowtime || isGeneratedColumn) {
 					mapping.remove(name);
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvTableSinkFactoryBase.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvTableSinkFactoryBase.java
index c1cf012..3b9bf55 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvTableSinkFactoryBase.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvTableSinkFactoryBase.java
@@ -76,9 +76,9 @@ public abstract class CsvTableSinkFactoryBase implements TableFactory {
 		// connector
 		properties.add(CONNECTOR_PATH);
 		// format
-		properties.add(FORMAT_FIELDS + ".#." + DescriptorProperties.TABLE_SCHEMA_TYPE);
-		properties.add(FORMAT_FIELDS + ".#." + DescriptorProperties.TABLE_SCHEMA_DATA_TYPE);
-		properties.add(FORMAT_FIELDS + ".#." + DescriptorProperties.TABLE_SCHEMA_NAME);
+		properties.add(FORMAT_FIELDS + ".#." + DescriptorProperties.TYPE);
+		properties.add(FORMAT_FIELDS + ".#." + DescriptorProperties.DATA_TYPE);
+		properties.add(FORMAT_FIELDS + ".#." + DescriptorProperties.NAME);
 		properties.add(FormatDescriptorValidator.FORMAT_DERIVE_SCHEMA);
 		properties.add(FORMAT_FIELD_DELIMITER);
 		properties.add(CONNECTOR_PATH);
@@ -86,10 +86,10 @@ public abstract class CsvTableSinkFactoryBase implements TableFactory {
 		properties.add(FORMAT_NUM_FILES);
 
 		// schema
-		properties.add(SCHEMA + ".#." + DescriptorProperties.TABLE_SCHEMA_TYPE);
-		properties.add(SCHEMA + ".#." + DescriptorProperties.TABLE_SCHEMA_DATA_TYPE);
-		properties.add(SCHEMA + ".#." + DescriptorProperties.TABLE_SCHEMA_NAME);
-		properties.add(SCHEMA + ".#." + DescriptorProperties.TABLE_SCHEMA_EXPR);
+		properties.add(SCHEMA + ".#." + DescriptorProperties.TYPE);
+		properties.add(SCHEMA + ".#." + DescriptorProperties.DATA_TYPE);
+		properties.add(SCHEMA + ".#." + DescriptorProperties.NAME);
+		properties.add(SCHEMA + ".#." + DescriptorProperties.EXPR);
 		// schema watermark
 		properties.add(SCHEMA + "." + DescriptorProperties.WATERMARK + ".*");
 		return properties;
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvTableSourceFactoryBase.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvTableSourceFactoryBase.java
index 47d3dc0..3b0e26b 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvTableSourceFactoryBase.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvTableSourceFactoryBase.java
@@ -40,7 +40,6 @@ import java.util.stream.Collectors;
 
 import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION;
 import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
-import static org.apache.flink.table.descriptors.DescriptorProperties.TABLE_SCHEMA_EXPR;
 import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK;
 import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_ROWTIME;
 import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE;
@@ -80,9 +79,9 @@ public abstract class CsvTableSourceFactoryBase implements TableFactory {
 		// connector
 		properties.add(CONNECTOR_PATH);
 		// format
-		properties.add(FORMAT_FIELDS + ".#." + DescriptorProperties.TABLE_SCHEMA_TYPE);
-		properties.add(FORMAT_FIELDS + ".#." + DescriptorProperties.TABLE_SCHEMA_DATA_TYPE);
-		properties.add(FORMAT_FIELDS + ".#." + DescriptorProperties.TABLE_SCHEMA_NAME);
+		properties.add(FORMAT_FIELDS + ".#." + DescriptorProperties.TYPE);
+		properties.add(FORMAT_FIELDS + ".#." + DescriptorProperties.DATA_TYPE);
+		properties.add(FORMAT_FIELDS + ".#." + DescriptorProperties.NAME);
 		properties.add(FormatDescriptorValidator.FORMAT_DERIVE_SCHEMA);
 		properties.add(FORMAT_FIELD_DELIMITER);
 		properties.add(FORMAT_LINE_DELIMITER);
@@ -92,14 +91,17 @@ public abstract class CsvTableSourceFactoryBase implements TableFactory {
 		properties.add(FORMAT_IGNORE_PARSE_ERRORS);
 		properties.add(CONNECTOR_PATH);
 		// schema
-		properties.add(SCHEMA + ".#." + DescriptorProperties.TABLE_SCHEMA_TYPE);
-		properties.add(SCHEMA + ".#." + DescriptorProperties.TABLE_SCHEMA_DATA_TYPE);
-		properties.add(SCHEMA + ".#." + DescriptorProperties.TABLE_SCHEMA_NAME);
-		properties.add(SCHEMA + ".#." + TABLE_SCHEMA_EXPR);
+		properties.add(SCHEMA + ".#." + DescriptorProperties.TYPE);
+		properties.add(SCHEMA + ".#." + DescriptorProperties.DATA_TYPE);
+		properties.add(SCHEMA + ".#." + DescriptorProperties.NAME);
+		properties.add(SCHEMA + ".#." + DescriptorProperties.EXPR);
 		// watermark
 		properties.add(SCHEMA + "." + WATERMARK + ".#."  + WATERMARK_ROWTIME);
 		properties.add(SCHEMA + "." + WATERMARK + ".#."  + WATERMARK_STRATEGY_EXPR);
 		properties.add(SCHEMA + "." + WATERMARK + ".#."  + WATERMARK_STRATEGY_DATA_TYPE);
+		// table constraint
+		properties.add(SCHEMA + "." + DescriptorProperties.PRIMARY_KEY_NAME);
+		properties.add(SCHEMA + "." + DescriptorProperties.PRIMARY_KEY_COLUMNS);
 
 		return properties;
 	}
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableSourceFactoryMock.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableSourceFactoryMock.java
index 4307231..bef2bd4 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableSourceFactoryMock.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableSourceFactoryMock.java
@@ -35,11 +35,12 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import static org.apache.flink.table.descriptors.DescriptorProperties.TABLE_SCHEMA_EXPR;
+import static org.apache.flink.table.descriptors.DescriptorProperties.EXPR;
 import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK;
 import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_ROWTIME;
 import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE;
 import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_EXPR;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA;
 
 /**
  * Mocking {@link TableSourceFactory} for tests.
@@ -79,11 +80,16 @@ public class TableSourceFactoryMock implements TableSourceFactory<Row> {
 		supportedProperties.add(Schema.SCHEMA + ".#." + Schema.SCHEMA_DATA_TYPE);
 		supportedProperties.add(Schema.SCHEMA + ".#." + Schema.SCHEMA_TYPE);
 		// computed column
-		supportedProperties.add(Schema.SCHEMA + ".#." + TABLE_SCHEMA_EXPR);
+		supportedProperties.add(Schema.SCHEMA + ".#." + EXPR);
 		// watermark
 		supportedProperties.add(Schema.SCHEMA + "." + WATERMARK + ".#."  + WATERMARK_ROWTIME);
 		supportedProperties.add(Schema.SCHEMA + "." + WATERMARK + ".#."  + WATERMARK_STRATEGY_EXPR);
 		supportedProperties.add(Schema.SCHEMA + "." + WATERMARK + ".#."  + WATERMARK_STRATEGY_DATA_TYPE);
+
+		// table constraint
+		supportedProperties.add(SCHEMA + "." + DescriptorProperties.PRIMARY_KEY_NAME);
+		supportedProperties.add(SCHEMA + "." + DescriptorProperties.PRIMARY_KEY_COLUMNS);
+
 		return supportedProperties;
 	}
 }
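
The repeated factory changes above follow one pattern: a factory that enumerates its supported schema keys must also whitelist the new primary-key keys, otherwise the legacy property validation would reject a table whose DDL declares a PRIMARY KEY. A sketch of that pattern (illustrative only, assuming the usual Schema and DescriptorProperties constants):

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.flink.table.descriptors.DescriptorProperties;

    import static org.apache.flink.table.descriptors.Schema.SCHEMA;
    import static org.apache.flink.table.descriptors.Schema.SCHEMA_DATA_TYPE;
    import static org.apache.flink.table.descriptors.Schema.SCHEMA_NAME;

    public class SupportedPropertiesSketch {
        public List<String> supportedProperties() {
            List<String> properties = new ArrayList<>();
            // per-column schema keys
            properties.add(SCHEMA + ".#." + SCHEMA_NAME);
            properties.add(SCHEMA + ".#." + SCHEMA_DATA_TYPE);
            // table constraint (the keys added by this patch)
            properties.add(SCHEMA + "." + DescriptorProperties.PRIMARY_KEY_NAME);
            properties.add(SCHEMA + "." + DescriptorProperties.PRIMARY_KEY_COLUMNS);
            return properties;
        }
    }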
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java
index bcb3e9b..584ba44 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java
@@ -74,30 +74,31 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 @Internal
 public class DescriptorProperties {
 
-	public static final String TABLE_SCHEMA_NAME = "name";
+	public static final String NAME = "name";
 
-	/**
-	 * @deprecated this will be removed in future version as it uses old type system.
-	 * 	 Please use {@link #TABLE_SCHEMA_DATA_TYPE} instead.
-	 */
-	@Deprecated
-	public static final String TABLE_SCHEMA_TYPE = "type";
+	public static final String TYPE = "type";
 
-	public static final String TABLE_SCHEMA_DATA_TYPE = "data-type";
+	public static final String DATA_TYPE = "data-type";
 
-	public static final String TABLE_SCHEMA_EXPR = "expr";
+	public static final String EXPR = "expr";
 
 	public static final String PARTITION_KEYS = "partition.keys";
 
-	public static final String PARTITION_KEYS_NAME = "name";
-
 	public static final String WATERMARK = "watermark";
 
 	public static final String WATERMARK_ROWTIME = "rowtime";
 
-	public static final String WATERMARK_STRATEGY_EXPR = "strategy.expr";
+	public static final String WATERMARK_STRATEGY = "strategy";
+
+	public static final String WATERMARK_STRATEGY_EXPR = WATERMARK_STRATEGY + '.' + EXPR;
+
+	public static final String WATERMARK_STRATEGY_DATA_TYPE = WATERMARK_STRATEGY + '.' + DATA_TYPE;
 
-	public static final String WATERMARK_STRATEGY_DATA_TYPE = "strategy.data-type";
+	public static final String PRIMARY_KEY_NAME = "primary-key.name";
+
+	public static final String PRIMARY_KEY_COLUMNS = "primary-key.columns";
+
+	private static final Pattern SCHEMA_COLUMN_NAME_SUFFIX = Pattern.compile("\\d+\\.name");
 
 	private static final Consumer<String> EMPTY_CONSUMER = (value) -> {};
 
@@ -225,7 +226,7 @@ public class DescriptorProperties {
 
 		putIndexedOptionalProperties(
 			key,
-			Arrays.asList(TABLE_SCHEMA_NAME, TABLE_SCHEMA_DATA_TYPE, TABLE_SCHEMA_EXPR),
+			Arrays.asList(NAME, DATA_TYPE, EXPR),
 			values);
 
 		if (!schema.getWatermarkSpecs().isEmpty()) {
@@ -241,6 +242,11 @@ public class DescriptorProperties {
 				Arrays.asList(WATERMARK_ROWTIME, WATERMARK_STRATEGY_EXPR, WATERMARK_STRATEGY_DATA_TYPE),
 				watermarkValues);
 		}
+
+		schema.getPrimaryKey().ifPresent(pk -> {
+			putString(key + '.' + PRIMARY_KEY_NAME, pk.getName());
+			putString(key + '.' + PRIMARY_KEY_COLUMNS, String.join(",", pk.getColumns()));
+		});
 	}
 
 	/**
@@ -251,7 +257,7 @@ public class DescriptorProperties {
 
 		putIndexedFixedProperties(
 				PARTITION_KEYS,
-				Collections.singletonList(PARTITION_KEYS_NAME),
+				Collections.singletonList(NAME),
 				keys.stream().map(Collections::singletonList).collect(Collectors.toList()));
 	}
 
@@ -610,7 +616,9 @@ public class DescriptorProperties {
 	public Optional<TableSchema> getOptionalTableSchema(String key) {
 		// filter for number of fields
 		final int fieldCount = properties.keySet().stream()
-			.filter((k) -> k.startsWith(key) && k.endsWith('.' + TABLE_SCHEMA_NAME))
+			.filter((k) -> k.startsWith(key)
+					// "key." is the prefix.
+					&& SCHEMA_COLUMN_NAME_SUFFIX.matcher(k.substring(key.length() + 1)).matches())
 			.mapToInt((k) -> 1)
 			.sum();
 
@@ -621,10 +629,10 @@ public class DescriptorProperties {
 		// validate fields and build schema
 		final TableSchema.Builder schemaBuilder = TableSchema.builder();
 		for (int i = 0; i < fieldCount; i++) {
-			final String nameKey = key + '.' + i + '.' + TABLE_SCHEMA_NAME;
-			final String legacyTypeKey = key + '.' + i + '.' + TABLE_SCHEMA_TYPE;
-			final String typeKey = key + '.' + i + '.' + TABLE_SCHEMA_DATA_TYPE;
-			final String exprKey = key + '.' + i + '.' + TABLE_SCHEMA_EXPR;
+			final String nameKey = key + '.' + i + '.' + NAME;
+			final String legacyTypeKey = key + '.' + i + '.' + TYPE;
+			final String typeKey = key + '.' + i + '.' + DATA_TYPE;
+			final String exprKey = key + '.' + i + '.' + EXPR;
 
 			final String name = optionalGet(nameKey).orElseThrow(exceptionSupplier(nameKey));
 
@@ -669,6 +677,14 @@ public class DescriptorProperties {
 			}
 		}
 
+		// Extract unique constraints.
+		String pkConstraintNameKey = key + '.' + PRIMARY_KEY_NAME;
+		final Optional<String> pkConstraintNameOpt = optionalGet(pkConstraintNameKey);
+		if (pkConstraintNameOpt.isPresent()) {
+			final String pkColumnsKey = key + '.' + PRIMARY_KEY_COLUMNS;
+			final String columns = optionalGet(pkColumnsKey).orElseThrow(exceptionSupplier(pkColumnsKey));
+			schemaBuilder.primaryKey(pkConstraintNameOpt.get(), columns.split(","));
+		}
 		return Optional.of(schemaBuilder.build());
 	}
 
@@ -684,7 +700,7 @@ public class DescriptorProperties {
 	 */
 	public List<String> getPartitionKeys() {
 		return getFixedIndexedProperties(
-				PARTITION_KEYS, Collections.singletonList(PARTITION_KEYS_NAME))
+				PARTITION_KEYS, Collections.singletonList(NAME))
 				.stream()
 				.map(map -> map.values().iterator().next())
 				.map(this::getString).collect(Collectors.toList());
@@ -1166,13 +1182,13 @@ public class DescriptorProperties {
 	public void validateTableSchema(String key, boolean isOptional) {
 		final Consumer<String> nameValidation = (fullKey) -> validateString(fullKey, false, 1);
 		final Consumer<String> typeValidation = (fullKey) -> {
-			String fallbackKey = fullKey.replace("." + TABLE_SCHEMA_DATA_TYPE, "." + TABLE_SCHEMA_TYPE);
+			String fallbackKey = fullKey.replace("." + DATA_TYPE, "." + TYPE);
 			validateDataType(fullKey, fallbackKey, false);
 		};
 
 		final Map<String, Consumer<String>> subKeys = new HashMap<>();
-		subKeys.put(TABLE_SCHEMA_NAME, nameValidation);
-		subKeys.put(TABLE_SCHEMA_DATA_TYPE, typeValidation);
+		subKeys.put(NAME, nameValidation);
+		subKeys.put(DATA_TYPE, typeValidation);
 
 		validateFixedIndexedProperties(
 			key,
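
One subtlety in getOptionalTableSchema above: the field counter used to match every key ending in ".name", but the new schema.primary-key.name key ends in ".name" as well, so the patch restricts the count to keys whose suffix is a column index followed by ".name". An illustrative check of the SCHEMA_COLUMN_NAME_SUFFIX pattern:

    import java.util.regex.Pattern;

    public class ColumnNameSuffixCheck {
        public static void main(String[] args) {
            Pattern columnName = Pattern.compile("\\d+\\.name");
            System.out.println(columnName.matcher("0.name").matches());            // true  -> counted as a field
            System.out.println(columnName.matcher("primary-key.name").matches());  // false -> not counted
        }
    }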
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableFormatFactoryBase.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableFormatFactoryBase.java
index fdae4d6..794b6ee 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableFormatFactoryBase.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableFormatFactoryBase.java
@@ -31,7 +31,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import static org.apache.flink.table.descriptors.DescriptorProperties.TABLE_SCHEMA_EXPR;
+import static org.apache.flink.table.descriptors.DescriptorProperties.EXPR;
 import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK;
 import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_ROWTIME;
 import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE;
@@ -105,7 +105,7 @@ public abstract class TableFormatFactoryBase<T> implements TableFormatFactory<T>
 			properties.add(SCHEMA + ".#." + SCHEMA_NAME);
 			properties.add(SCHEMA + ".#." + SCHEMA_FROM);
 			// computed column
-			properties.add(SCHEMA + ".#." + TABLE_SCHEMA_EXPR);
+			properties.add(SCHEMA + ".#." + EXPR);
 			// time attributes
 			properties.add(SCHEMA + ".#." + SCHEMA_PROCTIME);
 			properties.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_TYPE);
@@ -120,6 +120,9 @@ public abstract class TableFormatFactoryBase<T> implements TableFormatFactory<T>
 			properties.add(SCHEMA + "." + WATERMARK + ".#."  + WATERMARK_ROWTIME);
 			properties.add(SCHEMA + "." + WATERMARK + ".#."  + WATERMARK_STRATEGY_EXPR);
 			properties.add(SCHEMA + "." + WATERMARK + ".#."  + WATERMARK_STRATEGY_DATA_TYPE);
+			// table constraint
+			properties.add(SCHEMA + "." + DescriptorProperties.PRIMARY_KEY_NAME);
+			properties.add(SCHEMA + "." + DescriptorProperties.PRIMARY_KEY_COLUMNS);
 		}
 		properties.addAll(supportedFormatProperties());
 		return properties;
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/descriptors/DescriptorPropertiesTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/descriptors/DescriptorPropertiesTest.java
index 172d4c2..8f9fed7 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/descriptors/DescriptorPropertiesTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/descriptors/DescriptorPropertiesTest.java
@@ -175,17 +175,18 @@ public class DescriptorPropertiesTest {
 	@Test
 	public void testTableSchema() {
 		TableSchema schema = TableSchema.builder()
-			.field("f0", DataTypes.BIGINT())
+			.field("f0", DataTypes.BIGINT().notNull())
 			.field("f1", DataTypes.ROW(
 				DataTypes.FIELD("q1", DataTypes.STRING()),
 				DataTypes.FIELD("q2", DataTypes.TIMESTAMP(9))))
-			.field("f2", DataTypes.STRING())
-			.field("f3", DataTypes.BIGINT(), "f0 + 1")
+			.field("f2", DataTypes.STRING().notNull())
+			.field("f3", DataTypes.BIGINT().notNull(), "f0 + 1")
 			.field("f4", DataTypes.DECIMAL(10, 3))
 			.watermark(
 				"f1.q2",
 				"`f1`.`q2` - INTERVAL '5' SECOND",
 				DataTypes.TIMESTAMP(3))
+			.primaryKey("constraint1", new String[] {"f0", "f2"})
 			.build();
 
 		DescriptorProperties properties = new DescriptorProperties();
@@ -193,19 +194,21 @@ public class DescriptorPropertiesTest {
 		Map<String, String> actual = properties.asMap();
 		Map<String, String> expected = new HashMap<>();
 		expected.put("schema.0.name", "f0");
-		expected.put("schema.0.data-type", "BIGINT");
+		expected.put("schema.0.data-type", "BIGINT NOT NULL");
 		expected.put("schema.1.name", "f1");
 		expected.put("schema.1.data-type", "ROW<`q1` VARCHAR(2147483647), `q2` TIMESTAMP(9)>");
 		expected.put("schema.2.name", "f2");
-		expected.put("schema.2.data-type", "VARCHAR(2147483647)");
+		expected.put("schema.2.data-type", "VARCHAR(2147483647) NOT NULL");
 		expected.put("schema.3.name", "f3");
-		expected.put("schema.3.data-type", "BIGINT");
+		expected.put("schema.3.data-type", "BIGINT NOT NULL");
 		expected.put("schema.3.expr", "f0 + 1");
 		expected.put("schema.4.name", "f4");
 		expected.put("schema.4.data-type", "DECIMAL(10, 3)");
 		expected.put("schema.watermark.0.rowtime", "f1.q2");
 		expected.put("schema.watermark.0.strategy.expr", "`f1`.`q2` - INTERVAL '5' SECOND");
 		expected.put("schema.watermark.0.strategy.data-type", "TIMESTAMP(3)");
+		expected.put("schema.primary-key.name", "constraint1");
+		expected.put("schema.primary-key.columns", "f0,f2");
 		assertEquals(expected, actual);
 
 		TableSchema restored = properties.getTableSchema("schema");
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/catalog/CatalogConstraintTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/catalog/CatalogConstraintTest.java
index 25ca83b..fd01281 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/catalog/CatalogConstraintTest.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/catalog/CatalogConstraintTest.java
@@ -25,12 +25,9 @@ import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.CatalogTableImpl;
 import org.apache.flink.table.catalog.ObjectPath;
-import org.apache.flink.table.descriptors.DescriptorProperties;
 import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery;
 import org.apache.flink.table.planner.utils.TableTestUtil;
-import org.apache.flink.table.types.AtomicDataType;
 import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.BigIntType;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableSet;
 
@@ -67,7 +64,7 @@ public class CatalogConstraintTest {
 	public void testWithPrimaryKey() throws Exception {
 		TableSchema tableSchema = TableSchema.builder().fields(
 				new String[] { "a", "b", "c" },
-				new DataType[] { DataTypes.STRING(), new AtomicDataType(new BigIntType(false)), DataTypes.INT() }
+				new DataType[] { DataTypes.STRING(), DataTypes.BIGINT().notNull(), DataTypes.INT() }
 		).primaryKey("b").build();
 		Map<String, String> properties = buildCatalogTableProperties(tableSchema);
 
@@ -109,10 +106,6 @@ public class CatalogConstraintTest {
 		properties.put("format.property-version", "1");
 		properties.put("format.field-delimiter", ";");
 
-		// schema
-		DescriptorProperties descriptorProperties = new DescriptorProperties(true);
-		descriptorProperties.putTableSchema("format.fields", tableSchema);
-		properties.putAll(descriptorProperties.asMap());
 		return properties;
 	}
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/InMemoryTableFactory.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/InMemoryTableFactory.scala
index 76393b2..9916ab6 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/InMemoryTableFactory.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/InMemoryTableFactory.scala
@@ -142,7 +142,12 @@ class InMemoryTableFactory(terminationCount: Int)
     properties.add(SCHEMA + "." + WATERMARK + ".#."  + WATERMARK_STRATEGY_DATA_TYPE);
 
     // computed column
-    properties.add(SCHEMA + ".#." + TABLE_SCHEMA_EXPR)
+    properties.add(SCHEMA + ".#." + EXPR)
+
+    // table constraint
+    properties.add(SCHEMA + "." + DescriptorProperties.PRIMARY_KEY_NAME);
+    properties.add(SCHEMA + "." + DescriptorProperties.PRIMARY_KEY_COLUMNS);
+
     properties
   }
 }