Posted to commits@iceberg.apache.org by ao...@apache.org on 2020/05/23 19:10:52 UTC

[incubator-iceberg] branch master updated: Add content types to DataFile and ManifestFile (#1030)

This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 01d1462  Add content types to DataFile and ManifestFile (#1030)
01d1462 is described below

commit 01d1462756db20a14a9ac67166e5bf56966861b4
Author: Ryan Blue <bl...@apache.org>
AuthorDate: Sat May 23 12:10:42 2020 -0700

    Add content types to DataFile and ManifestFile (#1030)
---
 api/src/main/java/org/apache/iceberg/DataFile.java |  80 +++++---
 .../main/java/org/apache/iceberg/FileContent.java  |  39 ++++
 .../java/org/apache/iceberg/ManifestContent.java   |  38 ++++
 .../main/java/org/apache/iceberg/ManifestFile.java |  53 +++--
 .../main/java/org/apache/iceberg/types/Types.java  |  14 ++
 .../test/java/org/apache/iceberg/TestHelpers.java  |  10 +-
 .../java/org/apache/iceberg/AllEntriesTable.java   |   6 +-
 .../java/org/apache/iceberg/GenericDataFile.java   |  38 ++--
 .../org/apache/iceberg/GenericManifestFile.java    |  53 +++--
 .../org/apache/iceberg/ManifestListWriter.java     |   3 +
 .../java/org/apache/iceberg/ManifestWriter.java    |   3 +-
 .../java/org/apache/iceberg/SnapshotProducer.java  |   4 +-
 .../main/java/org/apache/iceberg/V1Metadata.java   |  35 +++-
 .../main/java/org/apache/iceberg/V2Metadata.java   | 225 +++++++++++++++++----
 .../apache/iceberg/TestManifestListVersions.java   |  34 +++-
 .../spark/source/TestIcebergSourceTablesBase.java  |  60 +++---
 16 files changed, 535 insertions(+), 160 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/DataFile.java b/api/src/main/java/org/apache/iceberg/DataFile.java
index 90758a3..d9a4441 100644
--- a/api/src/main/java/org/apache/iceberg/DataFile.java
+++ b/api/src/main/java/org/apache/iceberg/DataFile.java
@@ -22,6 +22,7 @@ package org.apache.iceberg;
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Map;
+import org.apache.iceberg.types.Types;
 import org.apache.iceberg.types.Types.BinaryType;
 import org.apache.iceberg.types.Types.IntegerType;
 import org.apache.iceberg.types.Types.ListType;
@@ -37,32 +38,59 @@ import static org.apache.iceberg.types.Types.NestedField.required;
  * Interface for files listed in a table manifest.
  */
 public interface DataFile {
+  // fields for adding delete data files
+  Types.NestedField CONTENT = optional(134, "content", IntegerType.get(),
+      "Contents of the file: 0=data, 1=position deletes, 2=equality deletes");
+  Types.NestedField FILE_PATH = required(100, "file_path", StringType.get(), "Location URI with FS scheme");
+  Types.NestedField FILE_FORMAT = required(101, "file_format", StringType.get(),
+      "File format name: avro, orc, or parquet");
+  Types.NestedField RECORD_COUNT = required(103, "record_count", LongType.get(), "Number of records in the file");
+  Types.NestedField FILE_SIZE = required(104, "file_size_in_bytes", LongType.get(), "Total file size in bytes");
+  Types.NestedField COLUMN_SIZES = optional(108, "column_sizes", MapType.ofRequired(117, 118,
+      IntegerType.get(), LongType.get()), "Map of column id to total size on disk");
+  Types.NestedField VALUE_COUNTS = optional(109, "value_counts", MapType.ofRequired(119, 120,
+      IntegerType.get(), LongType.get()), "Map of column id to total count, including null and NaN");
+  Types.NestedField NULL_VALUE_COUNTS = optional(110, "null_value_counts", MapType.ofRequired(121, 122,
+      IntegerType.get(), LongType.get()), "Map of column id to null value count");
+  Types.NestedField LOWER_BOUNDS = optional(125, "lower_bounds", MapType.ofRequired(126, 127,
+      IntegerType.get(), BinaryType.get()), "Map of column id to lower bound");
+  Types.NestedField UPPER_BOUNDS = optional(128, "upper_bounds", MapType.ofRequired(129, 130,
+      IntegerType.get(), BinaryType.get()), "Map of column id to upper bound");
+  Types.NestedField KEY_METADATA = optional(131, "key_metadata", BinaryType.get(), "Encryption key metadata blob");
+  Types.NestedField SPLIT_OFFSETS = optional(132, "split_offsets", ListType.ofRequired(133, LongType.get()),
+      "Splittable offsets");
+  int PARTITION_ID = 102;
+  String PARTITION_NAME = "partition";
+  String PARTITION_DOC = "Partition data tuple, schema based on the partition spec";
+  // NEXT ID TO ASSIGN: 135
+
   static StructType getType(StructType partitionType) {
     // IDs start at 100 to leave room for changes to ManifestEntry
     return StructType.of(
-        required(100, "file_path", StringType.get()),
-        required(101, "file_format", StringType.get()),
-        required(102, "partition", partitionType),
-        required(103, "record_count", LongType.get()),
-        required(104, "file_size_in_bytes", LongType.get()),
-        required(105, "block_size_in_bytes", LongType.get()),
-        optional(108, "column_sizes", MapType.ofRequired(117, 118,
-            IntegerType.get(), LongType.get())),
-        optional(109, "value_counts", MapType.ofRequired(119, 120,
-            IntegerType.get(), LongType.get())),
-        optional(110, "null_value_counts", MapType.ofRequired(121, 122,
-            IntegerType.get(), LongType.get())),
-        optional(125, "lower_bounds", MapType.ofRequired(126, 127,
-            IntegerType.get(), BinaryType.get())),
-        optional(128, "upper_bounds", MapType.ofRequired(129, 130,
-            IntegerType.get(), BinaryType.get())),
-        optional(131, "key_metadata", BinaryType.get()),
-        optional(132, "split_offsets", ListType.ofRequired(133, LongType.get()))
-        // NEXT ID TO ASSIGN: 134
+        CONTENT,
+        FILE_PATH,
+        FILE_FORMAT,
+        required(PARTITION_ID, PARTITION_NAME, partitionType, PARTITION_DOC),
+        RECORD_COUNT,
+        FILE_SIZE,
+        COLUMN_SIZES,
+        VALUE_COUNTS,
+        NULL_VALUE_COUNTS,
+        LOWER_BOUNDS,
+        UPPER_BOUNDS,
+        KEY_METADATA,
+        SPLIT_OFFSETS
     );
   }
 
   /**
+   * @return the content stored in the file; one of DATA, POSITION_DELETES, or EQUALITY_DELETES
+   */
+  default FileContent content() {
+    return FileContent.DATA;
+  }
+
+  /**
    * @return fully qualified path to the file, suitable for constructing a Hadoop Path
    */
   CharSequence path();
@@ -119,6 +147,13 @@ public interface DataFile {
   ByteBuffer keyMetadata();
 
   /**
+   * @return List of recommended split locations, if applicable, null otherwise.
+   * When available, this information is used for planning scan tasks whose boundaries
+   * are determined by these offsets. The returned list must be sorted in ascending order.
+   */
+  List<Long> splitOffsets();
+
+  /**
    * Copies this {@link DataFile data file}. Manifest readers can reuse data file instances; use
    * this method to copy data when collecting files from tasks.
    *
@@ -133,11 +168,4 @@ public interface DataFile {
    * @return a copy of this data file, without lower bounds, upper bounds, value counts, or null value counts
    */
   DataFile copyWithoutStats();
-
-  /**
-   * @return List of recommended split locations, if applicable, null otherwise.
-   * When available, this information is used for planning scan tasks whose boundaries
-   * are determined by these offsets. The returned list must be sorted in ascending order.
-   */
-  List<Long> splitOffsets();
 }
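
Note: content() ships with a default implementation, so existing DataFile
implementations keep compiling and simply report DATA. A minimal sketch of a
caller branching on the new method (isDeleteFile is a hypothetical helper, not
part of this commit):

    // Hypothetical helper: classify a file by its content. Implementations
    // that predate this commit fall through the default and report DATA.
    static boolean isDeleteFile(DataFile file) {
      FileContent content = file.content();
      return content == FileContent.POSITION_DELETES
          || content == FileContent.EQUALITY_DELETES;
    }
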
diff --git a/api/src/main/java/org/apache/iceberg/FileContent.java b/api/src/main/java/org/apache/iceberg/FileContent.java
new file mode 100644
index 0000000..67bfca7
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/FileContent.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+/**
+ * Content type stored in a file, one of DATA, POSITION_DELETES, or EQUALITY_DELETES.
+ */
+public enum FileContent {
+  DATA(0),
+  POSITION_DELETES(1),
+  EQUALITY_DELETES(2);
+
+  private final int id;
+
+  FileContent(int id) {
+    this.id = id;
+  }
+
+  public int id() {
+    return id;
+  }
+}
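
Note: the ids match declaration order, so a stored content id can be decoded
with values()[id]; GenericManifestFile below relies on the same property for
ManifestContent. A defensive sketch (fromId is hypothetical, not part of this
commit):

    // Hypothetical decoder; assumes ids equal ordinals, which holds for
    // DATA(0), POSITION_DELETES(1), EQUALITY_DELETES(2).
    static FileContent fromId(int id) {
      FileContent[] contents = FileContent.values();
      if (id < 0 || id >= contents.length) {
        throw new IllegalArgumentException("Unknown file content id: " + id);
      }
      return contents[id];
    }
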
diff --git a/api/src/main/java/org/apache/iceberg/ManifestContent.java b/api/src/main/java/org/apache/iceberg/ManifestContent.java
new file mode 100644
index 0000000..b4b4473
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/ManifestContent.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+/**
+ * Content type stored in a manifest file, either DATA or DELETES.
+ */
+public enum ManifestContent {
+  DATA(0),
+  DELETES(1);
+
+  private final int id;
+
+  ManifestContent(int id) {
+    this.id = id;
+  }
+
+  public int id() {
+    return id;
+  }
+}
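
Note: with content exposed on the manifest list, planning can skip delete
manifests without opening them. A sketch, assuming an existing Snapshot
(Snapshot#manifests() is the same accessor the Spark tests below use):

    // Sketch, not part of this commit: keep only the data manifests.
    List<ManifestFile> dataManifests = new ArrayList<>();
    for (ManifestFile manifest : snapshot.manifests()) {
      if (manifest.content() == ManifestContent.DATA) {
        dataManifests.add(manifest);
      }
    }
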
diff --git a/api/src/main/java/org/apache/iceberg/ManifestFile.java b/api/src/main/java/org/apache/iceberg/ManifestFile.java
index 7c4739a..713c061 100644
--- a/api/src/main/java/org/apache/iceberg/ManifestFile.java
+++ b/api/src/main/java/org/apache/iceberg/ManifestFile.java
@@ -30,29 +30,41 @@ import static org.apache.iceberg.types.Types.NestedField.required;
  * Represents a manifest file that can be scanned to find data files in a table.
  */
 public interface ManifestFile {
-  Types.NestedField PATH = required(500, "manifest_path", Types.StringType.get());
-  Types.NestedField LENGTH = required(501, "manifest_length", Types.LongType.get());
-  Types.NestedField SPEC_ID = required(502, "partition_spec_id", Types.IntegerType.get());
-  Types.NestedField SNAPSHOT_ID = optional(503, "added_snapshot_id", Types.LongType.get());
-  Types.NestedField ADDED_FILES_COUNT = optional(504, "added_data_files_count", Types.IntegerType.get());
-  Types.NestedField EXISTING_FILES_COUNT = optional(505, "existing_data_files_count", Types.IntegerType.get());
-  Types.NestedField DELETED_FILES_COUNT = optional(506, "deleted_data_files_count", Types.IntegerType.get());
+  Types.NestedField PATH = required(500, "manifest_path", Types.StringType.get(), "Location URI with FS scheme");
+  Types.NestedField LENGTH = required(501, "manifest_length", Types.LongType.get(), "Total file size in bytes");
+  Types.NestedField SPEC_ID = required(502, "partition_spec_id", Types.IntegerType.get(), "Spec ID used to write");
+  Types.NestedField MANIFEST_CONTENT = optional(517, "content", Types.IntegerType.get(),
+      "Contents of the manifest: 0=data, 1=deletes");
+  Types.NestedField SEQUENCE_NUMBER = optional(515, "sequence_number", Types.LongType.get(),
+      "Sequence number when the manifest was added");
+  Types.NestedField MIN_SEQUENCE_NUMBER = optional(516, "min_sequence_number", Types.LongType.get(),
+      "Lowest sequence number in the manifest");
+  Types.NestedField SNAPSHOT_ID = optional(503, "added_snapshot_id", Types.LongType.get(),
+      "Snapshot ID that added the manifest");
+  Types.NestedField ADDED_FILES_COUNT = optional(504, "added_data_files_count", Types.IntegerType.get(),
+      "Added entry count");
+  Types.NestedField EXISTING_FILES_COUNT = optional(505, "existing_data_files_count", Types.IntegerType.get(),
+      "Existing entry count");
+  Types.NestedField DELETED_FILES_COUNT = optional(506, "deleted_data_files_count", Types.IntegerType.get(),
+      "Deleted entry count");
+  Types.NestedField ADDED_ROWS_COUNT = optional(512, "added_rows_count", Types.LongType.get(),
+      "Added rows count");
+  Types.NestedField EXISTING_ROWS_COUNT = optional(513, "existing_rows_count", Types.LongType.get(),
+      "Existing rows count");
+  Types.NestedField DELETED_ROWS_COUNT = optional(514, "deleted_rows_count", Types.LongType.get(),
+      "Deleted rows count");
   Types.StructType PARTITION_SUMMARY_TYPE = Types.StructType.of(
-      required(509, "contains_null", Types.BooleanType.get()),
-      optional(510, "lower_bound", Types.BinaryType.get()), // null if no non-null values
-      optional(511, "upper_bound", Types.BinaryType.get())
+      required(509, "contains_null", Types.BooleanType.get(), "True if any file has a null partition value"),
+      optional(510, "lower_bound", Types.BinaryType.get(), "Partition lower bound for all files"),
+      optional(511, "upper_bound", Types.BinaryType.get(), "Partition upper bound for all files")
   );
   Types.NestedField PARTITION_SUMMARIES = optional(507, "partitions",
-      Types.ListType.ofRequired(508, PARTITION_SUMMARY_TYPE));
-  Types.NestedField ADDED_ROWS_COUNT = optional(512, "added_rows_count", Types.LongType.get());
-  Types.NestedField EXISTING_ROWS_COUNT = optional(513, "existing_rows_count", Types.LongType.get());
-  Types.NestedField DELETED_ROWS_COUNT = optional(514, "deleted_rows_count", Types.LongType.get());
-  Types.NestedField SEQUENCE_NUMBER = optional(515, "sequence_number", Types.LongType.get());
-  Types.NestedField MIN_SEQUENCE_NUMBER = optional(516, "min_sequence_number", Types.LongType.get());
-  // next ID to assign: 517
+      Types.ListType.ofRequired(508, PARTITION_SUMMARY_TYPE),
+      "Summary for each partition");
+  // next ID to assign: 518
 
   Schema SCHEMA = new Schema(
-      PATH, LENGTH, SPEC_ID,
+      PATH, LENGTH, SPEC_ID, MANIFEST_CONTENT,
       SEQUENCE_NUMBER, MIN_SEQUENCE_NUMBER, SNAPSHOT_ID,
       ADDED_FILES_COUNT, EXISTING_FILES_COUNT, DELETED_FILES_COUNT,
       ADDED_ROWS_COUNT, EXISTING_ROWS_COUNT, DELETED_ROWS_COUNT,
@@ -78,6 +90,11 @@ public interface ManifestFile {
   int partitionSpecId();
 
   /**
+   * @return the content stored in the manifest; either DATA or DELETES
+   */
+  ManifestContent content();
+
+  /**
    * @return the sequence number of the commit that added the manifest file
    */
   long sequenceNumber();
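
Note: every field added to the manifest-list schema above is optional, so v1
readers and previously written files stay readable; a missing content column
falls back to DATA. A sketch of that defaulting rule (contentOrDefault is
hypothetical; GenericManifestFile.put below implements the same logic):

    // Hypothetical helper mirroring the null handling in GenericManifestFile:
    static ManifestContent contentOrDefault(Integer contentId) {
      return contentId != null ? ManifestContent.values()[contentId] : ManifestContent.DATA;
    }
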
diff --git a/api/src/main/java/org/apache/iceberg/types/Types.java b/api/src/main/java/org/apache/iceberg/types/Types.java
index 48e7c96..df398d8 100644
--- a/api/src/main/java/org/apache/iceberg/types/Types.java
+++ b/api/src/main/java/org/apache/iceberg/types/Types.java
@@ -457,10 +457,24 @@ public class Types {
       return isOptional;
     }
 
+    public NestedField asOptional() {
+      if (isOptional) {
+        return this;
+      }
+      return new NestedField(true, id, name, type, doc);
+    }
+
     public boolean isRequired() {
       return !isOptional;
     }
 
+    public NestedField asRequired() {
+      if (!isOptional) {
+        return this;
+      }
+      return new NestedField(false, id, name, type, doc);
+    }
+
     public int fieldId() {
       return id;
     }
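
Note: asOptional()/asRequired() let a single field definition serve both read
and write schemas; the id, name, type, and doc are preserved and only the
nullability changes. V2Metadata below uses exactly this to tighten the shared
optional ManifestFile fields for the v2 write schema:

    // Sketch of the pattern used by V2Metadata.MANIFEST_LIST_SCHEMA below.
    Types.NestedField readField = ManifestFile.MANIFEST_CONTENT;  // optional(517, "content", ...)
    Types.NestedField writeField = readField.asRequired();        // required, same id/name/type/doc
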
diff --git a/api/src/test/java/org/apache/iceberg/TestHelpers.java b/api/src/test/java/org/apache/iceberg/TestHelpers.java
index 4989281..b4881b3 100644
--- a/api/src/test/java/org/apache/iceberg/TestHelpers.java
+++ b/api/src/test/java/org/apache/iceberg/TestHelpers.java
@@ -163,6 +163,7 @@ public class TestHelpers {
     private final String path;
     private final long length;
     private final int specId;
+    private final ManifestContent content;
     private final Long snapshotId;
     private final Integer addedFiles;
     private final Long addedRows;
@@ -178,6 +179,7 @@ public class TestHelpers {
       this.path = path;
       this.length = length;
       this.specId = specId;
+      this.content = ManifestContent.DATA;
       this.snapshotId = snapshotId;
       this.addedFiles = addedFiles;
       this.addedRows = null;
@@ -188,13 +190,14 @@ public class TestHelpers {
       this.partitions = partitions;
     }
 
-    public TestManifestFile(String path, long length, int specId, Long snapshotId,
+    public TestManifestFile(String path, long length, int specId, ManifestContent content, Long snapshotId,
                             Integer addedFiles, Long addedRows, Integer existingFiles,
                             Long existingRows, Integer deletedFiles, Long deletedRows,
                             List<PartitionFieldSummary> partitions) {
       this.path = path;
       this.length = length;
       this.specId = specId;
+      this.content = content;
       this.snapshotId = snapshotId;
       this.addedFiles = addedFiles;
       this.addedRows = addedRows;
@@ -221,6 +224,11 @@ public class TestHelpers {
     }
 
     @Override
+    public ManifestContent content() {
+      return content;
+    }
+
+    @Override
     public long sequenceNumber() {
       return 0;
     }
diff --git a/core/src/main/java/org/apache/iceberg/AllEntriesTable.java b/core/src/main/java/org/apache/iceberg/AllEntriesTable.java
index 9c6aa85..d48ec76 100644
--- a/core/src/main/java/org/apache/iceberg/AllEntriesTable.java
+++ b/core/src/main/java/org/apache/iceberg/AllEntriesTable.java
@@ -100,11 +100,13 @@ public class AllEntriesTable extends BaseMetadataTable {
     protected CloseableIterable<FileScanTask> planFiles(
         TableOperations ops, Snapshot snapshot, Expression rowFilter, boolean caseSensitive, boolean colStats) {
       CloseableIterable<ManifestFile> manifests = AllDataFilesTable.allManifestFiles(ops.current().snapshots());
+      Schema fileSchema = new Schema(schema().findType("data_file").asStructType().fields());
       String schemaString = SchemaParser.toJson(schema());
       String specString = PartitionSpecParser.toJson(PartitionSpec.unpartitioned());
+      ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(rowFilter);
 
-      return CloseableIterable.transform(manifests, manifest -> new BaseFileScanTask(
-          DataFiles.fromManifest(manifest), schemaString, specString, ResidualEvaluator.unpartitioned(rowFilter)));
+      return CloseableIterable.transform(manifests, manifest -> new ManifestEntriesTable.ManifestReadTask(
+          ops.io(), manifest, fileSchema, schemaString, specString, residuals));
     }
   }
 }
diff --git a/core/src/main/java/org/apache/iceberg/GenericDataFile.java b/core/src/main/java/org/apache/iceberg/GenericDataFile.java
index aa427cb..35065b1 100644
--- a/core/src/main/java/org/apache/iceberg/GenericDataFile.java
+++ b/core/src/main/java/org/apache/iceberg/GenericDataFile.java
@@ -20,6 +20,7 @@
 package org.apache.iceberg;
 
 import com.google.common.base.MoreObjects;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -67,8 +68,6 @@ class GenericDataFile
   // cached schema
   private transient org.apache.avro.Schema avroSchema = null;
 
-  private static final long DEFAULT_BLOCK_SIZE = 64 * 1024 * 1024;
-
   /**
    * Used by Avro reflection to instantiate this class when reading manifest files.
    */
@@ -197,6 +196,11 @@ class GenericDataFile
   }
 
   @Override
+  public FileContent content() {
+    return FileContent.DATA;
+  }
+
+  @Override
   public CharSequence path() {
     return filePath;
   }
@@ -274,22 +278,24 @@ class GenericDataFile
     }
     switch (pos) {
       case 0:
-        // always coerce to String for Serializable
-        this.filePath = v.toString();
+        Preconditions.checkState(v == null || (Integer) v == FileContent.DATA.id(),
+            "Invalid content for a DataFile: %s", v);
         return;
       case 1:
-        this.format = FileFormat.valueOf(v.toString());
+        // always coerce to String for Serializable
+        this.filePath = v.toString();
         return;
       case 2:
-        this.partitionData = (PartitionData) v;
+        this.format = FileFormat.valueOf(v.toString());
         return;
       case 3:
-        this.recordCount = (Long) v;
+        this.partitionData = (PartitionData) v;
         return;
       case 4:
-        this.fileSizeInBytes = (Long) v;
+        this.recordCount = (Long) v;
         return;
       case 5:
+        this.fileSizeInBytes = (Long) v;
         return;
       case 6:
         this.columnSizes = (Map<Integer, Long>) v;
@@ -331,19 +337,17 @@ class GenericDataFile
     }
     switch (pos) {
       case 0:
-        return filePath;
+        return FileContent.DATA.id();
       case 1:
-        return format != null ? format.toString() : null;
+        return filePath;
       case 2:
-        return partitionData;
+        return format != null ? format.toString() : null;
       case 3:
-        return recordCount;
+        return partitionData;
       case 4:
-        return fileSizeInBytes;
+        return recordCount;
       case 5:
-        // block_size_in_bytes is not used. However, it is a required avro field in DataFile. So
-        // to maintain compatibility, we need to return something.
-        return DEFAULT_BLOCK_SIZE;
+        return fileSizeInBytes;
       case 6:
         return columnSizes;
       case 7:
@@ -377,7 +381,7 @@ class GenericDataFile
 
   @Override
   public int size() {
-    return 13;
+    return DataFile.getType(EMPTY_STRUCT_TYPE).fields().size();
   }
 
   @Override
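
Note: every ordinal in get()/put() shifts down by one because content now
occupies position 0, and size() is derived from the schema instead of being
hard-coded (assuming EMPTY_STRUCT_TYPE is an empty partition struct). A sketch
of the resulting width:

    // Sketch, not part of this commit: with an empty partition struct the
    // data_file struct is still 13 fields wide, since the new content field
    // replaces the dropped block_size_in_bytes.
    int width = DataFile.getType(Types.StructType.of()).fields().size();  // 13
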
diff --git a/core/src/main/java/org/apache/iceberg/GenericManifestFile.java b/core/src/main/java/org/apache/iceberg/GenericManifestFile.java
index 26f7153..66bab61 100644
--- a/core/src/main/java/org/apache/iceberg/GenericManifestFile.java
+++ b/core/src/main/java/org/apache/iceberg/GenericManifestFile.java
@@ -46,6 +46,7 @@ public class GenericManifestFile
   private String manifestPath = null;
   private Long length = null;
   private int specId = -1;
+  private ManifestContent content = ManifestContent.DATA;
   private long sequenceNumber = 0;
   private long minSequenceNumber = 0;
   private Long snapshotId = null;
@@ -101,7 +102,7 @@ public class GenericManifestFile
     this.fromProjectionPos = null;
   }
 
-  public GenericManifestFile(String path, long length, int specId,
+  public GenericManifestFile(String path, long length, int specId, ManifestContent content,
                              long sequenceNumber, long minSequenceNumber, Long snapshotId,
                              int addedFilesCount, long addedRowsCount, int existingFilesCount,
                              long existingRowsCount, int deletedFilesCount, long deletedRowsCount,
@@ -110,6 +111,7 @@ public class GenericManifestFile
     this.manifestPath = path;
     this.length = length;
     this.specId = specId;
+    this.content = content;
     this.sequenceNumber = sequenceNumber;
     this.minSequenceNumber = minSequenceNumber;
     this.snapshotId = snapshotId;
@@ -133,6 +135,7 @@ public class GenericManifestFile
     this.manifestPath = toCopy.manifestPath;
     this.length = toCopy.length;
     this.specId = toCopy.specId;
+    this.content = toCopy.content;
     this.sequenceNumber = toCopy.sequenceNumber;
     this.minSequenceNumber = toCopy.minSequenceNumber;
     this.snapshotId = toCopy.snapshotId;
@@ -181,6 +184,11 @@ public class GenericManifestFile
   }
 
   @Override
+  public ManifestContent content() {
+    return content;
+  }
+
+  @Override
   public long sequenceNumber() {
     return sequenceNumber;
   }
@@ -255,24 +263,26 @@ public class GenericManifestFile
       case 2:
         return specId;
       case 3:
-        return sequenceNumber;
+        return content.id();
       case 4:
-        return minSequenceNumber;
+        return sequenceNumber;
       case 5:
-        return snapshotId;
+        return minSequenceNumber;
       case 6:
-        return addedFilesCount;
+        return snapshotId;
       case 7:
-        return existingFilesCount;
+        return addedFilesCount;
       case 8:
-        return deletedFilesCount;
+        return existingFilesCount;
       case 9:
-        return addedRowsCount;
+        return deletedFilesCount;
       case 10:
-        return existingRowsCount;
+        return addedRowsCount;
       case 11:
-        return deletedRowsCount;
+        return existingRowsCount;
       case 12:
+        return deletedRowsCount;
+      case 13:
         return partitions;
       default:
         throw new UnsupportedOperationException("Unknown field ordinal: " + pos);
@@ -299,33 +309,36 @@ public class GenericManifestFile
         this.specId = (Integer) value;
         return;
       case 3:
-        this.sequenceNumber = value != null ? (Long) value : 0;
+        this.content = value != null ? ManifestContent.values()[(Integer) value] : ManifestContent.DATA;
         return;
       case 4:
-        this.minSequenceNumber = value != null ? (Long) value : 0;
+        this.sequenceNumber = value != null ? (Long) value : 0;
         return;
       case 5:
-        this.snapshotId = (Long) value;
+        this.minSequenceNumber = value != null ? (Long) value : 0;
         return;
       case 6:
-        this.addedFilesCount = (Integer) value;
+        this.snapshotId = (Long) value;
         return;
       case 7:
-        this.existingFilesCount = (Integer) value;
+        this.addedFilesCount = (Integer) value;
         return;
       case 8:
-        this.deletedFilesCount = (Integer) value;
+        this.existingFilesCount = (Integer) value;
         return;
       case 9:
-        this.addedRowsCount = (Long) value;
+        this.deletedFilesCount = (Integer) value;
         return;
       case 10:
-        this.existingRowsCount = (Long) value;
+        this.addedRowsCount = (Long) value;
         return;
       case 11:
-        this.deletedRowsCount = (Long) value;
+        this.existingRowsCount = (Long) value;
         return;
       case 12:
+        this.deletedRowsCount = (Long) value;
+        return;
+      case 13:
         this.partitions = (List<PartitionFieldSummary>) value;
         return;
       default:
@@ -393,7 +406,7 @@ public class GenericManifestFile
         this.manifestFile = new GenericManifestFile((GenericManifestFile) toCopy);
       } else {
         this.manifestFile = new GenericManifestFile(
-            toCopy.path(), toCopy.length(), toCopy.partitionSpecId(),
+            toCopy.path(), toCopy.length(), toCopy.partitionSpecId(), toCopy.content(),
             toCopy.sequenceNumber(), toCopy.minSequenceNumber(), toCopy.snapshotId(),
             toCopy.addedFilesCount(), toCopy.addedRowsCount(), toCopy.existingFilesCount(),
             toCopy.existingRowsCount(), toCopy.deletedFilesCount(), toCopy.deletedRowsCount(),
diff --git a/core/src/main/java/org/apache/iceberg/ManifestListWriter.java b/core/src/main/java/org/apache/iceberg/ManifestListWriter.java
index 1cf9c92..56d127e 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestListWriter.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestListWriter.java
@@ -19,6 +19,7 @@
 
 package org.apache.iceberg;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import java.io.IOException;
 import java.util.Iterator;
@@ -114,6 +115,8 @@ abstract class ManifestListWriter implements FileAppender<ManifestFile> {
 
     @Override
     protected ManifestFile prepare(ManifestFile manifest) {
+      Preconditions.checkArgument(manifest.content() == ManifestContent.DATA,
+          "Cannot store delete manifests in a v1 table");
       return wrapper.wrap(manifest);
     }
 
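
Note: v1 manifest lists have no content column, so a delete manifest written
there would silently read back as DATA; the precondition fails fast instead.
A sketch of the observable behavior (writer and deleteManifest are assumed;
testV1WriteDeleteManifest below exercises the same path):

    // Sketch, not part of this commit: adding a delete manifest to a v1 list.
    try {
      writer.add(deleteManifest);  // manifest.content() == ManifestContent.DELETES
    } catch (IllegalArgumentException e) {
      // "Cannot store delete manifests in a v1 table"
    }
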
diff --git a/core/src/main/java/org/apache/iceberg/ManifestWriter.java b/core/src/main/java/org/apache/iceberg/ManifestWriter.java
index ddd74c8..22e1011 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestWriter.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestWriter.java
@@ -164,7 +164,8 @@ public abstract class ManifestWriter implements FileAppender<DataFile> {
     // if the minSequenceNumber is null, then no manifests with a sequence number have been written, so the min
     // sequence number is the one that will be assigned when this is committed. pass UNASSIGNED_SEQ to inherit it.
     long minSeqNumber = minSequenceNumber != null ? minSequenceNumber : UNASSIGNED_SEQ;
-    return new GenericManifestFile(file.location(), writer.length(), specId, UNASSIGNED_SEQ, minSeqNumber, snapshotId,
+    return new GenericManifestFile(file.location(), writer.length(), specId, ManifestContent.DATA,
+        UNASSIGNED_SEQ, minSeqNumber, snapshotId,
         addedFiles, addedRows, existingFiles, existingRows, deletedFiles, deletedRows, stats.summaries());
   }
 
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
index 4acc5f6..17f914b 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
@@ -393,8 +393,8 @@ abstract class SnapshotProducer<ThisT> implements SnapshotUpdate<ThisT> {
       }
 
       return new GenericManifestFile(manifest.path(), manifest.length(), manifest.partitionSpecId(),
-          manifest.sequenceNumber(), manifest.minSequenceNumber(), snapshotId, addedFiles, addedRows, existingFiles,
-          existingRows, deletedFiles, deletedRows, stats.summaries());
+          ManifestContent.DATA, manifest.sequenceNumber(), manifest.minSequenceNumber(), snapshotId,
+          addedFiles, addedRows, existingFiles, existingRows, deletedFiles, deletedRows, stats.summaries());
 
     } catch (IOException e) {
       throw new RuntimeIOException(e, "Failed to read manifest: %s", manifest.path());
diff --git a/core/src/main/java/org/apache/iceberg/V1Metadata.java b/core/src/main/java/org/apache/iceberg/V1Metadata.java
index a905b4d..3f38cd3 100644
--- a/core/src/main/java/org/apache/iceberg/V1Metadata.java
+++ b/core/src/main/java/org/apache/iceberg/V1Metadata.java
@@ -111,6 +111,11 @@ class V1Metadata {
     }
 
     @Override
+    public ManifestContent content() {
+      return wrapped.content();
+    }
+
+    @Override
     public long sequenceNumber() {
       return wrapped.sequenceNumber();
     }
@@ -182,7 +187,7 @@ class V1Metadata {
   }
 
   static Schema entrySchema(Types.StructType partitionType) {
-    return wrapFileSchema(DataFile.getType(partitionType));
+    return wrapFileSchema(dataFileSchema(partitionType));
   }
 
   static Schema wrapFileSchema(Types.StructType fileSchema) {
@@ -192,6 +197,29 @@ class V1Metadata {
         required(ManifestEntry.DATA_FILE_ID, "data_file", fileSchema));
   }
 
+  private static final Types.NestedField BLOCK_SIZE = required(105, "block_size_in_bytes", Types.LongType.get());
+
+  static Types.StructType dataFileSchema(Types.StructType partitionType) {
+    return Types.StructType.of(
+        DataFile.FILE_PATH,
+        DataFile.FILE_FORMAT,
+        required(DataFile.PARTITION_ID, DataFile.PARTITION_NAME, partitionType),
+        DataFile.RECORD_COUNT,
+        DataFile.FILE_SIZE,
+        BLOCK_SIZE,
+        DataFile.COLUMN_SIZES,
+        DataFile.VALUE_COUNTS,
+        DataFile.NULL_VALUE_COUNTS,
+        DataFile.LOWER_BOUNDS,
+        DataFile.UPPER_BOUNDS,
+        DataFile.KEY_METADATA,
+        DataFile.SPLIT_OFFSETS
+    );
+  }
+
+  /**
+   * Wrapper used to write a ManifestEntry to v1 metadata.
+   */
   static class IndexedManifestEntry implements ManifestEntry, IndexedRecord {
     private final org.apache.avro.Schema avroSchema;
     private final IndexedDataFile fileWrapper;
@@ -226,11 +254,10 @@ class V1Metadata {
           return wrapped.snapshotId();
         case 2:
           DataFile file = wrapped.file();
-          if (file == null || file instanceof GenericDataFile) {
-            return file;
-          } else {
+          if (file != null) {
             return fileWrapper.wrap(file);
           }
+          return null;
         default:
           throw new UnsupportedOperationException("Unknown field ordinal: " + i);
       }
diff --git a/core/src/main/java/org/apache/iceberg/V2Metadata.java b/core/src/main/java/org/apache/iceberg/V2Metadata.java
index 6ee9d3f..acd18fc 100644
--- a/core/src/main/java/org/apache/iceberg/V2Metadata.java
+++ b/core/src/main/java/org/apache/iceberg/V2Metadata.java
@@ -20,7 +20,9 @@
 package org.apache.iceberg;
 
 import com.google.common.base.Preconditions;
+import java.nio.ByteBuffer;
 import java.util.List;
+import java.util.Map;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.iceberg.avro.AvroSchemaUtil;
 import org.apache.iceberg.types.Types;
@@ -31,33 +33,22 @@ class V2Metadata {
   private V2Metadata() {
   }
 
-  // fields for v2 write schema for required metadata
-  static final Types.NestedField REQUIRED_SNAPSHOT_ID =
-      required(503, "added_snapshot_id", Types.LongType.get());
-  static final Types.NestedField REQUIRED_ADDED_FILES_COUNT =
-      required(504, "added_data_files_count", Types.IntegerType.get());
-  static final Types.NestedField REQUIRED_EXISTING_FILES_COUNT =
-      required(505, "existing_data_files_count", Types.IntegerType.get());
-  static final Types.NestedField REQUIRED_DELETED_FILES_COUNT =
-      required(506, "deleted_data_files_count", Types.IntegerType.get());
-  static final Types.NestedField REQUIRED_ADDED_ROWS_COUNT =
-      required(512, "added_rows_count", Types.LongType.get());
-  static final Types.NestedField REQUIRED_EXISTING_ROWS_COUNT =
-      required(513, "existing_rows_count", Types.LongType.get());
-  static final Types.NestedField REQUIRED_DELETED_ROWS_COUNT =
-      required(514, "deleted_rows_count", Types.LongType.get());
-  static final Types.NestedField REQUIRED_SEQUENCE_NUMBER =
-      required(515, "sequence_number", Types.LongType.get());
-  static final Types.NestedField REQUIRED_MIN_SEQUENCE_NUMBER =
-      required(516, "min_sequence_number", Types.LongType.get());
-
   static final Schema MANIFEST_LIST_SCHEMA = new Schema(
-      ManifestFile.PATH, ManifestFile.LENGTH, ManifestFile.SPEC_ID,
-      REQUIRED_SEQUENCE_NUMBER, REQUIRED_MIN_SEQUENCE_NUMBER, REQUIRED_SNAPSHOT_ID,
-      REQUIRED_ADDED_FILES_COUNT, REQUIRED_EXISTING_FILES_COUNT, REQUIRED_DELETED_FILES_COUNT,
-      REQUIRED_ADDED_ROWS_COUNT, REQUIRED_EXISTING_ROWS_COUNT, REQUIRED_DELETED_ROWS_COUNT,
-      ManifestFile.PARTITION_SUMMARIES);
-
+      ManifestFile.PATH,
+      ManifestFile.LENGTH,
+      ManifestFile.SPEC_ID,
+      ManifestFile.MANIFEST_CONTENT.asRequired(),
+      ManifestFile.SEQUENCE_NUMBER.asRequired(),
+      ManifestFile.MIN_SEQUENCE_NUMBER.asRequired(),
+      ManifestFile.SNAPSHOT_ID.asRequired(),
+      ManifestFile.ADDED_FILES_COUNT.asRequired(),
+      ManifestFile.EXISTING_FILES_COUNT.asRequired(),
+      ManifestFile.DELETED_FILES_COUNT.asRequired(),
+      ManifestFile.ADDED_ROWS_COUNT.asRequired(),
+      ManifestFile.EXISTING_ROWS_COUNT.asRequired(),
+      ManifestFile.DELETED_ROWS_COUNT.asRequired(),
+      ManifestFile.PARTITION_SUMMARIES
+  );
 
   /**
    * A wrapper class to write any ManifestFile implementation to Avro using the v2 write schema.
@@ -103,6 +94,8 @@ class V2Metadata {
         case 2:
           return wrapped.partitionSpecId();
         case 3:
+          return wrapped.content().id();
+        case 4:
           if (wrapped.sequenceNumber() == ManifestWriter.UNASSIGNED_SEQ) {
             // if the sequence number is being assigned here, then the manifest must be created by the current
             // operation. to validate this, check that the snapshot id matches the current commit
@@ -112,7 +105,7 @@ class V2Metadata {
           } else {
             return wrapped.sequenceNumber();
           }
-        case 4:
+        case 5:
           if (wrapped.minSequenceNumber() == ManifestWriter.UNASSIGNED_SEQ) {
             // same sanity check as above
             Preconditions.checkState(commitSnapshotId == wrapped.snapshotId(),
@@ -123,21 +116,21 @@ class V2Metadata {
           } else {
             return wrapped.minSequenceNumber();
           }
-        case 5:
-          return wrapped.snapshotId();
         case 6:
-          return wrapped.addedFilesCount();
+          return wrapped.snapshotId();
         case 7:
-          return wrapped.existingFilesCount();
+          return wrapped.addedFilesCount();
         case 8:
-          return wrapped.deletedFilesCount();
+          return wrapped.existingFilesCount();
         case 9:
-          return wrapped.addedRowsCount();
+          return wrapped.deletedFilesCount();
         case 10:
-          return wrapped.existingRowsCount();
+          return wrapped.addedRowsCount();
         case 11:
-          return wrapped.deletedRowsCount();
+          return wrapped.existingRowsCount();
         case 12:
+          return wrapped.deletedRowsCount();
+        case 13:
           return wrapped.partitions();
         default:
           throw new UnsupportedOperationException("Unknown field ordinal: " + pos);
@@ -160,6 +153,11 @@ class V2Metadata {
     }
 
     @Override
+    public ManifestContent content() {
+      return wrapped.content();
+    }
+
+    @Override
     public long sequenceNumber() {
       return wrapped.sequenceNumber();
     }
@@ -231,7 +229,7 @@ class V2Metadata {
   }
 
   static Schema entrySchema(Types.StructType partitionType) {
-    return wrapFileSchema(DataFile.getType(partitionType));
+    return wrapFileSchema(fileType(partitionType));
   }
 
   static Schema wrapFileSchema(Types.StructType fileSchema) {
@@ -241,17 +239,34 @@ class V2Metadata {
         required(ManifestEntry.DATA_FILE_ID, "data_file", fileSchema));
   }
 
+  static Types.StructType fileType(Types.StructType partitionType) {
+    return Types.StructType.of(
+        DataFile.CONTENT.asRequired(),
+        DataFile.FILE_PATH,
+        DataFile.FILE_FORMAT,
+        required(DataFile.PARTITION_ID, DataFile.PARTITION_NAME, partitionType, DataFile.PARTITION_DOC),
+        DataFile.RECORD_COUNT,
+        DataFile.FILE_SIZE,
+        DataFile.COLUMN_SIZES,
+        DataFile.VALUE_COUNTS,
+        DataFile.NULL_VALUE_COUNTS,
+        DataFile.LOWER_BOUNDS,
+        DataFile.UPPER_BOUNDS,
+        DataFile.KEY_METADATA,
+        DataFile.SPLIT_OFFSETS
+    );
+  }
+
   static class IndexedManifestEntry implements ManifestEntry, IndexedRecord {
     private final org.apache.avro.Schema avroSchema;
     private final Long commitSnapshotId;
-    private final V1Metadata.IndexedDataFile fileWrapper;
+    private final IndexedDataFile fileWrapper;
     private ManifestEntry wrapped = null;
 
     IndexedManifestEntry(Long commitSnapshotId, Types.StructType partitionType) {
       this.avroSchema = AvroSchemaUtil.convert(entrySchema(partitionType), "manifest_entry");
       this.commitSnapshotId = commitSnapshotId;
-      // TODO: when v2 data files differ from v1, this should use a v2 wrapper
-      this.fileWrapper = new V1Metadata.IndexedDataFile(avroSchema.getField("data_file").schema());
+      this.fileWrapper = new IndexedDataFile(partitionType);
     }
 
     public IndexedManifestEntry wrap(ManifestEntry entry) {
@@ -334,4 +349,136 @@ class V2Metadata {
       return wrapped.copyWithoutStats();
     }
   }
+
+  /**
+   * Wrapper used to write a DataFile to v2 metadata.
+   */
+  static class IndexedDataFile implements DataFile, IndexedRecord {
+    private final org.apache.avro.Schema avroSchema;
+    private final IndexedStructLike partitionWrapper;
+    private DataFile wrapped = null;
+
+    IndexedDataFile(Types.StructType partitionType) {
+      this.avroSchema = AvroSchemaUtil.convert(fileType(partitionType), "data_file");
+      this.partitionWrapper = new IndexedStructLike(avroSchema.getField("partition").schema());
+    }
+
+    IndexedDataFile wrap(DataFile file) {
+      this.wrapped = file;
+      return this;
+    }
+
+    @Override
+    public org.apache.avro.Schema getSchema() {
+      return avroSchema;
+    }
+
+    @Override
+    public Object get(int pos) {
+      switch (pos) {
+        case 0:
+          return FileContent.DATA.id();
+        case 1:
+          return wrapped.path().toString();
+        case 2:
+          return wrapped.format() != null ? wrapped.format().toString() : null;
+        case 3:
+          return partitionWrapper.wrap(wrapped.partition());
+        case 4:
+          return wrapped.recordCount();
+        case 5:
+          return wrapped.fileSizeInBytes();
+        case 6:
+          return wrapped.columnSizes();
+        case 7:
+          return wrapped.valueCounts();
+        case 8:
+          return wrapped.nullValueCounts();
+        case 9:
+          return wrapped.lowerBounds();
+        case 10:
+          return wrapped.upperBounds();
+        case 11:
+          return wrapped.keyMetadata();
+        case 12:
+          return wrapped.splitOffsets();
+      }
+      throw new IllegalArgumentException("Unknown field ordinal: " + pos);
+    }
+
+    @Override
+    public void put(int i, Object v) {
+      throw new UnsupportedOperationException("Cannot read into IndexedDataFile");
+    }
+
+    @Override
+    public CharSequence path() {
+      return wrapped.path();
+    }
+
+    @Override
+    public FileFormat format() {
+      return wrapped.format();
+    }
+
+    @Override
+    public StructLike partition() {
+      return wrapped.partition();
+    }
+
+    @Override
+    public long recordCount() {
+      return wrapped.recordCount();
+    }
+
+    @Override
+    public long fileSizeInBytes() {
+      return wrapped.fileSizeInBytes();
+    }
+
+    @Override
+    public Map<Integer, Long> columnSizes() {
+      return wrapped.columnSizes();
+    }
+
+    @Override
+    public Map<Integer, Long> valueCounts() {
+      return wrapped.valueCounts();
+    }
+
+    @Override
+    public Map<Integer, Long> nullValueCounts() {
+      return wrapped.nullValueCounts();
+    }
+
+    @Override
+    public Map<Integer, ByteBuffer> lowerBounds() {
+      return wrapped.lowerBounds();
+    }
+
+    @Override
+    public Map<Integer, ByteBuffer> upperBounds() {
+      return wrapped.upperBounds();
+    }
+
+    @Override
+    public ByteBuffer keyMetadata() {
+      return wrapped.keyMetadata();
+    }
+
+    @Override
+    public List<Long> splitOffsets() {
+      return wrapped.splitOffsets();
+    }
+
+    @Override
+    public DataFile copy() {
+      return wrapped.copy();
+    }
+
+    @Override
+    public DataFile copyWithoutStats() {
+      return wrapped.copyWithoutStats();
+    }
+  }
 }
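
Note: v2 now has its own file wrapper, resolving the earlier TODO that reused
V1Metadata.IndexedDataFile; position 0 writes FileContent.DATA.id()
unconditionally because only data files are written through this path so far.
The v1 and v2 write schemas now genuinely differ: v1 keeps block_size_in_bytes
(field 105) and has no content column, while v2 prepends a required content
field and drops block size. A sketch comparing the two layouts (both helpers
are package-private, so this only compiles inside org.apache.iceberg):

    // Sketch, not part of this commit: print both file-schema layouts.
    Types.StructType v1Type = V1Metadata.dataFileSchema(Types.StructType.of());
    Types.StructType v2Type = V2Metadata.fileType(Types.StructType.of());
    v1Type.fields().forEach(f -> System.out.println("v1 " + f.fieldId() + " " + f.name()));
    v2Type.fields().forEach(f -> System.out.println("v2 " + f.fieldId() + " " + f.name()));
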
diff --git a/core/src/test/java/org/apache/iceberg/TestManifestListVersions.java b/core/src/test/java/org/apache/iceberg/TestManifestListVersions.java
index 65ec4b2..7742eb4 100644
--- a/core/src/test/java/org/apache/iceberg/TestManifestListVersions.java
+++ b/core/src/test/java/org/apache/iceberg/TestManifestListVersions.java
@@ -55,7 +55,12 @@ public class TestManifestListVersions {
   private static final List<ManifestFile.PartitionFieldSummary> PARTITION_SUMMARIES = ImmutableList.of();
 
   private static final ManifestFile TEST_MANIFEST = new GenericManifestFile(
-      PATH, LENGTH, SPEC_ID, SEQ_NUM, MIN_SEQ_NUM, SNAPSHOT_ID,
+      PATH, LENGTH, SPEC_ID, ManifestContent.DATA, SEQ_NUM, MIN_SEQ_NUM, SNAPSHOT_ID,
+      ADDED_FILES, ADDED_ROWS, EXISTING_FILES, EXISTING_ROWS, DELETED_FILES, DELETED_ROWS,
+      PARTITION_SUMMARIES);
+
+  private static final ManifestFile TEST_DELETE_MANIFEST = new GenericManifestFile(
+      PATH, LENGTH, SPEC_ID, ManifestContent.DELETES, SEQ_NUM, MIN_SEQ_NUM, SNAPSHOT_ID,
       ADDED_FILES, ADDED_ROWS, EXISTING_FILES, EXISTING_ROWS, DELETED_FILES, DELETED_ROWS,
       PARTITION_SUMMARIES);
 
@@ -63,6 +68,13 @@ public class TestManifestListVersions {
   public TemporaryFolder temp = new TemporaryFolder();
 
   @Test
+  public void testV1WriteDeleteManifest() {
+    AssertHelpers.assertThrows("Should fail to write a DELETE manifest to v1",
+        IllegalArgumentException.class, "Cannot store delete manifests in a v1 table",
+        () -> writeManifestList(TEST_DELETE_MANIFEST, 1));
+  }
+
+  @Test
   public void testV1Write() throws IOException {
     ManifestFile manifest = writeAndReadManifestList(1);
 
@@ -74,6 +86,7 @@ public class TestManifestListVersions {
     Assert.assertEquals("Path", PATH, manifest.path());
     Assert.assertEquals("Length", LENGTH, manifest.length());
     Assert.assertEquals("Spec id", SPEC_ID, manifest.partitionSpecId());
+    Assert.assertEquals("Content", ManifestContent.DATA, manifest.content());
     Assert.assertEquals("Snapshot id", SNAPSHOT_ID, (long) manifest.snapshotId());
     Assert.assertEquals("Added files count", ADDED_FILES, (int) manifest.addedFilesCount());
     Assert.assertEquals("Existing files count", EXISTING_FILES, (int) manifest.existingFilesCount());
@@ -91,6 +104,7 @@ public class TestManifestListVersions {
     Assert.assertEquals("Path", PATH, manifest.path());
     Assert.assertEquals("Length", LENGTH, manifest.length());
     Assert.assertEquals("Spec id", SPEC_ID, manifest.partitionSpecId());
+    Assert.assertEquals("Content", ManifestContent.DATA, manifest.content());
     Assert.assertEquals("Sequence number", SEQ_NUM, manifest.sequenceNumber());
     Assert.assertEquals("Min sequence number", MIN_SEQ_NUM, manifest.minSequenceNumber());
     Assert.assertEquals("Snapshot id", SNAPSHOT_ID, (long) manifest.snapshotId());
@@ -104,7 +118,7 @@ public class TestManifestListVersions {
 
   @Test
   public void testV1ForwardCompatibility() throws IOException {
-    InputFile manifestList = writeManifestList(1);
+    InputFile manifestList = writeManifestList(TEST_MANIFEST, 1);
     GenericData.Record generic = readGeneric(manifestList, V1Metadata.MANIFEST_LIST_SCHEMA);
 
     // v1 metadata should match even though order changed
@@ -118,12 +132,15 @@ public class TestManifestListVersions {
     Assert.assertEquals("Added rows count", ADDED_ROWS, (long) generic.get("added_rows_count"));
     Assert.assertEquals("Existing rows count", EXISTING_ROWS, (long) generic.get("existing_rows_count"));
     Assert.assertEquals("Deleted rows count", DELETED_ROWS, (long) generic.get("deleted_rows_count"));
+    Assert.assertNull("Content", generic.get(ManifestFile.MANIFEST_CONTENT.name()));
+    Assert.assertNull("Sequence number", generic.get(ManifestFile.SEQUENCE_NUMBER.name()));
+    Assert.assertNull("Min sequence number", generic.get(ManifestFile.MIN_SEQUENCE_NUMBER.name()));
   }
 
   @Test
   public void testV2ForwardCompatibility() throws IOException {
-    // v2 manifest list files can be read by v1 readers, but the sequence numbers will be ignored.
-    InputFile manifestList = writeManifestList(2);
+    // v2 manifest list files can be read by v1 readers, but the sequence numbers and content will be ignored.
+    InputFile manifestList = writeManifestList(TEST_MANIFEST, 2);
     GenericData.Record generic = readGeneric(manifestList, V1Metadata.MANIFEST_LIST_SCHEMA);
 
     // v1 metadata should match even though order changed
@@ -137,6 +154,9 @@ public class TestManifestListVersions {
     Assert.assertEquals("Added rows count", ADDED_ROWS, (long) generic.get("added_rows_count"));
     Assert.assertEquals("Existing rows count", EXISTING_ROWS, (long) generic.get("existing_rows_count"));
     Assert.assertEquals("Deleted rows count", DELETED_ROWS, (long) generic.get("deleted_rows_count"));
+    Assert.assertNull("Content", generic.get(ManifestFile.MANIFEST_CONTENT.name()));
+    Assert.assertNull("Sequence number", generic.get(ManifestFile.SEQUENCE_NUMBER.name()));
+    Assert.assertNull("Min sequence number", generic.get(ManifestFile.MIN_SEQUENCE_NUMBER.name()));
   }
 
   @Test
@@ -187,11 +207,11 @@ public class TestManifestListVersions {
     Assert.assertNull("Deleted rows count should be null", manifest.deletedRowsCount());
   }
 
-  private InputFile writeManifestList(int formatVersion) throws IOException {
+  private InputFile writeManifestList(ManifestFile manifest, int formatVersion) throws IOException {
     OutputFile manifestList = Files.localOutput(temp.newFile());
     try (FileAppender<ManifestFile> writer = ManifestLists.write(
         formatVersion, manifestList, SNAPSHOT_ID, SNAPSHOT_ID - 1, formatVersion > 1 ? SEQ_NUM : 0)) {
-      writer.add(TEST_MANIFEST);
+      writer.add(manifest);
     }
     return manifestList.toInputFile();
   }
@@ -208,7 +228,7 @@ public class TestManifestListVersions {
   }
 
   private ManifestFile writeAndReadManifestList(int formatVersion) throws IOException {
-    List<ManifestFile> manifests = ManifestLists.read(writeManifestList(formatVersion));
+    List<ManifestFile> manifests = ManifestLists.read(writeManifestList(TEST_MANIFEST, formatVersion));
     Assert.assertEquals("Should contain one manifest", 1, manifests.size());
     return manifests.get(0);
   }
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index e993256..a020611 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -27,6 +27,7 @@ import java.util.List;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileContent;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
@@ -125,8 +126,9 @@ public abstract class TestIcebergSourceTablesBase {
     try (CloseableIterable<GenericData.Record> rows = Avro.read(manifest).project(entriesTable.schema()).build()) {
       // each row must inherit snapshot_id and sequence_number
       rows.forEach(row -> {
-        row.put(1, snapshot.snapshotId());
         row.put(2, 0L);
+        GenericData.Record file = (GenericData.Record) row.get("data_file");
+        file.put(0, FileContent.DATA.id());
         expected.add(row);
       });
     }
@@ -172,9 +174,13 @@ public abstract class TestIcebergSourceTablesBase {
     for (ManifestFile manifest : Iterables.concat(Iterables.transform(table.snapshots(), Snapshot::manifests))) {
       InputFile in = table.io().newInputFile(manifest.path());
       try (CloseableIterable<GenericData.Record> rows = Avro.read(in).project(entriesTable.schema()).build()) {
-        for (GenericData.Record record : rows) {
-          expected.add(record);
-        }
+        // each row must inherit snapshot_id and sequence_number
+        rows.forEach(row -> {
+          row.put(2, 0L);
+          GenericData.Record file = (GenericData.Record) row.get("data_file");
+          file.put(0, FileContent.DATA.id());
+          expected.add(row);
+        });
       }
     }
 
@@ -222,7 +228,9 @@ public abstract class TestIcebergSourceTablesBase {
       try (CloseableIterable<GenericData.Record> rows = Avro.read(in).project(entriesTable.schema()).build()) {
         for (GenericData.Record record : rows) {
           if ((Integer) record.get("status") < 2 /* added or existing */) {
-            expected.add((GenericData.Record) record.get("data_file"));
+            GenericData.Record file = (GenericData.Record) record.get("data_file");
+            file.put(0, FileContent.DATA.id());
+            expected.add(file);
           }
         }
       }
@@ -248,14 +256,14 @@ public abstract class TestIcebergSourceTablesBase {
         new SimpleRecord(2, "b")
      );
 
-    try {
-      Dataset<Row> inputDF = spark.createDataFrame(records, SimpleRecord.class);
-      inputDF.select("id", "data").write()
-          .format("parquet")
-          .mode("append")
-          .partitionBy("id")
-          .saveAsTable("parquet_table");
+    Dataset<Row> inputDF = spark.createDataFrame(records, SimpleRecord.class);
+    inputDF.select("id", "data").write()
+        .format("parquet")
+        .mode("overwrite")
+        .partitionBy("id")
+        .saveAsTable("parquet_table");
 
+    try {
       String stagingLocation = table.location() + "/metadata";
       SparkTableUtil.importSparkTable(spark,
           new org.apache.spark.sql.catalyst.TableIdentifier("parquet_table"),
@@ -271,7 +279,9 @@ public abstract class TestIcebergSourceTablesBase {
         InputFile in = table.io().newInputFile(manifest.path());
         try (CloseableIterable<GenericData.Record> rows = Avro.read(in).project(entriesTable.schema()).build()) {
           for (GenericData.Record record : rows) {
-            expected.add((GenericData.Record) record.get("data_file"));
+            GenericData.Record file = (GenericData.Record) record.get("data_file");
+            file.put(0, FileContent.DATA.id());
+            expected.add(file);
           }
         }
       }
@@ -301,14 +311,14 @@ public abstract class TestIcebergSourceTablesBase {
         new SimpleRecord(2, "b")
     );
 
-    try {
-      Dataset<Row> inputDF = spark.createDataFrame(records, SimpleRecord.class);
-      inputDF.select("id", "data").write()
-          .format("parquet")
-          .mode("append")
-          .partitionBy("id")
-          .saveAsTable("parquet_table");
+    Dataset<Row> inputDF = spark.createDataFrame(records, SimpleRecord.class);
+    inputDF.select("id", "data").write()
+        .format("parquet")
+        .mode("overwrite")
+        .partitionBy("id")
+        .saveAsTable("parquet_table");
 
+    try {
       String stagingLocation = table.location() + "/metadata";
       SparkTableUtil.importSparkTable(
           spark, new org.apache.spark.sql.catalyst.TableIdentifier("parquet_table"), table, stagingLocation);
@@ -371,7 +381,9 @@ public abstract class TestIcebergSourceTablesBase {
       try (CloseableIterable<GenericData.Record> rows = Avro.read(in).project(entriesTable.schema()).build()) {
         for (GenericData.Record record : rows) {
           if ((Integer) record.get("status") < 2 /* added or existing */) {
-            expected.add((GenericData.Record) record.get("data_file"));
+            GenericData.Record file = (GenericData.Record) record.get("data_file");
+            file.put(0, FileContent.DATA.id());
+            expected.add(file);
           }
         }
       }
@@ -458,7 +470,7 @@ public abstract class TestIcebergSourceTablesBase {
         .load(loadLocation(tableIdentifier, "all_data_files"))
         .orderBy("file_path")
         .collectAsList();
-    actual.sort(Comparator.comparing(o -> o.getString(0)));
+    actual.sort(Comparator.comparing(o -> o.getString(1)));
 
     List<GenericData.Record> expected = Lists.newArrayList();
     for (ManifestFile manifest : Iterables.concat(Iterables.transform(table.snapshots(), Snapshot::manifests))) {
@@ -466,7 +478,9 @@ public abstract class TestIcebergSourceTablesBase {
       try (CloseableIterable<GenericData.Record> rows = Avro.read(in).project(entriesTable.schema()).build()) {
         for (GenericData.Record record : rows) {
           if ((Integer) record.get("status") < 2 /* added or existing */) {
-            expected.add((GenericData.Record) record.get("data_file"));
+            GenericData.Record file = (GenericData.Record) record.get("data_file");
+            file.put(0, FileContent.DATA.id());
+            expected.add(file);
           }
         }
       }