You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/03/21 07:09:35 UTC

[GitHub] [flink-table-store] JingsongLi commented on a change in pull request #54: [FLINK-26753] PK constraint should include partition keys if table is partitioned

JingsongLi commented on a change in pull request #54:
URL: https://github.com/apache/flink-table-store/pull/54#discussion_r830799765



##########
File path: flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java
##########
@@ -249,42 +249,38 @@ private static Context createLogContext(Context context) {
     }
 
     @VisibleForTesting
-    static Path tablePath(Map<String, String> options, ObjectIdentifier identifier) {
-        Preconditions.checkArgument(
-                options.containsKey(FILE_PATH.key()),
-                String.format(
-                        "Failed to create file store path. "
-                                + "Please specify a root dir by setting session level configuration "
-                                + "as `SET 'table-store.%s' = '...'`. "
-                                + "Alternatively, you can use a per-table root dir "
-                                + "as `CREATE TABLE ${table} (...) WITH ('%s' = '...')`",
-                        FILE_PATH.key(), FILE_PATH.key()));
-        return new Path(
-                options.get(FILE_PATH.key()),
-                String.format(
-                        "root/%s.catalog/%s.db/%s",
-                        identifier.getCatalogName(),
-                        identifier.getDatabaseName(),
-                        identifier.getObjectName()));
-    }
-
-    private TableStore buildTableStore(Context context) {
+    TableStore buildTableStore(Context context) {
         ResolvedCatalogTable catalogTable = context.getCatalogTable();
         ResolvedSchema schema = catalogTable.getResolvedSchema();
         RowType rowType = (RowType) schema.toPhysicalRowDataType().getLogicalType();
-        int[] primaryKeys = new int[0];
+        List<String> partitionKeys = catalogTable.getPartitionKeys();
+        int[] pkIndex = new int[0];
         if (schema.getPrimaryKey().isPresent()) {
-            primaryKeys =
+            List<String> pkCols = schema.getPrimaryKey().get().getColumns();
+            Preconditions.checkState(

Review comment:
       Can we add this validation to `TableStore`?
   We can introduce a `adjustAndValidateKeys()`.
   withPartition and withPrimaryKeys will call this method.

##########
File path: flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java
##########
@@ -249,42 +249,38 @@ private static Context createLogContext(Context context) {
     }
 
     @VisibleForTesting
-    static Path tablePath(Map<String, String> options, ObjectIdentifier identifier) {
-        Preconditions.checkArgument(
-                options.containsKey(FILE_PATH.key()),
-                String.format(
-                        "Failed to create file store path. "
-                                + "Please specify a root dir by setting session level configuration "
-                                + "as `SET 'table-store.%s' = '...'`. "
-                                + "Alternatively, you can use a per-table root dir "
-                                + "as `CREATE TABLE ${table} (...) WITH ('%s' = '...')`",
-                        FILE_PATH.key(), FILE_PATH.key()));
-        return new Path(
-                options.get(FILE_PATH.key()),
-                String.format(
-                        "root/%s.catalog/%s.db/%s",
-                        identifier.getCatalogName(),
-                        identifier.getDatabaseName(),
-                        identifier.getObjectName()));
-    }
-
-    private TableStore buildTableStore(Context context) {
+    TableStore buildTableStore(Context context) {
         ResolvedCatalogTable catalogTable = context.getCatalogTable();
         ResolvedSchema schema = catalogTable.getResolvedSchema();
         RowType rowType = (RowType) schema.toPhysicalRowDataType().getLogicalType();
-        int[] primaryKeys = new int[0];
+        List<String> partitionKeys = catalogTable.getPartitionKeys();
+        int[] pkIndex = new int[0];
         if (schema.getPrimaryKey().isPresent()) {
-            primaryKeys =
+            List<String> pkCols = schema.getPrimaryKey().get().getColumns();
+            Preconditions.checkState(
+                    new HashSet<>(pkCols).containsAll(partitionKeys),
+                    String.format(
+                            "Primary key constraint %s should include partition key %s",

Review comment:
       Primary key constraint %s should include all partition fields %s

##########
File path: flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java
##########
@@ -249,42 +249,38 @@ private static Context createLogContext(Context context) {
     }
 
     @VisibleForTesting
-    static Path tablePath(Map<String, String> options, ObjectIdentifier identifier) {
-        Preconditions.checkArgument(
-                options.containsKey(FILE_PATH.key()),
-                String.format(
-                        "Failed to create file store path. "
-                                + "Please specify a root dir by setting session level configuration "
-                                + "as `SET 'table-store.%s' = '...'`. "
-                                + "Alternatively, you can use a per-table root dir "
-                                + "as `CREATE TABLE ${table} (...) WITH ('%s' = '...')`",
-                        FILE_PATH.key(), FILE_PATH.key()));
-        return new Path(
-                options.get(FILE_PATH.key()),
-                String.format(
-                        "root/%s.catalog/%s.db/%s",
-                        identifier.getCatalogName(),
-                        identifier.getDatabaseName(),
-                        identifier.getObjectName()));
-    }
-
-    private TableStore buildTableStore(Context context) {
+    TableStore buildTableStore(Context context) {
         ResolvedCatalogTable catalogTable = context.getCatalogTable();
         ResolvedSchema schema = catalogTable.getResolvedSchema();
         RowType rowType = (RowType) schema.toPhysicalRowDataType().getLogicalType();
-        int[] primaryKeys = new int[0];
+        List<String> partitionKeys = catalogTable.getPartitionKeys();
+        int[] pkIndex = new int[0];
         if (schema.getPrimaryKey().isPresent()) {
-            primaryKeys =
+            List<String> pkCols = schema.getPrimaryKey().get().getColumns();
+            Preconditions.checkState(
+                    new HashSet<>(pkCols).containsAll(partitionKeys),
+                    String.format(
+                            "Primary key constraint %s should include partition key %s",
+                            pkCols, partitionKeys));
+            Set<String> partFilter = new HashSet<>(partitionKeys);
+            pkIndex =
                     schema.getPrimaryKey().get().getColumns().stream()
+                            .filter(pk -> !partFilter.contains(pk))
                             .mapToInt(rowType.getFieldNames()::indexOf)
                             .toArray();
+            if (pkIndex.length == 0) {
+                throw new TableException(
+                        String.format(
+                                "Primary key constraint %s should not be same with partition key %s",

Review comment:
       Primary key constraint %s should not be same with partition fields %s, this will result in only one record in a partition




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org