Posted to commits@iceberg.apache.org by ao...@apache.org on 2023/02/17 21:03:39 UTC

[iceberg] branch master updated: Spark 3.3: Support reading position deletes table (#6716)

This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new bd58ca5dd7 Spark 3.3: Support reading position deletes table (#6716)
bd58ca5dd7 is described below

commit bd58ca5dd7759c990993c3d85a824bec595078e2
Author: Szehon Ho <sz...@gmail.com>
AuthorDate: Fri Feb 17 13:03:33 2023 -0800

    Spark 3.3: Support reading position deletes table (#6716)
    
    Co-authored-by: Ryan Blue <bl...@apache.org>
---
 .../apache/iceberg/expressions/ExpressionUtil.java |  29 +
 .../java/org/apache/iceberg/BaseMetadataTable.java |  19 +
 .../org/apache/iceberg/PositionDeletesTable.java   |  23 +-
 .../java/org/apache/iceberg/data/FileHelpers.java  |  22 +
 .../spark/source/PositionDeletesRowReader.java     | 108 +++
 .../spark/source/SparkRowReaderFactory.java        |   4 +
 .../spark/source/TestPositionDeletesTable.java     | 861 +++++++++++++++++++++
 7 files changed, 1059 insertions(+), 7 deletions(-)
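
For context, the new position_deletes metadata table is exposed through the regular Iceberg Spark source by appending ".position_deletes" to the table identifier, which is exactly what the tests added below exercise. A minimal sketch, assuming an Iceberg-enabled Spark session and an existing v2 table default.my_table in the session catalog (placeholder identifiers, not part of this commit):

    // Sketch only: assumes the Iceberg Spark runtime and catalog configuration
    // are already in place, and that default.my_table is an existing table.
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class ReadPositionDeletes {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("position-deletes").getOrCreate();

        // Load the metadata table and project a few of its columns.
        Dataset<Row> deletes =
            spark
                .read()
                .format("iceberg")
                .load("default.my_table.position_deletes")
                .select("file_path", "pos", "delete_file_path");

        deletes.show(100, false);
      }
    }

The columns surfaced are file_path, pos, row, partition (for partitioned tables), spec_id, and delete_file_path, matching what the new expected() test helper populates.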

diff --git a/api/src/main/java/org/apache/iceberg/expressions/ExpressionUtil.java b/api/src/main/java/org/apache/iceberg/expressions/ExpressionUtil.java
index 9d94be1635..3512f274e2 100644
--- a/api/src/main/java/org/apache/iceberg/expressions/ExpressionUtil.java
+++ b/api/src/main/java/org/apache/iceberg/expressions/ExpressionUtil.java
@@ -29,6 +29,7 @@ import java.util.function.Function;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.transforms.Transforms;
@@ -82,6 +83,24 @@ public class ExpressionUtil {
     return ExpressionVisitors.visit(expr, new StringSanitizer());
   }
 
+  /**
+   * Extracts an expression that references only the given column IDs from the given expression.
+   *
+   * <p>The result is inclusive. If a row would match the original filter, it must match the result
+   * filter.
+   *
+   * @param expression a filter Expression
+   * @param schema a Schema
+   * @param caseSensitive whether binding is case sensitive
+   * @param ids field IDs used to match predicates to extract from the expression
+   * @return an Expression that selects at least the same rows as the original using only the IDs
+   */
+  public static Expression extractByIdInclusive(
+      Expression expression, Schema schema, boolean caseSensitive, int... ids) {
+    PartitionSpec spec = identitySpec(schema, ids);
+    return Projections.inclusive(spec, caseSensitive).project(Expressions.rewriteNot(expression));
+  }
+
   /**
    * Returns whether two unbound expressions will accept the same inputs.
    *
@@ -396,4 +415,14 @@ public class ExpressionUtil {
     // hash the value and return the hash as hex
     return String.format("(hash-%08x)", HASH_FUNC.apply(value));
   }
+
+  private static PartitionSpec identitySpec(Schema schema, int... ids) {
+    PartitionSpec.Builder specBuilder = PartitionSpec.builderFor(schema);
+
+    for (int id : ids) {
+      specBuilder.identity(schema.findColumnName(id));
+    }
+
+    return specBuilder.build();
+  }
 }
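
A short usage sketch for the ExpressionUtil.extractByIdInclusive helper added above; the schema and filter here are invented for illustration and are not taken from this commit:

    import org.apache.iceberg.Schema;
    import org.apache.iceberg.expressions.Expression;
    import org.apache.iceberg.expressions.ExpressionUtil;
    import org.apache.iceberg.expressions.Expressions;
    import org.apache.iceberg.types.Types;

    public class ExtractByIdExample {
      public static void main(String[] args) {
        Schema schema =
            new Schema(
                Types.NestedField.required(1, "id", Types.IntegerType.get()),
                Types.NestedField.required(2, "data", Types.StringType.get()));

        // The original filter references both columns.
        Expression filter =
            Expressions.and(Expressions.equal("data", "a"), Expressions.greaterThan("id", 5));

        // Keep only the predicates that can be expressed on field ID 2 ("data").
        // The projection is inclusive: every row matching the original filter still
        // matches the result, which here is equivalent to data = "a".
        Expression onlyData = ExpressionUtil.extractByIdInclusive(filter, schema, true, 2);
        System.out.println(onlyData);
      }
    }

PositionDeletesRowReader below relies on this to drop predicates on constant fields before handing the residual to the file reader.
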
diff --git a/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java b/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
index 251ed008a5..d5b89865b5 100644
--- a/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
+++ b/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
@@ -21,6 +21,7 @@ package org.apache.iceberg;
 import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 import org.apache.iceberg.encryption.EncryptionManager;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.LocationProvider;
@@ -83,6 +84,24 @@ public abstract class BaseMetadataTable extends BaseReadOnlyTable
     return builder.build();
   }
 
+  /**
+   * Transforms the given partition specs into specs that can be used to rewrite the
+   * user-provided filter expression against the given metadata table.
+   *
+   * <p>See: {@link #transformSpec(Schema, PartitionSpec)}
+   *
+   * @param metadataTableSchema schema of the metadata table
+   * @param specs specs on which the metadata table schema is based
+   * @return specs used to rewrite the metadata table filters to partition filters using an
+   *     inclusive projection
+   */
+  static Map<Integer, PartitionSpec> transformSpecs(
+      Schema metadataTableSchema, Map<Integer, PartitionSpec> specs) {
+    return specs.values().stream()
+        .map(spec -> transformSpec(metadataTableSchema, spec))
+        .collect(Collectors.toMap(PartitionSpec::specId, spec -> spec));
+  }
+
   abstract MetadataTableType metadataTableType();
 
   protected BaseTable table() {
diff --git a/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java b/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java
index 47d0e5100e..29228d136d 100644
--- a/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java
+++ b/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java
@@ -24,7 +24,6 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
-import java.util.stream.Collectors;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.expressions.ManifestEvaluator;
 import org.apache.iceberg.expressions.ResidualEvaluator;
@@ -43,15 +42,18 @@ import org.apache.iceberg.util.TableScanUtil;
 public class PositionDeletesTable extends BaseMetadataTable {
 
   private final Schema schema;
+  private final int defaultSpecId;
+  private final Map<Integer, PartitionSpec> specs;
 
   PositionDeletesTable(Table table) {
-    super(table, table.name() + ".position_deletes");
-    this.schema = calculateSchema();
+    this(table, table.name() + ".position_deletes");
   }
 
   PositionDeletesTable(Table table, String name) {
     super(table, name);
     this.schema = calculateSchema();
+    this.defaultSpecId = table.spec().specId();
+    this.specs = transformSpecs(schema(), table.specs());
   }
 
   @Override
@@ -75,6 +77,16 @@ public class PositionDeletesTable extends BaseMetadataTable {
     return schema;
   }
 
+  @Override
+  public PartitionSpec spec() {
+    return specs.get(defaultSpecId);
+  }
+
+  @Override
+  public Map<Integer, PartitionSpec> specs() {
+    return specs;
+  }
+
   private Schema calculateSchema() {
     Types.StructType partitionType = Partitioning.partitionType(table());
     Schema result =
@@ -144,10 +156,7 @@ public class PositionDeletesTable extends BaseMetadataTable {
       String schemaString = SchemaParser.toJson(tableSchema());
 
       // prepare transformed partition specs and caches
-      Map<Integer, PartitionSpec> transformedSpecs =
-          table().specs().values().stream()
-              .map(spec -> transformSpec(tableSchema(), spec))
-              .collect(Collectors.toMap(PartitionSpec::specId, spec -> spec));
+      Map<Integer, PartitionSpec> transformedSpecs = transformSpecs(tableSchema(), table().specs());
 
       LoadingCache<Integer, ResidualEvaluator> residualCache =
           partitionCacheOf(
diff --git a/data/src/test/java/org/apache/iceberg/data/FileHelpers.java b/data/src/test/java/org/apache/iceberg/data/FileHelpers.java
index 6531441fa5..557e775057 100644
--- a/data/src/test/java/org/apache/iceberg/data/FileHelpers.java
+++ b/data/src/test/java/org/apache/iceberg/data/FileHelpers.java
@@ -139,6 +139,28 @@ public class FileHelpers {
         .build();
   }
 
+  public static DeleteFile writePosDeleteFile(
+      Table table, OutputFile out, StructLike partition, List<PositionDelete<?>> deletes)
+      throws IOException {
+    FileFormat format = defaultFormat(table.properties());
+    FileAppenderFactory<Record> factory =
+        new GenericAppenderFactory(
+            table.schema(),
+            table.spec(),
+            null, // Equality Fields
+            null, // Equality Delete row schema
+            table.schema()); // Position Delete row schema (will be wrapped)
+
+    PositionDeleteWriter<?> writer = factory.newPosDeleteWriter(encrypt(out), format, partition);
+    try (Closeable toClose = writer) {
+      for (PositionDelete delete : deletes) {
+        writer.write(delete);
+      }
+    }
+
+    return writer.toDeleteFile();
+  }
+
   private static EncryptedOutputFile encrypt(OutputFile out) {
     return EncryptedFiles.encryptedOutput(out, EncryptionKeyMetadata.EMPTY);
   }
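
A usage sketch for the new FileHelpers.writePosDeleteFile overload, written against an assumed test fixture like the one in the Spark tests below: an existing format-v2 table tab, a committed data file dFile, and a JUnit TemporaryFolder temp are placeholders, and imports match the test file.

    // Build two position deletes against dFile; each also carries the deleted row.
    List<PositionDelete<?>> deletes = Lists.newArrayList();

    Record rowTemplate = GenericRecord.create(tab.schema());

    PositionDelete<Record> delete0 = PositionDelete.create();
    delete0.set(dFile.path(), 0L, rowTemplate.copy("id", 29, "data", "c"));
    deletes.add(delete0);

    PositionDelete<Record> delete1 = PositionDelete.create();
    delete1.set(dFile.path(), 1L, rowTemplate.copy("id", 43, "data", "k"));
    deletes.add(delete1);

    // Write a single position delete file and commit it as a row delta; the tests
    // pass TestHelpers.Row.of(0) as the partition tuple for the unpartitioned case.
    DeleteFile posDeleteFile =
        FileHelpers.writePosDeleteFile(
            tab, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), deletes);
    tab.newRowDelta().addDeletes(posDeleteFile).commit();
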
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java
new file mode 100644
index 0000000000..04eecc80bb
--- /dev/null
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.PositionDeletesScanTask;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.ExpressionUtil;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.primitives.Ints;
+import org.apache.spark.rdd.InputFileBlockHolder;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.read.PartitionReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class PositionDeletesRowReader extends BaseRowReader<PositionDeletesScanTask>
+    implements PartitionReader<InternalRow> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(PositionDeletesRowReader.class);
+
+  PositionDeletesRowReader(SparkInputPartition partition) {
+    this(
+        partition.table(),
+        partition.taskGroup(),
+        partition.expectedSchema(),
+        partition.isCaseSensitive());
+  }
+
+  PositionDeletesRowReader(
+      Table table,
+      ScanTaskGroup<PositionDeletesScanTask> taskGroup,
+      Schema expectedSchema,
+      boolean caseSensitive) {
+
+    super(table, taskGroup, expectedSchema, caseSensitive);
+
+    int numSplits = taskGroup.tasks().size();
+    LOG.debug("Reading {} position delete file split(s) for table {}", numSplits, table.name());
+  }
+
+  @Override
+  protected Stream<ContentFile<?>> referencedFiles(PositionDeletesScanTask task) {
+    return Stream.of(task.file());
+  }
+
+  @Override
+  protected CloseableIterator<InternalRow> open(PositionDeletesScanTask task) {
+    String filePath = task.file().path().toString();
+    LOG.debug("Opening position delete file {}", filePath);
+
+    // update the current file for Spark's input_file_name() function
+    InputFileBlockHolder.set(filePath, task.start(), task.length());
+
+    InputFile inputFile = getInputFile(task.file().path().toString());
+    Preconditions.checkNotNull(inputFile, "Could not find InputFile associated with %s", task);
+
+    // strip constant fields from the residual when pushing the filter down to the row reader
+    Map<Integer, ?> idToConstant = constantsMap(task, expectedSchema());
+    Set<Integer> nonConstantFieldIds = nonConstantFieldIds(idToConstant);
+    Expression residualWithoutConstants =
+        ExpressionUtil.extractByIdInclusive(
+            task.residual(), expectedSchema(), caseSensitive(), Ints.toArray(nonConstantFieldIds));
+
+    return newIterable(
+            inputFile,
+            task.file().format(),
+            task.start(),
+            task.length(),
+            residualWithoutConstants,
+            expectedSchema(),
+            idToConstant)
+        .iterator();
+  }
+
+  private Set<Integer> nonConstantFieldIds(Map<Integer, ?> idToConstant) {
+    Set<Integer> fields = expectedSchema().idToName().keySet();
+    return fields.stream()
+        .filter(id -> expectedSchema().findField(id).type().isPrimitiveType())
+        .filter(id -> !idToConstant.containsKey(id))
+        .collect(Collectors.toSet());
+  }
+}
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkRowReaderFactory.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkRowReaderFactory.java
index 8755cb84a2..23699aeb16 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkRowReaderFactory.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkRowReaderFactory.java
@@ -20,6 +20,7 @@ package org.apache.iceberg.spark.source;
 
 import org.apache.iceberg.ChangelogScanTask;
 import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PositionDeletesScanTask;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.connector.read.InputPartition;
@@ -46,6 +47,9 @@ class SparkRowReaderFactory implements PartitionReaderFactory {
     } else if (partition.allTasksOfType(ChangelogScanTask.class)) {
       return new ChangelogRowReader(partition);
 
+    } else if (partition.allTasksOfType(PositionDeletesScanTask.class)) {
+      return new PositionDeletesRowReader(partition);
+
     } else {
       throw new UnsupportedOperationException(
           "Unsupported task group for row-based reads: " + partition.taskGroup());
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java
new file mode 100644
index 0000000000..353bbf07be
--- /dev/null
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java
@@ -0,0 +1,861 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.MetadataTableUtils;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Partitioning;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.TestHelpers;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.FileHelpers;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.SparkStructLike;
+import org.apache.iceberg.spark.SparkTestBase;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.CharSequenceSet;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.StructLikeSet;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.functions;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestPositionDeletesTable extends SparkTestBase {
+
+  public static final Schema SCHEMA =
+      new Schema(
+          Types.NestedField.required(1, "id", Types.IntegerType.get()),
+          Types.NestedField.required(2, "data", Types.StringType.get()));
+  private final FileFormat format;
+
+  @Parameterized.Parameters(name = "fileFormat = {0}")
+  public static Object[][] parameters() {
+    return new Object[][] {{FileFormat.PARQUET}, {FileFormat.AVRO}, {FileFormat.ORC}};
+  }
+
+  public TestPositionDeletesTable(FileFormat format) {
+    this.format = format;
+  }
+
+  @Rule public TemporaryFolder temp = new TemporaryFolder();
+
+  @Test
+  public void testNullRows() throws IOException {
+    String tableName = "null_rows";
+    Table tab = createTable(tableName, SCHEMA, PartitionSpec.unpartitioned());
+
+    DataFile dFile = dataFile(tab);
+    tab.newAppend().appendFile(dFile).commit();
+
+    List<Pair<CharSequence, Long>> deletes = Lists.newArrayList();
+    deletes.add(Pair.of(dFile.path(), 0L));
+    deletes.add(Pair.of(dFile.path(), 1L));
+    Pair<DeleteFile, CharSequenceSet> posDeletes =
+        FileHelpers.writeDeleteFile(
+            tab, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), deletes);
+    tab.newRowDelta().addDeletes(posDeletes.first()).commit();
+
+    StructLikeSet actual = actual(tableName, tab);
+
+    List<PositionDelete<?>> expectedDeletes =
+        Lists.newArrayList(positionDelete(dFile.path(), 0L), positionDelete(dFile.path(), 1L));
+    StructLikeSet expected =
+        expected(tab, expectedDeletes, null, posDeletes.first().path().toString());
+
+    Assert.assertEquals("Position Delete table should contain expected rows", expected, actual);
+    dropTable(tableName);
+  }
+
+  @Test
+  public void testPartitionedTable() throws IOException {
+    // Create table with two partitions
+    String tableName = "partitioned_table";
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
+    Table tab = createTable(tableName, SCHEMA, spec);
+
+    DataFile dataFileA = dataFile(tab, "a");
+    DataFile dataFileB = dataFile(tab, "b");
+
+    tab.newAppend().appendFile(dataFileA).appendFile(dataFileB).commit();
+
+    // Add position deletes for both partitions
+    Pair<List<PositionDelete<?>>, DeleteFile> deletesA = deleteFile(tab, dataFileA, "a");
+    Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(tab, dataFileB, "b");
+
+    tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit();
+
+    // Select deletes from one partition
+    StructLikeSet actual = actual(tableName, tab, "row.data='b'");
+    GenericRecord partitionB = GenericRecord.create(tab.spec().partitionType());
+    partitionB.setField("data", "b");
+    StructLikeSet expected =
+        expected(tab, deletesB.first(), partitionB, deletesB.second().path().toString());
+
+    Assert.assertEquals("Position Delete table should contain expected rows", expected, actual);
+    dropTable(tableName);
+  }
+
+  @Test
+  public void testSelect() throws IOException {
+    // Create table with two partitions
+    String tableName = "select";
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
+    Table tab = createTable(tableName, SCHEMA, spec);
+
+    DataFile dataFileA = dataFile(tab, "a");
+    DataFile dataFileB = dataFile(tab, "b");
+
+    tab.newAppend().appendFile(dataFileA).appendFile(dataFileB).commit();
+
+    // Add position deletes for both partitions
+    Pair<List<PositionDelete<?>>, DeleteFile> deletesA = deleteFile(tab, dataFileA, "a");
+    Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(tab, dataFileB, "b");
+
+    tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit();
+
+    // Select certain columns
+    Dataset<Row> df =
+        spark
+            .read()
+            .format("iceberg")
+            .load("default." + tableName + ".position_deletes")
+            .withColumn("input_file", functions.input_file_name())
+            .select("row.id", "pos", "delete_file_path", "input_file");
+    List<Object[]> actual = rowsToJava(df.collectAsList());
+
+    // Select cols from expected delete values
+    List<Object[]> expected = Lists.newArrayList();
+    BiFunction<PositionDelete<?>, DeleteFile, Object[]> toRow =
+        (delete, file) -> {
+          int rowData = delete.get(2, GenericRecord.class).get(0, Integer.class);
+          long pos = delete.get(1, Long.class);
+          return row(rowData, pos, file.path().toString(), file.path().toString());
+        };
+    expected.addAll(
+        deletesA.first().stream()
+            .map(d -> toRow.apply(d, deletesA.second()))
+            .collect(Collectors.toList()));
+    expected.addAll(
+        deletesB.first().stream()
+            .map(d -> toRow.apply(d, deletesB.second()))
+            .collect(Collectors.toList()));
+
+    // Sort and compare
+    Comparator<Object[]> comp =
+        (o1, o2) -> {
+          int result = Integer.compare((int) o1[0], (int) o2[0]);
+          if (result != 0) {
+            return result;
+          } else {
+            return ((String) o1[2]).compareTo((String) o2[2]);
+          }
+        };
+    actual.sort(comp);
+    expected.sort(comp);
+    assertEquals("Position Delete table should contain expected rows", expected, actual);
+    dropTable(tableName);
+  }
+
+  @Test
+  public void testSplitTasks() throws IOException {
+    String tableName = "big_table";
+    Table tab = createTable(tableName, SCHEMA, PartitionSpec.unpartitioned());
+    tab.updateProperties().set("read.split.target-size", "100").commit();
+    int records = 500;
+
+    GenericRecord record = GenericRecord.create(tab.schema());
+    List<org.apache.iceberg.data.Record> dataRecords = Lists.newArrayList();
+    for (int i = 0; i < records; i++) {
+      dataRecords.add(record.copy("id", i, "data", String.valueOf(i)));
+    }
+    DataFile dFile =
+        FileHelpers.writeDataFile(
+            tab,
+            Files.localOutput(temp.newFile()),
+            org.apache.iceberg.TestHelpers.Row.of(),
+            dataRecords);
+    tab.newAppend().appendFile(dFile).commit();
+
+    List<PositionDelete<?>> deletes = Lists.newArrayList();
+    for (long i = 0; i < records; i++) {
+      deletes.add(positionDelete(tab.schema(), dFile.path(), i, (int) i, String.valueOf(i)));
+    }
+    DeleteFile posDeletes =
+        FileHelpers.writePosDeleteFile(
+            tab, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), deletes);
+    tab.newRowDelta().addDeletes(posDeletes).commit();
+
+    Table deleteTable =
+        MetadataTableUtils.createMetadataTableInstance(tab, MetadataTableType.POSITION_DELETES);
+    Assert.assertTrue(
+        "Position delete scan should produce more than one split",
+        Iterables.size(deleteTable.newBatchScan().planTasks()) > 1);
+
+    StructLikeSet actual = actual(tableName, tab);
+    StructLikeSet expected = expected(tab, deletes, null, posDeletes.path().toString());
+
+    Assert.assertEquals("Position Delete table should contain expected rows", expected, actual);
+    dropTable(tableName);
+  }
+
+  @Test
+  public void testPartitionFilter() throws IOException {
+    // Create table with two partitions
+    String tableName = "partition_filter";
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
+    Table tab = createTable(tableName, SCHEMA, spec);
+    Table deletesTab =
+        MetadataTableUtils.createMetadataTableInstance(tab, MetadataTableType.POSITION_DELETES);
+
+    DataFile dataFileA = dataFile(tab, "a");
+    DataFile dataFileB = dataFile(tab, "b");
+    tab.newAppend().appendFile(dataFileA).appendFile(dataFileB).commit();
+
+    // Add position deletes for both partitions
+    Pair<List<PositionDelete<?>>, DeleteFile> deletesA = deleteFile(tab, dataFileA, "a");
+    Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(tab, dataFileA, "b");
+
+    tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit();
+
+    // Prepare expected values
+    GenericRecord partitionRecordTemplate = GenericRecord.create(tab.spec().partitionType());
+    Record partitionA = partitionRecordTemplate.copy("data", "a");
+    Record partitionB = partitionRecordTemplate.copy("data", "b");
+    StructLikeSet expectedA =
+        expected(tab, deletesA.first(), partitionA, deletesA.second().path().toString());
+    StructLikeSet expectedB =
+        expected(tab, deletesB.first(), partitionB, deletesB.second().path().toString());
+    StructLikeSet allExpected = StructLikeSet.create(deletesTab.schema().asStruct());
+    allExpected.addAll(expectedA);
+    allExpected.addAll(expectedB);
+
+    // Select deletes from all partitions
+    StructLikeSet actual = actual(tableName, tab);
+    Assert.assertEquals("Position Delete table should contain expected rows", allExpected, actual);
+
+    // Select deletes from one partition
+    StructLikeSet actual2 = actual(tableName, tab, "partition.data = 'a' AND pos >= 0");
+
+    Assert.assertEquals("Position Delete table should contain expected rows", expectedA, actual2);
+    dropTable(tableName);
+  }
+
+  @Test
+  public void testPartitionTransformFilter() throws IOException {
+    // Create table with two partitions
+    String tableName = "partition_filter";
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).truncate("data", 1).build();
+    Table tab = createTable(tableName, SCHEMA, spec);
+    Table deletesTable =
+        MetadataTableUtils.createMetadataTableInstance(tab, MetadataTableType.POSITION_DELETES);
+
+    DataFile dataFileA = dataFile(tab, new Object[] {"aa"}, new Object[] {"a"});
+    DataFile dataFileB = dataFile(tab, new Object[] {"bb"}, new Object[] {"b"});
+    tab.newAppend().appendFile(dataFileA).appendFile(dataFileB).commit();
+
+    // Add position deletes for both partitions
+    Pair<List<PositionDelete<?>>, DeleteFile> deletesA =
+        deleteFile(tab, dataFileA, new Object[] {"aa"}, new Object[] {"a"});
+    Pair<List<PositionDelete<?>>, DeleteFile> deletesB =
+        deleteFile(tab, dataFileA, new Object[] {"bb"}, new Object[] {"b"});
+    tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit();
+
+    // Prepare expected values
+    GenericRecord partitionRecordTemplate = GenericRecord.create(tab.spec().partitionType());
+    Record partitionA = partitionRecordTemplate.copy("data_trunc", "a");
+    Record partitionB = partitionRecordTemplate.copy("data_trunc", "b");
+    StructLikeSet expectedA =
+        expected(tab, deletesA.first(), partitionA, deletesA.second().path().toString());
+    StructLikeSet expectedB =
+        expected(tab, deletesB.first(), partitionB, deletesB.second().path().toString());
+    StructLikeSet allExpected = StructLikeSet.create(deletesTable.schema().asStruct());
+    allExpected.addAll(expectedA);
+    allExpected.addAll(expectedB);
+
+    // Select deletes from all partitions
+    StructLikeSet actual = actual(tableName, tab);
+    Assert.assertEquals("Position Delete table should contain expected rows", allExpected, actual);
+
+    // Select deletes from one partition
+    StructLikeSet actual2 = actual(tableName, tab, "partition.data_trunc = 'a' AND pos >= 0");
+
+    Assert.assertEquals("Position Delete table should contain expected rows", expectedA, actual2);
+    dropTable(tableName);
+  }
+
+  @Test
+  public void testPartitionEvolutionReplace() throws Exception {
+    // Create table with spec (data)
+    String tableName = "partition_evolution";
+    PartitionSpec originalSpec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
+    Table tab = createTable(tableName, SCHEMA, originalSpec);
+    int dataSpec = tab.spec().specId();
+
+    // Add files with old spec (data)
+    DataFile dataFileA = dataFile(tab, "a");
+    DataFile dataFileB = dataFile(tab, "b");
+    tab.newAppend().appendFile(dataFileA).appendFile(dataFileB).commit();
+    Pair<List<PositionDelete<?>>, DeleteFile> deletesA = deleteFile(tab, dataFileA, "a");
+    Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(tab, dataFileA, "b");
+    tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit();
+
+    // Switch partition spec from (data) to (id)
+    tab.updateSpec().removeField("data").addField("id").commit();
+
+    // Add data and delete files with new spec (id)
+    DataFile dataFile10 = dataFile(tab, 10);
+    DataFile dataFile99 = dataFile(tab, 99);
+    tab.newAppend().appendFile(dataFile10).appendFile(dataFile99).commit();
+
+    Pair<List<PositionDelete<?>>, DeleteFile> deletes10 = deleteFile(tab, dataFile10, 10);
+    Pair<List<PositionDelete<?>>, DeleteFile> deletes99 = deleteFile(tab, dataFile10, 99);
+    tab.newRowDelta().addDeletes(deletes10.second()).addDeletes(deletes99.second()).commit();
+
+    // Query partition of old spec
+    GenericRecord partitionRecordTemplate = GenericRecord.create(Partitioning.partitionType(tab));
+    Record partitionA = partitionRecordTemplate.copy("data", "a");
+    StructLikeSet expectedA =
+        expected(tab, deletesA.first(), partitionA, dataSpec, deletesA.second().path().toString());
+    StructLikeSet actualA = actual(tableName, tab, "partition.data = 'a' AND pos >= 0");
+    Assert.assertEquals("Position Delete table should contain expected rows", expectedA, actualA);
+
+    // Query partition of new spec
+    Record partition10 = partitionRecordTemplate.copy("id", 10);
+    StructLikeSet expected10 =
+        expected(
+            tab,
+            deletes10.first(),
+            partition10,
+            tab.spec().specId(),
+            deletes10.second().path().toString());
+    StructLikeSet actual10 = actual(tableName, tab, "partition.id = 10 AND pos >= 0");
+
+    Assert.assertEquals("Position Delete table should contain expected rows", expected10, actual10);
+    dropTable(tableName);
+  }
+
+  @Test
+  public void testPartitionEvolutionAdd() throws Exception {
+    // Create unpartitioned table
+    String tableName = "partition_evolution_add";
+    Table tab = createTable(tableName, SCHEMA, PartitionSpec.unpartitioned());
+    int specId0 = tab.spec().specId();
+
+    // Add files with unpartitioned spec
+    DataFile dataFileUnpartitioned = dataFile(tab);
+    tab.newAppend().appendFile(dataFileUnpartitioned).commit();
+    Pair<List<PositionDelete<?>>, DeleteFile> deletesUnpartitioned =
+        deleteFile(tab, dataFileUnpartitioned);
+    tab.newRowDelta().addDeletes(deletesUnpartitioned.second()).commit();
+
+    // Switch partition spec to (data)
+    tab.updateSpec().addField("data").commit();
+    int specId1 = tab.spec().specId();
+
+    // Add files with new spec (data)
+    DataFile dataFileA = dataFile(tab, "a");
+    DataFile dataFileB = dataFile(tab, "b");
+    tab.newAppend().appendFile(dataFileA).appendFile(dataFileB).commit();
+
+    Pair<List<PositionDelete<?>>, DeleteFile> deletesA = deleteFile(tab, dataFileA, "a");
+    Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(tab, dataFileB, "b");
+    tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit();
+
+    // Select deletes from new spec (data)
+    GenericRecord partitionRecordTemplate = GenericRecord.create(Partitioning.partitionType(tab));
+    Record partitionA = partitionRecordTemplate.copy("data", "a");
+    StructLikeSet expectedA =
+        expected(tab, deletesA.first(), partitionA, specId1, deletesA.second().path().toString());
+    StructLikeSet actualA = actual(tableName, tab, "partition.data = 'a' AND pos >= 0");
+    Assert.assertEquals("Position Delete table should contain expected rows", expectedA, actualA);
+
+    // Select deletes from 'unpartitioned' data
+    Record unpartitionedRecord = partitionRecordTemplate.copy("data", null);
+    StructLikeSet expectedUnpartitioned =
+        expected(
+            tab,
+            deletesUnpartitioned.first(),
+            unpartitionedRecord,
+            specId0,
+            deletesUnpartitioned.second().path().toString());
+    StructLikeSet actualUnpartitioned =
+        actual(tableName, tab, "partition.data IS NULL and pos >= 0");
+
+    Assert.assertEquals(
+        "Position Delete table should contain expected rows",
+        expectedUnpartitioned,
+        actualUnpartitioned);
+    dropTable(tableName);
+  }
+
+  @Test
+  public void testPartitionEvolutionRemove() throws Exception {
+    // Create table with spec (data)
+    String tableName = "partition_evolution_remove";
+    PartitionSpec originalSpec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
+    Table tab = createTable(tableName, SCHEMA, originalSpec);
+    int specId0 = tab.spec().specId();
+
+    // Add files with spec (data)
+    DataFile dataFileA = dataFile(tab, "a");
+    DataFile dataFileB = dataFile(tab, "b");
+    tab.newAppend().appendFile(dataFileA).appendFile(dataFileB).commit();
+
+    Pair<List<PositionDelete<?>>, DeleteFile> deletesA = deleteFile(tab, dataFileA, "a");
+    Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(tab, dataFileB, "b");
+    tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit();
+
+    // Remove partition field
+    tab.updateSpec().removeField("data").commit();
+    int specId1 = tab.spec().specId();
+
+    // Add unpartitioned files
+    DataFile dataFileUnpartitioned = dataFile(tab);
+    tab.newAppend().appendFile(dataFileUnpartitioned).commit();
+    Pair<List<PositionDelete<?>>, DeleteFile> deletesUnpartitioned =
+        deleteFile(tab, dataFileUnpartitioned);
+    tab.newRowDelta().addDeletes(deletesUnpartitioned.second()).commit();
+
+    // Select deletes from (data) spec
+    GenericRecord partitionRecordTemplate = GenericRecord.create(Partitioning.partitionType(tab));
+    Record partitionA = partitionRecordTemplate.copy("data", "a");
+    StructLikeSet expectedA =
+        expected(tab, deletesA.first(), partitionA, specId0, deletesA.second().path().toString());
+    StructLikeSet actualA = actual(tableName, tab, "partition.data = 'a' AND pos >= 0");
+    Assert.assertEquals("Position Delete table should contain expected rows", expectedA, actualA);
+
+    // Select deletes from 'unpartitioned' spec
+    Record unpartitionedRecord = partitionRecordTemplate.copy("data", null);
+    StructLikeSet expectedUnpartitioned =
+        expected(
+            tab,
+            deletesUnpartitioned.first(),
+            unpartitionedRecord,
+            specId1,
+            deletesUnpartitioned.second().path().toString());
+    StructLikeSet actualUnpartitioned =
+        actual(tableName, tab, "partition.data IS NULL and pos >= 0");
+
+    Assert.assertEquals(
+        "Position Delete table should contain expected rows",
+        expectedUnpartitioned,
+        actualUnpartitioned);
+    dropTable(tableName);
+  }
+
+  @Test
+  public void testSpecIdFilter() throws Exception {
+    // Create unpartitioned table
+    String tableName = "spec_id_filter";
+    Table tab = createTable(tableName, SCHEMA, PartitionSpec.unpartitioned());
+    int unpartitionedSpec = tab.spec().specId();
+
+    // Add data file and delete
+    DataFile dataFileUnpartitioned = dataFile(tab);
+    tab.newAppend().appendFile(dataFileUnpartitioned).commit();
+    Pair<List<PositionDelete<?>>, DeleteFile> deletesUnpartitioned =
+        deleteFile(tab, dataFileUnpartitioned);
+    tab.newRowDelta().addDeletes(deletesUnpartitioned.second()).commit();
+
+    // Switch partition spec to (data) and add files
+    tab.updateSpec().addField("data").commit();
+    int dataSpec = tab.spec().specId();
+
+    DataFile dataFileA = dataFile(tab, "a");
+    DataFile dataFileB = dataFile(tab, "b");
+    tab.newAppend().appendFile(dataFileA).appendFile(dataFileB).commit();
+
+    Pair<List<PositionDelete<?>>, DeleteFile> deletesA = deleteFile(tab, dataFileA, "a");
+    Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(tab, dataFileB, "b");
+    tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit();
+
+    // Select deletes from 'unpartitioned'
+    GenericRecord partitionRecordTemplate = GenericRecord.create(Partitioning.partitionType(tab));
+    StructLikeSet expectedUnpartitioned =
+        expected(
+            tab,
+            deletesUnpartitioned.first(),
+            partitionRecordTemplate,
+            unpartitionedSpec,
+            deletesUnpartitioned.second().path().toString());
+    StructLikeSet actualUnpartitioned =
+        actual(tableName, tab, String.format("spec_id = %d", unpartitionedSpec));
+    Assert.assertEquals(
+        "Position Delete table should contain expected rows",
+        expectedUnpartitioned,
+        actualUnpartitioned);
+
+    // Select deletes from 'data' partition spec
+    StructLike partitionA = partitionRecordTemplate.copy("data", "a");
+    StructLike partitionB = partitionRecordTemplate.copy("data", "b");
+    StructLikeSet expected =
+        expected(tab, deletesA.first(), partitionA, dataSpec, deletesA.second().path().toString());
+    expected.addAll(
+        expected(tab, deletesB.first(), partitionB, dataSpec, deletesB.second().path().toString()));
+
+    StructLikeSet actual = actual(tableName, tab, String.format("spec_id = %d", dataSpec));
+    Assert.assertEquals("Position Delete table should contain expected rows", expected, actual);
+    dropTable(tableName);
+  }
+
+  @Test
+  public void testSchemaEvolutionAdd() throws Exception {
+    // Create table with original schema
+    String tableName = "schema_evolution_add";
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
+    Table tab = createTable(tableName, SCHEMA, spec);
+
+    // Add files with original schema
+    DataFile dataFileA = dataFile(tab, "a");
+    DataFile dataFileB = dataFile(tab, "b");
+    tab.newAppend().appendFile(dataFileA).appendFile(dataFileB).commit();
+
+    Pair<List<PositionDelete<?>>, DeleteFile> deletesA = deleteFile(tab, dataFileA, "a");
+    Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(tab, dataFileB, "b");
+    tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit();
+
+    // Evolve the schema by adding new columns
+    tab.updateSchema()
+        .addColumn("new_col_1", Types.IntegerType.get())
+        .addColumn("new_col_2", Types.IntegerType.get())
+        .commit();
+
+    // Add files with new schema
+    DataFile dataFileC = dataFile(tab, "c");
+    DataFile dataFileD = dataFile(tab, "d");
+    tab.newAppend().appendFile(dataFileC).appendFile(dataFileD).commit();
+
+    Pair<List<PositionDelete<?>>, DeleteFile> deletesC = deleteFile(tab, dataFileC, "c");
+    Pair<List<PositionDelete<?>>, DeleteFile> deletesD = deleteFile(tab, dataFileD, "d");
+    tab.newRowDelta().addDeletes(deletesC.second()).addDeletes(deletesD.second()).commit();
+
+    // Select deletes from old schema
+    GenericRecord partitionRecordTemplate = GenericRecord.create(Partitioning.partitionType(tab));
+    Record partitionA = partitionRecordTemplate.copy("data", "a");
+    // pad expected delete rows with null values for new columns
+    List<PositionDelete<?>> expectedDeletesA = deletesA.first();
+    expectedDeletesA.forEach(
+        d -> {
+          GenericRecord nested = d.get(2, GenericRecord.class);
+          GenericRecord padded = GenericRecord.create(tab.schema().asStruct());
+          padded.set(0, nested.get(0));
+          padded.set(1, nested.get(1));
+          padded.set(2, null);
+          padded.set(3, null);
+          d.set(2, padded);
+        });
+    StructLikeSet expectedA =
+        expected(tab, expectedDeletesA, partitionA, deletesA.second().path().toString());
+    StructLikeSet actualA = actual(tableName, tab, "partition.data = 'a' AND pos >= 0");
+    Assert.assertEquals("Position Delete table should contain expected rows", expectedA, actualA);
+
+    // Select deletes from new schema
+    Record partitionC = partitionRecordTemplate.copy("data", "c");
+    StructLikeSet expectedC =
+        expected(tab, deletesC.first(), partitionC, deletesC.second().path().toString());
+    StructLikeSet actualC = actual(tableName, tab, "partition.data = 'c' and pos >= 0");
+
+    Assert.assertEquals("Position Delete table should contain expected rows", expectedC, actualC);
+    dropTable(tableName);
+  }
+
+  @Test
+  public void testSchemaEvolutionRemove() throws Exception {
+    // Create table with original schema
+    String tableName = "schema_evolution_remove";
+    Schema oldSchema =
+        new Schema(
+            Types.NestedField.required(1, "id", Types.IntegerType.get()),
+            Types.NestedField.required(2, "data", Types.StringType.get()),
+            Types.NestedField.optional(3, "new_col_1", Types.IntegerType.get()),
+            Types.NestedField.optional(4, "new_col_2", Types.IntegerType.get()));
+    PartitionSpec spec = PartitionSpec.builderFor(oldSchema).identity("data").build();
+    Table tab = createTable(tableName, oldSchema, spec);
+
+    // Add files with original schema
+    DataFile dataFileA = dataFile(tab, "a");
+    DataFile dataFileB = dataFile(tab, "b");
+    tab.newAppend().appendFile(dataFileA).appendFile(dataFileB).commit();
+
+    Pair<List<PositionDelete<?>>, DeleteFile> deletesA = deleteFile(tab, dataFileA, "a");
+    Pair<List<PositionDelete<?>>, DeleteFile> deletesB = deleteFile(tab, dataFileB, "b");
+    tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit();
+
+    // Evolve the schema by removing columns
+    tab.updateSchema().deleteColumn("new_col_1").deleteColumn("new_col_2").commit();
+
+    // Add files with new schema
+    DataFile dataFileC = dataFile(tab, "c");
+    DataFile dataFileD = dataFile(tab, "d");
+    tab.newAppend().appendFile(dataFileC).appendFile(dataFileD).commit();
+
+    Pair<List<PositionDelete<?>>, DeleteFile> deletesC = deleteFile(tab, dataFileC, "c");
+    Pair<List<PositionDelete<?>>, DeleteFile> deletesD = deleteFile(tab, dataFileD, "d");
+    tab.newRowDelta().addDeletes(deletesC.second()).addDeletes(deletesD.second()).commit();
+
+    // Select deletes from old schema
+    GenericRecord partitionRecordTemplate = GenericRecord.create(Partitioning.partitionType(tab));
+    Record partitionA = partitionRecordTemplate.copy("data", "a");
+    // remove deleted columns from expected result
+    List<PositionDelete<?>> expectedDeletesA = deletesA.first();
+    expectedDeletesA.forEach(
+        d -> {
+          GenericRecord nested = d.get(2, GenericRecord.class);
+          GenericRecord padded = GenericRecord.create(tab.schema().asStruct());
+          padded.set(0, nested.get(0));
+          padded.set(1, nested.get(1));
+          d.set(2, padded);
+        });
+    StructLikeSet expectedA =
+        expected(tab, expectedDeletesA, partitionA, deletesA.second().path().toString());
+    StructLikeSet actualA = actual(tableName, tab, "partition.data = 'a' AND pos >= 0");
+    Assert.assertEquals("Position Delete table should contain expected rows", expectedA, actualA);
+
+    // Select deletes from new schema
+    Record partitionC = partitionRecordTemplate.copy("data", "c");
+    StructLikeSet expectedC =
+        expected(tab, deletesC.first(), partitionC, deletesC.second().path().toString());
+    StructLikeSet actualC = actual(tableName, tab, "partition.data = 'c' and pos >= 0");
+
+    Assert.assertEquals("Position Delete table should contain expected rows", expectedC, actualC);
+    dropTable(tableName);
+  }
+
+  private StructLikeSet actual(String tableName, Table table) {
+    return actual(tableName, table, null);
+  }
+
+  private StructLikeSet actual(String tableName, Table table, String filter) {
+    Dataset<Row> df =
+        spark.read().format("iceberg").load("default." + tableName + ".position_deletes");
+    if (filter != null) {
+      df = df.filter(filter);
+    }
+    Table deletesTable =
+        MetadataTableUtils.createMetadataTableInstance(table, MetadataTableType.POSITION_DELETES);
+    Types.StructType projection = deletesTable.schema().asStruct();
+    StructLikeSet set = StructLikeSet.create(projection);
+    df.collectAsList()
+        .forEach(
+            row -> {
+              SparkStructLike rowWrapper = new SparkStructLike(projection);
+              set.add(rowWrapper.wrap(row));
+            });
+
+    return set;
+  }
+
+  protected Table createTable(String name, Schema schema, PartitionSpec spec) {
+    Map<String, String> properties =
+        ImmutableMap.of(
+            TableProperties.FORMAT_VERSION,
+            "2",
+            TableProperties.DEFAULT_FILE_FORMAT,
+            format.toString());
+    return catalog.createTable(TableIdentifier.of("default", name), schema, spec, properties);
+  }
+
+  protected void dropTable(String name) {
+    catalog.dropTable(TableIdentifier.of("default", name));
+  }
+
+  private PositionDelete<GenericRecord> positionDelete(CharSequence path, Long position) {
+    PositionDelete<GenericRecord> posDelete = PositionDelete.create();
+    posDelete.set(path, position, null);
+    return posDelete;
+  }
+
+  private PositionDelete<GenericRecord> positionDelete(
+      Schema tableSchema, CharSequence path, Long position, Object... values) {
+    PositionDelete<GenericRecord> posDelete = PositionDelete.create();
+    GenericRecord nested = GenericRecord.create(tableSchema);
+    for (int i = 0; i < values.length; i++) {
+      nested.set(i, values[i]);
+    }
+    posDelete.set(path, position, nested);
+    return posDelete;
+  }
+
+  private StructLikeSet expected(
+      Table testTable,
+      List<PositionDelete<?>> deletes,
+      StructLike partitionStruct,
+      int specId,
+      String deleteFilePath) {
+    Table deletesTable =
+        MetadataTableUtils.createMetadataTableInstance(
+            testTable, MetadataTableType.POSITION_DELETES);
+    Types.StructType posDeleteSchema = deletesTable.schema().asStruct();
+    final Types.StructType finalSchema = posDeleteSchema;
+    StructLikeSet set = StructLikeSet.create(posDeleteSchema);
+    deletes.stream()
+        .map(
+            p -> {
+              GenericRecord record = GenericRecord.create(finalSchema);
+              record.setField("file_path", p.path());
+              record.setField("pos", p.pos());
+              record.setField("row", p.row());
+              if (partitionStruct != null) {
+                record.setField("partition", partitionStruct);
+              }
+              record.setField("spec_id", specId);
+              record.setField("delete_file_path", deleteFilePath);
+              return record;
+            })
+        .forEach(set::add);
+    return set;
+  }
+
+  private StructLikeSet expected(
+      Table testTable,
+      List<PositionDelete<?>> deletes,
+      StructLike partitionStruct,
+      String deleteFilePath) {
+    return expected(testTable, deletes, partitionStruct, testTable.spec().specId(), deleteFilePath);
+  }
+
+  private DataFile dataFile(Table tab, Object... partValues) throws IOException {
+    return dataFile(tab, partValues, partValues);
+  }
+
+  @SuppressWarnings("checkstyle:CyclomaticComplexity")
+  private DataFile dataFile(Table tab, Object[] partDataValues, Object[] partFieldValues)
+      throws IOException {
+    GenericRecord record = GenericRecord.create(tab.schema());
+    List<String> partitionFieldNames =
+        tab.spec().fields().stream().map(PartitionField::name).collect(Collectors.toList());
+    int idIndex = partitionFieldNames.indexOf("id");
+    int dataIndex = partitionFieldNames.indexOf("data");
+    Integer idPartition = idIndex != -1 ? (Integer) partDataValues[idIndex] : null;
+    String dataPartition = dataIndex != -1 ? (String) partDataValues[dataIndex] : null;
+
+    // fill columns with partition source fields, or preset values
+    List<Record> records =
+        Lists.newArrayList(
+            record.copy(
+                "id",
+                idPartition != null ? idPartition : 29,
+                "data",
+                dataPartition != null ? dataPartition : "c"),
+            record.copy(
+                "id",
+                idPartition != null ? idPartition : 43,
+                "data",
+                dataPartition != null ? dataPartition : "k"),
+            record.copy(
+                "id",
+                idPartition != null ? idPartition : 61,
+                "data",
+                dataPartition != null ? dataPartition : "r"),
+            record.copy(
+                "id",
+                idPartition != null ? idPartition : 89,
+                "data",
+                dataPartition != null ? dataPartition : "t"));
+
+    // fill remaining columns with incremental values
+    List<Types.NestedField> cols = tab.schema().columns();
+    if (cols.size() > 2) {
+      for (int i = 2; i < cols.size(); i++) {
+        final int pos = i;
+        records.forEach(r -> r.set(pos, pos));
+      }
+    }
+
+    TestHelpers.Row partitionInfo = TestHelpers.Row.of(partFieldValues);
+    return FileHelpers.writeDataFile(
+        tab, Files.localOutput(temp.newFile()), partitionInfo, records);
+  }
+
+  private Pair<List<PositionDelete<?>>, DeleteFile> deleteFile(
+      Table tab, DataFile dataFile, Object... partValues) throws IOException {
+    return deleteFile(tab, dataFile, partValues, partValues);
+  }
+
+  private Pair<List<PositionDelete<?>>, DeleteFile> deleteFile(
+      Table tab, DataFile dataFile, Object[] partDataValues, Object[] partFieldValues)
+      throws IOException {
+    List<PartitionField> partFields = tab.spec().fields();
+    List<String> partitionFieldNames =
+        partFields.stream().map(PartitionField::name).collect(Collectors.toList());
+    int idIndex = partitionFieldNames.indexOf("id");
+    int dataIndex = partitionFieldNames.indexOf("data");
+    Integer idPartition = idIndex != -1 ? (Integer) partDataValues[idIndex] : null;
+    String dataPartition = dataIndex != -1 ? (String) partDataValues[dataIndex] : null;
+
+    // fill columns with partition source fields, or preset values
+    List<PositionDelete<?>> deletes =
+        Lists.newArrayList(
+            positionDelete(
+                tab.schema(),
+                dataFile.path(),
+                0L,
+                idPartition != null ? idPartition : 29,
+                dataPartition != null ? dataPartition : "c"),
+            positionDelete(
+                tab.schema(),
+                dataFile.path(),
+                0L,
+                idPartition != null ? idPartition : 61,
+                dataPartition != null ? dataPartition : "r"));
+
+    // fill remaining columns with incremental values
+    List<Types.NestedField> cols = tab.schema().columns();
+    if (cols.size() > 2) {
+      for (int i = 2; i < cols.size(); i++) {
+        final int pos = i;
+        deletes.forEach(d -> d.get(2, GenericRecord.class).set(pos, pos));
+      }
+    }
+
+    TestHelpers.Row partitionInfo = TestHelpers.Row.of(partFieldValues);
+
+    DeleteFile deleteFile =
+        FileHelpers.writePosDeleteFile(
+            tab, Files.localOutput(temp.newFile()), partitionInfo, deletes);
+    return Pair.of(deletes, deleteFile);
+  }
+}
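
Finally, the same metadata table can be planned directly through the core API, as testSplitTasks does above. A minimal sketch, assuming an existing Table handle named table (imports from org.apache.iceberg as in the reader above):

    // Instantiate the position_deletes metadata table and plan its scan task groups.
    Table deletesTable =
        MetadataTableUtils.createMetadataTableInstance(table, MetadataTableType.POSITION_DELETES);

    for (ScanTaskGroup<ScanTask> group : deletesTable.newBatchScan().planTasks()) {
      System.out.println("planned a task group with " + group.tasks().size() + " task(s)");
    }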