You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@paimon.apache.org by lz...@apache.org on 2023/03/17 02:26:54 UTC
[incubator-paimon] branch master updated: [FLINK-31459] Add UPDATE COLUMN POSITION (#603)
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 14d87e4a [FLINK-31459] Add UPDATE COLUMN POSITION (#603)
14d87e4a is described below
commit 14d87e4a85e716771c59a5e2fbb840fd1852a0f6
Author: JunZhang <zh...@126.com>
AuthorDate: Fri Mar 17 10:26:49 2023 +0800
[FLINK-31459] Add UPDATE COLUMN POSITION (#603)
---
docs/content/docs/how-to/altering-tables.md | 20 ++++++++
.../table/store/file/schema/SchemaChange.java | 55 ++++++++++++++++++++
.../table/store/file/schema/SchemaManager.java | 33 ++++++++++++
.../flink/table/store/spark/SparkCatalog.java | 14 +++++
.../store/spark/SparkSchemaEvolutionITCase.java | 60 ++++++++++++++++++++++
5 files changed, 182 insertions(+)
diff --git a/docs/content/docs/how-to/altering-tables.md b/docs/content/docs/how-to/altering-tables.md
index 090860c0..7c8ade5c 100644
--- a/docs/content/docs/how-to/altering-tables.md
+++ b/docs/content/docs/how-to/altering-tables.md
@@ -202,3 +202,23 @@ ALTER TABLE my_table ALTER COLUMN buy_count COMMENT 'buy count';
{{< /tab >}}
{{< /tabs >}}
+
+
+
+## Changing Column Position
+
+To move an existing column to a new position, use `FIRST` or `AFTER col_name`.
+
+{{< tabs "change-column-position" >}}
+
+{{< tab "Spark3" >}}
+
+```sql
+ALTER TABLE my_table ALTER COLUMN col_a FIRST;
+
+ALTER TABLE my_table ALTER COLUMN col_a AFTER col_b;
+```
+
+{{< /tab >}}
+
+{{< /tabs >}}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaChange.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaChange.java
index d043da54..2d5083f1 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaChange.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaChange.java
@@ -70,6 +70,10 @@ public interface SchemaChange {
return new UpdateColumnComment(fieldNames, comment);
}
+ static SchemaChange updateColumnPosition(Move move) {
+ return new UpdateColumnPosition(move);
+ }
+
/** A SchemaChange to set a table option. */
final class SetOption implements SchemaChange {
private final String key;
@@ -291,6 +295,57 @@ public interface SchemaChange {
}
}
+ /** A SchemaChange to update the field position. */
+ final class UpdateColumnPosition implements SchemaChange {
+ private final Move move;
+
+ private UpdateColumnPosition(Move move) {
+ this.move = move;
+ }
+
+ public Move move() {
+ return move;
+ }
+ }
+
+ /** Represents a requested column move in a struct. */
+ class Move {
+ public enum MoveType {
+ FIRST,
+ AFTER
+ }
+
+ public static Move first(String fieldName) {
+ return new Move(fieldName, null, MoveType.FIRST);
+ }
+
+ public static Move after(String fieldName, String referenceFieldName) {
+ return new Move(fieldName, referenceFieldName, MoveType.AFTER);
+ }
+
+ private final String fieldName;
+ private final String referenceFieldName;
+ private final MoveType type;
+
+ public Move(String fieldName, String referenceFieldName, MoveType type) {
+ this.fieldName = fieldName;
+ this.referenceFieldName = referenceFieldName;
+ this.type = type;
+ }
+
+ public String fieldName() {
+ return fieldName;
+ }
+
+ public String referenceFieldName() {
+ return referenceFieldName;
+ }
+
+ public MoveType type() {
+ return type;
+ }
+ }
+
/** A SchemaChange to update the (nested) field nullability. */
final class UpdateColumnNullability implements SchemaChange {
private final String[] fieldNames;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
index 8ca5be98..dff6ba0f 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
@@ -29,6 +29,7 @@ import org.apache.flink.table.store.file.schema.SchemaChange.RenameColumn;
import org.apache.flink.table.store.file.schema.SchemaChange.SetOption;
import org.apache.flink.table.store.file.schema.SchemaChange.UpdateColumnComment;
import org.apache.flink.table.store.file.schema.SchemaChange.UpdateColumnNullability;
+import org.apache.flink.table.store.file.schema.SchemaChange.UpdateColumnPosition;
import org.apache.flink.table.store.file.schema.SchemaChange.UpdateColumnType;
import org.apache.flink.table.store.file.utils.JsonSerdeUtil;
import org.apache.flink.table.store.fs.FileIO;
@@ -310,6 +311,31 @@ public class SchemaManager implements Serializable {
field.name(),
field.type(),
update.newDescription()));
+ } else if (change instanceof UpdateColumnPosition) {
+ UpdateColumnPosition update = (UpdateColumnPosition) change;
+ SchemaChange.Move move = update.move();
+
+ // key: name ; value : index
+ Map<String, Integer> map = new HashMap<>();
+ for (int i = 0; i < newFields.size(); i++) {
+ map.put(newFields.get(i).name(), i);
+ }
+
+ int fieldIndex = map.get(move.fieldName());
+ int refIndex = 0;
+ if (move.type().equals(SchemaChange.Move.MoveType.FIRST)) {
+ checkMoveIndexEqual(move, fieldIndex, refIndex);
+ newFields.add(refIndex, newFields.remove(fieldIndex));
+ } else if (move.type().equals(SchemaChange.Move.MoveType.AFTER)) {
+ refIndex = map.get(move.referenceFieldName());
+ checkMoveIndexEqual(move, fieldIndex, refIndex);
+ if (fieldIndex > refIndex) {
+ newFields.add(refIndex + 1, newFields.remove(fieldIndex));
+ } else {
+ newFields.add(refIndex, newFields.remove(fieldIndex));
+ }
+ }
+
} else {
throw new UnsupportedOperationException(
"Unsupported change: " + change.getClass());
@@ -333,6 +359,13 @@ public class SchemaManager implements Serializable {
}
}
+ private static void checkMoveIndexEqual(SchemaChange.Move move, int fieldIndex, int refIndex) {
+ if (refIndex == fieldIndex) {
+ throw new UnsupportedOperationException(
+ String.format("Cannot move itself for column %s", move.fieldName()));
+ }
+ }
+
private void validateNotPrimaryAndPartitionKey(TableSchema schema, String fieldName) {
/// TODO support partition and primary keys schema evolution
if (schema.partitionKeys().contains(fieldName)) {
diff --git a/flink-table-store-spark/flink-table-store-spark-common/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java b/flink-table-store-spark/flink-table-store-spark-common/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
index 86a3b343..ab76c8fb 100644
--- a/flink-table-store-spark/flink-table-store-spark-common/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
+++ b/flink-table-store-spark/flink-table-store-spark-common/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
@@ -46,6 +46,7 @@ import org.apache.spark.sql.connector.catalog.TableChange.RenameColumn;
import org.apache.spark.sql.connector.catalog.TableChange.SetProperty;
import org.apache.spark.sql.connector.catalog.TableChange.UpdateColumnComment;
import org.apache.spark.sql.connector.catalog.TableChange.UpdateColumnNullability;
+import org.apache.spark.sql.connector.catalog.TableChange.UpdateColumnPosition;
import org.apache.spark.sql.connector.catalog.TableChange.UpdateColumnType;
import org.apache.spark.sql.connector.expressions.FieldReference;
import org.apache.spark.sql.connector.expressions.NamedReference;
@@ -288,6 +289,19 @@ public class SparkCatalog implements TableCatalog, SupportsNamespaces {
} else if (change instanceof UpdateColumnComment) {
UpdateColumnComment update = (UpdateColumnComment) change;
return SchemaChange.updateColumnComment(update.fieldNames(), update.newComment());
+ } else if (change instanceof UpdateColumnPosition) {
+ UpdateColumnPosition update = (UpdateColumnPosition) change;
+ TableChange.ColumnPosition columnPosition = update.position();
+ SchemaChange.Move move = null;
+ if (columnPosition instanceof TableChange.First) {
+ move = SchemaChange.Move.first(update.fieldNames()[0]);
+ } else if (columnPosition instanceof TableChange.After) {
+ move =
+ SchemaChange.Move.after(
+ update.fieldNames()[0],
+ ((TableChange.After) columnPosition).column());
+ }
+ return SchemaChange.updateColumnPosition(move);
} else {
throw new UnsupportedOperationException(
"Change is not supported: " + change.getClass());
diff --git a/flink-table-store-spark/flink-table-store-spark-common/src/test/java/org/apache/flink/table/store/spark/SparkSchemaEvolutionITCase.java b/flink-table-store-spark/flink-table-store-spark-common/src/test/java/org/apache/flink/table/store/spark/SparkSchemaEvolutionITCase.java
index 01032fb6..4b8f8a6f 100644
--- a/flink-table-store-spark/flink-table-store-spark-common/src/test/java/org/apache/flink/table/store/spark/SparkSchemaEvolutionITCase.java
+++ b/flink-table-store-spark/flink-table-store-spark-common/src/test/java/org/apache/flink/table/store/spark/SparkSchemaEvolutionITCase.java
@@ -360,6 +360,66 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
"b"));
}
+ @Test
+ public void testUpdateColumnPosition() {
+ // move first
+ spark.sql("CREATE TABLE tableFirst (a INT , b BIGINT, c STRING)");
+ spark.sql("ALTER TABLE tableFirst ALTER COLUMN b FIRST");
+ List<Row> result = spark.sql("SHOW CREATE TABLE tableFirst").collectAsList();
+ assertThat(result.toString())
+ .contains(
+ "CREATE TABLE tableFirst (\n"
+ + " `b` BIGINT,\n"
+ + " `a` INT,\n"
+ + " `c` STRING)");
+
+ // move after
+ spark.sql("CREATE TABLE tableAfter (a INT, b BIGINT, c STRING)");
+ spark.sql("ALTER TABLE tableAfter ALTER COLUMN c AFTER a");
+ result = spark.sql("SHOW CREATE TABLE tableAfter").collectAsList();
+ assertThat(result.toString())
+ .contains(
+ "CREATE TABLE tableAfter (\n"
+ + " `a` INT,\n"
+ + " `c` STRING,\n"
+ + " `b` BIGINT)");
+
+ spark.sql("CREATE TABLE tableAfter1 (a INT, b BIGINT, c STRING, d DOUBLE)");
+ spark.sql("ALTER TABLE tableAfter1 ALTER COLUMN b AFTER c");
+ result = spark.sql("SHOW CREATE TABLE tableAfter1").collectAsList();
+ assertThat(result.toString())
+ .contains(
+ "CREATE TABLE tableAfter1 (\n"
+ + " `a` INT,\n"
+ + " `c` STRING,\n"
+ + " `b` BIGINT,\n"
+ + " `d` DOUBLE)");
+
+ // move self for first test
+ spark.sql("CREATE TABLE tableFirstSelf (a INT , b BIGINT, c STRING)");
+ assertThatThrownBy(() -> spark.sql("ALTER TABLE tableFirstSelf ALTER COLUMN a FIRST"))
+ .getRootCause()
+ .isInstanceOf(UnsupportedOperationException.class)
+ .hasMessageContaining("Cannot move itself for column a");
+
+ // move self for after test
+ spark.sql("CREATE TABLE tableAfterSelf (a INT , b BIGINT, c STRING)");
+ assertThatThrownBy(() -> spark.sql("ALTER TABLE tableAfterSelf ALTER COLUMN b AFTER b"))
+ .getRootCause()
+ .isInstanceOf(UnsupportedOperationException.class)
+ .hasMessageContaining("Cannot move itself for column b");
+
+ // missing column
+ spark.sql("CREATE TABLE tableMissing (a INT , b BIGINT, c STRING)");
+ assertThatThrownBy(() -> spark.sql("ALTER TABLE tableMissing ALTER COLUMN d FIRST"))
+ .hasMessageContaining("Missing field d in table tablestore.default.tableMissing");
+
+ spark.sql("CREATE TABLE tableMissingAfter (a INT , b BIGINT, c STRING)");
+ assertThatThrownBy(() -> spark.sql("ALTER TABLE tableMissingAfter ALTER COLUMN a AFTER d"))
+ .hasMessageContaining(
+ "Missing field d in table tablestore.default.tableMissingAfter");
+ }
+
@Test
public void testAlterColumnType() {
createTable("testAlterColumnType");