You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@paimon.apache.org by lz...@apache.org on 2023/03/31 06:42:49 UTC
[incubator-paimon] branch master updated: [core] Introduce dynamic partition overwrite as default behavior (#769)
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new dade2103c [core] Introduce dynamic partition overwrite as default behavior (#769)
dade2103c is described below
commit dade2103cd2a2ee1cbd23fd74db77f0299d8cdd1
Author: yuzelin <33...@users.noreply.github.com>
AuthorDate: Fri Mar 31 14:42:44 2023 +0800
[core] Introduce dynamic partition overwrite as default behavior (#769)
---
docs/content/how-to/writing-tables.md | 7 +-
.../shortcodes/generated/core_configuration.html | 6 +
.../generated/flink_connector_configuration.html | 12 +-
.../apache/paimon/predicate/PredicateBuilder.java | 22 +++
.../org/apache/paimon/data/DataFormatTestUtil.java | 2 +-
.../paimon/predicate/PredicateBuilderTest.java | 0
.../org/apache/paimon/predicate/PredicateTest.java | 0
.../java/org/apache/paimon/AbstractFileStore.java | 1 +
.../main/java/org/apache/paimon/CoreOptions.java | 12 ++
.../apache/paimon/operation/FileStoreCommit.java | 2 -
.../paimon/operation/FileStoreCommitImpl.java | 70 ++++++---
.../apache/paimon/table/sink/TableCommitImpl.java | 4 +-
.../test/java/org/apache/paimon/TestFileStore.java | 3 +
.../paimon/table/AppendOnlyFileStoreTableTest.java | 17 +++
.../ChangelogValueCountFileStoreTableTest.java | 17 +++
.../table/ChangelogWithKeyFileStoreTableTest.java | 27 +++-
.../paimon/table/FileStoreTableTestBase.java | 160 +++++++++++++++++++++
.../paimon/table/WritePreemptMemoryTest.java | 22 +++
.../CompositePkAndMultiPartitionedTableITCase.java | 11 +-
...AndMultiPartitionedTableWIthKafkaLogITCase.java | 4 +-
.../org/apache/paimon/flink/FileStoreITCase.java | 27 +++-
.../apache/paimon/flink/ReadWriteTableITCase.java | 114 +++++++++++++--
.../paimon/flink/action/ActionITCaseBase.java | 2 +-
.../paimon/flink/util/ReadWriteTableTestUtil.java | 1 +
24 files changed, 480 insertions(+), 63 deletions(-)
diff --git a/docs/content/how-to/writing-tables.md b/docs/content/how-to/writing-tables.md
index 42ad22483..006635938 100644
--- a/docs/content/how-to/writing-tables.md
+++ b/docs/content/how-to/writing-tables.md
@@ -80,8 +80,10 @@ For more information, please check the syntax document:
[Spark INSERT Statement](https://spark.apache.org/docs/latest/sql-ref-syntax-dml-insert-table.html)
{{< hint info >}}
-Streaming reading will ignore the commits generated by `INSERT OVERWRITE` by default. If you want to read the
+1. Streaming reading will ignore the commits generated by `INSERT OVERWRITE` by default. If you want to read the
commits of `OVERWRITE`, you can configure `streaming-read-overwrite`.
+2. For partitioned table, Paimon's default overwrite mode is dynamic partition overwrite (that means Paimon only
+deletes the partitions appear in the overwrite data). You can configure `dynamic-partition-overwrite` to change it.
{{< /hint >}}
## Applying Records/Changes to Tables
@@ -151,7 +153,8 @@ INSERT OVERWRITE MyTable PARTITION (key1 = value1, key2 = value2, ...) SELECT ..
## Purging tables
-You can use `INSERT OVERWRITE` to purge tables by inserting empty value.
+You can use `INSERT OVERWRITE` to purge tables by inserting empty value. For partitioned
+table, make sure that the table's `overwrite.dynamic-partition = false`.
{{< tabs "purge-tables-syntax" >}}
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index ab7b648a6..5c023d244 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -74,6 +74,12 @@
<td>Duration</td>
<td>The discovery interval of continuous reading.</td>
</tr>
+ <tr>
+ <td><h5>dynamic-partition-overwrite</h5></td>
+ <td style="word-wrap: break-word;">true</td>
+ <td>Boolean</td>
+ <td>Whether only overwrite dynamic partition when overwriting a partitioned table with dynamic partition columns. Works only when the table has partition keys.</td>
+ </tr>
<tr>
<td><h5>file.compression.per.level</h5></td>
<td style="word-wrap: break-word;"></td>
diff --git a/docs/layouts/shortcodes/generated/flink_connector_configuration.html b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index 0b4e43135..8060955d2 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -26,18 +26,18 @@
<td>String</td>
<td>The log system used to keep changes of the table.<br /><br />Possible values:<br /><ul><li>"none": No log system, the data is written only to file store, and the streaming read will be directly read from the file store.</li></ul><ul><li>"kafka": Kafka log system, the data is double written to file store and kafka, and the streaming read will be read from kafka.</li></ul></td>
</tr>
- <tr>
- <td><h5>scan.split-enumerator.batch-size</h5></td>
- <td style="word-wrap: break-word;">10</td>
- <td>Integer</td>
- <td>How many splits should assign to subtask per batch in StaticFileStoreSplitEnumerator to avoid exceed `akka.framesize` limit.</td>
- </tr>
<tr>
<td><h5>scan.parallelism</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Integer</td>
<td>Define a custom parallelism for the scan source. By default, if this option is not defined, the planner will derive the parallelism for each statement individually by also considering the global configuration.</td>
</tr>
+ <tr>
+ <td><h5>scan.split-enumerator.batch-size</h5></td>
+ <td style="word-wrap: break-word;">10</td>
+ <td>Integer</td>
+ <td>How many splits should assign to subtask per batch in StaticFileStoreSplitEnumerator to avoid exceed `akka.framesize` limit.</td>
+ </tr>
<tr>
<td><h5>scan.watermark.alignment.group</h5></td>
<td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java b/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java
index 03391496d..93d0b0356 100644
--- a/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java
+++ b/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java
@@ -19,6 +19,7 @@
package org.apache.paimon.predicate;
import org.apache.paimon.annotation.Public;
+import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.Timestamp;
@@ -27,6 +28,7 @@ import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DecimalType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.RowDataToObjectArrayConverter;
import org.apache.paimon.utils.TypeUtils;
import javax.annotation.Nullable;
@@ -344,4 +346,24 @@ public class PredicateBuilder {
}
return predicate;
}
+
+ public static Predicate equalPartition(BinaryRow partition, RowType partitionType) {
+ Preconditions.checkArgument(
+ partition.getFieldCount() == partitionType.getFieldCount(),
+ "Partition's field count should be equal to partitionType's field count.");
+
+ RowDataToObjectArrayConverter converter = new RowDataToObjectArrayConverter(partitionType);
+ Predicate predicate = null;
+ PredicateBuilder builder = new PredicateBuilder(partitionType);
+ Object[] literals = converter.convert(partition);
+ for (int i = 0; i < literals.length; i++) {
+ if (predicate == null) {
+ predicate = builder.equal(i, literals[i]);
+ } else {
+ predicate = PredicateBuilder.and(predicate, builder.equal(i, literals[i]));
+ }
+ }
+
+ return predicate;
+ }
}
diff --git a/paimon-common/src/test/java/org/apache/paimon/data/DataFormatTestUtil.java b/paimon-common/src/test/java/org/apache/paimon/data/DataFormatTestUtil.java
index 03fd1a319..42c4af6dc 100644
--- a/paimon-common/src/test/java/org/apache/paimon/data/DataFormatTestUtil.java
+++ b/paimon-common/src/test/java/org/apache/paimon/data/DataFormatTestUtil.java
@@ -45,7 +45,7 @@ public class DataFormatTestUtil {
}
/** Stringify the given {@link InternalRow}. */
- public static String rowDataToString(InternalRow row, RowType type) {
+ public static String internalRowToString(InternalRow row, RowType type) {
return row.getRowKind().shortString() + "[" + toStringNoRowKind(row, type) + ']';
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/predicate/PredicateBuilderTest.java b/paimon-common/src/test/java/org/apache/paimon/predicate/PredicateBuilderTest.java
similarity index 100%
rename from paimon-core/src/test/java/org/apache/paimon/predicate/PredicateBuilderTest.java
rename to paimon-common/src/test/java/org/apache/paimon/predicate/PredicateBuilderTest.java
diff --git a/paimon-core/src/test/java/org/apache/paimon/predicate/PredicateTest.java b/paimon-common/src/test/java/org/apache/paimon/predicate/PredicateTest.java
similarity index 100%
rename from paimon-core/src/test/java/org/apache/paimon/predicate/PredicateTest.java
rename to paimon-common/src/test/java/org/apache/paimon/predicate/PredicateTest.java
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index f644129a9..20fde8b42 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -115,6 +115,7 @@ public abstract class AbstractFileStore<T> implements FileStore<T> {
options.bucket(),
options.manifestTargetSize(),
options.manifestMergeMinCount(),
+ partitionType.getFieldCount() > 0 && options.dynamicPartitionOverwrite(),
newKeyComparator());
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
index b88c184c5..9f804cefa 100644
--- a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
@@ -413,6 +413,14 @@ public class CoreOptions implements Serializable {
.withDescription(
"Whether to read the changes from overwrite in streaming mode.");
+ public static final ConfigOption<Boolean> DYNAMIC_PARTITION_OVERWRITE =
+ key("dynamic-partition-overwrite")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription(
+ "Whether only overwrite dynamic partition when overwriting a partitioned table with "
+ + "dynamic partition columns. Works only when the table has partition keys.");
+
public static final ConfigOption<Duration> PARTITION_EXPIRATION_TIME =
key("partition.expiration-time")
.durationType()
@@ -764,6 +772,10 @@ public class CoreOptions implements Serializable {
return options.get(STREAMING_READ_OVERWRITE);
}
+ public boolean dynamicPartitionOverwrite() {
+ return options.get(DYNAMIC_PARTITION_OVERWRITE);
+ }
+
public Duration partitionExpireTime() {
return options.get(PARTITION_EXPIRATION_TIME);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
index 2c114b830..a60488f4e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
@@ -55,8 +55,6 @@ public interface FileStoreCommit {
/**
* Overwrite from manifest committable and partition.
*
- * <p>TODO: The method's semantics can be dynamic or static overwrite according to properties.
- *
* @param partition A single partition maps each partition key to a partition value. Depending
* on the user-defined statement, the partition might not include all partition keys. Also
* note that this partition does not necessarily equal to the partitions of the newly added
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index a7f7d7e38..bc9b57d4c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -100,6 +100,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
private final int numBucket;
private final MemorySize manifestTargetSize;
private final int manifestMergeMinCount;
+ private final boolean dynamicPartitionOverwrite;
@Nullable private final Comparator<InternalRow> keyComparator;
@Nullable private Lock lock;
@@ -118,6 +119,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
int numBucket,
MemorySize manifestTargetSize,
int manifestMergeMinCount,
+ boolean dynamicPartitionOverwrite,
@Nullable Comparator<InternalRow> keyComparator) {
this.fileIO = fileIO;
this.schemaManager = schemaManager;
@@ -132,6 +134,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
this.numBucket = numBucket;
this.manifestTargetSize = manifestTargetSize;
this.manifestMergeMinCount = manifestMergeMinCount;
+ this.dynamicPartitionOverwrite = dynamicPartitionOverwrite;
this.keyComparator = keyComparator;
this.lock = null;
@@ -253,10 +256,10 @@ public class FileStoreCommitImpl implements FileStoreCommit {
Map<String, String> properties) {
if (LOG.isDebugEnabled()) {
LOG.debug(
- "Ready to overwrite partition "
- + partition.toString()
- + "\n"
- + committable.toString());
+ "Ready to overwrite partition {}\nManifestCommittable: {}\nProperties: {}",
+ partition,
+ committable,
+ properties);
}
List<ManifestEntry> appendTableFiles = new ArrayList<>();
@@ -286,28 +289,53 @@ public class FileStoreCommitImpl implements FileStoreCommit {
LOG.warn(warnMessage.toString());
}
- // sanity check, all changes must be done within the given partition
- Predicate partitionFilter = PredicateBuilder.partition(partition, partitionType);
- if (partitionFilter != null) {
- for (ManifestEntry entry : appendTableFiles) {
- if (!partitionFilter.test(partitionObjectConverter.convert(entry.partition()))) {
- throw new IllegalArgumentException(
- "Trying to overwrite partition "
- + partition
- + ", but the changes in "
- + pathFactory.getPartitionString(entry.partition())
- + " does not belong to this partition");
+ boolean skipOverwrite = false;
+ // partition filter is built from static or dynamic partition according to properties
+ Predicate partitionFilter = null;
+ if (dynamicPartitionOverwrite) {
+ if (appendTableFiles.isEmpty()) {
+ // in dynamic mode, if there is no changes to commit, no data will be deleted
+ skipOverwrite = true;
+ } else {
+ partitionFilter =
+ appendTableFiles.stream()
+ .map(ManifestEntry::partition)
+ .distinct()
+ // partition filter is built from new data's partitions
+ .map(p -> PredicateBuilder.equalPartition(p, partitionType))
+ .reduce(PredicateBuilder::or)
+ .orElseThrow(
+ () ->
+ new RuntimeException(
+ "Failed to get dynamic partition filter. This is unexpected."));
+ }
+ } else {
+ partitionFilter = PredicateBuilder.partition(partition, partitionType);
+ // sanity check, all changes must be done within the given partition
+ if (partitionFilter != null) {
+ for (ManifestEntry entry : appendTableFiles) {
+ if (!partitionFilter.test(
+ partitionObjectConverter.convert(entry.partition()))) {
+ throw new IllegalArgumentException(
+ "Trying to overwrite partition "
+ + partition
+ + ", but the changes in "
+ + pathFactory.getPartitionString(entry.partition())
+ + " does not belong to this partition");
+ }
}
}
}
// overwrite new files
- tryOverwrite(
- partitionFilter,
- appendTableFiles,
- committable.identifier(),
- committable.watermark(),
- committable.logOffsets());
+ if (!skipOverwrite) {
+ tryOverwrite(
+ partitionFilter,
+ appendTableFiles,
+ committable.identifier(),
+ committable.watermark(),
+ committable.logOffsets());
+ }
if (!compactTableFiles.isEmpty()) {
tryCommit(
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
index 0236e27ad..91af65175 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
@@ -100,7 +100,7 @@ public class TableCommitImpl implements InnerTableCommit {
if (overwritePartition == null) {
commit.commit(committable, new HashMap<>());
} else {
- commit.overwrite(overwritePartition, committable, new HashMap<>());
+ commit.overwrite(overwritePartition, committable, Collections.emptyMap());
}
expire();
}
@@ -128,7 +128,7 @@ public class TableCommitImpl implements InnerTableCommit {
// TODO maybe it can be produced by CommitterOperator
committable = new ManifestCommittable(Long.MAX_VALUE);
}
- commit.overwrite(overwritePartition, committable, new HashMap<>());
+ commit.overwrite(overwritePartition, committable, Collections.emptyMap());
}
expire();
diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
index b6750d5d0..9c860928f 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
@@ -556,6 +556,9 @@ public class TestFileStore extends KeyValueFileStore {
conf.set(CoreOptions.CHANGELOG_PRODUCER, changelogProducer);
+ // disable dynamic-partition-overwrite in FileStoreCommit layer test
+ conf.set(CoreOptions.DYNAMIC_PARTITION_OVERWRITE, false);
+
return new TestFileStore(
root,
new CoreOptions(conf),
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
index a2ed61231..07e74a573 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
@@ -303,4 +303,21 @@ public class AppendOnlyFileStoreTableTest extends FileStoreTableTestBase {
""));
return new AppendOnlyFileStoreTable(LocalFileIO.create(), tablePath, tableSchema);
}
+
+ @Override
+ protected FileStoreTable overwriteTestFileStoreTable() throws Exception {
+ Options conf = new Options();
+ conf.set(CoreOptions.PATH, tablePath.toString());
+ conf.set(CoreOptions.WRITE_MODE, WriteMode.APPEND_ONLY);
+ TableSchema tableSchema =
+ SchemaUtils.forceCommit(
+ new SchemaManager(LocalFileIO.create(), tablePath),
+ new Schema(
+ OVERWRITE_TEST_ROW_TYPE.getFields(),
+ Arrays.asList("pt0", "pt1"),
+ Collections.emptyList(),
+ conf.toMap(),
+ ""));
+ return new AppendOnlyFileStoreTable(LocalFileIO.create(), tablePath, tableSchema);
+ }
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogValueCountFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/ChangelogValueCountFileStoreTableTest.java
index 38d1f1b2c..6953b36f3 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogValueCountFileStoreTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/ChangelogValueCountFileStoreTableTest.java
@@ -225,4 +225,21 @@ public class ChangelogValueCountFileStoreTableTest extends FileStoreTableTestBas
""));
return new ChangelogValueCountFileStoreTable(LocalFileIO.create(), tablePath, tableSchema);
}
+
+ @Override
+ protected FileStoreTable overwriteTestFileStoreTable() throws Exception {
+ Options conf = new Options();
+ conf.set(CoreOptions.PATH, tablePath.toString());
+ conf.set(CoreOptions.WRITE_MODE, WriteMode.CHANGE_LOG);
+ TableSchema tableSchema =
+ SchemaUtils.forceCommit(
+ new SchemaManager(LocalFileIO.create(), tablePath),
+ new Schema(
+ OVERWRITE_TEST_ROW_TYPE.getFields(),
+ Arrays.asList("pt0", "pt1"),
+ Arrays.asList("pk", "pt0", "pt1"),
+ conf.toMap(),
+ ""));
+ return new ChangelogValueCountFileStoreTable(LocalFileIO.create(), tablePath, tableSchema);
+ }
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java
index 61ece3833..4dc4298d3 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java
@@ -66,7 +66,7 @@ import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
-import static org.apache.paimon.data.DataFormatTestUtil.rowDataToString;
+import static org.apache.paimon.data.DataFormatTestUtil.internalRowToString;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -692,7 +692,7 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
AuditLogTable auditLogTable = new AuditLogTable(table);
Function<InternalRow, String> rowDataToString =
row ->
- rowDataToString(
+ internalRowToString(
row,
DataTypes.ROW(
DataTypes.STRING(),
@@ -731,7 +731,7 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
read = auditLogTable.newRead().withProjection(new int[] {2, 0, 1});
Function<InternalRow, String> projectToString1 =
row ->
- rowDataToString(
+ internalRowToString(
row,
DataTypes.ROW(
DataTypes.INT(), DataTypes.STRING(), DataTypes.INT()));
@@ -743,7 +743,7 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
snapshotSplitReader = auditLogTable.newSnapshotSplitReader();
read = auditLogTable.newRead().withProjection(new int[] {2, 1});
Function<InternalRow, String> projectToString2 =
- row -> rowDataToString(row, DataTypes.ROW(DataTypes.INT(), DataTypes.INT()));
+ row -> internalRowToString(row, DataTypes.ROW(DataTypes.INT(), DataTypes.INT()));
result = getResult(read, toSplits(snapshotSplitReader.splits()), projectToString2);
assertThat(result).containsExactlyInAnyOrder("+I[20, 2]", "+I[30, 1]", "+I[10, 1]");
}
@@ -765,7 +765,7 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
options.set("fields.c.ignore-retract", "true");
},
rowType);
- Function<InternalRow, String> rowToString = row -> rowDataToString(row, rowType);
+ Function<InternalRow, String> rowToString = row -> internalRowToString(row, rowType);
SnapshotSplitReader snapshotSplitReader = table.newSnapshotSplitReader();
TableRead read = table.newRead();
StreamTableWrite write = table.newWrite("");
@@ -821,6 +821,23 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
return createFileStoreTable(configure, ROW_TYPE);
}
+ @Override
+ protected FileStoreTable overwriteTestFileStoreTable() throws Exception {
+ Options conf = new Options();
+ conf.set(CoreOptions.PATH, tablePath.toString());
+ conf.set(CoreOptions.WRITE_MODE, WriteMode.CHANGE_LOG);
+ TableSchema tableSchema =
+ SchemaUtils.forceCommit(
+ new SchemaManager(LocalFileIO.create(), tablePath),
+ new Schema(
+ OVERWRITE_TEST_ROW_TYPE.getFields(),
+ Arrays.asList("pt0", "pt1"),
+ Arrays.asList("pk", "pt0", "pt1"),
+ conf.toMap(),
+ ""));
+ return new ChangelogWithKeyFileStoreTable(LocalFileIO.create(), tablePath, tableSchema);
+ }
+
private FileStoreTable createFileStoreTable(Consumer<Options> configure, RowType rowType)
throws Exception {
Options conf = new Options();
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
index 39da2b48d..b5ea597bb 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
@@ -23,6 +23,7 @@ import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryRowWriter;
import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.DataFormatTestUtil;
import org.apache.paimon.data.GenericMap;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
@@ -57,6 +58,9 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
import java.io.IOException;
import java.util.ArrayList;
@@ -79,6 +83,7 @@ import static org.apache.paimon.CoreOptions.SNAPSHOT_NUM_RETAINED_MAX;
import static org.apache.paimon.CoreOptions.SNAPSHOT_NUM_RETAINED_MIN;
import static org.apache.paimon.CoreOptions.WRITE_ONLY;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.params.provider.Arguments.arguments;
/** Base test class for {@link FileStoreTable}. */
public abstract class FileStoreTableTestBase {
@@ -95,6 +100,15 @@ public abstract class FileStoreTableTestBase {
DataTypes.MULTISET(DataTypes.VARCHAR(8))
},
new String[] {"pt", "a", "b", "c", "d", "e", "f"});
+
+ // for overwrite test
+ protected static final RowType OVERWRITE_TEST_ROW_TYPE =
+ RowType.of(
+ new DataType[] {
+ DataTypes.INT(), DataTypes.INT(), DataTypes.STRING(), DataTypes.STRING()
+ },
+ new String[] {"pk", "pt0", "pt1", "v"});
+
protected static final int[] PROJECTION = new int[] {2, 1};
protected static final Function<InternalRow, String> BATCH_ROW_TO_STRING =
rowData ->
@@ -192,6 +206,62 @@ public abstract class FileStoreTableTestBase {
"2|22|222|binary|varbinary|mapKey:mapVal|multiset");
}
+ @ParameterizedTest(name = "dynamic = {0}, partition={2}")
+ @MethodSource("overwriteTestData")
+ public void testOverwriteNothing(
+ boolean dynamicPartitionOverwrite,
+ List<InternalRow> overwriteData,
+ Map<String, String> overwritePartition,
+ List<String> expected)
+ throws Exception {
+ FileStoreTable table = overwriteTestFileStoreTable();
+ if (!dynamicPartitionOverwrite) {
+ table =
+ table.copy(
+ Collections.singletonMap(
+ CoreOptions.DYNAMIC_PARTITION_OVERWRITE.key(), "false"));
+ }
+
+ // prepare data
+ // (1, 1, 'A', 'Hi'), (2, 1, 'A', 'Hello'), (3, 1, 'A', 'World'),
+ // (4, 1, 'B', 'To'), (5, 1, 'B', 'Apache'), (6, 1, 'B', 'Paimon')
+ // (7, 2, 'A', 'Test')
+ // (8, 2, 'B', 'Case')
+ try (StreamTableWrite write = table.newWrite(commitUser);
+ InnerTableCommit commit = table.newCommit(commitUser)) {
+ write.write(overwriteRow(1, 1, "A", "Hi"));
+ write.write(overwriteRow(2, 1, "A", "Hello"));
+ write.write(overwriteRow(3, 1, "A", "World"));
+ write.write(overwriteRow(4, 1, "B", "To"));
+ write.write(overwriteRow(5, 1, "B", "Apache"));
+ write.write(overwriteRow(6, 1, "B", "Paimon"));
+ write.write(overwriteRow(7, 2, "A", "Test"));
+ write.write(overwriteRow(8, 2, "B", "Case"));
+ commit.commit(0, write.prepareCommit(true, 0));
+ }
+
+ // overwrite data
+ try (StreamTableWrite write = table.newWrite(commitUser).withOverwrite(true);
+ InnerTableCommit commit = table.newCommit(commitUser)) {
+ for (InternalRow row : overwriteData) {
+ write.write(row);
+ }
+ commit.withOverwrite(overwritePartition).commit(1, write.prepareCommit(true, 1));
+ }
+
+ // validate
+ List<Split> splits = toSplits(table.newSnapshotSplitReader().splits());
+ TableRead read = table.newRead();
+ assertThat(
+ getResult(
+ read,
+ splits,
+ row ->
+ DataFormatTestUtil.toStringNoRowKind(
+ row, OVERWRITE_TEST_ROW_TYPE)))
+ .hasSameElementsAs(expected);
+ }
+
@Test
public void testOverwrite() throws Exception {
FileStoreTable table = createFileStoreTable();
@@ -497,6 +567,96 @@ public abstract class FileStoreTableTestBase {
protected abstract FileStoreTable createFileStoreTable(Consumer<Options> configure)
throws Exception;
+ protected abstract FileStoreTable overwriteTestFileStoreTable() throws Exception;
+
+ private static InternalRow overwriteRow(Object... values) {
+ return GenericRow.of(
+ values[0],
+ values[1],
+ BinaryString.fromString((String) values[2]),
+ BinaryString.fromString((String) values[3]));
+ }
+
+ private static List<Arguments> overwriteTestData() {
+ // dynamic, overwrite data, overwrite partition, expected
+ return Arrays.asList(
+ // nothing happen
+ arguments(
+ true,
+ Collections.emptyList(),
+ Collections.emptyMap(),
+ Arrays.asList(
+ "1, 1, A, Hi",
+ "2, 1, A, Hello",
+ "3, 1, A, World",
+ "4, 1, B, To",
+ "5, 1, B, Apache",
+ "6, 1, B, Paimon",
+ "7, 2, A, Test",
+ "8, 2, B, Case")),
+ // delete all data
+ arguments(
+ false,
+ Collections.emptyList(),
+ Collections.emptyMap(),
+ Collections.emptyList()),
+ // specify one partition key
+ arguments(
+ true,
+ Arrays.asList(
+ overwriteRow(1, 1, "A", "Where"), overwriteRow(2, 1, "A", "When")),
+ Collections.singletonMap("pt0", "1"),
+ Arrays.asList(
+ "1, 1, A, Where",
+ "2, 1, A, When",
+ "4, 1, B, To",
+ "5, 1, B, Apache",
+ "6, 1, B, Paimon",
+ "7, 2, A, Test",
+ "8, 2, B, Case")),
+ arguments(
+ false,
+ Arrays.asList(
+ overwriteRow(1, 1, "A", "Where"), overwriteRow(2, 1, "A", "When")),
+ Collections.singletonMap("pt0", "1"),
+ Arrays.asList(
+ "1, 1, A, Where",
+ "2, 1, A, When",
+ "7, 2, A, Test",
+ "8, 2, B, Case")),
+ // all dynamic
+ arguments(
+ true,
+ Arrays.asList(
+ overwriteRow(4, 1, "B", "Where"),
+ overwriteRow(5, 1, "B", "When"),
+ overwriteRow(10, 2, "A", "Static"),
+ overwriteRow(11, 2, "A", "Dynamic")),
+ Collections.emptyMap(),
+ Arrays.asList(
+ "1, 1, A, Hi",
+ "2, 1, A, Hello",
+ "3, 1, A, World",
+ "4, 1, B, Where",
+ "5, 1, B, When",
+ "10, 2, A, Static",
+ "11, 2, A, Dynamic",
+ "8, 2, B, Case")),
+ arguments(
+ false,
+ Arrays.asList(
+ overwriteRow(4, 1, "B", "Where"),
+ overwriteRow(5, 1, "B", "When"),
+ overwriteRow(10, 2, "A", "Static"),
+ overwriteRow(11, 2, "A", "Dynamic")),
+ Collections.emptyMap(),
+ Arrays.asList(
+ "4, 1, B, Where",
+ "5, 1, B, When",
+ "10, 2, A, Static",
+ "11, 2, A, Dynamic")));
+ }
+
protected List<Split> toSplits(List<DataSplit> dataSplits) {
return new ArrayList<>(dataSplits);
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/WritePreemptMemoryTest.java b/paimon-core/src/test/java/org/apache/paimon/table/WritePreemptMemoryTest.java
index 188dc6238..387158148 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/WritePreemptMemoryTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/WritePreemptMemoryTest.java
@@ -108,4 +108,26 @@ public class WritePreemptMemoryTest extends FileStoreTableTestBase {
""));
return new ChangelogWithKeyFileStoreTable(LocalFileIO.create(), tablePath, schema);
}
+
+ @Override
+ protected FileStoreTable overwriteTestFileStoreTable() throws Exception {
+ Options conf = new Options();
+ conf.set(CoreOptions.PATH, tablePath.toString());
+ conf.set(CoreOptions.WRITE_MODE, WriteMode.CHANGE_LOG);
+ // Run with minimal memory to ensure a more intense preempt
+ // Currently a writer needs at least one page
+ int pages = 10;
+ conf.set(CoreOptions.WRITE_BUFFER_SIZE, new MemorySize(pages * 1024));
+ conf.set(CoreOptions.PAGE_SIZE, new MemorySize(1024));
+ TableSchema schema =
+ SchemaUtils.forceCommit(
+ new SchemaManager(LocalFileIO.create(), tablePath),
+ new Schema(
+ OVERWRITE_TEST_ROW_TYPE.getFields(),
+ Arrays.asList("pt0", "pt1"),
+ Arrays.asList("pk", "pt0", "pt1"),
+ conf.toMap(),
+ ""));
+ return new ChangelogWithKeyFileStoreTable(LocalFileIO.create(), tablePath, schema);
+ }
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CompositePkAndMultiPartitionedTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CompositePkAndMultiPartitionedTableITCase.java
index 39a35dd4d..2eb90ba2e 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CompositePkAndMultiPartitionedTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CompositePkAndMultiPartitionedTableITCase.java
@@ -18,6 +18,7 @@
package org.apache.paimon.flink;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.util.AbstractTestBase;
import org.apache.flink.types.Row;
@@ -27,6 +28,7 @@ import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import static org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow;
import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.bEnv;
@@ -44,6 +46,9 @@ import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.testBatchRead;
/** Paimon IT case when the table has composite primary keys and multiple partition fields. */
public class CompositePkAndMultiPartitionedTableITCase extends AbstractTestBase {
+ private final Map<String, String> staticPartitionOverwrite =
+ Collections.singletonMap(CoreOptions.DYNAMIC_PARTITION_OVERWRITE.key(), "false");
+
@BeforeEach
public void setUp() {
init(getTempDirPath());
@@ -277,7 +282,8 @@ public class CompositePkAndMultiPartitionedTableITCase extends AbstractTestBase
"rate_by_to_currency DOUBLE",
"dt STRING"),
Arrays.asList("from_currency", "to_currency", "dt"),
- Collections.singletonList("dt"));
+ Collections.singletonList("dt"),
+ staticPartitionOverwrite);
insertIntoFromTable(temporaryTable, table);
@@ -595,7 +601,8 @@ public class CompositePkAndMultiPartitionedTableITCase extends AbstractTestBase
createTable(
Arrays.asList("currency STRING", "rate BIGINT", "dt STRING", "hh STRING"),
Collections.emptyList(),
- Arrays.asList("dt", "hh"));
+ Arrays.asList("dt", "hh"),
+ staticPartitionOverwrite);
insertIntoFromTable(temporaryTable, table);
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CompositePkAndMultiPartitionedTableWIthKafkaLogITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CompositePkAndMultiPartitionedTableWIthKafkaLogITCase.java
index 5e1e7a123..0c09bd2a4 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CompositePkAndMultiPartitionedTableWIthKafkaLogITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CompositePkAndMultiPartitionedTableWIthKafkaLogITCase.java
@@ -56,8 +56,8 @@ import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.validateStream
public class CompositePkAndMultiPartitionedTableWIthKafkaLogITCase extends KafkaTableTestBase {
@BeforeEach
- public void setUp() throws Exception {
- init(createAndRegisterTempFile("").toString());
+ public void setUp() {
+ init(getTempDirPath());
}
// ----------------------------------------------------------------------------------------------------------------
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java
index 05d83e621..8b5fba73a 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java
@@ -61,7 +61,6 @@ import org.apache.flink.util.CloseableIterator;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -121,7 +120,7 @@ public class FileStoreITCase extends AbstractTestBase {
private final StreamExecutionEnvironment env;
- public FileStoreITCase(boolean isBatch) throws IOException {
+ public FileStoreITCase(boolean isBatch) {
this.isBatch = isBatch;
this.env = isBatch ? buildBatchEnv() : buildStreamEnv();
}
@@ -203,7 +202,7 @@ public class FileStoreITCase extends AbstractTestBase {
Row[] expected = new Row[] {Row.of(9, "p2", 5), Row.of(5, "p1", 1), Row.of(0, "p1", 2)};
assertThat(results).containsExactlyInAnyOrder(expected);
- // overwrite all
+ // test dynamic partition overwrite
partialData =
env.fromCollection(
Collections.singletonList(
@@ -217,7 +216,27 @@ public class FileStoreITCase extends AbstractTestBase {
// read
results = executeAndCollect(new FlinkSourceBuilder(IDENTIFIER, table).withEnv(env).build());
- expected = new Row[] {Row.of(19, "p2", 6)};
+ expected = new Row[] {Row.of(19, "p2", 6), Row.of(5, "p1", 1), Row.of(0, "p1", 2)};
+ assertThat(results).containsExactlyInAnyOrder(expected);
+
+ // test static overwrite all
+ partialData =
+ env.fromCollection(
+ Collections.singletonList(
+ wrap(GenericRowData.of(20, StringData.fromString("p2"), 3))),
+ InternalTypeInfo.of(TABLE_TYPE));
+ new FlinkSinkBuilder(
+ table.copy(
+ Collections.singletonMap(
+ CoreOptions.DYNAMIC_PARTITION_OVERWRITE.key(), "false")))
+ .withInput(partialData)
+ .withOverwritePartition(new HashMap<>())
+ .build();
+ env.execute();
+
+ // read
+ results = executeAndCollect(new FlinkSourceBuilder(IDENTIFIER, table).withEnv(env).build());
+ expected = new Row[] {Row.of(20, "p2", 3)};
assertThat(results).containsExactlyInAnyOrder(expected);
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
index 57d9fb118..cfe5f9f35 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
@@ -61,6 +61,7 @@ import static org.apache.paimon.flink.AbstractFlinkTableFactory.buildPaimonTable
import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_PARALLELISM;
import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_PARALLELISM;
import static org.apache.paimon.flink.FlinkTestBase.createResolvedTable;
+import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.assertNoMoreRecords;
import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.bEnv;
import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.bExeEnv;
import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.buildQuery;
@@ -85,11 +86,10 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
public class ReadWriteTableITCase extends AbstractTestBase {
private final Map<String, String> streamingReadOverwrite =
- new HashMap<String, String>() {
- {
- put(CoreOptions.STREAMING_READ_OVERWRITE.key(), "true");
- }
- };
+ Collections.singletonMap(CoreOptions.STREAMING_READ_OVERWRITE.key(), "true");
+
+ private final Map<String, String> staticPartitionOverwrite =
+ Collections.singletonMap(CoreOptions.DYNAMIC_PARTITION_OVERWRITE.key(), "false");
@BeforeEach
public void setUp() {
@@ -660,7 +660,76 @@ public class ReadWriteTableITCase extends AbstractTestBase {
}
// ----------------------------------------------------------------------------------------------------------------
- // Purge data using overwrite
+ // Dynamic partition overwrite (default option)
+ // ----------------------------------------------------------------------------------------------------------------
+
+ @Test
+ public void testDynamicOverwrite() throws Exception {
+ String table =
+ createTable(
+ Arrays.asList("pk INT", "part0 INT", "part1 STRING", "v STRING"),
+ Arrays.asList("pk", "part0", "part1"),
+ Arrays.asList("part0", "part1"),
+ streamingReadOverwrite);
+
+ insertInto(
+ table,
+ "(1, 1, 'A', 'Hi')",
+ "(2, 1, 'A', 'Hello')",
+ "(3, 1, 'A', 'World')",
+ "(4, 1, 'B', 'To')",
+ "(5, 1, 'B', 'Apache')",
+ "(6, 1, 'B', 'Paimon')",
+ "(7, 2, 'A', 'Test')",
+ "(8, 2, 'B', 'Case')");
+
+ BlockingIterator<Row, Row> streamItr =
+ testStreamingRead(
+ buildSimpleQuery(table),
+ Arrays.asList(
+ changelogRow("+I", 1, 1, "A", "Hi"),
+ changelogRow("+I", 2, 1, "A", "Hello"),
+ changelogRow("+I", 3, 1, "A", "World"),
+ changelogRow("+I", 4, 1, "B", "To"),
+ changelogRow("+I", 5, 1, "B", "Apache"),
+ changelogRow("+I", 6, 1, "B", "Paimon"),
+ changelogRow("+I", 7, 2, "A", "Test"),
+ changelogRow("+I", 8, 2, "B", "Case")));
+
+ bEnv.executeSql(
+ String.format(
+ "INSERT OVERWRITE `%s` VALUES (4, 1, 'B', 'Where'), (5, 1, 'B', 'When'), (10, 2, 'A', 'Static'), (11, 2, 'A', 'Dynamic')",
+ table))
+ .await();
+
+ assertThat(streamItr.collect(8))
+ .containsExactlyInAnyOrder(
+ changelogRow("-D", 4, 1, "B", "To"),
+ changelogRow("-D", 5, 1, "B", "Apache"),
+ changelogRow("-D", 6, 1, "B", "Paimon"),
+ changelogRow("-D", 7, 2, "A", "Test"),
+ changelogRow("+I", 4, 1, "B", "Where"),
+ changelogRow("+I", 5, 1, "B", "When"),
+ changelogRow("+I", 10, 2, "A", "Static"),
+ changelogRow("+I", 11, 2, "A", "Dynamic"));
+ assertNoMoreRecords(streamItr);
+ streamItr.close();
+
+ testBatchRead(
+ buildSimpleQuery(table),
+ Arrays.asList(
+ changelogRow("+I", 1, 1, "A", "Hi"),
+ changelogRow("+I", 2, 1, "A", "Hello"),
+ changelogRow("+I", 3, 1, "A", "World"),
+ changelogRow("+I", 4, 1, "B", "Where"),
+ changelogRow("+I", 5, 1, "B", "When"),
+ changelogRow("+I", 10, 2, "A", "Static"),
+ changelogRow("+I", 11, 2, "A", "Dynamic"),
+ changelogRow("+I", 8, 2, "B", "Case")));
+ }
+
+ // ----------------------------------------------------------------------------------------------------------------
+ // Purge data using overwrite (NOTE: set overwrite.dynamic-partition = false)
// ----------------------------------------------------------------------------------------------------------------
@Test
@@ -669,9 +738,10 @@ public class ReadWriteTableITCase extends AbstractTestBase {
createTable(
Arrays.asList("k0 INT", "k1 STRING", "v STRING"),
Collections.emptyList(),
- Collections.emptyList());
+ Collections.emptyList(),
+ staticPartitionOverwrite);
- validateOverwriteResult(table, "", "*", Collections.emptyList());
+ validatePurgingResult(table, "", "*", Collections.emptyList());
}
@Test
@@ -680,9 +750,13 @@ public class ReadWriteTableITCase extends AbstractTestBase {
// single partition key
String table =
- createTable(fieldsSpec, Collections.emptyList(), Collections.singletonList("k0"));
+ createTable(
+ fieldsSpec,
+ Collections.emptyList(),
+ Collections.singletonList("k0"),
+ staticPartitionOverwrite);
- validateOverwriteResult(
+ validatePurgingResult(
table,
"PARTITION (k0 = 0)",
"k1, v",
@@ -692,9 +766,14 @@ public class ReadWriteTableITCase extends AbstractTestBase {
changelogRow("+I", 1, "2023-01-02", "store")));
// multiple partition keys and overwrite one partition key
- table = createTable(fieldsSpec, Collections.emptyList(), Arrays.asList("k0", "k1"));
+ table =
+ createTable(
+ fieldsSpec,
+ Collections.emptyList(),
+ Arrays.asList("k0", "k1"),
+ staticPartitionOverwrite);
- validateOverwriteResult(
+ validatePurgingResult(
table,
"PARTITION (k0 = 0)",
"k1, v",
@@ -704,9 +783,14 @@ public class ReadWriteTableITCase extends AbstractTestBase {
changelogRow("+I", 1, "2023-01-02", "store")));
// multiple partition keys and overwrite all partition keys
- table = createTable(fieldsSpec, Collections.emptyList(), Arrays.asList("k0", "k1"));
+ table =
+ createTable(
+ fieldsSpec,
+ Collections.emptyList(),
+ Arrays.asList("k0", "k1"),
+ staticPartitionOverwrite);
- validateOverwriteResult(
+ validatePurgingResult(
table,
"PARTITION (k0 = 0, k1 = '2023-01-01')",
"v",
@@ -1085,7 +1169,7 @@ public class ReadWriteTableITCase extends AbstractTestBase {
// Tools
// ----------------------------------------------------------------------------------------------------------------
- private void validateOverwriteResult(
+ private void validatePurgingResult(
String table, String partitionSpec, String projectionSpec, List<Row> expected)
throws Exception {
insertInto(
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java
index 5d714bfa3..ee1d010a3 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java
@@ -112,7 +112,7 @@ public class ActionITCaseBase extends AbstractTestBase {
RecordReader<InternalRow> recordReader = read.createReader(splits);
List<String> result = new ArrayList<>();
recordReader.forEachRemaining(
- row -> result.add(DataFormatTestUtil.rowDataToString(row, rowType)));
+ row -> result.add(DataFormatTestUtil.internalRowToString(row, rowType)));
return result;
}
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/ReadWriteTableTestUtil.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/ReadWriteTableTestUtil.java
index 270ee48c3..8eba48b1e 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/ReadWriteTableTestUtil.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/ReadWriteTableTestUtil.java
@@ -148,6 +148,7 @@ public class ReadWriteTableTestUtil {
put(LOG_SYSTEM.key(), "kafka");
put(BOOTSTRAP_SERVERS.key(), getBootstrapServers());
put(TOPIC.key(), topic);
+ put(CoreOptions.DYNAMIC_PARTITION_OVERWRITE.key(), "false");
}
});