Posted to commits@flink.apache.org by sh...@apache.org on 2022/12/29 02:05:38 UTC
[flink] 01/01: [FLINK-22318][table] Support RENAME column name for ALTER TABLE statement, optimize the code implementation
This is an automated email from the ASF dual-hosted git repository.
shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 94fba321d4e29f996c8234c9dcb3b7a93ad8a593
Author: fengli <ld...@163.com>
AuthorDate: Fri Apr 1 20:50:56 2022 +0800
[FLINK-22318][table] Support RENAME column name for ALTER TABLE statement, optimize the code implementation
This closes #19329
---
.../src/test/resources/sql/table.q | 36 +-
.../src/main/codegen/includes/parserImpls.ftl | 11 +-
.../sql/parser/ddl/SqlAlterTableRenameColumn.java | 53 +-
.../flink/sql/parser/FlinkSqlParserImplTest.java | 32 +-
.../operations/SqlToOperationConverter.java | 0
.../operations/SqlToOperationConverterTest.java | 1548 --------------------
.../planner/expressions/ColumnReferenceFinder.java | 108 ++
.../planner/operations/AlterSchemaConverter.java | 177 ++-
.../operations/SqlToOperationConverter.java | 15 +
.../planner/utils/OperationConverterUtils.java | 125 +-
.../table/planner/plan/utils/FlinkRexUtil.scala | 2 +-
.../operations/SqlToOperationConverterTest.java | 180 ++-
.../flink/table/api/TableEnvironmentTest.scala | 24 +-
13 files changed, 554 insertions(+), 1757 deletions(-)
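
For readers skimming the patch: the user-facing change is a new ALTER TABLE ... RENAME ... TO ... statement for columns. A minimal sketch of the syntax, using the orders2 table from the SQL client test below:

    ALTER TABLE orders2 RENAME amount TO amount1;

The rest of the diff wires this through the parser (parserImpls.ftl, SqlAlterTableRenameColumn), the SqlToOperationConverter, and the tests.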
diff --git a/flink-table/flink-sql-client/src/test/resources/sql/table.q b/flink-table/flink-sql-client/src/test/resources/sql/table.q
index 61d944f7b29..36809085117 100644
--- a/flink-table/flink-sql-client/src/test/resources/sql/table.q
+++ b/flink-table/flink-sql-client/src/test/resources/sql/table.q
@@ -395,6 +395,30 @@ alter table orders2 reset ();
org.apache.flink.table.api.ValidationException: ALTER TABLE RESET does not support empty key
!error
+# ==========================================================================
+# test alter table rename column
+# ==========================================================================
+
+alter table orders2 rename amount to amount1;
+[INFO] Execute statement succeed.
+!info
+
+# verify table options using SHOW CREATE TABLE
+show create table orders2;
+CREATE TABLE `default_catalog`.`default_database`.`orders2` (
+ `user` BIGINT NOT NULL,
+ `product` VARCHAR(32),
+ `amount1` INT,
+ `ts` TIMESTAMP(3),
+ `ptime` AS PROCTIME(),
+ WATERMARK FOR `ts` AS `ts` - INTERVAL '1' SECOND,
+ CONSTRAINT `PK_3599338` PRIMARY KEY (`user`) NOT ENFORCED
+) WITH (
+ 'connector' = 'datagen'
+)
+
+!ok
+
# ==========================================================================
# test alter table add schema
# ==========================================================================
@@ -424,7 +448,7 @@ CREATE TABLE `default_catalog`.`default_database`.`orders2` (
`user` BIGINT NOT NULL,
`product_id` BIGINT NOT NULL,
`product` VARCHAR(32),
- `amount` INT,
+ `amount1` INT,
`ts` TIMESTAMP(3),
`ptime` AS PROCTIME(),
WATERMARK FOR `ts` AS `ts` - INTERVAL '1' SECOND,
@@ -449,7 +473,7 @@ CREATE TABLE `default_catalog`.`default_database`.`orders2` (
`product_id` BIGINT NOT NULL,
`product` VARCHAR(32),
`cleaned_product` AS COALESCE(`product`, 'missing_sku'),
- `amount` INT,
+ `amount1` INT,
`ts` TIMESTAMP(3),
`ptime` AS PROCTIME(),
WATERMARK FOR `ts` AS `ts` - INTERVAL '1' SECOND,
@@ -477,7 +501,7 @@ CREATE TABLE `default_catalog`.`default_database`.`orders2` (
`product_id` BIGINT NOT NULL,
`product` VARCHAR(32),
`cleaned_product` AS COALESCE(`product`, 'missing_sku'),
- `amount` INT,
+ `amount1` INT,
`ts` TIMESTAMP(3),
`ptime` AS PROCTIME(),
WATERMARK FOR `ts` AS `ts` - INTERVAL '1' SECOND,
@@ -503,7 +527,7 @@ CREATE TABLE `default_catalog`.`default_database`.`orders2` (
`product_id` BIGINT NOT NULL,
`product` VARCHAR(32),
`cleaned_product` AS COALESCE(`product`, 'missing_sku'),
- `amount` INT,
+ `amount1` INT,
`ptime` AS PROCTIME(),
WATERMARK FOR `ts` AS `ts` - INTERVAL '1' MINUTE,
CONSTRAINT `order_constraint` PRIMARY KEY (`trade_order_id`) NOT ENFORCED
@@ -530,7 +554,7 @@ describe orders2;
| product_id | BIGINT | FALSE | | | |
| product | VARCHAR(32) | TRUE | | | |
| cleaned_product | VARCHAR(32) | FALSE | | AS COALESCE(`product`, 'missing_sku') | |
-| amount | INT | TRUE | | | |
+| amount1 | INT | TRUE | | | |
| ptime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE | | AS PROCTIME() | |
+-----------------+-----------------------------+-------+---------------------+---------------------------------------+----------------------------+
9 rows in set
@@ -548,7 +572,7 @@ desc orders2;
| product_id | BIGINT | FALSE | | | |
| product | VARCHAR(32) | TRUE | | | |
| cleaned_product | VARCHAR(32) | FALSE | | AS COALESCE(`product`, 'missing_sku') | |
-| amount | INT | TRUE | | | |
+| amount1 | INT | TRUE | | | |
| ptime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE | | AS PROCTIME() | |
+-----------------+-----------------------------+-------+---------------------+---------------------------------------+----------------------------+
9 rows in set
diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
index d03b987b9b1..57424abc495 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -595,8 +595,8 @@ SqlAlterTable SqlAlterTable() :
SqlNodeList partitionSpec = null;
SqlIdentifier constraintName;
SqlTableConstraint constraint;
- SqlIdentifier originColumnName;
- SqlIdentifier newColumnName;
+ SqlIdentifier originColumnIdentifier;
+ SqlIdentifier newColumnIdentifier;
AlterTableContext ctx = new AlterTableContext();
}
{
@@ -614,14 +614,15 @@ SqlAlterTable SqlAlterTable() :
}
|
<RENAME>
- originColumnName = SimpleIdentifier()
+ originColumnIdentifier = CompoundIdentifier()
<TO>
- newColumnName = SimpleIdentifier()
+ newColumnIdentifier = CompoundIdentifier()
{
return new SqlAlterTableRenameColumn(
startPos.plus(getPos()),
tableIdentifier,
- originColumnName,newColumnName);
+ originColumnIdentifier,
+ newColumnIdentifier);
}
|
<RESET>
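
Note the switch from SimpleIdentifier() to CompoundIdentifier() above: the grammar now parses dotted column references on both sides of TO. Both of the following parse after this change (the second form is exercised in FlinkSqlParserImplTest below; whether a nested rename passes later validation is up to the planner-side converter):

    ALTER TABLE t1 RENAME a TO b;
    ALTER TABLE t1 RENAME a.x TO a.y;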
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableRenameColumn.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableRenameColumn.java
index c75e2ae1280..28196183ff7 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableRenameColumn.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableRenameColumn.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.flink.sql.parser.ddl;
import org.apache.calcite.sql.SqlIdentifier;
@@ -9,34 +27,35 @@ import org.apache.calcite.util.ImmutableNullableList;
import java.util.List;
/**
- * ALTER TABLE [[catalogName.] dataBasesName].tableName
- * RENAME originColumnName TO newColumnName
+ * ALTER TABLE [[catalogName.] dataBasesName].tableName RENAME originColumnName TO newColumnName.
*/
public class SqlAlterTableRenameColumn extends SqlAlterTable {
- private final SqlIdentifier originColumnNameIdentifier;
- private final SqlIdentifier newColumnNameIdentifier;
+ private final SqlIdentifier originColumnIdentifier;
+ private final SqlIdentifier newColumnIdentifier;
- public SqlAlterTableRenameColumn(SqlParserPos pos,
- SqlIdentifier tableName,
- SqlIdentifier originColumnName,
- SqlIdentifier newColumnName) {
+ public SqlAlterTableRenameColumn(
+ SqlParserPos pos,
+ SqlIdentifier tableName,
+ SqlIdentifier originColumnIdentifier,
+ SqlIdentifier newColumnIdentifier) {
super(pos, tableName, null);
- this.originColumnNameIdentifier = originColumnName;
- this.newColumnNameIdentifier = newColumnName;
+ this.originColumnIdentifier = originColumnIdentifier;
+ this.newColumnIdentifier = newColumnIdentifier;
}
@Override
public List<SqlNode> getOperandList() {
- return ImmutableNullableList.of(tableIdentifier, originColumnNameIdentifier, newColumnNameIdentifier);
+ return ImmutableNullableList.of(
+ tableIdentifier, originColumnIdentifier, newColumnIdentifier);
}
- public SqlIdentifier getOriginColumnNameIdentifier() {
- return originColumnNameIdentifier;
+ public SqlIdentifier getOriginColumnIdentifier() {
+ return originColumnIdentifier;
}
- public SqlIdentifier getNewColumnNameIdentifier() {
- return newColumnNameIdentifier;
+ public SqlIdentifier getNewColumnIdentifier() {
+ return newColumnIdentifier;
}
@Override
@@ -44,8 +63,8 @@ public class SqlAlterTableRenameColumn extends SqlAlterTable {
writer.keyword("ALTER TABLE");
tableIdentifier.unparse(writer, leftPrec, rightPrec);
writer.keyword("RENAME");
- originColumnNameIdentifier.unparse(writer, leftPrec, rightPrec);
+ originColumnIdentifier.unparse(writer, leftPrec, rightPrec);
writer.keyword("TO");
- newColumnNameIdentifier.unparse(writer, leftPrec, rightPrec);
+ newColumnIdentifier.unparse(writer, leftPrec, rightPrec);
}
}
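
The unparse() method above reproduces the canonical form asserted by the parser test that follows, e.g. (round trip taken from that test):

    -- input
    alter table t1 rename a to b
    -- unparsed canonical form
    ALTER TABLE `T1` RENAME `A` TO `B`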
diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
index 065743bcbad..fea81a9fe2d 100644
--- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
+++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
@@ -315,26 +315,18 @@ class FlinkSqlParserImplTest extends SqlParserTest {
void testAlterTable() {
sql("alter table t1 rename to t2").ok("ALTER TABLE `T1` RENAME TO `T2`");
sql("alter table c1.d1.t1 rename to t2").ok("ALTER TABLE `C1`.`D1`.`T1` RENAME TO `T2`");
- final String sql0 = "alter table t1 set ('key1'='value1')";
- final String expected0 = "ALTER TABLE `T1` SET (\n" + " 'key1' = 'value1'\n" + ")";
- sql(sql0).ok(expected0);
- final String sql1 = "alter table t1 " + "add constraint ct1 primary key(a, b) not enforced";
- final String expected1 =
- "ALTER TABLE `T1` ADD (\n"
- + " CONSTRAINT `CT1` PRIMARY KEY (`A`, `B`) NOT ENFORCED\n"
- + ")";
- sql(sql1).ok(expected1);
- final String sql2 = "alter table t1 " + "add unique(a, b)";
- final String expected2 = "ALTER TABLE `T1` ADD (\n" + " UNIQUE (`A`, `B`)\n" + ")";
- sql(sql2).ok(expected2);
- final String sql3 = "alter table t1 drop constraint ct1";
- final String expected3 = "ALTER TABLE `T1` DROP CONSTRAINT `CT1`";
- sql(sql3).ok(expected3);
-
- final String sql4 = "alter table t1 rename a to b";
- final String expected4 = "ALTER TABLE `T1` RENAME `A` TO `B`";
- sql(sql4).ok(expected4);
-
+ sql("alter table t1 set ('key1'='value1')")
+ .ok("ALTER TABLE `T1` SET (\n" + " 'key1' = 'value1'\n" + ")");
+ sql("alter table t1 add constraint ct1 primary key(a, b) not enforced")
+ .ok(
+ "ALTER TABLE `T1` ADD (\n"
+ + " CONSTRAINT `CT1` PRIMARY KEY (`A`, `B`) NOT ENFORCED\n"
+ + ")");
+ sql("alter table t1 " + "add unique(a, b)")
+ .ok("ALTER TABLE `T1` ADD (\n" + " UNIQUE (`A`, `B`)\n" + ")");
+ sql("alter table t1 drop constraint ct1").ok("ALTER TABLE `T1` DROP CONSTRAINT `CT1`");
+ sql("alter table t1 rename a to b").ok("ALTER TABLE `T1` RENAME `A` TO `B`");
+ sql("alter table t1 rename a.x to a.y").ok("ALTER TABLE `T1` RENAME `A`.`X` TO `A`.`Y`");
}
@Test
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
deleted file mode 100644
index e69de29bb2d..00000000000
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
deleted file mode 100644
index 1ccd855972b..00000000000
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
+++ /dev/null
@@ -1,1548 +0,0 @@
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.operations;
-
-import org.apache.flink.sql.parser.ddl.SqlCreateTable;
-import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.api.Schema;
-import org.apache.flink.table.api.SqlDialect;
-import org.apache.flink.table.api.TableColumn;
-import org.apache.flink.table.api.TableColumn.ComputedColumn;
-import org.apache.flink.table.api.TableConfig;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.api.config.TableConfigOptions;
-import org.apache.flink.table.api.constraints.UniqueConstraint;
-import org.apache.flink.table.catalog.Catalog;
-import org.apache.flink.table.catalog.CatalogDatabaseImpl;
-import org.apache.flink.table.catalog.CatalogFunction;
-import org.apache.flink.table.catalog.CatalogFunctionImpl;
-import org.apache.flink.table.catalog.CatalogManager;
-import org.apache.flink.table.catalog.CatalogTable;
-import org.apache.flink.table.catalog.CatalogTableImpl;
-import org.apache.flink.table.catalog.FunctionCatalog;
-import org.apache.flink.table.catalog.GenericInMemoryCatalog;
-import org.apache.flink.table.catalog.ObjectIdentifier;
-import org.apache.flink.table.catalog.ObjectPath;
-import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
-import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
-import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
-import org.apache.flink.table.catalog.exceptions.TableNotExistException;
-import org.apache.flink.table.delegation.Parser;
-import org.apache.flink.table.expressions.SqlCallExpression;
-import org.apache.flink.table.module.ModuleManager;
-import org.apache.flink.table.operations.BeginStatementSetOperation;
-import org.apache.flink.table.operations.CatalogSinkModifyOperation;
-import org.apache.flink.table.operations.EndStatementSetOperation;
-import org.apache.flink.table.operations.LoadModuleOperation;
-import org.apache.flink.table.operations.Operation;
-import org.apache.flink.table.operations.ShowFunctionsOperation;
-import org.apache.flink.table.operations.ShowFunctionsOperation.FunctionScope;
-import org.apache.flink.table.operations.ShowModulesOperation;
-import org.apache.flink.table.operations.UnloadModuleOperation;
-import org.apache.flink.table.operations.UseCatalogOperation;
-import org.apache.flink.table.operations.UseDatabaseOperation;
-import org.apache.flink.table.operations.UseModulesOperation;
-import org.apache.flink.table.operations.ddl.AlterDatabaseOperation;
-import org.apache.flink.table.operations.ddl.AlterTableAddConstraintOperation;
-import org.apache.flink.table.operations.ddl.AlterTableDropConstraintOperation;
-import org.apache.flink.table.operations.ddl.AlterTableOptionsOperation;
-import org.apache.flink.table.operations.ddl.AlterTableRenameOperation;
-import org.apache.flink.table.operations.ddl.AlterTableSchemaOperation;
-import org.apache.flink.table.operations.ddl.CreateDatabaseOperation;
-import org.apache.flink.table.operations.ddl.CreateTableOperation;
-import org.apache.flink.table.operations.ddl.CreateViewOperation;
-import org.apache.flink.table.operations.ddl.DropDatabaseOperation;
-import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
-import org.apache.flink.table.planner.catalog.CatalogManagerCalciteSchema;
-import org.apache.flink.table.planner.delegation.ParserImpl;
-import org.apache.flink.table.planner.delegation.PlannerContext;
-import org.apache.flink.table.planner.expressions.utils.Func0$;
-import org.apache.flink.table.planner.expressions.utils.Func1$;
-import org.apache.flink.table.planner.expressions.utils.Func8$;
-import org.apache.flink.table.planner.parse.CalciteParser;
-import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.utils.CatalogManagerMocks;
-import org.apache.flink.table.utils.ExpressionResolverMocks;
-
-import org.apache.calcite.sql.SqlNode;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-
-import javax.annotation.Nullable;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
-
-import static org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema;
-import static org.apache.flink.table.planner.utils.OperationMatchers.entry;
-import static org.apache.flink.table.planner.utils.OperationMatchers.isCreateTableOperation;
-import static org.apache.flink.table.planner.utils.OperationMatchers.partitionedBy;
-import static org.apache.flink.table.planner.utils.OperationMatchers.withOptions;
-import static org.apache.flink.table.planner.utils.OperationMatchers.withSchema;
-import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-
-/** Test cases for {@link SqlToOperationConverter}. */
-public class SqlToOperationConverterTest {
- private final boolean isStreamingMode = false;
- private final TableConfig tableConfig = new TableConfig();
- private final Catalog catalog = new GenericInMemoryCatalog("MockCatalog", "default");
- private final CatalogManager catalogManager =
- CatalogManagerMocks.preparedCatalogManager().defaultCatalog("builtin", catalog).build();
- private final ModuleManager moduleManager = new ModuleManager();
- private final FunctionCatalog functionCatalog =
- new FunctionCatalog(tableConfig, catalogManager, moduleManager);
- private final Supplier<FlinkPlannerImpl> plannerSupplier =
- () ->
- getPlannerContext()
- .createFlinkPlanner(
- catalogManager.getCurrentCatalog(),
- catalogManager.getCurrentDatabase());
- private final Parser parser =
- new ParserImpl(
- catalogManager,
- plannerSupplier,
- () -> plannerSupplier.get().parser(),
- t ->
- getPlannerContext()
- .createSqlExprToRexConverter(
- getPlannerContext()
- .getTypeFactory()
- .buildRelNodeRowType(t)));
- private final PlannerContext plannerContext =
- new PlannerContext(
- tableConfig,
- functionCatalog,
- catalogManager,
- asRootSchema(new CatalogManagerCalciteSchema(catalogManager, isStreamingMode)),
- Collections.emptyList());
-
- private PlannerContext getPlannerContext() {
- return plannerContext;
- }
-
- @Rule public ExpectedException thrown = ExpectedException.none();
-
- @Before
- public void before() throws TableAlreadyExistException, DatabaseNotExistException {
- catalogManager.initSchemaResolver(
- isStreamingMode,
- ExpressionResolverMocks.basicResolver(catalogManager, functionCatalog, parser));
-
- final ObjectPath path1 = new ObjectPath(catalogManager.getCurrentDatabase(), "t1");
- final ObjectPath path2 = new ObjectPath(catalogManager.getCurrentDatabase(), "t2");
- final TableSchema tableSchema =
- TableSchema.builder()
- .field("a", DataTypes.BIGINT())
- .field("b", DataTypes.VARCHAR(Integer.MAX_VALUE))
- .field("c", DataTypes.INT())
- .field("d", DataTypes.VARCHAR(Integer.MAX_VALUE))
- .build();
- Map<String, String> options = new HashMap<>();
- options.put("connector", "COLLECTION");
- final CatalogTable catalogTable = new CatalogTableImpl(tableSchema, options, "");
- catalog.createTable(path1, catalogTable, true);
- catalog.createTable(path2, catalogTable, true);
- }
-
- @After
- public void after() throws TableNotExistException {
- final ObjectPath path1 = new ObjectPath(catalogManager.getCurrentDatabase(), "t1");
- final ObjectPath path2 = new ObjectPath(catalogManager.getCurrentDatabase(), "t2");
- catalog.dropTable(path1, true);
- catalog.dropTable(path2, true);
- }
-
- @Test
- public void testUseCatalog() {
- final String sql = "USE CATALOG cat1";
- Operation operation = parse(sql, SqlDialect.DEFAULT);
- assert operation instanceof UseCatalogOperation;
- assertEquals("cat1", ((UseCatalogOperation) operation).getCatalogName());
- }
-
- @Test
- public void testUseDatabase() {
- final String sql1 = "USE db1";
- Operation operation1 = parse(sql1, SqlDialect.DEFAULT);
- assert operation1 instanceof UseDatabaseOperation;
- assertEquals("builtin", ((UseDatabaseOperation) operation1).getCatalogName());
- assertEquals("db1", ((UseDatabaseOperation) operation1).getDatabaseName());
-
- final String sql2 = "USE cat1.db1";
- Operation operation2 = parse(sql2, SqlDialect.DEFAULT);
- assert operation2 instanceof UseDatabaseOperation;
- assertEquals("cat1", ((UseDatabaseOperation) operation2).getCatalogName());
- assertEquals("db1", ((UseDatabaseOperation) operation2).getDatabaseName());
- }
-
- @Test(expected = ValidationException.class)
- public void testUseDatabaseWithException() {
- final String sql = "USE cat1.db1.tbl1";
- Operation operation = parse(sql, SqlDialect.DEFAULT);
- }
-
- @Test
- public void testCreateDatabase() {
- final String[] createDatabaseSqls =
- new String[] {
- "create database db1",
- "create database if not exists cat1.db1",
- "create database cat1.db1 comment 'db1_comment'",
- "create database cat1.db1 comment 'db1_comment' with ('k1' = 'v1', 'K2' = 'V2')"
- };
- final String[] expectedCatalogs = new String[] {"builtin", "cat1", "cat1", "cat1"};
- final String expectedDatabase = "db1";
- final String[] expectedComments = new String[] {null, null, "db1_comment", "db1_comment"};
- final boolean[] expectedIgnoreIfExists = new boolean[] {false, true, false, false};
- Map<String, String> properties = new HashMap<>();
- properties.put("k1", "v1");
- properties.put("K2", "V2");
- final Map[] expectedProperties =
- new Map[] {
- new HashMap<String, String>(),
- new HashMap<String, String>(),
- new HashMap<String, String>(),
- new HashMap(properties)
- };
-
- for (int i = 0; i < createDatabaseSqls.length; i++) {
- Operation operation = parse(createDatabaseSqls[i], SqlDialect.DEFAULT);
- assert operation instanceof CreateDatabaseOperation;
- final CreateDatabaseOperation createDatabaseOperation =
- (CreateDatabaseOperation) operation;
- assertEquals(expectedCatalogs[i], createDatabaseOperation.getCatalogName());
- assertEquals(expectedDatabase, createDatabaseOperation.getDatabaseName());
- assertEquals(
- expectedComments[i], createDatabaseOperation.getCatalogDatabase().getComment());
- assertEquals(expectedIgnoreIfExists[i], createDatabaseOperation.isIgnoreIfExists());
- assertEquals(
- expectedProperties[i],
- createDatabaseOperation.getCatalogDatabase().getProperties());
- }
- }
-
- @Test
- public void testDropDatabase() {
- final String[] dropDatabaseSqls =
- new String[] {
- "drop database db1",
- "drop database if exists db1",
- "drop database if exists cat1.db1 CASCADE",
- "drop database if exists cat1.db1 RESTRICT"
- };
- final String[] expectedCatalogs = new String[] {"builtin", "builtin", "cat1", "cat1"};
- final String expectedDatabase = "db1";
- final boolean[] expectedIfExists = new boolean[] {false, true, true, true};
- final boolean[] expectedIsCascades = new boolean[] {false, false, true, false};
-
- for (int i = 0; i < dropDatabaseSqls.length; i++) {
- Operation operation = parse(dropDatabaseSqls[i], SqlDialect.DEFAULT);
- assert operation instanceof DropDatabaseOperation;
- final DropDatabaseOperation dropDatabaseOperation = (DropDatabaseOperation) operation;
- assertEquals(expectedCatalogs[i], dropDatabaseOperation.getCatalogName());
- assertEquals(expectedDatabase, dropDatabaseOperation.getDatabaseName());
- assertEquals(expectedIfExists[i], dropDatabaseOperation.isIfExists());
- assertEquals(expectedIsCascades[i], dropDatabaseOperation.isCascade());
- }
- }
-
- @Test
- public void testAlterDatabase() throws Exception {
- catalogManager.registerCatalog("cat1", new GenericInMemoryCatalog("default", "default"));
- catalogManager
- .getCatalog("cat1")
- .get()
- .createDatabase(
- "db1", new CatalogDatabaseImpl(new HashMap<>(), "db1_comment"), true);
- final String sql = "alter database cat1.db1 set ('k1'='v1', 'K2'='V2')";
- Operation operation = parse(sql, SqlDialect.DEFAULT);
- assert operation instanceof AlterDatabaseOperation;
- Map<String, String> properties = new HashMap<>();
- properties.put("k1", "v1");
- properties.put("K2", "V2");
- assertEquals("db1", ((AlterDatabaseOperation) operation).getDatabaseName());
- assertEquals("cat1", ((AlterDatabaseOperation) operation).getCatalogName());
- assertEquals(
- "db1_comment",
- ((AlterDatabaseOperation) operation).getCatalogDatabase().getComment());
- assertEquals(
- properties,
- ((AlterDatabaseOperation) operation).getCatalogDatabase().getProperties());
- }
-
- @Test
- public void testLoadModule() {
- final String sql = "LOAD MODULE dummy WITH ('k1' = 'v1', 'k2' = 'v2')";
- final String expectedModuleName = "dummy";
- final Map<String, String> expectedProperties = new HashMap<>();
- expectedProperties.put("k1", "v1");
- expectedProperties.put("k2", "v2");
-
- Operation operation = parse(sql, SqlDialect.DEFAULT);
- assert operation instanceof LoadModuleOperation;
- final LoadModuleOperation loadModuleOperation = (LoadModuleOperation) operation;
-
- assertEquals(expectedModuleName, loadModuleOperation.getModuleName());
- assertEquals(expectedProperties, loadModuleOperation.getProperties());
- }
-
- @Test
- public void testUnloadModule() {
- final String sql = "UNLOAD MODULE dummy";
- final String expectedModuleName = "dummy";
-
- Operation operation = parse(sql, SqlDialect.DEFAULT);
- assert operation instanceof UnloadModuleOperation;
- final UnloadModuleOperation unloadModuleOperation = (UnloadModuleOperation) operation;
-
- assertEquals(expectedModuleName, unloadModuleOperation.getModuleName());
- }
-
- @Test
- public void testUseOneModule() {
- final String sql = "USE MODULES dummy";
- final List<String> expectedModuleNames = Collections.singletonList("dummy");
-
- Operation operation = parse(sql, SqlDialect.DEFAULT);
- assert operation instanceof UseModulesOperation;
- final UseModulesOperation useModulesOperation = (UseModulesOperation) operation;
-
- assertEquals(expectedModuleNames, useModulesOperation.getModuleNames());
- assertEquals("USE MODULES: [dummy]", useModulesOperation.asSummaryString());
- }
-
- @Test
- public void testUseMultipleModules() {
- final String sql = "USE MODULES x, y, z";
- final List<String> expectedModuleNames = Arrays.asList("x", "y", "z");
-
- Operation operation = parse(sql, SqlDialect.DEFAULT);
- assert operation instanceof UseModulesOperation;
- final UseModulesOperation useModulesOperation = (UseModulesOperation) operation;
-
- assertEquals(expectedModuleNames, useModulesOperation.getModuleNames());
- assertEquals("USE MODULES: [x, y, z]", useModulesOperation.asSummaryString());
- }
-
- @Test
- public void testShowModules() {
- final String sql = "SHOW MODULES";
- Operation operation = parse(sql, SqlDialect.DEFAULT);
- assert operation instanceof ShowModulesOperation;
- final ShowModulesOperation showModulesOperation = (ShowModulesOperation) operation;
-
- assertFalse(showModulesOperation.requireFull());
- assertEquals("SHOW MODULES", showModulesOperation.asSummaryString());
- }
-
- @Test
- public void testShowFullModules() {
- final String sql = "SHOW FULL MODULES";
- Operation operation = parse(sql, SqlDialect.DEFAULT);
- assert operation instanceof ShowModulesOperation;
- final ShowModulesOperation showModulesOperation = (ShowModulesOperation) operation;
-
- assertTrue(showModulesOperation.requireFull());
- assertEquals("SHOW FULL MODULES", showModulesOperation.asSummaryString());
- }
-
- @Test
- public void testShowFunctions() {
- final String sql1 = "SHOW FUNCTIONS";
- assertShowFunctions(sql1, sql1, FunctionScope.ALL);
-
- final String sql2 = "SHOW USER FUNCTIONS";
- assertShowFunctions(sql2, sql2, FunctionScope.USER);
- }
-
- @Test
- public void testCreateTable() {
- final String sql =
- "CREATE TABLE tbl1 (\n"
- + " a bigint,\n"
- + " b varchar, \n"
- + " c int, \n"
- + " d varchar"
- + ")\n"
- + " PARTITIONED BY (a, d)\n"
- + " with (\n"
- + " 'connector' = 'kafka', \n"
- + " 'kafka.topic' = 'log.test'\n"
- + ")\n";
- FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
- final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
- Operation operation = parse(sql, planner, parser);
- assert operation instanceof CreateTableOperation;
- CreateTableOperation op = (CreateTableOperation) operation;
- CatalogTable catalogTable = op.getCatalogTable();
- assertEquals(Arrays.asList("a", "d"), catalogTable.getPartitionKeys());
- assertArrayEquals(
- catalogTable.getSchema().getFieldNames(), new String[] {"a", "b", "c", "d"});
- assertArrayEquals(
- catalogTable.getSchema().getFieldDataTypes(),
- new DataType[] {
- DataTypes.BIGINT(),
- DataTypes.VARCHAR(Integer.MAX_VALUE),
- DataTypes.INT(),
- DataTypes.VARCHAR(Integer.MAX_VALUE)
- });
- }
-
- @Test
- public void testCreateTableWithPrimaryKey() {
- final String sql =
- "CREATE TABLE tbl1 (\n"
- + " a bigint,\n"
- + " b varchar, \n"
- + " c int, \n"
- + " d varchar, \n"
- + " constraint ct1 primary key(a, b) not enforced\n"
- + ") with (\n"
- + " 'connector' = 'kafka', \n"
- + " 'kafka.topic' = 'log.test'\n"
- + ")\n";
- FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
- final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
- Operation operation = parse(sql, planner, parser);
- assert operation instanceof CreateTableOperation;
- CreateTableOperation op = (CreateTableOperation) operation;
- CatalogTable catalogTable = op.getCatalogTable();
- TableSchema tableSchema = catalogTable.getSchema();
- assertThat(
- tableSchema
- .getPrimaryKey()
- .map(UniqueConstraint::asSummaryString)
- .orElse("fakeVal"),
- is("CONSTRAINT ct1 PRIMARY KEY (a, b)"));
- assertArrayEquals(new String[] {"a", "b", "c", "d"}, tableSchema.getFieldNames());
- assertArrayEquals(
- new DataType[] {
- DataTypes.BIGINT().notNull(),
- DataTypes.STRING().notNull(),
- DataTypes.INT(),
- DataTypes.STRING()
- },
- tableSchema.getFieldDataTypes());
- }
-
- @Test
- public void testCreateTableWithPrimaryKeyEnforced() {
- final String sql =
- "CREATE TABLE tbl1 (\n"
- + " a bigint,\n"
- + " b varchar, \n"
- + " c int, \n"
- + " d varchar, \n"
- +
- // Default is enforced.
- " constraint ct1 primary key(a, b)\n"
- + ") with (\n"
- + " 'connector' = 'kafka', \n"
- + " 'kafka.topic' = 'log.test'\n"
- + ")\n";
- FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
- final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
- thrown.expect(ValidationException.class);
- thrown.expectMessage(
- "Flink doesn't support ENFORCED mode for PRIMARY KEY "
- + "constaint. ENFORCED/NOT ENFORCED controls if the constraint "
- + "checks are performed on the incoming/outgoing data. "
- + "Flink does not own the data therefore the only supported mode is the NOT ENFORCED mode");
- parse(sql, planner, parser);
- }
-
- @Test
- public void testCreateTableWithUniqueKey() {
- final String sql =
- "CREATE TABLE tbl1 (\n"
- + " a bigint,\n"
- + " b varchar, \n"
- + " c int, \n"
- + " d varchar, \n"
- + " constraint ct1 unique (a, b) not enforced\n"
- + ") with (\n"
- + " 'connector' = 'kafka', \n"
- + " 'kafka.topic' = 'log.test'\n"
- + ")\n";
- FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
- final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
- thrown.expect(UnsupportedOperationException.class);
- thrown.expectMessage("UNIQUE constraint is not supported yet");
- parse(sql, planner, parser);
- }
-
- @Test
- public void testPrimaryKeyOnGeneratedColumn() {
- thrown.expect(ValidationException.class);
- thrown.expectMessage(
- "Could not create a PRIMARY KEY with column 'c' at line 5, column 34.\n"
- + "A PRIMARY KEY constraint must be declared on physical columns.");
- final String sql2 =
- "CREATE TABLE tbl1 (\n"
- + " a bigint not null,\n"
- + " b varchar not null,\n"
- + " c as 2 * (a + 1),\n"
- + " constraint ct1 primary key (b, c) not enforced"
- + ") with (\n"
- + " 'connector' = 'kafka',\n"
- + " 'kafka.topic' = 'log.test'\n"
- + ")\n";
- parseAndConvert(sql2);
- }
-
- @Test
- public void testPrimaryKeyNonExistentColumn() {
- thrown.expect(ValidationException.class);
- thrown.expectMessage(
- "Primary key column 'd' is not defined in the schema at line 5, column 34");
- final String sql2 =
- "CREATE TABLE tbl1 (\n"
- + " a bigint not null,\n"
- + " b varchar not null,\n"
- + " c as 2 * (a + 1),\n"
- + " constraint ct1 primary key (b, d) not enforced"
- + ") with (\n"
- + " 'connector' = 'kafka',\n"
- + " 'kafka.topic' = 'log.test'\n"
- + ")\n";
- parseAndConvert(sql2);
- }
-
- @Test
- public void testCreateTableWithMinusInOptionKey() {
- final String sql =
- "create table source_table(\n"
- + " a int,\n"
- + " b bigint,\n"
- + " c varchar\n"
- + ") with (\n"
- + " 'a-B-c-d124' = 'Ab',\n"
- + " 'a.b-c-d.e-f.g' = 'ada',\n"
- + " 'a.b-c-d.e-f1231.g' = 'ada',\n"
- + " 'a.b-c-d.*' = 'adad')\n";
- final FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
- final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
- SqlNode node = parser.parse(sql);
- assert node instanceof SqlCreateTable;
- Operation operation = SqlToOperationConverter.convert(planner, catalogManager, node).get();
- assert operation instanceof CreateTableOperation;
- CreateTableOperation op = (CreateTableOperation) operation;
- CatalogTable catalogTable = op.getCatalogTable();
- Map<String, String> options =
- catalogTable.getOptions().entrySet().stream()
- .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
- Map<String, String> sortedProperties = new TreeMap<>(options);
- final String expected =
- "{a-B-c-d124=Ab, "
- + "a.b-c-d.*=adad, "
- + "a.b-c-d.e-f.g=ada, "
- + "a.b-c-d.e-f1231.g=ada}";
- assertEquals(expected, sortedProperties.toString());
- }
-
- @Test
- public void testCreateTableWithWatermark()
- throws FunctionAlreadyExistException, DatabaseNotExistException {
- CatalogFunction cf =
- new CatalogFunctionImpl(JavaUserDefinedScalarFunctions.JavaFunc5.class.getName());
- catalog.createFunction(ObjectPath.fromString("default.myfunc"), cf, true);
-
- final String sql =
- "create table source_table(\n"
- + " a int,\n"
- + " b bigint,\n"
- + " c timestamp(3),\n"
- + " watermark for `c` as myfunc(c, 1) - interval '5' second\n"
- + ") with (\n"
- + " 'connector.type' = 'kafka')\n";
- final FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
- final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
- SqlNode node = parser.parse(sql);
- assert node instanceof SqlCreateTable;
- Operation operation = SqlToOperationConverter.convert(planner, catalogManager, node).get();
- assert operation instanceof CreateTableOperation;
- CreateTableOperation op = (CreateTableOperation) operation;
- CatalogTable catalogTable = op.getCatalogTable();
- Map<String, String> properties = catalogTable.toProperties();
- Map<String, String> expected = new HashMap<>();
- expected.put("schema.0.name", "a");
- expected.put("schema.0.data-type", "INT");
- expected.put("schema.1.name", "b");
- expected.put("schema.1.data-type", "BIGINT");
- expected.put("schema.2.name", "c");
- expected.put("schema.2.data-type", "TIMESTAMP(3)");
- expected.put("schema.watermark.0.rowtime", "c");
- expected.put(
- "schema.watermark.0.strategy.expr",
- "`builtin`.`default`.`myfunc`(`c`, 1) - INTERVAL '5' SECOND");
- expected.put("schema.watermark.0.strategy.data-type", "TIMESTAMP(3)");
- expected.put("connector.type", "kafka");
- assertEquals(expected, properties);
- }
-
- @Test
- public void testBasicCreateTableLike() {
- Map<String, String> sourceProperties = new HashMap<>();
- sourceProperties.put("format.type", "json");
- CatalogTable catalogTable =
- CatalogTable.of(
- Schema.newBuilder()
- .column("f0", DataTypes.INT().notNull())
- .column("f1", DataTypes.TIMESTAMP(3))
- .build(),
- null,
- Collections.emptyList(),
- sourceProperties);
-
- catalogManager.createTable(
- catalogTable, ObjectIdentifier.of("builtin", "default", "sourceTable"), false);
-
- final String sql =
- "create table derivedTable(\n"
- + " a int,\n"
- + " watermark for f1 as `f1` - interval '5' second\n"
- + ")\n"
- + "PARTITIONED BY (a, f0)\n"
- + "with (\n"
- + " 'connector.type' = 'kafka'"
- + ")\n"
- + "like sourceTable";
- Operation operation = parseAndConvert(sql);
-
- assertThat(
- operation,
- isCreateTableOperation(
- withSchema(
- Schema.newBuilder()
- .column("f0", DataTypes.INT().notNull())
- .column("f1", DataTypes.TIMESTAMP(3))
- .column("a", DataTypes.INT())
- .watermark("f1", "`f1` - INTERVAL '5' SECOND")
- .build()),
- withOptions(entry("connector.type", "kafka"), entry("format.type", "json")),
- partitionedBy("a", "f0")));
- }
-
- @Test
- public void testCreateTableLikeWithFullPath() {
- Map<String, String> sourceProperties = new HashMap<>();
- sourceProperties.put("connector.type", "kafka");
- sourceProperties.put("format.type", "json");
- CatalogTable catalogTable =
- CatalogTable.of(
- Schema.newBuilder()
- .column("f0", DataTypes.INT().notNull())
- .column("f1", DataTypes.TIMESTAMP(3))
- .build(),
- null,
- Collections.emptyList(),
- sourceProperties);
- catalogManager.createTable(
- catalogTable, ObjectIdentifier.of("builtin", "default", "sourceTable"), false);
- final String sql = "create table mytable like `builtin`.`default`.sourceTable";
- Operation operation = parseAndConvert(sql);
-
- assertThat(
- operation,
- isCreateTableOperation(
- withSchema(
- Schema.newBuilder()
- .column("f0", DataTypes.INT().notNull())
- .column("f1", DataTypes.TIMESTAMP(3))
- .build()),
- withOptions(
- entry("connector.type", "kafka"), entry("format.type", "json"))));
- }
-
- @Test
- public void testMergingCreateTableLike() {
- Map<String, String> sourceProperties = new HashMap<>();
- sourceProperties.put("format.type", "json");
- CatalogTable catalogTable =
- CatalogTable.of(
- Schema.newBuilder()
- .column("f0", DataTypes.INT().notNull())
- .column("f1", DataTypes.TIMESTAMP(3))
- .columnByExpression("f2", "`f0` + 12345")
- .watermark("f1", "`f1` - interval '1' second")
- .build(),
- null,
- Arrays.asList("f0", "f1"),
- sourceProperties);
-
- catalogManager.createTable(
- catalogTable, ObjectIdentifier.of("builtin", "default", "sourceTable"), false);
-
- final String sql =
- "create table derivedTable(\n"
- + " a int,\n"
- + " watermark for f1 as `f1` - interval '5' second\n"
- + ")\n"
- + "PARTITIONED BY (a, f0)\n"
- + "with (\n"
- + " 'connector.type' = 'kafka'"
- + ")\n"
- + "like sourceTable (\n"
- + " EXCLUDING GENERATED\n"
- + " EXCLUDING PARTITIONS\n"
- + " OVERWRITING OPTIONS\n"
- + " OVERWRITING WATERMARKS"
- + ")";
- Operation operation = parseAndConvert(sql);
-
- assertThat(
- operation,
- isCreateTableOperation(
- withSchema(
- Schema.newBuilder()
- .column("f0", DataTypes.INT().notNull())
- .column("f1", DataTypes.TIMESTAMP(3))
- .column("a", DataTypes.INT())
- .watermark("f1", "`f1` - INTERVAL '5' SECOND")
- .build()),
- withOptions(entry("connector.type", "kafka"), entry("format.type", "json")),
- partitionedBy("a", "f0")));
- }
-
- @Test
- public void testCreateTableInvalidPartition() {
- final String sql =
- "create table derivedTable(\n" + " a int\n" + ")\n" + "PARTITIONED BY (f3)";
-
- thrown.expect(ValidationException.class);
- thrown.expectMessage(
- "Partition column 'f3' not defined in the table schema. Available columns: ['a']");
- parseAndConvert(sql);
- }
-
- @Test
- public void testCreateTableLikeInvalidPartition() {
- CatalogTable catalogTable =
- CatalogTable.of(
- Schema.newBuilder().column("f0", DataTypes.INT().notNull()).build(),
- null,
- Collections.emptyList(),
- Collections.emptyMap());
- catalogManager.createTable(
- catalogTable, ObjectIdentifier.of("builtin", "default", "sourceTable"), false);
-
- final String sql =
- "create table derivedTable(\n"
- + " a int\n"
- + ")\n"
- + "PARTITIONED BY (f3)\n"
- + "like sourceTable";
-
- thrown.expect(ValidationException.class);
- thrown.expectMessage(
- "Partition column 'f3' not defined in the table schema. Available columns: ['f0', 'a']");
- parseAndConvert(sql);
- }
-
- @Test
- public void testCreateTableInvalidWatermark() {
- final String sql =
- "create table derivedTable(\n"
- + " a int,\n"
- + " watermark for f1 as `f1` - interval '5' second\n"
- + ")";
-
- thrown.expect(ValidationException.class);
- thrown.expectMessage(
- "The rowtime attribute field 'f1' is not defined in the table schema,"
- + " at line 3, column 17\n"
- + "Available fields: ['a']");
- parseAndConvert(sql);
- }
-
- @Test
- public void testCreateTableLikeInvalidWatermark() {
- CatalogTable catalogTable =
- CatalogTable.of(
- Schema.newBuilder().column("f0", DataTypes.INT().notNull()).build(),
- null,
- Collections.emptyList(),
- Collections.emptyMap());
- catalogManager.createTable(
- catalogTable, ObjectIdentifier.of("builtin", "default", "sourceTable"), false);
-
- final String sql =
- "create table derivedTable(\n"
- + " a int,\n"
- + " watermark for f1 as `f1` - interval '5' second\n"
- + ")\n"
- + "like sourceTable";
-
- thrown.expect(ValidationException.class);
- thrown.expectMessage(
- "The rowtime attribute field 'f1' is not defined in the table schema,"
- + " at line 3, column 17\n"
- + "Available fields: ['f0', 'a']");
- parseAndConvert(sql);
- }
-
- @Test
- public void testCreateTableLikeNestedWatermark() {
- CatalogTable catalogTable =
- CatalogTable.of(
- Schema.newBuilder()
- .column("f0", DataTypes.INT().notNull())
- .column(
- "f1",
- DataTypes.ROW(
- DataTypes.FIELD("tmstmp", DataTypes.TIMESTAMP(3))))
- .build(),
- null,
- Collections.emptyList(),
- Collections.emptyMap());
- catalogManager.createTable(
- catalogTable, ObjectIdentifier.of("builtin", "default", "sourceTable"), false);
-
- final String sql =
- "create table derivedTable(\n"
- + " a int,\n"
- + " watermark for f1.t as f1.t - interval '5' second\n"
- + ")\n"
- + "like sourceTable";
-
- thrown.expect(ValidationException.class);
- thrown.expectMessage(
- "The rowtime attribute field 'f1.t' is not defined in the table schema,"
- + " at line 3, column 20\n"
- + "Nested field 't' was not found in a composite type:"
- + " ROW<`tmstmp` TIMESTAMP(3)>.");
- parseAndConvert(sql);
- }
-
- @Test
- public void testSqlInsertWithStaticPartition() {
- final String sql = "insert into t1 partition(a=1) select b, c, d from t2";
- FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
- final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
- Operation operation = parse(sql, planner, parser);
- assert operation instanceof CatalogSinkModifyOperation;
- CatalogSinkModifyOperation sinkModifyOperation = (CatalogSinkModifyOperation) operation;
- final Map<String, String> expectedStaticPartitions = new HashMap<>();
- expectedStaticPartitions.put("a", "1");
- assertEquals(expectedStaticPartitions, sinkModifyOperation.getStaticPartitions());
- }
-
- @Test
- public void testSqlInsertWithDynamicTableOptions() {
- final String sql =
- "insert into t1 /*+ OPTIONS('k1'='v1', 'k2'='v2') */\n"
- + "select a, b, c, d from t2";
- FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
- final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
- Operation operation = parse(sql, planner, parser);
- assert operation instanceof CatalogSinkModifyOperation;
- CatalogSinkModifyOperation sinkModifyOperation = (CatalogSinkModifyOperation) operation;
- Map<String, String> dynamicOptions = sinkModifyOperation.getDynamicOptions();
- assertNotNull(dynamicOptions);
- assertThat(dynamicOptions.size(), is(2));
- assertThat(dynamicOptions.toString(), is("{k1=v1, k2=v2}"));
- }
-
- @Test
- public void testDynamicTableWithInvalidOptions() {
- final String sql = "select * from t1 /*+ OPTIONS('opt1', 'opt2') */";
- FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
- final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
- thrown.expect(AssertionError.class);
- thrown.expectMessage("Hint [OPTIONS] only support " + "non empty key value options");
- parse(sql, planner, parser);
- }
-
- @Test // TODO: tweak the tests when FLINK-13604 is fixed.
- public void testCreateTableWithFullDataTypes() {
- final List<TestItem> testItems =
- Arrays.asList(
- createTestItem("CHAR", DataTypes.CHAR(1)),
- createTestItem("CHAR NOT NULL", DataTypes.CHAR(1).notNull()),
- createTestItem("CHAR NULL", DataTypes.CHAR(1)),
- createTestItem("CHAR(33)", DataTypes.CHAR(33)),
- createTestItem("VARCHAR", DataTypes.STRING()),
- createTestItem("VARCHAR(33)", DataTypes.VARCHAR(33)),
- createTestItem("STRING", DataTypes.STRING()),
- createTestItem("BOOLEAN", DataTypes.BOOLEAN()),
- createTestItem("BINARY", DataTypes.BINARY(1)),
- createTestItem("BINARY(33)", DataTypes.BINARY(33)),
- createTestItem("VARBINARY", DataTypes.BYTES()),
- createTestItem("VARBINARY(33)", DataTypes.VARBINARY(33)),
- createTestItem("BYTES", DataTypes.BYTES()),
- createTestItem("DECIMAL", DataTypes.DECIMAL(10, 0)),
- createTestItem("DEC", DataTypes.DECIMAL(10, 0)),
- createTestItem("NUMERIC", DataTypes.DECIMAL(10, 0)),
- createTestItem("DECIMAL(10)", DataTypes.DECIMAL(10, 0)),
- createTestItem("DEC(10)", DataTypes.DECIMAL(10, 0)),
- createTestItem("NUMERIC(10)", DataTypes.DECIMAL(10, 0)),
- createTestItem("DECIMAL(10, 3)", DataTypes.DECIMAL(10, 3)),
- createTestItem("DEC(10, 3)", DataTypes.DECIMAL(10, 3)),
- createTestItem("NUMERIC(10, 3)", DataTypes.DECIMAL(10, 3)),
- createTestItem("TINYINT", DataTypes.TINYINT()),
- createTestItem("SMALLINT", DataTypes.SMALLINT()),
- createTestItem("INTEGER", DataTypes.INT()),
- createTestItem("INT", DataTypes.INT()),
- createTestItem("BIGINT", DataTypes.BIGINT()),
- createTestItem("FLOAT", DataTypes.FLOAT()),
- createTestItem("DOUBLE", DataTypes.DOUBLE()),
- createTestItem("DOUBLE PRECISION", DataTypes.DOUBLE()),
- createTestItem("DATE", DataTypes.DATE()),
- createTestItem("TIME", DataTypes.TIME()),
- createTestItem("TIME WITHOUT TIME ZONE", DataTypes.TIME()),
- // Expect to be TIME(3).
- createTestItem("TIME(3)", DataTypes.TIME()),
- // Expect to be TIME(3).
- createTestItem("TIME(3) WITHOUT TIME ZONE", DataTypes.TIME()),
- createTestItem("TIMESTAMP", DataTypes.TIMESTAMP(6)),
- createTestItem("TIMESTAMP WITHOUT TIME ZONE", DataTypes.TIMESTAMP(6)),
- createTestItem("TIMESTAMP(3)", DataTypes.TIMESTAMP(3)),
- createTestItem("TIMESTAMP(3) WITHOUT TIME ZONE", DataTypes.TIMESTAMP(3)),
- createTestItem(
- "TIMESTAMP WITH LOCAL TIME ZONE",
- DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(6)),
- createTestItem(
- "TIMESTAMP(3) WITH LOCAL TIME ZONE",
- DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)),
- createTestItem(
- "ARRAY<TIMESTAMP(3) WITH LOCAL TIME ZONE>",
- DataTypes.ARRAY(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3))),
- createTestItem(
- "ARRAY<INT NOT NULL>", DataTypes.ARRAY(DataTypes.INT().notNull())),
- createTestItem("INT ARRAY", DataTypes.ARRAY(DataTypes.INT())),
- createTestItem(
- "INT NOT NULL ARRAY", DataTypes.ARRAY(DataTypes.INT().notNull())),
- createTestItem(
- "INT ARRAY NOT NULL", DataTypes.ARRAY(DataTypes.INT()).notNull()),
- createTestItem(
- "MULTISET<INT NOT NULL>",
- DataTypes.MULTISET(DataTypes.INT().notNull())),
- createTestItem("INT MULTISET", DataTypes.MULTISET(DataTypes.INT())),
- createTestItem(
- "INT NOT NULL MULTISET",
- DataTypes.MULTISET(DataTypes.INT().notNull())),
- createTestItem(
- "INT MULTISET NOT NULL",
- DataTypes.MULTISET(DataTypes.INT()).notNull()),
- createTestItem(
- "MAP<BIGINT, BOOLEAN>",
- DataTypes.MAP(DataTypes.BIGINT(), DataTypes.BOOLEAN())),
- // Expect to be ROW<`f0` INT NOT NULL, `f1` BOOLEAN>.
- createTestItem(
- "ROW<f0 INT NOT NULL, f1 BOOLEAN>",
- DataTypes.ROW(
- DataTypes.FIELD("f0", DataTypes.INT()),
- DataTypes.FIELD("f1", DataTypes.BOOLEAN()))),
- // Expect to be ROW<`f0` INT NOT NULL, `f1` BOOLEAN>.
- createTestItem(
- "ROW(f0 INT NOT NULL, f1 BOOLEAN)",
- DataTypes.ROW(
- DataTypes.FIELD("f0", DataTypes.INT()),
- DataTypes.FIELD("f1", DataTypes.BOOLEAN()))),
- createTestItem(
- "ROW<`f0` INT>",
- DataTypes.ROW(DataTypes.FIELD("f0", DataTypes.INT()))),
- createTestItem(
- "ROW(`f0` INT)",
- DataTypes.ROW(DataTypes.FIELD("f0", DataTypes.INT()))),
- createTestItem("ROW<>", DataTypes.ROW()),
- createTestItem("ROW()", DataTypes.ROW()),
- // Expect to be ROW<`f0` INT NOT NULL '...', `f1` BOOLEAN '...'>.
- createTestItem(
- "ROW<f0 INT NOT NULL 'This is a comment.',"
- + " f1 BOOLEAN 'This as well.'>",
- DataTypes.ROW(
- DataTypes.FIELD("f0", DataTypes.INT()),
- DataTypes.FIELD("f1", DataTypes.BOOLEAN()))),
- createTestItem(
- "ARRAY<ROW<f0 INT, f1 BOOLEAN>>",
- DataTypes.ARRAY(
- DataTypes.ROW(
- DataTypes.FIELD("f0", DataTypes.INT()),
- DataTypes.FIELD("f1", DataTypes.BOOLEAN())))),
- createTestItem(
- "ROW<f0 INT, f1 BOOLEAN> MULTISET",
- DataTypes.MULTISET(
- DataTypes.ROW(
- DataTypes.FIELD("f0", DataTypes.INT()),
- DataTypes.FIELD("f1", DataTypes.BOOLEAN())))),
- createTestItem(
- "MULTISET<ROW<f0 INT, f1 BOOLEAN>>",
- DataTypes.MULTISET(
- DataTypes.ROW(
- DataTypes.FIELD("f0", DataTypes.INT()),
- DataTypes.FIELD("f1", DataTypes.BOOLEAN())))),
- createTestItem(
- "ROW<f0 Row<f00 INT, f01 BOOLEAN>, "
- + "f1 INT ARRAY, "
- + "f2 BOOLEAN MULTISET>",
- DataTypes.ROW(
- DataTypes.FIELD(
- "f0",
- DataTypes.ROW(
- DataTypes.FIELD("f00", DataTypes.INT()),
- DataTypes.FIELD(
- "f01", DataTypes.BOOLEAN()))),
- DataTypes.FIELD("f1", DataTypes.ARRAY(DataTypes.INT())),
- DataTypes.FIELD(
- "f2", DataTypes.MULTISET(DataTypes.BOOLEAN())))));
- StringBuilder buffer = new StringBuilder("create table t1(\n");
- for (int i = 0; i < testItems.size(); i++) {
- buffer.append("f").append(i).append(" ").append(testItems.get(i).testExpr);
- if (i == testItems.size() - 1) {
- buffer.append(")");
- } else {
- buffer.append(",\n");
- }
- }
- final String sql = buffer.toString();
- final FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
- final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
- SqlNode node = parser.parse(sql);
- assert node instanceof SqlCreateTable;
- Operation operation = SqlToOperationConverter.convert(planner, catalogManager, node).get();
- TableSchema schema = ((CreateTableOperation) operation).getCatalogTable().getSchema();
- Object[] expectedDataTypes = testItems.stream().map(item -> item.expectedType).toArray();
- assertArrayEquals(expectedDataTypes, schema.getFieldDataTypes());
- }
-
- @Test
- public void testCreateTableWithComputedColumn() {
- final String sql =
- "CREATE TABLE tbl1 (\n"
- + " a int,\n"
- + " b varchar, \n"
- + " c as a - 1, \n"
- + " d as b || '$$', \n"
- + " e as my_udf1(a),"
- + " f as `default`.my_udf2(a) + 1,"
- + " g as builtin.`default`.my_udf3(a) || '##'\n"
- + ")\n"
- + " with (\n"
- + " 'connector' = 'kafka', \n"
- + " 'kafka.topic' = 'log.test'\n"
- + ")\n";
- functionCatalog.registerTempCatalogScalarFunction(
- ObjectIdentifier.of("builtin", "default", "my_udf1"), Func0$.MODULE$);
- functionCatalog.registerTempCatalogScalarFunction(
- ObjectIdentifier.of("builtin", "default", "my_udf2"), Func1$.MODULE$);
- functionCatalog.registerTempCatalogScalarFunction(
- ObjectIdentifier.of("builtin", "default", "my_udf3"), Func8$.MODULE$);
- FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
- Operation operation = parse(sql, planner, getParserBySqlDialect(SqlDialect.DEFAULT));
- assert operation instanceof CreateTableOperation;
- CreateTableOperation op = (CreateTableOperation) operation;
- CatalogTable catalogTable = op.getCatalogTable();
- assertArrayEquals(
- new String[] {"a", "b", "c", "d", "e", "f", "g"},
- catalogTable.getSchema().getFieldNames());
- assertArrayEquals(
- new DataType[] {
- DataTypes.INT(),
- DataTypes.STRING(),
- DataTypes.INT(),
- DataTypes.STRING(),
- DataTypes.INT().notNull(),
- DataTypes.INT(),
- DataTypes.STRING()
- },
- catalogTable.getSchema().getFieldDataTypes());
- String[] columnExpressions =
- catalogTable.getSchema().getTableColumns().stream()
- .filter(ComputedColumn.class::isInstance)
- .map(ComputedColumn.class::cast)
- .map(ComputedColumn::getExpression)
- .toArray(String[]::new);
- String[] expected =
- new String[] {
- "`a` - 1",
- "`b` || '$$'",
- "`builtin`.`default`.`my_udf1`(`a`)",
- "`builtin`.`default`.`my_udf2`(`a`) + 1",
- "`builtin`.`default`.`my_udf3`(`a`) || '##'"
- };
- assertArrayEquals(expected, columnExpressions);
- }
-
- @Test
- public void testCreateTableWithMetadataColumn() {
- final String sql =
- "CREATE TABLE tbl1 (\n"
- + " a INT,\n"
- + " b STRING,\n"
- + " c INT METADATA,\n"
- + " d INT METADATA FROM 'other.key',\n"
- + " e INT METADATA VIRTUAL\n"
- + ")\n"
- + " WITH (\n"
- + " 'connector' = 'kafka',\n"
- + " 'kafka.topic' = 'log.test'\n"
- + ")\n";
-
- final FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
- final Operation operation = parse(sql, planner, getParserBySqlDialect(SqlDialect.DEFAULT));
- assert operation instanceof CreateTableOperation;
- final CreateTableOperation op = (CreateTableOperation) operation;
- final TableSchema actualSchema = op.getCatalogTable().getSchema();
-
- final TableSchema expectedSchema =
- TableSchema.builder()
- .add(TableColumn.physical("a", DataTypes.INT()))
- .add(TableColumn.physical("b", DataTypes.STRING()))
- .add(TableColumn.metadata("c", DataTypes.INT()))
- .add(TableColumn.metadata("d", DataTypes.INT(), "other.key"))
- .add(TableColumn.metadata("e", DataTypes.INT(), true))
- .build();
-
- assertEquals(expectedSchema, actualSchema);
- }
-
- @Test
- public void testAlterTable() throws Exception {
- Catalog catalog = new GenericInMemoryCatalog("default", "default");
- catalogManager.registerCatalog("cat1", catalog);
- catalog.createDatabase("db1", new CatalogDatabaseImpl(new HashMap<>(), null), true);
- CatalogTable catalogTable =
- CatalogTable.of(
- Schema.newBuilder().column("a", DataTypes.STRING()).build(),
- "tb1",
- Collections.emptyList(),
- Collections.emptyMap());
- catalogManager.setCurrentCatalog("cat1");
- catalogManager.setCurrentDatabase("db1");
- catalog.createTable(new ObjectPath("db1", "tb1"), catalogTable, true);
- final String[] renameTableSqls =
- new String[] {
- "alter table cat1.db1.tb1 rename to tb2",
- "alter table db1.tb1 rename to tb2",
- "alter table tb1 rename to cat1.db1.tb2",
- };
- final ObjectIdentifier expectedIdentifier = ObjectIdentifier.of("cat1", "db1", "tb1");
- final ObjectIdentifier expectedNewIdentifier = ObjectIdentifier.of("cat1", "db1", "tb2");
- // test rename table converter
- for (int i = 0; i < renameTableSqls.length; i++) {
- Operation operation = parse(renameTableSqls[i], SqlDialect.DEFAULT);
- assert operation instanceof AlterTableRenameOperation;
- final AlterTableRenameOperation alterTableRenameOperation =
- (AlterTableRenameOperation) operation;
- assertEquals(expectedIdentifier, alterTableRenameOperation.getTableIdentifier());
- assertEquals(expectedNewIdentifier, alterTableRenameOperation.getNewTableIdentifier());
- }
- // test alter table options
- Operation operation =
- parse(
- "alter table cat1.db1.tb1 set ('k1' = 'v1', 'K2' = 'V2')",
- SqlDialect.DEFAULT);
- assert operation instanceof AlterTableOptionsOperation;
- final AlterTableOptionsOperation alterTableOptionsOperation =
- (AlterTableOptionsOperation) operation;
- assertEquals(expectedIdentifier, alterTableOptionsOperation.getTableIdentifier());
- assertEquals(2, alterTableOptionsOperation.getCatalogTable().getOptions().size());
- Map<String, String> options = new HashMap<>();
- options.put("k1", "v1");
- options.put("K2", "V2");
- assertEquals(options, alterTableOptionsOperation.getCatalogTable().getOptions());
- }
-
- @Test
- public void testAlterTableAddPkConstraint() throws Exception {
- Catalog catalog = new GenericInMemoryCatalog("default", "default");
- catalogManager.registerCatalog("cat1", catalog);
- catalog.createDatabase("db1", new CatalogDatabaseImpl(new HashMap<>(), null), true);
- CatalogTable catalogTable =
- CatalogTable.of(
- Schema.newBuilder()
- .column("a", DataTypes.STRING().notNull())
- .column("b", DataTypes.BIGINT().notNull())
- .column("c", DataTypes.BIGINT())
- .build(),
- "tb1",
- Collections.emptyList(),
- Collections.emptyMap());
- catalogManager.setCurrentCatalog("cat1");
- catalogManager.setCurrentDatabase("db1");
- catalog.createTable(new ObjectPath("db1", "tb1"), catalogTable, true);
- // Test alter add table constraint.
- Operation operation =
- parse(
- "alter table tb1 add constraint ct1 primary key(a, b) not enforced",
- SqlDialect.DEFAULT);
- assert operation instanceof AlterTableAddConstraintOperation;
- AlterTableAddConstraintOperation addConstraintOperation =
- (AlterTableAddConstraintOperation) operation;
- assertThat(
- addConstraintOperation.asSummaryString(),
- is(
- "ALTER TABLE ADD CONSTRAINT: (identifier: [`cat1`.`db1`.`tb1`], "
- + "constraintName: [ct1], columns: [a, b])"));
- // Test alter table add pk on nullable column
- thrown.expect(ValidationException.class);
- thrown.expectMessage("Could not create a PRIMARY KEY 'ct1'. Column 'c' is nullable.");
- parse("alter table tb1 add constraint ct1 primary key(c) not enforced", SqlDialect.DEFAULT);
- }
-
- @Test
- public void testAlterTableAddPkConstraintEnforced() throws Exception {
- Catalog catalog = new GenericInMemoryCatalog("default", "default");
- catalogManager.registerCatalog("cat1", catalog);
- catalog.createDatabase("db1", new CatalogDatabaseImpl(new HashMap<>(), null), true);
- CatalogTable catalogTable =
- CatalogTable.of(
- Schema.newBuilder()
- .column("a", DataTypes.STRING().notNull())
- .column("b", DataTypes.BIGINT().notNull())
- .column("c", DataTypes.BIGINT())
- .build(),
- "tb1",
- Collections.emptyList(),
- Collections.emptyMap());
- catalogManager.setCurrentCatalog("cat1");
- catalogManager.setCurrentDatabase("db1");
- catalog.createTable(new ObjectPath("db1", "tb1"), catalogTable, true);
- // Test alter table add enforced
- thrown.expect(ValidationException.class);
- thrown.expectMessage(
- "Flink doesn't support ENFORCED mode for PRIMARY KEY constaint. "
- + "ENFORCED/NOT ENFORCED controls if the constraint checks are performed on the "
- + "incoming/outgoing data. Flink does not own the data therefore the "
- + "only supported mode is the NOT ENFORCED mode");
- parse("alter table tb1 add constraint ct1 primary key(a, b)", SqlDialect.DEFAULT);
- }
-
- @Test
- public void testAlterTableAddUniqueConstraint() throws Exception {
- Catalog catalog = new GenericInMemoryCatalog("default", "default");
- catalogManager.registerCatalog("cat1", catalog);
- catalog.createDatabase("db1", new CatalogDatabaseImpl(new HashMap<>(), null), true);
- CatalogTable catalogTable =
- CatalogTable.of(
- Schema.newBuilder()
- .column("a", DataTypes.STRING().notNull())
- .column("b", DataTypes.BIGINT().notNull())
- .build(),
- "tb1",
- Collections.emptyList(),
- Collections.emptyMap());
- catalogManager.setCurrentCatalog("cat1");
- catalogManager.setCurrentDatabase("db1");
- catalog.createTable(new ObjectPath("db1", "tb1"), catalogTable, true);
- // Test alter add table constraint.
- thrown.expect(UnsupportedOperationException.class);
- thrown.expectMessage("UNIQUE constraint is not supported yet");
- parse("alter table tb1 add constraint ct1 unique(a, b) not enforced", SqlDialect.DEFAULT);
- }
-
- @Test
- public void testAlterTableRenameColumn() throws Exception {
- Catalog catalog = new GenericInMemoryCatalog("default", "default");
- catalogManager.registerCatalog("cat1", catalog);
- catalog.createDatabase("db1", new CatalogDatabaseImpl(new HashMap<>(), null), true);
- CatalogTable catalogTable =
- CatalogTable.of(
- Schema.newBuilder()
- .column("a", DataTypes.STRING().notNull())
- .column("b", DataTypes.BIGINT().notNull())
- .column("c", DataTypes.TIMESTAMP(3).notNull())
- .watermark("c", "c - INTERVAL '5' SECOND")
- .primaryKey("b")
- .build(),
- "tb1",
- Collections.emptyList(),
- Collections.emptyMap());
- catalogManager.setCurrentCatalog("cat1");
- catalogManager.setCurrentDatabase("db1");
- catalog.createTable(new ObjectPath("db1", "tb1"), catalogTable, true);
- // Test alter table rename
- Operation operation = parse("alter table tb1 rename c to c1", SqlDialect.DEFAULT);
- assert operation instanceof AlterTableSchemaOperation;
- Schema unresolvedSchema =
- ((AlterTableSchemaOperation) operation).getCatalogTable().getUnresolvedSchema();
- String[] newColumns =
- unresolvedSchema.getColumns().stream()
- .map(Schema.UnresolvedColumn::getName)
- .toArray(String[]::new);
- // test rename column name
- assertArrayEquals(newColumns, new String[] {"a", "b", "c1"});
- // test watermark expression
- Schema.UnresolvedWatermarkSpec unresolvedWatermarkSpec =
- unresolvedSchema.getWatermarkSpecs().get(0);
- assertThat(unresolvedWatermarkSpec.getColumnName(), is("c1"));
-
- String expression =
- ((SqlCallExpression) unresolvedWatermarkSpec.getWatermarkExpression())
- .getSqlExpression();
- assertThat(expression, is("c1 - INTERVAL '5' SECOND"));
- }
-
- @Test
- public void testAlterTableAddUniqueConstraintEnforced() throws Exception {
- Catalog catalog = new GenericInMemoryCatalog("default", "default");
- catalogManager.registerCatalog("cat1", catalog);
- catalog.createDatabase("db1", new CatalogDatabaseImpl(new HashMap<>(), null), true);
- CatalogTable catalogTable =
- CatalogTable.of(
- Schema.newBuilder()
- .column("a", DataTypes.STRING().notNull())
- .column("b", DataTypes.BIGINT().notNull())
- .column("c", DataTypes.BIGINT())
- .build(),
- "tb1",
- Collections.emptyList(),
- Collections.emptyMap());
- catalogManager.setCurrentCatalog("cat1");
- catalogManager.setCurrentDatabase("db1");
- catalog.createTable(new ObjectPath("db1", "tb1"), catalogTable, true);
- // Test alter table add enforced
- thrown.expect(UnsupportedOperationException.class);
- thrown.expectMessage("UNIQUE constraint is not supported yet");
- parse("alter table tb1 add constraint ct1 unique(a, b)", SqlDialect.DEFAULT);
- }
-
- @Test
- public void testAlterTableDropConstraint() throws Exception {
- Catalog catalog = new GenericInMemoryCatalog("default", "default");
- catalogManager.registerCatalog("cat1", catalog);
- catalog.createDatabase("db1", new CatalogDatabaseImpl(new HashMap<>(), null), true);
- CatalogTable catalogTable =
- CatalogTable.of(
- Schema.newBuilder()
- .column("a", DataTypes.STRING().notNull())
- .column("b", DataTypes.BIGINT().notNull())
- .column("c", DataTypes.BIGINT())
- .primaryKeyNamed("ct1", "a", "b")
- .build(),
- "tb1",
- Collections.emptyList(),
- Collections.emptyMap());
- catalogManager.setCurrentCatalog("cat1");
- catalogManager.setCurrentDatabase("db1");
- catalog.createTable(new ObjectPath("db1", "tb1"), catalogTable, true);
- // Test alter table add enforced
- Operation operation = parse("alter table tb1 drop constraint ct1", SqlDialect.DEFAULT);
- assert operation instanceof AlterTableDropConstraintOperation;
- AlterTableDropConstraintOperation dropConstraint =
- (AlterTableDropConstraintOperation) operation;
- assertThat(
- dropConstraint.asSummaryString(),
- is("ALTER TABLE `cat1`.`db1`.`tb1` DROP CONSTRAINT ct1"));
- thrown.expect(ValidationException.class);
- thrown.expectMessage("CONSTRAINT [ct2] does not exist");
- parse("alter table tb1 drop constraint ct2", SqlDialect.DEFAULT);
- }
-
- @Test
- public void testCreateViewWithMatchRecognize() {
- Map<String, String> prop = new HashMap<>();
- prop.put("connector", "values");
- prop.put("bounded", "true");
- CatalogTable catalogTable =
- CatalogTable.of(
- Schema.newBuilder()
- .column("id", DataTypes.INT().notNull())
- .column("measurement", DataTypes.BIGINT().notNull())
- .column(
- "ts",
- DataTypes.ROW(
- DataTypes.FIELD("tmstmp", DataTypes.TIMESTAMP(3))))
- .build(),
- null,
- Collections.emptyList(),
- prop);
-
- catalogManager.createTable(
- catalogTable, ObjectIdentifier.of("builtin", "default", "events"), false);
-
- final String sql =
- ""
- + "CREATE TEMPORARY VIEW foo AS "
- + "SELECT * "
- + "FROM events MATCH_RECOGNIZE ("
- + " PARTITION BY id "
- + " ORDER BY ts ASC "
- + " MEASURES "
- + " next_step.measurement - this_step.measurement AS diff "
- + " AFTER MATCH SKIP TO NEXT ROW "
- + " PATTERN (this_step next_step)"
- + " DEFINE "
- + " this_step AS TRUE,"
- + " next_step AS TRUE"
- + ")";
-
- Operation operation = parse(sql, SqlDialect.DEFAULT);
- assertThat(operation, instanceOf(CreateViewOperation.class));
- }
-
- @Test
- public void testCreateViewWithDynamicTableOptions() {
- tableConfig
- .getConfiguration()
- .setBoolean(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, true);
- Map<String, String> prop = new HashMap<>();
- prop.put("connector", "values");
- prop.put("bounded", "true");
- CatalogTable catalogTable =
- CatalogTable.of(
- Schema.newBuilder()
- .column("f0", DataTypes.INT())
- .column("f1", DataTypes.VARCHAR(20))
- .build(),
- null,
- Collections.emptyList(),
- prop);
-
- catalogManager.createTable(
- catalogTable, ObjectIdentifier.of("builtin", "default", "sourceA"), false);
-
- final String sql =
- ""
- + "create view test_view as\n"
- + "select *\n"
- + "from sourceA /*+ OPTIONS('changelog-mode'='I') */";
-
- Operation operation = parse(sql, SqlDialect.DEFAULT);
- assertThat(operation, instanceOf(CreateViewOperation.class));
- }
-
- @Test
- public void testBeginStatementSet() {
- final String sql = "BEGIN STATEMENT SET";
- Operation operation = parse(sql, SqlDialect.DEFAULT);
- assert operation instanceof BeginStatementSetOperation;
- final BeginStatementSetOperation beginStatementSetOperation =
- (BeginStatementSetOperation) operation;
-
- assertEquals("BEGIN STATEMENT SET", beginStatementSetOperation.asSummaryString());
- }
-
- @Test
- public void testEnd() {
- final String sql = "END";
- Operation operation = parse(sql, SqlDialect.DEFAULT);
- assert operation instanceof EndStatementSetOperation;
- final EndStatementSetOperation endStatementSetOperation =
- (EndStatementSetOperation) operation;
-
- assertEquals("END", endStatementSetOperation.asSummaryString());
- }
-
- // ~ Tool Methods ----------------------------------------------------------
-
- private static TestItem createTestItem(Object... args) {
- assert args.length == 2;
- final String testExpr = (String) args[0];
- TestItem testItem = TestItem.fromTestExpr(testExpr);
- if (args[1] instanceof String) {
- testItem.withExpectedError((String) args[1]);
- } else {
- testItem.withExpectedType(args[1]);
- }
- return testItem;
- }
-
- private void assertShowFunctions(
- String sql, String expectedSummary, FunctionScope expectedScope) {
- Operation operation = parse(sql, SqlDialect.DEFAULT);
- assert operation instanceof ShowFunctionsOperation;
- final ShowFunctionsOperation showFunctionsOperation = (ShowFunctionsOperation) operation;
-
- assertEquals(expectedScope, showFunctionsOperation.getFunctionScope());
- assertEquals(expectedSummary, showFunctionsOperation.asSummaryString());
- }
-
- private Operation parse(String sql, FlinkPlannerImpl planner, CalciteParser parser) {
- SqlNode node = parser.parse(sql);
- return SqlToOperationConverter.convert(planner, catalogManager, node).get();
- }
-
- private Operation parse(String sql, SqlDialect sqlDialect) {
- FlinkPlannerImpl planner = getPlannerBySqlDialect(sqlDialect);
- final CalciteParser parser = getParserBySqlDialect(sqlDialect);
- SqlNode node = parser.parse(sql);
- return SqlToOperationConverter.convert(planner, catalogManager, node).get();
- }
-
- private FlinkPlannerImpl getPlannerBySqlDialect(SqlDialect sqlDialect) {
- tableConfig.setSqlDialect(sqlDialect);
- return plannerContext.createFlinkPlanner(
- catalogManager.getCurrentCatalog(), catalogManager.getCurrentDatabase());
- }
-
- private CalciteParser getParserBySqlDialect(SqlDialect sqlDialect) {
- tableConfig.setSqlDialect(sqlDialect);
- return plannerContext.createCalciteParser();
- }
-
- // ~ Inner Classes ----------------------------------------------------------
-
- private static class TestItem {
- private final String testExpr;
- @Nullable private Object expectedType;
- @Nullable private String expectedError;
-
- private TestItem(String testExpr) {
- this.testExpr = testExpr;
- }
-
- static TestItem fromTestExpr(String testExpr) {
- return new TestItem(testExpr);
- }
-
- TestItem withExpectedType(Object expectedType) {
- this.expectedType = expectedType;
- return this;
- }
-
- TestItem withExpectedError(String expectedError) {
- this.expectedError = expectedError;
- return this;
- }
-
- @Override
- public String toString() {
- return this.testExpr;
- }
- }
-
- private Operation parseAndConvert(String sql) {
- final FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
- final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
-
- SqlNode node = parser.parse(sql);
- return SqlToOperationConverter.convert(planner, catalogManager, node).get();
- }
-}
\ No newline at end of file
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/ColumnReferenceFinder.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/ColumnReferenceFinder.java
new file mode 100644
index 00000000000..7b22246de8d
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/ColumnReferenceFinder.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.expressions;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionDefaultVisitor;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.LocalReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.planner.plan.utils.FlinkRexUtil;
+
+import org.apache.calcite.rex.RexInputRef;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/** A finder used to look up the column names referenced by a {@link ResolvedExpression}. */
+public class ColumnReferenceFinder {
+
+ private ColumnReferenceFinder() {}
+
+ public static Set<String> findReferencedColumn(
+ ResolvedExpression resolvedExpression, List<String> tableColumns) {
+ ColumnReferenceVisitor visitor = new ColumnReferenceVisitor(tableColumns);
+ visitor.visit(resolvedExpression);
+ return visitor.referencedColumns;
+ }
+
+ private static class ColumnReferenceVisitor extends ExpressionDefaultVisitor<Void> {
+ private final List<String> tableColumns;
+ private final Set<String> referencedColumns;
+
+ public ColumnReferenceVisitor(List<String> tableColumns) {
+ this.tableColumns = tableColumns;
+ this.referencedColumns = new HashSet<>();
+ }
+
+ @Override
+ public Void visit(Expression expression) {
+ if (expression instanceof LocalReferenceExpression) {
+ return visit((LocalReferenceExpression) expression);
+ } else if (expression instanceof FieldReferenceExpression) {
+ return visit((FieldReferenceExpression) expression);
+ } else if (expression instanceof RexNodeExpression) {
+ return visit((RexNodeExpression) expression);
+ } else if (expression instanceof CallExpression) {
+ return visit((CallExpression) expression);
+ } else {
+ return super.visit(expression);
+ }
+ }
+
+ @Override
+ public Void visit(FieldReferenceExpression fieldReference) {
+ referencedColumns.add(fieldReference.getName());
+ return null;
+ }
+
+ public Void visit(LocalReferenceExpression localReference) {
+ referencedColumns.add(localReference.getName());
+ return null;
+ }
+
+ public Void visit(RexNodeExpression rexNode) {
+            // collect all input refs used in the expression
+            Set<RexInputRef> inputRefs = FlinkRexUtil.findAllInputRefs(rexNode.getRexNode());
+            // resolve each input ref index to its column name
+ inputRefs.forEach(
+ inputRef -> {
+ int index = inputRef.getIndex();
+ referencedColumns.add(tableColumns.get(index));
+ });
+ return null;
+ }
+
+ @Override
+ public Void visit(CallExpression call) {
+ for (Expression expression : call.getChildren()) {
+ visit(expression);
+ }
+ return null;
+ }
+
+ @Override
+ protected Void defaultMethod(Expression expression) {
+ throw new TableException("Unexpected expression: " + expression);
+ }
+ }
+}
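
As a side note for reviewers: the finder above boils down to a recursive walk that
collects every referenced name from an expression tree. A minimal standalone sketch
of that contract (plain Java, no Flink dependencies; all names are illustrative):

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class ToyReferenceFinder {
        interface Expr {}

        // models FieldReferenceExpression / LocalReferenceExpression
        static final class Ref implements Expr {
            final String name;
            Ref(String name) { this.name = name; }
        }

        // models CallExpression with child expressions
        static final class Call implements Expr {
            final List<Expr> children;
            Call(Expr... children) { this.children = Arrays.asList(children); }
        }

        static Set<String> findReferencedColumns(Expr expr) {
            Set<String> out = new HashSet<>();
            collect(expr, out);
            return out;
        }

        private static void collect(Expr expr, Set<String> out) {
            if (expr instanceof Ref) {
                out.add(((Ref) expr).name);
            } else if (expr instanceof Call) {
                ((Call) expr).children.forEach(child -> collect(child, out));
            }
        }

        public static void main(String[] args) {
            // models a watermark expression like "ts - INTERVAL '5' SECOND"
            System.out.println(findReferencedColumns(new Call(new Ref("ts")))); // [ts]
        }
    }
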
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java
index 24acefd2930..db4cfc1e9cf 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.planner.operations;
import org.apache.flink.sql.parser.ddl.SqlAlterTableAdd;
import org.apache.flink.sql.parser.ddl.SqlAlterTableModify;
+import org.apache.flink.sql.parser.ddl.SqlAlterTableRenameColumn;
import org.apache.flink.sql.parser.ddl.SqlAlterTableSchema;
import org.apache.flink.sql.parser.ddl.SqlTableColumn;
import org.apache.flink.sql.parser.ddl.SqlWatermark;
@@ -27,9 +28,15 @@ import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
import org.apache.flink.sql.parser.ddl.position.SqlTableColumnPosition;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ContextResolvedTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.SchemaResolver;
+import org.apache.flink.table.catalog.WatermarkSpec;
import org.apache.flink.table.expressions.SqlCallExpression;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.expressions.ColumnReferenceFinder;
import org.apache.flink.table.types.AbstractDataType;
import org.apache.flink.table.types.DataType;
import org.apache.flink.util.Preconditions;
@@ -45,6 +52,7 @@ import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -64,6 +72,8 @@ import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDa
*/
public class AlterSchemaConverter {
+ private static final String EX_MSG_PREFIX = "Failed to execute ALTER TABLE statement.\n";
+
private final SqlValidator sqlValidator;
private final Function<SqlNode, String> escapeExpression;
private final Consumer<SqlTableConstraint> constraintValidator;
@@ -104,9 +114,83 @@ public class AlterSchemaConverter {
return converter.convert();
}
- private abstract static class SchemaConverter {
+ public Schema applySchemaChange(
+ SqlAlterTableRenameColumn renameColumn, ContextResolvedTable originalTable) {
+ String oldColumnName = getColumnName(renameColumn.getOriginColumnIdentifier());
+ String newColumnName = getColumnName(renameColumn.getNewColumnIdentifier());
+ List<String> tableColumns =
+ originalTable.getResolvedSchema().getColumns().stream()
+ .map(Column::getName)
+ .collect(Collectors.toList());
+        // validate that the old column exists, that the new column name is not already in
+        // use, and that the old column is not referenced by a computed column
+ validateColumnName(
+ oldColumnName,
+ newColumnName,
+ tableColumns,
+ originalTable.getResolvedSchema(),
+ ((CatalogTable) originalTable.getResolvedTable()).getPartitionKeys());
+
+        // validate that the old column is not referenced by a watermark expression
+ List<WatermarkSpec> watermarkSpecs = originalTable.getResolvedSchema().getWatermarkSpecs();
+ watermarkSpecs.forEach(
+ watermarkSpec -> {
+ String rowtimeAttribute = watermarkSpec.getRowtimeAttribute();
+ Set<String> referencedColumns =
+ ColumnReferenceFinder.findReferencedColumn(
+ watermarkSpec.getWatermarkExpression(), tableColumns);
+ if (oldColumnName.equals(rowtimeAttribute)
+ || referencedColumns.contains(oldColumnName)) {
+ throw new ValidationException(
+ String.format(
+ "Old column %s is referred by watermark expression %s, "
+ + "currently doesn't allow to rename column which is "
+ + "referred by watermark expression.",
+ oldColumnName, watermarkSpec.asSummaryString()));
+ }
+ });
+
+ Schema.Builder builder = Schema.newBuilder();
+ // build column
+ Schema originSchema = originalTable.getTable().getUnresolvedSchema();
+ originSchema
+ .getColumns()
+ .forEach(
+ column -> {
+ if (oldColumnName.equals(column.getName())) {
+ buildNewColumnFromOriginColumn(builder, column, newColumnName);
+ } else {
+ builder.fromColumns(Collections.singletonList(column));
+ }
+ });
+ // build primary key
+ Optional<Schema.UnresolvedPrimaryKey> originPrimaryKey = originSchema.getPrimaryKey();
+ if (originPrimaryKey.isPresent()) {
+ List<String> originPrimaryKeyNames = originPrimaryKey.get().getColumnNames();
+            String constraintName = originPrimaryKey.get().getConstraintName();
+ List<String> newPrimaryKeyNames =
+ originPrimaryKeyNames.stream()
+ .map(pkName -> pkName.equals(oldColumnName) ? newColumnName : pkName)
+ .collect(Collectors.toList());
+            builder.primaryKeyNamed(constraintName, newPrimaryKeyNames);
+ }
+
+ // build watermark
+ originSchema
+ .getWatermarkSpecs()
+ .forEach(
+ watermarkSpec ->
+ builder.watermark(
+ watermarkSpec.getColumnName(),
+ watermarkSpec.getWatermarkExpression()));
+
+ // generate new schema
+ return builder.build();
+ }
+
+ // --------------------------------------------------------------------------------------------
- static final String EX_MSG_PREFIX = "Failed to execute ALTER TABLE statement.\n";
+ private abstract static class SchemaConverter {
List<String> sortedColumnNames = new ArrayList<>();
Set<String> alterColNames = new HashSet<>();
@@ -278,13 +362,7 @@ public class AlterSchemaConverter {
for (SqlNode alterColumn : alterColumns) {
SqlTableColumnPosition columnPosition = (SqlTableColumnPosition) alterColumn;
SqlTableColumn column = columnPosition.getColumn();
- if (!column.getName().isSimple()) {
- throw new UnsupportedOperationException(
- String.format(
- "%sAlter nested row type is not supported yet.",
- EX_MSG_PREFIX));
- }
- String columnName = column.getName().getSimple();
+ String columnName = getColumnName(column.getName());
if (!alterColNames.add(columnName)) {
throw new ValidationException(
String.format(
@@ -493,6 +571,87 @@ public class AlterSchemaConverter {
}
}
+ // --------------------------------------------------------------------------------------------
+
+ private void validateColumnName(
+ String originColumnName,
+ String newColumnName,
+ List<String> tableColumns,
+ ResolvedSchema originResolvedSchema,
+ List<String> partitionKeys) {
+ // validate old column
+ if (!tableColumns.contains(originColumnName)) {
+ throw new ValidationException(
+ String.format(
+ "Old column %s not found in table schema for RENAME COLUMN",
+ originColumnName));
+ }
+
+ // validate new column
+ if (tableColumns.contains(newColumnName)) {
+ throw new ValidationException(
+ String.format(
+ "New column %s already existed in table schema for RENAME COLUMN",
+ newColumnName));
+ }
+
+        // validate that the old column is not referenced by a computed column
+ originResolvedSchema.getColumns().stream()
+ .filter(column -> column instanceof Column.ComputedColumn)
+ .forEach(
+ column -> {
+ Column.ComputedColumn computedColumn = (Column.ComputedColumn) column;
+ Set<String> referencedColumn =
+ ColumnReferenceFinder.findReferencedColumn(
+ computedColumn.getExpression(), tableColumns);
+ if (referencedColumn.contains(originColumnName)) {
+ throw new ValidationException(
+ String.format(
+ "Old column %s is referred by computed column %s, currently doesn't "
+ + "allow to rename column which is referred by computed column.",
+ originColumnName,
+ computedColumn.asSummaryString()));
+ }
+ });
+        // validate that the partition keys do not contain the old column
+ if (partitionKeys.contains(originColumnName)) {
+ throw new ValidationException(
+ String.format(
+ "Can not rename column %s because it is used as the partition keys.",
+ originColumnName));
+ }
+ }
+
+ private void buildNewColumnFromOriginColumn(
+ Schema.Builder builder, Schema.UnresolvedColumn originColumn, String columnName) {
+ if (originColumn instanceof Schema.UnresolvedComputedColumn) {
+ builder.columnByExpression(
+ columnName, ((Schema.UnresolvedComputedColumn) originColumn).getExpression());
+ } else if (originColumn instanceof Schema.UnresolvedPhysicalColumn) {
+ builder.column(
+ columnName, ((Schema.UnresolvedPhysicalColumn) originColumn).getDataType());
+ } else if (originColumn instanceof Schema.UnresolvedMetadataColumn) {
+ Schema.UnresolvedMetadataColumn metadataColumn =
+ (Schema.UnresolvedMetadataColumn) originColumn;
+ builder.columnByMetadata(
+ columnName,
+ metadataColumn.getDataType(),
+ metadataColumn.getMetadataKey(),
+ metadataColumn.isVirtual());
+ }
+ originColumn.getComment().ifPresent(builder::withComment);
+ }
+
+ private static String getColumnName(SqlIdentifier identifier) {
+ if (!identifier.isSimple()) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "%sAlter nested row type %s is not supported yet.",
+ EX_MSG_PREFIX, identifier));
+ }
+ return identifier.getSimple();
+ }
+
private AlterSchemaStrategy computeAlterSchemaStrategy(SqlAlterTableSchema alterTableSchema) {
if (alterTableSchema instanceof SqlAlterTableAdd) {
return AlterSchemaStrategy.ADD;
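
The converter changes above can be exercised end to end through a TableEnvironment.
A hedged sketch (table name and options are illustrative; the datagen connector is
assumed to be available on the classpath):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class RenameColumnExample {
        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.inStreamingMode());
            tEnv.executeSql(
                    "CREATE TABLE orders (user_id BIGINT, amount INT) "
                            + "WITH ('connector' = 'datagen')");
            // Renames the physical column and rewrites the schema. The statement
            // would be rejected if 'amount' were referenced by a computed column
            // or a watermark expression, or used as a partition key.
            tEnv.executeSql("ALTER TABLE orders RENAME amount TO amount1");
            tEnv.executeSql("DESCRIBE orders").print();
        }
    }
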
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
index 237977c049c..3b4b196e991 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
@@ -28,6 +28,7 @@ import org.apache.flink.sql.parser.ddl.SqlAlterTableCompact;
import org.apache.flink.sql.parser.ddl.SqlAlterTableDropConstraint;
import org.apache.flink.sql.parser.ddl.SqlAlterTableOptions;
import org.apache.flink.sql.parser.ddl.SqlAlterTableRename;
+import org.apache.flink.sql.parser.ddl.SqlAlterTableRenameColumn;
import org.apache.flink.sql.parser.ddl.SqlAlterTableReset;
import org.apache.flink.sql.parser.ddl.SqlAlterTableSchema;
import org.apache.flink.sql.parser.ddl.SqlAlterView;
@@ -531,6 +532,20 @@ public class SqlToOperationConverter {
(SqlChangeColumn) sqlAlterTable,
(CatalogTable) baseTable,
flinkPlanner.getOrCreateSqlValidator());
+ } else if (sqlAlterTable instanceof SqlAlterTableRenameColumn) {
+ SqlAlterTableRenameColumn sqlAlterTableRenameColumn =
+ (SqlAlterTableRenameColumn) sqlAlterTable;
+ Schema newSchema =
+ alterSchemaConverter.applySchemaChange(
+ sqlAlterTableRenameColumn, optionalCatalogTable.get());
+ CatalogTable baseCatalogTable = (CatalogTable) baseTable;
+ return new AlterTableSchemaOperation(
+ tableIdentifier,
+ CatalogTable.of(
+ newSchema,
+ baseCatalogTable.getComment(),
+ baseCatalogTable.getPartitionKeys(),
+ baseCatalogTable.getOptions()));
} else if (sqlAlterTable instanceof SqlAddPartitions) {
List<CatalogPartitionSpec> specs = new ArrayList<>();
List<CatalogPartition> partitions = new ArrayList<>();
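
Note that the operation built here carries a full replacement table: only the schema
changes, while the comment, partition keys, and options are copied over unchanged.
A hedged sketch of that rebuild (all values are illustrative):

    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.api.Schema;
    import org.apache.flink.table.catalog.CatalogTable;

    import java.util.Collections;

    public class RebuildCatalogTable {
        public static void main(String[] args) {
            // the schema produced by AlterSchemaConverter#applySchemaChange,
            // with "amount" already renamed to "amount1"
            Schema newSchema =
                    Schema.newBuilder().column("amount1", DataTypes.INT()).build();
            CatalogTable newTable =
                    CatalogTable.of(
                            newSchema,
                            "original table comment",   // comment is preserved
                            Collections.emptyList(),    // partition keys are preserved
                            Collections.singletonMap("connector", "datagen"));
            System.out.println(newTable.getUnresolvedSchema());
        }
    }
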
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java
index 1f79acbb0b3..6b663e70ff9 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java
@@ -23,7 +23,6 @@ import org.apache.flink.sql.parser.ddl.SqlChangeColumn;
import org.apache.flink.sql.parser.ddl.SqlTableColumn;
import org.apache.flink.sql.parser.ddl.SqlTableColumn.SqlRegularColumn;
import org.apache.flink.sql.parser.ddl.SqlTableOption;
-import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableColumn;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableSchema;
@@ -32,8 +31,6 @@ import org.apache.flink.table.api.WatermarkSpec;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.ObjectIdentifier;
-import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.expressions.SqlCallExpression;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.ddl.AlterTableSchemaOperation;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
@@ -41,12 +38,10 @@ import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.utils.TypeConversions;
-import com.google.common.collect.Lists;
import org.apache.calcite.sql.SqlDataTypeSpec;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.validate.SqlValidator;
-import org.apache.commons.lang3.StringUtils;
import java.util.Arrays;
import java.util.HashMap;
@@ -67,12 +62,10 @@ public class OperationConverterUtils {
CatalogTable catalogTable,
SqlValidator sqlValidator) {
// This is only used by the Hive dialect at the moment. In Hive, only non-partition columns
- // can be
- // added/replaced and users will only define non-partition columns in the new column list.
- // Therefore, we require
- // that partitions columns must appear last in the schema (which is inline with Hive).
- // Otherwise, we won't be
- // able to determine the column positions after the non-partition columns are replaced.
+ // can be added/replaced and users will only define non-partition columns in the new column
+    // list. Therefore, we require that partition columns appear last in the schema (which is
+    // in line with Hive). Otherwise, we won't be able to determine the column positions after
+ // the non-partition columns are replaced.
TableSchema oldSchema = catalogTable.getSchema();
int numPartCol = catalogTable.getPartitionKeys().size();
Set<String> lastCols =
@@ -149,116 +142,6 @@ public class OperationConverterUtils {
// TODO: handle watermark and constraints
}
- public static Operation convertRenameColumn(
- ObjectIdentifier tableIdentifier,
- String originColumnName,
- String newColumnName,
- CatalogTable catalogTable) {
-
- Schema modifiedTableSchema = catalogTable.getUnresolvedSchema();
- validateColumnName(originColumnName, newColumnName, modifiedTableSchema);
-
- Schema.Builder builder = Schema.newBuilder();
- // build column
- modifiedTableSchema.getColumns().stream()
- .forEach(
- column -> {
- if (StringUtils.equals(column.getName(), originColumnName)) {
- buildNewColumnFromOriginColumn(builder, column, newColumnName);
- } else {
- buildNewColumnFromOriginColumn(builder, column, column.getName());
- }
- });
- // build primary key column
- List<String> originPrimaryKeyNames =
- modifiedTableSchema
- .getPrimaryKey()
- .map(Schema.UnresolvedPrimaryKey::getColumnNames)
- .orElseGet(Lists::newArrayList);
-
- List<String> newPrimaryKeyNames =
- originPrimaryKeyNames.stream()
- .map(
- pkName ->
- StringUtils.equals(pkName, originColumnName)
- ? newColumnName
- : pkName)
- .collect(Collectors.toList());
-
- if (newPrimaryKeyNames.size() > 0) {
- builder.primaryKey(newPrimaryKeyNames);
- }
- // build watermark
- modifiedTableSchema.getWatermarkSpecs().stream()
- .forEach(
- watermarkSpec -> {
- String watermarkRefColumnName = watermarkSpec.getColumnName();
- Expression watermarkExpression = watermarkSpec.getWatermarkExpression();
- if (StringUtils.equals(watermarkRefColumnName, originColumnName)) {
- String newWatermarkExpression =
- ((SqlCallExpression) watermarkExpression)
- .getSqlExpression()
- .replace(watermarkRefColumnName, newColumnName);
- builder.watermark(newColumnName, newWatermarkExpression);
- } else {
- builder.watermark(watermarkRefColumnName, watermarkExpression);
- }
- });
- // build partition key
- List<String> newPartitionKeys =
- catalogTable.getPartitionKeys().stream()
- .map(
- name ->
- StringUtils.equals(name, originColumnName)
- ? newColumnName
- : name)
- .collect(Collectors.toList());
- // generate new schema
- Schema newSchema = builder.build();
-
- return new AlterTableSchemaOperation(
- tableIdentifier,
- CatalogTable.of(
- newSchema,
- catalogTable.getComment(),
- newPartitionKeys,
- catalogTable.getOptions()));
- }
-
- private static void validateColumnName(
- String originColumnName, String newColumnName, Schema modifiedTableSchema) {
- List<String> columnNames =
- modifiedTableSchema.getColumns().stream()
- .map(Schema.UnresolvedColumn::getName)
- .collect(Collectors.toList());
-
- int originColumnIndex = columnNames.indexOf(originColumnName);
- if (originColumnIndex < 0) {
- throw new ValidationException(
- String.format("Old column %s not found for RENAME COLUMN ", originColumnName));
- }
-
- int sameColumnNameIndex = columnNames.indexOf(newColumnName);
- if (sameColumnNameIndex >= 0) {
- throw new ValidationException(
- String.format("New column %s existed for RENAME COLUMN ", newColumnName));
- }
- }
-
- private static void buildNewColumnFromOriginColumn(
- Schema.Builder builder, Schema.UnresolvedColumn originColumn, String columnName) {
- if (originColumn instanceof Schema.UnresolvedComputedColumn) {
- builder.columnByExpression(
- columnName, ((Schema.UnresolvedComputedColumn) originColumn).getExpression());
- } else if (originColumn instanceof Schema.UnresolvedPhysicalColumn) {
- builder.column(
- columnName, ((Schema.UnresolvedPhysicalColumn) originColumn).getDataType());
- } else {
- builder.columnByMetadata(
- columnName, ((Schema.UnresolvedMetadataColumn) originColumn).getDataType());
- }
- }
-
// change a column in the old table schema and return the updated table schema
public static TableSchema changeColumn(
TableSchema oldSchema,
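
One motivation for dropping the convertRenameColumn path above: it rewrote the
watermark expression with a plain String.replace, which also rewrites any other
identifier that merely contains the old column name as a substring. The new
converter rejects such renames outright instead of attempting a textual rewrite.
A standalone illustration of the hazard (column names are made up):

    public class NaiveReplaceHazard {
        public static void main(String[] args) {
            // a watermark expression on rowtime column "ts" that also
            // references a second column "my_ts_delay"
            String watermarkExpr = "ts - my_ts_delay";
            // renaming "ts" to "ts1" via textual replacement corrupts the
            // unrelated identifier as well
            System.out.println(watermarkExpr.replace("ts", "ts1"));
            // prints: ts1 - my_ts1_delay
        }
    }
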
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtil.scala
index fc342e39b3b..81819208b9d 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtil.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtil.scala
@@ -313,7 +313,7 @@ object FlinkRexUtil {
* @return
* InputRef HashSet.
*/
- private[flink] def findAllInputRefs(node: RexNode): util.HashSet[RexInputRef] = {
+ def findAllInputRefs(node: RexNode): util.HashSet[RexInputRef] = {
val set = new util.HashSet[RexInputRef]
node.accept(new RexVisitorImpl[Void](true) {
override def visitInputRef(inputRef: RexInputRef): Void = {
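
The Scala helper made public here is a thin wrapper around a Calcite visitor. For
readers more familiar with Java, an equivalent rendering (a sketch, not Flink code):

    import org.apache.calcite.rex.RexInputRef;
    import org.apache.calcite.rex.RexNode;
    import org.apache.calcite.rex.RexVisitorImpl;

    import java.util.HashSet;
    import java.util.Set;

    public final class InputRefCollector extends RexVisitorImpl<Void> {
        private final Set<RexInputRef> refs = new HashSet<>();

        private InputRefCollector() {
            super(true); // deep = true: recurse into nested calls
        }

        public static Set<RexInputRef> findAllInputRefs(RexNode node) {
            InputRefCollector collector = new InputRefCollector();
            node.accept(collector);
            return collector.refs;
        }

        @Override
        public Void visitInputRef(RexInputRef inputRef) {
            refs.add(inputRef);
            return null;
        }
    }
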
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
index c7ba3a5214c..1bbeb29d920 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
@@ -130,6 +130,7 @@ import java.util.function.Supplier;
import java.util.stream.Collectors;
import static org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema;
+import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.planner.utils.OperationMatchers.entry;
import static org.apache.flink.table.planner.utils.OperationMatchers.isCreateTableOperation;
import static org.apache.flink.table.planner.utils.OperationMatchers.partitionedBy;
@@ -1268,6 +1269,121 @@ public class SqlToOperationConverterTest {
.hasMessageContaining("ALTER TABLE RESET does not support empty key");
}
+ @Test
+ public void testAlterTableRenameColumn() throws Exception {
+ prepareTable("tb1", false, false, true, 3);
+ // rename pk column c
+ Operation operation = parse("alter table tb1 rename c to c1");
+ assertThat(operation).isInstanceOf(AlterTableSchemaOperation.class);
+ assertThat(((AlterTableSchemaOperation) operation).getCatalogTable().getUnresolvedSchema())
+ .isEqualTo(
+ Schema.newBuilder()
+ .column("a", DataTypes.INT().notNull())
+ .column("b", DataTypes.BIGINT().notNull())
+ .column("c1", DataTypes.STRING().notNull())
+ .withComment("column comment")
+ .columnByExpression("d", "a*(b+2 + a*b)")
+ .column(
+ "e",
+ DataTypes.ROW(
+ DataTypes.STRING(),
+ DataTypes.INT(),
+ DataTypes.ROW(
+ DataTypes.DOUBLE(),
+ DataTypes.ARRAY(DataTypes.FLOAT()))))
+ .columnByExpression("f", "e.f1 + e.f2.f0")
+ .columnByMetadata("g", DataTypes.STRING(), null, true)
+ .column("ts", DataTypes.TIMESTAMP(3))
+ .withComment("just a comment")
+ .watermark("ts", "ts - interval '5' seconds")
+ .primaryKeyNamed("ct1", "a", "b", "c1")
+ .build());
+
+ // rename computed column
+ operation = parse("alter table tb1 rename f to f1");
+ assertThat(operation).isInstanceOf(AlterTableSchemaOperation.class);
+ assertThat(((AlterTableSchemaOperation) operation).getCatalogTable().getUnresolvedSchema())
+ .isEqualTo(
+ Schema.newBuilder()
+ .column("a", DataTypes.INT().notNull())
+ .column("b", DataTypes.BIGINT().notNull())
+ .column("c", DataTypes.STRING().notNull())
+ .withComment("column comment")
+ .columnByExpression("d", "a*(b+2 + a*b)")
+ .column(
+ "e",
+ DataTypes.ROW(
+ DataTypes.STRING(),
+ DataTypes.INT(),
+ DataTypes.ROW(
+ DataTypes.DOUBLE(),
+ DataTypes.ARRAY(DataTypes.FLOAT()))))
+ .columnByExpression("f1", "e.f1 + e.f2.f0")
+ .columnByMetadata("g", DataTypes.STRING(), null, true)
+ .column("ts", DataTypes.TIMESTAMP(3))
+ .withComment("just a comment")
+ .watermark("ts", "ts - interval '5' seconds")
+ .primaryKeyNamed("ct1", "a", "b", "c")
+ .build());
+
+        // rename column a, which is referenced by a computed column
+ assertThatThrownBy(() -> parse("alter table tb1 rename a to a1"))
+ .isInstanceOf(ValidationException.class)
+ .hasMessageContaining(
+ "Old column a is referred by computed column `d` BIGINT NOT NULL AS a*(b+2 + a*b), "
+ + "currently doesn't allow to rename column which is referred by computed column.");
+
+ // rename column used in the watermark expression
+ assertThatThrownBy(() -> parse("alter table tb1 rename ts to ts1"))
+ .isInstanceOf(ValidationException.class)
+ .hasMessageContaining(
+ "Old column ts is referred by watermark expression WATERMARK FOR `ts`: TIMESTAMP(3) AS ts - interval '5' seconds, "
+ + "currently doesn't allow to rename column which is referred by watermark expression.");
+
+ // rename nested column
+ assertThatThrownBy(() -> parse("alter table tb1 rename e.f1 to e.f11"))
+ .isInstanceOf(UnsupportedOperationException.class)
+ .hasMessageContaining("Alter nested row type e.f1 is not supported yet.");
+
+ // rename column with duplicate name
+ assertThatThrownBy(() -> parse("alter table tb1 rename c to a"))
+ .isInstanceOf(ValidationException.class)
+ .hasMessageContaining(
+ "New column a already existed in table schema for RENAME COLUMN");
+
+        // rename column e to cover the case where the computed column expression is an
+        // ApiExpression, which doesn't implement the equals method
+ CatalogTable catalogTable2 =
+ CatalogTable.of(
+ Schema.newBuilder()
+ .column("a", DataTypes.STRING().notNull())
+ .column("b", DataTypes.INT().notNull())
+ .column("e", DataTypes.STRING())
+ .columnByExpression("j", $("e").upperCase())
+ .columnByExpression("g", "TO_TIMESTAMP(e)")
+ .primaryKey("a", "b")
+ .build(),
+ "tb2",
+ Collections.singletonList("a"),
+ Collections.emptyMap());
+ catalogManager
+ .getCatalog("cat1")
+ .get()
+ .createTable(new ObjectPath("db1", "tb2"), catalogTable2, true);
+
+ assertThatThrownBy(() -> parse("alter table `cat1`.`db1`.`tb2` rename e to e1"))
+ .isInstanceOf(ValidationException.class)
+ .hasMessageContaining(
+ "Old column e is referred by computed column `j` STRING AS upper(e), currently doesn't "
+ + "allow to rename column which is referred by computed column.");
+
+ // rename column used as partition key
+ assertThatThrownBy(() -> parse("alter table tb2 rename a to a1"))
+ .isInstanceOf(ValidationException.class)
+ .hasMessageContaining(
+ "Can not rename column a because it is used as the partition keys");
+ }
+
@Test
public void testAlterTableDropConstraint() throws Exception {
prepareNonManagedTable(true);
@@ -1396,9 +1512,7 @@ public class SqlToOperationConverterTest {
// add an inner field to a nested row
assertThatThrownBy(() -> parse("alter table tb1 add (e.f3 string)"))
.isInstanceOf(UnsupportedOperationException.class)
- .hasMessageContaining(
- "Failed to execute ALTER TABLE statement.\n"
- + "Alter nested row type is not supported yet.");
+ .hasMessageContaining("Alter nested row type e.f3 is not supported yet.");
// refer to a nested inner field
assertThatThrownBy(() -> parse("alter table tb1 add (x string after e.f2)"))
@@ -1409,9 +1523,7 @@ public class SqlToOperationConverterTest {
assertThatThrownBy(() -> parse("alter table tb1 add (e.f3 string after e.f1)"))
.isInstanceOf(UnsupportedOperationException.class)
- .hasMessageContaining(
- "Failed to execute ALTER TABLE statement.\n"
- + "Alter nested row type is not supported yet.");
+ .hasMessageContaining("Alter nested row type e.f3 is not supported yet.");
}
@Test
@@ -1430,7 +1542,7 @@ public class SqlToOperationConverterTest {
"ALTER TABLE cat1.db1.tb1 SET SCHEMA (\n"
+ " `a` INT NOT NULL,\n"
+ " `b` BIGINT NOT NULL,\n"
- + " `c` STRING,\n"
+ + " `c` STRING NOT NULL COMMENT 'column comment',\n"
+ " `d` AS [a*(b+2 + a*b)],\n"
+ " `e` ROW<`f0` STRING, `f1` INT, `f2` ROW<`f0` DOUBLE, `f1` ARRAY<FLOAT>>>,\n"
+ " `f` AS [e.f1 + e.f2.f0],\n"
@@ -1464,7 +1576,7 @@ public class SqlToOperationConverterTest {
+ " `a` INT NOT NULL,\n"
+ " `b` BIGINT NOT NULL,\n"
+ " `i` AS [`b` * 2],\n"
- + " `c` STRING,\n"
+ + " `c` STRING NOT NULL COMMENT 'column comment',\n"
+ " `d` AS [a*(b+2 + a*b)],\n"
+ " `e` ROW<`f0` STRING, `f1` INT, `f2` ROW<`f0` DOUBLE, `f1` ARRAY<FLOAT>>>,\n"
+ " `f` AS [e.f1 + e.f2.f0],\n"
@@ -1483,7 +1595,8 @@ public class SqlToOperationConverterTest {
.column("a", DataTypes.INT().notNull())
.column("b", DataTypes.BIGINT().notNull())
.columnByExpression("i", new SqlCallExpression("`b` * 2"))
- .column("c", DataTypes.STRING())
+ .column("c", DataTypes.STRING().notNull())
+ .withComment("column comment")
.columnByExpression("d", "a*(b+2 + a*b)")
.column(
"e",
@@ -1514,7 +1627,7 @@ public class SqlToOperationConverterTest {
"ALTER TABLE cat1.db1.tb1 SET SCHEMA (\n"
+ " `a` INT NOT NULL,\n"
+ " `b` BIGINT NOT NULL,\n"
- + " `c` STRING,\n"
+ + " `c` STRING NOT NULL COMMENT 'column comment',\n"
+ " `d` AS [a*(b+2 + a*b)],\n"
+ " `e` ROW<`f0` STRING, `f1` INT, `f2` ROW<`f0` DOUBLE, `f1` ARRAY<FLOAT>>>,\n"
+ " `f` AS [e.f1 + e.f2.f0],\n"
@@ -1649,7 +1762,7 @@ public class SqlToOperationConverterTest {
"ALTER TABLE cat1.db1.tb1 SET SCHEMA (\n"
+ " `a` INT NOT NULL,\n"
+ " `b` BIGINT NOT NULL,\n"
- + " `c` STRING,\n"
+ + " `c` STRING NOT NULL COMMENT 'column comment',\n"
+ " `d` AS [a*(b+2 + a*b)],\n"
+ " `e` ROW<`f0` STRING, `f1` INT, `f2` ROW<`f0` DOUBLE, `f1` ARRAY<FLOAT>>>,\n"
+ " `f` AS [e.f1 + e.f2.f0],\n"
@@ -1671,7 +1784,7 @@ public class SqlToOperationConverterTest {
"ALTER TABLE cat1.db1.tb1 SET SCHEMA (\n"
+ " `a` INT NOT NULL,\n"
+ " `b` BIGINT NOT NULL,\n"
- + " `c` STRING,\n"
+ + " `c` STRING NOT NULL COMMENT 'column comment',\n"
+ " `d` AS [a*(b+2 + a*b)],\n"
+ " `e` ROW<`f0` STRING, `f1` INT, `f2` ROW<`f0` DOUBLE, `f1` ARRAY<FLOAT>>>,\n"
+ " `f` AS [e.f1 + e.f2.f0],\n"
@@ -1696,7 +1809,7 @@ public class SqlToOperationConverterTest {
"ALTER TABLE cat1.db1.tb1 SET SCHEMA (\n"
+ " `a` INT NOT NULL,\n"
+ " `b` BIGINT NOT NULL,\n"
- + " `c` STRING,\n"
+ + " `c` STRING NOT NULL COMMENT 'column comment',\n"
+ " `d` AS [a*(b+2 + a*b)],\n"
+ " `e` ROW<`f0` STRING, `f1` INT, `f2` ROW<`f0` DOUBLE, `f1` ARRAY<FLOAT>>>,\n"
+ " `f` AS [e.f1 + e.f2.f0],\n"
@@ -1720,7 +1833,7 @@ public class SqlToOperationConverterTest {
"ALTER TABLE cat1.db1.tb1 SET SCHEMA (\n"
+ " `a` INT NOT NULL,\n"
+ " `b` BIGINT NOT NULL,\n"
- + " `c` STRING,\n"
+ + " `c` STRING NOT NULL COMMENT 'column comment',\n"
+ " `d` AS [a*(b+2 + a*b)],\n"
+ " `e` ROW<`f0` STRING, `f1` INT, `f2` ROW<`f0` DOUBLE, `f1` ARRAY<FLOAT>>>,\n"
+ " `f` AS [e.f1 + e.f2.f0],\n"
@@ -1799,7 +1912,7 @@ public class SqlToOperationConverterTest {
"ALTER TABLE cat1.db1.tb1 SET SCHEMA (\n"
+ " `a` INT NOT NULL,\n"
+ " `b` BIGINT NOT NULL,\n"
- + " `c` STRING,\n"
+ + " `c` STRING NOT NULL COMMENT 'column comment',\n"
+ " `d` AS [a*(b+2 + a*b)],\n"
+ " `e` ROW<`f0` STRING, `f1` INT, `f2` ROW<`f0` DOUBLE, `f1` ARRAY<FLOAT>>>,\n"
+ " `f` AS [e.f1 + e.f2.f0],\n"
@@ -1820,7 +1933,7 @@ public class SqlToOperationConverterTest {
"ALTER TABLE cat1.db1.tb1 SET SCHEMA (\n"
+ " `a` INT NOT NULL,\n"
+ " `b` BIGINT NOT NULL,\n"
- + " `c` STRING,\n"
+ + " `c` STRING NOT NULL COMMENT 'column comment',\n"
+ " `d` AS [a*(b+2 + a*b)],\n"
+ " `e` ROW<`f0` STRING, `f1` INT, `f2` ROW<`f0` DOUBLE, `f1` ARRAY<FLOAT>>>,\n"
+ " `f` AS [e.f1 + e.f2.f0],\n"
@@ -1849,7 +1962,7 @@ public class SqlToOperationConverterTest {
"ALTER TABLE cat1.db1.tb1 SET SCHEMA (\n"
+ " `a` INT NOT NULL,\n"
+ " `b` BIGINT NOT NULL,\n"
- + " `c` STRING,\n"
+ + " `c` STRING NOT NULL COMMENT 'column comment',\n"
+ " `d` AS [a*(b+2 + a*b)],\n"
+ " `e` ROW<`f0` STRING, `f1` INT, `f2` ROW<`f0` DOUBLE, `f1` ARRAY<FLOAT>>>,\n"
+ " `f` AS [e.f1 + e.f2.f0],\n"
@@ -1879,7 +1992,7 @@ public class SqlToOperationConverterTest {
"ALTER TABLE cat1.db1.tb1 SET SCHEMA (\n"
+ " `a` INT NOT NULL,\n"
+ " `b` BIGINT NOT NULL,\n"
- + " `c` STRING,\n"
+ + " `c` STRING NOT NULL COMMENT 'column comment',\n"
+ " `d` AS [a*(b+2 + a*b)],\n"
+ " `e` ROW<`f0` STRING, `f1` INT, `f2` ROW<`f0` DOUBLE, `f1` ARRAY<FLOAT>>>,\n"
+ " `f` AS [e.f1 + e.f2.f0],\n"
@@ -1970,9 +2083,7 @@ public class SqlToOperationConverterTest {
// modify an inner field to a nested row
assertThatThrownBy(() -> parse("alter table tb2 modify (e.f0 string)"))
.isInstanceOf(UnsupportedOperationException.class)
- .hasMessageContaining(
- "Failed to execute ALTER TABLE statement.\n"
- + "Alter nested row type is not supported yet.");
+ .hasMessageContaining("Alter nested row type e.f0 is not supported yet.");
// refer to a nested inner field
assertThatThrownBy(() -> parse("alter table tb2 modify (g string after e.f2)"))
@@ -1983,9 +2094,7 @@ public class SqlToOperationConverterTest {
assertThatThrownBy(() -> parse("alter table tb2 modify (e.f0 string after e.f1)"))
.isInstanceOf(UnsupportedOperationException.class)
- .hasMessageContaining(
- "Failed to execute ALTER TABLE statement.\n"
- + "Alter nested row type is not supported yet.");
+ .hasMessageContaining("Alter nested row type e.f0 is not supported yet.");
}
@Test
@@ -2005,7 +2114,8 @@ public class SqlToOperationConverterTest {
.column("b", DataTypes.BIGINT().notNull())
.withComment("move b to first and add comment")
.column("a", DataTypes.INT().notNull())
- .column("c", DataTypes.STRING())
+ .column("c", DataTypes.STRING().notNull())
+ .withComment("column comment")
.columnByExpression("d", "a*(b+2 + a*b)")
.column(
"e",
@@ -2030,7 +2140,8 @@ public class SqlToOperationConverterTest {
Schema.newBuilder()
.column("a", DataTypes.INT().notNull())
.column("b", DataTypes.BIGINT().notNull())
- .column("c", DataTypes.STRING())
+ .column("c", DataTypes.STRING().notNull())
+ .withComment("column comment")
.columnByExpression("d", "a*(b+2 + a*b)")
.column(
"e",
@@ -2063,6 +2174,7 @@ public class SqlToOperationConverterTest {
tableIdentifier,
Schema.newBuilder()
.column("c", DataTypes.BIGINT())
+ .withComment("column comment")
.column("a", DataTypes.INT().notNull())
.column("b", DataTypes.BIGINT().notNull())
.columnByExpression("d", "`a` + 2")
@@ -2097,7 +2209,8 @@ public class SqlToOperationConverterTest {
.withComment("change g")
.column("a", DataTypes.INT().notNull())
.column("b", DataTypes.BIGINT().notNull())
- .column("c", DataTypes.STRING())
+ .column("c", DataTypes.STRING().notNull())
+ .withComment("column comment")
.columnByExpression("d", "a*(b+2 + a*b)")
.columnByMetadata("e", DataTypes.INT(), null, true)
.column("f", DataTypes.TIMESTAMP(3).notNull())
@@ -2183,6 +2296,7 @@ public class SqlToOperationConverterTest {
.column("a", DataTypes.INT().notNull())
.column("b", DataTypes.BIGINT().notNull())
.column("c", DataTypes.STRING().notNull())
+ .withComment("column comment")
.columnByExpression("d", "a*(b+2 + a*b)")
.column(
"e",
@@ -2670,7 +2784,8 @@ public class SqlToOperationConverterTest {
Schema.newBuilder()
.column("a", DataTypes.INT().notNull())
.column("b", DataTypes.BIGINT().notNull())
- .column("c", DataTypes.STRING())
+ .column("c", DataTypes.STRING().notNull())
+ .withComment("column comment")
.columnByExpression("d", "a*(b+2 + a*b)")
.column(
"e",
@@ -2689,10 +2804,17 @@ public class SqlToOperationConverterTest {
if (!managedTable) {
options.put("connector", "dummy");
}
- if (numOfPkFields == 1) {
+ if (numOfPkFields == 0) {
+ // do nothing
+ } else if (numOfPkFields == 1) {
builder.primaryKeyNamed("ct1", "a");
} else if (numOfPkFields == 2) {
builder.primaryKeyNamed("ct1", "a", "b");
+ } else if (numOfPkFields == 3) {
+ builder.primaryKeyNamed("ct1", "a", "b", "c");
+ } else {
+ throw new IllegalArgumentException(
+ String.format("Don't support to set pk with %s fields.", numOfPkFields));
}
if (hasWatermark) {
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
index d9a7e4cdd27..a78bb5a07cc 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
@@ -50,7 +50,7 @@ import org.junit.Assert.{assertEquals, assertFalse, assertTrue, fail}
import org.junit.rules.{ExpectedException, TemporaryFolder}
import java.io.File
-import java.util.UUID
+import java.util.{Collections, UUID}
import scala.annotation.meta.getter
@@ -796,6 +796,28 @@ class TableEnvironmentTest {
checkData(expectedResult.iterator(), tableResult.collect())
}
+ @Test
+ def testAlterRenameColumn(): Unit = {
+ tableEnv.executeSql("""
+ |CREATE TABLE MyTable (
+ | a bigint
+ |) WITH (
+ | 'connector' = 'COLLECTION',
+ | 'is-bounded' = 'false'
+ |)
+ |""".stripMargin)
+ tableEnv.executeSql("""
+ |ALTER TABLE MyTable RENAME a TO b
+ |""".stripMargin)
+ val tableResult = tableEnv.executeSql("DESCRIBE MyTable")
+ assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult.getResultKind)
+ checkData(
+ Collections
+ .singletonList(Row.of("b", "BIGINT", Boolean.box(true), null, null, null))
+ .iterator(),
+ tableResult.collect())
+ }
+
@Test
def testAlterTableCompactOnNonManagedTable(): Unit = {
val statement =