You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by ao...@apache.org on 2020/05/29 20:13:48 UTC

[iceberg] branch master updated: Add DeleteFile and manifest reader and writer for deletes (#1064)

This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 527240b  Add DeleteFile and manifest reader and writer for deletes (#1064)
527240b is described below

commit 527240b445b23cef1a655eccbb3b2c0eb7d178c1
Author: Ryan Blue <bl...@apache.org>
AuthorDate: Fri May 29 12:26:56 2020 -0700

    Add DeleteFile and manifest reader and writer for deletes (#1064)
---
 .../main/java/org/apache/iceberg/ContentFile.java  | 116 +++++++
 api/src/main/java/org/apache/iceberg/DataFile.java |  87 +----
 .../main/java/org/apache/iceberg/DeleteFile.java   |  17 +-
 .../expressions/InclusiveMetricsEvaluator.java     |   7 +-
 .../{GenericDataFile.java => BaseFile.java}        | 272 +++++++--------
 .../org/apache/iceberg/BaseManifestReader.java     |  25 +-
 .../org/apache/iceberg/BaseMetastoreCatalog.java   |   2 +-
 .../org/apache/iceberg/BaseRewriteManifests.java   |   6 +-
 .../main/java/org/apache/iceberg/BaseSnapshot.java |   4 +-
 ...ableMetadata.java => DeleteManifestReader.java} |  20 +-
 .../main/java/org/apache/iceberg/FileHistory.java  |   8 +-
 .../main/java/org/apache/iceberg/FindFiles.java    |   2 +-
 .../java/org/apache/iceberg/GenericDataFile.java   | 375 +--------------------
 .../java/org/apache/iceberg/GenericDeleteFile.java |  76 +++++
 .../org/apache/iceberg/GenericManifestEntry.java   |  36 +-
 .../org/apache/iceberg/InheritableMetadata.java    |   2 +-
 .../apache/iceberg/InheritableMetadataFactory.java |   6 +-
 .../org/apache/iceberg/ManifestEntriesTable.java   |   2 +-
 .../java/org/apache/iceberg/ManifestEntry.java     |   8 +-
 .../java/org/apache/iceberg/ManifestFiles.java     |  48 ++-
 .../java/org/apache/iceberg/ManifestGroup.java     |  10 +-
 .../java/org/apache/iceberg/ManifestWriter.java    | 117 ++++---
 .../apache/iceberg/MergingSnapshotProducer.java    |   8 +-
 .../java/org/apache/iceberg/RemoveSnapshots.java   |   4 +-
 .../main/java/org/apache/iceberg/ScanSummary.java  |   6 +-
 .../java/org/apache/iceberg/SnapshotProducer.java  |   4 +-
 .../main/java/org/apache/iceberg/V1Metadata.java   |  15 +-
 .../main/java/org/apache/iceberg/V2Metadata.java   |  42 ++-
 .../java/org/apache/iceberg/TableTestBase.java     |  24 +-
 .../org/apache/iceberg/TestManifestReader.java     |   6 +-
 .../apache/iceberg/TestManifestWriterVersions.java |  91 ++++-
 .../java/org/apache/iceberg/TestMergeAppend.java   |   2 +-
 .../org/apache/iceberg/TestRewriteManifests.java   |   6 +-
 33 files changed, 668 insertions(+), 786 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/ContentFile.java b/api/src/main/java/org/apache/iceberg/ContentFile.java
new file mode 100644
index 0000000..262ea17
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/ContentFile.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Superinterface of {@link DataFile} and {@link DeleteFile} that exposes common methods.
+ *
+ * @param <F> the concrete Java class of a ContentFile instance.
+ */
+public interface ContentFile<F> {
+  /**
+   * @return type of content stored in the file; one of DATA, POSITION_DELETES, or EQUALITY_DELETES
+   */
+  FileContent content();
+
+  /**
+   * @return fully qualified path to the file, suitable for constructing a Hadoop Path
+   */
+  CharSequence path();
+
+  /**
+   * @return format of the file
+   */
+  FileFormat format();
+
+  /**
+   * @return partition for this file as a {@link StructLike}
+   */
+  StructLike partition();
+
+  /**
+   * @return the number of top-level records in the file
+   */
+  long recordCount();
+
+  /**
+   * @return the file size in bytes
+   */
+  long fileSizeInBytes();
+
+  /**
+   * @return if collected, map from column ID to the size of the column in bytes, null otherwise
+   */
+  Map<Integer, Long> columnSizes();
+
+  /**
+   * @return if collected, map from column ID to the count of its non-null values, null otherwise
+   */
+  Map<Integer, Long> valueCounts();
+
+  /**
+   * @return if collected, map from column ID to its null value count, null otherwise
+   */
+  Map<Integer, Long> nullValueCounts();
+
+  /**
+   * @return if collected, map from column ID to value lower bounds, null otherwise
+   */
+  Map<Integer, ByteBuffer> lowerBounds();
+
+  /**
+   * @return if collected, map from column ID to value upper bounds, null otherwise
+   */
+  Map<Integer, ByteBuffer> upperBounds();
+
+  /**
+   * @return metadata about how this file is encrypted, or null if the file is stored in plain
+   *         text.
+   */
+  ByteBuffer keyMetadata();
+
+  /**
+   * @return List of recommended split locations, if applicable, null otherwise.
+   * When available, this information is used for planning scan tasks whose boundaries
+   * are determined by these offsets. The returned list must be sorted in ascending order.
+   */
+  List<Long> splitOffsets();
+
+
+  /**
+   * Copies this file. Manifest readers can reuse file instances; use
+   * this method to copy data when collecting files from tasks.
+   *
+   * @return a copy of this file
+   */
+  F copy();
+
+  /**
+   * Copies this file without file stats. Manifest readers can reuse file instances; use
+   * this method to copy data without stats when collecting files.
+   *
+   * @return a copy of this file, without lower bounds, upper bounds, value counts, or null value counts
+   */
+  F copyWithoutStats();
+}
diff --git a/api/src/main/java/org/apache/iceberg/DataFile.java b/api/src/main/java/org/apache/iceberg/DataFile.java
index d9a4441..747057b 100644
--- a/api/src/main/java/org/apache/iceberg/DataFile.java
+++ b/api/src/main/java/org/apache/iceberg/DataFile.java
@@ -19,9 +19,6 @@
 
 package org.apache.iceberg;
 
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Map;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.types.Types.BinaryType;
 import org.apache.iceberg.types.Types.IntegerType;
@@ -35,9 +32,9 @@ import static org.apache.iceberg.types.Types.NestedField.optional;
 import static org.apache.iceberg.types.Types.NestedField.required;
 
 /**
- * Interface for files listed in a table manifest.
+ * Interface for data files listed in a table manifest.
  */
-public interface DataFile {
+public interface DataFile extends ContentFile<DataFile> {
   // fields for adding delete data files
   Types.NestedField CONTENT = optional(134, "content", IntegerType.get(),
       "Contents of the file: 0=data, 1=position deletes, 2=equality deletes");
@@ -86,86 +83,8 @@ public interface DataFile {
   /**
    * @return the content stored in the file; one of DATA, POSITION_DELETES, or EQUALITY_DELETES
    */
+  @Override
   default FileContent content() {
     return FileContent.DATA;
   }
-
-  /**
-   * @return fully qualified path to the file, suitable for constructing a Hadoop Path
-   */
-  CharSequence path();
-
-  /**
-   * @return format of the data file
-   */
-  FileFormat format();
-
-  /**
-   * @return partition data for this file as a {@link StructLike}
-   */
-  StructLike partition();
-
-  /**
-   * @return the number of top-level records in the data file
-   */
-  long recordCount();
-
-  /**
-   * @return the data file size in bytes
-   */
-  long fileSizeInBytes();
-
-  /**
-   * @return if collected, map from column ID to the size of the column in bytes, null otherwise
-   */
-  Map<Integer, Long> columnSizes();
-
-  /**
-   * @return if collected, map from column ID to the count of its non-null values, null otherwise
-   */
-  Map<Integer, Long> valueCounts();
-
-  /**
-   * @return if collected, map from column ID to its null value count, null otherwise
-   */
-  Map<Integer, Long> nullValueCounts();
-
-  /**
-   * @return if collected, map from column ID to value lower bounds, null otherwise
-   */
-  Map<Integer, ByteBuffer> lowerBounds();
-
-  /**
-   * @return if collected, map from column ID to value upper bounds, null otherwise
-   */
-  Map<Integer, ByteBuffer> upperBounds();
-
-  /**
-   * @return metadata about how this file is encrypted, or null if the file is stored in plain
-   *         text.
-   */
-  ByteBuffer keyMetadata();
-
-  /**
-   * @return List of recommended split locations, if applicable, null otherwise.
-   * When available, this information is used for planning scan tasks whose boundaries
-   * are determined by these offsets. The returned list must be sorted in ascending order.
-   */
-  List<Long> splitOffsets();
-
-  /**
-   * Copies this {@link DataFile data file}. Manifest readers can reuse data file instances; use
-   * this method to copy data when collecting files from tasks.
-   *
-   * @return a copy of this data file
-   */
-  DataFile copy();
-
-  /**
-   * Copies this {@link DataFile data file} without file stats. Manifest readers can reuse data file instances; use
-   * this method to copy data without stats when collecting files.
-   *
-   * @return a copy of this data file, without lower bounds, upper bounds, value counts, or null value counts
-   */
-  DataFile copyWithoutStats();
 }
diff --git a/core/src/main/java/org/apache/iceberg/InheritableMetadata.java b/api/src/main/java/org/apache/iceberg/DeleteFile.java
similarity index 62%
copy from core/src/main/java/org/apache/iceberg/InheritableMetadata.java
copy to api/src/main/java/org/apache/iceberg/DeleteFile.java
index 5eebe8a..9adc0fb 100644
--- a/core/src/main/java/org/apache/iceberg/InheritableMetadata.java
+++ b/api/src/main/java/org/apache/iceberg/DeleteFile.java
@@ -19,8 +19,19 @@
 
 package org.apache.iceberg;
 
-import java.io.Serializable;
+import java.util.List;
 
-interface InheritableMetadata extends Serializable {
-  ManifestEntry apply(ManifestEntry manifestEntry);
+/**
+ * Interface for delete files listed in a table delete manifest.
+ */
+public interface DeleteFile extends ContentFile<DeleteFile> {
+  /**
+   * @return List of recommended split locations, if applicable, null otherwise.
+   * When available, this information is used for planning scan tasks whose boundaries
+   * are determined by these offsets. The returned list must be sorted in ascending order.
+   */
+  @Override
+  default List<Long> splitOffsets() {
+    return null;
+  }
 }
diff --git a/api/src/main/java/org/apache/iceberg/expressions/InclusiveMetricsEvaluator.java b/api/src/main/java/org/apache/iceberg/expressions/InclusiveMetricsEvaluator.java
index 4069586..148ac4f 100644
--- a/api/src/main/java/org/apache/iceberg/expressions/InclusiveMetricsEvaluator.java
+++ b/api/src/main/java/org/apache/iceberg/expressions/InclusiveMetricsEvaluator.java
@@ -25,6 +25,7 @@ import java.util.Comparator;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
+import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.expressions.ExpressionVisitors.BoundExpressionVisitor;
@@ -40,7 +41,7 @@ import static org.apache.iceberg.expressions.Expressions.rewriteNot;
  * <p>
  * This evaluation is inclusive: it returns true if a file may match and false if it cannot match.
  * <p>
- * Files are passed to {@link #eval(DataFile)}, which returns true if the file may contain matching
+ * Files are passed to {@link #eval(ContentFile)}, which returns true if the file may contain matching
  * rows and false if the file cannot contain matching rows. Files may be skipped if and only if the
  * return value of {@code eval} is false.
  */
@@ -70,7 +71,7 @@ public class InclusiveMetricsEvaluator {
    * @param file a data file
    * @return false if the file cannot contain rows that match the expression, true otherwise.
    */
-  public boolean eval(DataFile file) {
+  public boolean eval(ContentFile<?> file) {
     // TODO: detect the case where a column is missing from the file using file's max field id.
     return visitor().eval(file);
   }
@@ -84,7 +85,7 @@ public class InclusiveMetricsEvaluator {
     private Map<Integer, ByteBuffer> lowerBounds = null;
     private Map<Integer, ByteBuffer> upperBounds = null;
 
-    private boolean eval(DataFile file) {
+    private boolean eval(ContentFile<?> file) {
       if (file.recordCount() == 0) {
         return ROWS_CANNOT_MATCH;
       }
diff --git a/core/src/main/java/org/apache/iceberg/GenericDataFile.java b/core/src/main/java/org/apache/iceberg/BaseFile.java
similarity index 70%
copy from core/src/main/java/org/apache/iceberg/GenericDataFile.java
copy to core/src/main/java/org/apache/iceberg/BaseFile.java
index 6c6b1ba..7b03418 100644
--- a/core/src/main/java/org/apache/iceberg/GenericDataFile.java
+++ b/core/src/main/java/org/apache/iceberg/BaseFile.java
@@ -19,28 +19,31 @@
 
 package org.apache.iceberg;
 
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
+import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.avro.specific.SpecificData;
 import org.apache.iceberg.avro.AvroSchemaUtil;
-import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.ByteBuffers;
 
-class GenericDataFile
-    implements DataFile, IndexedRecord, StructLike, SpecificData.SchemaConstructable, Serializable {
-  private static final Types.StructType EMPTY_STRUCT_TYPE = Types.StructType.of();
-  private static final PartitionData EMPTY_PARTITION_DATA = new PartitionData(EMPTY_STRUCT_TYPE) {
+/**
+ * Base class for both {@link DataFile} and {@link DeleteFile}.
+ */
+abstract class BaseFile<F>
+    implements ContentFile<F>, IndexedRecord, StructLike, SpecificData.SchemaConstructable, Serializable {
+  static final Types.StructType EMPTY_STRUCT_TYPE = Types.StructType.of();
+  static final PartitionData EMPTY_PARTITION_DATA = new PartitionData(EMPTY_STRUCT_TYPE) {
     @Override
     public PartitionData copy() {
       return this; // this does not change
@@ -50,6 +53,7 @@ class GenericDataFile
   private int[] fromProjectionPos;
   private Types.StructType partitionType;
 
+  private FileContent content = FileContent.DATA;
   private String filePath = null;
   private FileFormat format = null;
   private PartitionData partitionData = null;
@@ -66,13 +70,12 @@ class GenericDataFile
   private byte[] keyMetadata = null;
 
   // cached schema
-  private transient org.apache.avro.Schema avroSchema = null;
+  private transient Schema avroSchema = null;
 
   /**
    * Used by Avro reflection to instantiate this class when reading manifest files.
    */
-  @SuppressWarnings("checkstyle:RedundantModifier") // Must be public
-  public GenericDataFile(org.apache.avro.Schema avroSchema) {
+  BaseFile(Schema avroSchema) {
     this.avroSchema = avroSchema;
 
     Types.StructType schema = AvroSchemaUtil.convert(avroSchema).asNestedType().asStructType();
@@ -105,28 +108,12 @@ class GenericDataFile
     this.partitionData = new PartitionData(partitionType);
   }
 
-  GenericDataFile(String filePath, FileFormat format, long recordCount,
-                  long fileSizeInBytes) {
-    this.filePath = filePath;
-    this.format = format;
-    this.partitionData = EMPTY_PARTITION_DATA;
-    this.partitionType = EMPTY_PARTITION_DATA.getPartitionType();
-    this.recordCount = recordCount;
-    this.fileSizeInBytes = fileSizeInBytes;
-  }
-
-  GenericDataFile(String filePath, FileFormat format, PartitionData partition,
-                  long recordCount, long fileSizeInBytes) {
-    this.filePath = filePath;
-    this.format = format;
-    this.partitionData = partition;
-    this.partitionType = partition.getPartitionType();
-    this.recordCount = recordCount;
-    this.fileSizeInBytes = fileSizeInBytes;
-  }
-
-  GenericDataFile(String filePath, FileFormat format, PartitionData partition,
-                  long fileSizeInBytes, Metrics metrics, List<Long> splitOffsets) {
+  BaseFile(FileContent content, String filePath, FileFormat format,
+           PartitionData partition, long fileSizeInBytes, long recordCount,
+           Map<Integer, Long> columnSizes, Map<Integer, Long> valueCounts, Map<Integer, Long> nullValueCounts,
+           Map<Integer, ByteBuffer> lowerBounds, Map<Integer, ByteBuffer> upperBounds, List<Long> splitOffsets,
+           ByteBuffer keyMetadata) {
+    this.content = content;
     this.filePath = filePath;
     this.format = format;
 
@@ -140,20 +127,14 @@ class GenericDataFile
     }
 
     // this will throw NPE if metrics.recordCount is null
-    this.recordCount = metrics.recordCount();
+    this.recordCount = recordCount;
     this.fileSizeInBytes = fileSizeInBytes;
-    this.columnSizes = metrics.columnSizes();
-    this.valueCounts = metrics.valueCounts();
-    this.nullValueCounts = metrics.nullValueCounts();
-    this.lowerBounds = SerializableByteBufferMap.wrap(metrics.lowerBounds());
-    this.upperBounds = SerializableByteBufferMap.wrap(metrics.upperBounds());
+    this.columnSizes = columnSizes;
+    this.valueCounts = valueCounts;
+    this.nullValueCounts = nullValueCounts;
+    this.lowerBounds = SerializableByteBufferMap.wrap(lowerBounds);
+    this.upperBounds = SerializableByteBufferMap.wrap(upperBounds);
     this.splitOffsets = copy(splitOffsets);
-  }
-
-  GenericDataFile(String filePath, FileFormat format, PartitionData partition,
-                  long fileSizeInBytes, Metrics metrics,
-                  ByteBuffer keyMetadata, List<Long> splitOffsets) {
-    this(filePath, format, partition, fileSizeInBytes, metrics, splitOffsets);
     this.keyMetadata = ByteBuffers.toByteArray(keyMetadata);
   }
 
@@ -163,7 +144,8 @@ class GenericDataFile
    * @param toCopy a generic data file to copy.
    * @param fullCopy whether to copy all fields or to drop column-level stats
    */
-  private GenericDataFile(GenericDataFile toCopy, boolean fullCopy) {
+  BaseFile(BaseFile<F> toCopy, boolean fullCopy) {
+    this.content = toCopy.content;
     this.filePath = toCopy.filePath;
     this.format = toCopy.format;
     this.partitionData = toCopy.partitionData.copy();
@@ -192,76 +174,13 @@ class GenericDataFile
   /**
    * Constructor for Java serialization.
    */
-  GenericDataFile() {
+  BaseFile() {
   }
 
-  @Override
-  public FileContent content() {
-    return FileContent.DATA;
-  }
+  protected abstract Schema getAvroSchema(Types.StructType partitionStruct);
 
   @Override
-  public CharSequence path() {
-    return filePath;
-  }
-
-  @Override
-  public FileFormat format() {
-    return format;
-  }
-
-  @Override
-  public StructLike partition() {
-    return partitionData;
-  }
-
-  @Override
-  public long recordCount() {
-    return recordCount;
-  }
-
-  @Override
-  public long fileSizeInBytes() {
-    return fileSizeInBytes;
-  }
-
-  @Override
-  public Map<Integer, Long> columnSizes() {
-    return columnSizes;
-  }
-
-  @Override
-  public Map<Integer, Long> valueCounts() {
-    return valueCounts;
-  }
-
-  @Override
-  public Map<Integer, Long> nullValueCounts() {
-    return nullValueCounts;
-  }
-
-  @Override
-  public Map<Integer, ByteBuffer> lowerBounds() {
-    return lowerBounds;
-  }
-
-  @Override
-  public Map<Integer, ByteBuffer> upperBounds() {
-    return upperBounds;
-  }
-
-  @Override
-  public ByteBuffer keyMetadata() {
-    return keyMetadata != null ? ByteBuffer.wrap(keyMetadata) : null;
-  }
-
-  @Override
-  public List<Long> splitOffsets() {
-    return splitOffsets;
-  }
-
-  @Override
-  public org.apache.avro.Schema getSchema() {
+  public Schema getSchema() {
     if (avroSchema == null) {
       this.avroSchema = getAvroSchema(partitionType);
     }
@@ -270,7 +189,7 @@ class GenericDataFile
 
   @Override
   @SuppressWarnings("unchecked")
-  public void put(int i, Object v) {
+  public void put(int i, Object value) {
     int pos = i;
     // if the schema was projected, map the incoming ordinal to the expected one
     if (fromProjectionPos != null) {
@@ -278,45 +197,44 @@ class GenericDataFile
     }
     switch (pos) {
       case 0:
-        Preconditions.checkState(v == null || (Integer) v == FileContent.DATA.id(),
-            "Invalid content for a DataFile: %s", v);
+        this.content = value != null ? FileContent.values()[(Integer) value] : FileContent.DATA;
         return;
       case 1:
         // always coerce to String for Serializable
-        this.filePath = v.toString();
+        this.filePath = value.toString();
         return;
       case 2:
-        this.format = FileFormat.valueOf(v.toString());
+        this.format = FileFormat.valueOf(value.toString());
         return;
       case 3:
-        this.partitionData = (PartitionData) v;
+        this.partitionData = (PartitionData) value;
         return;
       case 4:
-        this.recordCount = (Long) v;
+        this.recordCount = (Long) value;
         return;
       case 5:
-        this.fileSizeInBytes = (Long) v;
+        this.fileSizeInBytes = (Long) value;
         return;
       case 6:
-        this.columnSizes = (Map<Integer, Long>) v;
+        this.columnSizes = (Map<Integer, Long>) value;
         return;
       case 7:
-        this.valueCounts = (Map<Integer, Long>) v;
+        this.valueCounts = (Map<Integer, Long>) value;
         return;
       case 8:
-        this.nullValueCounts = (Map<Integer, Long>) v;
+        this.nullValueCounts = (Map<Integer, Long>) value;
         return;
       case 9:
-        this.lowerBounds = SerializableByteBufferMap.wrap((Map<Integer, ByteBuffer>) v);
+        this.lowerBounds = SerializableByteBufferMap.wrap((Map<Integer, ByteBuffer>) value);
         return;
       case 10:
-        this.upperBounds = SerializableByteBufferMap.wrap((Map<Integer, ByteBuffer>) v);
+        this.upperBounds = SerializableByteBufferMap.wrap((Map<Integer, ByteBuffer>) value);
         return;
       case 11:
-        this.keyMetadata = ByteBuffers.toByteArray((ByteBuffer) v);
+        this.keyMetadata = ByteBuffers.toByteArray((ByteBuffer) value);
         return;
       case 12:
-        this.splitOffsets = (List<Long>) v;
+        this.splitOffsets = (List<Long>) value;
         return;
       default:
         // ignore the object, it must be from a newer version of the format
@@ -337,7 +255,7 @@ class GenericDataFile
     }
     switch (pos) {
       case 0:
-        return FileContent.DATA.id();
+        return content.id();
       case 1:
         return filePath;
       case 2:
@@ -359,7 +277,7 @@ class GenericDataFile
       case 10:
         return upperBounds;
       case 11:
-        return keyMetadata();
+        return keyMetadata != null ? ByteBuffer.wrap(keyMetadata) : null;
       case 12:
         return splitOffsets;
       default:
@@ -372,44 +290,61 @@ class GenericDataFile
     return javaClass.cast(get(pos));
   }
 
-  private static org.apache.avro.Schema getAvroSchema(Types.StructType partitionType) {
-    Types.StructType type = DataFile.getType(partitionType);
-    return AvroSchemaUtil.convert(type, ImmutableMap.of(
-        type, GenericDataFile.class.getName(),
-        partitionType, PartitionData.class.getName()));
-  }
-
   @Override
   public int size() {
     return DataFile.getType(EMPTY_STRUCT_TYPE).fields().size();
   }
 
-  @Override
-  public String toString() {
-    return MoreObjects.toStringHelper(this)
-        .add("file_path", filePath)
-        .add("file_format", format)
-        .add("partition", partitionData)
-        .add("record_count", recordCount)
-        .add("file_size_in_bytes", fileSizeInBytes)
-        .add("column_sizes", columnSizes)
-        .add("value_counts", valueCounts)
-        .add("null_value_counts", nullValueCounts)
-        .add("lower_bounds", lowerBounds)
-        .add("upper_bounds", upperBounds)
-        .add("key_metadata", keyMetadata == null ? "null" : "(redacted)")
-        .add("split_offsets", splitOffsets == null ? "null" : splitOffsets)
-        .toString();
+  public FileContent content() {
+    return content;
   }
 
-  @Override
-  public DataFile copyWithoutStats() {
-    return new GenericDataFile(this, false /* drop stats */);
+  public CharSequence path() {
+    return filePath;
   }
 
-  @Override
-  public DataFile copy() {
-    return new GenericDataFile(this, true /* full copy */);
+  public FileFormat format() {
+    return format;
+  }
+
+  public StructLike partition() {
+    return partitionData;
+  }
+
+  public long recordCount() {
+    return recordCount;
+  }
+
+  public long fileSizeInBytes() {
+    return fileSizeInBytes;
+  }
+
+  public Map<Integer, Long> columnSizes() {
+    return columnSizes;
+  }
+
+  public Map<Integer, Long> valueCounts() {
+    return valueCounts;
+  }
+
+  public Map<Integer, Long> nullValueCounts() {
+    return nullValueCounts;
+  }
+
+  public Map<Integer, ByteBuffer> lowerBounds() {
+    return lowerBounds;
+  }
+
+  public Map<Integer, ByteBuffer> upperBounds() {
+    return upperBounds;
+  }
+
+  public ByteBuffer keyMetadata() {
+    return keyMetadata != null ? ByteBuffer.wrap(keyMetadata) : null;
+  }
+
+  public List<Long> splitOffsets() {
+    return splitOffsets;
   }
 
   private static <K, V> Map<K, V> copy(Map<K, V> map) {
@@ -429,4 +364,23 @@ class GenericDataFile
     }
     return null;
   }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(this)
+        .add("content", content.toString().toLowerCase(Locale.ROOT))
+        .add("file_path", filePath)
+        .add("file_format", format)
+        .add("partition", partitionData)
+        .add("record_count", recordCount)
+        .add("file_size_in_bytes", fileSizeInBytes)
+        .add("column_sizes", columnSizes)
+        .add("value_counts", valueCounts)
+        .add("null_value_counts", nullValueCounts)
+        .add("lower_bounds", lowerBounds)
+        .add("upper_bounds", upperBounds)
+        .add("key_metadata", keyMetadata == null ? "null" : "(redacted)")
+        .add("split_offsets", splitOffsets == null ? "null" : splitOffsets)
+        .toString();
+  }
 }
diff --git a/core/src/main/java/org/apache/iceberg/BaseManifestReader.java b/core/src/main/java/org/apache/iceberg/BaseManifestReader.java
index d253555..7217fc0 100644
--- a/core/src/main/java/org/apache/iceberg/BaseManifestReader.java
+++ b/core/src/main/java/org/apache/iceberg/BaseManifestReader.java
@@ -47,17 +47,18 @@ import static org.apache.iceberg.expressions.Expressions.alwaysTrue;
 /**
  * Base reader for data and delete manifest files.
  *
- * @param <T> The Java class of files returned by this reader.
+ * @param <F> The Java class of files returned by this reader.
  * @param <ThisT> The Java class of this reader, returned by configuration methods.
  */
-abstract class BaseManifestReader<T, ThisT> extends CloseableGroup implements CloseableIterable<T> {
+abstract class BaseManifestReader<F extends ContentFile<F>, ThisT>
+    extends CloseableGroup implements CloseableIterable<F> {
   static final ImmutableList<String> ALL_COLUMNS = ImmutableList.of("*");
   private static final Set<String> STATS_COLUMNS = Sets.newHashSet(
       "value_counts", "null_value_counts", "lower_bounds", "upper_bounds");
 
   protected enum FileType {
     DATA_FILES(GenericDataFile.class.getName()),
-    DELETE_FILES("...");
+    DELETE_FILES(GenericDeleteFile.class.getName());
 
     private final String fileClass;
 
@@ -95,7 +96,7 @@ abstract class BaseManifestReader<T, ThisT> extends CloseableGroup implements Cl
     this.content = content;
 
     try {
-      try (AvroIterable<ManifestEntry> headerReader = Avro.read(file)
+      try (AvroIterable<ManifestEntry<F>> headerReader = Avro.read(file)
           .project(ManifestEntry.getSchema(Types.StructType.of()).select("status"))
           .build()) {
         this.metadata = headerReader.getMetadata();
@@ -163,7 +164,7 @@ abstract class BaseManifestReader<T, ThisT> extends CloseableGroup implements Cl
     return self();
   }
 
-  CloseableIterable<ManifestEntry> entries() {
+  CloseableIterable<ManifestEntry<F>> entries() {
     if ((rowFilter != null && rowFilter != Expressions.alwaysTrue()) ||
         (partFilter != null && partFilter != Expressions.alwaysTrue())) {
       Evaluator evaluator = evaluator();
@@ -183,13 +184,13 @@ abstract class BaseManifestReader<T, ThisT> extends CloseableGroup implements Cl
     }
   }
 
-  private CloseableIterable<ManifestEntry> open(Schema projection) {
+  private CloseableIterable<ManifestEntry<F>> open(Schema projection) {
     FileFormat format = FileFormat.fromFileName(file.location());
     Preconditions.checkArgument(format != null, "Unable to determine format of manifest: %s", file);
 
     switch (format) {
       case AVRO:
-        AvroIterable<ManifestEntry> reader = Avro.read(file)
+        AvroIterable<ManifestEntry<F>> reader = Avro.read(file)
             .project(ManifestEntry.wrapFileSchema(projection.asStruct()))
             .rename("manifest_entry", GenericManifestEntry.class.getName())
             .rename("partition", PartitionData.class.getName())
@@ -209,7 +210,7 @@ abstract class BaseManifestReader<T, ThisT> extends CloseableGroup implements Cl
     }
   }
 
-  CloseableIterable<ManifestEntry> liveEntries() {
+  CloseableIterable<ManifestEntry<F>> liveEntries() {
     return CloseableIterable.filter(entries(),
         entry -> entry != null && entry.status() != ManifestEntry.Status.DELETED);
   }
@@ -218,13 +219,11 @@ abstract class BaseManifestReader<T, ThisT> extends CloseableGroup implements Cl
    * @return an Iterator of DataFile. Makes defensive copies of files before returning
    */
   @Override
-  @SuppressWarnings("unchecked")
-  public CloseableIterator<T> iterator() {
+  public CloseableIterator<F> iterator() {
     if (dropStats(rowFilter, columns)) {
-      return (CloseableIterator<T>) CloseableIterable.transform(liveEntries(), e -> e.file().copyWithoutStats())
-          .iterator();
+      return CloseableIterable.transform(liveEntries(), e -> e.file().copyWithoutStats()).iterator();
     } else {
-      return (CloseableIterator<T>) CloseableIterable.transform(liveEntries(), e -> e.file().copy()).iterator();
+      return CloseableIterable.transform(liveEntries(), e -> e.file().copy()).iterator();
     }
   }
 
diff --git a/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java b/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java
index 5fd02f5..8d0951e 100644
--- a/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java
+++ b/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java
@@ -262,7 +262,7 @@ public abstract class BaseMetastoreCatalog implements Catalog {
         .onFailure((item, exc) -> LOG.warn("Failed to get deleted files: this may cause orphaned data files", exc))
         .run(manifest -> {
           try (ManifestReader reader = ManifestFiles.read(manifest, io)) {
-            for (ManifestEntry entry : reader.entries()) {
+            for (ManifestEntry<?> entry : reader.entries()) {
               // intern the file path because the weak key map uses identity (==) instead of equals
               String path = entry.file().path().toString().intern();
               Boolean alreadyDeleted = deletedFiles.putIfAbsent(path, true);
diff --git a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
index 405d3fe..24c3c2d 100644
--- a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
+++ b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
@@ -287,7 +287,7 @@ public class BaseRewriteManifests extends SnapshotProducer<RewriteManifests> imp
     return activeFilesCount;
   }
 
-  private void appendEntry(ManifestEntry entry, Object key, int partitionSpecId) {
+  private void appendEntry(ManifestEntry<DataFile> entry, Object key, int partitionSpecId) {
     Preconditions.checkNotNull(entry, "Manifest entry cannot be null");
     Preconditions.checkNotNull(key, "Key cannot be null");
 
@@ -323,13 +323,13 @@ public class BaseRewriteManifests extends SnapshotProducer<RewriteManifests> imp
 
   class WriterWrapper {
     private final PartitionSpec spec;
-    private ManifestWriter writer;
+    private ManifestWriter<DataFile> writer;
 
     WriterWrapper(PartitionSpec spec) {
       this.spec = spec;
     }
 
-    synchronized void addEntry(ManifestEntry entry) {
+    synchronized void addEntry(ManifestEntry<DataFile> entry) {
       if (writer == null) {
         writer = newManifestWriter(spec);
       } else if (writer.length() >= getManifestTargetSizeBytes()) {
diff --git a/core/src/main/java/org/apache/iceberg/BaseSnapshot.java b/core/src/main/java/org/apache/iceberg/BaseSnapshot.java
index 91466c8..4da4b37 100644
--- a/core/src/main/java/org/apache/iceberg/BaseSnapshot.java
+++ b/core/src/main/java/org/apache/iceberg/BaseSnapshot.java
@@ -158,10 +158,10 @@ class BaseSnapshot implements Snapshot {
     // read only manifests that were created by this snapshot
     Iterable<ManifestFile> changedManifests = Iterables.filter(manifests(),
         manifest -> Objects.equal(manifest.snapshotId(), snapshotId));
-    try (CloseableIterable<ManifestEntry> entries = new ManifestGroup(io, changedManifests)
+    try (CloseableIterable<ManifestEntry<DataFile>> entries = new ManifestGroup(io, changedManifests)
         .ignoreExisting()
         .entries()) {
-      for (ManifestEntry entry : entries) {
+      for (ManifestEntry<DataFile> entry : entries) {
         switch (entry.status()) {
           case ADDED:
             adds.add(entry.file().copy());
diff --git a/core/src/main/java/org/apache/iceberg/InheritableMetadata.java b/core/src/main/java/org/apache/iceberg/DeleteManifestReader.java
similarity index 58%
copy from core/src/main/java/org/apache/iceberg/InheritableMetadata.java
copy to core/src/main/java/org/apache/iceberg/DeleteManifestReader.java
index 5eebe8a..dfeee6b 100644
--- a/core/src/main/java/org/apache/iceberg/InheritableMetadata.java
+++ b/core/src/main/java/org/apache/iceberg/DeleteManifestReader.java
@@ -19,8 +19,22 @@
 
 package org.apache.iceberg;
 
-import java.io.Serializable;
+import java.util.Map;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
 
-interface InheritableMetadata extends Serializable {
-  ManifestEntry apply(ManifestEntry manifestEntry);
+/**
+ * Reader for delete manifest files.
+ * <p>
+ * Create readers using {@link ManifestFiles#readDeleteManifest(ManifestFile, FileIO, Map)}.
+ */
+public class DeleteManifestReader extends BaseManifestReader<DeleteFile, DeleteManifestReader> {
+  protected DeleteManifestReader(InputFile file, Map<Integer, PartitionSpec> specsById, InheritableMetadata metadata) {
+    super(file, specsById, metadata, FileType.DELETE_FILES);
+  }
+
+  @Override
+  protected DeleteManifestReader self() {
+    return this;
+  }
 }
diff --git a/core/src/main/java/org/apache/iceberg/FileHistory.java b/core/src/main/java/org/apache/iceberg/FileHistory.java
index 89d8fd3..b6645f7 100644
--- a/core/src/main/java/org/apache/iceberg/FileHistory.java
+++ b/core/src/main/java/org/apache/iceberg/FileHistory.java
@@ -80,7 +80,7 @@ public class FileHistory {
     }
 
     @SuppressWarnings("unchecked")
-    public Iterable<ManifestEntry> build() {
+    public Iterable<ManifestEntry<?>> build() {
       Iterable<Snapshot> snapshots = table.snapshots();
 
       if (startTime != null) {
@@ -100,11 +100,11 @@ public class FileHistory {
       // a manifest group will only read each manifest once
       ManifestGroup group = new ManifestGroup(((HasTableOperations) table).operations().io(), manifests);
 
-      List<ManifestEntry> results = Lists.newArrayList();
-      try (CloseableIterable<ManifestEntry> entries = group.select(HISTORY_COLUMNS).entries()) {
+      List<ManifestEntry<?>> results = Lists.newArrayList();
+      try (CloseableIterable<ManifestEntry<DataFile>> entries = group.select(HISTORY_COLUMNS).entries()) {
         // TODO: replace this with an IN predicate
         CharSequenceWrapper locationWrapper = CharSequenceWrapper.wrap(null);
-        for (ManifestEntry entry : entries) {
+        for (ManifestEntry<?> entry : entries) {
           if (entry != null && locations.contains(locationWrapper.set(entry.file().path()))) {
             results.add(entry.copy());
           }
diff --git a/core/src/main/java/org/apache/iceberg/FindFiles.java b/core/src/main/java/org/apache/iceberg/FindFiles.java
index dfe2021..94e6f8a 100644
--- a/core/src/main/java/org/apache/iceberg/FindFiles.java
+++ b/core/src/main/java/org/apache/iceberg/FindFiles.java
@@ -197,7 +197,7 @@ public class FindFiles {
       }
 
       // when snapshot is not null
-      CloseableIterable<ManifestEntry> entries = new ManifestGroup(ops.io(), snapshot.manifests())
+      CloseableIterable<ManifestEntry<DataFile>> entries = new ManifestGroup(ops.io(), snapshot.manifests())
           .specsById(ops.current().specsById())
           .filterData(rowFilter)
           .filterFiles(fileFilter)
diff --git a/core/src/main/java/org/apache/iceberg/GenericDataFile.java b/core/src/main/java/org/apache/iceberg/GenericDataFile.java
index 6c6b1ba..4e86587 100644
--- a/core/src/main/java/org/apache/iceberg/GenericDataFile.java
+++ b/core/src/main/java/org/apache/iceberg/GenericDataFile.java
@@ -19,142 +19,43 @@
 
 package org.apache.iceberg;
 
-import java.io.Serializable;
+import com.google.common.collect.ImmutableMap;
 import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
-import java.util.Map;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.avro.specific.SpecificData;
+import org.apache.avro.Schema;
 import org.apache.iceberg.avro.AvroSchemaUtil;
-import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
-import org.apache.iceberg.util.ByteBuffers;
-
-class GenericDataFile
-    implements DataFile, IndexedRecord, StructLike, SpecificData.SchemaConstructable, Serializable {
-  private static final Types.StructType EMPTY_STRUCT_TYPE = Types.StructType.of();
-  private static final PartitionData EMPTY_PARTITION_DATA = new PartitionData(EMPTY_STRUCT_TYPE) {
-    @Override
-    public PartitionData copy() {
-      return this; // this does not change
-    }
-  };
-
-  private int[] fromProjectionPos;
-  private Types.StructType partitionType;
-
-  private String filePath = null;
-  private FileFormat format = null;
-  private PartitionData partitionData = null;
-  private Long recordCount = null;
-  private long fileSizeInBytes = -1L;
-
-  // optional fields
-  private Map<Integer, Long> columnSizes = null;
-  private Map<Integer, Long> valueCounts = null;
-  private Map<Integer, Long> nullValueCounts = null;
-  private Map<Integer, ByteBuffer> lowerBounds = null;
-  private Map<Integer, ByteBuffer> upperBounds = null;
-  private List<Long> splitOffsets = null;
-  private byte[] keyMetadata = null;
-
-  // cached schema
-  private transient org.apache.avro.Schema avroSchema = null;
 
+class GenericDataFile extends BaseFile<DataFile> implements DataFile {
   /**
    * Used by Avro reflection to instantiate this class when reading manifest files.
    */
-  @SuppressWarnings("checkstyle:RedundantModifier") // Must be public
-  public GenericDataFile(org.apache.avro.Schema avroSchema) {
-    this.avroSchema = avroSchema;
-
-    Types.StructType schema = AvroSchemaUtil.convert(avroSchema).asNestedType().asStructType();
-
-    // partition type may be null if the field was not projected
-    Type partType = schema.fieldType("partition");
-    if (partType != null) {
-      this.partitionType = partType.asNestedType().asStructType();
-    } else {
-      this.partitionType = EMPTY_STRUCT_TYPE;
-    }
-
-    List<Types.NestedField> fields = schema.fields();
-    List<Types.NestedField> allFields = DataFile.getType(partitionType).fields();
-    this.fromProjectionPos = new int[fields.size()];
-    for (int i = 0; i < fromProjectionPos.length; i += 1) {
-      boolean found = false;
-      for (int j = 0; j < allFields.size(); j += 1) {
-        if (fields.get(i).fieldId() == allFields.get(j).fieldId()) {
-          found = true;
-          fromProjectionPos[i] = j;
-        }
-      }
-
-      if (!found) {
-        throw new IllegalArgumentException("Cannot find projected field: " + fields.get(i));
-      }
-    }
-
-    this.partitionData = new PartitionData(partitionType);
+  GenericDataFile(org.apache.avro.Schema avroSchema) {
+    super(avroSchema);
   }
 
   GenericDataFile(String filePath, FileFormat format, long recordCount,
                   long fileSizeInBytes) {
-    this.filePath = filePath;
-    this.format = format;
-    this.partitionData = EMPTY_PARTITION_DATA;
-    this.partitionType = EMPTY_PARTITION_DATA.getPartitionType();
-    this.recordCount = recordCount;
-    this.fileSizeInBytes = fileSizeInBytes;
+    this(filePath, format, null, recordCount, fileSizeInBytes);
   }
 
   GenericDataFile(String filePath, FileFormat format, PartitionData partition,
                   long recordCount, long fileSizeInBytes) {
-    this.filePath = filePath;
-    this.format = format;
-    this.partitionData = partition;
-    this.partitionType = partition.getPartitionType();
-    this.recordCount = recordCount;
-    this.fileSizeInBytes = fileSizeInBytes;
+    super(FileContent.DATA, filePath, format, partition, fileSizeInBytes, recordCount,
+        null, null, null, null, null, null, null);
   }
 
   GenericDataFile(String filePath, FileFormat format, PartitionData partition,
                   long fileSizeInBytes, Metrics metrics, List<Long> splitOffsets) {
-    this.filePath = filePath;
-    this.format = format;
-
-    // this constructor is used by DataFiles.Builder, which passes null for unpartitioned data
-    if (partition == null) {
-      this.partitionData = EMPTY_PARTITION_DATA;
-      this.partitionType = EMPTY_PARTITION_DATA.getPartitionType();
-    } else {
-      this.partitionData = partition;
-      this.partitionType = partition.getPartitionType();
-    }
-
-    // this will throw NPE if metrics.recordCount is null
-    this.recordCount = metrics.recordCount();
-    this.fileSizeInBytes = fileSizeInBytes;
-    this.columnSizes = metrics.columnSizes();
-    this.valueCounts = metrics.valueCounts();
-    this.nullValueCounts = metrics.nullValueCounts();
-    this.lowerBounds = SerializableByteBufferMap.wrap(metrics.lowerBounds());
-    this.upperBounds = SerializableByteBufferMap.wrap(metrics.upperBounds());
-    this.splitOffsets = copy(splitOffsets);
+    this(filePath, format, partition, fileSizeInBytes, metrics, null, splitOffsets);
   }
 
   GenericDataFile(String filePath, FileFormat format, PartitionData partition,
                   long fileSizeInBytes, Metrics metrics,
                   ByteBuffer keyMetadata, List<Long> splitOffsets) {
-    this(filePath, format, partition, fileSizeInBytes, metrics, splitOffsets);
-    this.keyMetadata = ByteBuffers.toByteArray(keyMetadata);
+    super(FileContent.DATA, filePath, format, partition, fileSizeInBytes, metrics.recordCount(),
+        metrics.columnSizes(), metrics.valueCounts(), metrics.nullValueCounts(),
+        metrics.lowerBounds(), metrics.upperBounds(), splitOffsets, keyMetadata);
   }
 
   /**
@@ -164,29 +65,7 @@ class GenericDataFile
    * @param fullCopy whether to copy all fields or to drop column-level stats
    */
   private GenericDataFile(GenericDataFile toCopy, boolean fullCopy) {
-    this.filePath = toCopy.filePath;
-    this.format = toCopy.format;
-    this.partitionData = toCopy.partitionData.copy();
-    this.partitionType = toCopy.partitionType;
-    this.recordCount = toCopy.recordCount;
-    this.fileSizeInBytes = toCopy.fileSizeInBytes;
-    if (fullCopy) {
-      // TODO: support lazy conversion to/from map
-      this.columnSizes = copy(toCopy.columnSizes);
-      this.valueCounts = copy(toCopy.valueCounts);
-      this.nullValueCounts = copy(toCopy.nullValueCounts);
-      this.lowerBounds = SerializableByteBufferMap.wrap(copy(toCopy.lowerBounds));
-      this.upperBounds = SerializableByteBufferMap.wrap(copy(toCopy.upperBounds));
-    } else {
-      this.columnSizes = null;
-      this.valueCounts = null;
-      this.nullValueCounts = null;
-      this.lowerBounds = null;
-      this.upperBounds = null;
-    }
-    this.fromProjectionPos = toCopy.fromProjectionPos;
-    this.keyMetadata = toCopy.keyMetadata == null ? null : Arrays.copyOf(toCopy.keyMetadata, toCopy.keyMetadata.length);
-    this.splitOffsets = copy(toCopy.splitOffsets);
+    super(toCopy, fullCopy);
   }
 
   /**
@@ -196,213 +75,6 @@ class GenericDataFile
   }
 
   @Override
-  public FileContent content() {
-    return FileContent.DATA;
-  }
-
-  @Override
-  public CharSequence path() {
-    return filePath;
-  }
-
-  @Override
-  public FileFormat format() {
-    return format;
-  }
-
-  @Override
-  public StructLike partition() {
-    return partitionData;
-  }
-
-  @Override
-  public long recordCount() {
-    return recordCount;
-  }
-
-  @Override
-  public long fileSizeInBytes() {
-    return fileSizeInBytes;
-  }
-
-  @Override
-  public Map<Integer, Long> columnSizes() {
-    return columnSizes;
-  }
-
-  @Override
-  public Map<Integer, Long> valueCounts() {
-    return valueCounts;
-  }
-
-  @Override
-  public Map<Integer, Long> nullValueCounts() {
-    return nullValueCounts;
-  }
-
-  @Override
-  public Map<Integer, ByteBuffer> lowerBounds() {
-    return lowerBounds;
-  }
-
-  @Override
-  public Map<Integer, ByteBuffer> upperBounds() {
-    return upperBounds;
-  }
-
-  @Override
-  public ByteBuffer keyMetadata() {
-    return keyMetadata != null ? ByteBuffer.wrap(keyMetadata) : null;
-  }
-
-  @Override
-  public List<Long> splitOffsets() {
-    return splitOffsets;
-  }
-
-  @Override
-  public org.apache.avro.Schema getSchema() {
-    if (avroSchema == null) {
-      this.avroSchema = getAvroSchema(partitionType);
-    }
-    return avroSchema;
-  }
-
-  @Override
-  @SuppressWarnings("unchecked")
-  public void put(int i, Object v) {
-    int pos = i;
-    // if the schema was projected, map the incoming ordinal to the expected one
-    if (fromProjectionPos != null) {
-      pos = fromProjectionPos[i];
-    }
-    switch (pos) {
-      case 0:
-        Preconditions.checkState(v == null || (Integer) v == FileContent.DATA.id(),
-            "Invalid content for a DataFile: %s", v);
-        return;
-      case 1:
-        // always coerce to String for Serializable
-        this.filePath = v.toString();
-        return;
-      case 2:
-        this.format = FileFormat.valueOf(v.toString());
-        return;
-      case 3:
-        this.partitionData = (PartitionData) v;
-        return;
-      case 4:
-        this.recordCount = (Long) v;
-        return;
-      case 5:
-        this.fileSizeInBytes = (Long) v;
-        return;
-      case 6:
-        this.columnSizes = (Map<Integer, Long>) v;
-        return;
-      case 7:
-        this.valueCounts = (Map<Integer, Long>) v;
-        return;
-      case 8:
-        this.nullValueCounts = (Map<Integer, Long>) v;
-        return;
-      case 9:
-        this.lowerBounds = SerializableByteBufferMap.wrap((Map<Integer, ByteBuffer>) v);
-        return;
-      case 10:
-        this.upperBounds = SerializableByteBufferMap.wrap((Map<Integer, ByteBuffer>) v);
-        return;
-      case 11:
-        this.keyMetadata = ByteBuffers.toByteArray((ByteBuffer) v);
-        return;
-      case 12:
-        this.splitOffsets = (List<Long>) v;
-        return;
-      default:
-        // ignore the object, it must be from a newer version of the format
-    }
-  }
-
-  @Override
-  public <T> void set(int pos, T value) {
-    put(pos, value);
-  }
-
-  @Override
-  public Object get(int i) {
-    int pos = i;
-    // if the schema was projected, map the incoming ordinal to the expected one
-    if (fromProjectionPos != null) {
-      pos = fromProjectionPos[i];
-    }
-    switch (pos) {
-      case 0:
-        return FileContent.DATA.id();
-      case 1:
-        return filePath;
-      case 2:
-        return format != null ? format.toString() : null;
-      case 3:
-        return partitionData;
-      case 4:
-        return recordCount;
-      case 5:
-        return fileSizeInBytes;
-      case 6:
-        return columnSizes;
-      case 7:
-        return valueCounts;
-      case 8:
-        return nullValueCounts;
-      case 9:
-        return lowerBounds;
-      case 10:
-        return upperBounds;
-      case 11:
-        return keyMetadata();
-      case 12:
-        return splitOffsets;
-      default:
-        throw new UnsupportedOperationException("Unknown field ordinal: " + pos);
-    }
-  }
-
-  @Override
-  public <T> T get(int pos, Class<T> javaClass) {
-    return javaClass.cast(get(pos));
-  }
-
-  private static org.apache.avro.Schema getAvroSchema(Types.StructType partitionType) {
-    Types.StructType type = DataFile.getType(partitionType);
-    return AvroSchemaUtil.convert(type, ImmutableMap.of(
-        type, GenericDataFile.class.getName(),
-        partitionType, PartitionData.class.getName()));
-  }
-
-  @Override
-  public int size() {
-    return DataFile.getType(EMPTY_STRUCT_TYPE).fields().size();
-  }
-
-  @Override
-  public String toString() {
-    return MoreObjects.toStringHelper(this)
-        .add("file_path", filePath)
-        .add("file_format", format)
-        .add("partition", partitionData)
-        .add("record_count", recordCount)
-        .add("file_size_in_bytes", fileSizeInBytes)
-        .add("column_sizes", columnSizes)
-        .add("value_counts", valueCounts)
-        .add("null_value_counts", nullValueCounts)
-        .add("lower_bounds", lowerBounds)
-        .add("upper_bounds", upperBounds)
-        .add("key_metadata", keyMetadata == null ? "null" : "(redacted)")
-        .add("split_offsets", splitOffsets == null ? "null" : splitOffsets)
-        .toString();
-  }
-
-  @Override
   public DataFile copyWithoutStats() {
     return new GenericDataFile(this, false /* drop stats */);
   }
@@ -412,21 +84,10 @@ class GenericDataFile
     return new GenericDataFile(this, true /* full copy */);
   }
 
-  private static <K, V> Map<K, V> copy(Map<K, V> map) {
-    if (map != null) {
-      Map<K, V> copy = Maps.newHashMapWithExpectedSize(map.size());
-      copy.putAll(map);
-      return Collections.unmodifiableMap(copy);
-    }
-    return null;
-  }
-
-  private static <E> List<E> copy(List<E> list) {
-    if (list != null) {
-      List<E> copy = Lists.newArrayListWithExpectedSize(list.size());
-      copy.addAll(list);
-      return Collections.unmodifiableList(copy);
-    }
-    return null;
+  protected Schema getAvroSchema(Types.StructType partitionStruct) {
+    Types.StructType type = DataFile.getType(partitionStruct);
+    return AvroSchemaUtil.convert(type, ImmutableMap.of(
+        type, GenericDataFile.class.getName(),
+        partitionStruct, PartitionData.class.getName()));
   }
 }
diff --git a/core/src/main/java/org/apache/iceberg/GenericDeleteFile.java b/core/src/main/java/org/apache/iceberg/GenericDeleteFile.java
new file mode 100644
index 0000000..f4a28d0
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/GenericDeleteFile.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+
+import com.google.common.collect.ImmutableMap;
+import java.nio.ByteBuffer;
+import org.apache.avro.Schema;
+import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.iceberg.types.Types;
+
+class GenericDeleteFile extends BaseFile<DeleteFile> implements DeleteFile {
+  /**
+   * Used by Avro reflection to instantiate this class when reading manifest files.
+   */
+  GenericDeleteFile(Schema avroSchema) {
+    super(avroSchema);
+  }
+
+  GenericDeleteFile(FileContent content, String filePath, FileFormat format, PartitionData partition,
+                    long fileSizeInBytes, Metrics metrics, ByteBuffer keyMetadata) {
+    super(content, filePath, format, partition, fileSizeInBytes, metrics.recordCount(),
+        metrics.columnSizes(), metrics.valueCounts(), metrics.nullValueCounts(),
+        metrics.lowerBounds(), metrics.upperBounds(), null, keyMetadata);
+  }
+
+  /**
+   * Copy constructor.
+   *
+   * @param toCopy a generic delete file to copy.
+   * @param fullCopy whether to copy all fields or to drop column-level stats
+   */
+  private GenericDeleteFile(GenericDeleteFile toCopy, boolean fullCopy) {
+    super(toCopy, fullCopy);
+  }
+
+  /**
+   * Constructor for Java serialization.
+   */
+  GenericDeleteFile() {
+  }
+
+  @Override
+  public DeleteFile copyWithoutStats() {
+    return new GenericDeleteFile(this, false /* drop stats */);
+  }
+
+  @Override
+  public DeleteFile copy() {
+    return new GenericDeleteFile(this, true /* full copy */);
+  }
+
+  protected Schema getAvroSchema(Types.StructType partitionStruct) {
+    Types.StructType type = DataFile.getType(partitionStruct);
+    return AvroSchemaUtil.convert(type, ImmutableMap.of(
+        type, GenericDeleteFile.class.getName(),
+        partitionStruct, PartitionData.class.getName()));
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/GenericManifestEntry.java b/core/src/main/java/org/apache/iceberg/GenericManifestEntry.java
index cc27818..a4bc6f8 100644
--- a/core/src/main/java/org/apache/iceberg/GenericManifestEntry.java
+++ b/core/src/main/java/org/apache/iceberg/GenericManifestEntry.java
@@ -25,27 +25,24 @@ import org.apache.iceberg.avro.AvroSchemaUtil;
 import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
 import org.apache.iceberg.types.Types;
 
-class GenericManifestEntry implements ManifestEntry, IndexedRecord, SpecificData.SchemaConstructable, StructLike {
+class GenericManifestEntry<F extends ContentFile<F>>
+    implements ManifestEntry<F>, IndexedRecord, SpecificData.SchemaConstructable, StructLike {
   private final org.apache.avro.Schema schema;
-  private final V1Metadata.IndexedDataFile fileWrapper;
   private Status status = Status.EXISTING;
   private Long snapshotId = null;
   private Long sequenceNumber = null;
-  private DataFile file = null;
+  private F file = null;
 
   GenericManifestEntry(org.apache.avro.Schema schema) {
     this.schema = schema;
-    this.fileWrapper = null; // do not use the file wrapper to read
   }
 
   GenericManifestEntry(Types.StructType partitionType) {
     this.schema = AvroSchemaUtil.convert(V1Metadata.entrySchema(partitionType), "manifest_entry");
-    this.fileWrapper = new V1Metadata.IndexedDataFile(schema.getField("data_file").schema());
   }
 
-  private GenericManifestEntry(GenericManifestEntry toCopy, boolean fullCopy) {
+  private GenericManifestEntry(GenericManifestEntry<F> toCopy, boolean fullCopy) {
     this.schema = toCopy.schema;
-    this.fileWrapper = new V1Metadata.IndexedDataFile(schema.getField("data_file").schema());
     this.status = toCopy.status;
     this.snapshotId = toCopy.snapshotId;
     if (fullCopy) {
@@ -55,7 +52,7 @@ class GenericManifestEntry implements ManifestEntry, IndexedRecord, SpecificData
     }
   }
 
-  ManifestEntry wrapExisting(Long newSnapshotId, Long newSequenceNumber, DataFile newFile) {
+  ManifestEntry<F> wrapExisting(Long newSnapshotId, Long newSequenceNumber, F newFile) {
     this.status = Status.EXISTING;
     this.snapshotId = newSnapshotId;
     this.sequenceNumber = newSequenceNumber;
@@ -63,7 +60,7 @@ class GenericManifestEntry implements ManifestEntry, IndexedRecord, SpecificData
     return this;
   }
 
-  ManifestEntry wrapAppend(Long newSnapshotId, DataFile newFile) {
+  ManifestEntry<F> wrapAppend(Long newSnapshotId, F newFile) {
     this.status = Status.ADDED;
     this.snapshotId = newSnapshotId;
     this.sequenceNumber = null;
@@ -71,7 +68,7 @@ class GenericManifestEntry implements ManifestEntry, IndexedRecord, SpecificData
     return this;
   }
 
-  ManifestEntry wrapDelete(Long newSnapshotId, DataFile newFile) {
+  ManifestEntry<F> wrapDelete(Long newSnapshotId, F newFile) {
     this.status = Status.DELETED;
     this.snapshotId = newSnapshotId;
     this.sequenceNumber = null;
@@ -104,18 +101,18 @@ class GenericManifestEntry implements ManifestEntry, IndexedRecord, SpecificData
    * @return a file
    */
   @Override
-  public DataFile file() {
+  public F file() {
     return file;
   }
 
   @Override
-  public ManifestEntry copy() {
-    return new GenericManifestEntry(this, true /* full copy */);
+  public ManifestEntry<F> copy() {
+    return new GenericManifestEntry<>(this, true /* full copy */);
   }
 
   @Override
-  public ManifestEntry copyWithoutStats() {
-    return new GenericManifestEntry(this, false /* drop stats */);
+  public ManifestEntry<F> copyWithoutStats() {
+    return new GenericManifestEntry<>(this, false /* drop stats */);
   }
 
   @Override
@@ -129,6 +126,7 @@ class GenericManifestEntry implements ManifestEntry, IndexedRecord, SpecificData
   }
 
   @Override
+  @SuppressWarnings("unchecked")
   public void put(int i, Object v) {
     switch (i) {
       case 0:
@@ -141,7 +139,7 @@ class GenericManifestEntry implements ManifestEntry, IndexedRecord, SpecificData
         this.sequenceNumber = (Long) v;
         return;
       case 3:
-        this.file = (DataFile) v;
+        this.file = (F) v;
         return;
       default:
         // ignore the object, it must be from a newer version of the format
@@ -163,11 +161,7 @@ class GenericManifestEntry implements ManifestEntry, IndexedRecord, SpecificData
       case 2:
         return sequenceNumber;
       case 3:
-        if (fileWrapper == null || file instanceof GenericDataFile) {
-          return file;
-        } else {
-          return fileWrapper.wrap(file);
-        }
+        return file;
       default:
         throw new UnsupportedOperationException("Unknown field ordinal: " + i);
     }
diff --git a/core/src/main/java/org/apache/iceberg/InheritableMetadata.java b/core/src/main/java/org/apache/iceberg/InheritableMetadata.java
index 5eebe8a..44e0521 100644
--- a/core/src/main/java/org/apache/iceberg/InheritableMetadata.java
+++ b/core/src/main/java/org/apache/iceberg/InheritableMetadata.java
@@ -22,5 +22,5 @@ package org.apache.iceberg;
 import java.io.Serializable;
 
 interface InheritableMetadata extends Serializable {
-  ManifestEntry apply(ManifestEntry manifestEntry);
+  <F extends ContentFile<F>> ManifestEntry<F> apply(ManifestEntry<F> manifestEntry);
 }
diff --git a/core/src/main/java/org/apache/iceberg/InheritableMetadataFactory.java b/core/src/main/java/org/apache/iceberg/InheritableMetadataFactory.java
index 3c271bc..d8ceb40 100644
--- a/core/src/main/java/org/apache/iceberg/InheritableMetadataFactory.java
+++ b/core/src/main/java/org/apache/iceberg/InheritableMetadataFactory.java
@@ -51,7 +51,7 @@ class InheritableMetadataFactory {
     }
 
     @Override
-    public ManifestEntry apply(ManifestEntry manifestEntry) {
+    public <F extends ContentFile<F>> ManifestEntry<F> apply(ManifestEntry<F> manifestEntry) {
       if (manifestEntry.snapshotId() == null) {
         manifestEntry.setSnapshotId(snapshotId);
       }
@@ -70,7 +70,7 @@ class InheritableMetadataFactory {
     }
 
     @Override
-    public ManifestEntry apply(ManifestEntry manifestEntry) {
+    public <F extends ContentFile<F>> ManifestEntry<F> apply(ManifestEntry<F> manifestEntry) {
       manifestEntry.setSnapshotId(snapshotId);
       return manifestEntry;
     }
@@ -81,7 +81,7 @@ class InheritableMetadataFactory {
     private EmptyInheritableMetadata() {}
 
     @Override
-    public ManifestEntry apply(ManifestEntry manifestEntry) {
+    public <F extends ContentFile<F>> ManifestEntry<F> apply(ManifestEntry<F> manifestEntry) {
       if (manifestEntry.snapshotId() == null) {
         throw new IllegalArgumentException("Entries must have explicit snapshot ids if inherited metadata is empty");
       }
diff --git a/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java b/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java
index 9e4b896..061230f 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java
@@ -129,7 +129,7 @@ public class ManifestEntriesTable extends BaseMetadataTable {
     public CloseableIterable<StructLike> rows() {
       return CloseableIterable.transform(
           ManifestFiles.read(manifest, io).project(fileSchema).entries(),
-          file -> (GenericManifestEntry) file);
+          file -> (GenericManifestEntry<?>) file);
     }
 
     @Override
diff --git a/core/src/main/java/org/apache/iceberg/ManifestEntry.java b/core/src/main/java/org/apache/iceberg/ManifestEntry.java
index bc03850..e07b125 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestEntry.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestEntry.java
@@ -25,7 +25,7 @@ import org.apache.iceberg.types.Types.StructType;
 import static org.apache.iceberg.types.Types.NestedField.optional;
 import static org.apache.iceberg.types.Types.NestedField.required;
 
-interface ManifestEntry {
+interface ManifestEntry<F extends ContentFile<F>> {
   enum Status {
     EXISTING(0),
     ADDED(1),
@@ -89,9 +89,9 @@ interface ManifestEntry {
   /**
    * @return a file
    */
-  DataFile file();
+  F file();
 
-  ManifestEntry copy();
+  ManifestEntry<F> copy();
 
-  ManifestEntry copyWithoutStats();
+  ManifestEntry<F> copyWithoutStats();
 }
diff --git a/core/src/main/java/org/apache/iceberg/ManifestFiles.java b/core/src/main/java/org/apache/iceberg/ManifestFiles.java
index ea72e2f..730badf 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestFiles.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestFiles.java
@@ -55,6 +55,8 @@ public class ManifestFiles {
    * @return a {@link ManifestReader}
    */
   public static ManifestReader read(ManifestFile manifest, FileIO io, Map<Integer, PartitionSpec> specsById) {
+    Preconditions.checkArgument(manifest.content() == ManifestContent.DATA,
+        "Cannot read a delete manifest with a ManifestReader: %s", manifest);
     InputFile file = io.newInputFile(manifest.path());
     InheritableMetadata inheritableMetadata = InheritableMetadataFactory.fromManifest(manifest);
     return new ManifestReader(file, specsById, inheritableMetadata);
@@ -70,7 +72,7 @@ public class ManifestFiles {
    * @param outputFile the destination file location
    * @return a manifest writer
    */
-  public static ManifestWriter write(PartitionSpec spec, OutputFile outputFile) {
+  public static ManifestWriter<DataFile> write(PartitionSpec spec, OutputFile outputFile) {
     return write(1, spec, outputFile, null);
   }
 
@@ -83,7 +85,8 @@ public class ManifestFiles {
    * @param snapshotId a snapshot ID for the manifest entries, or null for an inherited ID
    * @return a manifest writer
    */
-  public static ManifestWriter write(int formatVersion, PartitionSpec spec, OutputFile outputFile, Long snapshotId) {
+  public static ManifestWriter<DataFile> write(int formatVersion, PartitionSpec spec, OutputFile outputFile,
+                                               Long snapshotId) {
     switch (formatVersion) {
       case 1:
         return new ManifestWriter.V1Writer(spec, outputFile, snapshotId);
@@ -93,6 +96,43 @@ public class ManifestFiles {
     throw new UnsupportedOperationException("Cannot write manifest for table version: " + formatVersion);
   }
 
+  /**
+   * Returns a new {@link DeleteManifestReader} for a {@link ManifestFile}.
+   *
+   * @param manifest a {@link ManifestFile}
+   * @param io a {@link FileIO}
+   * @param specsById a Map from spec ID to partition spec
+   * @return a {@link DeleteManifestReader}
+   */
+  public static DeleteManifestReader readDeleteManifest(ManifestFile manifest, FileIO io,
+                                                        Map<Integer, PartitionSpec> specsById) {
+    Preconditions.checkArgument(manifest.content() == ManifestContent.DELETES,
+        "Cannot read a data manifest with a DeleteManifestReader: %s", manifest);
+    InputFile file = io.newInputFile(manifest.path());
+    InheritableMetadata inheritableMetadata = InheritableMetadataFactory.fromManifest(manifest);
+    return new DeleteManifestReader(file, specsById, inheritableMetadata);
+  }
+
+  /**
+   * Create a new {@link ManifestWriter} for delete files; only format version 2 is supported.
+   *
+   * @param formatVersion a target format version
+   * @param spec a {@link PartitionSpec}
+   * @param outputFile an {@link OutputFile} where the manifest will be written
+   * @param snapshotId a snapshot ID for the manifest entries, or null for an inherited ID
+   * @return a manifest writer for delete files
+   */
+  public static ManifestWriter<DeleteFile> writeDeleteManifest(int formatVersion, PartitionSpec spec,
+                                                               OutputFile outputFile, Long snapshotId) {
+    switch (formatVersion) {
+      case 1:
+        throw new IllegalArgumentException("Cannot write delete files in a v1 table");
+      case 2:
+        return new ManifestWriter.V2DeleteWriter(spec, outputFile, snapshotId);
+    }
+    throw new UnsupportedOperationException("Cannot write manifest for table version: " + formatVersion);
+  }
+
   static ManifestFile copyAppendManifest(int formatVersion,
                                          InputFile toCopy, Map<Integer, PartitionSpec> specsById,
                                          OutputFile outputFile, long snapshotId,
@@ -124,10 +164,10 @@ public class ManifestFiles {
   private static ManifestFile copyManifestInternal(int formatVersion, ManifestReader reader, OutputFile outputFile,
                                                    long snapshotId, SnapshotSummary.Builder summaryBuilder,
                                                    ManifestEntry.Status allowedEntryStatus) {
-    ManifestWriter writer = write(formatVersion, reader.spec(), outputFile, snapshotId);
+    ManifestWriter<DataFile> writer = write(formatVersion, reader.spec(), outputFile, snapshotId);
     boolean threw = true;
     try {
-      for (ManifestEntry entry : reader.entries()) {
+      for (ManifestEntry<DataFile> entry : reader.entries()) {
         Preconditions.checkArgument(
             allowedEntryStatus == entry.status(),
             "Invalid manifest entry status: %s (allowed status: %s)",
diff --git a/core/src/main/java/org/apache/iceberg/ManifestGroup.java b/core/src/main/java/org/apache/iceberg/ManifestGroup.java
index b98176e..3aca5f9 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestGroup.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestGroup.java
@@ -47,7 +47,7 @@ class ManifestGroup {
   private final FileIO io;
   private final Set<ManifestFile> manifests;
   private Predicate<ManifestFile> manifestPredicate;
-  private Predicate<ManifestEntry> manifestEntryPredicate;
+  private Predicate<ManifestEntry<DataFile>> manifestEntryPredicate;
   private Map<Integer, PartitionSpec> specsById;
   private Expression dataFilter;
   private Expression fileFilter;
@@ -97,7 +97,7 @@ class ManifestGroup {
     return this;
   }
 
-  ManifestGroup filterManifestEntries(Predicate<ManifestEntry> newManifestEntryPredicate) {
+  ManifestGroup filterManifestEntries(Predicate<ManifestEntry<DataFile>> newManifestEntryPredicate) {
     this.manifestEntryPredicate = manifestEntryPredicate.and(newManifestEntryPredicate);
     return this;
   }
@@ -169,12 +169,12 @@ class ManifestGroup {
    *
    * @return a CloseableIterable of manifest entries.
    */
-  public CloseableIterable<ManifestEntry> entries() {
+  public CloseableIterable<ManifestEntry<DataFile>> entries() {
     return CloseableIterable.concat(entries((manifest, entries) -> entries));
   }
 
   private <T> Iterable<CloseableIterable<T>> entries(
-      BiFunction<ManifestFile, CloseableIterable<ManifestEntry>, CloseableIterable<T>> entryFn) {
+      BiFunction<ManifestFile, CloseableIterable<ManifestEntry<DataFile>>, CloseableIterable<T>> entryFn) {
     LoadingCache<Integer, ManifestEvaluator> evalCache = specsById == null ?
         null : Caffeine.newBuilder().build(specId -> {
           PartitionSpec spec = specsById.get(specId);
@@ -215,7 +215,7 @@ class ManifestGroup {
               .caseSensitive(caseSensitive)
               .select(columns);
 
-          CloseableIterable<ManifestEntry> entries = reader.entries();
+          CloseableIterable<ManifestEntry<DataFile>> entries = reader.entries();
           if (ignoreDeleted) {
             entries = reader.liveEntries();
           }
diff --git a/core/src/main/java/org/apache/iceberg/ManifestWriter.java b/core/src/main/java/org/apache/iceberg/ManifestWriter.java
index ac10f4c..f73390b 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestWriter.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestWriter.java
@@ -28,33 +28,19 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 
 /**
  * Writer for manifest files.
+ *
+ * @param <F> Java class of files written to the manifest, either {@link DataFile} or {@link DeleteFile}.
  */
-public abstract class ManifestWriter implements FileAppender<DataFile> {
+public abstract class ManifestWriter<F extends ContentFile<F>> implements FileAppender<F> {
   // stand-in for the current sequence number that will be assigned when the commit is successful
   // this is replaced when writing a manifest list by the ManifestFile wrapper
   static final long UNASSIGNED_SEQ = -1L;
 
-  /**
-   * Create a new {@link ManifestWriter}.
-   * <p>
-   * Manifests created by this writer have all entry snapshot IDs set to null.
-   * All entries will inherit the snapshot ID that will be assigned to the manifest on commit.
-   *
-   * @param spec {@link PartitionSpec} used to produce {@link DataFile} partition tuples
-   * @param outputFile the destination file location
-   * @return a manifest writer
-   * @deprecated will be removed in 0.9.0; use {@link ManifestFiles#write(PartitionSpec, OutputFile)} instead.
-   */
-  @Deprecated
-  public static ManifestWriter write(PartitionSpec spec, OutputFile outputFile) {
-    return ManifestFiles.write(spec, outputFile);
-  }
-
   private final OutputFile file;
   private final int specId;
-  private final FileAppender<ManifestEntry> writer;
+  private final FileAppender<ManifestEntry<F>> writer;
   private final Long snapshotId;
-  private final GenericManifestEntry reused;
+  private final GenericManifestEntry<F> reused;
   private final PartitionSummary stats;
 
   private boolean closed = false;
@@ -71,15 +57,19 @@ public abstract class ManifestWriter implements FileAppender<DataFile> {
     this.specId = spec.specId();
     this.writer = newAppender(spec, file);
     this.snapshotId = snapshotId;
-    this.reused = new GenericManifestEntry(spec.partitionType());
+    this.reused = new GenericManifestEntry<>(spec.partitionType());
     this.stats = new PartitionSummary(spec);
   }
 
-  protected abstract ManifestEntry prepare(ManifestEntry entry);
+  protected abstract ManifestEntry<F> prepare(ManifestEntry<F> entry);
 
-  protected abstract FileAppender<ManifestEntry> newAppender(PartitionSpec spec, OutputFile outputFile);
+  protected abstract FileAppender<ManifestEntry<F>> newAppender(PartitionSpec spec, OutputFile outputFile);
 
-  void addEntry(ManifestEntry entry) {
+  protected ManifestContent content() {
+    return ManifestContent.DATA;
+  }
+
+  void addEntry(ManifestEntry<F> entry) {
     switch (entry.status()) {
       case ADDED:
         addedFiles += 1;
@@ -102,48 +92,48 @@ public abstract class ManifestWriter implements FileAppender<DataFile> {
   }
 
   /**
-   * Add an added entry for a data file.
+   * Add an added entry for a file.
    * <p>
    * The entry's snapshot ID will be this manifest's snapshot ID.
    *
    * @param addedFile a data file
    */
   @Override
-  public void add(DataFile addedFile) {
+  public void add(F addedFile) {
     addEntry(reused.wrapAppend(snapshotId, addedFile));
   }
 
-  void add(ManifestEntry entry) {
+  void add(ManifestEntry<F> entry) {
     addEntry(reused.wrapAppend(snapshotId, entry.file()));
   }
 
   /**
-   * Add an existing entry for a data file.
+   * Add an existing entry for a file.
    *
-   * @param existingFile a data file
+   * @param existingFile a file
    * @param fileSnapshotId snapshot ID when the data file was added to the table
    * @param sequenceNumber sequence number for the data file
    */
-  public void existing(DataFile existingFile, long fileSnapshotId, long sequenceNumber) {
+  public void existing(F existingFile, long fileSnapshotId, long sequenceNumber) {
     addEntry(reused.wrapExisting(fileSnapshotId, sequenceNumber, existingFile));
   }
 
-  void existing(ManifestEntry entry) {
+  void existing(ManifestEntry<F> entry) {
     addEntry(reused.wrapExisting(entry.snapshotId(), entry.sequenceNumber(), entry.file()));
   }
 
   /**
-   * Add a delete entry for a data file.
+   * Add a delete entry for a file.
    * <p>
    * The entry's snapshot ID will be this manifest's snapshot ID.
    *
-   * @param deletedFile a data file
+   * @param deletedFile a file
    */
-  public void delete(DataFile deletedFile) {
+  public void delete(F deletedFile) {
     addEntry(reused.wrapDelete(snapshotId, deletedFile));
   }
 
-  void delete(ManifestEntry entry) {
+  void delete(ManifestEntry<F> entry) {
     // Use the current Snapshot ID for the delete. It is safe to delete the data file from disk
     // when this Snapshot has been removed or when there are no Snapshots older than this one.
     addEntry(reused.wrapDelete(snapshotId, entry.file()));
@@ -164,7 +154,7 @@ public abstract class ManifestWriter implements FileAppender<DataFile> {
     // if the minSequenceNumber is null, then no manifests with a sequence number have been written, so the min
     // sequence number is the one that will be assigned when this is committed. pass UNASSIGNED_SEQ to inherit it.
     long minSeqNumber = minSequenceNumber != null ? minSequenceNumber : UNASSIGNED_SEQ;
-    return new GenericManifestFile(file.location(), writer.length(), specId, ManifestContent.DATA,
+    return new GenericManifestFile(file.location(), writer.length(), specId, content(),
         UNASSIGNED_SEQ, minSeqNumber, snapshotId,
         addedFiles, addedRows, existingFiles, existingRows, deletedFiles, deletedRows, stats.summaries());
   }
@@ -175,21 +165,54 @@ public abstract class ManifestWriter implements FileAppender<DataFile> {
     writer.close();
   }
 
-  static class V2Writer extends ManifestWriter {
-    private V2Metadata.IndexedManifestEntry entryWrapper;
+  static class V2Writer extends ManifestWriter<DataFile> {
+    private final V2Metadata.IndexedManifestEntry<DataFile> entryWrapper;
 
     V2Writer(PartitionSpec spec, OutputFile file, Long snapshotId) {
       super(spec, file, snapshotId);
-      this.entryWrapper = new V2Metadata.IndexedManifestEntry(snapshotId, spec.partitionType());
+      this.entryWrapper = new V2Metadata.IndexedManifestEntry<>(snapshotId, spec.partitionType());
+    }
+
+    @Override
+    protected ManifestEntry<DataFile> prepare(ManifestEntry<DataFile> entry) {
+      return entryWrapper.wrap(entry);
+    }
+
+    @Override
+    protected FileAppender<ManifestEntry<DataFile>> newAppender(PartitionSpec spec, OutputFile file) {
+      Schema manifestSchema = V2Metadata.entrySchema(spec.partitionType());
+      try {
+        return Avro.write(file)
+            .schema(manifestSchema)
+            .named("manifest_entry")
+            .meta("schema", SchemaParser.toJson(spec.schema()))
+            .meta("partition-spec", PartitionSpecParser.toJsonFields(spec))
+            .meta("partition-spec-id", String.valueOf(spec.specId()))
+            .meta("format-version", "2")
+            .meta("content", "data")
+            .overwrite()
+            .build();
+      } catch (IOException e) {
+        throw new RuntimeIOException(e, "Failed to create manifest writer for path: " + file);
+      }
+    }
+  }
+
+  static class V2DeleteWriter extends ManifestWriter<DeleteFile> {
+    private final V2Metadata.IndexedManifestEntry<DeleteFile> entryWrapper;
+
+    V2DeleteWriter(PartitionSpec spec, OutputFile file, Long snapshotId) {
+      super(spec, file, snapshotId);
+      this.entryWrapper = new V2Metadata.IndexedManifestEntry<>(snapshotId, spec.partitionType());
     }
 
     @Override
-    protected ManifestEntry prepare(ManifestEntry entry) {
+    protected ManifestEntry<DeleteFile> prepare(ManifestEntry<DeleteFile> entry) {
       return entryWrapper.wrap(entry);
     }
 
     @Override
-    protected FileAppender<ManifestEntry> newAppender(PartitionSpec spec, OutputFile file) {
+    protected FileAppender<ManifestEntry<DeleteFile>> newAppender(PartitionSpec spec, OutputFile file) {
       Schema manifestSchema = V2Metadata.entrySchema(spec.partitionType());
       try {
         return Avro.write(file)
@@ -199,16 +222,22 @@ public abstract class ManifestWriter implements FileAppender<DataFile> {
             .meta("partition-spec", PartitionSpecParser.toJsonFields(spec))
             .meta("partition-spec-id", String.valueOf(spec.specId()))
             .meta("format-version", "2")
+            .meta("content", "deletes")
             .overwrite()
             .build();
       } catch (IOException e) {
         throw new RuntimeIOException(e, "Failed to create manifest writer for path: " + file);
       }
     }
+
+    @Override
+    protected ManifestContent content() {
+      return ManifestContent.DELETES;
+    }
   }
 
-  static class V1Writer extends ManifestWriter {
-    private V1Metadata.IndexedManifestEntry entryWrapper;
+  static class V1Writer extends ManifestWriter<DataFile> {
+    private final V1Metadata.IndexedManifestEntry entryWrapper;
 
     V1Writer(PartitionSpec spec, OutputFile file, Long snapshotId) {
       super(spec, file, snapshotId);
@@ -216,12 +245,12 @@ public abstract class ManifestWriter implements FileAppender<DataFile> {
     }
 
     @Override
-    protected ManifestEntry prepare(ManifestEntry entry) {
+    protected ManifestEntry<DataFile> prepare(ManifestEntry<DataFile> entry) {
       return entryWrapper.wrap(entry);
     }
 
     @Override
-    protected FileAppender<ManifestEntry> newAppender(PartitionSpec spec, OutputFile file) {
+    protected FileAppender<ManifestEntry<DataFile>> newAppender(PartitionSpec spec, OutputFile file) {
       Schema manifestSchema = V1Metadata.entrySchema(spec.partitionType());
       try {
         return Avro.write(file)
diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
index 0ba09db..f8fbe90 100644
--- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
@@ -526,7 +526,7 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
     Evaluator inclusive = extractInclusiveDeleteExpression(reader);
     Evaluator strict = extractStrictDeleteExpression(reader);
     boolean hasDeletedFiles = false;
-    for (ManifestEntry entry : reader.entries()) {
+    for (ManifestEntry<DataFile> entry : reader.entries()) {
       DataFile file = entry.file();
       boolean fileDelete = deletePaths.contains(pathWrapper.set(file.path())) ||
           dropPartitions.contains(partitionWrapper.set(file.partition()));
@@ -555,7 +555,7 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
     // manifest. produce a copy of the manifest with all deleted files removed.
     List<DataFile> deletedFiles = Lists.newArrayList();
     Set<CharSequenceWrapper> deletedPaths = Sets.newHashSet();
-    ManifestWriter writer = newManifestWriter(reader.spec());
+    ManifestWriter<DataFile> writer = newManifestWriter(reader.spec());
     try {
       reader.entries().forEach(entry -> {
         DataFile file = entry.file();
@@ -667,11 +667,11 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
       return mergeManifests.get(bin);
     }
 
-    ManifestWriter writer = newManifestWriter(ops.current().spec());
+    ManifestWriter<DataFile> writer = newManifestWriter(ops.current().spec());
     try {
       for (ManifestFile manifest : bin) {
         try (ManifestReader reader = ManifestFiles.read(manifest, ops.io(), ops.current().specsById())) {
-          for (ManifestEntry entry : reader.entries()) {
+          for (ManifestEntry<DataFile> entry : reader.entries()) {
             if (entry.status() == Status.DELETED) {
               // suppress deletes from previous snapshots. only files deleted by this snapshot
               // should be added to the new manifest
diff --git a/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java b/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java
index 2464580..f927728 100644
--- a/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java
+++ b/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java
@@ -350,7 +350,7 @@ class RemoveSnapshots implements ExpireSnapshots {
         .run(manifest -> {
           // the manifest has deletes, scan it to find files to delete
           try (ManifestReader reader = ManifestFiles.read(manifest, ops.io(), ops.current().specsById())) {
-            for (ManifestEntry entry : reader.entries()) {
+            for (ManifestEntry<?> entry : reader.entries()) {
               // if the snapshot ID of the DELETE entry is no longer valid, the data can be deleted
               if (entry.status() == ManifestEntry.Status.DELETED &&
                   !validIds.contains(entry.snapshotId())) {
@@ -370,7 +370,7 @@ class RemoveSnapshots implements ExpireSnapshots {
         .run(manifest -> {
           // the manifest has deletes, scan it to find files to delete
           try (ManifestReader reader = ManifestFiles.read(manifest, ops.io(), ops.current().specsById())) {
-            for (ManifestEntry entry : reader.entries()) {
+            for (ManifestEntry<?> entry : reader.entries()) {
               // delete any ADDED file from manifests that were reverted
               if (entry.status() == ManifestEntry.Status.ADDED) {
                 // use toString to ensure the path will not change (Utf8 is reused)
diff --git a/core/src/main/java/org/apache/iceberg/ScanSummary.java b/core/src/main/java/org/apache/iceberg/ScanSummary.java
index 126ada8..f93f785 100644
--- a/core/src/main/java/org/apache/iceberg/ScanSummary.java
+++ b/core/src/main/java/org/apache/iceberg/ScanSummary.java
@@ -218,7 +218,7 @@ public class ScanSummary {
       TopN<String, PartitionMetrics> topN = new TopN<>(
           limit, throwIfLimited, Comparators.charSequences());
 
-      try (CloseableIterable<ManifestEntry> entries = new ManifestGroup(ops.io(), manifests)
+      try (CloseableIterable<ManifestEntry<DataFile>> entries = new ManifestGroup(ops.io(), manifests)
           .specsById(ops.current().specsById())
           .filterData(rowFilter)
           .ignoreDeleted()
@@ -226,7 +226,7 @@ public class ScanSummary {
           .entries()) {
 
         PartitionSpec spec = table.spec();
-        for (ManifestEntry entry : entries) {
+        for (ManifestEntry<?> entry : entries) {
           Long timestamp = snapshotTimestamps.get(entry.snapshotId());
 
           // if filtering, skip timestamps that are outside the range
@@ -280,7 +280,7 @@ public class ScanSummary {
       return this;
     }
 
-    private PartitionMetrics updateFromFile(DataFile file, Long timestampMillis) {
+    private PartitionMetrics updateFromFile(ContentFile<?> file, Long timestampMillis) {
       this.fileCount += 1;
       this.recordCount += file.recordCount();
       this.totalSize += file.fileSizeInBytes();
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
index 012f7fe..e4d8adc 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
@@ -331,7 +331,7 @@ abstract class SnapshotProducer<ThisT> implements SnapshotUpdate<ThisT> {
         ops.metadataFileLocation(FileFormat.AVRO.addExtension(commitUUID + "-m" + manifestCount.getAndIncrement())));
   }
 
-  protected ManifestWriter newManifestWriter(PartitionSpec spec) {
+  protected ManifestWriter<DataFile> newManifestWriter(PartitionSpec spec) {
     return ManifestFiles.write(ops.current().formatVersion(), spec, newManifestOutput(), snapshotId());
   }
 
@@ -358,7 +358,7 @@ abstract class SnapshotProducer<ThisT> implements SnapshotUpdate<ThisT> {
 
       Long snapshotId = null;
       long maxSnapshotId = Long.MIN_VALUE;
-      for (ManifestEntry entry : reader.entries()) {
+      for (ManifestEntry<DataFile> entry : reader.entries()) {
         if (entry.snapshotId() > maxSnapshotId) {
           maxSnapshotId = entry.snapshotId();
         }
diff --git a/core/src/main/java/org/apache/iceberg/V1Metadata.java b/core/src/main/java/org/apache/iceberg/V1Metadata.java
index 3f38cd3..5615ee1 100644
--- a/core/src/main/java/org/apache/iceberg/V1Metadata.java
+++ b/core/src/main/java/org/apache/iceberg/V1Metadata.java
@@ -220,17 +220,17 @@ class V1Metadata {
   /**
    * Wrapper used to write a ManifestEntry to v1 metadata.
    */
-  static class IndexedManifestEntry implements ManifestEntry, IndexedRecord {
+  static class IndexedManifestEntry implements ManifestEntry<DataFile>, IndexedRecord {
     private final org.apache.avro.Schema avroSchema;
     private final IndexedDataFile fileWrapper;
-    private ManifestEntry wrapped = null;
+    private ManifestEntry<DataFile> wrapped = null;
 
     IndexedManifestEntry(Types.StructType partitionType) {
       this.avroSchema = AvroSchemaUtil.convert(entrySchema(partitionType), "manifest_entry");
       this.fileWrapper = new IndexedDataFile(avroSchema.getField("data_file").schema());
     }
 
-    public IndexedManifestEntry wrap(ManifestEntry entry) {
+    public IndexedManifestEntry wrap(ManifestEntry<DataFile> entry) {
       this.wrapped = entry;
       return this;
     }
@@ -294,12 +294,12 @@ class V1Metadata {
     }
 
     @Override
-    public ManifestEntry copy() {
+    public ManifestEntry<DataFile> copy() {
       return wrapped.copy();
     }
 
     @Override
-    public ManifestEntry copyWithoutStats() {
+    public ManifestEntry<DataFile> copyWithoutStats() {
       return wrapped.copyWithoutStats();
     }
   }
@@ -365,6 +365,11 @@ class V1Metadata {
     }
 
     @Override
+    public FileContent content() {
+      return wrapped.content();
+    }
+
+    @Override
     public CharSequence path() {
       return wrapped.path();
     }
diff --git a/core/src/main/java/org/apache/iceberg/V2Metadata.java b/core/src/main/java/org/apache/iceberg/V2Metadata.java
index ee57f24..c3d3d5e 100644
--- a/core/src/main/java/org/apache/iceberg/V2Metadata.java
+++ b/core/src/main/java/org/apache/iceberg/V2Metadata.java
@@ -257,19 +257,19 @@ class V2Metadata {
     );
   }
 
-  static class IndexedManifestEntry implements ManifestEntry, IndexedRecord {
+  static class IndexedManifestEntry<F extends ContentFile<F>> implements ManifestEntry<F>, IndexedRecord {
     private final org.apache.avro.Schema avroSchema;
     private final Long commitSnapshotId;
-    private final IndexedDataFile fileWrapper;
-    private ManifestEntry wrapped = null;
+    private final IndexedDataFile<?> fileWrapper;
+    private ManifestEntry<F> wrapped = null;
 
     IndexedManifestEntry(Long commitSnapshotId, Types.StructType partitionType) {
       this.avroSchema = AvroSchemaUtil.convert(entrySchema(partitionType), "manifest_entry");
       this.commitSnapshotId = commitSnapshotId;
-      this.fileWrapper = new IndexedDataFile(partitionType);
+      this.fileWrapper = new IndexedDataFile<>(partitionType);
     }
 
-    public IndexedManifestEntry wrap(ManifestEntry entry) {
+    public IndexedManifestEntry<F> wrap(ManifestEntry<F> entry) {
       this.wrapped = entry;
       return this;
     }
@@ -335,36 +335,37 @@ class V2Metadata {
     }
 
     @Override
-    public DataFile file() {
+    public F file() {
       return wrapped.file();
     }
 
     @Override
-    public ManifestEntry copy() {
+    public ManifestEntry<F> copy() {
       return wrapped.copy();
     }
 
     @Override
-    public ManifestEntry copyWithoutStats() {
+    public ManifestEntry<F> copyWithoutStats() {
       return wrapped.copyWithoutStats();
     }
   }
 
   /**
-   * Wrapper used to write a DataFile to v2 metadata.
+   * Wrapper used to write a DataFile or DeleteFile to v2 metadata.
    */
-  static class IndexedDataFile implements DataFile, IndexedRecord {
+  static class IndexedDataFile<F> implements ContentFile<F>, IndexedRecord {
     private final org.apache.avro.Schema avroSchema;
     private final IndexedStructLike partitionWrapper;
-    private DataFile wrapped = null;
+    private ContentFile<F> wrapped = null;
 
     IndexedDataFile(Types.StructType partitionType) {
       this.avroSchema = AvroSchemaUtil.convert(fileType(partitionType), "data_file");
       this.partitionWrapper = new IndexedStructLike(avroSchema.getField("partition").schema());
     }
 
-    IndexedDataFile wrap(DataFile file) {
-      this.wrapped = file;
+    @SuppressWarnings("unchecked")
+    IndexedDataFile<F> wrap(ContentFile<?> file) {
+      this.wrapped = (ContentFile<F>) file;
       return this;
     }
 
@@ -377,7 +378,7 @@ class V2Metadata {
     public Object get(int pos) {
       switch (pos) {
         case 0:
-          return FileContent.DATA.id();
+          return wrapped.content().id();
         case 1:
           return wrapped.path().toString();
         case 2:
@@ -412,6 +413,11 @@ class V2Metadata {
     }
 
     @Override
+    public FileContent content() {
+      return wrapped.content();
+    }
+
+    @Override
     public CharSequence path() {
       return wrapped.path();
     }
@@ -472,13 +478,13 @@ class V2Metadata {
     }
 
     @Override
-    public DataFile copy() {
-      return wrapped.copy();
+    public F copy() {
+      throw new UnsupportedOperationException("Cannot copy IndexedDataFile wrapper");
     }
 
     @Override
-    public DataFile copyWithoutStats() {
-      return wrapped.copyWithoutStats();
+    public F copyWithoutStats() {
+      throw new UnsupportedOperationException("Cannot copy IndexedDataFile wrapper");
     }
   }
 }
diff --git a/core/src/test/java/org/apache/iceberg/TableTestBase.java b/core/src/test/java/org/apache/iceberg/TableTestBase.java
index e2c7378..8de5e4b 100644
--- a/core/src/test/java/org/apache/iceberg/TableTestBase.java
+++ b/core/src/test/java/org/apache/iceberg/TableTestBase.java
@@ -147,7 +147,7 @@ public class TableTestBase {
     Assert.assertTrue(manifestFile.delete());
     OutputFile outputFile = table.ops().io().newOutputFile(manifestFile.getCanonicalPath());
 
-    ManifestWriter writer = ManifestFiles.write(formatVersion, table.spec(), outputFile, snapshotId);
+    ManifestWriter<DataFile> writer = ManifestFiles.write(formatVersion, table.spec(), outputFile, snapshotId);
     try {
       for (DataFile file : files) {
         writer.add(file);
@@ -159,22 +159,22 @@ public class TableTestBase {
     return writer.toManifestFile();
   }
 
-  ManifestFile writeManifest(String fileName, ManifestEntry... entries) throws IOException {
+  ManifestFile writeManifest(String fileName, ManifestEntry<DataFile>... entries) throws IOException {
     return writeManifest(null, fileName, entries);
   }
 
-  ManifestFile writeManifest(Long snapshotId, ManifestEntry... entries) throws IOException {
+  ManifestFile writeManifest(Long snapshotId, ManifestEntry<DataFile>... entries) throws IOException {
     return writeManifest(snapshotId, "input.m0.avro", entries);
   }
 
-  ManifestFile writeManifest(Long snapshotId, String fileName, ManifestEntry... entries) throws IOException {
+  ManifestFile writeManifest(Long snapshotId, String fileName, ManifestEntry<DataFile>... entries) throws IOException {
     File manifestFile = temp.newFile(fileName);
     Assert.assertTrue(manifestFile.delete());
     OutputFile outputFile = table.ops().io().newOutputFile(manifestFile.getCanonicalPath());
 
-    ManifestWriter writer = ManifestFiles.write(formatVersion, table.spec(), outputFile, snapshotId);
+    ManifestWriter<DataFile> writer = ManifestFiles.write(formatVersion, table.spec(), outputFile, snapshotId);
     try {
-      for (ManifestEntry entry : entries) {
+      for (ManifestEntry<DataFile> entry : entries) {
         writer.addEntry(entry);
       }
     } finally {
@@ -189,7 +189,7 @@ public class TableTestBase {
     Assert.assertTrue(manifestFile.delete());
     OutputFile outputFile = table.ops().io().newOutputFile(manifestFile.getCanonicalPath());
 
-    ManifestWriter writer = ManifestFiles.write(formatVersion, table.spec(), outputFile, null);
+    ManifestWriter<DataFile> writer = ManifestFiles.write(formatVersion, table.spec(), outputFile, null);
     try {
       for (DataFile file : files) {
         writer.add(file);
@@ -201,8 +201,8 @@ public class TableTestBase {
     return writer.toManifestFile();
   }
 
-  ManifestEntry manifestEntry(ManifestEntry.Status status, Long snapshotId, DataFile file) {
-    GenericManifestEntry entry = new GenericManifestEntry(table.spec().partitionType());
+  ManifestEntry<DataFile> manifestEntry(ManifestEntry.Status status, Long snapshotId, DataFile file) {
+    GenericManifestEntry<DataFile> entry = new GenericManifestEntry<>(table.spec().partitionType());
     switch (status) {
       case ADDED:
         return entry.wrapAppend(snapshotId, file);
@@ -240,7 +240,7 @@ public class TableTestBase {
     long id = snap.snapshotId();
     Iterator<String> newPaths = paths(newFiles).iterator();
 
-    for (ManifestEntry entry : ManifestFiles.read(manifest, FILE_IO).entries()) {
+    for (ManifestEntry<DataFile> entry : ManifestFiles.read(manifest, FILE_IO).entries()) {
       DataFile file = entry.file();
       if (sequenceNumber != null) {
         V1Assert.assertEquals("Sequence number should default to 0", 0, entry.sequenceNumber().longValue());
@@ -283,7 +283,7 @@ public class TableTestBase {
                         Iterator<Long> seqs,
                         Iterator<Long> ids,
                         Iterator<DataFile> expectedFiles) {
-    for (ManifestEntry entry : ManifestFiles.read(manifest, FILE_IO).entries()) {
+    for (ManifestEntry<DataFile> entry : ManifestFiles.read(manifest, FILE_IO).entries()) {
       DataFile file = entry.file();
       DataFile expected = expectedFiles.next();
       if (seqs != null) {
@@ -303,7 +303,7 @@ public class TableTestBase {
                                       Iterator<Long> ids,
                                       Iterator<DataFile> expectedFiles,
                                       Iterator<ManifestEntry.Status> expectedStatuses) {
-    for (ManifestEntry entry : ManifestFiles.read(manifest, FILE_IO).entries()) {
+    for (ManifestEntry<DataFile> entry : ManifestFiles.read(manifest, FILE_IO).entries()) {
       DataFile file = entry.file();
       DataFile expected = expectedFiles.next();
       final ManifestEntry.Status expectedStatus = expectedStatuses.next();
diff --git a/core/src/test/java/org/apache/iceberg/TestManifestReader.java b/core/src/test/java/org/apache/iceberg/TestManifestReader.java
index b670fd4..e899fb7 100644
--- a/core/src/test/java/org/apache/iceberg/TestManifestReader.java
+++ b/core/src/test/java/org/apache/iceberg/TestManifestReader.java
@@ -47,7 +47,7 @@ public class TestManifestReader extends TableTestBase {
   public void testManifestReaderWithEmptyInheritableMetadata() throws IOException {
     ManifestFile manifest = writeManifest(1000L, manifestEntry(Status.EXISTING, 1000L, FILE_A));
     try (ManifestReader reader = ManifestFiles.read(manifest, FILE_IO)) {
-      ManifestEntry entry = Iterables.getOnlyElement(reader.entries());
+      ManifestEntry<DataFile> entry = Iterables.getOnlyElement(reader.entries());
       Assert.assertEquals(Status.EXISTING, entry.status());
       Assert.assertEquals(FILE_A.path(), entry.file().path());
       Assert.assertEquals(1000L, (long) entry.snapshotId());
@@ -67,7 +67,7 @@ public class TestManifestReader extends TableTestBase {
   public void testManifestReaderWithPartitionMetadata() throws IOException {
     ManifestFile manifest = writeManifest(1000L, manifestEntry(Status.EXISTING, 123L, FILE_A));
     try (ManifestReader reader = ManifestFiles.read(manifest, FILE_IO)) {
-      ManifestEntry entry = Iterables.getOnlyElement(reader.entries());
+      ManifestEntry<DataFile> entry = Iterables.getOnlyElement(reader.entries());
       Assert.assertEquals(123L, (long) entry.snapshotId());
 
       List<Types.NestedField> fields = ((PartitionData) entry.file().partition()).getPartitionType().fields();
@@ -88,7 +88,7 @@ public class TestManifestReader extends TableTestBase {
 
     ManifestFile manifest = writeManifest(1000L, manifestEntry(Status.EXISTING, 123L, FILE_A));
     try (ManifestReader reader = ManifestFiles.read(manifest, FILE_IO)) {
-      ManifestEntry entry = Iterables.getOnlyElement(reader.entries());
+      ManifestEntry<DataFile> entry = Iterables.getOnlyElement(reader.entries());
       Assert.assertEquals(123L, (long) entry.snapshotId());
 
       List<Types.NestedField> fields = ((PartitionData) entry.file().partition()).getPartitionType().fields();
diff --git a/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java b/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java
index 6177797..32e2d9a 100644
--- a/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java
+++ b/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java
@@ -70,6 +70,9 @@ public class TestManifestWriterVersions {
   private static final DataFile DATA_FILE = new GenericDataFile(
       PATH, FORMAT, PARTITION, 150972L, METRICS, null, OFFSETS);
 
+  private static final DeleteFile DELETE_FILE = new GenericDeleteFile(
+      FileContent.EQUALITY_DELETES, PATH, FORMAT, PARTITION, 22905L, METRICS, null);
+
   @Rule
   public TemporaryFolder temp = new TemporaryFolder();
 
@@ -77,7 +80,14 @@ public class TestManifestWriterVersions {
   public void testV1Write() throws IOException {
     ManifestFile manifest = writeManifest(1);
     checkManifest(manifest, ManifestWriter.UNASSIGNED_SEQ);
-    checkEntry(readManifest(manifest), ManifestWriter.UNASSIGNED_SEQ);
+    checkEntry(readManifest(manifest), ManifestWriter.UNASSIGNED_SEQ, FileContent.DATA);
+  }
+
+  @Test
+  public void testV1WriteDelete() {
+    AssertHelpers.assertThrows("Should fail to write a delete manifest for v1",
+        IllegalArgumentException.class, "Cannot write delete files in a v1 table",
+        () -> writeDeleteManifest(1));
   }
 
   @Test
@@ -86,23 +96,43 @@ public class TestManifestWriterVersions {
     checkManifest(manifest, 0L);
 
     // v1 should be read using sequence number 0 because it was missing from the manifest list file
-    checkEntry(readManifest(manifest), 0L);
+    checkEntry(readManifest(manifest), 0L, FileContent.DATA);
   }
 
   @Test
   public void testV2Write() throws IOException {
     ManifestFile manifest = writeManifest(1);
     checkManifest(manifest, ManifestWriter.UNASSIGNED_SEQ);
-    checkEntry(readManifest(manifest), ManifestWriter.UNASSIGNED_SEQ);
+    Assert.assertEquals("Content", ManifestContent.DATA, manifest.content());
+    checkEntry(readManifest(manifest), ManifestWriter.UNASSIGNED_SEQ, FileContent.DATA);
   }
 
   @Test
   public void testV2WriteWithInheritance() throws IOException {
     ManifestFile manifest = writeAndReadManifestList(writeManifest(2), 2);
     checkManifest(manifest, SEQUENCE_NUMBER);
+    Assert.assertEquals("Content", ManifestContent.DATA, manifest.content());
+
+    // v2 should use the correct sequence number by inheriting it
+    checkEntry(readManifest(manifest), SEQUENCE_NUMBER, FileContent.DATA);
+  }
+
+  @Test
+  public void testV2WriteDelete() throws IOException {
+    ManifestFile manifest = writeDeleteManifest(2);
+    checkManifest(manifest, ManifestWriter.UNASSIGNED_SEQ);
+    Assert.assertEquals("Content", ManifestContent.DELETES, manifest.content());
+    checkEntry(readDeleteManifest(manifest), ManifestWriter.UNASSIGNED_SEQ, FileContent.EQUALITY_DELETES);
+  }
+
+  @Test
+  public void testV2WriteDeleteWithInheritance() throws IOException {
+    ManifestFile manifest = writeAndReadManifestList(writeDeleteManifest(2), 2);
+    checkManifest(manifest, SEQUENCE_NUMBER);
+    Assert.assertEquals("Content", ManifestContent.DELETES, manifest.content());
 
     // v2 should use the correct sequence number by inheriting it
-    checkEntry(readManifest(manifest), SEQUENCE_NUMBER);
+    checkEntry(readDeleteManifest(manifest), SEQUENCE_NUMBER, FileContent.EQUALITY_DELETES);
   }
 
   @Test
@@ -117,7 +147,7 @@ public class TestManifestWriterVersions {
     checkManifest(manifest2, 0L);
 
     // should not inherit the v2 sequence number because it was a rewrite
-    checkEntry(readManifest(manifest2), 0L);
+    checkEntry(readManifest(manifest2), 0L, FileContent.DATA);
   }
 
   @Test
@@ -136,24 +166,26 @@ public class TestManifestWriterVersions {
     checkRewrittenManifest(manifest2, SEQUENCE_NUMBER, 0L);
 
     // should not inherit the v2 sequence number because it was written into the v2 manifest
-    checkRewrittenEntry(readManifest(manifest2), 0L);
+    checkRewrittenEntry(readManifest(manifest2), 0L, FileContent.DATA);
   }
 
-  void checkEntry(ManifestEntry entry, Long expectedSequenceNumber) {
+  void checkEntry(ManifestEntry<?> entry, Long expectedSequenceNumber, FileContent content) {
     Assert.assertEquals("Status", ManifestEntry.Status.ADDED, entry.status());
     Assert.assertEquals("Snapshot ID", (Long) SNAPSHOT_ID, entry.snapshotId());
     Assert.assertEquals("Sequence number", expectedSequenceNumber, entry.sequenceNumber());
-    checkDataFile(entry.file());
+    checkDataFile(entry.file(), content);
   }
 
-  void checkRewrittenEntry(ManifestEntry entry, Long expectedSequenceNumber) {
+  void checkRewrittenEntry(ManifestEntry<DataFile> entry, Long expectedSequenceNumber, FileContent content) {
     Assert.assertEquals("Status", ManifestEntry.Status.EXISTING, entry.status());
     Assert.assertEquals("Snapshot ID", (Long) SNAPSHOT_ID, entry.snapshotId());
     Assert.assertEquals("Sequence number", expectedSequenceNumber, entry.sequenceNumber());
-    checkDataFile(entry.file());
+    checkDataFile(entry.file(), content);
   }
 
-  void checkDataFile(DataFile dataFile) {
+  void checkDataFile(ContentFile<?> dataFile, FileContent content) {
+    // ContentFile is the common supertype of DataFile and DeleteFile, so this method can check both
+    Assert.assertEquals("Content", content, dataFile.content());
     Assert.assertEquals("Path", PATH, dataFile.path());
     Assert.assertEquals("Format", FORMAT, dataFile.format());
     Assert.assertEquals("Partition", PARTITION, dataFile.partition());
@@ -206,7 +238,7 @@ public class TestManifestWriterVersions {
 
   private ManifestFile rewriteManifest(ManifestFile manifest, int formatVersion) throws IOException {
     OutputFile manifestFile = Files.localOutput(FileFormat.AVRO.addExtension(temp.newFile().toString()));
-    ManifestWriter writer = ManifestFiles.write(formatVersion, SPEC, manifestFile, SNAPSHOT_ID);
+    ManifestWriter<DataFile> writer = ManifestFiles.write(formatVersion, SPEC, manifestFile, SNAPSHOT_ID);
     try {
       writer.existing(readManifest(manifest));
     } finally {
@@ -216,21 +248,46 @@ public class TestManifestWriterVersions {
   }
 
   private ManifestFile writeManifest(int formatVersion) throws IOException {
+    return writeManifest(DATA_FILE, formatVersion);
+  }
+
+  private ManifestFile writeManifest(DataFile file, int formatVersion) throws IOException {
     OutputFile manifestFile = Files.localOutput(FileFormat.AVRO.addExtension(temp.newFile().toString()));
-    ManifestWriter writer = ManifestFiles.write(formatVersion, SPEC, manifestFile, SNAPSHOT_ID);
+    ManifestWriter<DataFile> writer = ManifestFiles.write(formatVersion, SPEC, manifestFile, SNAPSHOT_ID);
     try {
-      writer.add(DATA_FILE);
+      writer.add(file);
     } finally {
       writer.close();
     }
     return writer.toManifestFile();
   }
 
-  private ManifestEntry readManifest(ManifestFile manifest) throws IOException {
-    try (CloseableIterable<ManifestEntry> reader = ManifestFiles.read(manifest, FILE_IO).entries()) {
-      List<ManifestEntry> files = Lists.newArrayList(reader);
+  private ManifestEntry<DataFile> readManifest(ManifestFile manifest) throws IOException {
+    try (CloseableIterable<ManifestEntry<DataFile>> reader = ManifestFiles.read(manifest, FILE_IO).entries()) {
+      List<ManifestEntry<DataFile>> files = Lists.newArrayList(reader);
       Assert.assertEquals("Should contain only one data file", 1, files.size());
       return files.get(0);
     }
   }
+
+  private ManifestFile writeDeleteManifest(int formatVersion) throws IOException {
+    OutputFile manifestFile = Files.localOutput(FileFormat.AVRO.addExtension(temp.newFile().toString()));
+    ManifestWriter<DeleteFile> writer = ManifestFiles.writeDeleteManifest(
+        formatVersion, SPEC, manifestFile, SNAPSHOT_ID);
+    try {
+      writer.add(DELETE_FILE);
+    } finally {
+      writer.close();
+    }
+    return writer.toManifestFile();
+  }
+
+  private ManifestEntry<DeleteFile> readDeleteManifest(ManifestFile manifest) throws IOException {
+    try (CloseableIterable<ManifestEntry<DeleteFile>> reader =
+             ManifestFiles.readDeleteManifest(manifest, FILE_IO, null).entries()) {
+      List<ManifestEntry<DeleteFile>> entries = Lists.newArrayList(reader);
+      Assert.assertEquals("Should contain only one data file", 1, entries.size());
+      return entries.get(0);
+    }
+  }
 }
diff --git a/core/src/test/java/org/apache/iceberg/TestMergeAppend.java b/core/src/test/java/org/apache/iceberg/TestMergeAppend.java
index 47f0d90..a393adf 100644
--- a/core/src/test/java/org/apache/iceberg/TestMergeAppend.java
+++ b/core/src/test/java/org/apache/iceberg/TestMergeAppend.java
@@ -745,7 +745,7 @@ public class TestMergeAppend extends TableTestBase {
         initialManifest, pending.manifests().get(1));
 
     // field ids of manifest entries in two manifests with different specs of the same source field should be different
-    ManifestEntry entry = ManifestFiles.read(pending.manifests().get(0), FILE_IO).entries().iterator().next();
+    ManifestEntry<DataFile> entry = ManifestFiles.read(pending.manifests().get(0), FILE_IO).entries().iterator().next();
     Types.NestedField field = ((PartitionData) entry.file().partition()).getPartitionType().fields().get(0);
     Assert.assertEquals(1000, field.fieldId());
     Assert.assertEquals("id_bucket", field.name());
diff --git a/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java b/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java
index f6d0134..cb64793 100644
--- a/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java
+++ b/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java
@@ -955,7 +955,7 @@ public class TestRewriteManifests extends TableTestBase {
 
     Assert.assertEquals(3, Iterables.size(table.snapshots()));
 
-    ManifestEntry entry = manifestEntry(ManifestEntry.Status.EXISTING, firstSnapshot.snapshotId(), FILE_A);
+    ManifestEntry<DataFile> entry = manifestEntry(ManifestEntry.Status.EXISTING, firstSnapshot.snapshotId(), FILE_A);
     // update the entry's sequence number or else it will be rejected by the writer
     entry.setSequenceNumber(firstSnapshot.sequenceNumber());
     ManifestFile newManifest = writeManifest("manifest-file-1.avro", entry);
@@ -1005,7 +1005,7 @@ public class TestRewriteManifests extends TableTestBase {
     Assert.assertEquals(1, manifests.size());
     ManifestFile manifest = manifests.get(0);
 
-    ManifestEntry appendEntry = manifestEntry(ManifestEntry.Status.ADDED, snapshot.snapshotId(), FILE_A);
+    ManifestEntry<DataFile> appendEntry = manifestEntry(ManifestEntry.Status.ADDED, snapshot.snapshotId(), FILE_A);
     // update the entry's sequence number or else it will be rejected by the writer
     appendEntry.setSequenceNumber(snapshot.sequenceNumber());
 
@@ -1018,7 +1018,7 @@ public class TestRewriteManifests extends TableTestBase {
             .addManifest(invalidAddedFileManifest)
             .commit());
 
-    ManifestEntry deleteEntry = manifestEntry(ManifestEntry.Status.DELETED, snapshot.snapshotId(), FILE_A);
+    ManifestEntry<DataFile> deleteEntry = manifestEntry(ManifestEntry.Status.DELETED, snapshot.snapshotId(), FILE_A);
     // update the entry's sequence number or else it will be rejected by the writer
     deleteEntry.setSequenceNumber(snapshot.sequenceNumber());