Posted to commits@paimon.apache.org by lz...@apache.org on 2023/03/17 02:26:54 UTC

[incubator-paimon] branch master updated: [FLINK-31459] Add UPDATE COLUMN POSITION (#603)

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 14d87e4a [FLINK-31459] Add UPDATE COLUMN POSITION (#603)
14d87e4a is described below

commit 14d87e4a85e716771c59a5e2fbb840fd1852a0f6
Author: JunZhang <zh...@126.com>
AuthorDate: Fri Mar 17 10:26:49 2023 +0800

    [FLINK-31459] Add UPDATE COLUMN POSITION (#603)
---
 docs/content/docs/how-to/altering-tables.md        | 20 ++++++++
 .../table/store/file/schema/SchemaChange.java      | 55 ++++++++++++++++++++
 .../table/store/file/schema/SchemaManager.java     | 33 ++++++++++++
 .../flink/table/store/spark/SparkCatalog.java      | 14 +++++
 .../store/spark/SparkSchemaEvolutionITCase.java    | 60 ++++++++++++++++++++++
 5 files changed, 182 insertions(+)

diff --git a/docs/content/docs/how-to/altering-tables.md b/docs/content/docs/how-to/altering-tables.md
index 090860c0..7c8ade5c 100644
--- a/docs/content/docs/how-to/altering-tables.md
+++ b/docs/content/docs/how-to/altering-tables.md
@@ -202,3 +202,23 @@ ALTER TABLE my_table ALTER COLUMN buy_count COMMENT 'buy count';
 {{< /tab >}}
 
 {{< /tabs >}}
+
+
+
+## Changing Column Position
+
+To move an existing column to a new position, use FIRST or AFTER col_name.
+
+{{< tabs "change-column-position" >}}
+
+{{< tab "Spark3" >}}
+
+```sql
+ALTER TABLE my_table ALTER COLUMN col_a FIRST;
+
+ALTER TABLE my_table ALTER COLUMN col_a AFTER col_b;
+```
+
+{{< /tab >}}
+
+{{< /tabs >}}
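
As a quick illustration of the Java API this patch introduces, here is a minimal sketch of constructing the new position changes programmatically. The class name `MoveColumnExample` is made up for illustration, and how the changes are ultimately committed (e.g. through `SchemaManager`) is not shown in this excerpt:

```java
import org.apache.flink.table.store.file.schema.SchemaChange;
import org.apache.flink.table.store.file.schema.SchemaChange.Move;

public class MoveColumnExample {
    public static void main(String[] args) {
        // FIRST: move col_a to the front of the row type.
        SchemaChange moveFirst = SchemaChange.updateColumnPosition(Move.first("col_a"));

        // AFTER: move col_a so that it directly follows col_b.
        SchemaChange moveAfter = SchemaChange.updateColumnPosition(Move.after("col_a", "col_b"));

        // These changes are applied by SchemaManager (see the diffs below);
        // the exact entry point is not part of this excerpt.
        System.out.println(moveFirst + " / " + moveAfter);
    }
}
```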
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaChange.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaChange.java
index d043da54..2d5083f1 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaChange.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaChange.java
@@ -70,6 +70,10 @@ public interface SchemaChange {
         return new UpdateColumnComment(fieldNames, comment);
     }
 
+    static SchemaChange updateColumnPosition(Move move) {
+        return new UpdateColumnPosition(move);
+    }
+
     /** A SchemaChange to set a table option. */
     final class SetOption implements SchemaChange {
         private final String key;
@@ -291,6 +295,57 @@ public interface SchemaChange {
         }
     }
 
+    /** A SchemaChange to update the field position. */
+    final class UpdateColumnPosition implements SchemaChange {
+        private final Move move;
+
+        private UpdateColumnPosition(Move move) {
+            this.move = move;
+        }
+
+        public Move move() {
+            return move;
+        }
+    }
+
+    /** Represents a requested column move in a struct. */
+    class Move {
+        public enum MoveType {
+            FIRST,
+            AFTER
+        }
+
+        public static Move first(String fieldName) {
+            return new Move(fieldName, null, MoveType.FIRST);
+        }
+
+        public static Move after(String fieldName, String referenceFieldName) {
+            return new Move(fieldName, referenceFieldName, MoveType.AFTER);
+        }
+
+        private final String fieldName;
+        private final String referenceFieldName;
+        private final MoveType type;
+
+        public Move(String fieldName, String referenceFieldName, MoveType type) {
+            this.fieldName = fieldName;
+            this.referenceFieldName = referenceFieldName;
+            this.type = type;
+        }
+
+        public String fieldName() {
+            return fieldName;
+        }
+
+        public String referenceFieldName() {
+            return referenceFieldName;
+        }
+
+        public MoveType type() {
+            return type;
+        }
+    }
+
     /** A SchemaChange to update the (nested) field nullability. */
     final class UpdateColumnNullability implements SchemaChange {
         private final String[] fieldNames;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
index 8ca5be98..dff6ba0f 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
@@ -29,6 +29,7 @@ import org.apache.flink.table.store.file.schema.SchemaChange.RenameColumn;
 import org.apache.flink.table.store.file.schema.SchemaChange.SetOption;
 import org.apache.flink.table.store.file.schema.SchemaChange.UpdateColumnComment;
 import org.apache.flink.table.store.file.schema.SchemaChange.UpdateColumnNullability;
+import org.apache.flink.table.store.file.schema.SchemaChange.UpdateColumnPosition;
 import org.apache.flink.table.store.file.schema.SchemaChange.UpdateColumnType;
 import org.apache.flink.table.store.file.utils.JsonSerdeUtil;
 import org.apache.flink.table.store.fs.FileIO;
@@ -310,6 +311,31 @@ public class SchemaManager implements Serializable {
                                             field.name(),
                                             field.type(),
                                             update.newDescription()));
+                } else if (change instanceof UpdateColumnPosition) {
+                    UpdateColumnPosition update = (UpdateColumnPosition) change;
+                    SchemaChange.Move move = update.move();
+
+                    // key: field name, value: index of the field in newFields
+                    Map<String, Integer> map = new HashMap<>();
+                    for (int i = 0; i < newFields.size(); i++) {
+                        map.put(newFields.get(i).name(), i);
+                    }
+
+                    int fieldIndex = map.get(move.fieldName());
+                    int refIndex = 0;
+                    if (move.type().equals(SchemaChange.Move.MoveType.FIRST)) {
+                        checkMoveIndexEqual(move, fieldIndex, refIndex);
+                        newFields.add(refIndex, newFields.remove(fieldIndex));
+                    } else if (move.type().equals(SchemaChange.Move.MoveType.AFTER)) {
+                        refIndex = map.get(move.referenceFieldName());
+                        checkMoveIndexEqual(move, fieldIndex, refIndex);
+                        if (fieldIndex > refIndex) {
+                            newFields.add(refIndex + 1, newFields.remove(fieldIndex));
+                        } else {
+                            newFields.add(refIndex, newFields.remove(fieldIndex));
+                        }
+                    }
+
                 } else {
                     throw new UnsupportedOperationException(
                             "Unsupported change: " + change.getClass());
@@ -333,6 +359,13 @@ public class SchemaManager implements Serializable {
         }
     }
 
+    private static void checkMoveIndexEqual(SchemaChange.Move move, int fieldIndex, int refIndex) {
+        if (refIndex == fieldIndex) {
+            throw new UnsupportedOperationException(
+                    String.format("Cannot move itself for column %s", move.fieldName()));
+        }
+    }
+
     private void validateNotPrimaryAndPartitionKey(TableSchema schema, String fieldName) {
         /// TODO support partition and primary keys schema evolution
         if (schema.partitionKeys().contains(fieldName)) {
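
To make the index bookkeeping in the AFTER branch above easier to follow: once the moved field is removed, every index behind it shifts left by one, so insertion happens at `refIndex + 1` only when the field originally sat after the reference column. A small standalone sketch of the same remove-then-insert logic on a plain `List<String>` (illustrative only; the real code operates on the `DataField` entries in `newFields`):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class MoveAfterSketch {
    // Mirrors the AFTER branch in SchemaManager: move `field` so it follows `ref`.
    static void moveAfter(List<String> fields, String field, String ref) {
        int fieldIndex = fields.indexOf(field);
        int refIndex = fields.indexOf(ref);
        if (fieldIndex > refIndex) {
            // Removing the field leaves the reference index unchanged, so insert right after it.
            fields.add(refIndex + 1, fields.remove(fieldIndex));
        } else {
            // Removing the field shifts the reference left by one, so refIndex already points
            // to the slot directly after the reference column.
            fields.add(refIndex, fields.remove(fieldIndex));
        }
    }

    public static void main(String[] args) {
        List<String> cols = new ArrayList<>(Arrays.asList("a", "b", "c", "d"));
        moveAfter(cols, "b", "c");
        System.out.println(cols); // [a, c, b, d], matching the tableAfter1 test case below
    }
}
```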
diff --git a/flink-table-store-spark/flink-table-store-spark-common/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java b/flink-table-store-spark/flink-table-store-spark-common/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
index 86a3b343..ab76c8fb 100644
--- a/flink-table-store-spark/flink-table-store-spark-common/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
+++ b/flink-table-store-spark/flink-table-store-spark-common/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
@@ -46,6 +46,7 @@ import org.apache.spark.sql.connector.catalog.TableChange.RenameColumn;
 import org.apache.spark.sql.connector.catalog.TableChange.SetProperty;
 import org.apache.spark.sql.connector.catalog.TableChange.UpdateColumnComment;
 import org.apache.spark.sql.connector.catalog.TableChange.UpdateColumnNullability;
+import org.apache.spark.sql.connector.catalog.TableChange.UpdateColumnPosition;
 import org.apache.spark.sql.connector.catalog.TableChange.UpdateColumnType;
 import org.apache.spark.sql.connector.expressions.FieldReference;
 import org.apache.spark.sql.connector.expressions.NamedReference;
@@ -288,6 +289,19 @@ public class SparkCatalog implements TableCatalog, SupportsNamespaces {
         } else if (change instanceof UpdateColumnComment) {
             UpdateColumnComment update = (UpdateColumnComment) change;
             return SchemaChange.updateColumnComment(update.fieldNames(), update.newComment());
+        } else if (change instanceof UpdateColumnPosition) {
+            UpdateColumnPosition update = (UpdateColumnPosition) change;
+            TableChange.ColumnPosition columnPosition = update.position();
+            SchemaChange.Move move = null;
+            if (columnPosition instanceof TableChange.First) {
+                move = SchemaChange.Move.first(update.fieldNames()[0]);
+            } else if (columnPosition instanceof TableChange.After) {
+                move =
+                        SchemaChange.Move.after(
+                                update.fieldNames()[0],
+                                ((TableChange.After) columnPosition).column());
+            }
+            return SchemaChange.updateColumnPosition(move);
         } else {
             throw new UnsupportedOperationException(
                     "Change is not supported: " + change.getClass());
diff --git a/flink-table-store-spark/flink-table-store-spark-common/src/test/java/org/apache/flink/table/store/spark/SparkSchemaEvolutionITCase.java b/flink-table-store-spark/flink-table-store-spark-common/src/test/java/org/apache/flink/table/store/spark/SparkSchemaEvolutionITCase.java
index 01032fb6..4b8f8a6f 100644
--- a/flink-table-store-spark/flink-table-store-spark-common/src/test/java/org/apache/flink/table/store/spark/SparkSchemaEvolutionITCase.java
+++ b/flink-table-store-spark/flink-table-store-spark-common/src/test/java/org/apache/flink/table/store/spark/SparkSchemaEvolutionITCase.java
@@ -360,6 +360,66 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
                                 "b"));
     }
 
+    @Test
+    public void testUpdateColumnPosition() {
+        // move first
+        spark.sql("CREATE TABLE tableFirst (a INT , b BIGINT, c STRING)");
+        spark.sql("ALTER TABLE tableFirst ALTER COLUMN b FIRST");
+        List<Row> result = spark.sql("SHOW CREATE TABLE tableFirst").collectAsList();
+        assertThat(result.toString())
+                .contains(
+                        "CREATE TABLE tableFirst (\n"
+                                + "  `b` BIGINT,\n"
+                                + "  `a` INT,\n"
+                                + "  `c` STRING)");
+
+        // move after
+        spark.sql("CREATE TABLE tableAfter (a INT, b BIGINT, c STRING)");
+        spark.sql("ALTER TABLE tableAfter ALTER COLUMN c AFTER a");
+        result = spark.sql("SHOW CREATE TABLE tableAfter").collectAsList();
+        assertThat(result.toString())
+                .contains(
+                        "CREATE TABLE tableAfter (\n"
+                                + "  `a` INT,\n"
+                                + "  `c` STRING,\n"
+                                + "  `b` BIGINT)");
+
+        spark.sql("CREATE TABLE tableAfter1 (a INT, b BIGINT, c STRING, d DOUBLE)");
+        spark.sql("ALTER TABLE tableAfter1 ALTER COLUMN b AFTER c");
+        result = spark.sql("SHOW CREATE TABLE tableAfter1").collectAsList();
+        assertThat(result.toString())
+                .contains(
+                        "CREATE TABLE tableAfter1 (\n"
+                                + "  `a` INT,\n"
+                                + "  `c` STRING,\n"
+                                + "  `b` BIGINT,\n"
+                                + "  `d` DOUBLE)");
+
+        // moving a column onto itself with FIRST should fail
+        spark.sql("CREATE TABLE tableFirstSelf (a INT , b BIGINT, c STRING)");
+        assertThatThrownBy(() -> spark.sql("ALTER TABLE tableFirstSelf ALTER COLUMN a FIRST"))
+                .getRootCause()
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessageContaining("Cannot move itself for column a");
+
+        // moving a column onto itself with AFTER should fail
+        spark.sql("CREATE TABLE tableAfterSelf (a INT , b BIGINT, c STRING)");
+        assertThatThrownBy(() -> spark.sql("ALTER TABLE tableAfterSelf ALTER COLUMN b AFTER b"))
+                .getRootCause()
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessageContaining("Cannot move itself for column b");
+
+        // missing column
+        spark.sql("CREATE TABLE tableMissing (a INT , b BIGINT, c STRING)");
+        assertThatThrownBy(() -> spark.sql("ALTER TABLE tableMissing ALTER COLUMN d FIRST"))
+                .hasMessageContaining("Missing field d in table tablestore.default.tableMissing");
+
+        spark.sql("CREATE TABLE tableMissingAfter (a INT , b BIGINT, c STRING)");
+        assertThatThrownBy(() -> spark.sql("ALTER TABLE tableMissingAfter ALTER COLUMN a AFTER d"))
+                .hasMessageContaining(
+                        "Missing field d in table tablestore.default.tableMissingAfter");
+    }
+
     @Test
     public void testAlterColumnType() {
         createTable("testAlterColumnType");