Posted to commits@flink.apache.org by lz...@apache.org on 2022/07/26 07:35:49 UTC

[flink-table-store] branch release-0.2 updated: [FLINK-28677] Primary key should not be null when creating Spark table

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.2
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/release-0.2 by this push:
     new 746f1e28 [FLINK-28677] Primary key should not be null when creating Spark table
746f1e28 is described below

commit 746f1e28695d74bf29649f93b4cdb5cc127a6586
Author: Jane Chan <55...@users.noreply.github.com>
AuthorDate: Tue Jul 26 15:34:14 2022 +0800

    [FLINK-28677] Primary key should not be null when creating Spark table
    
    This closes #237
---
 .../table/store/file/schema/SchemaManager.java     |  6 ++
 .../table/store/file/schema/UpdateSchema.java      | 46 +++++++++++-
 .../table/store/file/schema/SchemaManagerTest.java |  4 +-
 .../flink/table/store/spark/SparkReadITCase.java   | 86 ++++++++++++++++++++++
 4 files changed, 139 insertions(+), 3 deletions(-)
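
In short: before this commit, a Spark DDL statement could declare a
primary-key column without NOT NULL and the nullable type was stored as-is.
UpdateSchema now validates the row type at construction time (partition
fields and primary-key fields must be table columns, and the primary key
must cover all partition fields) and rewrites primary-key fields to NOT
NULL, while SchemaManager rejects any later attempt to make a primary-key
column nullable again via ALTER COLUMN ... DROP NOT NULL.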

diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
index 7dcec7f9..fe3c8f9b 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
@@ -199,6 +199,12 @@ public class SchemaManager implements Serializable {
                             });
                 } else if (change instanceof UpdateColumnNullability) {
                     UpdateColumnNullability update = (UpdateColumnNullability) change;
+                    if (update.fieldNames().length == 1
+                            && update.newNullability()
+                            && schema.primaryKeys().contains(update.fieldNames()[0])) {
+                        throw new UnsupportedOperationException(
+                                "Cannot change nullability of primary key");
+                    }
                     updateNestedColumn(
                             newFields,
                             update.fieldNames(),
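
The guard above fires only when the schema change targets a single
top-level column, flips it to nullable, and that column belongs to the
primary key; tightening a column (SET NOT NULL, i.e. newNullability ==
false) still passes through to updateNestedColumn. A minimal standalone
sketch of the same check; the method and parameter names are illustrative,
not the store's API:

    static void checkNullabilityChange(
            String[] fieldNames, boolean newNullability, List<String> primaryKeys) {
        // fieldNames is a field path; length > 1 means a nested field,
        // which cannot be a top-level primary-key column, so only
        // single-element paths are checked here.
        if (fieldNames.length == 1
                && newNullability // true: the column would become nullable
                && primaryKeys.contains(fieldNames[0])) {
            throw new UnsupportedOperationException(
                    "Cannot change nullability of primary key");
        }
    }
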
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/UpdateSchema.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/UpdateSchema.java
index 29aa4840..87477589 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/UpdateSchema.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/UpdateSchema.java
@@ -24,12 +24,15 @@ import org.apache.flink.table.catalog.CatalogTableImpl;
 import org.apache.flink.table.descriptors.DescriptorProperties;
 import org.apache.flink.table.descriptors.Schema;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
 
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 
 import static org.apache.flink.table.descriptors.Schema.SCHEMA;
 import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;
@@ -53,7 +56,7 @@ public class UpdateSchema {
             List<String> primaryKeys,
             Map<String, String> options,
             String comment) {
-        this.rowType = rowType;
+        this.rowType = validateRowType(rowType, primaryKeys, partitionKeys);
         this.partitionKeys = partitionKeys;
         this.primaryKeys = primaryKeys;
         this.options = new HashMap<>(options);
@@ -80,6 +83,47 @@ public class UpdateSchema {
         return comment;
     }
 
+    private RowType validateRowType(
+            RowType rowType, List<String> primaryKeys, List<String> partitionKeys) {
+        List<String> fieldNames = rowType.getFieldNames();
+        Set<String> allFields = new HashSet<>(fieldNames);
+        Preconditions.checkState(
+                allFields.containsAll(partitionKeys),
+                "Table column %s should include all partition fields %s",
+                fieldNames,
+                partitionKeys);
+
+        if (primaryKeys.isEmpty()) {
+            return rowType;
+        }
+        Preconditions.checkState(
+                allFields.containsAll(primaryKeys),
+                "Table column %s should include all primary key constraint %s",
+                fieldNames,
+                primaryKeys);
+        Set<String> pkSet = new HashSet<>(primaryKeys);
+        Preconditions.checkState(
+                pkSet.containsAll(partitionKeys),
+                "Primary key constraint %s should include all partition fields %s",
+                primaryKeys,
+                partitionKeys);
+
+        // primary key fields should not be nullable
+        List<RowType.RowField> fields = new ArrayList<>();
+        for (RowType.RowField field : rowType.getFields()) {
+            if (pkSet.contains(field.getName()) && field.getType().isNullable()) {
+                fields.add(
+                        new RowType.RowField(
+                                field.getName(),
+                                field.getType().copy(false),
+                                field.getDescription().orElse(null)));
+            } else {
+                fields.add(field);
+            }
+        }
+        return new RowType(false, fields);
+    }
+
     @Override
     public String toString() {
         return "UpdateSchema{"
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/SchemaManagerTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/SchemaManagerTest.java
index 59f3b959..20cd4acd 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/SchemaManagerTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/SchemaManagerTest.java
@@ -93,8 +93,8 @@ public class SchemaManagerTest {
 
         List<DataField> fields =
                 Arrays.asList(
-                        new DataField(0, "f0", new AtomicDataType(new IntType())),
-                        new DataField(1, "f1", new AtomicDataType(new BigIntType())),
+                        new DataField(0, "f0", new AtomicDataType(new IntType(false))),
+                        new DataField(1, "f1", new AtomicDataType(new BigIntType(false))),
                         new DataField(2, "f2", new AtomicDataType(new VarCharType())));
 
         assertThat(latest.isPresent()).isTrue();
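
The expected types change because f0 and f1 (presumably the primary-key
fields in this test's schema) are now persisted as NOT NULL by
validateRowType. In Flink's logical type API the constructor flag is
nullability, so for example:

    new IntType(false)     // INT NOT NULL
    new BigIntType(false)  // BIGINT NOT NULL
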
diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
index 95fcfafa..1326ae27 100644
--- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
+++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
@@ -40,6 +40,7 @@ import org.apache.flink.table.types.logical.VarCharType;
 import org.apache.flink.types.RowKind;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.spark.sql.AnalysisException;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
@@ -316,6 +317,24 @@ public class SparkReadITCase {
                 .isTrue();
     }
 
+    @Test
+    public void testAlterPrimaryKeyNullability() {
+        spark.sql("USE table_store");
+        spark.sql(
+                "CREATE TABLE default.testAlterPkNullability (\n"
+                        + "a BIGINT,\n"
+                        + "b STRING) USING table_store\n"
+                        + "COMMENT 'table comment'\n"
+                        + "TBLPROPERTIES ('primary-key' = 'a')");
+        assertThatThrownBy(
+                        () ->
+                                spark.sql(
+                                        "ALTER TABLE default.testAlterPkNullability ALTER COLUMN a DROP NOT NULL"))
+                .getRootCause()
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessageContaining("Cannot change nullability of primary key");
+    }
+
     @Test
     public void testAlterTableColumnComment() {
         assertThat(getField(schema1(), 0).description()).isNull();
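
This ITCase exercises the SchemaManager guard end-to-end: Spark translates
ALTER COLUMN a DROP NOT NULL into an UpdateColumnNullability schema change,
and since Spark wraps the connector's failure in its own exception, the
assertion unwraps it with getRootCause() before checking the exception
type and message.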
@@ -356,6 +375,73 @@ public class SparkReadITCase {
                 .isEqualTo("a boolean array");
     }
 
+    @Test
+    public void testCreateTableWithNullablePk() {
+        spark.sql("USE table_store");
+        spark.sql(
+                "CREATE TABLE default.PkTable (\n"
+                        + "a BIGINT,\n"
+                        + "b STRING) USING table_store\n"
+                        + "COMMENT 'table comment'\n"
+                        + "TBLPROPERTIES ('primary-key' = 'a')");
+        Path tablePath = new Path(warehousePath, "default.db/PkTable");
+        TableSchema schema = FileStoreTableFactory.create(tablePath).schema();
+        assertThat(schema.logicalRowType().getTypeAt(0).isNullable()).isFalse();
+    }
+
+    @Test
+    public void testCreateTableWithInvalidPk() {
+        spark.sql("USE table_store");
+        assertThatThrownBy(
+                        () ->
+                                spark.sql(
+                                        "CREATE TABLE default.PartitionedPkTable (\n"
+                                                + "a BIGINT,\n"
+                                                + "b STRING,\n"
+                                                + "c DOUBLE) USING table_store\n"
+                                                + "COMMENT 'table comment'\n"
+                                                + "PARTITIONED BY (b)"
+                                                + "TBLPROPERTIES ('primary-key' = 'a')"))
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessageContaining(
+                        "Primary key constraint [a] should include all partition fields [b]");
+    }
+
+    @Test
+    public void testCreateTableWithNonexistentPk() {
+        spark.sql("USE table_store");
+        assertThatThrownBy(
+                        () ->
+                                spark.sql(
+                                        "CREATE TABLE default.PartitionedPkTable (\n"
+                                                + "a BIGINT,\n"
+                                                + "b STRING,\n"
+                                                + "c DOUBLE) USING table_store\n"
+                                                + "COMMENT 'table comment'\n"
+                                                + "PARTITIONED BY (b)"
+                                                + "TBLPROPERTIES ('primary-key' = 'd')"))
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessageContaining(
+                        "Table column [a, b, c] should include all primary key constraint [d]");
+    }
+
+    @Test
+    public void testCreateTableWithNonexistentPartition() {
+        spark.sql("USE table_store");
+        assertThatThrownBy(
+                        () ->
+                                spark.sql(
+                                        "CREATE TABLE default.PartitionedPkTable (\n"
+                                                + "a BIGINT,\n"
+                                                + "b STRING,\n"
+                                                + "c DOUBLE) USING table_store\n"
+                                                + "COMMENT 'table comment'\n"
+                                                + "PARTITIONED BY (d)"
+                                                + "TBLPROPERTIES ('primary-key' = 'a')"))
+                .isInstanceOf(AnalysisException.class)
+                .hasMessageContaining("Couldn't find column d");
+    }
+
     @Test
     public void testCreateAndDropTable() throws Exception {
         innerTest("MyTable1", true, true, false);
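
Taken together, the new ITCases pin down the user-visible contract. A
condensed sketch of the same interactions; the table name is illustrative,
and the session setup follows the tests above:

    // Assumes a SparkSession `spark` configured with the table_store
    // catalog, as in SparkReadITCase.
    spark.sql("USE table_store");
    spark.sql(
            "CREATE TABLE default.Demo (a BIGINT, b STRING) USING table_store "
                    + "TBLPROPERTIES ('primary-key' = 'a')");
    // Column a is stored as BIGINT NOT NULL even though the DDL never
    // says so. Relaxing it back to nullable is rejected:
    spark.sql("ALTER TABLE default.Demo ALTER COLUMN a DROP NOT NULL");
    // -> root cause: UnsupportedOperationException:
    //    Cannot change nullability of primary key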