You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sh...@apache.org on 2023/01/03 12:14:31 UTC
[flink] branch master updated: [FLINK-30496][table-api] Introduce TableChange to represent MODIFY change
This is an automated email from the ASF dual-hosted git repository.
shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new d5b8ad7672c [FLINK-30496][table-api] Introduce TableChange to represent MODIFY change
d5b8ad7672c is described below
commit d5b8ad7672cd981cdbc757549ed2ab1fcf007500
Author: Shengkai <10...@qq.com>
AuthorDate: Fri Dec 30 16:15:02 2022 +0800
[FLINK-30496][table-api] Introduce TableChange to represent MODIFY change
This closes #21577
---
.../hive/parse/HiveParserDDLSemanticAnalyzer.java | 39 +-
.../sql/parser/ddl/SqlAlterTableRenameColumn.java | 2 +-
.../operations/ddl/AlterTableChangeOperation.java | 43 ++
.../apache/flink/table/catalog/TableChange.java | 496 ++++++++++++++++++++-
.../planner/operations/AlterSchemaConverter.java | 279 +++++++-----
.../operations/SqlToOperationConverter.java | 20 +-
.../planner/utils/OperationConverterUtils.java | 85 +++-
.../operations/SqlToOperationConverterTest.java | 106 +++--
8 files changed, 891 insertions(+), 179 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
index b386a46b62b..79cef5614be 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
@@ -42,11 +42,14 @@ import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.CatalogView;
import org.apache.flink.table.catalog.CatalogViewImpl;
+import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ContextResolvedTable;
import org.apache.flink.table.catalog.FunctionCatalog;
import org.apache.flink.table.catalog.FunctionLanguage;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.TableChange;
import org.apache.flink.table.catalog.UnresolvedIdentifier;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
@@ -439,7 +442,7 @@ public class HiveParserDDLSemanticAnalyzer {
if (partSpecNode != null) {
partSpec = getPartSpec(partSpecNode);
}
- CatalogBaseTable alteredTable = getAlteredTable(tableName, false);
+ ResolvedCatalogBaseTable<?> alteredTable = getAlteredTable(tableName, false);
switch (ast.getType()) {
case HiveASTParser.TOK_ALTERTABLE_RENAME:
operation = convertAlterTableRename(tableName, ast, false);
@@ -1880,7 +1883,7 @@ public class HiveParserDDLSemanticAnalyzer {
}
private Operation convertAlterTableChangeCol(
- CatalogBaseTable alteredTable, String[] qualified, HiveParserASTNode ast)
+ ResolvedCatalogBaseTable<?> alteredTable, String[] qualified, HiveParserASTNode ast)
throws SemanticException {
String newComment = null;
boolean first = false;
@@ -1933,7 +1936,7 @@ public class HiveParserDDLSemanticAnalyzer {
String tblName = HiveParserBaseSemanticAnalyzer.getDotName(qualified);
ObjectIdentifier tableIdentifier = parseObjectIdentifier(tblName);
- CatalogTable oldTable = (CatalogTable) alteredTable;
+ ResolvedCatalogTable oldTable = (ResolvedCatalogTable) alteredTable;
String oldName = HiveParserBaseSemanticAnalyzer.unescapeIdentifier(oldColName);
String newName = HiveParserBaseSemanticAnalyzer.unescapeIdentifier(newColName);
@@ -1954,8 +1957,27 @@ public class HiveParserDDLSemanticAnalyzer {
if (isCascade) {
props.put(ALTER_COL_CASCADE, "true");
}
- return new AlterTableSchemaOperation(
+
+ List<TableChange> tableChanges =
+ OperationConverterUtils.buildModifyColumnChange(
+ oldTable.getResolvedSchema()
+ .getColumn(oldName)
+ .orElseThrow(
+ () ->
+ new ValidationException(
+ "Can not find the old column: "
+ + oldColName)),
+ Column.physical(newTableColumn.getName(), newTableColumn.getType())
+ .withComment(newComment),
+ first
+ ? TableChange.ColumnPosition.first()
+ : (flagCol == null
+ ? null
+ : TableChange.ColumnPosition.after(flagCol)));
+
+ return new AlterTableChangeOperation(
tableIdentifier,
+ tableChanges,
new CatalogTableImpl(
newSchema, oldTable.getPartitionKeys(), props, oldTable.getComment()));
}
@@ -1984,6 +2006,7 @@ public class HiveParserDDLSemanticAnalyzer {
final int numPartCol = oldTable.getPartitionKeys().size();
TableSchema.Builder builder = TableSchema.builder();
// add existing non-part col if we're not replacing
+ // TODO: support TableChange in FLINK-30497
if (!replace) {
List<TableColumn> nonPartCols =
oldSchema.getTableColumns().subList(0, oldSchema.getFieldCount() - numPartCol);
@@ -2146,9 +2169,9 @@ public class HiveParserDDLSemanticAnalyzer {
return new AlterViewPropertiesOperation(viewIdentifier, newView);
}
- private CatalogBaseTable getAlteredTable(String tableName, boolean expectView) {
+ private ResolvedCatalogBaseTable<?> getAlteredTable(String tableName, boolean expectView) {
ObjectIdentifier objectIdentifier = parseObjectIdentifier(tableName);
- CatalogBaseTable catalogBaseTable = getCatalogBaseTable(objectIdentifier);
+ ResolvedCatalogBaseTable<?> catalogBaseTable = getCatalogBaseTable(objectIdentifier);
if (expectView) {
if (catalogBaseTable instanceof CatalogTable) {
throw new ValidationException("ALTER VIEW for a table is not allowed");
@@ -2177,11 +2200,11 @@ public class HiveParserDDLSemanticAnalyzer {
return database;
}
- private CatalogBaseTable getCatalogBaseTable(ObjectIdentifier tableIdentifier) {
+ private ResolvedCatalogBaseTable<?> getCatalogBaseTable(ObjectIdentifier tableIdentifier) {
return getCatalogBaseTable(tableIdentifier, false);
}
- private CatalogBaseTable getCatalogBaseTable(
+ private ResolvedCatalogBaseTable<?> getCatalogBaseTable(
ObjectIdentifier tableIdentifier, boolean ifExists) {
Optional<ContextResolvedTable> optionalCatalogTable =
catalogManager.getTable(tableIdentifier);
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableRenameColumn.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableRenameColumn.java
index 28196183ff7..72aa3c1a067 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableRenameColumn.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableRenameColumn.java
@@ -50,7 +50,7 @@ public class SqlAlterTableRenameColumn extends SqlAlterTable {
tableIdentifier, originColumnIdentifier, newColumnIdentifier);
}
- public SqlIdentifier getOriginColumnIdentifier() {
+ public SqlIdentifier getOldColumnIdentifier() {
return originColumnIdentifier;
}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableChangeOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableChangeOperation.java
index fd7a89ee933..e8ef9519de1 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableChangeOperation.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableChangeOperation.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.operations.ddl;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.TableChange;
+import org.apache.flink.table.utils.EncodingUtils;
import java.util.Collections;
import java.util.List;
@@ -76,6 +77,48 @@ public class AlterTableChangeOperation extends AlterTableOperation {
TableChange.AddUniqueConstraint addUniqueConstraint =
(TableChange.AddUniqueConstraint) tableChange;
return String.format(" ADD %s", addUniqueConstraint.getConstraint());
+ } else if (tableChange instanceof TableChange.ModifyColumnComment) {
+ TableChange.ModifyColumnComment modifyColumnComment =
+ (TableChange.ModifyColumnComment) tableChange;
+ return String.format(
+ " MODIFY %s COMMENT '%s'",
+ EncodingUtils.escapeIdentifier(modifyColumnComment.getNewColumn().getName()),
+ modifyColumnComment.getNewComment());
+ } else if (tableChange instanceof TableChange.ModifyPhysicalColumnType) {
+ TableChange.ModifyPhysicalColumnType modifyPhysicalColumnType =
+ (TableChange.ModifyPhysicalColumnType) tableChange;
+ return String.format(
+ " MODIFY %s %s",
+ EncodingUtils.escapeIdentifier(
+ modifyPhysicalColumnType.getNewColumn().getName()),
+ modifyPhysicalColumnType.getNewType());
+ } else if (tableChange instanceof TableChange.ModifyColumnPosition) {
+ TableChange.ModifyColumnPosition modifyColumnPosition =
+ (TableChange.ModifyColumnPosition) tableChange;
+ return String.format(
+ " MODIFY %s %s",
+ EncodingUtils.escapeIdentifier(modifyColumnPosition.getNewColumn().getName()),
+ modifyColumnPosition.getNewPosition());
+ } else if (tableChange instanceof TableChange.ModifyColumnName) {
+ TableChange.ModifyColumnName modifyColumnName =
+ (TableChange.ModifyColumnName) tableChange;
+ return String.format(
+ " MODIFY %s TO %s",
+ EncodingUtils.escapeIdentifier(modifyColumnName.getOldColumnName()),
+ EncodingUtils.escapeIdentifier(modifyColumnName.getNewColumnName()));
+ } else if (tableChange instanceof TableChange.ModifyColumn) {
+ TableChange.ModifyColumn modifyColumn = (TableChange.ModifyColumn) tableChange;
+ return String.format(
+ " MODIFY %s %s",
+ modifyColumn.getNewColumn(),
+ modifyColumn.getNewPosition() == null ? "" : modifyColumn.getNewPosition());
+ } else if (tableChange instanceof TableChange.ModifyWatermark) {
+ TableChange.ModifyWatermark modifyWatermark = (TableChange.ModifyWatermark) tableChange;
+ return String.format(" MODIFY %s", modifyWatermark.getNewWatermark());
+ } else if (tableChange instanceof TableChange.ModifyUniqueConstraint) {
+ TableChange.ModifyUniqueConstraint modifyUniqueConstraint =
+ (TableChange.ModifyUniqueConstraint) tableChange;
+ return String.format(" MODIFY %s", modifyUniqueConstraint.getNewConstraint());
} else {
throw new UnsupportedOperationException(
String.format("Unknown table change: %s.", tableChange));
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/TableChange.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/TableChange.java
index 1a2b40d9134..240148836c8 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/TableChange.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/TableChange.java
@@ -19,7 +19,9 @@
package org.apache.flink.table.catalog;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.types.DataType;
import org.apache.flink.table.utils.EncodingUtils;
+import org.apache.flink.util.Preconditions;
import javax.annotation.Nullable;
@@ -51,7 +53,7 @@ public interface TableChange {
* <p>It is equal to the following statement:
*
* <pre>
- * ALTER TABLE <table_name> ADD <column_definition> <position>
+ * ALTER TABLE <table_name> ADD <column_definition> <column_position>
* </pre>
*
* @param column the added column definition.
@@ -68,7 +70,7 @@ public interface TableChange {
* <p>It is equal to the following statement:
*
* <pre>
- * ALTER TABLE <table_name> ADD PRIMARY KEY (<column_name>...) NOT ENFORCED;
+ * ALTER TABLE <table_name> ADD PRIMARY KEY (<column_name>...) NOT ENFORCED
* </pre>
*
* @param constraint the added constraint definition.
@@ -94,6 +96,139 @@ public interface TableChange {
return new AddWatermark(watermarkSpec);
}
+ /**
+ * A table change to modify a column. The modification includes:
+ *
+ * <ul>
+ * <li>change column data type
+ * <li>reorder column position
+ * <li>modify column comment
+ * <li>rename column name
+ * <li>change the computed expression
+ * <li>change the metadata column expression
+ * </ul>
+ *
+ * <p>Some fine-grained column changes are represented by the {@link
+ * TableChange#modifyPhysicalColumnType}, {@link TableChange#modifyColumnName}, {@link
+ * TableChange#modifyColumnComment} and {@link TableChange#modifyColumnPosition}.
+ *
+ * <p>It is equal to the following statement:
+ *
+ * <pre>
+ * ALTER TABLE <table_name> MODIFY <column_definition> COMMENT '<column_comment>' <column_position>
+ * </pre>
+ *
+ * @param oldColumn the definition of the old column.
+ * @param newColumn the definition of the new column.
+ * @param columnPosition the new position of the column.
+ * @return a TableChange represents the modification.
+ */
+ static ModifyColumn modify(
+ Column oldColumn, Column newColumn, @Nullable ColumnPosition columnPosition) {
+ return new ModifyColumn(oldColumn, newColumn, columnPosition);
+ }
+
+ /**
+ * A table change that modifies the physical column data type.
+ *
+ * <p>It is equal to the following statement:
+ *
+ * <pre>
+ * ALTER TABLE <table_name> MODIFY <column_name> <new_column_type>
+ * </pre>
+ *
+ * @param oldColumn the definition of the old column.
+ * @param newType the type of the new column.
+ * @return a TableChange represents the modification.
+ */
+ static ModifyPhysicalColumnType modifyPhysicalColumnType(Column oldColumn, DataType newType) {
+ return new ModifyPhysicalColumnType(oldColumn, newType);
+ }
+
+ /**
+ * A table change to modify the column name.
+ *
+ * <p>It is equal to the following statement:
+ *
+ * <pre>
+ * ALTER TABLE <table_name> RENAME <old_column_name> TO <new_column_name>
+ * </pre>
+ *
+ * @param oldColumn the definition of the old column.
+ * @param newName the name of the new column.
+ * @return a TableChange represents the modification.
+ */
+ static ModifyColumnName modifyColumnName(Column oldColumn, String newName) {
+ return new ModifyColumnName(oldColumn, newName);
+ }
+
+ /**
+ * A table change to modify the column comment.
+ *
+ * <p>It is equal to the following statement:
+ *
+ * <pre>
+ * ALTER TABLE <table_name> MODIFY <column_name> <original_column_type> COMMENT '<new_column_comment>'
+ * </pre>
+ *
+ * @param oldColumn the definition of the old column.
+ * @param newComment the modified comment.
+ * @return a TableChange represents the modification.
+ */
+ static ModifyColumnComment modifyColumnComment(Column oldColumn, String newComment) {
+ return new ModifyColumnComment(oldColumn, newComment);
+ }
+
+ /**
+ * A table change to modify the column position.
+ *
+ * <p>It is equal to the following statement:
+ *
+ * <pre>
+ * ALTER TABLE <table_name> MODIFY <column_name> <original_column_type> <column_position>
+ * </pre>
+ *
+ * @param oldColumn the definition of the old column.
+ * @param columnPosition the new position of the column.
+ * @return a TableChange represents the modification.
+ */
+ static ModifyColumnPosition modifyColumnPosition(
+ Column oldColumn, ColumnPosition columnPosition) {
+ return new ModifyColumnPosition(oldColumn, columnPosition);
+ }
+
+ /**
+ * A table change to modify a unique constraint.
+ *
+ * <p>It is equal to the following statement:
+ *
+ * <pre>
+ * ALTER TABLE <table_name> MODIFY PRIMARY KEY (<column_name>...) NOT ENFORCED
+ * </pre>
+ *
+ * @param newConstraint the modified constraint definition.
+ * @return a TableChange represents the modification.
+ */
+ static ModifyUniqueConstraint modify(UniqueConstraint newConstraint) {
+ return new ModifyUniqueConstraint(newConstraint);
+ }
+
+ /**
+ * A table change to modify a watermark.
+ *
+ * <p>It is equal to the following statement:
+ *
+ * <pre>
+ * ALTER TABLE <table_name> MODIFY WATERMARK FOR <row_time> AS <row_time_expression>
+ * </pre>
+ *
+ * @param newWatermarkSpec the modified watermark definition.
+ * @return a TableChange represents the modification.
+ */
+ static ModifyWatermark modify(WatermarkSpec newWatermarkSpec) {
+ return new ModifyWatermark(newWatermarkSpec);
+ }
+
/**
* A table change to set the table option.
*
@@ -137,7 +272,7 @@ public interface TableChange {
* <p>It is equal to the following statement:
*
* <pre>
- * ALTER TABLE <table_name> ADD <column_definition> <position>
+ * ALTER TABLE <table_name> ADD <column_definition> <column_position>
* </pre>
*/
@PublicEvolving
@@ -276,6 +411,361 @@ public interface TableChange {
}
}
+ // --------------------------------------------------------------------------------------------
+ // Modify Change
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * A base schema change to modify a column. The modification includes:
+ *
+ * <ul>
+ * <li>change column data type
+ * <li>reorder column position
+ * <li>modify column comment
+ * <li>rename column name
+ * <li>change the computed expression
+ * <li>change the metadata column expression
+ * </ul>
+ *
+ * <p>Some fine-grained column changes are defined in the {@link ModifyPhysicalColumnType},
+ * {@link ModifyColumnComment}, {@link ModifyColumnPosition} and {@link ModifyColumnName}.
+ *
+ * <p>It is equal to the following statement:
+ *
+ * <pre>
+ * ALTER TABLE <table_name> MODIFY <column_definition> COMMENT '<column_comment>' <column_position>
+ * </pre>
+ */
+ @PublicEvolving
+ class ModifyColumn implements TableChange {
+
+ protected final Column oldColumn;
+ protected final Column newColumn;
+
+ protected final @Nullable ColumnPosition newPosition;
+
+ public ModifyColumn(
+ Column oldColumn, Column newColumn, @Nullable ColumnPosition newPosition) {
+ this.oldColumn = oldColumn;
+ this.newColumn = newColumn;
+ this.newPosition = newPosition;
+ }
+
+ /** Returns the original {@link Column} instance. */
+ public Column getOldColumn() {
+ return oldColumn;
+ }
+
+ /** Returns the modified {@link Column} instance. */
+ public Column getNewColumn() {
+ return newColumn;
+ }
+
+ /**
+ * Returns the position of the modified {@link Column} instance. When the return value is
+ * null, it means modify the column at the original position. When the return value is
+ * FIRST, it means move the modified column to the first. When the return value is AFTER, it
+ * means move the column after the referred column.
+ */
+ public @Nullable ColumnPosition getNewPosition() {
+ return newPosition;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof ModifyColumn)) {
+ return false;
+ }
+ ModifyColumn that = (ModifyColumn) o;
+ return Objects.equals(oldColumn, that.oldColumn)
+ && Objects.equals(newColumn, that.newColumn)
+ && Objects.equals(newPosition, that.newPosition);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(oldColumn, newColumn, newPosition);
+ }
+
+ @Override
+ public String toString() {
+ return "ModifyColumn{"
+ + "oldColumn="
+ + oldColumn
+ + ", newColumn="
+ + newColumn
+ + ", newPosition="
+ + newPosition
+ + '}';
+ }
+ }
+
+ /**
+ * A table change to modify the column comment.
+ *
+ * <p>It is equal to the following statement:
+ *
+ * <pre>
+ * ALTER TABLE <table_name> MODIFY <column_name> <original_column_type> COMMENT '<new_column_comment>'
+ * </pre>
+ */
+ @PublicEvolving
+ class ModifyColumnComment extends ModifyColumn {
+
+ private final String newComment;
+
+ private ModifyColumnComment(Column oldColumn, String newComment) {
+ super(oldColumn, oldColumn.withComment(newComment), null);
+ this.newComment = newComment;
+ }
+
+ /** Get the new comment for the column. */
+ public String getNewComment() {
+ return newComment;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ return (o instanceof ModifyColumnComment) && super.equals(o);
+ }
+
+ @Override
+ public String toString() {
+ return "ModifyColumnComment{"
+ + "Column="
+ + oldColumn
+ + ", newComment='"
+ + newComment
+ + '\''
+ + '}';
+ }
+ }
+
+ /**
+ * A table change to modify the column position.
+ *
+ * <p>It is equal to the following statement:
+ *
+ * <pre>
+ * ALTER TABLE <table_name> MODIFY <column_name> <original_column_type> <column_position>
+ * </pre>
+ */
+ @PublicEvolving
+ class ModifyColumnPosition extends ModifyColumn {
+
+ public ModifyColumnPosition(Column oldColumn, ColumnPosition newPosition) {
+ super(oldColumn, oldColumn, newPosition);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ return (o instanceof ModifyColumnPosition) && super.equals(o);
+ }
+
+ @Override
+ public String toString() {
+ return "ModifyColumnPosition{"
+ + "Column="
+ + oldColumn
+ + ", newPosition="
+ + newPosition
+ + '}';
+ }
+ }
+
+ /**
+ * A table change that modifies the physical column data type.
+ *
+ * <p>It is equal to the following statement:
+ *
+ * <pre>
+ * ALTER TABLE <table_name> MODIFY <column_name> <new_column_type>
+ * </pre>
+ */
+ @PublicEvolving
+ class ModifyPhysicalColumnType extends ModifyColumn {
+
+ private ModifyPhysicalColumnType(Column oldColumn, DataType newType) {
+ super(oldColumn, oldColumn.copy(newType), null);
+ Preconditions.checkArgument(oldColumn.isPhysical());
+ }
+
+ /** Get the column type for the new column. */
+ public DataType getNewType() {
+ return newColumn.getDataType();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ return (o instanceof ModifyPhysicalColumnType) && super.equals(o);
+ }
+
+ @Override
+ public String toString() {
+ return "ModifyPhysicalColumnType{"
+ + "Column="
+ + oldColumn
+ + ", newType="
+ + getNewType()
+ + '}';
+ }
+ }
+
+ /**
+ * A table change to modify the column name.
+ *
+ * <p>It is equal to the following statement:
+ *
+ * <pre>
+ * ALTER TABLE <table_name> RENAME <old_column_name> TO <new_column_name>
+ * </pre>
+ */
+ @PublicEvolving
+ class ModifyColumnName extends ModifyColumn {
+
+ private ModifyColumnName(Column oldColumn, String newName) {
+ super(oldColumn, createNewColumn(oldColumn, newName), null);
+ }
+
+ private static Column createNewColumn(Column oldColumn, String newName) {
+ if (oldColumn instanceof Column.PhysicalColumn) {
+ return Column.physical(newName, oldColumn.getDataType())
+ .withComment(oldColumn.comment);
+ } else if (oldColumn instanceof Column.MetadataColumn) {
+ Column.MetadataColumn metadataColumn = (Column.MetadataColumn) oldColumn;
+ return Column.metadata(
+ newName,
+ oldColumn.getDataType(),
+ metadataColumn.getMetadataKey().orElse(null),
+ metadataColumn.isVirtual())
+ .withComment(oldColumn.comment);
+ } else {
+ return Column.computed(newName, ((Column.ComputedColumn) oldColumn).getExpression())
+ .withComment(oldColumn.comment);
+ }
+ }
+
+ /** Returns the old column name. */
+ public String getOldColumnName() {
+ return oldColumn.getName();
+ }
+
+ /** Returns the new column name after renaming the column name. */
+ public String getNewColumnName() {
+ return newColumn.getName();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ return (o instanceof ModifyColumnName) && super.equals(o);
+ }
+
+ @Override
+ public String toString() {
+ return "ModifyColumnName{"
+ + "Column="
+ + oldColumn
+ + ", newName="
+ + getNewColumnName()
+ + '}';
+ }
+ }
+
+ /**
+ * A table change to modify a unique constraint.
+ *
+ * <p>It is equal to the following statement:
+ *
+ * <pre>
+ * ALTER TABLE <table_name> MODIFY PRIMARY KEY (<column_name> ...) NOT ENFORCED
+ * </pre>
+ */
+ @PublicEvolving
+ class ModifyUniqueConstraint implements TableChange {
+
+ private final UniqueConstraint newConstraint;
+
+ public ModifyUniqueConstraint(UniqueConstraint newConstraint) {
+ this.newConstraint = newConstraint;
+ }
+
+ /** Returns the modified unique constraint. */
+ public UniqueConstraint getNewConstraint() {
+ return newConstraint;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof ModifyUniqueConstraint)) {
+ return false;
+ }
+ ModifyUniqueConstraint that = (ModifyUniqueConstraint) o;
+ return Objects.equals(newConstraint, that.newConstraint);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(newConstraint);
+ }
+
+ @Override
+ public String toString() {
+ return "ModifyUniqueConstraint{" + "newConstraint=" + newConstraint + '}';
+ }
+ }
+
+ /**
+ * A table change to modify the watermark.
+ *
+ * <p>It is equal to the following statement:
+ *
+ * <pre>
+ * ALTER TABLE <table_name> MODIFY WATERMARK FOR <row_time_column_name> AS <watermark_expression>
+ * </pre>
+ */
+ @PublicEvolving
+ class ModifyWatermark implements TableChange {
+
+ private final WatermarkSpec newWatermark;
+
+ public ModifyWatermark(WatermarkSpec newWatermark) {
+ this.newWatermark = newWatermark;
+ }
+
+ /** Returns the modified watermark. */
+ public WatermarkSpec getNewWatermark() {
+ return newWatermark;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof ModifyWatermark)) {
+ return false;
+ }
+ ModifyWatermark that = (ModifyWatermark) o;
+ return Objects.equals(newWatermark, that.newWatermark);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(newWatermark);
+ }
+
+ @Override
+ public String toString() {
+ return "ModifyWatermark{" + "newWatermark=" + newWatermark + '}';
+ }
+ }
+
// --------------------------------------------------------------------------------------------
// Property change
// --------------------------------------------------------------------------------------------
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java
index b0b5e65fdf1..193e47c168e 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java
@@ -43,12 +43,12 @@ import org.apache.flink.table.catalog.WatermarkSpec;
import org.apache.flink.table.expressions.SqlCallExpression;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.planner.expressions.ColumnReferenceFinder;
+import org.apache.flink.table.planner.utils.OperationConverterUtils;
import org.apache.flink.table.types.AbstractDataType;
import org.apache.flink.table.types.DataType;
import org.apache.flink.util.Preconditions;
import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.sql.SqlCharStringLiteral;
import org.apache.calcite.sql.SqlDataTypeSpec;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlNode;
@@ -72,6 +72,7 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import static org.apache.flink.table.planner.calcite.FlinkTypeFactory.toLogicalType;
+import static org.apache.flink.table.planner.utils.OperationConverterUtils.buildModifyColumnChange;
import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;
/**
@@ -104,13 +105,13 @@ public class AlterSchemaConverter {
*/
public Schema applySchemaChange(
SqlAlterTableSchema alterTableSchema,
- Schema originSchema,
+ ResolvedCatalogTable oldTable,
List<TableChange> tableChangeCollector) {
AlterSchemaStrategy strategy = computeAlterSchemaStrategy(alterTableSchema);
SchemaConverter converter =
strategy == AlterSchemaStrategy.ADD
? new AddSchemaConverter(
- originSchema,
+ oldTable.getUnresolvedSchema(),
(FlinkTypeFactory) sqlValidator.getTypeFactory(),
sqlValidator,
constraintValidator,
@@ -118,7 +119,7 @@ public class AlterSchemaConverter {
schemaResolver,
tableChangeCollector)
: new ModifySchemaConverter(
- originSchema,
+ oldTable,
(FlinkTypeFactory) sqlValidator.getTypeFactory(),
sqlValidator,
constraintValidator,
@@ -133,41 +134,47 @@ public class AlterSchemaConverter {
/** Convert ALTER TABLE RENAME col_name to new_col_name to generate an updated Schema. */
public Schema applySchemaChange(
- SqlAlterTableRenameColumn renameColumn, ResolvedCatalogTable originTable) {
- String originColumnName = getColumnName(renameColumn.getOriginColumnIdentifier());
+ SqlAlterTableRenameColumn renameColumn,
+ ResolvedCatalogTable oldTable,
+ List<TableChange> tableChangeCollector) {
+ String oldColumnName = getColumnName(renameColumn.getOldColumnIdentifier());
String newColumnName = getColumnName(renameColumn.getNewColumnIdentifier());
- // validate origin column is exists, new column name does not collide with existed column
- // names, and origin column isn't referenced by computed column
+ // validate that the old column exists, the new column name does not collide with existing
+ // column names, and the old column isn't referenced by any computed column
validateColumnName(
- originColumnName,
+ oldColumnName,
newColumnName,
- originTable.getResolvedSchema(),
- originTable.getPartitionKeys());
- validateWatermark(originTable, originColumnName);
+ oldTable.getResolvedSchema(),
+ oldTable.getPartitionKeys());
+ validateWatermark(oldTable, oldColumnName);
// generate new schema
Schema.Builder schemaBuilder = Schema.newBuilder();
buildUpdatedColumn(
schemaBuilder,
- originTable,
+ oldTable,
(builder, column) -> {
- if (column.getName().equals(originColumnName)) {
- buildNewColumnFromOriginColumn(builder, column, newColumnName);
+ if (column.getName().equals(oldColumnName)) {
+ buildNewColumnFromOldColumn(builder, column, newColumnName);
+ tableChangeCollector.add(
+ TableChange.modifyColumnName(
+ unwrap(
+ oldTable.getResolvedSchema()
+ .getColumn(oldColumnName)),
+ newColumnName));
} else {
builder.fromColumns(Collections.singletonList(column));
}
});
buildUpdatedPrimaryKey(
- schemaBuilder,
- originTable,
- (pk) -> pk.equals(originColumnName) ? newColumnName : pk);
- buildUpdatedWatermark(schemaBuilder, originTable);
+ schemaBuilder, oldTable, (pk) -> pk.equals(oldColumnName) ? newColumnName : pk);
+ buildUpdatedWatermark(schemaBuilder, oldTable);
return schemaBuilder.build();
}
/** Convert ALTER TABLE DROP (col1 [, col2, ...]) to generate an updated Schema. */
public Schema applySchemaChange(
- SqlAlterTableDropColumn dropColumn, ResolvedCatalogTable originTable) {
+ SqlAlterTableDropColumn dropColumn, ResolvedCatalogTable oldTable) {
Set<String> columnsToDrop = new HashSet<>();
dropColumn
.getColumnList()
@@ -188,28 +195,28 @@ public class AlterSchemaConverter {
// does not derive any computed column
validateColumnName(
columnToDrop,
- originTable.getResolvedSchema(),
- originTable.getPartitionKeys(),
+ oldTable.getResolvedSchema(),
+ oldTable.getPartitionKeys(),
columnsToDrop);
- validateWatermark(originTable, columnToDrop);
+ validateWatermark(oldTable, columnToDrop);
}
buildUpdatedColumn(
schemaBuilder,
- originTable,
+ oldTable,
(builder, column) -> {
if (!columnsToDrop.contains(column.getName())) {
builder.fromColumns(Collections.singletonList(column));
}
});
- buildUpdatedPrimaryKey(schemaBuilder, originTable, Function.identity());
- buildUpdatedWatermark(schemaBuilder, originTable);
+ buildUpdatedPrimaryKey(schemaBuilder, oldTable, Function.identity());
+ buildUpdatedWatermark(schemaBuilder, oldTable);
return schemaBuilder.build();
}
/** Convert ALTER TABLE DROP PRIMARY KEY to generate an updated Schema. */
public Schema applySchemaChange(
- SqlAlterTableDropPrimaryKey dropPrimaryKey, ResolvedCatalogTable originTable) {
- Optional<UniqueConstraint> pkConstraint = originTable.getResolvedSchema().getPrimaryKey();
+ SqlAlterTableDropPrimaryKey dropPrimaryKey, ResolvedCatalogTable oldTable) {
+ Optional<UniqueConstraint> pkConstraint = oldTable.getResolvedSchema().getPrimaryKey();
if (!pkConstraint.isPresent()) {
throw new ValidationException(
String.format(
@@ -218,9 +225,9 @@ public class AlterSchemaConverter {
Schema.Builder schemaBuilder = Schema.newBuilder();
buildUpdatedColumn(
schemaBuilder,
- originTable,
+ oldTable,
(builder, column) -> builder.fromColumns(Collections.singletonList(column)));
- buildUpdatedWatermark(schemaBuilder, originTable);
+ buildUpdatedWatermark(schemaBuilder, oldTable);
return schemaBuilder.build();
}
@@ -228,8 +235,8 @@ public class AlterSchemaConverter {
* Convert ALTER TABLE DROP CONSTRAINT constraint_name to generate an updated {@link Schema}.
*/
public Schema applySchemaChange(
- SqlAlterTableDropConstraint dropConstraint, ResolvedCatalogTable originTable) {
- Optional<UniqueConstraint> pkConstraint = originTable.getResolvedSchema().getPrimaryKey();
+ SqlAlterTableDropConstraint dropConstraint, ResolvedCatalogTable oldTable) {
+ Optional<UniqueConstraint> pkConstraint = oldTable.getResolvedSchema().getPrimaryKey();
if (!pkConstraint.isPresent()) {
throw new ValidationException(
String.format(
@@ -248,16 +255,16 @@ public class AlterSchemaConverter {
Schema.Builder schemaBuilder = Schema.newBuilder();
buildUpdatedColumn(
schemaBuilder,
- originTable,
+ oldTable,
(builder, column) -> builder.fromColumns(Collections.singletonList(column)));
- buildUpdatedWatermark(schemaBuilder, originTable);
+ buildUpdatedWatermark(schemaBuilder, oldTable);
return schemaBuilder.build();
}
/** Convert ALTER TABLE DROP WATERMARK to generate an updated {@link Schema}. */
public Schema applySchemaChange(
- SqlAlterTableDropWatermark dropWatermark, ResolvedCatalogTable originTable) {
- if (originTable.getResolvedSchema().getWatermarkSpecs().isEmpty()) {
+ SqlAlterTableDropWatermark dropWatermark, ResolvedCatalogTable oldTable) {
+ if (oldTable.getResolvedSchema().getWatermarkSpecs().isEmpty()) {
throw new ValidationException(
String.format(
"%sThe base table does not define any watermark strategy.",
@@ -266,9 +273,9 @@ public class AlterSchemaConverter {
Schema.Builder schemaBuilder = Schema.newBuilder();
buildUpdatedColumn(
schemaBuilder,
- originTable,
+ oldTable,
(builder, column) -> builder.fromColumns(Collections.singletonList(column)));
- buildUpdatedPrimaryKey(schemaBuilder, originTable, Function.identity());
+ buildUpdatedPrimaryKey(schemaBuilder, oldTable, Function.identity());
return schemaBuilder.build();
}
@@ -289,10 +296,10 @@ public class AlterSchemaConverter {
SchemaResolver schemaResolver;
List<TableChange> changesCollector;
- List<Function<ResolvedSchema, TableChange>> changeBuilders = new ArrayList<>();
+ List<Function<ResolvedSchema, List<TableChange>>> changeBuilders = new ArrayList<>();
SchemaConverter(
- Schema originSchema,
+ Schema oldSchema,
FlinkTypeFactory typeFactory,
SqlValidator sqlValidator,
Consumer<SqlTableConstraint> constraintValidator,
@@ -305,13 +312,13 @@ public class AlterSchemaConverter {
this.escapeExpressions = escapeExpressions;
this.schemaResolver = schemaResolver;
this.changesCollector = changesCollector;
- populateColumnsFromSourceTable(originSchema);
- populatePrimaryKeyFromSourceTable(originSchema);
- populateWatermarkFromSourceTable(originSchema);
+ populateColumnsFromSourceTable(oldSchema);
+ populatePrimaryKeyFromSourceTable(oldSchema);
+ populateWatermarkFromSourceTable(oldSchema);
}
- private void populateColumnsFromSourceTable(Schema originSchema) {
- originSchema
+ private void populateColumnsFromSourceTable(Schema oldSchema) {
+ oldSchema
.getColumns()
.forEach(
column -> {
@@ -321,15 +328,15 @@ public class AlterSchemaConverter {
});
}
- private void populatePrimaryKeyFromSourceTable(Schema originSchema) {
- if (originSchema.getPrimaryKey().isPresent()) {
- primaryKey = originSchema.getPrimaryKey().get();
+ private void populatePrimaryKeyFromSourceTable(Schema oldSchema) {
+ if (oldSchema.getPrimaryKey().isPresent()) {
+ primaryKey = oldSchema.getPrimaryKey().get();
}
}
- private void populateWatermarkFromSourceTable(Schema originSchema) {
+ private void populateWatermarkFromSourceTable(Schema oldSchema) {
for (Schema.UnresolvedWatermarkSpec sourceWatermarkSpec :
- originSchema.getWatermarkSpecs()) {
+ oldSchema.getWatermarkSpecs()) {
watermarkSpec = sourceWatermarkSpec;
}
}
@@ -379,14 +386,12 @@ public class AlterSchemaConverter {
private void updatePrimaryKeyNullability(String columnName) {
Schema.UnresolvedColumn column = columns.get(columnName);
if (column instanceof Schema.UnresolvedPhysicalColumn) {
- AbstractDataType<?> originType =
+ AbstractDataType<?> oldType =
((Schema.UnresolvedPhysicalColumn) column).getDataType();
columns.put(
columnName,
new Schema.UnresolvedPhysicalColumn(
- columnName,
- originType.notNull(),
- column.getComment().orElse(null)));
+ columnName, oldType.notNull(), column.getComment().orElse(null)));
}
}
@@ -442,10 +447,7 @@ public class AlterSchemaConverter {
@Nullable
String getComment(SqlTableColumn column) {
- return column.getComment()
- .map(SqlCharStringLiteral.class::cast)
- .map(c -> c.getValueAs(String.class))
- .orElse(null);
+ return OperationConverterUtils.getComment(column);
}
private void applyColumnPosition(List<SqlNode> alterColumns) {
@@ -511,7 +513,9 @@ public class AlterSchemaConverter {
ResolvedSchema resolvedSchema = schemaResolver.resolve(updatedSchema);
changesCollector.addAll(
changeBuilders.stream()
- .map(changeBuilder -> changeBuilder.apply(resolvedSchema))
+ .flatMap(
+ changeBuilder ->
+ changeBuilder.apply(resolvedSchema).stream())
.collect(Collectors.toList()));
return updatedSchema;
} catch (Exception e) {
@@ -528,10 +532,10 @@ public class AlterSchemaConverter {
abstract void checkAndCollectWatermarkChange();
}
- private static class AddSchemaConverter extends SchemaConverter {
+ private class AddSchemaConverter extends SchemaConverter {
AddSchemaConverter(
- Schema originSchema,
+ Schema oldSchema,
FlinkTypeFactory typeFactory,
SqlValidator sqlValidator,
Consumer<SqlTableConstraint> constraintValidator,
@@ -539,7 +543,7 @@ public class AlterSchemaConverter {
SchemaResolver schemaResolver,
List<TableChange> changeCollector) {
super(
- originSchema,
+ oldSchema,
typeFactory,
sqlValidator,
constraintValidator,
@@ -559,7 +563,10 @@ public class AlterSchemaConverter {
primaryKey.getColumnNames().stream()
.collect(Collectors.joining("`, `", "[`", "`]"))));
}
- changeBuilders.add(schema -> TableChange.add(unwrap(schema.getPrimaryKey())));
+ changeBuilders.add(
+ schema ->
+ Collections.singletonList(
+ TableChange.add(unwrap(schema.getPrimaryKey()))));
}
@Override
@@ -574,7 +581,10 @@ public class AlterSchemaConverter {
((SqlCallExpression) watermarkSpec.getWatermarkExpression())
.getSqlExpression()));
}
- changeBuilders.add(schema -> TableChange.add(schema.getWatermarkSpecs().get(0)));
+ changeBuilders.add(
+ schema ->
+ Collections.singletonList(
+ TableChange.add(schema.getWatermarkSpecs().get(0))));
}
@Override
@@ -590,29 +600,36 @@ public class AlterSchemaConverter {
if (columnPosition.isFirstColumn()) {
changeBuilders.add(
schema ->
- TableChange.add(
- unwrap(schema.getColumn(columnName)),
- TableChange.ColumnPosition.first()));
+ Collections.singletonList(
+ TableChange.add(
+ unwrap(schema.getColumn(columnName)),
+ TableChange.ColumnPosition.first())));
sortedColumnNames.add(0, columnName);
} else if (columnPosition.isAfterReferencedColumn()) {
String referenceName = getReferencedColumn(columnPosition);
sortedColumnNames.add(sortedColumnNames.indexOf(referenceName) + 1, columnName);
changeBuilders.add(
schema ->
- TableChange.add(
- unwrap(schema.getColumn(columnName)),
- TableChange.ColumnPosition.after(referenceName)));
+ Collections.singletonList(
+ TableChange.add(
+ unwrap(schema.getColumn(columnName)),
+ TableChange.ColumnPosition.after(referenceName))));
} else {
- changeBuilders.add(schema -> TableChange.add(unwrap(schema.getColumn(columnName))));
+ changeBuilders.add(
+ schema ->
+ Collections.singletonList(
+ TableChange.add(unwrap(schema.getColumn(columnName)))));
sortedColumnNames.add(columnName);
}
}
}
- private static class ModifySchemaConverter extends SchemaConverter {
+ private class ModifySchemaConverter extends SchemaConverter {
+
+ private final ResolvedCatalogTable oldTable;
ModifySchemaConverter(
- Schema originSchema,
+ ResolvedCatalogTable oldTable,
FlinkTypeFactory typeFactory,
SqlValidator sqlValidator,
Consumer<SqlTableConstraint> constraintValidator,
@@ -620,13 +637,14 @@ public class AlterSchemaConverter {
SchemaResolver schemaResolver,
List<TableChange> tableChangeCollector) {
super(
- originSchema,
+ oldTable.getUnresolvedSchema(),
typeFactory,
sqlValidator,
constraintValidator,
escapeExpressions,
schemaResolver,
tableChangeCollector);
+ this.oldTable = oldTable;
}
@Override
@@ -639,13 +657,33 @@ public class AlterSchemaConverter {
EX_MSG_PREFIX, columnName));
}
+ Column oldColumn = unwrap(oldTable.getResolvedSchema().getColumn(columnName));
if (columnPosition.isFirstColumn()) {
sortedColumnNames.remove(columnName);
sortedColumnNames.add(0, columnName);
+
+ changeBuilders.add(
+ schema ->
+ buildModifyColumnChange(
+ oldColumn,
+ unwrap(schema.getColumn(columnName)),
+ TableChange.ColumnPosition.first()));
} else if (columnPosition.isAfterReferencedColumn()) {
String referenceName = getReferencedColumn(columnPosition);
sortedColumnNames.remove(columnName);
sortedColumnNames.add(sortedColumnNames.indexOf(referenceName) + 1, columnName);
+
+ changeBuilders.add(
+ schema ->
+ buildModifyColumnChange(
+ oldColumn,
+ unwrap(schema.getColumn(columnName)),
+ TableChange.ColumnPosition.after(referenceName)));
+ } else {
+ changeBuilders.add(
+ schema ->
+ buildModifyColumnChange(
+ oldColumn, unwrap(schema.getColumn(columnName)), null));
}
}
@@ -658,6 +696,10 @@ public class AlterSchemaConverter {
+ "want to add a new one.",
EX_MSG_PREFIX));
}
+ changeBuilders.add(
+ schema ->
+ Collections.singletonList(
+ TableChange.modify(unwrap(schema.getPrimaryKey()))));
}
@Override
@@ -669,6 +711,10 @@ public class AlterSchemaConverter {
+ "want to add a new one.",
EX_MSG_PREFIX));
}
+ changeBuilders.add(
+ schema ->
+ Collections.singletonList(
+ TableChange.modify(schema.getWatermarkSpecs().get(0))));
}
@Nullable
@@ -685,18 +731,18 @@ public class AlterSchemaConverter {
// --------------------------------------------------------------------------------------------
private void validateColumnName(
- String originColumnName,
+ String oldColumnName,
String newColumnName,
- ResolvedSchema originSchemas,
+ ResolvedSchema oldSchema,
List<String> partitionKeys) {
validateColumnName(
- originColumnName,
- originSchemas,
+ oldColumnName,
+ oldSchema,
partitionKeys,
// fail the operation of renaming column, once the column derives a computed column
- (referencedColumn, computedColumn) -> referencedColumn.contains(originColumnName));
+ (referencedColumn, computedColumn) -> referencedColumn.contains(oldColumnName));
// validate new column
- if (originSchemas.getColumn(newColumnName).isPresent()) {
+ if (oldSchema.getColumn(newColumnName).isPresent()) {
throw new ValidationException(
String.format(
"%sThe column `%s` already existed in table schema.",
@@ -706,19 +752,19 @@ public class AlterSchemaConverter {
private void validateColumnName(
String columnToDrop,
- ResolvedSchema originSchema,
+ ResolvedSchema oldSchema,
List<String> partitionKeys,
Set<String> columnsToDrop) {
validateColumnName(
columnToDrop,
- originSchema,
+ oldSchema,
partitionKeys,
// fail the operation of dropping column, only if the column derives a computed
- // column, and the computed column is not being dropped along with the origin column
+ // column, and the computed column is not being dropped along with the old column
(referencedColumn, computedColumn) ->
referencedColumn.contains(columnToDrop)
&& !columnsToDrop.contains(computedColumn.getName()));
- originSchema
+ oldSchema
.getPrimaryKey()
.ifPresent(
pk -> {
@@ -733,11 +779,11 @@ public class AlterSchemaConverter {
private void validateColumnName(
String columnToAlter,
- ResolvedSchema originSchema,
+ ResolvedSchema oldSchema,
List<String> partitionKeys,
BiFunction<Set<String>, Column.ComputedColumn, Boolean> computedColumnChecker) {
- // validate origin column
- Set<String> tableColumns = new HashSet<>(originSchema.getColumnNames());
+ // validate old column
+ Set<String> tableColumns = new HashSet<>(oldSchema.getColumnNames());
if (!tableColumns.contains(columnToAlter)) {
throw new ValidationException(
String.format(
@@ -745,15 +791,15 @@ public class AlterSchemaConverter {
EX_MSG_PREFIX, columnToAlter));
}
- // validate origin column name isn't referred by computed column case
- originSchema.getColumns().stream()
+ // validate old column name isn't referred by computed column case
+ oldSchema.getColumns().stream()
.filter(column -> column instanceof Column.ComputedColumn)
.forEach(
column -> {
Column.ComputedColumn computedColumn = (Column.ComputedColumn) column;
Set<String> referencedColumn =
ColumnReferenceFinder.findReferencedColumn(
- computedColumn.getName(), originSchema);
+ computedColumn.getName(), oldSchema);
if (computedColumnChecker.apply(referencedColumn, computedColumn)) {
throw new ValidationException(
String.format(
@@ -763,7 +809,7 @@ public class AlterSchemaConverter {
computedColumn.asSummaryString()));
}
});
- // validate partition keys doesn't contain the origin column
+ // validate partition keys doesn't contain the old column
if (partitionKeys.contains(columnToAlter)) {
throw new ValidationException(
String.format(
@@ -772,14 +818,13 @@ public class AlterSchemaConverter {
}
}
- private void validateWatermark(ResolvedCatalogTable originTable, String columnToAlter) {
- // validate origin column isn't referenced by watermark
- List<WatermarkSpec> watermarkSpecs = originTable.getResolvedSchema().getWatermarkSpecs();
+ private void validateWatermark(ResolvedCatalogTable oldTable, String columnToAlter) {
+ // validate old column isn't referenced by watermark
+ List<WatermarkSpec> watermarkSpecs = oldTable.getResolvedSchema().getWatermarkSpecs();
Set<String> referencedColumns =
- ColumnReferenceFinder.findWatermarkReferencedColumn(
- originTable.getResolvedSchema());
+ ColumnReferenceFinder.findWatermarkReferencedColumn(oldTable.getResolvedSchema());
Set<String> rowtimeAttributes =
- originTable.getResolvedSchema().getWatermarkSpecs().stream()
+ oldTable.getResolvedSchema().getWatermarkSpecs().stream()
.map(WatermarkSpec::getRowtimeAttribute)
.collect(Collectors.toSet());
if (rowtimeAttributes.contains(columnToAlter)
@@ -793,37 +838,34 @@ public class AlterSchemaConverter {
private void buildUpdatedColumn(
Schema.Builder builder,
- ResolvedCatalogTable originTable,
+ ResolvedCatalogTable oldTable,
BiConsumer<Schema.Builder, Schema.UnresolvedColumn> columnConsumer) {
// build column
- originTable
- .getUnresolvedSchema()
+ oldTable.getUnresolvedSchema()
.getColumns()
.forEach(column -> columnConsumer.accept(builder, column));
}
private void buildUpdatedPrimaryKey(
Schema.Builder builder,
- ResolvedCatalogTable originTable,
+ ResolvedCatalogTable oldTable,
Function<String, String> columnRenamer) {
- originTable
- .getUnresolvedSchema()
+ oldTable.getUnresolvedSchema()
.getPrimaryKey()
.ifPresent(
pk -> {
- List<String> originPrimaryKeyNames = pk.getColumnNames();
+ List<String> oldPrimaryKeyNames = pk.getColumnNames();
String constrainName = pk.getConstraintName();
List<String> newPrimaryKeyNames =
- originPrimaryKeyNames.stream()
+ oldPrimaryKeyNames.stream()
.map(columnRenamer)
.collect(Collectors.toList());
builder.primaryKeyNamed(constrainName, newPrimaryKeyNames);
});
}
- private void buildUpdatedWatermark(Schema.Builder builder, ResolvedCatalogTable originTable) {
- originTable
- .getUnresolvedSchema()
+ private void buildUpdatedWatermark(Schema.Builder builder, ResolvedCatalogTable oldTable) {
+ oldTable.getUnresolvedSchema()
.getWatermarkSpecs()
.forEach(
watermarkSpec ->
@@ -832,24 +874,23 @@ public class AlterSchemaConverter {
watermarkSpec.getWatermarkExpression()));
}
- private void buildNewColumnFromOriginColumn(
- Schema.Builder builder, Schema.UnresolvedColumn originColumn, String columnName) {
- if (originColumn instanceof Schema.UnresolvedComputedColumn) {
+ private void buildNewColumnFromOldColumn(
+ Schema.Builder builder, Schema.UnresolvedColumn oldColumn, String columnName) {
+ if (oldColumn instanceof Schema.UnresolvedComputedColumn) {
builder.columnByExpression(
- columnName, ((Schema.UnresolvedComputedColumn) originColumn).getExpression());
- } else if (originColumn instanceof Schema.UnresolvedPhysicalColumn) {
- builder.column(
- columnName, ((Schema.UnresolvedPhysicalColumn) originColumn).getDataType());
- } else if (originColumn instanceof Schema.UnresolvedMetadataColumn) {
+ columnName, ((Schema.UnresolvedComputedColumn) oldColumn).getExpression());
+ } else if (oldColumn instanceof Schema.UnresolvedPhysicalColumn) {
+ builder.column(columnName, ((Schema.UnresolvedPhysicalColumn) oldColumn).getDataType());
+ } else if (oldColumn instanceof Schema.UnresolvedMetadataColumn) {
Schema.UnresolvedMetadataColumn metadataColumn =
- (Schema.UnresolvedMetadataColumn) originColumn;
+ (Schema.UnresolvedMetadataColumn) oldColumn;
builder.columnByMetadata(
columnName,
metadataColumn.getDataType(),
metadataColumn.getMetadataKey(),
metadataColumn.isVirtual());
}
- originColumn.getComment().ifPresent(builder::withComment);
+ oldColumn.getComment().ifPresent(builder::withComment);
}
private static String getColumnName(SqlIdentifier identifier) {
@@ -874,7 +915,7 @@ public class AlterSchemaConverter {
alterTableSchema.getClass().getCanonicalName()));
}
- private static <T> T unwrap(Optional<T> value) {
+ private <T> T unwrap(Optional<T> value) {
return value.orElseThrow(() -> new TableException("The value should never be empty."));
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
index acb46f60fc9..bd5aa2ec984 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
@@ -554,16 +554,19 @@ public class SqlToOperationConverter {
return OperationConverterUtils.convertChangeColumn(
tableIdentifier,
(SqlChangeColumn) sqlAlterTable,
- (CatalogTable) baseTable,
+ (ResolvedCatalogTable) baseTable,
flinkPlanner.getOrCreateSqlValidator());
} else if (sqlAlterTable instanceof SqlAlterTableRenameColumn) {
SqlAlterTableRenameColumn sqlAlterTableRenameColumn =
(SqlAlterTableRenameColumn) sqlAlterTable;
+ ResolvedCatalogTable baseCatalogTable = (ResolvedCatalogTable) baseTable;
+ List<TableChange> tableChanges = new ArrayList<>();
Schema newSchema =
alterSchemaConverter.applySchemaChange(
- sqlAlterTableRenameColumn, resolvedCatalogTable);
- return new AlterTableSchemaOperation(
+ sqlAlterTableRenameColumn, baseCatalogTable, tableChanges);
+ return new AlterTableChangeOperation(
tableIdentifier,
+ tableChanges,
CatalogTable.of(
newSchema,
resolvedCatalogTable.getComment(),
@@ -723,20 +726,19 @@ public class SqlToOperationConverter {
private Operation convertAlterTableSchema(
ObjectIdentifier tableIdentifier,
- ResolvedCatalogTable originalTable,
+ ResolvedCatalogTable oldTable,
SqlAlterTableSchema alterTableSchema) {
List<TableChange> tableChanges = new ArrayList<>();
Schema newSchema =
- alterSchemaConverter.applySchemaChange(
- alterTableSchema, originalTable.getUnresolvedSchema(), tableChanges);
+ alterSchemaConverter.applySchemaChange(alterTableSchema, oldTable, tableChanges);
return new AlterTableChangeOperation(
tableIdentifier,
tableChanges,
CatalogTable.of(
newSchema,
- originalTable.getComment(),
- originalTable.getPartitionKeys(),
- originalTable.getOptions()));
+ oldTable.getComment(),
+ oldTable.getPartitionKeys(),
+ oldTable.getOptions()));
}
/** Convert CREATE FUNCTION statement. */
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java
index 43539d23552..7587200850c 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java
@@ -32,6 +32,7 @@ import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.TableChange;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.ddl.AlterTableChangeOperation;
@@ -47,12 +48,16 @@ import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.validate.SqlValidator;
+import javax.annotation.Nullable;
+
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
@@ -95,6 +100,7 @@ public class OperationConverterUtils {
setWatermarkAndPK(builder, catalogTable.getSchema());
}
List<TableChange> tableChanges = new ArrayList<>();
+ // TODO: support TableChange in FLINK-30497
for (SqlNode sqlNode : addReplaceColumns.getNewColumns()) {
TableColumn tableColumn = toTableColumn((SqlTableColumn) sqlNode, sqlValidator);
builder.add(tableColumn);
@@ -103,12 +109,7 @@ public class OperationConverterUtils {
tableChanges.add(
TableChange.add(
Column.physical(tableColumn.getName(), tableColumn.getType())
- .withComment(
- ((SqlTableColumn) sqlNode)
- .getComment()
- .map(SqlCharStringLiteral.class::cast)
- .map(c -> c.getValueAs(String.class))
- .orElse(null))));
+ .withComment(getComment((SqlTableColumn) sqlNode))));
}
}
@@ -141,7 +142,7 @@ public class OperationConverterUtils {
public static Operation convertChangeColumn(
ObjectIdentifier tableIdentifier,
SqlChangeColumn changeColumn,
- CatalogTable catalogTable,
+ ResolvedCatalogTable catalogTable,
SqlValidator sqlValidator) {
String oldName = changeColumn.getOldName().getSimple();
if (catalogTable.getPartitionKeys().indexOf(oldName) >= 0) {
@@ -151,12 +152,29 @@ public class OperationConverterUtils {
TableSchema oldSchema = catalogTable.getSchema();
boolean first = changeColumn.isFirst();
String after = changeColumn.getAfter() == null ? null : changeColumn.getAfter().getSimple();
- TableColumn newTableColumn = toTableColumn(changeColumn.getNewColumn(), sqlValidator);
+ TableColumn.PhysicalColumn newTableColumn =
+ toTableColumn(changeColumn.getNewColumn(), sqlValidator);
TableSchema newSchema = changeColumn(oldSchema, oldName, newTableColumn, first, after);
Map<String, String> newProperties = new HashMap<>(catalogTable.getOptions());
newProperties.putAll(extractProperties(changeColumn.getProperties()));
- return new AlterTableSchemaOperation(
+
+ List<TableChange> tableChanges =
+ buildModifyColumnChange(
+ catalogTable
+ .getResolvedSchema()
+ .getColumn(oldName)
+ .orElseThrow(
+ () ->
+ new ValidationException(
+ "Failed to get old column: " + oldName)),
+ Column.physical(newTableColumn.getName(), newTableColumn.getType())
+ .withComment(getComment(changeColumn.getNewColumn())),
+ first
+ ? TableChange.ColumnPosition.first()
+ : (after == null ? null : TableChange.ColumnPosition.after(after)));
+ return new AlterTableChangeOperation(
tableIdentifier,
+ tableChanges,
new CatalogTableImpl(
newSchema,
catalogTable.getPartitionKeys(),
@@ -165,6 +183,44 @@ public class OperationConverterUtils {
// TODO: handle watermark and constraints
}
+ public static List<TableChange> buildModifyColumnChange(
+ Column oldColumn,
+ Column newColumn,
+ @Nullable TableChange.ColumnPosition columnPosition) {
+ if (oldColumn.isPhysical() && newColumn.isPhysical()) {
+ List<TableChange> changes = new ArrayList<>();
+ String newComment = newColumn.getComment().orElse(oldColumn.getComment().orElse(null));
+ if (!newColumn.getComment().equals(oldColumn.getComment())) {
+ changes.add(TableChange.modifyColumnComment(oldColumn, newComment));
+ }
+
+ if (!oldColumn
+ .getDataType()
+ .getLogicalType()
+ .equals(newColumn.getDataType().getLogicalType())) {
+ changes.add(
+ TableChange.modifyPhysicalColumnType(
+ oldColumn.withComment(newComment), newColumn.getDataType()));
+ }
+
+ if (!Objects.equals(newColumn.getName(), oldColumn.getName())) {
+ changes.add(
+ TableChange.modifyColumnName(
+ oldColumn.withComment(newComment).copy(newColumn.getDataType()),
+ newColumn.getName()));
+ }
+
+ if (columnPosition != null) {
+ changes.add(TableChange.modifyColumnPosition(newColumn, columnPosition));
+ }
+
+ return changes;
+ } else {
+ return Collections.singletonList(
+ TableChange.modify(oldColumn, newColumn, columnPosition));
+ }
+ }
+
// change a column in the old table schema and return the updated table schema
public static TableSchema changeColumn(
TableSchema oldSchema,
@@ -206,7 +262,14 @@ public class OperationConverterUtils {
return builder.build();
}
- private static TableColumn toTableColumn(
+ public static @Nullable String getComment(SqlTableColumn column) {
+ return column.getComment()
+ .map(SqlCharStringLiteral.class::cast)
+ .map(c -> c.getValueAs(String.class))
+ .orElse(null);
+ }
+
+ private static TableColumn.PhysicalColumn toTableColumn(
SqlTableColumn tableColumn, SqlValidator sqlValidator) {
if (!(tableColumn instanceof SqlRegularColumn)) {
throw new TableException("Only regular columns are supported for this operation yet.");
@@ -214,7 +277,7 @@ public class OperationConverterUtils {
SqlRegularColumn regularColumn = (SqlRegularColumn) tableColumn;
String name = regularColumn.getName().getSimple();
SqlDataTypeSpec typeSpec = regularColumn.getType();
- boolean nullable = typeSpec.getNullable() == null ? true : typeSpec.getNullable();
+ boolean nullable = typeSpec.getNullable() == null || typeSpec.getNullable();
LogicalType logicalType =
FlinkTypeFactory.toLogicalType(typeSpec.deriveType(sqlValidator, nullable));
DataType dataType = TypeConversions.fromLogicalToDataType(logicalType);
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
index 5ad6bce141a..10a7af4f4f6 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
@@ -1273,39 +1273,61 @@ public class SqlToOperationConverterTest {
prepareTable("tb1", false, false, true, 3);
// rename pk column c
Operation operation = parse("alter table tb1 rename c to c1");
- assertThat(operation).isInstanceOf(AlterTableSchemaOperation.class);
+ assertThat(operation).isInstanceOf(AlterTableChangeOperation.class);
assertThat(operation.asSummaryString())
+ .isEqualTo("ALTER TABLE cat1.db1.tb1\n MODIFY `c` TO `c1`");
+ assertThat(((AlterTableChangeOperation) operation).getNewTable().getUnresolvedSchema())
.isEqualTo(
- "ALTER TABLE cat1.db1.tb1 SET SCHEMA (\n"
- + " `a` INT NOT NULL,\n"
- + " `b` BIGINT NOT NULL,\n"
- + " `c1` STRING NOT NULL COMMENT 'column comment',\n"
- + " `d` AS [a*(b+2 + a*b)],\n"
- + " `e` ROW<`f0` STRING, `f1` INT, `f2` ROW<`f0` DOUBLE, `f1` ARRAY<FLOAT>>>,\n"
- + " `f` AS [e.f1 + e.f2.f0],\n"
- + " `g` METADATA VIRTUAL,\n"
- + " `ts` TIMESTAMP(3) COMMENT 'just a comment',\n"
- + " WATERMARK FOR `ts` AS [ts - interval '5' seconds],\n"
- + " CONSTRAINT `ct1` PRIMARY KEY (`a`, `b`, `c1`) NOT ENFORCED\n"
- + ")");
+ Schema.newBuilder()
+ .column("a", DataTypes.INT().notNull())
+ .column("b", DataTypes.BIGINT().notNull())
+ .column("c1", DataTypes.STRING().notNull())
+ .withComment("column comment")
+ .columnByExpression("d", "a*(b+2 + a*b)")
+ .column(
+ "e",
+ DataTypes.ROW(
+ DataTypes.STRING(),
+ DataTypes.INT(),
+ DataTypes.ROW(
+ DataTypes.DOUBLE(),
+ DataTypes.ARRAY(DataTypes.FLOAT()))))
+ .columnByExpression("f", "e.f1 + e.f2.f0")
+ .columnByMetadata("g", DataTypes.STRING(), null, true)
+ .column("ts", DataTypes.TIMESTAMP(3))
+ .withComment("just a comment")
+ .watermark("ts", "ts - interval '5' seconds")
+ .primaryKeyNamed("ct1", "a", "b", "c1")
+ .build());
// rename computed column
operation = parse("alter table tb1 rename f to f1");
- assertThat(operation).isInstanceOf(AlterTableSchemaOperation.class);
+ assertThat(operation).isInstanceOf(AlterTableChangeOperation.class);
assertThat(operation.asSummaryString())
+ .isEqualTo("ALTER TABLE cat1.db1.tb1\n MODIFY `f` TO `f1`");
+ assertThat(((AlterTableChangeOperation) operation).getNewTable().getUnresolvedSchema())
.isEqualTo(
- "ALTER TABLE cat1.db1.tb1 SET SCHEMA (\n"
- + " `a` INT NOT NULL,\n"
- + " `b` BIGINT NOT NULL,\n"
- + " `c` STRING NOT NULL COMMENT 'column comment',\n"
- + " `d` AS [a*(b+2 + a*b)],\n"
- + " `e` ROW<`f0` STRING, `f1` INT, `f2` ROW<`f0` DOUBLE, `f1` ARRAY<FLOAT>>>,\n"
- + " `f1` AS [e.f1 + e.f2.f0],\n"
- + " `g` METADATA VIRTUAL,\n"
- + " `ts` TIMESTAMP(3) COMMENT 'just a comment',\n"
- + " WATERMARK FOR `ts` AS [ts - interval '5' seconds],\n"
- + " CONSTRAINT `ct1` PRIMARY KEY (`a`, `b`, `c`) NOT ENFORCED\n"
- + ")");
+ Schema.newBuilder()
+ .column("a", DataTypes.INT().notNull())
+ .column("b", DataTypes.BIGINT().notNull())
+ .column("c", DataTypes.STRING().notNull())
+ .withComment("column comment")
+ .columnByExpression("d", "a*(b+2 + a*b)")
+ .column(
+ "e",
+ DataTypes.ROW(
+ DataTypes.STRING(),
+ DataTypes.INT(),
+ DataTypes.ROW(
+ DataTypes.DOUBLE(),
+ DataTypes.ARRAY(DataTypes.FLOAT()))))
+ .columnByExpression("f1", "e.f1 + e.f2.f0")
+ .columnByMetadata("g", DataTypes.STRING(), null, true)
+ .column("ts", DataTypes.TIMESTAMP(3))
+ .withComment("just a comment")
+ .watermark("ts", "ts - interval '5' seconds")
+ .primaryKeyNamed("ct1", "a", "b", "c")
+ .build());
// rename column c that is used in a computed column
assertThatThrownBy(() -> parse("alter table tb1 rename a to a1"))
@@ -2076,6 +2098,11 @@ public class SqlToOperationConverterTest {
Operation operation =
parse(
"alter table tb1 modify b bigint not null comment 'move b to first and add comment' first");
+ assertThat(operation.asSummaryString())
+ .isEqualTo(
+ "ALTER TABLE cat1.db1.tb1\n"
+ + " MODIFY `b` COMMENT 'move b to first and add comment',\n"
+ + " MODIFY `b` FIRST");
assertAlterTableSchema(
operation,
tableIdentifier,
@@ -2103,6 +2130,11 @@ public class SqlToOperationConverterTest {
// change nullability and pos
operation = parse("alter table tb1 modify ts timestamp(3) not null after e");
+ assertThat(operation.asSummaryString())
+ .isEqualTo(
+ "ALTER TABLE cat1.db1.tb1\n"
+ + " MODIFY `ts` TIMESTAMP(3) NOT NULL,\n"
+ + " MODIFY `ts` AFTER `e`");
assertAlterTableSchema(
operation,
tableIdentifier,
@@ -2137,7 +2169,17 @@ public class SqlToOperationConverterTest {
+ "f as upper(e) comment 'change f' after ts,\n"
+ "g int not null comment 'change g',\n"
+ "constraint ct2 primary key(e) not enforced)");
-
+ assertThat(operation.asSummaryString())
+ .isEqualTo(
+ "ALTER TABLE cat1.db1.tb1\n"
+ + " MODIFY `d` INT NOT NULL AS `a` + 2 COMMENT 'change d' AFTER `b`,\n"
+ + " MODIFY `c` BIGINT,\n"
+ + " MODIFY `c` FIRST,\n"
+ + " MODIFY `e` COMMENT 'change e',\n"
+ + " MODIFY `e` STRING NOT NULL,\n"
+ + " MODIFY `f` STRING NOT NULL AS UPPER(`e`) COMMENT 'change f' AFTER `ts`,\n"
+ + " MODIFY `g` INT NOT NULL COMMENT 'change g' ,\n"
+ + " MODIFY CONSTRAINT `ct2` PRIMARY KEY (`e`) NOT ENFORCED");
assertAlterTableSchema(
operation,
tableIdentifier,
@@ -2169,7 +2211,15 @@ public class SqlToOperationConverterTest {
+ "e int metadata virtual,\n"
+ "watermark for f as f,\n"
+ "g multiset<int> not null comment 'change g' first)");
-
+ assertThat(operation.asSummaryString())
+ .isEqualTo(
+ "ALTER TABLE cat1.db1.tb2\n"
+ + " MODIFY `ts` COMMENT 'change ts',\n"
+ + " MODIFY `ts` INT,\n"
+ + " MODIFY `f` TIMESTAMP(3) NOT NULL ,\n"
+ + " MODIFY `e` INT METADATA VIRTUAL ,\n"
+ + " MODIFY `g` MULTISET<INT> NOT NULL COMMENT 'change g' FIRST,\n"
+ + " MODIFY WATERMARK FOR `f`: TIMESTAMP(3) NOT NULL AS `f`");
assertAlterTableSchema(
operation,
tableIdentifier,