Posted to commits@flink.apache.org by li...@apache.org on 2023/01/20 02:58:33 UTC

[flink] 01/02: [FLINK-30662][table] Planner supports delete push down.

This is an automated email from the ASF dual-hosted git repository.

lincoln pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3fa5ecbc8534662fe7e20bf75e23f12890318d02
Author: luoyuxia <lu...@alumni.sjtu.edu.cn>
AuthorDate: Sun Jan 15 17:48:13 2023 +0800

    [FLINK-30662][table] Planner supports delete push down.
    
    This closes #21676
---
 .../table/api/internal/TableEnvironmentImpl.java   |  47 +++
 .../operations/DeleteFromFilterOperation.java      |  78 +++++
 .../table/operations/SinkModifyOperation.java      |  51 +++-
 .../planner/operations/DeletePushDownUtils.java    | 283 ++++++++++++++++++
 .../operations/SqlToOperationConverter.java        |  44 +++
 .../PushFilterInCalcIntoTableSourceScanRule.java   |   2 +-
 .../logical/PushFilterIntoSourceScanRuleBase.java  |  24 --
 .../logical/PushFilterIntoTableSourceScanRule.java |   2 +-
 .../table/planner/plan/utils/FlinkRexUtil.scala    |  29 +-
 .../factories/TestUpdateDeleteTableFactory.java    | 326 +++++++++++++++++++++
 .../operations/DeletePushDownUtilsTest.java        | 184 ++++++++++++
 .../operations/SqlToOperationConverterTest.java    |  52 ++++
 .../runtime/batch/sql/DeleteTableITCase.java       |  93 ++++++
 .../runtime/stream/sql/DeleteTableITCase.java      |  38 +++
 .../org.apache.flink.table.factories.Factory       |   1 +
 15 files changed, 1226 insertions(+), 28 deletions(-)
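
At a high level, this change lets a DELETE statement in batch mode be handled entirely by a
sink that implements SupportsDeletePushDown, without launching a Flink job. A minimal usage
sketch, assuming a batch TableEnvironment and the test connector added in this commit (any
sink implementing the interface behaves the same way):

    TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
    tEnv.executeSql(
            "CREATE TABLE t (id INT, name STRING) WITH ('connector' = 'test-update-delete')");
    // the planner converts the statement into a DeleteFromFilterOperation and calls the
    // sink's executeDeletion() directly; the result is a single STRING column named "result"
    // holding the deleted row count (when the sink reports one) followed by "OK"
    tEnv.executeSql("DELETE FROM t WHERE id = 1").print();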

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index c54c3d5fa2d..3578e1845cb 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -89,6 +89,7 @@ import org.apache.flink.table.module.ModuleManager;
 import org.apache.flink.table.operations.CollectModifyOperation;
 import org.apache.flink.table.operations.CompileAndExecutePlanOperation;
 import org.apache.flink.table.operations.CreateTableASOperation;
+import org.apache.flink.table.operations.DeleteFromFilterOperation;
 import org.apache.flink.table.operations.DescribeTableOperation;
 import org.apache.flink.table.operations.ExplainOperation;
 import org.apache.flink.table.operations.LoadModuleOperation;
@@ -718,6 +719,10 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
         if (operations.isEmpty()) {
             return "";
         } else {
+            if (operations.size() > 1
+                    && operations.stream().anyMatch(this::isRowLevelModification)) {
+                throw new TableException("Only single UPDATE/DELETE statement is supported.");
+            }
             return planner.explain(operations, format, extraDetails);
         }
     }
@@ -847,6 +852,25 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
                 executeInternal(ctasOperation.getCreateTableOperation());
                 mapOperations.add(ctasOperation.toSinkModifyOperation(catalogManager));
             } else {
+                boolean isRowLevelModification = isRowLevelModification(modify);
+                if (isRowLevelModification) {
+                    String modifyType =
+                            ((SinkModifyOperation) modify).isDelete() ? "DELETE" : "UPDATE";
+                    if (operations.size() > 1) {
+                        throw new TableException(
+                                String.format(
+                                        "Only single %s statement is supported.", modifyType));
+                    }
+                    if (isStreamingMode) {
+                        throw new TableException(
+                                String.format(
+                                        "%s statement is not supported for streaming mode now.",
+                                        modifyType));
+                    }
+                    if (modify instanceof DeleteFromFilterOperation) {
+                        return executeInternal((DeleteFromFilterOperation) modify);
+                    }
+                }
                 mapOperations.add(modify);
             }
         }
@@ -865,6 +889,21 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
         return result;
     }
 
+    private TableResultInternal executeInternal(
+            DeleteFromFilterOperation deleteFromFilterOperation) {
+        Optional<Long> rows =
+                deleteFromFilterOperation.getSupportsDeletePushDownSink().executeDeletion();
+        if (rows.isPresent()) {
+            return TableResultImpl.builder()
+                    .resultKind(ResultKind.SUCCESS)
+                    .schema(ResolvedSchema.of(Column.physical("result", DataTypes.STRING())))
+                    .data(Arrays.asList(Row.of(String.valueOf(rows.get())), Row.of("OK")))
+                    .build();
+        } else {
+            return TableResultImpl.TABLE_RESULT_OK;
+        }
+    }
+
     private TableResultInternal executeInternal(
             List<Transformation<?>> transformations, List<String> sinkIdentifierNames) {
         final String defaultJobName = "insert-into_" + String.join(",", sinkIdentifierNames);
@@ -1973,4 +2012,12 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
     public String explainPlan(InternalPlan compiledPlan, ExplainDetail... extraDetails) {
         return planner.explainPlan(compiledPlan, extraDetails);
     }
+
+    private boolean isRowLevelModification(Operation operation) {
+        if (operation instanceof SinkModifyOperation) {
+            SinkModifyOperation sinkModifyOperation = (SinkModifyOperation) operation;
+            return sinkModifyOperation.isDelete() || sinkModifyOperation.isUpdate();
+        }
+        return false;
+    }
 }
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/DeleteFromFilterOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/DeleteFromFilterOperation.java
new file mode 100644
index 00000000000..f902663d732
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/DeleteFromFilterOperation.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.catalog.ContextResolvedTable;
+import org.apache.flink.table.connector.sink.abilities.SupportsDeletePushDown;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/** The operation for directly deleting data in a table according to the given filters. */
+@Internal
+public class DeleteFromFilterOperation extends SinkModifyOperation {
+
+    @Nonnull private final SupportsDeletePushDown supportsDeletePushDownSink;
+    @Nonnull private final List<ResolvedExpression> filters;
+
+    public DeleteFromFilterOperation(
+            ContextResolvedTable contextResolvedTable,
+            @Nonnull SupportsDeletePushDown supportsDeletePushDownSink,
+            @Nonnull List<ResolvedExpression> filters) {
+        super(contextResolvedTable, null, ModifyType.DELETE);
+        this.supportsDeletePushDownSink = Preconditions.checkNotNull(supportsDeletePushDownSink);
+        this.filters = Preconditions.checkNotNull(filters);
+    }
+
+    @Nonnull
+    public SupportsDeletePushDown getSupportsDeletePushDownSink() {
+        return supportsDeletePushDownSink;
+    }
+
+    @Nonnull
+    public List<ResolvedExpression> getFilters() {
+        return filters;
+    }
+
+    @Override
+    public String asSummaryString() {
+        Map<String, Object> params = new LinkedHashMap<>();
+        params.put("identifier", getContextResolvedTable().getIdentifier().asSummaryString());
+        params.put("filters", filters);
+        return OperationUtils.formatWithChildren(
+                "DeleteFromFilter", params, Collections.emptyList(), Operation::asSummaryString);
+    }
+
+    @Override
+    public QueryOperation getChild() {
+        throw new UnsupportedOperationException("This shouldn't be called");
+    }
+
+    @Override
+    public <T> T accept(ModifyOperationVisitor<T> visitor) {
+        throw new UnsupportedOperationException("This shouldn't be called");
+    }
+}
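
For connector authors, a rough sketch of the sink-side contract this operation consumes. The
class name InMemoryDeletableSink is hypothetical; only applyDeleteFilters and executeDeletion
come from SupportsDeletePushDown, the rest is the usual DynamicTableSink surface:

    import org.apache.flink.table.connector.ChangelogMode;
    import org.apache.flink.table.connector.sink.DynamicTableSink;
    import org.apache.flink.table.connector.sink.abilities.SupportsDeletePushDown;
    import org.apache.flink.table.expressions.ResolvedExpression;

    import java.util.List;
    import java.util.Optional;

    /** Hypothetical sink that accepts any delete filter; the actual deletion is left out. */
    public class InMemoryDeletableSink implements DynamicTableSink, SupportsDeletePushDown {

        private List<ResolvedExpression> acceptedFilters;

        @Override
        public boolean applyDeleteFilters(List<ResolvedExpression> filters) {
            // returning true tells the planner the whole DELETE can be handled by the sink,
            // so it becomes a DeleteFromFilterOperation instead of a regular job
            this.acceptedFilters = filters;
            return true;
        }

        @Override
        public Optional<Long> executeDeletion() {
            // delete the rows matching acceptedFilters in the external system;
            // return the deleted row count if it is known
            return Optional.empty();
        }

        @Override
        public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
            return ChangelogMode.insertOnly();
        }

        @Override
        public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
            throw new UnsupportedOperationException(
                    "Not needed when the delete is fully pushed down");
        }

        @Override
        public DynamicTableSink copy() {
            return new InMemoryDeletableSink();
        }

        @Override
        public String asSummaryString() {
            return "InMemoryDeletableSink";
        }
    }
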
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/SinkModifyOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/SinkModifyOperation.java
index e9cdb12b3b5..db360c0f5f2 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/SinkModifyOperation.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/SinkModifyOperation.java
@@ -38,27 +38,58 @@ import java.util.Map;
 @Internal
 public class SinkModifyOperation implements ModifyOperation {
 
-    private final ContextResolvedTable contextResolvedTable;
+    protected final ContextResolvedTable contextResolvedTable;
     private final Map<String, String> staticPartitions;
     private final QueryOperation child;
     private final boolean overwrite;
     private final Map<String, String> dynamicOptions;
+    private final ModifyType modifyType;
 
     public SinkModifyOperation(ContextResolvedTable contextResolvedTable, QueryOperation child) {
         this(contextResolvedTable, child, Collections.emptyMap(), false, Collections.emptyMap());
     }
 
+    public SinkModifyOperation(
+            ContextResolvedTable contextResolvedTable,
+            QueryOperation child,
+            ModifyType modifyType) {
+        this(
+                contextResolvedTable,
+                child,
+                Collections.emptyMap(),
+                false,
+                Collections.emptyMap(),
+                modifyType);
+    }
+
     public SinkModifyOperation(
             ContextResolvedTable contextResolvedTable,
             QueryOperation child,
             Map<String, String> staticPartitions,
             boolean overwrite,
             Map<String, String> dynamicOptions) {
+        this(
+                contextResolvedTable,
+                child,
+                staticPartitions,
+                overwrite,
+                dynamicOptions,
+                ModifyType.INSERT);
+    }
+
+    public SinkModifyOperation(
+            ContextResolvedTable contextResolvedTable,
+            QueryOperation child,
+            Map<String, String> staticPartitions,
+            boolean overwrite,
+            Map<String, String> dynamicOptions,
+            ModifyType modifyType) {
         this.contextResolvedTable = contextResolvedTable;
         this.child = child;
         this.staticPartitions = staticPartitions;
         this.overwrite = overwrite;
         this.dynamicOptions = dynamicOptions;
+        this.modifyType = modifyType;
     }
 
     public ContextResolvedTable getContextResolvedTable() {
@@ -73,6 +104,14 @@ public class SinkModifyOperation implements ModifyOperation {
         return overwrite;
     }
 
+    public boolean isUpdate() {
+        return modifyType == ModifyType.UPDATE;
+    }
+
+    public boolean isDelete() {
+        return modifyType == ModifyType.DELETE;
+    }
+
     public Map<String, String> getDynamicOptions() {
         return dynamicOptions;
     }
@@ -91,6 +130,7 @@ public class SinkModifyOperation implements ModifyOperation {
     public String asSummaryString() {
         Map<String, Object> params = new LinkedHashMap<>();
         params.put("identifier", getContextResolvedTable().getIdentifier().asSummaryString());
+        params.put("modifyType", modifyType);
         params.put("staticPartitions", staticPartitions);
         params.put("overwrite", overwrite);
         params.put("dynamicOptions", dynamicOptions);
@@ -101,4 +141,13 @@ public class SinkModifyOperation implements ModifyOperation {
                 Collections.singletonList(child),
                 Operation::asSummaryString);
     }
+
+    /** The type of sink modification. */
+    public enum ModifyType {
+        INSERT,
+
+        UPDATE,
+
+        DELETE
+    }
 }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/DeletePushDownUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/DeletePushDownUtils.java
new file mode 100644
index 00000000000..c9682ed5078
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/DeletePushDownUtils.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ContextResolvedTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.resolver.ExpressionResolver;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.TableFactoryUtil;
+import org.apache.flink.table.module.Module;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.table.planner.plan.rules.logical.SimplifyFilterConditionRule;
+import org.apache.flink.table.planner.plan.utils.FlinkRexUtil;
+import org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter;
+import org.apache.flink.table.planner.utils.ShortcutUtils;
+import org.apache.flink.table.planner.utils.TableConfigUtils;
+
+import org.apache.calcite.plan.RelOptPredicateList;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.calcite.rel.logical.LogicalTableModify;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.calcite.rel.rules.ReduceExpressionsRule;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexUtil;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.TimeZone;
+import java.util.stream.Collectors;
+
+import scala.Option;
+import scala.Tuple2;
+
+/** A utility class for delete push down. */
+public class DeletePushDownUtils {
+
+    /**
+     * Get the {@link DynamicTableSink} for the table to be modified. Return Optional.empty() if the
+     * {@link DynamicTableSink} can't be obtained.
+     */
+    public static Optional<DynamicTableSink> getDynamicTableSink(
+            ContextResolvedTable contextResolvedTable,
+            LogicalTableModify tableModify,
+            CatalogManager catalogManager) {
+        final FlinkContext context = ShortcutUtils.unwrapContext(tableModify.getCluster());
+
+        CatalogBaseTable catalogBaseTable = contextResolvedTable.getTable();
+        // only consider DynamicTableSink
+        if (catalogBaseTable instanceof CatalogTable) {
+            ResolvedCatalogTable resolvedTable = contextResolvedTable.getResolvedTable();
+            Optional<Catalog> optionalCatalog = contextResolvedTable.getCatalog();
+            ObjectIdentifier objectIdentifier = contextResolvedTable.getIdentifier();
+            boolean isTemporary = contextResolvedTable.isTemporary();
+            // only consider the CatalogTable that doesn't use legacy connector sink option
+            if (!contextResolvedTable.isAnonymous()
+                    && !TableFactoryUtil.isLegacyConnectorOptions(
+                            catalogManager
+                                    .getCatalog(objectIdentifier.getCatalogName())
+                                    .orElse(null),
+                            context.getTableConfig(),
+                            !context.isBatchMode(),
+                            objectIdentifier,
+                            resolvedTable,
+                            isTemporary)) {
+                DynamicTableSinkFactory dynamicTableSinkFactory = null;
+                if (optionalCatalog.isPresent()
+                        && optionalCatalog.get().getFactory().isPresent()
+                        && optionalCatalog.get().getFactory().get()
+                                instanceof DynamicTableSinkFactory) {
+                    // try to get it from the catalog
+                    dynamicTableSinkFactory =
+                            (DynamicTableSinkFactory) optionalCatalog.get().getFactory().get();
+                }
+
+                if (dynamicTableSinkFactory == null) {
+                    Optional<DynamicTableSinkFactory> factoryFromModule =
+                            context.getModuleManager().getFactory((Module::getTableSinkFactory));
+                    // then try to get it from a module
+                    dynamicTableSinkFactory = factoryFromModule.orElse(null);
+                }
+                // create the dynamic table sink
+                DynamicTableSink tableSink =
+                        FactoryUtil.createDynamicTableSink(
+                                dynamicTableSinkFactory,
+                                objectIdentifier,
+                                resolvedTable,
+                                Collections.emptyMap(),
+                                context.getTableConfig(),
+                                context.getClassLoader(),
+                                contextResolvedTable.isTemporary());
+                return Optional.of(tableSink);
+            }
+        }
+        return Optional.empty();
+    }
+
+    /**
+     * Get the resolved filter expressions from the {@code WHERE} clause of the DELETE statement;
+     * return Optional.empty() if the {@code WHERE} clause contains a sub-query.
+     */
+    public static Optional<List<ResolvedExpression>> getResolvedFilterExpressions(
+            LogicalTableModify tableModify) {
+        FlinkContext context = ShortcutUtils.unwrapContext(tableModify.getCluster());
+        RelNode input = tableModify.getInput().getInput(0);
+        // no WHERE clause, return an empty list
+        if (input instanceof LogicalTableScan) {
+            return Optional.of(Collections.emptyList());
+        }
+        if (!(input instanceof LogicalFilter)) {
+            return Optional.empty();
+        }
+
+        Filter filter = (Filter) input;
+        if (RexUtil.SubQueryFinder.containsSubQuery(filter)) {
+            return Optional.empty();
+        }
+
+        // optimize the filter
+        filter = prepareFilter(filter);
+
+        // resolve the filter to get resolved expression
+        List<ResolvedExpression> resolveExpression = resolveFilter(context, filter);
+        return Optional.ofNullable(resolveExpression);
+    }
+
+    /** Prepare the filter by reducing and simplifying its condition. */
+    private static Filter prepareFilter(Filter filter) {
+        // we try to reduce and simplify the filter
+        ReduceExpressionsRuleProxy reduceExpressionsRuleProxy = ReduceExpressionsRuleProxy.INSTANCE;
+        SimplifyFilterConditionRule simplifyFilterConditionRule =
+                SimplifyFilterConditionRule.INSTANCE();
+        // max iteration number for reducing and simplifying the filter;
+        // we use 5, the same iteration number used in Flink's plan optimization.
+        int maxIteration = 5;
+
+        boolean changed = true;
+        int iteration = 1;
+        // iterate until the max iteration number is reached or an iteration makes no change
+        while (changed && iteration <= maxIteration) {
+            changed = false;
+            // first apply the rule to reduce condition in filter
+            RexNode newCondition = filter.getCondition();
+            List<RexNode> expList = new ArrayList<>();
+            expList.add(newCondition);
+            if (reduceExpressionsRuleProxy.reduce(filter, expList)) {
+                // get the new condition
+                newCondition = expList.get(0);
+                changed = true;
+            }
+            // create a new filter
+            filter = filter.copy(filter.getTraitSet(), filter.getInput(), newCondition);
+            // then apply the rule to simplify filter
+            Option<Filter> changedFilter =
+                    simplifyFilterConditionRule.simplify(filter, new boolean[] {false});
+            if (changedFilter.isDefined()) {
+                filter = changedFilter.get();
+                changed = true;
+            }
+            iteration += 1;
+        }
+        return filter;
+    }
+
+    /**
+     * A proxy for {@link ReduceExpressionsRule}, which enables us to call the method {@link
+     * ReduceExpressionsRule#reduceExpressions(RelNode, List, RelOptPredicateList)}.
+     */
+    private static class ReduceExpressionsRuleProxy
+            extends ReduceExpressionsRule<ReduceExpressionsRule.Config> {
+        private static final ReduceExpressionsRule.Config config =
+                FilterReduceExpressionsRule.FilterReduceExpressionsRuleConfig.DEFAULT;
+        private static final ReduceExpressionsRuleProxy INSTANCE = new ReduceExpressionsRuleProxy();
+
+        public ReduceExpressionsRuleProxy() {
+            super(config);
+        }
+
+        @Override
+        public void onMatch(RelOptRuleCall relOptRuleCall) {
+            throw new UnsupportedOperationException("This shouldn't be called");
+        }
+
+        private boolean reduce(RelNode rel, List<RexNode> expList) {
+            return reduceExpressions(
+                    rel,
+                    expList,
+                    RelOptPredicateList.EMPTY,
+                    true,
+                    config.matchNullability(),
+                    config.treatDynamicCallsAsConstant());
+        }
+    }
+
+    /** Return the resolved expressions extracted from the given Filter. */
+    private static List<ResolvedExpression> resolveFilter(FlinkContext context, Filter filter) {
+        Tuple2<RexNode[], RexNode[]> extractedPredicates =
+                FlinkRexUtil.extractPredicates(
+                        filter.getInput().getRowType().getFieldNames().toArray(new String[0]),
+                        filter.getCondition(),
+                        filter,
+                        filter.getCluster().getRexBuilder());
+        RexNode[] convertiblePredicates = extractedPredicates._1;
+        RexNode[] unconvertedPredicates = extractedPredicates._2;
+        if (unconvertedPredicates.length != 0) {
+            // if any condition couldn't be converted, return null
+            return null;
+        }
+        RexNodeToExpressionConverter converter =
+                new RexNodeToExpressionConverter(
+                        filter.getCluster().getRexBuilder(),
+                        filter.getInput().getRowType().getFieldNames().toArray(new String[0]),
+                        context.getFunctionCatalog(),
+                        context.getCatalogManager(),
+                        TimeZone.getTimeZone(
+                                TableConfigUtils.getLocalTimeZone(context.getTableConfig())));
+        List<Expression> filters =
+                Arrays.stream(convertiblePredicates)
+                        .map(
+                                p -> {
+                                    Option<ResolvedExpression> expr = p.accept(converter);
+                                    if (expr.isDefined()) {
+                                        return expr.get();
+                                    } else {
+                                        throw new TableException(
+                                                String.format(
+                                                        "%s can not be converted to Expression",
+                                                        p));
+                                    }
+                                })
+                        .collect(Collectors.toList());
+        ExpressionResolver resolver =
+                ExpressionResolver.resolverFor(
+                                context.getTableConfig(),
+                                context.getClassLoader(),
+                                name -> Optional.empty(),
+                                context.getFunctionCatalog()
+                                        .asLookup(
+                                                str -> {
+                                                    throw new TableException(
+                                                            "We should not need to lookup any expressions at this point");
+                                                }),
+                                context.getCatalogManager().getDataTypeFactory(),
+                                (sqlExpression, inputRowType, outputType) -> {
+                                    throw new TableException(
+                                            "SQL expression parsing is not supported at this location.");
+                                })
+                        .build();
+        return resolver.resolve(filters);
+    }
+}
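
As a concrete illustration of what prepareFilter does before the predicates are converted
(a hedged sketch; the exact rewrites depend on the Calcite rules involved):

    // original statement:                  DELETE FROM t WHERE id = 1 + 1 AND 1 = 1
    // after ReduceExpressionsRule:         WHERE id = 2 AND true
    // after SimplifyFilterConditionRule:   WHERE id = 2
    // resolved filters handed to the sink: [equals(id, 2)]
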
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
index 96d279517fd..1851cce8c11 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
@@ -118,15 +118,19 @@ import org.apache.flink.table.catalog.UnresolvedIdentifier;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.abilities.SupportsDeletePushDown;
 import org.apache.flink.table.expressions.CallExpression;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.expressions.ValueLiteralExpression;
 import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
 import org.apache.flink.table.functions.FunctionIdentifier;
 import org.apache.flink.table.operations.BeginStatementSetOperation;
 import org.apache.flink.table.operations.CompileAndExecutePlanOperation;
+import org.apache.flink.table.operations.DeleteFromFilterOperation;
 import org.apache.flink.table.operations.DescribeTableOperation;
 import org.apache.flink.table.operations.EndStatementSetOperation;
 import org.apache.flink.table.operations.ExplainOperation;
@@ -199,6 +203,8 @@ import org.apache.flink.util.StringUtils;
 import org.apache.calcite.rel.RelRoot;
 import org.apache.calcite.rel.hint.HintStrategyTable;
 import org.apache.calcite.rel.hint.RelHint;
+import org.apache.calcite.rel.logical.LogicalTableModify;
+import org.apache.calcite.sql.SqlDelete;
 import org.apache.calcite.sql.SqlDialect;
 import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlKind;
@@ -392,6 +398,8 @@ public class SqlToOperationConverter {
             return Optional.of(converter.convertAnalyzeTable((SqlAnalyzeTable) validated));
         } else if (validated instanceof SqlStopJob) {
             return Optional.of(converter.convertStopJob((SqlStopJob) validated));
+        } else if (validated instanceof SqlDelete) {
+            return Optional.of(converter.convertDelete((SqlDelete) validated));
         } else {
             return Optional.empty();
         }
@@ -1492,6 +1500,42 @@ public class SqlToOperationConverter {
                 sqlStopJob.getId(), sqlStopJob.isWithSavepoint(), sqlStopJob.isWithDrain());
     }
 
+    private Operation convertDelete(SqlDelete sqlDelete) {
+        RelRoot updateRelational = flinkPlanner.rel(sqlDelete);
+        LogicalTableModify tableModify = (LogicalTableModify) updateRelational.rel;
+        UnresolvedIdentifier unresolvedTableIdentifier =
+                UnresolvedIdentifier.of(tableModify.getTable().getQualifiedName());
+        ContextResolvedTable contextResolvedTable =
+                catalogManager.getTableOrError(
+                        catalogManager.qualifyIdentifier(unresolvedTableIdentifier));
+        // try push down delete
+        Optional<DynamicTableSink> optionalDynamicTableSink =
+                DeletePushDownUtils.getDynamicTableSink(
+                        contextResolvedTable, tableModify, catalogManager);
+        if (optionalDynamicTableSink.isPresent()) {
+            DynamicTableSink dynamicTableSink = optionalDynamicTableSink.get();
+            // if the table sink supports delete push down
+            if (dynamicTableSink instanceof SupportsDeletePushDown) {
+                SupportsDeletePushDown supportsDeletePushDownSink =
+                        (SupportsDeletePushDown) dynamicTableSink;
+                // get resolved filter expression
+                Optional<List<ResolvedExpression>> filters =
+                        DeletePushDownUtils.getResolvedFilterExpressions(tableModify);
+                if (filters.isPresent()
+                        && supportsDeletePushDownSink.applyDeleteFilters(filters.get())) {
+                    return new DeleteFromFilterOperation(
+                            contextResolvedTable, supportsDeletePushDownSink, filters.get());
+                }
+            }
+        }
+
+        // otherwise, delete push down is not applicable, throw unsupported exception
+        throw new UnsupportedOperationException(
+                String.format(
+                        "Only delete push down is supported currently, but the delete statement can't pushed to table sink %s.",
+                        unresolvedTableIdentifier.asSummaryString()));
+    }
+
     private void validateTableConstraint(SqlTableConstraint constraint) {
         if (constraint.isUnique()) {
             throw new UnsupportedOperationException("UNIQUE constraint is not supported yet");
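
The conversion currently has only two outcomes; a brief sketch (the table names are
hypothetical and tEnv is assumed to be a batch TableEnvironment):

    // pushed down: the sink implements SupportsDeletePushDown and accepts the resolved filters
    tEnv.executeSql("DELETE FROM t WHERE id = 1");

    // not pushed down: e.g. a sub-query in WHERE, or a sink without the interface;
    // convertDelete then throws UnsupportedOperationException, since only push-down
    // deletes are supported by this commit
    tEnv.executeSql("DELETE FROM t WHERE id IN (SELECT id FROM s)");
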
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterInCalcIntoTableSourceScanRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterInCalcIntoTableSourceScanRule.java
index 33d24fa460f..4dc970af831 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterInCalcIntoTableSourceScanRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterInCalcIntoTableSourceScanRule.java
@@ -85,7 +85,7 @@ public class PushFilterInCalcIntoTableSourceScanRule extends PushFilterIntoSourc
 
         RelBuilder relBuilder = call.builder();
         Tuple2<RexNode[], RexNode[]> extractedPredicates =
-                extractPredicates(
+                FlinkRexUtil.extractPredicates(
                         originProgram.getInputRowType().getFieldNames().toArray(new String[0]),
                         originProgram.expandLocalRef(originProgram.getCondition()),
                         scan,
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoSourceScanRuleBase.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoSourceScanRuleBase.java
index 3e023d50afc..eb5e4f6d25d 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoSourceScanRuleBase.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoSourceScanRuleBase.java
@@ -22,29 +22,22 @@ import org.apache.flink.table.api.config.OptimizerConfigOptions;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
 import org.apache.flink.table.expressions.ResolvedExpression;
-import org.apache.flink.table.planner.calcite.FlinkContext;
 import org.apache.flink.table.planner.expressions.converter.ExpressionConverter;
 import org.apache.flink.table.planner.plan.abilities.source.FilterPushDownSpec;
 import org.apache.flink.table.planner.plan.abilities.source.SourceAbilityContext;
 import org.apache.flink.table.planner.plan.abilities.source.SourceAbilitySpec;
 import org.apache.flink.table.planner.plan.schema.TableSourceTable;
-import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil;
-import org.apache.flink.table.planner.plan.utils.RexNodeExtractor;
-import org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter;
 import org.apache.flink.table.planner.utils.ShortcutUtils;
-import org.apache.flink.table.planner.utils.TableConfigUtils;
 
 import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.plan.RelOptRuleOperand;
 import org.apache.calcite.rel.core.TableScan;
-import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.tools.RelBuilder;
 
 import java.util.Arrays;
 import java.util.List;
-import java.util.TimeZone;
 import java.util.stream.Collectors;
 
 import scala.Tuple2;
@@ -120,23 +113,6 @@ public abstract class PushFilterIntoSourceScanRuleBase extends RelOptRule {
         return new Tuple2<>(result, newTableSourceTable);
     }
 
-    protected Tuple2<RexNode[], RexNode[]> extractPredicates(
-            String[] inputNames, RexNode filterExpression, TableScan scan, RexBuilder rexBuilder) {
-        FlinkContext context = ShortcutUtils.unwrapContext(scan);
-        int maxCnfNodeCount = FlinkRelOptUtil.getMaxCnfNodeCount(scan);
-        RexNodeToExpressionConverter converter =
-                new RexNodeToExpressionConverter(
-                        rexBuilder,
-                        inputNames,
-                        context.getFunctionCatalog(),
-                        context.getCatalogManager(),
-                        TimeZone.getTimeZone(
-                                TableConfigUtils.getLocalTimeZone(context.getTableConfig())));
-
-        return RexNodeExtractor.extractConjunctiveConditions(
-                filterExpression, maxCnfNodeCount, rexBuilder, converter);
-    }
-
     /**
      * Determines whether we can push the filter down into the source. We cannot push a filter twice;
      * make sure FilterPushDownSpec has not been assigned as a capability.
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java
index 516f00c15cd..eaa6999cdd7 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java
@@ -78,7 +78,7 @@ public class PushFilterIntoTableSourceScanRule extends PushFilterIntoSourceScanR
 
         RelBuilder relBuilder = call.builder();
         Tuple2<RexNode[], RexNode[]> extractedPredicates =
-                extractPredicates(
+                FlinkRexUtil.extractPredicates(
                         filter.getInput().getRowType().getFieldNames().toArray(new String[0]),
                         filter.getCondition(),
                         scan,
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtil.scala
index 81819208b9d..dba27b0eb2e 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtil.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtil.scala
@@ -23,12 +23,14 @@ import org.apache.flink.configuration.ConfigOptions.key
 import org.apache.flink.table.planner.functions.sql.SqlTryCastFunction
 import org.apache.flink.table.planner.plan.utils.ExpressionDetail.ExpressionDetail
 import org.apache.flink.table.planner.plan.utils.ExpressionFormat.ExpressionFormat
+import org.apache.flink.table.planner.utils.{ShortcutUtils, TableConfigUtils}
 
 import com.google.common.base.Function
 import com.google.common.collect.{ImmutableList, Lists}
 import org.apache.calcite.avatica.util.ByteString
 import org.apache.calcite.plan.{RelOptPredicateList, RelOptUtil}
 import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rex._
 import org.apache.calcite.sql.`type`.SqlTypeName
 import org.apache.calcite.sql.{SqlAsOperator, SqlKind, SqlOperator}
@@ -39,7 +41,7 @@ import org.apache.calcite.util._
 import java.lang.{Iterable => JIterable}
 import java.math.BigDecimal
 import java.util
-import java.util.Optional
+import java.util.{Optional, TimeZone}
 import java.util.function.Predicate
 
 import scala.collection.JavaConversions._
@@ -634,6 +636,31 @@ object FlinkRexUtil {
     val projects = rexProgram.getProjectList.map(rexProgram.expandLocalRef)
     projects.forall(isDeterministicInStreaming)
   }
+
+  /**
+   * Return convertible rex nodes and unconverted rex nodes extracted from the filter expression.
+   */
+  def extractPredicates(
+      inputNames: Array[String],
+      filterExpression: RexNode,
+      rel: RelNode,
+      rexBuilder: RexBuilder): (Array[RexNode], Array[RexNode]) = {
+    val context = ShortcutUtils.unwrapContext(rel)
+    val maxCnfNodeCount = FlinkRelOptUtil.getMaxCnfNodeCount(rel)
+    val converter =
+      new RexNodeToExpressionConverter(
+        rexBuilder,
+        inputNames,
+        context.getFunctionCatalog,
+        context.getCatalogManager,
+        TimeZone.getTimeZone(TableConfigUtils.getLocalTimeZone(context.getTableConfig))
+
+    RexNodeExtractor.extractConjunctiveConditions(
+      filterExpression,
+      maxCnfNodeCount,
+      rexBuilder,
+      converter);
+  }
 }
 
 /**
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestUpdateDeleteTableFactory.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestUpdateDeleteTableFactory.java
new file mode 100644
index 00000000000..df114031ecf
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestUpdateDeleteTableFactory.java
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.factories;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.abilities.SupportsDeletePushDown;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.types.DataType;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.table.data.RowData.createFieldGetter;
+
+/** A factory to create tables that support update/delete for test purposes. */
+public class TestUpdateDeleteTableFactory
+        implements DynamicTableSourceFactory, DynamicTableSinkFactory {
+
+    public static final String IDENTIFIER = "test-update-delete";
+
+    private static final ConfigOption<String> DATA_ID =
+            ConfigOptions.key("data-id")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The data id used to read the rows.");
+
+    private static final ConfigOption<Boolean> ONLY_ACCEPT_EQUAL_PREDICATE =
+            ConfigOptions.key("only-accept-equal-predicate")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether only accept when the all predicates in filter is equal expression for delete statement.");
+
+    private static final ConfigOption<Boolean> SUPPORT_DELETE_PUSH_DOWN =
+            ConfigOptions.key("support-delete-push-down")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription("Whether the table supports delete push down.");
+
+    private static final AtomicInteger idCounter = new AtomicInteger(0);
+    private static final Map<String, Collection<RowData>> registeredRowData = new HashMap<>();
+
+    public static String registerRowData(Collection<RowData> data) {
+        String id = String.valueOf(idCounter.incrementAndGet());
+        registeredRowData.put(id, data);
+        return id;
+    }
+
+    @Override
+    public DynamicTableSink createDynamicTableSink(Context context) {
+        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+        helper.validate();
+        String dataId =
+                helper.getOptions().getOptional(DATA_ID).orElse(String.valueOf(idCounter.get()));
+        if (helper.getOptions().get(SUPPORT_DELETE_PUSH_DOWN)) {
+            return new SupportsDeletePushDownSink(
+                    dataId,
+                    helper.getOptions().get(ONLY_ACCEPT_EQUAL_PREDICATE),
+                    context.getCatalogTable());
+        } else {
+            return new TestSink();
+        }
+    }
+
+    @Override
+    public DynamicTableSource createDynamicTableSource(Context context) {
+        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+        helper.validate();
+
+        String dataId =
+                helper.getOptions().getOptional(DATA_ID).orElse(String.valueOf(idCounter.get()));
+        return new TestTableSource(dataId);
+    }
+
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        return Collections.emptySet();
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        return new HashSet<>(
+                Arrays.asList(DATA_ID, ONLY_ACCEPT_EQUAL_PREDICATE, SUPPORT_DELETE_PUSH_DOWN));
+    }
+
+    /** A test table source which reads the rows registered for the given data id. */
+    private static class TestTableSource implements ScanTableSource {
+        private final String dataId;
+
+        public TestTableSource(String dataId) {
+            this.dataId = dataId;
+        }
+
+        @Override
+        public DynamicTableSource copy() {
+            return new TestTableSource(dataId);
+        }
+
+        @Override
+        public String asSummaryString() {
+            return "test table source";
+        }
+
+        @Override
+        public ChangelogMode getChangelogMode() {
+            return ChangelogMode.insertOnly();
+        }
+
+        @Override
+        public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
+            return new SourceFunctionProvider() {
+                @Override
+                public SourceFunction<RowData> createSourceFunction() {
+                    Collection<RowData> rows = registeredRowData.get(dataId);
+                    if (rows != null) {
+                        return new FromElementsFunction<>(rows);
+                    } else {
+                        return new FromElementsFunction<>();
+                    }
+                }
+
+                @Override
+                public boolean isBounded() {
+                    return true;
+                }
+            };
+        }
+    }
+
+    /** A common test sink. */
+    private static class TestSink implements DynamicTableSink {
+
+        @Override
+        public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+            return ChangelogMode.insertOnly();
+        }
+
+        @Override
+        public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+            return null;
+        }
+
+        @Override
+        public DynamicTableSink copy() {
+            return null;
+        }
+
+        @Override
+        public String asSummaryString() {
+            return null;
+        }
+    }
+
+    /** A sink that supports delete push down. */
+    public static class SupportsDeletePushDownSink extends TestSink
+            implements SupportsDeletePushDown {
+
+        private final String dataId;
+        private final boolean onlyAcceptEqualPredicate;
+        private final ResolvedCatalogTable resolvedCatalogTable;
+        private final RowData.FieldGetter[] fieldGetters;
+        private final List<String> columns;
+
+        private List<Tuple2<String, Object>> equalPredicates;
+
+        public SupportsDeletePushDownSink(
+                String dataId,
+                boolean onlyAcceptEqualPredicate,
+                ResolvedCatalogTable resolvedCatalogTable) {
+            this.dataId = dataId;
+            this.onlyAcceptEqualPredicate = onlyAcceptEqualPredicate;
+            this.resolvedCatalogTable = resolvedCatalogTable;
+            this.fieldGetters = getAllFieldGetter(resolvedCatalogTable.getResolvedSchema());
+            this.columns = resolvedCatalogTable.getResolvedSchema().getColumnNames();
+        }
+
+        @Override
+        public DynamicTableSink copy() {
+            return new SupportsDeletePushDownSink(
+                    dataId, onlyAcceptEqualPredicate, resolvedCatalogTable);
+        }
+
+        @Override
+        public String asSummaryString() {
+            return "SupportDeletePushDownSink";
+        }
+
+        @Override
+        public boolean applyDeleteFilters(List<ResolvedExpression> filters) {
+            if (onlyAcceptEqualPredicate) {
+                Optional<List<Tuple2<String, Object>>> optionalEqualPredicates =
+                        getEqualPredicates(filters);
+                if (optionalEqualPredicates.isPresent()) {
+                    equalPredicates = optionalEqualPredicates.get();
+                    return true;
+                }
+                return false;
+            }
+            return true;
+        }
+
+        @Override
+        public Optional<Long> executeDeletion() {
+            if (onlyAcceptEqualPredicate) {
+                Collection<RowData> existingRows = registeredRowData.get(dataId);
+                long rowsBefore = existingRows.size();
+                existingRows.removeIf(
+                        rowData ->
+                                satisfyEqualPredicate(
+                                        equalPredicates, rowData, fieldGetters, columns));
+                return Optional.of(rowsBefore - existingRows.size());
+            }
+            return Optional.empty();
+        }
+    }
+
+    /**
+     * Get a list of equality predicates from a list of filters, each as [column, value]. Return
+     * Optional.empty() if any filter is a non-equality predicate.
+     */
+    private static Optional<List<Tuple2<String, Object>>> getEqualPredicates(
+            List<ResolvedExpression> filters) {
+        List<Tuple2<String, Object>> equalPredicates = new ArrayList<>();
+        for (ResolvedExpression expression : filters) {
+            if (!(expression instanceof CallExpression)) {
+                return Optional.empty();
+            }
+            CallExpression callExpression = (CallExpression) expression;
+            if (callExpression.getFunctionDefinition() != BuiltInFunctionDefinitions.EQUALS) {
+                return Optional.empty();
+            }
+            String colName = getColumnName(callExpression);
+            Object value = getColumnValue(callExpression);
+            equalPredicates.add(Tuple2.of(colName, value));
+        }
+        return Optional.of(equalPredicates);
+    }
+
+    private static String getColumnName(CallExpression comp) {
+        return ((FieldReferenceExpression) comp.getChildren().get(0)).getName();
+    }
+
+    private static Object getColumnValue(CallExpression comp) {
+        ValueLiteralExpression valueLiteralExpression =
+                (ValueLiteralExpression) comp.getChildren().get(1);
+        return valueLiteralExpression
+                .getValueAs(valueLiteralExpression.getOutputDataType().getConversionClass())
+                .get();
+    }
+
+    /** Check whether the rowData satisfies all the equality predicates. */
+    private static boolean satisfyEqualPredicate(
+            List<Tuple2<String, Object>> equalPredicates,
+            RowData rowData,
+            RowData.FieldGetter[] fieldGetters,
+            List<String> columns) {
+        for (Tuple2<String, Object> equalPredicate : equalPredicates) {
+            String colName = equalPredicate.f0;
+            Object value = equalPredicate.f1;
+            int colIndex = columns.indexOf(colName);
+            if (!(Objects.equals(value, fieldGetters[colIndex].getFieldOrNull(rowData)))) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    private static RowData.FieldGetter[] getAllFieldGetter(ResolvedSchema resolvedSchema) {
+        List<DataType> dataTypes = resolvedSchema.getColumnDataTypes();
+        RowData.FieldGetter[] fieldGetters = new RowData.FieldGetter[dataTypes.size()];
+        for (int i = 0; i < dataTypes.size(); i++) {
+            fieldGetters[i] = createFieldGetter(dataTypes.get(i).getLogicalType(), i);
+        }
+        return fieldGetters;
+    }
+}
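
A rough sketch of how the factory is exercised by the ITCases added below (the row values
and the tEnv variable are illustrative):

    List<RowData> rows = new ArrayList<>(Arrays.asList(
            GenericRowData.of(1, StringData.fromString("a")),
            GenericRowData.of(2, StringData.fromString("b"))));
    String dataId = TestUpdateDeleteTableFactory.registerRowData(rows);
    tEnv.executeSql(String.format(
            "CREATE TABLE t (id INT, name STRING) WITH ("
                    + "'connector' = 'test-update-delete',"
                    + " 'data-id' = '%s',"
                    + " 'only-accept-equal-predicate' = 'true')",
            dataId));
    // the filter is pushed into SupportsDeletePushDownSink#applyDeleteFilters and
    // executeDeletion() then removes the matching rows from the registered collection
    tEnv.executeSql("DELETE FROM t WHERE id = 1");
    // only the row with id = 2 is left
    tEnv.executeSql("SELECT * FROM t").print();
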
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/DeletePushDownUtilsTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/DeletePushDownUtilsTest.java
new file mode 100644
index 00000000000..f134223d243
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/DeletePushDownUtilsTest.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ContextResolvedTable;
+import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
+import org.apache.flink.table.planner.catalog.CatalogManagerCalciteSchema;
+import org.apache.flink.table.planner.delegation.PlannerContext;
+import org.apache.flink.table.planner.factories.TestUpdateDeleteTableFactory;
+import org.apache.flink.table.planner.parse.CalciteParser;
+import org.apache.flink.table.planner.utils.PlannerMocks;
+import org.apache.flink.table.utils.CatalogManagerMocks;
+
+import org.apache.calcite.rel.RelRoot;
+import org.apache.calcite.rel.logical.LogicalTableModify;
+import org.apache.calcite.sql.SqlNode;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link DeletePushDownUtils}. */
+public class DeletePushDownUtilsTest {
+    private final TableConfig tableConfig = TableConfig.getDefault();
+    private final Catalog catalog = new GenericInMemoryCatalog("MockCatalog", "default");
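+    // the catalog manager is configured with BATCH runtime mode, as DELETE statements are not
+    // supported in streaming mode yet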
+    private final CatalogManager catalogManager =
+            CatalogManagerMocks.preparedCatalogManager()
+                    .defaultCatalog("builtin", catalog)
+                    .config(
+                            Configuration.fromMap(
+                                    Collections.singletonMap(
+                                            ExecutionOptions.RUNTIME_MODE.key(),
+                                            RuntimeExecutionMode.BATCH.name())))
+                    .build();
+
+    private final PlannerMocks plannerMocks =
+            PlannerMocks.newBuilder()
+                    .withBatchMode(true)
+                    .withTableConfig(tableConfig)
+                    .withCatalogManager(catalogManager)
+                    .withRootSchema(
+                            asRootSchema(new CatalogManagerCalciteSchema(catalogManager, false)))
+                    .build();
+    private final PlannerContext plannerContext = plannerMocks.getPlannerContext();
+    private final CalciteParser parser = plannerContext.createCalciteParser();
+    private final FlinkPlannerImpl flinkPlanner = plannerContext.createFlinkPlanner();
+
+    @Test
+    public void testGetDynamicTableSink() {
+        // create a table with connector = test-update-delete
+        Map<String, String> options = new HashMap<>();
+        options.put("connector", "test-update-delete");
+        CatalogTable catalogTable = createTestCatalogTable(options);
+        ObjectIdentifier tableId = ObjectIdentifier.of("builtin", "default", "t");
+        catalogManager.createTable(catalogTable, tableId, false);
+        ContextResolvedTable resolvedTable =
+                ContextResolvedTable.permanent(
+                        tableId, catalog, catalogManager.resolveCatalogTable(catalogTable));
+        LogicalTableModify tableModify = getTableModifyFromSql("DELETE FROM t");
+        Optional<DynamicTableSink> optionalDynamicTableSink =
+                DeletePushDownUtils.getDynamicTableSink(resolvedTable, tableModify, catalogManager);
+        // verify we can get the dynamic table sink
+        assertThat(optionalDynamicTableSink).isPresent();
+        assertThat(optionalDynamicTableSink.get())
+                .isInstanceOf(TestUpdateDeleteTableFactory.SupportsDeletePushDownSink.class);
+
+        // create a table with connector = COLLECTION, which creates a legacy table sink
+        options.put("connector", "COLLECTION");
+        catalogTable = createTestCatalogTable(options);
+        tableId = ObjectIdentifier.of("builtin", "default", "t1");
+        catalogManager.createTable(catalogTable, tableId, false);
+        resolvedTable =
+                ContextResolvedTable.permanent(
+                        tableId, catalog, catalogManager.resolveCatalogTable(catalogTable));
+        tableModify = getTableModifyFromSql("DELETE FROM t1");
+        optionalDynamicTableSink =
+                DeletePushDownUtils.getDynamicTableSink(resolvedTable, tableModify, catalogManager);
+        // verify the result is empty since the sink is not a DynamicTableSink but a legacy
+        // TableSink
+        assertThat(optionalDynamicTableSink).isEmpty();
+    }
+
+    @Test
+    public void testGetResolveFilterExpressions() {
+        CatalogTable catalogTable =
+                CatalogTable.of(
+                        Schema.newBuilder()
+                                .column("f0", DataTypes.INT().notNull())
+                                .column("f1", DataTypes.STRING().nullable())
+                                .column("f2", DataTypes.BIGINT().nullable())
+                                .build(),
+                        null,
+                        Collections.emptyList(),
+                        Collections.emptyMap());
+        catalogManager.createTable(
+                catalogTable, ObjectIdentifier.of("builtin", "default", "t"), false);
+
+        // verify the case where the DELETE statement has no WHERE clause
+        LogicalTableModify tableModify = getTableModifyFromSql("DELETE FROM t");
+        Optional<List<ResolvedExpression>> optionalResolvedExpressions =
+                DeletePushDownUtils.getResolvedFilterExpressions(tableModify);
+        verifyExpression(optionalResolvedExpressions, "[]");
+
+        tableModify = getTableModifyFromSql("DELETE FROM t where f0 = 1 and f1 = '123'");
+        optionalResolvedExpressions = DeletePushDownUtils.getResolvedFilterExpressions(tableModify);
+        verifyExpression(optionalResolvedExpressions, "[equals(f0, 1), equals(f1, '123')]");
+
+        tableModify = getTableModifyFromSql("DELETE FROM t where f0 = 1 + 6 and f0 < 6");
+        optionalResolvedExpressions = DeletePushDownUtils.getResolvedFilterExpressions(tableModify);
+        assertThat(optionalResolvedExpressions).isPresent();
+        verifyExpression(optionalResolvedExpressions, "[false]");
+
+        tableModify = getTableModifyFromSql("DELETE FROM t where f0 = f2 + 1");
+        optionalResolvedExpressions = DeletePushDownUtils.getResolvedFilterExpressions(tableModify);
+        verifyExpression(
+                optionalResolvedExpressions, "[equals(cast(f0, BIGINT NOT NULL), plus(f2, 1))]");
+
+        // the resolved filters are not available as the predicate contains a sub-query
+        tableModify = getTableModifyFromSql("DELETE FROM t where f0 > (select count(1) from t)");
+        optionalResolvedExpressions = DeletePushDownUtils.getResolvedFilterExpressions(tableModify);
+        assertThat(optionalResolvedExpressions).isEmpty();
+    }
+
+    private CatalogTable createTestCatalogTable(Map<String, String> options) {
+        return CatalogTable.of(
+                Schema.newBuilder()
+                        .column("f0", DataTypes.INT().notNull())
+                        .column("f1", DataTypes.STRING().nullable())
+                        .column("f2", DataTypes.BIGINT().nullable())
+                        .build(),
+                null,
+                Collections.emptyList(),
+                options);
+    }
+
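+    /** Parses and validates the given DELETE statement and returns the resulting {@link LogicalTableModify}. */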
+    private LogicalTableModify getTableModifyFromSql(String sql) {
+        SqlNode sqlNode = parser.parse(sql);
+        final SqlNode validated = flinkPlanner.validate(sqlNode);
+        RelRoot deleteRelational = flinkPlanner.rel(validated);
+        return (LogicalTableModify) deleteRelational.rel;
+    }
+
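+    /** Asserts that the resolved filter expressions are present and match the expected string representation. */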
+    @SuppressWarnings("OptionalUsedAsFieldOrParameterType")
+    private void verifyExpression(
+            Optional<List<ResolvedExpression>> optionalResolvedExpressions, String expected) {
+        assertThat(optionalResolvedExpressions).isPresent();
+        assertThat(optionalResolvedExpressions.get().toString()).isEqualTo(expected);
+    }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
index 0197ce8d432..129b969b9a0 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
@@ -52,9 +52,11 @@ import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
 import org.apache.flink.table.delegation.Parser;
+import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.expressions.SqlCallExpression;
 import org.apache.flink.table.factories.TestManagedTableFactory;
 import org.apache.flink.table.operations.BeginStatementSetOperation;
+import org.apache.flink.table.operations.DeleteFromFilterOperation;
 import org.apache.flink.table.operations.EndStatementSetOperation;
 import org.apache.flink.table.operations.ExplainOperation;
 import org.apache.flink.table.operations.LoadModuleOperation;
@@ -96,6 +98,7 @@ import org.apache.flink.table.planner.delegation.PlannerContext;
 import org.apache.flink.table.planner.expressions.utils.Func0$;
 import org.apache.flink.table.planner.expressions.utils.Func1$;
 import org.apache.flink.table.planner.expressions.utils.Func8$;
+import org.apache.flink.table.planner.factories.TestUpdateDeleteTableFactory;
 import org.apache.flink.table.planner.parse.CalciteParser;
 import org.apache.flink.table.planner.parse.ExtendedParser;
 import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions;
@@ -2706,6 +2709,46 @@ public class SqlToOperationConverterTest {
         assertThat(extendedParser.parse(command)).get().isInstanceOf(QuitOperation.class);
     }
 
+    @Test
+    public void testDeletePushDown() throws Exception {
+        Map<String, String> options = new HashMap<>();
+        options.put("connector", TestUpdateDeleteTableFactory.IDENTIFIER);
+        CatalogTable catalogTable =
+                CatalogTable.of(
+                        Schema.newBuilder()
+                                .column("a", DataTypes.INT().notNull())
+                                .column("c", DataTypes.STRING().notNull())
+                                .build(),
+                        null,
+                        Collections.emptyList(),
+                        options);
+        ObjectIdentifier tableIdentifier =
+                ObjectIdentifier.of("builtin", "default", "test_push_down");
+        catalogManager.createTable(catalogTable, tableIdentifier, false);
+
+        // no filter in delete statement
+        Operation operation = parse("DELETE FROM test_push_down");
+        checkDeleteFromFilterOperation(operation, "[]");
+
+        // with filters in delete statement
+        operation = parse("DELETE FROM test_push_down where a = 1 and c = '123'");
+        checkDeleteFromFilterOperation(operation, "[equals(a, 1), equals(c, '123')]");
+
+        // with a filter that is reduced to false in the delete statement
+        operation = parse("DELETE FROM test_push_down where a = 1 + 6 and a = 2");
+        checkDeleteFromFilterOperation(operation, "[false]");
+
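+        // a DELETE whose filter contains a sub-query can't be pushed down and should fail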
+        assertThatThrownBy(
+                        () ->
+                                parse(
+                                        "DELETE FROM test_push_down where a = (select count(*) from test_push_down)"))
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessage(
+                        String.format(
+                                "Only delete push down is supported currently, but the delete statement can't pushed to table sink %s.",
+                                tableIdentifier.asSerializableString()));
+    }
+
     // ~ Tool Methods ----------------------------------------------------------
 
     private static TestItem createTestItem(Object... args) {
@@ -2902,6 +2945,15 @@ public class SqlToOperationConverterTest {
                         TestManagedTableFactory.ENRICHED_VALUE);
     }
 
+    private static void checkDeleteFromFilterOperation(
+            Operation operation, String expectedFilters) {
+        assertThat(operation).isInstanceOf(DeleteFromFilterOperation.class);
+        DeleteFromFilterOperation deleteFromFiltersOperation =
+                (DeleteFromFilterOperation) operation;
+        List<ResolvedExpression> filters = deleteFromFiltersOperation.getFilters();
+        assertThat(filters.toString()).isEqualTo(expectedFilters);
+    }
+
     // ~ Inner Classes ----------------------------------------------------------
 
     private static class TestItem {
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/DeleteTableITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/DeleteTableITCase.java
new file mode 100644
index 00000000000..8a60d24dfbe
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/DeleteTableITCase.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.batch.sql;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.planner.factories.TestUpdateDeleteTableFactory;
+import org.apache.flink.table.planner.runtime.utils.BatchTestBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CollectionUtil;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** The IT case for DELETE statement in batch mode. */
+public class DeleteTableITCase extends BatchTestBase {
+    private static final int ROW_NUM = 5;
+
+    @Test
+    public void testDeletePushDown() throws Exception {
+        String dataId = registerData();
+        tEnv().executeSql(
+                        String.format(
+                                "CREATE TABLE t (a int, b string, c double) WITH"
+                                        + " ('connector' = 'test-update-delete',"
+                                        + " 'data-id' = '%s',"
+                                        + " 'only-accept-equal-predicate' = 'true'"
+                                        + ")",
+                                dataId));
+        // the predicate only contains equality expressions, so the deletion should be pushed down
+        List<Row> rows =
+                CollectionUtil.iteratorToList(
+                        tEnv().executeSql("DELETE FROM t WHERE a = 1").collect());
+        assertThat(rows.toString()).isEqualTo("[+I[1], +I[OK]]");
+        rows = CollectionUtil.iteratorToList(tEnv().executeSql("SELECT * FROM t").collect());
+        assertThat(rows.toString())
+                .isEqualTo("[+I[0, b_0, 0.0], +I[2, b_2, 4.0], +I[3, b_3, 6.0], +I[4, b_4, 8.0]]");
+
+        // should throw an exception since the deletion can't be pushed down: it contains a non-equal
+        // predicate and the table sink hasn't implemented the SupportsRowLevelDelete interface
+        assertThatThrownBy(() -> tEnv().executeSql("DELETE FROM t where a > 1"))
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessage(
+                        "Only delete push down is supported currently, "
+                                + "but the delete statement can't pushed to table sink `default_catalog`.`default_database`.`t`.");
+
+        tEnv().executeSql(
+                        "CREATE TABLE t1 (a int) WITH"
+                                + " ('connector' = 'test-update-delete',"
+                                + " 'support-delete-push-down' = 'false')");
+        // should throw an exception for a sink that doesn't implement the SupportsDeletePushDown interface
+        assertThatThrownBy(() -> tEnv().executeSql("DELETE FROM t1"))
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessage(
+                        "Only delete push down is supported currently, "
+                                + "but the delete statement can't pushed to table sink `default_catalog`.`default_database`.`t1`.");
+    }
+
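+    /** Registers the test rows in {@link TestUpdateDeleteTableFactory} and returns the data id for the table option. */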
+    private String registerData() {
+        List<RowData> values = createValue();
+        return TestUpdateDeleteTableFactory.registerRowData(values);
+    }
+
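+    /** Creates {@code ROW_NUM} rows of the form (i, "b_" + i, i * 2.0). */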
+    private List<RowData> createValue() {
+        List<RowData> values = new ArrayList<>();
+        for (int i = 0; i < ROW_NUM; i++) {
+            values.add(GenericRowData.of(i, StringData.fromString("b_" + i), i * 2.0));
+        }
+        return values;
+    }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DeleteTableITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DeleteTableITCase.java
new file mode 100644
index 00000000000..e6ba79ef2b4
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DeleteTableITCase.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.stream.sql;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.planner.runtime.utils.StreamingTestBase;
+
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** The IT case for DELETE statement in streaming mode. */
+public class DeleteTableITCase extends StreamingTestBase {
+
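+    /** Verifies that DELETE statements are rejected in streaming mode for now. */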
+    @Test
+    public void testDelete() {
+        tEnv().executeSql("CREATE TABLE t (a int) WITH ('connector' = 'test-update-delete')");
+        assertThatThrownBy(() -> tEnv().executeSql("DELETE FROM t"))
+                .isInstanceOf(TableException.class)
+                .hasMessage("DELETE statement is not supported for streaming mode now.");
+    }
+}
diff --git a/flink-table/flink-table-planner/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-table/flink-table-planner/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
index 7386413a5d2..bfcf0178f2c 100644
--- a/flink-table/flink-table-planner/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ b/flink-table/flink-table-planner/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -18,3 +18,4 @@ org.apache.flink.table.planner.factories.TestValuesTableFactory
 org.apache.flink.table.planner.factories.TestFileFactory
 org.apache.flink.table.planner.factories.TableFactoryHarness$Factory
 org.apache.flink.table.planner.plan.stream.sql.TestTableFactory
+org.apache.flink.table.planner.factories.TestUpdateDeleteTableFactory