You are viewing a plain text version of this content. The canonical link to the original message was not preserved in this plain text version.
Posted to commits@flink.apache.org by lz...@apache.org on 2023/01/03 08:19:12 UTC

[flink-table-store] branch master updated: [FLINK-30545] Add check of 'NOT NULL' for 'ADD COLUMN' schema change

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new a47dc2b4 [FLINK-30545] Add check of 'NOT NULL' for 'ADD COLUMN' schema change
a47dc2b4 is described below

commit a47dc2b483692df2a61d2a567f1c58323e6a4fdb
Author: yuzelin <33...@users.noreply.github.com>
AuthorDate: Tue Jan 3 16:19:08 2023 +0800

    [FLINK-30545] Add check of 'NOT NULL' for 'ADD COLUMN' schema change
    
    This closes #449
---
 .../table/store/file/schema/SchemaManager.java     |  2 ++
 .../table/store/table/SchemaEvolutionTest.java     | 10 +++++++++
 .../store/spark/SparkSchemaEvolutionITCase.java    | 25 ++++++++++++++++++++++
 3 files changed, 37 insertions(+)

diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
index 4e6708fa..5c48dfcf 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
@@ -211,6 +211,8 @@ public class SchemaManager implements Serializable {
                                         "The column [%s] exists in the table[%s].",
                                         addColumn.fieldName(), tableRoot));
                     }
+                    Preconditions.checkArgument(
+                            addColumn.isNullable(), "ADD COLUMN cannot specify NOT NULL.");
                     int id = highestFieldId.incrementAndGet();
                     DataType dataType =
                             TableSchema.toDataType(addColumn.logicalType(), highestFieldId);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTest.java
index 373c1a88..73666f63 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTest.java
@@ -126,6 +126,16 @@ public class SchemaEvolutionTest {
         // read where f3 = 3 (filter on new field)
         rows = readRecords(table, builder.equal(2, 3L));
         assertThat(rows).containsExactlyInAnyOrder(Row.of(3, 3L, 3L), Row.of(4, 4L, 4L));
+
+        // test add not null field
+        assertThatThrownBy(
+                        () ->
+                                schemaManager.commitChanges(
+                                        Collections.singletonList(
+                                                SchemaChange.addColumn(
+                                                        "f4", new IntType(), false, null))))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessage("ADD COLUMN cannot specify NOT NULL.");
     }
 
     @Test
diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkSchemaEvolutionITCase.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkSchemaEvolutionITCase.java
index c23facfe..fb88cc88 100644
--- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkSchemaEvolutionITCase.java
+++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkSchemaEvolutionITCase.java
@@ -79,6 +79,31 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
         assertThat(results.toString()).isEqualTo("[[8]]");
     }
 
+    @Test
+    public void testAddNotNullColumn() throws Exception {
+        Path tablePath = new Path(warehousePath, "default.db/testAddNotNullColumn");
+        createTestHelper(tablePath);
+
+        List<Row> beforeAdd =
+                spark.sql("SHOW CREATE TABLE tablestore.default.testAddNotNullColumn")
+                        .collectAsList();
+        assertThat(beforeAdd.toString())
+                .isEqualTo(
+                        "[[CREATE TABLE testAddNotNullColumn (\n"
+                                + "  `a` INT NOT NULL,\n"
+                                + "  `b` BIGINT,\n"
+                                + "  `c` STRING)\n"
+                                + "]]");
+
+        assertThatThrownBy(
+                        () ->
+                                spark.sql(
+                                        "ALTER TABLE tablestore.default.testAddNotNullColumn ADD COLUMNS (d INT NOT NULL)"))
+                .isInstanceOf(RuntimeException.class)
+                .hasMessage(
+                        "java.lang.IllegalArgumentException: ADD COLUMN cannot specify NOT NULL.");
+    }
+
     @Test
     public void testRenameColumn() throws Exception {
         Path tablePath = new Path(warehousePath, "default.db/testRenameColumn");