You are viewing a plain-text version of this content; the hyperlink to the canonical version was lost in this export.
Posted to commits@iceberg.apache.org by ao...@apache.org on 2023/05/18 22:17:04 UTC
[iceberg] branch master updated: Core: Allow one data writer in BasePositionDeltaWriter (#7648)
This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 4554938912 Core: Allow one data writer in BasePositionDeltaWriter (#7648)
4554938912 is described below
commit 45549389121ec5fb76e96670219307e5783593fc
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Thu May 18 15:16:58 2023 -0700
Core: Allow one data writer in BasePositionDeltaWriter (#7648)
---
.../apache/iceberg/io/BasePositionDeltaWriter.java | 26 +++++++++++++++++-----
.../iceberg/io/TestPositionDeltaWriters.java | 25 +++++++++++++++++++++
2 files changed, 45 insertions(+), 6 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/io/BasePositionDeltaWriter.java b/core/src/main/java/org/apache/iceberg/io/BasePositionDeltaWriter.java
index d54a5e05a7..e098729ba2 100644
--- a/core/src/main/java/org/apache/iceberg/io/BasePositionDeltaWriter.java
+++ b/core/src/main/java/org/apache/iceberg/io/BasePositionDeltaWriter.java
@@ -19,10 +19,12 @@
package org.apache.iceberg.io;
import java.io.IOException;
+import org.apache.iceberg.DataFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
public class BasePositionDeltaWriter<T> implements PositionDeltaWriter<T> {
@@ -33,14 +35,18 @@ public class BasePositionDeltaWriter<T> implements PositionDeltaWriter<T> {
private boolean closed;
+ public BasePositionDeltaWriter(
+ PartitioningWriter<T, DataWriteResult> dataWriter,
+ PartitioningWriter<PositionDelete<T>, DeleteWriteResult> deleteWriter) {
+ this(dataWriter, dataWriter, deleteWriter);
+ }
+
public BasePositionDeltaWriter(
PartitioningWriter<T, DataWriteResult> insertWriter,
PartitioningWriter<T, DataWriteResult> updateWriter,
PartitioningWriter<PositionDelete<T>, DeleteWriteResult> deleteWriter) {
Preconditions.checkArgument(insertWriter != null, "Insert writer cannot be null");
Preconditions.checkArgument(updateWriter != null, "Update writer cannot be null");
- Preconditions.checkArgument(
- insertWriter != updateWriter, "Update and insert writers must be different");
Preconditions.checkArgument(deleteWriter != null, "Delete writer cannot be null");
this.insertWriter = insertWriter;
@@ -69,18 +75,26 @@ public class BasePositionDeltaWriter<T> implements PositionDeltaWriter<T> {
public WriteResult result() {
Preconditions.checkState(closed, "Cannot get result from unclosed writer");
- DataWriteResult insertWriteResult = insertWriter.result();
- DataWriteResult updateWriteResult = updateWriter.result();
DeleteWriteResult deleteWriteResult = deleteWriter.result();
return WriteResult.builder()
- .addDataFiles(insertWriteResult.dataFiles())
- .addDataFiles(updateWriteResult.dataFiles())
+ .addDataFiles(dataFiles())
.addDeleteFiles(deleteWriteResult.deleteFiles())
.addReferencedDataFiles(deleteWriteResult.referencedDataFiles())
.build();
}
+ private Iterable<DataFile> dataFiles() {
+ if (insertWriter == updateWriter) {
+ DataWriteResult result = insertWriter.result();
+ return result.dataFiles();
+ } else {
+ DataWriteResult insertWriteResult = insertWriter.result();
+ DataWriteResult updateWriteResult = updateWriter.result();
+ return Iterables.concat(insertWriteResult.dataFiles(), updateWriteResult.dataFiles());
+ }
+ }
+
@Override
public void close() throws IOException {
if (!closed) {
diff --git a/data/src/test/java/org/apache/iceberg/io/TestPositionDeltaWriters.java b/data/src/test/java/org/apache/iceberg/io/TestPositionDeltaWriters.java
index f1295296b9..aa1118306a 100644
--- a/data/src/test/java/org/apache/iceberg/io/TestPositionDeltaWriters.java
+++ b/data/src/test/java/org/apache/iceberg/io/TestPositionDeltaWriters.java
@@ -75,6 +75,31 @@ public abstract class TestPositionDeltaWriters<T> extends WriterTestBase<T> {
this.fileFactory = OutputFileFactory.builderFor(table, 1, 1).format(fileFormat).build();
}
+ @Test
+ public void testPositionDeltaWithOneDataWriter() throws IOException {
+ FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
+
+ ClusteredDataWriter<T> dataWriter =
+ new ClusteredDataWriter<>(writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE);
+ ClusteredPositionDeleteWriter<T> deleteWriter =
+ new ClusteredPositionDeleteWriter<>(
+ writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE);
+ PositionDeltaWriter<T> deltaWriter = new BasePositionDeltaWriter<>(dataWriter, deleteWriter);
+
+ deltaWriter.insert(toRow(1, "insert"), table.spec(), null);
+ deltaWriter.update(toRow(2, "update"), table.spec(), null);
+ deltaWriter.close();
+
+ WriteResult result = deltaWriter.result();
+ DataFile[] dataFiles = result.dataFiles();
+ DeleteFile[] deleteFiles = result.deleteFiles();
+ CharSequence[] referencedDataFiles = result.referencedDataFiles();
+
+ Assert.assertEquals("Must be 1 data files", 1, dataFiles.length);
+ Assert.assertEquals("Must be no delete files", 0, deleteFiles.length);
+ Assert.assertEquals("Must not reference data files", 0, referencedDataFiles.length);
+ }
+
@Test
public void testPositionDeltaInsertOnly() throws IOException {
FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());