Posted to commits@flink.apache.org by li...@apache.org on 2023/01/30 08:27:44 UTC

[flink] branch FLINK-30665 created (now 5afc301bc64)

This is an automated email from the ASF dual-hosted git repository.

lincoln pushed a change to branch FLINK-30665
in repository https://gitbox.apache.org/repos/asf/flink.git


      at 5afc301bc64 [FLINK-30665][table] Planner supports row-level update

This branch includes the following new commits:

     new 5afc301bc64 [FLINK-30665][table] Planner supports row-level update

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[flink] 01/01: [FLINK-30665][table] Planner supports row-level update

Posted by li...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lincoln pushed a commit to branch FLINK-30665
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5afc301bc6445bd5bbd9cb521dc20a808a67694e
Author: luoyuxia <lu...@alumni.sjtu.edu.cn>
AuthorDate: Sun Jan 15 17:48:13 2023 +0800

    [FLINK-30665][table] Planner supports row-level update
    
    This closes #21698
---
 .../table/planner/connectors/DynamicSinkUtils.java | 175 +++-
 .../operations/SqlToOperationConverter.java        |  25 +-
 .../plan/abilities/sink/RowLevelDeleteSpec.java    |   1 +
 ...evelDeleteSpec.java => RowLevelUpdateSpec.java} |  55 +-
 .../plan/abilities/sink/SinkAbilitySpec.java       |   3 +-
 .../plan/nodes/exec/common/CommonExecSink.java     |  10 +-
 .../factories/TestUpdateDeleteTableFactory.java    | 398 +++++++--
 ...st.java => SqlDdlToOperationConverterTest.java} | 701 +--------------
 .../operations/SqlDmlToOperationConverterTest.java | 334 +++++++
 .../operations/SqlOtherOperationConverterTest.java | 324 +++++++
 .../SqlToOperationConverterTestBase.java           | 148 ++++
 .../planner/plan/batch/sql/RowLevelUpdateTest.java | 165 ++++
 .../runtime/batch/sql/UpdateTableITCase.java       | 150 ++++
 .../runtime/stream/sql/UpdateTableITCase.java      |  44 +
 .../planner/plan/batch/sql/RowLevelUpdateTest.xml  | 973 +++++++++++++++++++++
 15 files changed, 2748 insertions(+), 758 deletions(-)
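
For context, a minimal usage sketch of the statement this change enables (not taken from this commit; the table name, data, and registration are assumptions). The target table's DynamicTableSink must implement SupportsRowLevelUpdate, otherwise DynamicSinkUtils#convertUpdate below throws an UnsupportedOperationException:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class RowLevelUpdateExample {
        public static void main(String[] args) throws Exception {
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
            // 'orders' is a hypothetical, already registered table whose sink
            // implements SupportsRowLevelUpdate (e.g. created via a catalog or a
            // prior CREATE TABLE statement).
            tEnv.executeSql("UPDATE orders SET price = price * 0.9 WHERE product = 'apple'")
                    .await();
        }
    }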

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
index 69574fabd74..441ddcca2b1 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
@@ -43,6 +43,7 @@ import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
 import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
 import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete;
+import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate;
 import org.apache.flink.table.connector.sink.abilities.SupportsWritingMetadata;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
@@ -54,6 +55,7 @@ import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
 import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable;
 import org.apache.flink.table.planner.plan.abilities.sink.OverwriteSpec;
 import org.apache.flink.table.planner.plan.abilities.sink.RowLevelDeleteSpec;
+import org.apache.flink.table.planner.plan.abilities.sink.RowLevelUpdateSpec;
 import org.apache.flink.table.planner.plan.abilities.sink.SinkAbilitySpec;
 import org.apache.flink.table.planner.plan.abilities.sink.WritingMetadataSpec;
 import org.apache.flink.table.planner.plan.nodes.calcite.LogicalSink;
@@ -86,6 +88,7 @@ import org.apache.calcite.rex.RexUtil;
 
 import java.time.ZoneId;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -216,9 +219,11 @@ public final class DynamicSinkUtils {
         List<SinkAbilitySpec> sinkAbilitySpecs = new ArrayList<>();
 
         boolean isDelete = false;
+        boolean isUpdate = false;
         if (input instanceof LogicalTableModify) {
             LogicalTableModify tableModify = (LogicalTableModify) input;
             isDelete = tableModify.getOperation() == TableModify.Operation.DELETE;
+            isUpdate = tableModify.getOperation() == TableModify.Operation.UPDATE;
         }
 
         // 1. prepare table sink
@@ -241,14 +246,24 @@ public final class DynamicSinkUtils {
                             dataTypeFactory,
                             typeFactory,
                             sinkAbilitySpecs);
+        } else if (isUpdate) {
+            input =
+                    convertUpdate(
+                            (LogicalTableModify) input,
+                            sink,
+                            contextResolvedTable,
+                            tableDebugName,
+                            dataTypeFactory,
+                            typeFactory,
+                            sinkAbilitySpecs);
         }
 
         sinkAbilitySpecs.forEach(spec -> spec.apply(sink));
 
         // 2. validate the query schema to the sink's table schema and apply cast if possible
         RelNode query = input;
-        // skip validate and implicit cast when it's delete as it been done before
-        if (!isDelete) {
+        // skip validate and implicit cast when it's delete/update since it has been done before
+        if (!isDelete && !isUpdate) {
             query =
                     validateSchemaAndApplyImplicitCast(
                             input, schema, tableDebugName, dataTypeFactory, typeFactory);
@@ -403,6 +418,57 @@ public final class DynamicSinkUtils {
         }
     }
 
+    private static RelNode convertUpdate(
+            LogicalTableModify tableModify,
+            DynamicTableSink sink,
+            ContextResolvedTable contextResolvedTable,
+            String tableDebugName,
+            DataTypeFactory dataTypeFactory,
+            FlinkTypeFactory typeFactory,
+            List<SinkAbilitySpec> sinkAbilitySpecs) {
+        if (!(sink instanceof SupportsRowLevelUpdate)) {
+            throw new UnsupportedOperationException(
+                    String.format(
+                            "Can't perform update operation of the table %s because the corresponding dynamic table sink has not yet implemented %s.",
+                            tableDebugName, SupportsRowLevelUpdate.class.getName()));
+        }
+        SupportsRowLevelUpdate supportsRowLevelUpdate = (SupportsRowLevelUpdate) sink;
+        ResolvedSchema resolvedSchema = contextResolvedTable.getResolvedSchema();
+        List<Column> updatedColumns = getUpdatedColumns(tableModify, resolvedSchema);
+        RowLevelModificationScanContext context = RowLevelModificationContextUtils.getScanContext();
+        SupportsRowLevelUpdate.RowLevelUpdateInfo updateInfo =
+                supportsRowLevelUpdate.applyRowLevelUpdate(updatedColumns, context);
+        if (updateInfo.getRowLevelUpdateMode()
+                        != SupportsRowLevelUpdate.RowLevelUpdateMode.UPDATED_ROWS
+                && updateInfo.getRowLevelUpdateMode()
+                        != SupportsRowLevelUpdate.RowLevelUpdateMode.ALL_ROWS) {
+            throw new IllegalArgumentException(
+                    "Unknown update mode:" + updateInfo.getRowLevelUpdateMode());
+        }
+        sinkAbilitySpecs.add(
+                new RowLevelUpdateSpec(
+                        updatedColumns, updateInfo.getRowLevelUpdateMode(), context));
+        return convertToRowLevelUpdate(
+                tableModify,
+                contextResolvedTable,
+                updateInfo,
+                tableDebugName,
+                dataTypeFactory,
+                typeFactory);
+    }
+
+    private static List<Column> getUpdatedColumns(
+            LogicalTableModify tableModify, ResolvedSchema resolvedSchema) {
+        List<Column> updatedColumns = new ArrayList<>();
+        List<String> updatedColumnNames = tableModify.getUpdateColumnList();
+        for (Column column : resolvedSchema.getColumns()) {
+            if (updatedColumnNames.contains(column.getName())) {
+                updatedColumns.add(column);
+            }
+        }
+        return updatedColumns;
+    }
+
     /** Convert tableModify node to a rel node representing for row-level delete. */
     private static RelNode convertToRowLevelDelete(
             LogicalTableModify tableModify,
@@ -540,6 +606,111 @@ public final class DynamicSinkUtils {
         return (LogicalTableScan) relNode;
     }
 
+    /** Convert tableModify node to a RelNode representing a row-level update. */
+    private static RelNode convertToRowLevelUpdate(
+            LogicalTableModify tableModify,
+            ContextResolvedTable contextResolvedTable,
+            SupportsRowLevelUpdate.RowLevelUpdateInfo rowLevelUpdateInfo,
+            String tableDebugName,
+            DataTypeFactory dataTypeFactory,
+            FlinkTypeFactory typeFactory) {
+        // get the required columns
+        ResolvedSchema resolvedSchema = contextResolvedTable.getResolvedSchema();
+        Optional<List<Column>> optionalColumns = rowLevelUpdateInfo.requiredColumns();
+        List<Column> requiredColumns = optionalColumns.orElse(resolvedSchema.getColumns());
+        // get the root table scan, which we may need to rewrite
+        LogicalTableScan tableScan = getSourceTableScan(tableModify);
+        Tuple2<List<Integer>, List<MetadataColumn>> colsIndexAndExtraMetaCols =
+                getRequireColumnsIndexAndExtraMetaCols(tableScan, requiredColumns, resolvedSchema);
+        List<Integer> updatedIndexes = colsIndexAndExtraMetaCols.f0;
+        List<MetadataColumn> metadataColumns = colsIndexAndExtraMetaCols.f1;
+        // if there are extra metadata columns, we need to modify the underlying
+        // LogicalTableScan so that it can also read those metadata columns
+        int originColsCount = resolvedSchema.getColumnCount();
+        if (metadataColumns.size() > 0) {
+            resolvedSchema =
+                    addExtraMetaCols(
+                            tableModify, tableScan, tableDebugName, metadataColumns, typeFactory);
+        }
+
+        return projectColumnsForUpdate(
+                tableModify,
+                originColsCount,
+                resolvedSchema,
+                updatedIndexes,
+                rowLevelUpdateInfo.getRowLevelUpdateMode(),
+                tableDebugName,
+                dataTypeFactory,
+                typeFactory);
+    }
+
+    // create a project that only selects the required columns or update expressions
+    private static RelNode projectColumnsForUpdate(
+            LogicalTableModify tableModify,
+            int originColsCount,
+            ResolvedSchema resolvedSchema,
+            List<Integer> updatedIndexes,
+            SupportsRowLevelUpdate.RowLevelUpdateMode updateMode,
+            String tableDebugName,
+            DataTypeFactory dataTypeFactory,
+            FlinkTypeFactory typeFactory) {
+        RexBuilder rexBuilder = tableModify.getCluster().getRexBuilder();
+        // the updated columns, in the same order as in the user's update clause
+        List<String> updatedColumnNames = tableModify.getUpdateColumnList();
+        List<RexNode> newRexNodeList = new ArrayList<>();
+        List<String> newFieldNames = new ArrayList<>();
+        List<DataType> updateTargetDataTypes = new ArrayList<>();
+        Project project = (Project) (tableModify.getInput());
+
+        LogicalFilter filter = null;
+        // if the update mode is all rows, we need to know the filter to rewrite
+        // the update expression to IF(filter, updated_expr, col_expr)
+        if (updateMode == SupportsRowLevelUpdate.RowLevelUpdateMode.ALL_ROWS
+                && project.getInput() instanceof LogicalFilter) {
+            filter = (LogicalFilter) project.getInput();
+        }
+
+        // the rex nodes of the project are: input refs for all columns, followed by the
+        // update expressions for the updated columns
+        List<RexNode> oldRexNodes = project.getProjects();
+        for (int index : updatedIndexes) {
+            String colName = resolvedSchema.getColumnNames().get(index);
+            // if the selected column is one of the updated columns, take its update
+            // expression from the project node
+            if (updatedColumnNames.contains(colName)) {
+                // get the index of the updated column in all updated columns
+                int i = updatedColumnNames.indexOf(colName);
+                // get the update expression
+                RexNode rexNode = oldRexNodes.get(originColsCount + i);
+                if (filter != null) {
+                    rexNode =
+                            rexBuilder.makeCall(
+                                    FlinkSqlOperatorTable.IF,
+                                    Arrays.asList(
+                                            filter.getCondition(),
+                                            rexNode,
+                                            rexBuilder.makeInputRef(project.getInput(), index)));
+                }
+                newRexNodeList.add(rexNode);
+            } else {
+                newRexNodeList.add(rexBuilder.makeInputRef(project.getInput(), index));
+            }
+            newFieldNames.add(colName);
+            updateTargetDataTypes.add(resolvedSchema.getColumnDataTypes().get(index));
+        }
+
+        project =
+                project.copy(
+                        project.getTraitSet(),
+                        // if filter is not null, we need to remove the filter in the plan since we
+                        // have rewritten the expression to IF(filter, updated_expr, col_expr)
+                        filter != null ? filter.getInput() : project.getInput(),
+                        newRexNodeList,
+                        RexUtil.createStructType(typeFactory, newRexNodeList, newFieldNames, null));
+        return validateSchemaAndApplyImplicitCast(
+                project, updateTargetDataTypes, tableDebugName, dataTypeFactory, typeFactory);
+    }
+
     /**
      * Add extra meta columns for underlying table scan, return a new resolve schema after adding
      * extra meta columns.
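
A hypothetical illustration (table and column names are assumptions, not from this commit) of the two update modes handled above: in ALL_ROWS mode projectColumnsForUpdate folds the WHERE condition into an IF call and removes the LogicalFilter, so the sink receives every row; in UPDATED_ROWS mode the filter stays in the plan and only the matching rows are emitted.

    // user statement:
    //     UPDATE t SET v = v + 1 WHERE k > 10
    //
    // ALL_ROWS mode: planned roughly as a projection over all rows,
    //     SELECT k, IF(k > 10, v + 1, v) FROM t
    //
    // UPDATED_ROWS mode: the filter is kept,
    //     SELECT k, v + 1 FROM t WHERE k > 10
    // and CommonExecSink (see below) tags these rows with RowKind.UPDATE_AFTER.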
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
index d07f45a26a6..be90e92557b 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
@@ -212,6 +212,7 @@ import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlUpdate;
 import org.apache.calcite.sql.SqlUtil;
 import org.apache.calcite.sql.dialect.CalciteSqlDialect;
 import org.apache.calcite.sql.parser.SqlParser;
@@ -403,6 +404,8 @@ public class SqlToOperationConverter {
             return Optional.of(converter.convertStopJob((SqlStopJob) validated));
         } else if (validated instanceof SqlDelete) {
             return Optional.of(converter.convertDelete((SqlDelete) validated));
+        } else if (validated instanceof SqlUpdate) {
+            return Optional.of(converter.convertUpdate((SqlUpdate) validated));
         } else {
             return Optional.empty();
         }
@@ -1512,8 +1515,8 @@ public class SqlToOperationConverter {
         // set it's delete
         RowLevelModificationContextUtils.setModificationType(
                 SupportsRowLevelModificationScan.RowLevelModificationType.DELETE);
-        RelRoot updateRelational = flinkPlanner.rel(sqlDelete);
-        LogicalTableModify tableModify = (LogicalTableModify) updateRelational.rel;
+        RelRoot deleteRelational = flinkPlanner.rel(sqlDelete);
+        LogicalTableModify tableModify = (LogicalTableModify) deleteRelational.rel;
         UnresolvedIdentifier unresolvedTableIdentifier =
                 UnresolvedIdentifier.of(tableModify.getTable().getQualifiedName());
         ContextResolvedTable contextResolvedTable =
@@ -1545,6 +1548,24 @@ public class SqlToOperationConverter {
                 contextResolvedTable, queryOperation, SinkModifyOperation.ModifyType.DELETE);
     }
 
+    private Operation convertUpdate(SqlUpdate sqlUpdate) {
+        // set it's update
+        RowLevelModificationContextUtils.setModificationType(
+                SupportsRowLevelModificationScan.RowLevelModificationType.UPDATE);
+        RelRoot updateRelational = flinkPlanner.rel(sqlUpdate);
+        // get target sink table
+        LogicalTableModify tableModify = (LogicalTableModify) updateRelational.rel;
+        UnresolvedIdentifier unresolvedTableIdentifier =
+                UnresolvedIdentifier.of(tableModify.getTable().getQualifiedName());
+        ContextResolvedTable contextResolvedTable =
+                catalogManager.getTableOrError(
+                        catalogManager.qualifyIdentifier(unresolvedTableIdentifier));
+        // get query
+        PlannerQueryOperation queryOperation = new PlannerQueryOperation(tableModify);
+        return new SinkModifyOperation(
+                contextResolvedTable, queryOperation, SinkModifyOperation.ModifyType.UPDATE);
+    }
+
     private void validateTableConstraint(SqlTableConstraint constraint) {
         if (constraint.isUnique()) {
             throw new UnsupportedOperationException("UNIQUE constraint is not supported yet");
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/RowLevelDeleteSpec.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/RowLevelDeleteSpec.java
index 04d9ca5bf12..1a7005f3b42 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/RowLevelDeleteSpec.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/RowLevelDeleteSpec.java
@@ -73,6 +73,7 @@ public class RowLevelDeleteSpec implements SinkAbilitySpec {
         }
     }
 
+    @Nonnull
     public SupportsRowLevelDelete.RowLevelDeleteMode getRowLevelDeleteMode() {
         return rowLevelDeleteMode;
     }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/RowLevelDeleteSpec.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/RowLevelUpdateSpec.java
similarity index 60%
copy from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/RowLevelDeleteSpec.java
copy to flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/RowLevelUpdateSpec.java
index 04d9ca5bf12..d046ab002bb 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/RowLevelDeleteSpec.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/RowLevelUpdateSpec.java
@@ -19,62 +19,70 @@
 package org.apache.flink.table.planner.plan.abilities.sink;
 
 import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.connector.RowLevelModificationScanContext;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
-import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete;
-import org.apache.flink.util.Preconditions;
+import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
 
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
+import java.util.List;
 import java.util.Objects;
 
 /**
  * A sub-class of {@link SinkAbilitySpec} that can not only serialize/deserialize the row-level
- * delete mode to/from JSON, but also can delete existing data for {@link
- * org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete}.
+ * update mode & columns to/from JSON, but also can update existing data for {@link
+ * org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate}.
  */
 @JsonIgnoreProperties(ignoreUnknown = true)
-@JsonTypeName("RowLevelDelete")
-public class RowLevelDeleteSpec implements SinkAbilitySpec {
-    public static final String FIELD_NAME_ROW_LEVEL_DELETE_MODE = "rowLevelDeleteMode";
+@JsonTypeName("RowLevelUpdate")
+public class RowLevelUpdateSpec implements SinkAbilitySpec {
+    public static final String FIELD_NAME_UPDATED_COLUMNS = "updatedColumns";
+    public static final String FIELD_NAME_ROW_LEVEL_UPDATE_MODE = "rowLevelUpdateMode";
+
+    @JsonProperty(FIELD_NAME_UPDATED_COLUMNS)
+    @Nonnull
+    private final List<Column> updatedColumns;
 
-    @JsonProperty(FIELD_NAME_ROW_LEVEL_DELETE_MODE)
+    @JsonProperty(FIELD_NAME_ROW_LEVEL_UPDATE_MODE)
     @Nonnull
-    private final SupportsRowLevelDelete.RowLevelDeleteMode rowLevelDeleteMode;
+    private final SupportsRowLevelUpdate.RowLevelUpdateMode rowLevelUpdateMode;
 
     @JsonIgnore @Nullable private final RowLevelModificationScanContext scanContext;
 
     @JsonCreator
-    public RowLevelDeleteSpec(
-            @JsonProperty(FIELD_NAME_ROW_LEVEL_DELETE_MODE) @Nonnull
-                    SupportsRowLevelDelete.RowLevelDeleteMode rowLevelDeleteMode,
+    public RowLevelUpdateSpec(
+            @JsonProperty(FIELD_NAME_UPDATED_COLUMNS) @Nonnull List<Column> updatedColumns,
+            @JsonProperty(FIELD_NAME_ROW_LEVEL_UPDATE_MODE) @Nonnull
+                    SupportsRowLevelUpdate.RowLevelUpdateMode rowLevelUpdateMode,
             @Nullable RowLevelModificationScanContext scanContext) {
-        this.rowLevelDeleteMode = Preconditions.checkNotNull(rowLevelDeleteMode);
+        this.updatedColumns = updatedColumns;
+        this.rowLevelUpdateMode = rowLevelUpdateMode;
         this.scanContext = scanContext;
     }
 
     @Override
     public void apply(DynamicTableSink tableSink) {
-        if (tableSink instanceof SupportsRowLevelDelete) {
-            ((SupportsRowLevelDelete) tableSink).applyRowLevelDelete(scanContext);
+        if (tableSink instanceof SupportsRowLevelUpdate) {
+            ((SupportsRowLevelUpdate) tableSink).applyRowLevelUpdate(updatedColumns, scanContext);
         } else {
             throw new TableException(
                     String.format(
-                            "%s does not support SupportsRowLevelDelete.",
+                            "%s does not support SupportsRowLevelUpdate.",
                             tableSink.getClass().getName()));
         }
     }
 
-    public SupportsRowLevelDelete.RowLevelDeleteMode getRowLevelDeleteMode() {
-        return rowLevelDeleteMode;
+    @Nonnull
+    public SupportsRowLevelUpdate.RowLevelUpdateMode getRowLevelUpdateMode() {
+        return rowLevelUpdateMode;
     }
 
     @Override
@@ -85,13 +93,14 @@ public class RowLevelDeleteSpec implements SinkAbilitySpec {
         if (o == null || getClass() != o.getClass()) {
             return false;
         }
-        RowLevelDeleteSpec that = (RowLevelDeleteSpec) o;
-        return rowLevelDeleteMode == that.rowLevelDeleteMode
+        RowLevelUpdateSpec that = (RowLevelUpdateSpec) o;
+        return Objects.equals(updatedColumns, that.updatedColumns)
+                && rowLevelUpdateMode == that.rowLevelUpdateMode
                 && Objects.equals(scanContext, that.scanContext);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(rowLevelDeleteMode, scanContext);
+        return Objects.hash(updatedColumns, rowLevelUpdateMode, scanContext);
     }
 }
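
For reference, a minimal sketch of a sink implementing SupportsRowLevelUpdate, which is what RowLevelUpdateSpec#apply above delegates to. The class name and method bodies are illustrative assumptions, not code from this commit; see TestUpdateDeleteTableFactory below for the full test implementation:

    import org.apache.flink.table.catalog.Column;
    import org.apache.flink.table.connector.ChangelogMode;
    import org.apache.flink.table.connector.RowLevelModificationScanContext;
    import org.apache.flink.table.connector.sink.DynamicTableSink;
    import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate;

    import javax.annotation.Nullable;

    import java.util.List;
    import java.util.Optional;

    public class ExampleUpdateSink implements DynamicTableSink, SupportsRowLevelUpdate {

        @Override
        public RowLevelUpdateInfo applyRowLevelUpdate(
                List<Column> updatedColumns, @Nullable RowLevelModificationScanContext context) {
            return new RowLevelUpdateInfo() {
                @Override
                public Optional<List<Column>> requiredColumns() {
                    // Optional.empty() means the planner selects all columns of the table
                    return Optional.empty();
                }

                @Override
                public RowLevelUpdateMode getRowLevelUpdateMode() {
                    // only the updated rows are delivered, tagged as UPDATE_AFTER
                    return RowLevelUpdateMode.UPDATED_ROWS;
                }
            };
        }

        @Override
        public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
            return ChangelogMode.upsert();
        }

        @Override
        public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
            throw new UnsupportedOperationException("omitted in this sketch");
        }

        @Override
        public DynamicTableSink copy() {
            return new ExampleUpdateSink();
        }

        @Override
        public String asSummaryString() {
            return "ExampleUpdateSink";
        }
    }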
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/SinkAbilitySpec.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/SinkAbilitySpec.java
index fcfb6078433..ce1a494c493 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/SinkAbilitySpec.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/SinkAbilitySpec.java
@@ -35,7 +35,8 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTyp
     @JsonSubTypes.Type(value = OverwriteSpec.class),
     @JsonSubTypes.Type(value = PartitioningSpec.class),
     @JsonSubTypes.Type(value = WritingMetadataSpec.class),
-    @JsonSubTypes.Type(value = RowLevelDeleteSpec.class)
+    @JsonSubTypes.Type(value = RowLevelDeleteSpec.class),
+    @JsonSubTypes.Type(value = RowLevelUpdateSpec.class)
 })
 @Internal
 public interface SinkAbilitySpec {
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
index 7517e8501eb..852712a4395 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
@@ -48,10 +48,12 @@ import org.apache.flink.table.connector.sink.SinkFunctionProvider;
 import org.apache.flink.table.connector.sink.SinkProvider;
 import org.apache.flink.table.connector.sink.SinkV2Provider;
 import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete;
+import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.planner.codegen.EqualiserCodeGenerator;
 import org.apache.flink.table.planner.connectors.TransformationSinkProvider;
 import org.apache.flink.table.planner.plan.abilities.sink.RowLevelDeleteSpec;
+import org.apache.flink.table.planner.plan.abilities.sink.RowLevelUpdateSpec;
 import org.apache.flink.table.planner.plan.abilities.sink.SinkAbilitySpec;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
@@ -653,7 +655,7 @@ public abstract class CommonExecSink extends ExecNodeBase<Object>
     /**
      * Get the target row-kind that the row data should change to, assuming the current row kind is
      * RowKind.INSERT. Return Optional.empty() if it doesn't need to change. Currently, it'll only
-     * consider row-level delete.
+     * consider row-level delete/update.
      */
     private Optional<RowKind> getTargetRowKind() {
         if (tableSinkSpec.getSinkAbilities() != null) {
@@ -664,6 +666,12 @@ public abstract class CommonExecSink extends ExecNodeBase<Object>
                             == SupportsRowLevelDelete.RowLevelDeleteMode.DELETED_ROWS) {
                         return Optional.of(RowKind.DELETE);
                     }
+                } else if (sinkAbilitySpec instanceof RowLevelUpdateSpec) {
+                    RowLevelUpdateSpec updateSpec = (RowLevelUpdateSpec) sinkAbilitySpec;
+                    if (updateSpec.getRowLevelUpdateMode()
+                            == SupportsRowLevelUpdate.RowLevelUpdateMode.UPDATED_ROWS) {
+                        return Optional.of(RowKind.UPDATE_AFTER);
+                    }
                 }
             }
         }
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestUpdateDeleteTableFactory.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestUpdateDeleteTableFactory.java
index b7c0ce53d11..fa38811ed08 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestUpdateDeleteTableFactory.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestUpdateDeleteTableFactory.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
@@ -40,6 +41,7 @@ import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.sink.abilities.SupportsDeletePushDown;
 import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete;
+import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.connector.source.ScanTableSource;
 import org.apache.flink.table.connector.source.SourceFunctionProvider;
@@ -115,13 +117,33 @@ public class TestUpdateDeleteTableFactory
                     .defaultValue(SupportsRowLevelDelete.RowLevelDeleteMode.DELETED_ROWS)
                     .withDescription("The delete mode for row level delete.");
 
+    private static final ConfigOption<SupportsRowLevelUpdate.RowLevelUpdateMode> UPDATE_MODE =
+            ConfigOptions.key("update-mode")
+                    .enumType(SupportsRowLevelUpdate.RowLevelUpdateMode.class)
+                    .defaultValue(SupportsRowLevelUpdate.RowLevelUpdateMode.UPDATED_ROWS)
+                    .withDescription("The update mode for row level update.");
+
     private static final ConfigOption<List<String>> REQUIRED_COLUMNS_FOR_DELETE =
             ConfigOptions.key("required-columns-for-delete")
                     .stringType()
                     .asList()
                     .noDefaultValue()
                     .withDescription(
-                            "The columns' name for the required columns in row-level delete");
+                            "The columns' name for the required columns in row-level delete.");
+
+    private static final ConfigOption<List<String>> REQUIRED_COLUMNS_FOR_UPDATE =
+            ConfigOptions.key("required-columns-for-update")
+                    .stringType()
+                    .asList()
+                    .noDefaultValue()
+                    .withDescription("The name for the required columns in row-level update.");
+
+    private static final ConfigOption<Boolean> ONLY_REQUIRE_UPDATED_COLUMNS_FOR_UPDATE =
+            ConfigOptions.key("only-require-updated-columns-for-update")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether to only require the updated columns for update statement, require all columns by default.");
 
     private static final List<Column.MetadataColumn> META_COLUMNS =
             Arrays.asList(
@@ -145,27 +167,41 @@ public class TestUpdateDeleteTableFactory
         String dataId =
                 helper.getOptions().getOptional(DATA_ID).orElse(String.valueOf(idCounter.get()));
         SupportsRowLevelDelete.RowLevelDeleteMode deleteMode = helper.getOptions().get(DELETE_MODE);
-        List<String> requireCols = helper.getOptions().get(REQUIRED_COLUMNS_FOR_DELETE);
+        SupportsRowLevelUpdate.RowLevelUpdateMode updateMode = helper.getOptions().get(UPDATE_MODE);
+        List<String> requireColsForDelete = helper.getOptions().get(REQUIRED_COLUMNS_FOR_DELETE);
+        List<String> requireColsForUpdate = helper.getOptions().get(REQUIRED_COLUMNS_FOR_UPDATE);
+        boolean onlyRequireUpdatedColumns =
+                helper.getOptions().get(ONLY_REQUIRE_UPDATED_COLUMNS_FOR_UPDATE);
         if (helper.getOptions().get(MIX_DELETE)) {
             return new SupportsDeleteSink(
                     context.getObjectIdentifier(),
                     context.getCatalogTable(),
                     deleteMode,
+                    updateMode,
                     dataId,
-                    requireCols);
+                    requireColsForDelete,
+                    requireColsForUpdate,
+                    onlyRequireUpdatedColumns);
         } else {
             if (helper.getOptions().get(SUPPORT_DELETE_PUSH_DOWN)) {
                 return new SupportsDeletePushDownSink(
+                        context.getObjectIdentifier(),
+                        context.getCatalogTable(),
+                        updateMode,
                         dataId,
-                        helper.getOptions().get(ONLY_ACCEPT_EQUAL_PREDICATE),
-                        context.getCatalogTable());
+                        requireColsForUpdate,
+                        onlyRequireUpdatedColumns,
+                        helper.getOptions().get(ONLY_ACCEPT_EQUAL_PREDICATE));
             } else {
-                return new SupportsRowLevelDeleteSink(
+                return new SupportsRowLevelModificationSink(
                         context.getObjectIdentifier(),
                         context.getCatalogTable(),
                         deleteMode,
+                        updateMode,
                         dataId,
-                        requireCols);
+                        requireColsForDelete,
+                        requireColsForUpdate,
+                        onlyRequireUpdatedColumns);
             }
         }
     }
@@ -199,7 +235,10 @@ public class TestUpdateDeleteTableFactory
                         SUPPORT_DELETE_PUSH_DOWN,
                         MIX_DELETE,
                         DELETE_MODE,
-                        REQUIRED_COLUMNS_FOR_DELETE));
+                        UPDATE_MODE,
+                        REQUIRED_COLUMNS_FOR_DELETE,
+                        REQUIRED_COLUMNS_FOR_UPDATE,
+                        ONLY_REQUIRE_UPDATED_COLUMNS_FOR_UPDATE));
     }
 
     /** A test table source which supports reading metadata. */
@@ -220,7 +259,7 @@ public class TestUpdateDeleteTableFactory
 
         @Override
         public String asSummaryString() {
-            return "test table source";
+            return "TestTableSource";
         }
 
         @Override
@@ -280,32 +319,128 @@ public class TestUpdateDeleteTableFactory
         private final Set<ObjectIdentifier> scanTables = new HashSet<>();
     }
 
-    /** A common test sink. */
-    private static class TestSink implements DynamicTableSink {
+    /** A sink that supports row-level update. */
+    private static class SupportsRowLevelUpdateSink
+            implements DynamicTableSink, SupportsRowLevelUpdate {
+
+        protected final ObjectIdentifier tableIdentifier;
+        protected final ResolvedCatalogTable resolvedCatalogTable;
+        protected final RowLevelUpdateMode updateMode;
+        protected final List<String> requireColumnsForUpdate;
+        protected final boolean onlyRequireUpdatedColumns;
+        protected final String dataId;
+
+        protected boolean isUpdate;
+
+        public SupportsRowLevelUpdateSink(
+                ObjectIdentifier tableIdentifier,
+                ResolvedCatalogTable resolvedCatalogTable,
+                RowLevelUpdateMode updateMode,
+                String dataId,
+                List<String> requireColumnsForUpdate,
+                boolean onlyRequireUpdatedColumns) {
+            this(
+                    tableIdentifier,
+                    resolvedCatalogTable,
+                    updateMode,
+                    dataId,
+                    requireColumnsForUpdate,
+                    onlyRequireUpdatedColumns,
+                    false);
+        }
+
+        public SupportsRowLevelUpdateSink(
+                ObjectIdentifier tableIdentifier,
+                ResolvedCatalogTable resolvedCatalogTable,
+                RowLevelUpdateMode updateMode,
+                String dataId,
+                List<String> requireColumnsForUpdate,
+                boolean onlyRequireUpdatedColumns,
+                boolean isUpdate) {
+            this.tableIdentifier = tableIdentifier;
+            this.resolvedCatalogTable = resolvedCatalogTable;
+            this.updateMode = updateMode;
+            this.dataId = dataId;
+            this.requireColumnsForUpdate = requireColumnsForUpdate;
+            this.onlyRequireUpdatedColumns = onlyRequireUpdatedColumns;
+            this.isUpdate = isUpdate;
+        }
 
         @Override
         public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
-            return ChangelogMode.insertOnly();
+            return ChangelogMode.upsert();
         }
 
         @Override
         public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
-            return null;
+            return new DataStreamSinkProvider() {
+
+                @Override
+                public DataStreamSink<?> consumeDataStream(
+                        ProviderContext providerContext, DataStream<RowData> dataStream) {
+                    return dataStream
+                            .addSink(
+                                    new UpdateDataSinkFunction(
+                                            dataId,
+                                            getPrimaryKeyFieldGetter(
+                                                    resolvedCatalogTable.getResolvedSchema()),
+                                            getAllFieldGetter(
+                                                    resolvedCatalogTable.getResolvedSchema()),
+                                            updateMode))
+                            .setParallelism(1);
+                }
+            };
         }
 
         @Override
         public DynamicTableSink copy() {
-            return new TestSink();
+            return new SupportsRowLevelUpdateSink(
+                    tableIdentifier,
+                    resolvedCatalogTable,
+                    updateMode,
+                    dataId,
+                    requireColumnsForUpdate,
+                    onlyRequireUpdatedColumns,
+                    isUpdate);
         }
 
         @Override
         public String asSummaryString() {
-            return "Test Sink";
+            return "SupportsRowLevelUpdateSink";
+        }
+
+        @Override
+        public RowLevelUpdateInfo applyRowLevelUpdate(
+                List<Column> updatedColumns, @Nullable RowLevelModificationScanContext context) {
+            checkScanContext(context, tableIdentifier);
+            this.isUpdate = true;
+
+            return new RowLevelUpdateInfo() {
+
+                @Override
+                public Optional<List<Column>> requiredColumns() {
+                    List<Column> requiredCols = null;
+                    if (onlyRequireUpdatedColumns) {
+                        requiredCols = updatedColumns;
+                    } else if (requireColumnsForUpdate != null) {
+                        requiredCols =
+                                getRequiredColumns(
+                                        requireColumnsForUpdate,
+                                        resolvedCatalogTable.getResolvedSchema());
+                    }
+                    return Optional.ofNullable(requiredCols);
+                }
+
+                @Override
+                public RowLevelUpdateMode getRowLevelUpdateMode() {
+                    return updateMode;
+                }
+            };
         }
     }
 
-    /** A sink that supports delete push down. */
-    private static class SupportsRowLevelDeleteSink extends TestSink
+    /** A sink that supports row-level delete/update. */
+    private static class SupportsRowLevelModificationSink extends SupportsRowLevelUpdateSink
             implements SupportsRowLevelDelete {
 
         private final ObjectIdentifier tableIdentifier;
@@ -316,28 +451,47 @@ public class TestUpdateDeleteTableFactory
 
         private boolean isDelete;
 
-        public SupportsRowLevelDeleteSink(
+        public SupportsRowLevelModificationSink(
                 ObjectIdentifier tableIdentifier,
                 ResolvedCatalogTable resolvedCatalogTable,
                 RowLevelDeleteMode deleteMode,
+                RowLevelUpdateMode updateMode,
                 String dataId,
-                List<String> requireColumnsForDelete) {
+                List<String> requireColumnsForDelete,
+                List<String> requireColumnsForUpdate,
+                boolean onlyRequireUpdatedColumns) {
             this(
                     tableIdentifier,
                     resolvedCatalogTable,
                     deleteMode,
+                    updateMode,
                     dataId,
                     requireColumnsForDelete,
+                    requireColumnsForUpdate,
+                    onlyRequireUpdatedColumns,
+                    false,
                     false);
         }
 
-        public SupportsRowLevelDeleteSink(
+        public SupportsRowLevelModificationSink(
                 ObjectIdentifier tableIdentifier,
                 ResolvedCatalogTable resolvedCatalogTable,
                 RowLevelDeleteMode deleteMode,
+                RowLevelUpdateMode updateMode,
                 String dataId,
                 List<String> requireColumnsForDelete,
-                boolean isDelete) {
+                List<String> requireColumnsForUpdate,
+                boolean onlyRequireUpdatedColumns,
+                boolean isDelete,
+                boolean isUpdate) {
+            super(
+                    tableIdentifier,
+                    resolvedCatalogTable,
+                    updateMode,
+                    dataId,
+                    requireColumnsForUpdate,
+                    onlyRequireUpdatedColumns,
+                    isUpdate);
             this.tableIdentifier = tableIdentifier;
             this.resolvedCatalogTable = resolvedCatalogTable;
             this.deleteMode = deleteMode;
@@ -353,50 +507,56 @@ public class TestUpdateDeleteTableFactory
 
         @Override
         public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
-            return new DataStreamSinkProvider() {
-                @Override
-                public DataStreamSink<?> consumeDataStream(
-                        ProviderContext providerContext, DataStream<RowData> dataStream) {
-                    return dataStream
-                            .addSink(
-                                    new DeleteDataSinkFunction(
-                                            dataId,
-                                            getAllFieldGetter(
-                                                    resolvedCatalogTable.getResolvedSchema()),
-                                            deleteMode))
-                            .setParallelism(1);
-                }
-            };
+            if (isUpdate) {
+                return super.getSinkRuntimeProvider(context);
+            } else {
+                return new DataStreamSinkProvider() {
+                    @Override
+                    public DataStreamSink<?> consumeDataStream(
+                            ProviderContext providerContext, DataStream<RowData> dataStream) {
+                        if (isDelete) {
+                            return dataStream
+                                    .addSink(
+                                            new DeleteDataSinkFunction(
+                                                    dataId,
+                                                    getAllFieldGetter(
+                                                            resolvedCatalogTable
+                                                                    .getResolvedSchema()),
+                                                    deleteMode))
+                                    .setParallelism(1);
+                        } else {
+                            // otherwise, do nothing
+                            return dataStream.addSink(new DiscardingSink<>());
+                        }
+                    }
+                };
+            }
         }
 
         @Override
         public DynamicTableSink copy() {
-            return new SupportsRowLevelDeleteSink(
+            return new SupportsRowLevelModificationSink(
                     tableIdentifier,
                     resolvedCatalogTable,
                     deleteMode,
+                    updateMode,
                     dataId,
                     requireColumnsForDelete,
-                    isDelete);
+                    requireColumnsForUpdate,
+                    onlyRequireUpdatedColumns,
+                    isDelete,
+                    isUpdate);
         }
 
         @Override
         public String asSummaryString() {
-            return "support row-level delete sink";
+            return "SupportsRowLevelModificationSink";
         }
 
         @Override
         public RowLevelDeleteInfo applyRowLevelDelete(
                 @Nullable RowLevelModificationScanContext context) {
-            // the context should contain the object identifier of the table to be written
-            Preconditions.checkArgument(context instanceof TestScanContext);
-            TestScanContext scanContext = (TestScanContext) context;
-            Preconditions.checkArgument(
-                    scanContext.scanTables.contains(tableIdentifier),
-                    String.format(
-                            "The scan context should contains the object identifier for table %s in row-level delete.",
-                            tableIdentifier));
-
+            checkScanContext(context, tableIdentifier);
             this.isDelete = true;
             return new RowLevelDeleteInfo() {
                 @Override
@@ -480,8 +640,8 @@ public class TestUpdateDeleteTableFactory
         }
     }
 
-    /** A sink that supports delete push down. */
-    public static class SupportsDeletePushDownSink extends TestSink
+    /** A sink that supports delete push down and row-level update. */
+    public static class SupportsDeletePushDownSink extends SupportsRowLevelUpdateSink
             implements SupportsDeletePushDown {
 
         private final String dataId;
@@ -493,9 +653,20 @@ public class TestUpdateDeleteTableFactory
         private List<Tuple2<String, Object>> equalPredicates;
 
         public SupportsDeletePushDownSink(
+                ObjectIdentifier tableIdentifier,
+                ResolvedCatalogTable resolvedCatalogTable,
+                RowLevelUpdateMode updateMode,
                 String dataId,
-                boolean onlyAcceptEqualPredicate,
-                ResolvedCatalogTable resolvedCatalogTable) {
+                List<String> requireColumnsForUpdate,
+                boolean onlyRequireUpdatedColumns,
+                boolean onlyAcceptEqualPredicate) {
+            super(
+                    tableIdentifier,
+                    resolvedCatalogTable,
+                    updateMode,
+                    dataId,
+                    requireColumnsForUpdate,
+                    onlyRequireUpdatedColumns);
             this.dataId = dataId;
             this.onlyAcceptEqualPredicate = onlyAcceptEqualPredicate;
             this.resolvedCatalogTable = resolvedCatalogTable;
@@ -506,7 +677,13 @@ public class TestUpdateDeleteTableFactory
         @Override
         public DynamicTableSink copy() {
             return new SupportsDeletePushDownSink(
-                    dataId, onlyAcceptEqualPredicate, resolvedCatalogTable);
+                    tableIdentifier,
+                    resolvedCatalogTable,
+                    updateMode,
+                    dataId,
+                    requireColumnsForUpdate,
+                    onlyRequireUpdatedColumns,
+                    onlyAcceptEqualPredicate);
         }
 
         @Override
@@ -594,22 +771,28 @@ public class TestUpdateDeleteTableFactory
         return true;
     }
 
-    /** A sink that supports both delete push down and row-level delete. */
-    private static class SupportsDeleteSink extends SupportsRowLevelDeleteSink
+    /** A sink that supports both delete push down and row-level delete/update. */
+    private static class SupportsDeleteSink extends SupportsRowLevelModificationSink
             implements SupportsDeletePushDown {
 
         public SupportsDeleteSink(
                 ObjectIdentifier tableIdentifier,
                 ResolvedCatalogTable resolvedCatalogTable,
                 SupportsRowLevelDelete.RowLevelDeleteMode deleteMode,
+                SupportsRowLevelUpdate.RowLevelUpdateMode updateMode,
                 String dataId,
-                List<String> requireColumnsForDelete) {
+                List<String> requireColumnsForDelete,
+                List<String> requireColumnsForUpdate,
+                boolean onlyRequireUpdatedColumns) {
             super(
                     tableIdentifier,
                     resolvedCatalogTable,
                     deleteMode,
+                    updateMode,
                     dataId,
-                    requireColumnsForDelete);
+                    requireColumnsForDelete,
+                    requireColumnsForUpdate,
+                    onlyRequireUpdatedColumns);
         }
 
         @Override
@@ -629,6 +812,109 @@ public class TestUpdateDeleteTableFactory
         }
     }
 
+    /** The sink function for updating existing data. */
+    private static class UpdateDataSinkFunction extends RichSinkFunction<RowData> {
+        private final String dataId;
+        private final RowData.FieldGetter[] primaryKeyFieldGetters;
+        private final RowData.FieldGetter[] allFieldGetters;
+        private final SupportsRowLevelUpdate.RowLevelUpdateMode updateMode;
+        private transient RowData[] oldRows;
+        private transient List<Tuple2<Integer, RowData>> updatedRows;
+        private transient List<RowData> allNewRows;
+
+        public UpdateDataSinkFunction(
+                String dataId,
+                RowData.FieldGetter[] primaryKeyFieldGetters,
+                RowData.FieldGetter[] allFieldGetters,
+                SupportsRowLevelUpdate.RowLevelUpdateMode updateMode) {
+            this.dataId = dataId;
+            this.primaryKeyFieldGetters = primaryKeyFieldGetters;
+            this.updateMode = updateMode;
+            this.allFieldGetters = allFieldGetters;
+        }
+
+        @Override
+        public void open(Configuration parameters) {
+            oldRows = registeredRowData.get(dataId).toArray(new RowData[0]);
+            updatedRows = new ArrayList<>();
+            allNewRows = new ArrayList<>();
+        }
+
+        @Override
+        public void invoke(RowData value, Context context) {
+            if (updateMode == SupportsRowLevelUpdate.RowLevelUpdateMode.UPDATED_ROWS) {
+                consumeUpdatedRows(value);
+            } else if (updateMode == SupportsRowLevelUpdate.RowLevelUpdateMode.ALL_ROWS) {
+                consumeAllRows(value);
+            } else {
+                throw new TableException("Unknown update mode " + updateMode);
+            }
+        }
+
+        private void consumeUpdatedRows(RowData updatedRow) {
+            Preconditions.checkArgument(
+                    updatedRow.getRowKind() == RowKind.UPDATE_AFTER,
+                    "The RowKind for the updated rows should be " + RowKind.UPDATE_AFTER);
+
+            for (int i = 0; i < oldRows.length; i++) {
+                if (equal(oldRows[i], updatedRow, primaryKeyFieldGetters)) {
+                    updatedRows.add(new Tuple2<>(i, copyRowData(updatedRow, allFieldGetters)));
+                }
+            }
+        }
+
+        private void consumeAllRows(RowData rowData) {
+            Preconditions.checkArgument(
+                    rowData.getRowKind() == RowKind.INSERT,
+                    "The RowKind for the updated rows should be " + RowKind.INSERT);
+            allNewRows.add(copyRowData(rowData, allFieldGetters));
+        }
+
+        @Override
+        public void finish() throws Exception {
+            if (updateMode == SupportsRowLevelUpdate.RowLevelUpdateMode.UPDATED_ROWS) {
+                commitForUpdatedRows();
+            } else if (updateMode == SupportsRowLevelUpdate.RowLevelUpdateMode.ALL_ROWS) {
+                commitForAllRows();
+            } else {
+                throw new TableException("Unknown update mode " + updateMode);
+            }
+        }
+
+        private void commitForUpdatedRows() {
+            List<RowData> newRows = Arrays.asList(oldRows);
+            for (Tuple2<Integer, RowData> updatedRow : updatedRows) {
+                newRows.set(updatedRow.f0, updatedRow.f1);
+            }
+            registeredRowData.put(dataId, newRows);
+        }
+
+        private void commitForAllRows() {
+            registeredRowData.put(dataId, allNewRows);
+        }
+    }
+
+    private static void checkScanContext(
+            RowLevelModificationScanContext context, ObjectIdentifier tableIdentifier) {
+        // the context should contain the object identifier of the table to be written
+        Preconditions.checkArgument(context instanceof TestScanContext);
+        TestScanContext scanContext = (TestScanContext) context;
+        Preconditions.checkArgument(
+                scanContext.scanTables.contains(tableIdentifier),
+                "The scan context should contains the object identifier for row-level modification.");
+    }
+
+    private static RowData.FieldGetter[] getPrimaryKeyFieldGetter(ResolvedSchema resolvedSchema) {
+        int[] indexes = resolvedSchema.getPrimaryKeyIndexes();
+        RowData.FieldGetter[] fieldGetters = new RowData.FieldGetter[indexes.length];
+        List<DataType> dataTypes = resolvedSchema.getColumnDataTypes();
+        for (int i = 0; i < fieldGetters.length; i++) {
+            int colIndex = indexes[i];
+            fieldGetters[i] = createFieldGetter(dataTypes.get(colIndex).getLogicalType(), colIndex);
+        }
+        return fieldGetters;
+    }
+
     private static RowData.FieldGetter[] getAllFieldGetter(ResolvedSchema resolvedSchema) {
         List<DataType> dataTypes = resolvedSchema.getColumnDataTypes();
         RowData.FieldGetter[] fieldGetters = new RowData.FieldGetter[dataTypes.size()];
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
similarity index 79%
rename from flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
rename to flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
index 86d09ee4be5..e0945d65267 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,18 +18,11 @@
 
 package org.apache.flink.table.planner.operations;
 
-import org.apache.flink.api.common.RuntimeExecutionMode;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.sql.parser.ddl.SqlCreateTable;
-import org.apache.flink.sql.parser.dql.SqlRichExplain;
 import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.api.ExplainDetail;
 import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.SqlDialect;
 import org.apache.flink.table.api.TableColumn;
-import org.apache.flink.table.api.TableColumn.ComputedColumn;
-import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.api.constraints.UniqueConstraint;
@@ -37,11 +30,8 @@ import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.CatalogDatabaseImpl;
 import org.apache.flink.table.catalog.CatalogFunction;
 import org.apache.flink.table.catalog.CatalogFunctionImpl;
-import org.apache.flink.table.catalog.CatalogManager;
 import org.apache.flink.table.catalog.CatalogTable;
-import org.apache.flink.table.catalog.CatalogTableImpl;
 import org.apache.flink.table.catalog.ContextResolvedTable;
-import org.apache.flink.table.catalog.FunctionCatalog;
 import org.apache.flink.table.catalog.FunctionLanguage;
 import org.apache.flink.table.catalog.GenericInMemoryCatalog;
 import org.apache.flink.table.catalog.ObjectIdentifier;
@@ -49,39 +39,12 @@ import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.TableChange;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
 import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
-import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
-import org.apache.flink.table.catalog.exceptions.TableNotExistException;
-import org.apache.flink.table.delegation.Parser;
-import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.expressions.SqlCallExpression;
 import org.apache.flink.table.factories.TestManagedTableFactory;
-import org.apache.flink.table.operations.BeginStatementSetOperation;
-import org.apache.flink.table.operations.DeleteFromFilterOperation;
-import org.apache.flink.table.operations.EndStatementSetOperation;
-import org.apache.flink.table.operations.ExplainOperation;
-import org.apache.flink.table.operations.LoadModuleOperation;
 import org.apache.flink.table.operations.NopOperation;
 import org.apache.flink.table.operations.Operation;
-import org.apache.flink.table.operations.QueryOperation;
-import org.apache.flink.table.operations.ShowFunctionsOperation;
-import org.apache.flink.table.operations.ShowFunctionsOperation.FunctionScope;
-import org.apache.flink.table.operations.ShowModulesOperation;
-import org.apache.flink.table.operations.ShowTablesOperation;
 import org.apache.flink.table.operations.SinkModifyOperation;
 import org.apache.flink.table.operations.SourceQueryOperation;
-import org.apache.flink.table.operations.StatementSetOperation;
-import org.apache.flink.table.operations.UnloadModuleOperation;
-import org.apache.flink.table.operations.UseCatalogOperation;
-import org.apache.flink.table.operations.UseDatabaseOperation;
-import org.apache.flink.table.operations.UseModulesOperation;
-import org.apache.flink.table.operations.command.AddJarOperation;
-import org.apache.flink.table.operations.command.ClearOperation;
-import org.apache.flink.table.operations.command.HelpOperation;
-import org.apache.flink.table.operations.command.QuitOperation;
-import org.apache.flink.table.operations.command.RemoveJarOperation;
-import org.apache.flink.table.operations.command.ResetOperation;
-import org.apache.flink.table.operations.command.SetOperation;
-import org.apache.flink.table.operations.command.ShowJarsOperation;
 import org.apache.flink.table.operations.ddl.AlterDatabaseOperation;
 import org.apache.flink.table.operations.ddl.AlterTableChangeOperation;
 import org.apache.flink.table.operations.ddl.AlterTableRenameOperation;
@@ -92,46 +55,30 @@ import org.apache.flink.table.operations.ddl.CreateTempSystemFunctionOperation;
 import org.apache.flink.table.operations.ddl.CreateViewOperation;
 import org.apache.flink.table.operations.ddl.DropDatabaseOperation;
 import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
-import org.apache.flink.table.planner.catalog.CatalogManagerCalciteSchema;
-import org.apache.flink.table.planner.delegation.ParserImpl;
-import org.apache.flink.table.planner.delegation.PlannerContext;
 import org.apache.flink.table.planner.expressions.utils.Func0$;
 import org.apache.flink.table.planner.expressions.utils.Func1$;
 import org.apache.flink.table.planner.expressions.utils.Func8$;
-import org.apache.flink.table.planner.factories.TestUpdateDeleteTableFactory;
 import org.apache.flink.table.planner.parse.CalciteParser;
-import org.apache.flink.table.planner.parse.ExtendedParser;
 import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions;
-import org.apache.flink.table.planner.utils.PlannerMocks;
 import org.apache.flink.table.resource.ResourceType;
 import org.apache.flink.table.resource.ResourceUri;
 import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.utils.CatalogManagerMocks;
-import org.apache.flink.table.utils.ExpressionResolverMocks;
 
 import org.apache.calcite.sql.SqlNode;
 import org.assertj.core.api.HamcrestCondition;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
 
 import javax.annotation.Nullable;
 
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
-import static org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema;
 import static org.apache.flink.table.api.Expressions.$;
 import static org.apache.flink.table.planner.utils.OperationMatchers.entry;
 import static org.apache.flink.table.planner.utils.OperationMatchers.isCreateTableOperation;
@@ -140,104 +87,9 @@ import static org.apache.flink.table.planner.utils.OperationMatchers.withOptions
 import static org.apache.flink.table.planner.utils.OperationMatchers.withSchema;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.assertj.core.api.InstanceOfAssertFactories.type;
-
-/** Test cases for {@link SqlToOperationConverter}. */
-public class SqlToOperationConverterTest {
-    private final boolean isStreamingMode = false;
-    private final TableConfig tableConfig = TableConfig.getDefault();
-    private final Catalog catalog = new GenericInMemoryCatalog("MockCatalog", "default");
-    private final CatalogManager catalogManager =
-            CatalogManagerMocks.preparedCatalogManager()
-                    .defaultCatalog("builtin", catalog)
-                    .config(
-                            Configuration.fromMap(
-                                    Collections.singletonMap(
-                                            ExecutionOptions.RUNTIME_MODE.key(),
-                                            RuntimeExecutionMode.BATCH.name())))
-                    .build();
-
-    private final PlannerMocks plannerMocks =
-            PlannerMocks.newBuilder()
-                    .withBatchMode(true)
-                    .withTableConfig(tableConfig)
-                    .withCatalogManager(catalogManager)
-                    .withRootSchema(
-                            asRootSchema(
-                                    new CatalogManagerCalciteSchema(
-                                            catalogManager, isStreamingMode)))
-                    .build();
-    private final PlannerContext plannerContext = plannerMocks.getPlannerContext();
-    private final FunctionCatalog functionCatalog = plannerMocks.getFunctionCatalog();
-
-    private final Supplier<FlinkPlannerImpl> plannerSupplier = plannerContext::createFlinkPlanner;
-
-    private final Parser parser =
-            new ParserImpl(
-                    catalogManager,
-                    plannerSupplier,
-                    () -> plannerSupplier.get().parser(),
-                    plannerContext.getRexFactory());
-
-    @BeforeEach
-    public void before() throws TableAlreadyExistException, DatabaseNotExistException {
-        catalogManager.initSchemaResolver(
-                isStreamingMode,
-                ExpressionResolverMocks.basicResolver(catalogManager, functionCatalog, parser));
-
-        final ObjectPath path1 = new ObjectPath(catalogManager.getCurrentDatabase(), "t1");
-        final ObjectPath path2 = new ObjectPath(catalogManager.getCurrentDatabase(), "t2");
-        final TableSchema tableSchema =
-                TableSchema.builder()
-                        .field("a", DataTypes.BIGINT())
-                        .field("b", DataTypes.VARCHAR(Integer.MAX_VALUE))
-                        .field("c", DataTypes.INT())
-                        .field("d", DataTypes.VARCHAR(Integer.MAX_VALUE))
-                        .build();
-        Map<String, String> options = new HashMap<>();
-        options.put("connector", "COLLECTION");
-        final CatalogTable catalogTable = new CatalogTableImpl(tableSchema, options, "");
-        catalog.createTable(path1, catalogTable, true);
-        catalog.createTable(path2, catalogTable, true);
-    }
-
-    @AfterEach
-    public void after() throws TableNotExistException {
-        final ObjectPath path1 = new ObjectPath(catalogManager.getCurrentDatabase(), "t1");
-        final ObjectPath path2 = new ObjectPath(catalogManager.getCurrentDatabase(), "t2");
-        catalog.dropTable(path1, true);
-        catalog.dropTable(path2, true);
-    }
-
-    @Test
-    public void testUseCatalog() {
-        final String sql = "USE CATALOG cat1";
-        Operation operation = parse(sql);
-        assertThat(operation).isInstanceOf(UseCatalogOperation.class);
-        assertThat(((UseCatalogOperation) operation).getCatalogName()).isEqualTo("cat1");
-        assertThat(operation.asSummaryString()).isEqualTo("USE CATALOG cat1");
-    }
 
-    @Test
-    public void testUseDatabase() {
-        final String sql1 = "USE db1";
-        Operation operation1 = parse(sql1);
-        assertThat(operation1).isInstanceOf(UseDatabaseOperation.class);
-        assertThat(((UseDatabaseOperation) operation1).getCatalogName()).isEqualTo("builtin");
-        assertThat(((UseDatabaseOperation) operation1).getDatabaseName()).isEqualTo("db1");
-
-        final String sql2 = "USE cat1.db1";
-        Operation operation2 = parse(sql2);
-        assertThat(operation2).isInstanceOf(UseDatabaseOperation.class);
-        assertThat(((UseDatabaseOperation) operation2).getCatalogName()).isEqualTo("cat1");
-        assertThat(((UseDatabaseOperation) operation2).getDatabaseName()).isEqualTo("db1");
-    }
-
-    @Test
-    public void testUseDatabaseWithException() {
-        final String sql = "USE cat1.db1.tbl1";
-        assertThatThrownBy(() -> parse(sql)).isInstanceOf(ValidationException.class);
-    }
+/** Test cases for the DDL statements handled by {@link SqlToOperationConverter}. */
+public class SqlDdlToOperationConverterTest extends SqlToOperationConverterTestBase {
 
     @Test
     public void testCreateDatabase() {
@@ -328,122 +180,6 @@ public class SqlToOperationConverterTest {
                 .isEqualTo(properties);
     }
 
-    @Test
-    public void testLoadModule() {
-        final String sql = "LOAD MODULE dummy WITH ('k1' = 'v1', 'k2' = 'v2')";
-        final String expectedModuleName = "dummy";
-        final Map<String, String> expectedOptions = new HashMap<>();
-        expectedOptions.put("k1", "v1");
-        expectedOptions.put("k2", "v2");
-
-        Operation operation = parse(sql);
-        assertThat(operation).isInstanceOf(LoadModuleOperation.class);
-        final LoadModuleOperation loadModuleOperation = (LoadModuleOperation) operation;
-
-        assertThat(loadModuleOperation.getModuleName()).isEqualTo(expectedModuleName);
-        assertThat(loadModuleOperation.getOptions()).isEqualTo(expectedOptions);
-    }
-
-    @Test
-    public void testUnloadModule() {
-        final String sql = "UNLOAD MODULE dummy";
-        final String expectedModuleName = "dummy";
-
-        Operation operation = parse(sql);
-        assertThat(operation).isInstanceOf(UnloadModuleOperation.class);
-
-        final UnloadModuleOperation unloadModuleOperation = (UnloadModuleOperation) operation;
-
-        assertThat(unloadModuleOperation.getModuleName()).isEqualTo(expectedModuleName);
-    }
-
-    @Test
-    public void testUseOneModule() {
-        final String sql = "USE MODULES dummy";
-        final List<String> expectedModuleNames = Collections.singletonList("dummy");
-
-        Operation operation = parse(sql);
-        assertThat(operation).isInstanceOf(UseModulesOperation.class);
-
-        final UseModulesOperation useModulesOperation = (UseModulesOperation) operation;
-
-        assertThat(useModulesOperation.getModuleNames()).isEqualTo(expectedModuleNames);
-        assertThat(useModulesOperation.asSummaryString()).isEqualTo("USE MODULES: [dummy]");
-    }
-
-    @Test
-    public void testUseMultipleModules() {
-        final String sql = "USE MODULES x, y, z";
-        final List<String> expectedModuleNames = Arrays.asList("x", "y", "z");
-
-        Operation operation = parse(sql);
-        assertThat(operation).isInstanceOf(UseModulesOperation.class);
-
-        final UseModulesOperation useModulesOperation = (UseModulesOperation) operation;
-
-        assertThat(useModulesOperation.getModuleNames()).isEqualTo(expectedModuleNames);
-        assertThat(useModulesOperation.asSummaryString()).isEqualTo("USE MODULES: [x, y, z]");
-    }
-
-    @Test
-    public void testShowModules() {
-        final String sql = "SHOW MODULES";
-        Operation operation = parse(sql);
-        assertThat(operation).isInstanceOf(ShowModulesOperation.class);
-        final ShowModulesOperation showModulesOperation = (ShowModulesOperation) operation;
-
-        assertThat(showModulesOperation.requireFull()).isFalse();
-        assertThat(showModulesOperation.asSummaryString()).isEqualTo("SHOW MODULES");
-    }
-
-    @Test
-    public void testShowTables() {
-        final String sql = "SHOW TABLES from cat1.db1 not like 't%'";
-        Operation operation = parse(sql);
-        assertThat(operation).isInstanceOf(ShowTablesOperation.class);
-
-        ShowTablesOperation showTablesOperation = (ShowTablesOperation) operation;
-        assertThat(showTablesOperation.getCatalogName()).isEqualTo("cat1");
-        assertThat(showTablesOperation.getDatabaseName()).isEqualTo("db1");
-        assertThat(showTablesOperation.getPreposition()).isEqualTo("FROM");
-        assertThat(showTablesOperation.isUseLike()).isTrue();
-        assertThat(showTablesOperation.isNotLike()).isTrue();
-
-        final String sql2 = "SHOW TABLES in db2";
-        showTablesOperation = (ShowTablesOperation) parse(sql2);
-        assertThat(showTablesOperation.getCatalogName()).isEqualTo("builtin");
-        assertThat(showTablesOperation.getDatabaseName()).isEqualTo("db2");
-        assertThat(showTablesOperation.getPreposition()).isEqualTo("IN");
-        assertThat(showTablesOperation.isUseLike()).isFalse();
-        assertThat(showTablesOperation.isNotLike()).isFalse();
-
-        final String sql3 = "SHOW TABLES";
-        showTablesOperation = (ShowTablesOperation) parse(sql3);
-        assertThat(showTablesOperation.getCatalogName()).isNull();
-        assertThat(showTablesOperation.getDatabaseName()).isNull();
-        assertThat(showTablesOperation.getPreposition()).isNull();
-    }
-
-    @Test
-    public void testShowFullModules() {
-        final String sql = "SHOW FULL MODULES";
-        Operation operation = parse(sql);
-        assertThat(operation).isInstanceOf(ShowModulesOperation.class);
-        final ShowModulesOperation showModulesOperation = (ShowModulesOperation) operation;
-
-        assertThat(showModulesOperation.requireFull()).isTrue();
-        assertThat(showModulesOperation.asSummaryString()).isEqualTo("SHOW FULL MODULES");
-    }
-
-    @Test
-    public void testShowFunctions() {
-        final String sql1 = "SHOW FUNCTIONS";
-        assertShowFunctions(sql1, sql1, FunctionScope.ALL);
-
-        final String sql2 = "SHOW USER FUNCTIONS";
-        assertShowFunctions(sql2, sql2, FunctionScope.USER);
-    }
-
     @Test
     public void testCreateTable() {
         final String sql =
@@ -584,30 +320,6 @@ public class SqlToOperationConverterTest {
         assertThat(sortedProperties.toString()).isEqualTo(expected);
     }
 
-    @Test
-    public void testExplainWithSelect() {
-        final String sql = "explain select * from t1";
-        checkExplainSql(sql);
-    }
-
-    @Test
-    public void testExplainWithInsert() {
-        final String sql = "explain insert into t2 select * from t1";
-        checkExplainSql(sql);
-    }
-
-    @Test
-    public void testExplainWithUnion() {
-        final String sql = "explain select * from t1 union select * from t2";
-        checkExplainSql(sql);
-    }
-
-    @Test
-    public void testExplainWithExplainDetails() {
-        String sql = "explain changelog_mode, estimated_cost, json_execution_plan select * from t1";
-        checkExplainSql(sql);
-    }
-
     @Test
     public void testCreateTableWithWatermark()
             throws FunctionAlreadyExistException, DatabaseNotExistException {
@@ -893,46 +605,6 @@ public class SqlToOperationConverterTest {
                                 + " ROW<`tmstmp` TIMESTAMP(3)>.");
     }
 
-    @Test
-    public void testSqlInsertWithStaticPartition() {
-        final String sql = "insert into t1 partition(a=1) select b, c, d from t2";
-        FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
-        final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
-        Operation operation = parse(sql, planner, parser);
-        assertThat(operation).isInstanceOf(SinkModifyOperation.class);
-        SinkModifyOperation sinkModifyOperation = (SinkModifyOperation) operation;
-        final Map<String, String> expectedStaticPartitions = new HashMap<>();
-        expectedStaticPartitions.put("a", "1");
-        assertThat(sinkModifyOperation.getStaticPartitions()).isEqualTo(expectedStaticPartitions);
-    }
-
-    @Test
-    public void testSqlInsertWithDynamicTableOptions() {
-        final String sql =
-                "insert into t1 /*+ OPTIONS('k1'='v1', 'k2'='v2') */\n"
-                        + "select a, b, c, d from t2";
-        FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
-        final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
-        Operation operation = parse(sql, planner, parser);
-        assertThat(operation).isInstanceOf(SinkModifyOperation.class);
-        SinkModifyOperation sinkModifyOperation = (SinkModifyOperation) operation;
-        Map<String, String> dynamicOptions = sinkModifyOperation.getDynamicOptions();
-        assertThat(dynamicOptions).isNotNull();
-        assertThat(dynamicOptions.size()).isEqualTo(2);
-        assertThat(dynamicOptions.toString()).isEqualTo("{k1=v1, k2=v2}");
-    }
-
-    @Test
-    public void testDynamicTableWithInvalidOptions() {
-        final String sql = "select * from t1 /*+ OPTIONS('opt1', 'opt2') */";
-        FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
-        final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
-        assertThatThrownBy(() -> parse(sql, planner, parser))
-                .isInstanceOf(AssertionError.class)
-                .hasMessageContaining(
-                        "Hint [OPTIONS] only support " + "non empty key value options");
-    }
-
     @Test // TODO: tweak the tests when FLINK-13604 is fixed.
     public void testCreateTableWithFullDataTypes() {
         final List<TestItem> testItems =
@@ -1128,9 +800,9 @@ public class SqlToOperationConverterTest {
                         });
         String[] columnExpressions =
                 catalogTable.getSchema().getTableColumns().stream()
-                        .filter(ComputedColumn.class::isInstance)
-                        .map(ComputedColumn.class::cast)
-                        .map(ComputedColumn::getExpression)
+                        .filter(TableColumn.ComputedColumn.class::isInstance)
+                        .map(TableColumn.ComputedColumn.class::cast)
+                        .map(TableColumn.ComputedColumn::getExpression)
                         .toArray(String[]::new);
         String[] expected =
                 new String[] {
@@ -2477,272 +2149,6 @@ public class SqlToOperationConverterTest {
         assertThat(operation).isInstanceOf(CreateViewOperation.class);
     }
 
-    @Test
-    public void testBeginStatementSet() {
-        final String sql = "BEGIN STATEMENT SET";
-        Operation operation = parse(sql);
-        assertThat(operation).isInstanceOf(BeginStatementSetOperation.class);
-        final BeginStatementSetOperation beginStatementSetOperation =
-                (BeginStatementSetOperation) operation;
-
-        assertThat(beginStatementSetOperation.asSummaryString()).isEqualTo("BEGIN STATEMENT SET");
-    }
-
-    @Test
-    public void testEnd() {
-        final String sql = "END";
-        Operation operation = parse(sql);
-        assertThat(operation).isInstanceOf(EndStatementSetOperation.class);
-        final EndStatementSetOperation endStatementSetOperation =
-                (EndStatementSetOperation) operation;
-
-        assertThat(endStatementSetOperation.asSummaryString()).isEqualTo("END");
-    }
-
-    @Test
-    public void testSqlRichExplainWithSelect() {
-        final String sql = "explain plan for select a, b, c, d from t2";
-        FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
-        final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
-        Operation operation = parse(sql, planner, parser);
-        assertThat(operation).isInstanceOf(ExplainOperation.class);
-    }
-
-    @Test
-    public void testSqlRichExplainWithInsert() {
-        final String sql = "explain plan for insert into t1 select a, b, c, d from t2";
-        FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
-        final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
-        Operation operation = parse(sql, planner, parser);
-        assertThat(operation).isInstanceOf(ExplainOperation.class);
-    }
-
-    @Test
-    public void testSqlRichExplainWithStatementSet() {
-        final String sql =
-                "explain plan for statement set begin "
-                        + "insert into t1 select a, b, c, d from t2 where a > 1;"
-                        + "insert into t1 select a, b, c, d from t2 where a > 2;"
-                        + "end";
-        FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
-        final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
-        Operation operation = parse(sql, planner, parser);
-        assertThat(operation).isInstanceOf(ExplainOperation.class);
-    }
-
-    @Test
-    public void testExplainDetailsWithSelect() {
-        final String sql =
-                "explain estimated_cost, changelog_mode, plan_advice select a, b, c, d from t2";
-        FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
-        final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
-        assertExplainDetails(parse(sql, planner, parser));
-    }
-
-    @Test
-    public void testExplainDetailsWithInsert() {
-        final String sql =
-                "explain estimated_cost, changelog_mode, plan_advice insert into t1 select a, b, c, d from t2";
-        FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
-        final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
-        assertExplainDetails(parse(sql, planner, parser));
-    }
-
-    @Test
-    public void testExplainDetailsWithStatementSet() {
-        final String sql =
-                "explain estimated_cost, changelog_mode, plan_advice statement set begin "
-                        + "insert into t1 select a, b, c, d from t2 where a > 1;"
-                        + "insert into t1 select a, b, c, d from t2 where a > 2;"
-                        + "end";
-        FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
-        final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
-        assertExplainDetails(parse(sql, planner, parser));
-    }
-
-    private void assertExplainDetails(Operation operation) {
-        Set<String> expectedDetail = new HashSet<>();
-        expectedDetail.add(ExplainDetail.ESTIMATED_COST.toString());
-        expectedDetail.add(ExplainDetail.CHANGELOG_MODE.toString());
-        expectedDetail.add(ExplainDetail.PLAN_ADVICE.toString());
-        assertThat(operation)
-                .asInstanceOf(type(ExplainOperation.class))
-                .satisfies(
-                        explain ->
-                                assertThat(explain.getExplainDetails()).isEqualTo(expectedDetail));
-    }
-
-    @Test
-    public void testSqlExecuteWithStatementSet() {
-        final String sql =
-                "execute statement set begin "
-                        + "insert into t1 select a, b, c, d from t2 where a > 1;"
-                        + "insert into t1 select a, b, c, d from t2 where a > 2;"
-                        + "end";
-        FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
-        final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
-        Operation operation = parse(sql, planner, parser);
-        assertThat(operation).isInstanceOf(StatementSetOperation.class);
-    }
-
-    @Test
-    public void testSqlExecuteWithInsert() {
-        final String sql = "execute insert into t1 select a, b, c, d from t2 where a > 1";
-        FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
-        final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
-        Operation operation = parse(sql, planner, parser);
-        assertThat(operation).isInstanceOf(SinkModifyOperation.class);
-    }
-
-    @Test
-    public void testSqlExecuteWithSelect() {
-        final String sql = "execute select a, b, c, d from t2 where a > 1";
-        FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
-        final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
-        Operation operation = parse(sql, planner, parser);
-        assertThat(operation).isInstanceOf(QueryOperation.class);
-    }
-
-    @Test
-    public void testAddJar() {
-        Arrays.asList(
-                        "./test.\njar",
-                        "file:///path/to/whatever",
-                        "../test-jar.jar",
-                        "/root/test.jar",
-                        "test\\ jar.jar",
-                        "oss://path/helloworld.go")
-                .forEach(
-                        jarPath -> {
-                            AddJarOperation operation =
-                                    (AddJarOperation)
-                                            parser.parse(String.format("ADD JAR '%s'", jarPath))
-                                                    .get(0);
-                            assertThat(operation.getPath()).isEqualTo(jarPath);
-                        });
-    }
-
-    @Test
-    public void testRemoveJar() {
-        Arrays.asList(
-                        "./test.\njar",
-                        "file:///path/to/whatever",
-                        "../test-jar.jar",
-                        "/root/test.jar",
-                        "test\\ jar.jar",
-                        "oss://path/helloworld.go")
-                .forEach(
-                        jarPath -> {
-                            RemoveJarOperation operation =
-                                    (RemoveJarOperation)
-                                            parser.parse(String.format("REMOVE JAR '%s'", jarPath))
-                                                    .get(0);
-                            assertThat(operation.getPath()).isEqualTo(jarPath);
-                        });
-    }
-
-    @Test
-    public void testShowJars() {
-        final String sql = "SHOW JARS";
-        Operation operation = parse(sql);
-        assertThat(operation).isInstanceOf(ShowJarsOperation.class);
-        final ShowJarsOperation showModulesOperation = (ShowJarsOperation) operation;
-        assertThat(showModulesOperation.asSummaryString()).isEqualTo("SHOW JARS");
-    }
-
-    @Test
-    public void testSet() {
-        Operation operation1 = parse("SET");
-        assertThat(operation1).isInstanceOf(SetOperation.class);
-        SetOperation setOperation1 = (SetOperation) operation1;
-        assertThat(setOperation1.getKey()).isNotPresent();
-        assertThat(setOperation1.getValue()).isNotPresent();
-
-        Operation operation2 = parse("SET 'test-key' = 'test-value'");
-        assertThat(operation2).isInstanceOf(SetOperation.class);
-        SetOperation setOperation2 = (SetOperation) operation2;
-        assertThat(setOperation2.getKey()).hasValue("test-key");
-        assertThat(setOperation2.getValue()).hasValue("test-value");
-    }
-
-    @Test
-    public void testReset() {
-        Operation operation1 = parse("RESET");
-        assertThat(operation1).isInstanceOf(ResetOperation.class);
-        assertThat(((ResetOperation) operation1).getKey()).isNotPresent();
-
-        Operation operation2 = parse("RESET 'test-key'");
-        assertThat(operation2).isInstanceOf(ResetOperation.class);
-        assertThat(((ResetOperation) operation2).getKey()).isPresent();
-        assertThat(((ResetOperation) operation2).getKey()).hasValue("test-key");
-    }
-
-    @ParameterizedTest
-    @ValueSource(strings = {"SET", "SET;", "SET ;", "SET\t;", "SET\n;"})
-    public void testSetCommands(String command) {
-        ExtendedParser extendedParser = new ExtendedParser();
-        assertThat(extendedParser.parse(command)).get().isInstanceOf(SetOperation.class);
-    }
-
-    @ParameterizedTest
-    @ValueSource(strings = {"HELP", "HELP;", "HELP ;", "HELP\t;", "HELP\n;"})
-    public void testHelpCommands(String command) {
-        ExtendedParser extendedParser = new ExtendedParser();
-        assertThat(extendedParser.parse(command)).get().isInstanceOf(HelpOperation.class);
-    }
-
-    @ParameterizedTest
-    @ValueSource(strings = {"CLEAR", "CLEAR;", "CLEAR ;", "CLEAR\t;", "CLEAR\n;"})
-    public void testClearCommands(String command) {
-        ExtendedParser extendedParser = new ExtendedParser();
-        assertThat(extendedParser.parse(command)).get().isInstanceOf(ClearOperation.class);
-    }
-
-    @ParameterizedTest
-    @ValueSource(
-            strings = {
-                "QUIT;", "QUIT;", "QUIT ;", "QUIT\t;", "QUIT\n;", "EXIT;", "EXIT ;", "EXIT\t;",
-                "EXIT\n;", "EXIT ; "
-            })
-    public void testQuitCommands(String command) {
-        ExtendedParser extendedParser = new ExtendedParser();
-        assertThat(extendedParser.parse(command)).get().isInstanceOf(QuitOperation.class);
-    }
-
-    @Test
-    public void testDelete() throws Exception {
-        Map<String, String> options = new HashMap<>();
-        options.put("connector", TestUpdateDeleteTableFactory.IDENTIFIER);
-        CatalogTable catalogTable =
-                CatalogTable.of(
-                        Schema.newBuilder()
-                                .column("a", DataTypes.INT().notNull())
-                                .column("c", DataTypes.STRING().notNull())
-                                .build(),
-                        null,
-                        Collections.emptyList(),
-                        options);
-        ObjectIdentifier tableIdentifier = ObjectIdentifier.of("builtin", "default", "test_delete");
-        catalogManager.createTable(catalogTable, tableIdentifier, false);
-
-        // no filter in delete statement
-        Operation operation = parse("DELETE FROM test_delete");
-        checkDeleteFromFilterOperation(operation, "[]");
-
-        // with filters in delete statement
-        operation = parse("DELETE FROM test_delete where a = 1 and c = '123'");
-        checkDeleteFromFilterOperation(operation, "[equals(a, 1), equals(c, '123')]");
-
-        // with filter = false after reduced in delete statement
-        operation = parse("DELETE FROM test_delete where a = 1 + 6 and a = 2");
-        checkDeleteFromFilterOperation(operation, "[false]");
-
-        operation = parse("DELETE FROM test_delete where a = (select count(*) from test_delete)");
-        assertThat(operation).isInstanceOf(SinkModifyOperation.class);
-        SinkModifyOperation modifyOperation = (SinkModifyOperation) operation;
-        assertThat(modifyOperation.isDelete()).isTrue();
-    }
-
     // ~ Tool Methods ----------------------------------------------------------
 
     private static TestItem createTestItem(Object... args) {
@@ -2757,64 +2163,6 @@ public class SqlToOperationConverterTest {
         return testItem;
     }
 
-    private void checkExplainSql(String sql) {
-        FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
-        CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
-        SqlNode node = parser.parse(sql);
-        assertThat(node).isInstanceOf(SqlRichExplain.class);
-        Operation operation = SqlToOperationConverter.convert(planner, catalogManager, node).get();
-        assertThat(operation).isInstanceOf(ExplainOperation.class);
-    }
-
-    private void assertShowFunctions(
-            String sql, String expectedSummary, FunctionScope expectedScope) {
-        Operation operation = parse(sql);
-        assertThat(operation).isInstanceOf(ShowFunctionsOperation.class);
-
-        final ShowFunctionsOperation showFunctionsOperation = (ShowFunctionsOperation) operation;
-
-        assertThat(showFunctionsOperation.getFunctionScope()).isEqualTo(expectedScope);
-        assertThat(showFunctionsOperation.asSummaryString()).isEqualTo(expectedSummary);
-    }
-
-    private void assertAlterTableOptions(
-            Operation operation,
-            ObjectIdentifier expectedIdentifier,
-            Map<String, String> expectedOptions,
-            List<TableChange> expectedChanges,
-            String expectedSummary) {
-        assertThat(operation).isInstanceOf(AlterTableChangeOperation.class);
-        final AlterTableChangeOperation alterTableOptionsOperation =
-                (AlterTableChangeOperation) operation;
-        assertThat(alterTableOptionsOperation.getTableIdentifier()).isEqualTo(expectedIdentifier);
-        assertThat(alterTableOptionsOperation.getNewTable().getOptions())
-                .isEqualTo(expectedOptions);
-        assertThat(expectedChanges).isEqualTo(alterTableOptionsOperation.getTableChanges());
-        assertThat(alterTableOptionsOperation.asSummaryString()).isEqualTo(expectedSummary);
-    }
-
-    private void assertAlterTableSchema(
-            Operation operation, ObjectIdentifier expectedIdentifier, Schema expectedSchema) {
-        assertThat(operation).isInstanceOf(AlterTableChangeOperation.class);
-        final AlterTableChangeOperation alterTableChangeOperation =
-                (AlterTableChangeOperation) operation;
-        assertThat(alterTableChangeOperation.getTableIdentifier()).isEqualTo(expectedIdentifier);
-        assertThat(alterTableChangeOperation.getNewTable().getUnresolvedSchema())
-                .isEqualTo(expectedSchema);
-    }
-
-    private Operation parse(String sql, FlinkPlannerImpl planner, CalciteParser parser) {
-        SqlNode node = parser.parse(sql);
-        return SqlToOperationConverter.convert(planner, catalogManager, node).get();
-    }
-
-    private Operation parse(String sql) {
-        FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
-        final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
-        SqlNode node = parser.parse(sql);
-        return SqlToOperationConverter.convert(planner, catalogManager, node).get();
-    }
-
     private void prepareNonManagedTable(boolean hasConstraint) throws Exception {
         prepareNonManagedTable("tb1", hasConstraint ? 1 : 0);
     }
@@ -2897,14 +2245,30 @@ public class SqlToOperationConverterTest {
         catalogManager.createTable(catalogTable, tableIdentifier, true);
     }
 
-    private FlinkPlannerImpl getPlannerBySqlDialect(SqlDialect sqlDialect) {
-        tableConfig.setSqlDialect(sqlDialect);
-        return plannerContext.createFlinkPlanner();
+    private void assertAlterTableOptions(
+            Operation operation,
+            ObjectIdentifier expectedIdentifier,
+            Map<String, String> expectedOptions,
+            List<TableChange> expectedChanges,
+            String expectedSummary) {
+        assertThat(operation).isInstanceOf(AlterTableChangeOperation.class);
+        final AlterTableChangeOperation alterTableOptionsOperation =
+                (AlterTableChangeOperation) operation;
+        assertThat(alterTableOptionsOperation.getTableIdentifier()).isEqualTo(expectedIdentifier);
+        assertThat(alterTableOptionsOperation.getNewTable().getOptions())
+                .isEqualTo(expectedOptions);
+        assertThat(expectedChanges).isEqualTo(alterTableOptionsOperation.getTableChanges());
+        assertThat(alterTableOptionsOperation.asSummaryString()).isEqualTo(expectedSummary);
     }
 
-    private CalciteParser getParserBySqlDialect(SqlDialect sqlDialect) {
-        tableConfig.setSqlDialect(sqlDialect);
-        return plannerContext.createCalciteParser();
+    private void assertAlterTableSchema(
+            Operation operation, ObjectIdentifier expectedIdentifier, Schema expectedSchema) {
+        assertThat(operation).isInstanceOf(AlterTableChangeOperation.class);
+        final AlterTableChangeOperation alterTableChangeOperation =
+                (AlterTableChangeOperation) operation;
+        assertThat(alterTableChangeOperation.getTableIdentifier()).isEqualTo(expectedIdentifier);
+        assertThat(alterTableChangeOperation.getNewTable().getUnresolvedSchema())
+                .isEqualTo(expectedSchema);
     }
 
     private void checkAlterNonExistTable(String sqlTemplate) {
@@ -2939,15 +2303,6 @@ public class SqlToOperationConverterTest {
                         TestManagedTableFactory.ENRICHED_VALUE);
     }
 
-    private static void checkDeleteFromFilterOperation(
-            Operation operation, String expectedFilters) {
-        assertThat(operation).isInstanceOf(DeleteFromFilterOperation.class);
-        DeleteFromFilterOperation deleteFromFiltersOperation =
-                (DeleteFromFilterOperation) operation;
-        List<ResolvedExpression> filters = deleteFromFiltersOperation.getFilters();
-        assertThat(filters.toString()).isEqualTo(expectedFilters);
-    }
-
     // ~ Inner Classes ----------------------------------------------------------
 
     private static class TestItem {
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDmlToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDmlToOperationConverterTest.java
new file mode 100644
index 00000000000..3b0b8c8e50e
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDmlToOperationConverterTest.java
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations;
+
+import org.apache.flink.sql.parser.dql.SqlRichExplain;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.ExplainDetail;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.SqlDialect;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.operations.BeginStatementSetOperation;
+import org.apache.flink.table.operations.DeleteFromFilterOperation;
+import org.apache.flink.table.operations.EndStatementSetOperation;
+import org.apache.flink.table.operations.ExplainOperation;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.QueryOperation;
+import org.apache.flink.table.operations.SinkModifyOperation;
+import org.apache.flink.table.operations.StatementSetOperation;
+import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
+import org.apache.flink.table.planner.factories.TestUpdateDeleteTableFactory;
+import org.apache.flink.table.planner.parse.CalciteParser;
+
+import org.apache.calcite.sql.SqlNode;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.InstanceOfAssertFactories.type;
+
+/** Test cases for the DML statements handled by {@link SqlToOperationConverter}. */
+public class SqlDmlToOperationConverterTest extends SqlToOperationConverterTestBase {
+
+    @Test
+    public void testExplainWithSelect() {
+        final String sql = "explain select * from t1";
+        checkExplainSql(sql);
+    }
+
+    @Test
+    public void testExplainWithInsert() {
+        final String sql = "explain insert into t2 select * from t1";
+        checkExplainSql(sql);
+    }
+
+    @Test
+    public void testExplainWithUnion() {
+        final String sql = "explain select * from t1 union select * from t2";
+        checkExplainSql(sql);
+    }
+
+    @Test
+    public void testExplainWithExplainDetails() {
+        String sql = "explain changelog_mode, estimated_cost, json_execution_plan select * from t1";
+        checkExplainSql(sql);
+    }
+
+    @Test
+    public void testSqlInsertWithStaticPartition() {
+        final String sql = "insert into t1 partition(a=1) select b, c, d from t2";
+        FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
+        final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
+        Operation operation = parse(sql, planner, parser);
+        assertThat(operation).isInstanceOf(SinkModifyOperation.class);
+        SinkModifyOperation sinkModifyOperation = (SinkModifyOperation) operation;
+        final Map<String, String> expectedStaticPartitions = new HashMap<>();
+        expectedStaticPartitions.put("a", "1");
+        assertThat(sinkModifyOperation.getStaticPartitions()).isEqualTo(expectedStaticPartitions);
+    }
+
+    @Test
+    public void testSqlInsertWithDynamicTableOptions() {
+        final String sql =
+                "insert into t1 /*+ OPTIONS('k1'='v1', 'k2'='v2') */\n"
+                        + "select a, b, c, d from t2";
+        FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
+        final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
+        Operation operation = parse(sql, planner, parser);
+        assertThat(operation).isInstanceOf(SinkModifyOperation.class);
+        SinkModifyOperation sinkModifyOperation = (SinkModifyOperation) operation;
+        Map<String, String> dynamicOptions = sinkModifyOperation.getDynamicOptions();
+        assertThat(dynamicOptions).isNotNull();
+        assertThat(dynamicOptions.size()).isEqualTo(2);
+        assertThat(dynamicOptions.toString()).isEqualTo("{k1=v1, k2=v2}");
+    }
+
+    @Test
+    public void testDynamicTableWithInvalidOptions() {
+        final String sql = "select * from t1 /*+ OPTIONS('opt1', 'opt2') */";
+        FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
+        final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
+        assertThatThrownBy(() -> parse(sql, planner, parser))
+                .isInstanceOf(AssertionError.class)
+                .hasMessageContaining(
+                        "Hint [OPTIONS] only support " + "non empty key value options");
+    }
+
+    @Test
+    public void testBeginStatementSet() {
+        final String sql = "BEGIN STATEMENT SET";
+        Operation operation = parse(sql);
+        assertThat(operation).isInstanceOf(BeginStatementSetOperation.class);
+        final BeginStatementSetOperation beginStatementSetOperation =
+                (BeginStatementSetOperation) operation;
+
+        assertThat(beginStatementSetOperation.asSummaryString()).isEqualTo("BEGIN STATEMENT SET");
+    }
+
+    @Test
+    public void testEnd() {
+        final String sql = "END";
+        Operation operation = parse(sql);
+        assertThat(operation).isInstanceOf(EndStatementSetOperation.class);
+        final EndStatementSetOperation endStatementSetOperation =
+                (EndStatementSetOperation) operation;
+
+        assertThat(endStatementSetOperation.asSummaryString()).isEqualTo("END");
+    }
+
+    @Test
+    public void testSqlRichExplainWithSelect() {
+        final String sql = "explain plan for select a, b, c, d from t2";
+        FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
+        final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
+        Operation operation = parse(sql, planner, parser);
+        assertThat(operation).isInstanceOf(ExplainOperation.class);
+    }
+
+    @Test
+    public void testSqlRichExplainWithInsert() {
+        final String sql = "explain plan for insert into t1 select a, b, c, d from t2";
+        FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
+        final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
+        Operation operation = parse(sql, planner, parser);
+        assertThat(operation).isInstanceOf(ExplainOperation.class);
+    }
+
+    @Test
+    public void testSqlRichExplainWithStatementSet() {
+        final String sql =
+                "explain plan for statement set begin "
+                        + "insert into t1 select a, b, c, d from t2 where a > 1;"
+                        + "insert into t1 select a, b, c, d from t2 where a > 2;"
+                        + "end";
+        FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
+        final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
+        Operation operation = parse(sql, planner, parser);
+        assertThat(operation).isInstanceOf(ExplainOperation.class);
+    }
+
+    @Test
+    public void testExplainDetailsWithSelect() {
+        final String sql =
+                "explain estimated_cost, changelog_mode, plan_advice select a, b, c, d from t2";
+        FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
+        final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
+        assertExplainDetails(parse(sql, planner, parser));
+    }
+
+    @Test
+    public void testExplainDetailsWithInsert() {
+        final String sql =
+                "explain estimated_cost, changelog_mode, plan_advice insert into t1 select a, b, c, d from t2";
+        FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
+        final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
+        assertExplainDetails(parse(sql, planner, parser));
+    }
+
+    @Test
+    public void testExplainDetailsWithStatementSet() {
+        final String sql =
+                "explain estimated_cost, changelog_mode, plan_advice statement set begin "
+                        + "insert into t1 select a, b, c, d from t2 where a > 1;"
+                        + "insert into t1 select a, b, c, d from t2 where a > 2;"
+                        + "end";
+        FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
+        final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
+        assertExplainDetails(parse(sql, planner, parser));
+    }
+
+    private void assertExplainDetails(Operation operation) {
+        Set<String> expectedDetail = new HashSet<>();
+        expectedDetail.add(ExplainDetail.ESTIMATED_COST.toString());
+        expectedDetail.add(ExplainDetail.CHANGELOG_MODE.toString());
+        expectedDetail.add(ExplainDetail.PLAN_ADVICE.toString());
+        assertThat(operation)
+                .asInstanceOf(type(ExplainOperation.class))
+                .satisfies(
+                        explain ->
+                                assertThat(explain.getExplainDetails()).isEqualTo(expectedDetail));
+    }
+
+    @Test
+    public void testSqlExecuteWithStatementSet() {
+        final String sql =
+                "execute statement set begin "
+                        + "insert into t1 select a, b, c, d from t2 where a > 1;"
+                        + "insert into t1 select a, b, c, d from t2 where a > 2;"
+                        + "end";
+        FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
+        final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
+        Operation operation = parse(sql, planner, parser);
+        assertThat(operation).isInstanceOf(StatementSetOperation.class);
+    }
+
+    @Test
+    public void testSqlExecuteWithInsert() {
+        final String sql = "execute insert into t1 select a, b, c, d from t2 where a > 1";
+        FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
+        final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
+        Operation operation = parse(sql, planner, parser);
+        assertThat(operation).isInstanceOf(SinkModifyOperation.class);
+    }
+
+    @Test
+    public void testSqlExecuteWithSelect() {
+        final String sql = "execute select a, b, c, d from t2 where a > 1";
+        FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
+        final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
+        Operation operation = parse(sql, planner, parser);
+        assertThat(operation).isInstanceOf(QueryOperation.class);
+    }
+
+    @Test
+    public void testDelete() throws Exception {
+        Map<String, String> options = new HashMap<>();
+        options.put("connector", TestUpdateDeleteTableFactory.IDENTIFIER);
+        CatalogTable catalogTable =
+                CatalogTable.of(
+                        Schema.newBuilder()
+                                .column("a", DataTypes.INT().notNull())
+                                .column("c", DataTypes.STRING().notNull())
+                                .build(),
+                        null,
+                        Collections.emptyList(),
+                        options);
+        ObjectIdentifier tableIdentifier = ObjectIdentifier.of("builtin", "default", "test_delete");
+        catalogManager.createTable(catalogTable, tableIdentifier, false);
+
+        // no filter in delete statement
+        Operation operation = parse("DELETE FROM test_delete");
+        checkDeleteFromFilterOperation(operation, "[]");
+
+        // with filters in delete statement
+        operation = parse("DELETE FROM test_delete where a = 1 and c = '123'");
+        checkDeleteFromFilterOperation(operation, "[equals(a, 1), equals(c, '123')]");
+
+        // with filter = false after reduced in delete statement
+        operation = parse("DELETE FROM test_delete where a = 1 + 6 and a = 2");
+        checkDeleteFromFilterOperation(operation, "[false]");
+
+        operation = parse("DELETE FROM test_delete where a = (select count(*) from test_delete)");
+        assertThat(operation).isInstanceOf(SinkModifyOperation.class);
+        SinkModifyOperation modifyOperation = (SinkModifyOperation) operation;
+        assertThat(modifyOperation.isDelete()).isTrue();
+    }
+
+    @Test
+    public void testUpdate() throws Exception {
+        Map<String, String> options = new HashMap<>();
+        options.put("connector", TestUpdateDeleteTableFactory.IDENTIFIER);
+        CatalogTable catalogTable =
+                CatalogTable.of(
+                        Schema.newBuilder()
+                                .column("a", DataTypes.INT().notNull())
+                                .column("b", DataTypes.BIGINT().nullable())
+                                .column("c", DataTypes.STRING().notNull())
+                                .build(),
+                        null,
+                        Collections.emptyList(),
+                        options);
+        ObjectIdentifier tableIdentifier = ObjectIdentifier.of("builtin", "default", "test_update");
+        catalogManager.createTable(catalogTable, tableIdentifier, false);
+
+        Operation operation = parse("UPDATE test_update SET a = 1, c = '123'");
+        checkUpdateOperation(operation);
+
+        operation = parse("UPDATE test_update SET a = 1, c = '123' WHERE a = 3");
+        checkUpdateOperation(operation);
+
+        operation =
+                parse(
+                        "UPDATE test_update SET a = 1, c = '123' WHERE b = 2 and a = (select count(*) from test_update)");
+        checkUpdateOperation(operation);
+    }
+
+    private void checkExplainSql(String sql) {
+        FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
+        CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
+        SqlNode node = parser.parse(sql);
+        assertThat(node).isInstanceOf(SqlRichExplain.class);
+        Operation operation = SqlToOperationConverter.convert(planner, catalogManager, node).get();
+        assertThat(operation).isInstanceOf(ExplainOperation.class);
+    }
+
+    private static void checkDeleteFromFilterOperation(
+            Operation operation, String expectedFilters) {
+        assertThat(operation).isInstanceOf(DeleteFromFilterOperation.class);
+        DeleteFromFilterOperation deleteFromFiltersOperation =
+                (DeleteFromFilterOperation) operation;
+        List<ResolvedExpression> filters = deleteFromFiltersOperation.getFilters();
+        assertThat(filters.toString()).isEqualTo(expectedFilters);
+    }
+
+    private static void checkUpdateOperation(Operation operation) {
+        assertThat(operation).isInstanceOf(SinkModifyOperation.class);
+        SinkModifyOperation sinkModifyOperation = (SinkModifyOperation) operation;
+        assertThat(sinkModifyOperation.isUpdate()).isTrue();
+    }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlOtherOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlOtherOperationConverterTest.java
new file mode 100644
index 00000000000..a58005fc70f
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlOtherOperationConverterTest.java
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations;
+
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.operations.LoadModuleOperation;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.ShowFunctionsOperation;
+import org.apache.flink.table.operations.ShowModulesOperation;
+import org.apache.flink.table.operations.ShowTablesOperation;
+import org.apache.flink.table.operations.UnloadModuleOperation;
+import org.apache.flink.table.operations.UseCatalogOperation;
+import org.apache.flink.table.operations.UseDatabaseOperation;
+import org.apache.flink.table.operations.UseModulesOperation;
+import org.apache.flink.table.operations.command.AddJarOperation;
+import org.apache.flink.table.operations.command.ClearOperation;
+import org.apache.flink.table.operations.command.HelpOperation;
+import org.apache.flink.table.operations.command.QuitOperation;
+import org.apache.flink.table.operations.command.RemoveJarOperation;
+import org.apache.flink.table.operations.command.ResetOperation;
+import org.apache.flink.table.operations.command.SetOperation;
+import org.apache.flink.table.operations.command.ShowJarsOperation;
+import org.apache.flink.table.planner.parse.ExtendedParser;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Test cases for statements that belong to neither DDL nor DML, for {@link
+ * SqlToOperationConverter}.
+ */
+public class SqlOtherOperationConverterTest extends SqlToOperationConverterTestBase {
+
+    @Test
+    public void testUseCatalog() {
+        final String sql = "USE CATALOG cat1";
+        Operation operation = parse(sql);
+        assertThat(operation).isInstanceOf(UseCatalogOperation.class);
+        assertThat(((UseCatalogOperation) operation).getCatalogName()).isEqualTo("cat1");
+        assertThat(operation.asSummaryString()).isEqualTo("USE CATALOG cat1");
+    }
+
+    @Test
+    public void testUseDatabase() {
+        final String sql1 = "USE db1";
+        Operation operation1 = parse(sql1);
+        assertThat(operation1).isInstanceOf(UseDatabaseOperation.class);
+        assertThat(((UseDatabaseOperation) operation1).getCatalogName()).isEqualTo("builtin");
+        assertThat(((UseDatabaseOperation) operation1).getDatabaseName()).isEqualTo("db1");
+
+        final String sql2 = "USE cat1.db1";
+        Operation operation2 = parse(sql2);
+        assertThat(operation2).isInstanceOf(UseDatabaseOperation.class);
+        assertThat(((UseDatabaseOperation) operation2).getCatalogName()).isEqualTo("cat1");
+        assertThat(((UseDatabaseOperation) operation2).getDatabaseName()).isEqualTo("db1");
+    }
+
+    @Test
+    public void testUseDatabaseWithException() {
+        final String sql = "USE cat1.db1.tbl1";
+        assertThatThrownBy(() -> parse(sql)).isInstanceOf(ValidationException.class);
+    }
+
+    @Test
+    public void testLoadModule() {
+        final String sql = "LOAD MODULE dummy WITH ('k1' = 'v1', 'k2' = 'v2')";
+        final String expectedModuleName = "dummy";
+        final Map<String, String> expectedOptions = new HashMap<>();
+        expectedOptions.put("k1", "v1");
+        expectedOptions.put("k2", "v2");
+
+        Operation operation = parse(sql);
+        assertThat(operation).isInstanceOf(LoadModuleOperation.class);
+        final LoadModuleOperation loadModuleOperation = (LoadModuleOperation) operation;
+
+        assertThat(loadModuleOperation.getModuleName()).isEqualTo(expectedModuleName);
+        assertThat(loadModuleOperation.getOptions()).isEqualTo(expectedOptions);
+    }
+
+    @Test
+    public void testUnloadModule() {
+        final String sql = "UNLOAD MODULE dummy";
+        final String expectedModuleName = "dummy";
+
+        Operation operation = parse(sql);
+        assertThat(operation).isInstanceOf(UnloadModuleOperation.class);
+
+        final UnloadModuleOperation unloadModuleOperation = (UnloadModuleOperation) operation;
+
+        assertThat(unloadModuleOperation.getModuleName()).isEqualTo(expectedModuleName);
+    }
+
+    @Test
+    public void testUseOneModule() {
+        final String sql = "USE MODULES dummy";
+        final List<String> expectedModuleNames = Collections.singletonList("dummy");
+
+        Operation operation = parse(sql);
+        assertThat(operation).isInstanceOf(UseModulesOperation.class);
+
+        final UseModulesOperation useModulesOperation = (UseModulesOperation) operation;
+
+        assertThat(useModulesOperation.getModuleNames()).isEqualTo(expectedModuleNames);
+        assertThat(useModulesOperation.asSummaryString()).isEqualTo("USE MODULES: [dummy]");
+    }
+
+    @Test
+    public void testUseMultipleModules() {
+        final String sql = "USE MODULES x, y, z";
+        final List<String> expectedModuleNames = Arrays.asList("x", "y", "z");
+
+        Operation operation = parse(sql);
+        assertThat(operation).isInstanceOf(UseModulesOperation.class);
+
+        final UseModulesOperation useModulesOperation = (UseModulesOperation) operation;
+
+        assertThat(useModulesOperation.getModuleNames()).isEqualTo(expectedModuleNames);
+        assertThat(useModulesOperation.asSummaryString()).isEqualTo("USE MODULES: [x, y, z]");
+    }
+
+    @Test
+    public void testShowModules() {
+        final String sql = "SHOW MODULES";
+        Operation operation = parse(sql);
+        assertThat(operation).isInstanceOf(ShowModulesOperation.class);
+        final ShowModulesOperation showModulesOperation = (ShowModulesOperation) operation;
+
+        assertThat(showModulesOperation.requireFull()).isFalse();
+        assertThat(showModulesOperation.asSummaryString()).isEqualTo("SHOW MODULES");
+    }
+
+    @Test
+    public void testShowTables() {
+        final String sql = "SHOW TABLES from cat1.db1 not like 't%'";
+        Operation operation = parse(sql);
+        assertThat(operation).isInstanceOf(ShowTablesOperation.class);
+
+        ShowTablesOperation showTablesOperation = (ShowTablesOperation) operation;
+        assertThat(showTablesOperation.getCatalogName()).isEqualTo("cat1");
+        assertThat(showTablesOperation.getDatabaseName()).isEqualTo("db1");
+        assertThat(showTablesOperation.getPreposition()).isEqualTo("FROM");
+        assertThat(showTablesOperation.isUseLike()).isTrue();
+        assertThat(showTablesOperation.isNotLike()).isTrue();
+
+        final String sql2 = "SHOW TABLES in db2";
+        showTablesOperation = (ShowTablesOperation) parse(sql2);
+        assertThat(showTablesOperation.getCatalogName()).isEqualTo("builtin");
+        assertThat(showTablesOperation.getDatabaseName()).isEqualTo("db2");
+        assertThat(showTablesOperation.getPreposition()).isEqualTo("IN");
+        assertThat(showTablesOperation.isUseLike()).isFalse();
+        assertThat(showTablesOperation.isNotLike()).isFalse();
+
+        final String sql3 = "SHOW TABLES";
+        showTablesOperation = (ShowTablesOperation) parse(sql3);
+        assertThat(showTablesOperation.getCatalogName()).isNull();
+        assertThat(showTablesOperation.getDatabaseName()).isNull();
+        assertThat(showTablesOperation.getPreposition()).isNull();
+    }
+
+    @Test
+    public void testShowFullModules() {
+        final String sql = "SHOW FULL MODULES";
+        Operation operation = parse(sql);
+        assertThat(operation).isInstanceOf(ShowModulesOperation.class);
+        final ShowModulesOperation showModulesOperation = (ShowModulesOperation) operation;
+
+        assertThat(showModulesOperation.requireFull()).isTrue();
+        assertThat(showModulesOperation.asSummaryString()).isEqualTo("SHOW FULL MODULES");
+    }
+
+    @Test
+    public void testShowFunctions() {
+        final String sql1 = "SHOW FUNCTIONS";
+        assertShowFunctions(sql1, sql1, ShowFunctionsOperation.FunctionScope.ALL);
+
+        final String sql2 = "SHOW USER FUNCTIONS";
+        assertShowFunctions(sql2, sql2, ShowFunctionsOperation.FunctionScope.USER);
+    }
+
+    @Test
+    public void testAddJar() {
+        Arrays.asList(
+                        "./test.\njar",
+                        "file:///path/to/whatever",
+                        "../test-jar.jar",
+                        "/root/test.jar",
+                        "test\\ jar.jar",
+                        "oss://path/helloworld.go")
+                .forEach(
+                        jarPath -> {
+                            AddJarOperation operation =
+                                    (AddJarOperation)
+                                            parser.parse(String.format("ADD JAR '%s'", jarPath))
+                                                    .get(0);
+                            assertThat(operation.getPath()).isEqualTo(jarPath);
+                        });
+    }
+
+    @Test
+    public void testRemoveJar() {
+        Arrays.asList(
+                        "./test.\njar",
+                        "file:///path/to/whatever",
+                        "../test-jar.jar",
+                        "/root/test.jar",
+                        "test\\ jar.jar",
+                        "oss://path/helloworld.go")
+                .forEach(
+                        jarPath -> {
+                            RemoveJarOperation operation =
+                                    (RemoveJarOperation)
+                                            parser.parse(String.format("REMOVE JAR '%s'", jarPath))
+                                                    .get(0);
+                            assertThat(operation.getPath()).isEqualTo(jarPath);
+                        });
+    }
+
+    @Test
+    public void testShowJars() {
+        final String sql = "SHOW JARS";
+        Operation operation = parse(sql);
+        assertThat(operation).isInstanceOf(ShowJarsOperation.class);
+        final ShowJarsOperation showModulesOperation = (ShowJarsOperation) operation;
+        assertThat(showModulesOperation.asSummaryString()).isEqualTo("SHOW JARS");
+    }
+
+    @Test
+    public void testSet() {
+        Operation operation1 = parse("SET");
+        assertThat(operation1).isInstanceOf(SetOperation.class);
+        SetOperation setOperation1 = (SetOperation) operation1;
+        assertThat(setOperation1.getKey()).isNotPresent();
+        assertThat(setOperation1.getValue()).isNotPresent();
+
+        Operation operation2 = parse("SET 'test-key' = 'test-value'");
+        assertThat(operation2).isInstanceOf(SetOperation.class);
+        SetOperation setOperation2 = (SetOperation) operation2;
+        assertThat(setOperation2.getKey()).hasValue("test-key");
+        assertThat(setOperation2.getValue()).hasValue("test-value");
+    }
+
+    @Test
+    public void testReset() {
+        Operation operation1 = parse("RESET");
+        assertThat(operation1).isInstanceOf(ResetOperation.class);
+        assertThat(((ResetOperation) operation1).getKey()).isNotPresent();
+
+        Operation operation2 = parse("RESET 'test-key'");
+        assertThat(operation2).isInstanceOf(ResetOperation.class);
+        assertThat(((ResetOperation) operation2).getKey()).isPresent();
+        assertThat(((ResetOperation) operation2).getKey()).hasValue("test-key");
+    }
+
+    @ParameterizedTest
+    @ValueSource(strings = {"SET", "SET;", "SET ;", "SET\t;", "SET\n;"})
+    public void testSetCommands(String command) {
+        ExtendedParser extendedParser = new ExtendedParser();
+        assertThat(extendedParser.parse(command)).get().isInstanceOf(SetOperation.class);
+    }
+
+    @ParameterizedTest
+    @ValueSource(strings = {"HELP", "HELP;", "HELP ;", "HELP\t;", "HELP\n;"})
+    public void testHelpCommands(String command) {
+        ExtendedParser extendedParser = new ExtendedParser();
+        assertThat(extendedParser.parse(command)).get().isInstanceOf(HelpOperation.class);
+    }
+
+    @ParameterizedTest
+    @ValueSource(strings = {"CLEAR", "CLEAR;", "CLEAR ;", "CLEAR\t;", "CLEAR\n;"})
+    public void testClearCommands(String command) {
+        ExtendedParser extendedParser = new ExtendedParser();
+        assertThat(extendedParser.parse(command)).get().isInstanceOf(ClearOperation.class);
+    }
+
+    @ParameterizedTest
+    @ValueSource(
+            strings = {
+                "QUIT;", "QUIT ;", "QUIT\t;", "QUIT\n;", "EXIT;", "EXIT ;", "EXIT\t;",
+                "EXIT\n;", "EXIT ; "
+            })
+    public void testQuitCommands(String command) {
+        ExtendedParser extendedParser = new ExtendedParser();
+        assertThat(extendedParser.parse(command)).get().isInstanceOf(QuitOperation.class);
+    }
+
+    private void assertShowFunctions(
+            String sql,
+            String expectedSummary,
+            ShowFunctionsOperation.FunctionScope expectedScope) {
+        Operation operation = parse(sql);
+        assertThat(operation).isInstanceOf(ShowFunctionsOperation.class);
+
+        final ShowFunctionsOperation showFunctionsOperation = (ShowFunctionsOperation) operation;
+
+        assertThat(showFunctionsOperation.getFunctionScope()).isEqualTo(expectedScope);
+        assertThat(showFunctionsOperation.asSummaryString()).isEqualTo(expectedSummary);
+    }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTestBase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTestBase.java
new file mode 100644
index 00000000000..d9c8281dee0
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTestBase.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.SqlDialect;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.FunctionCatalog;
+import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.delegation.Parser;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
+import org.apache.flink.table.planner.catalog.CatalogManagerCalciteSchema;
+import org.apache.flink.table.planner.delegation.ParserImpl;
+import org.apache.flink.table.planner.delegation.PlannerContext;
+import org.apache.flink.table.planner.parse.CalciteParser;
+import org.apache.flink.table.planner.utils.PlannerMocks;
+import org.apache.flink.table.utils.CatalogManagerMocks;
+import org.apache.flink.table.utils.ExpressionResolverMocks;
+
+import org.apache.calcite.sql.SqlNode;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Supplier;
+
+import static org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema;
+
+/** Test base for verifying the conversion of SQL statements to operations. */
+public class SqlToOperationConverterTestBase {
+    private final boolean isStreamingMode = false;
+    private final TableConfig tableConfig = TableConfig.getDefault();
+    protected final Catalog catalog = new GenericInMemoryCatalog("MockCatalog", "default");
+    protected final CatalogManager catalogManager =
+            CatalogManagerMocks.preparedCatalogManager()
+                    .defaultCatalog("builtin", catalog)
+                    .config(
+                            Configuration.fromMap(
+                                    Collections.singletonMap(
+                                            ExecutionOptions.RUNTIME_MODE.key(),
+                                            RuntimeExecutionMode.BATCH.name())))
+                    .build();
+
+    private final PlannerMocks plannerMocks =
+            PlannerMocks.newBuilder()
+                    .withBatchMode(true)
+                    .withTableConfig(tableConfig)
+                    .withCatalogManager(catalogManager)
+                    .withRootSchema(
+                            asRootSchema(
+                                    new CatalogManagerCalciteSchema(
+                                            catalogManager, isStreamingMode)))
+                    .build();
+    private final PlannerContext plannerContext = plannerMocks.getPlannerContext();
+    protected final FunctionCatalog functionCatalog = plannerMocks.getFunctionCatalog();
+
+    private final Supplier<FlinkPlannerImpl> plannerSupplier = plannerContext::createFlinkPlanner;
+
+    protected final Parser parser =
+            new ParserImpl(
+                    catalogManager,
+                    plannerSupplier,
+                    () -> plannerSupplier.get().parser(),
+                    plannerContext.getRexFactory());
+
+    @BeforeEach
+    public void before() throws TableAlreadyExistException, DatabaseNotExistException {
+        catalogManager.initSchemaResolver(
+                isStreamingMode,
+                ExpressionResolverMocks.basicResolver(catalogManager, functionCatalog, parser));
+
+        final ObjectPath path1 = new ObjectPath(catalogManager.getCurrentDatabase(), "t1");
+        final ObjectPath path2 = new ObjectPath(catalogManager.getCurrentDatabase(), "t2");
+        final TableSchema tableSchema =
+                TableSchema.builder()
+                        .field("a", DataTypes.BIGINT())
+                        .field("b", DataTypes.VARCHAR(Integer.MAX_VALUE))
+                        .field("c", DataTypes.INT())
+                        .field("d", DataTypes.VARCHAR(Integer.MAX_VALUE))
+                        .build();
+        Map<String, String> options = new HashMap<>();
+        options.put("connector", "COLLECTION");
+        final CatalogTable catalogTable = new CatalogTableImpl(tableSchema, options, "");
+        catalog.createTable(path1, catalogTable, true);
+        catalog.createTable(path2, catalogTable, true);
+    }
+
+    @AfterEach
+    public void after() throws TableNotExistException {
+        final ObjectPath path1 = new ObjectPath(catalogManager.getCurrentDatabase(), "t1");
+        final ObjectPath path2 = new ObjectPath(catalogManager.getCurrentDatabase(), "t2");
+        catalog.dropTable(path1, true);
+        catalog.dropTable(path2, true);
+    }
+
+    protected Operation parse(String sql, FlinkPlannerImpl planner, CalciteParser parser) {
+        SqlNode node = parser.parse(sql);
+        return SqlToOperationConverter.convert(planner, catalogManager, node).get();
+    }
+
+    protected Operation parse(String sql) {
+        FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
+        final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
+        SqlNode node = parser.parse(sql);
+        return SqlToOperationConverter.convert(planner, catalogManager, node).get();
+    }
+
+    protected FlinkPlannerImpl getPlannerBySqlDialect(SqlDialect sqlDialect) {
+        tableConfig.setSqlDialect(sqlDialect);
+        return plannerContext.createFlinkPlanner();
+    }
+
+    protected CalciteParser getParserBySqlDialect(SqlDialect sqlDialect) {
+        tableConfig.setSqlDialect(sqlDialect);
+        return plannerContext.createCalciteParser();
+    }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/RowLevelUpdateTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/RowLevelUpdateTest.java
new file mode 100644
index 00000000000..49c52415100
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/RowLevelUpdateTest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.batch.sql;
+
+import org.apache.flink.table.api.ExplainDetail;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate;
+import org.apache.flink.table.planner.utils.BatchTableTestUtil;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.table.planner.utils.TableTestBase;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+import scala.collection.Seq;
+
+/** Test for row-level update. */
+@RunWith(Parameterized.class)
+public class RowLevelUpdateTest extends TableTestBase {
+
+    private final Seq<ExplainDetail> explainDetails =
+            JavaScalaConversionUtil.toScala(
+                    Collections.singletonList(ExplainDetail.JSON_EXECUTION_PLAN));
+    private final SupportsRowLevelUpdate.RowLevelUpdateMode updateMode;
+
+    private BatchTableTestUtil util;
+
+    @Parameterized.Parameters(name = "updateMode = {0}")
+    public static Collection<SupportsRowLevelUpdate.RowLevelUpdateMode> data() {
+        return Arrays.asList(
+                SupportsRowLevelUpdate.RowLevelUpdateMode.UPDATED_ROWS,
+                SupportsRowLevelUpdate.RowLevelUpdateMode.ALL_ROWS);
+    }
+
+    public RowLevelUpdateTest(SupportsRowLevelUpdate.RowLevelUpdateMode updateMode) {
+        this.updateMode = updateMode;
+    }
+
+    @Before
+    public void before() {
+        util = batchTestUtil(TableConfig.getDefault());
+        util.tableEnv()
+                .getConfig()
+                .getConfiguration()
+                .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 12);
+    }
+
+    @Test
+    public void testUpdateWithoutFilter() {
+        createTableForUpdate();
+        util.verifyExplainInsert("UPDATE t SET b = 'n1', a = char_length(b) * a ", explainDetails);
+    }
+
+    @Test
+    public void testUpdateWithFilter() {
+        createTableForUpdate();
+        util.verifyExplainInsert(
+                "UPDATE t SET b = 'v2' WHERE a = 123 AND b = 'v1'", explainDetails);
+    }
+
+    @Test
+    public void testUpdateWithSubQuery() {
+        createTableForUpdate();
+        util.tableEnv()
+                .executeSql(
+                        String.format(
+                                "CREATE TABLE t1 (a int, b string) WITH "
+                                        + "('connector' = 'test-update-delete', 'update-mode' = '%s') ",
+                                updateMode));
+        util.verifyExplainInsert(
+                "UPDATE t SET b = 'v2' WHERE a = (SELECT count(*) FROM t1)", explainDetails);
+    }
+
+    @Test
+    public void testUpdateAllColsWithOnlyRequireUpdatedCols() {
+        util.tableEnv()
+                .executeSql(
+                        String.format(
+                                "CREATE TABLE t (a int, b string, c double) WITH "
+                                        + "('connector' = 'test-update-delete',"
+                                        + " 'update-mode' = '%s',"
+                                        + " 'only-require-updated-columns-for-update' = 'true'"
+                                        + ") ",
+                                updateMode));
+        util.verifyExplainInsert(
+                "UPDATE t SET b = 'v2', a = 123, c = c + 1 WHERE c > 123", explainDetails);
+    }
+
+    @Test
+    public void testUpdatePartColsWithOnlyRequireUpdatedCols() {
+        util.tableEnv()
+                .executeSql(
+                        String.format(
+                                "CREATE TABLE t (f0 string, f1 int, a int, b string, c double, f2 string, f3 int) WITH "
+                                        + "('connector' = 'test-update-delete',"
+                                        + " 'update-mode' = '%s',"
+                                        + " 'only-require-updated-columns-for-update' = 'true'"
+                                        + ") ",
+                                updateMode));
+        util.verifyExplainInsert(
+                "UPDATE t SET b = 'v2', a = 123, c = c + 1 WHERE c > 123", explainDetails);
+    }
+
+    @Test
+    public void testUpdateWithCustomColumns() {
+        util.tableEnv()
+                .executeSql(
+                        String.format(
+                                "CREATE TABLE t (a int, b string, c double) WITH"
+                                        + " ("
+                                        + "'connector' = 'test-update-delete', "
+                                        + "'required-columns-for-update' = 'b;c', "
+                                        + "'update-mode' = '%s'"
+                                        + ") ",
+                                updateMode));
+        util.verifyExplainInsert("UPDATE t SET b = 'v2' WHERE b = '123'", explainDetails);
+    }
+
+    @Test
+    public void testUpdateWithMetaColumns() {
+        util.tableEnv()
+                .executeSql(
+                        String.format(
+                                "CREATE TABLE t (a int, b string, c double) WITH"
+                                        + " ("
+                                        + "'connector' = 'test-update-delete', "
+                                        + "'required-columns-for-update' = 'meta_f1;meta_k2;a;b', "
+                                        + "'update-mode' = '%s'"
+                                        + ") ",
+                                updateMode));
+        util.verifyExplainInsert("UPDATE t SET b = 'v2' WHERE b = '123'", explainDetails);
+    }
+
+    private void createTableForUpdate() {
+        util.tableEnv()
+                .executeSql(
+                        String.format(
+                                "CREATE TABLE t (a int, b string) WITH "
+                                        + "('connector' = 'test-update-delete', 'update-mode' = '%s') ",
+                                updateMode));
+    }
+}
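
As a usage sketch (not part of the patch itself): the plan tests above drive row-level UPDATE
through the connector registered by TestUpdateDeleteTableFactory; from a user's perspective the
same flow in batch mode looks roughly like the snippet below. The class name
RowLevelUpdateExample is hypothetical, the 'test-update-delete' connector and its 'update-mode'
option belong to the test factory, and a real sink is assumed to need SupportsRowLevelUpdate on
its DynamicTableSink before the planner accepts the statement.

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class RowLevelUpdateExample {
        public static void main(String[] args) throws Exception {
            // Row-level UPDATE is only planned in batch mode; the streaming IT case
            // below expects a TableException instead.
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
            // Backed by the test connector; 'UPDATED_ROWS' is one of the two
            // RowLevelUpdateMode values exercised by the parameterized tests.
            tEnv.executeSql(
                    "CREATE TABLE t (a INT PRIMARY KEY NOT ENFORCED, b STRING, c DOUBLE) WITH ("
                            + " 'connector' = 'test-update-delete',"
                            + " 'update-mode' = 'UPDATED_ROWS')");
            // The planner converts this into a SinkModifyOperation with isUpdate() == true
            // and runs a batch job that rewrites the matching rows.
            tEnv.executeSql("UPDATE t SET b = 'v2', c = c + 1 WHERE a > 0").await();
        }
    }
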
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/UpdateTableITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/UpdateTableITCase.java
new file mode 100644
index 00000000000..793a67eaa59
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/UpdateTableITCase.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.batch.sql;
+
+import org.apache.flink.table.api.StatementSet;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.planner.factories.TestUpdateDeleteTableFactory;
+import org.apache.flink.table.planner.runtime.utils.BatchTestBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CollectionUtil;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** IT case for the UPDATE statement in batch mode. */
+@RunWith(Parameterized.class)
+public class UpdateTableITCase extends BatchTestBase {
+    private final SupportsRowLevelUpdate.RowLevelUpdateMode updateMode;
+
+    @Parameterized.Parameters(name = "updateMode = {0}")
+    public static Collection<SupportsRowLevelUpdate.RowLevelUpdateMode> data() {
+        return Arrays.asList(
+                SupportsRowLevelUpdate.RowLevelUpdateMode.UPDATED_ROWS,
+                SupportsRowLevelUpdate.RowLevelUpdateMode.ALL_ROWS);
+    }
+
+    public UpdateTableITCase(SupportsRowLevelUpdate.RowLevelUpdateMode updateMode) {
+        this.updateMode = updateMode;
+    }
+
+    @Test
+    public void testUpdate() throws Exception {
+        String dataId = registerData();
+        tEnv().executeSql(
+                        String.format(
+                                "CREATE TABLE t ("
+                                        + " a int PRIMARY KEY NOT ENFORCED,"
+                                        + " b string,"
+                                        + " c double) WITH"
+                                        + " ('connector' = 'test-update-delete', "
+                                        + "'data-id' = '%s', "
+                                        + "'update-mode' = '%s')",
+                                dataId, updateMode));
+        tEnv().executeSql("UPDATE t SET b = 'uaa', c = c * c WHERE a >= 1").await();
+        List<String> rows = toSortedResults(tEnv().executeSql("SELECT * FROM t"));
+        assertThat(rows.toString())
+                .isEqualTo("[+I[0, b_0, 0.0], +I[1, uaa, 4.0], +I[2, uaa, 16.0]]");
+
+        tEnv().executeSql("UPDATE t SET b = 'uab' WHERE a > (SELECT count(1) FROM t WHERE a > 1)")
+                .await();
+        rows = toSortedResults(tEnv().executeSql("SELECT * FROM t"));
+        assertThat(rows.toString())
+                .isEqualTo("[+I[0, b_0, 0.0], +I[1, uaa, 4.0], +I[2, uab, 16.0]]");
+    }
+
+    @Test
+    public void testStatementSetContainUpdateAndInsert() throws Exception {
+        tEnv().executeSql(
+                        "CREATE TABLE t (a int, b string, c double) WITH"
+                                + " ('connector' = 'test-update-delete')");
+        StatementSet statementSet = tEnv().createStatementSet();
+        // should throw an exception when the statement set mixes INSERT and UPDATE
+        statementSet.addInsertSql("INSERT INTO t VALUES (1, 'v1', 1)");
+        statementSet.addInsertSql("UPDATE t SET b = 'uaa'");
+        assertThatThrownBy(statementSet::execute)
+                .isInstanceOf(TableException.class)
+                .hasMessage(
+                        "Unsupported SQL query! Only accept a single SQL statement of type UPDATE.");
+    }
+
+    @Test
+    public void testCompilePlanSql() throws Exception {
+        tEnv().executeSql(
+                        "CREATE TABLE t (a int, b string, c double) WITH"
+                                + " ('connector' = 'test-update-delete')");
+        // should throw an exception when compiling a plan for an UPDATE statement
+        assertThatThrownBy(() -> tEnv().compilePlanSql("UPDATE t SET b = 'uaa'"))
+                .isInstanceOf(TableException.class)
+                .hasMessage(
+                        "Unsupported SQL query! compilePlanSql() only accepts a single SQL statement of type INSERT");
+    }
+
+    @Test
+    public void testUpdateWithLegacyTableSink() {
+        tEnv().executeSql(
+                        "CREATE TABLE t (a int, b string, c double) WITH"
+                                + " ('connector' = 'COLLECTION')");
+        assertThatThrownBy(() -> tEnv().executeSql("UPDATE t SET b = 'uaa'"))
+                .isInstanceOf(TableException.class)
+                .hasMessage(
+                        String.format(
+                                "Can't perform update operation of the table %s "
+                                        + " because the corresponding table sink is the legacy TableSink,"
+                                        + " Please implement %s for it.",
+                                "`default_catalog`.`default_database`.`t`",
+                                DynamicTableSink.class.getName()));
+    }
+
+    private String registerData() {
+        List<RowData> values = createValue();
+        return TestUpdateDeleteTableFactory.registerRowData(values);
+    }
+
+    private List<RowData> createValue() {
+        List<RowData> values = new ArrayList<>();
+        for (int i = 0; i < 3; i++) {
+            values.add(GenericRowData.of(i, StringData.fromString("b_" + i), i * 2.0));
+        }
+        return values;
+    }
+
+    private List<String> toSortedResults(TableResult result) {
+        return CollectionUtil.iteratorToList(result.collect()).stream()
+                .map(Row::toString)
+                .sorted()
+                .collect(Collectors.toList());
+    }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/UpdateTableITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/UpdateTableITCase.java
new file mode 100644
index 00000000000..b9e541c08a9
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/UpdateTableITCase.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.stream.sql;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.planner.runtime.utils.StreamingTestBase;
+
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** IT case for the UPDATE statement in streaming mode. */
+public class UpdateTableITCase extends StreamingTestBase {
+
+    @Test
+    public void testUpdate() throws Exception {
+        tEnv().executeSql(
+                        "CREATE TABLE t ("
+                                + " a int PRIMARY KEY NOT ENFORCED,"
+                                + " b string,"
+                                + " c double) WITH"
+                                + " ('connector' = 'test-update-delete')");
+        assertThatThrownBy(
+                        () -> tEnv().executeSql("UPDATE t SET b = 'uaa', c = c * c WHERE a >= 1"))
+                .isInstanceOf(TableException.class)
+                .hasMessageContaining("UPDATE statement is not supported for streaming mode now.");
+    }
+}
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RowLevelUpdateTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RowLevelUpdateTest.xml
new file mode 100644
index 00000000000..593ab178353
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RowLevelUpdateTest.xml
@@ -0,0 +1,973 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+  <TestCase name="testUpdateWithoutFilter[updateMode = UPDATED_ROWS]">
+    <Resource name="explain">
+      <![CDATA[== Abstract Syntax Tree ==
+LogicalSink(table=[default_catalog.default_database.t], fields=[a, b])
++- LogicalProject(a=[*(CHAR_LENGTH($1), $0)], b=[_UTF-16LE'n1'])
+   +- LogicalTableScan(table=[[default_catalog, default_database, t]])
+
+== Optimized Physical Plan ==
+Sink(table=[default_catalog.default_database.t], fields=[a, b])
++- Calc(select=[*(CHAR_LENGTH(b), a) AS a, 'n1' AS b])
+   +- TableSourceScan(table=[[default_catalog, default_database, t]], fields=[a, b])
+
+== Optimized Execution Plan ==
+Sink(table=[default_catalog.default_database.t], fields=[a, b])
++- Calc(select=[(CHAR_LENGTH(b) * a) AS a, 'n1' AS b])
+   +- TableSourceScan(table=[[default_catalog, default_database, t]], fields=[a, b])
+
+== Physical Execution Plan ==
+{
+  "nodes" : [ {
+    "id" : ,
+    "type" : "Source: t[]",
+    "pact" : "Data Source",
+    "contents" : "[]:TableSourceScan(table=[[default_catalog, default_database, t]], fields=[a, b])",
+    "parallelism" : 1
+  }, {
+    "id" : ,
+    "type" : "Calc[]",
+    "pact" : "Operator",
+    "contents" : "[]:Calc(select=[(CHAR_LENGTH(b) * a) AS a, 'n1' AS b])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "RowKindSetter[]",
+    "pact" : "Operator",
+    "contents" : "[]:RowKindSetter(TargetRowKind=[UPDATE_AFTER])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Sink: Unnamed",
+    "pact" : "Data Sink",
+    "contents" : "Sink: Unnamed",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  } ]
+}]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testUpdateWithFilter[updateMode = UPDATED_ROWS]">
+    <Resource name="explain">
+      <![CDATA[== Abstract Syntax Tree ==
+LogicalSink(table=[default_catalog.default_database.t], fields=[a, b])
++- LogicalProject(a=[$0], b=[_UTF-16LE'v2'])
+   +- LogicalFilter(condition=[AND(=($0, 123), =($1, _UTF-16LE'v1'))])
+      +- LogicalTableScan(table=[[default_catalog, default_database, t]])
+
+== Optimized Physical Plan ==
+Sink(table=[default_catalog.default_database.t], fields=[a, b])
++- Calc(select=[CAST(123 AS INTEGER) AS a, 'v2' AS b], where=[AND(=(a, 123), =(b, 'v1'))])
+   +- TableSourceScan(table=[[default_catalog, default_database, t]], fields=[a, b])
+
+== Optimized Execution Plan ==
+Sink(table=[default_catalog.default_database.t], fields=[a, b])
++- Calc(select=[CAST(123 AS INTEGER) AS a, 'v2' AS b], where=[((a = 123) AND (b = 'v1'))])
+   +- TableSourceScan(table=[[default_catalog, default_database, t]], fields=[a, b])
+
+== Physical Execution Plan ==
+{
+  "nodes" : [ {
+    "id" : ,
+    "type" : "Source: t[]",
+    "pact" : "Data Source",
+    "contents" : "[]:TableSourceScan(table=[[default_catalog, default_database, t]], fields=[a, b])",
+    "parallelism" : 1
+  }, {
+    "id" : ,
+    "type" : "Calc[]",
+    "pact" : "Operator",
+    "contents" : "[]:Calc(select=[CAST(123 AS INTEGER) AS a, 'v2' AS b], where=[((a = 123) AND (b = 'v1'))])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "RowKindSetter[]",
+    "pact" : "Operator",
+    "contents" : "[]:RowKindSetter(TargetRowKind=[UPDATE_AFTER])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Sink: Unnamed",
+    "pact" : "Data Sink",
+    "contents" : "Sink: Unnamed",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  } ]
+}]]>
+	</Resource>
+  </TestCase>
+  <TestCase name="testUpdateWithSubQuery[updateMode = UPDATED_ROWS]">
+    <Resource name="explain">
+      <![CDATA[== Abstract Syntax Tree ==
+LogicalSink(table=[default_catalog.default_database.t], fields=[a, b])
++- LogicalProject(a=[$0], b=[_UTF-16LE'v2'])
+   +- LogicalFilter(condition=[=(CAST($0):BIGINT, $SCALAR_QUERY({
+LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
+  LogicalProject($f0=[0])
+    LogicalTableScan(table=[[default_catalog, default_database, t1]])
+}))])
+      +- LogicalTableScan(table=[[default_catalog, default_database, t]])
+
+== Optimized Physical Plan ==
+Sink(table=[default_catalog.default_database.t], fields=[a, b])
++- Calc(select=[a, 'v2' AS b])
+   +- NestedLoopJoin(joinType=[InnerJoin], where=[=(a0, EXPR$0)], select=[a, a0, EXPR$0], build=[right], singleRowJoin=[true])
+      :- Calc(select=[a, CAST(a AS BIGINT) AS a0])
+      :  +- TableSourceScan(table=[[default_catalog, default_database, t]], fields=[a, b])
+      +- Exchange(distribution=[broadcast])
+         +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS EXPR$0])
+            +- Exchange(distribution=[single])
+               +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
+                  +- Calc(select=[0 AS $f0])
+                     +- TableSourceScan(table=[[default_catalog, default_database, t1]], fields=[a, b])
+
+== Optimized Execution Plan ==
+Sink(table=[default_catalog.default_database.t], fields=[a, b])
++- Calc(select=[a, 'v2' AS b])
+   +- NestedLoopJoin(joinType=[InnerJoin], where=[(a0 = EXPR$0)], select=[a, a0, EXPR$0], build=[right], singleRowJoin=[true])
+      :- Calc(select=[a, CAST(a AS BIGINT) AS a0])
+      :  +- TableSourceScan(table=[[default_catalog, default_database, t]], fields=[a, b])
+      +- Exchange(distribution=[broadcast])
+         +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS EXPR$0])
+            +- Exchange(distribution=[single])
+               +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
+                  +- Calc(select=[0 AS $f0])
+                     +- TableSourceScan(table=[[default_catalog, default_database, t1]], fields=[a, b])
+
+== Physical Execution Plan ==
+{
+  "nodes" : [ {
+    "id" : ,
+    "type" : "Source: t[]",
+    "pact" : "Data Source",
+    "contents" : "[]:TableSourceScan(table=[[default_catalog, default_database, t]], fields=[a, b])",
+    "parallelism" : 1
+  }, {
+    "id" : ,
+    "type" : "Calc[]",
+    "pact" : "Operator",
+    "contents" : "[]:Calc(select=[a, CAST(a AS BIGINT) AS a0])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Source: t1[]",
+    "pact" : "Data Source",
+    "contents" : "[]:TableSourceScan(table=[[default_catalog, default_database, t1]], fields=[a, b])",
+    "parallelism" : 1
+  }, {
+    "id" : ,
+    "type" : "Calc[]",
+    "pact" : "Operator",
+    "contents" : "[]:Calc(select=[0 AS $f0])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "HashAggregate[]",
+    "pact" : "Operator",
+    "contents" : "[]:LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "HashAggregate[]",
+    "pact" : "Operator",
+    "contents" : "[]:HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS EXPR$0])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "GLOBAL",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "NestedLoopJoin[]",
+    "pact" : "Operator",
+    "contents" : "[]:NestedLoopJoin(joinType=[InnerJoin], where=[(a0 = EXPR$0)], select=[a, a0, EXPR$0], build=[right], singleRowJoin=[true])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    }, {
+      "id" : ,
+      "ship_strategy" : "BROADCAST",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Calc[]",
+    "pact" : "Operator",
+    "contents" : "[]:Calc(select=[a, 'v2' AS b])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "RowKindSetter[]",
+    "pact" : "Operator",
+    "contents" : "[]:RowKindSetter(TargetRowKind=[UPDATE_AFTER])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Sink: Unnamed",
+    "pact" : "Data Sink",
+    "contents" : "Sink: Unnamed",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  } ]
+}]]>
+	</Resource>
+  </TestCase>
+  <TestCase name="testUpdateAllColsWithOnlyRequireUpdatedCols[updateMode = UPDATED_ROWS]">
+    <Resource name="explain">
+      <![CDATA[== Abstract Syntax Tree ==
+LogicalSink(table=[default_catalog.default_database.t], fields=[a, b, c])
++- LogicalProject(a=[123], b=[_UTF-16LE'v2'], c=[+($2, 1)])
+   +- LogicalFilter(condition=[>($2, 123)])
+      +- LogicalTableScan(table=[[default_catalog, default_database, t]])
+
+== Optimized Physical Plan ==
+Sink(table=[default_catalog.default_database.t], fields=[a, b, c])
++- Calc(select=[123 AS a, 'v2' AS b, +(c, 1) AS c], where=[>(c, 123)])
+   +- TableSourceScan(table=[[default_catalog, default_database, t]], fields=[a, b, c])
+
+== Optimized Execution Plan ==
+Sink(table=[default_catalog.default_database.t], fields=[a, b, c])
++- Calc(select=[123 AS a, 'v2' AS b, (c + 1) AS c], where=[(c > 123)])
+   +- TableSourceScan(table=[[default_catalog, default_database, t]], fields=[a, b, c])
+
+== Physical Execution Plan ==
+{
+  "nodes" : [ {
+    "id" : ,
+    "type" : "Source: t[]",
+    "pact" : "Data Source",
+    "contents" : "[]:TableSourceScan(table=[[default_catalog, default_database, t]], fields=[a, b, c])",
+    "parallelism" : 1
+  }, {
+    "id" : ,
+    "type" : "Calc[]",
+    "pact" : "Operator",
+    "contents" : "[]:Calc(select=[123 AS a, 'v2' AS b, (c + 1) AS c], where=[(c > 123)])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "RowKindSetter[]",
+    "pact" : "Operator",
+    "contents" : "[]:RowKindSetter(TargetRowKind=[UPDATE_AFTER])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Sink: Unnamed",
+    "pact" : "Data Sink",
+    "contents" : "Sink: Unnamed",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  } ]
+}]]>
+	</Resource>
+  </TestCase>
+  <TestCase name="testUpdatePartColsWithOnlyRequireUpdatedCols[updateMode = UPDATED_ROWS]">
+    <Resource name="explain">
+      <![CDATA[== Abstract Syntax Tree ==
+LogicalSink(table=[default_catalog.default_database.t], fields=[a, b, c])
++- LogicalProject(a=[123], b=[_UTF-16LE'v2'], c=[+($4, 1)])
+   +- LogicalFilter(condition=[>($4, 123)])
+      +- LogicalTableScan(table=[[default_catalog, default_database, t]])
+
+== Optimized Physical Plan ==
+Sink(table=[default_catalog.default_database.t], fields=[a, b, c])
++- Calc(select=[123 AS a, 'v2' AS b, +(c, 1) AS c], where=[>(c, 123)])
+   +- TableSourceScan(table=[[default_catalog, default_database, t]], fields=[f0, f1, a, b, c, f2, f3])
+
+== Optimized Execution Plan ==
+Sink(table=[default_catalog.default_database.t], fields=[a, b, c])
++- Calc(select=[123 AS a, 'v2' AS b, (c + 1) AS c], where=[(c > 123)])
+   +- TableSourceScan(table=[[default_catalog, default_database, t]], fields=[f0, f1, a, b, c, f2, f3])
+
+== Physical Execution Plan ==
+{
+  "nodes" : [ {
+    "id" : ,
+    "type" : "Source: t[]",
+    "pact" : "Data Source",
+    "contents" : "[]:TableSourceScan(table=[[default_catalog, default_database, t]], fields=[f0, f1, a, b, c, f2, f3])",
+    "parallelism" : 1
+  }, {
+    "id" : ,
+    "type" : "Calc[]",
+    "pact" : "Operator",
+    "contents" : "[]:Calc(select=[123 AS a, 'v2' AS b, (c + 1) AS c], where=[(c > 123)])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "RowKindSetter[]",
+    "pact" : "Operator",
+    "contents" : "[]:RowKindSetter(TargetRowKind=[UPDATE_AFTER])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Sink: Unnamed",
+    "pact" : "Data Sink",
+    "contents" : "Sink: Unnamed",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  } ]
+}]]>
+	</Resource>
+  </TestCase>
+  <TestCase name="testUpdateWithCustomColumns[updateMode = UPDATED_ROWS]">
+  <Resource name="explain">
+    <![CDATA[== Abstract Syntax Tree ==
+LogicalSink(table=[default_catalog.default_database.t], fields=[b, c])
++- LogicalProject(b=[_UTF-16LE'v2'], c=[$2])
+   +- LogicalFilter(condition=[=($1, _UTF-16LE'123')])
+      +- LogicalTableScan(table=[[default_catalog, default_database, t]])
+
+== Optimized Physical Plan ==
+Sink(table=[default_catalog.default_database.t], fields=[b, c])
++- Calc(select=['v2' AS b, c], where=[=(b, '123')])
+   +- TableSourceScan(table=[[default_catalog, default_database, t]], fields=[a, b, c])
+
+== Optimized Execution Plan ==
+Sink(table=[default_catalog.default_database.t], fields=[b, c])
++- Calc(select=['v2' AS b, c], where=[(b = '123')])
+   +- TableSourceScan(table=[[default_catalog, default_database, t]], fields=[a, b, c])
+
+== Physical Execution Plan ==
+{
+  "nodes" : [ {
+    "id" : ,
+    "type" : "Source: t[]",
+    "pact" : "Data Source",
+    "contents" : "[]:TableSourceScan(table=[[default_catalog, default_database, t]], fields=[a, b, c])",
+    "parallelism" : 1
+  }, {
+    "id" : ,
+    "type" : "Calc[]",
+    "pact" : "Operator",
+    "contents" : "[]:Calc(select=['v2' AS b, c], where=[(b = '123')])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "RowKindSetter[]",
+    "pact" : "Operator",
+    "contents" : "[]:RowKindSetter(TargetRowKind=[UPDATE_AFTER])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Sink: Unnamed",
+    "pact" : "Data Sink",
+    "contents" : "Sink: Unnamed",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  } ]
+}]]>
+  </Resource>
+  </TestCase>
+  <TestCase name="testUpdateWithMetaColumns[updateMode = UPDATED_ROWS]">
+    <Resource name="explain">
+      <![CDATA[== Abstract Syntax Tree ==
+LogicalSink(table=[default_catalog.default_database.t], fields=[meta_f1, meta_f2, a, b])
++- LogicalProject(meta_f1=[$3], meta_f2=[$4], a=[$0], b=[_UTF-16LE'v2'])
+   +- LogicalFilter(condition=[=($1, _UTF-16LE'123')])
+      +- LogicalTableScan(table=[[default_catalog, default_database, t]])
+
+== Optimized Physical Plan ==
+Sink(table=[default_catalog.default_database.t], fields=[meta_f1, meta_f2, a, b])
++- Calc(select=[meta_f1, meta_f2, a, 'v2' AS b], where=[=(b, '123')])
+   +- TableSourceScan(table=[[default_catalog, default_database, t]], fields=[a, b, c, meta_f1, meta_f2])
+
+== Optimized Execution Plan ==
+Sink(table=[default_catalog.default_database.t], fields=[meta_f1, meta_f2, a, b])
++- Calc(select=[meta_f1, meta_f2, a, 'v2' AS b], where=[(b = '123')])
+   +- TableSourceScan(table=[[default_catalog, default_database, t]], fields=[a, b, c, meta_f1, meta_f2])
+
+== Physical Execution Plan ==
+{
+  "nodes" : [ {
+    "id" : ,
+    "type" : "Source: t[]",
+    "pact" : "Data Source",
+    "contents" : "[]:TableSourceScan(table=[[default_catalog, default_database, t]], fields=[a, b, c, meta_f1, meta_f2])",
+    "parallelism" : 1
+  }, {
+    "id" : ,
+    "type" : "Calc[]",
+    "pact" : "Operator",
+    "contents" : "[]:Calc(select=[meta_f1, meta_f2, a, 'v2' AS b], where=[(b = '123')])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "RowKindSetter[]",
+    "pact" : "Operator",
+    "contents" : "[]:RowKindSetter(TargetRowKind=[UPDATE_AFTER])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Sink: Unnamed",
+    "pact" : "Data Sink",
+    "contents" : "Sink: Unnamed",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  } ]
+}]]>
+	</Resource>
+  </TestCase>
+  <TestCase name="testUpdateWithoutFilter[updateMode = ALL_ROWS]">
+    <Resource name="explain">
+      <![CDATA[== Abstract Syntax Tree ==
+LogicalSink(table=[default_catalog.default_database.t], fields=[a, b])
++- LogicalProject(a=[*(CHAR_LENGTH($1), $0)], b=[_UTF-16LE'n1'])
+   +- LogicalTableScan(table=[[default_catalog, default_database, t]])
+
+== Optimized Physical Plan ==
+Sink(table=[default_catalog.default_database.t], fields=[a, b])
++- Calc(select=[*(CHAR_LENGTH(b), a) AS a, 'n1' AS b])
+   +- TableSourceScan(table=[[default_catalog, default_database, t]], fields=[a, b])
+
+== Optimized Execution Plan ==
+Sink(table=[default_catalog.default_database.t], fields=[a, b])
++- Calc(select=[(CHAR_LENGTH(b) * a) AS a, 'n1' AS b])
+   +- TableSourceScan(table=[[default_catalog, default_database, t]], fields=[a, b])
+
+== Physical Execution Plan ==
+{
+  "nodes" : [ {
+    "id" : ,
+    "type" : "Source: t[]",
+    "pact" : "Data Source",
+    "contents" : "[]:TableSourceScan(table=[[default_catalog, default_database, t]], fields=[a, b])",
+    "parallelism" : 1
+  }, {
+    "id" : ,
+    "type" : "Calc[]",
+    "pact" : "Operator",
+    "contents" : "[]:Calc(select=[(CHAR_LENGTH(b) * a) AS a, 'n1' AS b])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Sink: Unnamed",
+    "pact" : "Data Sink",
+    "contents" : "Sink: Unnamed",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  } ]
+}]]>
+	</Resource>
+  </TestCase>
+  <TestCase name="testUpdateWithFilter[updateMode = ALL_ROWS]">
+    <Resource name="explain">
+      <![CDATA[== Abstract Syntax Tree ==
+LogicalSink(table=[default_catalog.default_database.t], fields=[a, b])
++- LogicalProject(a=[$0], b=[IF(AND(=($0, 123), =($1, _UTF-16LE'v1')), _UTF-16LE'v2', $1)])
+   +- LogicalTableScan(table=[[default_catalog, default_database, t]])
+
+== Optimized Physical Plan ==
+Sink(table=[default_catalog.default_database.t], fields=[a, b])
++- Calc(select=[a, IF(AND(=(a, 123), =(b, 'v1')), 'v2', b) AS b])
+   +- TableSourceScan(table=[[default_catalog, default_database, t]], fields=[a, b])
+
+== Optimized Execution Plan ==
+Sink(table=[default_catalog.default_database.t], fields=[a, b])
++- Calc(select=[a, IF(((a = 123) AND (b = 'v1')), 'v2', b) AS b])
+   +- TableSourceScan(table=[[default_catalog, default_database, t]], fields=[a, b])
+
+== Physical Execution Plan ==
+{
+  "nodes" : [ {
+    "id" : ,
+    "type" : "Source: t[]",
+    "pact" : "Data Source",
+    "contents" : "[]:TableSourceScan(table=[[default_catalog, default_database, t]], fields=[a, b])",
+    "parallelism" : 1
+  }, {
+    "id" : ,
+    "type" : "Calc[]",
+    "pact" : "Operator",
+    "contents" : "[]:Calc(select=[a, IF(((a = 123) AND (b = 'v1')), 'v2', b) AS b])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Sink: Unnamed",
+    "pact" : "Data Sink",
+    "contents" : "Sink: Unnamed",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  } ]
+}]]>
+	</Resource>
+  </TestCase>
+  <TestCase name="testUpdateWithSubQuery[updateMode = ALL_ROWS]">
+    <Resource name="explain">
+      <![CDATA[== Abstract Syntax Tree ==
+LogicalSink(table=[default_catalog.default_database.t], fields=[a, b])
++- LogicalProject(a=[$0], b=[IF(=(CAST($0):BIGINT, $SCALAR_QUERY({
+LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
+  LogicalProject($f0=[0])
+    LogicalTableScan(table=[[default_catalog, default_database, t1]])
+})), _UTF-16LE'v2', $1)])
+   +- LogicalTableScan(table=[[default_catalog, default_database, t]])
+
+== Optimized Physical Plan ==
+Sink(table=[default_catalog.default_database.t], fields=[a, b])
++- Calc(select=[a, IF(=(CAST(a AS BIGINT), EXPR$0), 'v2', b) AS b])
+   +- NestedLoopJoin(joinType=[LeftOuterJoin], where=[true], select=[a, b, EXPR$0], build=[right], singleRowJoin=[true])
+      :- TableSourceScan(table=[[default_catalog, default_database, t]], fields=[a, b])
+      +- Exchange(distribution=[broadcast])
+         +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS EXPR$0])
+            +- Exchange(distribution=[single])
+               +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
+                  +- Calc(select=[0 AS $f0])
+                     +- TableSourceScan(table=[[default_catalog, default_database, t1]], fields=[a, b])
+
+== Optimized Execution Plan ==
+Sink(table=[default_catalog.default_database.t], fields=[a, b])
++- Calc(select=[a, IF((CAST(a AS BIGINT) = EXPR$0), 'v2', b) AS b])
+   +- NestedLoopJoin(joinType=[LeftOuterJoin], where=[true], select=[a, b, EXPR$0], build=[right], singleRowJoin=[true])
+      :- TableSourceScan(table=[[default_catalog, default_database, t]], fields=[a, b])
+      +- Exchange(distribution=[broadcast])
+         +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS EXPR$0])
+            +- Exchange(distribution=[single])
+               +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
+                  +- Calc(select=[0 AS $f0])
+                     +- TableSourceScan(table=[[default_catalog, default_database, t1]], fields=[a, b])
+
+== Physical Execution Plan ==
+{
+  "nodes" : [ {
+    "id" : ,
+    "type" : "Source: t[]",
+    "pact" : "Data Source",
+    "contents" : "[]:TableSourceScan(table=[[default_catalog, default_database, t]], fields=[a, b])",
+    "parallelism" : 1
+  }, {
+    "id" : ,
+    "type" : "Source: t1[]",
+    "pact" : "Data Source",
+    "contents" : "[]:TableSourceScan(table=[[default_catalog, default_database, t1]], fields=[a, b])",
+    "parallelism" : 1
+  }, {
+    "id" : ,
+    "type" : "Calc[]",
+    "pact" : "Operator",
+    "contents" : "[]:Calc(select=[0 AS $f0])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "HashAggregate[]",
+    "pact" : "Operator",
+    "contents" : "[]:LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "HashAggregate[]",
+    "pact" : "Operator",
+    "contents" : "[]:HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS EXPR$0])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "GLOBAL",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "NestedLoopJoin[]",
+    "pact" : "Operator",
+    "contents" : "[]:NestedLoopJoin(joinType=[LeftOuterJoin], where=[true], select=[a, b, EXPR$0], build=[right], singleRowJoin=[true])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    }, {
+      "id" : ,
+      "ship_strategy" : "BROADCAST",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Calc[]",
+    "pact" : "Operator",
+    "contents" : "[]:Calc(select=[a, IF((CAST(a AS BIGINT) = EXPR$0), 'v2', b) AS b])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Sink: Unnamed",
+    "pact" : "Data Sink",
+    "contents" : "Sink: Unnamed",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  } ]
+}]]>
+	</Resource>
+  </TestCase>
+  <TestCase name="testUpdateAllColsWithOnlyRequireUpdatedCols[updateMode = ALL_ROWS]">
+    <Resource name="explain">
+      <![CDATA[== Abstract Syntax Tree ==
+LogicalSink(table=[default_catalog.default_database.t], fields=[a, b, c])
++- LogicalProject(a=[IF(>($2, 123), 123, $0)], b=[IF(>($2, 123), _UTF-16LE'v2', $1)], c=[IF(>($2, 123), +($2, 1), $2)])
+   +- LogicalTableScan(table=[[default_catalog, default_database, t]])
+
+== Optimized Physical Plan ==
+Sink(table=[default_catalog.default_database.t], fields=[a, b, c])
++- Calc(select=[IF(>(c, 123), 123, a) AS a, IF(>(c, 123), 'v2', b) AS b, IF(>(c, 123), +(c, 1), c) AS c])
+   +- TableSourceScan(table=[[default_catalog, default_database, t]], fields=[a, b, c])
+
+== Optimized Execution Plan ==
+Sink(table=[default_catalog.default_database.t], fields=[a, b, c])
++- Calc(select=[IF((c > 123), 123, a) AS a, IF((c > 123), 'v2', b) AS b, IF((c > 123), (c + 1), c) AS c])
+   +- TableSourceScan(table=[[default_catalog, default_database, t]], fields=[a, b, c])
+
+== Physical Execution Plan ==
+{
+  "nodes" : [ {
+    "id" : ,
+    "type" : "Source: t[]",
+    "pact" : "Data Source",
+    "contents" : "[]:TableSourceScan(table=[[default_catalog, default_database, t]], fields=[a, b, c])",
+    "parallelism" : 1
+  }, {
+    "id" : ,
+    "type" : "Calc[]",
+    "pact" : "Operator",
+    "contents" : "[]:Calc(select=[IF((c > 123), 123, a) AS a, IF((c > 123), 'v2', b) AS b, IF((c > 123), (c + 1), c) AS c])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Sink: Unnamed",
+    "pact" : "Data Sink",
+    "contents" : "Sink: Unnamed",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  } ]
+}]]>
+	</Resource>
+  </TestCase>
+  <TestCase name="testUpdatePartColsWithOnlyRequireUpdatedCols[updateMode = ALL_ROWS]">
+    <Resource name="explain">
+      <![CDATA[== Abstract Syntax Tree ==
+LogicalSink(table=[default_catalog.default_database.t], fields=[a, b, c])
++- LogicalProject(a=[IF(>($4, 123), 123, $2)], b=[IF(>($4, 123), _UTF-16LE'v2', $3)], c=[IF(>($4, 123), +($4, 1), $4)])
+   +- LogicalTableScan(table=[[default_catalog, default_database, t]])
+
+== Optimized Physical Plan ==
+Sink(table=[default_catalog.default_database.t], fields=[a, b, c])
++- Calc(select=[IF(>(c, 123), 123, a) AS a, IF(>(c, 123), 'v2', b) AS b, IF(>(c, 123), +(c, 1), c) AS c])
+   +- TableSourceScan(table=[[default_catalog, default_database, t]], fields=[f0, f1, a, b, c, f2, f3])
+
+== Optimized Execution Plan ==
+Sink(table=[default_catalog.default_database.t], fields=[a, b, c])
++- Calc(select=[IF((c > 123), 123, a) AS a, IF((c > 123), 'v2', b) AS b, IF((c > 123), (c + 1), c) AS c])
+   +- TableSourceScan(table=[[default_catalog, default_database, t]], fields=[f0, f1, a, b, c, f2, f3])
+
+== Physical Execution Plan ==
+{
+  "nodes" : [ {
+    "id" : ,
+    "type" : "Source: t[]",
+    "pact" : "Data Source",
+    "contents" : "[]:TableSourceScan(table=[[default_catalog, default_database, t]], fields=[f0, f1, a, b, c, f2, f3])",
+    "parallelism" : 1
+  }, {
+    "id" : ,
+    "type" : "Calc[]",
+    "pact" : "Operator",
+    "contents" : "[]:Calc(select=[IF((c > 123), 123, a) AS a, IF((c > 123), 'v2', b) AS b, IF((c > 123), (c + 1), c) AS c])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Sink: Unnamed",
+    "pact" : "Data Sink",
+    "contents" : "Sink: Unnamed",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  } ]
+}]]>
+	</Resource>
+  </TestCase>
+  <TestCase name="testUpdateWithCustomColumns[updateMode = ALL_ROWS]">
+    <Resource name="explain">
+      <![CDATA[== Abstract Syntax Tree ==
+LogicalSink(table=[default_catalog.default_database.t], fields=[b, c])
++- LogicalProject(b=[IF(=($1, _UTF-16LE'123'), _UTF-16LE'v2', $1)], c=[$2])
+   +- LogicalTableScan(table=[[default_catalog, default_database, t]])
+
+== Optimized Physical Plan ==
+Sink(table=[default_catalog.default_database.t], fields=[b, c])
++- Calc(select=[IF(=(b, '123'), 'v2', b) AS b, c])
+   +- TableSourceScan(table=[[default_catalog, default_database, t]], fields=[a, b, c])
+
+== Optimized Execution Plan ==
+Sink(table=[default_catalog.default_database.t], fields=[b, c])
++- Calc(select=[IF((b = '123'), 'v2', b) AS b, c])
+   +- TableSourceScan(table=[[default_catalog, default_database, t]], fields=[a, b, c])
+
+== Physical Execution Plan ==
+{
+  "nodes" : [ {
+    "id" : ,
+    "type" : "Source: t[]",
+    "pact" : "Data Source",
+    "contents" : "[]:TableSourceScan(table=[[default_catalog, default_database, t]], fields=[a, b, c])",
+    "parallelism" : 1
+  }, {
+    "id" : ,
+    "type" : "Calc[]",
+    "pact" : "Operator",
+    "contents" : "[]:Calc(select=[IF((b = '123'), 'v2', b) AS b, c])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Sink: Unnamed",
+    "pact" : "Data Sink",
+    "contents" : "Sink: Unnamed",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  } ]
+}]]>
+	</Resource>
+  </TestCase>
+  <TestCase name="testUpdateWithMetaColumns[updateMode = ALL_ROWS]">
+    <Resource name="explain">
+      <![CDATA[== Abstract Syntax Tree ==
+LogicalSink(table=[default_catalog.default_database.t], fields=[meta_f1, meta_f2, a, b])
++- LogicalProject(meta_f1=[$3], meta_f2=[$4], a=[$0], b=[IF(=($1, _UTF-16LE'123'), _UTF-16LE'v2', $1)])
+   +- LogicalTableScan(table=[[default_catalog, default_database, t]])
+
+== Optimized Physical Plan ==
+Sink(table=[default_catalog.default_database.t], fields=[meta_f1, meta_f2, a, b])
++- Calc(select=[meta_f1, meta_f2, a, IF(=(b, '123'), 'v2', b) AS b])
+   +- TableSourceScan(table=[[default_catalog, default_database, t]], fields=[a, b, c, meta_f1, meta_f2])
+
+== Optimized Execution Plan ==
+Sink(table=[default_catalog.default_database.t], fields=[meta_f1, meta_f2, a, b])
++- Calc(select=[meta_f1, meta_f2, a, IF((b = '123'), 'v2', b) AS b])
+   +- TableSourceScan(table=[[default_catalog, default_database, t]], fields=[a, b, c, meta_f1, meta_f2])
+
+== Physical Execution Plan ==
+{
+  "nodes" : [ {
+    "id" : ,
+    "type" : "Source: t[]",
+    "pact" : "Data Source",
+    "contents" : "[]:TableSourceScan(table=[[default_catalog, default_database, t]], fields=[a, b, c, meta_f1, meta_f2])",
+    "parallelism" : 1
+  }, {
+    "id" : ,
+    "type" : "Calc[]",
+    "pact" : "Operator",
+    "contents" : "[]:Calc(select=[meta_f1, meta_f2, a, IF((b = '123'), 'v2', b) AS b])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Sink: Unnamed",
+    "pact" : "Data Sink",
+    "contents" : "Sink: Unnamed",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  } ]
+}]]>
+	</Resource>
+  </TestCase>
+</Root>