You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by ao...@apache.org on 2023/05/18 22:17:04 UTC

[iceberg] branch master updated: Core: Allow one data writer in BasePositionDeltaWriter (#7648)

This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 4554938912 Core: Allow one data writer in BasePositionDeltaWriter (#7648)
4554938912 is described below

commit 45549389121ec5fb76e96670219307e5783593fc
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Thu May 18 15:16:58 2023 -0700

    Core: Allow one data writer in BasePositionDeltaWriter (#7648)
---
 .../apache/iceberg/io/BasePositionDeltaWriter.java | 26 +++++++++++++++++-----
 .../iceberg/io/TestPositionDeltaWriters.java       | 25 +++++++++++++++++++++
 2 files changed, 45 insertions(+), 6 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/io/BasePositionDeltaWriter.java b/core/src/main/java/org/apache/iceberg/io/BasePositionDeltaWriter.java
index d54a5e05a7..e098729ba2 100644
--- a/core/src/main/java/org/apache/iceberg/io/BasePositionDeltaWriter.java
+++ b/core/src/main/java/org/apache/iceberg/io/BasePositionDeltaWriter.java
@@ -19,10 +19,12 @@
 package org.apache.iceberg.io;
 
 import java.io.IOException;
+import org.apache.iceberg.DataFile;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.deletes.PositionDelete;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 
 public class BasePositionDeltaWriter<T> implements PositionDeltaWriter<T> {
 
@@ -33,14 +35,18 @@ public class BasePositionDeltaWriter<T> implements PositionDeltaWriter<T> {
 
   private boolean closed;
 
+  public BasePositionDeltaWriter(
+      PartitioningWriter<T, DataWriteResult> dataWriter,
+      PartitioningWriter<PositionDelete<T>, DeleteWriteResult> deleteWriter) {
+    this(dataWriter, dataWriter, deleteWriter);
+  }
+
   public BasePositionDeltaWriter(
       PartitioningWriter<T, DataWriteResult> insertWriter,
       PartitioningWriter<T, DataWriteResult> updateWriter,
       PartitioningWriter<PositionDelete<T>, DeleteWriteResult> deleteWriter) {
     Preconditions.checkArgument(insertWriter != null, "Insert writer cannot be null");
     Preconditions.checkArgument(updateWriter != null, "Update writer cannot be null");
-    Preconditions.checkArgument(
-        insertWriter != updateWriter, "Update and insert writers must be different");
     Preconditions.checkArgument(deleteWriter != null, "Delete writer cannot be null");
 
     this.insertWriter = insertWriter;
@@ -69,18 +75,26 @@ public class BasePositionDeltaWriter<T> implements PositionDeltaWriter<T> {
   public WriteResult result() {
     Preconditions.checkState(closed, "Cannot get result from unclosed writer");
 
-    DataWriteResult insertWriteResult = insertWriter.result();
-    DataWriteResult updateWriteResult = updateWriter.result();
     DeleteWriteResult deleteWriteResult = deleteWriter.result();
 
     return WriteResult.builder()
-        .addDataFiles(insertWriteResult.dataFiles())
-        .addDataFiles(updateWriteResult.dataFiles())
+        .addDataFiles(dataFiles())
         .addDeleteFiles(deleteWriteResult.deleteFiles())
         .addReferencedDataFiles(deleteWriteResult.referencedDataFiles())
         .build();
   }
 
+  private Iterable<DataFile> dataFiles() {
+    if (insertWriter == updateWriter) {
+      DataWriteResult result = insertWriter.result();
+      return result.dataFiles();
+    } else {
+      DataWriteResult insertWriteResult = insertWriter.result();
+      DataWriteResult updateWriteResult = updateWriter.result();
+      return Iterables.concat(insertWriteResult.dataFiles(), updateWriteResult.dataFiles());
+    }
+  }
+
   @Override
   public void close() throws IOException {
     if (!closed) {
diff --git a/data/src/test/java/org/apache/iceberg/io/TestPositionDeltaWriters.java b/data/src/test/java/org/apache/iceberg/io/TestPositionDeltaWriters.java
index f1295296b9..aa1118306a 100644
--- a/data/src/test/java/org/apache/iceberg/io/TestPositionDeltaWriters.java
+++ b/data/src/test/java/org/apache/iceberg/io/TestPositionDeltaWriters.java
@@ -75,6 +75,31 @@ public abstract class TestPositionDeltaWriters<T> extends WriterTestBase<T> {
     this.fileFactory = OutputFileFactory.builderFor(table, 1, 1).format(fileFormat).build();
   }
 
+  @Test
+  public void testPositionDeltaWithOneDataWriter() throws IOException {
+    FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
+
+    ClusteredDataWriter<T> dataWriter =
+        new ClusteredDataWriter<>(writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE);
+    ClusteredPositionDeleteWriter<T> deleteWriter =
+        new ClusteredPositionDeleteWriter<>(
+            writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE);
+    PositionDeltaWriter<T> deltaWriter = new BasePositionDeltaWriter<>(dataWriter, deleteWriter);
+
+    deltaWriter.insert(toRow(1, "insert"), table.spec(), null);
+    deltaWriter.update(toRow(2, "update"), table.spec(), null);
+    deltaWriter.close();
+
+    WriteResult result = deltaWriter.result();
+    DataFile[] dataFiles = result.dataFiles();
+    DeleteFile[] deleteFiles = result.deleteFiles();
+    CharSequence[] referencedDataFiles = result.referencedDataFiles();
+
+    Assert.assertEquals("Must be 1 data files", 1, dataFiles.length);
+    Assert.assertEquals("Must be no delete files", 0, deleteFiles.length);
+    Assert.assertEquals("Must not reference data files", 0, referencedDataFiles.length);
+  }
+
   @Test
   public void testPositionDeltaInsertOnly() throws IOException {
     FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());