You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by ao...@apache.org on 2021/09/23 23:25:46 UTC

[iceberg] branch master updated: Core: Add PartitioningWriter (#3164)

This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 11f327a  Core: Add PartitioningWriter (#3164)
11f327a is described below

commit 11f327a2f08d95fdf8fea68412f0ae1687a3b63f
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Thu Sep 23 16:25:33 2021 -0700

    Core: Add PartitioningWriter (#3164)
---
 build.gradle                                       |   4 -
 .../org/apache/iceberg/deletes/PositionDelete.java |   5 +-
 .../org/apache/iceberg/io/ClusteredDataWriter.java |  73 +++
 .../iceberg/io/ClusteredEqualityDeleteWriter.java  |  75 +++
 .../iceberg/io/ClusteredPositionDeleteWriter.java  |  78 +++
 .../org/apache/iceberg/io/ClusteredWriter.java     | 134 ++++++
 .../org/apache/iceberg/io/FanoutDataWriter.java    |  73 +++
 .../java/org/apache/iceberg/io/FanoutWriter.java   | 105 ++++
 .../org/apache/iceberg/io/PartitioningWriter.java  |  60 +++
 .../java/org/apache/iceberg/io/StructCopy.java     |   2 +-
 .../java/org/apache/iceberg/TableTestBase.java     |   6 +
 .../org/apache/iceberg/io/TestAppenderFactory.java |   6 +-
 .../apache/iceberg/io/TestFileWriterFactory.java   |  54 +--
 .../apache/iceberg/io/TestPartitioningWriters.java | 535 +++++++++++++++++++++
 .../apache/iceberg/io/TestRollingFileWriters.java  |  29 +-
 .../java/org/apache/iceberg/io/WriterTestBase.java |  93 ++++
 ...ters.java => TestFlinkPartitioningWriters.java} |  27 +-
 .../flink/sink/TestFlinkRollingFileWriters.java    |   4 +-
 jmh.gradle                                         |   2 +-
 .../iceberg/spark/source/WritersBenchmark.java     | 353 ++++++++++++++
 .../spark/source/avro/AvroWritersBenchmark.java    |  41 ++
 .../source/parquet/ParquetWritersBenchmark.java    |  41 ++
 ...ters.java => TestSparkPartitioningWriters.java} |  26 +-
 .../spark/source/TestSparkRollingFileWriters.java  |   4 +-
 24 files changed, 1736 insertions(+), 94 deletions(-)

diff --git a/build.gradle b/build.gradle
index cecf8b7..a56a03e 100644
--- a/build.gradle
+++ b/build.gradle
@@ -115,10 +115,6 @@ subprojects {
     options.encoding = 'UTF-8'
   }
 
-  ext {
-    jmhVersion = '1.21'
-  }
-
   sourceCompatibility = '1.8'
   targetCompatibility = '1.8'
 
diff --git a/core/src/main/java/org/apache/iceberg/deletes/PositionDelete.java b/core/src/main/java/org/apache/iceberg/deletes/PositionDelete.java
index 4cd2d31..cac04ca 100644
--- a/core/src/main/java/org/apache/iceberg/deletes/PositionDelete.java
+++ b/core/src/main/java/org/apache/iceberg/deletes/PositionDelete.java
@@ -22,7 +22,7 @@ package org.apache.iceberg.deletes;
 import org.apache.iceberg.StructLike;
 
 public class PositionDelete<R> implements StructLike {
-  static <T> PositionDelete<T> create() {
+  public static <T> PositionDelete<T> create() {
     return new PositionDelete<>();
   }
 
@@ -30,6 +30,9 @@ public class PositionDelete<R> implements StructLike {
   private long pos;
   private R row;
 
+  private PositionDelete() {
+  }
+
   public PositionDelete<R> set(CharSequence newPath, long newPos, R newRow) {
     this.path = newPath;
     this.pos = newPos;
diff --git a/core/src/main/java/org/apache/iceberg/io/ClusteredDataWriter.java b/core/src/main/java/org/apache/iceberg/io/ClusteredDataWriter.java
new file mode 100644
index 0000000..a6982cd
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/io/ClusteredDataWriter.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.util.List;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+/**
+ * A data writer capable of writing to multiple specs and partitions that requires the incoming records
+ * to be properly clustered by partition spec and by partition within each spec.
+ */
+public class ClusteredDataWriter<T> extends ClusteredWriter<T, DataWriteResult> {
+
+  private final FileWriterFactory<T> writerFactory;
+  private final OutputFileFactory fileFactory;
+  private final FileIO io;
+  private final FileFormat fileFormat;
+  private final long targetFileSizeInBytes;
+  private final List<DataFile> dataFiles; // data files gathered from the per-partition writers
+
+  public ClusteredDataWriter(FileWriterFactory<T> writerFactory, OutputFileFactory fileFactory,
+                             FileIO io, FileFormat fileFormat, long targetFileSizeInBytes) {
+    this.writerFactory = writerFactory;
+    this.fileFactory = fileFactory;
+    this.io = io;
+    this.fileFormat = fileFormat;
+    this.targetFileSizeInBytes = targetFileSizeInBytes;
+    this.dataFiles = Lists.newArrayList();
+  }
+
+  @Override
+  protected FileWriter<T, DataWriteResult> newWriter(PartitionSpec spec, StructLike partition) {
+    // TODO: support ORC rolling writers; until then ORC gets a plain writer bound to one output file
+    if (fileFormat == FileFormat.ORC) {
+      EncryptedOutputFile outputFile = newOutputFile(fileFactory, spec, partition);
+      return writerFactory.newDataWriter(outputFile, spec, partition);
+    } else {
+      return new RollingDataWriter<>(writerFactory, fileFactory, io, targetFileSizeInBytes, spec, partition);
+    }
+  }
+
+  @Override
+  protected void addResult(DataWriteResult result) {
+    dataFiles.addAll(result.dataFiles()); // invoked by the base class after it closes a writer
+  }
+
+  @Override
+  protected DataWriteResult aggregatedResult() {
+    return new DataWriteResult(dataFiles); // hands over the accumulated list; no copy is made here
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/io/ClusteredEqualityDeleteWriter.java b/core/src/main/java/org/apache/iceberg/io/ClusteredEqualityDeleteWriter.java
new file mode 100644
index 0000000..385d1a5
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/io/ClusteredEqualityDeleteWriter.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.util.List;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+/**
+ * An equality delete writer capable of writing to multiple specs and partitions that requires
+ * the incoming delete records to be properly clustered by partition spec and by partition within each spec.
+ */
+public class ClusteredEqualityDeleteWriter<T> extends ClusteredWriter<T, DeleteWriteResult> {
+
+  private final FileWriterFactory<T> writerFactory;
+  private final OutputFileFactory fileFactory;
+  private final FileIO io;
+  private final FileFormat fileFormat;
+  private final long targetFileSizeInBytes;
+  private final List<DeleteFile> deleteFiles; // delete files gathered from the per-partition writers
+
+  public ClusteredEqualityDeleteWriter(FileWriterFactory<T> writerFactory, OutputFileFactory fileFactory,
+                                       FileIO io, FileFormat fileFormat, long targetFileSizeInBytes) {
+    this.writerFactory = writerFactory;
+    this.fileFactory = fileFactory;
+    this.io = io;
+    this.fileFormat = fileFormat;
+    this.targetFileSizeInBytes = targetFileSizeInBytes;
+    this.deleteFiles = Lists.newArrayList();
+  }
+
+  @Override
+  protected FileWriter<T, DeleteWriteResult> newWriter(PartitionSpec spec, StructLike partition) {
+    // TODO: support ORC rolling writers; until then ORC gets a plain writer bound to one output file
+    if (fileFormat == FileFormat.ORC) {
+      EncryptedOutputFile outputFile = newOutputFile(fileFactory, spec, partition);
+      return writerFactory.newEqualityDeleteWriter(outputFile, spec, partition);
+    } else {
+      return new RollingEqualityDeleteWriter<>(writerFactory, fileFactory, io, targetFileSizeInBytes, spec, partition);
+    }
+  }
+
+  @Override
+  protected void addResult(DeleteWriteResult result) {
+    Preconditions.checkArgument(!result.referencesDataFiles(), "Equality deletes cannot reference data files");
+    deleteFiles.addAll(result.deleteFiles()); // only reached when the check above passes
+  }
+
+  @Override
+  protected DeleteWriteResult aggregatedResult() {
+    return new DeleteWriteResult(deleteFiles); // hands over the accumulated list; no copy is made here
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/io/ClusteredPositionDeleteWriter.java b/core/src/main/java/org/apache/iceberg/io/ClusteredPositionDeleteWriter.java
new file mode 100644
index 0000000..ea11838
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/io/ClusteredPositionDeleteWriter.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.util.List;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.CharSequenceSet;
+
+/**
+ * A position delete writer capable of writing to multiple specs and partitions that requires
+ * the incoming delete records to be properly clustered by partition spec and by partition within each spec.
+ */
+public class ClusteredPositionDeleteWriter<T> extends ClusteredWriter<PositionDelete<T>, DeleteWriteResult> {
+
+  private final FileWriterFactory<T> writerFactory;
+  private final OutputFileFactory fileFactory;
+  private final FileIO io;
+  private final FileFormat fileFormat;
+  private final long targetFileSizeInBytes;
+  private final List<DeleteFile> deleteFiles; // delete files gathered from the per-partition writers
+  private final CharSequenceSet referencedDataFiles; // merged from the results of all closed writers
+
+  public ClusteredPositionDeleteWriter(FileWriterFactory<T> writerFactory, OutputFileFactory fileFactory,
+                                       FileIO io, FileFormat fileFormat, long targetFileSizeInBytes) {
+    this.writerFactory = writerFactory;
+    this.fileFactory = fileFactory;
+    this.io = io;
+    this.fileFormat = fileFormat;
+    this.targetFileSizeInBytes = targetFileSizeInBytes;
+    this.deleteFiles = Lists.newArrayList();
+    this.referencedDataFiles = CharSequenceSet.empty();
+  }
+
+  @Override
+  protected FileWriter<PositionDelete<T>, DeleteWriteResult> newWriter(PartitionSpec spec, StructLike partition) {
+    // TODO: support ORC rolling writers; until then ORC gets a plain writer bound to one output file
+    if (fileFormat == FileFormat.ORC) {
+      EncryptedOutputFile outputFile = newOutputFile(fileFactory, spec, partition);
+      return writerFactory.newPositionDeleteWriter(outputFile, spec, partition);
+    } else {
+      return new RollingPositionDeleteWriter<>(writerFactory, fileFactory, io, targetFileSizeInBytes, spec, partition);
+    }
+  }
+
+  @Override
+  protected void addResult(DeleteWriteResult result) {
+    deleteFiles.addAll(result.deleteFiles()); // invoked by the base class after it closes a writer
+    referencedDataFiles.addAll(result.referencedDataFiles());
+  }
+
+  @Override
+  protected DeleteWriteResult aggregatedResult() {
+    return new DeleteWriteResult(deleteFiles, referencedDataFiles); // both collections are passed without copying
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/io/ClusteredWriter.java b/core/src/main/java/org/apache/iceberg/io/ClusteredWriter.java
new file mode 100644
index 0000000..8729fd1
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/io/ClusteredWriter.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.Set;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.types.Types.StructType;
+import org.apache.iceberg.util.StructLikeSet;
+
+/**
+ * A writer capable of writing to multiple specs and partitions that requires the incoming records
+ * to be clustered by partition spec and by partition within each spec.
+ * <p>
+ * As opposed to {@link FanoutWriter}, this writer keeps at most one file open to reduce
+ * the memory consumption. Prefer using this writer whenever the incoming records can be clustered
+ * by spec/partition.
+ */
+abstract class ClusteredWriter<T, R> implements PartitioningWriter<T, R> {
+
+  private static final String NOT_CLUSTERED_ROWS_ERROR_MSG_TEMPLATE =
+      "Incoming records violate the writer assumption that records are clustered by spec and " +
+      "by partition within each spec. Either cluster the incoming records or switch to fanout writers.\n" +
+      "Encountered records that belong to already closed files:\n";
+
+  private final Set<Integer> completedSpecIds = Sets.newHashSet(); // ids of specs this writer moved away from
+
+  private PartitionSpec currentSpec = null;
+  private Comparator<StructLike> partitionComparator = null; // rebuilt for each new spec's partition type
+  private Set<StructLike> completedPartitions = null; // closed partitions of currentSpec only
+  private StructLike currentPartition = null;
+  private FileWriter<T, R> currentWriter = null;
+
+  private boolean closed = false;
+
+  protected abstract FileWriter<T, R> newWriter(PartitionSpec spec, StructLike partition);
+
+  protected abstract void addResult(R result);
+
+  protected abstract R aggregatedResult();
+
+  @Override
+  public void write(T row, PartitionSpec spec, StructLike partition) throws IOException {
+    if (!spec.equals(currentSpec)) { // first record, or the incoming spec differs from the current one
+      if (currentSpec != null) {
+        closeCurrentWriter();
+        completedSpecIds.add(currentSpec.specId());
+        completedPartitions.clear();
+      }
+
+      if (completedSpecIds.contains(spec.specId())) { // records for an already finished spec showed up again
+        String errorCtx = String.format("spec %s", spec);
+        throw new IllegalStateException(NOT_CLUSTERED_ROWS_ERROR_MSG_TEMPLATE + errorCtx);
+      }
+
+      StructType partitionType = spec.partitionType();
+
+      this.currentSpec = spec;
+      this.partitionComparator = Comparators.forType(partitionType);
+      this.completedPartitions = StructLikeSet.create(partitionType);
+      // copy the partition key as the key object may be reused
+      this.currentPartition = StructCopy.copy(partition);
+      this.currentWriter = newWriter(currentSpec, currentPartition);
+
+    } else if (partition != currentPartition && partitionComparator.compare(partition, currentPartition) != 0) {
+      closeCurrentWriter(); // same spec, but the partition changed
+      completedPartitions.add(currentPartition);
+
+      if (completedPartitions.contains(partition)) { // records for an already closed partition showed up again
+        String errorCtx = String.format("partition '%s' in spec %s", spec.partitionToPath(partition), spec);
+        throw new IllegalStateException(NOT_CLUSTERED_ROWS_ERROR_MSG_TEMPLATE + errorCtx);
+      }
+
+      // copy the partition key as the key object may be reused
+      this.currentPartition = StructCopy.copy(partition);
+      this.currentWriter = newWriter(currentSpec, currentPartition);
+    }
+
+    currentWriter.write(row); // NOTE(review): the 'closed' flag is not checked on this path
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (!closed) { // repeated close() calls are no-ops
+      closeCurrentWriter();
+      this.closed = true;
+    }
+  }
+
+  private void closeCurrentWriter() throws IOException {
+    if (currentWriter != null) {
+      currentWriter.close();
+
+      addResult(currentWriter.result()); // the result is read only after the writer has been closed
+
+      this.currentWriter = null;
+    }
+  }
+
+  @Override
+  public final R result() {
+    Preconditions.checkState(closed, "Cannot get result from unclosed writer");
+    return aggregatedResult();
+  }
+
+  protected EncryptedOutputFile newOutputFile(OutputFileFactory fileFactory, PartitionSpec spec, StructLike partition) {
+    Preconditions.checkArgument(spec.isUnpartitioned() || partition != null,
+        "Partition must not be null when creating output file for partitioned spec");
+    return partition == null ? fileFactory.newOutputFile() : fileFactory.newOutputFile(spec, partition);
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/io/FanoutDataWriter.java b/core/src/main/java/org/apache/iceberg/io/FanoutDataWriter.java
new file mode 100644
index 0000000..d6e16a7
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/io/FanoutDataWriter.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.util.List;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+/**
+ * A data writer capable of writing to multiple specs and partitions that keeps data writers for each
+ * seen spec/partition pair open until this writer is closed.
+ */
+public class FanoutDataWriter<T> extends FanoutWriter<T, DataWriteResult> {
+
+  private final FileWriterFactory<T> writerFactory;
+  private final OutputFileFactory fileFactory;
+  private final FileIO io;
+  private final FileFormat fileFormat;
+  private final long targetFileSizeInBytes;
+  private final List<DataFile> dataFiles; // data files gathered from the per-partition writers
+
+  public FanoutDataWriter(FileWriterFactory<T> writerFactory, OutputFileFactory fileFactory,
+                          FileIO io, FileFormat fileFormat, long targetFileSizeInBytes) {
+    this.writerFactory = writerFactory;
+    this.fileFactory = fileFactory;
+    this.io = io;
+    this.fileFormat = fileFormat;
+    this.targetFileSizeInBytes = targetFileSizeInBytes;
+    this.dataFiles = Lists.newArrayList();
+  }
+
+  @Override
+  protected FileWriter<T, DataWriteResult> newWriter(PartitionSpec spec, StructLike partition) {
+    // TODO: support ORC rolling writers; until then ORC gets a plain writer bound to one output file
+    if (fileFormat == FileFormat.ORC) {
+      EncryptedOutputFile outputFile = newOutputFile(fileFactory, spec, partition);
+      return writerFactory.newDataWriter(outputFile, spec, partition);
+    } else {
+      return new RollingDataWriter<>(writerFactory, fileFactory, io, targetFileSizeInBytes, spec, partition);
+    }
+  }
+
+  @Override
+  protected void addResult(DataWriteResult result) {
+    dataFiles.addAll(result.dataFiles()); // invoked by the base class for each writer it closes
+  }
+
+  @Override
+  protected DataWriteResult aggregatedResult() {
+    return new DataWriteResult(dataFiles); // hands over the accumulated list; no copy is made here
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/io/FanoutWriter.java b/core/src/main/java/org/apache/iceberg/io/FanoutWriter.java
new file mode 100644
index 0000000..122a25d
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/io/FanoutWriter.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.StructLikeMap;
+
+/**
+ * A writer capable of writing to multiple specs and partitions that keeps files for each
+ * seen spec/partition pair open until this writer is closed.
+ * <p>
+ * As opposed to {@link ClusteredWriter}, this writer does not require the incoming records
+ * to be clustered by partition spec and partition as all files are kept open. As a consequence,
+ * this writer may potentially consume substantially more memory compared to {@link ClusteredWriter}.
+ * Use this writer only when clustering by spec/partition is not possible (e.g. streaming).
+ */
+abstract class FanoutWriter<T, R> implements PartitioningWriter<T, R> {
+
+  private final Map<Integer, StructLikeMap<FileWriter<T, R>>> writers = Maps.newHashMap();
+  private boolean closed = false;
+
+  protected abstract FileWriter<T, R> newWriter(PartitionSpec spec, StructLike partition);
+
+  protected abstract void addResult(R result);
+
+  protected abstract R aggregatedResult();
+
+  @Override
+  public void write(T row, PartitionSpec spec, StructLike partition) throws IOException {
+    FileWriter<T, R> writer = writer(spec, partition); // existing or newly opened writer for this pair
+    writer.write(row);
+  }
+
+  private FileWriter<T, R> writer(PartitionSpec spec, StructLike partition) {
+    Map<StructLike, FileWriter<T, R>> specWriters = writers.computeIfAbsent(
+        spec.specId(), // outer map is keyed by spec id
+        id -> StructLikeMap.create(spec.partitionType()));
+    FileWriter<T, R> writer = specWriters.get(partition); // lookup with the caller's key, no copy needed
+
+    if (writer == null) { // first record seen for this spec/partition pair
+      // copy the partition key as the key object may be reused
+      StructLike copiedPartition = StructCopy.copy(partition);
+      writer = newWriter(spec, copiedPartition);
+      specWriters.put(copiedPartition, writer); // stored under the copy, never under the caller's key
+    }
+
+    return writer;
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (!closed) { // repeated close() calls are no-ops
+      closeWriters();
+      this.closed = true;
+    }
+  }
+
+  private void closeWriters() throws IOException {
+    for (Map<StructLike, FileWriter<T, R>> specWriters : writers.values()) {
+      for (FileWriter<T, R> writer : specWriters.values()) {
+        writer.close(); // NOTE(review): if this throws, the remaining writers are left unclosed
+        addResult(writer.result());
+      }
+
+      specWriters.clear();
+    }
+
+    writers.clear();
+  }
+
+  @Override
+  public final R result() {
+    Preconditions.checkState(closed, "Cannot get result from unclosed writer");
+    return aggregatedResult();
+  }
+
+  protected EncryptedOutputFile newOutputFile(OutputFileFactory fileFactory, PartitionSpec spec, StructLike partition) {
+    Preconditions.checkArgument(spec.isUnpartitioned() || partition != null,
+        "Partition must not be null when creating output file for partitioned spec");
+    return partition == null ? fileFactory.newOutputFile() : fileFactory.newOutputFile(spec, partition);
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/io/PartitioningWriter.java b/core/src/main/java/org/apache/iceberg/io/PartitioningWriter.java
new file mode 100644
index 0000000..329e68c
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/io/PartitioningWriter.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.Closeable;
+import java.io.IOException;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+
+/**
+ * A writer capable of writing files of a single type (i.e. data/delete) to multiple specs and partitions.
+ * <p>
+ * As opposed to {@link FileWriter}, this interface should be implemented by writers that are not
+ * limited to writing to a single spec/partition. Implementations may internally use {@link FileWriter}s
+ * for writing to a single spec/partition.
+ * <p>
+ * Note that this writer can be used both for partitioned and unpartitioned tables.
+ *
+ * @param <T> the row type
+ * @param <R> the result type
+ */
+public interface PartitioningWriter<T, R> extends Closeable {
+
+  /**
+   * Writes a single row to the file(s) of the provided spec/partition.
+   *
+   * @param row a data or delete record
+   * @param spec a partition spec
+   * @param partition a partition or null if the spec is unpartitioned
+   * @throws IOException in case of an error during the write process
+   */
+  void write(T row, PartitionSpec spec, StructLike partition) throws IOException;
+
+  /**
+   * Returns a result that contains information about written {@link DataFile}s or {@link DeleteFile}s.
+   * The result is valid only after the writer is closed, so call {@link #close()} first.
+   *
+   * @return the writer result
+   */
+  R result();
+}
diff --git a/core/src/main/java/org/apache/iceberg/io/StructCopy.java b/core/src/main/java/org/apache/iceberg/io/StructCopy.java
index 6cf461d..7d43733 100644
--- a/core/src/main/java/org/apache/iceberg/io/StructCopy.java
+++ b/core/src/main/java/org/apache/iceberg/io/StructCopy.java
@@ -26,7 +26,7 @@ import org.apache.iceberg.StructLike;
  */
 class StructCopy implements StructLike {
   static StructLike copy(StructLike struct) {
-    return new StructCopy(struct);
+    return struct != null ? new StructCopy(struct) : null;
   }
 
   private final Object[] values;
diff --git a/core/src/test/java/org/apache/iceberg/TableTestBase.java b/core/src/test/java/org/apache/iceberg/TableTestBase.java
index 9ef28b9..7325cfe 100644
--- a/core/src/test/java/org/apache/iceberg/TableTestBase.java
+++ b/core/src/test/java/org/apache/iceberg/TableTestBase.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.UUID;
 import java.util.stream.LongStream;
+import org.apache.iceberg.deletes.PositionDelete;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -454,6 +455,11 @@ public class TableTestBase {
         .build();
   }
 
+  protected <T> PositionDelete<T> positionDelete(CharSequence path, long pos, T row) {
+    PositionDelete<T> positionDelete = PositionDelete.create(); // static factory; the constructor is private
+    return positionDelete.set(path, pos, row); // returns whatever set(...) returns
+  }
+
   static void validateManifestEntries(ManifestFile manifest,
                                       Iterator<Long> ids,
                                       Iterator<DataFile> expectedFiles,
diff --git a/data/src/test/java/org/apache/iceberg/io/TestAppenderFactory.java b/data/src/test/java/org/apache/iceberg/io/TestAppenderFactory.java
index 7df402e..0a88cbc 100644
--- a/data/src/test/java/org/apache/iceberg/io/TestAppenderFactory.java
+++ b/data/src/test/java/org/apache/iceberg/io/TestAppenderFactory.java
@@ -273,9 +273,9 @@ public abstract class TestAppenderFactory<T> extends TableTestBase {
     DataFile dataFile = prepareDataFile(rowSet, appenderFactory);
 
     List<PositionDelete<T>> deletes = Lists.newArrayList(
-        new PositionDelete<T>().set(dataFile.path(), 0, rowSet.get(0)),
-        new PositionDelete<T>().set(dataFile.path(), 2, rowSet.get(2)),
-        new PositionDelete<T>().set(dataFile.path(), 4, rowSet.get(4))
+        positionDelete(dataFile.path(), 0, rowSet.get(0)),
+        positionDelete(dataFile.path(), 2, rowSet.get(2)),
+        positionDelete(dataFile.path(), 4, rowSet.get(4))
     );
 
     EncryptedOutputFile out = createEncryptedOutputFile();
diff --git a/data/src/test/java/org/apache/iceberg/io/TestFileWriterFactory.java b/data/src/test/java/org/apache/iceberg/io/TestFileWriterFactory.java
index 734e5d3..c3c4691 100644
--- a/data/src/test/java/org/apache/iceberg/io/TestFileWriterFactory.java
+++ b/data/src/test/java/org/apache/iceberg/io/TestFileWriterFactory.java
@@ -26,14 +26,11 @@ import java.util.Map;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.PartitionKey;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.StructLike;
-import org.apache.iceberg.TableTestBase;
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.data.GenericRecord;
-import org.apache.iceberg.data.IcebergGenerics;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.data.avro.DataReader;
 import org.apache.iceberg.data.parquet.GenericParquetReaders;
@@ -59,7 +56,7 @@ import static org.apache.iceberg.MetadataColumns.DELETE_FILE_POS;
 import static org.apache.iceberg.MetadataColumns.DELETE_FILE_ROW_FIELD_NAME;
 
 @RunWith(Parameterized.class)
-public abstract class TestFileWriterFactory<T> extends TableTestBase {
+public abstract class TestFileWriterFactory<T> extends WriterTestBase<T> {
   @Parameterized.Parameters(name = "FileFormat={0}, Partitioned={1}")
   public static Object[] parameters() {
     return new Object[][] {
@@ -73,6 +70,7 @@ public abstract class TestFileWriterFactory<T> extends TableTestBase {
   }
 
   private static final int TABLE_FORMAT_VERSION = 2;
+  private static final String PARTITION_VALUE = "aaa";
 
   private final FileFormat fileFormat;
   private final boolean partitioned;
@@ -94,12 +92,6 @@ public abstract class TestFileWriterFactory<T> extends TableTestBase {
     );
   }
 
-  protected abstract FileWriterFactory<T> newWriterFactory(Schema dataSchema, List<Integer> equalityFieldIds,
-                                                           Schema equalityDeleteRowSchema,
-                                                           Schema positionDeleteRowSchema);
-
-  protected abstract T toRow(Integer id, String data);
-
   protected abstract StructLikeSet toSet(Iterable<T> records);
 
   protected FileFormat format() {
@@ -115,7 +107,7 @@ public abstract class TestFileWriterFactory<T> extends TableTestBase {
 
     if (partitioned) {
       this.table = create(SCHEMA, SPEC);
-      this.partition = initPartitionKey();
+      this.partition = partitionKey(table.spec(), PARTITION_VALUE);
     } else {
       this.table = create(SCHEMA, PartitionSpec.unpartitioned());
       this.partition = null;
@@ -222,7 +214,7 @@ public abstract class TestFileWriterFactory<T> extends TableTestBase {
         .addField("data")
         .commit();
 
-    partition = initPartitionKey();
+    partition = partitionKey(table.spec(), PARTITION_VALUE);
 
     // write a partitioned data file
     DataFile secondDataFile = writeData(writerFactory, dataRows, table.spec(), partition);
@@ -259,9 +251,9 @@ public abstract class TestFileWriterFactory<T> extends TableTestBase {
 
     // write a position delete file
     List<PositionDelete<T>> deletes = ImmutableList.of(
-        new PositionDelete<T>().set(dataFile.path(), 0L, null),
-        new PositionDelete<T>().set(dataFile.path(), 2L, null),
-        new PositionDelete<T>().set(dataFile.path(), 4L, null)
+        positionDelete(dataFile.path(), 0L, null),
+        positionDelete(dataFile.path(), 2L, null),
+        positionDelete(dataFile.path(), 4L, null)
     );
     Pair<DeleteFile, CharSequenceSet> result = writePositionDeletes(writerFactory, deletes, table.spec(), partition);
     DeleteFile deleteFile = result.first();
@@ -305,7 +297,7 @@ public abstract class TestFileWriterFactory<T> extends TableTestBase {
 
     // write a position delete file and persist the deleted row
     List<PositionDelete<T>> deletes = ImmutableList.of(
-        new PositionDelete<T>().set(dataFile.path(), 0, dataRows.get(0))
+        positionDelete(dataFile.path(), 0, dataRows.get(0))
     );
     Pair<DeleteFile, CharSequenceSet> result = writePositionDeletes(writerFactory, deletes, table.spec(), partition);
     DeleteFile deleteFile = result.first();
@@ -343,28 +335,6 @@ public abstract class TestFileWriterFactory<T> extends TableTestBase {
     Assert.assertEquals("Records should match", toSet(expectedRows), actualRowSet("*"));
   }
 
-  private PartitionKey initPartitionKey() {
-    Record record = GenericRecord.create(table.schema()).copy(ImmutableMap.of("data", "aaa"));
-
-    PartitionKey partitionKey = new PartitionKey(table.spec(), table.schema());
-    partitionKey.partition(record);
-
-    return partitionKey;
-  }
-
-  private FileWriterFactory<T> newWriterFactory(Schema dataSchema) {
-    return newWriterFactory(dataSchema, null, null, null);
-  }
-
-  private FileWriterFactory<T> newWriterFactory(Schema dataSchema, List<Integer> equalityFieldIds,
-                                                Schema equalityDeleteRowSchema) {
-    return newWriterFactory(dataSchema, equalityFieldIds, equalityDeleteRowSchema, null);
-  }
-
-  private FileWriterFactory<T> newWriterFactory(Schema dataSchema, Schema positionDeleteRowSchema) {
-    return newWriterFactory(dataSchema, null, null, positionDeleteRowSchema);
-  }
-
   private DataFile writeData(FileWriterFactory<T> writerFactory, List<T> rows,
                              PartitionSpec spec, StructLike partitionKey) throws IOException {
 
@@ -435,14 +405,6 @@ public abstract class TestFileWriterFactory<T> extends TableTestBase {
     }
   }
 
-  private StructLikeSet actualRowSet(String... columns) throws IOException {
-    StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
-    try (CloseableIterable<Record> reader = IcebergGenerics.read(table).select(columns).build()) {
-      reader.forEach(set::add);
-    }
-    return set;
-  }
-
   private EncryptedOutputFile newOutputFile(PartitionSpec spec, StructLike partitionKey) {
     return fileFactory.newOutputFile(spec, partitionKey);
   }
diff --git a/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java b/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java
new file mode 100644
index 0000000..b01ccc5
--- /dev/null
+++ b/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java
@@ -0,0 +1,535 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RowDelta;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.util.StructLikeSet;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public abstract class TestPartitioningWriters<T> extends WriterTestBase<T> {
+
+  @Parameterized.Parameters(name = "FileFormat={0}")
+  public static Object[] parameters() {
+    return new Object[][] {
+        new Object[]{FileFormat.AVRO},
+        new Object[]{FileFormat.PARQUET},
+        new Object[]{FileFormat.ORC},
+    };
+  }
+
+  private static final int TABLE_FORMAT_VERSION = 2;
+  private static final long TARGET_FILE_SIZE = 128L * 1024 * 1024;
+
+  private final FileFormat fileFormat;
+  private OutputFileFactory fileFactory = null;
+
+  public TestPartitioningWriters(FileFormat fileFormat) {
+    super(TABLE_FORMAT_VERSION); // every run uses the fixed table format version constant of this class
+    this.fileFormat = fileFormat; // supplied by the Parameterized runner, one value per run
+  }
+
+  protected abstract StructLikeSet toSet(Iterable<T> records);
+
+  protected FileFormat format() {
+    return fileFormat; // the file format this parameterized instance was constructed with
+  }
+
+  @Before
+  public void setupTable() throws Exception {
+    this.tableDir = temp.newFolder();
+    Assert.assertTrue(tableDir.delete()); // removed up front; the directory is created during table creation
+    // every test starts from an unpartitioned table and evolves the spec itself when it needs partitions
+    this.metadataDir = new File(tableDir, "metadata");
+    this.table = create(SCHEMA, PartitionSpec.unpartitioned());
+    this.fileFactory = OutputFileFactory.builderFor(table, 1, 1).format(fileFormat).build();
+  }
+
+  @Test
+  public void testClusteredDataWriterNoRecords() throws IOException {
+    FileWriterFactory<T> factory = newWriterFactory(table.schema());
+    ClusteredDataWriter<T> emptyWriter =
+        new ClusteredDataWriter<>(factory, fileFactory, table.io(), fileFormat, TARGET_FILE_SIZE);
+
+    // closing a writer that never received a record must yield no files
+    emptyWriter.close();
+    Assert.assertEquals("Must be no data files", 0, emptyWriter.result().dataFiles().size());
+    // closing again must be harmless and keep the result empty
+    emptyWriter.close();
+    Assert.assertEquals("Must be no data files", 0, emptyWriter.result().dataFiles().size());
+  }
+
+  @Test
+  public void testClusteredDataWriterMultiplePartitions() throws IOException {
+    // partition the table by the data column before creating the writer
+    table.updateSpec().addField(Expressions.ref("data")).commit();
+
+    FileWriterFactory<T> factory = newWriterFactory(table.schema());
+    ClusteredDataWriter<T> clusteredWriter =
+        new ClusteredDataWriter<>(factory, fileFactory, table.io(), fileFormat, TARGET_FILE_SIZE);
+
+    PartitionSpec currentSpec = table.spec();
+
+    // records arrive grouped by partition value: aaa, then bbb, then ccc
+    clusteredWriter.write(toRow(1, "aaa"), currentSpec, partitionKey(currentSpec, "aaa"));
+    clusteredWriter.write(toRow(2, "aaa"), currentSpec, partitionKey(currentSpec, "aaa"));
+    clusteredWriter.write(toRow(3, "bbb"), currentSpec, partitionKey(currentSpec, "bbb"));
+    clusteredWriter.write(toRow(4, "bbb"), currentSpec, partitionKey(currentSpec, "bbb"));
+    clusteredWriter.write(toRow(5, "ccc"), currentSpec, partitionKey(currentSpec, "ccc"));
+
+    clusteredWriter.close();
+
+    DataWriteResult writeResult = clusteredWriter.result();
+    Assert.assertEquals("Must be 3 data files", 3, writeResult.dataFiles().size());
+
+    // commit the new files so the rows can be read back through the table
+    RowDelta delta = table.newRowDelta();
+    writeResult.dataFiles().forEach(delta::addRows);
+    delta.commit();
+
+    List<T> expected = ImmutableList.of(
+        toRow(1, "aaa"),
+        toRow(2, "aaa"),
+        toRow(3, "bbb"),
+        toRow(4, "bbb"),
+        toRow(5, "ccc")
+    );
+    Assert.assertEquals("Records should match", toSet(expected), actualRowSet("*"));
+  }
+
+  @Test
+  public void testClusteredDataWriterOutOfOrderPartitions() throws IOException {
+    // partition the table by the data column before creating the writer
+    table.updateSpec().addField(Expressions.ref("data")).commit();
+
+    FileWriterFactory<T> factory = newWriterFactory(table.schema());
+    ClusteredDataWriter<T> clusteredWriter =
+        new ClusteredDataWriter<>(factory, fileFactory, table.io(), fileFormat, TARGET_FILE_SIZE);
+
+    PartitionSpec currentSpec = table.spec();
+
+    // a well-clustered prefix: aaa, then bbb, then ccc
+    clusteredWriter.write(toRow(1, "aaa"), currentSpec, partitionKey(currentSpec, "aaa"));
+    clusteredWriter.write(toRow(2, "aaa"), currentSpec, partitionKey(currentSpec, "aaa"));
+    clusteredWriter.write(toRow(3, "bbb"), currentSpec, partitionKey(currentSpec, "bbb"));
+    clusteredWriter.write(toRow(4, "bbb"), currentSpec, partitionKey(currentSpec, "bbb"));
+    clusteredWriter.write(toRow(5, "ccc"), currentSpec, partitionKey(currentSpec, "ccc"));
+
+    // returning to partition aaa after moving past it must be rejected
+    AssertHelpers.assertThrows("Should fail to write out of order partitions",
+        IllegalStateException.class, "Encountered records that belong to already closed files",
+        () -> {
+          try {
+            clusteredWriter.write(toRow(6, "aaa"), currentSpec, partitionKey(currentSpec, "aaa"));
+          } catch (IOException ioe) {
+            throw new UncheckedIOException(ioe);
+          }
+        });
+
+    clusteredWriter.close();
+  }
+
+  @Test
+  public void testClusteredEqualityDeleteWriterNoRecords() throws IOException {
+    Assume.assumeFalse("ORC delete files are not supported", fileFormat == FileFormat.ORC);
+
+    List<Integer> idFieldIds = ImmutableList.of(table.schema().findField("id").fieldId());
+    Schema deleteRowSchema = table.schema().select("id");
+    FileWriterFactory<T> factory = newWriterFactory(table.schema(), idFieldIds, deleteRowSchema);
+    ClusteredEqualityDeleteWriter<T> emptyWriter =
+        new ClusteredEqualityDeleteWriter<>(factory, fileFactory, table.io(), fileFormat, TARGET_FILE_SIZE);
+
+    // closing a writer that never received a delete must yield an empty result
+    emptyWriter.close();
+    Assert.assertEquals(0, emptyWriter.result().deleteFiles().size());
+    Assert.assertEquals(0, emptyWriter.result().referencedDataFiles().size());
+    Assert.assertFalse(emptyWriter.result().referencesDataFiles());
+    // closing again must be harmless and keep the result empty
+    emptyWriter.close();
+    Assert.assertEquals(0, emptyWriter.result().deleteFiles().size());
+    Assert.assertEquals(0, emptyWriter.result().referencedDataFiles().size());
+    Assert.assertFalse(emptyWriter.result().referencesDataFiles());
+  }
+
+  @Test
+  public void testClusteredEqualityDeleteWriterMultipleSpecs() throws IOException {
+    Assume.assumeFalse("ORC delete files are not supported", fileFormat == FileFormat.ORC);
+
+    List<Integer> idFieldIds = ImmutableList.of(table.schema().findField("id").fieldId());
+    Schema deleteRowSchema = table.schema().select("id");
+    FileWriterFactory<T> factory = newWriterFactory(table.schema(), idFieldIds, deleteRowSchema);
+
+    // first data file: written and appended while the table is still unpartitioned
+    ImmutableList<T> firstRows = ImmutableList.of(
+        toRow(1, "aaa"),
+        toRow(2, "aaa"),
+        toRow(11, "aaa")
+    );
+    DataFile firstFile = writeData(factory, fileFactory, firstRows, table.spec(), null);
+    table.newFastAppend()
+        .appendFile(firstFile)
+        .commit();
+
+    // evolve the spec: bucket the data column into 16 buckets
+    table.updateSpec()
+        .addField(Expressions.bucket("data", 16))
+        .commit();
+
+    // second data file: written and appended under the bucket spec
+    ImmutableList<T> secondRows = ImmutableList.of(
+        toRow(3, "bbb"),
+        toRow(4, "bbb"),
+        toRow(12, "bbb")
+    );
+    DataFile secondFile = writeData(factory, fileFactory, secondRows, table.spec(), partitionKey(table.spec(), "bbb"));
+    table.newFastAppend()
+        .appendFile(secondFile)
+        .commit();
+
+    // evolve the spec again: drop the bucket field and partition by data instead
+    table.updateSpec()
+        .removeField(Expressions.bucket("data", 16))
+        .addField(Expressions.ref("data"))
+        .commit();
+
+    // third data file: written and appended under the spec partitioned by data
+    ImmutableList<T> thirdRows = ImmutableList.of(
+        toRow(5, "ccc"),
+        toRow(13, "ccc")
+    );
+    DataFile thirdFile = writeData(factory, fileFactory, thirdRows, table.spec(), partitionKey(table.spec(), "ccc"));
+    table.newFastAppend()
+        .appendFile(thirdFile)
+        .commit();
+
+    // a single writer receives the deletes for all three specs
+    ClusteredEqualityDeleteWriter<T> deleteWriter =
+        new ClusteredEqualityDeleteWriter<>(factory, fileFactory, table.io(), fileFormat, TARGET_FILE_SIZE);
+
+    PartitionSpec unpartitionedSpec = table.specs().get(0);
+    PartitionSpec bucketSpec = table.specs().get(1);
+    PartitionSpec identitySpec = table.specs().get(2);
+    // deletes arrive clustered by spec and partition; ids 11, 12 and 13 are never deleted
+    deleteWriter.write(toRow(1, "aaa"), unpartitionedSpec, null);
+    deleteWriter.write(toRow(2, "aaa"), unpartitionedSpec, null);
+    deleteWriter.write(toRow(3, "bbb"), bucketSpec, partitionKey(bucketSpec, "bbb"));
+    deleteWriter.write(toRow(4, "bbb"), bucketSpec, partitionKey(bucketSpec, "bbb"));
+    deleteWriter.write(toRow(5, "ccc"), identitySpec, partitionKey(identitySpec, "ccc"));
+
+    deleteWriter.close();
+
+    DeleteWriteResult writeResult = deleteWriter.result();
+    Assert.assertEquals("Must be 3 delete files", 3, writeResult.deleteFiles().size());
+    Assert.assertEquals("Must not reference data files", 0, deleteWriter.result().referencedDataFiles().size());
+    Assert.assertFalse("Must not reference data files", deleteWriter.result().referencesDataFiles());
+
+    RowDelta delta = table.newRowDelta();
+    writeResult.deleteFiles().forEach(delta::addDeletes);
+    delta.commit();
+
+    List<T> remainingRows = ImmutableList.of(
+        toRow(11, "aaa"),
+        toRow(12, "bbb"),
+        toRow(13, "ccc")
+    );
+    Assert.assertEquals("Records should match", toSet(remainingRows), actualRowSet("*"));
+  }
+
+  @Test
+  public void testClusteredEqualityDeleteWriterOutOfOrderSpecsAndPartitions() throws IOException {
+    Assume.assumeFalse("ORC delete files are not supported", fileFormat == FileFormat.ORC);
+
+    List<Integer> idFieldIds = ImmutableList.of(table.schema().findField("id").fieldId());
+    Schema deleteRowSchema = table.schema().select("id");
+    FileWriterFactory<T> factory = newWriterFactory(table.schema(), idFieldIds, deleteRowSchema);
+
+    // evolve the spec twice so the table ends up with three specs:
+    // unpartitioned, then bucketed by data, then partitioned by data
+    table.updateSpec().addField(Expressions.bucket("data", 16)).commit();
+
+    table.updateSpec()
+        .removeField(Expressions.bucket("data", 16))
+        .addField(Expressions.ref("data"))
+        .commit();
+
+    // the writer is created only after both spec updates have been committed
+    ClusteredEqualityDeleteWriter<T> deleteWriter =
+        new ClusteredEqualityDeleteWriter<>(factory, fileFactory, table.io(), fileFormat, TARGET_FILE_SIZE);
+
+    PartitionSpec unpartitionedSpec = table.specs().get(0);
+    PartitionSpec bucketSpec = table.specs().get(1);
+    PartitionSpec identitySpec = table.specs().get(2);
+    // a well-clustered prefix that ends in partition ddd of the last spec
+    deleteWriter.write(toRow(1, "aaa"), unpartitionedSpec, null);
+    deleteWriter.write(toRow(2, "aaa"), unpartitionedSpec, null);
+    deleteWriter.write(toRow(3, "bbb"), bucketSpec, partitionKey(bucketSpec, "bbb"));
+    deleteWriter.write(toRow(4, "bbb"), bucketSpec, partitionKey(bucketSpec, "bbb"));
+    deleteWriter.write(toRow(5, "ccc"), identitySpec, partitionKey(identitySpec, "ccc"));
+    deleteWriter.write(toRow(6, "ddd"), identitySpec, partitionKey(identitySpec, "ddd"));
+
+    AssertHelpers.assertThrows("Should fail to write out of order partitions",
+        IllegalStateException.class, "Encountered records that belong to already closed files",
+        () -> {
+          try {
+            deleteWriter.write(toRow(7, "ccc"), identitySpec, partitionKey(identitySpec, "ccc"));
+          } catch (IOException ioe) {
+            throw new UncheckedIOException(ioe);
+          }
+        });
+
+    AssertHelpers.assertThrows("Should fail to write out of order specs",
+        IllegalStateException.class, "Encountered records that belong to already closed files",
+        () -> {
+          try {
+            deleteWriter.write(toRow(7, "aaa"), unpartitionedSpec, null);
+          } catch (IOException ioe) {
+            throw new UncheckedIOException(ioe);
+          }
+        });
+
+    deleteWriter.close();
+  }
+
+  @Test
+  public void testClusteredPositionDeleteWriterNoRecords() throws IOException {
+    Assume.assumeFalse("ORC delete files are not supported", fileFormat == FileFormat.ORC);
+
+    FileWriterFactory<T> factory = newWriterFactory(table.schema());
+    ClusteredPositionDeleteWriter<T> emptyWriter =
+        new ClusteredPositionDeleteWriter<>(factory, fileFactory, table.io(), fileFormat, TARGET_FILE_SIZE);
+
+    // closing a writer that never received a delete must yield an empty result
+    emptyWriter.close();
+    Assert.assertEquals(0, emptyWriter.result().deleteFiles().size());
+    Assert.assertEquals(0, emptyWriter.result().referencedDataFiles().size());
+    Assert.assertFalse(emptyWriter.result().referencesDataFiles());
+    // closing again must be harmless and keep the result empty
+    emptyWriter.close();
+    Assert.assertEquals(0, emptyWriter.result().deleteFiles().size());
+    Assert.assertEquals(0, emptyWriter.result().referencedDataFiles().size());
+    Assert.assertFalse(emptyWriter.result().referencesDataFiles());
+  }
+
+  @Test
+  public void testClusteredPositionDeleteWriterMultipleSpecs() throws IOException {
+    Assume.assumeFalse("ORC delete files are not supported", fileFormat == FileFormat.ORC);
+
+    FileWriterFactory<T> factory = newWriterFactory(table.schema());
+
+    // first data file: written and appended while the table is still unpartitioned
+    ImmutableList<T> firstRows = ImmutableList.of(
+        toRow(1, "aaa"),
+        toRow(2, "aaa"),
+        toRow(11, "aaa")
+    );
+    DataFile firstFile = writeData(factory, fileFactory, firstRows, table.spec(), null);
+    table.newFastAppend()
+        .appendFile(firstFile)
+        .commit();
+
+    // evolve the spec: bucket the data column into 16 buckets
+    table.updateSpec()
+        .addField(Expressions.bucket("data", 16))
+        .commit();
+
+    // second data file: written and appended under the bucket spec
+    ImmutableList<T> secondRows = ImmutableList.of(
+        toRow(3, "bbb"),
+        toRow(4, "bbb"),
+        toRow(12, "bbb")
+    );
+    DataFile secondFile = writeData(factory, fileFactory, secondRows, table.spec(), partitionKey(table.spec(), "bbb"));
+    table.newFastAppend()
+        .appendFile(secondFile)
+        .commit();
+
+    // evolve the spec again: drop the bucket field and partition by data instead
+    table.updateSpec()
+        .removeField(Expressions.bucket("data", 16))
+        .addField(Expressions.ref("data"))
+        .commit();
+
+    // third data file: written and appended under the spec partitioned by data
+    ImmutableList<T> thirdRows = ImmutableList.of(
+        toRow(5, "ccc"),
+        toRow(13, "ccc")
+    );
+    DataFile thirdFile = writeData(factory, fileFactory, thirdRows, table.spec(), partitionKey(table.spec(), "ccc"));
+    table.newFastAppend()
+        .appendFile(thirdFile)
+        .commit();
+
+    // a single writer receives the deletes for all three specs
+    ClusteredPositionDeleteWriter<T> deleteWriter =
+        new ClusteredPositionDeleteWriter<>(factory, fileFactory, table.io(), fileFormat, TARGET_FILE_SIZE);
+
+    PartitionSpec unpartitionedSpec = table.specs().get(0);
+    PartitionSpec bucketSpec = table.specs().get(1);
+    PartitionSpec identitySpec = table.specs().get(2);
+    // delete the leading positions of each file; the last row of every file is never deleted
+    deleteWriter.write(positionDelete(firstFile.path(), 0L, null), unpartitionedSpec, null);
+    deleteWriter.write(positionDelete(firstFile.path(), 1L, null), unpartitionedSpec, null);
+    deleteWriter.write(positionDelete(secondFile.path(), 0L, null), bucketSpec, partitionKey(bucketSpec, "bbb"));
+    deleteWriter.write(positionDelete(secondFile.path(), 1L, null), bucketSpec, partitionKey(bucketSpec, "bbb"));
+    deleteWriter.write(positionDelete(thirdFile.path(), 0L, null), identitySpec, partitionKey(identitySpec, "ccc"));
+
+    deleteWriter.close();
+
+    DeleteWriteResult writeResult = deleteWriter.result();
+    Assert.assertEquals("Must be 3 delete files", 3, writeResult.deleteFiles().size());
+    Assert.assertEquals("Must reference 3 data files", 3, deleteWriter.result().referencedDataFiles().size());
+    Assert.assertTrue("Must reference data files", deleteWriter.result().referencesDataFiles());
+
+    RowDelta delta = table.newRowDelta();
+    writeResult.deleteFiles().forEach(delta::addDeletes);
+    delta.commit();
+
+    List<T> remainingRows = ImmutableList.of(
+        toRow(11, "aaa"),
+        toRow(12, "bbb"),
+        toRow(13, "ccc")
+    );
+    Assert.assertEquals("Records should match", toSet(remainingRows), actualRowSet("*"));
+  }
+
+  @Test
+  public void testClusteredPositionDeleteWriterOutOfOrderSpecsAndPartitions() throws IOException {
+    Assume.assumeFalse("ORC delete files are not supported", fileFormat == FileFormat.ORC);
+
+    FileWriterFactory<T> factory = newWriterFactory(table.schema());
+
+    // evolve the spec twice so the table ends up with three specs:
+    // unpartitioned, then bucketed by data, then partitioned by data
+    table.updateSpec().addField(Expressions.bucket("data", 16)).commit();
+
+    table.updateSpec()
+        .removeField(Expressions.bucket("data", 16))
+        .addField(Expressions.ref("data"))
+        .commit();
+
+    // the writer is created only after both spec updates have been committed
+    ClusteredPositionDeleteWriter<T> deleteWriter =
+        new ClusteredPositionDeleteWriter<>(factory, fileFactory, table.io(), fileFormat, TARGET_FILE_SIZE);
+
+    PartitionSpec unpartitionedSpec = table.specs().get(0);
+    PartitionSpec bucketSpec = table.specs().get(1);
+    PartitionSpec identitySpec = table.specs().get(2);
+    // a well-clustered prefix that ends in partition ddd of the last spec
+    deleteWriter.write(positionDelete("file-1.parquet", 0L, null), unpartitionedSpec, null);
+    deleteWriter.write(positionDelete("file-1.parquet", 1L, null), unpartitionedSpec, null);
+    deleteWriter.write(positionDelete("file-2.parquet", 0L, null), bucketSpec, partitionKey(bucketSpec, "bbb"));
+    deleteWriter.write(positionDelete("file-2.parquet", 1L, null), bucketSpec, partitionKey(bucketSpec, "bbb"));
+    deleteWriter.write(positionDelete("file-3.parquet", 0L, null), identitySpec, partitionKey(identitySpec, "ccc"));
+    deleteWriter.write(positionDelete("file-4.parquet", 0L, null), identitySpec, partitionKey(identitySpec, "ddd"));
+
+    AssertHelpers.assertThrows("Should fail to write out of order partitions",
+        IllegalStateException.class, "Encountered records that belong to already closed files",
+        () -> {
+          try {
+            PositionDelete<T> lateDelete = positionDelete("file-5.parquet", 1L, null);
+            deleteWriter.write(lateDelete, identitySpec, partitionKey(identitySpec, "ccc"));
+          } catch (IOException ioe) {
+            throw new UncheckedIOException(ioe);
+          }
+        });
+
+    AssertHelpers.assertThrows("Should fail to write out of order specs",
+        IllegalStateException.class, "Encountered records that belong to already closed files",
+        () -> {
+          try {
+            PositionDelete<T> lateDelete = positionDelete("file-1.parquet", 3L, null);
+            deleteWriter.write(lateDelete, unpartitionedSpec, null);
+          } catch (IOException ioe) {
+            throw new UncheckedIOException(ioe);
+          }
+        });
+
+    deleteWriter.close();
+  }
+
+  @Test
+  public void testFanoutDataWriterNoRecords() throws IOException {
+    FileWriterFactory<T> factory = newWriterFactory(table.schema());
+    FanoutDataWriter<T> emptyWriter =
+        new FanoutDataWriter<>(factory, fileFactory, table.io(), fileFormat, TARGET_FILE_SIZE);
+
+    // closing a writer that never received a record must yield no files
+    emptyWriter.close();
+    Assert.assertEquals("Must be no data files", 0, emptyWriter.result().dataFiles().size());
+    // closing again must be harmless and keep the result empty
+    emptyWriter.close();
+    Assert.assertEquals("Must be no data files", 0, emptyWriter.result().dataFiles().size());
+  }
+
+  @Test
+  public void testFanoutDataWriterMultiplePartitions() throws IOException {
+    // partition the table by the data column before creating the writer
+    table.updateSpec().addField(Expressions.ref("data")).commit();
+
+    FileWriterFactory<T> factory = newWriterFactory(table.schema());
+    FanoutDataWriter<T> fanoutWriter =
+        new FanoutDataWriter<>(factory, fileFactory, table.io(), fileFormat, TARGET_FILE_SIZE);
+
+    PartitionSpec currentSpec = table.spec();
+
+    // partitions are deliberately interleaved: aaa, bbb, aaa, bbb, ccc
+    fanoutWriter.write(toRow(1, "aaa"), currentSpec, partitionKey(currentSpec, "aaa"));
+    fanoutWriter.write(toRow(3, "bbb"), currentSpec, partitionKey(currentSpec, "bbb"));
+    fanoutWriter.write(toRow(2, "aaa"), currentSpec, partitionKey(currentSpec, "aaa"));
+    fanoutWriter.write(toRow(4, "bbb"), currentSpec, partitionKey(currentSpec, "bbb"));
+    fanoutWriter.write(toRow(5, "ccc"), currentSpec, partitionKey(currentSpec, "ccc"));
+
+    fanoutWriter.close();
+
+    DataWriteResult writeResult = fanoutWriter.result();
+    Assert.assertEquals("Must be 3 data files", 3, writeResult.dataFiles().size());
+
+    // commit the new files so the rows can be read back through the table
+    RowDelta delta = table.newRowDelta();
+    writeResult.dataFiles().forEach(delta::addRows);
+    delta.commit();
+
+    List<T> expected = ImmutableList.of(
+        toRow(1, "aaa"),
+        toRow(2, "aaa"),
+        toRow(3, "bbb"),
+        toRow(4, "bbb"),
+        toRow(5, "ccc")
+    );
+    Assert.assertEquals("Records should match", toSet(expected), actualRowSet("*"));
+  }
+}
diff --git a/data/src/test/java/org/apache/iceberg/io/TestRollingFileWriters.java b/data/src/test/java/org/apache/iceberg/io/TestRollingFileWriters.java
index e11e0a7..a62ac3e 100644
--- a/data/src/test/java/org/apache/iceberg/io/TestRollingFileWriters.java
+++ b/data/src/test/java/org/apache/iceberg/io/TestRollingFileWriters.java
@@ -23,16 +23,11 @@ import java.io.File;
 import java.io.IOException;
 import java.util.List;
 import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.PartitionKey;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.StructLike;
-import org.apache.iceberg.TableTestBase;
-import org.apache.iceberg.data.GenericRecord;
-import org.apache.iceberg.data.Record;
 import org.apache.iceberg.deletes.PositionDelete;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.junit.Assert;
 import org.junit.Assume;
@@ -42,7 +37,7 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 @RunWith(Parameterized.class)
-public abstract class TestRollingFileWriters<T> extends TableTestBase {
+public abstract class TestRollingFileWriters<T> extends WriterTestBase<T> {
 
   // TODO: add ORC once we support ORC rolling file writers
 
@@ -73,11 +68,6 @@ public abstract class TestRollingFileWriters<T> extends TableTestBase {
     this.partitioned = partitioned;
   }
 
-  protected abstract FileWriterFactory<T> newWriterFactory(Schema dataSchema, List<Integer> equalityFieldIds,
-                                                           Schema equalityDeleteRowSchema);
-
-  protected abstract T toRow(Integer id, String data);
-
   protected FileFormat format() {
     return fileFormat;
   }
@@ -91,7 +81,7 @@ public abstract class TestRollingFileWriters<T> extends TableTestBase {
 
     if (partitioned) {
       this.table = create(SCHEMA, SPEC);
-      this.partition = initPartitionKey();
+      this.partition = partitionKey(table.spec(), PARTITION_VALUE);
     } else {
       this.table = create(SCHEMA, PartitionSpec.unpartitioned());
       this.partition = null;
@@ -100,15 +90,6 @@ public abstract class TestRollingFileWriters<T> extends TableTestBase {
     this.fileFactory = OutputFileFactory.builderFor(table, 1, 1).format(fileFormat).build();
   }
 
-  private PartitionKey initPartitionKey() {
-    Record record = GenericRecord.create(table.schema()).copy(ImmutableMap.of("data", PARTITION_VALUE));
-
-    PartitionKey partitionKey = new PartitionKey(table.spec(), table.schema());
-    partitionKey.partition(record);
-
-    return partitionKey;
-  }
-
   @Test
   public void testRollingDataWriterNoRecords() throws IOException {
     FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
@@ -221,7 +202,7 @@ public abstract class TestRollingFileWriters<T> extends TableTestBase {
 
     List<PositionDelete<T>> deletes = Lists.newArrayListWithExpectedSize(4 * FILE_SIZE_CHECK_ROWS_DIVISOR);
     for (int index = 0; index < 4 * FILE_SIZE_CHECK_ROWS_DIVISOR; index++) {
-      deletes.add(new PositionDelete<T>().set("path/to/data/file-1.parquet", index, null));
+      deletes.add(positionDelete("path/to/data/file-1.parquet", index, null));
     }
 
     try (RollingPositionDeleteWriter<T> closeableWriter = writer) {
@@ -236,8 +217,4 @@ public abstract class TestRollingFileWriters<T> extends TableTestBase {
     Assert.assertEquals(1, result.referencedDataFiles().size());
     Assert.assertTrue(result.referencesDataFiles());
   }
-
-  private FileWriterFactory<T> newWriterFactory(Schema dataSchema) {
-    return newWriterFactory(dataSchema, null, null);
-  }
 }
diff --git a/data/src/test/java/org/apache/iceberg/io/WriterTestBase.java b/data/src/test/java/org/apache/iceberg/io/WriterTestBase.java
new file mode 100644
index 0000000..661ab64
--- /dev/null
+++ b/data/src/test/java/org/apache/iceberg/io/WriterTestBase.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.TableTestBase;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.IcebergGenerics;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.util.StructLikeSet;
+
+/**
+ * Common scaffolding for writer tests that are generic over the engine-specific row type.
+ *
+ * @param <T> row type consumed by the writers under test
+ */
+public abstract class WriterTestBase<T> extends TableTestBase {
+
+  public WriterTestBase(int formatVersion) {
+    super(formatVersion);
+  }
+
+  /**
+   * Creates the writer factory under test. The convenience overloads below pass {@code null}
+   * for every argument they do not accept, so implementations must tolerate {@code null}
+   * for the equality field ids and both delete row schemas.
+   */
+  protected abstract FileWriterFactory<T> newWriterFactory(Schema dataSchema, List<Integer> equalityFieldIds,
+                                                           Schema equalityDeleteRowSchema,
+                                                           Schema positionDeleteRowSchema);
+
+  /** Variant without a position delete row schema ({@code null} is passed for it). */
+  protected FileWriterFactory<T> newWriterFactory(Schema dataSchema, List<Integer> equalityFieldIds,
+                                                  Schema equalityDeleteRowSchema) {
+    Schema noPositionDeleteRowSchema = null;
+    return newWriterFactory(dataSchema, equalityFieldIds, equalityDeleteRowSchema, noPositionDeleteRowSchema);
+  }
+
+  /** Variant without any equality delete configuration ({@code null} is passed for both equality arguments). */
+  protected FileWriterFactory<T> newWriterFactory(Schema dataSchema, Schema positionDeleteRowSchema) {
+    List<Integer> noEqualityFieldIds = null;
+    Schema noEqualityDeleteRowSchema = null;
+    return newWriterFactory(dataSchema, noEqualityFieldIds, noEqualityDeleteRowSchema, positionDeleteRowSchema);
+  }
+
+  /** Variant that configures the data schema only; all delete-related arguments are {@code null}. */
+  protected FileWriterFactory<T> newWriterFactory(Schema dataSchema) {
+    List<Integer> noEqualityFieldIds = null;
+    Schema noDeleteRowSchema = null;
+    return newWriterFactory(dataSchema, noEqualityFieldIds, noDeleteRowSchema, noDeleteRowSchema);
+  }
+
+  /** Builds an engine-specific row from an id and a data value. */
+  protected abstract T toRow(Integer id, String data);
+
+  /**
+   * Computes the partition key that {@code spec} assigns to a record of the test table whose
+   * {@code data} column holds {@code value}; no other column is set on that record.
+   */
+  protected PartitionKey partitionKey(PartitionSpec spec, String value) {
+    Schema schema = table.schema();
+    Record source = GenericRecord.create(schema).copy(ImmutableMap.of("data", value));
+
+    PartitionKey key = new PartitionKey(spec, schema);
+    key.partition(source);
+    return key;
+  }
+
+  /**
+   * Reads the test table through the generic reader, selecting {@code columns}, and collects
+   * every record into a set typed with the table schema's struct.
+   */
+  protected StructLikeSet actualRowSet(String... columns) throws IOException {
+    StructLikeSet actualRows = StructLikeSet.create(table.schema().asStruct());
+    try (CloseableIterable<Record> records = IcebergGenerics.read(table).select(columns).build()) {
+      for (Record record : records) {
+        actualRows.add(record);
+      }
+    }
+    return actualRows;
+  }
+
+  /**
+   * Writes {@code rows} into a single new data file for the given spec and partition and
+   * returns the resulting {@link DataFile}. The file metadata is requested only after the
+   * writer has been closed by the try-with-resources block.
+   */
+  protected DataFile writeData(FileWriterFactory<T> writerFactory, OutputFileFactory fileFactory,
+                               List<T> rows, PartitionSpec spec, StructLike partitionKey) throws IOException {
+
+    EncryptedOutputFile outputFile = fileFactory.newOutputFile(spec, partitionKey);
+    DataWriter<T> dataWriter = writerFactory.newDataWriter(outputFile, spec, partitionKey);
+
+    try (DataWriter<T> closeableWriter = dataWriter) {
+      for (T row : rows) {
+        closeableWriter.write(row);
+      }
+    }
+
+    return dataWriter.toDataFile();
+  }
+}
diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkRollingFileWriters.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkPartitioningWriters.java
similarity index 63%
copy from flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkRollingFileWriters.java
copy to flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkPartitioningWriters.java
index a3d62d5..934b5a0 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkRollingFileWriters.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkPartitioningWriters.java
@@ -21,28 +21,34 @@ package org.apache.iceberg.flink.sink;
 
 import java.util.List;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.RowDataWrapper;
 import org.apache.iceberg.flink.SimpleDataUtil;
 import org.apache.iceberg.io.FileWriterFactory;
-import org.apache.iceberg.io.TestRollingFileWriters;
+import org.apache.iceberg.io.TestPartitioningWriters;
 import org.apache.iceberg.util.ArrayUtil;
+import org.apache.iceberg.util.StructLikeSet;
 
-public class TestFlinkRollingFileWriters extends TestRollingFileWriters<RowData> {
+public class TestFlinkPartitioningWriters<T> extends TestPartitioningWriters<RowData> {
 
-  public TestFlinkRollingFileWriters(FileFormat fileFormat, boolean partitioned) {
-    super(fileFormat, partitioned);
+  public TestFlinkPartitioningWriters(FileFormat fileFormat) {
+    super(fileFormat);
   }
 
   @Override
   protected FileWriterFactory<RowData> newWriterFactory(Schema dataSchema, List<Integer> equalityFieldIds,
-                                                        Schema equalityDeleteRowSchema) {
+                                                        Schema equalityDeleteRowSchema,
+                                                        Schema positionDeleteRowSchema) {
     return FlinkFileWriterFactory.builderFor(table)
         .dataSchema(table.schema())
         .dataFileFormat(format())
         .deleteFileFormat(format())
         .equalityFieldIds(ArrayUtil.toIntArray(equalityFieldIds))
         .equalityDeleteRowSchema(equalityDeleteRowSchema)
+        .positionDeleteRowSchema(positionDeleteRowSchema)
         .build();
   }
 
@@ -50,4 +56,15 @@ public class TestFlinkRollingFileWriters extends TestRollingFileWriters<RowData>
   protected RowData toRow(Integer id, String data) {
     return SimpleDataUtil.createRowData(id, data);
   }
+
+  /**
+   * Wraps every Flink row as a struct and collects the wrapped rows into a set typed with the
+   * table schema's struct.
+   */
+  @Override
+  protected StructLikeSet toSet(Iterable<RowData> rows) {
+    Schema schema = table.schema();
+    StructLikeSet result = StructLikeSet.create(schema.asStruct());
+    RowType rowType = FlinkSchemaUtil.convert(schema);
+    // one wrapper per row, so each set element is backed by its own wrapper instance
+    // NOTE(review): presumably wrap() returns the wrapper itself, so sharing one would alias entries - confirm
+    rows.forEach(row -> result.add(new RowDataWrapper(rowType, schema.asStruct()).wrap(row)));
+    return result;
+  }
 }
diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkRollingFileWriters.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkRollingFileWriters.java
index a3d62d5..9339e5a 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkRollingFileWriters.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkRollingFileWriters.java
@@ -36,13 +36,15 @@ public class TestFlinkRollingFileWriters extends TestRollingFileWriters<RowData>
 
   @Override
   protected FileWriterFactory<RowData> newWriterFactory(Schema dataSchema, List<Integer> equalityFieldIds,
-                                                        Schema equalityDeleteRowSchema) {
+                                                        Schema equalityDeleteRowSchema,
+                                                        Schema positionDeleteRowSchema) {
     return FlinkFileWriterFactory.builderFor(table)
         .dataSchema(table.schema())
         .dataFileFormat(format())
         .deleteFileFormat(format())
         .equalityFieldIds(ArrayUtil.toIntArray(equalityFieldIds))
         .equalityDeleteRowSchema(equalityDeleteRowSchema)
+        .positionDeleteRowSchema(positionDeleteRowSchema)
         .build();
   }
 
diff --git a/jmh.gradle b/jmh.gradle
index 543b07f..edcb39c 100644
--- a/jmh.gradle
+++ b/jmh.gradle
@@ -30,7 +30,7 @@ configure(jmhProjects) {
   apply plugin: 'me.champeau.gradle.jmh'
 
   jmh {
-    jmhVersion = jmhVersion
+    jmhVersion = '1.32'
     failOnError = true
     forceGC = true
     includeTests = true
diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java
new file mode 100644
index 0000000..a8521de
--- /dev/null
+++ b/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.io.ClusteredDataWriter;
+import org.apache.iceberg.io.ClusteredEqualityDeleteWriter;
+import org.apache.iceberg.io.ClusteredPositionDeleteWriter;
+import org.apache.iceberg.io.DeleteSchemaUtil;
+import org.apache.iceberg.io.FanoutDataWriter;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.io.UnpartitionedWriter;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.data.RandomData;
+import org.apache.iceberg.transforms.Transform;
+import org.apache.iceberg.transforms.Transforms;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.StructType;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.infra.Blackhole;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+public abstract class WritersBenchmark extends IcebergSourceBenchmark {
+
+  private static final int NUM_ROWS = 2500000;
+  private static final long TARGET_FILE_SIZE_IN_BYTES = 50L * 1024 * 1024;
+
+  private static final Schema SCHEMA = new Schema(
+      required(1, "longCol", Types.LongType.get()),
+      required(2, "intCol", Types.IntegerType.get()),
+      required(3, "floatCol", Types.FloatType.get()),
+      optional(4, "doubleCol", Types.DoubleType.get()),
+      optional(5, "decimalCol", Types.DecimalType.of(20, 5)),
+      optional(6, "timestampCol", Types.TimestampType.withZone()),
+      optional(7, "stringCol", Types.StringType.get())
+  );
+
+  private Iterable<InternalRow> rows;
+  private Iterable<InternalRow> positionDeleteRows;
+  private PartitionSpec unpartitionedSpec;
+  private PartitionSpec partitionedSpec;
+
+  protected abstract FileFormat fileFormat();
+
+  /**
+   * Starts Spark and prepares the shared inputs: the data rows (materialized and ordered by the
+   * 32-way bucket of {@code intCol}), the position delete rows, and the two specs of the table.
+   */
+  @Setup
+  public void setupBenchmark() {
+    setupSpark();
+
+    // intCol is the second column of SCHEMA, hence ordinal 1; sorting keeps rows of one bucket contiguous
+    Transform<Integer, Integer> bucket = Transforms.bucket(Types.IntegerType.get(), 32);
+    List<InternalRow> sortedRows = Lists.newArrayList(RandomData.generateSpark(SCHEMA, NUM_ROWS, 0L));
+    sortedRows.sort(Comparator.comparingInt(row -> bucket.apply(row.getInt(1))));
+    this.rows = sortedRows;
+
+    this.positionDeleteRows = RandomData.generateSpark(DeleteSchemaUtil.pathPosSchema(), NUM_ROWS, 0L);
+
+    // spec 0 is the one the table was created with, spec 1 is the bucketed spec added in initTable()
+    Map<Integer, PartitionSpec> specs = table().specs();
+    this.unpartitionedSpec = specs.get(0);
+    Preconditions.checkArgument(unpartitionedSpec.isUnpartitioned());
+    this.partitionedSpec = specs.get(1);
+  }
+
+  /**
+   * Invokes {@code tearDownSpark()} and then {@code cleanupFiles()} once the benchmark is done.
+   *
+   * @throws IOException declared for failures raised by the teardown helpers
+   */
+  @TearDown
+  public void tearDownBenchmark() throws IOException {
+    tearDownSpark();
+    cleanupFiles();
+  }
+
+  /** Supplies a new Hadoop {@link Configuration} built with the no-arg constructor. */
+  @Override
+  protected Configuration initHadoopConf() {
+    Configuration conf = new Configuration();
+    return conf;
+  }
+
+  /**
+   * Creates an unpartitioned Hadoop table for {@code SCHEMA} at a new location and then adds a
+   * second spec that buckets {@code intCol} into 32 buckets, so both specs are available.
+   */
+  @Override
+  protected final Table initTable() {
+    Map<String, String> noProperties = Maps.newHashMap();
+    Table newTable = new HadoopTables(hadoopConf())
+        .create(SCHEMA, PartitionSpec.unpartitioned(), noProperties, newTableLocation());
+
+    // evolve the spec so that the table carries a partitioned spec as well
+    newTable.updateSpec().addField(Expressions.bucket("intCol", 32)).commit();
+
+    return newTable;
+  }
+
+  /** Writes all data rows through a {@link ClusteredDataWriter} using the unpartitioned spec. */
+  @Benchmark
+  @Threads(1)
+  public void writeUnpartitionedClusteredDataWriter(Blackhole blackhole) throws IOException {
+    FileIO fileIO = table().io();
+    OutputFileFactory outputFiles = newFileFactory();
+
+    SparkFileWriterFactory writers = SparkFileWriterFactory.builderFor(table())
+        .dataFileFormat(fileFormat())
+        .dataSchema(table().schema())
+        .build();
+
+    ClusteredDataWriter<InternalRow> clusteredWriter =
+        new ClusteredDataWriter<>(writers, outputFiles, fileIO, fileFormat(), TARGET_FILE_SIZE_IN_BYTES);
+
+    try (ClusteredDataWriter<InternalRow> closeableWriter = clusteredWriter) {
+      for (InternalRow row : rows) {
+        // no partition tuple is passed together with the unpartitioned spec
+        closeableWriter.write(row, unpartitionedSpec, null);
+      }
+    }
+
+    blackhole.consume(clusteredWriter);
+  }
+
+  /** Writes all data rows through the {@link TaskWriter}-based {@link UnpartitionedWriter}. */
+  @Benchmark
+  @Threads(1)
+  public void writeUnpartitionedLegacyDataWriter(Blackhole blackhole) throws IOException {
+    FileIO fileIO = table().io();
+    OutputFileFactory outputFiles = newFileFactory();
+
+    Schema schema = table().schema();
+    StructType sparkType = SparkSchemaUtil.convert(schema);
+    SparkAppenderFactory appenderFactory = SparkAppenderFactory.builderFor(table(), schema, sparkType)
+        .spec(unpartitionedSpec)
+        .build();
+
+    TaskWriter<InternalRow> taskWriter = new UnpartitionedWriter<>(
+        unpartitionedSpec, fileFormat(), appenderFactory, outputFiles, fileIO, TARGET_FILE_SIZE_IN_BYTES);
+
+    try (TaskWriter<InternalRow> closeableWriter = taskWriter) {
+      for (InternalRow row : rows) {
+        closeableWriter.write(row);
+      }
+    }
+
+    // the result is requested after the writer has been closed
+    blackhole.consume(taskWriter.complete());
+  }
+
+  /** Writes all data rows through a {@link ClusteredDataWriter} using the bucketed spec. */
+  @Benchmark
+  @Threads(1)
+  public void writePartitionedClusteredDataWriter(Blackhole blackhole) throws IOException {
+    FileIO fileIO = table().io();
+    OutputFileFactory outputFiles = newFileFactory();
+
+    SparkFileWriterFactory writers = SparkFileWriterFactory.builderFor(table())
+        .dataFileFormat(fileFormat())
+        .dataSchema(table().schema())
+        .build();
+
+    ClusteredDataWriter<InternalRow> clusteredWriter =
+        new ClusteredDataWriter<>(writers, outputFiles, fileIO, fileFormat(), TARGET_FILE_SIZE_IN_BYTES);
+
+    // a single key and a single wrapper are reused; partition() is re-applied before each write
+    PartitionKey key = new PartitionKey(partitionedSpec, table().schema());
+    InternalRowWrapper rowWrapper = new InternalRowWrapper(SparkSchemaUtil.convert(table().schema()));
+
+    try (ClusteredDataWriter<InternalRow> closeableWriter = clusteredWriter) {
+      for (InternalRow row : rows) {
+        key.partition(rowWrapper.wrap(row));
+        closeableWriter.write(row, partitionedSpec, key);
+      }
+    }
+
+    blackhole.consume(clusteredWriter);
+  }
+
+  /** Writes all data rows through the {@link TaskWriter}-based {@code SparkPartitionedWriter}. */
+  @Benchmark
+  @Threads(1)
+  public void writePartitionedLegacyDataWriter(Blackhole blackhole) throws IOException {
+    FileIO fileIO = table().io();
+    OutputFileFactory outputFiles = newFileFactory();
+
+    Schema schema = table().schema();
+    StructType sparkType = SparkSchemaUtil.convert(schema);
+    SparkAppenderFactory appenderFactory = SparkAppenderFactory.builderFor(table(), schema, sparkType)
+        .spec(partitionedSpec)
+        .build();
+
+    TaskWriter<InternalRow> taskWriter = new SparkPartitionedWriter(
+        partitionedSpec, fileFormat(), appenderFactory, outputFiles, fileIO, TARGET_FILE_SIZE_IN_BYTES,
+        schema, sparkType);
+
+    try (TaskWriter<InternalRow> closeableWriter = taskWriter) {
+      for (InternalRow row : rows) {
+        closeableWriter.write(row);
+      }
+    }
+
+    // the result is requested after the writer has been closed
+    blackhole.consume(taskWriter.complete());
+  }
+
+  /** Writes all data rows through a {@link FanoutDataWriter} using the bucketed spec. */
+  @Benchmark
+  @Threads(1)
+  public void writePartitionedFanoutDataWriter(Blackhole blackhole) throws IOException {
+    FileIO fileIO = table().io();
+    OutputFileFactory outputFiles = newFileFactory();
+
+    SparkFileWriterFactory writers = SparkFileWriterFactory.builderFor(table())
+        .dataFileFormat(fileFormat())
+        .dataSchema(table().schema())
+        .build();
+
+    FanoutDataWriter<InternalRow> fanoutWriter =
+        new FanoutDataWriter<>(writers, outputFiles, fileIO, fileFormat(), TARGET_FILE_SIZE_IN_BYTES);
+
+    // a single key and a single wrapper are reused; partition() is re-applied before each write
+    PartitionKey key = new PartitionKey(partitionedSpec, table().schema());
+    InternalRowWrapper rowWrapper = new InternalRowWrapper(SparkSchemaUtil.convert(table().schema()));
+
+    try (FanoutDataWriter<InternalRow> closeableWriter = fanoutWriter) {
+      for (InternalRow row : rows) {
+        key.partition(rowWrapper.wrap(row));
+        closeableWriter.write(row, partitionedSpec, key);
+      }
+    }
+
+    blackhole.consume(fanoutWriter);
+  }
+
+  /** Writes all data rows through the {@link TaskWriter}-based {@code SparkPartitionedFanoutWriter}. */
+  @Benchmark
+  @Threads(1)
+  public void writePartitionedLegacyFanoutDataWriter(Blackhole blackhole) throws IOException {
+    FileIO fileIO = table().io();
+    OutputFileFactory outputFiles = newFileFactory();
+
+    Schema schema = table().schema();
+    StructType sparkType = SparkSchemaUtil.convert(schema);
+    SparkAppenderFactory appenderFactory = SparkAppenderFactory.builderFor(table(), schema, sparkType)
+        .spec(partitionedSpec)
+        .build();
+
+    TaskWriter<InternalRow> taskWriter = new SparkPartitionedFanoutWriter(
+        partitionedSpec, fileFormat(), appenderFactory, outputFiles, fileIO, TARGET_FILE_SIZE_IN_BYTES,
+        schema, sparkType);
+
+    try (TaskWriter<InternalRow> closeableWriter = taskWriter) {
+      for (InternalRow row : rows) {
+        closeableWriter.write(row);
+      }
+    }
+
+    // the result is requested after the writer has been closed
+    blackhole.consume(taskWriter.complete());
+  }
+
+  /**
+   * Writes all data rows as equality deletes on {@code longCol} through a
+   * {@link ClusteredEqualityDeleteWriter} using the bucketed spec.
+   */
+  @Benchmark
+  @Threads(1)
+  public void writePartitionedClusteredEqualityDeleteWriter(Blackhole blackhole) throws IOException {
+    FileIO io = table().io();
+
+    int equalityFieldId = table().schema().findField("longCol").fieldId();
+
+    // This benchmark produces delete files, so the delete format is configured explicitly (as the
+    // writer tests do) instead of being left to the factory builder.
+    // NOTE(review): the builder presumably derives its default delete format from table properties,
+    // not from dataFileFormat(...) - confirm.
+    OutputFileFactory fileFactory = newFileFactory();
+    SparkFileWriterFactory writerFactory = SparkFileWriterFactory.builderFor(table())
+        .dataFileFormat(fileFormat())
+        .deleteFileFormat(fileFormat())
+        .equalityDeleteRowSchema(table().schema())
+        .equalityFieldIds(new int[]{equalityFieldId})
+        .build();
+
+    ClusteredEqualityDeleteWriter<InternalRow> writer = new ClusteredEqualityDeleteWriter<>(
+        writerFactory, fileFactory, io,
+        fileFormat(), TARGET_FILE_SIZE_IN_BYTES);
+
+    // a single key and a single wrapper are reused; partition() is re-applied before each write
+    PartitionKey partitionKey = new PartitionKey(partitionedSpec, table().schema());
+    StructType deleteSparkType = SparkSchemaUtil.convert(table().schema());
+    InternalRowWrapper internalRowWrapper = new InternalRowWrapper(deleteSparkType);
+
+    try (ClusteredEqualityDeleteWriter<InternalRow> closeableWriter = writer) {
+      for (InternalRow row : rows) {
+        partitionKey.partition(internalRowWrapper.wrap(row));
+        closeableWriter.write(row, partitionedSpec, partitionKey);
+      }
+    }
+
+    blackhole.consume(writer);
+  }
+
+  /**
+   * Writes the generated (path, position) rows as position deletes through a
+   * {@link ClusteredPositionDeleteWriter} using the unpartitioned spec.
+   */
+  @Benchmark
+  @Threads(1)
+  public void writeUnpartitionedClusteredPositionDeleteWriter(Blackhole blackhole) throws IOException {
+    FileIO io = table().io();
+
+    // This benchmark produces delete files, so the delete format is configured explicitly (as the
+    // writer tests do) instead of being left to the factory builder.
+    // NOTE(review): the builder presumably derives its default delete format from table properties,
+    // not from dataFileFormat(...) - confirm.
+    OutputFileFactory fileFactory = newFileFactory();
+    SparkFileWriterFactory writerFactory = SparkFileWriterFactory.builderFor(table())
+        .dataFileFormat(fileFormat())
+        .deleteFileFormat(fileFormat())
+        .build();
+
+    ClusteredPositionDeleteWriter<InternalRow> writer = new ClusteredPositionDeleteWriter<>(
+        writerFactory, fileFactory, io,
+        fileFormat(), TARGET_FILE_SIZE_IN_BYTES);
+
+    // one PositionDelete instance is reused and re-populated for every row
+    PositionDelete<InternalRow> positionDelete = PositionDelete.create();
+    try (ClusteredPositionDeleteWriter<InternalRow> closeableWriter = writer) {
+      for (InternalRow row : positionDeleteRows) {
+        String path = row.getString(0);
+        long pos = row.getLong(1);
+        // no deleted-row payload is attached (third argument is null)
+        positionDelete.set(path, pos, null);
+        closeableWriter.write(positionDelete, unpartitionedSpec, null);
+      }
+    }
+
+    blackhole.consume(writer);
+  }
+
+  /** Builds an output file factory for the benchmark table (ids 1 and 1) in the benchmarked format. */
+  private OutputFileFactory newFileFactory() {
+    return OutputFileFactory
+        .builderFor(table(), 1, 1)
+        .format(fileFormat())
+        .build();
+  }
+}
diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/source/avro/AvroWritersBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/source/avro/AvroWritersBenchmark.java
new file mode 100644
index 0000000..5d970d0
--- /dev/null
+++ b/spark/src/jmh/java/org/apache/iceberg/spark/source/avro/AvroWritersBenchmark.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source.avro;
+
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.spark.source.WritersBenchmark;
+
+/**
+ * A benchmark that evaluates the performance of various Iceberg writers for Avro data.
+ *
+ * <p>To run this benchmark for either spark-2 or spark-3:
+ * <pre>
+ *   ./gradlew :iceberg-spark[2|3]:jmh
+ *       -PjmhIncludeRegex=AvroWritersBenchmark
+ *       -PjmhOutputPath=benchmark/avro-writers-benchmark-result.txt
+ * </pre>
+ */
+public class AvroWritersBenchmark extends WritersBenchmark {
+
+  /** Selects Avro as the file format used by the inherited writer benchmarks. */
+  @Override
+  protected FileFormat fileFormat() {
+    return FileFormat.AVRO;
+  }
+}
diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/ParquetWritersBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/ParquetWritersBenchmark.java
new file mode 100644
index 0000000..ff354ba
--- /dev/null
+++ b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/ParquetWritersBenchmark.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source.parquet;
+
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.spark.source.WritersBenchmark;
+
+/**
+ * A benchmark that evaluates the performance of various Iceberg writers for Parquet data.
+ *
+ * <p>To run this benchmark for either spark-2 or spark-3:
+ * <pre>
+ *   ./gradlew :iceberg-spark[2|3]:jmh
+ *       -PjmhIncludeRegex=ParquetWritersBenchmark
+ *       -PjmhOutputPath=benchmark/parquet-writers-benchmark-result.txt
+ * </pre>
+ */
+public class ParquetWritersBenchmark extends WritersBenchmark {
+
+  /** Selects Parquet as the file format used by the inherited writer benchmarks. */
+  @Override
+  protected FileFormat fileFormat() {
+    return FileFormat.PARQUET;
+  }
+}
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkRollingFileWriters.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkPartitioningWriters.java
similarity index 67%
copy from spark/src/test/java/org/apache/iceberg/spark/source/TestSparkRollingFileWriters.java
copy to spark/src/test/java/org/apache/iceberg/spark/source/TestSparkPartitioningWriters.java
index 3ea2353..4d07cfb 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkRollingFileWriters.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkPartitioningWriters.java
@@ -23,27 +23,32 @@ import java.util.List;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.io.FileWriterFactory;
-import org.apache.iceberg.io.TestRollingFileWriters;
+import org.apache.iceberg.io.TestPartitioningWriters;
+import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.util.ArrayUtil;
+import org.apache.iceberg.util.StructLikeSet;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.types.StructType;
 import org.apache.spark.unsafe.types.UTF8String;
 
-public class TestSparkRollingFileWriters extends TestRollingFileWriters<InternalRow> {
+public class TestSparkPartitioningWriters extends TestPartitioningWriters<InternalRow> {
 
-  public TestSparkRollingFileWriters(FileFormat fileFormat, boolean partitioned) {
-    super(fileFormat, partitioned);
+  public TestSparkPartitioningWriters(FileFormat fileFormat) {
+    super(fileFormat);
   }
 
   @Override
   protected FileWriterFactory<InternalRow> newWriterFactory(Schema dataSchema, List<Integer> equalityFieldIds,
-                                                            Schema equalityDeleteRowSchema) {
+                                                            Schema equalityDeleteRowSchema,
+                                                            Schema positionDeleteRowSchema) {
     return SparkFileWriterFactory.builderFor(table)
         .dataSchema(table.schema())
         .dataFileFormat(format())
         .deleteFileFormat(format())
         .equalityFieldIds(ArrayUtil.toIntArray(equalityFieldIds))
         .equalityDeleteRowSchema(equalityDeleteRowSchema)
+        .positionDeleteRowSchema(positionDeleteRowSchema)
         .build();
   }
 
@@ -54,4 +59,15 @@ public class TestSparkRollingFileWriters extends TestRollingFileWriters<Internal
     row.update(1, UTF8String.fromString(data));
     return row;
   }
+
+  /**
+   * Wraps every Spark row as a struct and collects the wrapped rows into a set typed with the
+   * table schema's struct.
+   */
+  @Override
+  protected StructLikeSet toSet(Iterable<InternalRow> rows) {
+    Schema schema = table.schema();
+    StructLikeSet result = StructLikeSet.create(schema.asStruct());
+    StructType rowType = SparkSchemaUtil.convert(schema);
+    // one wrapper per row, so each set element is backed by its own wrapper instance
+    // NOTE(review): presumably wrap() returns the wrapper itself, so sharing one would alias entries - confirm
+    rows.forEach(row -> result.add(new InternalRowWrapper(rowType).wrap(row)));
+    return result;
+  }
 }
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkRollingFileWriters.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkRollingFileWriters.java
index 3ea2353..9023195 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkRollingFileWriters.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkRollingFileWriters.java
@@ -37,13 +37,15 @@ public class TestSparkRollingFileWriters extends TestRollingFileWriters<Internal
 
   @Override
   protected FileWriterFactory<InternalRow> newWriterFactory(Schema dataSchema, List<Integer> equalityFieldIds,
-                                                            Schema equalityDeleteRowSchema) {
+                                                            Schema equalityDeleteRowSchema,
+                                                            Schema positionDeleteRowSchema) {
     return SparkFileWriterFactory.builderFor(table)
         .dataSchema(table.schema())
         .dataFileFormat(format())
         .deleteFileFormat(format())
         .equalityFieldIds(ArrayUtil.toIntArray(equalityFieldIds))
         .equalityDeleteRowSchema(equalityDeleteRowSchema)
+        .positionDeleteRowSchema(positionDeleteRowSchema)
         .build();
   }