You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sh...@apache.org on 2023/01/15 09:56:49 UTC
[flink] branch master updated: [FLINK-19883][table] Support "IF EXISTS" in DDL for ALTER TABLE
This is an automated email from the ASF dual-hosted git repository.
shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new c26daffb904 [FLINK-19883][table] Support "IF EXISTS" in DDL for ALTER TABLE
c26daffb904 is described below
commit c26daffb9041381f04a37f62280eab9731dad8f4
Author: Jane Chan <55...@users.noreply.github.com>
AuthorDate: Sun Jan 15 17:56:42 2023 +0800
[FLINK-19883][table] Support "IF EXISTS" in DDL for ALTER TABLE
This closes #21640
---
.../hive/parse/HiveParserDDLSemanticAnalyzer.java | 14 +++---
.../src/test/resources/sql/table.q | 4 ++
.../sql/parser/hive/ddl/SqlAddHivePartitions.java | 2 +-
.../src/main/codegen/includes/parserImpls.ftl | 37 +++++++++-----
.../flink/sql/parser/ddl/SqlAddPartitions.java | 14 +++---
.../flink/sql/parser/ddl/SqlAddReplaceColumns.java | 2 +-
.../apache/flink/sql/parser/ddl/SqlAlterTable.java | 32 ++++++++++--
.../flink/sql/parser/ddl/SqlAlterTableAdd.java | 7 +--
.../sql/parser/ddl/SqlAlterTableAddConstraint.java | 12 +++--
.../flink/sql/parser/ddl/SqlAlterTableCompact.java | 16 ++++--
.../sql/parser/ddl/SqlAlterTableDropColumn.java | 9 ++--
.../parser/ddl/SqlAlterTableDropConstraint.java | 10 ++--
.../parser/ddl/SqlAlterTableDropPrimaryKey.java | 7 +--
.../sql/parser/ddl/SqlAlterTableDropWatermark.java | 7 +--
.../flink/sql/parser/ddl/SqlAlterTableModify.java | 7 +--
.../flink/sql/parser/ddl/SqlAlterTableOptions.java | 27 +++++++---
.../flink/sql/parser/ddl/SqlAlterTableRename.java | 14 ++++--
.../sql/parser/ddl/SqlAlterTableRenameColumn.java | 11 ++--
.../flink/sql/parser/ddl/SqlAlterTableReset.java | 13 +++--
.../flink/sql/parser/ddl/SqlAlterTableSchema.java | 5 +-
.../flink/sql/parser/ddl/SqlChangeColumn.java | 2 +-
.../flink/sql/parser/ddl/SqlDropPartitions.java | 2 +-
.../flink/sql/parser/FlinkSqlParserImplTest.java | 58 +++++++++++++++++++++-
.../table/api/internal/TableEnvironmentImpl.java | 22 ++++----
.../operations/ddl/AddPartitionsOperation.java | 14 +++---
.../operations/ddl/AlterPartitionOperation.java | 2 +-
.../operations/ddl/AlterTableChangeOperation.java | 11 ++--
.../table/operations/ddl/AlterTableOperation.java | 9 +++-
.../operations/ddl/AlterTableOptionsOperation.java | 12 +++--
.../operations/ddl/AlterTableRenameOperation.java | 12 +++--
.../operations/ddl/AlterTableSchemaOperation.java | 13 +++--
.../operations/ddl/DropPartitionsOperation.java | 14 +++---
.../org/apache/flink/table/catalog/Catalog.java | 9 ++--
.../planner/operations/AlterSchemaConverter.java | 3 +-
.../operations/SqlToOperationConverter.java | 15 ++++--
.../planner/utils/OperationConverterUtils.java | 9 ++--
.../operations/SqlToOperationConverterTest.java | 49 ++++++++++++++----
37 files changed, 356 insertions(+), 150 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
index fc7913c4efe..3787a20c644 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
@@ -1501,7 +1501,8 @@ public class HiveParserDDLSemanticAnalyzer {
newProps.entrySet().stream()
.map(entry -> TableChange.set(entry.getKey(), entry.getValue()))
.collect(Collectors.toList()),
- oldTable.copy(props));
+ oldTable.copy(props),
+ false);
}
}
@@ -1879,7 +1880,7 @@ public class HiveParserDDLSemanticAnalyzer {
return expectView
? new AlterViewRenameOperation(objectIdentifier, parseObjectIdentifier(targetName))
: new AlterTableRenameOperation(
- objectIdentifier, parseObjectIdentifier(targetName));
+ objectIdentifier, parseObjectIdentifier(targetName), false);
}
private Operation convertAlterTableChangeCol(
@@ -1979,7 +1980,8 @@ public class HiveParserDDLSemanticAnalyzer {
tableIdentifier,
tableChanges,
new CatalogTableImpl(
- newSchema, oldTable.getPartitionKeys(), props, oldTable.getComment()));
+ newSchema, oldTable.getPartitionKeys(), props, oldTable.getComment()),
+ false);
}
private Operation convertAlterTableModifyCols(
@@ -2033,10 +2035,8 @@ public class HiveParserDDLSemanticAnalyzer {
return new AlterTableSchemaOperation(
tableIdentifier,
new CatalogTableImpl(
- builder.build(),
- oldTable.getPartitionKeys(),
- props,
- oldTable.getComment()));
+ builder.build(), oldTable.getPartitionKeys(), props, oldTable.getComment()),
+ false);
}
private static void setWatermarkAndPK(TableSchema.Builder builder, TableSchema schema) {
diff --git a/flink-table/flink-sql-client/src/test/resources/sql/table.q b/flink-table/flink-sql-client/src/test/resources/sql/table.q
index 1d1764ac5e0..4bc5b909693 100644
--- a/flink-table/flink-sql-client/src/test/resources/sql/table.q
+++ b/flink-table/flink-sql-client/src/test/resources/sql/table.q
@@ -44,6 +44,10 @@ alter table non_exist rename to non_exist2;
org.apache.flink.table.api.ValidationException: Table `default_catalog`.`default_database`.`non_exist` doesn't exist or is a temporary table.
!error
+alter table if exists non_exist rename to non_exist2;
+[INFO] Execute statement succeed.
+!info
+
# ==========================================================================
# test create table
# ==========================================================================
diff --git a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAddHivePartitions.java b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAddHivePartitions.java
index 8bf84e8c77a..e0ae741c504 100644
--- a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAddHivePartitions.java
+++ b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAddHivePartitions.java
@@ -75,7 +75,7 @@ public class SqlAddHivePartitions extends SqlAddPartitions {
tableIdentifier.unparse(writer, leftPrec, rightPrec);
writer.newlineAndIndent();
writer.keyword("ADD");
- if (ifNotExists()) {
+ if (ifPartitionNotExists()) {
writer.keyword("IF NOT EXISTS");
}
int opLeftPrec = getOperator().getLeftPrec();
diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
index 13b61a6937b..1c62bd776f1 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -590,6 +590,7 @@ SqlAlterTable SqlAlterTable() :
SqlParserPos startPos;
SqlIdentifier tableIdentifier;
SqlIdentifier newTableIdentifier = null;
+ boolean ifExists = false;
SqlNodeList propertyList = SqlNodeList.EMPTY;
SqlNodeList propertyKeyList = SqlNodeList.EMPTY;
SqlNodeList partitionSpec = null;
@@ -601,6 +602,7 @@ SqlAlterTable SqlAlterTable() :
}
{
<ALTER> <TABLE> { startPos = getPos(); }
+ ifExists = IfExistsOpt()
tableIdentifier = CompoundIdentifier()
(
LOOKAHEAD(2)
@@ -610,7 +612,8 @@ SqlAlterTable SqlAlterTable() :
return new SqlAlterTableRename(
startPos.plus(getPos()),
tableIdentifier,
- newTableIdentifier);
+ newTableIdentifier,
+ ifExists);
}
|
<RENAME>
@@ -622,7 +625,8 @@ SqlAlterTable SqlAlterTable() :
startPos.plus(getPos()),
tableIdentifier,
originColumnIdentifier,
- newColumnIdentifier);
+ newColumnIdentifier,
+ ifExists);
}
|
<RESET>
@@ -631,7 +635,8 @@ SqlAlterTable SqlAlterTable() :
return new SqlAlterTableReset(
startPos.plus(getPos()),
tableIdentifier,
- propertyKeyList);
+ propertyKeyList,
+ ifExists);
}
|
<SET>
@@ -640,7 +645,8 @@ SqlAlterTable SqlAlterTable() :
return new SqlAlterTableOptions(
startPos.plus(getPos()),
tableIdentifier,
- propertyList);
+ propertyList,
+ ifExists);
}
|
<ADD>
@@ -660,7 +666,8 @@ SqlAlterTable SqlAlterTable() :
tableIdentifier,
new SqlNodeList(ctx.columnPositions, startPos.plus(getPos())),
ctx.constraints,
- ctx.watermark);
+ ctx.watermark,
+ ifExists);
}
|
<MODIFY>
@@ -680,7 +687,8 @@ SqlAlterTable SqlAlterTable() :
tableIdentifier,
new SqlNodeList(ctx.columnPositions, startPos.plus(getPos())),
ctx.constraints,
- ctx.watermark);
+ ctx.watermark,
+ ifExists);
}
|
@@ -693,7 +701,8 @@ SqlAlterTable SqlAlterTable() :
tableIdentifier,
new SqlNodeList(
Collections.singletonList(columnName),
- getPos()));
+ getPos()),
+ ifExists);
}
|
{ Pair<SqlNodeList, SqlNodeList> columnWithTypePair = null; }
@@ -701,26 +710,30 @@ SqlAlterTable SqlAlterTable() :
return new SqlAlterTableDropColumn(
startPos.plus(getPos()),
tableIdentifier,
- columnWithTypePair.getKey());
+ columnWithTypePair.getKey(),
+ ifExists);
}
|
<PRIMARY> <KEY> {
return new SqlAlterTableDropPrimaryKey(
startPos.plus(getPos()),
- tableIdentifier);
+ tableIdentifier,
+ ifExists);
}
|
<CONSTRAINT> constraintName = SimpleIdentifier() {
return new SqlAlterTableDropConstraint(
startPos.plus(getPos()),
tableIdentifier,
- constraintName);
+ constraintName,
+ ifExists);
}
|
<WATERMARK> {
return new SqlAlterTableDropWatermark(
startPos.plus(getPos()),
- tableIdentifier);
+ tableIdentifier,
+ ifExists);
}
)
|
@@ -732,7 +745,7 @@ SqlAlterTable SqlAlterTable() :
]
<COMPACT>
{
- return new SqlAlterTableCompact(startPos.plus(getPos()), tableIdentifier, partitionSpec);
+ return new SqlAlterTableCompact(startPos.plus(getPos()), tableIdentifier, partitionSpec, ifExists);
}
)
}
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAddPartitions.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAddPartitions.java
index 3446b30d3c7..b66e3e27d35 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAddPartitions.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAddPartitions.java
@@ -35,24 +35,24 @@ import java.util.List;
/** ALTER TABLE DDL to add partitions to a table. */
public class SqlAddPartitions extends SqlAlterTable {
- private final boolean ifNotExists;
+ private final boolean ifPartitionNotExists;
private final List<SqlNodeList> partSpecs;
private final List<SqlNodeList> partProps;
public SqlAddPartitions(
SqlParserPos pos,
SqlIdentifier tableName,
- boolean ifNotExists,
+ boolean ifPartitionNotExists,
List<SqlNodeList> partSpecs,
List<SqlNodeList> partProps) {
- super(pos, tableName);
- this.ifNotExists = ifNotExists;
+ super(pos, tableName, false);
+ this.ifPartitionNotExists = ifPartitionNotExists;
this.partSpecs = partSpecs;
this.partProps = partProps;
}
- public boolean ifNotExists() {
- return ifNotExists;
+ public boolean ifPartitionNotExists() {
+ return ifPartitionNotExists;
}
public List<SqlNodeList> getPartSpecs() {
@@ -82,7 +82,7 @@ public class SqlAddPartitions extends SqlAlterTable {
super.unparse(writer, leftPrec, rightPrec);
writer.newlineAndIndent();
writer.keyword("ADD");
- if (ifNotExists) {
+ if (ifPartitionNotExists) {
writer.keyword("IF NOT EXISTS");
}
int opLeftPrec = getOperator().getLeftPrec();
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAddReplaceColumns.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAddReplaceColumns.java
index 000c700f0cf..417cd7b46b8 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAddReplaceColumns.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAddReplaceColumns.java
@@ -48,7 +48,7 @@ public class SqlAddReplaceColumns extends SqlAlterTable {
SqlNodeList newColumns,
boolean replace,
@Nullable SqlNodeList properties) {
- super(pos, tableName);
+ super(pos, tableName, false);
this.newColumns = newColumns;
this.replace = replace;
this.properties = properties;
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTable.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTable.java
index 24ce63823cb..1bd87398cf8 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTable.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTable.java
@@ -39,8 +39,8 @@ import java.util.List;
import static java.util.Objects.requireNonNull;
/**
- * Abstract class to describe statements like ALTER TABLE [[catalogName.] dataBasesName].tableName
- * ...
+ * Abstract class to describe statements like ALTER TABLE [IF EXISTS] [[catalogName.]
+ * dataBasesName].tableName ...
*/
public abstract class SqlAlterTable extends SqlCall {
@@ -49,16 +49,26 @@ public abstract class SqlAlterTable extends SqlCall {
protected final SqlIdentifier tableIdentifier;
protected final SqlNodeList partitionSpec;
+ protected final boolean ifTableExists;
public SqlAlterTable(
- SqlParserPos pos, SqlIdentifier tableName, @Nullable SqlNodeList partitionSpec) {
+ SqlParserPos pos,
+ SqlIdentifier tableName,
+ @Nullable SqlNodeList partitionSpec,
+ boolean ifTableExists) {
super(pos);
this.tableIdentifier = requireNonNull(tableName, "tableName should not be null");
this.partitionSpec = partitionSpec;
+ this.ifTableExists = ifTableExists;
+ }
+
+ public SqlAlterTable(SqlParserPos pos, SqlIdentifier tableName, boolean ifTableExists) {
+ this(pos, tableName, null, ifTableExists);
}
- public SqlAlterTable(SqlParserPos pos, SqlIdentifier tableName) {
- this(pos, tableName, null);
+ public SqlAlterTable(
+ SqlParserPos pos, SqlIdentifier tableName, @Nullable SqlNodeList partitionSpec) {
+ this(pos, tableName, partitionSpec, false);
}
@Override
@@ -73,6 +83,9 @@ public abstract class SqlAlterTable extends SqlCall {
@Override
public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
writer.keyword("ALTER TABLE");
+ if (ifTableExists) {
+ writer.keyword("IF EXISTS");
+ }
tableIdentifier.unparse(writer, leftPrec, rightPrec);
SqlNodeList partitionSpec = getPartitionSpec();
if (partitionSpec != null && partitionSpec.size() > 0) {
@@ -98,6 +111,15 @@ public abstract class SqlAlterTable extends SqlCall {
return SqlPartitionUtils.getPartitionKVs(getPartitionSpec());
}
+ /**
+ * Whether to ignore the error if the table doesn't exist.
+ *
+ * @return true when IF EXISTS is specified.
+ */
+ public boolean ifTableExists() {
+ return ifTableExists;
+ }
+
/** Alter table context. */
public static class AlterTableContext extends SqlCreateTable.TableCreationContext {
public List<SqlNode> columnPositions = new ArrayList<>();
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableAdd.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableAdd.java
index 2bb33480217..25016cd9640 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableAdd.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableAdd.java
@@ -31,7 +31,7 @@ import javax.annotation.Nullable;
import java.util.List;
/**
- * SqlNode to describe ALTER TABLE table_name ADD column/constraint/watermark clause.
+ * SqlNode to describe ALTER TABLE [IF EXISTS] table_name ADD column/constraint/watermark clause.
*
* <p>Example: DDL like the below for add column/constraint/watermark.
*
@@ -56,8 +56,9 @@ public class SqlAlterTableAdd extends SqlAlterTableSchema {
SqlIdentifier tableName,
SqlNodeList addedColumns,
List<SqlTableConstraint> constraint,
- @Nullable SqlWatermark sqlWatermark) {
- super(pos, tableName, addedColumns, constraint, sqlWatermark);
+ @Nullable SqlWatermark sqlWatermark,
+ boolean ifTableExists) {
+ super(pos, tableName, addedColumns, constraint, sqlWatermark, ifTableExists);
}
@Override
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableAddConstraint.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableAddConstraint.java
index 334f156432d..4359c0fe569 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableAddConstraint.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableAddConstraint.java
@@ -29,8 +29,8 @@ import org.apache.calcite.util.ImmutableNullableList;
import java.util.List;
/**
- * ALTER TABLE [catalog_name.][db_name.]table_name ADD [CONSTRAINT constraint_name] (PRIMARY KEY |
- * UNIQUE) (column, ...) [[NOT] ENFORCED].
+ * ALTER TABLE [IF EXISTS] [catalog_name.][db_name.]table_name ADD [CONSTRAINT constraint_name]
+ * (PRIMARY KEY | UNIQUE) (column, ...) [[NOT] ENFORCED].
*/
public class SqlAlterTableAddConstraint extends SqlAlterTable {
private final SqlTableConstraint constraint;
@@ -41,10 +41,14 @@ public class SqlAlterTableAddConstraint extends SqlAlterTable {
* @param tableID Table ID
* @param constraint Table constraint
* @param pos Parser position
+ * @param ifTableExists Whether IF EXISTS is specified
*/
public SqlAlterTableAddConstraint(
- SqlIdentifier tableID, SqlTableConstraint constraint, SqlParserPos pos) {
- super(pos, tableID);
+ SqlIdentifier tableID,
+ SqlTableConstraint constraint,
+ SqlParserPos pos,
+ boolean ifTableExists) {
+ super(pos, tableID, ifTableExists);
this.constraint = constraint;
}
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableCompact.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableCompact.java
index 247f4f96757..efec30d590d 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableCompact.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableCompact.java
@@ -29,16 +29,22 @@ import javax.annotation.Nullable;
import java.util.List;
-/** ALTER TABLE [[catalogName.] dataBasesName].tableName [PARTITION partition_spec] COMPACT. */
+/**
+ * ALTER TABLE [IF EXISTS] [[catalogName.] dataBasesName].tableName [PARTITION partition_spec]
+ * COMPACT.
+ */
public class SqlAlterTableCompact extends SqlAlterTable {
public SqlAlterTableCompact(
- SqlParserPos pos, SqlIdentifier tableName, @Nullable SqlNodeList partitionSpec) {
- super(pos, tableName, partitionSpec);
+ SqlParserPos pos,
+ SqlIdentifier tableName,
+ @Nullable SqlNodeList partitionSpec,
+ boolean ifTableExists) {
+ super(pos, tableName, partitionSpec, ifTableExists);
}
- public SqlAlterTableCompact(SqlParserPos pos, SqlIdentifier tableName) {
- super(pos, tableName);
+ public SqlAlterTableCompact(SqlParserPos pos, SqlIdentifier tableName, boolean ifTableExists) {
+ super(pos, tableName, ifTableExists);
}
@Override
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableDropColumn.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableDropColumn.java
index 534a7a7a593..0028a2995fc 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableDropColumn.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableDropColumn.java
@@ -31,7 +31,7 @@ import java.util.Collections;
import java.util.List;
/**
- * SqlNode to describe ALTER TABLE table_name DROP column clause.
+ * SqlNode to describe ALTER TABLE [IF EXISTS] table_name DROP column clause.
*
* <p>Example: DDL like the below for drop column.
*
@@ -48,8 +48,11 @@ public class SqlAlterTableDropColumn extends SqlAlterTable {
private final SqlNodeList columnList;
public SqlAlterTableDropColumn(
- SqlParserPos pos, SqlIdentifier tableName, SqlNodeList columnList) {
- super(pos, tableName);
+ SqlParserPos pos,
+ SqlIdentifier tableName,
+ SqlNodeList columnList,
+ boolean ifTableExists) {
+ super(pos, tableName, ifTableExists);
this.columnList = columnList;
}
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableDropConstraint.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableDropConstraint.java
index 59b6b85c13e..bb097fe5c83 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableDropConstraint.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableDropConstraint.java
@@ -26,7 +26,7 @@ import org.apache.calcite.util.ImmutableNullableList;
import java.util.List;
-/** ALTER TABLE [catalog_name.][db_name.]table_name DROP CONSTRAINT constraint_name. */
+/** ALTER TABLE [IF EXISTS] [catalog_name.][db_name.]table_name DROP CONSTRAINT constraint_name. */
public class SqlAlterTableDropConstraint extends SqlAlterTable {
private final SqlIdentifier constraintName;
@@ -37,10 +37,14 @@ public class SqlAlterTableDropConstraint extends SqlAlterTable {
* @param pos Parser position
* @param tableName Table name
* @param constraintName Constraint name
+ * @param ifTableExists Whether IF EXISTS is specified
*/
public SqlAlterTableDropConstraint(
- SqlParserPos pos, SqlIdentifier tableName, SqlIdentifier constraintName) {
- super(pos, tableName);
+ SqlParserPos pos,
+ SqlIdentifier tableName,
+ SqlIdentifier constraintName,
+ boolean ifTableExists) {
+ super(pos, tableName, ifTableExists);
this.constraintName = constraintName;
}
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableDropPrimaryKey.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableDropPrimaryKey.java
index fa7d48efd0d..506f4eaac26 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableDropPrimaryKey.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableDropPrimaryKey.java
@@ -26,11 +26,12 @@ import org.apache.calcite.sql.parser.SqlParserPos;
import java.util.Collections;
import java.util.List;
-/** ALTER TABLE [catalog_name.][db_name.]table_name DROP PRIMARY KEY. */
+/** ALTER TABLE [IF EXISTS] [catalog_name.][db_name.]table_name DROP PRIMARY KEY. */
public class SqlAlterTableDropPrimaryKey extends SqlAlterTable {
- public SqlAlterTableDropPrimaryKey(SqlParserPos pos, SqlIdentifier tableName) {
- super(pos, tableName);
+ public SqlAlterTableDropPrimaryKey(
+ SqlParserPos pos, SqlIdentifier tableName, boolean ifTableExists) {
+ super(pos, tableName, ifTableExists);
}
@Override
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableDropWatermark.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableDropWatermark.java
index 593a77634cd..0ed327abda3 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableDropWatermark.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableDropWatermark.java
@@ -27,7 +27,7 @@ import java.util.Collections;
import java.util.List;
/**
- * SqlNode to describe ALTER TABLE table_name DROP watermark clause.
+ * SqlNode to describe ALTER TABLE [IF EXISTS] table_name DROP watermark clause.
*
* <p>Example: DDL like the below for drop watermark.
*
@@ -38,8 +38,9 @@ import java.util.List;
*/
public class SqlAlterTableDropWatermark extends SqlAlterTable {
- public SqlAlterTableDropWatermark(SqlParserPos pos, SqlIdentifier tableName) {
- super(pos, tableName);
+ public SqlAlterTableDropWatermark(
+ SqlParserPos pos, SqlIdentifier tableName, boolean ifTableExists) {
+ super(pos, tableName, ifTableExists);
}
@Override
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableModify.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableModify.java
index e1281e9207f..5ce7349321d 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableModify.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableModify.java
@@ -31,7 +31,7 @@ import javax.annotation.Nullable;
import java.util.List;
/**
- * SqlNode to describe ALTER TABLE table_name MODIFY column/constraint/watermark clause.
+ * SqlNode to describe ALTER TABLE [IF EXISTS] table_name MODIFY column/constraint/watermark clause.
*
* <p>Example: DDL like the below for modify column/constraint/watermark.
*
@@ -56,8 +56,9 @@ public class SqlAlterTableModify extends SqlAlterTableSchema {
SqlIdentifier tableName,
SqlNodeList modifiedColumns,
List<SqlTableConstraint> constraints,
- @Nullable SqlWatermark watermark) {
- super(pos, tableName, modifiedColumns, constraints, watermark);
+ @Nullable SqlWatermark watermark,
+ boolean ifTableExists) {
+ super(pos, tableName, modifiedColumns, constraints, watermark, ifTableExists);
}
@Override
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableOptions.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableOptions.java
index 5c5616410e8..006fede96a7 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableOptions.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableOptions.java
@@ -31,14 +31,20 @@ import java.util.List;
import static java.util.Objects.requireNonNull;
-/** ALTER TABLE [[catalogName.] dataBasesName].tableName SET ( name=value [, name=value]*). */
+/**
+ * ALTER TABLE [IF EXISTS] [[catalogName.] dataBasesName].tableName SET ( name=value [,
+ * name=value]*).
+ */
public class SqlAlterTableOptions extends SqlAlterTable {
private final SqlNodeList propertyList;
public SqlAlterTableOptions(
- SqlParserPos pos, SqlIdentifier tableName, SqlNodeList propertyList) {
- this(pos, tableName, null, propertyList);
+ SqlParserPos pos,
+ SqlIdentifier tableName,
+ SqlNodeList propertyList,
+ boolean ifTableExists) {
+ this(pos, tableName, null, propertyList, ifTableExists);
}
public SqlAlterTableOptions(
@@ -46,7 +52,16 @@ public class SqlAlterTableOptions extends SqlAlterTable {
SqlIdentifier tableName,
SqlNodeList partitionSpec,
SqlNodeList propertyList) {
- super(pos, tableName, partitionSpec);
+ this(pos, tableName, partitionSpec, propertyList, false);
+ }
+
+ public SqlAlterTableOptions(
+ SqlParserPos pos,
+ SqlIdentifier tableName,
+ SqlNodeList partitionSpec,
+ SqlNodeList propertyList,
+ boolean ifTableExists) {
+ super(pos, tableName, partitionSpec, ifTableExists);
this.propertyList = requireNonNull(propertyList, "propertyList should not be null");
}
@@ -71,8 +86,4 @@ public class SqlAlterTableOptions extends SqlAlterTable {
writer.newlineAndIndent();
writer.endList(withFrame);
}
-
- public String[] fullTableName() {
- return tableIdentifier.names.toArray(new String[0]);
- }
}
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableRename.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableRename.java
index 6775353c7a7..50876a424e5 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableRename.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableRename.java
@@ -29,7 +29,7 @@ import java.util.List;
import static java.util.Objects.requireNonNull;
/**
- * ALTER TABLE [[catalogName.] dataBasesName].tableName RENAME TO [[catalogName.]
+ * ALTER TABLE [IF EXISTS] [[catalogName.] dataBasesName].tableName RENAME TO [[catalogName.]
* dataBasesName].newTableName.
*/
public class SqlAlterTableRename extends SqlAlterTable {
@@ -37,11 +37,19 @@ public class SqlAlterTableRename extends SqlAlterTable {
private final SqlIdentifier newTableIdentifier;
public SqlAlterTableRename(
- SqlParserPos pos, SqlIdentifier tableName, SqlIdentifier newTableName) {
- super(pos, tableName);
+ SqlParserPos pos,
+ SqlIdentifier tableName,
+ SqlIdentifier newTableName,
+ boolean ifTableExists) {
+ super(pos, tableName, ifTableExists);
this.newTableIdentifier = requireNonNull(newTableName, "new tableName should not be null");
}
+ public SqlAlterTableRename(
+ SqlParserPos pos, SqlIdentifier tableName, SqlIdentifier newTableName) {
+ this(pos, tableName, newTableName, false);
+ }
+
@Override
public List<SqlNode> getOperandList() {
return ImmutableNullableList.of(tableIdentifier, newTableIdentifier);
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableRenameColumn.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableRenameColumn.java
index 72aa3c1a067..f49b5e7a365 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableRenameColumn.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableRenameColumn.java
@@ -27,7 +27,8 @@ import org.apache.calcite.util.ImmutableNullableList;
import java.util.List;
/**
- * ALTER TABLE [[catalogName.] dataBasesName].tableName RENAME originColumnName TO newColumnName.
+ * ALTER TABLE [IF EXISTS] [[catalogName.] dataBasesName].tableName RENAME originColumnName TO
+ * newColumnName.
*/
public class SqlAlterTableRenameColumn extends SqlAlterTable {
@@ -38,8 +39,9 @@ public class SqlAlterTableRenameColumn extends SqlAlterTable {
SqlParserPos pos,
SqlIdentifier tableName,
SqlIdentifier originColumnIdentifier,
- SqlIdentifier newColumnIdentifier) {
- super(pos, tableName, null);
+ SqlIdentifier newColumnIdentifier,
+ boolean ifTableExists) {
+ super(pos, tableName, null, ifTableExists);
this.originColumnIdentifier = originColumnIdentifier;
this.newColumnIdentifier = newColumnIdentifier;
}
@@ -60,8 +62,7 @@ public class SqlAlterTableRenameColumn extends SqlAlterTable {
@Override
public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
- writer.keyword("ALTER TABLE");
- tableIdentifier.unparse(writer, leftPrec, rightPrec);
+ super.unparse(writer, leftPrec, rightPrec);
writer.keyword("RENAME");
originColumnIdentifier.unparse(writer, leftPrec, rightPrec);
writer.keyword("TO");
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableReset.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableReset.java
index f947e91de50..bef1d473e5b 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableReset.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableReset.java
@@ -35,13 +35,16 @@ import java.util.stream.Collectors;
import static java.util.Objects.requireNonNull;
-/** ALTER TABLE [[catalogName.] dataBasesName].tableName RESET ( 'key1' [, 'key2']*). */
+/** ALTER TABLE [IF EXISTS] [[catalogName.] dataBasesName].tableName RESET ( 'key1' [, 'key2']*). */
public class SqlAlterTableReset extends SqlAlterTable {
private final SqlNodeList propertyKeyList;
public SqlAlterTableReset(
- SqlParserPos pos, SqlIdentifier tableName, SqlNodeList propertyKeyList) {
- super(pos, tableName, null);
+ SqlParserPos pos,
+ SqlIdentifier tableName,
+ SqlNodeList propertyKeyList,
+ boolean ifTableExists) {
+ super(pos, tableName, null, ifTableExists);
this.propertyKeyList =
requireNonNull(propertyKeyList, "propertyKeyList should not be null");
}
@@ -73,8 +76,4 @@ public class SqlAlterTableReset extends SqlAlterTable {
writer.newlineAndIndent();
writer.endList(withFrame);
}
-
- public String[] fullTableName() {
- return tableIdentifier.names.toArray(new String[0]);
- }
}
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableSchema.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableSchema.java
index 48f9d810725..90e76d5dbc7 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableSchema.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableSchema.java
@@ -49,8 +49,9 @@ public abstract class SqlAlterTableSchema extends SqlAlterTable implements Exten
SqlIdentifier tableName,
SqlNodeList columnList,
List<SqlTableConstraint> constraints,
- @Nullable SqlWatermark sqlWatermark) {
- super(pos, tableName);
+ @Nullable SqlWatermark sqlWatermark,
+ boolean ifTableExists) {
+ super(pos, tableName, ifTableExists);
this.columnList = columnList;
this.constraints = constraints;
this.watermark = sqlWatermark;
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlChangeColumn.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlChangeColumn.java
index de4fa20421b..d706b9af69d 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlChangeColumn.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlChangeColumn.java
@@ -54,7 +54,7 @@ public class SqlChangeColumn extends SqlAlterTable {
@Nullable SqlIdentifier after,
boolean first,
@Nullable SqlNodeList properties) {
- super(pos, tableName);
+ super(pos, tableName, false);
if (after != null && first) {
throw new IllegalArgumentException("FIRST and AFTER cannot be set at the same time");
}
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropPartitions.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropPartitions.java
index 99d2e6a00d2..67dbb99505d 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropPartitions.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropPartitions.java
@@ -43,7 +43,7 @@ public class SqlDropPartitions extends SqlAlterTable {
SqlIdentifier tableName,
boolean ifExists,
List<SqlNodeList> partSpecs) {
- super(pos, tableName);
+ super(pos, tableName, false);
this.ifExists = ifExists;
this.partSpecs = partSpecs;
}
diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
index 7b1878a6794..7ba56b74747 100644
--- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
+++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
@@ -314,19 +314,37 @@ class FlinkSqlParserImplTest extends SqlParserTest {
@Test
void testAlterTable() {
sql("alter table t1 rename to t2").ok("ALTER TABLE `T1` RENAME TO `T2`");
+ sql("alter table if exists t1 rename to t2")
+ .ok("ALTER TABLE IF EXISTS `T1` RENAME TO `T2`");
sql("alter table c1.d1.t1 rename to t2").ok("ALTER TABLE `C1`.`D1`.`T1` RENAME TO `T2`");
+ sql("alter table if exists c1.d1.t1 rename to t2")
+ .ok("ALTER TABLE IF EXISTS `C1`.`D1`.`T1` RENAME TO `T2`");
sql("alter table t1 set ('key1'='value1')")
.ok("ALTER TABLE `T1` SET (\n" + " 'key1' = 'value1'\n" + ")");
+ sql("alter table if exists t1 set ('key1'='value1')")
+ .ok("ALTER TABLE IF EXISTS `T1` SET (\n" + " 'key1' = 'value1'\n" + ")");
sql("alter table t1 add constraint ct1 primary key(a, b) not enforced")
.ok(
"ALTER TABLE `T1` ADD (\n"
+ " CONSTRAINT `CT1` PRIMARY KEY (`A`, `B`) NOT ENFORCED\n"
+ ")");
+ sql("alter table if exists t1 add constraint ct1 primary key(a, b) not enforced")
+ .ok(
+ "ALTER TABLE IF EXISTS `T1` ADD (\n"
+ + " CONSTRAINT `CT1` PRIMARY KEY (`A`, `B`) NOT ENFORCED\n"
+ + ")");
sql("alter table t1 " + "add unique(a, b)")
.ok("ALTER TABLE `T1` ADD (\n" + " UNIQUE (`A`, `B`)\n" + ")");
+ sql("alter table if exists t1 " + "add unique(a, b)")
+ .ok("ALTER TABLE IF EXISTS `T1` ADD (\n" + " UNIQUE (`A`, `B`)\n" + ")");
sql("alter table t1 drop constraint ct1").ok("ALTER TABLE `T1` DROP CONSTRAINT `CT1`");
+ sql("alter table if exists t1 drop constraint ct1")
+ .ok("ALTER TABLE IF EXISTS `T1` DROP CONSTRAINT `CT1`");
sql("alter table t1 rename a to b").ok("ALTER TABLE `T1` RENAME `A` TO `B`");
- sql("alter table t1 rename a.x to a.y").ok("ALTER TABLE `T1` RENAME `A`.`X` TO `A`.`Y`");
+ sql("alter table if exists t1 rename a to b")
+ .ok("ALTER TABLE IF EXISTS `T1` RENAME `A` TO `B`");
+ sql("alter table if exists t1 rename a.x to a.y")
+ .ok("ALTER TABLE IF EXISTS `T1` RENAME `A`.`X` TO `A`.`Y`");
}
@Test
@@ -359,6 +377,11 @@ class FlinkSqlParserImplTest extends SqlParserTest {
@Test
void testAlterTableAddSingleColumn() {
+ sql("alter table if exists t1 add new_column int not null")
+ .ok(
+ "ALTER TABLE IF EXISTS `T1` ADD (\n"
+ + " `NEW_COLUMN` INTEGER NOT NULL\n"
+ + ")");
sql("alter table t1 add new_column string comment 'new_column docs'")
.ok(
"ALTER TABLE `T1` ADD (\n"
@@ -390,6 +413,8 @@ class FlinkSqlParserImplTest extends SqlParserTest {
@Test
void testAlterTableAddWatermark() {
+ sql("alter table if exists t1 add watermark for ts as ts")
+ .ok("ALTER TABLE IF EXISTS `T1` ADD (\n" + " WATERMARK FOR `TS` AS `TS`\n" + ")");
sql("alter table t1 add watermark for ts as ts - interval '1' second")
.ok(
"ALTER TABLE `T1` ADD (\n"
@@ -452,6 +477,11 @@ class FlinkSqlParserImplTest extends SqlParserTest {
@Test
public void testAlterTableModifySingleColumn() {
+ sql("alter table if exists t1 modify new_column string comment 'new_column docs'")
+ .ok(
+ "ALTER TABLE IF EXISTS `T1` MODIFY (\n"
+ + " `NEW_COLUMN` STRING COMMENT 'new_column docs'\n"
+ + ")");
sql("alter table t1 modify new_column string comment 'new_column docs'")
.ok(
"ALTER TABLE `T1` MODIFY (\n"
@@ -504,6 +534,11 @@ class FlinkSqlParserImplTest extends SqlParserTest {
@Test
void testAlterTableModifyWatermark() {
+ sql("alter table if exists t1 modify watermark for ts as ts")
+ .ok(
+ "ALTER TABLE IF EXISTS `T1` MODIFY (\n"
+ + " WATERMARK FOR `TS` AS `TS`\n"
+ + ")");
sql("alter table t1 modify watermark for ts as ts - interval '1' second")
.ok(
"ALTER TABLE `T1` MODIFY (\n"
@@ -565,6 +600,8 @@ class FlinkSqlParserImplTest extends SqlParserTest {
@Test
public void testAlterTableDropSingleColumn() {
+ sql("alter table if exists t1 drop id")
+ .ok("ALTER TABLE IF EXISTS `T1` DROP (\n" + " `ID`\n" + ")");
sql("alter table t1 drop id").ok("ALTER TABLE `T1` DROP (\n" + " `ID`\n" + ")");
sql("alter table t1 drop (id)").ok("ALTER TABLE `T1` DROP (\n" + " `ID`\n" + ")");
@@ -575,6 +612,14 @@ class FlinkSqlParserImplTest extends SqlParserTest {
@Test
public void testAlterTableDropMultipleColumn() {
+ sql("alter table if exists t1 drop (id, ts, tuple.f0, tuple.f1)")
+ .ok(
+ "ALTER TABLE IF EXISTS `T1` DROP (\n"
+ + " `ID`,\n"
+ + " `TS`,\n"
+ + " `TUPLE`.`F0`,\n"
+ + " `TUPLE`.`F1`\n"
+ + ")");
sql("alter table t1 drop (id, ts, tuple.f0, tuple.f1)")
.ok(
"ALTER TABLE `T1` DROP (\n"
@@ -587,11 +632,15 @@ class FlinkSqlParserImplTest extends SqlParserTest {
@Test
public void testAlterTableDropPrimaryKey() {
+ sql("alter table if exists t1 drop primary key")
+ .ok("ALTER TABLE IF EXISTS `T1` DROP PRIMARY KEY");
sql("alter table t1 drop primary key").ok("ALTER TABLE `T1` DROP PRIMARY KEY");
}
@Test
public void testAlterTableDropConstraint() {
+ sql("alter table if exists t1 drop constraint ct")
+ .ok("ALTER TABLE IF EXISTS `T1` DROP CONSTRAINT `CT`");
sql("alter table t1 drop constraint ct").ok("ALTER TABLE `T1` DROP CONSTRAINT `CT`");
sql("alter table t1 drop constrain^t^")
@@ -600,11 +649,16 @@ class FlinkSqlParserImplTest extends SqlParserTest {
@Test
public void testAlterTableDropWatermark() {
+ sql("alter table if exists t1 drop watermark")
+ .ok("ALTER TABLE IF EXISTS `T1` DROP WATERMARK");
sql("alter table t1 drop watermark").ok("ALTER TABLE `T1` DROP WATERMARK");
}
@Test
void testAlterTableReset() {
+ sql("alter table if exists t1 reset ('key1')")
+ .ok("ALTER TABLE IF EXISTS `T1` RESET (\n 'key1'\n)");
+
sql("alter table t1 reset ('key1')").ok("ALTER TABLE `T1` RESET (\n 'key1'\n)");
sql("alter table t1 reset ('key1', 'key2')")
@@ -615,6 +669,8 @@ class FlinkSqlParserImplTest extends SqlParserTest {
@Test
void testAlterTableCompact() {
+ sql("alter table if exists t1 compact").ok("ALTER TABLE IF EXISTS `T1` COMPACT");
+
sql("alter table t1 compact").ok("ALTER TABLE `T1` COMPACT");
sql("alter table db1.t1 compact").ok("ALTER TABLE `DB1`.`T1` COMPACT");
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 4a1d000cc5e..f33c2c31f2b 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -981,14 +981,14 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
catalog.renameTable(
alterTableRenameOp.getTableIdentifier().toObjectPath(),
alterTableRenameOp.getNewTableIdentifier().getObjectName(),
- false);
+ alterTableRenameOp.ignoreIfTableNotExists());
} else if (alterTableOperation instanceof AlterTableOptionsOperation) {
AlterTableOptionsOperation alterTablePropertiesOp =
(AlterTableOptionsOperation) operation;
catalogManager.alterTable(
alterTablePropertiesOp.getCatalogTable(),
alterTablePropertiesOp.getTableIdentifier(),
- false);
+ alterTablePropertiesOp.ignoreIfTableNotExists());
} else if (alterTableOperation instanceof AlterPartitionPropertiesOperation) {
AlterPartitionPropertiesOperation alterPartPropsOp =
(AlterPartitionPropertiesOperation) operation;
@@ -996,35 +996,39 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
alterPartPropsOp.getTableIdentifier().toObjectPath(),
alterPartPropsOp.getPartitionSpec(),
alterPartPropsOp.getCatalogPartition(),
- false);
+ alterPartPropsOp.ignoreIfTableNotExists());
} else if (alterTableOperation instanceof AlterTableSchemaOperation) {
AlterTableSchemaOperation alterTableSchemaOperation =
(AlterTableSchemaOperation) alterTableOperation;
catalogManager.alterTable(
alterTableSchemaOperation.getCatalogTable(),
alterTableSchemaOperation.getTableIdentifier(),
- false);
+ alterTableSchemaOperation.ignoreIfTableNotExists());
} else if (alterTableOperation instanceof AddPartitionsOperation) {
AddPartitionsOperation addPartitionsOperation =
(AddPartitionsOperation) alterTableOperation;
List<CatalogPartitionSpec> specs = addPartitionsOperation.getPartitionSpecs();
List<CatalogPartition> partitions =
addPartitionsOperation.getCatalogPartitions();
- boolean ifNotExists = addPartitionsOperation.ifNotExists();
ObjectPath tablePath =
addPartitionsOperation.getTableIdentifier().toObjectPath();
for (int i = 0; i < specs.size(); i++) {
catalog.createPartition(
- tablePath, specs.get(i), partitions.get(i), ifNotExists);
+ tablePath,
+ specs.get(i),
+ partitions.get(i),
+ addPartitionsOperation.ignoreIfPartitionExists());
}
} else if (alterTableOperation instanceof DropPartitionsOperation) {
DropPartitionsOperation dropPartitionsOperation =
(DropPartitionsOperation) alterTableOperation;
ObjectPath tablePath =
dropPartitionsOperation.getTableIdentifier().toObjectPath();
- boolean ifExists = dropPartitionsOperation.ifExists();
for (CatalogPartitionSpec spec : dropPartitionsOperation.getPartitionSpecs()) {
- catalog.dropPartition(tablePath, spec, ifExists);
+ catalog.dropPartition(
+ tablePath,
+ spec,
+ dropPartitionsOperation.ignoreIfPartitionNotExists());
}
} else if (alterTableOperation instanceof AlterTableChangeOperation) {
AlterTableChangeOperation alterTableChangeOperation =
@@ -1033,7 +1037,7 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
alterTableChangeOperation.getNewTable(),
alterTableChangeOperation.getTableChanges(),
alterTableChangeOperation.getTableIdentifier(),
- false);
+ alterTableChangeOperation.ignoreIfTableNotExists());
}
return TableResultImpl.TABLE_RESULT_OK;
} catch (TableAlreadyExistException | TableNotExistException e) {
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AddPartitionsOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AddPartitionsOperation.java
index 2bc8f91cb0e..7a94c4f1cb1 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AddPartitionsOperation.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AddPartitionsOperation.java
@@ -29,17 +29,17 @@ import java.util.Map;
/** Operation to describe ALTER TABLE ADD PARTITION statement. */
public class AddPartitionsOperation extends AlterTableOperation {
- private final boolean ifNotExists;
+ private final boolean ignoreIfPartitionExists;
private final List<CatalogPartitionSpec> partitionSpecs;
private final List<CatalogPartition> catalogPartitions;
public AddPartitionsOperation(
ObjectIdentifier tableIdentifier,
- boolean ifNotExists,
+ boolean ignoreIfPartitionExists,
List<CatalogPartitionSpec> partitionSpecs,
List<CatalogPartition> catalogPartitions) {
- super(tableIdentifier);
- this.ifNotExists = ifNotExists;
+ super(tableIdentifier, false);
+ this.ignoreIfPartitionExists = ignoreIfPartitionExists;
this.partitionSpecs = partitionSpecs;
this.catalogPartitions = catalogPartitions;
}
@@ -52,8 +52,8 @@ public class AddPartitionsOperation extends AlterTableOperation {
return catalogPartitions;
}
- public boolean ifNotExists() {
- return ifNotExists;
+ public boolean ignoreIfPartitionExists() {
+ return ignoreIfPartitionExists;
}
@Override
@@ -61,7 +61,7 @@ public class AddPartitionsOperation extends AlterTableOperation {
StringBuilder builder =
new StringBuilder(
String.format("ALTER TABLE %s ADD", tableIdentifier.asSummaryString()));
- if (ifNotExists) {
+ if (ignoreIfPartitionExists) {
builder.append(" IF NOT EXISTS");
}
for (int i = 0; i < partitionSpecs.size(); i++) {
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterPartitionOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterPartitionOperation.java
index 50ef54b5ad4..de2bba9dcf6 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterPartitionOperation.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterPartitionOperation.java
@@ -30,7 +30,7 @@ public abstract class AlterPartitionOperation extends AlterTableOperation {
public AlterPartitionOperation(
ObjectIdentifier tableIdentifier, CatalogPartitionSpec partitionSpec) {
- super(tableIdentifier);
+ super(tableIdentifier, false);
this.partitionSpec = partitionSpec;
}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableChangeOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableChangeOperation.java
index ad083d1e574..600f130df8e 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableChangeOperation.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableChangeOperation.java
@@ -36,8 +36,9 @@ public class AlterTableChangeOperation extends AlterTableOperation {
public AlterTableChangeOperation(
ObjectIdentifier tableIdentifier,
List<TableChange> tableChanges,
- CatalogTable newTable) {
- super(tableIdentifier);
+ CatalogTable newTable,
+ boolean ignoreIfNotExists) {
+ super(tableIdentifier, ignoreIfNotExists);
this.tableChanges = Collections.unmodifiableList(tableChanges);
this.newTable = newTable;
}
@@ -54,7 +55,11 @@ public class AlterTableChangeOperation extends AlterTableOperation {
public String asSummaryString() {
String changes =
tableChanges.stream().map(this::toString).collect(Collectors.joining(",\n"));
- return String.format("ALTER TABLE %s\n%s", tableIdentifier.asSummaryString(), changes);
+ return String.format(
+ "ALTER TABLE %s%s\n%s",
+ ignoreIfTableNotExists ? "IF EXISTS " : "",
+ tableIdentifier.asSummaryString(),
+ changes);
}
private String toString(TableChange tableChange) {
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableOperation.java
index ad28a28addf..9827dc7c633 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableOperation.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableOperation.java
@@ -24,13 +24,20 @@ import org.apache.flink.table.catalog.ObjectIdentifier;
* Abstract Operation to describe all ALTER TABLE statements such as rename table /set properties.
*/
public abstract class AlterTableOperation implements AlterOperation {
+
protected final ObjectIdentifier tableIdentifier;
+ protected final boolean ignoreIfTableNotExists;
- public AlterTableOperation(ObjectIdentifier tableIdentifier) {
+ public AlterTableOperation(ObjectIdentifier tableIdentifier, boolean ignoreIfTableNotExists) {
this.tableIdentifier = tableIdentifier;
+ this.ignoreIfTableNotExists = ignoreIfTableNotExists;
}
public ObjectIdentifier getTableIdentifier() {
return tableIdentifier;
}
+
+ public boolean ignoreIfTableNotExists() {
+ return ignoreIfTableNotExists;
+ }
}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableOptionsOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableOptionsOperation.java
index 4b505d1b1ab..ecb58a8b51d 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableOptionsOperation.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableOptionsOperation.java
@@ -33,8 +33,11 @@ import java.util.stream.Collectors;
public class AlterTableOptionsOperation extends AlterTableOperation {
private final CatalogTable catalogTable;
- public AlterTableOptionsOperation(ObjectIdentifier tableIdentifier, CatalogTable catalogTable) {
- super(tableIdentifier);
+ public AlterTableOptionsOperation(
+ ObjectIdentifier tableIdentifier,
+ CatalogTable catalogTable,
+ boolean ignoreIfNotExists) {
+ super(tableIdentifier, ignoreIfNotExists);
this.catalogTable = catalogTable;
}
@@ -52,6 +55,9 @@ public class AlterTableOptionsOperation extends AlterTableOperation {
entry.getKey(), entry.getValue()))
.collect(Collectors.joining(", "));
return String.format(
- "ALTER TABLE %s SET (%s)", tableIdentifier.asSummaryString(), description);
+ "ALTER %sTABLE %s SET (%s)",
+ ignoreIfTableNotExists ? "IF EXISTS " : "",
+ tableIdentifier.asSummaryString(),
+ description);
}
}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableRenameOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableRenameOperation.java
index f0d8b6a41be..e1084b84ea5 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableRenameOperation.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableRenameOperation.java
@@ -25,8 +25,10 @@ public class AlterTableRenameOperation extends AlterTableOperation {
private final ObjectIdentifier newTableIdentifier;
public AlterTableRenameOperation(
- ObjectIdentifier tableIdentifier, ObjectIdentifier newTableIdentifier) {
- super(tableIdentifier);
+ ObjectIdentifier tableIdentifier,
+ ObjectIdentifier newTableIdentifier,
+ boolean ignoreIfNotExists) {
+ super(tableIdentifier, ignoreIfNotExists);
this.newTableIdentifier = newTableIdentifier;
}
@@ -37,7 +39,9 @@ public class AlterTableRenameOperation extends AlterTableOperation {
@Override
public String asSummaryString() {
return String.format(
- "ALTER TABLE %s RENAME TO %s",
- tableIdentifier.asSummaryString(), newTableIdentifier.asSummaryString());
+ "ALTER TABLE %s%s RENAME TO %s",
+ ignoreIfTableNotExists ? "IF EXISTS " : "",
+ tableIdentifier.asSummaryString(),
+ newTableIdentifier.asSummaryString());
}
}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableSchemaOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableSchemaOperation.java
index 3ec668a955f..71a16336d61 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableSchemaOperation.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableSchemaOperation.java
@@ -27,8 +27,11 @@ public class AlterTableSchemaOperation extends AlterTableOperation {
// the CatalogTable with the updated schema
private final CatalogTable catalogTable;
- public AlterTableSchemaOperation(ObjectIdentifier tableIdentifier, CatalogTable catalogTable) {
- super(tableIdentifier);
+ public AlterTableSchemaOperation(
+ ObjectIdentifier tableIdentifier,
+ CatalogTable catalogTable,
+ boolean ignoreIfNotExists) {
+ super(tableIdentifier, ignoreIfNotExists);
this.catalogTable = catalogTable;
}
@@ -39,7 +42,9 @@ public class AlterTableSchemaOperation extends AlterTableOperation {
@Override
public String asSummaryString() {
return String.format(
- "ALTER TABLE %s SET SCHEMA %s",
- tableIdentifier.asSummaryString(), catalogTable.getUnresolvedSchema().toString());
+ "ALTER TABLE %s%s SET SCHEMA %s",
+ ignoreIfTableNotExists ? "IF EXISTS " : "",
+ tableIdentifier.asSummaryString(),
+ catalogTable.getUnresolvedSchema().toString());
}
}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/DropPartitionsOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/DropPartitionsOperation.java
index 371cf6c117a..373f5e4e607 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/DropPartitionsOperation.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/DropPartitionsOperation.java
@@ -27,20 +27,20 @@ import java.util.List;
/** Operation to describe ALTER TABLE DROP PARTITION statement. */
public class DropPartitionsOperation extends AlterTableOperation {
- private final boolean ifExists;
+ private final boolean ignoreIfPartitionNotExists;
private final List<CatalogPartitionSpec> partitionSpecs;
public DropPartitionsOperation(
ObjectIdentifier tableIdentifier,
- boolean ifExists,
+ boolean ignoreIfPartitionNotExists,
List<CatalogPartitionSpec> partitionSpecs) {
- super(tableIdentifier);
- this.ifExists = ifExists;
+ super(tableIdentifier, false);
+ this.ignoreIfPartitionNotExists = ignoreIfPartitionNotExists;
this.partitionSpecs = partitionSpecs;
}
- public boolean ifExists() {
- return ifExists;
+ public boolean ignoreIfPartitionNotExists() {
+ return ignoreIfPartitionNotExists;
}
public List<CatalogPartitionSpec> getPartitionSpecs() {
@@ -52,7 +52,7 @@ public class DropPartitionsOperation extends AlterTableOperation {
StringBuilder builder =
new StringBuilder(
String.format("ALTER TABLE %s DROP", tableIdentifier.asSummaryString()));
- if (ifExists) {
+ if (ignoreIfPartitionNotExists) {
builder.append(" IF EXISTS");
}
for (CatalogPartitionSpec spec : partitionSpecs) {
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
index 420391b6e82..ff15b6bdc2a 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
@@ -431,8 +431,9 @@ public interface Catalog {
* @param tablePath path of the table.
* @param partitionSpec partition spec of the partition
* @param partition the partition to add.
- * @param ignoreIfExists flag to specify behavior if a table with the given name already exists:
- * if set to false, it throws a TableAlreadyExistException, if set to true, nothing happens.
+ * @param ignoreIfExists flag to specify behavior if a partition with the given name already
+ * exists: if set to false, it throws a PartitionAlreadyExistsException, if set to true,
+ * nothing happens.
* @throws TableNotExistException thrown if the target table does not exist
* @throws TableNotPartitionedException thrown if the target table is not partitioned
* @throws PartitionSpecInvalidException thrown if the given partition spec is invalid
@@ -453,7 +454,7 @@ public interface Catalog {
*
* @param tablePath path of the table.
* @param partitionSpec partition spec of the partition to drop
- * @param ignoreIfNotExists flag to specify behavior if the database does not exist: if set to
+ * @param ignoreIfNotExists flag to specify behavior if the partition does not exist: if set to
* false, throw an exception, if set to true, nothing happens.
* @throws PartitionNotExistException thrown if the target partition does not exist
* @throws CatalogException in case of any runtime exception
@@ -468,7 +469,7 @@ public interface Catalog {
* @param tablePath path of the table
* @param partitionSpec partition spec of the partition
* @param newPartition new partition to replace the old one
- * @param ignoreIfNotExists flag to specify behavior if the database does not exist: if set to
+ * @param ignoreIfNotExists flag to specify behavior if the partition does not exist: if set to
* false, throw an exception, if set to true, nothing happens.
* @throws PartitionNotExistException thrown if the target partition does not exist
* @throws CatalogException in case of any runtime exception
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java
index 57f6fae4384..15c0718c2a8 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java
@@ -941,7 +941,8 @@ public class AlterSchemaConverter {
newSchema,
oldTable.getComment(),
oldTable.getPartitionKeys(),
- oldTable.getOptions()));
+ oldTable.getOptions()),
+ alterTable.ifTableExists());
}
private static String getColumnName(SqlIdentifier identifier) {
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
index 0603160cc6a..96d279517fd 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
@@ -132,6 +132,7 @@ import org.apache.flink.table.operations.EndStatementSetOperation;
import org.apache.flink.table.operations.ExplainOperation;
import org.apache.flink.table.operations.LoadModuleOperation;
import org.apache.flink.table.operations.ModifyOperation;
+import org.apache.flink.table.operations.NopOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.QueryOperation;
import org.apache.flink.table.operations.ShowCatalogsOperation;
@@ -484,6 +485,9 @@ public class SqlToOperationConverter {
Optional<ContextResolvedTable> optionalCatalogTable =
catalogManager.getTable(tableIdentifier);
if (!optionalCatalogTable.isPresent() || optionalCatalogTable.get().isTemporary()) {
+ if (sqlAlterTable.ifTableExists()) {
+ return new NopOperation();
+ }
throw new ValidationException(
String.format(
"Table %s doesn't exist or is a temporary table.", tableIdentifier));
@@ -499,7 +503,8 @@ public class SqlToOperationConverter {
((SqlAlterTableRename) sqlAlterTable).fullNewTableName());
ObjectIdentifier newTableIdentifier =
catalogManager.qualifyIdentifier(newUnresolvedIdentifier);
- return new AlterTableRenameOperation(tableIdentifier, newTableIdentifier);
+ return new AlterTableRenameOperation(
+ tableIdentifier, newTableIdentifier, sqlAlterTable.ifTableExists());
} else if (sqlAlterTable instanceof SqlAlterTableOptions) {
return convertAlterTableOptions(
tableIdentifier,
@@ -547,7 +552,7 @@ public class SqlToOperationConverter {
partitions.add(new CatalogPartitionImpl(props, null));
}
return new AddPartitionsOperation(
- tableIdentifier, addPartitions.ifNotExists(), specs, partitions);
+ tableIdentifier, addPartitions.ifPartitionNotExists(), specs, partitions);
} else if (sqlAlterTable instanceof SqlDropPartitions) {
SqlDropPartitions dropPartitions = (SqlDropPartitions) sqlAlterTable;
List<CatalogPartitionSpec> specs = new ArrayList<>();
@@ -607,7 +612,8 @@ public class SqlToOperationConverter {
changeOptions.entrySet().stream()
.map(entry -> TableChange.set(entry.getKey(), entry.getValue()))
.collect(Collectors.toList()),
- oldTable.copy(newOptions));
+ oldTable.copy(newOptions),
+ alterTableOptions.ifTableExists());
}
}
@@ -630,7 +636,8 @@ public class SqlToOperationConverter {
return new AlterTableChangeOperation(
tableIdentifier,
resetKeys.stream().map(TableChange::reset).collect(Collectors.toList()),
- oldTable.copy(newOptions));
+ oldTable.copy(newOptions),
+ alterTableReset.ifTableExists());
}
/**
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java
index 7e0079b1568..806ced0ed0e 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java
@@ -139,9 +139,11 @@ public class OperationConverterUtils {
// So we don't translate with TableChanges here. One workaround is
// to minimize the edit distance, i.e., minimize the modification times, but it
// still cannot provide a deterministic answer.
- return new AlterTableSchemaOperation(tableIdentifier, newTable);
+ return new AlterTableSchemaOperation(
+ tableIdentifier, newTable, addReplaceColumns.ifTableExists());
} else {
- return new AlterTableChangeOperation(tableIdentifier, tableChanges, newTable);
+ return new AlterTableChangeOperation(
+ tableIdentifier, tableChanges, newTable, addReplaceColumns.ifTableExists());
}
}
@@ -185,7 +187,8 @@ public class OperationConverterUtils {
newSchema,
catalogTable.getPartitionKeys(),
newProperties,
- catalogTable.getComment()));
+ catalogTable.getComment()),
+ false);
// TODO: handle watermark and constraints
}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
index 253c147effa..5642359e807 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
@@ -58,6 +58,7 @@ import org.apache.flink.table.operations.BeginStatementSetOperation;
import org.apache.flink.table.operations.EndStatementSetOperation;
import org.apache.flink.table.operations.ExplainOperation;
import org.apache.flink.table.operations.LoadModuleOperation;
+import org.apache.flink.table.operations.NopOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.QueryOperation;
import org.apache.flink.table.operations.ShowFunctionsOperation;
@@ -1225,8 +1226,8 @@ public class SqlToOperationConverterTest {
final ObjectIdentifier expectedIdentifier = ObjectIdentifier.of("cat1", "db1", "tb1");
final ObjectIdentifier expectedNewIdentifier = ObjectIdentifier.of("cat1", "db1", "tb2");
// test rename table converter
- for (int i = 0; i < renameTableSqls.length; i++) {
- Operation operation = parse(renameTableSqls[i]);
+ for (String renameTableSql : renameTableSqls) {
+ Operation operation = parse(renameTableSql);
assertThat(operation).isInstanceOf(AlterTableRenameOperation.class);
final AlterTableRenameOperation alterTableRenameOperation =
(AlterTableRenameOperation) operation;
@@ -1235,8 +1236,13 @@ public class SqlToOperationConverterTest {
assertThat(alterTableRenameOperation.getNewTableIdentifier())
.isEqualTo(expectedNewIdentifier);
}
+ // test alter nonexistent table
+ checkAlterNonExistTable("alter table %s nonexistent rename to tb2");
+
// test alter table options
- Operation operation = parse("alter table cat1.db1.tb1 set ('k1' = 'v1', 'K2' = 'V2')");
+ checkAlterNonExistTable("alter table %s nonexistent set ('k1' = 'v1', 'K2' = 'V2')");
+ Operation operation =
+ parse("alter table if exists cat1.db1.tb1 set ('k1' = 'v1', 'K2' = 'V2')");
Map<String, String> expectedOptions = new HashMap<>();
expectedOptions.put("connector", "dummy");
expectedOptions.put("k", "v");
@@ -1248,16 +1254,17 @@ public class SqlToOperationConverterTest {
expectedIdentifier,
expectedOptions,
Arrays.asList(TableChange.set("k1", "v1"), TableChange.set("K2", "V2")),
- "ALTER TABLE cat1.db1.tb1\n SET 'k1' = 'v1',\n SET 'K2' = 'V2'");
+ "ALTER TABLE IF EXISTS cat1.db1.tb1\n SET 'k1' = 'v1',\n SET 'K2' = 'V2'");
// test alter table reset
- operation = parse("alter table cat1.db1.tb1 reset ('k')");
+ checkAlterNonExistTable("alter table %s nonexistent reset ('k')");
+ operation = parse("alter table if exists cat1.db1.tb1 reset ('k')");
assertAlterTableOptions(
operation,
expectedIdentifier,
Collections.singletonMap("connector", "dummy"),
Collections.singletonList(TableChange.reset("k")),
- "ALTER TABLE cat1.db1.tb1\n RESET 'k'");
+ "ALTER TABLE IF EXISTS cat1.db1.tb1\n RESET 'k'");
assertThatThrownBy(() -> parse("alter table cat1.db1.tb1 reset ('connector')"))
.isInstanceOf(ValidationException.class)
.hasMessageContaining("ALTER TABLE RESET does not support changing 'connector'");
@@ -1378,6 +1385,7 @@ public class SqlToOperationConverterTest {
.isInstanceOf(ValidationException.class)
.hasMessageContaining(
"Failed to execute ALTER TABLE statement.\nThe column `a` is used as the partition keys.");
+ checkAlterNonExistTable("alter table %s nonexistent rename a to a1");
}
@Test
@@ -1417,6 +1425,7 @@ public class SqlToOperationConverterTest {
assertThatThrownBy(() -> parse("alter table tb1 drop ts"))
.isInstanceOf(ValidationException.class)
.hasMessageContaining("The column `ts` is referenced by watermark expression.");
+ checkAlterNonExistTable("alter table %s nonexistent drop a");
}
@Test
@@ -1465,6 +1474,8 @@ public class SqlToOperationConverterTest {
.isInstanceOf(ValidationException.class)
.hasMessageContaining(
"The base table does not define a primary key constraint named 'ct2'. Available constraint name: ['ct1'].");
+ checkAlterNonExistTable("alter table %s nonexistent drop primary key");
+ checkAlterNonExistTable("alter table %s nonexistent drop constraint ct");
}
@Test
@@ -1499,6 +1510,7 @@ public class SqlToOperationConverterTest {
assertThatThrownBy(() -> parse("alter table tb1 drop watermark"))
.isInstanceOf(ValidationException.class)
.hasMessageContaining("The base table does not define any watermark strategy.");
+ checkAlterNonExistTable("alter table %s nonexistent drop watermark");
}
@Test
@@ -1536,9 +1548,7 @@ public class SqlToOperationConverterTest {
"Partition column 'dt' not defined in the table schema. Table `cat1`.`db1`.`tb1` is not partitioned.");
// alter a non-existed table
- assertThatThrownBy(() -> parse("alter table tb2 compact"))
- .isInstanceOf(ValidationException.class)
- .hasMessage("Table `cat1`.`db1`.`tb2` doesn't exist or is a temporary table.");
+ checkAlterNonExistTable("alter table %s nonexistent compact");
checkAlterTableCompact(parse("alter table tb1 compact"), Collections.emptyMap());
}
@@ -1627,6 +1637,7 @@ public class SqlToOperationConverterTest {
assertThatThrownBy(() -> parse("alter table tb1 add (e.f3 string after e.f1)"))
.isInstanceOf(UnsupportedOperationException.class)
.hasMessageContaining("Alter nested row type e.f3 is not supported yet.");
+ checkAlterNonExistTable("alter table %s nonexistent add a bigint not null");
}
@Test
@@ -1639,10 +1650,11 @@ public class SqlToOperationConverterTest {
// add a single column
Operation operation =
- parse("alter table tb1 add h double not null comment 'h is double not null'");
+ parse(
+ "alter table if exists tb1 add h double not null comment 'h is double not null'");
assertThat(operation.asSummaryString())
.isEqualTo(
- "ALTER TABLE cat1.db1.tb1\n"
+ "ALTER TABLE IF EXISTS cat1.db1.tb1\n"
+ " ADD `h` DOUBLE NOT NULL COMMENT 'h is double not null' ");
assertAlterTableSchema(
operation,
@@ -1815,6 +1827,7 @@ public class SqlToOperationConverterTest {
.isInstanceOf(ValidationException.class)
.hasMessageContaining(
"Invalid primary key 'PK_g'. Column 'g' is not a physical column.");
+ checkAlterNonExistTable("alter table %s nonexistent add primary key(x) not enforced");
}
@Test
@@ -1923,6 +1936,7 @@ public class SqlToOperationConverterTest {
"The base table has already defined the watermark strategy "
+ "`ts` AS ts - interval '5' seconds. "
+ "You might want to drop it before adding a new one.");
+ checkAlterNonExistTable("alter table %s nonexistent add watermark for ts as ts");
}
@Test
@@ -2080,6 +2094,7 @@ public class SqlToOperationConverterTest {
assertThatThrownBy(() -> parse("alter table tb2 modify (e.f0 string after e.f1)"))
.isInstanceOf(UnsupportedOperationException.class)
.hasMessageContaining("Alter nested row type e.f0 is not supported yet.");
+ checkAlterNonExistTable("alter table %s nonexistent modify a int first");
}
@Test
@@ -2273,6 +2288,8 @@ public class SqlToOperationConverterTest {
.isInstanceOf(ValidationException.class)
.hasMessageContaining(
"Invalid primary key 'ct'. Column 'g' is not a physical column.");
+ checkAlterNonExistTable(
+ "alter table %s nonexistent modify constraint ct primary key(a) not enforced");
}
@Test
@@ -2354,6 +2371,7 @@ public class SqlToOperationConverterTest {
"Invalid data type of time field for watermark definition. "
+ "The field must be of type TIMESTAMP(p) or TIMESTAMP_LTZ(p), the supported precision 'p' is from 0 to 3, "
+ "but the time field type is STRING");
+ checkAlterNonExistTable("alter table %s nonexistent modify watermark for ts as ts");
}
@Test
@@ -2850,6 +2868,15 @@ public class SqlToOperationConverterTest {
return plannerContext.createCalciteParser();
}
+ private void checkAlterNonExistTable(String sqlTemplate) {
+ assertThat(parse(String.format(sqlTemplate, "if exists ")))
+ .isInstanceOf(NopOperation.class);
+ assertThatThrownBy(() -> parse(String.format(sqlTemplate, "")))
+ .isInstanceOf(ValidationException.class)
+ .hasMessageContaining(
+ "Table `cat1`.`db1`.`nonexistent` doesn't exist or is a temporary table.");
+ }
+
private void checkAlterTableCompact(Operation operation, Map<String, String> staticPartitions) {
assertThat(operation).isInstanceOf(SinkModifyOperation.class);
SinkModifyOperation modifyOperation = (SinkModifyOperation) operation;