You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sh...@apache.org on 2023/01/15 09:56:49 UTC

[flink] branch master updated: [FLINK-19883][table] Support "IF EXISTS" in DDL for ALTER TABLE

This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new c26daffb904 [FLINK-19883][table] Support "IF EXISTS" in DDL for ALTER TABLE
c26daffb904 is described below

commit c26daffb9041381f04a37f62280eab9731dad8f4
Author: Jane Chan <55...@users.noreply.github.com>
AuthorDate: Sun Jan 15 17:56:42 2023 +0800

    [FLINK-19883][table] Support "IF EXISTS" in DDL for ALTER TABLE
    
    This closes #21640
---
 .../hive/parse/HiveParserDDLSemanticAnalyzer.java  | 14 +++---
 .../src/test/resources/sql/table.q                 |  4 ++
 .../sql/parser/hive/ddl/SqlAddHivePartitions.java  |  2 +-
 .../src/main/codegen/includes/parserImpls.ftl      | 37 +++++++++-----
 .../flink/sql/parser/ddl/SqlAddPartitions.java     | 14 +++---
 .../flink/sql/parser/ddl/SqlAddReplaceColumns.java |  2 +-
 .../apache/flink/sql/parser/ddl/SqlAlterTable.java | 32 ++++++++++--
 .../flink/sql/parser/ddl/SqlAlterTableAdd.java     |  7 +--
 .../sql/parser/ddl/SqlAlterTableAddConstraint.java | 12 +++--
 .../flink/sql/parser/ddl/SqlAlterTableCompact.java | 16 ++++--
 .../sql/parser/ddl/SqlAlterTableDropColumn.java    |  9 ++--
 .../parser/ddl/SqlAlterTableDropConstraint.java    | 10 ++--
 .../parser/ddl/SqlAlterTableDropPrimaryKey.java    |  7 +--
 .../sql/parser/ddl/SqlAlterTableDropWatermark.java |  7 +--
 .../flink/sql/parser/ddl/SqlAlterTableModify.java  |  7 +--
 .../flink/sql/parser/ddl/SqlAlterTableOptions.java | 27 +++++++---
 .../flink/sql/parser/ddl/SqlAlterTableRename.java  | 14 ++++--
 .../sql/parser/ddl/SqlAlterTableRenameColumn.java  | 11 ++--
 .../flink/sql/parser/ddl/SqlAlterTableReset.java   | 13 +++--
 .../flink/sql/parser/ddl/SqlAlterTableSchema.java  |  5 +-
 .../flink/sql/parser/ddl/SqlChangeColumn.java      |  2 +-
 .../flink/sql/parser/ddl/SqlDropPartitions.java    |  2 +-
 .../flink/sql/parser/FlinkSqlParserImplTest.java   | 58 +++++++++++++++++++++-
 .../table/api/internal/TableEnvironmentImpl.java   | 22 ++++----
 .../operations/ddl/AddPartitionsOperation.java     | 14 +++---
 .../operations/ddl/AlterPartitionOperation.java    |  2 +-
 .../operations/ddl/AlterTableChangeOperation.java  | 11 ++--
 .../table/operations/ddl/AlterTableOperation.java  |  9 +++-
 .../operations/ddl/AlterTableOptionsOperation.java | 12 +++--
 .../operations/ddl/AlterTableRenameOperation.java  | 12 +++--
 .../operations/ddl/AlterTableSchemaOperation.java  | 13 +++--
 .../operations/ddl/DropPartitionsOperation.java    | 14 +++---
 .../org/apache/flink/table/catalog/Catalog.java    |  9 ++--
 .../planner/operations/AlterSchemaConverter.java   |  3 +-
 .../operations/SqlToOperationConverter.java        | 15 ++++--
 .../planner/utils/OperationConverterUtils.java     |  9 ++--
 .../operations/SqlToOperationConverterTest.java    | 49 ++++++++++++++----
 37 files changed, 356 insertions(+), 150 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
index fc7913c4efe..3787a20c644 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
@@ -1501,7 +1501,8 @@ public class HiveParserDDLSemanticAnalyzer {
                     newProps.entrySet().stream()
                             .map(entry -> TableChange.set(entry.getKey(), entry.getValue()))
                             .collect(Collectors.toList()),
-                    oldTable.copy(props));
+                    oldTable.copy(props),
+                    false);
         }
     }
 
@@ -1879,7 +1880,7 @@ public class HiveParserDDLSemanticAnalyzer {
         return expectView
                 ? new AlterViewRenameOperation(objectIdentifier, parseObjectIdentifier(targetName))
                 : new AlterTableRenameOperation(
-                        objectIdentifier, parseObjectIdentifier(targetName));
+                        objectIdentifier, parseObjectIdentifier(targetName), false);
     }
 
     private Operation convertAlterTableChangeCol(
@@ -1979,7 +1980,8 @@ public class HiveParserDDLSemanticAnalyzer {
                 tableIdentifier,
                 tableChanges,
                 new CatalogTableImpl(
-                        newSchema, oldTable.getPartitionKeys(), props, oldTable.getComment()));
+                        newSchema, oldTable.getPartitionKeys(), props, oldTable.getComment()),
+                false);
     }
 
     private Operation convertAlterTableModifyCols(
@@ -2033,10 +2035,8 @@ public class HiveParserDDLSemanticAnalyzer {
         return new AlterTableSchemaOperation(
                 tableIdentifier,
                 new CatalogTableImpl(
-                        builder.build(),
-                        oldTable.getPartitionKeys(),
-                        props,
-                        oldTable.getComment()));
+                        builder.build(), oldTable.getPartitionKeys(), props, oldTable.getComment()),
+                false);
     }
 
     private static void setWatermarkAndPK(TableSchema.Builder builder, TableSchema schema) {
diff --git a/flink-table/flink-sql-client/src/test/resources/sql/table.q b/flink-table/flink-sql-client/src/test/resources/sql/table.q
index 1d1764ac5e0..4bc5b909693 100644
--- a/flink-table/flink-sql-client/src/test/resources/sql/table.q
+++ b/flink-table/flink-sql-client/src/test/resources/sql/table.q
@@ -44,6 +44,10 @@ alter table non_exist rename to non_exist2;
 org.apache.flink.table.api.ValidationException: Table `default_catalog`.`default_database`.`non_exist` doesn't exist or is a temporary table.
 !error
 
+alter table if exists non_exist rename to non_exist2;
+[INFO] Execute statement succeed.
+!info
+
 # ==========================================================================
 # test create table
 # ==========================================================================
diff --git a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAddHivePartitions.java b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAddHivePartitions.java
index 8bf84e8c77a..e0ae741c504 100644
--- a/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAddHivePartitions.java
+++ b/flink-table/flink-sql-parser-hive/src/main/java/org/apache/flink/sql/parser/hive/ddl/SqlAddHivePartitions.java
@@ -75,7 +75,7 @@ public class SqlAddHivePartitions extends SqlAddPartitions {
         tableIdentifier.unparse(writer, leftPrec, rightPrec);
         writer.newlineAndIndent();
         writer.keyword("ADD");
-        if (ifNotExists()) {
+        if (ifPartitionNotExists()) {
             writer.keyword("IF NOT EXISTS");
         }
         int opLeftPrec = getOperator().getLeftPrec();
diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
index 13b61a6937b..1c62bd776f1 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -590,6 +590,7 @@ SqlAlterTable SqlAlterTable() :
     SqlParserPos startPos;
     SqlIdentifier tableIdentifier;
     SqlIdentifier newTableIdentifier = null;
+    boolean ifExists = false;
     SqlNodeList propertyList = SqlNodeList.EMPTY;
     SqlNodeList propertyKeyList = SqlNodeList.EMPTY;
     SqlNodeList partitionSpec = null;
@@ -601,6 +602,7 @@ SqlAlterTable SqlAlterTable() :
 }
 {
     <ALTER> <TABLE> { startPos = getPos(); }
+        ifExists = IfExistsOpt()
         tableIdentifier = CompoundIdentifier()
     (
         LOOKAHEAD(2)
@@ -610,7 +612,8 @@ SqlAlterTable SqlAlterTable() :
             return new SqlAlterTableRename(
                         startPos.plus(getPos()),
                         tableIdentifier,
-                        newTableIdentifier);
+                        newTableIdentifier,
+                        ifExists);
         }
     |
         <RENAME>
@@ -622,7 +625,8 @@ SqlAlterTable SqlAlterTable() :
                     startPos.plus(getPos()),
                     tableIdentifier,
                     originColumnIdentifier,
-                    newColumnIdentifier);
+                    newColumnIdentifier,
+                    ifExists);
         }
     |
         <RESET>
@@ -631,7 +635,8 @@ SqlAlterTable SqlAlterTable() :
             return new SqlAlterTableReset(
                         startPos.plus(getPos()),
                         tableIdentifier,
-                        propertyKeyList);
+                        propertyKeyList,
+                        ifExists);
         }
     |
         <SET>
@@ -640,7 +645,8 @@ SqlAlterTable SqlAlterTable() :
             return new SqlAlterTableOptions(
                         startPos.plus(getPos()),
                         tableIdentifier,
-                        propertyList);
+                        propertyList,
+                        ifExists);
         }
     |
         <ADD>
@@ -660,7 +666,8 @@ SqlAlterTable SqlAlterTable() :
                         tableIdentifier,
                         new SqlNodeList(ctx.columnPositions, startPos.plus(getPos())),
                         ctx.constraints,
-                        ctx.watermark);
+                        ctx.watermark,
+                        ifExists);
         }
     |
         <MODIFY>
@@ -680,7 +687,8 @@ SqlAlterTable SqlAlterTable() :
                         tableIdentifier,
                         new SqlNodeList(ctx.columnPositions, startPos.plus(getPos())),
                         ctx.constraints,
-                        ctx.watermark);
+                        ctx.watermark,
+                        ifExists);
         }
 
     |
@@ -693,7 +701,8 @@ SqlAlterTable SqlAlterTable() :
                             tableIdentifier,
                             new SqlNodeList(
                                 Collections.singletonList(columnName),
-                                getPos()));
+                                getPos()),
+                            ifExists);
             }
         |
             { Pair<SqlNodeList, SqlNodeList> columnWithTypePair = null; }
@@ -701,26 +710,30 @@ SqlAlterTable SqlAlterTable() :
                 return new SqlAlterTableDropColumn(
                             startPos.plus(getPos()),
                             tableIdentifier,
-                            columnWithTypePair.getKey());
+                            columnWithTypePair.getKey(),
+                            ifExists);
             }
         |
             <PRIMARY> <KEY> {
                 return new SqlAlterTableDropPrimaryKey(
                         startPos.plus(getPos()),
-                        tableIdentifier);
+                        tableIdentifier,
+                        ifExists);
             }
         |
             <CONSTRAINT> constraintName = SimpleIdentifier() {
                 return new SqlAlterTableDropConstraint(
                             startPos.plus(getPos()),
                             tableIdentifier,
-                            constraintName);
+                            constraintName,
+                            ifExists);
             }
         |
             <WATERMARK> {
                 return new SqlAlterTableDropWatermark(
                             startPos.plus(getPos()),
-                            tableIdentifier);
+                            tableIdentifier,
+                            ifExists);
             }
         )
     |
@@ -732,7 +745,7 @@ SqlAlterTable SqlAlterTable() :
         ]
         <COMPACT>
         {
-            return new SqlAlterTableCompact(startPos.plus(getPos()), tableIdentifier, partitionSpec);
+            return new SqlAlterTableCompact(startPos.plus(getPos()), tableIdentifier, partitionSpec, ifExists);
         }
     )
 }
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAddPartitions.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAddPartitions.java
index 3446b30d3c7..b66e3e27d35 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAddPartitions.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAddPartitions.java
@@ -35,24 +35,24 @@ import java.util.List;
 /** ALTER TABLE DDL to add partitions to a table. */
 public class SqlAddPartitions extends SqlAlterTable {
 
-    private final boolean ifNotExists;
+    private final boolean ifPartitionNotExists;
     private final List<SqlNodeList> partSpecs;
     private final List<SqlNodeList> partProps;
 
     public SqlAddPartitions(
             SqlParserPos pos,
             SqlIdentifier tableName,
-            boolean ifNotExists,
+            boolean ifPartitionNotExists,
             List<SqlNodeList> partSpecs,
             List<SqlNodeList> partProps) {
-        super(pos, tableName);
-        this.ifNotExists = ifNotExists;
+        super(pos, tableName, false);
+        this.ifPartitionNotExists = ifPartitionNotExists;
         this.partSpecs = partSpecs;
         this.partProps = partProps;
     }
 
-    public boolean ifNotExists() {
-        return ifNotExists;
+    public boolean ifPartitionNotExists() {
+        return ifPartitionNotExists;
     }
 
     public List<SqlNodeList> getPartSpecs() {
@@ -82,7 +82,7 @@ public class SqlAddPartitions extends SqlAlterTable {
         super.unparse(writer, leftPrec, rightPrec);
         writer.newlineAndIndent();
         writer.keyword("ADD");
-        if (ifNotExists) {
+        if (ifPartitionNotExists) {
             writer.keyword("IF NOT EXISTS");
         }
         int opLeftPrec = getOperator().getLeftPrec();
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAddReplaceColumns.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAddReplaceColumns.java
index 000c700f0cf..417cd7b46b8 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAddReplaceColumns.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAddReplaceColumns.java
@@ -48,7 +48,7 @@ public class SqlAddReplaceColumns extends SqlAlterTable {
             SqlNodeList newColumns,
             boolean replace,
             @Nullable SqlNodeList properties) {
-        super(pos, tableName);
+        super(pos, tableName, false);
         this.newColumns = newColumns;
         this.replace = replace;
         this.properties = properties;
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTable.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTable.java
index 24ce63823cb..1bd87398cf8 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTable.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTable.java
@@ -39,8 +39,8 @@ import java.util.List;
 import static java.util.Objects.requireNonNull;
 
 /**
- * Abstract class to describe statements like ALTER TABLE [[catalogName.] dataBasesName].tableName
- * ...
+ * Abstract class to describe statements like ALTER TABLE [IF EXISTS] [[catalogName.]
+ * dataBasesName].tableName ...
  */
 public abstract class SqlAlterTable extends SqlCall {
 
@@ -49,16 +49,26 @@ public abstract class SqlAlterTable extends SqlCall {
 
     protected final SqlIdentifier tableIdentifier;
     protected final SqlNodeList partitionSpec;
+    protected final boolean ifTableExists;
 
     public SqlAlterTable(
-            SqlParserPos pos, SqlIdentifier tableName, @Nullable SqlNodeList partitionSpec) {
+            SqlParserPos pos,
+            SqlIdentifier tableName,
+            @Nullable SqlNodeList partitionSpec,
+            boolean ifTableExists) {
         super(pos);
         this.tableIdentifier = requireNonNull(tableName, "tableName should not be null");
         this.partitionSpec = partitionSpec;
+        this.ifTableExists = ifTableExists;
+    }
+
+    public SqlAlterTable(SqlParserPos pos, SqlIdentifier tableName, boolean ifTableExists) {
+        this(pos, tableName, null, ifTableExists);
     }
 
-    public SqlAlterTable(SqlParserPos pos, SqlIdentifier tableName) {
-        this(pos, tableName, null);
+    public SqlAlterTable(
+            SqlParserPos pos, SqlIdentifier tableName, @Nullable SqlNodeList partitionSpec) {
+        this(pos, tableName, partitionSpec, false);
     }
 
     @Override
@@ -73,6 +83,9 @@ public abstract class SqlAlterTable extends SqlCall {
     @Override
     public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
         writer.keyword("ALTER TABLE");
+        if (ifTableExists) {
+            writer.keyword("IF EXISTS");
+        }
         tableIdentifier.unparse(writer, leftPrec, rightPrec);
         SqlNodeList partitionSpec = getPartitionSpec();
         if (partitionSpec != null && partitionSpec.size() > 0) {
@@ -98,6 +111,15 @@ public abstract class SqlAlterTable extends SqlCall {
         return SqlPartitionUtils.getPartitionKVs(getPartitionSpec());
     }
 
+    /**
+     * Whether to ignore the error if the table doesn't exist.
+     *
+     * @return true when IF EXISTS is specified.
+     */
+    public boolean ifTableExists() {
+        return ifTableExists;
+    }
+
     /** Alter table context. */
     public static class AlterTableContext extends SqlCreateTable.TableCreationContext {
         public List<SqlNode> columnPositions = new ArrayList<>();
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableAdd.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableAdd.java
index 2bb33480217..25016cd9640 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableAdd.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableAdd.java
@@ -31,7 +31,7 @@ import javax.annotation.Nullable;
 import java.util.List;
 
 /**
- * SqlNode to describe ALTER TABLE table_name ADD column/constraint/watermark clause.
+ * SqlNode to describe ALTER TABLE [IF EXISTS] table_name ADD column/constraint/watermark clause.
  *
  * <p>Example: DDL like the below for add column/constraint/watermark.
  *
@@ -56,8 +56,9 @@ public class SqlAlterTableAdd extends SqlAlterTableSchema {
             SqlIdentifier tableName,
             SqlNodeList addedColumns,
             List<SqlTableConstraint> constraint,
-            @Nullable SqlWatermark sqlWatermark) {
-        super(pos, tableName, addedColumns, constraint, sqlWatermark);
+            @Nullable SqlWatermark sqlWatermark,
+            boolean ifTableExists) {
+        super(pos, tableName, addedColumns, constraint, sqlWatermark, ifTableExists);
     }
 
     @Override
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableAddConstraint.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableAddConstraint.java
index 334f156432d..4359c0fe569 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableAddConstraint.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableAddConstraint.java
@@ -29,8 +29,8 @@ import org.apache.calcite.util.ImmutableNullableList;
 import java.util.List;
 
 /**
- * ALTER TABLE [catalog_name.][db_name.]table_name ADD [CONSTRAINT constraint_name] (PRIMARY KEY |
- * UNIQUE) (column, ...) [[NOT] ENFORCED].
+ * ALTER TABLE [IF EXISTS] [catalog_name.][db_name.]table_name ADD [CONSTRAINT constraint_name]
+ * (PRIMARY KEY | UNIQUE) (column, ...) [[NOT] ENFORCED].
  */
 public class SqlAlterTableAddConstraint extends SqlAlterTable {
     private final SqlTableConstraint constraint;
@@ -41,10 +41,14 @@ public class SqlAlterTableAddConstraint extends SqlAlterTable {
      * @param tableID Table ID
      * @param constraint Table constraint
      * @param pos Parser position
+     * @param ifTableExists Whether IF EXISTS is specified
      */
     public SqlAlterTableAddConstraint(
-            SqlIdentifier tableID, SqlTableConstraint constraint, SqlParserPos pos) {
-        super(pos, tableID);
+            SqlIdentifier tableID,
+            SqlTableConstraint constraint,
+            SqlParserPos pos,
+            boolean ifTableExists) {
+        super(pos, tableID, ifTableExists);
         this.constraint = constraint;
     }
 
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableCompact.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableCompact.java
index 247f4f96757..efec30d590d 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableCompact.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableCompact.java
@@ -29,16 +29,22 @@ import javax.annotation.Nullable;
 
 import java.util.List;
 
-/** ALTER TABLE [[catalogName.] dataBasesName].tableName [PARTITION partition_spec] COMPACT. */
+/**
+ * ALTER TABLE [IF EXISTS] [[catalogName.] dataBasesName].tableName [PARTITION partition_spec]
+ * COMPACT.
+ */
 public class SqlAlterTableCompact extends SqlAlterTable {
 
     public SqlAlterTableCompact(
-            SqlParserPos pos, SqlIdentifier tableName, @Nullable SqlNodeList partitionSpec) {
-        super(pos, tableName, partitionSpec);
+            SqlParserPos pos,
+            SqlIdentifier tableName,
+            @Nullable SqlNodeList partitionSpec,
+            boolean ifTableExists) {
+        super(pos, tableName, partitionSpec, ifTableExists);
     }
 
-    public SqlAlterTableCompact(SqlParserPos pos, SqlIdentifier tableName) {
-        super(pos, tableName);
+    public SqlAlterTableCompact(SqlParserPos pos, SqlIdentifier tableName, boolean ifTableExists) {
+        super(pos, tableName, ifTableExists);
     }
 
     @Override
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableDropColumn.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableDropColumn.java
index 534a7a7a593..0028a2995fc 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableDropColumn.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableDropColumn.java
@@ -31,7 +31,7 @@ import java.util.Collections;
 import java.util.List;
 
 /**
- * SqlNode to describe ALTER TABLE table_name DROP column clause.
+ * SqlNode to describe ALTER TABLE [IF EXISTS] table_name DROP column clause.
  *
  * <p>Example: DDL like the below for drop column.
  *
@@ -48,8 +48,11 @@ public class SqlAlterTableDropColumn extends SqlAlterTable {
     private final SqlNodeList columnList;
 
     public SqlAlterTableDropColumn(
-            SqlParserPos pos, SqlIdentifier tableName, SqlNodeList columnList) {
-        super(pos, tableName);
+            SqlParserPos pos,
+            SqlIdentifier tableName,
+            SqlNodeList columnList,
+            boolean ifTableExists) {
+        super(pos, tableName, ifTableExists);
         this.columnList = columnList;
     }
 
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableDropConstraint.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableDropConstraint.java
index 59b6b85c13e..bb097fe5c83 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableDropConstraint.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableDropConstraint.java
@@ -26,7 +26,7 @@ import org.apache.calcite.util.ImmutableNullableList;
 
 import java.util.List;
 
-/** ALTER TABLE [catalog_name.][db_name.]table_name DROP CONSTRAINT constraint_name. */
+/** ALTER TABLE [IF EXISTS] [catalog_name.][db_name.]table_name DROP CONSTRAINT constraint_name. */
 public class SqlAlterTableDropConstraint extends SqlAlterTable {
 
     private final SqlIdentifier constraintName;
@@ -37,10 +37,14 @@ public class SqlAlterTableDropConstraint extends SqlAlterTable {
      * @param pos Parser position
      * @param tableName Table name
      * @param constraintName Constraint name
+     * @param ifTableExists Whether IF EXISTS is specified
      */
     public SqlAlterTableDropConstraint(
-            SqlParserPos pos, SqlIdentifier tableName, SqlIdentifier constraintName) {
-        super(pos, tableName);
+            SqlParserPos pos,
+            SqlIdentifier tableName,
+            SqlIdentifier constraintName,
+            boolean ifTableExists) {
+        super(pos, tableName, ifTableExists);
         this.constraintName = constraintName;
     }
 
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableDropPrimaryKey.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableDropPrimaryKey.java
index fa7d48efd0d..506f4eaac26 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableDropPrimaryKey.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableDropPrimaryKey.java
@@ -26,11 +26,12 @@ import org.apache.calcite.sql.parser.SqlParserPos;
 import java.util.Collections;
 import java.util.List;
 
-/** ALTER TABLE [catalog_name.][db_name.]table_name DROP PRIMARY KEY. */
+/** ALTER TABLE [IF EXISTS] [catalog_name.][db_name.]table_name DROP PRIMARY KEY. */
 public class SqlAlterTableDropPrimaryKey extends SqlAlterTable {
 
-    public SqlAlterTableDropPrimaryKey(SqlParserPos pos, SqlIdentifier tableName) {
-        super(pos, tableName);
+    public SqlAlterTableDropPrimaryKey(
+            SqlParserPos pos, SqlIdentifier tableName, boolean ifTableExists) {
+        super(pos, tableName, ifTableExists);
     }
 
     @Override
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableDropWatermark.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableDropWatermark.java
index 593a77634cd..0ed327abda3 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableDropWatermark.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableDropWatermark.java
@@ -27,7 +27,7 @@ import java.util.Collections;
 import java.util.List;
 
 /**
- * SqlNode to describe ALTER TABLE table_name DROP watermark clause.
+ * SqlNode to describe ALTER TABLE [IF EXISTS] table_name DROP watermark clause.
  *
  * <p>Example: DDL like the below for drop watermark.
  *
@@ -38,8 +38,9 @@ import java.util.List;
  */
 public class SqlAlterTableDropWatermark extends SqlAlterTable {
 
-    public SqlAlterTableDropWatermark(SqlParserPos pos, SqlIdentifier tableName) {
-        super(pos, tableName);
+    public SqlAlterTableDropWatermark(
+            SqlParserPos pos, SqlIdentifier tableName, boolean ifTableExists) {
+        super(pos, tableName, ifTableExists);
     }
 
     @Override
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableModify.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableModify.java
index e1281e9207f..5ce7349321d 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableModify.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableModify.java
@@ -31,7 +31,7 @@ import javax.annotation.Nullable;
 import java.util.List;
 
 /**
- * SqlNode to describe ALTER TABLE table_name MODIFY column/constraint/watermark clause.
+ * SqlNode to describe ALTER TABLE [IF EXISTS] table_name MODIFY column/constraint/watermark clause.
  *
  * <p>Example: DDL like the below for modify column/constraint/watermark.
  *
@@ -56,8 +56,9 @@ public class SqlAlterTableModify extends SqlAlterTableSchema {
             SqlIdentifier tableName,
             SqlNodeList modifiedColumns,
             List<SqlTableConstraint> constraints,
-            @Nullable SqlWatermark watermark) {
-        super(pos, tableName, modifiedColumns, constraints, watermark);
+            @Nullable SqlWatermark watermark,
+            boolean ifTableExists) {
+        super(pos, tableName, modifiedColumns, constraints, watermark, ifTableExists);
     }
 
     @Override
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableOptions.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableOptions.java
index 5c5616410e8..006fede96a7 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableOptions.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableOptions.java
@@ -31,14 +31,20 @@ import java.util.List;
 
 import static java.util.Objects.requireNonNull;
 
-/** ALTER TABLE [[catalogName.] dataBasesName].tableName SET ( name=value [, name=value]*). */
+/**
+ * ALTER TABLE [IF EXISTS] [[catalogName.] dataBasesName].tableName SET ( name=value [,
+ * name=value]*).
+ */
 public class SqlAlterTableOptions extends SqlAlterTable {
 
     private final SqlNodeList propertyList;
 
     public SqlAlterTableOptions(
-            SqlParserPos pos, SqlIdentifier tableName, SqlNodeList propertyList) {
-        this(pos, tableName, null, propertyList);
+            SqlParserPos pos,
+            SqlIdentifier tableName,
+            SqlNodeList propertyList,
+            boolean ifTableExists) {
+        this(pos, tableName, null, propertyList, ifTableExists);
     }
 
     public SqlAlterTableOptions(
@@ -46,7 +52,16 @@ public class SqlAlterTableOptions extends SqlAlterTable {
             SqlIdentifier tableName,
             SqlNodeList partitionSpec,
             SqlNodeList propertyList) {
-        super(pos, tableName, partitionSpec);
+        this(pos, tableName, partitionSpec, propertyList, false);
+    }
+
+    public SqlAlterTableOptions(
+            SqlParserPos pos,
+            SqlIdentifier tableName,
+            SqlNodeList partitionSpec,
+            SqlNodeList propertyList,
+            boolean ifTableExists) {
+        super(pos, tableName, partitionSpec, ifTableExists);
         this.propertyList = requireNonNull(propertyList, "propertyList should not be null");
     }
 
@@ -71,8 +86,4 @@ public class SqlAlterTableOptions extends SqlAlterTable {
         writer.newlineAndIndent();
         writer.endList(withFrame);
     }
-
-    public String[] fullTableName() {
-        return tableIdentifier.names.toArray(new String[0]);
-    }
 }
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableRename.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableRename.java
index 6775353c7a7..50876a424e5 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableRename.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableRename.java
@@ -29,7 +29,7 @@ import java.util.List;
 import static java.util.Objects.requireNonNull;
 
 /**
- * ALTER TABLE [[catalogName.] dataBasesName].tableName RENAME TO [[catalogName.]
+ * ALTER TABLE [IF EXISTS] [[catalogName.] dataBasesName].tableName RENAME TO [[catalogName.]
  * dataBasesName].newTableName.
  */
 public class SqlAlterTableRename extends SqlAlterTable {
@@ -37,11 +37,19 @@ public class SqlAlterTableRename extends SqlAlterTable {
     private final SqlIdentifier newTableIdentifier;
 
     public SqlAlterTableRename(
-            SqlParserPos pos, SqlIdentifier tableName, SqlIdentifier newTableName) {
-        super(pos, tableName);
+            SqlParserPos pos,
+            SqlIdentifier tableName,
+            SqlIdentifier newTableName,
+            boolean ifTableExists) {
+        super(pos, tableName, ifTableExists);
         this.newTableIdentifier = requireNonNull(newTableName, "new tableName should not be null");
     }
 
+    public SqlAlterTableRename(
+            SqlParserPos pos, SqlIdentifier tableName, SqlIdentifier newTableName) {
+        this(pos, tableName, newTableName, false);
+    }
+
     @Override
     public List<SqlNode> getOperandList() {
         return ImmutableNullableList.of(tableIdentifier, newTableIdentifier);
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableRenameColumn.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableRenameColumn.java
index 72aa3c1a067..f49b5e7a365 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableRenameColumn.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableRenameColumn.java
@@ -27,7 +27,8 @@ import org.apache.calcite.util.ImmutableNullableList;
 import java.util.List;
 
 /**
- * ALTER TABLE [[catalogName.] dataBasesName].tableName RENAME originColumnName TO newColumnName.
+ * ALTER TABLE [IF EXISTS] [[catalogName.] dataBasesName].tableName RENAME originColumnName TO
+ * newColumnName.
  */
 public class SqlAlterTableRenameColumn extends SqlAlterTable {
 
@@ -38,8 +39,9 @@ public class SqlAlterTableRenameColumn extends SqlAlterTable {
             SqlParserPos pos,
             SqlIdentifier tableName,
             SqlIdentifier originColumnIdentifier,
-            SqlIdentifier newColumnIdentifier) {
-        super(pos, tableName, null);
+            SqlIdentifier newColumnIdentifier,
+            boolean ifTableExists) {
+        super(pos, tableName, null, ifTableExists);
         this.originColumnIdentifier = originColumnIdentifier;
         this.newColumnIdentifier = newColumnIdentifier;
     }
@@ -60,8 +62,7 @@ public class SqlAlterTableRenameColumn extends SqlAlterTable {
 
     @Override
     public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
-        writer.keyword("ALTER TABLE");
-        tableIdentifier.unparse(writer, leftPrec, rightPrec);
+        super.unparse(writer, leftPrec, rightPrec);
         writer.keyword("RENAME");
         originColumnIdentifier.unparse(writer, leftPrec, rightPrec);
         writer.keyword("TO");
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableReset.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableReset.java
index f947e91de50..bef1d473e5b 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableReset.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableReset.java
@@ -35,13 +35,16 @@ import java.util.stream.Collectors;
 
 import static java.util.Objects.requireNonNull;
 
-/** ALTER TABLE [[catalogName.] dataBasesName].tableName RESET ( 'key1' [, 'key2']*). */
+/** ALTER TABLE [IF EXISTS] [[catalogName.] dataBasesName].tableName RESET ( 'key1' [, 'key2']*). */
 public class SqlAlterTableReset extends SqlAlterTable {
     private final SqlNodeList propertyKeyList;
 
     public SqlAlterTableReset(
-            SqlParserPos pos, SqlIdentifier tableName, SqlNodeList propertyKeyList) {
-        super(pos, tableName, null);
+            SqlParserPos pos,
+            SqlIdentifier tableName,
+            SqlNodeList propertyKeyList,
+            boolean ifTableExists) {
+        super(pos, tableName, null, ifTableExists);
         this.propertyKeyList =
                 requireNonNull(propertyKeyList, "propertyKeyList should not be null");
     }
@@ -73,8 +76,4 @@ public class SqlAlterTableReset extends SqlAlterTable {
         writer.newlineAndIndent();
         writer.endList(withFrame);
     }
-
-    public String[] fullTableName() {
-        return tableIdentifier.names.toArray(new String[0]);
-    }
 }
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableSchema.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableSchema.java
index 48f9d810725..90e76d5dbc7 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableSchema.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableSchema.java
@@ -49,8 +49,9 @@ public abstract class SqlAlterTableSchema extends SqlAlterTable implements Exten
             SqlIdentifier tableName,
             SqlNodeList columnList,
             List<SqlTableConstraint> constraints,
-            @Nullable SqlWatermark sqlWatermark) {
-        super(pos, tableName);
+            @Nullable SqlWatermark sqlWatermark,
+            boolean ifTableExists) {
+        super(pos, tableName, ifTableExists);
         this.columnList = columnList;
         this.constraints = constraints;
         this.watermark = sqlWatermark;
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlChangeColumn.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlChangeColumn.java
index de4fa20421b..d706b9af69d 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlChangeColumn.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlChangeColumn.java
@@ -54,7 +54,7 @@ public class SqlChangeColumn extends SqlAlterTable {
             @Nullable SqlIdentifier after,
             boolean first,
             @Nullable SqlNodeList properties) {
-        super(pos, tableName);
+        super(pos, tableName, false);
         if (after != null && first) {
             throw new IllegalArgumentException("FIRST and AFTER cannot be set at the same time");
         }
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropPartitions.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropPartitions.java
index 99d2e6a00d2..67dbb99505d 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropPartitions.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropPartitions.java
@@ -43,7 +43,7 @@ public class SqlDropPartitions extends SqlAlterTable {
             SqlIdentifier tableName,
             boolean ifExists,
             List<SqlNodeList> partSpecs) {
-        super(pos, tableName);
+        super(pos, tableName, false);
         this.ifExists = ifExists;
         this.partSpecs = partSpecs;
     }
diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
index 7b1878a6794..7ba56b74747 100644
--- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
+++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
@@ -314,19 +314,37 @@ class FlinkSqlParserImplTest extends SqlParserTest {
     @Test
     void testAlterTable() {
         sql("alter table t1 rename to t2").ok("ALTER TABLE `T1` RENAME TO `T2`");
+        sql("alter table if exists t1 rename to t2")
+                .ok("ALTER TABLE IF EXISTS `T1` RENAME TO `T2`");
         sql("alter table c1.d1.t1 rename to t2").ok("ALTER TABLE `C1`.`D1`.`T1` RENAME TO `T2`");
+        sql("alter table if exists c1.d1.t1 rename to t2")
+                .ok("ALTER TABLE IF EXISTS `C1`.`D1`.`T1` RENAME TO `T2`");
         sql("alter table t1 set ('key1'='value1')")
                 .ok("ALTER TABLE `T1` SET (\n" + "  'key1' = 'value1'\n" + ")");
+        sql("alter table if exists t1 set ('key1'='value1')")
+                .ok("ALTER TABLE IF EXISTS `T1` SET (\n" + "  'key1' = 'value1'\n" + ")");
         sql("alter table t1 add constraint ct1 primary key(a, b) not enforced")
                 .ok(
                         "ALTER TABLE `T1` ADD (\n"
                                 + "  CONSTRAINT `CT1` PRIMARY KEY (`A`, `B`) NOT ENFORCED\n"
                                 + ")");
+        sql("alter table if exists t1 add constraint ct1 primary key(a, b) not enforced")
+                .ok(
+                        "ALTER TABLE IF EXISTS `T1` ADD (\n"
+                                + "  CONSTRAINT `CT1` PRIMARY KEY (`A`, `B`) NOT ENFORCED\n"
+                                + ")");
         sql("alter table t1 " + "add unique(a, b)")
                 .ok("ALTER TABLE `T1` ADD (\n" + "  UNIQUE (`A`, `B`)\n" + ")");
+        sql("alter table if exists t1 " + "add unique(a, b)")
+                .ok("ALTER TABLE IF EXISTS `T1` ADD (\n" + "  UNIQUE (`A`, `B`)\n" + ")");
         sql("alter table t1 drop constraint ct1").ok("ALTER TABLE `T1` DROP CONSTRAINT `CT1`");
+        sql("alter table if exists t1 drop constraint ct1")
+                .ok("ALTER TABLE IF EXISTS `T1` DROP CONSTRAINT `CT1`");
         sql("alter table t1 rename a to b").ok("ALTER TABLE `T1` RENAME `A` TO `B`");
-        sql("alter table t1 rename a.x to a.y").ok("ALTER TABLE `T1` RENAME `A`.`X` TO `A`.`Y`");
+        sql("alter table if exists t1 rename a to b")
+                .ok("ALTER TABLE IF EXISTS `T1` RENAME `A` TO `B`");
+        sql("alter table if exists t1 rename a.x to a.y")
+                .ok("ALTER TABLE IF EXISTS `T1` RENAME `A`.`X` TO `A`.`Y`");
     }
 
     @Test
@@ -359,6 +377,11 @@ class FlinkSqlParserImplTest extends SqlParserTest {
 
     @Test
     void testAlterTableAddSingleColumn() {
+        sql("alter table if exists t1 add new_column int not null")
+                .ok(
+                        "ALTER TABLE IF EXISTS `T1` ADD (\n"
+                                + "  `NEW_COLUMN` INTEGER NOT NULL\n"
+                                + ")");
         sql("alter table t1 add new_column string comment 'new_column docs'")
                 .ok(
                         "ALTER TABLE `T1` ADD (\n"
@@ -390,6 +413,8 @@ class FlinkSqlParserImplTest extends SqlParserTest {
 
     @Test
     void testAlterTableAddWatermark() {
+        sql("alter table if exists t1 add watermark for ts as ts")
+                .ok("ALTER TABLE IF EXISTS `T1` ADD (\n" + "  WATERMARK FOR `TS` AS `TS`\n" + ")");
         sql("alter table t1 add watermark for ts as ts - interval '1' second")
                 .ok(
                         "ALTER TABLE `T1` ADD (\n"
@@ -452,6 +477,11 @@ class FlinkSqlParserImplTest extends SqlParserTest {
 
     @Test
     public void testAlterTableModifySingleColumn() {
+        sql("alter table if exists t1 modify new_column string comment 'new_column docs'")
+                .ok(
+                        "ALTER TABLE IF EXISTS `T1` MODIFY (\n"
+                                + "  `NEW_COLUMN` STRING COMMENT 'new_column docs'\n"
+                                + ")");
         sql("alter table t1 modify new_column string comment 'new_column docs'")
                 .ok(
                         "ALTER TABLE `T1` MODIFY (\n"
@@ -504,6 +534,11 @@ class FlinkSqlParserImplTest extends SqlParserTest {
 
     @Test
     void testAlterTableModifyWatermark() {
+        sql("alter table if exists t1 modify watermark for ts as ts")
+                .ok(
+                        "ALTER TABLE IF EXISTS `T1` MODIFY (\n"
+                                + "  WATERMARK FOR `TS` AS `TS`\n"
+                                + ")");
         sql("alter table t1 modify watermark for ts as ts - interval '1' second")
                 .ok(
                         "ALTER TABLE `T1` MODIFY (\n"
@@ -565,6 +600,8 @@ class FlinkSqlParserImplTest extends SqlParserTest {
 
     @Test
     public void testAlterTableDropSingleColumn() {
+        sql("alter table if exists t1 drop id")
+                .ok("ALTER TABLE IF EXISTS `T1` DROP (\n" + "  `ID`\n" + ")");
         sql("alter table t1 drop id").ok("ALTER TABLE `T1` DROP (\n" + "  `ID`\n" + ")");
 
         sql("alter table t1 drop (id)").ok("ALTER TABLE `T1` DROP (\n" + "  `ID`\n" + ")");
@@ -575,6 +612,14 @@ class FlinkSqlParserImplTest extends SqlParserTest {
 
     @Test
     public void testAlterTableDropMultipleColumn() {
+        sql("alter table if exists t1 drop (id, ts, tuple.f0, tuple.f1)")
+                .ok(
+                        "ALTER TABLE IF EXISTS `T1` DROP (\n"
+                                + "  `ID`,\n"
+                                + "  `TS`,\n"
+                                + "  `TUPLE`.`F0`,\n"
+                                + "  `TUPLE`.`F1`\n"
+                                + ")");
         sql("alter table t1 drop (id, ts, tuple.f0, tuple.f1)")
                 .ok(
                         "ALTER TABLE `T1` DROP (\n"
@@ -587,11 +632,15 @@ class FlinkSqlParserImplTest extends SqlParserTest {
 
     @Test
     public void testAlterTableDropPrimaryKey() {
+        sql("alter table if exists t1 drop primary key")
+                .ok("ALTER TABLE IF EXISTS `T1` DROP PRIMARY KEY");
         sql("alter table t1 drop primary key").ok("ALTER TABLE `T1` DROP PRIMARY KEY");
     }
 
     @Test
     public void testAlterTableDropConstraint() {
+        sql("alter table if exists t1 drop constraint ct")
+                .ok("ALTER TABLE IF EXISTS `T1` DROP CONSTRAINT `CT`");
         sql("alter table t1 drop constraint ct").ok("ALTER TABLE `T1` DROP CONSTRAINT `CT`");
 
         sql("alter table t1 drop constrain^t^")
@@ -600,11 +649,16 @@ class FlinkSqlParserImplTest extends SqlParserTest {
 
     @Test
     public void testAlterTableDropWatermark() {
+        sql("alter table if exists t1 drop watermark")
+                .ok("ALTER TABLE IF EXISTS `T1` DROP WATERMARK");
         sql("alter table t1 drop watermark").ok("ALTER TABLE `T1` DROP WATERMARK");
     }
 
     @Test
     void testAlterTableReset() {
+        sql("alter table if exists t1 reset ('key1')")
+                .ok("ALTER TABLE IF EXISTS `T1` RESET (\n  'key1'\n)");
+
         sql("alter table t1 reset ('key1')").ok("ALTER TABLE `T1` RESET (\n  'key1'\n)");
 
         sql("alter table t1 reset ('key1', 'key2')")
@@ -615,6 +669,8 @@ class FlinkSqlParserImplTest extends SqlParserTest {
 
     @Test
     void testAlterTableCompact() {
+        sql("alter table if exists t1 compact").ok("ALTER TABLE IF EXISTS `T1` COMPACT");
+
         sql("alter table t1 compact").ok("ALTER TABLE `T1` COMPACT");
 
         sql("alter table db1.t1 compact").ok("ALTER TABLE `DB1`.`T1` COMPACT");
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 4a1d000cc5e..f33c2c31f2b 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -981,14 +981,14 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
                     catalog.renameTable(
                             alterTableRenameOp.getTableIdentifier().toObjectPath(),
                             alterTableRenameOp.getNewTableIdentifier().getObjectName(),
-                            false);
+                            alterTableRenameOp.ignoreIfTableNotExists());
                 } else if (alterTableOperation instanceof AlterTableOptionsOperation) {
                     AlterTableOptionsOperation alterTablePropertiesOp =
                             (AlterTableOptionsOperation) operation;
                     catalogManager.alterTable(
                             alterTablePropertiesOp.getCatalogTable(),
                             alterTablePropertiesOp.getTableIdentifier(),
-                            false);
+                            alterTablePropertiesOp.ignoreIfTableNotExists());
                 } else if (alterTableOperation instanceof AlterPartitionPropertiesOperation) {
                     AlterPartitionPropertiesOperation alterPartPropsOp =
                             (AlterPartitionPropertiesOperation) operation;
@@ -996,35 +996,39 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
                             alterPartPropsOp.getTableIdentifier().toObjectPath(),
                             alterPartPropsOp.getPartitionSpec(),
                             alterPartPropsOp.getCatalogPartition(),
-                            false);
+                            alterPartPropsOp.ignoreIfTableNotExists());
                 } else if (alterTableOperation instanceof AlterTableSchemaOperation) {
                     AlterTableSchemaOperation alterTableSchemaOperation =
                             (AlterTableSchemaOperation) alterTableOperation;
                     catalogManager.alterTable(
                             alterTableSchemaOperation.getCatalogTable(),
                             alterTableSchemaOperation.getTableIdentifier(),
-                            false);
+                            alterTableSchemaOperation.ignoreIfTableNotExists());
                 } else if (alterTableOperation instanceof AddPartitionsOperation) {
                     AddPartitionsOperation addPartitionsOperation =
                             (AddPartitionsOperation) alterTableOperation;
                     List<CatalogPartitionSpec> specs = addPartitionsOperation.getPartitionSpecs();
                     List<CatalogPartition> partitions =
                             addPartitionsOperation.getCatalogPartitions();
-                    boolean ifNotExists = addPartitionsOperation.ifNotExists();
                     ObjectPath tablePath =
                             addPartitionsOperation.getTableIdentifier().toObjectPath();
                     for (int i = 0; i < specs.size(); i++) {
                         catalog.createPartition(
-                                tablePath, specs.get(i), partitions.get(i), ifNotExists);
+                                tablePath,
+                                specs.get(i),
+                                partitions.get(i),
+                                addPartitionsOperation.ignoreIfPartitionExists());
                     }
                 } else if (alterTableOperation instanceof DropPartitionsOperation) {
                     DropPartitionsOperation dropPartitionsOperation =
                             (DropPartitionsOperation) alterTableOperation;
                     ObjectPath tablePath =
                             dropPartitionsOperation.getTableIdentifier().toObjectPath();
-                    boolean ifExists = dropPartitionsOperation.ifExists();
                     for (CatalogPartitionSpec spec : dropPartitionsOperation.getPartitionSpecs()) {
-                        catalog.dropPartition(tablePath, spec, ifExists);
+                        catalog.dropPartition(
+                                tablePath,
+                                spec,
+                                dropPartitionsOperation.ignoreIfPartitionNotExists());
                     }
                 } else if (alterTableOperation instanceof AlterTableChangeOperation) {
                     AlterTableChangeOperation alterTableChangeOperation =
@@ -1033,7 +1037,7 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
                             alterTableChangeOperation.getNewTable(),
                             alterTableChangeOperation.getTableChanges(),
                             alterTableChangeOperation.getTableIdentifier(),
-                            false);
+                            alterTableChangeOperation.ignoreIfTableNotExists());
                 }
                 return TableResultImpl.TABLE_RESULT_OK;
             } catch (TableAlreadyExistException | TableNotExistException e) {
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AddPartitionsOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AddPartitionsOperation.java
index 2bc8f91cb0e..7a94c4f1cb1 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AddPartitionsOperation.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AddPartitionsOperation.java
@@ -29,17 +29,17 @@ import java.util.Map;
 /** Operation to describe ALTER TABLE ADD PARTITION statement. */
 public class AddPartitionsOperation extends AlterTableOperation {
 
-    private final boolean ifNotExists;
+    private final boolean ignoreIfPartitionExists;
     private final List<CatalogPartitionSpec> partitionSpecs;
     private final List<CatalogPartition> catalogPartitions;
 
     public AddPartitionsOperation(
             ObjectIdentifier tableIdentifier,
-            boolean ifNotExists,
+            boolean ignoreIfPartitionExists,
             List<CatalogPartitionSpec> partitionSpecs,
             List<CatalogPartition> catalogPartitions) {
-        super(tableIdentifier);
-        this.ifNotExists = ifNotExists;
+        super(tableIdentifier, false);
+        this.ignoreIfPartitionExists = ignoreIfPartitionExists;
         this.partitionSpecs = partitionSpecs;
         this.catalogPartitions = catalogPartitions;
     }
@@ -52,8 +52,8 @@ public class AddPartitionsOperation extends AlterTableOperation {
         return catalogPartitions;
     }
 
-    public boolean ifNotExists() {
-        return ifNotExists;
+    public boolean ignoreIfPartitionExists() {
+        return ignoreIfPartitionExists;
     }
 
     @Override
@@ -61,7 +61,7 @@ public class AddPartitionsOperation extends AlterTableOperation {
         StringBuilder builder =
                 new StringBuilder(
                         String.format("ALTER TABLE %s ADD", tableIdentifier.asSummaryString()));
-        if (ifNotExists) {
+        if (ignoreIfPartitionExists) {
             builder.append(" IF NOT EXISTS");
         }
         for (int i = 0; i < partitionSpecs.size(); i++) {
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterPartitionOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterPartitionOperation.java
index 50ef54b5ad4..de2bba9dcf6 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterPartitionOperation.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterPartitionOperation.java
@@ -30,7 +30,7 @@ public abstract class AlterPartitionOperation extends AlterTableOperation {
 
     public AlterPartitionOperation(
             ObjectIdentifier tableIdentifier, CatalogPartitionSpec partitionSpec) {
-        super(tableIdentifier);
+        super(tableIdentifier, false);
         this.partitionSpec = partitionSpec;
     }
 
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableChangeOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableChangeOperation.java
index ad083d1e574..600f130df8e 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableChangeOperation.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableChangeOperation.java
@@ -36,8 +36,9 @@ public class AlterTableChangeOperation extends AlterTableOperation {
     public AlterTableChangeOperation(
             ObjectIdentifier tableIdentifier,
             List<TableChange> tableChanges,
-            CatalogTable newTable) {
-        super(tableIdentifier);
+            CatalogTable newTable,
+            boolean ignoreIfNotExists) {
+        super(tableIdentifier, ignoreIfNotExists);
         this.tableChanges = Collections.unmodifiableList(tableChanges);
         this.newTable = newTable;
     }
@@ -54,7 +55,11 @@ public class AlterTableChangeOperation extends AlterTableOperation {
     public String asSummaryString() {
         String changes =
                 tableChanges.stream().map(this::toString).collect(Collectors.joining(",\n"));
-        return String.format("ALTER TABLE %s\n%s", tableIdentifier.asSummaryString(), changes);
+        return String.format(
+                "ALTER TABLE %s%s\n%s",
+                ignoreIfTableNotExists ? "IF EXISTS " : "",
+                tableIdentifier.asSummaryString(),
+                changes);
     }
 
     private String toString(TableChange tableChange) {
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableOperation.java
index ad28a28addf..9827dc7c633 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableOperation.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableOperation.java
@@ -24,13 +24,20 @@ import org.apache.flink.table.catalog.ObjectIdentifier;
  * Abstract Operation to describe all ALTER TABLE statements such as rename table /set properties.
  */
 public abstract class AlterTableOperation implements AlterOperation {
+
     protected final ObjectIdentifier tableIdentifier;
+    protected final boolean ignoreIfTableNotExists;
 
-    public AlterTableOperation(ObjectIdentifier tableIdentifier) {
+    public AlterTableOperation(ObjectIdentifier tableIdentifier, boolean ignoreIfTableNotExists) {
         this.tableIdentifier = tableIdentifier;
+        this.ignoreIfTableNotExists = ignoreIfTableNotExists;
     }
 
     public ObjectIdentifier getTableIdentifier() {
         return tableIdentifier;
     }
+
+    public boolean ignoreIfTableNotExists() {
+        return ignoreIfTableNotExists;
+    }
 }
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableOptionsOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableOptionsOperation.java
index 4b505d1b1ab..ecb58a8b51d 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableOptionsOperation.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableOptionsOperation.java
@@ -33,8 +33,11 @@ import java.util.stream.Collectors;
 public class AlterTableOptionsOperation extends AlterTableOperation {
     private final CatalogTable catalogTable;
 
-    public AlterTableOptionsOperation(ObjectIdentifier tableIdentifier, CatalogTable catalogTable) {
-        super(tableIdentifier);
+    public AlterTableOptionsOperation(
+            ObjectIdentifier tableIdentifier,
+            CatalogTable catalogTable,
+            boolean ignoreIfNotExists) {
+        super(tableIdentifier, ignoreIfNotExists);
         this.catalogTable = catalogTable;
     }
 
@@ -52,6 +55,9 @@ public class AlterTableOptionsOperation extends AlterTableOperation {
                                                 entry.getKey(), entry.getValue()))
                         .collect(Collectors.joining(", "));
         return String.format(
-                "ALTER TABLE %s SET (%s)", tableIdentifier.asSummaryString(), description);
+                "ALTER TABLE %s%s SET (%s)",
+                ignoreIfTableNotExists ? "IF EXISTS " : "",
+                tableIdentifier.asSummaryString(),
+                description);
     }
 }
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableRenameOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableRenameOperation.java
index f0d8b6a41be..e1084b84ea5 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableRenameOperation.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableRenameOperation.java
@@ -25,8 +25,10 @@ public class AlterTableRenameOperation extends AlterTableOperation {
     private final ObjectIdentifier newTableIdentifier;
 
     public AlterTableRenameOperation(
-            ObjectIdentifier tableIdentifier, ObjectIdentifier newTableIdentifier) {
-        super(tableIdentifier);
+            ObjectIdentifier tableIdentifier,
+            ObjectIdentifier newTableIdentifier,
+            boolean ignoreIfNotExists) {
+        super(tableIdentifier, ignoreIfNotExists);
         this.newTableIdentifier = newTableIdentifier;
     }
 
@@ -37,7 +39,9 @@ public class AlterTableRenameOperation extends AlterTableOperation {
     @Override
     public String asSummaryString() {
         return String.format(
-                "ALTER TABLE %s RENAME TO %s",
-                tableIdentifier.asSummaryString(), newTableIdentifier.asSummaryString());
+                "ALTER TABLE %s%s RENAME TO %s",
+                ignoreIfTableNotExists ? "IF EXISTS " : "",
+                tableIdentifier.asSummaryString(),
+                newTableIdentifier.asSummaryString());
     }
 }
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableSchemaOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableSchemaOperation.java
index 3ec668a955f..71a16336d61 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableSchemaOperation.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableSchemaOperation.java
@@ -27,8 +27,11 @@ public class AlterTableSchemaOperation extends AlterTableOperation {
     // the CatalogTable with the updated schema
     private final CatalogTable catalogTable;
 
-    public AlterTableSchemaOperation(ObjectIdentifier tableIdentifier, CatalogTable catalogTable) {
-        super(tableIdentifier);
+    public AlterTableSchemaOperation(
+            ObjectIdentifier tableIdentifier,
+            CatalogTable catalogTable,
+            boolean ignoreIfNotExists) {
+        super(tableIdentifier, ignoreIfNotExists);
         this.catalogTable = catalogTable;
     }
 
@@ -39,7 +42,9 @@ public class AlterTableSchemaOperation extends AlterTableOperation {
     @Override
     public String asSummaryString() {
         return String.format(
-                "ALTER TABLE %s SET SCHEMA %s",
-                tableIdentifier.asSummaryString(), catalogTable.getUnresolvedSchema().toString());
+                "ALTER TABLE %s%s SET SCHEMA %s",
+                ignoreIfTableNotExists ? "IF EXISTS " : "",
+                tableIdentifier.asSummaryString(),
+                catalogTable.getUnresolvedSchema().toString());
     }
 }
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/DropPartitionsOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/DropPartitionsOperation.java
index 371cf6c117a..373f5e4e607 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/DropPartitionsOperation.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/DropPartitionsOperation.java
@@ -27,20 +27,20 @@ import java.util.List;
 /** Operation to describe ALTER TABLE DROP PARTITION statement. */
 public class DropPartitionsOperation extends AlterTableOperation {
 
-    private final boolean ifExists;
+    private final boolean ignoreIfPartitionNotExists;
     private final List<CatalogPartitionSpec> partitionSpecs;
 
     public DropPartitionsOperation(
             ObjectIdentifier tableIdentifier,
-            boolean ifExists,
+            boolean ignoreIfPartitionNotExists,
             List<CatalogPartitionSpec> partitionSpecs) {
-        super(tableIdentifier);
-        this.ifExists = ifExists;
+        super(tableIdentifier, false);
+        this.ignoreIfPartitionNotExists = ignoreIfPartitionNotExists;
         this.partitionSpecs = partitionSpecs;
     }
 
-    public boolean ifExists() {
-        return ifExists;
+    public boolean ignoreIfPartitionNotExists() {
+        return ignoreIfPartitionNotExists;
     }
 
     public List<CatalogPartitionSpec> getPartitionSpecs() {
@@ -52,7 +52,7 @@ public class DropPartitionsOperation extends AlterTableOperation {
         StringBuilder builder =
                 new StringBuilder(
                         String.format("ALTER TABLE %s DROP", tableIdentifier.asSummaryString()));
-        if (ifExists) {
+        if (ignoreIfPartitionNotExists) {
             builder.append(" IF EXISTS");
         }
         for (CatalogPartitionSpec spec : partitionSpecs) {
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
index 420391b6e82..ff15b6bdc2a 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
@@ -431,8 +431,9 @@ public interface Catalog {
      * @param tablePath path of the table.
      * @param partitionSpec partition spec of the partition
      * @param partition the partition to add.
-     * @param ignoreIfExists flag to specify behavior if a table with the given name already exists:
-     *     if set to false, it throws a TableAlreadyExistException, if set to true, nothing happens.
+     * @param ignoreIfExists flag to specify behavior if a partition with the given spec already
+     *     exists: if set to false, it throws a PartitionAlreadyExistsException, if set to true,
+     *     nothing happens.
      * @throws TableNotExistException thrown if the target table does not exist
      * @throws TableNotPartitionedException thrown if the target table is not partitioned
      * @throws PartitionSpecInvalidException thrown if the given partition spec is invalid
@@ -453,7 +454,7 @@ public interface Catalog {
      *
      * @param tablePath path of the table.
      * @param partitionSpec partition spec of the partition to drop
-     * @param ignoreIfNotExists flag to specify behavior if the database does not exist: if set to
+     * @param ignoreIfNotExists flag to specify behavior if the partition does not exist: if set to
      *     false, throw an exception, if set to true, nothing happens.
      * @throws PartitionNotExistException thrown if the target partition does not exist
      * @throws CatalogException in case of any runtime exception
@@ -468,7 +469,7 @@ public interface Catalog {
      * @param tablePath path of the table
      * @param partitionSpec partition spec of the partition
      * @param newPartition new partition to replace the old one
-     * @param ignoreIfNotExists flag to specify behavior if the database does not exist: if set to
+     * @param ignoreIfNotExists flag to specify behavior if the partition does not exist: if set to
      *     false, throw an exception, if set to true, nothing happens.
      * @throws PartitionNotExistException thrown if the target partition does not exist
      * @throws CatalogException in case of any runtime exception
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java
index 57f6fae4384..15c0718c2a8 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java
@@ -941,7 +941,8 @@ public class AlterSchemaConverter {
                         newSchema,
                         oldTable.getComment(),
                         oldTable.getPartitionKeys(),
-                        oldTable.getOptions()));
+                        oldTable.getOptions()),
+                alterTable.ifTableExists());
     }
 
     private static String getColumnName(SqlIdentifier identifier) {
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
index 0603160cc6a..96d279517fd 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
@@ -132,6 +132,7 @@ import org.apache.flink.table.operations.EndStatementSetOperation;
 import org.apache.flink.table.operations.ExplainOperation;
 import org.apache.flink.table.operations.LoadModuleOperation;
 import org.apache.flink.table.operations.ModifyOperation;
+import org.apache.flink.table.operations.NopOperation;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.QueryOperation;
 import org.apache.flink.table.operations.ShowCatalogsOperation;
@@ -484,6 +485,9 @@ public class SqlToOperationConverter {
         Optional<ContextResolvedTable> optionalCatalogTable =
                 catalogManager.getTable(tableIdentifier);
         if (!optionalCatalogTable.isPresent() || optionalCatalogTable.get().isTemporary()) {
+            if (sqlAlterTable.ifTableExists()) {
+                return new NopOperation();
+            }
             throw new ValidationException(
                     String.format(
                             "Table %s doesn't exist or is a temporary table.", tableIdentifier));
@@ -499,7 +503,8 @@ public class SqlToOperationConverter {
                             ((SqlAlterTableRename) sqlAlterTable).fullNewTableName());
             ObjectIdentifier newTableIdentifier =
                     catalogManager.qualifyIdentifier(newUnresolvedIdentifier);
-            return new AlterTableRenameOperation(tableIdentifier, newTableIdentifier);
+            return new AlterTableRenameOperation(
+                    tableIdentifier, newTableIdentifier, sqlAlterTable.ifTableExists());
         } else if (sqlAlterTable instanceof SqlAlterTableOptions) {
             return convertAlterTableOptions(
                     tableIdentifier,
@@ -547,7 +552,7 @@ public class SqlToOperationConverter {
                 partitions.add(new CatalogPartitionImpl(props, null));
             }
             return new AddPartitionsOperation(
-                    tableIdentifier, addPartitions.ifNotExists(), specs, partitions);
+                    tableIdentifier, addPartitions.ifPartitionNotExists(), specs, partitions);
         } else if (sqlAlterTable instanceof SqlDropPartitions) {
             SqlDropPartitions dropPartitions = (SqlDropPartitions) sqlAlterTable;
             List<CatalogPartitionSpec> specs = new ArrayList<>();
@@ -607,7 +612,8 @@ public class SqlToOperationConverter {
                     changeOptions.entrySet().stream()
                             .map(entry -> TableChange.set(entry.getKey(), entry.getValue()))
                             .collect(Collectors.toList()),
-                    oldTable.copy(newOptions));
+                    oldTable.copy(newOptions),
+                    alterTableOptions.ifTableExists());
         }
     }
 
@@ -630,7 +636,8 @@ public class SqlToOperationConverter {
         return new AlterTableChangeOperation(
                 tableIdentifier,
                 resetKeys.stream().map(TableChange::reset).collect(Collectors.toList()),
-                oldTable.copy(newOptions));
+                oldTable.copy(newOptions),
+                alterTableReset.ifTableExists());
     }
 
     /**
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java
index 7e0079b1568..806ced0ed0e 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java
@@ -139,9 +139,11 @@ public class OperationConverterUtils {
             // So we don't translate with TableChanges here. One workaround is
             // to minimize the edit distance, i.e., minimize the modification times, but it
             // still cannot provide a deterministic answer.
-            return new AlterTableSchemaOperation(tableIdentifier, newTable);
+            return new AlterTableSchemaOperation(
+                    tableIdentifier, newTable, addReplaceColumns.ifTableExists());
         } else {
-            return new AlterTableChangeOperation(tableIdentifier, tableChanges, newTable);
+            return new AlterTableChangeOperation(
+                    tableIdentifier, tableChanges, newTable, addReplaceColumns.ifTableExists());
         }
     }
 
@@ -185,7 +187,8 @@ public class OperationConverterUtils {
                         newSchema,
                         catalogTable.getPartitionKeys(),
                         newProperties,
-                        catalogTable.getComment()));
+                        catalogTable.getComment()),
+                false);
         // TODO: handle watermark and constraints
     }
 
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
index 253c147effa..5642359e807 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
@@ -58,6 +58,7 @@ import org.apache.flink.table.operations.BeginStatementSetOperation;
 import org.apache.flink.table.operations.EndStatementSetOperation;
 import org.apache.flink.table.operations.ExplainOperation;
 import org.apache.flink.table.operations.LoadModuleOperation;
+import org.apache.flink.table.operations.NopOperation;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.QueryOperation;
 import org.apache.flink.table.operations.ShowFunctionsOperation;
@@ -1225,8 +1226,8 @@ public class SqlToOperationConverterTest {
         final ObjectIdentifier expectedIdentifier = ObjectIdentifier.of("cat1", "db1", "tb1");
         final ObjectIdentifier expectedNewIdentifier = ObjectIdentifier.of("cat1", "db1", "tb2");
         // test rename table converter
-        for (int i = 0; i < renameTableSqls.length; i++) {
-            Operation operation = parse(renameTableSqls[i]);
+        for (String renameTableSql : renameTableSqls) {
+            Operation operation = parse(renameTableSql);
             assertThat(operation).isInstanceOf(AlterTableRenameOperation.class);
             final AlterTableRenameOperation alterTableRenameOperation =
                     (AlterTableRenameOperation) operation;
@@ -1235,8 +1236,13 @@ public class SqlToOperationConverterTest {
             assertThat(alterTableRenameOperation.getNewTableIdentifier())
                     .isEqualTo(expectedNewIdentifier);
         }
+        // test alter nonexistent table
+        checkAlterNonExistTable("alter table %s nonexistent rename to tb2");
+
         // test alter table options
-        Operation operation = parse("alter table cat1.db1.tb1 set ('k1' = 'v1', 'K2' = 'V2')");
+        checkAlterNonExistTable("alter table %s nonexistent set ('k1' = 'v1', 'K2' = 'V2')");
+        Operation operation =
+                parse("alter table if exists cat1.db1.tb1 set ('k1' = 'v1', 'K2' = 'V2')");
         Map<String, String> expectedOptions = new HashMap<>();
         expectedOptions.put("connector", "dummy");
         expectedOptions.put("k", "v");
@@ -1248,16 +1254,17 @@ public class SqlToOperationConverterTest {
                 expectedIdentifier,
                 expectedOptions,
                 Arrays.asList(TableChange.set("k1", "v1"), TableChange.set("K2", "V2")),
-                "ALTER TABLE cat1.db1.tb1\n  SET 'k1' = 'v1',\n  SET 'K2' = 'V2'");
+                "ALTER TABLE IF EXISTS cat1.db1.tb1\n  SET 'k1' = 'v1',\n  SET 'K2' = 'V2'");
 
         // test alter table reset
-        operation = parse("alter table cat1.db1.tb1 reset ('k')");
+        checkAlterNonExistTable("alter table %s nonexistent reset ('k')");
+        operation = parse("alter table if exists cat1.db1.tb1 reset ('k')");
         assertAlterTableOptions(
                 operation,
                 expectedIdentifier,
                 Collections.singletonMap("connector", "dummy"),
                 Collections.singletonList(TableChange.reset("k")),
-                "ALTER TABLE cat1.db1.tb1\n  RESET 'k'");
+                "ALTER TABLE IF EXISTS cat1.db1.tb1\n  RESET 'k'");
         assertThatThrownBy(() -> parse("alter table cat1.db1.tb1 reset ('connector')"))
                 .isInstanceOf(ValidationException.class)
                 .hasMessageContaining("ALTER TABLE RESET does not support changing 'connector'");
@@ -1378,6 +1385,7 @@ public class SqlToOperationConverterTest {
                 .isInstanceOf(ValidationException.class)
                 .hasMessageContaining(
                         "Failed to execute ALTER TABLE statement.\nThe column `a` is used as the partition keys.");
+        checkAlterNonExistTable("alter table %s nonexistent rename a to a1");
     }
 
     @Test
@@ -1417,6 +1425,7 @@ public class SqlToOperationConverterTest {
         assertThatThrownBy(() -> parse("alter table tb1 drop ts"))
                 .isInstanceOf(ValidationException.class)
                 .hasMessageContaining("The column `ts` is referenced by watermark expression.");
+        checkAlterNonExistTable("alter table %s nonexistent drop a");
     }
 
     @Test
@@ -1465,6 +1474,8 @@ public class SqlToOperationConverterTest {
                 .isInstanceOf(ValidationException.class)
                 .hasMessageContaining(
                         "The base table does not define a primary key constraint named 'ct2'. Available constraint name: ['ct1'].");
+        checkAlterNonExistTable("alter table %s nonexistent drop primary key");
+        checkAlterNonExistTable("alter table %s nonexistent drop constraint ct");
     }
 
     @Test
@@ -1499,6 +1510,7 @@ public class SqlToOperationConverterTest {
         assertThatThrownBy(() -> parse("alter table tb1 drop watermark"))
                 .isInstanceOf(ValidationException.class)
                 .hasMessageContaining("The base table does not define any watermark strategy.");
+        checkAlterNonExistTable("alter table %s nonexistent drop watermark");
     }
 
     @Test
@@ -1536,9 +1548,7 @@ public class SqlToOperationConverterTest {
                         "Partition column 'dt' not defined in the table schema. Table `cat1`.`db1`.`tb1` is not partitioned.");
 
         // alter a non-existed table
-        assertThatThrownBy(() -> parse("alter table tb2 compact"))
-                .isInstanceOf(ValidationException.class)
-                .hasMessage("Table `cat1`.`db1`.`tb2` doesn't exist or is a temporary table.");
+        checkAlterNonExistTable("alter table %s nonexistent compact");
 
         checkAlterTableCompact(parse("alter table tb1 compact"), Collections.emptyMap());
     }
@@ -1627,6 +1637,7 @@ public class SqlToOperationConverterTest {
         assertThatThrownBy(() -> parse("alter table tb1 add (e.f3 string after e.f1)"))
                 .isInstanceOf(UnsupportedOperationException.class)
                 .hasMessageContaining("Alter nested row type e.f3 is not supported yet.");
+        checkAlterNonExistTable("alter table %s nonexistent add a bigint not null");
     }
 
     @Test
@@ -1639,10 +1650,11 @@ public class SqlToOperationConverterTest {
 
         // add a single column
         Operation operation =
-                parse("alter table tb1 add h double not null comment 'h is double not null'");
+                parse(
+                        "alter table if exists tb1 add h double not null comment 'h is double not null'");
         assertThat(operation.asSummaryString())
                 .isEqualTo(
-                        "ALTER TABLE cat1.db1.tb1\n"
+                        "ALTER TABLE IF EXISTS cat1.db1.tb1\n"
                                 + "  ADD `h` DOUBLE NOT NULL COMMENT 'h is double not null' ");
         assertAlterTableSchema(
                 operation,
@@ -1815,6 +1827,7 @@ public class SqlToOperationConverterTest {
                 .isInstanceOf(ValidationException.class)
                 .hasMessageContaining(
                         "Invalid primary key 'PK_g'. Column 'g' is not a physical column.");
+        checkAlterNonExistTable("alter table %s nonexistent add primary key(x) not enforced");
     }
 
     @Test
@@ -1923,6 +1936,7 @@ public class SqlToOperationConverterTest {
                         "The base table has already defined the watermark strategy "
                                 + "`ts` AS ts - interval '5' seconds. "
                                 + "You might want to drop it before adding a new one.");
+        checkAlterNonExistTable("alter table %s nonexistent add watermark for ts as ts");
     }
 
     @Test
@@ -2080,6 +2094,7 @@ public class SqlToOperationConverterTest {
         assertThatThrownBy(() -> parse("alter table tb2 modify (e.f0 string after e.f1)"))
                 .isInstanceOf(UnsupportedOperationException.class)
                 .hasMessageContaining("Alter nested row type e.f0 is not supported yet.");
+        checkAlterNonExistTable("alter table %s nonexistent modify a int first");
     }
 
     @Test
@@ -2273,6 +2288,8 @@ public class SqlToOperationConverterTest {
                 .isInstanceOf(ValidationException.class)
                 .hasMessageContaining(
                         "Invalid primary key 'ct'. Column 'g' is not a physical column.");
+        checkAlterNonExistTable(
+                "alter table %s nonexistent modify constraint ct primary key(a) not enforced");
     }
 
     @Test
@@ -2354,6 +2371,7 @@ public class SqlToOperationConverterTest {
                         "Invalid data type of time field for watermark definition. "
                                 + "The field must be of type TIMESTAMP(p) or TIMESTAMP_LTZ(p), the supported precision 'p' is from 0 to 3, "
                                 + "but the time field type is STRING");
+        checkAlterNonExistTable("alter table %s nonexistent modify watermark for ts as ts");
     }
 
     @Test
@@ -2850,6 +2868,15 @@ public class SqlToOperationConverterTest {
         return plannerContext.createCalciteParser();
     }
 
+    private void checkAlterNonExistTable(String sqlTemplate) {
+        assertThat(parse(String.format(sqlTemplate, "if exists ")))
+                .isInstanceOf(NopOperation.class);
+        assertThatThrownBy(() -> parse(String.format(sqlTemplate, "")))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "Table `cat1`.`db1`.`nonexistent` doesn't exist or is a temporary table.");
+    }
+
     private void checkAlterTableCompact(Operation operation, Map<String, String> staticPartitions) {
         assertThat(operation).isInstanceOf(SinkModifyOperation.class);
         SinkModifyOperation modifyOperation = (SinkModifyOperation) operation;