You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@flink.apache.org by lz...@apache.org on 2023/01/03 08:19:12 UTC
[flink-table-store] branch master updated: [FLINK-30545] Add check of 'NOT NULL' for 'ADD COLUMN' schema change
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new a47dc2b4 [FLINK-30545] Add check of 'NOT NULL' for 'ADD COLUMN' schema change
a47dc2b4 is described below
commit a47dc2b483692df2a61d2a567f1c58323e6a4fdb
Author: yuzelin <33...@users.noreply.github.com>
AuthorDate: Tue Jan 3 16:19:08 2023 +0800
[FLINK-30545] Add check of 'NOT NULL' for 'ADD COLUMN' schema change
This closes #449
---
.../table/store/file/schema/SchemaManager.java | 2 ++
.../table/store/table/SchemaEvolutionTest.java | 10 +++++++++
.../store/spark/SparkSchemaEvolutionITCase.java | 25 ++++++++++++++++++++++
3 files changed, 37 insertions(+)
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
index 4e6708fa..5c48dfcf 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
@@ -211,6 +211,8 @@ public class SchemaManager implements Serializable {
"The column [%s] exists in the table[%s].",
addColumn.fieldName(), tableRoot));
}
+ Preconditions.checkArgument(
+ addColumn.isNullable(), "ADD COLUMN cannot specify NOT NULL.");
int id = highestFieldId.incrementAndGet();
DataType dataType =
TableSchema.toDataType(addColumn.logicalType(), highestFieldId);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTest.java
index 373c1a88..73666f63 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTest.java
@@ -126,6 +126,16 @@ public class SchemaEvolutionTest {
// read where f3 = 3 (filter on new field)
rows = readRecords(table, builder.equal(2, 3L));
assertThat(rows).containsExactlyInAnyOrder(Row.of(3, 3L, 3L), Row.of(4, 4L, 4L));
+
+ // test add not null field
+ assertThatThrownBy(
+ () ->
+ schemaManager.commitChanges(
+ Collections.singletonList(
+ SchemaChange.addColumn(
+ "f4", new IntType(), false, null))))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("ADD COLUMN cannot specify NOT NULL.");
}
@Test
diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkSchemaEvolutionITCase.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkSchemaEvolutionITCase.java
index c23facfe..fb88cc88 100644
--- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkSchemaEvolutionITCase.java
+++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkSchemaEvolutionITCase.java
@@ -79,6 +79,31 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
assertThat(results.toString()).isEqualTo("[[8]]");
}
+ @Test
+ public void testAddNotNullColumn() throws Exception {
+ Path tablePath = new Path(warehousePath, "default.db/testAddNotNullColumn");
+ createTestHelper(tablePath);
+
+ List<Row> beforeAdd =
+ spark.sql("SHOW CREATE TABLE tablestore.default.testAddNotNullColumn")
+ .collectAsList();
+ assertThat(beforeAdd.toString())
+ .isEqualTo(
+ "[[CREATE TABLE testAddNotNullColumn (\n"
+ + " `a` INT NOT NULL,\n"
+ + " `b` BIGINT,\n"
+ + " `c` STRING)\n"
+ + "]]");
+
+ assertThatThrownBy(
+ () ->
+ spark.sql(
+ "ALTER TABLE tablestore.default.testAddNotNullColumn ADD COLUMNS (d INT NOT NULL)"))
+ .isInstanceOf(RuntimeException.class)
+ .hasMessage(
+ "java.lang.IllegalArgumentException: ADD COLUMN cannot specify NOT NULL.");
+ }
+
@Test
public void testRenameColumn() throws Exception {
Path tablePath = new Path(warehousePath, "default.db/testRenameColumn");