You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by ao...@apache.org on 2021/09/20 22:13:56 UTC

[iceberg] branch master updated: Core: Add new rolling file writers (#3158)

This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 7eeeada  Core: Add new rolling file writers (#3158)
7eeeada is described below

commit 7eeeada0dc027025664310c1ca619e9b44fec764
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Mon Sep 20 15:13:46 2021 -0700

    Core: Add new rolling file writers (#3158)
---
 .../org/apache/iceberg/io/DeleteWriteResult.java   |   4 +
 .../org/apache/iceberg/io/RollingDataWriter.java   |  61 ++++++
 .../iceberg/io/RollingEqualityDeleteWriter.java    |  64 ++++++
 .../org/apache/iceberg/io/RollingFileWriter.java   | 143 ++++++++++++
 .../iceberg/io/RollingPositionDeleteWriter.java    |  68 ++++++
 .../apache/iceberg/io/TestRollingFileWriters.java  | 243 +++++++++++++++++++++
 .../flink/sink/TestFlinkRollingFileWriters.java    |  53 +++++
 .../spark/source/TestSparkRollingFileWriters.java  |  57 +++++
 8 files changed, 693 insertions(+)

diff --git a/core/src/main/java/org/apache/iceberg/io/DeleteWriteResult.java b/core/src/main/java/org/apache/iceberg/io/DeleteWriteResult.java
index e8fe793..3d59e0c 100644
--- a/core/src/main/java/org/apache/iceberg/io/DeleteWriteResult.java
+++ b/core/src/main/java/org/apache/iceberg/io/DeleteWriteResult.java
@@ -61,4 +61,8 @@ public class DeleteWriteResult {
+  /**
+   * Returns the referenced data files recorded in this result.
+   * <p>
+   * NOTE(review): this field may be null ({@code referencesDataFiles()} null-checks it); confirm which
+   * constructors leave it unset before relying on a non-null value.
+   */
   public CharSequenceSet referencedDataFiles() {
     return referencedDataFiles;
   }
+
+  /**
+   * Checks whether this result references at least one data file.
+   *
+   * @return {@code false} when the referenced data file set is null or empty, {@code true} otherwise
+   */
+  public boolean referencesDataFiles() {
+    if (referencedDataFiles == null) {
+      return false;
+    }
+
+    return !referencedDataFiles.isEmpty();
+  }
 }
diff --git a/core/src/main/java/org/apache/iceberg/io/RollingDataWriter.java b/core/src/main/java/org/apache/iceberg/io/RollingDataWriter.java
new file mode 100644
index 0000000..1642a30
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/io/RollingDataWriter.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.util.List;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+/**
+ * A rolling data writer that splits incoming data into multiple files within one spec/partition
+ * based on the target file size.
+ * <p>
+ * Each output file is written by a {@link DataWriter} obtained from a {@link FileWriterFactory}; the data files
+ * of every non-empty output file are combined into a single {@link DataWriteResult}.
+ */
+public class RollingDataWriter<T> extends RollingFileWriter<T, DataWriter<T>, DataWriteResult> {
+
+  // creates the per-file data writers; must be assigned before the first writer is opened
+  private final FileWriterFactory<T> writerFactory;
+
+  // data files collected from every writer that was closed after receiving at least one row
+  private final List<DataFile> dataFiles = Lists.newArrayList();
+
+  /**
+   * Creates the writer and immediately opens its first output file.
+   *
+   * @param writerFactory factory used to create a data writer for each output file
+   * @param fileFactory factory used to allocate output files
+   * @param io file IO used by the base writer to delete output files that received no rows
+   * @param targetFileSizeInBytes file size at which the writer rolls over to a new file
+   * @param spec partition spec of the written files
+   * @param partition partition of the written files; when null, output files are allocated without spec/partition
+   */
+  public RollingDataWriter(FileWriterFactory<T> writerFactory, OutputFileFactory fileFactory,
+                           FileIO io, long targetFileSizeInBytes,
+                           PartitionSpec spec, StructLike partition) {
+    super(fileFactory, io, targetFileSizeInBytes, spec, partition);
+    this.writerFactory = writerFactory;
+    openCurrentWriter();
+  }
+
+  @Override
+  protected DataWriter<T> newWriter(EncryptedOutputFile file) {
+    PartitionSpec fileSpec = spec();
+    StructLike filePartition = partition();
+    return writerFactory.newDataWriter(file, fileSpec, filePartition);
+  }
+
+  @Override
+  protected void addResult(DataWriteResult result) {
+    this.dataFiles.addAll(result.dataFiles());
+  }
+
+  @Override
+  protected DataWriteResult aggregatedResult() {
+    return new DataWriteResult(this.dataFiles);
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/io/RollingEqualityDeleteWriter.java b/core/src/main/java/org/apache/iceberg/io/RollingEqualityDeleteWriter.java
new file mode 100644
index 0000000..c12bfd3
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/io/RollingEqualityDeleteWriter.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.util.List;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.deletes.EqualityDeleteWriter;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+/**
+ * A rolling equality delete writer that splits incoming deletes into multiple files within one spec/partition
+ * based on the target file size.
+ */
+public class RollingEqualityDeleteWriter<T> extends RollingFileWriter<T, EqualityDeleteWriter<T>, DeleteWriteResult> {
+
+  private final FileWriterFactory<T> writerFactory;
+  private final List<DeleteFile> deleteFiles;
+
+  public RollingEqualityDeleteWriter(FileWriterFactory<T> writerFactory, OutputFileFactory fileFactory,
+                                     FileIO io, long targetFileSizeInBytes,
+                                     PartitionSpec spec, StructLike partition) {
+    super(fileFactory, io, targetFileSizeInBytes, spec, partition);
+    this.writerFactory = writerFactory;
+    this.deleteFiles = Lists.newArrayList();
+    openCurrentWriter();
+  }
+
+  @Override
+  protected EqualityDeleteWriter<T> newWriter(EncryptedOutputFile file) {
+    return writerFactory.newEqualityDeleteWriter(file, spec(), partition());
+  }
+
+  @Override
+  protected void addResult(DeleteWriteResult result) {
+    Preconditions.checkArgument(!result.referencesDataFiles(), "Equality deletes cannot reference data files");
+    deleteFiles.addAll(result.deleteFiles());
+  }
+
+  @Override
+  protected DeleteWriteResult aggregatedResult() {
+    return new DeleteWriteResult(deleteFiles);
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/io/RollingFileWriter.java b/core/src/main/java/org/apache/iceberg/io/RollingFileWriter.java
new file mode 100644
index 0000000..ed35933
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/io/RollingFileWriter.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/**
+ * A rolling writer capable of splitting incoming data or deletes into multiple files within one spec/partition
+ * based on the target file size.
+ */
+abstract class RollingFileWriter<T, W extends FileWriter<T, R>, R> implements FileWriter<T, R> {
+  private static final int ROWS_DIVISOR = 1000;
+
+  private final OutputFileFactory fileFactory;
+  private final FileIO io;
+  private final long targetFileSizeInBytes;
+  private final PartitionSpec spec;
+  private final StructLike partition;
+
+  private EncryptedOutputFile currentFile = null;
+  private long currentFileRows = 0;
+  private W currentWriter = null;
+
+  private boolean closed = false;
+
+  protected RollingFileWriter(OutputFileFactory fileFactory, FileIO io, long targetFileSizeInBytes,
+                              PartitionSpec spec, StructLike partition) {
+    this.fileFactory = fileFactory;
+    this.io = io;
+    this.targetFileSizeInBytes = targetFileSizeInBytes;
+    this.spec = spec;
+    this.partition = partition;
+  }
+
+  protected abstract W newWriter(EncryptedOutputFile file);
+
+  protected abstract void addResult(R result);
+
+  protected abstract R aggregatedResult();
+
+  protected PartitionSpec spec() {
+    return spec;
+  }
+
+  protected StructLike partition() {
+    return partition;
+  }
+
+  public CharSequence currentFilePath() {
+    return currentFile.encryptingOutputFile().location();
+  }
+
+  public long currentFileRows() {
+    return currentFileRows;
+  }
+
+  @Override
+  public long length() {
+    throw new UnsupportedOperationException(this.getClass().getName() + " does not implement length");
+  }
+
+  @Override
+  public void write(T row) throws IOException {
+    currentWriter.write(row);
+    currentFileRows++;
+
+    if (shouldRollToNewFile()) {
+      closeCurrentWriter();
+      openCurrentWriter();
+    }
+  }
+
+  private boolean shouldRollToNewFile() {
+    return currentFileRows % ROWS_DIVISOR == 0 && currentWriter.length() >= targetFileSizeInBytes;
+  }
+
+  protected void openCurrentWriter() {
+    Preconditions.checkState(currentWriter == null, "Current writer has been already initialized");
+
+    this.currentFile = newFile();
+    this.currentFileRows = 0;
+    this.currentWriter = newWriter(currentFile);
+  }
+
+  private EncryptedOutputFile newFile() {
+    if (partition == null) {
+      return fileFactory.newOutputFile();
+    } else {
+      return fileFactory.newOutputFile(spec, partition);
+    }
+  }
+
+  private void closeCurrentWriter() throws IOException {
+    if (currentWriter != null) {
+      currentWriter.close();
+
+      if (currentFileRows == 0L) {
+        io.deleteFile(currentFile.encryptingOutputFile());
+      } else {
+        addResult(currentWriter.result());
+      }
+
+      this.currentFile = null;
+      this.currentFileRows = 0;
+      this.currentWriter = null;
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (!closed) {
+      closeCurrentWriter();
+      this.closed = true;
+    }
+  }
+
+  @Override
+  public final R result() {
+    Preconditions.checkState(closed, "Cannot get result from unclosed writer");
+    return aggregatedResult();
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/io/RollingPositionDeleteWriter.java b/core/src/main/java/org/apache/iceberg/io/RollingPositionDeleteWriter.java
new file mode 100644
index 0000000..001b0bb
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/io/RollingPositionDeleteWriter.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.util.List;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.CharSequenceSet;
+
+/**
+ * A rolling position delete writer that splits incoming deletes into multiple files within one spec/partition
+ * based on the target file size.
+ * <p>
+ * Besides the delete files, the aggregated result carries the union of the data files referenced by
+ * every non-empty output file.
+ */
+public class RollingPositionDeleteWriter<T>
+    extends RollingFileWriter<PositionDelete<T>, PositionDeleteWriter<T>, DeleteWriteResult> {
+
+  // creates the per-file position delete writers; must be assigned before the first writer is opened
+  private final FileWriterFactory<T> writerFactory;
+
+  // delete files collected from every writer that was closed after receiving at least one delete
+  private final List<DeleteFile> deleteFiles = Lists.newArrayList();
+
+  // data files referenced by the collected delete files
+  private final CharSequenceSet referencedDataFiles = CharSequenceSet.empty();
+
+  /**
+   * Creates the writer and immediately opens its first output file.
+   *
+   * @param writerFactory factory used to create a position delete writer for each output file
+   * @param fileFactory factory used to allocate output files
+   * @param io file IO used by the base writer to delete output files that received no rows
+   * @param targetFileSizeInBytes file size at which the writer rolls over to a new file
+   * @param spec partition spec of the written files
+   * @param partition partition of the written files; when null, output files are allocated without spec/partition
+   */
+  public RollingPositionDeleteWriter(FileWriterFactory<T> writerFactory, OutputFileFactory fileFactory,
+                                     FileIO io, long targetFileSizeInBytes,
+                                     PartitionSpec spec, StructLike partition) {
+    super(fileFactory, io, targetFileSizeInBytes, spec, partition);
+    this.writerFactory = writerFactory;
+    openCurrentWriter();
+  }
+
+  @Override
+  protected PositionDeleteWriter<T> newWriter(EncryptedOutputFile file) {
+    PartitionSpec fileSpec = spec();
+    StructLike filePartition = partition();
+    return writerFactory.newPositionDeleteWriter(file, fileSpec, filePartition);
+  }
+
+  @Override
+  protected void addResult(DeleteWriteResult result) {
+    this.deleteFiles.addAll(result.deleteFiles());
+    this.referencedDataFiles.addAll(result.referencedDataFiles());
+  }
+
+  @Override
+  protected DeleteWriteResult aggregatedResult() {
+    return new DeleteWriteResult(this.deleteFiles, this.referencedDataFiles);
+  }
+}
diff --git a/data/src/test/java/org/apache/iceberg/io/TestRollingFileWriters.java b/data/src/test/java/org/apache/iceberg/io/TestRollingFileWriters.java
new file mode 100644
index 0000000..e11e0a7
--- /dev/null
+++ b/data/src/test/java/org/apache/iceberg/io/TestRollingFileWriters.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.TableTestBase;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public abstract class TestRollingFileWriters<T> extends TableTestBase {
+
+  // TODO: add ORC once we support ORC rolling file writers
+
+  @Parameterized.Parameters(name = "FileFormat={0}, Partitioned={1}")
+  public static Object[] parameters() {
+    return new Object[][] {
+        new Object[]{FileFormat.AVRO, false},
+        new Object[]{FileFormat.AVRO, true},
+        new Object[]{FileFormat.PARQUET, false},
+        new Object[]{FileFormat.PARQUET, true},
+    };
+  }
+
+  private static final int TABLE_FORMAT_VERSION = 2;
+  // must match the row interval at which RollingFileWriter checks the current file size
+  private static final int FILE_SIZE_CHECK_ROWS_DIVISOR = 1000;
+  private static final long DEFAULT_FILE_SIZE = 128L * 1024 * 1024;
+  // tiny target size, so every periodic file size check rolls over to a new file
+  private static final long SMALL_FILE_SIZE = 2L;
+  // number of files the "split" tests expect: one per FILE_SIZE_CHECK_ROWS_DIVISOR rows written
+  private static final int NUM_SPLIT_FILES = 4;
+  private static final String PARTITION_VALUE = "aaa";
+
+  private final FileFormat fileFormat;
+  private final boolean partitioned;
+  private StructLike partition = null;
+  private OutputFileFactory fileFactory = null;
+
+  public TestRollingFileWriters(FileFormat fileFormat, boolean partitioned) {
+    super(TABLE_FORMAT_VERSION);
+    this.fileFormat = fileFormat;
+    this.partitioned = partitioned;
+  }
+
+  /**
+   * Creates the engine-specific writer factory under test.
+   *
+   * @param dataSchema schema of the written rows
+   * @param equalityFieldIds equality field ids; null in tests that write no equality deletes
+   * @param equalityDeleteRowSchema equality delete row schema; null in tests that write no equality deletes
+   * @return a writer factory for the table under test
+   */
+  protected abstract FileWriterFactory<T> newWriterFactory(Schema dataSchema, List<Integer> equalityFieldIds,
+                                                           Schema equalityDeleteRowSchema);
+
+  /** Converts an (id, data) pair into the engine-specific row type. */
+  protected abstract T toRow(Integer id, String data);
+
+  protected FileFormat format() {
+    return fileFormat;
+  }
+
+  @Before
+  public void setupTable() throws Exception {
+    this.tableDir = temp.newFolder();
+    Assert.assertTrue(tableDir.delete()); // created during table creation
+
+    this.metadataDir = new File(tableDir, "metadata");
+
+    if (partitioned) {
+      this.table = create(SCHEMA, SPEC);
+      this.partition = initPartitionKey();
+    } else {
+      this.table = create(SCHEMA, PartitionSpec.unpartitioned());
+      this.partition = null;
+    }
+
+    this.fileFactory = OutputFileFactory.builderFor(table, 1, 1).format(fileFormat).build();
+  }
+
+  // partition key computed from a record whose "data" column equals PARTITION_VALUE
+  private PartitionKey initPartitionKey() {
+    Record record = GenericRecord.create(table.schema()).copy(ImmutableMap.of("data", PARTITION_VALUE));
+
+    PartitionKey partitionKey = new PartitionKey(table.spec(), table.schema());
+    partitionKey.partition(record);
+
+    return partitionKey;
+  }
+
+  @Test
+  public void testRollingDataWriterNoRecords() throws IOException {
+    FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
+    RollingDataWriter<T> writer = new RollingDataWriter<>(
+        writerFactory, fileFactory, table.io(),
+        DEFAULT_FILE_SIZE, table.spec(), partition);
+
+    writer.close();
+    Assert.assertEquals("Must be no data files", 0, writer.result().dataFiles().size());
+
+    writer.close();
+    Assert.assertEquals("Must be no data files", 0, writer.result().dataFiles().size());
+  }
+
+  @Test
+  public void testRollingDataWriterSplitData() throws IOException {
+    FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
+    RollingDataWriter<T> writer = new RollingDataWriter<>(
+        writerFactory, fileFactory, table.io(),
+        SMALL_FILE_SIZE, table.spec(), partition);
+
+    List<T> rows = newRows(NUM_SPLIT_FILES * FILE_SIZE_CHECK_ROWS_DIVISOR);
+
+    try (RollingDataWriter<T> closeableWriter = writer) {
+      closeableWriter.write(rows);
+    }
+
+    // call close again to ensure it is idempotent
+    writer.close();
+
+    Assert.assertEquals(NUM_SPLIT_FILES, writer.result().dataFiles().size());
+  }
+
+  @Test
+  public void testRollingEqualityDeleteWriterNoRecords() throws IOException {
+    Assume.assumeFalse("ORC delete files are not supported", fileFormat == FileFormat.ORC);
+
+    List<Integer> equalityFieldIds = ImmutableList.of(table.schema().findField("id").fieldId());
+    Schema equalityDeleteRowSchema = table.schema().select("id");
+    FileWriterFactory<T> writerFactory = newWriterFactory(table.schema(), equalityFieldIds, equalityDeleteRowSchema);
+    RollingEqualityDeleteWriter<T> writer = new RollingEqualityDeleteWriter<>(
+        writerFactory, fileFactory, table.io(),
+        DEFAULT_FILE_SIZE, table.spec(), partition);
+
+    writer.close();
+    Assert.assertEquals(0, writer.result().deleteFiles().size());
+    Assert.assertEquals(0, writer.result().referencedDataFiles().size());
+    Assert.assertFalse(writer.result().referencesDataFiles());
+
+    writer.close();
+    Assert.assertEquals(0, writer.result().deleteFiles().size());
+    Assert.assertEquals(0, writer.result().referencedDataFiles().size());
+    Assert.assertFalse(writer.result().referencesDataFiles());
+  }
+
+  @Test
+  public void testRollingEqualityDeleteWriterSplitDeletes() throws IOException {
+    Assume.assumeFalse("ORC delete files are not supported", fileFormat == FileFormat.ORC);
+
+    List<Integer> equalityFieldIds = ImmutableList.of(table.schema().findField("id").fieldId());
+    Schema equalityDeleteRowSchema = table.schema().select("id");
+    FileWriterFactory<T> writerFactory = newWriterFactory(table.schema(), equalityFieldIds, equalityDeleteRowSchema);
+    RollingEqualityDeleteWriter<T> writer = new RollingEqualityDeleteWriter<>(
+        writerFactory, fileFactory, table.io(),
+        SMALL_FILE_SIZE, table.spec(), partition);
+
+    List<T> deletes = newRows(NUM_SPLIT_FILES * FILE_SIZE_CHECK_ROWS_DIVISOR);
+
+    try (RollingEqualityDeleteWriter<T> closeableWriter = writer) {
+      closeableWriter.write(deletes);
+    }
+
+    // call close again to ensure it is idempotent
+    writer.close();
+
+    DeleteWriteResult result = writer.result();
+    Assert.assertEquals(NUM_SPLIT_FILES, result.deleteFiles().size());
+    Assert.assertEquals(0, result.referencedDataFiles().size());
+    Assert.assertFalse(result.referencesDataFiles());
+  }
+
+  @Test
+  public void testRollingPositionDeleteWriterNoRecords() throws IOException {
+    Assume.assumeFalse("ORC delete files are not supported", fileFormat == FileFormat.ORC);
+
+    FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
+    RollingPositionDeleteWriter<T> writer = new RollingPositionDeleteWriter<>(
+        writerFactory, fileFactory, table.io(),
+        DEFAULT_FILE_SIZE, table.spec(), partition);
+
+    writer.close();
+    Assert.assertEquals(0, writer.result().deleteFiles().size());
+    Assert.assertEquals(0, writer.result().referencedDataFiles().size());
+    Assert.assertFalse(writer.result().referencesDataFiles());
+
+    writer.close();
+    Assert.assertEquals(0, writer.result().deleteFiles().size());
+    Assert.assertEquals(0, writer.result().referencedDataFiles().size());
+    Assert.assertFalse(writer.result().referencesDataFiles());
+  }
+
+  @Test
+  public void testRollingPositionDeleteWriterSplitDeletes() throws IOException {
+    Assume.assumeFalse("ORC delete files are not supported", fileFormat == FileFormat.ORC);
+
+    FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
+    RollingPositionDeleteWriter<T> writer = new RollingPositionDeleteWriter<>(
+        writerFactory, fileFactory, table.io(),
+        SMALL_FILE_SIZE, table.spec(), partition);
+
+    int numDeletes = NUM_SPLIT_FILES * FILE_SIZE_CHECK_ROWS_DIVISOR;
+    List<PositionDelete<T>> deletes = Lists.newArrayListWithExpectedSize(numDeletes);
+    for (int index = 0; index < numDeletes; index++) {
+      deletes.add(new PositionDelete<T>().set("path/to/data/file-1.parquet", index, null));
+    }
+
+    try (RollingPositionDeleteWriter<T> closeableWriter = writer) {
+      closeableWriter.write(deletes);
+    }
+
+    // call close again to ensure it is idempotent
+    writer.close();
+
+    DeleteWriteResult result = writer.result();
+    Assert.assertEquals(NUM_SPLIT_FILES, result.deleteFiles().size());
+    Assert.assertEquals(1, result.referencedDataFiles().size());
+    Assert.assertTrue(result.referencesDataFiles());
+  }
+
+  // writer factory for tests that write no equality deletes
+  private FileWriterFactory<T> newWriterFactory(Schema dataSchema) {
+    return newWriterFactory(dataSchema, null, null);
+  }
+
+  // builds rows with ids 0..numRows-1, all carrying PARTITION_VALUE as data
+  private List<T> newRows(int numRows) {
+    List<T> rows = Lists.newArrayListWithExpectedSize(numRows);
+    for (int id = 0; id < numRows; id++) {
+      rows.add(toRow(id, PARTITION_VALUE));
+    }
+    return rows;
+  }
+}
diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkRollingFileWriters.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkRollingFileWriters.java
new file mode 100644
index 0000000..a3d62d5
--- /dev/null
+++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkRollingFileWriters.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.sink;
+
+import java.util.List;
+import org.apache.flink.table.data.RowData;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.io.FileWriterFactory;
+import org.apache.iceberg.io.TestRollingFileWriters;
+import org.apache.iceberg.util.ArrayUtil;
+
+public class TestFlinkRollingFileWriters extends TestRollingFileWriters<RowData> {
+
+  public TestFlinkRollingFileWriters(FileFormat fileFormat, boolean partitioned) {
+    super(fileFormat, partitioned);
+  }
+
+  /**
+   * Builds a Flink writer factory that writes data and delete files in the tested file format.
+   *
+   * @param dataSchema schema of the written rows
+   * @param equalityFieldIds equality field ids; null when the test writes no equality deletes
+   * @param equalityDeleteRowSchema equality delete row schema; null when the test writes no equality deletes
+   */
+  @Override
+  protected FileWriterFactory<RowData> newWriterFactory(Schema dataSchema, List<Integer> equalityFieldIds,
+                                                        Schema equalityDeleteRowSchema) {
+    return FlinkFileWriterFactory.builderFor(table)
+        .dataSchema(dataSchema)
+        .dataFileFormat(format())
+        .deleteFileFormat(format())
+        .equalityFieldIds(ArrayUtil.toIntArray(equalityFieldIds))
+        .equalityDeleteRowSchema(equalityDeleteRowSchema)
+        .build();
+  }
+
+  @Override
+  protected RowData toRow(Integer id, String data) {
+    return SimpleDataUtil.createRowData(id, data);
+  }
+}
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkRollingFileWriters.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkRollingFileWriters.java
new file mode 100644
index 0000000..3ea2353
--- /dev/null
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkRollingFileWriters.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.List;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.io.FileWriterFactory;
+import org.apache.iceberg.io.TestRollingFileWriters;
+import org.apache.iceberg.util.ArrayUtil;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.unsafe.types.UTF8String;
+
+public class TestSparkRollingFileWriters extends TestRollingFileWriters<InternalRow> {
+
+  public TestSparkRollingFileWriters(FileFormat fileFormat, boolean partitioned) {
+    super(fileFormat, partitioned);
+  }
+
+  /**
+   * Builds a Spark writer factory that writes data and delete files in the tested file format.
+   *
+   * @param dataSchema schema of the written rows
+   * @param equalityFieldIds equality field ids; null when the test writes no equality deletes
+   * @param equalityDeleteRowSchema equality delete row schema; null when the test writes no equality deletes
+   */
+  @Override
+  protected FileWriterFactory<InternalRow> newWriterFactory(Schema dataSchema, List<Integer> equalityFieldIds,
+                                                            Schema equalityDeleteRowSchema) {
+    return SparkFileWriterFactory.builderFor(table)
+        .dataSchema(dataSchema)
+        .dataFileFormat(format())
+        .deleteFileFormat(format())
+        .equalityFieldIds(ArrayUtil.toIntArray(equalityFieldIds))
+        .equalityDeleteRowSchema(equalityDeleteRowSchema)
+        .build();
+  }
+
+  /** Builds a two-column row: the id as an int and the data as a {@code UTF8String}. */
+  @Override
+  protected InternalRow toRow(Integer id, String data) {
+    InternalRow row = new GenericInternalRow(2);
+    row.update(0, id);
+    row.update(1, UTF8String.fromString(data));
+    return row;
+  }
+}