You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/05/28 02:33:56 UTC
[flink] branch master updated: [FLINK-16021][table-common] Fix DescriptorProperties.putTableSchema does not include PRIMARY KEY
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 3a322e0 [FLINK-16021][table-common] Fix DescriptorProperties.putTableSchema does not include PRIMARY KEY
3a322e0 is described below
commit 3a322e0c4babf2f31223fb75ef61ec23f229011b
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Thu May 28 10:32:53 2020 +0800
[FLINK-16021][table-common] Fix DescriptorProperties.putTableSchema does not include PRIMARY KEY
This closes #12275
---
.../ElasticsearchUpsertTableSinkFactoryBase.java | 8 ++-
.../flink/connector/hbase/HBaseTableFactory.java | 8 ++-
.../table/catalog/hive/HiveCatalogITCase.java | 46 ++++++++++++++++
.../jdbc/table/JdbcTableSourceSinkFactory.java | 8 ++-
.../kafka/KafkaTableSourceSinkFactoryBase.java | 8 ++-
.../gateway/utils/TestTableSinkFactoryBase.java | 8 ++-
.../gateway/utils/TestTableSourceFactoryBase.java | 8 ++-
.../org/apache/flink/table/descriptors/OldCsv.java | 4 +-
.../flink/table/descriptors/SchemaValidator.java | 10 +++-
.../flink/table/sinks/CsvTableSinkFactoryBase.java | 14 ++---
.../table/sources/CsvTableSourceFactoryBase.java | 18 +++---
.../flink/table/utils/TableSourceFactoryMock.java | 10 +++-
.../table/descriptors/DescriptorProperties.java | 64 ++++++++++++++--------
.../table/factories/TableFormatFactoryBase.java | 7 ++-
.../descriptors/DescriptorPropertiesTest.java | 15 +++--
.../planner/catalog/CatalogConstraintTest.java | 9 +--
.../flink/table/utils/InMemoryTableFactory.scala | 7 ++-
17 files changed, 177 insertions(+), 75 deletions(-)
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkFactoryBase.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkFactoryBase.java
index efbbb7e..a20cb33 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkFactoryBase.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkFactoryBase.java
@@ -53,7 +53,7 @@ import java.util.stream.Collectors;
import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION;
import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_VERSION;
-import static org.apache.flink.table.descriptors.DescriptorProperties.TABLE_SCHEMA_EXPR;
+import static org.apache.flink.table.descriptors.DescriptorProperties.EXPR;
import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK;
import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_ROWTIME;
import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE;
@@ -149,13 +149,17 @@ public abstract class ElasticsearchUpsertTableSinkFactoryBase implements StreamT
properties.add(SCHEMA + ".#." + SCHEMA_TYPE);
properties.add(SCHEMA + ".#." + SCHEMA_NAME);
// computed column
- properties.add(SCHEMA + ".#." + TABLE_SCHEMA_EXPR);
+ properties.add(SCHEMA + ".#." + EXPR);
// watermark
properties.add(SCHEMA + "." + WATERMARK + ".#." + WATERMARK_ROWTIME);
properties.add(SCHEMA + "." + WATERMARK + ".#." + WATERMARK_STRATEGY_EXPR);
properties.add(SCHEMA + "." + WATERMARK + ".#." + WATERMARK_STRATEGY_DATA_TYPE);
+ // table constraint
+ properties.add(SCHEMA + "." + DescriptorProperties.PRIMARY_KEY_NAME);
+ properties.add(SCHEMA + "." + DescriptorProperties.PRIMARY_KEY_COLUMNS);
+
// format wildcard
properties.add(FORMAT + ".*");
diff --git a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/HBaseTableFactory.java b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/HBaseTableFactory.java
index 64227d3..f64f9b9 100644
--- a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/HBaseTableFactory.java
+++ b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/HBaseTableFactory.java
@@ -55,7 +55,7 @@ import java.util.Map;
import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION;
import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_VERSION;
-import static org.apache.flink.table.descriptors.DescriptorProperties.TABLE_SCHEMA_EXPR;
+import static org.apache.flink.table.descriptors.DescriptorProperties.EXPR;
import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK;
import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_ROWTIME;
import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE;
@@ -192,13 +192,17 @@ public class HBaseTableFactory implements StreamTableSourceFactory<Row>, StreamT
properties.add(SCHEMA + ".#." + SCHEMA_TYPE);
properties.add(SCHEMA + ".#." + SCHEMA_NAME);
// computed column
- properties.add(SCHEMA + ".#." + TABLE_SCHEMA_EXPR);
+ properties.add(SCHEMA + ".#." + EXPR);
// watermark
properties.add(SCHEMA + "." + WATERMARK + ".#." + WATERMARK_ROWTIME);
properties.add(SCHEMA + "." + WATERMARK + ".#." + WATERMARK_STRATEGY_EXPR);
properties.add(SCHEMA + "." + WATERMARK + ".#." + WATERMARK_STRATEGY_DATA_TYPE);
+ // table constraint
+ properties.add(SCHEMA + "." + DescriptorProperties.PRIMARY_KEY_NAME);
+ properties.add(SCHEMA + "." + DescriptorProperties.PRIMARY_KEY_COLUMNS);
+
return properties;
}
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java
index 2817361..f6fa581 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java
@@ -27,9 +27,11 @@ import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.Types;
+import org.apache.flink.table.api.constraints.UniqueConstraint;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableBuilder;
import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.FormatDescriptor;
import org.apache.flink.table.descriptors.OldCsv;
@@ -60,12 +62,15 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
+import java.util.Optional;
import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
/**
@@ -310,6 +315,47 @@ public class HiveCatalogITCase {
}
@Test
+ public void testTableWithPrimaryKey() {
+ EnvironmentSettings.Builder builder = EnvironmentSettings.newInstance().useBlinkPlanner();
+ EnvironmentSettings settings = builder.build();
+ TableEnvironment tableEnv = TableEnvironment.create(settings);
+ tableEnv.getConfig().getConfiguration().setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+
+ tableEnv.registerCatalog("catalog1", hiveCatalog);
+ tableEnv.useCatalog("catalog1");
+
+ final String createTable = "CREATE TABLE pk_src (\n" +
+ " uuid varchar(40) not null,\n" +
+ " price DECIMAL(10, 2),\n" +
+ " currency STRING,\n" +
+ " ts6 TIMESTAMP(6),\n" +
+ " ts AS CAST(ts6 AS TIMESTAMP(3)),\n" +
+ " WATERMARK FOR ts AS ts,\n" +
+ " constraint ct1 PRIMARY KEY(uuid) NOT ENFORCED)\n" +
+ " WITH (\n" +
+ " 'connector.type' = 'filesystem'," +
+ " 'connector.path' = 'file://fakePath'," +
+ " 'format.type' = 'csv')";
+
+ tableEnv.executeSql(createTable);
+
+ TableSchema tableSchema = tableEnv.getCatalog(tableEnv.getCurrentCatalog())
+ .map(catalog -> {
+ try {
+ final ObjectPath tablePath = ObjectPath.fromString(catalog.getDefaultDatabase() + '.' + "pk_src");
+ return catalog.getTable(tablePath).getSchema();
+ } catch (TableNotExistException e) {
+ return null;
+ }
+ }).orElse(null);
+ assertNotNull(tableSchema);
+ assertEquals(
+ tableSchema.getPrimaryKey(),
+ Optional.of(UniqueConstraint.primaryKey("ct1", Collections.singletonList("uuid"))));
+ tableEnv.executeSql("DROP TABLE pk_src");
+ }
+
+ @Test
public void testNewTableFactory() {
TableEnvironment tEnv = TableEnvironment.create(
EnvironmentSettings.newInstance().inBatchMode().build());
diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceSinkFactory.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceSinkFactory.java
index 438779f..c74d0d9 100644
--- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceSinkFactory.java
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceSinkFactory.java
@@ -42,7 +42,7 @@ import java.util.Optional;
import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION;
import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
-import static org.apache.flink.table.descriptors.DescriptorProperties.TABLE_SCHEMA_EXPR;
+import static org.apache.flink.table.descriptors.DescriptorProperties.EXPR;
import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK;
import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_ROWTIME;
import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE;
@@ -119,13 +119,17 @@ public class JdbcTableSourceSinkFactory implements
properties.add(SCHEMA + ".#." + SCHEMA_TYPE);
properties.add(SCHEMA + ".#." + SCHEMA_NAME);
// computed column
- properties.add(SCHEMA + ".#." + TABLE_SCHEMA_EXPR);
+ properties.add(SCHEMA + ".#." + EXPR);
// watermark
properties.add(SCHEMA + "." + WATERMARK + ".#." + WATERMARK_ROWTIME);
properties.add(SCHEMA + "." + WATERMARK + ".#." + WATERMARK_STRATEGY_EXPR);
properties.add(SCHEMA + "." + WATERMARK + ".#." + WATERMARK_STRATEGY_DATA_TYPE);
+ // table constraint
+ properties.add(SCHEMA + "." + DescriptorProperties.PRIMARY_KEY_NAME);
+ properties.add(SCHEMA + "." + DescriptorProperties.PRIMARY_KEY_COLUMNS);
+
return properties;
}
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java
index 072091a..9d79760 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java
@@ -52,7 +52,7 @@ import java.util.Properties;
import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION;
import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_VERSION;
-import static org.apache.flink.table.descriptors.DescriptorProperties.TABLE_SCHEMA_EXPR;
+import static org.apache.flink.table.descriptors.DescriptorProperties.EXPR;
import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK;
import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_ROWTIME;
import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE;
@@ -131,7 +131,7 @@ public abstract class KafkaTableSourceSinkFactoryBase implements
properties.add(SCHEMA + ".#." + SCHEMA_NAME);
properties.add(SCHEMA + ".#." + SCHEMA_FROM);
// computed column
- properties.add(SCHEMA + ".#." + TABLE_SCHEMA_EXPR);
+ properties.add(SCHEMA + ".#." + EXPR);
// time attributes
properties.add(SCHEMA + ".#." + SCHEMA_PROCTIME);
@@ -149,6 +149,10 @@ public abstract class KafkaTableSourceSinkFactoryBase implements
properties.add(SCHEMA + "." + WATERMARK + ".#." + WATERMARK_STRATEGY_EXPR);
properties.add(SCHEMA + "." + WATERMARK + ".#." + WATERMARK_STRATEGY_DATA_TYPE);
+ // table constraint
+ properties.add(SCHEMA + "." + DescriptorProperties.PRIMARY_KEY_NAME);
+ properties.add(SCHEMA + "." + DescriptorProperties.PRIMARY_KEY_COLUMNS);
+
// format wildcard
properties.add(FORMAT + ".*");
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSinkFactoryBase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSinkFactoryBase.java
index bc201c1..25ee6da 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSinkFactoryBase.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSinkFactoryBase.java
@@ -23,6 +23,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.Types;
+import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.factories.StreamTableSinkFactory;
import org.apache.flink.table.factories.TableSinkFactory;
import org.apache.flink.table.sinks.AppendStreamTableSink;
@@ -37,7 +38,7 @@ import java.util.List;
import java.util.Map;
import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
-import static org.apache.flink.table.descriptors.DescriptorProperties.TABLE_SCHEMA_EXPR;
+import static org.apache.flink.table.descriptors.DescriptorProperties.EXPR;
import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK;
import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_ROWTIME;
import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE;
@@ -80,7 +81,7 @@ public abstract class TestTableSinkFactoryBase implements StreamTableSinkFactory
properties.add(SCHEMA + ".#." + SCHEMA_DATA_TYPE);
properties.add(SCHEMA + ".#." + SCHEMA_TYPE);
properties.add(SCHEMA + ".#." + SCHEMA_NAME);
- properties.add(SCHEMA + ".#." + TABLE_SCHEMA_EXPR);
+ properties.add(SCHEMA + ".#." + EXPR);
properties.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_TYPE);
properties.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_FROM);
properties.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_TYPE);
@@ -88,6 +89,9 @@ public abstract class TestTableSinkFactoryBase implements StreamTableSinkFactory
properties.add(SCHEMA + "." + WATERMARK + ".#." + WATERMARK_ROWTIME);
properties.add(SCHEMA + "." + WATERMARK + ".#." + WATERMARK_STRATEGY_EXPR);
properties.add(SCHEMA + "." + WATERMARK + ".#." + WATERMARK_STRATEGY_DATA_TYPE);
+ // table constraint
+ properties.add(SCHEMA + "." + DescriptorProperties.PRIMARY_KEY_NAME);
+ properties.add(SCHEMA + "." + DescriptorProperties.PRIMARY_KEY_COLUMNS);
return properties;
}
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactoryBase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactoryBase.java
index 980cb74..38d733b 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactoryBase.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactoryBase.java
@@ -41,7 +41,7 @@ import java.util.Map;
import java.util.Optional;
import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
-import static org.apache.flink.table.descriptors.DescriptorProperties.TABLE_SCHEMA_EXPR;
+import static org.apache.flink.table.descriptors.DescriptorProperties.EXPR;
import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK;
import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_ROWTIME;
import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE;
@@ -84,7 +84,7 @@ public abstract class TestTableSourceFactoryBase implements StreamTableSourceFac
properties.add(SCHEMA + ".#." + SCHEMA_DATA_TYPE);
properties.add(SCHEMA + ".#." + SCHEMA_TYPE);
properties.add(SCHEMA + ".#." + SCHEMA_NAME);
- properties.add(SCHEMA + ".#." + TABLE_SCHEMA_EXPR);
+ properties.add(SCHEMA + ".#." + EXPR);
properties.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_TYPE);
properties.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_FROM);
properties.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_TYPE);
@@ -93,6 +93,10 @@ public abstract class TestTableSourceFactoryBase implements StreamTableSourceFac
properties.add(SCHEMA + "." + WATERMARK + ".#." + WATERMARK_STRATEGY_EXPR);
properties.add(SCHEMA + "." + WATERMARK + ".#." + WATERMARK_STRATEGY_DATA_TYPE);
+ // table constraint
+ properties.add(SCHEMA + "." + DescriptorProperties.PRIMARY_KEY_NAME);
+ properties.add(SCHEMA + "." + DescriptorProperties.PRIMARY_KEY_COLUMNS);
+
return properties;
}
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/descriptors/OldCsv.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/descriptors/OldCsv.java
index 3d06615..10695c2 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/descriptors/OldCsv.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/descriptors/OldCsv.java
@@ -279,8 +279,8 @@ public class OldCsv extends FormatDescriptor {
properties.putBoolean(FORMAT_DERIVE_SCHEMA, true);
} else {
List<String> subKeys = Arrays.asList(
- DescriptorProperties.TABLE_SCHEMA_NAME,
- DescriptorProperties.TABLE_SCHEMA_DATA_TYPE);
+ DescriptorProperties.NAME,
+ DescriptorProperties.DATA_TYPE);
List<List<String>> subValues = schema.entrySet().stream()
.map(e -> Arrays.asList(e.getKey(), e.getValue()))
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/descriptors/SchemaValidator.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/descriptors/SchemaValidator.java
index 2bb1dc0..bb0c0ab 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/descriptors/SchemaValidator.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/descriptors/SchemaValidator.java
@@ -40,7 +40,7 @@ import java.util.Map;
import java.util.Optional;
import static java.lang.String.format;
-import static org.apache.flink.table.descriptors.DescriptorProperties.TABLE_SCHEMA_EXPR;
+import static org.apache.flink.table.descriptors.DescriptorProperties.EXPR;
import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK;
import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_ROWTIME;
import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE;
@@ -140,7 +140,7 @@ public class SchemaValidator implements DescriptorValidator {
keys.add(SCHEMA + ".#." + SCHEMA_NAME);
keys.add(SCHEMA + ".#." + SCHEMA_FROM);
// computed column
- keys.add(SCHEMA + ".#." + TABLE_SCHEMA_EXPR);
+ keys.add(SCHEMA + ".#." + EXPR);
// time attributes
keys.add(SCHEMA + ".#." + SCHEMA_PROCTIME);
@@ -158,6 +158,10 @@ public class SchemaValidator implements DescriptorValidator {
keys.add(SCHEMA + "." + WATERMARK + ".#." + WATERMARK_STRATEGY_EXPR);
keys.add(SCHEMA + "." + WATERMARK + ".#." + WATERMARK_STRATEGY_DATA_TYPE);
+ // table constraint
+ keys.add(SCHEMA + "." + DescriptorProperties.PRIMARY_KEY_NAME);
+ keys.add(SCHEMA + "." + DescriptorProperties.PRIMARY_KEY_COLUMNS);
+
return keys;
}
@@ -292,7 +296,7 @@ public class SchemaValidator implements DescriptorValidator {
boolean isRowtime = properties
.containsKey(SCHEMA + "." + i + "." + ROWTIME_TIMESTAMPS_TYPE);
boolean isGeneratedColumn = properties
- .containsKey(SCHEMA + "." + i + "." + TABLE_SCHEMA_EXPR);
+ .containsKey(SCHEMA + "." + i + "." + EXPR);
// remove proctime/rowtime from mapping
if (isProctime || isRowtime || isGeneratedColumn) {
mapping.remove(name);
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvTableSinkFactoryBase.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvTableSinkFactoryBase.java
index c1cf012..3b9bf55 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvTableSinkFactoryBase.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvTableSinkFactoryBase.java
@@ -76,9 +76,9 @@ public abstract class CsvTableSinkFactoryBase implements TableFactory {
// connector
properties.add(CONNECTOR_PATH);
// format
- properties.add(FORMAT_FIELDS + ".#." + DescriptorProperties.TABLE_SCHEMA_TYPE);
- properties.add(FORMAT_FIELDS + ".#." + DescriptorProperties.TABLE_SCHEMA_DATA_TYPE);
- properties.add(FORMAT_FIELDS + ".#." + DescriptorProperties.TABLE_SCHEMA_NAME);
+ properties.add(FORMAT_FIELDS + ".#." + DescriptorProperties.TYPE);
+ properties.add(FORMAT_FIELDS + ".#." + DescriptorProperties.DATA_TYPE);
+ properties.add(FORMAT_FIELDS + ".#." + DescriptorProperties.NAME);
properties.add(FormatDescriptorValidator.FORMAT_DERIVE_SCHEMA);
properties.add(FORMAT_FIELD_DELIMITER);
properties.add(CONNECTOR_PATH);
@@ -86,10 +86,10 @@ public abstract class CsvTableSinkFactoryBase implements TableFactory {
properties.add(FORMAT_NUM_FILES);
// schema
- properties.add(SCHEMA + ".#." + DescriptorProperties.TABLE_SCHEMA_TYPE);
- properties.add(SCHEMA + ".#." + DescriptorProperties.TABLE_SCHEMA_DATA_TYPE);
- properties.add(SCHEMA + ".#." + DescriptorProperties.TABLE_SCHEMA_NAME);
- properties.add(SCHEMA + ".#." + DescriptorProperties.TABLE_SCHEMA_EXPR);
+ properties.add(SCHEMA + ".#." + DescriptorProperties.TYPE);
+ properties.add(SCHEMA + ".#." + DescriptorProperties.DATA_TYPE);
+ properties.add(SCHEMA + ".#." + DescriptorProperties.NAME);
+ properties.add(SCHEMA + ".#." + DescriptorProperties.EXPR);
// schema watermark
properties.add(SCHEMA + "." + DescriptorProperties.WATERMARK + ".*");
return properties;
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvTableSourceFactoryBase.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvTableSourceFactoryBase.java
index 47d3dc0..3b0e26b 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvTableSourceFactoryBase.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/CsvTableSourceFactoryBase.java
@@ -40,7 +40,6 @@ import java.util.stream.Collectors;
import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION;
import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
-import static org.apache.flink.table.descriptors.DescriptorProperties.TABLE_SCHEMA_EXPR;
import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK;
import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_ROWTIME;
import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE;
@@ -80,9 +79,9 @@ public abstract class CsvTableSourceFactoryBase implements TableFactory {
// connector
properties.add(CONNECTOR_PATH);
// format
- properties.add(FORMAT_FIELDS + ".#." + DescriptorProperties.TABLE_SCHEMA_TYPE);
- properties.add(FORMAT_FIELDS + ".#." + DescriptorProperties.TABLE_SCHEMA_DATA_TYPE);
- properties.add(FORMAT_FIELDS + ".#." + DescriptorProperties.TABLE_SCHEMA_NAME);
+ properties.add(FORMAT_FIELDS + ".#." + DescriptorProperties.TYPE);
+ properties.add(FORMAT_FIELDS + ".#." + DescriptorProperties.DATA_TYPE);
+ properties.add(FORMAT_FIELDS + ".#." + DescriptorProperties.NAME);
properties.add(FormatDescriptorValidator.FORMAT_DERIVE_SCHEMA);
properties.add(FORMAT_FIELD_DELIMITER);
properties.add(FORMAT_LINE_DELIMITER);
@@ -92,14 +91,17 @@ public abstract class CsvTableSourceFactoryBase implements TableFactory {
properties.add(FORMAT_IGNORE_PARSE_ERRORS);
properties.add(CONNECTOR_PATH);
// schema
- properties.add(SCHEMA + ".#." + DescriptorProperties.TABLE_SCHEMA_TYPE);
- properties.add(SCHEMA + ".#." + DescriptorProperties.TABLE_SCHEMA_DATA_TYPE);
- properties.add(SCHEMA + ".#." + DescriptorProperties.TABLE_SCHEMA_NAME);
- properties.add(SCHEMA + ".#." + TABLE_SCHEMA_EXPR);
+ properties.add(SCHEMA + ".#." + DescriptorProperties.TYPE);
+ properties.add(SCHEMA + ".#." + DescriptorProperties.DATA_TYPE);
+ properties.add(SCHEMA + ".#." + DescriptorProperties.NAME);
+ properties.add(SCHEMA + ".#." + DescriptorProperties.EXPR);
// watermark
properties.add(SCHEMA + "." + WATERMARK + ".#." + WATERMARK_ROWTIME);
properties.add(SCHEMA + "." + WATERMARK + ".#." + WATERMARK_STRATEGY_EXPR);
properties.add(SCHEMA + "." + WATERMARK + ".#." + WATERMARK_STRATEGY_DATA_TYPE);
+ // table constraint
+ properties.add(SCHEMA + "." + DescriptorProperties.PRIMARY_KEY_NAME);
+ properties.add(SCHEMA + "." + DescriptorProperties.PRIMARY_KEY_COLUMNS);
return properties;
}
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableSourceFactoryMock.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableSourceFactoryMock.java
index 4307231..bef2bd4 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableSourceFactoryMock.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableSourceFactoryMock.java
@@ -35,11 +35,12 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import static org.apache.flink.table.descriptors.DescriptorProperties.TABLE_SCHEMA_EXPR;
+import static org.apache.flink.table.descriptors.DescriptorProperties.EXPR;
import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK;
import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_ROWTIME;
import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE;
import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_EXPR;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA;
/**
* Mocking {@link TableSourceFactory} for tests.
@@ -79,11 +80,16 @@ public class TableSourceFactoryMock implements TableSourceFactory<Row> {
supportedProperties.add(Schema.SCHEMA + ".#." + Schema.SCHEMA_DATA_TYPE);
supportedProperties.add(Schema.SCHEMA + ".#." + Schema.SCHEMA_TYPE);
// computed column
- supportedProperties.add(Schema.SCHEMA + ".#." + TABLE_SCHEMA_EXPR);
+ supportedProperties.add(Schema.SCHEMA + ".#." + EXPR);
// watermark
supportedProperties.add(Schema.SCHEMA + "." + WATERMARK + ".#." + WATERMARK_ROWTIME);
supportedProperties.add(Schema.SCHEMA + "." + WATERMARK + ".#." + WATERMARK_STRATEGY_EXPR);
supportedProperties.add(Schema.SCHEMA + "." + WATERMARK + ".#." + WATERMARK_STRATEGY_DATA_TYPE);
+
+ // table constraint
+ supportedProperties.add(SCHEMA + "." + DescriptorProperties.PRIMARY_KEY_NAME);
+ supportedProperties.add(SCHEMA + "." + DescriptorProperties.PRIMARY_KEY_COLUMNS);
+
return supportedProperties;
}
}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java
index bcb3e9b..584ba44 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java
@@ -74,30 +74,31 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
@Internal
public class DescriptorProperties {
- public static final String TABLE_SCHEMA_NAME = "name";
+ public static final String NAME = "name";
- /**
- * @deprecated this will be removed in future version as it uses old type system.
- * Please use {@link #TABLE_SCHEMA_DATA_TYPE} instead.
- */
- @Deprecated
- public static final String TABLE_SCHEMA_TYPE = "type";
+ public static final String TYPE = "type";
- public static final String TABLE_SCHEMA_DATA_TYPE = "data-type";
+ public static final String DATA_TYPE = "data-type";
- public static final String TABLE_SCHEMA_EXPR = "expr";
+ public static final String EXPR = "expr";
public static final String PARTITION_KEYS = "partition.keys";
- public static final String PARTITION_KEYS_NAME = "name";
-
public static final String WATERMARK = "watermark";
public static final String WATERMARK_ROWTIME = "rowtime";
- public static final String WATERMARK_STRATEGY_EXPR = "strategy.expr";
+ public static final String WATERMARK_STRATEGY = "strategy";
+
+ public static final String WATERMARK_STRATEGY_EXPR = WATERMARK_STRATEGY + '.' + EXPR;
+
+ public static final String WATERMARK_STRATEGY_DATA_TYPE = WATERMARK_STRATEGY + '.' + DATA_TYPE;
- public static final String WATERMARK_STRATEGY_DATA_TYPE = "strategy.data-type";
+ public static final String PRIMARY_KEY_NAME = "primary-key.name";
+
+ public static final String PRIMARY_KEY_COLUMNS = "primary-key.columns";
+
+ private static final Pattern SCHEMA_COLUMN_NAME_SUFFIX = Pattern.compile("\\d+\\.name");
private static final Consumer<String> EMPTY_CONSUMER = (value) -> {};
@@ -225,7 +226,7 @@ public class DescriptorProperties {
putIndexedOptionalProperties(
key,
- Arrays.asList(TABLE_SCHEMA_NAME, TABLE_SCHEMA_DATA_TYPE, TABLE_SCHEMA_EXPR),
+ Arrays.asList(NAME, DATA_TYPE, EXPR),
values);
if (!schema.getWatermarkSpecs().isEmpty()) {
@@ -241,6 +242,11 @@ public class DescriptorProperties {
Arrays.asList(WATERMARK_ROWTIME, WATERMARK_STRATEGY_EXPR, WATERMARK_STRATEGY_DATA_TYPE),
watermarkValues);
}
+
+ schema.getPrimaryKey().ifPresent(pk -> {
+ putString(key + '.' + PRIMARY_KEY_NAME, pk.getName());
+ putString(key + '.' + PRIMARY_KEY_COLUMNS, String.join(",", pk.getColumns()));
+ });
}
/**
@@ -251,7 +257,7 @@ public class DescriptorProperties {
putIndexedFixedProperties(
PARTITION_KEYS,
- Collections.singletonList(PARTITION_KEYS_NAME),
+ Collections.singletonList(NAME),
keys.stream().map(Collections::singletonList).collect(Collectors.toList()));
}
@@ -610,7 +616,9 @@ public class DescriptorProperties {
public Optional<TableSchema> getOptionalTableSchema(String key) {
// filter for number of fields
final int fieldCount = properties.keySet().stream()
- .filter((k) -> k.startsWith(key) && k.endsWith('.' + TABLE_SCHEMA_NAME))
+ .filter((k) -> k.startsWith(key)
+ // "key." is the prefix.
+ && SCHEMA_COLUMN_NAME_SUFFIX.matcher(k.substring(key.length() + 1)).matches())
.mapToInt((k) -> 1)
.sum();
@@ -621,10 +629,10 @@ public class DescriptorProperties {
// validate fields and build schema
final TableSchema.Builder schemaBuilder = TableSchema.builder();
for (int i = 0; i < fieldCount; i++) {
- final String nameKey = key + '.' + i + '.' + TABLE_SCHEMA_NAME;
- final String legacyTypeKey = key + '.' + i + '.' + TABLE_SCHEMA_TYPE;
- final String typeKey = key + '.' + i + '.' + TABLE_SCHEMA_DATA_TYPE;
- final String exprKey = key + '.' + i + '.' + TABLE_SCHEMA_EXPR;
+ final String nameKey = key + '.' + i + '.' + NAME;
+ final String legacyTypeKey = key + '.' + i + '.' + TYPE;
+ final String typeKey = key + '.' + i + '.' + DATA_TYPE;
+ final String exprKey = key + '.' + i + '.' + EXPR;
final String name = optionalGet(nameKey).orElseThrow(exceptionSupplier(nameKey));
@@ -669,6 +677,14 @@ public class DescriptorProperties {
}
}
+ // Extract unique constraints.
+ String pkConstraintNameKey = key + '.' + PRIMARY_KEY_NAME;
+ final Optional<String> pkConstraintNameOpt = optionalGet(pkConstraintNameKey);
+ if (pkConstraintNameOpt.isPresent()) {
+ final String pkColumnsKey = key + '.' + PRIMARY_KEY_COLUMNS;
+ final String columns = optionalGet(pkColumnsKey).orElseThrow(exceptionSupplier(pkColumnsKey));
+ schemaBuilder.primaryKey(pkConstraintNameOpt.get(), columns.split(","));
+ }
return Optional.of(schemaBuilder.build());
}
@@ -684,7 +700,7 @@ public class DescriptorProperties {
*/
public List<String> getPartitionKeys() {
return getFixedIndexedProperties(
- PARTITION_KEYS, Collections.singletonList(PARTITION_KEYS_NAME))
+ PARTITION_KEYS, Collections.singletonList(NAME))
.stream()
.map(map -> map.values().iterator().next())
.map(this::getString).collect(Collectors.toList());
@@ -1166,13 +1182,13 @@ public class DescriptorProperties {
public void validateTableSchema(String key, boolean isOptional) {
final Consumer<String> nameValidation = (fullKey) -> validateString(fullKey, false, 1);
final Consumer<String> typeValidation = (fullKey) -> {
- String fallbackKey = fullKey.replace("." + TABLE_SCHEMA_DATA_TYPE, "." + TABLE_SCHEMA_TYPE);
+ String fallbackKey = fullKey.replace("." + DATA_TYPE, "." + TYPE);
validateDataType(fullKey, fallbackKey, false);
};
final Map<String, Consumer<String>> subKeys = new HashMap<>();
- subKeys.put(TABLE_SCHEMA_NAME, nameValidation);
- subKeys.put(TABLE_SCHEMA_DATA_TYPE, typeValidation);
+ subKeys.put(NAME, nameValidation);
+ subKeys.put(DATA_TYPE, typeValidation);
validateFixedIndexedProperties(
key,
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableFormatFactoryBase.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableFormatFactoryBase.java
index fdae4d6..794b6ee 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableFormatFactoryBase.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableFormatFactoryBase.java
@@ -31,7 +31,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import static org.apache.flink.table.descriptors.DescriptorProperties.TABLE_SCHEMA_EXPR;
+import static org.apache.flink.table.descriptors.DescriptorProperties.EXPR;
import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK;
import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_ROWTIME;
import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE;
@@ -105,7 +105,7 @@ public abstract class TableFormatFactoryBase<T> implements TableFormatFactory<T>
properties.add(SCHEMA + ".#." + SCHEMA_NAME);
properties.add(SCHEMA + ".#." + SCHEMA_FROM);
// computed column
- properties.add(SCHEMA + ".#." + TABLE_SCHEMA_EXPR);
+ properties.add(SCHEMA + ".#." + EXPR);
// time attributes
properties.add(SCHEMA + ".#." + SCHEMA_PROCTIME);
properties.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_TYPE);
@@ -120,6 +120,9 @@ public abstract class TableFormatFactoryBase<T> implements TableFormatFactory<T>
properties.add(SCHEMA + "." + WATERMARK + ".#." + WATERMARK_ROWTIME);
properties.add(SCHEMA + "." + WATERMARK + ".#." + WATERMARK_STRATEGY_EXPR);
properties.add(SCHEMA + "." + WATERMARK + ".#." + WATERMARK_STRATEGY_DATA_TYPE);
+ // table constraint
+ properties.add(SCHEMA + "." + DescriptorProperties.PRIMARY_KEY_NAME);
+ properties.add(SCHEMA + "." + DescriptorProperties.PRIMARY_KEY_COLUMNS);
}
properties.addAll(supportedFormatProperties());
return properties;
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/descriptors/DescriptorPropertiesTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/descriptors/DescriptorPropertiesTest.java
index 172d4c2..8f9fed7 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/descriptors/DescriptorPropertiesTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/descriptors/DescriptorPropertiesTest.java
@@ -175,17 +175,18 @@ public class DescriptorPropertiesTest {
@Test
public void testTableSchema() {
TableSchema schema = TableSchema.builder()
- .field("f0", DataTypes.BIGINT())
+ .field("f0", DataTypes.BIGINT().notNull())
.field("f1", DataTypes.ROW(
DataTypes.FIELD("q1", DataTypes.STRING()),
DataTypes.FIELD("q2", DataTypes.TIMESTAMP(9))))
- .field("f2", DataTypes.STRING())
- .field("f3", DataTypes.BIGINT(), "f0 + 1")
+ .field("f2", DataTypes.STRING().notNull())
+ .field("f3", DataTypes.BIGINT().notNull(), "f0 + 1")
.field("f4", DataTypes.DECIMAL(10, 3))
.watermark(
"f1.q2",
"`f1`.`q2` - INTERVAL '5' SECOND",
DataTypes.TIMESTAMP(3))
+ .primaryKey("constraint1", new String[] {"f0", "f2"})
.build();
DescriptorProperties properties = new DescriptorProperties();
@@ -193,19 +194,21 @@ public class DescriptorPropertiesTest {
Map<String, String> actual = properties.asMap();
Map<String, String> expected = new HashMap<>();
expected.put("schema.0.name", "f0");
- expected.put("schema.0.data-type", "BIGINT");
+ expected.put("schema.0.data-type", "BIGINT NOT NULL");
expected.put("schema.1.name", "f1");
expected.put("schema.1.data-type", "ROW<`q1` VARCHAR(2147483647), `q2` TIMESTAMP(9)>");
expected.put("schema.2.name", "f2");
- expected.put("schema.2.data-type", "VARCHAR(2147483647)");
+ expected.put("schema.2.data-type", "VARCHAR(2147483647) NOT NULL");
expected.put("schema.3.name", "f3");
- expected.put("schema.3.data-type", "BIGINT");
+ expected.put("schema.3.data-type", "BIGINT NOT NULL");
expected.put("schema.3.expr", "f0 + 1");
expected.put("schema.4.name", "f4");
expected.put("schema.4.data-type", "DECIMAL(10, 3)");
expected.put("schema.watermark.0.rowtime", "f1.q2");
expected.put("schema.watermark.0.strategy.expr", "`f1`.`q2` - INTERVAL '5' SECOND");
expected.put("schema.watermark.0.strategy.data-type", "TIMESTAMP(3)");
+ expected.put("schema.primary-key.name", "constraint1");
+ expected.put("schema.primary-key.columns", "f0,f2");
assertEquals(expected, actual);
TableSchema restored = properties.getTableSchema("schema");
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/catalog/CatalogConstraintTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/catalog/CatalogConstraintTest.java
index 25ca83b..fd01281 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/catalog/CatalogConstraintTest.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/catalog/CatalogConstraintTest.java
@@ -25,12 +25,9 @@ import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.ObjectPath;
-import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery;
import org.apache.flink.table.planner.utils.TableTestUtil;
-import org.apache.flink.table.types.AtomicDataType;
import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableSet;
@@ -67,7 +64,7 @@ public class CatalogConstraintTest {
public void testWithPrimaryKey() throws Exception {
TableSchema tableSchema = TableSchema.builder().fields(
new String[] { "a", "b", "c" },
- new DataType[] { DataTypes.STRING(), new AtomicDataType(new BigIntType(false)), DataTypes.INT() }
+ new DataType[] { DataTypes.STRING(), DataTypes.BIGINT().notNull(), DataTypes.INT() }
).primaryKey("b").build();
Map<String, String> properties = buildCatalogTableProperties(tableSchema);
@@ -109,10 +106,6 @@ public class CatalogConstraintTest {
properties.put("format.property-version", "1");
properties.put("format.field-delimiter", ";");
- // schema
- DescriptorProperties descriptorProperties = new DescriptorProperties(true);
- descriptorProperties.putTableSchema("format.fields", tableSchema);
- properties.putAll(descriptorProperties.asMap());
return properties;
}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/InMemoryTableFactory.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/InMemoryTableFactory.scala
index 76393b2..9916ab6 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/InMemoryTableFactory.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/InMemoryTableFactory.scala
@@ -142,7 +142,12 @@ class InMemoryTableFactory(terminationCount: Int)
properties.add(SCHEMA + "." + WATERMARK + ".#." + WATERMARK_STRATEGY_DATA_TYPE);
// computed column
- properties.add(SCHEMA + ".#." + TABLE_SCHEMA_EXPR)
+ properties.add(SCHEMA + ".#." + EXPR)
+
+ // table constraint
+ properties.add(SCHEMA + "." + DescriptorProperties.PRIMARY_KEY_NAME);
+ properties.add(SCHEMA + "." + DescriptorProperties.PRIMARY_KEY_COLUMNS);
+
properties
}
}