Posted to commits@iceberg.apache.org by sz...@apache.org on 2023/04/05 17:27:34 UTC

[iceberg] branch master updated: Spark 3.3: Dataset writes for position deletes (#7029)

This is an automated email from the ASF dual-hosted git repository.

szehon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 22d29a5a9a Spark 3.3: Dataset writes for position deletes (#7029)
22d29a5a9a is described below

commit 22d29a5a9ae0d96139ceb0b5675981178fda210a
Author: Szehon Ho <sz...@gmail.com>
AuthorDate: Wed Apr 5 10:27:26 2023 -0700

    Spark 3.3: Dataset writes for position deletes (#7029)
---
 .../org/apache/iceberg/PositionDeletesTable.java   |  10 +-
 ...inator.java => BaseFileRewriteCoordinator.java} |  49 +-
 .../iceberg/spark/FileRewriteCoordinator.java      |  66 +-
 .../spark/PositionDeletesRewriteCoordinator.java   |  33 +
 .../spark/source/SparkPositionDeletesRewrite.java  | 413 ++++++++++++
 .../source/SparkPositionDeletesRewriteBuilder.java | 113 ++++
 .../apache/iceberg/spark/source/SparkTable.java    |   7 +-
 .../spark/actions/TestRewriteDataFilesAction.java  |   2 +-
 .../spark/source/TestPositionDeletesTable.java     | 695 ++++++++++++++++++++-
 9 files changed, 1276 insertions(+), 112 deletions(-)
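
At a high level, this change lets the position delete rewrite flow round-trip deletes through a Spark dataset: rows are read from the position_deletes metadata table via a staged scan task set and written back under a rewritten-file-set id, which routes the write to the new SparkPositionDeletesRewrite. A minimal sketch of that flow, mirroring the new tests further down (the table name and fileSetId are placeholders, and an active SparkSession named spark is assumed):

    // read the staged position delete scan tasks back as a dataset
    Dataset<Row> scanDF =
        spark
            .read()
            .format("iceberg")
            .option(SparkReadOptions.SCAN_TASK_SET_ID, fileSetId)
            .load("db.table.position_deletes");

    // write the rows back through the position delete rewrite path
    scanDF
        .writeTo("db.table.position_deletes")
        .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, fileSetId)
        .append();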

diff --git a/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java b/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java
index 29228d136d..1983e0ddfc 100644
--- a/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java
+++ b/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java
@@ -41,6 +41,10 @@ import org.apache.iceberg.util.TableScanUtil;
  */
 public class PositionDeletesTable extends BaseMetadataTable {
 
+  public static final String PARTITION = "partition";
+  public static final String SPEC_ID = "spec_id";
+  public static final String DELETE_FILE_PATH = "delete_file_path";
+
   private final Schema schema;
   private final int defaultSpecId;
   private final Map<Integer, PartitionSpec> specs;
@@ -100,17 +104,17 @@ public class PositionDeletesTable extends BaseMetadataTable {
                 MetadataColumns.DELETE_FILE_ROW_DOC),
             Types.NestedField.required(
                 MetadataColumns.PARTITION_COLUMN_ID,
-                "partition",
+                PARTITION,
                 partitionType,
                 "Partition that position delete row belongs to"),
             Types.NestedField.required(
                 MetadataColumns.SPEC_ID_COLUMN_ID,
-                "spec_id",
+                SPEC_ID,
                 Types.IntegerType.get(),
                 MetadataColumns.SPEC_ID_COLUMN_DOC),
             Types.NestedField.required(
                 MetadataColumns.FILE_PATH_COLUMN_ID,
-                "delete_file_path",
+                DELETE_FILE_PATH,
                 Types.StringType.get(),
                 MetadataColumns.FILE_PATH_COLUMN_DOC));
 
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/FileRewriteCoordinator.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/BaseFileRewriteCoordinator.java
similarity index 61%
copy from spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/FileRewriteCoordinator.java
copy to spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/BaseFileRewriteCoordinator.java
index 210e861a4c..45c46f1a3e 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/FileRewriteCoordinator.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/BaseFileRewriteCoordinator.java
@@ -21,7 +21,7 @@ package org.apache.iceberg.spark;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
-import org.apache.iceberg.DataFile;
+import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.HasTableOperations;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableOperations;
@@ -31,62 +31,55 @@ import org.apache.iceberg.util.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class FileRewriteCoordinator {
+abstract class BaseFileRewriteCoordinator<F extends ContentFile<F>> {
 
-  private static final Logger LOG = LoggerFactory.getLogger(FileRewriteCoordinator.class);
-  private static final FileRewriteCoordinator INSTANCE = new FileRewriteCoordinator();
+  private static final Logger LOG = LoggerFactory.getLogger(BaseFileRewriteCoordinator.class);
 
-  private final Map<Pair<String, String>, Set<DataFile>> resultMap = Maps.newConcurrentMap();
-
-  private FileRewriteCoordinator() {}
-
-  public static FileRewriteCoordinator get() {
-    return INSTANCE;
-  }
+  private final Map<Pair<String, String>, Set<F>> resultMap = Maps.newConcurrentMap();
 
   /**
    * Called to persist the output of a rewrite action for a specific group. Since the write is done
    * via a Spark Datasource, we have to propagate the result through this side-effect call.
    *
    * @param table table where the rewrite is occurring
-   * @param fileSetID the id used to identify the source set of files being rewritten
-   * @param newDataFiles the new files which have been written
+   * @param fileSetId the id used to identify the source set of files being rewritten
+   * @param newFiles the new files which have been written
    */
-  public void stageRewrite(Table table, String fileSetID, Set<DataFile> newDataFiles) {
+  public void stageRewrite(Table table, String fileSetId, Set<F> newFiles) {
     LOG.debug(
         "Staging the output for {} - fileset {} with {} files",
         table.name(),
-        fileSetID,
-        newDataFiles.size());
-    Pair<String, String> id = toID(table, fileSetID);
-    resultMap.put(id, newDataFiles);
+        fileSetId,
+        newFiles.size());
+    Pair<String, String> id = toId(table, fileSetId);
+    resultMap.put(id, newFiles);
   }
 
-  public Set<DataFile> fetchNewDataFiles(Table table, String fileSetID) {
-    Pair<String, String> id = toID(table, fileSetID);
-    Set<DataFile> result = resultMap.get(id);
+  public Set<F> fetchNewFiles(Table table, String fileSetId) {
+    Pair<String, String> id = toId(table, fileSetId);
+    Set<F> result = resultMap.get(id);
     ValidationException.check(
-        result != null, "No results for rewrite of file set %s in table %s", fileSetID, table);
+        result != null, "No results for rewrite of file set %s in table %s", fileSetId, table);
 
     return result;
   }
 
-  public void clearRewrite(Table table, String fileSetID) {
-    LOG.debug("Removing entry from RewriteCoordinator for {} - id {}", table.name(), fileSetID);
-    Pair<String, String> id = toID(table, fileSetID);
+  public void clearRewrite(Table table, String fileSetId) {
+    LOG.debug("Removing entry for {} - id {}", table.name(), fileSetId);
+    Pair<String, String> id = toId(table, fileSetId);
     resultMap.remove(id);
   }
 
-  public Set<String> fetchSetIDs(Table table) {
+  public Set<String> fetchSetIds(Table table) {
     return resultMap.keySet().stream()
         .filter(e -> e.first().equals(tableUUID(table)))
         .map(Pair::second)
         .collect(Collectors.toSet());
   }
 
-  private Pair<String, String> toID(Table table, String setID) {
+  private Pair<String, String> toId(Table table, String setId) {
     String tableUUID = tableUUID(table);
-    return Pair.of(tableUUID, setID);
+    return Pair.of(tableUUID, setId);
   }
 
   private String tableUUID(Table table) {
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/FileRewriteCoordinator.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/FileRewriteCoordinator.java
index 210e861a4c..4f1d0fffcb 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/FileRewriteCoordinator.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/FileRewriteCoordinator.java
@@ -18,79 +18,29 @@
  */
 package org.apache.iceberg.spark;
 
-import java.util.Map;
 import java.util.Set;
-import java.util.stream.Collectors;
 import org.apache.iceberg.DataFile;
-import org.apache.iceberg.HasTableOperations;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.TableOperations;
-import org.apache.iceberg.exceptions.ValidationException;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.util.Pair;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-public class FileRewriteCoordinator {
+public class FileRewriteCoordinator extends BaseFileRewriteCoordinator<DataFile> {
 
-  private static final Logger LOG = LoggerFactory.getLogger(FileRewriteCoordinator.class);
   private static final FileRewriteCoordinator INSTANCE = new FileRewriteCoordinator();
 
-  private final Map<Pair<String, String>, Set<DataFile>> resultMap = Maps.newConcurrentMap();
-
   private FileRewriteCoordinator() {}
 
   public static FileRewriteCoordinator get() {
     return INSTANCE;
   }
 
-  /**
-   * Called to persist the output of a rewrite action for a specific group. Since the write is done
-   * via a Spark Datasource, we have to propagate the result through this side-effect call.
-   *
-   * @param table table where the rewrite is occurring
-   * @param fileSetID the id used to identify the source set of files being rewritten
-   * @param newDataFiles the new files which have been written
-   */
-  public void stageRewrite(Table table, String fileSetID, Set<DataFile> newDataFiles) {
-    LOG.debug(
-        "Staging the output for {} - fileset {} with {} files",
-        table.name(),
-        fileSetID,
-        newDataFiles.size());
-    Pair<String, String> id = toID(table, fileSetID);
-    resultMap.put(id, newDataFiles);
-  }
-
-  public Set<DataFile> fetchNewDataFiles(Table table, String fileSetID) {
-    Pair<String, String> id = toID(table, fileSetID);
-    Set<DataFile> result = resultMap.get(id);
-    ValidationException.check(
-        result != null, "No results for rewrite of file set %s in table %s", fileSetID, table);
-
-    return result;
-  }
-
-  public void clearRewrite(Table table, String fileSetID) {
-    LOG.debug("Removing entry from RewriteCoordinator for {} - id {}", table.name(), fileSetID);
-    Pair<String, String> id = toID(table, fileSetID);
-    resultMap.remove(id);
+  /** @deprecated will be removed in 1.4.0; use {@link #fetchNewFiles(Table, String)} instead. */
+  @Deprecated
+  public Set<DataFile> fetchNewDataFiles(Table table, String fileSetId) {
+    return fetchNewFiles(table, fileSetId);
   }
 
+  /** @deprecated will be removed in 1.4.0; use {@link #fetchSetIds(Table)} instead */
+  @Deprecated
   public Set<String> fetchSetIDs(Table table) {
-    return resultMap.keySet().stream()
-        .filter(e -> e.first().equals(tableUUID(table)))
-        .map(Pair::second)
-        .collect(Collectors.toSet());
-  }
-
-  private Pair<String, String> toID(Table table, String setID) {
-    String tableUUID = tableUUID(table);
-    return Pair.of(tableUUID, setID);
-  }
-
-  private String tableUUID(Table table) {
-    TableOperations ops = ((HasTableOperations) table).operations();
-    return ops.current().uuid();
+    return fetchSetIds(table);
   }
 }
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/PositionDeletesRewriteCoordinator.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/PositionDeletesRewriteCoordinator.java
new file mode 100644
index 0000000000..c7568005e2
--- /dev/null
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/PositionDeletesRewriteCoordinator.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import org.apache.iceberg.DeleteFile;
+
+public class PositionDeletesRewriteCoordinator extends BaseFileRewriteCoordinator<DeleteFile> {
+
+  private static final PositionDeletesRewriteCoordinator INSTANCE =
+      new PositionDeletesRewriteCoordinator();
+
+  private PositionDeletesRewriteCoordinator() {}
+
+  public static PositionDeletesRewriteCoordinator get() {
+    return INSTANCE;
+  }
+}
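
Like FileRewriteCoordinator does for data files, this coordinator is the hand-off point between the Spark write and the rewrite action: PositionDeleteBatchWrite.commit stages the newly written delete files under the file set id, and the action later fetches and clears them. A rough sketch of the consuming side (table and fileSetId are placeholders; the actual table commit is elided):

    PositionDeletesRewriteCoordinator coordinator = PositionDeletesRewriteCoordinator.get();
    Set<DeleteFile> rewrittenDeleteFiles = coordinator.fetchNewFiles(table, fileSetId);
    try {
      // replace the source delete files with rewrittenDeleteFiles in a table commit
    } finally {
      coordinator.clearRewrite(table, fileSetId);
    }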
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java
new file mode 100644
index 0000000000..0aebb6bdb2
--- /dev/null
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java
@@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.PositionDeletesTable;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.io.ClusteredPositionDeleteWriter;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.PositionDeletesRewriteCoordinator;
+import org.apache.iceberg.spark.ScanTaskSetManager;
+import org.apache.iceberg.spark.SparkWriteConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.write.BatchWrite;
+import org.apache.spark.sql.connector.write.DataWriter;
+import org.apache.spark.sql.connector.write.DataWriterFactory;
+import org.apache.spark.sql.connector.write.LogicalWriteInfo;
+import org.apache.spark.sql.connector.write.PhysicalWriteInfo;
+import org.apache.spark.sql.connector.write.Write;
+import org.apache.spark.sql.connector.write.WriterCommitMessage;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * {@link Write} class for rewriting position delete files from Spark. Responsible for creating
+ * {@link PositionDeleteBatchWrite}.
+ *
+ * <p>This class is meant to be used for an action to rewrite position delete files. Hence, it
+ * assumes all position deletes to rewrite have come from {@link ScanTaskSetManager} and that all
+ * have the same partition spec id and partition values.
+ */
+public class SparkPositionDeletesRewrite implements Write {
+
+  private final JavaSparkContext sparkContext;
+  private final Table table;
+  private final String queryId;
+  private final FileFormat format;
+  private final long targetFileSize;
+  private final Schema writeSchema;
+  private final StructType dsSchema;
+  private final String fileSetId;
+  private final int specId;
+  private final StructLike partition;
+
+  /**
+   * Constructs a {@link SparkPositionDeletesRewrite}.
+   *
+   * @param spark Spark session
+   * @param table instance of {@link PositionDeletesTable}
+   * @param writeConf Spark write config
+   * @param writeInfo Spark write info
+   * @param writeSchema Iceberg output schema
+   * @param dsSchema schema of original incoming position deletes dataset
+   * @param specId spec id of position deletes
+   * @param partition partition value of position deletes
+   */
+  SparkPositionDeletesRewrite(
+      SparkSession spark,
+      Table table,
+      SparkWriteConf writeConf,
+      LogicalWriteInfo writeInfo,
+      Schema writeSchema,
+      StructType dsSchema,
+      int specId,
+      StructLike partition) {
+    this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
+    this.table = table;
+    this.queryId = writeInfo.queryId();
+    this.format = writeConf.deleteFileFormat();
+    this.targetFileSize = writeConf.targetDeleteFileSize();
+    this.writeSchema = writeSchema;
+    this.dsSchema = dsSchema;
+    this.fileSetId = writeConf.rewrittenFileSetId();
+    this.specId = specId;
+    this.partition = partition;
+  }
+
+  @Override
+  public BatchWrite toBatch() {
+    return new PositionDeleteBatchWrite();
+  }
+
+  /** {@link BatchWrite} class for rewriting position deletes files from Spark */
+  class PositionDeleteBatchWrite implements BatchWrite {
+
+    @Override
+    public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) {
+      // broadcast the table metadata as the writer factory will be sent to executors
+      Broadcast<Table> tableBroadcast =
+          sparkContext.broadcast(SerializableTableWithSize.copyOf(table));
+      return new PositionDeletesWriterFactory(
+          tableBroadcast,
+          queryId,
+          format,
+          targetFileSize,
+          writeSchema,
+          dsSchema,
+          specId,
+          partition);
+    }
+
+    @Override
+    public void commit(WriterCommitMessage[] messages) {
+      PositionDeletesRewriteCoordinator coordinator = PositionDeletesRewriteCoordinator.get();
+      coordinator.stageRewrite(table, fileSetId, ImmutableSet.copyOf(files(messages)));
+    }
+
+    @Override
+    public void abort(WriterCommitMessage[] messages) {
+      SparkCleanupUtil.deleteFiles("job abort", table.io(), files(messages));
+    }
+
+    private List<DeleteFile> files(WriterCommitMessage[] messages) {
+      List<DeleteFile> files = Lists.newArrayList();
+
+      for (WriterCommitMessage message : messages) {
+        if (message != null) {
+          DeleteTaskCommit taskCommit = (DeleteTaskCommit) message;
+          files.addAll(Arrays.asList(taskCommit.files()));
+        }
+      }
+
+      return files;
+    }
+  }
+
+  /**
+   * Writer factory for position deletes metadata table. Responsible for creating {@link
+   * DeleteWriter}.
+   *
+   * <p>This writer is meant to be used for an action to rewrite delete files. Hence, it makes an
+   * assumption that all incoming deletes belong to the same partition, and that incoming dataset is
+   * from {@link ScanTaskSetManager}.
+   */
+  static class PositionDeletesWriterFactory implements DataWriterFactory {
+    private final Broadcast<Table> tableBroadcast;
+    private final String queryId;
+    private final FileFormat format;
+    private final Long targetFileSize;
+    private final Schema writeSchema;
+    private final StructType dsSchema;
+    private final int specId;
+    private final StructLike partition;
+
+    PositionDeletesWriterFactory(
+        Broadcast<Table> tableBroadcast,
+        String queryId,
+        FileFormat format,
+        long targetFileSize,
+        Schema writeSchema,
+        StructType dsSchema,
+        int specId,
+        StructLike partition) {
+      this.tableBroadcast = tableBroadcast;
+      this.queryId = queryId;
+      this.format = format;
+      this.targetFileSize = targetFileSize;
+      this.writeSchema = writeSchema;
+      this.dsSchema = dsSchema;
+      this.specId = specId;
+      this.partition = partition;
+    }
+
+    @Override
+    public DataWriter<InternalRow> createWriter(int partitionId, long taskId) {
+      Table table = tableBroadcast.value();
+
+      OutputFileFactory deleteFileFactory =
+          OutputFileFactory.builderFor(table, partitionId, taskId)
+              .format(format)
+              .operationId(queryId)
+              .suffix("deletes")
+              .build();
+
+      Schema positionDeleteRowSchema = positionDeleteRowSchema();
+      StructType deleteSparkType = deleteSparkType();
+      StructType deleteSparkTypeWithoutRow = deleteSparkTypeWithoutRow();
+
+      SparkFileWriterFactory writerFactoryWithRow =
+          SparkFileWriterFactory.builderFor(table)
+              .deleteFileFormat(format)
+              .positionDeleteRowSchema(positionDeleteRowSchema)
+              .positionDeleteSparkType(deleteSparkType)
+              .build();
+      SparkFileWriterFactory writerFactoryWithoutRow =
+          SparkFileWriterFactory.builderFor(table)
+              .deleteFileFormat(format)
+              .positionDeleteSparkType(deleteSparkTypeWithoutRow)
+              .build();
+
+      return new DeleteWriter(
+          table,
+          writerFactoryWithRow,
+          writerFactoryWithoutRow,
+          deleteFileFactory,
+          targetFileSize,
+          dsSchema,
+          specId,
+          partition);
+    }
+
+    private Schema positionDeleteRowSchema() {
+      return new Schema(
+          writeSchema
+              .findField(MetadataColumns.DELETE_FILE_ROW_FIELD_NAME)
+              .type()
+              .asStructType()
+              .fields());
+    }
+
+    private StructType deleteSparkType() {
+      return new StructType(
+          new StructField[] {
+            dsSchema.apply(MetadataColumns.DELETE_FILE_PATH.name()),
+            dsSchema.apply(MetadataColumns.DELETE_FILE_POS.name()),
+            dsSchema.apply(MetadataColumns.DELETE_FILE_ROW_FIELD_NAME)
+          });
+    }
+
+    private StructType deleteSparkTypeWithoutRow() {
+      return new StructType(
+          new StructField[] {
+            dsSchema.apply(MetadataColumns.DELETE_FILE_PATH.name()),
+            dsSchema.apply(MetadataColumns.DELETE_FILE_POS.name()),
+          });
+    }
+  }
+
+  /**
+   * Writer for position deletes metadata table.
+   *
+   * <p>Iceberg specifies delete files schema as having either 'row' as a required field, or omits
+   * 'row' altogether. This is to ensure accuracy of delete file statistics on 'row' column. Hence,
+   * this writer, if receiving source position deletes with null and non-null rows, redirects rows
+   * with null 'row' to one file writer, and non-null 'row' to another file writer.
+   *
+   * <p>This writer is meant to be used for an action to rewrite delete files. Hence, it makes an
+   * assumption that all incoming deletes belong to the same partition.
+   */
+  private static class DeleteWriter implements DataWriter<InternalRow> {
+    private final SparkFileWriterFactory writerFactoryWithRow;
+    private final SparkFileWriterFactory writerFactoryWithoutRow;
+    private final OutputFileFactory deleteFileFactory;
+    private final long targetFileSize;
+    private final PositionDelete<InternalRow> positionDelete;
+    private final FileIO io;
+    private final PartitionSpec spec;
+    private final int fileOrdinal;
+    private final int positionOrdinal;
+    private final int rowOrdinal;
+    private final int rowSize;
+    private final StructLike partition;
+
+    private ClusteredPositionDeleteWriter<InternalRow> writerWithRow;
+    private ClusteredPositionDeleteWriter<InternalRow> writerWithoutRow;
+    private boolean closed = false;
+
+    /**
+     * Constructs a {@link DeleteWriter}.
+     *
+     * @param table position deletes metadata table
+     * @param writerFactoryWithRow writer factory for deletes with non-null 'row'
+     * @param writerFactoryWithoutRow writer factory for deletes with null 'row'
+     * @param deleteFileFactory delete file factory
+     * @param targetFileSize target file size
+     * @param dsSchema schema of incoming dataset of position deletes
+     * @param specId partition spec id of incoming position deletes. All incoming partition deletes
+     *     are required to have the same spec id.
+     * @param partition partition value of incoming position delete. All incoming partition deletes
+     *     are required to have the same partition.
+     */
+    DeleteWriter(
+        Table table,
+        SparkFileWriterFactory writerFactoryWithRow,
+        SparkFileWriterFactory writerFactoryWithoutRow,
+        OutputFileFactory deleteFileFactory,
+        long targetFileSize,
+        StructType dsSchema,
+        int specId,
+        StructLike partition) {
+      this.deleteFileFactory = deleteFileFactory;
+      this.targetFileSize = targetFileSize;
+      this.writerFactoryWithRow = writerFactoryWithRow;
+      this.writerFactoryWithoutRow = writerFactoryWithoutRow;
+      this.positionDelete = PositionDelete.create();
+      this.io = table.io();
+      this.spec = table.specs().get(specId);
+      this.partition = partition;
+
+      this.fileOrdinal = dsSchema.fieldIndex(MetadataColumns.DELETE_FILE_PATH.name());
+      this.positionOrdinal = dsSchema.fieldIndex(MetadataColumns.DELETE_FILE_POS.name());
+
+      this.rowOrdinal = dsSchema.fieldIndex(MetadataColumns.DELETE_FILE_ROW_FIELD_NAME);
+      DataType type = dsSchema.apply(MetadataColumns.DELETE_FILE_ROW_FIELD_NAME).dataType();
+      Preconditions.checkArgument(
+          type instanceof StructType, "Expected row as struct type but was %s", type);
+      this.rowSize = ((StructType) type).size();
+    }
+
+    @Override
+    public void write(InternalRow record) throws IOException {
+      String file = record.getString(fileOrdinal);
+      long position = record.getLong(positionOrdinal);
+      InternalRow row = record.getStruct(rowOrdinal, rowSize);
+      if (row != null) {
+        positionDelete.set(file, position, row);
+        lazyWriterWithRow().write(positionDelete, spec, partition);
+      } else {
+        positionDelete.set(file, position, null);
+        lazyWriterWithoutRow().write(positionDelete, spec, partition);
+      }
+    }
+
+    @Override
+    public WriterCommitMessage commit() throws IOException {
+      close();
+      return new DeleteTaskCommit(allDeleteFiles());
+    }
+
+    @Override
+    public void abort() throws IOException {
+      close();
+      SparkCleanupUtil.deleteTaskFiles(io, allDeleteFiles());
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (!closed) {
+        if (writerWithRow != null) {
+          writerWithRow.close();
+        }
+        if (writerWithoutRow != null) {
+          writerWithoutRow.close();
+        }
+        this.closed = true;
+      }
+    }
+
+    private ClusteredPositionDeleteWriter<InternalRow> lazyWriterWithRow() {
+      if (writerWithRow == null) {
+        this.writerWithRow =
+            new ClusteredPositionDeleteWriter<>(
+                writerFactoryWithRow, deleteFileFactory, io, targetFileSize);
+      }
+      return writerWithRow;
+    }
+
+    private ClusteredPositionDeleteWriter<InternalRow> lazyWriterWithoutRow() {
+      if (writerWithoutRow == null) {
+        this.writerWithoutRow =
+            new ClusteredPositionDeleteWriter<>(
+                writerFactoryWithoutRow, deleteFileFactory, io, targetFileSize);
+      }
+      return writerWithoutRow;
+    }
+
+    private List<DeleteFile> allDeleteFiles() {
+      List<DeleteFile> allDeleteFiles = Lists.newArrayList();
+      if (writerWithRow != null) {
+        allDeleteFiles.addAll(writerWithRow.result().deleteFiles());
+      }
+      if (writerWithoutRow != null) {
+        allDeleteFiles.addAll(writerWithoutRow.result().deleteFiles());
+      }
+      return allDeleteFiles;
+    }
+  }
+
+  public static class DeleteTaskCommit implements WriterCommitMessage {
+    private final DeleteFile[] taskFiles;
+
+    DeleteTaskCommit(List<DeleteFile> deleteFiles) {
+      this.taskFiles = deleteFiles.toArray(new DeleteFile[0]);
+    }
+
+    DeleteFile[] files() {
+      return taskFiles;
+    }
+  }
+}
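
The write above assumes its input originated from a staged scan task set, so a caller first registers the position delete scan tasks with ScanTaskSetManager before issuing the dataset read. A minimal sketch of that staging step, following the pattern used by the tests below (tab, posDeletesTable, and fileSetId are placeholders):

    ScanTaskSetManager taskSetManager = ScanTaskSetManager.get();
    try (CloseableIterable<ScanTask> tasks = posDeletesTable.newBatchScan().planFiles()) {
      // stage the planned tasks under fileSetId so the subsequent read picks them up
      taskSetManager.stageTasks(tab, fileSetId, Lists.newArrayList(tasks));
    }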
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewriteBuilder.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewriteBuilder.java
new file mode 100644
index 0000000000..cc5c987fc4
--- /dev/null
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewriteBuilder.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.ContentScanTask;
+import org.apache.iceberg.PositionDeletesScanTask;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.spark.ScanTaskSetManager;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.SparkUtil;
+import org.apache.iceberg.spark.SparkWriteConf;
+import org.apache.iceberg.util.StructLikeSet;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.connector.write.LogicalWriteInfo;
+import org.apache.spark.sql.connector.write.Write;
+import org.apache.spark.sql.connector.write.WriteBuilder;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * Builder class for rewrites of position delete files from Spark. Responsible for creating {@link
+ * SparkPositionDeletesRewrite}.
+ *
+ * <p>This class is meant to be used for an action to rewrite delete files. Hence, it makes an
+ * assumption that all incoming deletes belong to the same partition, and that incoming dataset is
+ * from {@link ScanTaskSetManager}.
+ */
+public class SparkPositionDeletesRewriteBuilder implements WriteBuilder {
+
+  private final SparkSession spark;
+  private final Table table;
+  private final SparkWriteConf writeConf;
+  private final LogicalWriteInfo writeInfo;
+  private final StructType dsSchema;
+  private final Schema writeSchema;
+
+  SparkPositionDeletesRewriteBuilder(
+      SparkSession spark, Table table, String branch, LogicalWriteInfo info) {
+    this.spark = spark;
+    this.table = table;
+    this.writeConf = new SparkWriteConf(spark, table, branch, info.options());
+    this.writeInfo = info;
+    this.dsSchema = info.schema();
+    this.writeSchema = SparkSchemaUtil.convert(table.schema(), dsSchema, writeConf.caseSensitive());
+  }
+
+  @Override
+  public Write build() {
+    String fileSetId = writeConf.rewrittenFileSetId();
+    boolean handleTimestampWithoutZone = writeConf.handleTimestampWithoutZone();
+
+    Preconditions.checkArgument(
+        fileSetId != null, "Can only write to %s via actions", table.name());
+    Preconditions.checkArgument(
+        handleTimestampWithoutZone || !SparkUtil.hasTimestampWithoutZone(table.schema()),
+        SparkUtil.TIMESTAMP_WITHOUT_TIMEZONE_ERROR);
+
+    // all files of rewrite group have same partition and spec id
+    ScanTaskSetManager taskSetManager = ScanTaskSetManager.get();
+    List<PositionDeletesScanTask> tasks = taskSetManager.fetchTasks(table, fileSetId);
+    Preconditions.checkArgument(
+        tasks != null && tasks.size() > 0, "No scan tasks found for %s", fileSetId);
+
+    int specId = specId(fileSetId, tasks);
+    StructLike partition = partition(fileSetId, tasks);
+
+    return new SparkPositionDeletesRewrite(
+        spark, table, writeConf, writeInfo, writeSchema, dsSchema, specId, partition);
+  }
+
+  private int specId(String fileSetId, List<PositionDeletesScanTask> tasks) {
+    Set<Integer> specIds = tasks.stream().map(t -> t.spec().specId()).collect(Collectors.toSet());
+    Preconditions.checkArgument(
+        specIds.size() == 1,
+        "All scan tasks of %s are expected to have same spec id, but got %s",
+        fileSetId,
+        Joiner.on(",").join(specIds));
+    return tasks.get(0).spec().specId();
+  }
+
+  private StructLike partition(String fileSetId, List<PositionDeletesScanTask> tasks) {
+    StructLikeSet partitions = StructLikeSet.create(tasks.get(0).spec().partitionType());
+    tasks.stream().map(ContentScanTask::partition).forEach(partitions::add);
+    Preconditions.checkArgument(
+        partitions.size() == 1,
+        "All scan tasks of %s are expected to have the same partition, but got %s",
+        fileSetId,
+        Joiner.on(",").join(partitions));
+    return tasks.get(0).partition();
+  }
+}
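
Because build() requires writeConf.rewrittenFileSetId(), a plain append to the position_deletes metadata table is rejected; only the action-driven path that sets the rewritten-file-scan-task-set-id write option can write to it. For example, a direct write like the following would fail the first Preconditions check (a sketch; df and the table name are placeholders):

    // fails with "Can only write to <table> via actions" because the
    // rewritten-file-scan-task-set-id write option was never set
    df.writeTo("db.table.position_deletes").append();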
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
index f143071572..d845284513 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
@@ -32,6 +32,7 @@ import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Partitioning;
+import org.apache.iceberg.PositionDeletesTable;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SnapshotRef;
 import org.apache.iceberg.Table;
@@ -279,7 +280,11 @@ public class SparkTable
     Preconditions.checkArgument(
         snapshotId == null, "Cannot write to table at a specific snapshot: %s", snapshotId);
 
-    return new SparkWriteBuilder(sparkSession(), icebergTable, branch, info);
+    if (icebergTable instanceof PositionDeletesTable) {
+      return new SparkPositionDeletesRewriteBuilder(sparkSession(), icebergTable, branch, info);
+    } else {
+      return new SparkWriteBuilder(sparkSession(), icebergTable, branch, info);
+    }
   }
 
   @Override
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index b6b8b5b09d..bb062a5ba4 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -1679,7 +1679,7 @@ public class TestRewriteDataFilesAction extends SparkTestBase {
   private Set<String> cacheContents(Table table) {
     return ImmutableSet.<String>builder()
         .addAll(manager.fetchSetIds(table))
-        .addAll(coordinator.fetchSetIDs(table))
+        .addAll(coordinator.fetchSetIds(table))
         .build();
   }
 
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java
index 2be5c8444d..2ec4f2f4f9 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import java.util.function.BiFunction;
 import java.util.stream.Collectors;
@@ -29,11 +30,13 @@ import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Files;
+import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.MetadataTableType;
 import org.apache.iceberg.MetadataTableUtils;
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Partitioning;
+import org.apache.iceberg.PositionDeletesScanTask;
 import org.apache.iceberg.ScanTask;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.StructLike;
@@ -45,20 +48,30 @@ import org.apache.iceberg.data.FileHelpers;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.PositionDeletesRewriteCoordinator;
 import org.apache.iceberg.spark.ScanTaskSetManager;
+import org.apache.iceberg.spark.SparkCatalogConfig;
+import org.apache.iceberg.spark.SparkCatalogTestBase;
 import org.apache.iceberg.spark.SparkReadOptions;
 import org.apache.iceberg.spark.SparkStructLike;
-import org.apache.iceberg.spark.SparkTestBase;
+import org.apache.iceberg.spark.SparkWriteOptions;
+import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.CharSequenceSet;
 import org.apache.iceberg.util.Pair;
 import org.apache.iceberg.util.StructLikeSet;
+import org.apache.spark.sql.AnalysisException;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
 import org.apache.spark.sql.functions;
 import org.junit.Assert;
 import org.junit.Rule;
@@ -68,20 +81,51 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 @RunWith(Parameterized.class)
-public class TestPositionDeletesTable extends SparkTestBase {
+public class TestPositionDeletesTable extends SparkCatalogTestBase {
 
   public static final Schema SCHEMA =
       new Schema(
           Types.NestedField.required(1, "id", Types.IntegerType.get()),
           Types.NestedField.required(2, "data", Types.StringType.get()));
+  private static final Map<String, String> CATALOG_PROPS =
+      ImmutableMap.of(
+          "type", "hive",
+          "default-namespace", "default",
+          "cache-enabled", "false");
+  private static final List<String> NON_PATH_COLS =
+      ImmutableList.of("file_path", "pos", "row", "partition", "spec_id");
+
   private final FileFormat format;
 
-  @Parameterized.Parameters(name = "fileFormat = {0}")
+  @Parameterized.Parameters(
+      name =
+          "formatVersion = {0}, catalogName = {1}, implementation = {2}, config = {3}, fileFormat = {4}")
   public static Object[][] parameters() {
-    return new Object[][] {{FileFormat.PARQUET}, {FileFormat.AVRO}, {FileFormat.ORC}};
+    return new Object[][] {
+      {
+        SparkCatalogConfig.HIVE.catalogName(),
+        SparkCatalogConfig.HIVE.implementation(),
+        CATALOG_PROPS,
+        FileFormat.PARQUET
+      },
+      {
+        SparkCatalogConfig.HIVE.catalogName(),
+        SparkCatalogConfig.HIVE.implementation(),
+        CATALOG_PROPS,
+        FileFormat.AVRO
+      },
+      {
+        SparkCatalogConfig.HIVE.catalogName(),
+        SparkCatalogConfig.HIVE.implementation(),
+        CATALOG_PROPS,
+        FileFormat.ORC
+      },
+    };
   }
 
-  public TestPositionDeletesTable(FileFormat format) {
+  public TestPositionDeletesTable(
+      String catalogName, String implementation, Map<String, String> config, FileFormat format) {
+    super(catalogName, implementation, config);
     this.format = format;
   }
 
@@ -678,8 +722,8 @@ public class TestPositionDeletesTable extends SparkTestBase {
   }
 
   @Test
-  public void testScanTaskSetManager() throws IOException {
-    String tableName = "scan_task_set_manager";
+  public void testWrite() throws IOException, NoSuchTableException {
+    String tableName = "test_write";
     PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
     Table tab = createTable(tableName, SCHEMA, spec);
 
@@ -689,19 +733,422 @@ public class TestPositionDeletesTable extends SparkTestBase {
 
     // Add position deletes for both partitions
     Pair<List<PositionDelete<?>>, DeleteFile> deletesA = deleteFile(tab, dataFileA, "a");
-    Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(tab, dataFileA, "b");
+    Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(tab, dataFileB, "b");
+
+    tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit();
+
+    Table posDeletesTable =
+        MetadataTableUtils.createMetadataTableInstance(tab, MetadataTableType.POSITION_DELETES);
+    String posDeletesTableName = catalogName + ".default." + tableName + ".position_deletes";
+    for (String partValue : ImmutableList.of("a", "b")) {
+      try (CloseableIterable<ScanTask> tasks = tasks(posDeletesTable, "data", partValue)) {
+        String fileSetID = UUID.randomUUID().toString();
+        stageTask(tab, fileSetID, tasks);
+
+        Dataset<Row> scanDF =
+            spark
+                .read()
+                .format("iceberg")
+                .option(SparkReadOptions.SCAN_TASK_SET_ID, fileSetID)
+                .option(SparkReadOptions.FILE_OPEN_COST, Integer.MAX_VALUE)
+                .load(posDeletesTableName);
+
+        Assert.assertEquals(1, scanDF.javaRDD().getNumPartitions());
+        scanDF
+            .writeTo(posDeletesTableName)
+            .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, fileSetID)
+            .append();
+
+        commit(tab, posDeletesTable, fileSetID, 1);
+      }
+    }
+
+    // Prepare expected values (without 'delete_file_path' as these have been rewritten)
+    GenericRecord partitionRecordTemplate = GenericRecord.create(tab.spec().partitionType());
+    Record partitionA = partitionRecordTemplate.copy("data", "a");
+    Record partitionB = partitionRecordTemplate.copy("data", "b");
+    StructLikeSet expectedA = expected(tab, deletesA.first(), partitionA, null);
+    StructLikeSet expectedB = expected(tab, deletesB.first(), partitionB, null);
+    StructLikeSet allExpected =
+        StructLikeSet.create(
+            TypeUtil.selectNot(
+                    posDeletesTable.schema(), ImmutableSet.of(MetadataColumns.FILE_PATH_COLUMN_ID))
+                .asStruct());
+    allExpected.addAll(expectedA);
+    allExpected.addAll(expectedB);
+
+    // Compare values without 'delete_file_path' as these have been rewritten
+    StructLikeSet actual = actual(tableName, tab, null, NON_PATH_COLS);
+    Assert.assertEquals("Position Delete table should contain expected rows", allExpected, actual);
+    dropTable(tableName);
+  }
+
+  @Test
+  public void testWriteUnpartitionedNullRows() throws Exception {
+    String tableName = "write_null_rows";
+    Table tab = createTable(tableName, SCHEMA, PartitionSpec.unpartitioned());
+
+    DataFile dFile = dataFile(tab);
+    tab.newAppend().appendFile(dFile).commit();
+
+    List<Pair<CharSequence, Long>> deletes = Lists.newArrayList();
+    deletes.add(Pair.of(dFile.path(), 0L));
+    deletes.add(Pair.of(dFile.path(), 1L));
+    Pair<DeleteFile, CharSequenceSet> posDeletes =
+        FileHelpers.writeDeleteFile(
+            tab, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), deletes);
+    tab.newRowDelta().addDeletes(posDeletes.first()).commit();
+
+    Table posDeletesTable =
+        MetadataTableUtils.createMetadataTableInstance(tab, MetadataTableType.POSITION_DELETES);
+    String posDeletesTableName = catalogName + ".default." + tableName + ".position_deletes";
+    try (CloseableIterable<ScanTask> tasks = posDeletesTable.newBatchScan().planFiles()) {
+      String fileSetID = UUID.randomUUID().toString();
+      stageTask(tab, fileSetID, tasks);
+
+      Dataset<Row> scanDF =
+          spark
+              .read()
+              .format("iceberg")
+              .option(SparkReadOptions.SCAN_TASK_SET_ID, fileSetID)
+              .option(SparkReadOptions.FILE_OPEN_COST, Integer.MAX_VALUE)
+              .load(posDeletesTableName);
+      Assert.assertEquals(1, scanDF.javaRDD().getNumPartitions());
+      scanDF
+          .writeTo(posDeletesTableName)
+          .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, fileSetID)
+          .append();
+
+      commit(tab, posDeletesTable, fileSetID, 1);
+    }
+
+    // Compare values without 'delete_file_path' as these have been rewritten
+    StructLikeSet actual =
+        actual(tableName, tab, null, ImmutableList.of("file_path", "pos", "row", "spec_id"));
+
+    List<PositionDelete<?>> expectedDeletes =
+        Lists.newArrayList(positionDelete(dFile.path(), 0L), positionDelete(dFile.path(), 1L));
+    StructLikeSet expected = expected(tab, expectedDeletes, null, null);
+
+    Assert.assertEquals("Position Delete table should contain expected rows", expected, actual);
+    dropTable(tableName);
+  }
+
+  @Test
+  public void testWriteMixedRows() throws Exception {
+    String tableName = "write_mixed_rows";
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
+    Table tab = createTable(tableName, SCHEMA, spec);
+
+    DataFile dataFileA = dataFile(tab, "a");
+    DataFile dataFileB = dataFile(tab, "b");
+    tab.newAppend().appendFile(dataFileA).appendFile(dataFileB).commit();
+
+    // Add a delete file with row and without row
+    List<Pair<CharSequence, Long>> deletes = Lists.newArrayList();
+    deletes.add(Pair.of(dataFileA.path(), 0L));
+    deletes.add(Pair.of(dataFileA.path(), 1L));
+    Pair<DeleteFile, CharSequenceSet> deletesWithoutRow =
+        FileHelpers.writeDeleteFile(
+            tab, Files.localOutput(temp.newFile()), TestHelpers.Row.of("a"), deletes);
+
+    Pair<List<PositionDelete<?>>, DeleteFile> deletesWithRow = deleteFile(tab, dataFileB, "b");
+
+    tab.newRowDelta()
+        .addDeletes(deletesWithoutRow.first())
+        .addDeletes(deletesWithRow.second())
+        .commit();
+
+    // rewrite delete files
+    Table posDeletesTable =
+        MetadataTableUtils.createMetadataTableInstance(tab, MetadataTableType.POSITION_DELETES);
+    String posDeletesTableName = catalogName + ".default." + tableName + ".position_deletes";
+    for (String partValue : ImmutableList.of("a", "b")) {
+      try (CloseableIterable<ScanTask> tasks = tasks(posDeletesTable, "data", partValue)) {
+        String fileSetID = UUID.randomUUID().toString();
+        stageTask(tab, fileSetID, tasks);
+
+        Dataset<Row> scanDF =
+            spark
+                .read()
+                .format("iceberg")
+                .option(SparkReadOptions.SCAN_TASK_SET_ID, fileSetID)
+                .load(posDeletesTableName);
+        Assert.assertEquals(1, scanDF.javaRDD().getNumPartitions());
+        scanDF
+            .writeTo(posDeletesTableName)
+            .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, fileSetID)
+            .append();
+
+        commit(tab, posDeletesTable, fileSetID, 1);
+      }
+    }
+
+    // Compare values without 'delete_file_path' as these have been rewritten
+    StructLikeSet actual =
+        actual(
+            tableName,
+            tab,
+            null,
+            ImmutableList.of("file_path", "pos", "row", "partition", "spec_id"));
+
+    // Prepare expected values
+    GenericRecord partitionRecordTemplate = GenericRecord.create(tab.spec().partitionType());
+    Record partitionA = partitionRecordTemplate.copy("data", "a");
+    Record partitionB = partitionRecordTemplate.copy("data", "b");
+    StructLikeSet allExpected =
+        StructLikeSet.create(
+            TypeUtil.selectNot(
+                    posDeletesTable.schema(), ImmutableSet.of(MetadataColumns.FILE_PATH_COLUMN_ID))
+                .asStruct());
+    allExpected.addAll(
+        expected(
+            tab,
+            Lists.newArrayList(
+                positionDelete(dataFileA.path(), 0L), positionDelete(dataFileA.path(), 1L)),
+            partitionA,
+            null));
+    allExpected.addAll(expected(tab, deletesWithRow.first(), partitionB, null));
+
+    Assert.assertEquals("Position Delete table should contain expected rows", allExpected, actual);
+    dropTable(tableName);
+  }
+
+  @Test
+  public void testWritePartitionEvolutionAdd() throws Exception {
+    // Create unpartitioned table
+    String tableName = "write_partition_evolution_add";
+    Table tab = createTable(tableName, SCHEMA, PartitionSpec.unpartitioned());
+    int specId0 = tab.spec().specId();
+
+    // Add files with unpartitioned spec
+    DataFile dataFileUnpartitioned = dataFile(tab);
+    tab.newAppend().appendFile(dataFileUnpartitioned).commit();
+    Pair<List<PositionDelete<?>>, DeleteFile> deletesUnpartitioned =
+        deleteFile(tab, dataFileUnpartitioned);
+    tab.newRowDelta().addDeletes(deletesUnpartitioned.second()).commit();
+
+    // Switch partition spec to (data)
+    tab.updateSpec().addField("data").commit();
+    int specId1 = tab.spec().specId();
+
+    // Add files with new spec (data)
+    DataFile dataFileA = dataFile(tab, "a");
+    DataFile dataFileB = dataFile(tab, "b");
+    tab.newAppend().appendFile(dataFileA).appendFile(dataFileB).commit();
+
+    Pair<List<PositionDelete<?>>, DeleteFile> deletesA = deleteFile(tab, dataFileA, "a");
+    Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(tab, dataFileB, "b");
     tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit();
 
     Table posDeletesTable =
         MetadataTableUtils.createMetadataTableInstance(tab, MetadataTableType.POSITION_DELETES);
-    String posDeletesTableName = "default." + tableName + ".position_deletes";
+    String posDeletesTableName = catalogName + ".default." + tableName + ".position_deletes";
+
+    // Read/write back unpartitioned data
+    try (CloseableIterable<ScanTask> tasks =
+        posDeletesTable.newBatchScan().filter(Expressions.isNull("partition.data")).planFiles()) {
+      String fileSetID = UUID.randomUUID().toString();
+      stageTask(tab, fileSetID, tasks);
+
+      Dataset<Row> scanDF =
+          spark
+              .read()
+              .format("iceberg")
+              .option(SparkReadOptions.SCAN_TASK_SET_ID, fileSetID)
+              .option(SparkReadOptions.FILE_OPEN_COST, Integer.MAX_VALUE)
+              .load(posDeletesTableName);
+      Assert.assertEquals(1, scanDF.javaRDD().getNumPartitions());
+      scanDF
+          .writeTo(posDeletesTableName)
+          .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, fileSetID)
+          .append();
+
+      commit(tab, posDeletesTable, fileSetID, 1);
+    }
+
+    // Select deletes from unpartitioned data
+    // Compare values without 'delete_file_path' as these have been rewritten
+    GenericRecord partitionRecordTemplate = GenericRecord.create(Partitioning.partitionType(tab));
+    Record unpartitionedRecord = partitionRecordTemplate.copy("data", null);
+    StructLikeSet expectedUnpartitioned =
+        expected(tab, deletesUnpartitioned.first(), unpartitionedRecord, specId0, null);
+    StructLikeSet actualUnpartitioned =
+        actual(tableName, tab, "partition.data IS NULL", NON_PATH_COLS);
+    Assert.assertEquals(
+        "Position Delete table should contain expected rows",
+        expectedUnpartitioned,
+        actualUnpartitioned);
+
+    // Read/write back new partition spec (data)
+    for (String partValue : ImmutableList.of("a", "b")) {
+      try (CloseableIterable<ScanTask> tasks = tasks(posDeletesTable, "data", partValue)) {
+        String fileSetID = UUID.randomUUID().toString();
+        stageTask(tab, fileSetID, tasks);
+
+        Dataset<Row> scanDF =
+            spark
+                .read()
+                .format("iceberg")
+                .option(SparkReadOptions.SCAN_TASK_SET_ID, fileSetID)
+                .option(SparkReadOptions.FILE_OPEN_COST, Integer.MAX_VALUE)
+                .load(posDeletesTableName);
+        Assert.assertEquals(1, scanDF.javaRDD().getNumPartitions());
+        scanDF
+            .writeTo(posDeletesTableName)
+            .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, fileSetID)
+            .append();
+
+        // commit the rewrite
+        commit(tab, posDeletesTable, fileSetID, 1);
+      }
+    }
+
+    // Select deletes from new spec (data)
+    Record partitionA = partitionRecordTemplate.copy("data", "a");
+    Record partitionB = partitionRecordTemplate.copy("data", "b");
+    StructLikeSet expectedAll =
+        StructLikeSet.create(
+            TypeUtil.selectNot(
+                    posDeletesTable.schema(), ImmutableSet.of(MetadataColumns.FILE_PATH_COLUMN_ID))
+                .asStruct());
+    expectedAll.addAll(expected(tab, deletesA.first(), partitionA, specId1, null));
+    expectedAll.addAll(expected(tab, deletesB.first(), partitionB, specId1, null));
+    StructLikeSet actualAll =
+        actual(tableName, tab, "partition.data = 'a' OR partition.data = 'b'", NON_PATH_COLS);
+    Assert.assertEquals(
+        "Position Delete table should contain expected rows", expectedAll, actualAll);
+
+    dropTable(tableName);
+  }
+
+  @Test
+  public void testWritePartitionEvolutionDisallowed() throws Exception {
+    // Create unpartitioned table
+    String tableName = "write_partition_evolution_write";
+    Table tab = createTable(tableName, SCHEMA, PartitionSpec.unpartitioned());
+
+    // Add files with unpartitioned spec
+    DataFile dataFileUnpartitioned = dataFile(tab);
+    tab.newAppend().appendFile(dataFileUnpartitioned).commit();
+    Pair<List<PositionDelete<?>>, DeleteFile> deletesUnpartitioned =
+        deleteFile(tab, dataFileUnpartitioned);
+    tab.newRowDelta().addDeletes(deletesUnpartitioned.second()).commit();
+
+    Table posDeletesTable =
+        MetadataTableUtils.createMetadataTableInstance(tab, MetadataTableType.POSITION_DELETES);
+    String posDeletesTableName = catalogName + ".default." + tableName + ".position_deletes";
+
+    Dataset<Row> scanDF;
+    String fileSetID = UUID.randomUUID().toString();
     try (CloseableIterable<ScanTask> tasks = posDeletesTable.newBatchScan().planFiles()) {
+      stageTask(tab, fileSetID, tasks);
+
+      scanDF =
+          spark
+              .read()
+              .format("iceberg")
+              .option(SparkReadOptions.SCAN_TASK_SET_ID, fileSetID)
+              .option(SparkReadOptions.FILE_OPEN_COST, Integer.MAX_VALUE)
+              .load(posDeletesTableName);
+      Assert.assertEquals(1, scanDF.javaRDD().getNumPartitions());
+
+      // Add partition field to render the original un-partitioned dataset un-commitable
+      tab.updateSpec().addField("data").commit();
+    }
+
+    Assert.assertThrows(
+        AnalysisException.class,
+        () ->
+            scanDF
+                .writeTo(posDeletesTableName)
+                .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, fileSetID)
+                .append());
+
+    dropTable(tableName);
+  }
+
+  @Test
+  public void testWriteSchemaEvolutionAdd() throws Exception {
+    // Create table with original schema
+    String tableName = "write_schema_evolution_add";
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
+    Table tab = createTable(tableName, SCHEMA, spec);
+
+    // Add files with original schema
+    DataFile dataFileA = dataFile(tab, "a");
+    DataFile dataFileB = dataFile(tab, "b");
+    tab.newAppend().appendFile(dataFileA).appendFile(dataFileB).commit();
+
+    Pair<List<PositionDelete<?>>, DeleteFile> deletesA = deleteFile(tab, dataFileA, "a");
+    Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(tab, dataFileB, "b");
+    tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit();
+
+    // Evolve the schema by adding new columns
+    tab.updateSchema()
+        .addColumn("new_col_1", Types.IntegerType.get())
+        .addColumn("new_col_2", Types.IntegerType.get())
+        .commit();
+
+    // Add files with new schema
+    DataFile dataFileC = dataFile(tab, "c");
+    DataFile dataFileD = dataFile(tab, "d");
+    tab.newAppend().appendFile(dataFileC).appendFile(dataFileD).commit();
+
+    Pair<List<PositionDelete<?>>, DeleteFile> deletesC = deleteFile(tab, dataFileC, "c");
+    Pair<List<PositionDelete<?>>, DeleteFile> deletesD = deleteFile(tab, dataFileD, "d");
+    tab.newRowDelta().addDeletes(deletesC.second()).addDeletes(deletesD.second()).commit();
+
+    Table posDeletesTable =
+        MetadataTableUtils.createMetadataTableInstance(tab, MetadataTableType.POSITION_DELETES);
+    String posDeletesTableName = catalogName + ".default." + tableName + ".position_deletes";
+
+    // rewrite files of old schema
+    try (CloseableIterable<ScanTask> tasks = tasks(posDeletesTable, "data", "a")) {
       String fileSetID = UUID.randomUUID().toString();
+      stageTask(tab, fileSetID, tasks);
+
+      Dataset<Row> scanDF =
+          spark
+              .read()
+              .format("iceberg")
+              .option(SparkReadOptions.SCAN_TASK_SET_ID, fileSetID)
+              .option(SparkReadOptions.FILE_OPEN_COST, Integer.MAX_VALUE)
+              .load(posDeletesTableName);
+
+      Assert.assertEquals(1, scanDF.javaRDD().getNumPartitions());
+      scanDF
+          .writeTo(posDeletesTableName)
+          .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, fileSetID)
+          .append();
+
+      commit(tab, posDeletesTable, fileSetID, 1);
+    }
+
+    // Select deletes from old schema
+    GenericRecord partitionRecordTemplate = GenericRecord.create(Partitioning.partitionType(tab));
+    Record partitionA = partitionRecordTemplate.copy("data", "a");
+    // pad expected delete rows with null values for new columns
+    List<PositionDelete<?>> expectedDeletesA = deletesA.first();
+    expectedDeletesA.forEach(
+        d -> {
+          GenericRecord nested = d.get(2, GenericRecord.class);
+          GenericRecord padded = GenericRecord.create(tab.schema().asStruct());
+          padded.set(0, nested.get(0));
+          padded.set(1, nested.get(1));
+          padded.set(2, null);
+          padded.set(3, null);
+          d.set(2, padded);
+        });
+    StructLikeSet expectedA = expected(tab, expectedDeletesA, partitionA, null);
+    StructLikeSet actualA = actual(tableName, tab, "partition.data = 'a'", NON_PATH_COLS);
+    Assert.assertEquals("Position Delete table should contain expected rows", expectedA, actualA);
 
-      ScanTaskSetManager taskSetManager = ScanTaskSetManager.get();
-      taskSetManager.stageTasks(tab, fileSetID, Lists.newArrayList(tasks));
+    // rewrite files of new schema
+    try (CloseableIterable<ScanTask> tasks = tasks(posDeletesTable, "data", "c")) {
+      String fileSetID = UUID.randomUUID().toString();
+      stageTask(tab, fileSetID, tasks);
 
-      // read and pack original 4 files into 2 splits
       Dataset<Row> scanDF =
           spark
               .read()
@@ -710,30 +1157,171 @@ public class TestPositionDeletesTable extends SparkTestBase {
               .option(SparkReadOptions.FILE_OPEN_COST, Integer.MAX_VALUE)
               .load(posDeletesTableName);
 
-      Assert.assertEquals("Num partitions should match", 2, scanDF.javaRDD().getNumPartitions());
+      Assert.assertEquals(1, scanDF.javaRDD().getNumPartitions());
+      scanDF
+          .writeTo(posDeletesTableName)
+          .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, fileSetID)
+          .append();
+
+      commit(tab, posDeletesTable, fileSetID, 1);
+    }
+
+    // Select deletes from new schema
+    Record partitionC = partitionRecordTemplate.copy("data", "c");
+    StructLikeSet expectedC = expected(tab, deletesC.first(), partitionC, null);
+    StructLikeSet actualC = actual(tableName, tab, "partition.data = 'c'", NON_PATH_COLS);
+
+    Assert.assertEquals("Position Delete table should contain expected rows", expectedC, actualC);
+    dropTable(tableName);
+  }
+
+  @Test
+  public void testWriteSchemaEvolutionRemove() throws Exception {
+    // Create table with original schema
+    String tableName = "write_schema_evolution_remove";
+    Schema oldSchema =
+        new Schema(
+            Types.NestedField.required(1, "id", Types.IntegerType.get()),
+            Types.NestedField.required(2, "data", Types.StringType.get()),
+            Types.NestedField.optional(3, "new_col_1", Types.IntegerType.get()),
+            Types.NestedField.optional(4, "new_col_2", Types.IntegerType.get()));
+    PartitionSpec spec = PartitionSpec.builderFor(oldSchema).identity("data").build();
+    Table tab = createTable(tableName, oldSchema, spec);
+
+    // Add files with original schema
+    DataFile dataFileA = dataFile(tab, "a");
+    DataFile dataFileB = dataFile(tab, "b");
+    tab.newAppend().appendFile(dataFileA).appendFile(dataFileB).commit();
+
+    Pair<List<PositionDelete<?>>, DeleteFile> deletesA = deleteFile(tab, dataFileA, "a");
+    Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(tab, dataFileB, "b");
+    tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit();
+
+    // Evolve the schema by dropping the optional columns
+    tab.updateSchema().deleteColumn("new_col_1").deleteColumn("new_col_2").commit();
+
+    // Add files with new schema
+    DataFile dataFileC = dataFile(tab, "c");
+    DataFile dataFileD = dataFile(tab, "d");
+    tab.newAppend().appendFile(dataFileC).appendFile(dataFileD).commit();
+
+    Pair<List<PositionDelete<?>>, DeleteFile> deletesC = deleteFile(tab, dataFileC, "c");
+    Pair<List<PositionDelete<?>>, DeleteFile> deletesD = deleteFile(tab, dataFileD, "d");
+    tab.newRowDelta().addDeletes(deletesC.second()).addDeletes(deletesD.second()).commit();
 
-      dropTable(tableName);
+    Table posDeletesTable =
+        MetadataTableUtils.createMetadataTableInstance(tab, MetadataTableType.POSITION_DELETES);
+    String posDeletesTableName = catalogName + ".default." + tableName + ".position_deletes";
+
+    // rewrite files
+    for (String partValue : ImmutableList.of("a", "b", "c", "d")) {
+      try (CloseableIterable<ScanTask> tasks = tasks(posDeletesTable, "data", partValue)) {
+        String fileSetID = UUID.randomUUID().toString();
+        stageTask(tab, fileSetID, tasks);
+
+        Dataset<Row> scanDF =
+            spark
+                .read()
+                .format("iceberg")
+                .option(SparkReadOptions.SCAN_TASK_SET_ID, fileSetID)
+                .option(SparkReadOptions.FILE_OPEN_COST, Integer.MAX_VALUE)
+                .load(posDeletesTableName);
+        Assert.assertEquals(1, scanDF.javaRDD().getNumPartitions());
+        scanDF
+            .writeTo(posDeletesTableName)
+            .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, fileSetID)
+            .append();
+
+        commit(tab, posDeletesTable, fileSetID, 1);
+      }
     }
+
+    // Select deletes from old schema
+    GenericRecord partitionRecordTemplate = GenericRecord.create(Partitioning.partitionType(tab));
+    Record partitionA = partitionRecordTemplate.copy("data", "a");
+    // remove deleted columns from expected result
+    List<PositionDelete<?>> expectedDeletesA = deletesA.first();
+    expectedDeletesA.forEach(
+        d -> {
+          GenericRecord nested = d.get(2, GenericRecord.class);
+          GenericRecord padded = GenericRecord.create(tab.schema().asStruct());
+          padded.set(0, nested.get(0));
+          padded.set(1, nested.get(1));
+          d.set(2, padded);
+        });
+    StructLikeSet expectedA = expected(tab, expectedDeletesA, partitionA, null);
+    StructLikeSet actualA = actual(tableName, tab, "partition.data = 'a'", NON_PATH_COLS);
+    Assert.assertEquals("Position Delete table should contain expected rows", expectedA, actualA);
+
+    // Select deletes from new schema
+    Record partitionC = partitionRecordTemplate.copy("data", "c");
+    StructLikeSet expectedC = expected(tab, deletesC.first(), partitionC, null);
+    StructLikeSet actualC = actual(tableName, tab, "partition.data = 'c'", NON_PATH_COLS);
+
+    Assert.assertEquals("Position Delete table should contain expected rows", expectedC, actualC);
+    dropTable(tableName);
+  }
+
+  @Test
+  public void testNormalWritesNotAllowed() throws IOException {
+    String tableName = "test_normal_write_not_allowed";
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
+    Table tab = createTable(tableName, SCHEMA, spec);
+
+    DataFile dataFileA = dataFile(tab, "a");
+    tab.newAppend().appendFile(dataFileA).commit();
+
+    Pair<List<PositionDelete<?>>, DeleteFile> deletesA = deleteFile(tab, dataFileA, "a");
+    tab.newRowDelta().addDeletes(deletesA.second()).commit();
+
+    String posDeletesTableName = catalogName + ".default." + tableName + ".position_deletes";
+
+    Dataset<Row> scanDF = spark.read().format("iceberg").load(posDeletesTableName);
+
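+    // A direct append without a staged rewrite file set should be rejected.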
+    Assert.assertThrows(
+        "position_deletes table can only be written by RewriteDeleteFiles",
+        IllegalArgumentException.class,
+        () -> scanDF.writeTo(posDeletesTableName).append());
+
+    dropTable(tableName);
   }
 
   private StructLikeSet actual(String tableName, Table table) {
-    return actual(tableName, table, null);
+    return actual(tableName, table, null, null);
   }
 
   private StructLikeSet actual(String tableName, Table table, String filter) {
+    return actual(tableName, table, filter, null);
+  }
+
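+  // Reads the position_deletes table, optionally filtered and projected to the given columns.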
+  private StructLikeSet actual(String tableName, Table table, String filter, List<String> cols) {
     Dataset<Row> df =
-        spark.read().format("iceberg").load("default." + tableName + ".position_deletes");
+        spark
+            .read()
+            .format("iceberg")
+            .load(catalogName + ".default." + tableName + ".position_deletes");
     if (filter != null) {
       df = df.filter(filter);
     }
+    if (cols != null) {
+      df = df.select(cols.get(0), cols.subList(1, cols.size()).toArray(new String[0]));
+    }
     Table deletesTable =
         MetadataTableUtils.createMetadataTableInstance(table, MetadataTableType.POSITION_DELETES);
     Types.StructType projection = deletesTable.schema().asStruct();
+    if (cols != null) {
+      projection =
+          Types.StructType.of(
+              projection.fields().stream()
+                  .filter(f -> cols.contains(f.name()))
+                  .collect(Collectors.toList()));
+    }
+    Types.StructType finalProjection = projection;
     StructLikeSet set = StructLikeSet.create(projection);
     df.collectAsList()
         .forEach(
             row -> {
-              SparkStructLike rowWrapper = new SparkStructLike(projection);
+              SparkStructLike rowWrapper = new SparkStructLike(finalProjection);
               set.add(rowWrapper.wrap(row));
             });
 
@@ -747,11 +1335,12 @@ public class TestPositionDeletesTable extends SparkTestBase {
             "2",
             TableProperties.DEFAULT_FILE_FORMAT,
             format.toString());
-    return catalog.createTable(TableIdentifier.of("default", name), schema, spec, properties);
+    return validationCatalog.createTable(
+        TableIdentifier.of("default", name), schema, spec, properties);
   }
 
   protected void dropTable(String name) {
-    catalog.dropTable(TableIdentifier.of("default", name));
+    validationCatalog.dropTable(TableIdentifier.of("default", name), false);
   }
 
   private PositionDelete<GenericRecord> positionDelete(CharSequence path, Long position) {
@@ -781,6 +1370,13 @@ public class TestPositionDeletesTable extends SparkTestBase {
         MetadataTableUtils.createMetadataTableInstance(
             testTable, MetadataTableType.POSITION_DELETES);
     Types.StructType posDeleteSchema = deletesTable.schema().asStruct();
+    // Do not compare file paths
+    if (deleteFilePath == null) {
+      posDeleteSchema =
+          TypeUtil.selectNot(
+                  deletesTable.schema(), ImmutableSet.of(MetadataColumns.FILE_PATH_COLUMN_ID))
+              .asStruct();
+    }
     final Types.StructType finalSchema = posDeleteSchema;
     StructLikeSet set = StructLikeSet.create(posDeleteSchema);
     deletes.stream()
@@ -794,7 +1390,9 @@ public class TestPositionDeletesTable extends SparkTestBase {
                 record.setField("partition", partitionStruct);
               }
               record.setField("spec_id", specId);
-              record.setField("delete_file_path", deleteFilePath);
+              if (deleteFilePath != null) {
+                record.setField("delete_file_path", deleteFilePath);
+              }
               return record;
             })
         .forEach(set::add);
@@ -890,7 +1488,7 @@ public class TestPositionDeletesTable extends SparkTestBase {
             positionDelete(
                 tab.schema(),
                 dataFile.path(),
-                0L,
+                1L,
                 idPartition != null ? idPartition : 61,
                 dataPartition != null ? dataPartition : "r"));
 
@@ -910,4 +1508,59 @@ public class TestPositionDeletesTable extends SparkTestBase {
             tab, Files.localOutput(temp.newFile()), partitionInfo, deletes);
     return Pair.of(deletes, deleteFile);
   }
+
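+  // Registers the scan tasks with the ScanTaskSetManager under the given file set ID so that a
+  // subsequent read with SparkReadOptions.SCAN_TASK_SET_ID picks up exactly this file set.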
+  private <T extends ScanTask> void stageTask(
+      Table tab, String fileSetID, CloseableIterable<T> tasks) {
+    ScanTaskSetManager taskSetManager = ScanTaskSetManager.get();
+    taskSetManager.stageTasks(tab, fileSetID, Lists.newArrayList(tasks));
+  }
+
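+  // Completes a staged rewrite: fetches the rewritten delete files and the files written by
+  // Spark for the given file set ID, checks their counts and paths, and commits the swap.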
+  private void commit(
+      Table baseTab,
+      Table posDeletesTable,
+      String fileSetID,
+      int expectedSourceFiles,
+      int expectedTargetFiles) {
+    PositionDeletesRewriteCoordinator rewriteCoordinator = PositionDeletesRewriteCoordinator.get();
+    Set<DeleteFile> rewrittenFiles =
+        ScanTaskSetManager.get().fetchTasks(posDeletesTable, fileSetID).stream()
+            .map(t -> ((PositionDeletesScanTask) t).file())
+            .collect(Collectors.toSet());
+    Set<DeleteFile> addedFiles = rewriteCoordinator.fetchNewFiles(posDeletesTable, fileSetID);
+
+    // Assert the rewritten and added file counts match expectations and that their paths differ
+    Assert.assertEquals(expectedSourceFiles, rewrittenFiles.size());
+    Assert.assertEquals(expectedTargetFiles, addedFiles.size());
+
+    List<String> sortedAddedFiles =
+        addedFiles.stream().map(f -> f.path().toString()).sorted().collect(Collectors.toList());
+    List<String> sortedRewrittenFiles =
+        rewrittenFiles.stream().map(f -> f.path().toString()).sorted().collect(Collectors.toList());
+    Assert.assertNotEquals(
+        "Added and rewritten file paths should differ", sortedAddedFiles, sortedRewrittenFiles);
+
+    baseTab
+        .newRewrite()
+        .rewriteFiles(ImmutableSet.of(), rewrittenFiles, ImmutableSet.of(), addedFiles)
+        .commit();
+  }
+
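+  // Convenience overload for rewrites where source and target file counts are expected to match.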
+  private void commit(Table baseTab, Table posDeletesTable, String fileSetID, int expectedFiles) {
+    commit(baseTab, posDeletesTable, fileSetID, expectedFiles, expectedFiles);
+  }
+
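+  // Plans position delete scan tasks for a single value of the given partition column.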
+  private CloseableIterable<ScanTask> tasks(
+      Table posDeletesTable, String partitionColumn, String partitionValue) {
+
+    Expression filter = Expressions.equal("partition." + partitionColumn, partitionValue);
+    CloseableIterable<ScanTask> files = posDeletesTable.newBatchScan().filter(filter).planFiles();
+
+    // the filter may not prune files after partition evolution, so filter the tasks explicitly
+    return CloseableIterable.filter(
+        files,
+        t -> {
+          StructLike filePartition = ((PositionDeletesScanTask) t).partition();
+          String filePartitionValue = filePartition.get(0, String.class);
+          return filePartitionValue != null && filePartitionValue.equals(partitionValue);
+        });
+  }
 }