You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by yu...@apache.org on 2023/07/18 12:43:50 UTC

[flink] branch master updated: [FLINK-32518][table] Enable atomicity for [CREATE OR] REPLACE TABLE AS statement (#22995)

This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new cc9712bfe92 [FLINK-32518][table] Enable atomicity for [CREATE OR] REPLACE TABLE AS statement (#22995)
cc9712bfe92 is described below

commit cc9712bfe92de4a68e87baef8f6937dcdc8a8634
Author: zhangmang <zh...@163.com>
AuthorDate: Tue Jul 18 20:43:41 2023 +0800

    [FLINK-32518][table] Enable atomicity for [CREATE OR] REPLACE TABLE AS statement (#22995)
---
 .../generated/table_config_configuration.html      |   4 +-
 .../flink/table/api/config/TableConfigOptions.java |   8 +-
 .../table/api/internal/TableEnvironmentImpl.java   | 124 +++++++-----
 ...atusHook.java => StagingSinkJobStatusHook.java} |   9 +-
 .../table/operations/ReplaceTableAsOperation.java  |  20 ++
 .../operations/StagedSinkModifyOperation.java      |  15 +-
 .../apache/flink/table/catalog/StagedTable.java    |  15 +-
 .../connector/sink/abilities/SupportsStaging.java  |  17 +-
 .../runtime/batch/sql/AtomicRtasITCase.java        |  35 ++++
 .../runtime/stream/sql/AtomicRtasITCase.java       |  36 ++++
 .../runtime/utils/AtomicCtasITCaseBase.java        |  11 +-
 .../runtime/utils/AtomicRtasITCaseBase.java        | 225 +++++++++++++++++++++
 12 files changed, 438 insertions(+), 81 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/table_config_configuration.html b/docs/layouts/shortcodes/generated/table_config_configuration.html
index bc78ecb4c55..c67ddf6a92d 100644
--- a/docs/layouts/shortcodes/generated/table_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/table_config_configuration.html
@@ -21,10 +21,10 @@
             <td>The name of the default database in the initial catalog to be created when instantiating TableEnvironment.</td>
         </tr>
         <tr>
-            <td><h5>table.ctas.atomicity-enabled</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td>
+            <td><h5>table.rtas-ctas.atomicity-enabled</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td>
             <td style="word-wrap: break-word;">false</td>
             <td>Boolean</td>
-            <td>Specifies if the CREATE TABLE AS SELECT statement is executed atomically. By default, the statement is non-atomic. The target table is created on the client side, and it will not be dropped even though the job fails or is canceled. If set this option to true and the underlying DynamicTableSink implements the SupportsStaging interface, the statement is expected to be executed atomically, the behavior of which depends on the actual DynamicTableSink.</td>
+            <td>Specifies if the CREATE TABLE/REPLACE TABLE/CREATE OR REPLACE AS SELECT statement is executed atomically. By default, the statement is non-atomic. The target table is created/replaced on the client side, and it will not be rolled back even though the job fails or is canceled. If set this option to true and the underlying DynamicTableSink implements the SupportsStaging interface, the statement is expected to be executed atomically, the behavior of which depends on the actual DynamicTableSink.</td>
         </tr>
         <tr>
             <td><h5>table.display.max-column-width</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td>
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/TableConfigOptions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/TableConfigOptions.java
index 2871be40be0..edfe1b63475 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/TableConfigOptions.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/TableConfigOptions.java
@@ -198,13 +198,13 @@ public class TableConfigOptions {
                             "Local directory that is used by planner for storing downloaded resources.");
 
     @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING)
-    public static final ConfigOption<Boolean> TABLE_CTAS_ATOMICITY_ENABLED =
-            key("table.ctas.atomicity-enabled")
+    public static final ConfigOption<Boolean> TABLE_RTAS_CTAS_ATOMICITY_ENABLED =
+            key("table.rtas-ctas.atomicity-enabled")
                     .booleanType()
                     .defaultValue(false)
                     .withDescription(
-                            "Specifies if the CREATE TABLE AS SELECT statement is executed atomically. By default, the statement is non-atomic. "
-                                    + "The target table is created on the client side, and it will not be dropped even though the job fails or is canceled. "
+                            "Specifies if the CREATE TABLE/REPLACE TABLE/CREATE OR REPLACE AS SELECT statement is executed atomically. By default, the statement is non-atomic. "
+                                    + "The target table is created/replaced on the client side, and it will not be rolled back even though the job fails or is canceled. "
                                     + "If set this option to true and the underlying DynamicTableSink implements the SupportsStaging interface, "
                                     + "the statement is expected to be executed atomically, the behavior of which depends on the actual DynamicTableSink.");
 
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 145b82fee78..4e6c91f6a51 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -68,7 +68,7 @@ import org.apache.flink.table.delegation.ExecutorFactory;
 import org.apache.flink.table.delegation.InternalPlan;
 import org.apache.flink.table.delegation.Parser;
 import org.apache.flink.table.delegation.Planner;
-import org.apache.flink.table.execution.CtasJobStatusHook;
+import org.apache.flink.table.execution.StagingSinkJobStatusHook;
 import org.apache.flink.table.expressions.ApiExpressionUtils;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.factories.FactoryUtil;
@@ -797,12 +797,11 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
         List<JobStatusHook> jobStatusHookList = new LinkedList<>();
         for (ModifyOperation modify : operations) {
             if (modify instanceof CreateTableASOperation) {
-                // execute CREATE TABLE first for CTAS statements
                 CreateTableASOperation ctasOperation = (CreateTableASOperation) modify;
                 mapOperations.add(getModifyOperation(ctasOperation, jobStatusHookList));
             } else if (modify instanceof ReplaceTableAsOperation) {
                 ReplaceTableAsOperation rtasOperation = (ReplaceTableAsOperation) modify;
-                mapOperations.add(getOperation(rtasOperation));
+                mapOperations.add(getModifyOperation(rtasOperation, jobStatusHookList));
             } else {
                 boolean isRowLevelModification = isRowLevelModification(modify);
                 if (isRowLevelModification) {
@@ -833,26 +832,46 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
         return executeInternal(transformations, sinkIdentifierNames, jobStatusHookList);
     }
 
-    private ModifyOperation getOperation(ReplaceTableAsOperation rtasOperation) {
-        // rtas drop table first, then create
+    private ModifyOperation getModifyOperation(
+            ReplaceTableAsOperation rtasOperation, List<JobStatusHook> jobStatusHookList) {
         CreateTableOperation createTableOperation = rtasOperation.getCreateTableOperation();
         ObjectIdentifier tableIdentifier = createTableOperation.getTableIdentifier();
-        try {
-            catalogManager.dropTable(tableIdentifier, rtasOperation.isCreateOrReplace());
-        } catch (ValidationException e) {
-            if (String.format(
-                            "Table with identifier '%s' does not exist.",
-                            tableIdentifier.asSummaryString())
-                    .equals(e.getMessage())) {
-                throw new TableException(
-                        String.format(
-                                "The table %s to be replaced doesn't exist. "
-                                        + "You can try to use CREATE TABLE AS statement or "
-                                        + "CREATE OR REPLACE TABLE AS statement.",
-                                tableIdentifier));
-            } else {
-                throw e;
-            }
+        // First check if the replacedTable exists
+        Optional<ContextResolvedTable> replacedTable = catalogManager.getTable(tableIdentifier);
+        if (!rtasOperation.isCreateOrReplace() && !replacedTable.isPresent()) {
+            throw new TableException(
+                    String.format(
+                            "The table %s to be replaced doesn't exist. "
+                                    + "You can try to use CREATE TABLE AS statement or "
+                                    + "CREATE OR REPLACE TABLE AS statement.",
+                            tableIdentifier));
+        }
+        Catalog catalog =
+                catalogManager.getCatalogOrThrowException(tableIdentifier.getCatalogName());
+        ResolvedCatalogTable catalogTable =
+                catalogManager.resolveCatalogTable(createTableOperation.getCatalogTable());
+        Optional<DynamicTableSink> stagingDynamicTableSink =
+                getSupportsStagingDynamicTableSink(createTableOperation, catalog, catalogTable);
+        if (stagingDynamicTableSink.isPresent()) {
+            // use atomic rtas
+            DynamicTableSink dynamicTableSink = stagingDynamicTableSink.get();
+            SupportsStaging.StagingPurpose stagingPurpose =
+                    rtasOperation.isCreateOrReplace()
+                            ? SupportsStaging.StagingPurpose.CREATE_OR_REPLACE_TABLE_AS
+                            : SupportsStaging.StagingPurpose.REPLACE_TABLE_AS;
+
+            StagedTable stagedTable =
+                    ((SupportsStaging) dynamicTableSink)
+                            .applyStaging(new SinkStagingContext(stagingPurpose));
+            StagingSinkJobStatusHook stagingSinkJobStatusHook =
+                    new StagingSinkJobStatusHook(stagedTable);
+            jobStatusHookList.add(stagingSinkJobStatusHook);
+            return rtasOperation.toStagedSinkModifyOperation(
+                    tableIdentifier, catalogTable, catalog, dynamicTableSink);
+        }
+        // non-atomic rtas drop table first if exists, then create
+        if (replacedTable.isPresent()) {
+            catalogManager.dropTable(tableIdentifier, false);
         }
         executeInternal(createTableOperation);
         return rtasOperation.toSinkModifyOperation(catalogManager);
@@ -861,51 +880,62 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
     private ModifyOperation getModifyOperation(
             CreateTableASOperation ctasOperation, List<JobStatusHook> jobStatusHookList) {
         CreateTableOperation createTableOperation = ctasOperation.getCreateTableOperation();
-        if (tableConfig.get(TableConfigOptions.TABLE_CTAS_ATOMICITY_ENABLED)) {
-            ObjectIdentifier tableIdentifier = createTableOperation.getTableIdentifier();
-            Catalog catalog =
-                    catalogManager.getCatalog(tableIdentifier.getCatalogName()).orElse(null);
-            ResolvedCatalogTable catalogTable =
-                    catalogManager.resolveCatalogTable(createTableOperation.getCatalogTable());
+        ObjectIdentifier tableIdentifier = createTableOperation.getTableIdentifier();
+        Catalog catalog =
+                catalogManager.getCatalogOrThrowException(tableIdentifier.getCatalogName());
+        ResolvedCatalogTable catalogTable =
+                catalogManager.resolveCatalogTable(createTableOperation.getCatalogTable());
+        Optional<DynamicTableSink> stagingDynamicTableSink =
+                getSupportsStagingDynamicTableSink(createTableOperation, catalog, catalogTable);
+        if (stagingDynamicTableSink.isPresent()) {
+            // use atomic ctas
+            DynamicTableSink dynamicTableSink = stagingDynamicTableSink.get();
+            SupportsStaging.StagingPurpose stagingPurpose =
+                    createTableOperation.isIgnoreIfExists()
+                            ? SupportsStaging.StagingPurpose.CREATE_TABLE_AS_IF_NOT_EXISTS
+                            : SupportsStaging.StagingPurpose.CREATE_TABLE_AS;
+            StagedTable stagedTable =
+                    ((SupportsStaging) dynamicTableSink)
+                            .applyStaging(new SinkStagingContext(stagingPurpose));
+            StagingSinkJobStatusHook stagingSinkJobStatusHook =
+                    new StagingSinkJobStatusHook(stagedTable);
+            jobStatusHookList.add(stagingSinkJobStatusHook);
+            return ctasOperation.toStagedSinkModifyOperation(
+                    tableIdentifier, catalogTable, catalog, dynamicTableSink);
+        }
+        // use non-atomic ctas, create table first
+        executeInternal(createTableOperation);
+        return ctasOperation.toSinkModifyOperation(catalogManager);
+    }
+
+    private Optional<DynamicTableSink> getSupportsStagingDynamicTableSink(
+            CreateTableOperation createTableOperation,
+            Catalog catalog,
+            ResolvedCatalogTable catalogTable) {
+        if (tableConfig.get(TableConfigOptions.TABLE_RTAS_CTAS_ATOMICITY_ENABLED)) {
             if (!TableFactoryUtil.isLegacyConnectorOptions(
                     catalog,
                     tableConfig,
                     isStreamingMode,
-                    tableIdentifier,
+                    createTableOperation.getTableIdentifier(),
                     catalogTable,
                     createTableOperation.isTemporary())) {
                 DynamicTableSink dynamicTableSink =
                         ExecutableOperationUtils.createDynamicTableSink(
                                 catalog,
                                 () -> moduleManager.getFactory((Module::getTableSinkFactory)),
-                                tableIdentifier,
+                                createTableOperation.getTableIdentifier(),
                                 catalogTable,
                                 Collections.emptyMap(),
                                 tableConfig,
                                 resourceManager.getUserClassLoader(),
                                 createTableOperation.isTemporary());
                 if (dynamicTableSink instanceof SupportsStaging) {
-                    // use atomic ctas
-                    SupportsStaging.StagingPurpose stagingPurpose =
-                            createTableOperation.isIgnoreIfExists()
-                                    ? SupportsStaging.StagingPurpose.CREATE_TABLE_AS_IF_NOT_EXISTS
-                                    : SupportsStaging.StagingPurpose.CREATE_TABLE_AS;
-                    StagedTable stagedTable =
-                            ((SupportsStaging) dynamicTableSink)
-                                    .applyStaging(new SinkStagingContext(stagingPurpose));
-                    CtasJobStatusHook ctasJobStatusHook = new CtasJobStatusHook(stagedTable);
-                    jobStatusHookList.add(ctasJobStatusHook);
-                    return ctasOperation.toStagedSinkModifyOperation(
-                            createTableOperation.getTableIdentifier(),
-                            catalogTable,
-                            catalog,
-                            dynamicTableSink);
+                    return Optional.of(dynamicTableSink);
                 }
             }
         }
-        // use non-atomic ctas, create table first
-        executeInternal(createTableOperation);
-        return ctasOperation.toSinkModifyOperation(catalogManager);
+        return Optional.empty();
     }
 
     private TableResultInternal executeInternal(
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/execution/CtasJobStatusHook.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/execution/StagingSinkJobStatusHook.java
similarity index 83%
rename from flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/execution/CtasJobStatusHook.java
rename to flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/execution/StagingSinkJobStatusHook.java
index 7a3ad40257a..61fee317b79 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/execution/CtasJobStatusHook.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/execution/StagingSinkJobStatusHook.java
@@ -23,14 +23,15 @@ import org.apache.flink.core.execution.JobStatusHook;
 import org.apache.flink.table.catalog.StagedTable;
 
 /**
- * This hook is used to implement atomic semantics for CTAS(CREATE TABLE AS SELECT) statement. It'll
- * call the corresponding interfaces of the inner {@link StagedTable} on job status changes.
+ * This hook is used to implement atomic semantics for CTAS(CREATE TABLE AS SELECT) or RTAS([CREATE
+ * OR] REPLACE TABLE AS SELECT) statement. It'll call the corresponding interfaces of the inner
+ * {@link StagedTable} on job status changes.
  */
-public class CtasJobStatusHook implements JobStatusHook {
+public class StagingSinkJobStatusHook implements JobStatusHook {
 
     private final StagedTable stagedTable;
 
-    public CtasJobStatusHook(StagedTable stagedTable) {
+    public StagingSinkJobStatusHook(StagedTable stagedTable) {
         this.stagedTable = stagedTable;
     }
 
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ReplaceTableAsOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ReplaceTableAsOperation.java
index 45f00b0f5de..dbddadde198 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ReplaceTableAsOperation.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ReplaceTableAsOperation.java
@@ -19,7 +19,12 @@
 package org.apache.flink.table.operations;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.ContextResolvedTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.operations.ddl.CreateTableOperation;
 
 import java.util.Collections;
@@ -71,6 +76,21 @@ public class ReplaceTableAsOperation implements ModifyOperation {
                 Collections.emptyMap());
     }
 
+    public StagedSinkModifyOperation toStagedSinkModifyOperation(
+            ObjectIdentifier tableIdentifier,
+            ResolvedCatalogTable catalogTable,
+            Catalog catalog,
+            DynamicTableSink dynamicTableSink) {
+        return new StagedSinkModifyOperation(
+                ContextResolvedTable.permanent(tableIdentifier, catalog, catalogTable),
+                sinkModifyQuery,
+                Collections.emptyMap(),
+                null, // targetColumns
+                false,
+                Collections.emptyMap(),
+                dynamicTableSink);
+    }
+
     @Override
     public String asSummaryString() {
         Map<String, Object> params = new LinkedHashMap<>();
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/StagedSinkModifyOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/StagedSinkModifyOperation.java
index c6459458283..057b7d9f1d7 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/StagedSinkModifyOperation.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/StagedSinkModifyOperation.java
@@ -29,14 +29,15 @@ import java.util.Map;
 
 /**
  * DML operation that tells to write to a sink which implements {@link SupportsStaging}. Currently.
- * this operation is only for CTAS(CREATE TABLE AS SELECT) statement.
+ * this operation is for CTAS(CREATE TABLE AS SELECT) and RTAS([CREATE OR] REPLACE TABLE AS SELECT)
+ * statement.
  *
- * <p>StagedSinkModifyOperation is an extension of SinkModifyOperation in the atomic CTAS scenario.
- * Whiling checking whether the corresponding sink support atomic CTAS or not, we will need to get
- * DynamicTableSink firstly and check whether it implements {@link SupportsStaging} and then call
- * the method {@link SupportsStaging#applyStaging}. We maintain the DynamicTableSink in this
- * operation so that we can reuse this DynamicTableSink instead of creating a new DynamicTableSink
- * during translating the operation again which is error-prone.
+ * <p>StagedSinkModifyOperation is an extension of SinkModifyOperation in the atomic CTAS/RTAS
+ * scenario. While checking whether the corresponding sink supports atomic CTAS/RTAS or not, we
+ * will need to get DynamicTableSink firstly and check whether it implements {@link SupportsStaging}
+ * and then call the method {@link SupportsStaging#applyStaging}. We maintain the DynamicTableSink
+ * in this operation so that we can reuse this DynamicTableSink instead of creating a new
+ * DynamicTableSink during translating the operation again which is error-prone.
  */
 @Internal
 public class StagedSinkModifyOperation extends SinkModifyOperation {
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/StagedTable.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/StagedTable.java
index 4da92b264c4..b8ff249e6ad 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/StagedTable.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/StagedTable.java
@@ -26,9 +26,10 @@ import java.io.Serializable;
 
 /**
  * The {@link StagedTable} is designed to implement Flink's atomic semantic for CTAS(CREATE TABLE AS
- * SELECT) statement using a two-phase commit protocol. The {@link StagedTable} is supposed to be
- * returned via method {@link SupportsStaging#applyStaging} by the {@link DynamicTableSink} which
- * implements the {@link SupportsStaging} interface.
+ * SELECT) and RTAS([CREATE OR] REPLACE TABLE AS SELECT) statement using a two-phase commit
+ * protocol. The {@link StagedTable} is supposed to be returned via method {@link
+ * SupportsStaging#applyStaging} by the {@link DynamicTableSink} which implements the {@link
+ * SupportsStaging} interface.
  *
  * <p>When the Flink job for writing to a {@link DynamicTableSink} with atomic semantic supporting
  * is CREATED, the {@link StagedTable#begin()} will be called; when the Flink job is FINISHED, the
@@ -41,15 +42,15 @@ import java.io.Serializable;
 public interface StagedTable extends Serializable {
 
     /**
-     * This method will be called when the job is started. In Flink's atomic CTAS scenario, it is
-     * expected to do initialization work; For example, initializing the client of the underlying
+     * This method will be called when the job is started. In Flink's atomic CTAS/RTAS scenario, it
+     * is expected to do initialization work; For example, initializing the client of the underlying
      * service, the tmp path of the underlying storage, or even call the start transaction API of
      * the underlying service, etc.
      */
     void begin();
 
     /**
-     * This method will be called when the job succeeds. In Flink's atomic CTAS scenario, it is
+     * This method will be called when the job succeeds. In Flink's atomic CTAS/RTAS scenario, it is
      * expected to do some commit work. For example, moving the underlying data to the target
      * directory to make it visible, writing buffer data to the underlying storage service, or even
      * call the commit transaction API of the underlying service, etc.
@@ -57,7 +58,7 @@ public interface StagedTable extends Serializable {
     void commit();
 
     /**
-     * This method will be called when the job is failed or is canceled. In Flink's atomic CTAS
+     * This method will be called when the job is failed or is canceled. In Flink's atomic CTAS/RTAS
      * scenario, it is expected to do some cleaning work for writing; For example, delete the data
      * in the tmp directory, delete the temporary data in the underlying storage service, or even
      * call the rollback transaction API of the underlying service, etc.
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsStaging.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsStaging.java
index 9d79db183f7..eb862920382 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsStaging.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsStaging.java
@@ -24,11 +24,12 @@ import org.apache.flink.table.connector.sink.DynamicTableSink;
 
 /**
  * Interface for {@link DynamicTableSink}s that support atomic semantic for CTAS(CREATE TABLE AS
- * SELECT) statement using a two-phase commit protocol. The table sink is responsible for returning
- * a {@link StagedTable} to tell the Flink how to implement the atomicity semantics.
+ * SELECT) or RTAS([CREATE OR] REPLACE TABLE AS SELECT) statement using a two-phase commit protocol.
+ * The table sink is responsible for returning a {@link StagedTable} to tell the Flink how to
+ * implement the atomicity semantics.
  *
- * <p>If the user turns on {@link TableConfigOptions#TABLE_CTAS_ATOMICITY_ENABLED}, and the {@link
- * DynamicTableSink} implements {@link SupportsStaging}, the planner will call method {@link
+ * <p>If the user turns on {@link TableConfigOptions#TABLE_RTAS_CTAS_ATOMICITY_ENABLED}, and the
+ * {@link DynamicTableSink} implements {@link SupportsStaging}, the planner will call method {@link
  * #applyStaging(StagingContext)} to get the {@link StagedTable} returned by the sink, then the
  * {@link StagedTable} will be used by Flink to implement a two-phase commit with the actual
  * implementation of the {@link StagedTable}.
@@ -38,8 +39,8 @@ public interface SupportsStaging {
 
     /**
      * Provides a {@link StagingContext} for the sink modification and return a {@link StagedTable}.
-     * The {@link StagedTable} provides transaction abstraction to support atomicity for CTAS. Flink
-     * will call the relevant API of StagedTable when the Job status switches,
+     * The {@link StagedTable} provides transaction abstraction to support atomicity for CTAS/RTAS.
+     * Flink will call the relevant API of StagedTable when the Job status switches.
      *
      * <p>Note: This method will be called at the compile stage.
      *
@@ -74,6 +75,8 @@ public interface SupportsStaging {
     @PublicEvolving
     enum StagingPurpose {
         CREATE_TABLE_AS,
-        CREATE_TABLE_AS_IF_NOT_EXISTS
+        CREATE_TABLE_AS_IF_NOT_EXISTS,
+        REPLACE_TABLE_AS,
+        CREATE_OR_REPLACE_TABLE_AS
     }
 }
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/AtomicRtasITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/AtomicRtasITCase.java
new file mode 100644
index 00000000000..efe93520a30
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/AtomicRtasITCase.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.batch.sql;
+
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.planner.runtime.utils.AtomicRtasITCaseBase;
+import org.apache.flink.table.planner.utils.TestingTableEnvironment;
+
+/** Tests atomic rtas in batch mode. */
+public class AtomicRtasITCase extends AtomicRtasITCaseBase {
+
+    @Override
+    protected TableEnvironment getTableEnvironment() {
+        EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
+        return TestingTableEnvironment.create(settings, null, TableConfig.getDefault());
+    }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/AtomicRtasITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/AtomicRtasITCase.java
new file mode 100644
index 00000000000..d86b9b807b6
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/AtomicRtasITCase.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.stream.sql;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.planner.runtime.utils.AtomicRtasITCaseBase;
+
+/** Tests atomic rtas in stream mode. */
+public class AtomicRtasITCase extends AtomicRtasITCaseBase {
+
+    @Override
+    protected TableEnvironment getTableEnvironment() {
+        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
+        return StreamTableEnvironment.create(
+                StreamExecutionEnvironment.getExecutionEnvironment(), settings);
+    }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/utils/AtomicCtasITCaseBase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/utils/AtomicCtasITCaseBase.java
index ee8deaac591..341cc02d6c8 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/utils/AtomicCtasITCaseBase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/utils/AtomicCtasITCaseBase.java
@@ -27,6 +27,7 @@ import org.apache.flink.types.Row;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.TestLogger;
 
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
@@ -57,6 +58,10 @@ public abstract class AtomicCtasITCaseBase extends TestLogger {
 
         String sourceDDL = "create table t1(a int, b varchar) with ('connector' = 'COLLECTION')";
         tEnv.executeSql(sourceDDL);
+    }
+
+    @AfterEach
+    void clean() {
         // clean data
         TestSupportsStagingTableFactory.JOB_STATUS_CHANGE_PROCESS.clear();
         TestSupportsStagingTableFactory.STAGING_PURPOSE_LIST.clear();
@@ -74,7 +79,7 @@ public abstract class AtomicCtasITCaseBase extends TestLogger {
 
     private void commonTestForAtomicCtas(String tableName, boolean ifNotExists, File tmpDataFolder)
             throws Exception {
-        tEnv.getConfig().set(TableConfigOptions.TABLE_CTAS_ATOMICITY_ENABLED, true);
+        tEnv.getConfig().set(TableConfigOptions.TABLE_RTAS_CTAS_ATOMICITY_ENABLED, true);
         String dataDir = tmpDataFolder.getAbsolutePath();
         String sqlFragment = ifNotExists ? " if not exists " + tableName : tableName;
         tEnv.executeSql(
@@ -101,7 +106,7 @@ public abstract class AtomicCtasITCaseBase extends TestLogger {
 
     @Test
     void testAtomicCtasWithException(@TempDir Path temporaryFolder) throws Exception {
-        tEnv.getConfig().set(TableConfigOptions.TABLE_CTAS_ATOMICITY_ENABLED, true);
+        tEnv.getConfig().set(TableConfigOptions.TABLE_RTAS_CTAS_ATOMICITY_ENABLED, true);
         String dataDir = temporaryFolder.toFile().getAbsolutePath();
         assertThatCode(
                         () ->
@@ -121,7 +126,7 @@ public abstract class AtomicCtasITCaseBase extends TestLogger {
 
     @Test
     void testWithoutAtomicCtas(@TempDir Path temporaryFolder) throws Exception {
-        tEnv.getConfig().set(TableConfigOptions.TABLE_CTAS_ATOMICITY_ENABLED, false);
+        tEnv.getConfig().set(TableConfigOptions.TABLE_RTAS_CTAS_ATOMICITY_ENABLED, false);
         String dataDir = temporaryFolder.toFile().getAbsolutePath();
         tEnv.executeSql(
                         "create table atomic_ctas_table with ('connector' = 'test-staging', 'data-dir' = '"
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/utils/AtomicRtasITCaseBase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/utils/AtomicRtasITCaseBase.java
new file mode 100644
index 00000000000..13b0e69125b
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/utils/AtomicRtasITCaseBase.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.utils;
+
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.config.TableConfigOptions;
+import org.apache.flink.table.connector.sink.abilities.SupportsStaging;
+import org.apache.flink.table.planner.factories.TestSupportsStagingTableFactory;
+import org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** The base case of atomic rtas ITCase. */
+public abstract class AtomicRtasITCaseBase extends TestLogger {
+
+    protected TableEnvironment tEnv;
+
+    protected abstract TableEnvironment getTableEnvironment();
+
+    @BeforeEach
+    void setup() {
+        tEnv = getTableEnvironment();
+        List<Row> sourceData = Collections.singletonList(Row.of(1, "ZM"));
+
+        TestCollectionTableFactory.reset();
+        TestCollectionTableFactory.initData(sourceData);
+
+        String sourceDDL = "create table t1(a int, b varchar) with ('connector' = 'COLLECTION')";
+        tEnv.executeSql(sourceDDL);
+    }
+
+    @AfterEach
+    void clean() {
+        // clean data
+        TestSupportsStagingTableFactory.JOB_STATUS_CHANGE_PROCESS.clear();
+        TestSupportsStagingTableFactory.STAGING_PURPOSE_LIST.clear();
+    }
+
+    @Test
+    void testAtomicReplaceTableAs(@TempDir Path temporaryFolder) throws Exception {
+        commonTestForAtomicReplaceTableAs(
+                "atomic_replace_table", false, true, temporaryFolder.toFile());
+    }
+
+    @Test
+    void testAtomicReplaceTableAsWithReplacedTableNotExists(@TempDir Path temporaryFolder)
+            throws Exception {
+        commonTestForAtomicReplaceTableAs(
+                "atomic_replace_table_not_exists", false, false, temporaryFolder.toFile());
+    }
+
+    @Test
+    void testAtomicCreateOrReplaceTableAs(@TempDir Path temporaryFolder) throws Exception {
+        commonTestForAtomicReplaceTableAs(
+                "atomic_create_or_replace_table", true, true, temporaryFolder.toFile());
+    }
+
+    @Test
+    void testAtomicCreateOrReplaceTableAsWithReplacedTableNotExists(@TempDir Path temporaryFolder)
+            throws Exception {
+        commonTestForAtomicReplaceTableAs(
+                "atomic_create_or_replace_table_not_exists", true, false, temporaryFolder.toFile());
+    }
+
+    private void commonTestForAtomicReplaceTableAs(
+            String tableName,
+            boolean isCreateOrReplace,
+            boolean isCreateReplacedTable,
+            File tmpDataFolder)
+            throws Exception {
+        if (isCreateReplacedTable) {
+            tEnv.executeSql("create table " + tableName + " (a int) with ('connector' = 'PRINT')");
+        }
+
+        tEnv.getConfig().set(TableConfigOptions.TABLE_RTAS_CTAS_ATOMICITY_ENABLED, true);
+        String dataDir = tmpDataFolder.getAbsolutePath();
+        String sqlFragment = getCreateOrReplaceSqlFragment(isCreateOrReplace, tableName);
+        String sql =
+                sqlFragment
+                        + " with ('connector' = 'test-staging', 'data-dir' = '"
+                        + dataDir
+                        + "') as select * from t1";
+        if (!isCreateOrReplace && !isCreateReplacedTable) {
+            assertThatThrownBy(() -> tEnv.executeSql(sql))
+                    .isInstanceOf(TableException.class)
+                    .hasMessage(
+                            "The table `default_catalog`.`default_database`.`"
+                                    + tableName
+                                    + "` to be replaced doesn't exist."
+                                    + " You can try to use CREATE TABLE AS statement or CREATE OR REPLACE TABLE AS statement.");
+        } else {
+            tEnv.executeSql(sql).await();
+            if (isCreateReplacedTable) {
+                assertThat(tEnv.listTables()).contains(tableName);
+            } else {
+                assertThat(tEnv.listTables()).doesNotContain(tableName);
+            }
+            verifyDataFile(dataDir, "data");
+            assertThat(TestSupportsStagingTableFactory.JOB_STATUS_CHANGE_PROCESS).hasSize(2);
+            assertThat(TestSupportsStagingTableFactory.JOB_STATUS_CHANGE_PROCESS)
+                    .contains("begin", "commit");
+            assertThat(TestSupportsStagingTableFactory.STAGING_PURPOSE_LIST).hasSize(1);
+            if (isCreateOrReplace) {
+                assertThat(TestSupportsStagingTableFactory.STAGING_PURPOSE_LIST)
+                        .contains(SupportsStaging.StagingPurpose.CREATE_OR_REPLACE_TABLE_AS);
+            } else {
+                assertThat(TestSupportsStagingTableFactory.STAGING_PURPOSE_LIST)
+                        .contains(SupportsStaging.StagingPurpose.REPLACE_TABLE_AS);
+            }
+        }
+    }
+
+    @Test
+    void testAtomicReplaceTableAsWithException(@TempDir Path temporaryFolder) {
+        commonTestForAtomicReplaceTableAsWithException(
+                "atomic_replace_table_fail", false, temporaryFolder.toFile());
+    }
+
+    @Test
+    void testAtomicCreateOrReplaceTableAsWithException(@TempDir Path temporaryFolder) {
+        commonTestForAtomicReplaceTableAsWithException(
+                "atomic_create_or_replace_table_fail", true, temporaryFolder.toFile());
+    }
+
+    private void commonTestForAtomicReplaceTableAsWithException(
+            String tableName, boolean isCreateOrReplace, File tmpDataFolder) {
+        tEnv.executeSql("create table " + tableName + " (a int) with ('connector' = 'PRINT')");
+        tEnv.getConfig().set(TableConfigOptions.TABLE_RTAS_CTAS_ATOMICITY_ENABLED, true);
+        String dataDir = tmpDataFolder.getAbsolutePath();
+        String sqlFragment = getCreateOrReplaceSqlFragment(isCreateOrReplace, tableName);
+        assertThatCode(
+                        () ->
+                                tEnv.executeSql(
+                                                sqlFragment
+                                                        + " with ('connector' = 'test-staging', 'data-dir' = '"
+                                                        + dataDir
+                                                        + "', 'sink-fail' = '"
+                                                        + true
+                                                        + "') as select * from t1")
+                                        .await())
+                .hasRootCauseMessage("Test StagedTable abort method.");
+
+        assertThat(TestSupportsStagingTableFactory.JOB_STATUS_CHANGE_PROCESS).hasSize(2);
+        assertThat(TestSupportsStagingTableFactory.JOB_STATUS_CHANGE_PROCESS)
+                .contains("begin", "abort");
+    }
+
+    @Test
+    void testWithoutAtomicReplaceTableAs(@TempDir Path temporaryFolder) throws Exception {
+        commonTestForWithoutAtomicReplaceTableAs(
+                "non_atomic_replace_table", false, temporaryFolder.toFile());
+    }
+
+    @Test
+    void testWithoutAtomicCreateOrReplaceTableAs(@TempDir Path temporaryFolder) throws Exception {
+        commonTestForWithoutAtomicReplaceTableAs(
+                "non_atomic_create_or_replace_table", true, temporaryFolder.toFile());
+    }
+
+    private void commonTestForWithoutAtomicReplaceTableAs(
+            String tableName, boolean isCreateOrReplace, File tmpDataFolder) throws Exception {
+        tEnv.executeSql("create table " + tableName + " (a int) with ('connector' = 'PRINT')");
+        tEnv.getConfig().set(TableConfigOptions.TABLE_RTAS_CTAS_ATOMICITY_ENABLED, false);
+        String dataDir = tmpDataFolder.getAbsolutePath();
+        String sqlFragment = getCreateOrReplaceSqlFragment(isCreateOrReplace, tableName);
+
+        tEnv.executeSql(
+                        sqlFragment
+                                + " with ('connector' = 'test-staging', 'data-dir' = '"
+                                + dataDir
+                                + "') as select * from t1")
+                .await();
+        assertThat(tEnv.listTables()).contains(tableName);
+        // Not using StagedTable, so need to read the hidden file
+        verifyDataFile(dataDir, "_data");
+        assertThat(TestSupportsStagingTableFactory.JOB_STATUS_CHANGE_PROCESS).hasSize(0);
+        assertThat(TestSupportsStagingTableFactory.STAGING_PURPOSE_LIST).hasSize(0);
+    }
+
+    private void verifyDataFile(String dataDir, String fileName) throws IOException {
+        File dataFile = new File(dataDir, fileName);
+        assertThat(dataFile).exists();
+        assertThat(dataFile).isFile();
+        assertThat(FileUtils.readFileUtf8(dataFile)).isEqualTo("1,ZM");
+    }
+
+    private String getCreateOrReplaceSqlFragment(boolean isCreateOrReplace, String tableName) {
+        return isCreateOrReplace
+                ? " create or replace table " + tableName
+                : " replace table " + tableName;
+    }
+}