You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2023/01/17 11:17:15 UTC

[flink-table-store] branch master updated: [FLINK-30711] Support dropping partition via Flink action

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new f503591b [FLINK-30711] Support dropping partition via Flink action
f503591b is described below

commit f503591b7f5ee77f876296110840c87d7672bb9b
Author: yuzelin <33...@users.noreply.github.com>
AuthorDate: Tue Jan 17 19:17:09 2023 +0800

    [FLINK-30711] Support dropping partition via Flink action
    
    This closes #487
---
 docs/content/docs/how-to/writing-tables.md         |  54 ++++--
 flink-table-store-connector/pom.xml                |   8 +
 .../flink/table/store/connector/action/Action.java | 131 +++++++++++++
 .../store/connector/action/CompactAction.java      |  74 +++-----
 .../connector/action/DropPartitionAction.java      | 115 ++++++++++++
 .../table/store/connector/action/FlinkActions.java |  32 +---
 .../store/connector/action/ActionITCaseBase.java   | 111 +++++++++++
 .../connector/action/CompactActionITCase.java      | 202 +++++++--------------
 .../connector/action/DropPartitionITCase.java      | 176 ++++++++++++++++++
 .../store/connector/util/AbstractTestBase.java     |   3 +-
 .../store/file/operation/FileStoreCommit.java      |  21 ++-
 .../store/file/operation/FileStoreCommitImpl.java  |  42 +++--
 .../flink/table/store/table/sink/TableCommit.java  |  16 +-
 .../table/store/tests/FlinkActionsE2eTest.java     |  89 +++++++--
 14 files changed, 817 insertions(+), 257 deletions(-)

diff --git a/docs/content/docs/how-to/writing-tables.md b/docs/content/docs/how-to/writing-tables.md
index db0e4661..135d7443 100644
--- a/docs/content/docs/how-to/writing-tables.md
+++ b/docs/content/docs/how-to/writing-tables.md
@@ -157,29 +157,23 @@ INSERT OVERWRITE MyTable SELECT * FROM MyTable WHERE false
 
 {{< /tabs >}}
 
-## Purging a Partition
+## Purging Partitions
 
-Particularly, you can use `INSERT OVERWRITE` to purge data of a partition by inserting empty value to the partition:
+Currently, Table Store supports two ways to purge partitions.
 
-{{< tabs "purge-partition-syntax" >}}
+1. Like purging tables, you can use `INSERT OVERWRITE` to purge data of partitions by inserting empty value to them.
 
-{{< tab "Flink" >}}
-
-```sql
-INSERT OVERWRITE MyTable PARTITION (key1 = value1, key2 = value2, ...) SELECT selectSpec FROM MyTable WHERE false
-```
+2. Method #1 dose not support to drop multiple partitions. In case that you need to drop multiple partitions, you can submit the drop-partition job through `flink run`.
 
-{{< /tab >}}
-
-{{< /tabs >}}
-
-The following SQL is an example:
-
-{{< tabs "purge-partition-example" >}}
+{{< tabs "purge-partitions-syntax" >}}
 
 {{< tab "Flink" >}}
 
 ```sql
+-- Syntax
+INSERT OVERWRITE MyTable PARTITION (key1 = value1, key2 = value2, ...) SELECT selectSpec FROM MyTable WHERE false
+
+-- The following SQL is an example:
 -- table definition
 CREATE TABLE MyTable (
     k0 INT,
@@ -196,4 +190,34 @@ INSERT OVERWRITE MyTable PARTITION (k0 = 0, k1 = 0) SELECT v FROM MyTable WHERE
 
 {{< /tab >}}
 
+{{< tab "Flink Job" >}}
+
+Run the following command to submit a drop-partition job for the table.
+
+```bash
+<FLINK_HOME>/bin/flink run \
+    -c org.apache.flink.table.store.connector.action.FlinkActions \
+    /path/to/flink-table-store-dist-{{< version >}}.jar \
+    drop-partition \
+    --warehouse <warehouse-path> \
+    --database <database-name> \
+    --table <table-name>
+    --partition <partition_spec>
+    [--partition <partition_spec> ...]
+
+partition_spec:
+key1=value1,key2=value2...
+```
+
+For more information of drop-partition, see
+
+```bash
+<FLINK_HOME>/bin/flink run \
+    -c org.apache.flink.table.store.connector.action.FlinkActions \
+    /path/to/flink-table-store-dist-{{< version >}}.jar \
+    drop-partition --help
+```
+
+{{< /tab >}}
+
 {{< /tabs >}}
diff --git a/flink-table-store-connector/pom.xml b/flink-table-store-connector/pom.xml
index 5a8a2c2a..ac13d88d 100644
--- a/flink-table-store-connector/pom.xml
+++ b/flink-table-store-connector/pom.xml
@@ -83,6 +83,14 @@ under the License.
 
         <!-- test dependencies -->
 
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-store-common</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-table-store-format</artifactId>
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/action/Action.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/action/Action.java
new file mode 100644
index 00000000..59a66e97
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/action/Action.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.action;
+
+import org.apache.flink.api.java.utils.MultipleParameterTool;
+import org.apache.flink.core.fs.Path;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/** Abstract class for Flink actions. */
+public interface Action {
+
+    /** The execution method of the action. */
+    void run() throws Exception;
+
+    @Nullable
+    static Path getTablePath(MultipleParameterTool params) {
+        String warehouse = params.get("warehouse");
+        String database = params.get("database");
+        String table = params.get("table");
+        String path = params.get("path");
+
+        Path tablePath = null;
+        int count = 0;
+        if (warehouse != null || database != null || table != null) {
+            if (warehouse == null || database == null || table == null) {
+                System.err.println(
+                        "Warehouse, database and table must be specified all at once.\n"
+                                + "Run <action> --help for help.");
+                return null;
+            }
+            tablePath = new Path(new Path(warehouse, database + ".db"), table);
+            count++;
+        }
+        if (path != null) {
+            tablePath = new Path(path);
+            count++;
+        }
+
+        if (count != 1) {
+            System.err.println(
+                    "Please specify either \"warehouse, database and table\" or \"path\".\n"
+                            + "Run <action> --help for help.");
+            return null;
+        }
+
+        return tablePath;
+    }
+
+    @Nullable
+    static List<Map<String, String>> getPartitions(MultipleParameterTool params) {
+        List<Map<String, String>> partitions = new ArrayList<>();
+        for (String partition : params.getMultiParameter("partition")) {
+            Map<String, String> kvs = new HashMap<>();
+            for (String kvString : partition.split(",")) {
+                String[] kv = kvString.split("=");
+                if (kv.length != 2) {
+                    System.err.print(
+                            "Invalid key-value pair \""
+                                    + kvString
+                                    + "\".\n"
+                                    + "Run <action> --help for help.");
+                    return null;
+                }
+                kvs.put(kv[0], kv[1]);
+            }
+            partitions.add(kvs);
+        }
+
+        return partitions;
+    }
+
+    /** Factory to create {@link Action}. */
+    class Factory {
+
+        // supported actions
+        private static final String COMPACT = "compact";
+        private static final String DROP_PARTITION = "drop-partition";
+
+        public static Optional<Action> create(String[] args) {
+            String action = args[0].toLowerCase();
+            String[] actionArgs = Arrays.copyOfRange(args, 1, args.length);
+
+            switch (action) {
+                case COMPACT:
+                    return CompactAction.create(actionArgs);
+                case DROP_PARTITION:
+                    return DropPartitionAction.create(actionArgs);
+                default:
+                    System.err.println("Unknown action \"" + action + "\"");
+                    printHelp();
+                    return Optional.empty();
+            }
+        }
+
+        static void printHelp() {
+            System.out.println("Usage: <action> [OPTIONS]");
+            System.out.println();
+
+            System.out.println("Available actions:");
+            System.out.println("  " + COMPACT);
+            System.out.println("  " + DROP_PARTITION);
+            System.out.println();
+
+            System.out.println("For detailed options of each action, run <action> --help");
+        }
+    }
+}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/action/CompactAction.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/action/CompactAction.java
index 947056ad..a87f7699 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/action/CompactAction.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/action/CompactAction.java
@@ -34,14 +34,20 @@ import org.apache.flink.table.store.connector.utils.StreamExecutionEnvironmentUt
 import org.apache.flink.table.store.table.FileStoreTable;
 import org.apache.flink.table.store.table.FileStoreTableFactory;
 
-import java.util.ArrayList;
-import java.util.HashMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 
+import static org.apache.flink.table.store.connector.action.Action.getPartitions;
+import static org.apache.flink.table.store.connector.action.Action.getTablePath;
+
 /** Table compact action for Flink. */
-public class CompactAction {
+public class CompactAction implements Action {
+
+    private static final Logger LOG = LoggerFactory.getLogger(CompactAction.class);
 
     private final CompactorSourceBuilder sourceBuilder;
     private final CompactorSinkBuilder sinkBuilder;
@@ -79,68 +85,37 @@ public class CompactAction {
     //  Flink run methods
     // ------------------------------------------------------------------------
 
-    public static Optional<CompactAction> create(MultipleParameterTool params) {
+    public static Optional<Action> create(String[] args) {
+        LOG.info("Compact job args: {}", String.join(" ", args));
+
+        MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
+
         if (params.has("help")) {
             printHelp();
             return Optional.empty();
         }
 
-        String warehouse = params.get("warehouse");
-        String database = params.get("database");
-        String table = params.get("table");
-        String path = params.get("path");
-
-        Path tablePath = null;
-        int count = 0;
-        if (warehouse != null || database != null || table != null) {
-            if (warehouse == null || database == null || table == null) {
-                System.err.println(
-                        "Warehouse, database and table must be specified all at once.\n"
-                                + "Run compact --help for help.");
-                return Optional.empty();
-            }
-            tablePath = new Path(new Path(warehouse, database + ".db"), table);
-            count++;
-        }
-        if (path != null) {
-            tablePath = new Path(path);
-            count++;
-        }
+        Path tablePath = getTablePath(params);
 
-        if (count != 1) {
-            System.err.println(
-                    "Please specify either \"warehouse, database and table\" or \"path\".\n"
-                            + "Run compact --help for help.");
+        if (tablePath == null) {
             return Optional.empty();
         }
 
         CompactAction action = new CompactAction(tablePath);
 
         if (params.has("partition")) {
-            List<Map<String, String>> partitions = new ArrayList<>();
-            for (String partition : params.getMultiParameter("partition")) {
-                Map<String, String> kvs = new HashMap<>();
-                for (String kvString : partition.split(",")) {
-                    String[] kv = kvString.split("=");
-                    if (kv.length != 2) {
-                        System.err.print(
-                                "Invalid key-value pair \""
-                                        + kvString
-                                        + "\".\n"
-                                        + "Run compact --help for help.");
-                        return Optional.empty();
-                    }
-                    kvs.put(kv[0], kv[1]);
-                }
-                partitions.add(kvs);
+            List<Map<String, String>> partitions = getPartitions(params);
+            if (partitions == null) {
+                return Optional.empty();
             }
+
             action.withPartitions(partitions);
         }
 
         return Optional.of(action);
     }
 
-    public static void printHelp() {
+    private static void printHelp() {
         System.out.println(
                 "Action \"compact\" runs a dedicated job for compacting specified table.");
         System.out.println();
@@ -165,4 +140,11 @@ public class CompactAction {
                 "  compact --warehouse hdfs:///path/to/warehouse --database test_db --table test_table "
                         + "--partition dt=20221126,hh=08 --partition dt=20221127,hh=09");
     }
+
+    @Override
+    public void run() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        build(env);
+        env.execute("Compact job");
+    }
 }
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/action/DropPartitionAction.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/action/DropPartitionAction.java
new file mode 100644
index 00000000..11f58df3
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/action/DropPartitionAction.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.action;
+
+import org.apache.flink.api.java.utils.MultipleParameterTool;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.FileStoreTableFactory;
+import org.apache.flink.table.store.table.sink.TableCommit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+
+import static org.apache.flink.table.store.connector.action.Action.getPartitions;
+import static org.apache.flink.table.store.connector.action.Action.getTablePath;
+
+/** Table drop partition action for Flink. */
+public class DropPartitionAction implements Action {
+
+    private static final Logger LOG = LoggerFactory.getLogger(CompactAction.class);
+
+    private final TableCommit commit;
+
+    DropPartitionAction(Path tablePath, List<Map<String, String>> partitions) {
+        FileStoreTable table = FileStoreTableFactory.create(tablePath);
+        this.commit =
+                table.newCommit(UUID.randomUUID().toString()).withOverwritePartitions(partitions);
+    }
+
+    public static Optional<Action> create(String[] args) {
+        LOG.info("Drop partition job args: {}", String.join(" ", args));
+
+        MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
+
+        if (params.has("help")) {
+            printHelp();
+            return Optional.empty();
+        }
+
+        Path tablePath = getTablePath(params);
+
+        if (tablePath == null) {
+            return Optional.empty();
+        }
+
+        if (!params.has("partition")) {
+            LOG.info(
+                    "Action drop-partition must specify partitions needed to be dropped.\n"
+                            + "Run drop-partition --help for help.");
+            System.err.println(
+                    "Action drop-partition must specify partitions needed to be dropped.\n"
+                            + "Run drop-partition --help for help.");
+
+            return Optional.empty();
+        }
+
+        List<Map<String, String>> partitions = getPartitions(params);
+        if (partitions == null) {
+            return Optional.empty();
+        }
+
+        return Optional.of(new DropPartitionAction(tablePath, partitions));
+    }
+
+    private static void printHelp() {
+        System.out.println(
+                "Action \"drop-partition\" drops data of specified partitions for a table.");
+        System.out.println();
+
+        System.out.println("Syntax:");
+        System.out.println(
+                "  drop-partition --warehouse <warehouse-path> --database <database-name> "
+                        + "--table <table-name> --partition <partition-name> [--partition <partition-name> ...]");
+        System.out.println(
+                "  drop-partition --path <table-path> --partition <partition-name> [--partition <partition-name> ...]");
+        System.out.println();
+
+        System.out.println("Partition name syntax:");
+        System.out.println("  key1=value1,key2=value2,...");
+        System.out.println();
+
+        System.out.println("Examples:");
+        System.out.println(
+                "  drop-partition --warehouse hdfs:///path/to/warehouse --database test_db --table test_table --partition dt=20221126,hh=08");
+        System.out.println(
+                "  drop-partition --path hdfs:///path/to/warehouse/test_db.db/test_table --partition dt=20221126,hh=08 --partition dt=20221127,hh=09");
+    }
+
+    @Override
+    public void run() throws Exception {
+        commit.commit(new ArrayList<>());
+    }
+}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/action/FlinkActions.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/action/FlinkActions.java
index ab907784..d41e8c91 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/action/FlinkActions.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/action/FlinkActions.java
@@ -18,13 +18,12 @@
 
 package org.apache.flink.table.store.connector.action;
 
-import org.apache.flink.api.java.utils.MultipleParameterTool;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
-import java.util.Arrays;
 import java.util.Optional;
 
+import static org.apache.flink.table.store.connector.action.Action.Factory.printHelp;
+
 /** Table maintenance actions for Flink. */
 public class FlinkActions {
 
@@ -46,35 +45,12 @@ public class FlinkActions {
             System.exit(1);
         }
 
-        String action = args[0].toLowerCase();
-        if ("compact".equals(action)) {
-            runCompact(Arrays.copyOfRange(args, 1, args.length));
-        } else {
-            System.err.println("Unknown action \"" + action + "\"");
-            printHelp();
-            System.exit(1);
-        }
-    }
+        Optional<Action> action = Action.Factory.create(args);
 
-    private static void runCompact(String[] args) throws Exception {
-        Optional<CompactAction> action = CompactAction.create(MultipleParameterTool.fromArgs(args));
         if (action.isPresent()) {
-            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-            action.get().build(env);
-            env.execute("Compact job: " + String.join(" ", args));
+            action.get().run();
         } else {
             System.exit(1);
         }
     }
-
-    private static void printHelp() {
-        System.out.println("Usage: <action> [OPTIONS]");
-        System.out.println();
-
-        System.out.println("Available actions:");
-        System.out.println("  compact");
-        System.out.println();
-
-        System.out.println("For detailed options of each action, run <action> --help");
-    }
 }
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/action/ActionITCaseBase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/action/ActionITCaseBase.java
new file mode 100644
index 00000000..1d25e2f8
--- /dev/null
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/action/ActionITCaseBase.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.action;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.store.connector.util.AbstractTestBase;
+import org.apache.flink.table.store.data.DataFormatTestUtil;
+import org.apache.flink.table.store.data.GenericRow;
+import org.apache.flink.table.store.data.InternalRow;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.TableSchema;
+import org.apache.flink.table.store.file.schema.UpdateSchema;
+import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.table.store.file.utils.RecordReaderUtils;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.FileStoreTableFactory;
+import org.apache.flink.table.store.table.sink.TableCommit;
+import org.apache.flink.table.store.table.sink.TableWrite;
+import org.apache.flink.table.store.table.source.Split;
+import org.apache.flink.table.store.table.source.TableRead;
+import org.apache.flink.table.store.types.RowType;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+/** {@link Action} test base. */
+public class ActionITCaseBase extends AbstractTestBase {
+
+    protected Path tablePath;
+    protected String commitUser;
+
+    protected SnapshotManager snapshotManager;
+    protected TableWrite write;
+    protected TableCommit commit;
+
+    private long incrementalIdentifier;
+
+    @BeforeEach
+    public void before() throws IOException {
+        tablePath = new Path(getTempDirPath());
+        commitUser = UUID.randomUUID().toString();
+        incrementalIdentifier = 0;
+    }
+
+    @AfterEach
+    public void after() throws Exception {
+        if (write != null) {
+            write.close();
+        }
+        if (commit != null) {
+            commit.close();
+        }
+    }
+
+    protected FileStoreTable createFileStoreTable(
+            RowType rowType,
+            List<String> partitionKeys,
+            List<String> primaryKeys,
+            Map<String, String> options)
+            throws Exception {
+        SchemaManager schemaManager = new SchemaManager(tablePath);
+        TableSchema tableSchema =
+                schemaManager.commitNewVersion(
+                        new UpdateSchema(rowType, partitionKeys, primaryKeys, options, ""));
+        return FileStoreTableFactory.create(tablePath, tableSchema);
+    }
+
+    protected GenericRow rowData(Object... values) {
+        return GenericRow.of(values);
+    }
+
+    protected void writeData(GenericRow... data) throws Exception {
+        for (GenericRow d : data) {
+            write.write(d);
+        }
+        commit.commit(incrementalIdentifier, write.prepareCommit(true, incrementalIdentifier));
+        incrementalIdentifier++;
+    }
+
+    protected List<String> getResult(TableRead read, List<Split> splits, RowType rowType)
+            throws Exception {
+        RecordReader<InternalRow> recordReader = read.createReader(splits);
+        List<String> result = new ArrayList<>();
+        RecordReaderUtils.forEachRemaining(
+                recordReader, row -> result.add(DataFormatTestUtil.rowDataToString(row, rowType)));
+        return result;
+    }
+}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/action/CompactActionITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/action/CompactActionITCase.java
index 996eb688..2d1c94a7 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/action/CompactActionITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/action/CompactActionITCase.java
@@ -19,94 +19,69 @@
 package org.apache.flink.table.store.connector.action;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.data.BinaryString;
-import org.apache.flink.table.store.data.GenericRow;
-import org.apache.flink.table.store.data.InternalRow;
 import org.apache.flink.table.store.file.Snapshot;
-import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader;
-import org.apache.flink.table.store.file.schema.SchemaManager;
-import org.apache.flink.table.store.file.schema.TableSchema;
-import org.apache.flink.table.store.file.schema.UpdateSchema;
-import org.apache.flink.table.store.file.utils.RecordReader;
-import org.apache.flink.table.store.file.utils.RecordReaderIterator;
-import org.apache.flink.table.store.file.utils.SnapshotManager;
 import org.apache.flink.table.store.table.FileStoreTable;
-import org.apache.flink.table.store.table.FileStoreTableFactory;
-import org.apache.flink.table.store.table.sink.TableCommit;
-import org.apache.flink.table.store.table.sink.TableWrite;
 import org.apache.flink.table.store.table.source.DataSplit;
 import org.apache.flink.table.store.table.source.DataTableScan;
-import org.apache.flink.table.store.table.source.Split;
-import org.apache.flink.table.store.table.source.TableRead;
 import org.apache.flink.table.store.table.source.snapshot.ContinuousDataFileSnapshotEnumerator;
 import org.apache.flink.table.store.table.source.snapshot.SnapshotEnumerator;
 import org.apache.flink.table.store.types.DataType;
 import org.apache.flink.table.store.types.DataTypes;
 import org.apache.flink.table.store.types.RowType;
-import org.apache.flink.test.util.AbstractTestBase;
 
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 
 /** IT cases for {@link CompactAction}. */
-public class CompactActionITCase extends AbstractTestBase {
+public class CompactActionITCase extends ActionITCaseBase {
 
-    private static final RowType ROW_TYPE =
-            RowType.of(
-                    new DataType[] {
-                        DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.STRING()
-                    },
-                    new String[] {"k", "v", "hh", "dt"});
-
-    private Path tablePath;
-    private String commitUser;
+    private static final DataType[] FIELD_TYPES =
+            new DataType[] {DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.STRING()};
 
-    @Before
-    public void before() throws IOException {
-        tablePath = new Path(TEMPORARY_FOLDER.newFolder().toString());
-        commitUser = UUID.randomUUID().toString();
-    }
+    private static final RowType ROW_TYPE =
+            RowType.of(FIELD_TYPES, new String[] {"k", "v", "hh", "dt"});
 
-    @Test(timeout = 60000)
+    @Test
+    @Timeout(60000)
     public void testBatchCompact() throws Exception {
         Map<String, String> options = new HashMap<>();
         options.put(CoreOptions.WRITE_ONLY.key(), "true");
 
-        FileStoreTable table = createFileStoreTable(options);
-        SnapshotManager snapshotManager = table.snapshotManager();
-        TableWrite write = table.newWrite(commitUser);
-        TableCommit commit = table.newCommit(commitUser);
-
-        write.write(rowData(1, 100, 15, BinaryString.fromString("20221208")));
-        write.write(rowData(1, 100, 16, BinaryString.fromString("20221208")));
-        write.write(rowData(1, 100, 15, BinaryString.fromString("20221209")));
-        commit.commit(0, write.prepareCommit(true, 0));
-
-        write.write(rowData(2, 200, 15, BinaryString.fromString("20221208")));
-        write.write(rowData(2, 200, 16, BinaryString.fromString("20221208")));
-        write.write(rowData(2, 200, 15, BinaryString.fromString("20221209")));
-        commit.commit(1, write.prepareCommit(true, 1));
+        FileStoreTable table =
+                createFileStoreTable(
+                        ROW_TYPE,
+                        Arrays.asList("dt", "hh"),
+                        Arrays.asList("dt", "hh", "k"),
+                        options);
+        snapshotManager = table.snapshotManager();
+        write = table.newWrite(commitUser);
+        commit = table.newCommit(commitUser);
+
+        writeData(
+                rowData(1, 100, 15, BinaryString.fromString("20221208")),
+                rowData(1, 100, 16, BinaryString.fromString("20221208")),
+                rowData(1, 100, 15, BinaryString.fromString("20221209")));
+
+        writeData(
+                rowData(2, 100, 15, BinaryString.fromString("20221208")),
+                rowData(2, 100, 16, BinaryString.fromString("20221208")),
+                rowData(2, 100, 15, BinaryString.fromString("20221209")));
 
         Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId());
-        Assert.assertEquals(2, snapshot.id());
-        Assert.assertEquals(Snapshot.CommitKind.APPEND, snapshot.commitKind());
-
-        write.close();
-        commit.close();
+        Assertions.assertEquals(2, snapshot.id());
+        Assertions.assertEquals(Snapshot.CommitKind.APPEND, snapshot.commitKind());
 
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setRuntimeMode(RuntimeExecutionMode.BATCH);
@@ -115,23 +90,24 @@ public class CompactActionITCase extends AbstractTestBase {
         env.execute();
 
         snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId());
-        Assert.assertEquals(3, snapshot.id());
-        Assert.assertEquals(Snapshot.CommitKind.COMPACT, snapshot.commitKind());
+        Assertions.assertEquals(3, snapshot.id());
+        Assertions.assertEquals(Snapshot.CommitKind.COMPACT, snapshot.commitKind());
 
         DataTableScan.DataFilePlan plan = table.newScan().plan();
-        Assert.assertEquals(3, plan.splits().size());
+        Assertions.assertEquals(3, plan.splits().size());
         for (DataSplit split : plan.splits) {
             if (split.partition().getInt(1) == 15) {
                 // compacted
-                Assert.assertEquals(1, split.files().size());
+                Assertions.assertEquals(1, split.files().size());
             } else {
                 // not compacted
-                Assert.assertEquals(2, split.files().size());
+                Assertions.assertEquals(2, split.files().size());
             }
         }
     }
 
-    @Test(timeout = 60000)
+    @Test
+    @Timeout(60000)
     public void testStreamingCompact() throws Exception {
         Map<String, String> options = new HashMap<>();
         options.put(CoreOptions.CHANGELOG_PRODUCER.key(), "full-compaction");
@@ -142,27 +118,32 @@ public class CompactActionITCase extends AbstractTestBase {
         options.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN.key(), "3");
         options.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX.key(), "3");
 
-        FileStoreTable table = createFileStoreTable(options);
-        SnapshotManager snapshotManager = table.snapshotManager();
-        TableWrite write = table.newWrite(commitUser);
-        TableCommit commit = table.newCommit(commitUser);
+        FileStoreTable table =
+                createFileStoreTable(
+                        ROW_TYPE,
+                        Arrays.asList("dt", "hh"),
+                        Arrays.asList("dt", "hh", "k"),
+                        options);
+        snapshotManager = table.snapshotManager();
+        write = table.newWrite(commitUser);
+        commit = table.newCommit(commitUser);
 
         // base records
-        write.write(rowData(1, 100, 15, BinaryString.fromString("20221208")));
-        write.write(rowData(1, 100, 16, BinaryString.fromString("20221208")));
-        write.write(rowData(1, 100, 15, BinaryString.fromString("20221209")));
-        commit.commit(0, write.prepareCommit(true, 0));
+        writeData(
+                rowData(1, 100, 15, BinaryString.fromString("20221208")),
+                rowData(1, 100, 16, BinaryString.fromString("20221208")),
+                rowData(1, 100, 15, BinaryString.fromString("20221209")));
 
         Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId());
-        Assert.assertEquals(1, snapshot.id());
-        Assert.assertEquals(Snapshot.CommitKind.APPEND, snapshot.commitKind());
+        Assertions.assertEquals(1, snapshot.id());
+        Assertions.assertEquals(Snapshot.CommitKind.APPEND, snapshot.commitKind());
 
         // no full compaction has happened, so plan should be empty
         SnapshotEnumerator snapshotEnumerator =
                 ContinuousDataFileSnapshotEnumerator.create(table, table.newScan(), null);
         DataTableScan.DataFilePlan plan = snapshotEnumerator.enumerate();
-        Assert.assertEquals(1, (long) plan.snapshotId);
-        Assert.assertTrue(plan.splits().isEmpty());
+        Assertions.assertEquals(1, (long) plan.snapshotId);
+        Assertions.assertTrue(plan.splits().isEmpty());
 
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
@@ -183,20 +164,18 @@ public class CompactActionITCase extends AbstractTestBase {
         // first full compaction
         List<String> actual = new ArrayList<>();
         while (plan != null) {
-            actual.addAll(getResult(table.newRead(), plan.splits()));
+            actual.addAll(getResult(table.newRead(), plan.splits(), ROW_TYPE));
             plan = snapshotEnumerator.enumerate();
         }
         actual.sort(String::compareTo);
-        Assert.assertEquals(Arrays.asList("+I 1|100|15|20221208", "+I 1|100|15|20221209"), actual);
+        Assertions.assertEquals(
+                Arrays.asList("+I[1, 100, 15, 20221208]", "+I[1, 100, 15, 20221209]"), actual);
 
         // incremental records
-        write.write(rowData(1, 101, 15, BinaryString.fromString("20221208")));
-        write.write(rowData(1, 101, 16, BinaryString.fromString("20221208")));
-        write.write(rowData(1, 101, 15, BinaryString.fromString("20221209")));
-        commit.commit(1, write.prepareCommit(true, 1));
-
-        write.close();
-        commit.close();
+        writeData(
+                rowData(1, 101, 15, BinaryString.fromString("20221208")),
+                rowData(1, 101, 16, BinaryString.fromString("20221208")),
+                rowData(1, 101, 15, BinaryString.fromString("20221209")));
 
         while (true) {
             plan = snapshotEnumerator.enumerate();
@@ -209,20 +188,20 @@ public class CompactActionITCase extends AbstractTestBase {
         // second full compaction
         actual = new ArrayList<>();
         while (plan != null) {
-            actual.addAll(getResult(table.newRead(), plan.splits()));
+            actual.addAll(getResult(table.newRead(), plan.splits(), ROW_TYPE));
             plan = snapshotEnumerator.enumerate();
         }
         actual.sort(String::compareTo);
-        Assert.assertEquals(
+        Assertions.assertEquals(
                 Arrays.asList(
-                        "+U 1|101|15|20221208",
-                        "+U 1|101|15|20221209",
-                        "-U 1|100|15|20221208",
-                        "-U 1|100|15|20221209"),
+                        "+U[1, 101, 15, 20221208]",
+                        "+U[1, 101, 15, 20221209]",
+                        "-U[1, 100, 15, 20221208]",
+                        "-U[1, 100, 15, 20221209]"),
                 actual);
 
         // assert dedicated compact job will expire snapshots
-        Assert.assertEquals(
+        Assertions.assertEquals(
                 snapshotManager.latestSnapshotId() - 2,
                 (long) snapshotManager.earliestSnapshotId());
     }
@@ -238,47 +217,4 @@ public class CompactActionITCase extends AbstractTestBase {
 
         return Arrays.asList(partition1, partition2);
     }
-
-    private GenericRow rowData(Object... values) {
-        return GenericRow.of(values);
-    }
-
-    private List<String> getResult(TableRead read, List<Split> splits) throws Exception {
-        List<ConcatRecordReader.ReaderSupplier<InternalRow>> readers = new ArrayList<>();
-        for (Split split : splits) {
-            readers.add(() -> read.createReader(split));
-        }
-        RecordReader<InternalRow> recordReader = ConcatRecordReader.create(readers);
-        RecordReaderIterator<InternalRow> iterator = new RecordReaderIterator<>(recordReader);
-        List<String> result = new ArrayList<>();
-        while (iterator.hasNext()) {
-            InternalRow rowData = iterator.next();
-            result.add(rowDataToString(rowData));
-        }
-        iterator.close();
-        return result;
-    }
-
-    private String rowDataToString(InternalRow rowData) {
-        return String.format(
-                "%s %d|%d|%d|%s",
-                rowData.getRowKind().shortString(),
-                rowData.getInt(0),
-                rowData.getInt(1),
-                rowData.getInt(2),
-                rowData.getString(3).toString());
-    }
-
-    private FileStoreTable createFileStoreTable(Map<String, String> options) throws Exception {
-        SchemaManager schemaManager = new SchemaManager(tablePath);
-        TableSchema tableSchema =
-                schemaManager.commitNewVersion(
-                        new UpdateSchema(
-                                ROW_TYPE,
-                                Arrays.asList("dt", "hh"),
-                                Arrays.asList("dt", "hh", "k"),
-                                options,
-                                ""));
-        return FileStoreTableFactory.create(tablePath, tableSchema);
-    }
 }
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/action/DropPartitionITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/action/DropPartitionITCase.java
new file mode 100644
index 00000000..60491b7b
--- /dev/null
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/action/DropPartitionITCase.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.action;
+
+import org.apache.flink.table.store.data.BinaryString;
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.source.DataTableScan;
+import org.apache.flink.table.store.types.DataType;
+import org.apache.flink.table.store.types.DataTypes;
+import org.apache.flink.table.store.types.RowType;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT cases for {@link DropPartitionAction}. */
+public class DropPartitionITCase extends ActionITCaseBase {
+
+    private static final DataType[] FIELD_TYPES =
+            new DataType[] {DataTypes.INT(), DataTypes.INT(), DataTypes.STRING(), DataTypes.INT()};
+
+    private static final RowType ROW_TYPE =
+            RowType.of(FIELD_TYPES, new String[] {"partKey0", "partKey1", "dt", "value"});
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testDropPartitionWithSinglePartitionKey(boolean hasPk) throws Exception {
+        FileStoreTable table = prepareTable(hasPk);
+
+        new DropPartitionAction(
+                        tablePath,
+                        Collections.singletonList(Collections.singletonMap("partKey0", "0")))
+                .run();
+
+        Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId());
+        assertThat(snapshot.id()).isEqualTo(5);
+        assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.OVERWRITE);
+
+        DataTableScan.DataFilePlan plan = table.newScan().plan();
+        assertThat(plan.splits().size()).isEqualTo(2);
+        List<String> actual = getResult(table.newRead(), plan.splits(), ROW_TYPE);
+
+        List<String> expected;
+        if (hasPk) {
+            expected =
+                    Arrays.asList(
+                            "+I[1, 0, 2023-01-17, 5]",
+                            "+I[1, 1, 2023-01-18, 82]",
+                            "+I[1, 1, 2023-01-19, 90]",
+                            "+I[1, 1, 2023-01-20, 97]");
+        } else {
+            expected =
+                    Arrays.asList(
+                            "+I[1, 0, 2023-01-17, 2]",
+                            "+I[1, 0, 2023-01-17, 3]",
+                            "+I[1, 0, 2023-01-17, 5]",
+                            "+I[1, 1, 2023-01-18, 82]",
+                            "+I[1, 1, 2023-01-19, 90]",
+                            "+I[1, 1, 2023-01-20, 97]");
+        }
+
+        assertThat(actual).isEqualTo(expected);
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testDropPartitionWithMultiplePartitionKey(boolean hasPk) throws Exception {
+        FileStoreTable table = prepareTable(hasPk);
+
+        Map<String, String> partitions0 = new HashMap<>();
+        partitions0.put("partKey0", "0");
+        partitions0.put("partKey1", "1");
+
+        Map<String, String> partitions1 = new HashMap<>();
+        partitions1.put("partKey0", "1");
+        partitions1.put("partKey1", "0");
+
+        new DropPartitionAction(tablePath, Arrays.asList(partitions0, partitions1)).run();
+
+        Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId());
+        assertThat(snapshot.id()).isEqualTo(5);
+        assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.OVERWRITE);
+
+        DataTableScan.DataFilePlan plan = table.newScan().plan();
+        assertThat(plan.splits().size()).isEqualTo(2);
+        List<String> actual = getResult(table.newRead(), plan.splits(), ROW_TYPE);
+
+        List<String> expected;
+        if (hasPk) {
+            expected =
+                    Arrays.asList(
+                            "+I[0, 0, 2023-01-12, 102]",
+                            "+I[0, 0, 2023-01-13, 103]",
+                            "+I[1, 1, 2023-01-18, 82]",
+                            "+I[1, 1, 2023-01-19, 90]",
+                            "+I[1, 1, 2023-01-20, 97]");
+        } else {
+            expected =
+                    Arrays.asList(
+                            "+I[0, 0, 2023-01-12, 101]",
+                            "+I[0, 0, 2023-01-12, 102]",
+                            "+I[0, 0, 2023-01-13, 103]",
+                            "+I[1, 1, 2023-01-18, 82]",
+                            "+I[1, 1, 2023-01-19, 90]",
+                            "+I[1, 1, 2023-01-20, 97]");
+        }
+
+        assertThat(actual).isEqualTo(expected);
+    }
+
+    private FileStoreTable prepareTable(boolean hasPk) throws Exception {
+        FileStoreTable table =
+                createFileStoreTable(
+                        ROW_TYPE,
+                        Arrays.asList("partKey0", "partKey1"),
+                        hasPk
+                                ? Arrays.asList("partKey0", "partKey1", "dt")
+                                : Collections.emptyList(),
+                        new HashMap<>());
+        snapshotManager = table.snapshotManager();
+        write = table.newWrite(commitUser);
+        commit = table.newCommit(commitUser);
+
+        // prepare data
+        writeData(
+                rowData(0, 0, BinaryString.fromString("2023-01-12"), 101),
+                rowData(0, 0, BinaryString.fromString("2023-01-12"), 102),
+                rowData(0, 0, BinaryString.fromString("2023-01-13"), 103));
+
+        writeData(
+                rowData(0, 1, BinaryString.fromString("2023-01-14"), 110),
+                rowData(0, 1, BinaryString.fromString("2023-01-15"), 120),
+                rowData(0, 1, BinaryString.fromString("2023-01-16"), 130));
+
+        writeData(
+                rowData(1, 0, BinaryString.fromString("2023-01-17"), 2),
+                rowData(1, 0, BinaryString.fromString("2023-01-17"), 3),
+                rowData(1, 0, BinaryString.fromString("2023-01-17"), 5));
+
+        writeData(
+                rowData(1, 1, BinaryString.fromString("2023-01-18"), 82),
+                rowData(1, 1, BinaryString.fromString("2023-01-19"), 90),
+                rowData(1, 1, BinaryString.fromString("2023-01-20"), 97));
+
+        Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId());
+
+        assertThat(snapshot.id()).isEqualTo(4);
+        assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
+
+        return table;
+    }
+}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/util/AbstractTestBase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/util/AbstractTestBase.java
index 8cded1e1..e127f913 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/util/AbstractTestBase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/util/AbstractTestBase.java
@@ -36,8 +36,7 @@ import java.util.UUID;
 /** Similar to Flink's AbstractTestBase but using Junit5. */
 public class AbstractTestBase extends TestLogger {
 
-    private static final Logger LOG =
-            LoggerFactory.getLogger(org.apache.flink.test.util.AbstractTestBase.class);
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractTestBase.class);
 
     private static final int DEFAULT_PARALLELISM = 4;
 
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommit.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommit.java
index c2c0d8c5..aa5009a6 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommit.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommit.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.store.file.operation;
 
 import org.apache.flink.table.store.file.manifest.ManifestCommittable;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
@@ -37,16 +38,24 @@ public interface FileStoreCommit {
     /** Commit from manifest committable. */
     void commit(ManifestCommittable committable, Map<String, String> properties);
 
+    /** Overwrite a single partition from manifest committable. */
+    default void overwrite(
+            Map<String, String> partition,
+            ManifestCommittable committable,
+            Map<String, String> properties) {
+        overwrite(Collections.singletonList(partition), committable, properties);
+    }
+
     /**
-     * Overwrite from manifest committable and partition.
+     * Overwrite multiple partitions from manifest committable.
      *
-     * @param partition A single partition maps each partition key to a partition value. Depending
-     *     on the user-defined statement, the partition might not include all partition keys. Also
-     *     note that this partition does not necessarily equal to the partitions of the newly added
-     *     key-values. This is just the partition to be cleaned up.
+     * @param partitions A list of partition {@link Map}s that maps each partition key to a
+     *     partition value. Depending on the user-defined statement, the partition might not include
+     *     all partition keys. Also note that this partition does not necessarily equal to the
+     *     partitions of the newly added key-values. This is just the partition to be cleaned up.
      */
     void overwrite(
-            Map<String, String> partition,
+            List<Map<String, String>> partitions,
             ManifestCommittable committable,
             Map<String, String> properties);
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
index 4c0d1b8f..0d08bb13 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
@@ -240,13 +240,15 @@ public class FileStoreCommitImpl implements FileStoreCommit {
 
     @Override
     public void overwrite(
-            Map<String, String> partition,
+            List<Map<String, String>> partitions,
             ManifestCommittable committable,
             Map<String, String> properties) {
         if (LOG.isDebugEnabled()) {
             LOG.debug(
                     "Ready to overwrite partition "
-                            + partition.toString()
+                            + partitions.stream()
+                                    .map(Object::toString)
+                                    .collect(Collectors.joining(","))
                             + "\n"
                             + committable.toString());
         }
@@ -279,19 +281,35 @@ public class FileStoreCommitImpl implements FileStoreCommit {
         }
 
         // sanity check, all changes must be done within the given partition
-        Predicate partitionFilter = PredicateBuilder.partition(partition, partitionType);
-        if (partitionFilter != null) {
-            for (ManifestEntry entry : appendTableFiles) {
-                if (!partitionFilter.test(partitionObjectConverter.convert(entry.partition()))) {
-                    throw new IllegalArgumentException(
-                            "Trying to overwrite partition "
-                                    + partition
-                                    + ", but the changes in "
-                                    + pathFactory.getPartitionString(entry.partition())
-                                    + " does not belong to this partition");
+        List<Predicate> partitionFilters = new ArrayList<>();
+        for (Map<String, String> partition : partitions) {
+            Predicate partitionFilter = PredicateBuilder.partition(partition, partitionType);
+            if (partitionFilter != null) {
+                for (ManifestEntry entry : appendTableFiles) {
+                    if (!partitionFilter.test(
+                            partitionObjectConverter.convert(entry.partition()))) {
+                        throw new IllegalArgumentException(
+                                "Trying to overwrite partition "
+                                        + partition
+                                        + ", but the changes in "
+                                        + pathFactory.getPartitionString(entry.partition())
+                                        + " does not belong to this partition");
+                    }
                 }
+
+                partitionFilters.add(partitionFilter);
             }
         }
+
+        Predicate partitionFilter;
+        if (partitionFilters.size() == 0) {
+            partitionFilter = null;
+        } else if (partitionFilters.size() == 1) {
+            partitionFilter = partitionFilters.get(0);
+        } else {
+            partitionFilter = PredicateBuilder.or(partitionFilters);
+        }
+
         // overwrite new files
         tryOverwrite(
                 partitionFilter,
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableCommit.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableCommit.java
index dd6056d5..56e5905a 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableCommit.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableCommit.java
@@ -39,7 +39,7 @@ public class TableCommit implements AutoCloseable {
     private final FileStoreCommit commit;
     @Nullable private final FileStoreExpire expire;
 
-    @Nullable private Map<String, String> overwritePartition = null;
+    @Nullable private List<Map<String, String>> overwritePartitions = null;
     @Nullable private Lock lock;
 
     public TableCommit(FileStoreCommit commit, @Nullable FileStoreExpire expire) {
@@ -48,7 +48,15 @@ public class TableCommit implements AutoCloseable {
     }
 
     public TableCommit withOverwritePartition(@Nullable Map<String, String> overwritePartition) {
-        this.overwritePartition = overwritePartition;
+        if (overwritePartition != null) {
+            this.overwritePartitions = Collections.singletonList(overwritePartition);
+        }
+        return this;
+    }
+
+    public TableCommit withOverwritePartitions(
+            @Nullable List<Map<String, String>> overwritePartitions) {
+        this.overwritePartitions = overwritePartitions;
         return this;
     }
 
@@ -81,7 +89,7 @@ public class TableCommit implements AutoCloseable {
     }
 
     public void commit(List<ManifestCommittable> committables) {
-        if (overwritePartition == null) {
+        if (overwritePartitions == null) {
             for (ManifestCommittable committable : committables) {
                 commit.commit(committable, new HashMap<>());
             }
@@ -99,7 +107,7 @@ public class TableCommit implements AutoCloseable {
                 // TODO maybe it can be produced by CommitterOperator
                 committable = new ManifestCommittable(Long.MAX_VALUE);
             }
-            commit.overwrite(overwritePartition, committable, new HashMap<>());
+            commit.overwrite(overwritePartitions, committable, new HashMap<>());
         }
 
         if (expire != null) {
diff --git a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FlinkActionsE2eTest.java b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FlinkActionsE2eTest.java
index 956d2051..308474df 100644
--- a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FlinkActionsE2eTest.java
+++ b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FlinkActionsE2eTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.store.tests;
 
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -34,6 +35,24 @@ public class FlinkActionsE2eTest extends E2eTestBase {
         super(true, false);
     }
 
+    private String warehousePath;
+    private String catalogDdl;
+    private String useCatalogCmd;
+
+    @BeforeEach
+    public void setUp() {
+        warehousePath = TEST_DATA_DIR + "/" + UUID.randomUUID() + ".store";
+        catalogDdl =
+                String.format(
+                        "CREATE CATALOG ts_catalog WITH (\n"
+                                + "    'type' = 'table-store',\n"
+                                + "    'warehouse' = '%s'\n"
+                                + ");",
+                        warehousePath);
+
+        useCatalogCmd = "USE CATALOG ts_catalog;";
+    }
+
     @Test
     public void testCompact() throws Exception {
         String topicName = "ts-topic-" + UUID.randomUUID();
@@ -57,17 +76,6 @@ public class FlinkActionsE2eTest extends E2eTestBase {
                                 + ");",
                         topicName);
 
-        String warehousePath = TEST_DATA_DIR + "/" + UUID.randomUUID() + ".store";
-        String catalogDdl =
-                String.format(
-                        "CREATE CATALOG ts_catalog WITH (\n"
-                                + "    'type' = 'table-store',\n"
-                                + "    'warehouse' = '%s'\n"
-                                + ");",
-                        warehousePath);
-
-        String useCatalogCmd = "USE CATALOG ts_catalog;";
-
         String tableDdl =
                 "CREATE TABLE IF NOT EXISTS ts_table (\n"
                         + "    dt STRING,\n"
@@ -132,6 +140,65 @@ public class FlinkActionsE2eTest extends E2eTestBase {
         checkResult("20221205, 1, 101", "20221206, 1, 101");
     }
 
+    @Test
+    public void testDropPartition() throws Exception {
+        String tableDdl =
+                "CREATE TABLE IF NOT EXISTS ts_table (\n"
+                        + "    dt STRING,\n"
+                        + "    k0 INT,\n"
+                        + "    k1 INT,\n"
+                        + "    v INT,\n"
+                        + "    PRIMARY KEY (dt, k0, k1) NOT ENFORCED\n"
+                        + ") PARTITIONED BY (k0, k1);";
+
+        String insert =
+                "INSERT INTO ts_table VALUES ('2023-01-13', 0, 0, 15), ('2023-01-14', 0, 0, 19), ('2023-01-13', 0, 0, 39), "
+                        + "('2023-01-15', 0, 1, 34), ('2023-01-15', 0, 1, 56), ('2023-01-15', 0, 1, 37), "
+                        + "('2023-01-16', 1, 0, 25), ('2023-01-17', 1, 0, 50), ('2023-01-18', 1, 0, 75), "
+                        + "('2023-01-19', 1, 1, 23), ('2023-01-20', 1, 1, 28), ('2023-01-21', 1, 1, 31);";
+
+        runSql("SET 'table.dml-sync' = 'true';\n" + insert, catalogDdl, useCatalogCmd, tableDdl);
+
+        // run drop partition job
+        Container.ExecResult execResult =
+                jobManager.execInContainer(
+                        "bin/flink",
+                        "run",
+                        "-c",
+                        "org.apache.flink.table.store.connector.action.FlinkActions",
+                        "--detached",
+                        "lib/flink-table-store.jar",
+                        "drop-partition",
+                        "--warehouse",
+                        warehousePath,
+                        "--database",
+                        "default",
+                        "--table",
+                        "ts_table",
+                        "--partition",
+                        "k0=0,k1=1",
+                        "--partition",
+                        "k0=1,k1=0");
+        LOG.info(execResult.getStdout());
+        LOG.info(execResult.getStderr());
+
+        // read all data from table store
+        runSql(
+                "INSERT INTO result1 SELECT * FROM ts_table;",
+                catalogDdl,
+                useCatalogCmd,
+                tableDdl,
+                createResultSink("result1", "dt STRING, k0 INT, k1 INT, v INT"));
+
+        // check the left data
+        checkResult(
+                "2023-01-13, 0, 0, 39",
+                "2023-01-14, 0, 0, 19",
+                "2023-01-19, 1, 1, 23",
+                "2023-01-20, 1, 1, 28",
+                "2023-01-21, 1, 1, 31");
+    }
+
     private void runSql(String sql, String... ddls) throws Exception {
         runSql(String.join("\n", ddls) + "\n" + sql);
     }