You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@paimon.apache.org by "yuzelin (via GitHub)" <gi...@apache.org> on 2023/04/21 05:53:15 UTC

[GitHub] [incubator-paimon] yuzelin commented on a diff in pull request #965: [flink] support alter table for flink

yuzelin commented on code in PR #965:
URL: https://github.com/apache/incubator-paimon/pull/965#discussion_r1173273534


##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java:
##########
@@ -285,12 +351,6 @@ private static void validateAlterTable(CatalogTable ct1, CatalogTable ct2) {
             pkEquality = true;
         }
 
-        if (!(Objects.equals(ts1.getTableColumns(), ts2.getTableColumns())
-                && Objects.equals(ts1.getWatermarkSpecs(), ts2.getWatermarkSpecs())
-                && pkEquality)) {
-            throw new UnsupportedOperationException("Altering schema is not supported yet.");
-        }
-

Review Comment:
   Need clean `pkEauality` related codes.
   Update: after reviewing all changes, I found that the modify constraint and watermark changes are not supported by this PR, so I think we can keep this validation now ?



##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java:
##########
@@ -222,52 +234,106 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
         }
     }
 
+    private SchemaChange toSchemaChange(Map<String, String> oldProperties, TableChange change) {
+        if (change instanceof AddColumn) {
+            AddColumn add = (AddColumn) change;
+            String comment = add.getColumn().getComment().orElse(null);
+            SchemaChange.Move move = getMove(add.getPosition(), add.getColumn().getName());
+            return SchemaChange.addColumn(
+                    add.getColumn().getName(),
+                    LogicalTypeConversion.toDataType(
+                            add.getColumn().getDataType().getLogicalType()),
+                    comment,
+                    move);
+        } else if (change instanceof DropColumn) {
+            DropColumn drop = (DropColumn) change;
+            return SchemaChange.dropColumn(drop.getColumnName());
+        } else if (change instanceof ModifyColumnName) {
+            ModifyColumnName modify = (ModifyColumnName) change;
+            return SchemaChange.renameColumn(modify.getOldColumnName(), modify.getNewColumnName());
+        } else if (change instanceof ModifyPhysicalColumnType) {
+            ModifyPhysicalColumnType modify = (ModifyPhysicalColumnType) change;
+            return SchemaChange.updateColumnType(
+                    modify.getOldColumn().getName(),
+                    LogicalTypeConversion.toDataType(modify.getNewType().getLogicalType()));
+        } else if (change instanceof ModifyColumnPosition) {
+            ModifyColumnPosition modify = (ModifyColumnPosition) change;
+            SchemaChange.Move move =
+                    getMove(modify.getNewPosition(), modify.getNewColumn().getName());
+            return SchemaChange.updateColumnPosition(move);
+        } else if (change instanceof SetOption) {
+            SetOption setOption = (SetOption) change;
+            String key = setOption.getKey();
+            String value = setOption.getValue();
+
+            if (Objects.equals(oldProperties.get(key), value)) {
+                return null;
+            }
+
+            if (PATH.key().equalsIgnoreCase(key)) {
+                throw new IllegalArgumentException("Illegal table path in table options: " + value);
+            }
+
+            return SchemaChange.setOption(key, value);
+        } else if (change instanceof ResetOption) {
+            ResetOption resetOption = (ResetOption) change;
+            return SchemaChange.removeOption(resetOption.getKey());
+        } else {
+            throw new UnsupportedOperationException(
+                    "Change is not supported: " + change.getClass());
+        }
+    }
+
     @Override
     public void alterTable(
             ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists)
             throws TableNotExistException, CatalogException {
+        this.alterTable(tablePath, newTable, null, ignoreIfNotExists);
+    }
+
+    @Override
+    public void alterTable(
+            ObjectPath tablePath,
+            CatalogBaseTable newTable,
+            List<TableChange> tableChanges,
+            boolean ignoreIfNotExists)
+            throws TableNotExistException, CatalogException {
         if (ignoreIfNotExists && !tableExists(tablePath)) {
             return;
         }
 
         CatalogTable table = getTable(tablePath);
 
-        // Currently, Flink SQL only support altering table properties.
         validateAlterTable(table, (CatalogTable) newTable);
 
-        List<SchemaChange> changes = new ArrayList<>();
         Map<String, String> oldProperties = table.getOptions();
-        for (Map.Entry<String, String> entry : newTable.getOptions().entrySet()) {
-            String key = entry.getKey();
-            String value = entry.getValue();
-
-            if (Objects.equals(value, oldProperties.get(key))) {
-                continue;
-            }
-
-            if (PATH.key().equalsIgnoreCase(key)) {
-                throw new IllegalArgumentException("Illegal table path in table options: " + value);
-            }
-
-            changes.add(SchemaChange.setOption(key, value));
+        List<SchemaChange> changes = new ArrayList<>();
+        if (null != tableChanges) {
+            List<SchemaChange> schemaChanges =
+                    tableChanges.stream()
+                            .map(m -> toSchemaChange(oldProperties, m))
+                            .filter(Objects::nonNull)
+                            .collect(Collectors.toList());
+            changes.addAll(schemaChanges);
         }
 
-        oldProperties
-                .keySet()
-                .forEach(
-                        k -> {
-                            if (!newTable.getOptions().containsKey(k)) {
-                                changes.add(SchemaChange.removeOption(k));
-                            }
-                        });
-

Review Comment:
   We cannot delete this because < 1.17 version still use the old style `alterTable` API.
   How about don't make `alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists)`  call the new `alterTable`? 
   To verify that old `alter table set option` still works, we can copy old tests in `ShemaChangeITCase` to Flink 1.16 module.



##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java:
##########
@@ -222,52 +234,106 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
         }
     }
 
+    private SchemaChange toSchemaChange(Map<String, String> oldProperties, TableChange change) {
+        if (change instanceof AddColumn) {
+            AddColumn add = (AddColumn) change;
+            String comment = add.getColumn().getComment().orElse(null);
+            SchemaChange.Move move = getMove(add.getPosition(), add.getColumn().getName());
+            return SchemaChange.addColumn(
+                    add.getColumn().getName(),
+                    LogicalTypeConversion.toDataType(
+                            add.getColumn().getDataType().getLogicalType()),
+                    comment,
+                    move);
+        } else if (change instanceof DropColumn) {
+            DropColumn drop = (DropColumn) change;
+            return SchemaChange.dropColumn(drop.getColumnName());
+        } else if (change instanceof ModifyColumnName) {
+            ModifyColumnName modify = (ModifyColumnName) change;
+            return SchemaChange.renameColumn(modify.getOldColumnName(), modify.getNewColumnName());
+        } else if (change instanceof ModifyPhysicalColumnType) {
+            ModifyPhysicalColumnType modify = (ModifyPhysicalColumnType) change;
+            return SchemaChange.updateColumnType(
+                    modify.getOldColumn().getName(),
+                    LogicalTypeConversion.toDataType(modify.getNewType().getLogicalType()));
+        } else if (change instanceof ModifyColumnPosition) {
+            ModifyColumnPosition modify = (ModifyColumnPosition) change;
+            SchemaChange.Move move =
+                    getMove(modify.getNewPosition(), modify.getNewColumn().getName());
+            return SchemaChange.updateColumnPosition(move);
+        } else if (change instanceof SetOption) {
+            SetOption setOption = (SetOption) change;
+            String key = setOption.getKey();
+            String value = setOption.getValue();
+
+            if (Objects.equals(oldProperties.get(key), value)) {
+                return null;
+            }

Review Comment:
   I think modify the option to it's old value is OK, see `SparkCatalogBase#toSchemaChange`. We won't lose anything if we didn't check this. And if we don't check this, the oldProperties is unnecessary and return is Nonnull, which can keep code clean.



##########
docs/content/how-to/altering-tables.md:
##########
@@ -150,6 +169,15 @@ The following SQL renames column `c0` in table `my_table` to `c1`.
 
 {{< tabs "rename-column-name-example" >}}
 
+{{< tab "Flink" >}}
+
+```sql
+ALTER TABLE T RENAME c TO c1;

Review Comment:
   The following SQL renames column `c0` in table `my_table` to `c1.



##########
docs/content/how-to/altering-tables.md:
##########
@@ -114,6 +114,15 @@ The following SQL adds two columns `c1` and `c2` to table `my_table`.
 
 {{< tabs "add-columns-example" >}}
 
+{{< tab "Flink" >}}
+
+```sql
+CREATE TABLE T (a STRING, b DOUBLE, c FLOAT);
+ALTER TABLE T ADD d INT;
+```

Review Comment:
   Please not the words above: The following SQL adds two columns `c1` and `c2` to table `my_table`.
   The example should be similar to spark's example.
   Flink also support add multiple columns. See (from flink doc):
    ADD { <schema_component> | (<schema_component> [, ...]) }



##########
docs/content/how-to/altering-tables.md:
##########
@@ -165,6 +193,14 @@ The syntax is:
 
 {{< tabs "drop-columns-syntax" >}}
 
+{{< tab "Flink" >}}
+
+```sql
+ALTER TABLE T DROP e;

Review Comment:
   Note that here is syntax, example is behind.



##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java:
##########
@@ -222,52 +234,106 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
         }
     }
 
+    private SchemaChange toSchemaChange(Map<String, String> oldProperties, TableChange change) {
+        if (change instanceof AddColumn) {
+            AddColumn add = (AddColumn) change;
+            String comment = add.getColumn().getComment().orElse(null);
+            SchemaChange.Move move = getMove(add.getPosition(), add.getColumn().getName());
+            return SchemaChange.addColumn(
+                    add.getColumn().getName(),
+                    LogicalTypeConversion.toDataType(
+                            add.getColumn().getDataType().getLogicalType()),
+                    comment,
+                    move);
+        } else if (change instanceof DropColumn) {
+            DropColumn drop = (DropColumn) change;
+            return SchemaChange.dropColumn(drop.getColumnName());
+        } else if (change instanceof ModifyColumnName) {
+            ModifyColumnName modify = (ModifyColumnName) change;
+            return SchemaChange.renameColumn(modify.getOldColumnName(), modify.getNewColumnName());
+        } else if (change instanceof ModifyPhysicalColumnType) {
+            ModifyPhysicalColumnType modify = (ModifyPhysicalColumnType) change;
+            return SchemaChange.updateColumnType(
+                    modify.getOldColumn().getName(),
+                    LogicalTypeConversion.toDataType(modify.getNewType().getLogicalType()));
+        } else if (change instanceof ModifyColumnPosition) {
+            ModifyColumnPosition modify = (ModifyColumnPosition) change;
+            SchemaChange.Move move =
+                    getMove(modify.getNewPosition(), modify.getNewColumn().getName());
+            return SchemaChange.updateColumnPosition(move);
+        } else if (change instanceof SetOption) {
+            SetOption setOption = (SetOption) change;
+            String key = setOption.getKey();
+            String value = setOption.getValue();
+
+            if (Objects.equals(oldProperties.get(key), value)) {
+                return null;
+            }
+
+            if (PATH.key().equalsIgnoreCase(key)) {
+                throw new IllegalArgumentException("Illegal table path in table options: " + value);
+            }

Review Comment:
   Let `SchemaManager` to check.



##########
docs/content/how-to/altering-tables.md:
##########
@@ -195,6 +231,16 @@ The following SQL sets column `coupon_info` to be nullable.
 
 {{< tabs "change-nullability-example" >}}
 
+{{< tab "Flink" >}}
+
+```sql
+CREATE TABLE T (a STRING PRIMARY KEY NOT ENFORCED, b FLOAT NOT NULL);
+ALTER TABLE T MODIFY b FLOAT;
+```

Review Comment:
   1. The following SQL sets column `coupon_info` to be nullable.
   2. modify nullability, not type.
   
   I think changing column type is missing, please also add a new part for `changing column type` for both spark and flink.



##########
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java:
##########
@@ -28,7 +32,211 @@
 /** ITCase for schema changes. */
 public class SchemaChangeITCase extends CatalogITCaseBase {
 
-    // TODO cover more cases once Flink supports more ALTER operations.

Review Comment:
   Please change to `TODO cover more cases` because some changes like `AddUniqueConstraint` are still not supported in this PR.



##########
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java:
##########
@@ -28,7 +32,211 @@
 /** ITCase for schema changes. */
 public class SchemaChangeITCase extends CatalogITCaseBase {
 
-    // TODO cover more cases once Flink supports more ALTER operations.
+    @Test
+    public void testAddColumn() {
+        sql("CREATE TABLE T (a STRING, b DOUBLE, c FLOAT)");
+        sql("INSERT INTO T VALUES('aaa', 1.2, 3.4)");
+        sql("ALTER TABLE T ADD d INT");
+        List<Row> result = sql("SHOW CREATE TABLE T");
+        assertThat(result.toString())
+                .contains(
+                        "CREATE TABLE `PAIMON`.`default`.`T` (\n"
+                                + "  `a` VARCHAR(2147483647),\n"
+                                + "  `b` DOUBLE,\n"
+                                + "  `c` FLOAT,\n"
+                                + "  `d` INT\n"
+                                + ")");
+        sql("INSERT INTO T VALUES('bbb', 4.5, 5.6, 5)");
+        result = sql("SELECT * FROM T");
+        assertThat(result.toString()).isEqualTo("[+I[aaa, 1.2, 3.4, null], +I[bbb, 4.5, 5.6, 5]]");
+
+        // add column with after position
+        sql("CREATE TABLE T1 (a STRING, b DOUBLE, c FLOAT)");
+        sql("INSERT INTO T1 VALUES('aaa', 1.2, 3.4)");
+        sql("ALTER TABLE T1 ADD d INT AFTER b");
+        result = sql("SHOW CREATE TABLE T1");
+        assertThat(result.toString())
+                .contains(
+                        "CREATE TABLE `PAIMON`.`default`.`T1` (\n"
+                                + "  `a` VARCHAR(2147483647),\n"
+                                + "  `b` DOUBLE,\n"
+                                + "  `d` INT,\n"
+                                + "  `c` FLOAT\n"
+                                + ")");
+        sql("INSERT INTO T1 VALUES('bbb', 4.5, 6, 5.6)");
+        result = sql("SELECT * FROM T1");
+        assertThat(result.toString()).isEqualTo("[+I[aaa, 1.2, null, 3.4], +I[bbb, 4.5, 6, 5.6]]");
+
+        // add column with first position
+        sql("CREATE TABLE T2 (a STRING, b DOUBLE, c FLOAT)");
+        sql("INSERT INTO T2 VALUES('aaa', 1.2, 3.4)");
+        sql("ALTER TABLE T2 ADD d STRING FIRST");
+        result = sql("SHOW CREATE TABLE T2");
+        assertThat(result.toString())
+                .contains(
+                        "CREATE TABLE `PAIMON`.`default`.`T2` (\n"
+                                + "  `d` VARCHAR(2147483647),\n"
+                                + "  `a` VARCHAR(2147483647),\n"
+                                + "  `b` DOUBLE,\n"
+                                + "  `c` FLOAT\n"
+                                + ")");
+
+        sql("INSERT INTO T2 VALUES('paimon', 'bbb', 3.4, 6.7)");
+        result = sql("SELECT * FROM T2");
+        assertThat(result.toString())
+                .isEqualTo("[+I[null, aaa, 1.2, 3.4], +I[paimon, bbb, 3.4, 6.7]]");
+    }
+
+    @Test
+    public void testDropColumn() {
+        sql(
+                "CREATE TABLE T (a STRING PRIMARY KEY NOT ENFORCED, b STRING, c STRING, d INT, e FLOAT)");
+        sql("ALTER TABLE T DROP e");
+        List<Row> result = sql("SHOW CREATE TABLE T");
+        assertThat(result.toString())
+                .contains(
+                        "CREATE TABLE `PAIMON`.`default`.`T` (\n"
+                                + "  `a` VARCHAR(2147483647) NOT NULL,\n"
+                                + "  `b` VARCHAR(2147483647),\n"
+                                + "  `c` VARCHAR(2147483647),\n"
+                                + "  `d` INT,");
+
+        sql("ALTER TABLE T DROP (c, d)");
+        result = sql("SHOW CREATE TABLE T");
+        assertThat(result.toString())
+                .contains(
+                        "CREATE TABLE `PAIMON`.`default`.`T` (\n"
+                                + "  `a` VARCHAR(2147483647) NOT NULL,\n"
+                                + "  `b` VARCHAR(2147483647),");
+    }
+
+    @Test
+    public void testRenameColumn() {
+        sql("CREATE TABLE T (a STRING PRIMARY KEY NOT ENFORCED, b STRING, c STRING)");
+        sql("INSERT INTO T VALUES('paimon', 'bbb', 'ccc')");
+        sql("ALTER TABLE T RENAME c TO c1");
+        List<Row> result = sql("SHOW CREATE TABLE T");
+        assertThat(result.toString())
+                .contains(
+                        "CREATE TABLE `PAIMON`.`default`.`T` (\n"
+                                + "  `a` VARCHAR(2147483647) NOT NULL,\n"
+                                + "  `b` VARCHAR(2147483647),\n"
+                                + "  `c1` VARCHAR(2147483647)");
+        result = sql("SELECT a, b, c1 FROM T");
+        assertThat(result.toString()).isEqualTo("[+I[paimon, bbb, ccc]]");
+
+        // column do not exist.
+        assertThatThrownBy(() -> sql("ALTER TABLE T RENAME d TO d1"))
+                .hasMessageContaining("The column `d` does not exist in the base table.");
+
+        // target column exist.
+        assertThatThrownBy(() -> sql("ALTER TABLE T RENAME a TO b"))
+                .hasMessageContaining("The column `b` already existed in table schema.");
+    }
+
+    @Test
+    public void testModifyColumnType() {
+        sql(
+                "CREATE TABLE T (a STRING PRIMARY KEY NOT ENFORCED, b STRING, c STRING, d INT, e FLOAT)");
+        sql("ALTER TABLE T MODIFY e DOUBLE");
+        List<Row> result = sql("SHOW CREATE TABLE T");
+        assertThat(result.toString())
+                .contains(
+                        "CREATE TABLE `PAIMON`.`default`.`T` (\n"
+                                + "  `a` VARCHAR(2147483647) NOT NULL,\n"
+                                + "  `b` VARCHAR(2147483647),\n"
+                                + "  `c` VARCHAR(2147483647),\n"
+                                + "  `d` INT,\n"
+                                + "  `e` DOUBLE,");
+
+        assertThatThrownBy(() -> sql("ALTER TABLE T MODIFY c DOUBLE"))
+                .getRootCause()
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessage(
+                        "Column type c[STRING] cannot be converted to DOUBLE without loosing information.");

Review Comment:
   The message `Column type c[STRING] cannot be converted to DOUBLE without loosing information` is from
   `SchemaManager` Line 291. I believe there is a typo error. I think it should be `without losing information`.



##########
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java:
##########
@@ -28,7 +32,211 @@
 /** ITCase for schema changes. */
 public class SchemaChangeITCase extends CatalogITCaseBase {
 
-    // TODO cover more cases once Flink supports more ALTER operations.
+    @Test
+    public void testAddColumn() {
+        sql("CREATE TABLE T (a STRING, b DOUBLE, c FLOAT)");
+        sql("INSERT INTO T VALUES('aaa', 1.2, 3.4)");
+        sql("ALTER TABLE T ADD d INT");
+        List<Row> result = sql("SHOW CREATE TABLE T");
+        assertThat(result.toString())
+                .contains(
+                        "CREATE TABLE `PAIMON`.`default`.`T` (\n"
+                                + "  `a` VARCHAR(2147483647),\n"
+                                + "  `b` DOUBLE,\n"
+                                + "  `c` FLOAT,\n"
+                                + "  `d` INT\n"
+                                + ")");
+        sql("INSERT INTO T VALUES('bbb', 4.5, 5.6, 5)");
+        result = sql("SELECT * FROM T");
+        assertThat(result.toString()).isEqualTo("[+I[aaa, 1.2, 3.4, null], +I[bbb, 4.5, 5.6, 5]]");
+
+        // add column with after position
+        sql("CREATE TABLE T1 (a STRING, b DOUBLE, c FLOAT)");
+        sql("INSERT INTO T1 VALUES('aaa', 1.2, 3.4)");
+        sql("ALTER TABLE T1 ADD d INT AFTER b");
+        result = sql("SHOW CREATE TABLE T1");
+        assertThat(result.toString())
+                .contains(
+                        "CREATE TABLE `PAIMON`.`default`.`T1` (\n"
+                                + "  `a` VARCHAR(2147483647),\n"
+                                + "  `b` DOUBLE,\n"
+                                + "  `d` INT,\n"
+                                + "  `c` FLOAT\n"
+                                + ")");
+        sql("INSERT INTO T1 VALUES('bbb', 4.5, 6, 5.6)");
+        result = sql("SELECT * FROM T1");
+        assertThat(result.toString()).isEqualTo("[+I[aaa, 1.2, null, 3.4], +I[bbb, 4.5, 6, 5.6]]");
+
+        // add column with first position
+        sql("CREATE TABLE T2 (a STRING, b DOUBLE, c FLOAT)");
+        sql("INSERT INTO T2 VALUES('aaa', 1.2, 3.4)");
+        sql("ALTER TABLE T2 ADD d STRING FIRST");
+        result = sql("SHOW CREATE TABLE T2");
+        assertThat(result.toString())
+                .contains(
+                        "CREATE TABLE `PAIMON`.`default`.`T2` (\n"
+                                + "  `d` VARCHAR(2147483647),\n"
+                                + "  `a` VARCHAR(2147483647),\n"
+                                + "  `b` DOUBLE,\n"
+                                + "  `c` FLOAT\n"
+                                + ")");
+
+        sql("INSERT INTO T2 VALUES('paimon', 'bbb', 3.4, 6.7)");
+        result = sql("SELECT * FROM T2");
+        assertThat(result.toString())
+                .isEqualTo("[+I[null, aaa, 1.2, 3.4], +I[paimon, bbb, 3.4, 6.7]]");
+    }
+
+    @Test
+    public void testDropColumn() {
+        sql(
+                "CREATE TABLE T (a STRING PRIMARY KEY NOT ENFORCED, b STRING, c STRING, d INT, e FLOAT)");
+        sql("ALTER TABLE T DROP e");
+        List<Row> result = sql("SHOW CREATE TABLE T");
+        assertThat(result.toString())
+                .contains(
+                        "CREATE TABLE `PAIMON`.`default`.`T` (\n"
+                                + "  `a` VARCHAR(2147483647) NOT NULL,\n"
+                                + "  `b` VARCHAR(2147483647),\n"
+                                + "  `c` VARCHAR(2147483647),\n"
+                                + "  `d` INT,");
+
+        sql("ALTER TABLE T DROP (c, d)");
+        result = sql("SHOW CREATE TABLE T");
+        assertThat(result.toString())
+                .contains(
+                        "CREATE TABLE `PAIMON`.`default`.`T` (\n"
+                                + "  `a` VARCHAR(2147483647) NOT NULL,\n"
+                                + "  `b` VARCHAR(2147483647),");
+    }
+
+    @Test
+    public void testRenameColumn() {
+        sql("CREATE TABLE T (a STRING PRIMARY KEY NOT ENFORCED, b STRING, c STRING)");
+        sql("INSERT INTO T VALUES('paimon', 'bbb', 'ccc')");
+        sql("ALTER TABLE T RENAME c TO c1");
+        List<Row> result = sql("SHOW CREATE TABLE T");
+        assertThat(result.toString())
+                .contains(
+                        "CREATE TABLE `PAIMON`.`default`.`T` (\n"
+                                + "  `a` VARCHAR(2147483647) NOT NULL,\n"
+                                + "  `b` VARCHAR(2147483647),\n"
+                                + "  `c1` VARCHAR(2147483647)");
+        result = sql("SELECT a, b, c1 FROM T");
+        assertThat(result.toString()).isEqualTo("[+I[paimon, bbb, ccc]]");
+
+        // column do not exist.
+        assertThatThrownBy(() -> sql("ALTER TABLE T RENAME d TO d1"))
+                .hasMessageContaining("The column `d` does not exist in the base table.");
+
+        // target column exist.
+        assertThatThrownBy(() -> sql("ALTER TABLE T RENAME a TO b"))
+                .hasMessageContaining("The column `b` already existed in table schema.");
+    }
+
+    @Test
+    public void testModifyColumnType() {
+        sql(
+                "CREATE TABLE T (a STRING PRIMARY KEY NOT ENFORCED, b STRING, c STRING, d INT, e FLOAT)");
+        sql("ALTER TABLE T MODIFY e DOUBLE");
+        List<Row> result = sql("SHOW CREATE TABLE T");
+        assertThat(result.toString())
+                .contains(
+                        "CREATE TABLE `PAIMON`.`default`.`T` (\n"
+                                + "  `a` VARCHAR(2147483647) NOT NULL,\n"
+                                + "  `b` VARCHAR(2147483647),\n"
+                                + "  `c` VARCHAR(2147483647),\n"
+                                + "  `d` INT,\n"
+                                + "  `e` DOUBLE,");

Review Comment:
   Test: insert value before and read after.



##########
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java:
##########
@@ -28,7 +32,211 @@
 /** ITCase for schema changes. */
 public class SchemaChangeITCase extends CatalogITCaseBase {
 
-    // TODO cover more cases once Flink supports more ALTER operations.
+    @Test
+    public void testAddColumn() {
+        sql("CREATE TABLE T (a STRING, b DOUBLE, c FLOAT)");
+        sql("INSERT INTO T VALUES('aaa', 1.2, 3.4)");
+        sql("ALTER TABLE T ADD d INT");
+        List<Row> result = sql("SHOW CREATE TABLE T");
+        assertThat(result.toString())
+                .contains(
+                        "CREATE TABLE `PAIMON`.`default`.`T` (\n"
+                                + "  `a` VARCHAR(2147483647),\n"
+                                + "  `b` DOUBLE,\n"
+                                + "  `c` FLOAT,\n"
+                                + "  `d` INT\n"
+                                + ")");
+        sql("INSERT INTO T VALUES('bbb', 4.5, 5.6, 5)");
+        result = sql("SELECT * FROM T");
+        assertThat(result.toString()).isEqualTo("[+I[aaa, 1.2, 3.4, null], +I[bbb, 4.5, 5.6, 5]]");
+
+        // add column with after position
+        sql("CREATE TABLE T1 (a STRING, b DOUBLE, c FLOAT)");
+        sql("INSERT INTO T1 VALUES('aaa', 1.2, 3.4)");
+        sql("ALTER TABLE T1 ADD d INT AFTER b");
+        result = sql("SHOW CREATE TABLE T1");
+        assertThat(result.toString())
+                .contains(
+                        "CREATE TABLE `PAIMON`.`default`.`T1` (\n"
+                                + "  `a` VARCHAR(2147483647),\n"
+                                + "  `b` DOUBLE,\n"
+                                + "  `d` INT,\n"
+                                + "  `c` FLOAT\n"
+                                + ")");
+        sql("INSERT INTO T1 VALUES('bbb', 4.5, 6, 5.6)");
+        result = sql("SELECT * FROM T1");
+        assertThat(result.toString()).isEqualTo("[+I[aaa, 1.2, null, 3.4], +I[bbb, 4.5, 6, 5.6]]");
+
+        // add column with first position
+        sql("CREATE TABLE T2 (a STRING, b DOUBLE, c FLOAT)");
+        sql("INSERT INTO T2 VALUES('aaa', 1.2, 3.4)");
+        sql("ALTER TABLE T2 ADD d STRING FIRST");
+        result = sql("SHOW CREATE TABLE T2");
+        assertThat(result.toString())
+                .contains(
+                        "CREATE TABLE `PAIMON`.`default`.`T2` (\n"
+                                + "  `d` VARCHAR(2147483647),\n"
+                                + "  `a` VARCHAR(2147483647),\n"
+                                + "  `b` DOUBLE,\n"
+                                + "  `c` FLOAT\n"
+                                + ")");
+
+        sql("INSERT INTO T2 VALUES('paimon', 'bbb', 3.4, 6.7)");
+        result = sql("SELECT * FROM T2");
+        assertThat(result.toString())
+                .isEqualTo("[+I[null, aaa, 1.2, 3.4], +I[paimon, bbb, 3.4, 6.7]]");
+    }
+
+    @Test
+    public void testDropColumn() {
+        sql(
+                "CREATE TABLE T (a STRING PRIMARY KEY NOT ENFORCED, b STRING, c STRING, d INT, e FLOAT)");
+        sql("ALTER TABLE T DROP e");
+        List<Row> result = sql("SHOW CREATE TABLE T");
+        assertThat(result.toString())
+                .contains(
+                        "CREATE TABLE `PAIMON`.`default`.`T` (\n"
+                                + "  `a` VARCHAR(2147483647) NOT NULL,\n"
+                                + "  `b` VARCHAR(2147483647),\n"
+                                + "  `c` VARCHAR(2147483647),\n"
+                                + "  `d` INT,");
+
+        sql("ALTER TABLE T DROP (c, d)");
+        result = sql("SHOW CREATE TABLE T");
+        assertThat(result.toString())
+                .contains(
+                        "CREATE TABLE `PAIMON`.`default`.`T` (\n"
+                                + "  `a` VARCHAR(2147483647) NOT NULL,\n"
+                                + "  `b` VARCHAR(2147483647),");
+    }
+
+    @Test
+    public void testRenameColumn() {
+        sql("CREATE TABLE T (a STRING PRIMARY KEY NOT ENFORCED, b STRING, c STRING)");
+        sql("INSERT INTO T VALUES('paimon', 'bbb', 'ccc')");
+        sql("ALTER TABLE T RENAME c TO c1");
+        List<Row> result = sql("SHOW CREATE TABLE T");
+        assertThat(result.toString())
+                .contains(
+                        "CREATE TABLE `PAIMON`.`default`.`T` (\n"
+                                + "  `a` VARCHAR(2147483647) NOT NULL,\n"
+                                + "  `b` VARCHAR(2147483647),\n"
+                                + "  `c1` VARCHAR(2147483647)");
+        result = sql("SELECT a, b, c1 FROM T");
+        assertThat(result.toString()).isEqualTo("[+I[paimon, bbb, ccc]]");
+
+        // column do not exist.
+        assertThatThrownBy(() -> sql("ALTER TABLE T RENAME d TO d1"))
+                .hasMessageContaining("The column `d` does not exist in the base table.");
+
+        // target column exist.
+        assertThatThrownBy(() -> sql("ALTER TABLE T RENAME a TO b"))
+                .hasMessageContaining("The column `b` already existed in table schema.");
+    }
+
+    @Test
+    public void testModifyColumnType() {
+        sql(
+                "CREATE TABLE T (a STRING PRIMARY KEY NOT ENFORCED, b STRING, c STRING, d INT, e FLOAT)");
+        sql("ALTER TABLE T MODIFY e DOUBLE");
+        List<Row> result = sql("SHOW CREATE TABLE T");
+        assertThat(result.toString())
+                .contains(
+                        "CREATE TABLE `PAIMON`.`default`.`T` (\n"
+                                + "  `a` VARCHAR(2147483647) NOT NULL,\n"
+                                + "  `b` VARCHAR(2147483647),\n"
+                                + "  `c` VARCHAR(2147483647),\n"
+                                + "  `d` INT,\n"
+                                + "  `e` DOUBLE,");
+
+        assertThatThrownBy(() -> sql("ALTER TABLE T MODIFY c DOUBLE"))
+                .getRootCause()
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessage(
+                        "Column type c[STRING] cannot be converted to DOUBLE without loosing information.");
+    }
+
+    @Test
+    public void testModifyColumnPosition() {
+        sql(
+                "CREATE TABLE T (a STRING PRIMARY KEY NOT ENFORCED, b STRING, c STRING, d INT, e DOUBLE)");
+        sql("ALTER TABLE T MODIFY b STRING FIRST");
+        List<Row> result = sql("SHOW CREATE TABLE T");
+        assertThat(result.toString())
+                .contains(
+                        "CREATE TABLE `PAIMON`.`default`.`T` (\n"
+                                + "  `b` VARCHAR(2147483647),\n"
+                                + "  `a` VARCHAR(2147483647) NOT NULL,\n"
+                                + "  `c` VARCHAR(2147483647),\n"
+                                + "  `d` INT,\n"
+                                + "  `e` DOUBLE,");
+

Review Comment:
   Test: insert values before moving and read after moving



##########
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java:
##########
@@ -28,7 +32,211 @@
 /** ITCase for schema changes. */
 public class SchemaChangeITCase extends CatalogITCaseBase {
 
-    // TODO cover more cases once Flink supports more ALTER operations.
+    @Test
+    public void testAddColumn() {
+        sql("CREATE TABLE T (a STRING, b DOUBLE, c FLOAT)");
+        sql("INSERT INTO T VALUES('aaa', 1.2, 3.4)");
+        sql("ALTER TABLE T ADD d INT");
+        List<Row> result = sql("SHOW CREATE TABLE T");
+        assertThat(result.toString())
+                .contains(
+                        "CREATE TABLE `PAIMON`.`default`.`T` (\n"
+                                + "  `a` VARCHAR(2147483647),\n"
+                                + "  `b` DOUBLE,\n"
+                                + "  `c` FLOAT,\n"
+                                + "  `d` INT\n"
+                                + ")");
+        sql("INSERT INTO T VALUES('bbb', 4.5, 5.6, 5)");
+        result = sql("SELECT * FROM T");
+        assertThat(result.toString()).isEqualTo("[+I[aaa, 1.2, 3.4, null], +I[bbb, 4.5, 5.6, 5]]");
+
+        // add column with after position
+        sql("CREATE TABLE T1 (a STRING, b DOUBLE, c FLOAT)");
+        sql("INSERT INTO T1 VALUES('aaa', 1.2, 3.4)");
+        sql("ALTER TABLE T1 ADD d INT AFTER b");
+        result = sql("SHOW CREATE TABLE T1");
+        assertThat(result.toString())
+                .contains(
+                        "CREATE TABLE `PAIMON`.`default`.`T1` (\n"
+                                + "  `a` VARCHAR(2147483647),\n"
+                                + "  `b` DOUBLE,\n"
+                                + "  `d` INT,\n"
+                                + "  `c` FLOAT\n"
+                                + ")");
+        sql("INSERT INTO T1 VALUES('bbb', 4.5, 6, 5.6)");
+        result = sql("SELECT * FROM T1");
+        assertThat(result.toString()).isEqualTo("[+I[aaa, 1.2, null, 3.4], +I[bbb, 4.5, 6, 5.6]]");
+
+        // add column with first position
+        sql("CREATE TABLE T2 (a STRING, b DOUBLE, c FLOAT)");
+        sql("INSERT INTO T2 VALUES('aaa', 1.2, 3.4)");
+        sql("ALTER TABLE T2 ADD d STRING FIRST");
+        result = sql("SHOW CREATE TABLE T2");
+        assertThat(result.toString())
+                .contains(
+                        "CREATE TABLE `PAIMON`.`default`.`T2` (\n"
+                                + "  `d` VARCHAR(2147483647),\n"
+                                + "  `a` VARCHAR(2147483647),\n"
+                                + "  `b` DOUBLE,\n"
+                                + "  `c` FLOAT\n"
+                                + ")");
+
+        sql("INSERT INTO T2 VALUES('paimon', 'bbb', 3.4, 6.7)");
+        result = sql("SELECT * FROM T2");
+        assertThat(result.toString())
+                .isEqualTo("[+I[null, aaa, 1.2, 3.4], +I[paimon, bbb, 3.4, 6.7]]");
+    }
+
+    @Test
+    public void testDropColumn() {

Review Comment:
   1. Test: insert value before and read after.
   2. Please add test for drop partition key and primary key (should throw exception). See `SparkSchemaEvolutionITCase`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@paimon.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org