Posted to commits@flink.apache.org by lz...@apache.org on 2022/11/18 04:00:12 UTC

[flink-table-store] branch master updated: [FLINK-30033] Add primary key type validation

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 03cfe956 [FLINK-30033] Add primary key type validation
03cfe956 is described below

commit 03cfe956b63768e6743516e842c4763ecc760c16
Author: shammon <zj...@gmail.com>
AuthorDate: Fri Nov 18 12:00:08 2022 +0800

    [FLINK-30033] Add primary key type validation
    
    This closes #383
---
 .../table/store/file/schema/SchemaManager.java     | 22 ++++++++++++
 .../flink/table/store/file/schema/TableSchema.java | 21 +++++++++++
 .../table/store/file/schema/SchemaManagerTest.java | 42 ++++++++++++++++++++++
 3 files changed, 85 insertions(+)
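
For context, here is a minimal sketch (not part of the commit) of the behavior this patch adds: committing a schema whose primary key column uses one of the types listed in TableSchema.PRIMARY_KEY_UNSUPPORTED_LOGICAL_TYPES now fails fast with an UnsupportedOperationException. The class and method names and the empty partition keys/options below are illustrative placeholders; the UpdateSchema constructor and the commitNewVersion call mirror the test added in this commit.

    import java.util.Collections;

    import org.apache.flink.table.store.file.schema.SchemaManager;
    import org.apache.flink.table.store.file.schema.UpdateSchema;
    import org.apache.flink.table.types.logical.BigIntType;
    import org.apache.flink.table.types.logical.IntType;
    import org.apache.flink.table.types.logical.MapType;
    import org.apache.flink.table.types.logical.RowType;
    import org.apache.flink.table.types.logical.VarCharType;

    /** Illustrative only; class and method names are placeholders. */
    class PrimaryKeyValidationExample {

        static void commitMapKeyedSchema(SchemaManager manager) throws Exception {
            RowType rowType =
                    RowType.of(
                            new MapType(new IntType(), new BigIntType()), // f0
                            new BigIntType(),                             // f1
                            new VarCharType());                           // f2
            UpdateSchema updateSchema =
                    new UpdateSchema(
                            rowType,
                            Collections.emptyList(),         // partition keys (placeholder)
                            Collections.singletonList("f0"), // primary key on the MAP column
                            Collections.emptyMap(),          // options (placeholder)
                            "");                             // table comment
            // Throws UnsupportedOperationException:
            //   "The type MapType in primary key field f0 is unsupported"
            manager.commitNewVersion(updateSchema);
        }
    }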

diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
index 2940f465..7ab38278 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
@@ -32,6 +32,7 @@ import org.apache.flink.table.store.file.schema.SchemaChange.UpdateColumnType;
 import org.apache.flink.table.store.file.utils.AtomicFileWriter;
 import org.apache.flink.table.store.file.utils.FileUtils;
 import org.apache.flink.table.store.file.utils.JsonSerdeUtil;
+import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.utils.LogicalTypeCasts;
 import org.apache.flink.util.Preconditions;
@@ -108,6 +109,7 @@ public class SchemaManager implements Serializable {
         List<String> primaryKeys = updateSchema.primaryKeys();
         Map<String, String> options = updateSchema.options();
 
+        validatePrimaryKeysType(updateSchema, primaryKeys);
         while (true) {
             long id;
             int highestFieldId;
@@ -157,6 +159,26 @@ public class SchemaManager implements Serializable {
         }
     }
 
+    private void validatePrimaryKeysType(UpdateSchema updateSchema, List<String> primaryKeys) {
+        if (!primaryKeys.isEmpty()) {
+            Map<String, RowType.RowField> rowFields = new HashMap<>();
+            for (RowType.RowField rowField : updateSchema.rowType().getFields()) {
+                rowFields.put(rowField.getName(), rowField);
+            }
+            for (String primaryKeyName : primaryKeys) {
+                RowType.RowField rowField = rowFields.get(primaryKeyName);
+                LogicalType logicalType = rowField.getType();
+                if (TableSchema.PRIMARY_KEY_UNSUPPORTED_LOGICAL_TYPES.stream()
+                        .anyMatch(c -> c.isInstance(logicalType))) {
+                    throw new UnsupportedOperationException(
+                            String.format(
+                                    "The type %s in primary key field %s is unsupported",
+                                    logicalType.getClass().getSimpleName(), primaryKeyName));
+                }
+            }
+        }
+    }
+
     /** Create {@link SchemaChange}s. */
     public TableSchema commitChanges(List<SchemaChange> changes) throws Exception {
         while (true) {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/TableSchema.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/TableSchema.java
index 9ef955ff..81f88271 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/TableSchema.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/TableSchema.java
@@ -20,10 +20,17 @@ package org.apache.flink.table.store.file.schema;
 
 import org.apache.flink.table.store.file.utils.JsonSerdeUtil;
 import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.DistinctType;
+import org.apache.flink.table.types.logical.LegacyTypeInformationType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.MapType;
 import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.NullType;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.StructuredType;
+import org.apache.flink.table.types.logical.SymbolType;
+import org.apache.flink.table.types.logical.UnresolvedUserDefinedType;
+import org.apache.flink.table.types.logical.UserDefinedType;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
 
@@ -311,4 +318,18 @@ public class TableSchema implements Serializable {
             }
         }
     }
+
+    public static final List<Class<? extends LogicalType>> PRIMARY_KEY_UNSUPPORTED_LOGICAL_TYPES =
+            Arrays.asList(
+                    MapType.class,
+                    ArrayType.class,
+                    RowType.class,
+                    UserDefinedType.class,
+                    DistinctType.class,
+                    StructuredType.class,
+                    MultisetType.class,
+                    NullType.class,
+                    LegacyTypeInformationType.class,
+                    SymbolType.class,
+                    UnresolvedUserDefinedType.class);
 }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/SchemaManagerTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/SchemaManagerTest.java
index 20cd4acd..b6c8b910 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/SchemaManagerTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/SchemaManagerTest.java
@@ -21,7 +21,9 @@ package org.apache.flink.table.store.file.schema;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
 import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.DoubleType;
 import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.MapType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.VarCharType;
 
@@ -47,6 +49,7 @@ import java.util.stream.IntStream;
 
 import static org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem.retryArtificialException;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
 /** Test for {@link SchemaManager}. */
@@ -182,4 +185,43 @@ public class SchemaManagerTest {
                                 .mapToObj(String::valueOf)
                                 .toArray(String[]::new));
     }
+
+    @Test
+    public void testPrimaryKeyType() throws Exception {
+        final RowType mapPrimaryKeyType =
+                RowType.of(
+                        new MapType(new IntType(), new BigIntType()),
+                        new BigIntType(),
+                        new VarCharType());
+        final UpdateSchema mapPrimaryKeySchema =
+                new UpdateSchema(mapPrimaryKeyType, partitionKeys, primaryKeys, options, "");
+        assertThatThrownBy(() -> manager.commitNewVersion(mapPrimaryKeySchema))
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessage(
+                        "The type %s in primary key field %s is unsupported",
+                        MapType.class.getSimpleName(), "f0");
+
+        RowType doublePrimaryKeyType =
+                RowType.of(new DoubleType(), new BigIntType(), new VarCharType());
+        final UpdateSchema doublePrimaryKeySchema =
+                new UpdateSchema(doublePrimaryKeyType, partitionKeys, primaryKeys, options, "");
+
+        TableSchema tableSchema =
+                retryArtificialException(() -> manager.commitNewVersion(doublePrimaryKeySchema));
+
+        Optional<TableSchema> latest = retryArtificialException(() -> manager.latest());
+
+        List<DataField> fields =
+                Arrays.asList(
+                        new DataField(0, "f0", new AtomicDataType(new DoubleType(false))),
+                        new DataField(1, "f1", new AtomicDataType(new BigIntType(false))),
+                        new DataField(2, "f2", new AtomicDataType(new VarCharType())));
+
+        assertThat(latest.isPresent()).isTrue();
+        assertThat(tableSchema).isEqualTo(latest.get());
+        assertThat(tableSchema.fields()).isEqualTo(fields);
+        assertThat(tableSchema.partitionKeys()).isEqualTo(partitionKeys);
+        assertThat(tableSchema.primaryKeys()).isEqualTo(primaryKeys);
+        assertThat(tableSchema.options()).isEqualTo(options);
+    }
 }
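
For reference, because PRIMARY_KEY_UNSUPPORTED_LOGICAL_TYPES is exposed as a public constant on TableSchema, callers could run the same check up front before attempting a commit. The following is a rough standalone sketch, not part of this commit; the class and helper names are hypothetical, and the check mirrors the stream/anyMatch logic in SchemaManager#validatePrimaryKeysType above.

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.flink.table.store.file.schema.TableSchema;
    import org.apache.flink.table.types.logical.LogicalType;
    import org.apache.flink.table.types.logical.RowType;

    /** Illustrative only; class and method names are placeholders. */
    class PrimaryKeyPreflight {

        /** Returns "field: TypeName" entries for primary key columns that would be rejected. */
        static List<String> unsupportedPrimaryKeyFields(RowType rowType, List<String> primaryKeys) {
            List<String> invalid = new ArrayList<>();
            for (String name : primaryKeys) {
                int index = rowType.getFieldIndex(name);
                if (index < 0) {
                    continue; // skip names not present in the row type
                }
                LogicalType type = rowType.getTypeAt(index);
                if (TableSchema.PRIMARY_KEY_UNSUPPORTED_LOGICAL_TYPES.stream()
                        .anyMatch(clazz -> clazz.isInstance(type))) {
                    invalid.add(name + ": " + type.getClass().getSimpleName());
                }
            }
            return invalid;
        }
    }

For the mapPrimaryKeyType used in the test above with a primary key on "f0", this sketch would return ["f0: MapType"], matching the type and field named in the exception thrown by SchemaManager.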