Posted to commits@paimon.apache.org by lz...@apache.org on 2023/03/31 06:42:49 UTC

[incubator-paimon] branch master updated: [core] Introduce dynamic partition overwrite as default behavior (#769)

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new dade2103c [core] Introduce dynamic partition overwrite as default behavior (#769)
dade2103c is described below

commit dade2103cd2a2ee1cbd23fd74db77f0299d8cdd1
Author: yuzelin <33...@users.noreply.github.com>
AuthorDate: Fri Mar 31 14:42:44 2023 +0800

    [core] Introduce dynamic partition overwrite as default behavior (#769)
---
 docs/content/how-to/writing-tables.md              |   7 +-
 .../shortcodes/generated/core_configuration.html   |   6 +
 .../generated/flink_connector_configuration.html   |  12 +-
 .../apache/paimon/predicate/PredicateBuilder.java  |  22 +++
 .../org/apache/paimon/data/DataFormatTestUtil.java |   2 +-
 .../paimon/predicate/PredicateBuilderTest.java     |   0
 .../org/apache/paimon/predicate/PredicateTest.java |   0
 .../java/org/apache/paimon/AbstractFileStore.java  |   1 +
 .../main/java/org/apache/paimon/CoreOptions.java   |  12 ++
 .../apache/paimon/operation/FileStoreCommit.java   |   2 -
 .../paimon/operation/FileStoreCommitImpl.java      |  70 ++++++---
 .../apache/paimon/table/sink/TableCommitImpl.java  |   4 +-
 .../test/java/org/apache/paimon/TestFileStore.java |   3 +
 .../paimon/table/AppendOnlyFileStoreTableTest.java |  17 +++
 .../ChangelogValueCountFileStoreTableTest.java     |  17 +++
 .../table/ChangelogWithKeyFileStoreTableTest.java  |  27 +++-
 .../paimon/table/FileStoreTableTestBase.java       | 160 +++++++++++++++++++++
 .../paimon/table/WritePreemptMemoryTest.java       |  22 +++
 .../CompositePkAndMultiPartitionedTableITCase.java |  11 +-
 ...AndMultiPartitionedTableWIthKafkaLogITCase.java |   4 +-
 .../org/apache/paimon/flink/FileStoreITCase.java   |  27 +++-
 .../apache/paimon/flink/ReadWriteTableITCase.java  | 114 +++++++++++++--
 .../paimon/flink/action/ActionITCaseBase.java      |   2 +-
 .../paimon/flink/util/ReadWriteTableTestUtil.java  |   1 +
 24 files changed, 480 insertions(+), 63 deletions(-)

diff --git a/docs/content/how-to/writing-tables.md b/docs/content/how-to/writing-tables.md
index 42ad22483..006635938 100644
--- a/docs/content/how-to/writing-tables.md
+++ b/docs/content/how-to/writing-tables.md
@@ -80,8 +80,10 @@ For more information, please check the syntax document:
 [Spark INSERT Statement](https://spark.apache.org/docs/latest/sql-ref-syntax-dml-insert-table.html)
 
 {{< hint info >}}
-Streaming reading will ignore the commits generated by `INSERT OVERWRITE` by default. If you want to read the
+1. Streaming reading will ignore the commits generated by `INSERT OVERWRITE` by default. If you want to read the
 commits of `OVERWRITE`, you can configure `streaming-read-overwrite`.
+2. For partitioned tables, Paimon's default overwrite mode is dynamic partition overwrite (that is, Paimon only
+deletes the partitions that appear in the overwrite data). You can configure `dynamic-partition-overwrite` to change it.
 {{< /hint >}}
 
 ## Applying Records/Changes to Tables
@@ -151,7 +153,8 @@ INSERT OVERWRITE MyTable PARTITION (key1 = value1, key2 = value2, ...) SELECT ..
 
 ## Purging tables
 
-You can use `INSERT OVERWRITE` to purge tables by inserting empty value.
+You can use `INSERT OVERWRITE` to purge tables by inserting an empty record set. For partitioned
+tables, make sure that the table's `dynamic-partition-overwrite` option is set to `false`.
 
 {{< tabs "purge-tables-syntax" >}}
 
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index ab7b648a6..5c023d244 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -74,6 +74,12 @@
             <td>Duration</td>
             <td>The discovery interval of continuous reading.</td>
         </tr>
+        <tr>
+            <td><h5>dynamic-partition-overwrite</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Boolean</td>
+            <td>Whether to overwrite only the partitions that appear in the new data when overwriting a partitioned table. Only takes effect when the table has partition keys.</td>
+        </tr>
         <tr>
             <td><h5>file.compression.per.level</h5></td>
             <td style="word-wrap: break-word;"></td>
diff --git a/docs/layouts/shortcodes/generated/flink_connector_configuration.html b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index 0b4e43135..8060955d2 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -26,18 +26,18 @@
             <td>String</td>
             <td>The log system used to keep changes of the table.<br /><br />Possible values:<br /><ul><li>"none": No log system, the data is written only to file store, and the streaming read will be directly read from the file store.</li></ul><ul><li>"kafka": Kafka log system, the data is double written to file store and kafka, and the streaming read will be read from kafka.</li></ul></td>
         </tr>
-        <tr>
-            <td><h5>scan.split-enumerator.batch-size</h5></td>
-            <td style="word-wrap: break-word;">10</td>
-            <td>Integer</td>
-            <td>How many splits should be assigned to each subtask per batch in StaticFileStoreSplitEnumerator to avoid exceeding the `akka.framesize` limit.</td>
-        </tr>
         <tr>
             <td><h5>scan.parallelism</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>Integer</td>
             <td>Define a custom parallelism for the scan source. By default, if this option is not defined, the planner will derive the parallelism for each statement individually by also considering the global configuration.</td>
         </tr>
+        <tr>
+            <td><h5>scan.split-enumerator.batch-size</h5></td>
+            <td style="word-wrap: break-word;">10</td>
+            <td>Integer</td>
+            <td>How many splits should be assigned to each subtask per batch in StaticFileStoreSplitEnumerator to avoid exceeding the `akka.framesize` limit.</td>
+        </tr>
         <tr>
             <td><h5>scan.watermark.alignment.group</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java b/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java
index 03391496d..93d0b0356 100644
--- a/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java
+++ b/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.predicate;
 
 import org.apache.paimon.annotation.Public;
+import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.Decimal;
 import org.apache.paimon.data.Timestamp;
@@ -27,6 +28,7 @@ import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DecimalType;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.RowDataToObjectArrayConverter;
 import org.apache.paimon.utils.TypeUtils;
 
 import javax.annotation.Nullable;
@@ -344,4 +346,24 @@ public class PredicateBuilder {
         }
         return predicate;
     }
+
+    public static Predicate equalPartition(BinaryRow partition, RowType partitionType) {
+        Preconditions.checkArgument(
+                partition.getFieldCount() == partitionType.getFieldCount(),
+                "Partition's field count should be equal to partitionType's field count.");
+
+        RowDataToObjectArrayConverter converter = new RowDataToObjectArrayConverter(partitionType);
+        Predicate predicate = null;
+        PredicateBuilder builder = new PredicateBuilder(partitionType);
+        Object[] literals = converter.convert(partition);
+        for (int i = 0; i < literals.length; i++) {
+            if (predicate == null) {
+                predicate = builder.equal(i, literals[i]);
+            } else {
+                predicate = PredicateBuilder.and(predicate, builder.equal(i, literals[i]));
+            }
+        }
+
+        return predicate;
+    }
 }
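For illustration, a minimal sketch of the new helper above. `equalPartition` turns one partition row into a conjunction of per-field equality predicates; the two-field partition type and the values below are made up for the example.

    import org.apache.paimon.data.BinaryRow;
    import org.apache.paimon.data.BinaryRowWriter;
    import org.apache.paimon.data.BinaryString;
    import org.apache.paimon.predicate.Predicate;
    import org.apache.paimon.predicate.PredicateBuilder;
    import org.apache.paimon.types.DataType;
    import org.apache.paimon.types.DataTypes;
    import org.apache.paimon.types.RowType;

    public class EqualPartitionSketch {
        public static void main(String[] args) {
            RowType partitionType =
                    RowType.of(
                            new DataType[] {DataTypes.INT(), DataTypes.STRING()},
                            new String[] {"pt0", "pt1"});

            // Build the partition row (pt0 = 1, pt1 = 'A').
            BinaryRow partition = new BinaryRow(2);
            BinaryRowWriter writer = new BinaryRowWriter(partition);
            writer.writeInt(0, 1);
            writer.writeString(1, BinaryString.fromString("A"));
            writer.complete();

            // Equivalent to the filter: pt0 = 1 AND pt1 = 'A'
            Predicate filter = PredicateBuilder.equalPartition(partition, partitionType);
            System.out.println(filter);
        }
    }
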
diff --git a/paimon-common/src/test/java/org/apache/paimon/data/DataFormatTestUtil.java b/paimon-common/src/test/java/org/apache/paimon/data/DataFormatTestUtil.java
index 03fd1a319..42c4af6dc 100644
--- a/paimon-common/src/test/java/org/apache/paimon/data/DataFormatTestUtil.java
+++ b/paimon-common/src/test/java/org/apache/paimon/data/DataFormatTestUtil.java
@@ -45,7 +45,7 @@ public class DataFormatTestUtil {
     }
 
     /** Stringify the given {@link InternalRow}. */
-    public static String rowDataToString(InternalRow row, RowType type) {
+    public static String internalRowToString(InternalRow row, RowType type) {
         return row.getRowKind().shortString() + "[" + toStringNoRowKind(row, type) + ']';
     }
 
diff --git a/paimon-core/src/test/java/org/apache/paimon/predicate/PredicateBuilderTest.java b/paimon-common/src/test/java/org/apache/paimon/predicate/PredicateBuilderTest.java
similarity index 100%
rename from paimon-core/src/test/java/org/apache/paimon/predicate/PredicateBuilderTest.java
rename to paimon-common/src/test/java/org/apache/paimon/predicate/PredicateBuilderTest.java
diff --git a/paimon-core/src/test/java/org/apache/paimon/predicate/PredicateTest.java b/paimon-common/src/test/java/org/apache/paimon/predicate/PredicateTest.java
similarity index 100%
rename from paimon-core/src/test/java/org/apache/paimon/predicate/PredicateTest.java
rename to paimon-common/src/test/java/org/apache/paimon/predicate/PredicateTest.java
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index f644129a9..20fde8b42 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -115,6 +115,7 @@ public abstract class AbstractFileStore<T> implements FileStore<T> {
                 options.bucket(),
                 options.manifestTargetSize(),
                 options.manifestMergeMinCount(),
+                partitionType.getFieldCount() > 0 && options.dynamicPartitionOverwrite(),
                 newKeyComparator());
     }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
index b88c184c5..9f804cefa 100644
--- a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
@@ -413,6 +413,14 @@ public class CoreOptions implements Serializable {
                     .withDescription(
                             "Whether to read the changes from overwrite in streaming mode.");
 
+    public static final ConfigOption<Boolean> DYNAMIC_PARTITION_OVERWRITE =
+            key("dynamic-partition-overwrite")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "Whether only overwrite dynamic partition when overwriting a partitioned table with "
+                                    + "dynamic partition columns. Works only when the table has partition keys.");
+
     public static final ConfigOption<Duration> PARTITION_EXPIRATION_TIME =
             key("partition.expiration-time")
                     .durationType()
@@ -764,6 +772,10 @@ public class CoreOptions implements Serializable {
         return options.get(STREAMING_READ_OVERWRITE);
     }
 
+    public boolean dynamicPartitionOverwrite() {
+        return options.get(DYNAMIC_PARTITION_OVERWRITE);
+    }
+
     public Duration partitionExpireTime() {
         return options.get(PARTITION_EXPIRATION_TIME);
     }
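A minimal sketch of toggling the new option on an existing table, mirroring how the tests in this commit opt back into the old static semantics:

    import java.util.Collections;

    import org.apache.paimon.CoreOptions;
    import org.apache.paimon.table.FileStoreTable;

    public class StaticOverwriteSketch {
        // Returns a copy of the table that uses static overwrite semantics
        // instead of the new dynamic default.
        public static FileStoreTable withStaticOverwrite(FileStoreTable table) {
            return table.copy(
                    Collections.singletonMap(
                            CoreOptions.DYNAMIC_PARTITION_OVERWRITE.key(), "false"));
        }
    }

Because the option defaults to true, jobs that relied on `INSERT OVERWRITE` replacing a whole partitioned table must opt out this way (or via a dynamic table option in SQL).
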
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
index 2c114b830..a60488f4e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
@@ -55,8 +55,6 @@ public interface FileStoreCommit {
     /**
      * Overwrite from manifest committable and partition.
      *
-     * <p>TODO: The method's semantics can be dynamic or static overwrite according to properties.
-     *
      * @param partition A single partition maps each partition key to a partition value. Depending
      *     on the user-defined statement, the partition might not include all partition keys. Also
      *     note that this partition does not necessarily equal to the partitions of the newly added
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index a7f7d7e38..bc9b57d4c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -100,6 +100,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
     private final int numBucket;
     private final MemorySize manifestTargetSize;
     private final int manifestMergeMinCount;
+    private final boolean dynamicPartitionOverwrite;
     @Nullable private final Comparator<InternalRow> keyComparator;
 
     @Nullable private Lock lock;
@@ -118,6 +119,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
             int numBucket,
             MemorySize manifestTargetSize,
             int manifestMergeMinCount,
+            boolean dynamicPartitionOverwrite,
             @Nullable Comparator<InternalRow> keyComparator) {
         this.fileIO = fileIO;
         this.schemaManager = schemaManager;
@@ -132,6 +134,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
         this.numBucket = numBucket;
         this.manifestTargetSize = manifestTargetSize;
         this.manifestMergeMinCount = manifestMergeMinCount;
+        this.dynamicPartitionOverwrite = dynamicPartitionOverwrite;
         this.keyComparator = keyComparator;
 
         this.lock = null;
@@ -253,10 +256,10 @@ public class FileStoreCommitImpl implements FileStoreCommit {
             Map<String, String> properties) {
         if (LOG.isDebugEnabled()) {
             LOG.debug(
-                    "Ready to overwrite partition "
-                            + partition.toString()
-                            + "\n"
-                            + committable.toString());
+                    "Ready to overwrite partition {}\nManifestCommittable: {}\nProperties: {}",
+                    partition,
+                    committable,
+                    properties);
         }
 
         List<ManifestEntry> appendTableFiles = new ArrayList<>();
@@ -286,28 +289,53 @@ public class FileStoreCommitImpl implements FileStoreCommit {
             LOG.warn(warnMessage.toString());
         }
 
-        // sanity check, all changes must be done within the given partition
-        Predicate partitionFilter = PredicateBuilder.partition(partition, partitionType);
-        if (partitionFilter != null) {
-            for (ManifestEntry entry : appendTableFiles) {
-                if (!partitionFilter.test(partitionObjectConverter.convert(entry.partition()))) {
-                    throw new IllegalArgumentException(
-                            "Trying to overwrite partition "
-                                    + partition
-                                    + ", but the changes in "
-                                    + pathFactory.getPartitionString(entry.partition())
-                                    + " does not belong to this partition");
+        boolean skipOverwrite = false;
+        // the partition filter is built from the static spec or the new data's partitions, depending on the overwrite mode
+        Predicate partitionFilter = null;
+        if (dynamicPartitionOverwrite) {
+            if (appendTableFiles.isEmpty()) {
+                // in dynamic mode, if there are no changes to commit, no data will be deleted
+                skipOverwrite = true;
+            } else {
+                partitionFilter =
+                        appendTableFiles.stream()
+                                .map(ManifestEntry::partition)
+                                .distinct()
+                                // partition filter is built from new data's partitions
+                                .map(p -> PredicateBuilder.equalPartition(p, partitionType))
+                                .reduce(PredicateBuilder::or)
+                                .orElseThrow(
+                                        () ->
+                                                new RuntimeException(
+                                                        "Failed to get dynamic partition filter. This is unexpected."));
+            }
+        } else {
+            partitionFilter = PredicateBuilder.partition(partition, partitionType);
+            // sanity check, all changes must be done within the given partition
+            if (partitionFilter != null) {
+                for (ManifestEntry entry : appendTableFiles) {
+                    if (!partitionFilter.test(
+                            partitionObjectConverter.convert(entry.partition()))) {
+                        throw new IllegalArgumentException(
+                                "Trying to overwrite partition "
+                                        + partition
+                                        + ", but the changes in "
+                                        + pathFactory.getPartitionString(entry.partition())
+                                        + " does not belong to this partition");
+                    }
                 }
             }
         }
 
         // overwrite new files
-        tryOverwrite(
-                partitionFilter,
-                appendTableFiles,
-                committable.identifier(),
-                committable.watermark(),
-                committable.logOffsets());
+        if (!skipOverwrite) {
+            tryOverwrite(
+                    partitionFilter,
+                    appendTableFiles,
+                    committable.identifier(),
+                    committable.watermark(),
+                    committable.logOffsets());
+        }
 
         if (!compactTableFiles.isEmpty()) {
             tryCommit(
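Distilled from the stream pipeline above: in dynamic mode the delete filter is simply an OR over one `equalPartition` predicate per distinct partition touched by the new files. A standalone sketch of that reduction:

    import java.util.List;

    import org.apache.paimon.data.BinaryRow;
    import org.apache.paimon.predicate.Predicate;
    import org.apache.paimon.predicate.PredicateBuilder;
    import org.apache.paimon.types.RowType;

    public class DynamicFilterSketch {
        // Only files whose partition matches the returned predicate are deleted;
        // partitions absent from the new data are kept as-is.
        static Predicate dynamicDeleteFilter(List<BinaryRow> newPartitions, RowType partitionType) {
            return newPartitions.stream()
                    .distinct()
                    .map(p -> PredicateBuilder.equalPartition(p, partitionType))
                    .reduce(PredicateBuilder::or)
                    .orElseThrow(() -> new IllegalStateException("no new partitions"));
        }
    }
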
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
index 0236e27ad..91af65175 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
@@ -100,7 +100,7 @@ public class TableCommitImpl implements InnerTableCommit {
         if (overwritePartition == null) {
             commit.commit(committable, new HashMap<>());
         } else {
-            commit.overwrite(overwritePartition, committable, new HashMap<>());
+            commit.overwrite(overwritePartition, committable, Collections.emptyMap());
         }
         expire();
     }
@@ -128,7 +128,7 @@ public class TableCommitImpl implements InnerTableCommit {
                 // TODO maybe it can be produced by CommitterOperator
                 committable = new ManifestCommittable(Long.MAX_VALUE);
             }
-            commit.overwrite(overwritePartition, committable, new HashMap<>());
+            commit.overwrite(overwritePartition, committable, Collections.emptyMap());
         }
 
         expire();
diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
index b6750d5d0..9c860928f 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
@@ -556,6 +556,9 @@ public class TestFileStore extends KeyValueFileStore {
 
             conf.set(CoreOptions.CHANGELOG_PRODUCER, changelogProducer);
 
+            // disable dynamic-partition-overwrite in FileStoreCommit layer test
+            conf.set(CoreOptions.DYNAMIC_PARTITION_OVERWRITE, false);
+
             return new TestFileStore(
                     root,
                     new CoreOptions(conf),
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
index a2ed61231..07e74a573 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
@@ -303,4 +303,21 @@ public class AppendOnlyFileStoreTableTest extends FileStoreTableTestBase {
                                 ""));
         return new AppendOnlyFileStoreTable(LocalFileIO.create(), tablePath, tableSchema);
     }
+
+    @Override
+    protected FileStoreTable overwriteTestFileStoreTable() throws Exception {
+        Options conf = new Options();
+        conf.set(CoreOptions.PATH, tablePath.toString());
+        conf.set(CoreOptions.WRITE_MODE, WriteMode.APPEND_ONLY);
+        TableSchema tableSchema =
+                SchemaUtils.forceCommit(
+                        new SchemaManager(LocalFileIO.create(), tablePath),
+                        new Schema(
+                                OVERWRITE_TEST_ROW_TYPE.getFields(),
+                                Arrays.asList("pt0", "pt1"),
+                                Collections.emptyList(),
+                                conf.toMap(),
+                                ""));
+        return new AppendOnlyFileStoreTable(LocalFileIO.create(), tablePath, tableSchema);
+    }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogValueCountFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/ChangelogValueCountFileStoreTableTest.java
index 38d1f1b2c..6953b36f3 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogValueCountFileStoreTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/ChangelogValueCountFileStoreTableTest.java
@@ -225,4 +225,21 @@ public class ChangelogValueCountFileStoreTableTest extends FileStoreTableTestBas
                                 ""));
         return new ChangelogValueCountFileStoreTable(LocalFileIO.create(), tablePath, tableSchema);
     }
+
+    @Override
+    protected FileStoreTable overwriteTestFileStoreTable() throws Exception {
+        Options conf = new Options();
+        conf.set(CoreOptions.PATH, tablePath.toString());
+        conf.set(CoreOptions.WRITE_MODE, WriteMode.CHANGE_LOG);
+        TableSchema tableSchema =
+                SchemaUtils.forceCommit(
+                        new SchemaManager(LocalFileIO.create(), tablePath),
+                        new Schema(
+                                OVERWRITE_TEST_ROW_TYPE.getFields(),
+                                Arrays.asList("pt0", "pt1"),
+                                Arrays.asList("pk", "pt0", "pt1"),
+                                conf.toMap(),
+                                ""));
+        return new ChangelogValueCountFileStoreTable(LocalFileIO.create(), tablePath, tableSchema);
+    }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java
index 61ece3833..4dc4298d3 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java
@@ -66,7 +66,7 @@ import java.util.Map;
 import java.util.function.Consumer;
 import java.util.function.Function;
 
-import static org.apache.paimon.data.DataFormatTestUtil.rowDataToString;
+import static org.apache.paimon.data.DataFormatTestUtil.internalRowToString;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -692,7 +692,7 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
         AuditLogTable auditLogTable = new AuditLogTable(table);
         Function<InternalRow, String> rowDataToString =
                 row ->
-                        rowDataToString(
+                        internalRowToString(
                                 row,
                                 DataTypes.ROW(
                                         DataTypes.STRING(),
@@ -731,7 +731,7 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
         read = auditLogTable.newRead().withProjection(new int[] {2, 0, 1});
         Function<InternalRow, String> projectToString1 =
                 row ->
-                        rowDataToString(
+                        internalRowToString(
                                 row,
                                 DataTypes.ROW(
                                         DataTypes.INT(), DataTypes.STRING(), DataTypes.INT()));
@@ -743,7 +743,7 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
         snapshotSplitReader = auditLogTable.newSnapshotSplitReader();
         read = auditLogTable.newRead().withProjection(new int[] {2, 1});
         Function<InternalRow, String> projectToString2 =
-                row -> rowDataToString(row, DataTypes.ROW(DataTypes.INT(), DataTypes.INT()));
+                row -> internalRowToString(row, DataTypes.ROW(DataTypes.INT(), DataTypes.INT()));
         result = getResult(read, toSplits(snapshotSplitReader.splits()), projectToString2);
         assertThat(result).containsExactlyInAnyOrder("+I[20, 2]", "+I[30, 1]", "+I[10, 1]");
     }
@@ -765,7 +765,7 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
                             options.set("fields.c.ignore-retract", "true");
                         },
                         rowType);
-        Function<InternalRow, String> rowToString = row -> rowDataToString(row, rowType);
+        Function<InternalRow, String> rowToString = row -> internalRowToString(row, rowType);
         SnapshotSplitReader snapshotSplitReader = table.newSnapshotSplitReader();
         TableRead read = table.newRead();
         StreamTableWrite write = table.newWrite("");
@@ -821,6 +821,23 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
         return createFileStoreTable(configure, ROW_TYPE);
     }
 
+    @Override
+    protected FileStoreTable overwriteTestFileStoreTable() throws Exception {
+        Options conf = new Options();
+        conf.set(CoreOptions.PATH, tablePath.toString());
+        conf.set(CoreOptions.WRITE_MODE, WriteMode.CHANGE_LOG);
+        TableSchema tableSchema =
+                SchemaUtils.forceCommit(
+                        new SchemaManager(LocalFileIO.create(), tablePath),
+                        new Schema(
+                                OVERWRITE_TEST_ROW_TYPE.getFields(),
+                                Arrays.asList("pt0", "pt1"),
+                                Arrays.asList("pk", "pt0", "pt1"),
+                                conf.toMap(),
+                                ""));
+        return new ChangelogWithKeyFileStoreTable(LocalFileIO.create(), tablePath, tableSchema);
+    }
+
     private FileStoreTable createFileStoreTable(Consumer<Options> configure, RowType rowType)
             throws Exception {
         Options conf = new Options();
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
index 39da2b48d..b5ea597bb 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
@@ -23,6 +23,7 @@ import org.apache.paimon.Snapshot;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.BinaryRowWriter;
 import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.DataFormatTestUtil;
 import org.apache.paimon.data.GenericMap;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
@@ -57,6 +58,9 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -79,6 +83,7 @@ import static org.apache.paimon.CoreOptions.SNAPSHOT_NUM_RETAINED_MAX;
 import static org.apache.paimon.CoreOptions.SNAPSHOT_NUM_RETAINED_MIN;
 import static org.apache.paimon.CoreOptions.WRITE_ONLY;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.params.provider.Arguments.arguments;
 
 /** Base test class for {@link FileStoreTable}. */
 public abstract class FileStoreTableTestBase {
@@ -95,6 +100,15 @@ public abstract class FileStoreTableTestBase {
                         DataTypes.MULTISET(DataTypes.VARCHAR(8))
                     },
                     new String[] {"pt", "a", "b", "c", "d", "e", "f"});
+
+    // for overwrite test
+    protected static final RowType OVERWRITE_TEST_ROW_TYPE =
+            RowType.of(
+                    new DataType[] {
+                        DataTypes.INT(), DataTypes.INT(), DataTypes.STRING(), DataTypes.STRING()
+                    },
+                    new String[] {"pk", "pt0", "pt1", "v"});
+
     protected static final int[] PROJECTION = new int[] {2, 1};
     protected static final Function<InternalRow, String> BATCH_ROW_TO_STRING =
             rowData ->
@@ -192,6 +206,62 @@ public abstract class FileStoreTableTestBase {
                         "2|22|222|binary|varbinary|mapKey:mapVal|multiset");
     }
 
+    @ParameterizedTest(name = "dynamic = {0}, partition={2}")
+    @MethodSource("overwriteTestData")
+    public void testOverwriteNothing(
+            boolean dynamicPartitionOverwrite,
+            List<InternalRow> overwriteData,
+            Map<String, String> overwritePartition,
+            List<String> expected)
+            throws Exception {
+        FileStoreTable table = overwriteTestFileStoreTable();
+        if (!dynamicPartitionOverwrite) {
+            table =
+                    table.copy(
+                            Collections.singletonMap(
+                                    CoreOptions.DYNAMIC_PARTITION_OVERWRITE.key(), "false"));
+        }
+
+        // prepare data
+        // (1, 1, 'A', 'Hi'), (2, 1, 'A', 'Hello'), (3, 1, 'A', 'World'),
+        // (4, 1, 'B', 'To'), (5, 1, 'B', 'Apache'), (6, 1, 'B', 'Paimon')
+        // (7, 2, 'A', 'Test')
+        // (8, 2, 'B', 'Case')
+        try (StreamTableWrite write = table.newWrite(commitUser);
+                InnerTableCommit commit = table.newCommit(commitUser)) {
+            write.write(overwriteRow(1, 1, "A", "Hi"));
+            write.write(overwriteRow(2, 1, "A", "Hello"));
+            write.write(overwriteRow(3, 1, "A", "World"));
+            write.write(overwriteRow(4, 1, "B", "To"));
+            write.write(overwriteRow(5, 1, "B", "Apache"));
+            write.write(overwriteRow(6, 1, "B", "Paimon"));
+            write.write(overwriteRow(7, 2, "A", "Test"));
+            write.write(overwriteRow(8, 2, "B", "Case"));
+            commit.commit(0, write.prepareCommit(true, 0));
+        }
+
+        // overwrite data
+        try (StreamTableWrite write = table.newWrite(commitUser).withOverwrite(true);
+                InnerTableCommit commit = table.newCommit(commitUser)) {
+            for (InternalRow row : overwriteData) {
+                write.write(row);
+            }
+            commit.withOverwrite(overwritePartition).commit(1, write.prepareCommit(true, 1));
+        }
+
+        // validate
+        List<Split> splits = toSplits(table.newSnapshotSplitReader().splits());
+        TableRead read = table.newRead();
+        assertThat(
+                        getResult(
+                                read,
+                                splits,
+                                row ->
+                                        DataFormatTestUtil.toStringNoRowKind(
+                                                row, OVERWRITE_TEST_ROW_TYPE)))
+                .hasSameElementsAs(expected);
+    }
+
     @Test
     public void testOverwrite() throws Exception {
         FileStoreTable table = createFileStoreTable();
@@ -497,6 +567,96 @@ public abstract class FileStoreTableTestBase {
     protected abstract FileStoreTable createFileStoreTable(Consumer<Options> configure)
             throws Exception;
 
+    protected abstract FileStoreTable overwriteTestFileStoreTable() throws Exception;
+
+    private static InternalRow overwriteRow(Object... values) {
+        return GenericRow.of(
+                values[0],
+                values[1],
+                BinaryString.fromString((String) values[2]),
+                BinaryString.fromString((String) values[3]));
+    }
+
+    private static List<Arguments> overwriteTestData() {
+        // dynamic, overwrite data, overwrite partition, expected
+        return Arrays.asList(
+                // nothing happens
+                arguments(
+                        true,
+                        Collections.emptyList(),
+                        Collections.emptyMap(),
+                        Arrays.asList(
+                                "1, 1, A, Hi",
+                                "2, 1, A, Hello",
+                                "3, 1, A, World",
+                                "4, 1, B, To",
+                                "5, 1, B, Apache",
+                                "6, 1, B, Paimon",
+                                "7, 2, A, Test",
+                                "8, 2, B, Case")),
+                // delete all data
+                arguments(
+                        false,
+                        Collections.emptyList(),
+                        Collections.emptyMap(),
+                        Collections.emptyList()),
+                // specify one partition key
+                arguments(
+                        true,
+                        Arrays.asList(
+                                overwriteRow(1, 1, "A", "Where"), overwriteRow(2, 1, "A", "When")),
+                        Collections.singletonMap("pt0", "1"),
+                        Arrays.asList(
+                                "1, 1, A, Where",
+                                "2, 1, A, When",
+                                "4, 1, B, To",
+                                "5, 1, B, Apache",
+                                "6, 1, B, Paimon",
+                                "7, 2, A, Test",
+                                "8, 2, B, Case")),
+                arguments(
+                        false,
+                        Arrays.asList(
+                                overwriteRow(1, 1, "A", "Where"), overwriteRow(2, 1, "A", "When")),
+                        Collections.singletonMap("pt0", "1"),
+                        Arrays.asList(
+                                "1, 1, A, Where",
+                                "2, 1, A, When",
+                                "7, 2, A, Test",
+                                "8, 2, B, Case")),
+                // all dynamic
+                arguments(
+                        true,
+                        Arrays.asList(
+                                overwriteRow(4, 1, "B", "Where"),
+                                overwriteRow(5, 1, "B", "When"),
+                                overwriteRow(10, 2, "A", "Static"),
+                                overwriteRow(11, 2, "A", "Dynamic")),
+                        Collections.emptyMap(),
+                        Arrays.asList(
+                                "1, 1, A, Hi",
+                                "2, 1, A, Hello",
+                                "3, 1, A, World",
+                                "4, 1, B, Where",
+                                "5, 1, B, When",
+                                "10, 2, A, Static",
+                                "11, 2, A, Dynamic",
+                                "8, 2, B, Case")),
+                arguments(
+                        false,
+                        Arrays.asList(
+                                overwriteRow(4, 1, "B", "Where"),
+                                overwriteRow(5, 1, "B", "When"),
+                                overwriteRow(10, 2, "A", "Static"),
+                                overwriteRow(11, 2, "A", "Dynamic")),
+                        Collections.emptyMap(),
+                        Arrays.asList(
+                                "4, 1, B, Where",
+                                "5, 1, B, When",
+                                "10, 2, A, Static",
+                                "11, 2, A, Dynamic")));
+    }
+
     protected List<Split> toSplits(List<DataSplit> dataSplits) {
         return new ArrayList<>(dataSplits);
     }
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/WritePreemptMemoryTest.java b/paimon-core/src/test/java/org/apache/paimon/table/WritePreemptMemoryTest.java
index 188dc6238..387158148 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/WritePreemptMemoryTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/WritePreemptMemoryTest.java
@@ -108,4 +108,26 @@ public class WritePreemptMemoryTest extends FileStoreTableTestBase {
                                 ""));
         return new ChangelogWithKeyFileStoreTable(LocalFileIO.create(), tablePath, schema);
     }
+
+    @Override
+    protected FileStoreTable overwriteTestFileStoreTable() throws Exception {
+        Options conf = new Options();
+        conf.set(CoreOptions.PATH, tablePath.toString());
+        conf.set(CoreOptions.WRITE_MODE, WriteMode.CHANGE_LOG);
+        // Run with minimal memory to force more intense preemption
+        // Currently a writer needs at least one page
+        int pages = 10;
+        conf.set(CoreOptions.WRITE_BUFFER_SIZE, new MemorySize(pages * 1024));
+        conf.set(CoreOptions.PAGE_SIZE, new MemorySize(1024));
+        TableSchema schema =
+                SchemaUtils.forceCommit(
+                        new SchemaManager(LocalFileIO.create(), tablePath),
+                        new Schema(
+                                OVERWRITE_TEST_ROW_TYPE.getFields(),
+                                Arrays.asList("pt0", "pt1"),
+                                Arrays.asList("pk", "pt0", "pt1"),
+                                conf.toMap(),
+                                ""));
+        return new ChangelogWithKeyFileStoreTable(LocalFileIO.create(), tablePath, schema);
+    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CompositePkAndMultiPartitionedTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CompositePkAndMultiPartitionedTableITCase.java
index 39a35dd4d..2eb90ba2e 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CompositePkAndMultiPartitionedTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CompositePkAndMultiPartitionedTableITCase.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.flink;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.flink.util.AbstractTestBase;
 
 import org.apache.flink.types.Row;
@@ -27,6 +28,7 @@ import org.junit.jupiter.api.Test;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 
 import static org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow;
 import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.bEnv;
@@ -44,6 +46,9 @@ import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.testBatchRead;
 /** Paimon IT case when the table has composite primary keys and multiple partition fields. */
 public class CompositePkAndMultiPartitionedTableITCase extends AbstractTestBase {
 
+    private final Map<String, String> staticPartitionOverwrite =
+            Collections.singletonMap(CoreOptions.DYNAMIC_PARTITION_OVERWRITE.key(), "false");
+
     @BeforeEach
     public void setUp() {
         init(getTempDirPath());
@@ -277,7 +282,8 @@ public class CompositePkAndMultiPartitionedTableITCase extends AbstractTestBase
                                 "rate_by_to_currency DOUBLE",
                                 "dt STRING"),
                         Arrays.asList("from_currency", "to_currency", "dt"),
-                        Collections.singletonList("dt"));
+                        Collections.singletonList("dt"),
+                        staticPartitionOverwrite);
 
         insertIntoFromTable(temporaryTable, table);
 
@@ -595,7 +601,8 @@ public class CompositePkAndMultiPartitionedTableITCase extends AbstractTestBase
                 createTable(
                         Arrays.asList("currency STRING", "rate BIGINT", "dt STRING", "hh STRING"),
                         Collections.emptyList(),
-                        Arrays.asList("dt", "hh"));
+                        Arrays.asList("dt", "hh"),
+                        staticPartitionOverwrite);
 
         insertIntoFromTable(temporaryTable, table);
 
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CompositePkAndMultiPartitionedTableWIthKafkaLogITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CompositePkAndMultiPartitionedTableWIthKafkaLogITCase.java
index 5e1e7a123..0c09bd2a4 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CompositePkAndMultiPartitionedTableWIthKafkaLogITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CompositePkAndMultiPartitionedTableWIthKafkaLogITCase.java
@@ -56,8 +56,8 @@ import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.validateStream
 public class CompositePkAndMultiPartitionedTableWIthKafkaLogITCase extends KafkaTableTestBase {
 
     @BeforeEach
-    public void setUp() throws Exception {
-        init(createAndRegisterTempFile("").toString());
+    public void setUp() {
+        init(getTempDirPath());
     }
 
     // ----------------------------------------------------------------------------------------------------------------
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java
index 05d83e621..8b5fba73a 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java
@@ -61,7 +61,6 @@ import org.apache.flink.util.CloseableIterator;
 import org.junit.jupiter.api.TestTemplate;
 import org.junit.jupiter.api.extension.ExtendWith;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -121,7 +120,7 @@ public class FileStoreITCase extends AbstractTestBase {
 
     private final StreamExecutionEnvironment env;
 
-    public FileStoreITCase(boolean isBatch) throws IOException {
+    public FileStoreITCase(boolean isBatch) {
         this.isBatch = isBatch;
         this.env = isBatch ? buildBatchEnv() : buildStreamEnv();
     }
@@ -203,7 +202,7 @@ public class FileStoreITCase extends AbstractTestBase {
         Row[] expected = new Row[] {Row.of(9, "p2", 5), Row.of(5, "p1", 1), Row.of(0, "p1", 2)};
         assertThat(results).containsExactlyInAnyOrder(expected);
 
-        // overwrite all
+        // test dynamic partition overwrite
         partialData =
                 env.fromCollection(
                         Collections.singletonList(
@@ -217,7 +216,27 @@ public class FileStoreITCase extends AbstractTestBase {
 
         // read
         results = executeAndCollect(new FlinkSourceBuilder(IDENTIFIER, table).withEnv(env).build());
-        expected = new Row[] {Row.of(19, "p2", 6)};
+        expected = new Row[] {Row.of(19, "p2", 6), Row.of(5, "p1", 1), Row.of(0, "p1", 2)};
+        assertThat(results).containsExactlyInAnyOrder(expected);
+
+        // test static overwrite all
+        partialData =
+                env.fromCollection(
+                        Collections.singletonList(
+                                wrap(GenericRowData.of(20, StringData.fromString("p2"), 3))),
+                        InternalTypeInfo.of(TABLE_TYPE));
+        new FlinkSinkBuilder(
+                        table.copy(
+                                Collections.singletonMap(
+                                        CoreOptions.DYNAMIC_PARTITION_OVERWRITE.key(), "false")))
+                .withInput(partialData)
+                .withOverwritePartition(new HashMap<>())
+                .build();
+        env.execute();
+
+        // read
+        results = executeAndCollect(new FlinkSourceBuilder(IDENTIFIER, table).withEnv(env).build());
+        expected = new Row[] {Row.of(20, "p2", 3)};
         assertThat(results).containsExactlyInAnyOrder(expected);
     }
 
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
index 57d9fb118..cfe5f9f35 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
@@ -61,6 +61,7 @@ import static org.apache.paimon.flink.AbstractFlinkTableFactory.buildPaimonTable
 import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_PARALLELISM;
 import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_PARALLELISM;
 import static org.apache.paimon.flink.FlinkTestBase.createResolvedTable;
+import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.assertNoMoreRecords;
 import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.bEnv;
 import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.bExeEnv;
 import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.buildQuery;
@@ -85,11 +86,10 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
 public class ReadWriteTableITCase extends AbstractTestBase {
 
     private final Map<String, String> streamingReadOverwrite =
-            new HashMap<String, String>() {
-                {
-                    put(CoreOptions.STREAMING_READ_OVERWRITE.key(), "true");
-                }
-            };
+            Collections.singletonMap(CoreOptions.STREAMING_READ_OVERWRITE.key(), "true");
+
+    private final Map<String, String> staticPartitionOverwrite =
+            Collections.singletonMap(CoreOptions.DYNAMIC_PARTITION_OVERWRITE.key(), "false");
 
     @BeforeEach
     public void setUp() {
@@ -660,7 +660,76 @@ public class ReadWriteTableITCase extends AbstractTestBase {
     }
 
     // ----------------------------------------------------------------------------------------------------------------
-    // Purge data using overwrite
+    // Dynamic partition overwrite (default option)
+    // ----------------------------------------------------------------------------------------------------------------
+
+    @Test
+    public void testDynamicOverwrite() throws Exception {
+        String table =
+                createTable(
+                        Arrays.asList("pk INT", "part0 INT", "part1 STRING", "v STRING"),
+                        Arrays.asList("pk", "part0", "part1"),
+                        Arrays.asList("part0", "part1"),
+                        streamingReadOverwrite);
+
+        insertInto(
+                table,
+                "(1, 1, 'A', 'Hi')",
+                "(2, 1, 'A', 'Hello')",
+                "(3, 1, 'A', 'World')",
+                "(4, 1, 'B', 'To')",
+                "(5, 1, 'B', 'Apache')",
+                "(6, 1, 'B', 'Paimon')",
+                "(7, 2, 'A', 'Test')",
+                "(8, 2, 'B', 'Case')");
+
+        BlockingIterator<Row, Row> streamItr =
+                testStreamingRead(
+                        buildSimpleQuery(table),
+                        Arrays.asList(
+                                changelogRow("+I", 1, 1, "A", "Hi"),
+                                changelogRow("+I", 2, 1, "A", "Hello"),
+                                changelogRow("+I", 3, 1, "A", "World"),
+                                changelogRow("+I", 4, 1, "B", "To"),
+                                changelogRow("+I", 5, 1, "B", "Apache"),
+                                changelogRow("+I", 6, 1, "B", "Paimon"),
+                                changelogRow("+I", 7, 2, "A", "Test"),
+                                changelogRow("+I", 8, 2, "B", "Case")));
+
+        bEnv.executeSql(
+                        String.format(
+                                "INSERT OVERWRITE `%s` VALUES (4, 1, 'B', 'Where'), (5, 1, 'B', 'When'), (10, 2, 'A', 'Static'), (11, 2, 'A', 'Dynamic')",
+                                table))
+                .await();
+
+        assertThat(streamItr.collect(8))
+                .containsExactlyInAnyOrder(
+                        changelogRow("-D", 4, 1, "B", "To"),
+                        changelogRow("-D", 5, 1, "B", "Apache"),
+                        changelogRow("-D", 6, 1, "B", "Paimon"),
+                        changelogRow("-D", 7, 2, "A", "Test"),
+                        changelogRow("+I", 4, 1, "B", "Where"),
+                        changelogRow("+I", 5, 1, "B", "When"),
+                        changelogRow("+I", 10, 2, "A", "Static"),
+                        changelogRow("+I", 11, 2, "A", "Dynamic"));
+        assertNoMoreRecords(streamItr);
+        streamItr.close();
+
+        testBatchRead(
+                buildSimpleQuery(table),
+                Arrays.asList(
+                        changelogRow("+I", 1, 1, "A", "Hi"),
+                        changelogRow("+I", 2, 1, "A", "Hello"),
+                        changelogRow("+I", 3, 1, "A", "World"),
+                        changelogRow("+I", 4, 1, "B", "Where"),
+                        changelogRow("+I", 5, 1, "B", "When"),
+                        changelogRow("+I", 10, 2, "A", "Static"),
+                        changelogRow("+I", 11, 2, "A", "Dynamic"),
+                        changelogRow("+I", 8, 2, "B", "Case")));
+    }
+
+    // ----------------------------------------------------------------------------------------------------------------
+    // Purge data using overwrite (NOTE: set dynamic-partition-overwrite = false)
     // ----------------------------------------------------------------------------------------------------------------
 
     @Test
@@ -669,9 +738,10 @@ public class ReadWriteTableITCase extends AbstractTestBase {
                 createTable(
                         Arrays.asList("k0 INT", "k1 STRING", "v STRING"),
                         Collections.emptyList(),
-                        Collections.emptyList());
+                        Collections.emptyList(),
+                        staticPartitionOverwrite);
 
-        validateOverwriteResult(table, "", "*", Collections.emptyList());
+        validatePurgingResult(table, "", "*", Collections.emptyList());
     }
 
     @Test
@@ -680,9 +750,13 @@ public class ReadWriteTableITCase extends AbstractTestBase {
 
         // single partition key
         String table =
-                createTable(fieldsSpec, Collections.emptyList(), Collections.singletonList("k0"));
+                createTable(
+                        fieldsSpec,
+                        Collections.emptyList(),
+                        Collections.singletonList("k0"),
+                        staticPartitionOverwrite);
 
-        validateOverwriteResult(
+        validatePurgingResult(
                 table,
                 "PARTITION (k0 = 0)",
                 "k1, v",
@@ -692,9 +766,14 @@ public class ReadWriteTableITCase extends AbstractTestBase {
                         changelogRow("+I", 1, "2023-01-02", "store")));
 
         // multiple partition keys and overwrite one partition key
-        table = createTable(fieldsSpec, Collections.emptyList(), Arrays.asList("k0", "k1"));
+        table =
+                createTable(
+                        fieldsSpec,
+                        Collections.emptyList(),
+                        Arrays.asList("k0", "k1"),
+                        staticPartitionOverwrite);
 
-        validateOverwriteResult(
+        validatePurgingResult(
                 table,
                 "PARTITION (k0 = 0)",
                 "k1, v",
@@ -704,9 +783,14 @@ public class ReadWriteTableITCase extends AbstractTestBase {
                         changelogRow("+I", 1, "2023-01-02", "store")));
 
         // multiple partition keys and overwrite all partition keys
-        table = createTable(fieldsSpec, Collections.emptyList(), Arrays.asList("k0", "k1"));
+        table =
+                createTable(
+                        fieldsSpec,
+                        Collections.emptyList(),
+                        Arrays.asList("k0", "k1"),
+                        staticPartitionOverwrite);
 
-        validateOverwriteResult(
+        validatePurgingResult(
                 table,
                 "PARTITION (k0 = 0, k1 = '2023-01-01')",
                 "v",
@@ -1085,7 +1169,7 @@ public class ReadWriteTableITCase extends AbstractTestBase {
     // Tools
     // ----------------------------------------------------------------------------------------------------------------
 
-    private void validateOverwriteResult(
+    private void validatePurgingResult(
             String table, String partitionSpec, String projectionSpec, List<Row> expected)
             throws Exception {
         insertInto(
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java
index 5d714bfa3..ee1d010a3 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java
@@ -112,7 +112,7 @@ public class ActionITCaseBase extends AbstractTestBase {
         RecordReader<InternalRow> recordReader = read.createReader(splits);
         List<String> result = new ArrayList<>();
         recordReader.forEachRemaining(
-                row -> result.add(DataFormatTestUtil.rowDataToString(row, rowType)));
+                row -> result.add(DataFormatTestUtil.internalRowToString(row, rowType)));
         return result;
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/ReadWriteTableTestUtil.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/ReadWriteTableTestUtil.java
index 270ee48c3..8eba48b1e 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/ReadWriteTableTestUtil.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/ReadWriteTableTestUtil.java
@@ -148,6 +148,7 @@ public class ReadWriteTableTestUtil {
                                 put(LOG_SYSTEM.key(), "kafka");
                                 put(BOOTSTRAP_SERVERS.key(), getBootstrapServers());
                                 put(TOPIC.key(), topic);
+                                put(CoreOptions.DYNAMIC_PARTITION_OVERWRITE.key(), "false");
                             }
                         });