You are viewing a plain-text version of this content. The link to the canonical version was a hyperlink that is not preserved in this plain-text rendering.
Posted to commits@iceberg.apache.org by dw...@apache.org on 2020/06/09 17:43:41 UTC

[iceberg] branch master updated: Combine ManifestReaders into one parameterized class. (#1099)

This is an automated email from the ASF dual-hosted git repository.

dweeks pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ac53b3  Combine ManifestReaders into one parameterized class. (#1099)
2ac53b3 is described below

commit 2ac53b346c8a1b4cfac42aa8b5811b120b4cf0f0
Author: Ryan Blue <bl...@apache.org>
AuthorDate: Tue Jun 9 10:43:32 2020 -0700

    Combine ManifestReaders into one parameterized class. (#1099)
---
 .../org/apache/iceberg/BaseManifestReader.java     | 294 ---------------------
 .../org/apache/iceberg/BaseMetastoreCatalog.java   |   2 +-
 .../org/apache/iceberg/BaseRewriteManifests.java   |   2 +-
 .../org/apache/iceberg/DeleteManifestReader.java   |  40 ---
 .../java/org/apache/iceberg/ManifestFiles.java     |  43 ++-
 .../java/org/apache/iceberg/ManifestGroup.java     |   4 +-
 .../java/org/apache/iceberg/ManifestReader.java    | 269 ++++++++++++++++++-
 .../apache/iceberg/MergingSnapshotProducer.java    |   6 +-
 .../java/org/apache/iceberg/RemoveSnapshots.java   |  17 +-
 .../java/org/apache/iceberg/SnapshotProducer.java  |   2 +-
 .../org/apache/iceberg/TestManifestReader.java     |   6 +-
 .../org/apache/iceberg/TestRewriteManifests.java   |  14 +-
 .../java/org/apache/iceberg/TestTransaction.java   |   2 +-
 .../apache/iceberg/spark/TestSparkDataFile.java    |   2 +-
 14 files changed, 313 insertions(+), 390 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/BaseManifestReader.java b/core/src/main/java/org/apache/iceberg/BaseManifestReader.java
deleted file mode 100644
index 7217fc0..0000000
--- a/core/src/main/java/org/apache/iceberg/BaseManifestReader.java
+++ /dev/null
@@ -1,294 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iceberg;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import org.apache.iceberg.avro.Avro;
-import org.apache.iceberg.avro.AvroIterable;
-import org.apache.iceberg.exceptions.RuntimeIOException;
-import org.apache.iceberg.expressions.Evaluator;
-import org.apache.iceberg.expressions.Expression;
-import org.apache.iceberg.expressions.Expressions;
-import org.apache.iceberg.expressions.InclusiveMetricsEvaluator;
-import org.apache.iceberg.expressions.Projections;
-import org.apache.iceberg.io.CloseableGroup;
-import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.io.CloseableIterator;
-import org.apache.iceberg.io.InputFile;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-import org.apache.iceberg.types.Types;
-
-import static org.apache.iceberg.expressions.Expressions.alwaysTrue;
-
-/**
- * Base reader for data and delete manifest files.
- *
- * @param <F> The Java class of files returned by this reader.
- * @param <ThisT> The Java class of this reader, returned by configuration methods.
- */
-abstract class BaseManifestReader<F extends ContentFile<F>, ThisT>
-    extends CloseableGroup implements CloseableIterable<F> {
-  static final ImmutableList<String> ALL_COLUMNS = ImmutableList.of("*");
-  private static final Set<String> STATS_COLUMNS = Sets.newHashSet(
-      "value_counts", "null_value_counts", "lower_bounds", "upper_bounds");
-
-  protected enum FileType {
-    DATA_FILES(GenericDataFile.class.getName()),
-    DELETE_FILES(GenericDeleteFile.class.getName());
-
-    private final String fileClass;
-
-    FileType(String fileClass) {
-      this.fileClass = fileClass;
-    }
-
-    private String fileClass() {
-      return fileClass;
-    }
-  }
-
-  private final InputFile file;
-  private final InheritableMetadata inheritableMetadata;
-  private final FileType content;
-  private final Map<String, String> metadata;
-  private final PartitionSpec spec;
-  private final Schema fileSchema;
-
-  // updated by configuration methods
-  private Expression partFilter = alwaysTrue();
-  private Expression rowFilter = alwaysTrue();
-  private Schema fileProjection = null;
-  private Collection<String> columns = null;
-  private boolean caseSensitive = true;
-
-  // lazily initialized
-  private Evaluator lazyEvaluator = null;
-  private InclusiveMetricsEvaluator lazyMetricsEvaluator = null;
-
-  protected BaseManifestReader(InputFile file, Map<Integer, PartitionSpec> specsById,
-                               InheritableMetadata inheritableMetadata, FileType content) {
-    this.file = file;
-    this.inheritableMetadata = inheritableMetadata;
-    this.content = content;
-
-    try {
-      try (AvroIterable<ManifestEntry<F>> headerReader = Avro.read(file)
-          .project(ManifestEntry.getSchema(Types.StructType.of()).select("status"))
-          .build()) {
-        this.metadata = headerReader.getMetadata();
-      }
-    } catch (IOException e) {
-      throw new RuntimeIOException(e);
-    }
-
-    int specId = TableMetadata.INITIAL_SPEC_ID;
-    String specProperty = metadata.get("partition-spec-id");
-    if (specProperty != null) {
-      specId = Integer.parseInt(specProperty);
-    }
-
-    if (specsById != null) {
-      this.spec = specsById.get(specId);
-    } else {
-      Schema schema = SchemaParser.fromJson(metadata.get("schema"));
-      this.spec = PartitionSpecParser.fromJsonFields(schema, specId, metadata.get("partition-spec"));
-    }
-
-    this.fileSchema = new Schema(DataFile.getType(spec.partitionType()).fields());
-  }
-
-  protected abstract ThisT self();
-
-  public InputFile file() {
-    return file;
-  }
-
-  public Schema schema() {
-    return fileSchema;
-  }
-
-  public PartitionSpec spec() {
-    return spec;
-  }
-
-  public ThisT select(Collection<String> newColumns) {
-    Preconditions.checkState(fileProjection == null,
-        "Cannot select columns using both select(String...) and project(Schema)");
-    this.columns = newColumns;
-    return self();
-  }
-
-  public ThisT project(Schema newFileProjection) {
-    Preconditions.checkState(columns == null,
-        "Cannot select columns using both select(String...) and project(Schema)");
-    this.fileProjection = newFileProjection;
-    return self();
-  }
-
-  public ThisT filterPartitions(Expression expr) {
-    this.partFilter = Expressions.and(partFilter, expr);
-    return self();
-  }
-
-  public ThisT filterRows(Expression expr) {
-    this.rowFilter = Expressions.and(rowFilter, expr);
-    return self();
-  }
-
-  public ThisT caseSensitive(boolean isCaseSensitive) {
-    this.caseSensitive = isCaseSensitive;
-    return self();
-  }
-
-  CloseableIterable<ManifestEntry<F>> entries() {
-    if ((rowFilter != null && rowFilter != Expressions.alwaysTrue()) ||
-        (partFilter != null && partFilter != Expressions.alwaysTrue())) {
-      Evaluator evaluator = evaluator();
-      InclusiveMetricsEvaluator metricsEvaluator = metricsEvaluator();
-
-      // ensure stats columns are present for metrics evaluation
-      boolean requireStatsProjection = requireStatsProjection(rowFilter, columns);
-      Collection<String> projectColumns = requireStatsProjection ? withStatsColumns(columns) : columns;
-
-      return CloseableIterable.filter(
-          open(projection(fileSchema, fileProjection, projectColumns, caseSensitive)),
-          entry -> entry != null &&
-              evaluator.eval(entry.file().partition()) &&
-              metricsEvaluator.eval(entry.file()));
-    } else {
-      return open(projection(fileSchema, fileProjection, columns, caseSensitive));
-    }
-  }
-
-  private CloseableIterable<ManifestEntry<F>> open(Schema projection) {
-    FileFormat format = FileFormat.fromFileName(file.location());
-    Preconditions.checkArgument(format != null, "Unable to determine format of manifest: %s", file);
-
-    switch (format) {
-      case AVRO:
-        AvroIterable<ManifestEntry<F>> reader = Avro.read(file)
-            .project(ManifestEntry.wrapFileSchema(projection.asStruct()))
-            .rename("manifest_entry", GenericManifestEntry.class.getName())
-            .rename("partition", PartitionData.class.getName())
-            .rename("r102", PartitionData.class.getName())
-            .rename("data_file", content.fileClass())
-            .rename("r2", content.fileClass())
-            .classLoader(GenericManifestEntry.class.getClassLoader())
-            .reuseContainers()
-            .build();
-
-        addCloseable(reader);
-
-        return CloseableIterable.transform(reader, inheritableMetadata::apply);
-
-      default:
-        throw new UnsupportedOperationException("Invalid format for manifest file: " + format);
-    }
-  }
-
-  CloseableIterable<ManifestEntry<F>> liveEntries() {
-    return CloseableIterable.filter(entries(),
-        entry -> entry != null && entry.status() != ManifestEntry.Status.DELETED);
-  }
-
-  /**
-   * @return an Iterator of DataFile. Makes defensive copies of files before returning
-   */
-  @Override
-  public CloseableIterator<F> iterator() {
-    if (dropStats(rowFilter, columns)) {
-      return CloseableIterable.transform(liveEntries(), e -> e.file().copyWithoutStats()).iterator();
-    } else {
-      return CloseableIterable.transform(liveEntries(), e -> e.file().copy()).iterator();
-    }
-  }
-
-  private static Schema projection(Schema schema, Schema project, Collection<String> columns, boolean caseSensitive) {
-    if (columns != null) {
-      if (caseSensitive) {
-        return schema.select(columns);
-      } else {
-        return schema.caseInsensitiveSelect(columns);
-      }
-    } else if (project != null) {
-      return project;
-    }
-
-    return schema;
-  }
-
-  private Evaluator evaluator() {
-    if (lazyEvaluator == null) {
-      Expression projected = Projections.inclusive(spec, caseSensitive).project(rowFilter);
-      Expression finalPartFilter = Expressions.and(projected, partFilter);
-      if (finalPartFilter != null) {
-        this.lazyEvaluator = new Evaluator(spec.partitionType(), finalPartFilter, caseSensitive);
-      } else {
-        this.lazyEvaluator = new Evaluator(spec.partitionType(), Expressions.alwaysTrue(), caseSensitive);
-      }
-    }
-    return lazyEvaluator;
-  }
-
-  private InclusiveMetricsEvaluator metricsEvaluator() {
-    if (lazyMetricsEvaluator == null) {
-      if (rowFilter != null) {
-        this.lazyMetricsEvaluator = new InclusiveMetricsEvaluator(
-            spec.schema(), rowFilter, caseSensitive);
-      } else {
-        this.lazyMetricsEvaluator = new InclusiveMetricsEvaluator(
-            spec.schema(), Expressions.alwaysTrue(), caseSensitive);
-      }
-    }
-    return lazyMetricsEvaluator;
-  }
-
-  private static boolean requireStatsProjection(Expression rowFilter, Collection<String> columns) {
-    // Make sure we have all stats columns for metrics evaluator
-    return rowFilter != Expressions.alwaysTrue() &&
-        !columns.containsAll(BaseManifestReader.ALL_COLUMNS) &&
-        !columns.containsAll(STATS_COLUMNS);
-  }
-
-  static boolean dropStats(Expression rowFilter, Collection<String> columns) {
-    // Make sure we only drop all stats if we had projected all stats
-    // We do not drop stats even if we had partially added some stats columns
-    return rowFilter != Expressions.alwaysTrue() &&
-        !columns.containsAll(BaseManifestReader.ALL_COLUMNS) &&
-        Sets.intersection(Sets.newHashSet(columns), STATS_COLUMNS).isEmpty();
-  }
-
-  private static Collection<String> withStatsColumns(Collection<String> columns) {
-    if (columns.containsAll(BaseManifestReader.ALL_COLUMNS)) {
-      return columns;
-    } else {
-      List<String> projectColumns = Lists.newArrayList(columns);
-      projectColumns.addAll(STATS_COLUMNS); // order doesn't matter
-      return projectColumns;
-    }
-  }
-}
diff --git a/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java b/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java
index cabf648..4a077f3 100644
--- a/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java
+++ b/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java
@@ -262,7 +262,7 @@ public abstract class BaseMetastoreCatalog implements Catalog {
         .executeWith(ThreadPools.getWorkerPool())
         .onFailure((item, exc) -> LOG.warn("Failed to get deleted files: this may cause orphaned data files", exc))
         .run(manifest -> {
-          try (ManifestReader reader = ManifestFiles.read(manifest, io)) {
+          try (ManifestReader<?> reader = ManifestFiles.open(manifest, io)) {
             for (ManifestEntry<?> entry : reader.entries()) {
               // intern the file path because the weak key map uses identity (==) instead of equals
               String path = entry.file().path().toString().intern();
diff --git a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
index d49b60a..de0a785 100644
--- a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
+++ b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
@@ -235,7 +235,7 @@ public class BaseRewriteManifests extends SnapshotProducer<RewriteManifests> imp
               keptManifests.add(manifest);
             } else {
               rewrittenManifests.add(manifest);
-              try (ManifestReader reader = ManifestFiles.read(manifest, ops.io(), ops.current().specsById())
+              try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest, ops.io(), ops.current().specsById())
                   .select(Arrays.asList("*"))) {
                 reader.liveEntries().forEach(
                     entry -> appendEntry(entry, clusterByFunc.apply(entry.file()), manifest.partitionSpecId())
diff --git a/core/src/main/java/org/apache/iceberg/DeleteManifestReader.java b/core/src/main/java/org/apache/iceberg/DeleteManifestReader.java
deleted file mode 100644
index dfeee6b..0000000
--- a/core/src/main/java/org/apache/iceberg/DeleteManifestReader.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iceberg;
-
-import java.util.Map;
-import org.apache.iceberg.io.FileIO;
-import org.apache.iceberg.io.InputFile;
-
-/**
- * Reader for manifest files.
- * <p>
- * Create readers using {@link ManifestFiles#readDeleteManifest(ManifestFile, FileIO, Map)}.
- */
-public class DeleteManifestReader extends BaseManifestReader<DeleteFile, DeleteManifestReader> {
-  protected DeleteManifestReader(InputFile file, Map<Integer, PartitionSpec> specsById, InheritableMetadata metadata) {
-    super(file, specsById, metadata, FileType.DELETE_FILES);
-  }
-
-  @Override
-  protected DeleteManifestReader self() {
-    return this;
-  }
-}
diff --git a/core/src/main/java/org/apache/iceberg/ManifestFiles.java b/core/src/main/java/org/apache/iceberg/ManifestFiles.java
index 730badf..809d644 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestFiles.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestFiles.java
@@ -21,6 +21,7 @@ package org.apache.iceberg;
 
 import java.io.IOException;
 import java.util.Map;
+import org.apache.iceberg.ManifestReader.FileType;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
@@ -42,7 +43,7 @@ public class ManifestFiles {
    * @param io a FileIO
    * @return a manifest reader
    */
-  public static ManifestReader read(ManifestFile manifest, FileIO io) {
+  public static ManifestReader<DataFile> read(ManifestFile manifest, FileIO io) {
     return read(manifest, io, null);
   }
 
@@ -54,12 +55,12 @@ public class ManifestFiles {
    * @param specsById a Map from spec ID to partition spec
    * @return a {@link ManifestReader}
    */
-  public static ManifestReader read(ManifestFile manifest, FileIO io, Map<Integer, PartitionSpec> specsById) {
+  public static ManifestReader<DataFile> read(ManifestFile manifest, FileIO io, Map<Integer, PartitionSpec> specsById) {
     Preconditions.checkArgument(manifest.content() == ManifestContent.DATA,
         "Cannot read a delete manifest with a ManifestReader: %s", manifest);
     InputFile file = io.newInputFile(manifest.path());
     InheritableMetadata inheritableMetadata = InheritableMetadataFactory.fromManifest(manifest);
-    return new ManifestReader(file, specsById, inheritableMetadata);
+    return new ManifestReader<>(file, specsById, inheritableMetadata, FileType.DATA_FILES);
   }
 
   /**
@@ -97,20 +98,20 @@ public class ManifestFiles {
   }
 
   /**
-   * Returns a new {@link DeleteManifestReader} for a {@link ManifestFile}.
+   * Returns a new {@link ManifestReader} for a {@link ManifestFile}.
    *
    * @param manifest a {@link ManifestFile}
    * @param io a {@link FileIO}
    * @param specsById a Map from spec ID to partition spec
-   * @return a {@link DeleteManifestReader}
+   * @return a {@link ManifestReader}
    */
-  public static DeleteManifestReader readDeleteManifest(ManifestFile manifest, FileIO io,
-                                                        Map<Integer, PartitionSpec> specsById) {
+  public static ManifestReader<DeleteFile> readDeleteManifest(ManifestFile manifest, FileIO io,
+                                                              Map<Integer, PartitionSpec> specsById) {
     Preconditions.checkArgument(manifest.content() == ManifestContent.DELETES,
         "Cannot read a data manifest with a DeleteManifestReader: %s", manifest);
     InputFile file = io.newInputFile(manifest.path());
     InheritableMetadata inheritableMetadata = InheritableMetadataFactory.fromManifest(manifest);
-    return new DeleteManifestReader(file, specsById, inheritableMetadata);
+    return new ManifestReader<>(file, specsById, inheritableMetadata, FileType.DELETE_FILES);
   }
 
   /**
@@ -133,13 +134,29 @@ public class ManifestFiles {
     throw new UnsupportedOperationException("Cannot write manifest for table version: " + formatVersion);
   }
 
+  static ManifestReader<?> open(ManifestFile manifest, FileIO io) {
+    return open(manifest, io, null);
+  }
+
+  static ManifestReader<?> open(ManifestFile manifest, FileIO io,
+                                Map<Integer, PartitionSpec> specsById) {
+    switch (manifest.content()) {
+      case DATA:
+        return ManifestFiles.read(manifest, io, specsById);
+      case DELETES:
+        return ManifestFiles.readDeleteManifest(manifest, io, specsById);
+    }
+    throw new UnsupportedOperationException("Cannot read unknown manifest type: " + manifest.content());
+  }
+
   static ManifestFile copyAppendManifest(int formatVersion,
                                          InputFile toCopy, Map<Integer, PartitionSpec> specsById,
                                          OutputFile outputFile, long snapshotId,
                                          SnapshotSummary.Builder summaryBuilder) {
     // use metadata that will add the current snapshot's ID for the rewrite
     InheritableMetadata inheritableMetadata = InheritableMetadataFactory.forCopy(snapshotId);
-    try (ManifestReader reader = new ManifestReader(toCopy, specsById, inheritableMetadata)) {
+    try (ManifestReader<DataFile> reader =
+             new ManifestReader<>(toCopy, specsById, inheritableMetadata, FileType.DATA_FILES)) {
       return copyManifestInternal(
           formatVersion, reader, outputFile, snapshotId, summaryBuilder, ManifestEntry.Status.ADDED);
     } catch (IOException e) {
@@ -153,7 +170,8 @@ public class ManifestFiles {
                                           SnapshotSummary.Builder summaryBuilder) {
     // for a rewritten manifest all snapshot ids should be set. use empty metadata to throw an exception if it is not
     InheritableMetadata inheritableMetadata = InheritableMetadataFactory.empty();
-    try (ManifestReader reader = new ManifestReader(toCopy, specsById, inheritableMetadata)) {
+    try (ManifestReader<DataFile> reader =
+             new ManifestReader<>(toCopy, specsById, inheritableMetadata, FileType.DATA_FILES)) {
       return copyManifestInternal(
           formatVersion, reader, outputFile, snapshotId, summaryBuilder, ManifestEntry.Status.EXISTING);
     } catch (IOException e) {
@@ -161,8 +179,9 @@ public class ManifestFiles {
     }
   }
 
-  private static ManifestFile copyManifestInternal(int formatVersion, ManifestReader reader, OutputFile outputFile,
-                                                   long snapshotId, SnapshotSummary.Builder summaryBuilder,
+  private static ManifestFile copyManifestInternal(int formatVersion, ManifestReader<DataFile> reader,
+                                                   OutputFile outputFile, long snapshotId,
+                                                   SnapshotSummary.Builder summaryBuilder,
                                                    ManifestEntry.Status allowedEntryStatus) {
     ManifestWriter<DataFile> writer = write(formatVersion, reader.spec(), outputFile, snapshotId);
     boolean threw = true;
diff --git a/core/src/main/java/org/apache/iceberg/ManifestGroup.java b/core/src/main/java/org/apache/iceberg/ManifestGroup.java
index 6308236..bc7114b 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestGroup.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestGroup.java
@@ -68,7 +68,7 @@ class ManifestGroup {
     this.ignoreDeleted = false;
     this.ignoreExisting = false;
     this.ignoreResiduals = false;
-    this.columns = BaseManifestReader.ALL_COLUMNS;
+    this.columns = ManifestReader.ALL_COLUMNS;
     this.caseSensitive = true;
     this.manifestPredicate = m -> true;
     this.manifestEntryPredicate = e -> true;
@@ -146,7 +146,7 @@ class ManifestGroup {
       Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : dataFilter;
       return ResidualEvaluator.of(spec, filter, caseSensitive);
     });
-    boolean dropStats = BaseManifestReader.dropStats(dataFilter, columns);
+    boolean dropStats = ManifestReader.dropStats(dataFilter, columns);
     Iterable<CloseableIterable<FileScanTask>> tasks = entries((manifest, entries) -> {
       int partitionSpecId = manifest.partitionSpecId();
       PartitionSpec spec = specsById.get(partitionSpecId);
diff --git a/core/src/main/java/org/apache/iceberg/ManifestReader.java b/core/src/main/java/org/apache/iceberg/ManifestReader.java
index 45c19fc..bd3801c 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestReader.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestReader.java
@@ -19,22 +19,273 @@
 
 package org.apache.iceberg;
 
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
 import java.util.Map;
-import org.apache.iceberg.io.FileIO;
+import java.util.Set;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.avro.AvroIterable;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.expressions.Evaluator;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.InclusiveMetricsEvaluator;
+import org.apache.iceberg.expressions.Projections;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Types;
+
+import static org.apache.iceberg.expressions.Expressions.alwaysTrue;
 
 /**
- * Reader for manifest files.
- * <p>
- * Create readers using {@link ManifestFiles#read(ManifestFile, FileIO, Map)}.
+ * Base reader for data and delete manifest files.
+ *
+ * @param <F> The Java class of files returned by this reader.
  */
-public class ManifestReader extends BaseManifestReader<DataFile, ManifestReader> {
-  ManifestReader(InputFile file, Map<Integer, PartitionSpec> specsById,
-                        InheritableMetadata inheritableMetadata) {
-    super(file, specsById, inheritableMetadata, FileType.DATA_FILES);
+public class ManifestReader<F extends ContentFile<F>>
+    extends CloseableGroup implements CloseableIterable<F> {
+  static final ImmutableList<String> ALL_COLUMNS = ImmutableList.of("*");
+  private static final Set<String> STATS_COLUMNS = Sets.newHashSet(
+      "value_counts", "null_value_counts", "lower_bounds", "upper_bounds");
+
+  protected enum FileType {
+    DATA_FILES(GenericDataFile.class.getName()),
+    DELETE_FILES(GenericDeleteFile.class.getName());
+
+    private final String fileClass;
+
+    FileType(String fileClass) {
+      this.fileClass = fileClass;
+    }
+
+    private String fileClass() {
+      return fileClass;
+    }
+  }
+
+  private final InputFile file;
+  private final InheritableMetadata inheritableMetadata;
+  private final FileType content;
+  private final Map<String, String> metadata;
+  private final PartitionSpec spec;
+  private final Schema fileSchema;
+
+  // updated by configuration methods
+  private Expression partFilter = alwaysTrue();
+  private Expression rowFilter = alwaysTrue();
+  private Schema fileProjection = null;
+  private Collection<String> columns = null;
+  private boolean caseSensitive = true;
+
+  // lazily initialized
+  private Evaluator lazyEvaluator = null;
+  private InclusiveMetricsEvaluator lazyMetricsEvaluator = null;
+
+  protected ManifestReader(InputFile file, Map<Integer, PartitionSpec> specsById,
+                           InheritableMetadata inheritableMetadata, FileType content) {
+    this.file = file;
+    this.inheritableMetadata = inheritableMetadata;
+    this.content = content;
+
+    try {
+      try (AvroIterable<ManifestEntry<F>> headerReader = Avro.read(file)
+          .project(ManifestEntry.getSchema(Types.StructType.of()).select("status"))
+          .build()) {
+        this.metadata = headerReader.getMetadata();
+      }
+    } catch (IOException e) {
+      throw new RuntimeIOException(e);
+    }
+
+    int specId = TableMetadata.INITIAL_SPEC_ID;
+    String specProperty = metadata.get("partition-spec-id");
+    if (specProperty != null) {
+      specId = Integer.parseInt(specProperty);
+    }
+
+    if (specsById != null) {
+      this.spec = specsById.get(specId);
+    } else {
+      Schema schema = SchemaParser.fromJson(metadata.get("schema"));
+      this.spec = PartitionSpecParser.fromJsonFields(schema, specId, metadata.get("partition-spec"));
+    }
+
+    this.fileSchema = new Schema(DataFile.getType(spec.partitionType()).fields());
+  }
+
+  public InputFile file() {
+    return file;
+  }
+
+  public Schema schema() {
+    return fileSchema;
+  }
+
+  public PartitionSpec spec() {
+    return spec;
+  }
+
+  public ManifestReader<F> select(Collection<String> newColumns) {
+    Preconditions.checkState(fileProjection == null,
+        "Cannot select columns using both select(String...) and project(Schema)");
+    this.columns = newColumns;
+    return this;
+  }
+
+  public ManifestReader<F> project(Schema newFileProjection) {
+    Preconditions.checkState(columns == null,
+        "Cannot select columns using both select(String...) and project(Schema)");
+    this.fileProjection = newFileProjection;
+    return this;
+  }
+
+  public ManifestReader<F> filterPartitions(Expression expr) {
+    this.partFilter = Expressions.and(partFilter, expr);
+    return this;
   }
 
-  protected ManifestReader self() {
+  public ManifestReader<F> filterRows(Expression expr) {
+    this.rowFilter = Expressions.and(rowFilter, expr);
     return this;
   }
+
+  public ManifestReader<F> caseSensitive(boolean isCaseSensitive) {
+    this.caseSensitive = isCaseSensitive;
+    return this;
+  }
+
+  CloseableIterable<ManifestEntry<F>> entries() {
+    if ((rowFilter != null && rowFilter != Expressions.alwaysTrue()) ||
+        (partFilter != null && partFilter != Expressions.alwaysTrue())) {
+      Evaluator evaluator = evaluator();
+      InclusiveMetricsEvaluator metricsEvaluator = metricsEvaluator();
+
+      // ensure stats columns are present for metrics evaluation
+      boolean requireStatsProjection = requireStatsProjection(rowFilter, columns);
+      Collection<String> projectColumns = requireStatsProjection ? withStatsColumns(columns) : columns;
+
+      return CloseableIterable.filter(
+          open(projection(fileSchema, fileProjection, projectColumns, caseSensitive)),
+          entry -> entry != null &&
+              evaluator.eval(entry.file().partition()) &&
+              metricsEvaluator.eval(entry.file()));
+    } else {
+      return open(projection(fileSchema, fileProjection, columns, caseSensitive));
+    }
+  }
+
+  private CloseableIterable<ManifestEntry<F>> open(Schema projection) {
+    FileFormat format = FileFormat.fromFileName(file.location());
+    Preconditions.checkArgument(format != null, "Unable to determine format of manifest: %s", file);
+
+    switch (format) {
+      case AVRO:
+        AvroIterable<ManifestEntry<F>> reader = Avro.read(file)
+            .project(ManifestEntry.wrapFileSchema(projection.asStruct()))
+            .rename("manifest_entry", GenericManifestEntry.class.getName())
+            .rename("partition", PartitionData.class.getName())
+            .rename("r102", PartitionData.class.getName())
+            .rename("data_file", content.fileClass())
+            .rename("r2", content.fileClass())
+            .classLoader(GenericManifestEntry.class.getClassLoader())
+            .reuseContainers()
+            .build();
+
+        addCloseable(reader);
+
+        return CloseableIterable.transform(reader, inheritableMetadata::apply);
+
+      default:
+        throw new UnsupportedOperationException("Invalid format for manifest file: " + format);
+    }
+  }
+
+  CloseableIterable<ManifestEntry<F>> liveEntries() {
+    return CloseableIterable.filter(entries(),
+        entry -> entry != null && entry.status() != ManifestEntry.Status.DELETED);
+  }
+
+  /**
+   * @return an Iterator of DataFile. Makes defensive copies of files before returning
+   */
+  @Override
+  public CloseableIterator<F> iterator() {
+    if (dropStats(rowFilter, columns)) {
+      return CloseableIterable.transform(liveEntries(), e -> e.file().copyWithoutStats()).iterator();
+    } else {
+      return CloseableIterable.transform(liveEntries(), e -> e.file().copy()).iterator();
+    }
+  }
+
+  private static Schema projection(Schema schema, Schema project, Collection<String> columns, boolean caseSensitive) {
+    if (columns != null) {
+      if (caseSensitive) {
+        return schema.select(columns);
+      } else {
+        return schema.caseInsensitiveSelect(columns);
+      }
+    } else if (project != null) {
+      return project;
+    }
+
+    return schema;
+  }
+
+  private Evaluator evaluator() {
+    if (lazyEvaluator == null) {
+      Expression projected = Projections.inclusive(spec, caseSensitive).project(rowFilter);
+      Expression finalPartFilter = Expressions.and(projected, partFilter);
+      if (finalPartFilter != null) {
+        this.lazyEvaluator = new Evaluator(spec.partitionType(), finalPartFilter, caseSensitive);
+      } else {
+        this.lazyEvaluator = new Evaluator(spec.partitionType(), Expressions.alwaysTrue(), caseSensitive);
+      }
+    }
+    return lazyEvaluator;
+  }
+
+  private InclusiveMetricsEvaluator metricsEvaluator() {
+    if (lazyMetricsEvaluator == null) {
+      if (rowFilter != null) {
+        this.lazyMetricsEvaluator = new InclusiveMetricsEvaluator(
+            spec.schema(), rowFilter, caseSensitive);
+      } else {
+        this.lazyMetricsEvaluator = new InclusiveMetricsEvaluator(
+            spec.schema(), Expressions.alwaysTrue(), caseSensitive);
+      }
+    }
+    return lazyMetricsEvaluator;
+  }
+
+  private static boolean requireStatsProjection(Expression rowFilter, Collection<String> columns) {
+    // Make sure we have all stats columns for metrics evaluator
+    return rowFilter != Expressions.alwaysTrue() &&
+        !columns.containsAll(ManifestReader.ALL_COLUMNS) &&
+        !columns.containsAll(STATS_COLUMNS);
+  }
+
+  static boolean dropStats(Expression rowFilter, Collection<String> columns) {
+    // Make sure we only drop all stats if we had projected all stats
+    // We do not drop stats even if we had partially added some stats columns
+    return rowFilter != Expressions.alwaysTrue() &&
+        !columns.containsAll(ManifestReader.ALL_COLUMNS) &&
+        Sets.intersection(Sets.newHashSet(columns), STATS_COLUMNS).isEmpty();
+  }
+
+  private static Collection<String> withStatsColumns(Collection<String> columns) {
+    if (columns.containsAll(ManifestReader.ALL_COLUMNS)) {
+      return columns;
+    } else {
+      List<String> projectColumns = Lists.newArrayList(columns);
+      projectColumns.addAll(STATS_COLUMNS); // order doesn't matter
+      return projectColumns;
+    }
+  }
 }
diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
index dad5018..3076df3 100644
--- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
@@ -524,7 +524,7 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
   }
 
   private boolean manifestHasDeletedFiles(
-      StrictMetricsEvaluator metricsEvaluator, ManifestReader reader,
+      StrictMetricsEvaluator metricsEvaluator, ManifestReader<DataFile> reader,
       CharSequenceWrapper pathWrapper, StructLikeWrapper partitionWrapper) {
     Evaluator inclusive = extractInclusiveDeleteExpression(reader);
     Evaluator strict = extractStrictDeleteExpression(reader);
@@ -550,7 +550,7 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
   }
 
   private ManifestFile filterManifestWithDeletedFiles(
-      StrictMetricsEvaluator metricsEvaluator, ManifestFile manifest, ManifestReader reader,
+      StrictMetricsEvaluator metricsEvaluator, ManifestFile manifest, ManifestReader<DataFile> reader,
       CharSequenceWrapper pathWrapper, StructLikeWrapper partitionWrapper) throws IOException {
     Evaluator inclusive = extractInclusiveDeleteExpression(reader);
     Evaluator strict = extractStrictDeleteExpression(reader);
@@ -673,7 +673,7 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
     ManifestWriter<DataFile> writer = newManifestWriter(ops.current().spec(specId));
     try {
       for (ManifestFile manifest : bin) {
-        try (ManifestReader reader = ManifestFiles.read(manifest, ops.io(), ops.current().specsById())) {
+        try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest, ops.io(), ops.current().specsById())) {
           for (ManifestEntry<DataFile> entry : reader.entries()) {
             if (entry.status() == Status.DELETED) {
               // suppress deletes from previous snapshots. only files deleted by this snapshot
diff --git a/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java b/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java
index 5f33bfd..83424d9 100644
--- a/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java
+++ b/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java
@@ -22,7 +22,6 @@ package org.apache.iceberg;
 import java.io.IOException;
 import java.util.Date;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Consumer;
@@ -31,7 +30,6 @@ import org.apache.iceberg.exceptions.CommitFailedException;
 import org.apache.iceberg.exceptions.NotFoundException;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.relocated.com.google.common.base.Joiner;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -351,7 +349,7 @@ class RemoveSnapshots implements ExpireSnapshots {
         .onFailure((item, exc) -> LOG.warn("Failed to get deleted files: this may cause orphaned data files", exc))
         .run(manifest -> {
           // the manifest has deletes, scan it to find files to delete
-          try (BaseManifestReader<?, ?> reader = openManifest(manifest, ops.io(), ops.current().specsById())) {
+          try (ManifestReader<?> reader = ManifestFiles.open(manifest, ops.io(), ops.current().specsById())) {
             for (ManifestEntry<?> entry : reader.entries()) {
               // if the snapshot ID of the DELETE entry is no longer valid, the data can be deleted
               if (entry.status() == ManifestEntry.Status.DELETED &&
@@ -371,7 +369,7 @@ class RemoveSnapshots implements ExpireSnapshots {
         .onFailure((item, exc) -> LOG.warn("Failed to get added files: this may cause orphaned data files", exc))
         .run(manifest -> {
           // the manifest has deletes, scan it to find files to delete
-          try (ManifestReader reader = ManifestFiles.read(manifest, ops.io(), ops.current().specsById())) {
+          try (ManifestReader<?> reader = ManifestFiles.open(manifest, ops.io(), ops.current().specsById())) {
             for (ManifestEntry<?> entry : reader.entries()) {
               // delete any ADDED file from manifests that were reverted
               if (entry.status() == ManifestEntry.Status.ADDED) {
@@ -403,15 +401,4 @@ class RemoveSnapshots implements ExpireSnapshots {
       return CloseableIterable.withNoopClose(snapshot.allManifests());
     }
   }
-
-  private static BaseManifestReader<?, ?> openManifest(ManifestFile manifest, FileIO io,
-                                                       Map<Integer, PartitionSpec> specsById) {
-    switch (manifest.content()) {
-      case DATA:
-        return ManifestFiles.read(manifest, io, specsById);
-      case DELETES:
-        return ManifestFiles.readDeleteManifest(manifest, io, specsById);
-    }
-    throw new UnsupportedOperationException("Cannot read unknown manifest type: " + manifest.content());
-  }
 }
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
index 0daf5f7..77565e7 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
@@ -347,7 +347,7 @@ abstract class SnapshotProducer<ThisT> implements SnapshotUpdate<ThisT> {
   }
 
   private static ManifestFile addMetadata(TableOperations ops, ManifestFile manifest) {
-    try (ManifestReader reader = ManifestFiles.read(manifest, ops.io(), ops.current().specsById())) {
+    try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest, ops.io(), ops.current().specsById())) {
       PartitionSummary stats = new PartitionSummary(ops.current().spec(manifest.partitionSpecId()));
       int addedFiles = 0;
       long addedRows = 0L;
diff --git a/core/src/test/java/org/apache/iceberg/TestManifestReader.java b/core/src/test/java/org/apache/iceberg/TestManifestReader.java
index e899fb7..4e91131 100644
--- a/core/src/test/java/org/apache/iceberg/TestManifestReader.java
+++ b/core/src/test/java/org/apache/iceberg/TestManifestReader.java
@@ -46,7 +46,7 @@ public class TestManifestReader extends TableTestBase {
   @Test
   public void testManifestReaderWithEmptyInheritableMetadata() throws IOException {
     ManifestFile manifest = writeManifest(1000L, manifestEntry(Status.EXISTING, 1000L, FILE_A));
-    try (ManifestReader reader = ManifestFiles.read(manifest, FILE_IO)) {
+    try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest, FILE_IO)) {
       ManifestEntry<DataFile> entry = Iterables.getOnlyElement(reader.entries());
       Assert.assertEquals(Status.EXISTING, entry.status());
       Assert.assertEquals(FILE_A.path(), entry.file().path());
@@ -66,7 +66,7 @@ public class TestManifestReader extends TableTestBase {
   @Test
   public void testManifestReaderWithPartitionMetadata() throws IOException {
     ManifestFile manifest = writeManifest(1000L, manifestEntry(Status.EXISTING, 123L, FILE_A));
-    try (ManifestReader reader = ManifestFiles.read(manifest, FILE_IO)) {
+    try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest, FILE_IO)) {
       ManifestEntry<DataFile> entry = Iterables.getOnlyElement(reader.entries());
       Assert.assertEquals(123L, (long) entry.snapshotId());
 
@@ -87,7 +87,7 @@ public class TestManifestReader extends TableTestBase {
     table.ops().commit(table.ops().current(), table.ops().current().updatePartitionSpec(spec));
 
     ManifestFile manifest = writeManifest(1000L, manifestEntry(Status.EXISTING, 123L, FILE_A));
-    try (ManifestReader reader = ManifestFiles.read(manifest, FILE_IO)) {
+    try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest, FILE_IO)) {
       ManifestEntry<DataFile> entry = Iterables.getOnlyElement(reader.entries());
       Assert.assertEquals(123L, (long) entry.snapshotId());
 
diff --git a/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java b/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java
index 74a46de..7a2c5fb 100644
--- a/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java
+++ b/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java
@@ -116,7 +116,7 @@ public class TestRewriteManifests extends TableTestBase {
     // get the correct file order
     List<DataFile> files;
     List<Long> ids;
-    try (ManifestReader reader = ManifestFiles.read(manifests.get(0), table.io())) {
+    try (ManifestReader<DataFile> reader = ManifestFiles.read(manifests.get(0), table.io())) {
       if (reader.iterator().next().path().equals(FILE_A.path())) {
         files = Arrays.asList(FILE_A, FILE_B);
         ids = Arrays.asList(manifestAppendId, fileAppendId);
@@ -190,7 +190,7 @@ public class TestRewriteManifests extends TableTestBase {
     // get the file order correct
     List<DataFile> files;
     List<Long> ids;
-    try (ManifestReader reader = ManifestFiles.read(manifests.get(0), table.io())) {
+    try (ManifestReader<DataFile> reader = ManifestFiles.read(manifests.get(0), table.io())) {
       if (reader.iterator().next().path().equals(FILE_A.path())) {
         files = Arrays.asList(FILE_A, FILE_B);
         ids = Arrays.asList(appendIdA, appendIdB);
@@ -232,7 +232,7 @@ public class TestRewriteManifests extends TableTestBase {
     table.rewriteManifests()
       .clusterBy(file -> "file")
       .rewriteIf(manifest -> {
-        try (ManifestReader reader = ManifestFiles.read(manifest, table.io())) {
+        try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest, table.io())) {
           return !reader.iterator().next().path().equals(FILE_A.path());
         } catch (IOException x) {
           throw new RuntimeIOException(x);
@@ -246,7 +246,7 @@ public class TestRewriteManifests extends TableTestBase {
     // get the file order correct
     List<DataFile> files;
     List<Long> ids;
-    try (ManifestReader reader = ManifestFiles.read(manifests.get(0), table.io())) {
+    try (ManifestReader<DataFile> reader = ManifestFiles.read(manifests.get(0), table.io())) {
       if (reader.iterator().next().path().equals(FILE_B.path())) {
         files = Arrays.asList(FILE_B, FILE_C);
         ids = Arrays.asList(appendIdB, appendIdC);
@@ -316,7 +316,7 @@ public class TestRewriteManifests extends TableTestBase {
     table.rewriteManifests()
       .clusterBy(file -> "file")
       .rewriteIf(manifest -> {
-        try (ManifestReader reader = ManifestFiles.read(manifest, table.io())) {
+        try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest, table.io())) {
           return !reader.iterator().next().path().equals(FILE_A.path());
         } catch (IOException x) {
           throw new RuntimeIOException(x);
@@ -336,7 +336,7 @@ public class TestRewriteManifests extends TableTestBase {
     // get the file order correct
     List<DataFile> files;
     List<Long> ids;
-    try (ManifestReader reader = ManifestFiles.read(manifests.get(0), table.io())) {
+    try (ManifestReader<DataFile> reader = ManifestFiles.read(manifests.get(0), table.io())) {
       if (reader.iterator().next().path().equals(FILE_A.path())) {
         files = Arrays.asList(FILE_A, FILE_B);
         ids = Arrays.asList(appendIdA, appendIdB);
@@ -899,7 +899,7 @@ public class TestRewriteManifests extends TableTestBase {
         .addManifest(newManifest)
         .clusterBy(dataFile -> "const-value")
         .rewriteIf(manifest -> {
-          try (ManifestReader reader = ManifestFiles.read(manifest, table.io())) {
+          try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest, table.io())) {
             return !reader.iterator().next().path().equals(FILE_B.path());
           } catch (IOException x) {
             throw new RuntimeIOException(x);
diff --git a/core/src/test/java/org/apache/iceberg/TestTransaction.java b/core/src/test/java/org/apache/iceberg/TestTransaction.java
index 2182608..577480f 100644
--- a/core/src/test/java/org/apache/iceberg/TestTransaction.java
+++ b/core/src/test/java/org/apache/iceberg/TestTransaction.java
@@ -502,7 +502,7 @@ public class TestTransaction extends TableTestBase {
 
     // create a manifest append
     OutputFile manifestLocation = Files.localOutput("/tmp/" + UUID.randomUUID().toString() + ".avro");
-    ManifestWriter writer = ManifestFiles.write(table.spec(), manifestLocation);
+    ManifestWriter<DataFile> writer = ManifestFiles.write(table.spec(), manifestLocation);
     try {
       writer.add(FILE_D);
     } finally {
diff --git a/spark/src/test/java/org/apache/iceberg/spark/TestSparkDataFile.java b/spark/src/test/java/org/apache/iceberg/spark/TestSparkDataFile.java
index 47c9caf..d0967b2 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/TestSparkDataFile.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/TestSparkDataFile.java
@@ -157,7 +157,7 @@ public class TestSparkDataFile {
     Assert.assertEquals("Should have 1 manifest", 1, manifests.size());
 
     List<DataFile> dataFiles = Lists.newArrayList();
-    try (ManifestReader reader = ManifestFiles.read(manifests.get(0), table.io())) {
+    try (ManifestReader<DataFile> reader = ManifestFiles.read(manifests.get(0), table.io())) {
       reader.forEach(dataFile -> dataFiles.add(dataFile.copy()));
     }