You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@paimon.apache.org by lz...@apache.org on 2023/05/11 08:33:48 UTC

[incubator-paimon] branch master updated: [flink] Support nullability change when altering table on flink 1.17 (#1112)

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new ba14cef8d [flink] Support nullability change when altering table on flink 1.17 (#1112)
ba14cef8d is described below

commit ba14cef8df1282a51cc039adb3d52eeacf239dc3
Author: GuojunLi <li...@bytedance.com>
AuthorDate: Thu May 11 16:33:43 2023 +0800

    [flink] Support nullability change when altering table on flink 1.17 (#1112)
---
 docs/content/how-to/altering-tables.md             | 15 +++++-
 .../java/org/apache/paimon/flink/FlinkCatalog.java | 62 +++++++++++++++-------
 .../apache/paimon/flink/SchemaChangeITCase.java    | 49 +++++++++++++++--
 3 files changed, 103 insertions(+), 23 deletions(-)

diff --git a/docs/content/how-to/altering-tables.md b/docs/content/how-to/altering-tables.md
index 2e4f969d2..07b6bae41 100644
--- a/docs/content/how-to/altering-tables.md
+++ b/docs/content/how-to/altering-tables.md
@@ -288,7 +288,7 @@ ALTER TABLE my_table DROP COLUMN c1;
 
 ## Changing Column Nullability
 
-The following SQL sets column `coupon_info` to be nullable.
+The following SQL changes nullability of column `coupon_info`.
 
 {{< tabs "change-nullability-example" >}}
 
@@ -296,7 +296,14 @@ The following SQL sets column `coupon_info` to be nullable.
 
 ```sql
 CREATE TABLE my_table (id INT PRIMARY KEY NOT ENFORCED, coupon_info FLOAT NOT NULL);
+
+-- Change column `coupon_info` from NOT NULL to nullable
 ALTER TABLE my_table MODIFY coupon_info FLOAT;
+
+-- Change column `coupon_info` from nullable to NOT NULL
+-- If there are NULL values already, set table option as below to drop those records silently before altering table.
+SET 'table.exec.sink.not-null-enforcer' = 'DROP';
+ALTER TABLE my_table MODIFY coupon_info FLOAT NOT NULL;
 ```
 
 {{< /tab >}}
@@ -312,6 +319,12 @@ ALTER TABLE my_table ALTER COLUMN coupon_info DROP NOT NULL;
 
 {{< /tabs >}}
 
+{{< hint info >}}
+
+Changing nullable column to NOT NULL is only supported by Flink currently.
+
+{{< /hint >}}
+
 ## Changing Column Comment
 
 The following SQL changes comment of column `buy_count` to `buy count`.
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index 9a3f7cd72..1b5e2b276 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -68,6 +68,7 @@ import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
 import org.apache.flink.table.descriptors.DescriptorProperties;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.factories.Factory;
+import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 
 import java.util.ArrayList;
@@ -250,37 +251,58 @@ public class FlinkCatalog extends AbstractCatalog {
         }
     }
 
-    private SchemaChange toSchemaChange(TableChange change) {
+    private List<SchemaChange> toSchemaChange(TableChange change) {
+        List<SchemaChange> schemaChanges = new ArrayList<>();
         if (change instanceof AddColumn) {
             AddColumn add = (AddColumn) change;
             String comment = add.getColumn().getComment().orElse(null);
             SchemaChange.Move move = getMove(add.getPosition(), add.getColumn().getName());
-            return SchemaChange.addColumn(
-                    add.getColumn().getName(),
-                    LogicalTypeConversion.toDataType(
-                            add.getColumn().getDataType().getLogicalType()),
-                    comment,
-                    move);
+            schemaChanges.add(
+                    SchemaChange.addColumn(
+                            add.getColumn().getName(),
+                            LogicalTypeConversion.toDataType(
+                                    add.getColumn().getDataType().getLogicalType()),
+                            comment,
+                            move));
+            return schemaChanges;
         } else if (change instanceof DropColumn) {
             DropColumn drop = (DropColumn) change;
-            return SchemaChange.dropColumn(drop.getColumnName());
+            schemaChanges.add(SchemaChange.dropColumn(drop.getColumnName()));
+            return schemaChanges;
         } else if (change instanceof ModifyColumnName) {
             ModifyColumnName modify = (ModifyColumnName) change;
-            return SchemaChange.renameColumn(modify.getOldColumnName(), modify.getNewColumnName());
+            schemaChanges.add(
+                    SchemaChange.renameColumn(
+                            modify.getOldColumnName(), modify.getNewColumnName()));
+            return schemaChanges;
         } else if (change instanceof ModifyPhysicalColumnType) {
             ModifyPhysicalColumnType modify = (ModifyPhysicalColumnType) change;
-            return SchemaChange.updateColumnType(
-                    modify.getOldColumn().getName(),
-                    LogicalTypeConversion.toDataType(modify.getNewType().getLogicalType()));
+            LogicalType newColumnType = modify.getNewType().getLogicalType();
+            LogicalType oldColumnType = modify.getOldColumn().getDataType().getLogicalType();
+            if (newColumnType.isNullable() != oldColumnType.isNullable()) {
+                schemaChanges.add(
+                        SchemaChange.updateColumnNullability(
+                                new String[] {modify.getNewColumn().getName()},
+                                newColumnType.isNullable()));
+            }
+            schemaChanges.add(
+                    SchemaChange.updateColumnType(
+                            modify.getOldColumn().getName(),
+                            LogicalTypeConversion.toDataType(newColumnType)));
+            return schemaChanges;
         } else if (change instanceof ModifyColumnPosition) {
             ModifyColumnPosition modify = (ModifyColumnPosition) change;
             SchemaChange.Move move =
                     getMove(modify.getNewPosition(), modify.getNewColumn().getName());
-            return SchemaChange.updateColumnPosition(move);
+            schemaChanges.add(SchemaChange.updateColumnPosition(move));
+            return schemaChanges;
         } else if (change instanceof TableChange.ModifyColumnComment) {
             ModifyColumnComment modify = (ModifyColumnComment) change;
-            return SchemaChange.updateColumnComment(
-                    new String[] {modify.getNewColumn().getName()}, modify.getNewComment());
+            schemaChanges.add(
+                    SchemaChange.updateColumnComment(
+                            new String[] {modify.getNewColumn().getName()},
+                            modify.getNewComment()));
+            return schemaChanges;
         } else if (change instanceof SetOption) {
             SetOption setOption = (SetOption) change;
             String key = setOption.getKey();
@@ -288,10 +310,12 @@ public class FlinkCatalog extends AbstractCatalog {
 
             SchemaManager.checkAlterTablePath(key);
 
-            return SchemaChange.setOption(key, value);
+            schemaChanges.add(SchemaChange.setOption(key, value));
+            return schemaChanges;
         } else if (change instanceof ResetOption) {
             ResetOption resetOption = (ResetOption) change;
-            return SchemaChange.removeOption(resetOption.getKey());
+            schemaChanges.add(SchemaChange.removeOption(resetOption.getKey()));
+            return schemaChanges;
         } else {
             throw new UnsupportedOperationException(
                     "Change is not supported: " + change.getClass());
@@ -362,7 +386,9 @@ public class FlinkCatalog extends AbstractCatalog {
         List<SchemaChange> changes = new ArrayList<>();
         if (null != tableChanges) {
             List<SchemaChange> schemaChanges =
-                    tableChanges.stream().map(this::toSchemaChange).collect(Collectors.toList());
+                    tableChanges.stream()
+                            .flatMap(tableChange -> toSchemaChange(tableChange).stream())
+                            .collect(Collectors.toList());
             changes.addAll(schemaChanges);
         }
 
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
index cf772c7a8..207b07cfc 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
@@ -20,6 +20,8 @@ package org.apache.paimon.flink;
 
 import org.apache.paimon.testutils.assertj.AssertionUtils;
 
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.types.Row;
 import org.junit.jupiter.api.Test;
 
@@ -283,7 +285,16 @@ public class SchemaChangeITCase extends CatalogITCaseBase {
                                 + "  `c` VARCHAR(2147483647),\n"
                                 + "  `d` INT,\n"
                                 + "  `e` FLOAT NOT NULL,");
+        assertThatThrownBy(
+                        () ->
+                                sql(
+                                        "INSERT INTO T VALUES('aaa', 'bbb', 'ccc', 1, CAST(NULL AS FLOAT))"))
+                .satisfies(
+                        AssertionUtils.anyCauseMatches(
+                                TableException.class,
+                                "Column 'e' is NOT NULL, however, a null value is being written into it."));
 
+        // Not null -> nullable
         sql("ALTER TABLE T MODIFY e FLOAT");
         result = sql("SHOW CREATE TABLE T");
         assertThat(result.toString())
@@ -295,10 +306,40 @@ public class SchemaChangeITCase extends CatalogITCaseBase {
                                 + "  `d` INT,\n"
                                 + "  `e` FLOAT");
 
-        assertThatThrownBy(() -> sql("ALTER TABLE T MODIFY c STRING NOT NULL"))
-                .hasMessageContaining(
-                        "Could not execute ALTER TABLE PAIMON.default.T\n"
-                                + "  MODIFY `c` STRING NOT NULL");
+        // Nullable -> not null
+        sql("ALTER TABLE T MODIFY c STRING NOT NULL");
+        result = sql("SHOW CREATE TABLE T");
+        assertThat(result.toString())
+                .contains(
+                        "CREATE TABLE `PAIMON`.`default`.`T` (\n"
+                                + "  `a` VARCHAR(2147483647) NOT NULL,\n"
+                                + "  `b` VARCHAR(2147483647),\n"
+                                + "  `c` VARCHAR(2147483647) NOT NULL,\n"
+                                + "  `d` INT,\n"
+                                + "  `e` FLOAT");
+        assertThatThrownBy(
+                        () ->
+                                sql(
+                                        "INSERT INTO T VALUES('aaa', 'bbb', CAST(NULL AS STRING), 1, CAST(NULL AS FLOAT))"))
+                .satisfies(
+                        AssertionUtils.anyCauseMatches(
+                                TableException.class,
+                                "Column 'c' is NOT NULL, however, a null value is being written into it."));
+
+        // Insert a null value
+        sql("INSERT INTO T VALUES('aaa', 'bbb', 'ccc', 1, CAST(NULL AS FLOAT))");
+        result = sql("select * from T");
+        assertThat(result.toString()).isEqualTo("[+I[aaa, bbb, ccc, 1, null]]");
+
+        // Then nullable -> not null
+        tEnv.getConfig()
+                .set(
+                        ExecutionConfigOptions.TABLE_EXEC_SINK_NOT_NULL_ENFORCER,
+                        ExecutionConfigOptions.NotNullEnforcer.DROP);
+        sql("ALTER TABLE T MODIFY e FLOAT NOT NULL;\n");
+        sql("INSERT INTO T VALUES('aa2', 'bb2', 'cc2', 2, 2.5)");
+        result = sql("select * from T");
+        assertThat(result.toString()).isEqualTo("[+I[aa2, bb2, cc2, 2, 2.5]]");
     }
 
     @Test