You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sh...@apache.org on 2023/01/05 04:26:56 UTC
[flink] branch master updated: [FLINK-30497][table-api] Introduce TableChange to represent DROP change
This is an automated email from the ASF dual-hosted git repository.
shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new cce059c556b [FLINK-30497][table-api] Introduce TableChange to represent DROP change
cce059c556b is described below
commit cce059c556bc2538cba8918e281320516c4be222
Author: Shengkai <33...@users.noreply.github.com>
AuthorDate: Thu Jan 5 12:26:48 2023 +0800
[FLINK-30497][table-api] Introduce TableChange to represent DROP change
This closes #21592
---
.../hive/parse/HiveParserDDLSemanticAnalyzer.java | 1 -
.../operations/ddl/AlterTableChangeOperation.java | 9 +
.../apache/flink/table/catalog/TableChange.java | 162 ++++++++
.../planner/expressions/ColumnReferenceFinder.java | 7 +-
.../planner/operations/AlterSchemaConverter.java | 430 ++++++++++++---------
.../operations/SqlToOperationConverter.java | 88 +----
.../planner/utils/OperationConverterUtils.java | 10 +-
.../expressions/ColumnReferenceFinderTest.java | 5 +-
.../operations/SqlToOperationConverterTest.java | 102 +++--
9 files changed, 494 insertions(+), 320 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
index 79cef5614be..fc7913c4efe 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
@@ -2006,7 +2006,6 @@ public class HiveParserDDLSemanticAnalyzer {
final int numPartCol = oldTable.getPartitionKeys().size();
TableSchema.Builder builder = TableSchema.builder();
// add existing non-part col if we're not replacing
- // TODO: support TableChange in FLINK-30497
if (!replace) {
List<TableColumn> nonPartCols =
oldSchema.getTableColumns().subList(0, oldSchema.getFieldCount() - numPartCol);
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableChangeOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableChangeOperation.java
index e8ef9519de1..ad083d1e574 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableChangeOperation.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableChangeOperation.java
@@ -119,6 +119,15 @@ public class AlterTableChangeOperation extends AlterTableOperation {
TableChange.ModifyUniqueConstraint modifyUniqueConstraint =
(TableChange.ModifyUniqueConstraint) tableChange;
return String.format(" MODIFY %s", modifyUniqueConstraint.getNewConstraint());
+ } else if (tableChange instanceof TableChange.DropColumn) {
+ TableChange.DropColumn dropColumn = (TableChange.DropColumn) tableChange;
+ return String.format(
+ " DROP %s", EncodingUtils.escapeIdentifier(dropColumn.getColumnName()));
+ } else if (tableChange instanceof TableChange.DropConstraint) {
+ TableChange.DropConstraint dropConstraint = (TableChange.DropConstraint) tableChange;
+ return String.format(" DROP CONSTRAINT %s", dropConstraint.getConstraintName());
+ } else if (tableChange instanceof TableChange.DropWatermark) {
+ return " DROP WATERMARK";
} else {
throw new UnsupportedOperationException(
String.format("Unknown table change: %s.", tableChange));
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/TableChange.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/TableChange.java
index 240148836c8..34eba1d6845 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/TableChange.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/TableChange.java
@@ -229,6 +229,53 @@ public interface TableChange {
return new ModifyWatermark(newWatermarkSpec);
}
+ /**
+ * A table change to drop column.
+ *
+ * <p>It is equal to the following statement:
+ *
+ * <pre>
+ * ALTER TABLE <table_name> DROP COLUMN <column_name>
+ * </pre>
+ *
+ * @param columnName the column to drop.
+ * @return a TableChange that represents the modification.
+ */
+ static DropColumn dropColumn(String columnName) {
+ return new DropColumn(columnName);
+ }
+
+ /**
+ * A table change to drop watermark.
+ *
+ * <p>It is equal to the following statement:
+ *
+ * <pre>
+ * ALTER TABLE <table_name> DROP WATERMARK
+ * </pre>
+ *
+ * @return a TableChange that represents the modification.
+ */
+ static DropWatermark dropWatermark() {
+ return DropWatermark.INSTANCE;
+ }
+
+ /**
+ * A table change to drop constraint.
+ *
+ * <p>It is equal to the following statement:
+ *
+ * <pre>
+ * ALTER TABLE <table_name> DROP CONSTRAINT <constraint_name>
+ * </pre>
+ *
+ * @param constraintName the constraint to drop.
+ * @return a TableChange that represents the modification.
+ */
+ static DropConstraint dropConstraint(String constraintName) {
+ return new DropConstraint(constraintName);
+ }
+
/**
* A table change to set the table option.
*
@@ -766,6 +813,121 @@ public interface TableChange {
}
}
+ // --------------------------------------------------------------------------------------------
+ // Drop Change
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * A table change to drop the column.
+ *
+ * <p>It is equal to the following statement:
+ *
+ * <pre>
+ * ALTER TABLE <table_name> DROP COLUMN <column_name>
+ * </pre>
+ */
+ @PublicEvolving
+ class DropColumn implements TableChange {
+
+ private final String columnName;
+
+ private DropColumn(String columnName) {
+ this.columnName = columnName;
+ }
+
+ /** Returns the column name. */
+ public String getColumnName() {
+ return columnName;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof DropColumn)) {
+ return false;
+ }
+ DropColumn that = (DropColumn) o;
+ return Objects.equals(columnName, that.columnName);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(columnName);
+ }
+
+ @Override
+ public String toString() {
+ return "DropColumn{" + "columnName='" + columnName + '\'' + '}';
+ }
+ }
+
+ /**
+ * A table change to drop the watermark.
+ *
+ * <p>It is equal to the following statement:
+ *
+ * <pre>
+ * ALTER TABLE <table_name> DROP WATERMARK
+ * </pre>
+ */
+ @PublicEvolving
+ class DropWatermark implements TableChange {
+ static final DropWatermark INSTANCE = new DropWatermark();
+
+ @Override
+ public String toString() {
+ return "DropWatermark";
+ }
+ }
+
+ /**
+ * A table change to drop the constraint.
+ *
+ * <p>It is equal to the following statement:
+ *
+ * <pre>
+ * ALTER TABLE <table_name> DROP CONSTRAINT <constraint_name>
+ * </pre>
+ */
+ @PublicEvolving
+ class DropConstraint implements TableChange {
+
+ private final String constraintName;
+
+ private DropConstraint(String constraintName) {
+ this.constraintName = constraintName;
+ }
+
+ /** Returns the constraint name. */
+ public String getConstraintName() {
+ return constraintName;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof DropConstraint)) {
+ return false;
+ }
+ DropConstraint that = (DropConstraint) o;
+ return Objects.equals(constraintName, that.constraintName);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(constraintName);
+ }
+
+ @Override
+ public String toString() {
+ return "DropConstraint{" + "constraintName='" + constraintName + '\'' + '}';
+ }
+ }
+
// --------------------------------------------------------------------------------------------
// Property change
// --------------------------------------------------------------------------------------------
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/ColumnReferenceFinder.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/ColumnReferenceFinder.java
index d3593cb4d85..60216e5bd05 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/ColumnReferenceFinder.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/ColumnReferenceFinder.java
@@ -37,6 +37,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
/** A finder used to look up referenced column name in a {@link ResolvedExpression}. */
public class ColumnReferenceFinder {
@@ -81,7 +82,11 @@ public class ColumnReferenceFinder {
public static Set<String> findWatermarkReferencedColumn(ResolvedSchema schema) {
ColumnReferenceVisitor visitor = new ColumnReferenceVisitor(schema.getColumnNames());
return schema.getWatermarkSpecs().stream()
- .flatMap(spec -> visitor.visit(spec.getWatermarkExpression()).stream())
+ .flatMap(
+ spec ->
+ Stream.concat(
+ visitor.visit(spec.getWatermarkExpression()).stream(),
+ Stream.of(spec.getRowtimeAttribute())))
.collect(Collectors.toSet());
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java
index 193e47c168e..57f6fae4384 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.planner.operations;
+import org.apache.flink.sql.parser.ddl.SqlAlterTable;
import org.apache.flink.sql.parser.ddl.SqlAlterTableAdd;
import org.apache.flink.sql.parser.ddl.SqlAlterTableDropColumn;
import org.apache.flink.sql.parser.ddl.SqlAlterTableDropConstraint;
@@ -33,19 +34,24 @@ import org.apache.flink.sql.parser.ddl.position.SqlTableColumnPosition;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.SchemaResolver;
import org.apache.flink.table.catalog.TableChange;
import org.apache.flink.table.catalog.UniqueConstraint;
-import org.apache.flink.table.catalog.WatermarkSpec;
+import org.apache.flink.table.catalog.UnresolvedIdentifier;
import org.apache.flink.table.expressions.SqlCallExpression;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.ddl.AlterTableChangeOperation;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.planner.expressions.ColumnReferenceFinder;
import org.apache.flink.table.planner.utils.OperationConverterUtils;
import org.apache.flink.table.types.AbstractDataType;
import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.utils.EncodingUtils;
import org.apache.flink.util.Preconditions;
import org.apache.calcite.rel.type.RelDataType;
@@ -59,6 +65,7 @@ import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -66,7 +73,6 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiConsumer;
-import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -86,67 +92,47 @@ public class AlterSchemaConverter {
private final SqlValidator sqlValidator;
private final Function<SqlNode, String> escapeExpression;
private final Consumer<SqlTableConstraint> constraintValidator;
- private final SchemaResolver schemaResolver;
+ private final CatalogManager catalogManager;
AlterSchemaConverter(
SqlValidator sqlValidator,
Consumer<SqlTableConstraint> constraintValidator,
Function<SqlNode, String> escapeExpression,
- SchemaResolver schemaResolver) {
+ CatalogManager catalogManager) {
this.sqlValidator = sqlValidator;
this.escapeExpression = escapeExpression;
this.constraintValidator = constraintValidator;
- this.schemaResolver = schemaResolver;
+ this.catalogManager = catalogManager;
}
/**
* Convert ALTER TABLE ADD | MODIFY (<schema_component> [, <schema_component>, ...])
* to generate an updated Schema.
*/
- public Schema applySchemaChange(
- SqlAlterTableSchema alterTableSchema,
- ResolvedCatalogTable oldTable,
- List<TableChange> tableChangeCollector) {
- AlterSchemaStrategy strategy = computeAlterSchemaStrategy(alterTableSchema);
- SchemaConverter converter =
- strategy == AlterSchemaStrategy.ADD
- ? new AddSchemaConverter(
- oldTable.getUnresolvedSchema(),
- (FlinkTypeFactory) sqlValidator.getTypeFactory(),
- sqlValidator,
- constraintValidator,
- escapeExpression,
- schemaResolver,
- tableChangeCollector)
- : new ModifySchemaConverter(
- oldTable,
- (FlinkTypeFactory) sqlValidator.getTypeFactory(),
- sqlValidator,
- constraintValidator,
- escapeExpression,
- schemaResolver,
- tableChangeCollector);
+ public Operation convertAlterSchema(
+ SqlAlterTableSchema alterTableSchema, ResolvedCatalogTable oldTable) {
+ SchemaConverter converter = createSchemaConverter(alterTableSchema, oldTable);
converter.updateColumn(alterTableSchema.getColumnPositions().getList());
alterTableSchema.getWatermark().ifPresent(converter::updateWatermark);
alterTableSchema.getFullConstraint().ifPresent(converter::updatePrimaryKey);
- return converter.convert();
+
+ return buildAlterTableChangeOperation(
+ alterTableSchema, converter.changesCollector, converter.convert(), oldTable);
}
/** Convert ALTER TABLE RENAME col_name to new_col_name to generate an updated Schema. */
- public Schema applySchemaChange(
- SqlAlterTableRenameColumn renameColumn,
- ResolvedCatalogTable oldTable,
- List<TableChange> tableChangeCollector) {
+ public Operation convertAlterSchema(
+ SqlAlterTableRenameColumn renameColumn, ResolvedCatalogTable oldTable) {
String oldColumnName = getColumnName(renameColumn.getOldColumnIdentifier());
String newColumnName = getColumnName(renameColumn.getNewColumnIdentifier());
- // validate old column is exists, new column name does not collide with existed column
- // names, and old column isn't referenced by computed column
- validateColumnName(
- oldColumnName,
- newColumnName,
- oldTable.getResolvedSchema(),
- oldTable.getPartitionKeys());
- validateWatermark(oldTable, oldColumnName);
+
+ ReferencesManager.create(oldTable).checkReferences(oldColumnName);
+ if (oldTable.getResolvedSchema().getColumn(newColumnName).isPresent()) {
+ throw new ValidationException(
+ String.format(
+ "%sThe column `%s` already existed in table schema.",
+ EX_MSG_PREFIX, newColumnName));
+ }
// generate new schema
Schema.Builder schemaBuilder = Schema.newBuilder();
@@ -156,12 +142,6 @@ public class AlterSchemaConverter {
(builder, column) -> {
if (column.getName().equals(oldColumnName)) {
buildNewColumnFromOldColumn(builder, column, newColumnName);
- tableChangeCollector.add(
- TableChange.modifyColumnName(
- unwrap(
- oldTable.getResolvedSchema()
- .getColumn(oldColumnName)),
- newColumnName));
} else {
builder.fromColumns(Collections.singletonList(column));
}
@@ -169,11 +149,19 @@ public class AlterSchemaConverter {
buildUpdatedPrimaryKey(
schemaBuilder, oldTable, (pk) -> pk.equals(oldColumnName) ? newColumnName : pk);
buildUpdatedWatermark(schemaBuilder, oldTable);
- return schemaBuilder.build();
+
+ return buildAlterTableChangeOperation(
+ renameColumn,
+ Collections.singletonList(
+ TableChange.modifyColumnName(
+ unwrap(oldTable.getResolvedSchema().getColumn(oldColumnName)),
+ newColumnName)),
+ schemaBuilder.build(),
+ oldTable);
}
/** Convert ALTER TABLE DROP (col1 [, col2, ...]) to generate an updated Schema. */
- public Schema applySchemaChange(
+ public Operation convertAlterSchema(
SqlAlterTableDropColumn dropColumn, ResolvedCatalogTable oldTable) {
Set<String> columnsToDrop = new HashSet<>();
dropColumn
@@ -188,18 +176,25 @@ public class AlterSchemaConverter {
}
});
- Schema.Builder schemaBuilder = Schema.newBuilder();
- for (SqlNode columnIdentifier : dropColumn.getColumnList()) {
- String columnToDrop = getColumnName((SqlIdentifier) columnIdentifier);
- // validate the column to drop exists in the table schema, is not a primary key and
- // does not derive any computed column
- validateColumnName(
- columnToDrop,
- oldTable.getResolvedSchema(),
- oldTable.getPartitionKeys(),
- columnsToDrop);
- validateWatermark(oldTable, columnToDrop);
+ ReferencesManager referencesManager = ReferencesManager.create(oldTable);
+ // Sort by dependencies count from largest to smallest, so that computed columns are dropped
+ // before the columns they refer to. E.g. when dropping column a, b(b as a+1), the order is [b, a].
+ List<String> sortedColumnsToDrop =
+ columnsToDrop.stream()
+ .sorted(
+ Comparator.comparingInt(
+ col ->
+ referencesManager.getColumnDependencyCount(
+ (String) col))
+ .reversed())
+ .collect(Collectors.toList());
+ List<TableChange> tableChanges = new ArrayList<>(sortedColumnsToDrop.size());
+ for (String columnToDrop : sortedColumnsToDrop) {
+ referencesManager.dropColumn(columnToDrop);
+ tableChanges.add(TableChange.dropColumn(columnToDrop));
}
+
+ Schema.Builder schemaBuilder = Schema.newBuilder();
buildUpdatedColumn(
schemaBuilder,
oldTable,
@@ -210,11 +205,13 @@ public class AlterSchemaConverter {
});
buildUpdatedPrimaryKey(schemaBuilder, oldTable, Function.identity());
buildUpdatedWatermark(schemaBuilder, oldTable);
- return schemaBuilder.build();
+
+ return buildAlterTableChangeOperation(
+ dropColumn, tableChanges, schemaBuilder.build(), oldTable);
}
/** Convert ALTER TABLE DROP PRIMARY KEY to generate an updated Schema. */
- public Schema applySchemaChange(
+ public Operation convertAlterSchema(
SqlAlterTableDropPrimaryKey dropPrimaryKey, ResolvedCatalogTable oldTable) {
Optional<UniqueConstraint> pkConstraint = oldTable.getResolvedSchema().getPrimaryKey();
if (!pkConstraint.isPresent()) {
@@ -228,13 +225,18 @@ public class AlterSchemaConverter {
oldTable,
(builder, column) -> builder.fromColumns(Collections.singletonList(column)));
buildUpdatedWatermark(schemaBuilder, oldTable);
- return schemaBuilder.build();
+
+ return buildAlterTableChangeOperation(
+ dropPrimaryKey,
+ Collections.singletonList(TableChange.dropConstraint(pkConstraint.get().getName())),
+ schemaBuilder.build(),
+ oldTable);
}
/**
* Convert ALTER TABLE DROP CONSTRAINT constraint_name to generate an updated {@link Schema}.
*/
- public Schema applySchemaChange(
+ public Operation convertAlterSchema(
SqlAlterTableDropConstraint dropConstraint, ResolvedCatalogTable oldTable) {
Optional<UniqueConstraint> pkConstraint = oldTable.getResolvedSchema().getPrimaryKey();
if (!pkConstraint.isPresent()) {
@@ -258,11 +260,16 @@ public class AlterSchemaConverter {
oldTable,
(builder, column) -> builder.fromColumns(Collections.singletonList(column)));
buildUpdatedWatermark(schemaBuilder, oldTable);
- return schemaBuilder.build();
+
+ return buildAlterTableChangeOperation(
+ dropConstraint,
+ Collections.singletonList(TableChange.dropConstraint(constraintName)),
+ schemaBuilder.build(),
+ oldTable);
}
/** Convert ALTER TABLE DROP WATERMARK to generate an updated {@link Schema}. */
- public Schema applySchemaChange(
+ public Operation convertAlterSchema(
SqlAlterTableDropWatermark dropWatermark, ResolvedCatalogTable oldTable) {
if (oldTable.getResolvedSchema().getWatermarkSpecs().isEmpty()) {
throw new ValidationException(
@@ -270,13 +277,19 @@ public class AlterSchemaConverter {
"%sThe base table does not define any watermark strategy.",
EX_MSG_PREFIX));
}
+
Schema.Builder schemaBuilder = Schema.newBuilder();
buildUpdatedColumn(
schemaBuilder,
oldTable,
(builder, column) -> builder.fromColumns(Collections.singletonList(column)));
buildUpdatedPrimaryKey(schemaBuilder, oldTable, Function.identity());
- return schemaBuilder.build();
+
+ return buildAlterTableChangeOperation(
+ dropWatermark,
+ Collections.singletonList(TableChange.dropWatermark()),
+ schemaBuilder.build(),
+ oldTable);
}
// --------------------------------------------------------------------------------------------
@@ -304,14 +317,13 @@ public class AlterSchemaConverter {
SqlValidator sqlValidator,
Consumer<SqlTableConstraint> constraintValidator,
Function<SqlNode, String> escapeExpressions,
- SchemaResolver schemaResolver,
- List<TableChange> changesCollector) {
+ SchemaResolver schemaResolver) {
this.typeFactory = typeFactory;
this.sqlValidator = sqlValidator;
this.constraintValidator = constraintValidator;
this.escapeExpressions = escapeExpressions;
this.schemaResolver = schemaResolver;
- this.changesCollector = changesCollector;
+ this.changesCollector = new ArrayList<>();
populateColumnsFromSourceTable(oldSchema);
populatePrimaryKeyFromSourceTable(oldSchema);
populateWatermarkFromSourceTable(oldSchema);
@@ -540,16 +552,14 @@ public class AlterSchemaConverter {
SqlValidator sqlValidator,
Consumer<SqlTableConstraint> constraintValidator,
Function<SqlNode, String> escapeExpressions,
- SchemaResolver schemaResolver,
- List<TableChange> changeCollector) {
+ SchemaResolver schemaResolver) {
super(
oldSchema,
typeFactory,
sqlValidator,
constraintValidator,
escapeExpressions,
- schemaResolver,
- changeCollector);
+ schemaResolver);
}
@Override
@@ -634,16 +644,14 @@ public class AlterSchemaConverter {
SqlValidator sqlValidator,
Consumer<SqlTableConstraint> constraintValidator,
Function<SqlNode, String> escapeExpressions,
- SchemaResolver schemaResolver,
- List<TableChange> tableChangeCollector) {
+ SchemaResolver schemaResolver) {
super(
oldTable.getUnresolvedSchema(),
typeFactory,
sqlValidator,
constraintValidator,
escapeExpressions,
- schemaResolver,
- tableChangeCollector);
+ schemaResolver);
this.oldTable = oldTable;
}
@@ -728,114 +736,141 @@ public class AlterSchemaConverter {
}
}
- // --------------------------------------------------------------------------------------------
-
- private void validateColumnName(
- String oldColumnName,
- String newColumnName,
- ResolvedSchema oldSchema,
- List<String> partitionKeys) {
- validateColumnName(
- oldColumnName,
- oldSchema,
- partitionKeys,
- // fail the operation of renaming column, once the column derives a computed column
- (referencedColumn, computedColumn) -> referencedColumn.contains(oldColumnName));
- // validate new column
- if (oldSchema.getColumn(newColumnName).isPresent()) {
- throw new ValidationException(
- String.format(
- "%sThe column `%s` already existed in table schema.",
- EX_MSG_PREFIX, newColumnName));
- }
- }
+ private static class ReferencesManager {
+
+ /** Available columns in the table. */
+ private final Set<String> columns;
+
+ /**
+ * Maps a column to the computed columns that refer to it, e.g. column `a` is referenced by
+ * the column `b` in the expression "b as a+1".
+ */
+ private final Map<String, Set<String>> columnToReferences;
+
+ /**
+ * Maps a computed column to the columns it depends on, e.g. column `b` has the
+ * dependency of column `a` in the expression "b as a+1".
+ */
+ private final Map<String, Set<String>> columnToDependencies;
+
+ /** Primary keys defined on the table. */
+ private final Set<String> primaryKeys;
+
+ /** The names of the columns that the watermark expression depends on. */
+ private final Set<String> watermarkReferences;
+
+ /** The names of the columns that are used as partition keys. */
+ private final Set<String> partitionKeys;
+
+ private ReferencesManager(
+ Set<String> columns,
+ Map<String, Set<String>> columnToReferences,
+ Map<String, Set<String>> columnToDependencies,
+ Set<String> primaryKeys,
+ Set<String> watermarkReferences,
+ Set<String> partitionKeys) {
+ this.columns = columns;
+ this.columnToReferences = columnToReferences;
+ this.columnToDependencies = columnToDependencies;
+ this.primaryKeys = primaryKeys;
+ this.watermarkReferences = watermarkReferences;
+ this.partitionKeys = partitionKeys;
+ }
+
+ static ReferencesManager create(ResolvedCatalogTable catalogTable) {
+ Map<String, Set<String>> columnToReferences = new HashMap<>();
+ Map<String, Set<String>> columnToDependencies = new HashMap<>();
+ catalogTable.getResolvedSchema().getColumns().stream()
+ .filter(column -> column instanceof Column.ComputedColumn)
+ .forEach(
+ column -> {
+ Set<String> referencedColumns =
+ ColumnReferenceFinder.findReferencedColumn(
+ column.getName(), catalogTable.getResolvedSchema());
+ for (String referencedColumn : referencedColumns) {
+ columnToReferences
+ .computeIfAbsent(
+ referencedColumn, key -> new HashSet<>())
+ .add(column.getName());
+ columnToDependencies
+ .computeIfAbsent(
+ column.getName(), key -> new HashSet<>())
+ .add(referencedColumn);
+ }
+ });
- private void validateColumnName(
- String columnToDrop,
- ResolvedSchema oldSchema,
- List<String> partitionKeys,
- Set<String> columnsToDrop) {
- validateColumnName(
- columnToDrop,
- oldSchema,
- partitionKeys,
- // fail the operation of dropping column, only if the column derives a computed
- // column, and the computed column is not being dropped along with the old column
- (referencedColumn, computedColumn) ->
- referencedColumn.contains(columnToDrop)
- && !columnsToDrop.contains(computedColumn.getName()));
- oldSchema
- .getPrimaryKey()
- .ifPresent(
- pk -> {
- if (pk.getColumns().contains(columnToDrop)) {
- throw new ValidationException(
- String.format(
- "%sThe column `%s` is used as the primary key.",
- EX_MSG_PREFIX, columnToDrop));
- }
- });
- }
+ return new ReferencesManager(
+ new HashSet<>(catalogTable.getResolvedSchema().getColumnNames()),
+ columnToReferences,
+ columnToDependencies,
+ catalogTable
+ .getResolvedSchema()
+ .getPrimaryKey()
+ .map(constraint -> new HashSet<>(constraint.getColumns()))
+ .orElse(new HashSet<>()),
+ ColumnReferenceFinder.findWatermarkReferencedColumn(
+ catalogTable.getResolvedSchema()),
+ new HashSet<>(catalogTable.getPartitionKeys()));
+ }
+
+ void dropColumn(String columnName) {
+ checkReferences(columnName);
+ if (primaryKeys.contains(columnName)) {
+ throw new ValidationException(
+ String.format(
+ "%sThe column %s is used as the primary key.",
+ EX_MSG_PREFIX, EncodingUtils.escapeIdentifier(columnName)));
+ }
- private void validateColumnName(
- String columnToAlter,
- ResolvedSchema oldSchema,
- List<String> partitionKeys,
- BiFunction<Set<String>, Column.ComputedColumn, Boolean> computedColumnChecker) {
- // validate old column
- Set<String> tableColumns = new HashSet<>(oldSchema.getColumnNames());
- if (!tableColumns.contains(columnToAlter)) {
- throw new ValidationException(
- String.format(
- "%sThe column `%s` does not exist in the base table.",
- EX_MSG_PREFIX, columnToAlter));
+ columnToDependencies
+ .getOrDefault(columnName, Collections.emptySet())
+ .forEach(
+ referredColumn ->
+ columnToReferences.get(referredColumn).remove(columnName));
+ columnToDependencies.remove(columnName);
+ columns.remove(columnName);
}
- // validate old column name isn't referred by computed column case
- oldSchema.getColumns().stream()
- .filter(column -> column instanceof Column.ComputedColumn)
- .forEach(
- column -> {
- Column.ComputedColumn computedColumn = (Column.ComputedColumn) column;
- Set<String> referencedColumn =
- ColumnReferenceFinder.findReferencedColumn(
- computedColumn.getName(), oldSchema);
- if (computedColumnChecker.apply(referencedColumn, computedColumn)) {
- throw new ValidationException(
- String.format(
- "%sThe column `%s` is referenced by computed column %s.",
- EX_MSG_PREFIX,
- columnToAlter,
- computedColumn.asSummaryString()));
- }
- });
- // validate partition keys doesn't contain the old column
- if (partitionKeys.contains(columnToAlter)) {
- throw new ValidationException(
- String.format(
- "%sThe column `%s` is used as the partition keys.",
- EX_MSG_PREFIX, columnToAlter));
+ int getColumnDependencyCount(String columnName) {
+ return columnToDependencies.getOrDefault(columnName, Collections.emptySet()).size();
}
- }
- private void validateWatermark(ResolvedCatalogTable oldTable, String columnToAlter) {
- // validate old column isn't referenced by watermark
- List<WatermarkSpec> watermarkSpecs = oldTable.getResolvedSchema().getWatermarkSpecs();
- Set<String> referencedColumns =
- ColumnReferenceFinder.findWatermarkReferencedColumn(oldTable.getResolvedSchema());
- Set<String> rowtimeAttributes =
- oldTable.getResolvedSchema().getWatermarkSpecs().stream()
- .map(WatermarkSpec::getRowtimeAttribute)
- .collect(Collectors.toSet());
- if (rowtimeAttributes.contains(columnToAlter)
- || referencedColumns.contains(columnToAlter)) {
- throw new ValidationException(
- String.format(
- "%sThe column `%s` is referenced by watermark expression %s.",
- EX_MSG_PREFIX, columnToAlter, watermarkSpecs));
+ void checkReferences(String columnName) {
+ if (!columns.contains(columnName)) {
+ throw new ValidationException(
+ String.format(
+ "%sThe column `%s` does not exist in the base table.",
+ EX_MSG_PREFIX, columnName));
+ }
+ if (columnToReferences.containsKey(columnName)
+ && !columnToReferences.get(columnName).isEmpty()) {
+ throw new ValidationException(
+ String.format(
+ "%sThe column %s is referenced by computed column %s.",
+ EX_MSG_PREFIX,
+ EncodingUtils.escapeIdentifier(columnName),
+ columnToReferences.get(columnName).stream()
+ .map(EncodingUtils::escapeIdentifier)
+ .sorted()
+ .collect(Collectors.joining(", "))));
+ }
+ if (partitionKeys.contains(columnName)) {
+ throw new ValidationException(
+ String.format(
+ "%sThe column `%s` is used as the partition keys.",
+ EX_MSG_PREFIX, columnName));
+ }
+ if (watermarkReferences.contains(columnName)) {
+ throw new ValidationException(
+ String.format(
+ "%sThe column `%s` is referenced by watermark expression.",
+ EX_MSG_PREFIX, columnName));
+ }
}
}
+ // --------------------------------------------------------------------------------------------
+
private void buildUpdatedColumn(
Schema.Builder builder,
ResolvedCatalogTable oldTable,
@@ -893,6 +928,22 @@ public class AlterSchemaConverter {
oldColumn.getComment().ifPresent(builder::withComment);
}
+ private Operation buildAlterTableChangeOperation(
+ SqlAlterTable alterTable,
+ List<TableChange> tableChanges,
+ Schema newSchema,
+ ResolvedCatalogTable oldTable) {
+ return new AlterTableChangeOperation(
+ catalogManager.qualifyIdentifier(
+ UnresolvedIdentifier.of(alterTable.fullTableName())),
+ tableChanges,
+ CatalogTable.of(
+ newSchema,
+ oldTable.getComment(),
+ oldTable.getPartitionKeys(),
+ oldTable.getOptions()));
+ }
+
private static String getColumnName(SqlIdentifier identifier) {
if (!identifier.isSimple()) {
throw new UnsupportedOperationException(
@@ -903,11 +954,24 @@ public class AlterSchemaConverter {
return identifier.getSimple();
}
- private AlterSchemaStrategy computeAlterSchemaStrategy(SqlAlterTableSchema alterTableSchema) {
+ private SchemaConverter createSchemaConverter(
+ SqlAlterTableSchema alterTableSchema, ResolvedCatalogTable oldTable) {
if (alterTableSchema instanceof SqlAlterTableAdd) {
- return AlterSchemaStrategy.ADD;
+ return new AddSchemaConverter(
+ oldTable.getUnresolvedSchema(),
+ (FlinkTypeFactory) sqlValidator.getTypeFactory(),
+ sqlValidator,
+ constraintValidator,
+ escapeExpression,
+ catalogManager.getSchemaResolver());
} else if (alterTableSchema instanceof SqlAlterTableModify) {
- return AlterSchemaStrategy.MODIFY;
+ return new ModifySchemaConverter(
+ oldTable,
+ (FlinkTypeFactory) sqlValidator.getTypeFactory(),
+ sqlValidator,
+ constraintValidator,
+ escapeExpression,
+ catalogManager.getSchemaResolver());
}
throw new UnsupportedOperationException(
String.format(
@@ -918,10 +982,4 @@ public class AlterSchemaConverter {
private <T> T unwrap(Optional<T> value) {
return value.orElseThrow(() -> new TableException("The value should never be empty."));
}
-
- /** A strategy to describe the alter schema kind. */
- private enum AlterSchemaStrategy {
- ADD,
- MODIFY
- }
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
index bd5aa2ec984..7bab1359df4 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
@@ -166,7 +166,6 @@ import org.apache.flink.table.operations.ddl.AlterDatabaseOperation;
import org.apache.flink.table.operations.ddl.AlterPartitionPropertiesOperation;
import org.apache.flink.table.operations.ddl.AlterTableChangeOperation;
import org.apache.flink.table.operations.ddl.AlterTableRenameOperation;
-import org.apache.flink.table.operations.ddl.AlterTableSchemaOperation;
import org.apache.flink.table.operations.ddl.AlterViewAsOperation;
import org.apache.flink.table.operations.ddl.AlterViewPropertiesOperation;
import org.apache.flink.table.operations.ddl.AlterViewRenameOperation;
@@ -255,7 +254,7 @@ public class SqlToOperationConverter {
flinkPlanner.getOrCreateSqlValidator(),
this::validateTableConstraint,
this::getQuotedSqlString,
- catalogManager.getSchemaResolver());
+ catalogManager);
}
/**
@@ -506,44 +505,17 @@ public class SqlToOperationConverter {
return convertAlterTableReset(
tableIdentifier, (CatalogTable) baseTable, (SqlAlterTableReset) sqlAlterTable);
} else if (sqlAlterTable instanceof SqlAlterTableDropColumn) {
- return new AlterTableSchemaOperation(
- tableIdentifier,
- CatalogTable.of(
- alterSchemaConverter.applySchemaChange(
- (SqlAlterTableDropColumn) sqlAlterTable, resolvedCatalogTable),
- resolvedCatalogTable.getComment(),
- resolvedCatalogTable.getPartitionKeys(),
- resolvedCatalogTable.getOptions()));
+ return alterSchemaConverter.convertAlterSchema(
+ (SqlAlterTableDropColumn) sqlAlterTable, resolvedCatalogTable);
} else if (sqlAlterTable instanceof SqlAlterTableDropPrimaryKey) {
- return new AlterTableSchemaOperation(
- tableIdentifier,
- CatalogTable.of(
- alterSchemaConverter.applySchemaChange(
- (SqlAlterTableDropPrimaryKey) sqlAlterTable,
- resolvedCatalogTable),
- resolvedCatalogTable.getComment(),
- resolvedCatalogTable.getPartitionKeys(),
- resolvedCatalogTable.getOptions()));
+ return alterSchemaConverter.convertAlterSchema(
+ (SqlAlterTableDropPrimaryKey) sqlAlterTable, resolvedCatalogTable);
} else if (sqlAlterTable instanceof SqlAlterTableDropConstraint) {
- return new AlterTableSchemaOperation(
- tableIdentifier,
- CatalogTable.of(
- alterSchemaConverter.applySchemaChange(
- (SqlAlterTableDropConstraint) sqlAlterTable,
- resolvedCatalogTable),
- resolvedCatalogTable.getComment(),
- resolvedCatalogTable.getPartitionKeys(),
- resolvedCatalogTable.getOptions()));
+ return alterSchemaConverter.convertAlterSchema(
+ (SqlAlterTableDropConstraint) sqlAlterTable, resolvedCatalogTable);
} else if (sqlAlterTable instanceof SqlAlterTableDropWatermark) {
- return new AlterTableSchemaOperation(
- tableIdentifier,
- CatalogTable.of(
- alterSchemaConverter.applySchemaChange(
- (SqlAlterTableDropWatermark) sqlAlterTable,
- resolvedCatalogTable),
- resolvedCatalogTable.getComment(),
- resolvedCatalogTable.getPartitionKeys(),
- resolvedCatalogTable.getOptions()));
+ return alterSchemaConverter.convertAlterSchema(
+ (SqlAlterTableDropWatermark) sqlAlterTable, resolvedCatalogTable);
} else if (sqlAlterTable instanceof SqlAddReplaceColumns) {
return OperationConverterUtils.convertAddReplaceColumns(
tableIdentifier,
@@ -554,24 +526,11 @@ public class SqlToOperationConverter {
return OperationConverterUtils.convertChangeColumn(
tableIdentifier,
(SqlChangeColumn) sqlAlterTable,
- (ResolvedCatalogTable) baseTable,
+ resolvedCatalogTable,
flinkPlanner.getOrCreateSqlValidator());
} else if (sqlAlterTable instanceof SqlAlterTableRenameColumn) {
- SqlAlterTableRenameColumn sqlAlterTableRenameColumn =
- (SqlAlterTableRenameColumn) sqlAlterTable;
- ResolvedCatalogTable baseCatalogTable = (ResolvedCatalogTable) baseTable;
- List<TableChange> tableChanges = new ArrayList<>();
- Schema newSchema =
- alterSchemaConverter.applySchemaChange(
- sqlAlterTableRenameColumn, baseCatalogTable, tableChanges);
- return new AlterTableChangeOperation(
- tableIdentifier,
- tableChanges,
- CatalogTable.of(
- newSchema,
- resolvedCatalogTable.getComment(),
- resolvedCatalogTable.getPartitionKeys(),
- resolvedCatalogTable.getOptions()));
+ return alterSchemaConverter.convertAlterSchema(
+ (SqlAlterTableRenameColumn) sqlAlterTable, resolvedCatalogTable);
} else if (sqlAlterTable instanceof SqlAddPartitions) {
List<CatalogPartitionSpec> specs = new ArrayList<>();
List<CatalogPartition> partitions = new ArrayList<>();
@@ -598,10 +557,8 @@ public class SqlToOperationConverter {
optionalCatalogTable.get(),
(SqlAlterTableCompact) sqlAlterTable);
} else if (sqlAlterTable instanceof SqlAlterTableSchema) {
- return convertAlterTableSchema(
- tableIdentifier,
- optionalCatalogTable.get().getResolvedTable(),
- (SqlAlterTableSchema) sqlAlterTable);
+ return alterSchemaConverter.convertAlterSchema(
+ (SqlAlterTableSchema) sqlAlterTable, resolvedCatalogTable);
} else {
throw new ValidationException(
String.format(
@@ -724,23 +681,6 @@ public class SqlToOperationConverter {
tableIdentifier));
}
- private Operation convertAlterTableSchema(
- ObjectIdentifier tableIdentifier,
- ResolvedCatalogTable oldTable,
- SqlAlterTableSchema alterTableSchema) {
- List<TableChange> tableChanges = new ArrayList<>();
- Schema newSchema =
- alterSchemaConverter.applySchemaChange(alterTableSchema, oldTable, tableChanges);
- return new AlterTableChangeOperation(
- tableIdentifier,
- tableChanges,
- CatalogTable.of(
- newSchema,
- oldTable.getComment(),
- oldTable.getPartitionKeys(),
- oldTable.getOptions()));
- }
-
/** Convert CREATE FUNCTION statement. */
private Operation convertCreateFunction(SqlCreateFunction sqlCreateFunction) {
UnresolvedIdentifier unresolvedIdentifier =
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java
index 7587200850c..7e0079b1568 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java
@@ -100,11 +100,9 @@ public class OperationConverterUtils {
setWatermarkAndPK(builder, catalogTable.getSchema());
}
List<TableChange> tableChanges = new ArrayList<>();
- // TODO: support TableChange in FLINK-30497
for (SqlNode sqlNode : addReplaceColumns.getNewColumns()) {
TableColumn tableColumn = toTableColumn((SqlTableColumn) sqlNode, sqlValidator);
builder.add(tableColumn);
- // TODO: support ALTER TABLE REPLACE with TableChange in FLINK-30497
if (!addReplaceColumns.isReplace()) {
tableChanges.add(
TableChange.add(
@@ -133,6 +131,14 @@ public class OperationConverterUtils {
newProperties,
catalogTable.getComment());
if (addReplaceColumns.isReplace()) {
+ // It's hard to determine how to decompose the ALTER TABLE REPLACE into multiple
+ // TableChanges. For example, with old schema <a INT, b INT, c INT> and the new schema
+ // <a INT, d INT>, there are multiple alternatives:
+ // plan 1: DROP COLUMN c, RENAME COLUMN b TO d;
+ // plan 2: DROP COLUMN b, RENAME COLUMN c TO d;
+ // So we don't translate with TableChanges here. One workaround is
+ // to minimize the edit distance, i.e., minimize the modification times, but it
+ // still cannot provide a deterministic answer.
return new AlterTableSchemaOperation(tableIdentifier, newTable);
} else {
return new AlterTableChangeOperation(tableIdentifier, tableChanges, newTable);
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/expressions/ColumnReferenceFinderTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/expressions/ColumnReferenceFinderTest.java
index 0444afea947..6e406315e12 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/expressions/ColumnReferenceFinderTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/expressions/ColumnReferenceFinderTest.java
@@ -55,8 +55,9 @@ public class ColumnReferenceFinderTest extends TableTestBase {
"tuple",
DataTypes.ROW(
DataTypes.TIMESTAMP(3), DataTypes.INT()))
+ .column("g", DataTypes.TIMESTAMP(3))
.columnByExpression("ts", "tuple.f0")
- .watermark("ts", "ts - interval '5' day")
+ .watermark("ts", "g - interval '5' day")
.build());
}
@@ -75,6 +76,6 @@ public class ColumnReferenceFinderTest extends TableTestBase {
.containsExactlyInAnyOrder("tuple");
assertThat(ColumnReferenceFinder.findWatermarkReferencedColumn(resolvedSchema))
- .containsExactlyInAnyOrder("ts");
+ .containsExactlyInAnyOrder("ts", "g");
}
}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
index 10a7af4f4f6..253c147effa 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
@@ -82,7 +82,6 @@ import org.apache.flink.table.operations.command.ShowJarsOperation;
import org.apache.flink.table.operations.ddl.AlterDatabaseOperation;
import org.apache.flink.table.operations.ddl.AlterTableChangeOperation;
import org.apache.flink.table.operations.ddl.AlterTableRenameOperation;
-import org.apache.flink.table.operations.ddl.AlterTableSchemaOperation;
import org.apache.flink.table.operations.ddl.CreateCatalogFunctionOperation;
import org.apache.flink.table.operations.ddl.CreateDatabaseOperation;
import org.apache.flink.table.operations.ddl.CreateTableOperation;
@@ -1332,14 +1331,12 @@ public class SqlToOperationConverterTest {
// rename column c that is used in a computed column
assertThatThrownBy(() -> parse("alter table tb1 rename a to a1"))
.isInstanceOf(ValidationException.class)
- .hasMessageContaining(
- "The column `a` is referenced by computed column `d` BIGINT NOT NULL AS a*(b+2 + a*b).");
+ .hasMessageContaining("The column `a` is referenced by computed column `d`.");
// rename column used in the watermark expression
assertThatThrownBy(() -> parse("alter table tb1 rename ts to ts1"))
.isInstanceOf(ValidationException.class)
- .hasMessageContaining(
- "The column `ts` is referenced by watermark expression [WATERMARK FOR `ts`: TIMESTAMP(3) AS ts - interval '5' seconds].");
+ .hasMessageContaining("The column `ts` is referenced by watermark expression.");
// rename nested column
assertThatThrownBy(() -> parse("alter table tb1 rename e.f1 to e.f11"))
@@ -1374,7 +1371,7 @@ public class SqlToOperationConverterTest {
assertThatThrownBy(() -> parse("alter table `cat1`.`db1`.`tb2` rename e to e1"))
.isInstanceOf(ValidationException.class)
.hasMessageContaining(
- "Failed to execute ALTER TABLE statement.\nThe column `e` is referenced by computed column `j` STRING AS upper(e).");
+ "Failed to execute ALTER TABLE statement.\nThe column `e` is referenced by computed column `g`, `j`.");
// rename column used as partition key
assertThatThrownBy(() -> parse("alter table tb2 rename a to a1"))
@@ -1409,8 +1406,7 @@ public class SqlToOperationConverterTest {
// drop a column which generates a computed column
assertThatThrownBy(() -> parse("alter table tb1 drop a"))
.isInstanceOf(ValidationException.class)
- .hasMessageContaining(
- "The column `a` is referenced by computed column `d` BIGINT NOT NULL AS a*(b+2 + a*b).");
+ .hasMessageContaining("The column `a` is referenced by computed column `d`.");
// drop a column which is pk
assertThatThrownBy(() -> parse("alter table tb1 drop c"))
@@ -1420,8 +1416,7 @@ public class SqlToOperationConverterTest {
// drop a column which defines watermark
assertThatThrownBy(() -> parse("alter table tb1 drop ts"))
.isInstanceOf(ValidationException.class)
- .hasMessageContaining(
- "The column `ts` is referenced by watermark expression [WATERMARK FOR `ts`: TIMESTAMP(3) AS ts - interval '5' seconds].");
+ .hasMessageContaining("The column `ts` is referenced by watermark expression.");
}
@Test
@@ -1429,30 +1424,31 @@ public class SqlToOperationConverterTest {
prepareNonManagedTable(false);
// drop a single column
Operation operation = parse("alter table tb1 drop c");
- assertThat(operation).isInstanceOf(AlterTableSchemaOperation.class);
- assertThat(operation.asSummaryString())
- .isEqualTo(
- "ALTER TABLE cat1.db1.tb1 SET SCHEMA (\n"
- + " `a` INT NOT NULL,\n"
- + " `b` BIGINT NOT NULL,\n"
- + " `d` AS [a*(b+2 + a*b)],\n"
- + " `e` ROW<`f0` STRING, `f1` INT, `f2` ROW<`f0` DOUBLE, `f1` ARRAY<FLOAT>>>,\n"
- + " `f` AS [e.f1 + e.f2.f0],\n"
- + " `g` METADATA VIRTUAL,\n"
- + " `ts` TIMESTAMP(3) COMMENT 'just a comment'\n"
- + ")");
+ assertThat(operation).isInstanceOf(AlterTableChangeOperation.class);
+ assertThat(operation.asSummaryString()).isEqualTo("ALTER TABLE cat1.db1.tb1\n DROP `c`");
+ assertThat(
+ ((AlterTableChangeOperation) operation)
+ .getNewTable().getUnresolvedSchema().getColumns().stream()
+ .map(Schema.UnresolvedColumn::getName)
+ .collect(Collectors.toList()))
+ .doesNotContain("c");
// drop computed column and referenced columns together
operation = parse("alter table tb1 drop (f, e, b, d)");
- assertThat(operation).isInstanceOf(AlterTableSchemaOperation.class);
+ assertThat(operation).isInstanceOf(AlterTableChangeOperation.class);
assertThat(operation.asSummaryString())
.isEqualTo(
- "ALTER TABLE cat1.db1.tb1 SET SCHEMA (\n"
- + " `a` INT NOT NULL,\n"
- + " `c` STRING NOT NULL COMMENT 'column comment',\n"
- + " `g` METADATA VIRTUAL,\n"
- + " `ts` TIMESTAMP(3) COMMENT 'just a comment'\n"
- + ")");
+ "ALTER TABLE cat1.db1.tb1\n"
+ + " DROP `d`,\n"
+ + " DROP `f`,\n"
+ + " DROP `b`,\n"
+ + " DROP `e`");
+ assertThat(
+ ((AlterTableChangeOperation) operation)
+ .getNewTable().getUnresolvedSchema().getColumns().stream()
+ .map(Schema.UnresolvedColumn::getName)
+ .collect(Collectors.toList()))
+ .doesNotContain("f", "e", "b", "d");
}
@Test
@@ -1474,25 +1470,27 @@ public class SqlToOperationConverterTest {
@Test
public void testAlterTableDropConstraint() throws Exception {
prepareNonManagedTable(true);
- String expectedSummaryString =
- "ALTER TABLE cat1.db1.tb1 SET SCHEMA (\n"
- + " `a` INT NOT NULL,\n"
- + " `b` BIGINT NOT NULL,\n"
- + " `c` STRING NOT NULL COMMENT 'column comment',\n"
- + " `d` AS [a*(b+2 + a*b)],\n"
- + " `e` ROW<`f0` STRING, `f1` INT, `f2` ROW<`f0` DOUBLE, `f1` ARRAY<FLOAT>>>,\n"
- + " `f` AS [e.f1 + e.f2.f0],\n"
- + " `g` METADATA VIRTUAL,\n"
- + " `ts` TIMESTAMP(3) COMMENT 'just a comment'\n"
- + ")";
+ String expectedSummaryString = "ALTER TABLE cat1.db1.tb1\n DROP CONSTRAINT ct1";
Operation operation = parse("alter table tb1 drop constraint ct1");
- assertThat(operation).isInstanceOf(AlterTableSchemaOperation.class);
+ assertThat(operation).isInstanceOf(AlterTableChangeOperation.class);
assertThat(operation.asSummaryString()).isEqualTo(expectedSummaryString);
+ assertThat(
+ ((AlterTableChangeOperation) operation)
+ .getNewTable()
+ .getUnresolvedSchema()
+ .getPrimaryKey())
+ .isNotPresent();
operation = parse("alter table tb1 drop primary key");
- assertThat(operation).isInstanceOf(AlterTableSchemaOperation.class);
+ assertThat(operation).isInstanceOf(AlterTableChangeOperation.class);
assertThat(operation.asSummaryString()).isEqualTo(expectedSummaryString);
+ assertThat(
+ ((AlterTableChangeOperation) operation)
+ .getNewTable()
+ .getUnresolvedSchema()
+ .getPrimaryKey())
+ .isNotPresent();
}
@Test
@@ -1507,19 +1505,15 @@ public class SqlToOperationConverterTest {
public void testAlterTableDropWatermark() throws Exception {
prepareNonManagedTable("tb1", true);
Operation operation = parse("alter table tb1 drop watermark");
- assertThat(operation).isInstanceOf(AlterTableSchemaOperation.class);
+ assertThat(operation).isInstanceOf(AlterTableChangeOperation.class);
assertThat(operation.asSummaryString())
- .isEqualTo(
- "ALTER TABLE cat1.db1.tb1 SET SCHEMA (\n"
- + " `a` INT NOT NULL,\n"
- + " `b` BIGINT NOT NULL,\n"
- + " `c` STRING NOT NULL COMMENT 'column comment',\n"
- + " `d` AS [a*(b+2 + a*b)],\n"
- + " `e` ROW<`f0` STRING, `f1` INT, `f2` ROW<`f0` DOUBLE, `f1` ARRAY<FLOAT>>>,\n"
- + " `f` AS [e.f1 + e.f2.f0],\n"
- + " `g` METADATA VIRTUAL,\n"
- + " `ts` TIMESTAMP(3) COMMENT 'just a comment'\n"
- + ")");
+ .isEqualTo("ALTER TABLE cat1.db1.tb1\n DROP WATERMARK");
+ assertThat(
+ ((AlterTableChangeOperation) operation)
+ .getNewTable()
+ .getUnresolvedSchema()
+ .getWatermarkSpecs())
+ .isEmpty();
}
@Test