You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sh...@apache.org on 2023/01/02 15:53:43 UTC
[flink] 02/02: [FLINK-22137][table] Support DROP column/constraint/watermark for ALTER TABLE statement
This is an automated email from the ASF dual-hosted git repository.
shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit cc9540cacc2d775cc190ee6534794ec33fd07828
Author: Jane Chan <qi...@gmail.com>
AuthorDate: Thu Dec 29 14:34:32 2022 +0800
[FLINK-22137][table] Support DROP column/constraint/watermark for ALTER TABLE statement
This closes #21571
---
.../src/test/resources/sql/table.q | 120 +++++--
.../table/api/internal/TableEnvironmentImpl.java | 22 --
.../ddl/AlterTableDropConstraintOperation.java | 43 ---
.../planner/expressions/ColumnReferenceFinder.java | 86 +++--
.../planner/operations/AlterSchemaConverter.java | 381 +++++++++++++++------
.../operations/SqlToOperationConverter.java | 69 ++--
.../expressions/ColumnReferenceFinderTest.java | 80 +++++
.../operations/SqlToOperationConverterTest.java | 337 ++++++++++--------
.../flink/table/api/TableEnvironmentTest.scala | 100 ++++++
9 files changed, 856 insertions(+), 382 deletions(-)
diff --git a/flink-table/flink-sql-client/src/test/resources/sql/table.q b/flink-table/flink-sql-client/src/test/resources/sql/table.q
index 36809085117..1d1764ac5e0 100644
--- a/flink-table/flink-sql-client/src/test/resources/sql/table.q
+++ b/flink-table/flink-sql-client/src/test/resources/sql/table.q
@@ -537,45 +537,109 @@ CREATE TABLE `default_catalog`.`default_database`.`orders2` (
!ok
+# ==========================================================================
+# test alter table drop column
+# ==========================================================================
+
+alter table orders2 drop (amount1, product, cleaned_product);
+[INFO] Execute statement succeed.
+!info
+
+# verify table options using SHOW CREATE TABLE
+show create table orders2;
+CREATE TABLE `default_catalog`.`default_database`.`orders2` (
+ `trade_order_id` BIGINT NOT NULL,
+ `ts` TIMESTAMP(3) NOT NULL,
+ `user` VARCHAR(2147483647),
+ `user_email` VARCHAR(2147483647) NOT NULL,
+ `product_id` BIGINT NOT NULL,
+ `ptime` AS PROCTIME(),
+ WATERMARK FOR `ts` AS `ts` - INTERVAL '1' MINUTE,
+ CONSTRAINT `order_constraint` PRIMARY KEY (`trade_order_id`) NOT ENFORCED
+) WITH (
+ 'connector' = 'datagen'
+)
+
+!ok
+
+# ==========================================================================
+# test alter table drop primary key
+# ==========================================================================
+
+alter table orders2 drop primary key;
+[INFO] Execute statement succeed.
+!info
+
+# verify table options using SHOW CREATE TABLE
+show create table orders2;
+CREATE TABLE `default_catalog`.`default_database`.`orders2` (
+ `trade_order_id` BIGINT NOT NULL,
+ `ts` TIMESTAMP(3) NOT NULL,
+ `user` VARCHAR(2147483647),
+ `user_email` VARCHAR(2147483647) NOT NULL,
+ `product_id` BIGINT NOT NULL,
+ `ptime` AS PROCTIME(),
+ WATERMARK FOR `ts` AS `ts` - INTERVAL '1' MINUTE
+) WITH (
+ 'connector' = 'datagen'
+)
+
+!ok
+# ==========================================================================
+# test alter table drop watermark
+# ==========================================================================
+
+alter table orders2 drop watermark;
+[INFO] Execute statement succeed.
+!info
+
+# verify table options using SHOW CREATE TABLE
+show create table orders2;
+CREATE TABLE `default_catalog`.`default_database`.`orders2` (
+ `trade_order_id` BIGINT NOT NULL,
+ `ts` TIMESTAMP(3) NOT NULL,
+ `user` VARCHAR(2147483647),
+ `user_email` VARCHAR(2147483647) NOT NULL,
+ `product_id` BIGINT NOT NULL,
+ `ptime` AS PROCTIME()
+) WITH (
+ 'connector' = 'datagen'
+)
+
+!ok
# ==========================================================================
# test describe table
# ==========================================================================
describe orders2;
-+-----------------+-----------------------------+-------+---------------------+---------------------------------------+----------------------------+
-| name | type | null | key | extras | watermark |
-+-----------------+-----------------------------+-------+---------------------+---------------------------------------+----------------------------+
-| trade_order_id | BIGINT | FALSE | PRI(trade_order_id) | | |
-| ts | TIMESTAMP(3) *ROWTIME* | FALSE | | | `ts` - INTERVAL '1' MINUTE |
-| user | STRING | TRUE | | | |
-| user_email | STRING | FALSE | | | |
-| product_id | BIGINT | FALSE | | | |
-| product | VARCHAR(32) | TRUE | | | |
-| cleaned_product | VARCHAR(32) | FALSE | | AS COALESCE(`product`, 'missing_sku') | |
-| amount1 | INT | TRUE | | | |
-| ptime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE | | AS PROCTIME() | |
-+-----------------+-----------------------------+-------+---------------------+---------------------------------------+----------------------------+
-9 rows in set
++----------------+-----------------------------+-------+-----+---------------+-----------+
+| name | type | null | key | extras | watermark |
++----------------+-----------------------------+-------+-----+---------------+-----------+
+| trade_order_id | BIGINT | FALSE | | | |
+| ts | TIMESTAMP(3) | FALSE | | | |
+| user | STRING | TRUE | | | |
+| user_email | STRING | FALSE | | | |
+| product_id | BIGINT | FALSE | | | |
+| ptime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE | | AS PROCTIME() | |
++----------------+-----------------------------+-------+-----+---------------+-----------+
+6 rows in set
!ok
# test desc table
desc orders2;
-+-----------------+-----------------------------+-------+---------------------+---------------------------------------+----------------------------+
-| name | type | null | key | extras | watermark |
-+-----------------+-----------------------------+-------+---------------------+---------------------------------------+----------------------------+
-| trade_order_id | BIGINT | FALSE | PRI(trade_order_id) | | |
-| ts | TIMESTAMP(3) *ROWTIME* | FALSE | | | `ts` - INTERVAL '1' MINUTE |
-| user | STRING | TRUE | | | |
-| user_email | STRING | FALSE | | | |
-| product_id | BIGINT | FALSE | | | |
-| product | VARCHAR(32) | TRUE | | | |
-| cleaned_product | VARCHAR(32) | FALSE | | AS COALESCE(`product`, 'missing_sku') | |
-| amount1 | INT | TRUE | | | |
-| ptime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE | | AS PROCTIME() | |
-+-----------------+-----------------------------+-------+---------------------+---------------------------------------+----------------------------+
-9 rows in set
++----------------+-----------------------------+-------+-----+---------------+-----------+
+| name | type | null | key | extras | watermark |
++----------------+-----------------------------+-------+-----+---------------+-----------+
+| trade_order_id | BIGINT | FALSE | | | |
+| ts | TIMESTAMP(3) | FALSE | | | |
+| user | STRING | TRUE | | | |
+| user_email | STRING | FALSE | | | |
+| product_id | BIGINT | FALSE | | | |
+| ptime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE | | AS PROCTIME() | |
++----------------+-----------------------------+-------+-----+---------------+-----------+
+6 rows in set
!ok
# ==========================================================================
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index bffbef65c1c..4a1d000cc5e 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -47,8 +47,6 @@ import org.apache.flink.table.catalog.CatalogFunctionImpl;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.CatalogPartition;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
-import org.apache.flink.table.catalog.CatalogTable;
-import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ConnectorCatalogTable;
import org.apache.flink.table.catalog.ContextResolvedTable;
@@ -125,7 +123,6 @@ import org.apache.flink.table.operations.ddl.AlterCatalogFunctionOperation;
import org.apache.flink.table.operations.ddl.AlterDatabaseOperation;
import org.apache.flink.table.operations.ddl.AlterPartitionPropertiesOperation;
import org.apache.flink.table.operations.ddl.AlterTableChangeOperation;
-import org.apache.flink.table.operations.ddl.AlterTableDropConstraintOperation;
import org.apache.flink.table.operations.ddl.AlterTableOperation;
import org.apache.flink.table.operations.ddl.AlterTableOptionsOperation;
import org.apache.flink.table.operations.ddl.AlterTableRenameOperation;
@@ -160,7 +157,6 @@ import org.apache.flink.table.types.AbstractDataType;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.utils.DataTypeUtils;
-import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.flink.table.utils.print.PrintStyle;
import org.apache.flink.types.Row;
import org.apache.flink.util.FlinkUserCodeClassLoaders;
@@ -993,24 +989,6 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
alterTablePropertiesOp.getCatalogTable(),
alterTablePropertiesOp.getTableIdentifier(),
false);
- } else if (alterTableOperation instanceof AlterTableDropConstraintOperation) {
- AlterTableDropConstraintOperation dropConstraintOperation =
- (AlterTableDropConstraintOperation) operation;
- CatalogTable oriTable =
- catalogManager
- .getTable(dropConstraintOperation.getTableIdentifier())
- .get()
- .getResolvedTable();
- CatalogTable newTable =
- new CatalogTableImpl(
- TableSchemaUtils.dropConstraint(
- oriTable.getSchema(),
- dropConstraintOperation.getConstraintName()),
- oriTable.getPartitionKeys(),
- oriTable.getOptions(),
- oriTable.getComment());
- catalogManager.alterTable(
- newTable, dropConstraintOperation.getTableIdentifier(), false);
} else if (alterTableOperation instanceof AlterPartitionPropertiesOperation) {
AlterPartitionPropertiesOperation alterPartPropsOp =
(AlterPartitionPropertiesOperation) operation;
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableDropConstraintOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableDropConstraintOperation.java
deleted file mode 100644
index b3f7c7d448f..00000000000
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableDropConstraintOperation.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.operations.ddl;
-
-import org.apache.flink.table.catalog.ObjectIdentifier;
-
-/** Operation of "ALTER TABLE ADD [CONSTRAINT constraintName] ..." clause. * */
-public class AlterTableDropConstraintOperation extends AlterTableOperation {
- private final String constraintName;
-
- public AlterTableDropConstraintOperation(
- ObjectIdentifier tableIdentifier, String constraintName) {
- super(tableIdentifier);
- this.constraintName = constraintName;
- }
-
- public String getConstraintName() {
- return constraintName;
- }
-
- @Override
- public String asSummaryString() {
- return String.format(
- "ALTER TABLE %s DROP CONSTRAINT %s",
- tableIdentifier.asSummaryString(), constraintName);
- }
-}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/ColumnReferenceFinder.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/ColumnReferenceFinder.java
index 7b22246de8d..d3593cb4d85 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/ColumnReferenceFinder.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/ColumnReferenceFinder.java
@@ -19,6 +19,9 @@
package org.apache.flink.table.planner.expressions;
import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.expressions.CallExpression;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.expressions.ExpressionDefaultVisitor;
@@ -29,33 +32,68 @@ import org.apache.flink.table.planner.plan.utils.FlinkRexUtil;
import org.apache.calcite.rex.RexInputRef;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.stream.Collectors;
/** A finder used to look up referenced column name in a {@link ResolvedExpression}. */
public class ColumnReferenceFinder {
private ColumnReferenceFinder() {}
- public static Set<String> findReferencedColumn(
- ResolvedExpression resolvedExpression, List<String> tableColumns) {
- ColumnReferenceVisitor visitor = new ColumnReferenceVisitor(tableColumns);
- visitor.visit(resolvedExpression);
- return visitor.referencedColumns;
+ /**
+ * Find referenced column names that derive the computed column.
+ *
+ * @param columnName the name of the column
+ * @param schema the schema contains the computed column definition
+ * @return the referenced column names
+ */
+ public static Set<String> findReferencedColumn(String columnName, ResolvedSchema schema) {
+ Column column =
+ schema.getColumn(columnName)
+ .orElseThrow(
+ () ->
+ new ValidationException(
+ String.format(
+ "The input column %s doesn't exist in the schema.",
+ columnName)));
+ if (!(column instanceof Column.ComputedColumn)) {
+ return Collections.emptySet();
+ }
+ ColumnReferenceVisitor visitor =
+ new ColumnReferenceVisitor(
+ // the input ref index is based on a projection of non-computed columns
+ schema.getColumns().stream()
+ .filter(c -> !(c instanceof Column.ComputedColumn))
+ .map(Column::getName)
+ .collect(Collectors.toList()));
+ return visitor.visit(((Column.ComputedColumn) column).getExpression());
+ }
+
+ /**
+ * Find referenced column names that derive the watermark expression.
+ *
+ * @param schema resolved columns contains the watermark expression.
+ * @return the referenced column names
+ */
+ public static Set<String> findWatermarkReferencedColumn(ResolvedSchema schema) {
+ ColumnReferenceVisitor visitor = new ColumnReferenceVisitor(schema.getColumnNames());
+ return schema.getWatermarkSpecs().stream()
+ .flatMap(spec -> visitor.visit(spec.getWatermarkExpression()).stream())
+ .collect(Collectors.toSet());
}
- private static class ColumnReferenceVisitor extends ExpressionDefaultVisitor<Void> {
+ private static class ColumnReferenceVisitor extends ExpressionDefaultVisitor<Set<String>> {
private final List<String> tableColumns;
- private final Set<String> referencedColumns;
public ColumnReferenceVisitor(List<String> tableColumns) {
this.tableColumns = tableColumns;
- this.referencedColumns = new HashSet<>();
}
@Override
- public Void visit(Expression expression) {
+ public Set<String> visit(Expression expression) {
if (expression instanceof LocalReferenceExpression) {
return visit((LocalReferenceExpression) expression);
} else if (expression instanceof FieldReferenceExpression) {
@@ -70,38 +108,34 @@ public class ColumnReferenceFinder {
}
@Override
- public Void visit(FieldReferenceExpression fieldReference) {
- referencedColumns.add(fieldReference.getName());
- return null;
+ public Set<String> visit(FieldReferenceExpression fieldReference) {
+ return Collections.singleton(fieldReference.getName());
}
- public Void visit(LocalReferenceExpression localReference) {
- referencedColumns.add(localReference.getName());
- return null;
+ public Set<String> visit(LocalReferenceExpression localReference) {
+ return Collections.singleton(localReference.getName());
}
- public Void visit(RexNodeExpression rexNode) {
+ public Set<String> visit(RexNodeExpression rexNode) {
// get the referenced column ref in table
Set<RexInputRef> inputRefs = FlinkRexUtil.findAllInputRefs(rexNode.getRexNode());
// get the referenced column name by index
- inputRefs.forEach(
- inputRef -> {
- int index = inputRef.getIndex();
- referencedColumns.add(tableColumns.get(index));
- });
- return null;
+ return inputRefs.stream()
+ .map(inputRef -> tableColumns.get(inputRef.getIndex()))
+ .collect(Collectors.toSet());
}
@Override
- public Void visit(CallExpression call) {
+ public Set<String> visit(CallExpression call) {
+ Set<String> references = new HashSet<>();
for (Expression expression : call.getChildren()) {
- visit(expression);
+ references.addAll(visit(expression));
}
- return null;
+ return references;
}
@Override
- protected Void defaultMethod(Expression expression) {
+ protected Set<String> defaultMethod(Expression expression) {
throw new TableException("Unexpected expression: " + expression);
}
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java
index b00038ce20f..b0b5e65fdf1 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java
@@ -19,6 +19,10 @@
package org.apache.flink.table.planner.operations;
import org.apache.flink.sql.parser.ddl.SqlAlterTableAdd;
+import org.apache.flink.sql.parser.ddl.SqlAlterTableDropColumn;
+import org.apache.flink.sql.parser.ddl.SqlAlterTableDropConstraint;
+import org.apache.flink.sql.parser.ddl.SqlAlterTableDropPrimaryKey;
+import org.apache.flink.sql.parser.ddl.SqlAlterTableDropWatermark;
import org.apache.flink.sql.parser.ddl.SqlAlterTableModify;
import org.apache.flink.sql.parser.ddl.SqlAlterTableRenameColumn;
import org.apache.flink.sql.parser.ddl.SqlAlterTableSchema;
@@ -29,9 +33,8 @@ import org.apache.flink.sql.parser.ddl.position.SqlTableColumnPosition;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.Column;
-import org.apache.flink.table.catalog.ContextResolvedTable;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.SchemaResolver;
import org.apache.flink.table.catalog.TableChange;
@@ -62,6 +65,8 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -93,6 +98,10 @@ public class AlterSchemaConverter {
this.schemaResolver = schemaResolver;
}
+ /**
+ * Convert ALTER TABLE ADD | MODIFY (<schema_component> [, <schema_component>, ...])
+ * to generate an updated Schema.
+ */
public Schema applySchemaChange(
SqlAlterTableSchema alterTableSchema,
Schema originSchema,
@@ -101,7 +110,7 @@ public class AlterSchemaConverter {
SchemaConverter converter =
strategy == AlterSchemaStrategy.ADD
? new AddSchemaConverter(
- originalSchema,
+ originSchema,
(FlinkTypeFactory) sqlValidator.getTypeFactory(),
sqlValidator,
constraintValidator,
@@ -109,7 +118,7 @@ public class AlterSchemaConverter {
schemaResolver,
tableChangeCollector)
: new ModifySchemaConverter(
- originalSchema,
+ originSchema,
(FlinkTypeFactory) sqlValidator.getTypeFactory(),
sqlValidator,
constraintValidator,
@@ -122,78 +131,145 @@ public class AlterSchemaConverter {
return converter.convert();
}
+ /** Convert ALTER TABLE RENAME col_name to new_col_name to generate an updated Schema. */
public Schema applySchemaChange(
- SqlAlterTableRenameColumn renameColumn, ContextResolvedTable originalTable) {
- String oldColumnName = getColumnName(renameColumn.getOriginColumnIdentifier());
+ SqlAlterTableRenameColumn renameColumn, ResolvedCatalogTable originTable) {
+ String originColumnName = getColumnName(renameColumn.getOriginColumnIdentifier());
String newColumnName = getColumnName(renameColumn.getNewColumnIdentifier());
- List<String> tableColumns =
- originalTable.getResolvedSchema().getColumns().stream()
- .map(Column::getName)
- .collect(Collectors.toList());
- // validate old column is exists or new column isn't duplicated or old column isn't
- // referenced by computed column
+ // validate origin column is exists, new column name does not collide with existed column
+ // names, and origin column isn't referenced by computed column
validateColumnName(
- oldColumnName,
+ originColumnName,
newColumnName,
- tableColumns,
- originalTable.getResolvedSchema(),
- ((CatalogTable) originalTable.getResolvedTable()).getPartitionKeys());
-
- // validate old column isn't referenced by watermark
- List<WatermarkSpec> watermarkSpecs = originalTable.getResolvedSchema().getWatermarkSpecs();
- watermarkSpecs.forEach(
- watermarkSpec -> {
- String rowtimeAttribute = watermarkSpec.getRowtimeAttribute();
- Set<String> referencedColumns =
- ColumnReferenceFinder.findReferencedColumn(
- watermarkSpec.getWatermarkExpression(), tableColumns);
- if (oldColumnName.equals(rowtimeAttribute)
- || referencedColumns.contains(oldColumnName)) {
- throw new ValidationException(
- String.format(
- "Old column %s is referred by watermark expression %s, "
- + "currently doesn't allow to rename column which is "
- + "referred by watermark expression.",
- oldColumnName, watermarkSpec.asSummaryString()));
+ originTable.getResolvedSchema(),
+ originTable.getPartitionKeys());
+ validateWatermark(originTable, originColumnName);
+
+ // generate new schema
+ Schema.Builder schemaBuilder = Schema.newBuilder();
+ buildUpdatedColumn(
+ schemaBuilder,
+ originTable,
+ (builder, column) -> {
+ if (column.getName().equals(originColumnName)) {
+ buildNewColumnFromOriginColumn(builder, column, newColumnName);
+ } else {
+ builder.fromColumns(Collections.singletonList(column));
}
});
+ buildUpdatedPrimaryKey(
+ schemaBuilder,
+ originTable,
+ (pk) -> pk.equals(originColumnName) ? newColumnName : pk);
+ buildUpdatedWatermark(schemaBuilder, originTable);
+ return schemaBuilder.build();
+ }
- Schema.Builder builder = Schema.newBuilder();
- // build column
- Schema originSchema = originalTable.getTable().getUnresolvedSchema();
- originSchema
- .getColumns()
+ /** Convert ALTER TABLE DROP (col1 [, col2, ...]) to generate an updated Schema. */
+ public Schema applySchemaChange(
+ SqlAlterTableDropColumn dropColumn, ResolvedCatalogTable originTable) {
+ Set<String> columnsToDrop = new HashSet<>();
+ dropColumn
+ .getColumnList()
.forEach(
- column -> {
- if (oldColumnName.equals(column.getName())) {
- buildNewColumnFromOriginColumn(builder, column, newColumnName);
- } else {
- builder.fromColumns(Collections.singletonList(column));
+ identifier -> {
+ String name = getColumnName((SqlIdentifier) identifier);
+ if (!columnsToDrop.add(name)) {
+ throw new ValidationException(
+ String.format(
+ "%sDuplicate column `%s`.", EX_MSG_PREFIX, name));
}
});
- // build primary key
- Optional<Schema.UnresolvedPrimaryKey> originPrimaryKey = originSchema.getPrimaryKey();
- if (originPrimaryKey.isPresent()) {
- List<String> originPrimaryKeyNames = originPrimaryKey.get().getColumnNames();
- String constrainName = originPrimaryKey.get().getConstraintName();
- List<String> newPrimaryKeyNames =
- originPrimaryKeyNames.stream()
- .map(pkName -> pkName.equals(oldColumnName) ? newColumnName : pkName)
- .collect(Collectors.toList());
- builder.primaryKeyNamed(constrainName, newPrimaryKeyNames);
- }
-
- // build watermark
- originSchema
- .getWatermarkSpecs()
- .forEach(
- watermarkSpec ->
- builder.watermark(
- watermarkSpec.getColumnName(),
- watermarkSpec.getWatermarkExpression()));
- // generate new schema
- return builder.build();
+ Schema.Builder schemaBuilder = Schema.newBuilder();
+ for (SqlNode columnIdentifier : dropColumn.getColumnList()) {
+ String columnToDrop = getColumnName((SqlIdentifier) columnIdentifier);
+ // validate the column to drop exists in the table schema, is not a primary key and
+ // does not derive any computed column
+ validateColumnName(
+ columnToDrop,
+ originTable.getResolvedSchema(),
+ originTable.getPartitionKeys(),
+ columnsToDrop);
+ validateWatermark(originTable, columnToDrop);
+ }
+ buildUpdatedColumn(
+ schemaBuilder,
+ originTable,
+ (builder, column) -> {
+ if (!columnsToDrop.contains(column.getName())) {
+ builder.fromColumns(Collections.singletonList(column));
+ }
+ });
+ buildUpdatedPrimaryKey(schemaBuilder, originTable, Function.identity());
+ buildUpdatedWatermark(schemaBuilder, originTable);
+ return schemaBuilder.build();
+ }
+
+ /** Convert ALTER TABLE DROP PRIMARY KEY to generate an updated Schema. */
+ public Schema applySchemaChange(
+ SqlAlterTableDropPrimaryKey dropPrimaryKey, ResolvedCatalogTable originTable) {
+ Optional<UniqueConstraint> pkConstraint = originTable.getResolvedSchema().getPrimaryKey();
+ if (!pkConstraint.isPresent()) {
+ throw new ValidationException(
+ String.format(
+ "%sThe base table does not define any primary key.", EX_MSG_PREFIX));
+ }
+ Schema.Builder schemaBuilder = Schema.newBuilder();
+ buildUpdatedColumn(
+ schemaBuilder,
+ originTable,
+ (builder, column) -> builder.fromColumns(Collections.singletonList(column)));
+ buildUpdatedWatermark(schemaBuilder, originTable);
+ return schemaBuilder.build();
+ }
+
+ /**
+ * Convert ALTER TABLE DROP CONSTRAINT constraint_name to generate an updated {@link Schema}.
+ */
+ public Schema applySchemaChange(
+ SqlAlterTableDropConstraint dropConstraint, ResolvedCatalogTable originTable) {
+ Optional<UniqueConstraint> pkConstraint = originTable.getResolvedSchema().getPrimaryKey();
+ if (!pkConstraint.isPresent()) {
+ throw new ValidationException(
+ String.format(
+ "%sThe base table does not define any primary key.", EX_MSG_PREFIX));
+ }
+ SqlIdentifier constraintIdentifier = dropConstraint.getConstraintName();
+ String constraintName = pkConstraint.get().getName();
+ if (constraintIdentifier != null
+ && !constraintIdentifier.getSimple().equals(constraintName)) {
+ throw new ValidationException(
+ String.format(
+ "%sThe base table does not define a primary key constraint named '%s'. "
+ + "Available constraint name: ['%s'].",
+ EX_MSG_PREFIX, constraintIdentifier.getSimple(), constraintName));
+ }
+ Schema.Builder schemaBuilder = Schema.newBuilder();
+ buildUpdatedColumn(
+ schemaBuilder,
+ originTable,
+ (builder, column) -> builder.fromColumns(Collections.singletonList(column)));
+ buildUpdatedWatermark(schemaBuilder, originTable);
+ return schemaBuilder.build();
+ }
+
+ /** Convert ALTER TABLE DROP WATERMARK to generate an updated {@link Schema}. */
+ public Schema applySchemaChange(
+ SqlAlterTableDropWatermark dropWatermark, ResolvedCatalogTable originTable) {
+ if (originTable.getResolvedSchema().getWatermarkSpecs().isEmpty()) {
+ throw new ValidationException(
+ String.format(
+ "%sThe base table does not define any watermark strategy.",
+ EX_MSG_PREFIX));
+ }
+ Schema.Builder schemaBuilder = Schema.newBuilder();
+ buildUpdatedColumn(
+ schemaBuilder,
+ originTable,
+ (builder, column) -> builder.fromColumns(Collections.singletonList(column)));
+ buildUpdatedPrimaryKey(schemaBuilder, originTable, Function.identity());
+ return schemaBuilder.build();
}
// --------------------------------------------------------------------------------------------
@@ -216,7 +292,7 @@ public class AlterSchemaConverter {
List<Function<ResolvedSchema, TableChange>> changeBuilders = new ArrayList<>();
SchemaConverter(
- Schema originalSchema,
+ Schema originSchema,
FlinkTypeFactory typeFactory,
SqlValidator sqlValidator,
Consumer<SqlTableConstraint> constraintValidator,
@@ -229,13 +305,13 @@ public class AlterSchemaConverter {
this.escapeExpressions = escapeExpressions;
this.schemaResolver = schemaResolver;
this.changesCollector = changesCollector;
- populateColumnsFromSourceTable(originalSchema);
- populatePrimaryKeyFromSourceTable(originalSchema);
- populateWatermarkFromSourceTable(originalSchema);
+ populateColumnsFromSourceTable(originSchema);
+ populatePrimaryKeyFromSourceTable(originSchema);
+ populateWatermarkFromSourceTable(originSchema);
}
- private void populateColumnsFromSourceTable(Schema originalSchema) {
- originalSchema
+ private void populateColumnsFromSourceTable(Schema originSchema) {
+ originSchema
.getColumns()
.forEach(
column -> {
@@ -245,15 +321,15 @@ public class AlterSchemaConverter {
});
}
- private void populatePrimaryKeyFromSourceTable(Schema originalSchema) {
- if (originalSchema.getPrimaryKey().isPresent()) {
- primaryKey = originalSchema.getPrimaryKey().get();
+ private void populatePrimaryKeyFromSourceTable(Schema originSchema) {
+ if (originSchema.getPrimaryKey().isPresent()) {
+ primaryKey = originSchema.getPrimaryKey().get();
}
}
- private void populateWatermarkFromSourceTable(Schema originalSchema) {
+ private void populateWatermarkFromSourceTable(Schema originSchema) {
for (Schema.UnresolvedWatermarkSpec sourceWatermarkSpec :
- originalSchema.getWatermarkSpecs()) {
+ originSchema.getWatermarkSpecs()) {
watermarkSpec = sourceWatermarkSpec;
}
}
@@ -303,13 +379,13 @@ public class AlterSchemaConverter {
private void updatePrimaryKeyNullability(String columnName) {
Schema.UnresolvedColumn column = columns.get(columnName);
if (column instanceof Schema.UnresolvedPhysicalColumn) {
- AbstractDataType<?> originalType =
+ AbstractDataType<?> originType =
((Schema.UnresolvedPhysicalColumn) column).getDataType();
columns.put(
columnName,
new Schema.UnresolvedPhysicalColumn(
columnName,
- originalType.notNull(),
+ originType.notNull(),
column.getComment().orElse(null)));
}
}
@@ -455,7 +531,7 @@ public class AlterSchemaConverter {
private static class AddSchemaConverter extends SchemaConverter {
AddSchemaConverter(
- Schema originalSchema,
+ Schema originSchema,
FlinkTypeFactory typeFactory,
SqlValidator sqlValidator,
Consumer<SqlTableConstraint> constraintValidator,
@@ -463,7 +539,7 @@ public class AlterSchemaConverter {
SchemaResolver schemaResolver,
List<TableChange> changeCollector) {
super(
- originalSchema,
+ originSchema,
typeFactory,
sqlValidator,
constraintValidator,
@@ -536,7 +612,7 @@ public class AlterSchemaConverter {
private static class ModifySchemaConverter extends SchemaConverter {
ModifySchemaConverter(
- Schema originalSchema,
+ Schema originSchema,
FlinkTypeFactory typeFactory,
SqlValidator sqlValidator,
Consumer<SqlTableConstraint> constraintValidator,
@@ -544,7 +620,7 @@ public class AlterSchemaConverter {
SchemaResolver schemaResolver,
List<TableChange> tableChangeCollector) {
super(
- originalSchema,
+ originSchema,
typeFactory,
sqlValidator,
constraintValidator,
@@ -611,52 +687,151 @@ public class AlterSchemaConverter {
private void validateColumnName(
String originColumnName,
String newColumnName,
- List<String> tableColumns,
- ResolvedSchema originResolvedSchema,
+ ResolvedSchema originSchemas,
List<String> partitionKeys) {
- // validate old column
- if (!tableColumns.contains(originColumnName)) {
+ validateColumnName(
+ originColumnName,
+ originSchemas,
+ partitionKeys,
+ // fail the operation of renaming column, once the column derives a computed column
+ (referencedColumn, computedColumn) -> referencedColumn.contains(originColumnName));
+ // validate new column
+ if (originSchemas.getColumn(newColumnName).isPresent()) {
throw new ValidationException(
String.format(
- "Old column %s not found in table schema for RENAME COLUMN",
- originColumnName));
+ "%sThe column `%s` already existed in table schema.",
+ EX_MSG_PREFIX, newColumnName));
}
+ }
- // validate new column
- if (tableColumns.contains(newColumnName)) {
+ private void validateColumnName(
+ String columnToDrop,
+ ResolvedSchema originSchema,
+ List<String> partitionKeys,
+ Set<String> columnsToDrop) {
+ validateColumnName(
+ columnToDrop,
+ originSchema,
+ partitionKeys,
+ // fail the operation of dropping column, only if the column derives a computed
+ // column, and the computed column is not being dropped along with the origin column
+ (referencedColumn, computedColumn) ->
+ referencedColumn.contains(columnToDrop)
+ && !columnsToDrop.contains(computedColumn.getName()));
+ originSchema
+ .getPrimaryKey()
+ .ifPresent(
+ pk -> {
+ if (pk.getColumns().contains(columnToDrop)) {
+ throw new ValidationException(
+ String.format(
+ "%sThe column `%s` is used as the primary key.",
+ EX_MSG_PREFIX, columnToDrop));
+ }
+ });
+ }
+
+ private void validateColumnName(
+ String columnToAlter,
+ ResolvedSchema originSchema,
+ List<String> partitionKeys,
+ BiFunction<Set<String>, Column.ComputedColumn, Boolean> computedColumnChecker) {
+ // validate origin column
+ Set<String> tableColumns = new HashSet<>(originSchema.getColumnNames());
+ if (!tableColumns.contains(columnToAlter)) {
throw new ValidationException(
String.format(
- "New column %s already existed in table schema for RENAME COLUMN",
- newColumnName));
+ "%sThe column `%s` does not exist in the base table.",
+ EX_MSG_PREFIX, columnToAlter));
}
- // validate old column name isn't referred by computed column case
- originResolvedSchema.getColumns().stream()
+ // validate origin column name isn't referred by computed column case
+ originSchema.getColumns().stream()
.filter(column -> column instanceof Column.ComputedColumn)
.forEach(
column -> {
Column.ComputedColumn computedColumn = (Column.ComputedColumn) column;
Set<String> referencedColumn =
ColumnReferenceFinder.findReferencedColumn(
- computedColumn.getExpression(), tableColumns);
- if (referencedColumn.contains(originColumnName)) {
+ computedColumn.getName(), originSchema);
+ if (computedColumnChecker.apply(referencedColumn, computedColumn)) {
throw new ValidationException(
String.format(
- "Old column %s is referred by computed column %s, currently doesn't "
- + "allow to rename column which is referred by computed column.",
- originColumnName,
+ "%sThe column `%s` is referenced by computed column %s.",
+ EX_MSG_PREFIX,
+ columnToAlter,
computedColumn.asSummaryString()));
}
});
- // validate partition keys doesn't contain the old column
- if (partitionKeys.contains(originColumnName)) {
+ // validate partition keys doesn't contain the origin column
+ if (partitionKeys.contains(columnToAlter)) {
+ throw new ValidationException(
+ String.format(
+ "%sThe column `%s` is used as the partition keys.",
+ EX_MSG_PREFIX, columnToAlter));
+ }
+ }
+
+ private void validateWatermark(ResolvedCatalogTable originTable, String columnToAlter) {
+ // validate origin column isn't referenced by watermark
+ List<WatermarkSpec> watermarkSpecs = originTable.getResolvedSchema().getWatermarkSpecs();
+ Set<String> referencedColumns =
+ ColumnReferenceFinder.findWatermarkReferencedColumn(
+ originTable.getResolvedSchema());
+ Set<String> rowtimeAttributes =
+ originTable.getResolvedSchema().getWatermarkSpecs().stream()
+ .map(WatermarkSpec::getRowtimeAttribute)
+ .collect(Collectors.toSet());
+ if (rowtimeAttributes.contains(columnToAlter)
+ || referencedColumns.contains(columnToAlter)) {
throw new ValidationException(
String.format(
- "Can not rename column %s because it is used as the partition keys.",
- originColumnName));
+ "%sThe column `%s` is referenced by watermark expression %s.",
+ EX_MSG_PREFIX, columnToAlter, watermarkSpecs));
}
}
+ private void buildUpdatedColumn(
+ Schema.Builder builder,
+ ResolvedCatalogTable originTable,
+ BiConsumer<Schema.Builder, Schema.UnresolvedColumn> columnConsumer) {
+ // build column
+ originTable
+ .getUnresolvedSchema()
+ .getColumns()
+ .forEach(column -> columnConsumer.accept(builder, column));
+ }
+
+ private void buildUpdatedPrimaryKey(
+ Schema.Builder builder,
+ ResolvedCatalogTable originTable,
+ Function<String, String> columnRenamer) {
+ originTable
+ .getUnresolvedSchema()
+ .getPrimaryKey()
+ .ifPresent(
+ pk -> {
+ List<String> originPrimaryKeyNames = pk.getColumnNames();
+ String constrainName = pk.getConstraintName();
+ List<String> newPrimaryKeyNames =
+ originPrimaryKeyNames.stream()
+ .map(columnRenamer)
+ .collect(Collectors.toList());
+ builder.primaryKeyNamed(constrainName, newPrimaryKeyNames);
+ });
+ }
+
+ private void buildUpdatedWatermark(Schema.Builder builder, ResolvedCatalogTable originTable) {
+ originTable
+ .getUnresolvedSchema()
+ .getWatermarkSpecs()
+ .forEach(
+ watermarkSpec ->
+ builder.watermark(
+ watermarkSpec.getColumnName(),
+ watermarkSpec.getWatermarkExpression()));
+ }
+
private void buildNewColumnFromOriginColumn(
Schema.Builder builder, Schema.UnresolvedColumn originColumn, String columnName) {
if (originColumn instanceof Schema.UnresolvedComputedColumn) {
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
index 3ae34126d33..acb46f60fc9 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
@@ -25,7 +25,10 @@ import org.apache.flink.sql.parser.ddl.SqlAlterDatabase;
import org.apache.flink.sql.parser.ddl.SqlAlterFunction;
import org.apache.flink.sql.parser.ddl.SqlAlterTable;
import org.apache.flink.sql.parser.ddl.SqlAlterTableCompact;
+import org.apache.flink.sql.parser.ddl.SqlAlterTableDropColumn;
import org.apache.flink.sql.parser.ddl.SqlAlterTableDropConstraint;
+import org.apache.flink.sql.parser.ddl.SqlAlterTableDropPrimaryKey;
+import org.apache.flink.sql.parser.ddl.SqlAlterTableDropWatermark;
import org.apache.flink.sql.parser.ddl.SqlAlterTableOptions;
import org.apache.flink.sql.parser.ddl.SqlAlterTableRename;
import org.apache.flink.sql.parser.ddl.SqlAlterTableRenameColumn;
@@ -88,7 +91,6 @@ import org.apache.flink.sql.parser.dql.SqlUnloadModule;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableException;
-import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
@@ -163,7 +165,6 @@ import org.apache.flink.table.operations.ddl.AlterCatalogFunctionOperation;
import org.apache.flink.table.operations.ddl.AlterDatabaseOperation;
import org.apache.flink.table.operations.ddl.AlterPartitionPropertiesOperation;
import org.apache.flink.table.operations.ddl.AlterTableChangeOperation;
-import org.apache.flink.table.operations.ddl.AlterTableDropConstraintOperation;
import org.apache.flink.table.operations.ddl.AlterTableRenameOperation;
import org.apache.flink.table.operations.ddl.AlterTableSchemaOperation;
import org.apache.flink.table.operations.ddl.AlterViewAsOperation;
@@ -488,6 +489,7 @@ public class SqlToOperationConverter {
if (baseTable instanceof CatalogView) {
throw new ValidationException("ALTER TABLE for a view is not allowed");
}
+ ResolvedCatalogTable resolvedCatalogTable = (ResolvedCatalogTable) baseTable;
if (sqlAlterTable instanceof SqlAlterTableRename) {
UnresolvedIdentifier newUnresolvedIdentifier =
UnresolvedIdentifier.of(
@@ -503,23 +505,45 @@ public class SqlToOperationConverter {
} else if (sqlAlterTable instanceof SqlAlterTableReset) {
return convertAlterTableReset(
tableIdentifier, (CatalogTable) baseTable, (SqlAlterTableReset) sqlAlterTable);
+ } else if (sqlAlterTable instanceof SqlAlterTableDropColumn) {
+ return new AlterTableSchemaOperation(
+ tableIdentifier,
+ CatalogTable.of(
+ alterSchemaConverter.applySchemaChange(
+ (SqlAlterTableDropColumn) sqlAlterTable, resolvedCatalogTable),
+ resolvedCatalogTable.getComment(),
+ resolvedCatalogTable.getPartitionKeys(),
+ resolvedCatalogTable.getOptions()));
+ } else if (sqlAlterTable instanceof SqlAlterTableDropPrimaryKey) {
+ return new AlterTableSchemaOperation(
+ tableIdentifier,
+ CatalogTable.of(
+ alterSchemaConverter.applySchemaChange(
+ (SqlAlterTableDropPrimaryKey) sqlAlterTable,
+ resolvedCatalogTable),
+ resolvedCatalogTable.getComment(),
+ resolvedCatalogTable.getPartitionKeys(),
+ resolvedCatalogTable.getOptions()));
} else if (sqlAlterTable instanceof SqlAlterTableDropConstraint) {
- SqlAlterTableDropConstraint dropConstraint =
- ((SqlAlterTableDropConstraint) sqlAlterTable);
- String constraintName = dropConstraint.getConstraintName().getSimple();
- TableSchema oriSchema =
- TableSchema.fromResolvedSchema(
- baseTable
- .getUnresolvedSchema()
- .resolve(catalogManager.getSchemaResolver()));
- if (!oriSchema
- .getPrimaryKey()
- .filter(pk -> pk.getName().equals(constraintName))
- .isPresent()) {
- throw new ValidationException(
- String.format("CONSTRAINT [%s] does not exist", constraintName));
- }
- return new AlterTableDropConstraintOperation(tableIdentifier, constraintName);
+ return new AlterTableSchemaOperation(
+ tableIdentifier,
+ CatalogTable.of(
+ alterSchemaConverter.applySchemaChange(
+ (SqlAlterTableDropConstraint) sqlAlterTable,
+ resolvedCatalogTable),
+ resolvedCatalogTable.getComment(),
+ resolvedCatalogTable.getPartitionKeys(),
+ resolvedCatalogTable.getOptions()));
+ } else if (sqlAlterTable instanceof SqlAlterTableDropWatermark) {
+ return new AlterTableSchemaOperation(
+ tableIdentifier,
+ CatalogTable.of(
+ alterSchemaConverter.applySchemaChange(
+ (SqlAlterTableDropWatermark) sqlAlterTable,
+ resolvedCatalogTable),
+ resolvedCatalogTable.getComment(),
+ resolvedCatalogTable.getPartitionKeys(),
+ resolvedCatalogTable.getOptions()));
} else if (sqlAlterTable instanceof SqlAddReplaceColumns) {
return OperationConverterUtils.convertAddReplaceColumns(
tableIdentifier,
@@ -537,15 +561,14 @@ public class SqlToOperationConverter {
(SqlAlterTableRenameColumn) sqlAlterTable;
Schema newSchema =
alterSchemaConverter.applySchemaChange(
- sqlAlterTableRenameColumn, optionalCatalogTable.get());
- CatalogTable baseCatalogTable = (CatalogTable) baseTable;
+ sqlAlterTableRenameColumn, resolvedCatalogTable);
return new AlterTableSchemaOperation(
tableIdentifier,
CatalogTable.of(
newSchema,
- baseCatalogTable.getComment(),
- baseCatalogTable.getPartitionKeys(),
- baseCatalogTable.getOptions()));
+ resolvedCatalogTable.getComment(),
+ resolvedCatalogTable.getPartitionKeys(),
+ resolvedCatalogTable.getOptions()));
} else if (sqlAlterTable instanceof SqlAddPartitions) {
List<CatalogPartitionSpec> specs = new ArrayList<>();
List<CatalogPartition> partitions = new ArrayList<>();
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/expressions/ColumnReferenceFinderTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/expressions/ColumnReferenceFinderTest.java
new file mode 100644
index 00000000000..0444afea947
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/expressions/ColumnReferenceFinderTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.expressions;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.planner.utils.StreamTableTestUtil;
+import org.apache.flink.table.planner.utils.TableTestBase;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link ColumnReferenceFinder}. */
+public class ColumnReferenceFinderTest extends TableTestBase {
+
+ private final StreamTableTestUtil util = streamTestUtil(TableConfig.getDefault());
+ private ResolvedSchema resolvedSchema;
+
+ @BeforeEach
+ public void beforeEach() {
+ resolvedSchema =
+ util.testingTableEnv()
+ .getCatalogManager()
+ .getSchemaResolver()
+ .resolve(
+ Schema.newBuilder()
+ .columnByExpression("a", "b || '_001'")
+ .column("b", DataTypes.STRING())
+ .columnByExpression("c", "d * e + 2")
+ .column("d", DataTypes.DOUBLE())
+ .columnByMetadata("e", DataTypes.INT(), null, true)
+ .column(
+ "tuple",
+ DataTypes.ROW(
+ DataTypes.TIMESTAMP(3), DataTypes.INT()))
+ .columnByExpression("ts", "tuple.f0")
+ .watermark("ts", "ts - interval '5' day")
+ .build());
+ }
+
+ @Test
+ public void testFindReferencedColumn() {
+ assertThat(ColumnReferenceFinder.findReferencedColumn("b", resolvedSchema))
+ .isEqualTo(Collections.emptySet());
+
+ assertThat(ColumnReferenceFinder.findReferencedColumn("a", resolvedSchema))
+ .containsExactlyInAnyOrder("b");
+
+ assertThat(ColumnReferenceFinder.findReferencedColumn("c", resolvedSchema))
+ .containsExactlyInAnyOrder("d", "e");
+
+ assertThat(ColumnReferenceFinder.findReferencedColumn("ts", resolvedSchema))
+ .containsExactlyInAnyOrder("tuple");
+
+ assertThat(ColumnReferenceFinder.findWatermarkReferencedColumn(resolvedSchema))
+ .containsExactlyInAnyOrder("ts");
+ }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
index 4dedea89f21..5ad6bce141a 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
@@ -81,7 +81,6 @@ import org.apache.flink.table.operations.command.SetOperation;
import org.apache.flink.table.operations.command.ShowJarsOperation;
import org.apache.flink.table.operations.ddl.AlterDatabaseOperation;
import org.apache.flink.table.operations.ddl.AlterTableChangeOperation;
-import org.apache.flink.table.operations.ddl.AlterTableDropConstraintOperation;
import org.apache.flink.table.operations.ddl.AlterTableRenameOperation;
import org.apache.flink.table.operations.ddl.AlterTableSchemaOperation;
import org.apache.flink.table.operations.ddl.CreateCatalogFunctionOperation;
@@ -1275,70 +1274,50 @@ public class SqlToOperationConverterTest {
// rename pk column c
Operation operation = parse("alter table tb1 rename c to c1");
assertThat(operation).isInstanceOf(AlterTableSchemaOperation.class);
- assertThat(((AlterTableSchemaOperation) operation).getCatalogTable().getUnresolvedSchema())
+ assertThat(operation.asSummaryString())
.isEqualTo(
- Schema.newBuilder()
- .column("a", DataTypes.INT().notNull())
- .column("b", DataTypes.BIGINT().notNull())
- .column("c1", DataTypes.STRING().notNull())
- .withComment("column comment")
- .columnByExpression("d", "a*(b+2 + a*b)")
- .column(
- "e",
- DataTypes.ROW(
- DataTypes.STRING(),
- DataTypes.INT(),
- DataTypes.ROW(
- DataTypes.DOUBLE(),
- DataTypes.ARRAY(DataTypes.FLOAT()))))
- .columnByExpression("f", "e.f1 + e.f2.f0")
- .columnByMetadata("g", DataTypes.STRING(), null, true)
- .column("ts", DataTypes.TIMESTAMP(3))
- .withComment("just a comment")
- .watermark("ts", "ts - interval '5' seconds")
- .primaryKeyNamed("ct1", "a", "b", "c1")
- .build());
+ "ALTER TABLE cat1.db1.tb1 SET SCHEMA (\n"
+ + " `a` INT NOT NULL,\n"
+ + " `b` BIGINT NOT NULL,\n"
+ + " `c1` STRING NOT NULL COMMENT 'column comment',\n"
+ + " `d` AS [a*(b+2 + a*b)],\n"
+ + " `e` ROW<`f0` STRING, `f1` INT, `f2` ROW<`f0` DOUBLE, `f1` ARRAY<FLOAT>>>,\n"
+ + " `f` AS [e.f1 + e.f2.f0],\n"
+ + " `g` METADATA VIRTUAL,\n"
+ + " `ts` TIMESTAMP(3) COMMENT 'just a comment',\n"
+ + " WATERMARK FOR `ts` AS [ts - interval '5' seconds],\n"
+ + " CONSTRAINT `ct1` PRIMARY KEY (`a`, `b`, `c1`) NOT ENFORCED\n"
+ + ")");
// rename computed column
operation = parse("alter table tb1 rename f to f1");
assertThat(operation).isInstanceOf(AlterTableSchemaOperation.class);
- assertThat(((AlterTableSchemaOperation) operation).getCatalogTable().getUnresolvedSchema())
+ assertThat(operation.asSummaryString())
.isEqualTo(
- Schema.newBuilder()
- .column("a", DataTypes.INT().notNull())
- .column("b", DataTypes.BIGINT().notNull())
- .column("c", DataTypes.STRING().notNull())
- .withComment("column comment")
- .columnByExpression("d", "a*(b+2 + a*b)")
- .column(
- "e",
- DataTypes.ROW(
- DataTypes.STRING(),
- DataTypes.INT(),
- DataTypes.ROW(
- DataTypes.DOUBLE(),
- DataTypes.ARRAY(DataTypes.FLOAT()))))
- .columnByExpression("f1", "e.f1 + e.f2.f0")
- .columnByMetadata("g", DataTypes.STRING(), null, true)
- .column("ts", DataTypes.TIMESTAMP(3))
- .withComment("just a comment")
- .watermark("ts", "ts - interval '5' seconds")
- .primaryKeyNamed("ct1", "a", "b", "c")
- .build());
+ "ALTER TABLE cat1.db1.tb1 SET SCHEMA (\n"
+ + " `a` INT NOT NULL,\n"
+ + " `b` BIGINT NOT NULL,\n"
+ + " `c` STRING NOT NULL COMMENT 'column comment',\n"
+ + " `d` AS [a*(b+2 + a*b)],\n"
+ + " `e` ROW<`f0` STRING, `f1` INT, `f2` ROW<`f0` DOUBLE, `f1` ARRAY<FLOAT>>>,\n"
+ + " `f1` AS [e.f1 + e.f2.f0],\n"
+ + " `g` METADATA VIRTUAL,\n"
+ + " `ts` TIMESTAMP(3) COMMENT 'just a comment',\n"
+ + " WATERMARK FOR `ts` AS [ts - interval '5' seconds],\n"
+ + " CONSTRAINT `ct1` PRIMARY KEY (`a`, `b`, `c`) NOT ENFORCED\n"
+ + ")");
// rename column c that is used in a computed column
assertThatThrownBy(() -> parse("alter table tb1 rename a to a1"))
.isInstanceOf(ValidationException.class)
.hasMessageContaining(
- "Old column a is referred by computed column `d` BIGINT NOT NULL AS a*(b+2 + a*b), "
- + "currently doesn't allow to rename column which is referred by computed column.");
+ "The column `a` is referenced by computed column `d` BIGINT NOT NULL AS a*(b+2 + a*b).");
// rename column used in the watermark expression
assertThatThrownBy(() -> parse("alter table tb1 rename ts to ts1"))
.isInstanceOf(ValidationException.class)
.hasMessageContaining(
- "Old column ts is referred by watermark expression WATERMARK FOR `ts`: TIMESTAMP(3) AS ts - interval '5' seconds, "
- + "currently doesn't allow to rename column which is referred by watermark expression.");
+ "The column `ts` is referenced by watermark expression [WATERMARK FOR `ts`: TIMESTAMP(3) AS ts - interval '5' seconds].");
// rename nested column
assertThatThrownBy(() -> parse("alter table tb1 rename e.f1 to e.f11"))
@@ -1348,8 +1327,7 @@ public class SqlToOperationConverterTest {
// rename column with duplicate name
assertThatThrownBy(() -> parse("alter table tb1 rename c to a"))
.isInstanceOf(ValidationException.class)
- .hasMessageContaining(
- "New column a already existed in table schema for RENAME COLUMN");
+ .hasMessageContaining("The column `a` already existed in table schema.");
// rename column e test computed column expression is ApiExpression which doesn't implement
// the equals method
@@ -1374,29 +1352,152 @@ public class SqlToOperationConverterTest {
assertThatThrownBy(() -> parse("alter table `cat1`.`db1`.`tb2` rename e to e1"))
.isInstanceOf(ValidationException.class)
.hasMessageContaining(
- "Old column e is referred by computed column `j` STRING AS upper(e), currently doesn't "
- + "allow to rename column which is referred by computed column.");
+ "Failed to execute ALTER TABLE statement.\nThe column `e` is referenced by computed column `j` STRING AS upper(e).");
// rename column used as partition key
assertThatThrownBy(() -> parse("alter table tb2 rename a to a1"))
.isInstanceOf(ValidationException.class)
.hasMessageContaining(
- "Can not rename column a because it is used as the partition keys");
+ "Failed to execute ALTER TABLE statement.\nThe column `a` is used as the partition keys.");
+ }
+
+ @Test
+ public void testFailedToAlterTableDropColumn() throws Exception {
+ prepareTable("tb1", false, false, true, 3);
+
+ // drop a nonexistent column
+ assertThatThrownBy(() -> parse("alter table tb1 drop x"))
+ .isInstanceOf(ValidationException.class)
+ .hasMessageContaining("The column `x` does not exist in the base table.");
+
+ assertThatThrownBy(() -> parse("alter table tb1 drop (g, x)"))
+ .isInstanceOf(ValidationException.class)
+ .hasMessageContaining("The column `x` does not exist in the base table.");
+
+ // duplicate column
+ assertThatThrownBy(() -> parse("alter table tb1 drop (g, c, g)"))
+ .isInstanceOf(ValidationException.class)
+ .hasMessageContaining("Duplicate column `g`.");
+
+ // drop a nested column
+ assertThatThrownBy(() -> parse("alter table tb1 drop e.f2"))
+ .isInstanceOf(UnsupportedOperationException.class)
+ .hasMessageContaining("Alter nested row type e.f2 is not supported yet.");
+
+ // drop a column which generates a computed column
+ assertThatThrownBy(() -> parse("alter table tb1 drop a"))
+ .isInstanceOf(ValidationException.class)
+ .hasMessageContaining(
+ "The column `a` is referenced by computed column `d` BIGINT NOT NULL AS a*(b+2 + a*b).");
+
+ // drop a column which is pk
+ assertThatThrownBy(() -> parse("alter table tb1 drop c"))
+ .isInstanceOf(ValidationException.class)
+ .hasMessageContaining("The column `c` is used as the primary key.");
+
+ // drop a column which defines watermark
+ assertThatThrownBy(() -> parse("alter table tb1 drop ts"))
+ .isInstanceOf(ValidationException.class)
+ .hasMessageContaining(
+ "The column `ts` is referenced by watermark expression [WATERMARK FOR `ts`: TIMESTAMP(3) AS ts - interval '5' seconds].");
+ }
+
+ @Test
+ public void testAlterTableDropColumn() throws Exception {
+ prepareNonManagedTable(false);
+ // drop a single column
+ Operation operation = parse("alter table tb1 drop c");
+ assertThat(operation).isInstanceOf(AlterTableSchemaOperation.class);
+ assertThat(operation.asSummaryString())
+ .isEqualTo(
+ "ALTER TABLE cat1.db1.tb1 SET SCHEMA (\n"
+ + " `a` INT NOT NULL,\n"
+ + " `b` BIGINT NOT NULL,\n"
+ + " `d` AS [a*(b+2 + a*b)],\n"
+ + " `e` ROW<`f0` STRING, `f1` INT, `f2` ROW<`f0` DOUBLE, `f1` ARRAY<FLOAT>>>,\n"
+ + " `f` AS [e.f1 + e.f2.f0],\n"
+ + " `g` METADATA VIRTUAL,\n"
+ + " `ts` TIMESTAMP(3) COMMENT 'just a comment'\n"
+ + ")");
+
+ // drop computed column and referenced columns together
+ operation = parse("alter table tb1 drop (f, e, b, d)");
+ assertThat(operation).isInstanceOf(AlterTableSchemaOperation.class);
+ assertThat(operation.asSummaryString())
+ .isEqualTo(
+ "ALTER TABLE cat1.db1.tb1 SET SCHEMA (\n"
+ + " `a` INT NOT NULL,\n"
+ + " `c` STRING NOT NULL COMMENT 'column comment',\n"
+ + " `g` METADATA VIRTUAL,\n"
+ + " `ts` TIMESTAMP(3) COMMENT 'just a comment'\n"
+ + ")");
+ }
+
+ @Test
+ public void testFailedToAlterTableDropConstraint() throws Exception {
+ prepareNonManagedTable("tb1", 0);
+ assertThatThrownBy(() -> parse("alter table tb1 drop primary key"))
+ .isInstanceOf(ValidationException.class)
+ .hasMessageContaining("The base table does not define any primary key.");
+ assertThatThrownBy(() -> parse("alter table tb1 drop constraint ct"))
+ .isInstanceOf(ValidationException.class)
+ .hasMessageContaining("The base table does not define any primary key.");
+ prepareNonManagedTable("tb2", 1);
+ assertThatThrownBy(() -> parse("alter table tb2 drop constraint ct2"))
+ .isInstanceOf(ValidationException.class)
+ .hasMessageContaining(
+ "The base table does not define a primary key constraint named 'ct2'. Available constraint name: ['ct1'].");
}
@Test
public void testAlterTableDropConstraint() throws Exception {
prepareNonManagedTable(true);
- // Test alter table add enforced
+ String expectedSummaryString =
+ "ALTER TABLE cat1.db1.tb1 SET SCHEMA (\n"
+ + " `a` INT NOT NULL,\n"
+ + " `b` BIGINT NOT NULL,\n"
+ + " `c` STRING NOT NULL COMMENT 'column comment',\n"
+ + " `d` AS [a*(b+2 + a*b)],\n"
+ + " `e` ROW<`f0` STRING, `f1` INT, `f2` ROW<`f0` DOUBLE, `f1` ARRAY<FLOAT>>>,\n"
+ + " `f` AS [e.f1 + e.f2.f0],\n"
+ + " `g` METADATA VIRTUAL,\n"
+ + " `ts` TIMESTAMP(3) COMMENT 'just a comment'\n"
+ + ")";
+
Operation operation = parse("alter table tb1 drop constraint ct1");
- assertThat(operation).isInstanceOf(AlterTableDropConstraintOperation.class);
- AlterTableDropConstraintOperation dropConstraint =
- (AlterTableDropConstraintOperation) operation;
- assertThat(dropConstraint.asSummaryString())
- .isEqualTo("ALTER TABLE cat1.db1.tb1 DROP CONSTRAINT ct1");
- assertThatThrownBy(() -> parse("alter table tb1 drop constraint ct2"))
+ assertThat(operation).isInstanceOf(AlterTableSchemaOperation.class);
+ assertThat(operation.asSummaryString()).isEqualTo(expectedSummaryString);
+
+ operation = parse("alter table tb1 drop primary key");
+ assertThat(operation).isInstanceOf(AlterTableSchemaOperation.class);
+ assertThat(operation.asSummaryString()).isEqualTo(expectedSummaryString);
+ }
+
+ @Test
+ public void testFailedToAlterTableDropWatermark() throws Exception {
+ prepareNonManagedTable("tb1", false);
+ assertThatThrownBy(() -> parse("alter table tb1 drop watermark"))
.isInstanceOf(ValidationException.class)
- .hasMessageContaining("CONSTRAINT [ct2] does not exist");
+ .hasMessageContaining("The base table does not define any watermark strategy.");
+ }
+
+ @Test
+ public void testAlterTableDropWatermark() throws Exception {
+ prepareNonManagedTable("tb1", true);
+ Operation operation = parse("alter table tb1 drop watermark");
+ assertThat(operation).isInstanceOf(AlterTableSchemaOperation.class);
+ assertThat(operation.asSummaryString())
+ .isEqualTo(
+ "ALTER TABLE cat1.db1.tb1 SET SCHEMA (\n"
+ + " `a` INT NOT NULL,\n"
+ + " `b` BIGINT NOT NULL,\n"
+ + " `c` STRING NOT NULL COMMENT 'column comment',\n"
+ + " `d` AS [a*(b+2 + a*b)],\n"
+ + " `e` ROW<`f0` STRING, `f1` INT, `f2` ROW<`f0` DOUBLE, `f1` ARRAY<FLOAT>>>,\n"
+ + " `f` AS [e.f1 + e.f2.f0],\n"
+ + " `g` METADATA VIRTUAL,\n"
+ + " `ts` TIMESTAMP(3) COMMENT 'just a comment'\n"
+ + ")");
}
@Test
@@ -1464,50 +1565,38 @@ public class SqlToOperationConverterTest {
// try to add a column with duplicated name
assertThatThrownBy(() -> parse("alter table tb1 add a bigint"))
.isInstanceOf(ValidationException.class)
- .hasMessageContaining(
- "Failed to execute ALTER TABLE statement.\n"
- + "Try to add a column `a` which already exists in the table.");
+ .hasMessageContaining("Try to add a column `a` which already exists in the table.");
// try to add multiple columns with duplicated column name
assertThatThrownBy(() -> parse("alter table tb1 add (x array<string>, x string)"))
.isInstanceOf(ValidationException.class)
- .hasMessageContaining(
- "Failed to execute ALTER TABLE statement.\n"
- + "Encounter duplicate column `x`.");
+ .hasMessageContaining("Encounter duplicate column `x`.");
// refer to a nonexistent column
assertThatThrownBy(() -> parse("alter table tb1 add x bigint after y"))
.isInstanceOf(ValidationException.class)
.hasMessageContaining(
- "Failed to execute ALTER TABLE statement.\n"
- + "Referenced column `y` by 'AFTER' does not exist in the table.");
+ "Referenced column `y` by 'AFTER' does not exist in the table.");
// refer to a new added column that appears in the post position
assertThatThrownBy(() -> parse("alter table tb1 add (x bigint after y, y string first)"))
.isInstanceOf(ValidationException.class)
.hasMessageContaining(
- "Failed to execute ALTER TABLE statement.\n"
- + "Referenced column `y` by 'AFTER' does not exist in the table.");
+ "Referenced column `y` by 'AFTER' does not exist in the table.");
// add a computed column based on nonexistent column
assertThatThrownBy(() -> parse("alter table tb1 add m as n + 2"))
.isInstanceOf(ValidationException.class)
- .hasMessageContaining(
- "Failed to execute ALTER TABLE statement.\n"
- + "Invalid expression for computed column 'm'.");
+ .hasMessageContaining("Invalid expression for computed column 'm'.");
// add a computed column based on another computed column
assertThatThrownBy(() -> parse("alter table tb1 add (m as b * 2, n as m + 2)"))
.isInstanceOf(ValidationException.class)
- .hasMessageContaining(
- "Failed to execute ALTER TABLE statement.\n"
- + "Invalid expression for computed column 'n'.");
+ .hasMessageContaining("Invalid expression for computed column 'n'.");
// invalid expression
assertThatThrownBy(() -> parse("alter table tb1 add (m as 'hello' || b)"))
.isInstanceOf(ValidationException.class)
- .hasMessageContaining(
- "Failed to execute ALTER TABLE statement.\n"
- + "Invalid expression for computed column 'm'.");
+ .hasMessageContaining("Invalid expression for computed column 'm'.");
// add an inner field to a nested row
assertThatThrownBy(() -> parse("alter table tb1 add (e.f3 string)"))
@@ -1517,9 +1606,7 @@ public class SqlToOperationConverterTest {
// refer to a nested inner field
assertThatThrownBy(() -> parse("alter table tb1 add (x string after e.f2)"))
.isInstanceOf(UnsupportedOperationException.class)
- .hasMessageContaining(
- "Failed to execute ALTER TABLE statement.\n"
- + "Alter nested row type is not supported yet.");
+ .hasMessageContaining("Alter nested row type is not supported yet.");
assertThatThrownBy(() -> parse("alter table tb1 add (e.f3 string after e.f1)"))
.isInstanceOf(UnsupportedOperationException.class)
@@ -1647,8 +1734,7 @@ public class SqlToOperationConverterTest {
assertThatThrownBy(() -> parse("alter table tb1 add primary key(c) not enforced"))
.isInstanceOf(ValidationException.class)
.hasMessageContaining(
- "Failed to execute ALTER TABLE statement.\n"
- + "The base table has already defined the primary key constraint [`a`]. "
+ "The base table has already defined the primary key constraint [`a`]. "
+ "You might want to drop it before adding a new one.");
assertThatThrownBy(
@@ -1657,8 +1743,7 @@ public class SqlToOperationConverterTest {
"alter table tb1 add x string not null primary key not enforced"))
.isInstanceOf(ValidationException.class)
.hasMessageContaining(
- "Failed to execute ALTER TABLE statement.\n"
- + "The base table has already defined the primary key constraint [`a`]. "
+ "The base table has already defined the primary key constraint [`a`]. "
+ "You might want to drop it before adding a new one");
// the original table has composite pk
@@ -1667,8 +1752,7 @@ public class SqlToOperationConverterTest {
assertThatThrownBy(() -> parse("alter table tb2 add primary key(c) not enforced"))
.isInstanceOf(ValidationException.class)
.hasMessageContaining(
- "Failed to execute ALTER TABLE statement.\n"
- + "The base table has already defined the primary key constraint [`a`, `b`]. "
+ "The base table has already defined the primary key constraint [`a`, `b`]. "
+ "You might want to drop it before adding a new one");
assertThatThrownBy(
@@ -1677,8 +1761,7 @@ public class SqlToOperationConverterTest {
"alter table tb2 add x string not null primary key not enforced"))
.isInstanceOf(ValidationException.class)
.hasMessageContaining(
- "Failed to execute ALTER TABLE statement.\n"
- + "The base table has already defined the primary key constraint [`a`, `b`]. "
+ "The base table has already defined the primary key constraint [`a`, `b`]. "
+ "You might want to drop it before adding a new one");
// the original table does not define pk
@@ -1687,8 +1770,7 @@ public class SqlToOperationConverterTest {
// specify a nonexistent column as pk
assertThatThrownBy(() -> parse("alter table tb3 add primary key (x) not enforced"))
.isInstanceOf(ValidationException.class)
- .hasMessageContaining(
- "Failed to execute ALTER TABLE statement.\nInvalid primary key 'PK_x'. Column 'x' does not exist.");
+ .hasMessageContaining("Invalid primary key 'PK_x'. Column 'x' does not exist.");
// add unique constraint
assertThatThrownBy(() -> parse("alter table tb3 add unique(b)"))
@@ -1710,15 +1792,13 @@ public class SqlToOperationConverterTest {
+ " primary key (d, x) not enforced)"))
.isInstanceOf(ValidationException.class)
.hasMessageContaining(
- "Failed to execute ALTER TABLE statement.\n"
- + "Invalid primary key 'PK_d_x'. Column 'd' is not a physical column.");
+ "Invalid primary key 'PK_d_x'. Column 'd' is not a physical column.");
// add a pk which is metadata column
assertThatThrownBy(() -> parse("alter table tb3 add (primary key (g) not enforced)"))
.isInstanceOf(ValidationException.class)
.hasMessageContaining(
- "Failed to execute ALTER TABLE statement.\n"
- + "Invalid primary key 'PK_g'. Column 'g' is not a physical column.");
+ "Invalid primary key 'PK_g'. Column 'g' is not a physical column.");
}
@Test
@@ -1799,16 +1879,14 @@ public class SqlToOperationConverterTest {
assertThatThrownBy(() -> parse("alter table tb1 add watermark for x as x"))
.isInstanceOf(ValidationException.class)
.hasMessageContaining(
- "Failed to execute ALTER TABLE statement.\n"
- + "Invalid column name 'x' for rowtime attribute in watermark declaration. "
+ "Invalid column name 'x' for rowtime attribute in watermark declaration. "
+ "Available columns are: [a, b, c, d, e, f, g, ts]");
// add watermark with invalid type
assertThatThrownBy(() -> parse("alter table tb1 add watermark for b as b"))
.isInstanceOf(ValidationException.class)
.hasMessageContaining(
- "Failed to execute ALTER TABLE statement.\n"
- + "Invalid data type of time field for watermark definition. "
+ "Invalid data type of time field for watermark definition. "
+ "The field must be of type TIMESTAMP(p) or TIMESTAMP_LTZ(p), "
+ "the supported precision 'p' is from 0 to 3, but the time field type is BIGINT NOT NULL");
@@ -1818,9 +1896,7 @@ public class SqlToOperationConverterTest {
parse(
"alter table tb1 add (x row<f0 string, f1 timestamp(3)>, watermark for x.f1 as x.f1)"))
.isInstanceOf(ValidationException.class)
- .hasMessageContaining(
- "Failed to execute ALTER TABLE statement.\n"
- + "Watermark strategy on nested column is not supported yet.");
+ .hasMessageContaining("Watermark strategy on nested column is not supported yet.");
// add watermark to the table which already has watermark defined
prepareNonManagedTable("tb2", true);
@@ -1828,8 +1904,7 @@ public class SqlToOperationConverterTest {
assertThatThrownBy(() -> parse("alter table tb2 add watermark for ts as ts"))
.isInstanceOf(ValidationException.class)
.hasMessageContaining(
- "Failed to execute ALTER TABLE statement.\n"
- + "The base table has already defined the watermark strategy "
+ "The base table has already defined the watermark strategy "
+ "`ts` AS ts - interval '5' seconds. "
+ "You might want to drop it before adding a new one.");
}
@@ -1924,47 +1999,42 @@ public class SqlToOperationConverterTest {
// modify duplicated column same
assertThatThrownBy(() -> parse("alter table tb1 modify (b int, b array<int not null>)"))
.isInstanceOf(ValidationException.class)
- .hasMessageContaining(
- "Failed to execute ALTER TABLE statement.\nEncounter duplicate column `b`.");
+ .hasMessageContaining("Encounter duplicate column `b`.");
// modify nonexistent column name
assertThatThrownBy(() -> parse("alter table tb1 modify x bigint"))
.isInstanceOf(ValidationException.class)
.hasMessageContaining(
- "Failed to execute ALTER TABLE statement.\nTry to modify a column `x` which does not exist in the table.");
+ "Try to modify a column `x` which does not exist in the table.");
// refer to nonexistent column name
assertThatThrownBy(() -> parse("alter table tb1 modify a bigint after x"))
.isInstanceOf(ValidationException.class)
.hasMessageContaining(
- "Failed to execute ALTER TABLE statement.\nReferenced column `x` by 'AFTER' does not exist in the table.");
+ "Referenced column `x` by 'AFTER' does not exist in the table.");
// modify physical columns which generates computed column
assertThatThrownBy(() -> parse("alter table tb1 modify e array<int>"))
.isInstanceOf(ValidationException.class)
- .hasMessageContaining(
- "Failed to execute ALTER TABLE statement.\nInvalid expression for computed column 'f'.");
+ .hasMessageContaining("Invalid expression for computed column 'f'.");
assertThatThrownBy(() -> parse("alter table tb1 modify a string"))
.isInstanceOf(ValidationException.class)
- .hasMessageContaining(
- "Failed to execute ALTER TABLE statement.\nInvalid expression for computed column 'd'.");
+ .hasMessageContaining("Invalid expression for computed column 'd'.");
assertThatThrownBy(() -> parse("alter table tb1 modify b as a + 2"))
.isInstanceOf(ValidationException.class)
- .hasMessageContaining(
- "Failed to execute ALTER TABLE statement.\nInvalid expression for computed column 'd'.");
+ .hasMessageContaining("Invalid expression for computed column 'd'.");
assertThatThrownBy(() -> parse("alter table tb1 modify (a timestamp(3), b multiset<int>)"))
.isInstanceOf(ValidationException.class)
- .hasMessageContaining(
- "Failed to execute ALTER TABLE statement.\nInvalid expression for computed column 'd'.");
+ .hasMessageContaining("Invalid expression for computed column 'd'.");
// modify the rowtime field which defines watermark
assertThatThrownBy(() -> parse("alter table tb1 modify ts int"))
.isInstanceOf(ValidationException.class)
.hasMessageContaining(
- "Failed to execute ALTER TABLE statement.\nInvalid data type of time field for watermark definition. "
+ "Invalid data type of time field for watermark definition. "
+ "The field must be of type TIMESTAMP(p) or TIMESTAMP_LTZ(p), "
+ "the supported precision 'p' is from 0 to 3, but the time field type is INT");
@@ -1974,12 +2044,12 @@ public class SqlToOperationConverterTest {
assertThatThrownBy(() -> parse("alter table tb2 modify (d int, a as b + 2)"))
.isInstanceOf(ValidationException.class)
.hasMessageContaining(
- "Failed to execute ALTER TABLE statement.\nInvalid primary key 'ct1'. Column 'a' is not a physical column.");
+ "Invalid primary key 'ct1'. Column 'a' is not a physical column.");
assertThatThrownBy(() -> parse("alter table tb2 modify (d string, a int metadata virtual)"))
.isInstanceOf(ValidationException.class)
.hasMessageContaining(
- "Failed to execute ALTER TABLE statement.\nInvalid primary key 'ct1'. Column 'a' is not a physical column.");
+ "Invalid primary key 'ct1'. Column 'a' is not a physical column.");
// modify an inner field to a nested row
assertThatThrownBy(() -> parse("alter table tb2 modify (e.f0 string)"))
@@ -1989,9 +2059,7 @@ public class SqlToOperationConverterTest {
// refer to a nested inner field
assertThatThrownBy(() -> parse("alter table tb2 modify (g string after e.f2)"))
.isInstanceOf(UnsupportedOperationException.class)
- .hasMessageContaining(
- "Failed to execute ALTER TABLE statement.\n"
- + "Alter nested row type is not supported yet.");
+ .hasMessageContaining("Alter nested row type is not supported yet.");
assertThatThrownBy(() -> parse("alter table tb2 modify (e.f0 string after e.f1)"))
.isInstanceOf(UnsupportedOperationException.class)
@@ -2132,8 +2200,7 @@ public class SqlToOperationConverterTest {
"alter table tb1 modify constraint ct primary key (b) not enforced"))
.isInstanceOf(ValidationException.class)
.hasMessageContaining(
- "Failed to execute ALTER TABLE statement.\n"
- + "The base table does not define any primary key constraint. You might want to add a new one.");
+ "The base table does not define any primary key constraint. You might want to add a new one.");
prepareNonManagedTable("tb2", 1);
@@ -2143,9 +2210,7 @@ public class SqlToOperationConverterTest {
parse(
"alter table tb2 modify constraint ct primary key (x) not enforced"))
.isInstanceOf(ValidationException.class)
- .hasMessageContaining(
- "Failed to execute ALTER TABLE statement.\n"
- + "Invalid primary key 'ct'. Column 'x' does not exist.");
+ .hasMessageContaining("Invalid primary key 'ct'. Column 'x' does not exist.");
// specify computed column as pk
assertThatThrownBy(
@@ -2154,8 +2219,7 @@ public class SqlToOperationConverterTest {
"alter table tb2 modify constraint ct primary key (d) not enforced"))
.isInstanceOf(ValidationException.class)
.hasMessageContaining(
- "Failed to execute ALTER TABLE statement.\n"
- + "Invalid primary key 'ct'. Column 'd' is not a physical column.");
+ "Invalid primary key 'ct'. Column 'd' is not a physical column.");
// specify metadata column as pk
assertThatThrownBy(
@@ -2164,8 +2228,7 @@ public class SqlToOperationConverterTest {
"alter table tb2 modify constraint ct primary key (g) not enforced"))
.isInstanceOf(ValidationException.class)
.hasMessageContaining(
- "Failed to execute ALTER TABLE statement.\n"
- + "Invalid primary key 'ct'. Column 'g' is not a physical column.");
+ "Invalid primary key 'ct'. Column 'g' is not a physical column.");
}
@Test
@@ -2226,7 +2289,7 @@ public class SqlToOperationConverterTest {
"alter table tb1 modify watermark for a as to_timestamp(a) - interval '1' minute"))
.isInstanceOf(ValidationException.class)
.hasMessageContaining(
- "Failed to execute ALTER TABLE statement.\nThe base table does not define any watermark. You might want to add a new one.");
+ "The base table does not define any watermark. You might want to add a new one.");
prepareNonManagedTable("tb2", true);
@@ -2234,7 +2297,7 @@ public class SqlToOperationConverterTest {
assertThatThrownBy(() -> parse("alter table tb2 modify watermark for a as a"))
.isInstanceOf(ValidationException.class)
.hasMessageContaining(
- "Failed to execute ALTER TABLE statement.\nInvalid data type of time field for watermark definition. "
+ "Invalid data type of time field for watermark definition. "
+ "The field must be of type TIMESTAMP(p) or TIMESTAMP_LTZ(p), the supported precision 'p' is from 0 to 3, "
+ "but the time field type is INT NOT NULL");
@@ -2244,7 +2307,7 @@ public class SqlToOperationConverterTest {
"alter table tb2 modify watermark for c as to_timestamp(c) - interval '1' day"))
.isInstanceOf(ValidationException.class)
.hasMessageContaining(
- "Failed to execute ALTER TABLE statement.\nInvalid data type of time field for watermark definition. "
+ "Invalid data type of time field for watermark definition. "
+ "The field must be of type TIMESTAMP(p) or TIMESTAMP_LTZ(p), the supported precision 'p' is from 0 to 3, "
+ "but the time field type is STRING");
}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
index a78bb5a07cc..0b2c47ce5ec 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
@@ -818,6 +818,106 @@ class TableEnvironmentTest {
tableResult.collect())
}
+ @Test
+ def testAlterTableDropColumn(): Unit = {
+ val statement =
+ """
+ |CREATE TABLE MyTable (
+ | a BIGINT,
+ | b INT,
+ | d TIMESTAMP(3),
+ | e ROW<e0 STRING, e1 TIMESTAMP(3)>,
+ | WATERMARK FOR d AS d - INTERVAL '1' MINUTE
+ |) WITH (
+ | 'connector' = 'COLLECTION',
+ | 'is-bounded' = 'false'
+ |)
+ """.stripMargin
+ tableEnv.executeSql(statement)
+
+ tableEnv.executeSql("ALTER TABLE MyTable DROP (e, a)")
+
+ val expectedResult = util.Arrays.asList(
+ Row.of("b", "INT", Boolean.box(true), null, null, null),
+ Row.of(
+ "d",
+ "TIMESTAMP(3) *ROWTIME*",
+ Boolean.box(true),
+ null,
+ null,
+ "`d` - INTERVAL '1' MINUTE")
+ )
+ val tableResult = tableEnv.executeSql("DESCRIBE MyTable")
+ assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult.getResultKind)
+ checkData(expectedResult.iterator(), tableResult.collect())
+ }
+
+ @Test
+ def testAlterTableDropConstraint(): Unit = {
+ val statement =
+ """
+ |CREATE TABLE MyTable (
+ | a BIGINT,
+ | b INT,
+ | d TIMESTAMP(3),
+ | e ROW<e0 STRING, e1 TIMESTAMP(3)>,
+ | CONSTRAINT ct PRIMARY KEY(a, b) NOT ENFORCED
+ |) WITH (
+ | 'connector' = 'COLLECTION',
+ | 'is-bounded' = 'false'
+ |)
+ """.stripMargin
+ tableEnv.executeSql(statement)
+
+ tableEnv.executeSql("ALTER TABLE MyTable DROP CONSTRAINT ct")
+
+ val expectedResult = util.Arrays.asList(
+ Row.of("a", "BIGINT", Boolean.box(false), null, null, null),
+ Row.of("b", "INT", Boolean.box(false), null, null, null),
+ Row.of("d", "TIMESTAMP(3)", Boolean.box(true), null, null, null),
+ Row.of("e", "ROW<`e0` STRING, `e1` TIMESTAMP(3)>", Boolean.box(true), null, null, null)
+ )
+ val tableResult1 = tableEnv.executeSql("DESCRIBE MyTable")
+ assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult1.getResultKind)
+ checkData(expectedResult.iterator(), tableResult1.collect())
+
+ tableEnv.executeSql("ALTER TABLE MyTable ADD CONSTRAINT ct PRIMARY KEY(a) NOT ENFORCED")
+ tableEnv.executeSql("ALTER TABLE MyTable DROP PRIMARY KEY")
+ val tableResult2 = tableEnv.executeSql("DESCRIBE MyTable")
+ assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult2.getResultKind)
+ checkData(expectedResult.iterator(), tableResult2.collect())
+ }
+
+ @Test
+ def testAlterTableDropWatermark(): Unit = {
+ val statement =
+ """
+ |CREATE TABLE MyTable (
+ | a BIGINT,
+ | b INT,
+ | d TIMESTAMP(3),
+ | e ROW<e0 STRING, e1 TIMESTAMP(3)>,
+ | WATERMARK FOR d AS d - INTERVAL '1' MINUTE
+ |) WITH (
+ | 'connector' = 'COLLECTION',
+ | 'is-bounded' = 'false'
+ |)
+ """.stripMargin
+ tableEnv.executeSql(statement)
+
+ tableEnv.executeSql("ALTER TABLE MyTable DROP WATERMARK")
+
+ val expectedResult = util.Arrays.asList(
+ Row.of("a", "BIGINT", Boolean.box(true), null, null, null),
+ Row.of("b", "INT", Boolean.box(true), null, null, null),
+ Row.of("d", "TIMESTAMP(3)", Boolean.box(true), null, null, null),
+ Row.of("e", "ROW<`e0` STRING, `e1` TIMESTAMP(3)>", Boolean.box(true), null, null, null)
+ )
+ val tableResult = tableEnv.executeSql("DESCRIBE MyTable")
+ assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult.getResultKind)
+ checkData(expectedResult.iterator(), tableResult.collect())
+ }
+
@Test
def testAlterTableCompactOnNonManagedTable(): Unit = {
val statement =