You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@paimon.apache.org by lz...@apache.org on 2023/03/20 01:31:45 UTC

[incubator-paimon] branch master updated: [core][spark] Add column position for paimon (#629)

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new b63c6da79 [core][spark] Add column position for paimon (#629)
b63c6da79 is described below

commit b63c6da790095529384214dcb63485dee76a5351
Author: JunZhang <zh...@126.com>
AuthorDate: Mon Mar 20 09:31:41 2023 +0800

    [core][spark] Add column position for paimon (#629)
---
 docs/content/how-to/altering-tables.md             | 18 +++++++++++
 .../org/apache/paimon/schema/SchemaChange.java     | 36 +++++++++++++++++++---
 .../org/apache/paimon/schema/SchemaManager.java    | 24 +++++++++++++--
 .../apache/paimon/table/SchemaEvolutionTest.java   |  5 ++-
 .../java/org/apache/paimon/spark/SparkCatalog.java | 28 ++++++++++-------
 .../paimon/spark/SparkSchemaEvolutionITCase.java   | 26 ++++++++++++++++
 6 files changed, 118 insertions(+), 19 deletions(-)

diff --git a/docs/content/how-to/altering-tables.md b/docs/content/how-to/altering-tables.md
index 7c8ade5c9..9efc9e112 100644
--- a/docs/content/how-to/altering-tables.md
+++ b/docs/content/how-to/altering-tables.md
@@ -127,6 +127,24 @@ ALTER TABLE my_table ADD COLUMNS (
 
 {{< /tabs >}}
 
+## Adding Column Position
+
+To add a new column at a specified position, use `FIRST` or `AFTER col_name`.
+
+{{< tabs "add-column-position" >}}
+
+{{< tab "Spark3" >}}
+
+```sql
+ALTER TABLE my_table ADD COLUMN c INT FIRST;
+
+ALTER TABLE my_table ADD COLUMN c INT AFTER b;
+```
+
+{{< /tab >}}
+
+{{< /tabs >}}
+
 ## Renaming Column Name
 The following SQL renames column `c0` in table `my_table` to `c1`.
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java
index c7be60ba9..83dceab07 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java
@@ -43,11 +43,11 @@ public interface SchemaChange {
     }
 
     static SchemaChange addColumn(String fieldName, DataType dataType) {
-        return addColumn(fieldName, dataType, null);
+        return addColumn(fieldName, dataType, null, null);
     }
 
-    static SchemaChange addColumn(String fieldName, DataType dataType, String comment) {
-        return new AddColumn(fieldName, dataType, comment);
+    static SchemaChange addColumn(String fieldName, DataType dataType, String comment, Move move) {
+        return new AddColumn(fieldName, dataType, comment, move);
     }
 
     static SchemaChange renameColumn(String fieldName, String newName) {
@@ -145,11 +145,13 @@ public interface SchemaChange {
         private final String fieldName;
         private final DataType dataType;
         private final String description;
+        private final Move move;
 
-        private AddColumn(String fieldName, DataType dataType, String description) {
+        private AddColumn(String fieldName, DataType dataType, String description, Move move) {
             this.fieldName = fieldName;
             this.dataType = dataType;
             this.description = description;
+            this.move = move;
         }
 
         public String fieldName() {
@@ -165,6 +167,11 @@ public interface SchemaChange {
             return description;
         }
 
+        @Nullable
+        public Move move() {
+            return move;
+        }
+
         @Override
         public boolean equals(Object o) {
             if (this == o) {
@@ -176,13 +183,15 @@ public interface SchemaChange {
             AddColumn addColumn = (AddColumn) o;
             return Objects.equals(fieldName, addColumn.fieldName)
                     && dataType.equals(addColumn.dataType)
-                    && Objects.equals(description, addColumn.description);
+                    && Objects.equals(description, addColumn.description)
+                    && move.equals(addColumn.move);
         }
 
         @Override
         public int hashCode() {
             int result = Objects.hash(dataType, description);
             result = 31 * result + Objects.hashCode(fieldName);
+            result = 31 * result + Objects.hashCode(move);
             return result;
         }
     }
@@ -306,6 +315,23 @@ public interface SchemaChange {
         public Move move() {
             return move;
         }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            UpdateColumnPosition updateColumnPosition = (UpdateColumnPosition) o;
+            return Objects.equals(move, updateColumnPosition.move);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(move);
+        }
     }
 
     /** Represents a requested column move in a struct. */
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index 2135bed5e..9eb6d349e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -202,6 +202,7 @@ public class SchemaManager implements Serializable {
                     newOptions.remove(removeOption.key());
                 } else if (change instanceof AddColumn) {
                     AddColumn addColumn = (AddColumn) change;
+                    SchemaChange.Move move = addColumn.move();
                     if (newFields.stream().anyMatch(f -> f.name().equals(addColumn.fieldName()))) {
                         throw new IllegalArgumentException(
                                 String.format(
@@ -214,9 +215,28 @@ public class SchemaManager implements Serializable {
                     int id = highestFieldId.incrementAndGet();
                     DataType dataType =
                             ReassignFieldId.reassign(addColumn.dataType(), highestFieldId);
-                    newFields.add(
+
+                    DataField dataField =
                             new DataField(
-                                    id, addColumn.fieldName(), dataType, addColumn.description()));
+                                    id, addColumn.fieldName(), dataType, addColumn.description());
+
+                    // key: name ; value : index
+                    Map<String, Integer> map = new HashMap<>();
+                    for (int i = 0; i < newFields.size(); i++) {
+                        map.put(newFields.get(i).name(), i);
+                    }
+
+                    if (null != move) {
+                        if (move.type().equals(SchemaChange.Move.MoveType.FIRST)) {
+                            newFields.add(0, dataField);
+                        } else if (move.type().equals(SchemaChange.Move.MoveType.AFTER)) {
+                            int fieldIndex = map.get(move.referenceFieldName());
+                            newFields.add(fieldIndex + 1, dataField);
+                        }
+                    } else {
+                        newFields.add(dataField);
+                    }
+
                 } else if (change instanceof RenameColumn) {
                     RenameColumn rename = (RenameColumn) change;
                     validateNotPrimaryAndPartitionKey(schema, rename.fieldName());
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
index 500c52da1..470be755c 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
@@ -125,7 +125,10 @@ public class SchemaEvolutionTest {
                                 schemaManager.commitChanges(
                                         Collections.singletonList(
                                                 SchemaChange.addColumn(
-                                                        "f4", new IntType().copy(false), null))))
+                                                        "f4",
+                                                        new IntType().copy(false),
+                                                        null,
+                                                        null))))
                 .isInstanceOf(IllegalArgumentException.class)
                 .hasMessage("ADD COLUMN cannot specify NOT NULL.");
     }
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index 3851e1b9c..d77b71bc7 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -265,10 +265,12 @@ public class SparkCatalog implements TableCatalog, SupportsNamespaces {
         } else if (change instanceof AddColumn) {
             AddColumn add = (AddColumn) change;
             validateAlterNestedField(add.fieldNames());
+            SchemaChange.Move move = getMove(add.position(), add.fieldNames());
             return SchemaChange.addColumn(
                     add.fieldNames()[0],
                     toFlinkType(add.dataType()).copy(add.isNullable()),
-                    add.comment());
+                    add.comment(),
+                    move);
         } else if (change instanceof RenameColumn) {
             RenameColumn rename = (RenameColumn) change;
             validateAlterNestedField(rename.fieldNames());
@@ -290,16 +292,7 @@ public class SparkCatalog implements TableCatalog, SupportsNamespaces {
             return SchemaChange.updateColumnComment(update.fieldNames(), update.newComment());
         } else if (change instanceof UpdateColumnPosition) {
             UpdateColumnPosition update = (UpdateColumnPosition) change;
-            TableChange.ColumnPosition columnPosition = update.position();
-            SchemaChange.Move move = null;
-            if (columnPosition instanceof TableChange.First) {
-                move = SchemaChange.Move.first(update.fieldNames()[0]);
-            } else if (columnPosition instanceof TableChange.After) {
-                move =
-                        SchemaChange.Move.after(
-                                update.fieldNames()[0],
-                                ((TableChange.After) columnPosition).column());
-            }
+            SchemaChange.Move move = getMove(update.position(), update.fieldNames());
             return SchemaChange.updateColumnPosition(move);
         } else {
             throw new UnsupportedOperationException(
@@ -307,6 +300,19 @@ public class SparkCatalog implements TableCatalog, SupportsNamespaces {
         }
     }
 
+    private static SchemaChange.Move getMove(
+            TableChange.ColumnPosition columnPosition, String[] fieldNames) {
+        SchemaChange.Move move = null;
+        if (columnPosition instanceof TableChange.First) {
+            move = SchemaChange.Move.first(fieldNames[0]);
+        } else if (columnPosition instanceof TableChange.After) {
+            move =
+                    SchemaChange.Move.after(
+                            fieldNames[0], ((TableChange.After) columnPosition).column());
+        }
+        return move;
+    }
+
     private Schema toUpdateSchema(
             StructType schema, Transform[] partitions, Map<String, String> properties) {
         Preconditions.checkArgument(
diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java
index e21df7adf..bba270ba3 100644
--- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java
+++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java
@@ -105,6 +105,32 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
                         "java.lang.IllegalArgumentException: ADD COLUMN cannot specify NOT NULL.");
     }
 
+    @Test
+    public void testAddColumnPosition() {
+        createTable("testAddColumnPositionFirst");
+        spark.sql("ALTER TABLE testAddColumnPositionFirst ADD COLUMN d INT FIRST");
+        List<Row> result =
+                spark.sql("SHOW CREATE TABLE testAddColumnPositionFirst").collectAsList();
+        assertThat(result.toString())
+                .contains(
+                        "CREATE TABLE testAddColumnPositionFirst (\n"
+                                + "  `d` INT,\n"
+                                + "  `a` INT NOT NULL,\n"
+                                + "  `b` BIGINT,\n"
+                                + "  `c` STRING)");
+
+        createTable("testAddColumnPositionAfter");
+        spark.sql("ALTER TABLE testAddColumnPositionAfter ADD COLUMN d INT AFTER b");
+        result = spark.sql("SHOW CREATE TABLE testAddColumnPositionAfter").collectAsList();
+        assertThat(result.toString())
+                .contains(
+                        "CREATE TABLE testAddColumnPositionAfter (\n"
+                                + "  `a` INT NOT NULL,\n"
+                                + "  `b` BIGINT,\n"
+                                + "  `d` INT,\n"
+                                + "  `c` STRING)");
+    }
+
     @Test
     public void testRenameTable() {
         // TODO: add test case for hive catalog table