Posted to commits@iceberg.apache.org by sz...@apache.org on 2022/03/11 23:21:51 UTC

[iceberg] branch master updated: Core: Add delete_files metadata table (#4243)

This is an automated email from the ASF dual-hosted git repository.

szehon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new ad1e634  Core: Add delete_files metadata table (#4243)
ad1e634 is described below

commit ad1e634a4806de2d420d18e7c0e82b74409cfe69
Author: Szehon Ho <sz...@gmail.com>
AuthorDate: Fri Mar 11 15:21:40 2022 -0800

    Core: Add delete_files metadata table (#4243)
---
 .../java/org/apache/iceberg/AllDataFilesTable.java |   4 +-
 .../{DataFilesTable.java => BaseFilesTable.java}   | 107 ++++++-----
 .../java/org/apache/iceberg/DataFilesTable.java    | 116 ++----------
 .../java/org/apache/iceberg/DeleteFilesTable.java  |  67 +++++++
 .../java/org/apache/iceberg/MetadataTableType.java |   1 +
 .../org/apache/iceberg/MetadataTableUtils.java     |   2 +
 .../java/org/apache/iceberg/TableTestBase.java     |  32 ++--
 .../org/apache/iceberg/TestMetadataTableScans.java | 203 ++++++++++++++++++++-
 .../apache/iceberg/hadoop/HadoopTableTestBase.java |   9 +
 .../iceberg/hadoop/TestTableSerialization.java     |   8 +
 .../spark/extensions/TestMetadataTables.java       | 177 ++++++++++++++++++
 11 files changed, 545 insertions(+), 181 deletions(-)
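
For context on how the new table is used once this lands: it is queried like the existing files metadata table, which is what the new Spark tests below exercise. A minimal sketch, assuming an already-configured Spark session with an Iceberg catalog and a hypothetical format-version 2 table db.sample that has accumulated delete files (table name and selected columns are illustrative):

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class DeleteFilesQueryExample {
      public static void main(String[] args) {
        // Assumes the session is already configured with an Iceberg catalog;
        // db.sample is a hypothetical v2 table with delete files.
        SparkSession spark = SparkSession.builder().getOrCreate();
        Dataset<Row> deleteFiles =
            spark.sql("SELECT content, file_path, record_count FROM db.sample.delete_files");
        deleteFiles.show(false);
      }
    }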

diff --git a/core/src/main/java/org/apache/iceberg/AllDataFilesTable.java b/core/src/main/java/org/apache/iceberg/AllDataFilesTable.java
index d5db8cc..4eca0b4 100644
--- a/core/src/main/java/org/apache/iceberg/AllDataFilesTable.java
+++ b/core/src/main/java/org/apache/iceberg/AllDataFilesTable.java
@@ -22,6 +22,7 @@ package org.apache.iceberg;
 import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
+import org.apache.iceberg.BaseFilesTable.ManifestReadTask;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
@@ -118,7 +119,8 @@ public class AllDataFilesTable extends BaseMetadataTable {
       ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter);
 
       return CloseableIterable.transform(manifests, manifest ->
-          new DataFilesTable.ManifestReadTask(ops.io(), manifest, schema(), schemaString, specString, residuals));
+          new ManifestReadTask(ops.io(), ops.current().specsById(), manifest, schema(),
+              schemaString, specString, residuals));
     }
   }
 
diff --git a/core/src/main/java/org/apache/iceberg/DataFilesTable.java b/core/src/main/java/org/apache/iceberg/BaseFilesTable.java
similarity index 61%
copy from core/src/main/java/org/apache/iceberg/DataFilesTable.java
copy to core/src/main/java/org/apache/iceberg/BaseFilesTable.java
index 31c0bea..a0b835a4 100644
--- a/core/src/main/java/org/apache/iceberg/DataFilesTable.java
+++ b/core/src/main/java/org/apache/iceberg/BaseFilesTable.java
@@ -19,6 +19,8 @@
 
 package org.apache.iceberg;
 
+import java.util.List;
+import java.util.Map;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.expressions.ManifestEvaluator;
@@ -33,24 +35,15 @@ import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.types.Types.StructType;
 
 /**
- * A {@link Table} implementation that exposes a table's data files as rows.
+ * Base class logic for files metadata tables
  */
-public class DataFilesTable extends BaseMetadataTable {
+abstract class BaseFilesTable extends BaseMetadataTable {
 
-  DataFilesTable(TableOperations ops, Table table) {
-    this(ops, table, table.name() + ".files");
-  }
-
-  DataFilesTable(TableOperations ops, Table table, String name) {
+  BaseFilesTable(TableOperations ops, Table table, String name) {
     super(ops, table, name);
   }
 
   @Override
-  public TableScan newScan() {
-    return new FilesTableScan(operations(), table(), schema());
-  }
-
-  @Override
   public Schema schema() {
     StructType partitionType = Partitioning.partitionType(table());
     Schema schema = new Schema(DataFile.getType(partitionType).fields());
@@ -62,90 +55,108 @@ public class DataFilesTable extends BaseMetadataTable {
     }
   }
 
-  @Override
-  MetadataTableType metadataTableType() {
-    return MetadataTableType.FILES;
-  }
-
-  public static class FilesTableScan extends BaseMetadataTableScan {
+  abstract static class BaseFilesTableScan extends BaseMetadataTableScan {
     private final Schema fileSchema;
+    private final MetadataTableType type;
 
-    FilesTableScan(TableOperations ops, Table table, Schema fileSchema) {
+    protected BaseFilesTableScan(TableOperations ops, Table table, Schema fileSchema, MetadataTableType type) {
       super(ops, table, fileSchema);
       this.fileSchema = fileSchema;
+      this.type = type;
     }
 
-    private FilesTableScan(TableOperations ops, Table table, Schema schema, Schema fileSchema,
-                           TableScanContext context) {
+    protected BaseFilesTableScan(TableOperations ops, Table table, Schema schema, Schema fileSchema,
+                                 TableScanContext context, MetadataTableType type) {
       super(ops, table, schema, context);
       this.fileSchema = fileSchema;
+      this.type = type;
+    }
+
+    protected Schema fileSchema() {
+      return fileSchema;
     }
 
     @Override
     public TableScan appendsBetween(long fromSnapshotId, long toSnapshotId) {
       throw new UnsupportedOperationException(
-          String.format("Cannot incrementally scan table of type %s", MetadataTableType.FILES.name()));
+          String.format("Cannot incrementally scan table of type %s", type.name()));
     }
 
     @Override
     public TableScan appendsAfter(long fromSnapshotId) {
       throw new UnsupportedOperationException(
-          String.format("Cannot incrementally scan table of type %s", MetadataTableType.FILES.name()));
+          String.format("Cannot incrementally scan table of type %s", type.name()));
     }
 
     @Override
-    protected TableScan newRefinedScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
-      return new FilesTableScan(ops, table, schema, fileSchema, context);
-    }
-
-    @Override
-    protected CloseableIterable<FileScanTask> planFiles(
-        TableOperations ops, Snapshot snapshot, Expression rowFilter,
+    protected CloseableIterable<FileScanTask> planFiles(TableOperations ops, Snapshot snapshot, Expression rowFilter,
         boolean ignoreResiduals, boolean caseSensitive, boolean colStats) {
-      CloseableIterable<ManifestFile> manifests = CloseableIterable.withNoopClose(snapshot.dataManifests());
+      CloseableIterable<ManifestFile> filtered = filterManifests(manifests(), rowFilter, caseSensitive);
+
       String schemaString = SchemaParser.toJson(schema());
       String specString = PartitionSpecParser.toJson(PartitionSpec.unpartitioned());
       Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : rowFilter;
       ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter);
 
-      // use an inclusive projection to remove the partition name prefix and filter out any non-partition expressions
-      Expression partitionFilter = Projections
-          .inclusive(
-              transformSpec(fileSchema, table().spec(), PARTITION_FIELD_PREFIX),
-              caseSensitive)
-          .project(rowFilter);
-
-      ManifestEvaluator manifestEval = ManifestEvaluator.forPartitionFilter(
-          partitionFilter, table().spec(), caseSensitive);
-      CloseableIterable<ManifestFile> filtered = CloseableIterable.filter(manifests, manifestEval::eval);
-
       // Data tasks produce the table schema, not the projection schema and projection is done by processing engines.
       // This data task needs to use the table schema, which may not include a partition schema to avoid having an
       // empty struct in the schema for unpartitioned tables. Some engines, like Spark, can't handle empty structs in
       // all cases.
       return CloseableIterable.transform(filtered, manifest ->
-          new ManifestReadTask(ops.io(), manifest, schema(), schemaString, specString, residuals));
+          new ManifestReadTask(ops.io(), ops.current().specsById(),
+              manifest, schema(), schemaString, specString, residuals));
+    }
+
+    /**
+     * @return list of manifest files to explore for this files metadata table scan
+     */
+    protected abstract List<ManifestFile> manifests();
+
+    private CloseableIterable<ManifestFile> filterManifests(List<ManifestFile> manifests,
+                                                            Expression rowFilter,
+                                                            boolean caseSensitive) {
+      CloseableIterable<ManifestFile> manifestIterable = CloseableIterable.withNoopClose(manifests);
+
+      // use an inclusive projection to remove the partition name prefix and filter out any non-partition expressions
+      PartitionSpec spec = transformSpec(fileSchema, table().spec(), PARTITION_FIELD_PREFIX);
+      Expression partitionFilter = Projections.inclusive(spec, caseSensitive).project(rowFilter);
+
+      ManifestEvaluator manifestEval = ManifestEvaluator.forPartitionFilter(
+          partitionFilter, table().spec(), caseSensitive);
+
+      return CloseableIterable.filter(manifestIterable, manifestEval::eval);
     }
   }
 
   static class ManifestReadTask extends BaseFileScanTask implements DataTask {
     private final FileIO io;
+    private final Map<Integer, PartitionSpec> specsById;
     private final ManifestFile manifest;
     private final Schema schema;
 
-    ManifestReadTask(FileIO io, ManifestFile manifest, Schema schema, String schemaString,
-                     String specString, ResidualEvaluator residuals) {
+    ManifestReadTask(FileIO io, Map<Integer, PartitionSpec> specsById, ManifestFile manifest,
+                     Schema schema, String schemaString, String specString, ResidualEvaluator residuals) {
       super(DataFiles.fromManifest(manifest), null, schemaString, specString, residuals);
       this.io = io;
+      this.specsById = specsById;
       this.manifest = manifest;
       this.schema = schema;
     }
 
     @Override
     public CloseableIterable<StructLike> rows() {
-      return CloseableIterable.transform(
-          ManifestFiles.read(manifest, io).project(schema),
-          file -> (GenericDataFile) file);
+      return CloseableIterable.transform(manifestEntries(), file -> (StructLike) file);
+    }
+
+    private CloseableIterable<? extends ContentFile<?>> manifestEntries() {
+      switch (manifest.content()) {
+        case DATA:
+          return ManifestFiles.read(manifest, io, specsById).project(schema);
+        case DELETES:
+          return ManifestFiles.readDeleteManifest(manifest, io, specsById).project(schema);
+        default:
+          throw new IllegalArgumentException("Unsupported manifest content type:" + manifest.content());
+      }
     }
 
     @Override
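
The key change in this file is that ManifestReadTask now dispatches on manifest.content(), so data and delete manifests share one scan/read path. A minimal sketch of consuming the planned tasks, assuming filesTable is any files-style metadata table instance (the cast holds because every task planned here is a ManifestReadTask, which implements DataTask):

    import java.io.IOException;
    import org.apache.iceberg.DataTask;
    import org.apache.iceberg.FileScanTask;
    import org.apache.iceberg.StructLike;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.io.CloseableIterable;

    class FilesTableRowsExample {
      // filesTable is assumed to be a files or delete_files metadata table instance.
      static void printFileRows(Table filesTable) throws IOException {
        try (CloseableIterable<FileScanTask> tasks = filesTable.newScan().planFiles()) {
          for (FileScanTask task : tasks) {
            // Each task reads one manifest; rows() yields one row per data or delete file.
            try (CloseableIterable<StructLike> rows = ((DataTask) task).rows()) {
              rows.forEach(row -> System.out.println(row));
            }
          }
        }
      }
    }
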
diff --git a/core/src/main/java/org/apache/iceberg/DataFilesTable.java b/core/src/main/java/org/apache/iceberg/DataFilesTable.java
index 31c0bea..cc35205 100644
--- a/core/src/main/java/org/apache/iceberg/DataFilesTable.java
+++ b/core/src/main/java/org/apache/iceberg/DataFilesTable.java
@@ -19,23 +19,12 @@
 
 package org.apache.iceberg;
 
-import org.apache.iceberg.expressions.Expression;
-import org.apache.iceberg.expressions.Expressions;
-import org.apache.iceberg.expressions.ManifestEvaluator;
-import org.apache.iceberg.expressions.Projections;
-import org.apache.iceberg.expressions.ResidualEvaluator;
-import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.io.FileIO;
-import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-import org.apache.iceberg.types.TypeUtil;
-import org.apache.iceberg.types.Types.StructType;
+import java.util.List;
 
 /**
  * A {@link Table} implementation that exposes a table's data files as rows.
  */
-public class DataFilesTable extends BaseMetadataTable {
+public class DataFilesTable extends BaseFilesTable {
 
   DataFilesTable(TableOperations ops, Table table) {
     this(ops, table, table.name() + ".files");
@@ -47,19 +36,7 @@ public class DataFilesTable extends BaseMetadataTable {
 
   @Override
   public TableScan newScan() {
-    return new FilesTableScan(operations(), table(), schema());
-  }
-
-  @Override
-  public Schema schema() {
-    StructType partitionType = Partitioning.partitionType(table());
-    Schema schema = new Schema(DataFile.getType(partitionType).fields());
-    if (partitionType.fields().size() < 1) {
-      // avoid returning an empty struct, which is not always supported. instead, drop the partition field
-      return TypeUtil.selectNot(schema, Sets.newHashSet(DataFile.PARTITION_ID));
-    } else {
-      return schema;
-    }
+    return new DataFilesTableScan(operations(), table(), schema());
   }
 
   @Override
@@ -67,95 +44,24 @@ public class DataFilesTable extends BaseMetadataTable {
     return MetadataTableType.FILES;
   }
 
-  public static class FilesTableScan extends BaseMetadataTableScan {
-    private final Schema fileSchema;
+  public static class DataFilesTableScan extends BaseFilesTableScan {
 
-    FilesTableScan(TableOperations ops, Table table, Schema fileSchema) {
-      super(ops, table, fileSchema);
-      this.fileSchema = fileSchema;
+    DataFilesTableScan(TableOperations ops, Table table, Schema fileSchema) {
+      super(ops, table, fileSchema, MetadataTableType.FILES);
     }
 
-    private FilesTableScan(TableOperations ops, Table table, Schema schema, Schema fileSchema,
-                           TableScanContext context) {
-      super(ops, table, schema, context);
-      this.fileSchema = fileSchema;
-    }
-
-    @Override
-    public TableScan appendsBetween(long fromSnapshotId, long toSnapshotId) {
-      throw new UnsupportedOperationException(
-          String.format("Cannot incrementally scan table of type %s", MetadataTableType.FILES.name()));
-    }
-
-    @Override
-    public TableScan appendsAfter(long fromSnapshotId) {
-      throw new UnsupportedOperationException(
-          String.format("Cannot incrementally scan table of type %s", MetadataTableType.FILES.name()));
+    DataFilesTableScan(TableOperations ops, Table table, Schema schema, Schema fileSchema, TableScanContext context) {
+      super(ops, table, schema, fileSchema, context, MetadataTableType.FILES);
     }
 
     @Override
     protected TableScan newRefinedScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
-      return new FilesTableScan(ops, table, schema, fileSchema, context);
+      return new DataFilesTableScan(ops, table, schema, fileSchema(), context);
     }
 
     @Override
-    protected CloseableIterable<FileScanTask> planFiles(
-        TableOperations ops, Snapshot snapshot, Expression rowFilter,
-        boolean ignoreResiduals, boolean caseSensitive, boolean colStats) {
-      CloseableIterable<ManifestFile> manifests = CloseableIterable.withNoopClose(snapshot.dataManifests());
-      String schemaString = SchemaParser.toJson(schema());
-      String specString = PartitionSpecParser.toJson(PartitionSpec.unpartitioned());
-      Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : rowFilter;
-      ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter);
-
-      // use an inclusive projection to remove the partition name prefix and filter out any non-partition expressions
-      Expression partitionFilter = Projections
-          .inclusive(
-              transformSpec(fileSchema, table().spec(), PARTITION_FIELD_PREFIX),
-              caseSensitive)
-          .project(rowFilter);
-
-      ManifestEvaluator manifestEval = ManifestEvaluator.forPartitionFilter(
-          partitionFilter, table().spec(), caseSensitive);
-      CloseableIterable<ManifestFile> filtered = CloseableIterable.filter(manifests, manifestEval::eval);
-
-      // Data tasks produce the table schema, not the projection schema and projection is done by processing engines.
-      // This data task needs to use the table schema, which may not include a partition schema to avoid having an
-      // empty struct in the schema for unpartitioned tables. Some engines, like Spark, can't handle empty structs in
-      // all cases.
-      return CloseableIterable.transform(filtered, manifest ->
-          new ManifestReadTask(ops.io(), manifest, schema(), schemaString, specString, residuals));
-    }
-  }
-
-  static class ManifestReadTask extends BaseFileScanTask implements DataTask {
-    private final FileIO io;
-    private final ManifestFile manifest;
-    private final Schema schema;
-
-    ManifestReadTask(FileIO io, ManifestFile manifest, Schema schema, String schemaString,
-                     String specString, ResidualEvaluator residuals) {
-      super(DataFiles.fromManifest(manifest), null, schemaString, specString, residuals);
-      this.io = io;
-      this.manifest = manifest;
-      this.schema = schema;
-    }
-
-    @Override
-    public CloseableIterable<StructLike> rows() {
-      return CloseableIterable.transform(
-          ManifestFiles.read(manifest, io).project(schema),
-          file -> (GenericDataFile) file);
-    }
-
-    @Override
-    public Iterable<FileScanTask> split(long splitSize) {
-      return ImmutableList.of(this); // don't split
-    }
-
-    @VisibleForTesting
-    ManifestFile manifest() {
-      return manifest;
+    protected List<ManifestFile> manifests() {
+      return snapshot().dataManifests();
     }
   }
 }
diff --git a/core/src/main/java/org/apache/iceberg/DeleteFilesTable.java b/core/src/main/java/org/apache/iceberg/DeleteFilesTable.java
new file mode 100644
index 0000000..201c76d
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/DeleteFilesTable.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.util.List;
+
+/**
+ * A {@link Table} implementation that exposes a table's delete files as rows.
+ */
+public class DeleteFilesTable extends BaseFilesTable {
+
+  DeleteFilesTable(TableOperations ops, Table table) {
+    this(ops, table, table.name() + ".delete_files");
+  }
+
+  DeleteFilesTable(TableOperations ops, Table table, String name) {
+    super(ops, table, name);
+  }
+
+  @Override
+  public TableScan newScan() {
+    return new DeleteFilesTableScan(operations(), table(), schema());
+  }
+
+  @Override
+  MetadataTableType metadataTableType() {
+    return MetadataTableType.DELETE_FILES;
+  }
+
+  public static class DeleteFilesTableScan extends BaseFilesTableScan {
+
+    DeleteFilesTableScan(TableOperations ops, Table table, Schema fileSchema) {
+      super(ops, table, fileSchema, MetadataTableType.DELETE_FILES);
+    }
+
+    DeleteFilesTableScan(TableOperations ops, Table table, Schema schema, Schema fileSchema, TableScanContext context) {
+      super(ops, table, schema, fileSchema, context, MetadataTableType.DELETE_FILES);
+    }
+
+    @Override
+    protected TableScan newRefinedScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
+      return new DeleteFilesTableScan(ops, table, schema, fileSchema(), context);
+    }
+
+    @Override
+    protected List<ManifestFile> manifests() {
+      return snapshot().deleteManifests();
+    }
+  }
+}
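
Mirroring the unit tests added in TestMetadataTableScans below, a minimal sketch of planning a filtered scan over the new table from Java. The constructors are package-private, so like the tests this has to live in the org.apache.iceberg package; the partition column name follows the tests' bucket spec and is hypothetical for other tables:

    package org.apache.iceberg;

    import java.io.IOException;
    import org.apache.iceberg.expressions.Expressions;
    import org.apache.iceberg.io.CloseableIterable;

    class DeleteFilesScanExample {
      // ops/table are assumed to come from an existing format-version 2 table.
      static void planDeleteFileScan(TableOperations ops, Table table) throws IOException {
        Table deleteFilesTable = new DeleteFilesTable(ops, table);
        TableScan scan = deleteFilesTable.newScan()
            // partition predicates are projected onto the spec and prune whole
            // delete manifests (see filterManifests in BaseFilesTable)
            .filter(Expressions.equal("partition.data_bucket", 0))
            .select("content", "record_count");
        try (CloseableIterable<FileScanTask> tasks = scan.planFiles()) {
          // each task's content file is the delete manifest it will read
          tasks.forEach(task -> System.out.println(task.file().path()));
        }
      }
    }
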
diff --git a/core/src/main/java/org/apache/iceberg/MetadataTableType.java b/core/src/main/java/org/apache/iceberg/MetadataTableType.java
index 93e53b9..b173c0b 100644
--- a/core/src/main/java/org/apache/iceberg/MetadataTableType.java
+++ b/core/src/main/java/org/apache/iceberg/MetadataTableType.java
@@ -24,6 +24,7 @@ import java.util.Locale;
 public enum MetadataTableType {
   ENTRIES,
   FILES,
+  DELETE_FILES,
   HISTORY,
   SNAPSHOTS,
   MANIFESTS,
diff --git a/core/src/main/java/org/apache/iceberg/MetadataTableUtils.java b/core/src/main/java/org/apache/iceberg/MetadataTableUtils.java
index afb1619..52a429c 100644
--- a/core/src/main/java/org/apache/iceberg/MetadataTableUtils.java
+++ b/core/src/main/java/org/apache/iceberg/MetadataTableUtils.java
@@ -55,6 +55,8 @@ public class MetadataTableUtils {
         return new ManifestEntriesTable(ops, baseTable, metadataTableName);
       case FILES:
         return new DataFilesTable(ops, baseTable, metadataTableName);
+      case DELETE_FILES:
+        return new DeleteFilesTable(ops, baseTable, metadataTableName);
       case HISTORY:
         return new HistoryTable(ops, baseTable, metadataTableName);
       case SNAPSHOTS:
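
With DELETE_FILES registered in MetadataTableUtils, the table should also resolve by name through catalogs the same way files or snapshots do. A minimal sketch, assuming a HadoopCatalog with hypothetical warehouse and table names:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.catalog.TableIdentifier;
    import org.apache.iceberg.hadoop.HadoopCatalog;

    public class LoadDeleteFilesTableExample {
      public static void main(String[] args) {
        // Hypothetical warehouse path and table names; any catalog that resolves
        // metadata table identifiers is expected to behave the same way.
        HadoopCatalog catalog = new HadoopCatalog(new Configuration(), "file:///tmp/warehouse");
        Table deleteFiles = catalog.loadTable(TableIdentifier.of("db", "sample", "delete_files"));
        System.out.println(deleteFiles.schema());
      }
    }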
diff --git a/core/src/test/java/org/apache/iceberg/TableTestBase.java b/core/src/test/java/org/apache/iceberg/TableTestBase.java
index 82e6076..5abfb2c 100644
--- a/core/src/test/java/org/apache/iceberg/TableTestBase.java
+++ b/core/src/test/java/org/apache/iceberg/TableTestBase.java
@@ -107,34 +107,24 @@ public class TableTestBase {
       .withPartitionPath("data_bucket=2") // easy way to set partition data for now
       .withRecordCount(1)
       .build();
-  static final DataFile FILE_D = DataFiles.builder(SPEC)
-      .withPath("/path/to/data-d.parquet")
-      .withFileSizeInBytes(10)
-      .withPartitionPath("data_bucket=3") // easy way to set partition data for now
-      .withRecordCount(1)
-      .build();
-  static final DataFile FILE_PARTITION_0 = DataFiles.builder(SPEC)
-      .withPath("/path/to/data-0.parquet")
-      .withFileSizeInBytes(10)
-      .withPartition(TestHelpers.Row.of(0))
-      .withRecordCount(1)
-      .build();
-  static final DataFile FILE_PARTITION_1 = DataFiles.builder(SPEC)
-      .withPath("/path/to/data-1.parquet")
+  static final DeleteFile FILE_C2_DELETES = FileMetadata.deleteFileBuilder(SPEC)
+      .ofEqualityDeletes(1)
+      .withPath("/path/to/data-c-deletes.parquet")
       .withFileSizeInBytes(10)
-      .withPartition(TestHelpers.Row.of(1))
+      .withPartitionPath("data_bucket=2") // easy way to set partition data for now
       .withRecordCount(1)
       .build();
-  static final DataFile FILE_PARTITION_2 = DataFiles.builder(SPEC)
-      .withPath("/path/to/data-2.parquet")
+  static final DataFile FILE_D = DataFiles.builder(SPEC)
+      .withPath("/path/to/data-d.parquet")
       .withFileSizeInBytes(10)
-      .withPartition(TestHelpers.Row.of(2))
+      .withPartitionPath("data_bucket=3") // easy way to set partition data for now
       .withRecordCount(1)
       .build();
-  static final DataFile FILE_PARTITION_3 = DataFiles.builder(SPEC)
-      .withPath("/path/to/data-3.parquet")
+  static final DeleteFile FILE_D2_DELETES = FileMetadata.deleteFileBuilder(SPEC)
+      .ofEqualityDeletes(1)
+      .withPath("/path/to/data-d-deletes.parquet")
       .withFileSizeInBytes(10)
-      .withPartition(TestHelpers.Row.of(3))
+      .withPartitionPath("data_bucket=3") // easy way to set partition data for now
       .withRecordCount(1)
       .build();
   static final DataFile FILE_WITH_STATS = DataFiles.builder(SPEC)
diff --git a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
index 1bb8f28..40f0989 100644
--- a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
+++ b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.StreamSupport;
+import org.apache.iceberg.BaseFilesTable.ManifestReadTask;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.CloseableIterable;
@@ -32,6 +33,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Iterators;
 import org.apache.iceberg.types.Conversions;
 import org.apache.iceberg.types.Types;
 import org.junit.Assert;
+import org.junit.Assume;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -53,17 +55,32 @@ public class TestMetadataTableScans extends TableTestBase {
 
   private void preparePartitionedTable() {
     table.newFastAppend()
-        .appendFile(FILE_PARTITION_0)
+        .appendFile(FILE_A)
         .commit();
     table.newFastAppend()
-        .appendFile(FILE_PARTITION_1)
+        .appendFile(FILE_C)
         .commit();
     table.newFastAppend()
-        .appendFile(FILE_PARTITION_2)
+        .appendFile(FILE_D)
         .commit();
     table.newFastAppend()
-        .appendFile(FILE_PARTITION_3)
+        .appendFile(FILE_B)
         .commit();
+
+    if (formatVersion == 2) {
+      table.newRowDelta()
+          .addDeletes(FILE_A_DELETES)
+          .commit();
+      table.newRowDelta()
+          .addDeletes(FILE_B_DELETES)
+          .commit();
+      table.newRowDelta()
+          .addDeletes(FILE_C2_DELETES)
+          .commit();
+      table.newRowDelta()
+          .addDeletes(FILE_D2_DELETES)
+          .commit();
+    }
   }
 
   @Test
@@ -463,6 +480,180 @@ public class TestMetadataTableScans extends TableTestBase {
   }
 
   @Test
+  public void testDeleteFilesTableScanNoFilter() {
+    Assume.assumeTrue("Only V2 Tables Support Deletes", formatVersion >= 2);
+
+    preparePartitionedTable();
+
+    Table deleteFilesTable = new DeleteFilesTable(table.ops(), table);
+    Types.StructType expected = new Schema(
+        required(102, "partition", Types.StructType.of(
+            optional(1000, "data_bucket", Types.IntegerType.get())),
+            "Partition data tuple, schema based on the partition spec")).asStruct();
+
+    TableScan scanNoFilter = deleteFilesTable.newScan().select("partition.data_bucket");
+    Assert.assertEquals(expected, scanNoFilter.schema().asStruct());
+    CloseableIterable<FileScanTask> tasks = scanNoFilter.planFiles();
+
+    Assert.assertEquals(4, Iterables.size(tasks));
+    validateFileScanTasks(tasks, 0);
+    validateFileScanTasks(tasks, 1);
+    validateFileScanTasks(tasks, 2);
+    validateFileScanTasks(tasks, 3);
+  }
+
+  @Test
+  public void testDeleteFilesTableScanAndFilter() {
+    Assume.assumeTrue("Only V2 Tables Support Deletes", formatVersion >= 2);
+
+    preparePartitionedTable();
+
+    Table deleteFilesTable = new DeleteFilesTable(table.ops(), table);
+
+    Expression andEquals = Expressions.and(
+        Expressions.equal("partition.data_bucket", 0),
+        Expressions.greaterThan("record_count", 0));
+    TableScan scan = deleteFilesTable.newScan().filter(andEquals);
+    CloseableIterable<FileScanTask> tasks = scan.planFiles();
+    Assert.assertEquals(1, Iterables.size(tasks));
+    validateFileScanTasks(tasks, 0);
+  }
+
+  @Test
+  public void testDeleteFilesTableScanAndFilterWithPlanTasks() {
+    Assume.assumeTrue("Only V2 Tables Support Deletes", formatVersion >= 2);
+
+    preparePartitionedTable();
+
+    Table deleteFilesTable = new DeleteFilesTable(table.ops(), table);
+
+    Expression andEquals = Expressions.and(
+        Expressions.equal("partition.data_bucket", 0),
+        Expressions.greaterThan("record_count", 0));
+    TableScan scan = deleteFilesTable.newScan().filter(andEquals);
+    CloseableIterable<CombinedScanTask> tasks = scan.planTasks();
+    Assert.assertEquals(1, Iterables.size(tasks));
+    validateCombinedScanTasks(tasks, 0);
+  }
+
+  @Test
+  public void testDeleteFilesTableScanLtFilter() {
+    Assume.assumeTrue("Only V2 Tables Support Deletes", formatVersion >= 2);
+
+    preparePartitionedTable();
+
+    Table deleteFilesTable = new DeleteFilesTable(table.ops(), table);
+
+    Expression lt = Expressions.lessThan("partition.data_bucket", 2);
+    TableScan scan = deleteFilesTable.newScan().filter(lt);
+    CloseableIterable<FileScanTask> tasksLt = scan.planFiles();
+    Assert.assertEquals(2, Iterables.size(tasksLt));
+    validateFileScanTasks(tasksLt, 0);
+    validateFileScanTasks(tasksLt, 1);
+  }
+
+  @Test
+  public void testDeleteFilesTableScanOrFilter() {
+    Assume.assumeTrue("Only V2 Tables Support Deletes", formatVersion >= 2);
+
+    preparePartitionedTable();
+
+    Table deleteFilesTable = new DeleteFilesTable(table.ops(), table);
+
+    Expression or = Expressions.or(
+        Expressions.equal("partition.data_bucket", 2),
+        Expressions.greaterThan("record_count", 0));
+    TableScan scan = deleteFilesTable.newScan()
+        .filter(or);
+    CloseableIterable<FileScanTask> tasksOr = scan.planFiles();
+    Assert.assertEquals(4, Iterables.size(tasksOr));
+    validateFileScanTasks(tasksOr, 0);
+    validateFileScanTasks(tasksOr, 1);
+    validateFileScanTasks(tasksOr, 2);
+    validateFileScanTasks(tasksOr, 3);
+  }
+
+  @Test
+  public void testDeleteFilesScanNotFilter() {
+    Assume.assumeTrue("Only V2 Tables Support Deletes", formatVersion >= 2);
+
+    preparePartitionedTable();
+    Table deleteFilesTable = new DataFilesTable(table.ops(), table);
+
+    Expression not = Expressions.not(Expressions.lessThan("partition.data_bucket", 2));
+    TableScan scan = deleteFilesTable.newScan()
+        .filter(not);
+    CloseableIterable<FileScanTask> tasksNot = scan.planFiles();
+    Assert.assertEquals(2, Iterables.size(tasksNot));
+    validateFileScanTasks(tasksNot, 2);
+    validateFileScanTasks(tasksNot, 3);
+  }
+
+  @Test
+  public void testDeleteFilesTableScanInFilter() {
+    Assume.assumeTrue("Only V2 Tables Support Deletes", formatVersion >= 2);
+
+    preparePartitionedTable();
+
+    Table deleteFilesTable = new DeleteFilesTable(table.ops(), table);
+
+    Expression set = Expressions.in("partition.data_bucket", 2, 3);
+    TableScan scan = deleteFilesTable.newScan()
+        .filter(set);
+    CloseableIterable<FileScanTask> tasksIn = scan.planFiles();
+    Assert.assertEquals(2, Iterables.size(tasksIn));
+
+    validateFileScanTasks(tasksIn, 2);
+    validateFileScanTasks(tasksIn, 3);
+  }
+
+  @Test
+  public void testDeleteFilesTableScanNotNullFilter() {
+    Assume.assumeTrue("Only V2 Tables Support Deletes", formatVersion >= 2);
+
+    preparePartitionedTable();
+
+    Table deleteFilesTable = new DeleteFilesTable(table.ops(), table);
+    Expression notNull = Expressions.notNull("partition.data_bucket");
+    TableScan scan = deleteFilesTable.newScan()
+        .filter(notNull);
+    CloseableIterable<FileScanTask> tasksNotNull = scan.planFiles();
+    Assert.assertEquals(4, Iterables.size(tasksNotNull));
+
+    validateFileScanTasks(tasksNotNull, 0);
+    validateFileScanTasks(tasksNotNull, 1);
+    validateFileScanTasks(tasksNotNull, 2);
+    validateFileScanTasks(tasksNotNull, 3);
+  }
+
+  @Test
+  public void testDeleteFilesTableSelection() throws IOException {
+    Assume.assumeTrue("Only V2 Tables Support Deletes", formatVersion >= 2);
+
+    table.newFastAppend()
+        .appendFile(FILE_A)
+        .commit();
+
+    table.newRowDelta()
+        .addDeletes(FILE_A_DELETES)
+        .addDeletes(FILE_A2_DELETES)
+        .commit();
+
+    Table deleteFilesTable = new DeleteFilesTable(table.ops(), table);
+
+    TableScan scan = deleteFilesTable.newScan()
+        .filter(Expressions.equal("record_count", 1))
+        .select("content", "record_count");
+    validateTaskScanResiduals(scan, false);
+    Types.StructType expected = new Schema(
+        optional(134, "content", Types.IntegerType.get(),
+            "Contents of the file: 0=data, 1=position deletes, 2=equality deletes"),
+        required(103, "record_count", Types.LongType.get(), "Number of records in the file")
+    ).asStruct();
+    Assert.assertEquals(expected, scan.schema().asStruct());
+  }
+
+  @Test
   public void testPartitionColumnNamedPartition() throws Exception {
     TestTables.clearTables();
     this.tableDir = temp.newFolder();
@@ -601,14 +792,14 @@ public class TestMetadataTableScans extends TableTestBase {
   private void validateFileScanTasks(CloseableIterable<FileScanTask> fileScanTasks, int partValue) {
     Assert.assertTrue("File scan tasks do not include correct file",
         StreamSupport.stream(fileScanTasks.spliterator(), false).anyMatch(t -> {
-          ManifestFile mf = ((DataFilesTable.ManifestReadTask) t).manifest();
+          ManifestFile mf = ((ManifestReadTask) t).manifest();
           return manifestHasPartition(mf, partValue);
         }));
   }
 
   private void validateCombinedScanTasks(CloseableIterable<CombinedScanTask> tasks, int partValue) {
     StreamSupport.stream(tasks.spliterator(), false)
-        .flatMap(c -> c.files().stream().map(t -> ((DataFilesTable.ManifestReadTask) t).manifest()))
+        .flatMap(c -> c.files().stream().map(t -> ((ManifestReadTask) t).manifest()))
         .anyMatch(m -> manifestHasPartition(m, partValue));
   }
 
diff --git a/core/src/test/java/org/apache/iceberg/hadoop/HadoopTableTestBase.java b/core/src/test/java/org/apache/iceberg/hadoop/HadoopTableTestBase.java
index 0c21148..29643ab 100644
--- a/core/src/test/java/org/apache/iceberg/hadoop/HadoopTableTestBase.java
+++ b/core/src/test/java/org/apache/iceberg/hadoop/HadoopTableTestBase.java
@@ -30,6 +30,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileMetadata;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
@@ -87,6 +89,13 @@ public class HadoopTableTestBase {
       .withPartitionPath("data_bucket=1") // easy way to set partition data for now
       .withRecordCount(2) // needs at least one record or else metrics will filter it out
       .build();
+  static final DeleteFile FILE_B_DELETES = FileMetadata.deleteFileBuilder(SPEC)
+      .ofPositionDeletes()
+      .withPath("/path/to/data-b-deletes.parquet")
+      .withFileSizeInBytes(0)
+      .withPartitionPath("data_bucket=1")
+      .withRecordCount(1)
+      .build();
   static final DataFile FILE_C = DataFiles.builder(SPEC)
       .withPath("/path/to/data-a.parquet")
       .withFileSizeInBytes(0)
diff --git a/core/src/test/java/org/apache/iceberg/hadoop/TestTableSerialization.java b/core/src/test/java/org/apache/iceberg/hadoop/TestTableSerialization.java
index 1086ba0..4159ce6 100644
--- a/core/src/test/java/org/apache/iceberg/hadoop/TestTableSerialization.java
+++ b/core/src/test/java/org/apache/iceberg/hadoop/TestTableSerialization.java
@@ -31,6 +31,7 @@ import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.HasTableOperations;
 import org.apache.iceberg.MetadataTableType;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.TestHelpers;
 import org.apache.iceberg.Transaction;
 import org.apache.iceberg.io.CloseableIterable;
@@ -120,6 +121,10 @@ public class TestTableSerialization extends HadoopTableTestBase {
 
   @Test
   public void testSerializableMetadataTablesPlanning() throws IOException {
+    table.updateProperties()
+        .set(TableProperties.FORMAT_VERSION, "2")
+        .commit();
+
     table.newAppend()
         .appendFile(FILE_A)
         .commit();
@@ -138,6 +143,9 @@ public class TestTableSerialization extends HadoopTableTestBase {
     table.newAppend()
         .appendFile(FILE_B)
         .commit();
+    table.newRowDelta()
+        .addDeletes(FILE_B_DELETES)
+        .commit();
 
     for (MetadataTableType type : MetadataTableType.values()) {
       // Collect the deserialized data
diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java
new file mode 100644
index 0000000..7a8bd41
--- /dev/null
+++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.extensions;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.data.TestHelpers;
+import org.apache.iceberg.spark.source.SimpleRecord;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class TestMetadataTables extends SparkExtensionsTestBase {
+
+  public TestMetadataTables(String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  @Test
+  public void testDeleteFilesTable() throws Exception {
+    sql("CREATE TABLE %s (id bigint, data string) USING iceberg TBLPROPERTIES" +
+        "('format-version'='2', 'write.delete.mode'='merge-on-read')", tableName);
+
+    List<SimpleRecord> records = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "b"),
+        new SimpleRecord(3, "c"),
+        new SimpleRecord(4, "d")
+    );
+    spark.createDataset(records, Encoders.bean(SimpleRecord.class))
+        .coalesce(1)
+        .writeTo(tableName)
+        .append();
+
+    sql("DELETE FROM %s WHERE id=1", tableName);
+
+    Table table = Spark3Util.loadIcebergTable(spark, tableName);
+    List<ManifestFile> expectedManifests = TestHelpers.deleteManifests(table);
+    Assert.assertEquals("Should have 1 delete manifest", 1, expectedManifests.size());
+
+    Schema entriesTableSchema = Spark3Util.loadIcebergTable(spark, tableName + ".entries").schema();
+    Schema filesTableSchema = Spark3Util.loadIcebergTable(spark, tableName + ".delete_files").schema();
+
+    List<Row> actual = spark.sql("SELECT * FROM " + tableName + ".delete_files").collectAsList();
+
+    List<Record> expected = expectedEntries(table, entriesTableSchema, expectedManifests, null);
+
+    Assert.assertEquals("Should be one delete file manifest entry", 1, expected.size());
+    Assert.assertEquals("Metadata table should return one delete file", 1, actual.size());
+
+    TestHelpers.assertEqualsSafe(filesTableSchema.asStruct(), expected.get(0), actual.get(0));
+  }
+
+  @Test
+  public void testDeleteFilesTablePartitioned() throws Exception {
+    sql("CREATE TABLE %s (id bigint, data string) " +
+        "USING iceberg " +
+        "PARTITIONED BY (data) " +
+        "TBLPROPERTIES" +
+        "('format-version'='2', 'write.delete.mode'='merge-on-read')", tableName);
+
+    List<SimpleRecord> recordsA = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "a")
+    );
+    spark.createDataset(recordsA, Encoders.bean(SimpleRecord.class))
+        .coalesce(1)
+        .writeTo(tableName)
+        .append();
+
+    List<SimpleRecord> recordsB = Lists.newArrayList(
+        new SimpleRecord(1, "b"),
+        new SimpleRecord(2, "b")
+        );
+    spark.createDataset(recordsB, Encoders.bean(SimpleRecord.class))
+        .coalesce(1)
+        .writeTo(tableName)
+        .append();
+
+    sql("DELETE FROM %s WHERE id=1 AND data='a'", tableName);
+    sql("DELETE FROM %s WHERE id=1 AND data='b'", tableName);
+
+    Table table = Spark3Util.loadIcebergTable(spark, tableName);
+    List<ManifestFile> expectedManifests = TestHelpers.deleteManifests(table);
+    Assert.assertEquals("Should have 2 delete files", 2, expectedManifests.size());
+
+    List<Row> actual = spark.sql("SELECT * FROM " + tableName + ".delete_files " +
+        "WHERE partition.data='a'").collectAsList();
+
+    Schema entriesTableSchema = Spark3Util.loadIcebergTable(spark, tableName + ".entries").schema();
+    Schema filesTableSchema = Spark3Util.loadIcebergTable(spark, tableName + ".delete_files").schema();
+
+    List<Record> expected = expectedEntries(table, entriesTableSchema, expectedManifests, "a");
+
+    Assert.assertEquals("Should be one delete file manifest entry", 1, expected.size());
+    Assert.assertEquals("Metadata table should return one delete file", 1, actual.size());
+
+    TestHelpers.assertEqualsSafe(filesTableSchema.asStruct(), expected.get(0), actual.get(0));
+  }
+
+  /**
+   * Find matching manifest entries of an Iceberg table
+   * @param table iceberg table
+   * @param entriesTableSchema schema of Manifest entries
+   * @param manifestsToExplore manifests to explore of the table
+   * @param partValue partition value that manifest entries must match, or null to skip filtering
+   */
+  private List<Record> expectedEntries(Table table, Schema entriesTableSchema,
+                                       List<ManifestFile> manifestsToExplore, String partValue) throws IOException {
+    List<Record> expected = Lists.newArrayList();
+    for (ManifestFile manifest : manifestsToExplore) {
+      InputFile in = table.io().newInputFile(manifest.path());
+      try (CloseableIterable<Record> rows = Avro.read(in).project(entriesTableSchema).build()) {
+        for (Record record : rows) {
+          if ((Integer) record.get("status") < 2 /* added or existing */) {
+            Record file = (Record) record.get("data_file");
+            if (partitionMatch(file, partValue)) {
+              asDeleteRecords(file);
+              expected.add(file);
+            }
+          }
+        }
+      }
+    }
+    return expected;
+  }
+
+  // Populate certain fields derived in the metadata tables
+  private void asDeleteRecords(Record file) {
+    file.put(0, FileContent.POSITION_DELETES.id());
+    file.put(3, 0); // specId
+  }
+
+  private boolean partitionMatch(Record file, String partValue) {
+    if (partValue == null) {
+      return true;
+    }
+    Record partition = (Record) file.get(4);
+    return partValue.equals(partition.get(0).toString());
+  }
+}