You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@paimon.apache.org by lz...@apache.org on 2023/03/20 01:31:45 UTC
[incubator-paimon] branch master updated: [core][spark] Add column position for paimon (#629)
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new b63c6da79 [core][spark] Add column position for paimon (#629)
b63c6da79 is described below
commit b63c6da790095529384214dcb63485dee76a5351
Author: JunZhang <zh...@126.com>
AuthorDate: Mon Mar 20 09:31:41 2023 +0800
[core][spark] Add column position for paimon (#629)
---
docs/content/how-to/altering-tables.md | 18 +++++++++++
.../org/apache/paimon/schema/SchemaChange.java | 36 +++++++++++++++++++---
.../org/apache/paimon/schema/SchemaManager.java | 24 +++++++++++++--
.../apache/paimon/table/SchemaEvolutionTest.java | 5 ++-
.../java/org/apache/paimon/spark/SparkCatalog.java | 28 ++++++++++-------
.../paimon/spark/SparkSchemaEvolutionITCase.java | 26 ++++++++++++++++
6 files changed, 118 insertions(+), 19 deletions(-)
diff --git a/docs/content/how-to/altering-tables.md b/docs/content/how-to/altering-tables.md
index 7c8ade5c9..9efc9e112 100644
--- a/docs/content/how-to/altering-tables.md
+++ b/docs/content/how-to/altering-tables.md
@@ -127,6 +127,24 @@ ALTER TABLE my_table ADD COLUMNS (
{{< /tabs >}}
+## Adding Column Position
+
+To add a new column at a specified position, use `FIRST` or `AFTER col_name`. Without a position clause, the new column is appended after the existing columns.
+
+{{< tabs "add-column-position" >}}
+
+{{< tab "Spark3" >}}
+
+```sql
+ALTER TABLE my_table ADD COLUMN c INT FIRST;
+
+ALTER TABLE my_table ADD COLUMN c INT AFTER b;
+```
+
+{{< /tab >}}
+
+{{< /tabs >}}
+
## Renaming Column Name
The following SQL renames column `c0` in table `my_table` to `c1`.
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java
index c7be60ba9..83dceab07 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java
@@ -43,11 +43,16 @@ public interface SchemaChange {
}
static SchemaChange addColumn(String fieldName, DataType dataType) {
return addColumn(fieldName, dataType, null);
}
static SchemaChange addColumn(String fieldName, DataType dataType, String comment) {
- return new AddColumn(fieldName, dataType, comment);
+ return addColumn(fieldName, dataType, comment, null);
+ }
+
+ /** Adds a column at the position given by {@code move}; a null move appends it. */
+ static SchemaChange addColumn(String fieldName, DataType dataType, String comment, Move move) {
+ return new AddColumn(fieldName, dataType, comment, move);
}
static SchemaChange renameColumn(String fieldName, String newName) {
@@ -145,11 +145,13 @@ public interface SchemaChange {
private final String fieldName;
private final DataType dataType;
private final String description;
+ @Nullable private final Move move;
- private AddColumn(String fieldName, DataType dataType, String description) {
+ private AddColumn(String fieldName, DataType dataType, String description, Move move) {
this.fieldName = fieldName;
this.dataType = dataType;
this.description = description;
+ this.move = move;
}
public String fieldName() {
@@ -165,6 +167,11 @@ public interface SchemaChange {
return description;
}
+ @Nullable
+ public Move move() {
+ return move;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -176,13 +183,15 @@ public interface SchemaChange {
AddColumn addColumn = (AddColumn) o;
return Objects.equals(fieldName, addColumn.fieldName)
&& dataType.equals(addColumn.dataType)
- && Objects.equals(description, addColumn.description);
+ && Objects.equals(description, addColumn.description)
+ && Objects.equals(move, addColumn.move); // null-safe: move is optional
}
@Override
public int hashCode() {
int result = Objects.hash(dataType, description);
result = 31 * result + Objects.hashCode(fieldName);
+ result = 31 * result + Objects.hashCode(move);
return result;
}
}
@@ -306,6 +315,23 @@ public interface SchemaChange {
public Move move() {
return move;
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == this) {
+ return true;
+ }
+ boolean sameClass = o != null && o.getClass() == getClass();
+ if (!sameClass) {
+ return false;
+ }
+ return Objects.equals(move, ((UpdateColumnPosition) o).move); // null-safe
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(move); // only move takes part in equals
+ }
}
/** Represents a requested column move in a struct. */
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index 2135bed5e..9eb6d349e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -202,6 +202,7 @@ public class SchemaManager implements Serializable {
newOptions.remove(removeOption.key());
} else if (change instanceof AddColumn) {
AddColumn addColumn = (AddColumn) change;
+ SchemaChange.Move move = addColumn.move();
if (newFields.stream().anyMatch(f -> f.name().equals(addColumn.fieldName()))) {
throw new IllegalArgumentException(
String.format(
@@ -214,9 +215,36 @@ public class SchemaManager implements Serializable {
int id = highestFieldId.incrementAndGet();
DataType dataType =
ReassignFieldId.reassign(addColumn.dataType(), highestFieldId);
- newFields.add(
+
+ DataField dataField =
new DataField(
- id, addColumn.fieldName(), dataType, addColumn.description()));
+ id, addColumn.fieldName(), dataType, addColumn.description());
+
+ // No move: append. FIRST / AFTER: insert in place. Never drop the field silently.
+ if (move == null) {
+ newFields.add(dataField);
+ } else if (move.type().equals(SchemaChange.Move.MoveType.FIRST)) {
+ newFields.add(0, dataField);
+ } else if (move.type().equals(SchemaChange.Move.MoveType.AFTER)) {
+ String referenceName = move.referenceFieldName();
+ int fieldIndex = -1;
+ for (int i = 0; i < newFields.size(); i++) {
+ if (newFields.get(i).name().equals(referenceName)) {
+ fieldIndex = i;
+ break;
+ }
+ }
+ if (fieldIndex < 0) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Column %s does not exist, cannot add column %s after it.",
+ referenceName, addColumn.fieldName()));
+ }
+ newFields.add(fieldIndex + 1, dataField);
+ } else {
+ throw new UnsupportedOperationException(
+ "Unsupported column position for ADD COLUMN: " + move.type());
+ }
} else if (change instanceof RenameColumn) {
RenameColumn rename = (RenameColumn) change;
validateNotPrimaryAndPartitionKey(schema, rename.fieldName());
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
index 500c52da1..470be755c 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
@@ -125,7 +125,10 @@ public class SchemaEvolutionTest {
schemaManager.commitChanges(
Collections.singletonList(
SchemaChange.addColumn(
- "f4", new IntType().copy(false), null))))
+ "f4",
+ new IntType().copy(false),
+ /* comment= */ null,
+ /* move= */ null))))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("ADD COLUMN cannot specify NOT NULL.");
}
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index 3851e1b9c..d77b71bc7 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -265,10 +265,12 @@ public class SparkCatalog implements TableCatalog, SupportsNamespaces {
} else if (change instanceof AddColumn) {
AddColumn add = (AddColumn) change;
validateAlterNestedField(add.fieldNames());
+ SchemaChange.Move move = getMove(add.position(), add.fieldNames());
return SchemaChange.addColumn(
add.fieldNames()[0],
toFlinkType(add.dataType()).copy(add.isNullable()),
- add.comment());
+ add.comment(),
+ move); // null when Spark supplies no FIRST/AFTER position
} else if (change instanceof RenameColumn) {
RenameColumn rename = (RenameColumn) change;
validateAlterNestedField(rename.fieldNames());
@@ -290,16 +292,7 @@ public class SparkCatalog implements TableCatalog, SupportsNamespaces {
return SchemaChange.updateColumnComment(update.fieldNames(), update.newComment());
} else if (change instanceof UpdateColumnPosition) {
UpdateColumnPosition update = (UpdateColumnPosition) change;
- TableChange.ColumnPosition columnPosition = update.position();
- SchemaChange.Move move = null;
- if (columnPosition instanceof TableChange.First) {
- move = SchemaChange.Move.first(update.fieldNames()[0]);
- } else if (columnPosition instanceof TableChange.After) {
- move =
- SchemaChange.Move.after(
- update.fieldNames()[0],
- ((TableChange.After) columnPosition).column());
- }
+ SchemaChange.Move move = getMove(update.position(), update.fieldNames());
return SchemaChange.updateColumnPosition(move);
} else {
throw new UnsupportedOperationException(
@@ -307,6 +300,19 @@ public class SparkCatalog implements TableCatalog, SupportsNamespaces {
}
}
+ private static SchemaChange.Move getMove(
+ TableChange.ColumnPosition columnPosition, String[] fieldNames) {
+ // FIRST / AFTER map to a Paimon move; anything else (including null) means no move.
+ if (columnPosition instanceof TableChange.First) {
+ return SchemaChange.Move.first(fieldNames[0]);
+ }
+ if (columnPosition instanceof TableChange.After) {
+ TableChange.After after = (TableChange.After) columnPosition;
+ return SchemaChange.Move.after(fieldNames[0], after.column());
+ }
+ return null;
+ }
+
private Schema toUpdateSchema(
StructType schema, Transform[] partitions, Map<String, String> properties) {
Preconditions.checkArgument(
diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java
index e21df7adf..bba270ba3 100644
--- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java
+++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java
@@ -105,6 +105,32 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
"java.lang.IllegalArgumentException: ADD COLUMN cannot specify NOT NULL.");
}
+ @Test
+ public void testAddColumnPosition() {
+ createTable("testAddColumnPositionFirst");
+ spark.sql("ALTER TABLE testAddColumnPositionFirst ADD COLUMN d INT FIRST");
+ List<Row> first =
+ spark.sql("SHOW CREATE TABLE testAddColumnPositionFirst").collectAsList();
+ assertThat(first.toString())
+ .contains(
+ "CREATE TABLE testAddColumnPositionFirst (\n"
+ + " `d` INT,\n"
+ + " `a` INT NOT NULL,\n"
+ + " `b` BIGINT,\n"
+ + " `c` STRING)");
+
+ createTable("testAddColumnPositionAfter");
+ spark.sql("ALTER TABLE testAddColumnPositionAfter ADD COLUMN d INT AFTER b");
+ List<Row> after = spark.sql("SHOW CREATE TABLE testAddColumnPositionAfter").collectAsList();
+ assertThat(after.toString())
+ .contains(
+ "CREATE TABLE testAddColumnPositionAfter (\n"
+ + " `a` INT NOT NULL,\n"
+ + " `b` BIGINT,\n"
+ + " `d` INT,\n"
+ + " `c` STRING)");
+ }
+
@Test
public void testRenameTable() {
// TODO: add test case for hive catalog table