Posted to commits@iceberg.apache.org by bl...@apache.org on 2018/12/06 01:20:10 UTC

[incubator-iceberg] branch master updated: Add manifest listing files (#21)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 54f9a0f  Add manifest listing files (#21)
54f9a0f is described below

commit 54f9a0ffaa0cc69a25818fcdfbc9b8bfc579fe67
Author: Ryan Blue <rd...@users.noreply.github.com>
AuthorDate: Wed Dec 5 17:20:06 2018 -0800

    Add manifest listing files (#21)
    
    * Add ManifestFile and migrate Snapshot to return it.
    * Optionally write manifest lists to separate files.
        This adds a new table property, write.manifest-lists.enabled, that
        defaults to false. When enabled, new snapshot manifest lists will be
        written into separate files. The file location will be stored in the
        snapshot metadata as "manifest-list".
    * Aggregate partition field summaries when writing manifests.
    * Add InclusiveManifestEvaluator.
        This expression evaluator determines whether a manifest needs to be
        scanned or whether it cannot contain data files matching a partition
        predicate.
    * Add file length to ManifestFile.
    * Ensure files in manifest lists have helpful metadata.
        This modifies SnapshotUpdate when writing a snapshot with a manifest
        list file. If files for the manifest list do not have full metadata,
        then this will scan the manifests to add metadata, including snapshot
        ID, added/existing/deleted count, and partition field summaries.
    * Add partitions name mapping when reading Snapshot manifest list.
    * Update ScanSummary and FileHistory to use ManifestFile metadata.
        This optimizes ScanSummary and FileHistory to ignore manifests that
        cannot have changes in the configured time range.
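
    A minimal sketch of how the new write.manifest-lists.enabled property might be
    used, assuming the existing Table.updateProperties() and newFastAppend() APIs;
    the property name and the manifestListLocation() accessor come from this change,
    while the surrounding class, method, and variable names are only illustrative:

        import com.netflix.iceberg.DataFile;
        import com.netflix.iceberg.Table;

        class ManifestListExample {
          // Sketch: turn on separate manifest list files, append a data file, and
          // read back where the new snapshot's manifest list was written.
          static String appendWithManifestList(Table table, DataFile file) {
            table.updateProperties()
                .set("write.manifest-lists.enabled", "true") // defaults to false
                .commit();

            table.newFastAppend()
                .appendFile(file)
                .commit();

            // null when the snapshot's manifests are still stored inline in table metadata
            return table.currentSnapshot().manifestListLocation();
          }
        }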
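
    And a sketch of how the new InclusiveManifestEvaluator could be used to prune
    manifests before planning; the constructor and eval(ManifestFile) signatures are
    taken from the diff below, while the pruning helper itself is hypothetical:

        import com.netflix.iceberg.ManifestFile;
        import com.netflix.iceberg.PartitionSpec;
        import com.netflix.iceberg.Snapshot;
        import com.netflix.iceberg.expressions.Expression;
        import com.netflix.iceberg.expressions.InclusiveManifestEvaluator;
        import java.util.ArrayList;
        import java.util.List;

        class ManifestPruningExample {
          // Sketch: keep only manifests whose partition summaries may match the filter.
          // eval() returning false is a guarantee that the manifest can be skipped.
          static List<ManifestFile> prune(Snapshot snapshot, PartitionSpec spec, Expression filter) {
            // assumes every manifest in the snapshot was written with the given partition spec
            InclusiveManifestEvaluator eval = new InclusiveManifestEvaluator(spec, filter);
            List<ManifestFile> matching = new ArrayList<>();
            for (ManifestFile manifest : snapshot.manifests()) {
              if (eval.eval(manifest)) {
                matching.add(manifest);
              }
            }
            return matching;
          }
        }
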
---
 api/src/main/java/com/netflix/iceberg/Files.java   |   5 +
 .../java/com/netflix/iceberg/ManifestFile.java     | 144 +++++++++
 .../main/java/com/netflix/iceberg/Snapshot.java    |   9 +-
 .../iceberg/expressions/BoundReference.java        |   4 +
 .../expressions/InclusiveManifestEvaluator.java    | 240 ++++++++++++++
 .../com/netflix/iceberg/expressions/Literals.java  |   1 +
 .../java/com/netflix/iceberg/io/OutputFile.java    |   6 +
 .../com/netflix/iceberg/types/Comparators.java     |  31 ++
 .../test/java/com/netflix/iceberg/TestHelpers.java | 101 ++++++
 .../com/netflix/iceberg/TestPartitionPaths.java    |   1 +
 .../TestInclusiveManifestEvaluator.java            | 295 +++++++++++++++++
 .../java/com/netflix/iceberg/BaseSnapshot.java     |  57 +++-
 .../java/com/netflix/iceberg/BaseTableScan.java    |  22 +-
 .../main/java/com/netflix/iceberg/FastAppend.java  |  45 +--
 .../main/java/com/netflix/iceberg/FileHistory.java |  14 +-
 .../com/netflix/iceberg/GenericManifestFile.java   | 311 ++++++++++++++++++
 .../iceberg/GenericPartitionFieldSummary.java      | 191 +++++++++++
 .../java/com/netflix/iceberg/ManifestGroup.java    |  20 +-
 .../com/netflix/iceberg/ManifestListWriter.java    |  77 +++++
 .../java/com/netflix/iceberg/ManifestWriter.java   |  49 ++-
 .../com/netflix/iceberg/MergingSnapshotUpdate.java | 356 +++++++++------------
 .../java/com/netflix/iceberg/OverwriteData.java    |   2 +-
 .../java/com/netflix/iceberg/PartitionSummary.java |  97 ++++++
 .../java/com/netflix/iceberg/RemoveSnapshots.java  |  12 +-
 .../iceberg/ReplacePartitionsOperation.java        |   2 +-
 .../main/java/com/netflix/iceberg/ScanSummary.java |  51 ++-
 .../java/com/netflix/iceberg/SnapshotParser.java   |  39 ++-
 .../java/com/netflix/iceberg/SnapshotUpdate.java   | 125 +++++++-
 .../java/com/netflix/iceberg/TableMetadata.java    |   8 +
 .../java/com/netflix/iceberg/TableProperties.java  |   3 +
 .../main/java/com/netflix/iceberg/avro/Avro.java   |   6 +-
 .../netflix/iceberg/hadoop/HadoopOutputFile.java   |   6 +
 .../com/netflix/iceberg/LocalTableOperations.java  |  80 +++++
 .../java/com/netflix/iceberg/TableTestBase.java    |  31 +-
 .../java/com/netflix/iceberg/TestFastAppend.java   |  22 +-
 .../java/com/netflix/iceberg/TestMergeAppend.java  |  38 +--
 .../java/com/netflix/iceberg/TestReplaceFiles.java |  20 +-
 .../java/com/netflix/iceberg/TestSnapshotJson.java |  57 +++-
 .../com/netflix/iceberg/TestTableMetadataJson.java |  38 ++-
 .../java/com/netflix/iceberg/TestTransaction.java  |  24 +-
 40 files changed, 2274 insertions(+), 366 deletions(-)

diff --git a/api/src/main/java/com/netflix/iceberg/Files.java b/api/src/main/java/com/netflix/iceberg/Files.java
index b227199..f739751 100644
--- a/api/src/main/java/com/netflix/iceberg/Files.java
+++ b/api/src/main/java/com/netflix/iceberg/Files.java
@@ -76,6 +76,11 @@ public class Files {
     }
 
     @Override
+    public InputFile toInputFile() {
+      return localInput(file);
+    }
+
+    @Override
     public String toString() {
       return location();
     }
diff --git a/api/src/main/java/com/netflix/iceberg/ManifestFile.java b/api/src/main/java/com/netflix/iceberg/ManifestFile.java
new file mode 100644
index 0000000..b1d919b
--- /dev/null
+++ b/api/src/main/java/com/netflix/iceberg/ManifestFile.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.netflix.iceberg;
+
+import com.netflix.iceberg.types.Types;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import static com.netflix.iceberg.types.Types.NestedField.optional;
+import static com.netflix.iceberg.types.Types.NestedField.required;
+
+/**
+ * Represents a manifest file that can be scanned to find data files in a table.
+ */
+public interface ManifestFile {
+  Schema SCHEMA = new Schema(
+      required(500, "manifest_path", Types.StringType.get()),
+      required(501, "manifest_length", Types.LongType.get()),
+      required(502, "partition_spec_id", Types.IntegerType.get()),
+      optional(503, "added_snapshot_id", Types.LongType.get()),
+      optional(504, "added_data_files_count", Types.IntegerType.get()),
+      optional(505, "existing_data_files_count", Types.IntegerType.get()),
+      optional(506, "deleted_data_files_count", Types.IntegerType.get()),
+      optional(507, "partitions", Types.ListType.ofRequired(508, Types.StructType.of(
+          required(509, "contains_null", Types.BooleanType.get()),
+          optional(510, "lower_bound", Types.BinaryType.get()), // null if no non-null values
+          optional(511, "upper_bound", Types.BinaryType.get())
+      ))));
+
+  static Schema schema() {
+    return SCHEMA;
+  }
+
+  /**
+   * @return fully qualified path to the file, suitable for constructing a Hadoop Path
+   */
+  String path();
+
+  /**
+   * @return length of the manifest file
+   */
+  long length();
+
+  /**
+   * @return ID of the {@link PartitionSpec} used to write the manifest file
+   */
+  int partitionSpecId();
+
+  /**
+   * @return ID of the snapshot that added the manifest file to table metadata
+   */
+  Long snapshotId();
+
+  /**
+   * @return the number of data files with status ADDED in the manifest file
+   */
+  Integer addedFilesCount();
+
+  /**
+   * @return the number of data files with status EXISTING in the manifest file
+   */
+  Integer existingFilesCount();
+
+  /**
+   * @return the number of data files with status DELETED in the manifest file
+   */
+  Integer deletedFilesCount();
+
+  /**
+   * Returns a list of {@link PartitionFieldSummary partition field summaries}.
+   * <p>
+   * Each summary corresponds to a field in the manifest file's partition spec, by ordinal. For
+   * example, the partition spec [ ts_day=date(ts), type=identity(type) ] will have 2 summaries.
+   * The first summary is for the ts_day partition field and the second is for the type partition
+   * field.
+   *
+   * @return a list of partition field summaries, one for each field in the manifest's spec
+   */
+  List<PartitionFieldSummary> partitions();
+
+  /**
+   * Copies this {@link ManifestFile manifest file}. Readers can reuse manifest file instances; use
+   * this method to make defensive copies.
+   *
+   * @return a copy of this manifest file
+   */
+  ManifestFile copy();
+
+  /**
+   * Summarizes the values of one partition field stored in a manifest file.
+   */
+  interface PartitionFieldSummary {
+    Types.StructType TYPE = ManifestFile.schema()
+        .findType("partitions")
+        .asListType()
+        .elementType()
+        .asStructType();
+
+    static Types.StructType getType() {
+      return TYPE;
+    }
+
+    /**
+     * @return true if at least one data file in the manifest has a null value for the field
+     */
+    boolean containsNull();
+
+    /**
+     * @return a ByteBuffer that contains a serialized bound lower than all values of the field
+     */
+    ByteBuffer lowerBound();
+
+    /**
+     * @return a ByteBuffer that contains a serialized bound higher than all values of the field
+     */
+    ByteBuffer upperBound();
+
+    /**
+     * Copies this {@link PartitionFieldSummary summary}. Readers can reuse instances; use this
+     * method to make defensive copies.
+     *
+     * @return a copy of this partition field summary
+     */
+    PartitionFieldSummary copy();
+  }
+}
diff --git a/api/src/main/java/com/netflix/iceberg/Snapshot.java b/api/src/main/java/com/netflix/iceberg/Snapshot.java
index 7fc878b..89542dc 100644
--- a/api/src/main/java/com/netflix/iceberg/Snapshot.java
+++ b/api/src/main/java/com/netflix/iceberg/Snapshot.java
@@ -60,7 +60,7 @@ public interface Snapshot {
    *
    * @return a list of fully-qualified manifest locations
    */
-  List<String> manifests();
+  List<ManifestFile> manifests();
 
   /**
    * Return all files added to the table in this snapshot.
@@ -81,4 +81,11 @@ public interface Snapshot {
    * @return all files deleted from the table in this snapshot.
    */
   Iterable<DataFile> deletedFiles();
+
+  /**
+   * Return the location of this snapshot's manifest list, or null if it is not separate.
+   *
+   * @return the location of the manifest list for this Snapshot
+   */
+  String manifestListLocation();
 }
diff --git a/api/src/main/java/com/netflix/iceberg/expressions/BoundReference.java b/api/src/main/java/com/netflix/iceberg/expressions/BoundReference.java
index 0106f35..5a83650 100644
--- a/api/src/main/java/com/netflix/iceberg/expressions/BoundReference.java
+++ b/api/src/main/java/com/netflix/iceberg/expressions/BoundReference.java
@@ -55,6 +55,10 @@ public class BoundReference<T> implements Reference {
     return fieldId;
   }
 
+  public int pos() {
+    return pos;
+  }
+
   public T get(StructLike struct) {
     return struct.get(pos, javaType());
   }
diff --git a/api/src/main/java/com/netflix/iceberg/expressions/InclusiveManifestEvaluator.java b/api/src/main/java/com/netflix/iceberg/expressions/InclusiveManifestEvaluator.java
new file mode 100644
index 0000000..cac617d
--- /dev/null
+++ b/api/src/main/java/com/netflix/iceberg/expressions/InclusiveManifestEvaluator.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.netflix.iceberg.expressions;
+
+import com.netflix.iceberg.ManifestFile;
+import com.netflix.iceberg.ManifestFile.PartitionFieldSummary;
+import com.netflix.iceberg.PartitionSpec;
+import com.netflix.iceberg.expressions.ExpressionVisitors.BoundExpressionVisitor;
+import com.netflix.iceberg.types.Conversions;
+import com.netflix.iceberg.types.Types.StructType;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import static com.netflix.iceberg.expressions.Expressions.rewriteNot;
+
+/**
+ * Evaluates an {@link Expression} on a {@link ManifestFile} to test whether the file contains
+ * matching partitions.
+ * <p>
+ * This evaluation is inclusive: it returns true if a file may match and false if it cannot match.
+ * <p>
+ * Files are passed to {@link #eval(ManifestFile)}, which returns true if the manifest may contain
+ * data files that match the partition expression. Manifest files may be skipped if and only if the
+ * return value of {@code eval} is false.
+ */
+public class InclusiveManifestEvaluator {
+  private final StructType struct;
+  private final Expression expr;
+  private transient ThreadLocal<ManifestEvalVisitor> visitors = null;
+
+  private ManifestEvalVisitor visitor() {
+    if (visitors == null) {
+      this.visitors = ThreadLocal.withInitial(ManifestEvalVisitor::new);
+    }
+    return visitors.get();
+  }
+
+  public InclusiveManifestEvaluator(PartitionSpec spec, Expression rowFilter) {
+    this.struct = spec.partitionType();
+    this.expr = Binder.bind(struct, rewriteNot(Projections.inclusive(spec).project(rowFilter)));
+  }
+
+  /**
+   * Test whether the file may contain records that match the expression.
+   *
+   * @param manifest a manifest file
+   * @return false if the file cannot contain rows that match the expression, true otherwise.
+   */
+  public boolean eval(ManifestFile manifest) {
+    return visitor().eval(manifest);
+  }
+
+  private static final boolean ROWS_MIGHT_MATCH = true;
+  private static final boolean ROWS_CANNOT_MATCH = false;
+
+  private class ManifestEvalVisitor extends BoundExpressionVisitor<Boolean> {
+    private List<PartitionFieldSummary> stats = null;
+
+    private boolean eval(ManifestFile manifest) {
+      this.stats = manifest.partitions();
+      if (stats == null) {
+        return ROWS_MIGHT_MATCH;
+      }
+
+      return ExpressionVisitors.visit(expr, this);
+    }
+
+    @Override
+    public Boolean alwaysTrue() {
+      return ROWS_MIGHT_MATCH; // all rows match
+    }
+
+    @Override
+    public Boolean alwaysFalse() {
+      return ROWS_CANNOT_MATCH; // all rows fail
+    }
+
+    @Override
+    public Boolean not(Boolean result) {
+      return !result;
+    }
+
+    @Override
+    public Boolean and(Boolean leftResult, Boolean rightResult) {
+      return leftResult && rightResult;
+    }
+
+    @Override
+    public Boolean or(Boolean leftResult, Boolean rightResult) {
+      return leftResult || rightResult;
+    }
+
+    @Override
+    public <T> Boolean isNull(BoundReference<T> ref) {
+      // no need to check whether the field is required because binding evaluates that case
+      // if the column has no null values, the expression cannot match
+      if (!stats.get(ref.pos()).containsNull()) {
+        return ROWS_CANNOT_MATCH;
+      }
+
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public <T> Boolean notNull(BoundReference<T> ref) {
+      // containsNull encodes whether at least one partition value is null; lowerBound is null if
+      // all partition values are null.
+      ByteBuffer lowerBound = stats.get(ref.pos()).lowerBound();
+      if (lowerBound == null) {
+        return ROWS_CANNOT_MATCH; // all values are null
+      }
+
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public <T> Boolean lt(BoundReference<T> ref, Literal<T> lit) {
+      ByteBuffer lowerBound = stats.get(ref.pos()).lowerBound();
+      if (lowerBound == null) {
+        return ROWS_CANNOT_MATCH; // values are all null
+      }
+
+      T lower = Conversions.fromByteBuffer(ref.type(), lowerBound);
+
+      int cmp = lit.comparator().compare(lower, lit.value());
+      if (cmp >= 0) {
+        return ROWS_CANNOT_MATCH;
+      }
+
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public <T> Boolean ltEq(BoundReference<T> ref, Literal<T> lit) {
+      ByteBuffer lowerBound = stats.get(ref.pos()).lowerBound();
+      if (lowerBound == null) {
+        return ROWS_CANNOT_MATCH; // values are all null
+      }
+
+      T lower = Conversions.fromByteBuffer(ref.type(), lowerBound);
+
+      int cmp = lit.comparator().compare(lower, lit.value());
+      if (cmp > 0) {
+        return ROWS_CANNOT_MATCH;
+      }
+
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public <T> Boolean gt(BoundReference<T> ref, Literal<T> lit) {
+      ByteBuffer upperBound = stats.get(ref.pos()).upperBound();
+      if (upperBound == null) {
+        return ROWS_CANNOT_MATCH; // values are all null
+      }
+
+      T upper = Conversions.fromByteBuffer(ref.type(), upperBound);
+
+      int cmp = lit.comparator().compare(upper, lit.value());
+      if (cmp <= 0) {
+        return ROWS_CANNOT_MATCH;
+      }
+
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public <T> Boolean gtEq(BoundReference<T> ref, Literal<T> lit) {
+      ByteBuffer upperBound = stats.get(ref.pos()).upperBound();
+      if (upperBound == null) {
+        return ROWS_CANNOT_MATCH; // values are all null
+      }
+
+      T upper = Conversions.fromByteBuffer(ref.type(), upperBound);
+
+      int cmp = lit.comparator().compare(upper, lit.value());
+      if (cmp < 0) {
+        return ROWS_CANNOT_MATCH;
+      }
+
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public <T> Boolean eq(BoundReference<T> ref, Literal<T> lit) {
+      PartitionFieldSummary fieldStats = stats.get(ref.pos());
+      if (fieldStats.lowerBound() == null) {
+        return ROWS_CANNOT_MATCH; // values are all null and literal cannot contain null
+      }
+
+      T lower = Conversions.fromByteBuffer(ref.type(), fieldStats.lowerBound());
+      int cmp = lit.comparator().compare(lower, lit.value());
+      if (cmp > 0) {
+        return ROWS_CANNOT_MATCH;
+      }
+
+      T upper = Conversions.fromByteBuffer(ref.type(), fieldStats.upperBound());
+      cmp = lit.comparator().compare(upper, lit.value());
+      if (cmp < 0) {
+        return ROWS_CANNOT_MATCH;
+      }
+
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public <T> Boolean notEq(BoundReference<T> ref, Literal<T> lit) {
+      // because the bounds are not necessarily a min or max value, this cannot be answered using
+      // them. notEq(col, X) with bounds (X, Y) doesn't guarantee that X is a value in col.
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public <T> Boolean in(BoundReference<T> ref, Literal<T> lit) {
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public <T> Boolean notIn(BoundReference<T> ref, Literal<T> lit) {
+      return ROWS_MIGHT_MATCH;
+    }
+  }
+}
diff --git a/api/src/main/java/com/netflix/iceberg/expressions/Literals.java b/api/src/main/java/com/netflix/iceberg/expressions/Literals.java
index 22ef41c..f4e5d4e 100644
--- a/api/src/main/java/com/netflix/iceberg/expressions/Literals.java
+++ b/api/src/main/java/com/netflix/iceberg/expressions/Literals.java
@@ -96,6 +96,7 @@ class Literals {
     private final T value;
 
     BaseLiteral(T value) {
+      Preconditions.checkNotNull(value, "Literal values cannot be null");
       this.value = value;
     }
 
diff --git a/api/src/main/java/com/netflix/iceberg/io/OutputFile.java b/api/src/main/java/com/netflix/iceberg/io/OutputFile.java
index 9d75805..f0f48ee 100644
--- a/api/src/main/java/com/netflix/iceberg/io/OutputFile.java
+++ b/api/src/main/java/com/netflix/iceberg/io/OutputFile.java
@@ -58,4 +58,10 @@ public interface OutputFile {
    */
   String location();
 
+  /**
+   * Return an {@link InputFile} for the location of this output file.
+   *
+   * @return an input file for the location of this output file
+   */
+  InputFile toInputFile();
 }
diff --git a/api/src/main/java/com/netflix/iceberg/types/Comparators.java b/api/src/main/java/com/netflix/iceberg/types/Comparators.java
index 9e2ce2d..6680f7d 100644
--- a/api/src/main/java/com/netflix/iceberg/types/Comparators.java
+++ b/api/src/main/java/com/netflix/iceberg/types/Comparators.java
@@ -19,10 +19,41 @@
 
 package com.netflix.iceberg.types;
 
+import com.google.common.collect.ImmutableMap;
 import java.nio.ByteBuffer;
 import java.util.Comparator;
 
 public class Comparators {
+  private static final ImmutableMap<Type.PrimitiveType, Comparator<?>> COMPARATORS = ImmutableMap
+      .<Type.PrimitiveType, Comparator<?>>builder()
+      .put(Types.BooleanType.get(), Comparator.naturalOrder())
+      .put(Types.IntegerType.get(), Comparator.naturalOrder())
+      .put(Types.LongType.get(), Comparator.naturalOrder())
+      .put(Types.FloatType.get(), Comparator.naturalOrder())
+      .put(Types.DoubleType.get(), Comparator.naturalOrder())
+      .put(Types.DateType.get(), Comparator.naturalOrder())
+      .put(Types.TimeType.get(), Comparator.naturalOrder())
+      .put(Types.TimestampType.withZone(), Comparator.naturalOrder())
+      .put(Types.TimestampType.withoutZone(), Comparator.naturalOrder())
+      .put(Types.StringType.get(), Comparators.charSequences())
+      .put(Types.UUIDType.get(), Comparator.naturalOrder())
+      .put(Types.BinaryType.get(), Comparators.unsignedBytes())
+      .build();
+
+  @SuppressWarnings("unchecked")
+  public static <T> Comparator<T> forType(Type.PrimitiveType type) {
+    Comparator<?> cmp = COMPARATORS.get(type);
+    if (cmp != null) {
+      return (Comparator<T>) cmp;
+    } else if (type instanceof Types.FixedType) {
+      return (Comparator<T>) Comparators.unsignedBytes();
+    } else if (type instanceof Types.DecimalType) {
+      return (Comparator<T>) Comparator.naturalOrder();
+    }
+
+    throw new UnsupportedOperationException("Cannot determine comparator for type: " + type);
+  }
+
   public static Comparator<ByteBuffer> unsignedBytes() {
     return UnsignedByteBufComparator.INSTANCE;
   }
diff --git a/api/src/test/java/com/netflix/iceberg/TestHelpers.java b/api/src/test/java/com/netflix/iceberg/TestHelpers.java
index 118e11b..ceb1eed 100644
--- a/api/src/test/java/com/netflix/iceberg/TestHelpers.java
+++ b/api/src/test/java/com/netflix/iceberg/TestHelpers.java
@@ -176,6 +176,107 @@ public class TestHelpers {
     }
   }
 
+  public static class TestFieldSummary implements ManifestFile.PartitionFieldSummary {
+    private final boolean containsNull;
+    private final ByteBuffer lowerBound;
+    private final ByteBuffer upperBound;
+
+    public TestFieldSummary(boolean containsNull, ByteBuffer lowerBound, ByteBuffer upperBound) {
+      this.containsNull = containsNull;
+      this.lowerBound = lowerBound;
+      this.upperBound = upperBound;
+    }
+
+    @Override
+    public boolean containsNull() {
+      return containsNull;
+    }
+
+    @Override
+    public ByteBuffer lowerBound() {
+      return lowerBound;
+    }
+
+    @Override
+    public ByteBuffer upperBound() {
+      return upperBound;
+    }
+
+    @Override
+    public ManifestFile.PartitionFieldSummary copy() {
+      return this;
+    }
+  }
+
+  public static class TestManifestFile implements ManifestFile {
+    private final String path;
+    private final long length;
+    private final int specId;
+    private final Long snapshotId;
+    private final Integer addedFiles;
+    private final Integer existingFiles;
+    private final Integer deletedFiles;
+    private final List<PartitionFieldSummary> partitions;
+
+    public TestManifestFile(String path, long length, int specId, Long snapshotId,
+                            Integer addedFiles, Integer existingFiles, Integer deletedFiles,
+                            List<PartitionFieldSummary> partitions) {
+      this.path = path;
+      this.length = length;
+      this.specId = specId;
+      this.snapshotId = snapshotId;
+      this.addedFiles = addedFiles;
+      this.existingFiles = existingFiles;
+      this.deletedFiles = deletedFiles;
+      this.partitions = partitions;
+    }
+
+    @Override
+    public String path() {
+      return path;
+    }
+
+    @Override
+    public long length() {
+      return length;
+    }
+
+    @Override
+    public int partitionSpecId() {
+      return specId;
+    }
+
+    @Override
+    public Long snapshotId() {
+      return snapshotId;
+    }
+
+    @Override
+    public Integer addedFilesCount() {
+      return addedFiles;
+    }
+
+    @Override
+    public Integer existingFilesCount() {
+      return existingFiles;
+    }
+
+    @Override
+    public Integer deletedFilesCount() {
+      return deletedFiles;
+    }
+
+    @Override
+    public List<PartitionFieldSummary> partitions() {
+      return partitions;
+    }
+
+    @Override
+    public ManifestFile copy() {
+      return this;
+    }
+  }
+
   public static class TestDataFile implements DataFile {
     private final String path;
     private final StructLike partition;
diff --git a/api/src/test/java/com/netflix/iceberg/TestPartitionPaths.java b/api/src/test/java/com/netflix/iceberg/TestPartitionPaths.java
index aab66a3..1253f1e 100644
--- a/api/src/test/java/com/netflix/iceberg/TestPartitionPaths.java
+++ b/api/src/test/java/com/netflix/iceberg/TestPartitionPaths.java
@@ -34,6 +34,7 @@ public class TestPartitionPaths {
   );
 
   @Test
+  @SuppressWarnings("unchecked")
   public void testPartitionPath() {
     PartitionSpec spec = PartitionSpec.builderFor(SCHEMA)
         .hour("ts")
diff --git a/api/src/test/java/com/netflix/iceberg/expressions/TestInclusiveManifestEvaluator.java b/api/src/test/java/com/netflix/iceberg/expressions/TestInclusiveManifestEvaluator.java
new file mode 100644
index 0000000..f92f700
--- /dev/null
+++ b/api/src/test/java/com/netflix/iceberg/expressions/TestInclusiveManifestEvaluator.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.netflix.iceberg.expressions;
+
+import com.google.common.collect.ImmutableList;
+import com.netflix.iceberg.ManifestFile;
+import com.netflix.iceberg.PartitionSpec;
+import com.netflix.iceberg.Schema;
+import com.netflix.iceberg.TestHelpers;
+import com.netflix.iceberg.exceptions.ValidationException;
+import com.netflix.iceberg.types.Types;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+import static com.netflix.iceberg.expressions.Expressions.and;
+import static com.netflix.iceberg.expressions.Expressions.equal;
+import static com.netflix.iceberg.expressions.Expressions.greaterThan;
+import static com.netflix.iceberg.expressions.Expressions.greaterThanOrEqual;
+import static com.netflix.iceberg.expressions.Expressions.isNull;
+import static com.netflix.iceberg.expressions.Expressions.lessThan;
+import static com.netflix.iceberg.expressions.Expressions.lessThanOrEqual;
+import static com.netflix.iceberg.expressions.Expressions.not;
+import static com.netflix.iceberg.expressions.Expressions.notEqual;
+import static com.netflix.iceberg.expressions.Expressions.notNull;
+import static com.netflix.iceberg.expressions.Expressions.or;
+import static com.netflix.iceberg.types.Conversions.toByteBuffer;
+import static com.netflix.iceberg.types.Types.NestedField.optional;
+import static com.netflix.iceberg.types.Types.NestedField.required;
+
+public class TestInclusiveManifestEvaluator {
+  private static final Schema SCHEMA = new Schema(
+      required(1, "id", Types.IntegerType.get()),
+      optional(4, "all_nulls", Types.StringType.get()),
+      optional(5, "some_nulls", Types.StringType.get()),
+      optional(6, "no_nulls", Types.StringType.get())
+  );
+
+  private static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA)
+      .withSpecId(0)
+      .identity("id")
+      .identity("all_nulls")
+      .identity("some_nulls")
+      .identity("no_nulls")
+      .build();
+
+  private static final ByteBuffer INT_MIN = toByteBuffer(Types.IntegerType.get(), 30);
+  private static final ByteBuffer INT_MAX = toByteBuffer(Types.IntegerType.get(), 79);
+
+  private static final ByteBuffer STRING_MIN = toByteBuffer(Types.StringType.get(), "a");
+  private static final ByteBuffer STRING_MAX = toByteBuffer(Types.StringType.get(), "z");
+
+  private static final ManifestFile NO_STATS = new TestHelpers.TestManifestFile(
+      "manifest-list.avro", 1024, 0, System.currentTimeMillis(), null, null, null, null);
+
+  private static final ManifestFile FILE = new TestHelpers.TestManifestFile("manifest-list.avro",
+      1024, 0, System.currentTimeMillis(), 5, 10, 0, ImmutableList.of(
+          new TestHelpers.TestFieldSummary(false, INT_MIN, INT_MAX),
+          new TestHelpers.TestFieldSummary(true, null, null),
+          new TestHelpers.TestFieldSummary(true, STRING_MIN, STRING_MAX),
+          new TestHelpers.TestFieldSummary(false, STRING_MIN, STRING_MAX)));
+
+  @Test
+  public void testAllNulls() {
+    boolean shouldRead = new InclusiveManifestEvaluator(SPEC, notNull("all_nulls")).eval(FILE);
+    Assert.assertFalse("Should skip: no non-null value in all null column", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(SPEC, notNull("some_nulls")).eval(FILE);
+    Assert.assertTrue("Should read: column with some nulls contains a non-null value", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(SPEC, notNull("no_nulls")).eval(FILE);
+    Assert.assertTrue("Should read: non-null column contains a non-null value", shouldRead);
+  }
+
+  @Test
+  public void testNoNulls() {
+    boolean shouldRead = new InclusiveManifestEvaluator(SPEC, isNull("all_nulls")).eval(FILE);
+    Assert.assertTrue("Should read: at least one null value in all null column", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(SPEC, isNull("some_nulls")).eval(FILE);
+    Assert.assertTrue("Should read: column with some nulls contains a null value", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(SPEC, isNull("no_nulls")).eval(FILE);
+    Assert.assertFalse("Should skip: non-null column contains no null values", shouldRead);
+  }
+
+  @Test
+  public void testMissingColumn() {
+    TestHelpers.assertThrows("Should complain about missing column in expression",
+        ValidationException.class, "Cannot find field 'missing'",
+        () -> new InclusiveManifestEvaluator(SPEC, lessThan("missing", 5)).eval(FILE));
+  }
+
+  @Test
+  public void testMissingStats() {
+    Expression[] exprs = new Expression[] {
+        lessThan("id", 5), lessThanOrEqual("id", 30), equal("id", 70),
+        greaterThan("id", 78), greaterThanOrEqual("id", 90), notEqual("id", 101),
+        isNull("id"), notNull("id")
+    };
+
+    for (Expression expr : exprs) {
+      boolean shouldRead = new InclusiveManifestEvaluator(SPEC, expr).eval(NO_STATS);
+      Assert.assertTrue("Should read when missing stats for expr: " + expr, shouldRead);
+    }
+  }
+
+  @Test
+  public void testNot() {
+    // this test case must use a real predicate, not alwaysTrue(), or binding will simplify it out
+    boolean shouldRead = new InclusiveManifestEvaluator(SPEC, not(lessThan("id", 5))).eval(FILE);
+    Assert.assertTrue("Should read: not(false)", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(SPEC, not(greaterThan("id", 5))).eval(FILE);
+    Assert.assertFalse("Should skip: not(true)", shouldRead);
+  }
+
+  @Test
+  public void testAnd() {
+    // this test case must use a real predicate, not alwaysTrue(), or binding will simplify it out
+    boolean shouldRead = new InclusiveManifestEvaluator(
+        SPEC, and(lessThan("id", 5), greaterThanOrEqual("id", 0))).eval(FILE);
+    Assert.assertFalse("Should skip: and(false, false)", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(
+        SPEC, and(greaterThan("id", 5), lessThanOrEqual("id", 30))).eval(FILE);
+    Assert.assertTrue("Should read: and(true, true)", shouldRead);
+  }
+
+  @Test
+  public void testOr() {
+    // this test case must use a real predicate, not alwaysTrue(), or binding will simplify it out
+    boolean shouldRead = new InclusiveManifestEvaluator(
+        SPEC, or(lessThan("id", 5), greaterThanOrEqual("id", 80))).eval(FILE);
+    Assert.assertFalse("Should skip: or(false, false)", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(
+        SPEC, or(lessThan("id", 5), greaterThanOrEqual("id", 60))).eval(FILE);
+    Assert.assertTrue("Should read: or(false, true)", shouldRead);
+  }
+
+  @Test
+  public void testIntegerLt() {
+    boolean shouldRead = new InclusiveManifestEvaluator(SPEC, lessThan("id", 5)).eval(FILE);
+    Assert.assertFalse("Should not read: id range below lower bound (5 < 30)", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(SPEC, lessThan("id", 30)).eval(FILE);
+    Assert.assertFalse("Should not read: id range below lower bound (30 is not < 30)", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(SPEC, lessThan("id", 31)).eval(FILE);
+    Assert.assertTrue("Should read: one possible id", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(SPEC, lessThan("id", 79)).eval(FILE);
+    Assert.assertTrue("Should read: may possible ids", shouldRead);
+  }
+
+  @Test
+  public void testIntegerLtEq() {
+    boolean shouldRead = new InclusiveManifestEvaluator(SPEC, lessThanOrEqual("id", 5)).eval(FILE);
+    Assert.assertFalse("Should not read: id range below lower bound (5 < 30)", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(SPEC, lessThanOrEqual("id", 29)).eval(FILE);
+    Assert.assertFalse("Should not read: id range below lower bound (29 < 30)", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(SPEC, lessThanOrEqual("id", 30)).eval(FILE);
+    Assert.assertTrue("Should read: one possible id", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(SPEC, lessThanOrEqual("id", 79)).eval(FILE);
+    Assert.assertTrue("Should read: many possible ids", shouldRead);
+  }
+
+  @Test
+  public void testIntegerGt() {
+    boolean shouldRead = new InclusiveManifestEvaluator(SPEC, greaterThan("id", 85)).eval(FILE);
+    Assert.assertFalse("Should not read: id range above upper bound (85 < 79)", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(SPEC, greaterThan("id", 79)).eval(FILE);
+    Assert.assertFalse("Should not read: id range above upper bound (79 is not > 79)", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(SPEC, greaterThan("id", 78)).eval(FILE);
+    Assert.assertTrue("Should read: one possible id", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(SPEC, greaterThan("id", 75)).eval(FILE);
+    Assert.assertTrue("Should read: may possible ids", shouldRead);
+  }
+
+  @Test
+  public void testIntegerGtEq() {
+    boolean shouldRead = new InclusiveManifestEvaluator(
+        SPEC, greaterThanOrEqual("id", 85)).eval(FILE);
+    Assert.assertFalse("Should not read: id range above upper bound (85 < 79)", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(
+        SPEC, greaterThanOrEqual("id", 80)).eval(FILE);
+    Assert.assertFalse("Should not read: id range above upper bound (80 > 79)", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(
+        SPEC, greaterThanOrEqual("id", 79)).eval(FILE);
+    Assert.assertTrue("Should read: one possible id", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(
+        SPEC, greaterThanOrEqual("id", 75)).eval(FILE);
+    Assert.assertTrue("Should read: may possible ids", shouldRead);
+  }
+
+  @Test
+  public void testIntegerEq() {
+    boolean shouldRead = new InclusiveManifestEvaluator(SPEC, equal("id", 5)).eval(FILE);
+    Assert.assertFalse("Should not read: id below lower bound", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(SPEC, equal("id", 29)).eval(FILE);
+    Assert.assertFalse("Should not read: id below lower bound", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(SPEC, equal("id", 30)).eval(FILE);
+    Assert.assertTrue("Should read: id equal to lower bound", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(SPEC, equal("id", 75)).eval(FILE);
+    Assert.assertTrue("Should read: id between lower and upper bounds", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(SPEC, equal("id", 79)).eval(FILE);
+    Assert.assertTrue("Should read: id equal to upper bound", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(SPEC, equal("id", 80)).eval(FILE);
+    Assert.assertFalse("Should not read: id above upper bound", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(SPEC, equal("id", 85)).eval(FILE);
+    Assert.assertFalse("Should not read: id above upper bound", shouldRead);
+  }
+
+  @Test
+  public void testIntegerNotEq() {
+    boolean shouldRead = new InclusiveManifestEvaluator(SPEC, notEqual("id", 5)).eval(FILE);
+    Assert.assertTrue("Should read: id below lower bound", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(SPEC, notEqual("id", 29)).eval(FILE);
+    Assert.assertTrue("Should read: id below lower bound", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(SPEC, notEqual("id", 30)).eval(FILE);
+    Assert.assertTrue("Should read: id equal to lower bound", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(SPEC, notEqual("id", 75)).eval(FILE);
+    Assert.assertTrue("Should read: id between lower and upper bounds", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(SPEC, notEqual("id", 79)).eval(FILE);
+    Assert.assertTrue("Should read: id equal to upper bound", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(SPEC, notEqual("id", 80)).eval(FILE);
+    Assert.assertTrue("Should read: id above upper bound", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(SPEC, notEqual("id", 85)).eval(FILE);
+    Assert.assertTrue("Should read: id above upper bound", shouldRead);
+  }
+
+  @Test
+  public void testIntegerNotEqRewritten() {
+    boolean shouldRead = new InclusiveManifestEvaluator(SPEC, not(equal("id", 5))).eval(FILE);
+    Assert.assertTrue("Should read: id below lower bound", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(SPEC, not(equal("id", 29))).eval(FILE);
+    Assert.assertTrue("Should read: id below lower bound", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(SPEC, not(equal("id", 30))).eval(FILE);
+    Assert.assertTrue("Should read: id equal to lower bound", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(SPEC, not(equal("id", 75))).eval(FILE);
+    Assert.assertTrue("Should read: id between lower and upper bounds", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(SPEC, not(equal("id", 79))).eval(FILE);
+    Assert.assertTrue("Should read: id equal to upper bound", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(SPEC, not(equal("id", 80))).eval(FILE);
+    Assert.assertTrue("Should read: id above upper bound", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(SPEC, not(equal("id", 85))).eval(FILE);
+    Assert.assertTrue("Should read: id above upper bound", shouldRead);
+  }
+}
diff --git a/core/src/main/java/com/netflix/iceberg/BaseSnapshot.java b/core/src/main/java/com/netflix/iceberg/BaseSnapshot.java
index 5409b9a..945ddbb 100644
--- a/core/src/main/java/com/netflix/iceberg/BaseSnapshot.java
+++ b/core/src/main/java/com/netflix/iceberg/BaseSnapshot.java
@@ -20,21 +20,25 @@
 package com.netflix.iceberg;
 
 import com.google.common.base.Objects;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
+import com.netflix.iceberg.avro.Avro;
 import com.netflix.iceberg.exceptions.RuntimeIOException;
-import com.netflix.iceberg.io.CloseableGroup;
+import com.netflix.iceberg.io.CloseableIterable;
+import com.netflix.iceberg.io.InputFile;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
 
-class BaseSnapshot extends CloseableGroup implements Snapshot {
+class BaseSnapshot implements Snapshot {
   private final TableOperations ops;
   private final long snapshotId;
   private final Long parentId;
   private final long timestampMillis;
-  private final List<String> manifestFiles;
+  private final InputFile manifestList;
 
   // lazily initialized
+  private List<ManifestFile> manifests = null;
   private List<DataFile> adds = null;
   private List<DataFile> deletes = null;
 
@@ -44,19 +48,30 @@ class BaseSnapshot extends CloseableGroup implements Snapshot {
   BaseSnapshot(TableOperations ops,
                long snapshotId,
                String... manifestFiles) {
-    this(ops, snapshotId, null, System.currentTimeMillis(), Arrays.asList(manifestFiles));
+    this(ops, snapshotId, null, System.currentTimeMillis(),
+        Lists.transform(Arrays.asList(manifestFiles),
+            path -> new GenericManifestFile(ops.newInputFile(path), 0)));
   }
 
   BaseSnapshot(TableOperations ops,
                long snapshotId,
                Long parentId,
                long timestampMillis,
-               List<String> manifestFiles) {
+               InputFile manifestList) {
     this.ops = ops;
     this.snapshotId = snapshotId;
     this.parentId = parentId;
     this.timestampMillis = timestampMillis;
-    this.manifestFiles = manifestFiles;
+    this.manifestList = manifestList;
+  }
+
+  BaseSnapshot(TableOperations ops,
+               long snapshotId,
+               Long parentId,
+               long timestampMillis,
+               List<ManifestFile> manifests) {
+    this(ops, snapshotId, parentId, timestampMillis, (InputFile) null);
+    this.manifests = manifests;
   }
 
   @Override
@@ -75,8 +90,25 @@ class BaseSnapshot extends CloseableGroup implements Snapshot {
   }
 
   @Override
-  public List<String> manifests() {
-    return manifestFiles;
+  public List<ManifestFile> manifests() {
+    if (manifests == null) {
+      // if manifests isn't set, then the manifest list file is set and should be read to get the list
+      try (CloseableIterable<ManifestFile> files = Avro.read(manifestList)
+          .rename("manifest_file", GenericManifestFile.class.getName())
+          .rename("partitions", GenericPartitionFieldSummary.class.getName())
+          .rename("r508", GenericPartitionFieldSummary.class.getName())
+          .project(ManifestFile.schema())
+          .reuseContainers(false)
+          .build()) {
+
+        this.manifests = Lists.newLinkedList(files);
+
+      } catch (IOException e) {
+        throw new RuntimeIOException(e, "Cannot read snapshot file: %s", manifestList.location());
+      }
+    }
+
+    return manifests;
   }
 
   @Override
@@ -95,13 +127,18 @@ class BaseSnapshot extends CloseableGroup implements Snapshot {
     return deletes;
   }
 
+  @Override
+  public String manifestListLocation() {
+    return manifestList != null ? manifestList.location() : null;
+  }
+
   private void cacheChanges() {
     List<DataFile> adds = Lists.newArrayList();
     List<DataFile> deletes = Lists.newArrayList();
 
     // accumulate adds and deletes from all manifests.
     // because manifests can be reused in newer snapshots, filter the changes by snapshot id.
-    for (String manifest : manifestFiles) {
+    for (String manifest : Iterables.transform(manifests, ManifestFile::path)) {
       try (ManifestReader reader = ManifestReader.read(ops.newInputFile(manifest))) {
         for (ManifestEntry add : reader.addedFiles()) {
           if (add.snapshotId() == snapshotId) {
@@ -127,7 +164,7 @@ class BaseSnapshot extends CloseableGroup implements Snapshot {
     return Objects.toStringHelper(this)
         .add("id", snapshotId)
         .add("timestamp_ms", timestampMillis)
-        .add("manifests", manifestFiles)
+        .add("manifests", manifests)
         .toString();
   }
 }
diff --git a/core/src/main/java/com/netflix/iceberg/BaseTableScan.java b/core/src/main/java/com/netflix/iceberg/BaseTableScan.java
index e99889e..ad20780 100644
--- a/core/src/main/java/com/netflix/iceberg/BaseTableScan.java
+++ b/core/src/main/java/com/netflix/iceberg/BaseTableScan.java
@@ -21,6 +21,9 @@ package com.netflix.iceberg;
 
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
@@ -30,6 +33,8 @@ import com.netflix.iceberg.events.ScanEvent;
 import com.netflix.iceberg.expressions.Binder;
 import com.netflix.iceberg.expressions.Expression;
 import com.netflix.iceberg.expressions.Expressions;
+import com.netflix.iceberg.expressions.InclusiveManifestEvaluator;
+import com.netflix.iceberg.expressions.Projections;
 import com.netflix.iceberg.expressions.ResidualEvaluator;
 import com.netflix.iceberg.io.CloseableIterable;
 import com.netflix.iceberg.types.TypeUtil;
@@ -141,6 +146,16 @@ class BaseTableScan implements TableScan {
     return new BaseTableScan(ops, table, snapshotId, schema, Expressions.and(rowFilter, expr));
   }
 
+  private final LoadingCache<Integer, InclusiveManifestEvaluator> EVAL_CACHE = CacheBuilder
+      .newBuilder()
+      .build(new CacheLoader<Integer, InclusiveManifestEvaluator>() {
+        @Override
+        public InclusiveManifestEvaluator load(Integer specId) {
+          PartitionSpec spec = ops.current().spec(specId);
+          return new InclusiveManifestEvaluator(spec, rowFilter);
+        }
+      });
+
   @Override
   public CloseableIterable<FileScanTask> planFiles() {
     Snapshot snapshot = snapshotId != null ?
@@ -155,11 +170,14 @@ class BaseTableScan implements TableScan {
       Listeners.notifyAll(
           new ScanEvent(table.toString(), snapshot.snapshotId(), rowFilter, schema));
 
+      Iterable<ManifestFile> matchingManifests = Iterables.filter(snapshot.manifests(),
+          manifest -> EVAL_CACHE.getUnchecked(manifest.partitionSpecId()).eval(manifest));
+
       ConcurrentLinkedQueue<Closeable> toClose = new ConcurrentLinkedQueue<>();
       Iterable<Iterable<FileScanTask>> readers = Iterables.transform(
-          snapshot.manifests(),
+          matchingManifests,
           manifest -> {
-            ManifestReader reader = ManifestReader.read(ops.newInputFile(manifest));
+            ManifestReader reader = ManifestReader.read(ops.newInputFile(manifest.path()));
             toClose.add(reader);
             String schemaString = SchemaParser.toJson(reader.spec().schema());
             String specString = PartitionSpecParser.toJson(reader.spec());
diff --git a/core/src/main/java/com/netflix/iceberg/FastAppend.java b/core/src/main/java/com/netflix/iceberg/FastAppend.java
index de34545..278f059 100644
--- a/core/src/main/java/com/netflix/iceberg/FastAppend.java
+++ b/core/src/main/java/com/netflix/iceberg/FastAppend.java
@@ -35,7 +35,7 @@ import java.util.Set;
 class FastAppend extends SnapshotUpdate implements AppendFiles {
   private final PartitionSpec spec;
   private final List<DataFile> newFiles = Lists.newArrayList();
-  private String newManifestLocation = null;
+  private ManifestFile newManifest = null;
   private boolean hasNewFiles = false;
 
   FastAppend(TableOperations ops) {
@@ -51,11 +51,15 @@ class FastAppend extends SnapshotUpdate implements AppendFiles {
   }
 
   @Override
-  public List<String> apply(TableMetadata base) {
-    String location = writeManifest();
+  public List<ManifestFile> apply(TableMetadata base) {
+    List<ManifestFile> newManifests = Lists.newArrayList();
+
+    try {
+      newManifests.add(writeManifest());
+    } catch (IOException e) {
+      throw new RuntimeIOException(e, "Failed to write manifest");
+    }
 
-    List<String> newManifests = Lists.newArrayList();
-    newManifests.add(location);
     if (base.currentSnapshot() != null) {
       newManifests.addAll(base.currentSnapshot().manifests());
     }
@@ -64,33 +68,32 @@ class FastAppend extends SnapshotUpdate implements AppendFiles {
   }
 
   @Override
-  protected void cleanUncommitted(Set<String> committed) {
-    if (!committed.contains(newManifestLocation)) {
-      deleteFile(newManifestLocation);
+  protected void cleanUncommitted(Set<ManifestFile> committed) {
+    if (!committed.contains(newManifest)) {
+      deleteFile(newManifest.path());
     }
   }
 
-  private String writeManifest() {
-    if (hasNewFiles && newManifestLocation != null) {
-      deleteFile(newManifestLocation);
-      hasNewFiles = false;
-      newManifestLocation = null;
+  private ManifestFile writeManifest() throws IOException {
+    if (hasNewFiles && newManifest != null) {
+      deleteFile(newManifest.path());
+      newManifest = null;
     }
 
-    if (newManifestLocation == null) {
+    if (newManifest == null) {
       OutputFile out = manifestPath(0);
 
-      try (ManifestWriter writer = new ManifestWriter(spec, out, snapshotId())) {
-
+      ManifestWriter writer = new ManifestWriter(spec, out, snapshotId());
+      try {
         writer.addAll(newFiles);
-
-      } catch (IOException e) {
-        throw new RuntimeIOException(e, "Failed to write manifest: %s", out);
+      } finally {
+        writer.close();
       }
 
-      this.newManifestLocation = out.location();
+      this.newManifest = writer.toManifestFile();
+      hasNewFiles = false;
     }
 
-    return newManifestLocation;
+    return newManifest;
   }
 }
diff --git a/core/src/main/java/com/netflix/iceberg/FileHistory.java b/core/src/main/java/com/netflix/iceberg/FileHistory.java
index 3ed8006..60146b0 100644
--- a/core/src/main/java/com/netflix/iceberg/FileHistory.java
+++ b/core/src/main/java/com/netflix/iceberg/FileHistory.java
@@ -32,6 +32,9 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Set;
 
+import static com.google.common.collect.Iterables.concat;
+import static com.google.common.collect.Iterables.transform;
+
 public class FileHistory {
   private static final List<String> HISTORY_COLUMNS = ImmutableList.of("file_path");
 
@@ -91,12 +94,17 @@ public class FileHistory {
         snapshots = Iterables.filter(snapshots, snap -> snap.timestampMillis() <= endTime);
       }
 
+      // only use manifests that were added in the matching snapshots
+      Set<Long> matchingIds = Sets.newHashSet(transform(snapshots, snap -> snap.snapshotId()));
+      Iterable<ManifestFile> manifests = Iterables.filter(
+          concat(transform(snapshots, Snapshot::manifests)),
+          manifest -> manifest.snapshotId() == null || matchingIds.contains(manifest.snapshotId()));
+
       // a manifest group will only read each manifest once
-      ManifestGroup manifests = new ManifestGroup(((HasTableOperations) table).operations(),
-          Iterables.concat(Iterables.transform(snapshots, Snapshot::manifests)));
+      ManifestGroup group = new ManifestGroup(((HasTableOperations) table).operations(), manifests);
 
       List<ManifestEntry> results = Lists.newArrayList();
-      try (CloseableIterable<ManifestEntry> entries = manifests.select(HISTORY_COLUMNS).entries()) {
+      try (CloseableIterable<ManifestEntry> entries = group.select(HISTORY_COLUMNS).entries()) {
         // TODO: replace this with an IN predicate
         CharSequenceWrapper locationWrapper = CharSequenceWrapper.wrap(null);
         for (ManifestEntry entry : entries) {
diff --git a/core/src/main/java/com/netflix/iceberg/GenericManifestFile.java b/core/src/main/java/com/netflix/iceberg/GenericManifestFile.java
new file mode 100644
index 0000000..628515b
--- /dev/null
+++ b/core/src/main/java/com/netflix/iceberg/GenericManifestFile.java
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.netflix.iceberg;
+
+import com.google.common.base.Objects;
+import com.netflix.iceberg.avro.AvroSchemaUtil;
+import com.netflix.iceberg.io.InputFile;
+import com.netflix.iceberg.types.Types;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.specific.SpecificData.SchemaConstructable;
+import java.io.Serializable;
+import java.util.List;
+
+import static com.google.common.collect.ImmutableList.copyOf;
+import static com.google.common.collect.Iterables.transform;
+
+public class GenericManifestFile
+    implements ManifestFile, StructLike, IndexedRecord, SchemaConstructable, Serializable {
+  private static final Schema AVRO_SCHEMA = AvroSchemaUtil.convert(
+      ManifestFile.schema(), "manifest_file");
+
+  private transient Schema avroSchema; // not final for Java serialization
+  private int[] fromProjectionPos;
+
+  // data fields
+  private InputFile file = null;
+  private String manifestPath = null;
+  private Long length = null;
+  private int specId = -1;
+  private Long snapshotId = null;
+  private Integer addedFilesCount = null;
+  private Integer existingFilesCount = null;
+  private Integer deletedFilesCount = null;
+  private List<PartitionFieldSummary> partitions = null;
+
+  /**
+   * Used by Avro reflection to instantiate this class when reading manifest files.
+   */
+  public GenericManifestFile(org.apache.avro.Schema avroSchema) {
+    this.avroSchema = avroSchema;
+
+    List<Types.NestedField> fields = AvroSchemaUtil.convert(avroSchema)
+        .asNestedType()
+        .asStructType()
+        .fields();
+    List<Types.NestedField> allFields = ManifestFile.schema().asStruct().fields();
+
+    this.fromProjectionPos = new int[fields.size()];
+    for (int i = 0; i < fromProjectionPos.length; i += 1) {
+      boolean found = false;
+      for (int j = 0; j < allFields.size(); j += 1) {
+        if (fields.get(i).fieldId() == allFields.get(j).fieldId()) {
+          found = true;
+          fromProjectionPos[i] = j;
+        }
+      }
+
+      if (!found) {
+        throw new IllegalArgumentException("Cannot find projected field: " + fields.get(i));
+      }
+    }
+  }
+
+  GenericManifestFile(InputFile file, int specId) {
+    this.avroSchema = AVRO_SCHEMA;
+    this.file = file;
+    this.manifestPath = file.location();
+    this.length = null; // lazily loaded from file
+    this.specId = specId;
+    this.snapshotId = null;
+    this.addedFilesCount = null;
+    this.existingFilesCount = null;
+    this.deletedFilesCount = null;
+    this.partitions = null;
+    this.fromProjectionPos = null;
+  }
+
+  public GenericManifestFile(String path, long length, int specId, long snapshotId,
+                             int addedFilesCount, int existingFilesCount, int deletedFilesCount,
+                             List<PartitionFieldSummary> partitions) {
+    this.avroSchema = AVRO_SCHEMA;
+    this.manifestPath = path;
+    this.length = length;
+    this.specId = specId;
+    this.snapshotId = snapshotId;
+    this.addedFilesCount = addedFilesCount;
+    this.existingFilesCount = existingFilesCount;
+    this.deletedFilesCount = deletedFilesCount;
+    this.partitions = partitions;
+    this.fromProjectionPos = null;
+  }
+
+  /**
+   * Copy constructor.
+   *
+   * @param toCopy a generic manifest file to copy.
+   */
+  private GenericManifestFile(GenericManifestFile toCopy) {
+    this.avroSchema = toCopy.avroSchema;
+    this.manifestPath = toCopy.manifestPath;
+    this.length = toCopy.length;
+    this.specId = toCopy.specId;
+    this.snapshotId = toCopy.snapshotId;
+    this.addedFilesCount = toCopy.addedFilesCount;
+    this.existingFilesCount = toCopy.existingFilesCount;
+    this.deletedFilesCount = toCopy.deletedFilesCount;
+    this.partitions = copyOf(transform(toCopy.partitions, PartitionFieldSummary::copy));
+    this.fromProjectionPos = toCopy.fromProjectionPos;
+  }
+
+  /**
+   * Constructor for Java serialization.
+   */
+  GenericManifestFile() {
+  }
+
+  @Override
+  public String path() {
+    return manifestPath;
+  }
+
+  public Long lazyLength() {
+    if (length == null) {
+      if (file != null) {
+        // this was created from an input file and length is lazily loaded
+        this.length = file.getLength();
+      } else {
+        // this was loaded from a file without projecting the length; return null
+        return null;
+      }
+    }
+    return length;
+  }
+
+  @Override
+  public long length() {
+    return lazyLength();
+  }
+
+  @Override
+  public int partitionSpecId() {
+    return specId;
+  }
+
+  @Override
+  public Long snapshotId() {
+    return snapshotId;
+  }
+
+  @Override
+  public Integer addedFilesCount() {
+    return addedFilesCount;
+  }
+
+  @Override
+  public Integer existingFilesCount() {
+    return existingFilesCount;
+  }
+
+  @Override
+  public Integer deletedFilesCount() {
+    return deletedFilesCount;
+  }
+
+  @Override
+  public List<PartitionFieldSummary> partitions() {
+    return partitions;
+  }
+
+  @Override
+  public int size() {
+    return ManifestFile.schema().columns().size();
+  }
+
+  @Override
+  public <T> T get(int pos, Class<T> javaClass) {
+    return javaClass.cast(get(pos));
+  }
+
+  @Override
+  public void put(int i, Object v) {
+    set(i, v);
+  }
+
+  @Override
+  public Object get(int i) {
+    int pos = i;
+    // if the schema was projected, map the incoming ordinal to the expected one
+    if (fromProjectionPos != null) {
+      pos = fromProjectionPos[i];
+    }
+    switch (pos) {
+      case 0:
+        return manifestPath;
+      case 1:
+        return lazyLength();
+      case 2:
+        return specId;
+      case 3:
+        return snapshotId;
+      case 4:
+        return addedFilesCount;
+      case 5:
+        return existingFilesCount;
+      case 6:
+        return deletedFilesCount;
+      case 7:
+        return partitions;
+      default:
+        throw new UnsupportedOperationException("Unknown field ordinal: " + pos);
+    }
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public <T> void set(int i, T value) {
+    int pos = i;
+    // if the schema was projected, map the incoming ordinal to the expected one
+    if (fromProjectionPos != null) {
+      pos = fromProjectionPos[i];
+    }
+    switch (pos) {
+      case 0:
+        // always coerce to String for Serializable
+        this.manifestPath = value.toString();
+        return;
+      case 1:
+        this.length = (Long) value;
+        return;
+      case 2:
+        this.specId = (Integer) value;
+        return;
+      case 3:
+        this.snapshotId = (Long) value;
+        return;
+      case 4:
+        this.addedFilesCount = (Integer) value;
+        return;
+      case 5:
+        this.existingFilesCount = (Integer) value;
+        return;
+      case 6:
+        this.deletedFilesCount = (Integer) value;
+        return;
+      case 7:
+        this.partitions = (List<PartitionFieldSummary>) value;
+        return;
+      default:
+        // ignore the object, it must be from a newer version of the format
+    }
+  }
+
+  @Override
+  public ManifestFile copy() {
+    return new GenericManifestFile(this);
+  }
+
+  @Override
+  public Schema getSchema() {
+    return avroSchema;
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (this == other) {
+      return true;
+    }
+    if (other == null || getClass() != other.getClass()) {
+      return false;
+    }
+    GenericManifestFile that = (GenericManifestFile) other;
+    return Objects.equal(manifestPath, that.manifestPath);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(manifestPath);
+  }
+
+  @Override
+  public String toString() {
+    return Objects.toStringHelper(this)
+        .add("path", manifestPath)
+        .add("length", length)
+        .add("partition_spec_id", specId)
+        .add("added_snapshot_id", snapshotId)
+        .add("added_data_files_count", addedFilesCount)
+        .add("existing_data_files_count", existingFilesCount)
+        .add("deleted_data_files_count", deletedFilesCount)
+        .add("partitions", partitions)
+        .toString();
+  }
+}
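
A rough usage sketch for the public GenericManifestFile constructor added above (not part of this commit; the path, length, and counts below are made-up example values):

    import com.google.common.collect.ImmutableList;
    import com.netflix.iceberg.GenericManifestFile;
    import com.netflix.iceberg.ManifestFile;

    class GenericManifestFileExample {
      static ManifestFile example() {
        // hypothetical manifest metadata; an empty list means no partition summaries yet
        return new GenericManifestFile(
            "s3://bucket/metadata/manifest-00001.avro", // manifest path (made up)
            4096L,            // file length in bytes
            0,                // partition spec ID
            1L,               // snapshot that added this manifest
            10, 0, 0,         // added / existing / deleted data file counts
            ImmutableList.of());
      }
    }

Note that equals and hashCode above consider only the manifest path, so two ManifestFile instances for the same file compare equal even if their counts differ.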
diff --git a/core/src/main/java/com/netflix/iceberg/GenericPartitionFieldSummary.java b/core/src/main/java/com/netflix/iceberg/GenericPartitionFieldSummary.java
new file mode 100644
index 0000000..0c57cb3
--- /dev/null
+++ b/core/src/main/java/com/netflix/iceberg/GenericPartitionFieldSummary.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.netflix.iceberg;
+
+import com.google.common.base.Objects;
+import com.netflix.iceberg.ManifestFile.PartitionFieldSummary;
+import com.netflix.iceberg.avro.AvroSchemaUtil;
+import com.netflix.iceberg.types.Types;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.specific.SpecificData.SchemaConstructable;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+public class GenericPartitionFieldSummary
+    implements PartitionFieldSummary, StructLike, IndexedRecord, SchemaConstructable, Serializable {
+  private static final Schema AVRO_SCHEMA = AvroSchemaUtil.convert(PartitionFieldSummary.getType());
+
+  private transient Schema avroSchema; // not final for Java serialization
+  private int[] fromProjectionPos;
+
+  // data fields
+  private boolean containsNull = false;
+  private ByteBuffer lowerBound = null;
+  private ByteBuffer upperBound = null;
+
+  /**
+   * Used by Avro reflection to instantiate this class when reading manifest files.
+   */
+  public GenericPartitionFieldSummary(Schema avroSchema) {
+    this.avroSchema = avroSchema;
+
+    List<Types.NestedField> fields = AvroSchemaUtil.convert(avroSchema)
+        .asNestedType()
+        .asStructType()
+        .fields();
+    List<Types.NestedField> allFields = PartitionFieldSummary.getType().fields();
+
+    this.fromProjectionPos = new int[fields.size()];
+    for (int i = 0; i < fromProjectionPos.length; i += 1) {
+      boolean found = false;
+      for (int j = 0; j < allFields.size(); j += 1) {
+        if (fields.get(i).fieldId() == allFields.get(j).fieldId()) {
+          found = true;
+          fromProjectionPos[i] = j;
+        }
+      }
+
+      if (!found) {
+        throw new IllegalArgumentException("Cannot find projected field: " + fields.get(i));
+      }
+    }
+  }
+
+  public GenericPartitionFieldSummary(boolean containsNull, ByteBuffer lowerBound,
+                                      ByteBuffer upperBound) {
+    this.avroSchema = AVRO_SCHEMA;
+    this.containsNull = containsNull;
+    this.lowerBound = lowerBound;
+    this.upperBound = upperBound;
+    this.fromProjectionPos = null;
+  }
+
+  /**
+   * Copy constructor.
+   *
+   * @param toCopy a generic partition field summary to copy.
+   */
+  private GenericPartitionFieldSummary(GenericPartitionFieldSummary toCopy) {
+    this.avroSchema = toCopy.avroSchema;
+    this.containsNull = toCopy.containsNull;
+    this.lowerBound = toCopy.lowerBound;
+    this.upperBound = toCopy.upperBound;
+    this.fromProjectionPos = toCopy.fromProjectionPos;
+  }
+
+  /**
+   * Constructor for Java serialization.
+   */
+  GenericPartitionFieldSummary() {
+  }
+
+  @Override
+  public boolean containsNull() {
+    return containsNull;
+  }
+
+  @Override
+  public ByteBuffer lowerBound() {
+    return lowerBound;
+  }
+
+  @Override
+  public ByteBuffer upperBound() {
+    return upperBound;
+  }
+
+  @Override
+  public int size() {
+    return PartitionFieldSummary.getType().fields().size();
+  }
+
+  @Override
+  public <T> T get(int pos, Class<T> javaClass) {
+    return javaClass.cast(get(pos));
+  }
+
+  @Override
+  public void put(int i, Object v) {
+    set(i, v);
+  }
+
+  @Override
+  public Object get(int i) {
+    int pos = i;
+    // if the schema was projected, map the incoming ordinal to the expected one
+    if (fromProjectionPos != null) {
+      pos = fromProjectionPos[i];
+    }
+    switch (pos) {
+      case 0:
+        return containsNull;
+      case 1:
+        return lowerBound;
+      case 2:
+        return upperBound;
+      default:
+        throw new UnsupportedOperationException("Unknown field ordinal: " + pos);
+    }
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public <T> void set(int i, T value) {
+    int pos = i;
+    // if the schema was projected, map the incoming ordinal to the expected one
+    if (fromProjectionPos != null) {
+      pos = fromProjectionPos[i];
+    }
+    switch (pos) {
+      case 0:
+        this.containsNull = (Boolean) value;
+        return;
+      case 1:
+        this.lowerBound = (ByteBuffer) value;
+        return;
+      case 2:
+        this.upperBound = (ByteBuffer) value;
+        return;
+      default:
+        // ignore the object, it must be from a newer version of the format
+    }
+  }
+
+  @Override
+  public PartitionFieldSummary copy() {
+    return new GenericPartitionFieldSummary(this);
+  }
+
+  @Override
+  public Schema getSchema() {
+    return avroSchema;
+  }
+
+  @Override
+  public String toString() {
+    return Objects.toStringHelper(this)
+        .add("contains_null", containsNull)
+        .add("lower_bound", lowerBound)
+        .add("upper_bound", upperBound)
+        .toString();
+  }
+}
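
A similar sketch for the partition field summary, using Conversions.toByteBuffer (the same helper PartitionSummary uses later in this commit) to serialize the bounds; the integer bounds are hypothetical:

    import com.netflix.iceberg.GenericPartitionFieldSummary;
    import com.netflix.iceberg.ManifestFile.PartitionFieldSummary;
    import com.netflix.iceberg.types.Conversions;
    import com.netflix.iceberg.types.Types;
    import java.nio.ByteBuffer;

    class PartitionFieldSummaryExample {
      static PartitionFieldSummary example() {
        // bounds are stored as serialized single values for one partition field
        ByteBuffer lower = Conversions.toByteBuffer(Types.IntegerType.get(), 0);
        ByteBuffer upper = Conversions.toByteBuffer(Types.IntegerType.get(), 9);
        return new GenericPartitionFieldSummary(false /* containsNull */, lower, upper);
      }
    }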
diff --git a/core/src/main/java/com/netflix/iceberg/ManifestGroup.java b/core/src/main/java/com/netflix/iceberg/ManifestGroup.java
index f343427..19d993f 100644
--- a/core/src/main/java/com/netflix/iceberg/ManifestGroup.java
+++ b/core/src/main/java/com/netflix/iceberg/ManifestGroup.java
@@ -37,18 +37,18 @@ class ManifestGroup {
   private static final Types.StructType EMPTY_STRUCT = Types.StructType.of();
 
   private final TableOperations ops;
-  private final Set<String> manifests;
+  private final Set<ManifestFile> manifests;
   private final Expression dataFilter;
   private final Expression fileFilter;
   private final boolean ignoreDeleted;
   private final List<String> columns;
 
-  ManifestGroup(TableOperations ops, Iterable<String> manifests) {
+  ManifestGroup(TableOperations ops, Iterable<ManifestFile> manifests) {
     this(ops, Sets.newHashSet(manifests), Expressions.alwaysTrue(), Expressions.alwaysTrue(),
         false, ImmutableList.of("*"));
   }
 
-  private ManifestGroup(TableOperations ops, Set<String> manifests,
+  private ManifestGroup(TableOperations ops, Set<ManifestFile> manifests,
                         Expression dataFilter, Expression fileFilter, boolean ignoreDeleted,
                         List<String> columns) {
     this.ops = ops;
@@ -94,10 +94,20 @@ class ManifestGroup {
     Evaluator evaluator = new Evaluator(DataFile.getType(EMPTY_STRUCT), fileFilter);
     List<Closeable> toClose = Lists.newArrayList();
 
+    Iterable<ManifestFile> matchingManifests = manifests;
+
+    if (ignoreDeleted) {
+      // remove any manifests that don't have any existing or added files. if either the added or
+      // existing files count is missing, the manifest must be scanned.
+      matchingManifests = Iterables.filter(manifests, manifest ->
+          manifest.addedFilesCount() == null || manifest.existingFilesCount() == null ||
+              manifest.addedFilesCount() + manifest.existingFilesCount() > 0);
+    }
+
     Iterable<Iterable<ManifestEntry>> readers = Iterables.transform(
-        manifests,
+        matchingManifests,
         manifest -> {
-          ManifestReader reader = ManifestReader.read(ops.newInputFile(manifest));
+          ManifestReader reader = ManifestReader.read(ops.newInputFile(manifest.path()));
           FilteredManifest filtered = reader.filterRows(dataFilter).select(columns);
           toClose.add(reader);
           return Iterables.filter(
diff --git a/core/src/main/java/com/netflix/iceberg/ManifestListWriter.java b/core/src/main/java/com/netflix/iceberg/ManifestListWriter.java
new file mode 100644
index 0000000..98cdbbf
--- /dev/null
+++ b/core/src/main/java/com/netflix/iceberg/ManifestListWriter.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.netflix.iceberg;
+
+import com.google.common.collect.ImmutableMap;
+import com.netflix.iceberg.avro.Avro;
+import com.netflix.iceberg.exceptions.RuntimeIOException;
+import com.netflix.iceberg.io.FileAppender;
+import com.netflix.iceberg.io.OutputFile;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+
+class ManifestListWriter implements FileAppender<ManifestFile> {
+  private final FileAppender<ManifestFile> writer;
+
+  ManifestListWriter(OutputFile snapshotFile, long snapshotId, Long parentSnapshotId) {
+    this.writer = newAppender(snapshotFile, ImmutableMap.of(
+        "snapshot-id", String.valueOf(snapshotId),
+        "parent-snapshot-id", String.valueOf(parentSnapshotId)));
+  }
+
+  @Override
+  public void add(ManifestFile file) {
+    writer.add(file);
+  }
+
+  @Override
+  public void addAll(Iterator<ManifestFile> values) {
+    writer.addAll(values);
+  }
+
+  @Override
+  public void addAll(Iterable<ManifestFile> values) {
+    writer.addAll(values);
+  }
+
+  @Override
+  public Metrics metrics() {
+    return writer.metrics();
+  }
+
+  @Override
+  public void close() throws IOException {
+    writer.close();
+  }
+
+  private static FileAppender<ManifestFile> newAppender(OutputFile file, Map<String, String> meta) {
+    try {
+      return Avro.write(file)
+          .schema(ManifestFile.schema())
+          .named("manifest_file")
+          .meta(meta)
+          .build();
+
+    } catch (IOException e) {
+      throw new RuntimeIOException(e, "Failed to create snapshot list writer for path: " + file);
+    }
+  }
+}
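
ManifestListWriter is package-private and simply wraps an Avro FileAppender whose metadata records the snapshot IDs. A sketch of how a snapshot's manifests might be written to a list file (assumes code in the com.netflix.iceberg package; manifestListFile and manifests are placeholders):

    package com.netflix.iceberg;

    import com.netflix.iceberg.io.OutputFile;
    import java.io.IOException;
    import java.util.List;

    class ManifestListExample {
      static void writeList(OutputFile manifestListFile, long snapshotId, Long parentId,
                            List<ManifestFile> manifests) throws IOException {
        ManifestListWriter writer = new ManifestListWriter(manifestListFile, snapshotId, parentId);
        try {
          writer.addAll(manifests); // one ManifestFile row per manifest
        } finally {
          writer.close();
        }
      }
    }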
diff --git a/core/src/main/java/com/netflix/iceberg/ManifestWriter.java b/core/src/main/java/com/netflix/iceberg/ManifestWriter.java
index 28ba831..a59c100 100644
--- a/core/src/main/java/com/netflix/iceberg/ManifestWriter.java
+++ b/core/src/main/java/com/netflix/iceberg/ManifestWriter.java
@@ -19,6 +19,7 @@
 
 package com.netflix.iceberg;
 
+import com.google.common.base.Preconditions;
 import com.netflix.iceberg.avro.Avro;
 import com.netflix.iceberg.exceptions.RuntimeIOException;
 import com.netflix.iceberg.io.FileAppender;
@@ -35,14 +36,27 @@ import static com.netflix.iceberg.ManifestEntry.Status.DELETED;
 class ManifestWriter implements FileAppender<DataFile> {
   private static final Logger LOG = LoggerFactory.getLogger(ManifestWriter.class);
 
+  private final String location;
+  private final OutputFile file;
+  private final int specId;
   private final FileAppender<ManifestEntry> writer;
   private final long snapshotId;
-  private ManifestEntry reused = null;
+  private final ManifestEntry reused;
+  private final PartitionSummary stats;
+
+  private boolean closed = false;
+  private int addedFiles = 0;
+  private int existingFiles = 0;
+  private int deletedFiles = 0;
 
   ManifestWriter(PartitionSpec spec, OutputFile file, long snapshotId) {
+    this.location = file.location();
+    this.file = file;
+    this.specId = spec.specId();
     this.writer = newAppender(FileFormat.AVRO, spec, file);
     this.snapshotId = snapshotId;
     this.reused = new ManifestEntry(spec.partitionType());
+    this.stats = new PartitionSummary(spec);
   }
 
   public void addExisting(Iterable<ManifestEntry> entries) {
@@ -54,25 +68,37 @@ class ManifestWriter implements FileAppender<DataFile> {
   }
 
   public void addExisting(ManifestEntry entry) {
-    writer.add(reused.wrapExisting(entry.snapshotId(), entry.file()));
+    add(reused.wrapExisting(entry.snapshotId(), entry.file()));
   }
 
   public void addExisting(long snapshotId, DataFile file) {
-    writer.add(reused.wrapExisting(snapshotId, file));
+    add(reused.wrapExisting(snapshotId, file));
   }
 
   public void delete(ManifestEntry entry) {
     // Use the current Snapshot ID for the delete. It is safe to delete the data file from disk
     // when this Snapshot has been removed or when there are no Snapshots older than this one.
-    writer.add(reused.wrapDelete(snapshotId, entry.file()));
+    add(reused.wrapDelete(snapshotId, entry.file()));
   }
 
   public void delete(DataFile file) {
-    writer.add(reused.wrapDelete(snapshotId, file));
+    add(reused.wrapDelete(snapshotId, file));
   }
 
-  public void add(ManifestEntry file) {
-    writer.add(file);
+  public void add(ManifestEntry entry) {
+    switch (entry.status()) {
+      case ADDED:
+        addedFiles += 1;
+        break;
+      case EXISTING:
+        existingFiles += 1;
+        break;
+      case DELETED:
+        deletedFiles += 1;
+        break;
+    }
+    stats.update(entry.file().partition());
+    writer.add(entry);
   }
 
   public void addEntries(Iterable<ManifestEntry> entries) {
@@ -85,7 +111,7 @@ class ManifestWriter implements FileAppender<DataFile> {
   public void add(DataFile file) {
     // TODO: this assumes that file is a GenericDataFile that can be written directly to Avro
     // Eventually, this should check in case there are other DataFile implementations.
-    writer.add(reused.wrapAppend(snapshotId, file));
+    add(reused.wrapAppend(snapshotId, file));
   }
 
   @Override
@@ -93,8 +119,15 @@ class ManifestWriter implements FileAppender<DataFile> {
     return writer.metrics();
   }
 
+  public ManifestFile toManifestFile() {
+    Preconditions.checkState(closed, "Cannot build ManifestFile, writer is not closed");
+    return new GenericManifestFile(location, file.toInputFile().getLength(), specId, snapshotId,
+        addedFiles, existingFiles, deletedFiles, stats.summaries());
+  }
+
   @Override
   public void close() throws IOException {
+    this.closed = true;
     writer.close();
   }
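
With these changes the manifest writer counts added/existing/deleted entries and aggregates partition bounds as it writes, and toManifestFile() may only be called once the writer is closed. A sketch of the intended flow (package-private access assumed; spec, out, and dataFiles are placeholders):

    package com.netflix.iceberg;

    import com.netflix.iceberg.io.OutputFile;
    import java.io.IOException;
    import java.util.List;

    class ManifestWriterExample {
      static ManifestFile writeManifest(PartitionSpec spec, OutputFile out, long snapshotId,
                                        List<DataFile> dataFiles) throws IOException {
        ManifestWriter writer = new ManifestWriter(spec, out, snapshotId);
        try {
          for (DataFile file : dataFiles) {
            writer.add(file); // counted as ADDED and folded into the partition summaries
          }
        } finally {
          writer.close(); // close before building the ManifestFile
        }
        return writer.toManifestFile(); // path, length, spec ID, counts, and summaries
      }
    }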
 
diff --git a/core/src/main/java/com/netflix/iceberg/MergingSnapshotUpdate.java b/core/src/main/java/com/netflix/iceberg/MergingSnapshotUpdate.java
index f12ba04..8878d4c 100644
--- a/core/src/main/java/com/netflix/iceberg/MergingSnapshotUpdate.java
+++ b/core/src/main/java/com/netflix/iceberg/MergingSnapshotUpdate.java
@@ -25,7 +25,6 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-import com.google.common.io.Closeables;
 import com.netflix.iceberg.ManifestEntry.Status;
 import com.netflix.iceberg.exceptions.RuntimeIOException;
 import com.netflix.iceberg.exceptions.ValidationException;
@@ -34,7 +33,6 @@ import com.netflix.iceberg.expressions.Expression;
 import com.netflix.iceberg.expressions.Expressions;
 import com.netflix.iceberg.expressions.Projections;
 import com.netflix.iceberg.expressions.StrictMetricsEvaluator;
-import com.netflix.iceberg.io.InputFile;
 import com.netflix.iceberg.io.OutputFile;
 import com.netflix.iceberg.util.BinPacking.ListPacker;
 import com.netflix.iceberg.util.CharSequenceWrapper;
@@ -42,15 +40,12 @@ import com.netflix.iceberg.util.StructLikeWrapper;
 import com.netflix.iceberg.util.Tasks;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import java.io.Closeable;
 import java.io.IOException;
 import java.lang.reflect.Array;
-import java.util.Collections;
-import java.util.Iterator;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static com.google.common.collect.Iterables.filter;
@@ -64,7 +59,6 @@ import static com.netflix.iceberg.util.ThreadPools.getWorkerPool;
 abstract class MergingSnapshotUpdate extends SnapshotUpdate {
   private final Logger LOG = LoggerFactory.getLogger(getClass());
 
-  private static final long SIZE_PER_FILE = 100; // assume each file will be ~100 bytes
   private static final Joiner COMMA = Joiner.on(",");
 
   protected static class DeleteException extends ValidationException {
@@ -94,14 +88,18 @@ abstract class MergingSnapshotUpdate extends SnapshotUpdate {
   private boolean failAnyDelete = false;
   private boolean failMissingDeletePaths = false;
 
+  // cache the new manifest once it is written
+  private ManifestFile newManifest = null;
+  private boolean hasNewFiles = false;
+
   // cache merge results to reuse when retrying
-  private final Map<List<String>, String> mergeManifests = Maps.newConcurrentMap();
+  private final Map<List<ManifestFile>, ManifestFile> mergeManifests = Maps.newConcurrentMap();
 
   // cache filtered manifests to avoid extra work when commits fail.
-  private final Map<String, ManifestReader> filteredManifests = Maps.newConcurrentMap();
+  private final Map<ManifestFile, ManifestFile> filteredManifests = Maps.newConcurrentMap();
 
   // tracking where files were deleted to validate retries quickly
-  private final Map<String, Set<CharSequenceWrapper>> filteredManifestToDeletedFiles =
+  private final Map<ManifestFile, Set<CharSequenceWrapper>> filteredManifestToDeletedFiles =
       Maps.newConcurrentMap();
 
   private boolean filterUpdated = false; // used to clear caches of filtered and merged manifests
@@ -169,77 +167,70 @@ abstract class MergingSnapshotUpdate extends SnapshotUpdate {
    * Add a file to the new snapshot.
    */
   protected void add(DataFile file) {
+    hasNewFiles = true;
     newFiles.add(file);
   }
 
   @Override
-  public List<String> apply(TableMetadata base) {
+  public List<ManifestFile> apply(TableMetadata base) {
     if (filterUpdated) {
-      cleanUncommittedFilters(EMPTY_SET);
+      cleanUncommittedFilters(SnapshotUpdate.EMPTY_SET);
       this.filterUpdated = false;
     }
 
     Snapshot current = base.currentSnapshot();
-    List<PartitionSpec> specs = Lists.newArrayList();
-    List<List<ManifestReader>> groups = Lists.newArrayList();
+    Map<Integer, List<ManifestFile>> groups = Maps.newTreeMap(Comparator.<Integer>reverseOrder());
 
     // use a common metrics evaluator for all manifests because it is bound to the table schema
     StrictMetricsEvaluator metricsEvaluator = new StrictMetricsEvaluator(
         ops.current().schema(), deleteExpression);
 
     // add the current spec as the first group. files are added to the beginning.
-    if (newFiles.size() > 0) {
-      specs.add(spec);
-      groups.add(Lists.newArrayList());
-      groups.get(0).add(newFilesAsManifest());
-    }
-
-    ConcurrentLinkedQueue<ManifestReader> toClose = new ConcurrentLinkedQueue<>();
-    boolean threw = true;
     try {
+      if (newFiles.size() > 0) {
+        ManifestFile newManifest = newFilesAsManifest();
+        List<ManifestFile> manifestGroup = Lists.newArrayList();
+        manifestGroup.add(newManifest);
+        groups.put(newManifest.partitionSpecId(), manifestGroup);
+      }
+
       Set<CharSequenceWrapper> deletedFiles = Sets.newHashSet();
 
       // group manifests by compatible partition specs to be merged
       if (current != null) {
-        List<String> manifests = current.manifests();
-        ManifestReader[] readers = new ManifestReader[manifests.size()];
+        List<ManifestFile> manifests = current.manifests();
+        ManifestFile[] filtered = new ManifestFile[manifests.size()];
         // open all of the manifest files in parallel, use index to avoid reordering
-        Tasks.range(readers.length)
+        Tasks.range(filtered.length)
             .stopOnFailure().throwFailureWhenFinished()
             .executeWith(getWorkerPool())
             .run(index -> {
-              ManifestReader manifest = filterManifest(
-                  deleteExpression, metricsEvaluator, ops.newInputFile(manifests.get(index)));
-              readers[index] = manifest;
-              toClose.add(manifest);
-            });
-
-        for (ManifestReader reader : readers) {
-          if (reader.file() != null) {
-            String location = reader.file().location();
-            Set<CharSequenceWrapper> manifestDeletes = filteredManifestToDeletedFiles.get(location);
-            if (manifestDeletes != null) {
-              deletedFiles.addAll(manifestDeletes);
-            }
+              ManifestFile manifest = filterManifest(
+                  deleteExpression, metricsEvaluator,
+                  manifests.get(index));
+              filtered[index] = manifest;
+            }, IOException.class);
+
+        for (ManifestFile manifest : filtered) {
+          Set<CharSequenceWrapper> manifestDeletes = filteredManifestToDeletedFiles.get(manifest);
+          if (manifestDeletes != null) {
+            deletedFiles.addAll(manifestDeletes);
           }
 
-          int index = findMatch(specs, reader.spec());
-          if (index < 0) {
-            // not found, add a new one
-            List<ManifestReader> newList = Lists.<ManifestReader>newArrayList(reader);
-            specs.add(reader.spec());
-            groups.add(newList);
+          List<ManifestFile> group = groups.get(manifest.partitionSpecId());
+          if (group != null) {
+            group.add(manifest);
           } else {
-            // replace the reader spec with the later one
-            specs.set(index, reader.spec());
-            groups.get(index).add(reader);
+            group = Lists.newArrayList();
+            group.add(manifest);
+            groups.put(manifest.partitionSpecId(), group);
           }
         }
       }
 
-      List<String> manifests = Lists.newArrayList();
-      for (int i = 0; i < specs.size(); i += 1) {
-        for (String manifest : mergeGroup(specs.get(i), groups.get(i))) {
+      List<ManifestFile> manifests = Lists.newArrayList();
+      for (Map.Entry<Integer, List<ManifestFile>> entry : groups.entrySet()) {
+        for (ManifestFile manifest : mergeGroup(entry.getKey(), entry.getValue())) {
           manifests.add(manifest);
         }
       }
@@ -250,52 +241,56 @@ abstract class MergingSnapshotUpdate extends SnapshotUpdate {
               path -> !deletedFiles.contains(path)),
               CharSequenceWrapper::get)));
 
-      threw = false;
-
       return manifests;
 
-    } finally {
-      for (ManifestReader reader : toClose) {
-        try {
-          Closeables.close(reader, threw);
-        } catch (IOException e) {
-          throw new RuntimeIOException(e);
-        }
-      }
+    } catch (IOException e) {
+      throw new RuntimeIOException(e, "Failed to create snapshot manifest list");
     }
   }
 
-  private void cleanUncommittedMerges(Set<String> committed) {
-    List<Map.Entry<List<String>, String>> entries = Lists.newArrayList(mergeManifests.entrySet());
-    for (Map.Entry<List<String>, String> entry : entries) {
+  private void cleanUncommittedMerges(Set<ManifestFile> committed) {
+    // iterate over a copy of entries to avoid concurrent modification
+    List<Map.Entry<List<ManifestFile>, ManifestFile>> entries =
+        Lists.newArrayList(mergeManifests.entrySet());
+
+    for (Map.Entry<List<ManifestFile>, ManifestFile> entry : entries) {
       // delete any new merged manifests that aren't in the committed list
-      String merged = entry.getValue();
+      ManifestFile merged = entry.getValue();
       if (!committed.contains(merged)) {
-        deleteFile(merged);
+        deleteFile(merged.path());
         // remove the deleted file from the cache
         mergeManifests.remove(entry.getKey());
       }
     }
   }
 
-  private void cleanUncommittedFilters(Set<String> committed) {
-    List<Map.Entry<String, ManifestReader>> filterEntries = Lists.newArrayList(filteredManifests.entrySet());
-    for (Map.Entry<String, ManifestReader> entry : filterEntries) {
+  private void cleanUncommittedFilters(Set<ManifestFile> committed) {
+    // iterate over a copy of entries to avoid concurrent modification
+    List<Map.Entry<ManifestFile, ManifestFile>> filterEntries =
+        Lists.newArrayList(filteredManifests.entrySet());
+
+    for (Map.Entry<ManifestFile, ManifestFile> entry : filterEntries) {
       // remove any new filtered manifests that aren't in the committed list
-      String manifest = entry.getKey();
-      ManifestReader filtered = entry.getValue();
-      if (filtered != null) {
-        String location = filtered.file().location();
-        if (!manifest.equals(location) && !committed.contains(location)) {
-          filteredManifests.remove(manifest);
-          deleteFile(location);
+      ManifestFile manifest = entry.getKey();
+      ManifestFile filtered = entry.getValue();
+      if (!committed.contains(filtered)) {
+        // only delete if the filtered copy was created
+        if (!manifest.equals(filtered)) {
+          deleteFile(filtered.path());
         }
+
+        // remove the entry from the cache
+        filteredManifests.remove(manifest);
       }
     }
   }
 
   @Override
-  protected void cleanUncommitted(Set<String> committed) {
+  protected void cleanUncommitted(Set<ManifestFile> committed) {
+    if (newManifest != null && !committed.contains(newManifest)) {
+      deleteFile(newManifest.path());
+      this.newManifest = null;
+    }
     cleanUncommittedMerges(committed);
     cleanUncommittedFilters(committed);
   }
@@ -308,22 +303,20 @@ abstract class MergingSnapshotUpdate extends SnapshotUpdate {
   /**
    * @return a ManifestFile for a filtered version of the input manifest.
    */
-  private ManifestReader filterManifest(Expression deleteExpression,
+  private ManifestFile filterManifest(Expression deleteExpression,
                                         StrictMetricsEvaluator metricsEvaluator,
-                                        InputFile manifest) {
-    ManifestReader cached = filteredManifests.get(manifest.location());
+                                        ManifestFile manifest) throws IOException {
+    ManifestFile cached = filteredManifests.get(manifest);
     if (cached != null) {
       return cached;
     }
 
-    ManifestReader reader = ManifestReader.read(manifest);
-
     if (nothingToFilter()) {
-      filteredManifests.put(manifest.location(), reader);
-      return reader;
+      filteredManifests.put(manifest, manifest);
+      return manifest;
     }
 
-    try {
+    try (ManifestReader reader = ManifestReader.read(ops.newInputFile(manifest.path()))) {
       Expression inclusiveExpr = Projections
           .inclusive(reader.spec())
           .project(deleteExpression);
@@ -344,42 +337,35 @@ abstract class MergingSnapshotUpdate extends SnapshotUpdate {
       // manifest without copying data. if a manifest does have a file to remove, this will break
       // out of the loop and move on to filtering the manifest.
       boolean hasDeletedFiles = false;
-      Iterator<ManifestEntry> entries = reader.entries().iterator();
-      try {
-        while (entries.hasNext()) {
-          ManifestEntry entry = entries.next();
-          DataFile file = entry.file();
-          boolean fileDelete = (deletePaths.contains(pathWrapper.set(file.path())) ||
-              dropPartitions.contains(partitionWrapper.set(file.partition())));
-          if (fileDelete || inclusive.eval(file.partition())) {
-            ValidationException.check(
-                fileDelete || strict.eval(file.partition()) || metricsEvaluator.eval(file),
-                "Cannot delete file where some, but not all, rows match filter %s: %s",
-                this.deleteExpression, file.path());
-
-            hasDeletedFiles = true;
-            if (failAnyDelete) {
-              throw new DeleteException(writeSpec().partitionToPath(file.partition()));
-            }
-            break; // as soon as a deleted file is detected, stop scanning
+      for (ManifestEntry entry : reader.entries()) {
+        DataFile file = entry.file();
+        boolean fileDelete = (deletePaths.contains(pathWrapper.set(file.path())) ||
+            dropPartitions.contains(partitionWrapper.set(file.partition())));
+        if (fileDelete || inclusive.eval(file.partition())) {
+          ValidationException.check(
+              fileDelete || strict.eval(file.partition()) || metricsEvaluator.eval(file),
+              "Cannot delete file where some, but not all, rows match filter %s: %s",
+              this.deleteExpression, file.path());
+
+          hasDeletedFiles = true;
+          if (failAnyDelete) {
+            throw new DeleteException(writeSpec().partitionToPath(file.partition()));
           }
-        }
-      } finally {
-        // the loop may have exited early. ensure the iterator is closed.
-        if (entries instanceof Closeable) {
-          ((Closeable) entries).close();
+          break; // as soon as a deleted file is detected, stop scanning
         }
       }
 
       if (!hasDeletedFiles) {
-        return reader;
+        filteredManifests.put(manifest, manifest);
+        return manifest;
       }
 
       // when this point is reached, there is at least one file that will be deleted in the
       // manifest. produce a copy of the manifest with all deleted files removed.
       Set<CharSequenceWrapper> deletedPaths = Sets.newHashSet();
       OutputFile filteredCopy = manifestPath(manifestCount.getAndIncrement());
-      try (ManifestWriter writer = new ManifestWriter(reader.spec(), filteredCopy, snapshotId())) {
+      ManifestWriter writer = new ManifestWriter(reader.spec(), filteredCopy, snapshotId());
+      try {
         for (ManifestEntry entry : reader.entries()) {
           DataFile file = entry.file();
           boolean fileDelete = (deletePaths.contains(pathWrapper.set(file.path())) ||
@@ -396,7 +382,7 @@ abstract class MergingSnapshotUpdate extends SnapshotUpdate {
               CharSequenceWrapper wrapper = CharSequenceWrapper.wrap(entry.file().path());
               if (deletedPaths.contains(wrapper)) {
                 LOG.warn("Deleting a duplicate path from manifest {}: {}",
-                    manifest.location(), wrapper.get());
+                    manifest.path(), wrapper.get());
               }
               deletedPaths.add(wrapper);
 
@@ -405,96 +391,79 @@ abstract class MergingSnapshotUpdate extends SnapshotUpdate {
             }
           }
         }
+      } finally {
+        writer.close();
       }
 
-      // close the reader now that it is no longer used and will not be returned
-      reader.close();
-
       // return the new manifest metadata for the filtered copy
-      ManifestReader filtered = ManifestReader.read(ops.newInputFile(filteredCopy.location()));
+      ManifestFile filtered = writer.toManifestFile();
 
       // update caches
-      filteredManifests.put(manifest.location(), filtered);
-      filteredManifestToDeletedFiles.put(filteredCopy.location(), deletedPaths);
+      filteredManifests.put(manifest, filtered);
+      filteredManifestToDeletedFiles.put(filtered, deletedPaths);
 
       return filtered;
-
-    } catch (IOException e) {
-      throw new RuntimeIOException(e, "Failed to filter manifest: %s", reader.file().location());
     }
   }
 
   @SuppressWarnings("unchecked")
-  private Iterable<String> mergeGroup(PartitionSpec groupSpec, List<ManifestReader> group) {
+  private Iterable<ManifestFile> mergeGroup(int specId, List<ManifestFile> group)
+      throws IOException {
     // use a lookback of 1 to avoid reordering the manifests. using 1 also means this should pack
     // from the end so that the manifest that gets under-filled is the first one, which will be
     // merged the next time.
-    long newFilesSize = newFiles.size() * SIZE_PER_FILE;
-    ListPacker<ManifestReader> packer = new ListPacker<>(manifestTargetSizeBytes, 1);
-    List<List<ManifestReader>> bins = packer.packEnd(group,
-        reader -> reader.file() != null ? reader.file().getLength() : newFilesSize);
+    ListPacker<ManifestFile> packer = new ListPacker<>(manifestTargetSizeBytes, 1);
+    List<List<ManifestFile>> bins = packer.packEnd(group, manifest -> manifest.length());
 
     // process bins in parallel, but put results in the order of the bins into an array to preserve
     // the order of manifests and contents. preserving the order helps avoid random deletes when
     // data files are eventually aged off.
-    List<String>[] binResults = (List<String>[]) Array.newInstance(List.class, bins.size());
+    List<ManifestFile>[] binResults = (List<ManifestFile>[])
+        Array.newInstance(List.class, bins.size());
     Tasks.range(bins.size())
         .stopOnFailure().throwFailureWhenFinished()
         .executeWith(getWorkerPool())
         .run(index -> {
-          List<ManifestReader> bin = bins.get(index);
-          List<String> outputManifests = Lists.newArrayList();
+          List<ManifestFile> bin = bins.get(index);
+          List<ManifestFile> outputManifests = Lists.newArrayList();
           binResults[index] = outputManifests;
 
-          if (bin.size() == 1 && bin.get(0).file() != null) {
+          if (bin.size() == 1) {
             // no need to rewrite
-            outputManifests.add(bin.get(0).file().location());
+            outputManifests.add(bin.get(0));
             return;
           }
 
-          boolean hasInMemoryManifest = false;
-          for (ManifestReader reader : bin) {
-            if (reader.file() == null) {
-              hasInMemoryManifest = true;
-            }
-          }
-
-          // if the bin has an in-memory manifest (the new data) then only merge it if the number of
+          // if the bin has a new manifest (the new data files) then only merge it if the number of
           // manifests is above the minimum count. this is applied only to bins with the new
           // manifest so that large manifests don't prevent merging older groups.
-          if (hasInMemoryManifest && bin.size() < minManifestsCountToMerge) {
-            for (ManifestReader reader : bin) {
-              if (reader.file() != null) {
-                outputManifests.add(reader.file().location());
-              } else {
-                // write the in-memory manifest
-                outputManifests.add(createManifest(groupSpec, Collections.singletonList(reader)));
-              }
-            }
+          if (bin.contains(newManifest) && bin.size() < minManifestsCountToMerge) {
+            // not enough to merge, add all manifest files to the output list
+            outputManifests.addAll(bin);
           } else {
-            outputManifests.add(createManifest(groupSpec, bin));
+            // merge the group
+            outputManifests.add(createManifest(specId, bin));
           }
-        });
+        }, IOException.class);
 
     return Iterables.concat(binResults);
   }
 
-  // NOTE: This assumes that any files that are added are in an in-memory manifest.
-  private String createManifest(PartitionSpec binSpec, List<ManifestReader> bin) {
-    List<String> key = cacheKey(bin);
+  private ManifestFile createManifest(int specId, List<ManifestFile> bin) throws IOException {
     // if this merge was already rewritten, use the existing file.
-    // if the new files are in this merge, the key is based on the number of new files so files
-    // added after the last merge will cause a cache miss.
-    if (mergeManifests.containsKey(key)) {
-      return mergeManifests.get(key);
+    // if the new files are in this merge, then the ManifestFile for the new files has changed and
+    // will be a cache miss.
+    if (mergeManifests.containsKey(bin)) {
+      return mergeManifests.get(bin);
     }
 
     OutputFile out = manifestPath(manifestCount.getAndIncrement());
 
-    try (ManifestWriter writer = new ManifestWriter(binSpec, out, snapshotId())) {
+    ManifestWriter writer = new ManifestWriter(ops.current().spec(specId), out, snapshotId());
+    try {
 
-      for (ManifestReader reader : bin) {
-        if (reader.file() != null) {
+      for (ManifestFile manifest : bin) {
+        try (ManifestReader reader = ManifestReader.read(ops.newInputFile(manifest.path()))) {
           for (ManifestEntry entry : reader.entries()) {
             if (entry.status() == Status.DELETED) {
               // suppress deletes from previous snapshots. only files deleted by this snapshot
@@ -502,72 +471,49 @@ abstract class MergingSnapshotUpdate extends SnapshotUpdate {
               if (entry.snapshotId() == snapshotId()) {
                 writer.add(entry);
               }
+            } else if (entry.status() == Status.ADDED && entry.snapshotId() == snapshotId()) {
+              // adds from this snapshot are still adds, otherwise they should be existing
+              writer.add(entry);
             } else {
               // add all files from the old manifest as existing files
               writer.addExisting(entry);
             }
           }
-        } else {
-          // if the files are in an in-memory manifest, then they are new
-          writer.addEntries(reader.entries());
         }
       }
 
-    } catch (IOException e) {
-      throw new RuntimeIOException(e, "Failed to write manifest: %s", out);
+    } finally {
+      writer.close();
     }
 
-    // update the cache
-    mergeManifests.put(key, out.location());
+    ManifestFile manifest = writer.toManifestFile();
 
-    return out.location();
-  }
+    // update the cache
+    mergeManifests.put(bin, manifest);
 
-  private ManifestReader newFilesAsManifest() {
-    long id = snapshotId();
-    ManifestEntry reused = new ManifestEntry(spec.partitionType());
-    return ManifestReader.inMemory(spec,
-        transform(newFiles, file -> {
-          reused.wrapAppend(id, file);
-          return reused;
-        }));
+    return manifest;
   }
 
-  private List<String> cacheKey(List<ManifestReader> group) {
-    List<String> key = Lists.newArrayList();
-
-    for (ManifestReader reader : group) {
-      if (reader.file() != null) {
-        key.add(reader.file().location());
-      } else {
-        // if the file is null, this is an in-memory reader
-        // use the size to avoid collisions if retries have added files
-        key.add("append-" + newFiles.size() + "-files");
-      }
+  private ManifestFile newFilesAsManifest() throws IOException {
+    if (hasNewFiles && newManifest != null) {
+      deleteFile(newManifest.path());
+      newManifest = null;
     }
 
-    return key;
-  }
+    if (newManifest == null) {
+      OutputFile out = manifestPath(manifestCount.getAndIncrement());
 
-  /**
-   * Helper method to group manifests by compatible partition spec.
-   * <p>
-   * When a match is found, this will replace the current spec for the group with the query spec.
-   * This is to produce manifests with the latest compatible spec.
-   *
-   * @param specs   a list of partition specs, corresponding to the groups of readers
-   * @param spec    spec to be matched to a group
-   * @return        group of readers files for this spec can be merged into
-   */
-  private static int findMatch(List<PartitionSpec> specs,
-                               PartitionSpec spec) {
-    // loop from last to first because later specs are most likely to match
-    for (int i = specs.size() - 1; i >= 0; i -= 1) {
-      if (specs.get(i).compatibleWith(spec)) {
-        return i;
+      ManifestWriter writer = new ManifestWriter(spec, out, snapshotId());
+      try {
+        writer.addAll(newFiles);
+      } finally {
+        writer.close();
       }
+
+      this.newManifest = writer.toManifestFile();
+      this.hasNewFiles = false;
     }
 
-    return -1;
+    return newManifest;
   }
 }
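
Manifests are now grouped by partition spec ID rather than by matching PartitionSpec instances, and the groups are visited newest spec first. A standalone sketch of that grouping step (not the commit's code; the input iterable is a placeholder):

    import com.google.common.collect.Lists;
    import com.google.common.collect.Maps;
    import com.netflix.iceberg.ManifestFile;
    import java.util.Comparator;
    import java.util.List;
    import java.util.Map;

    class SpecGroupingExample {
      static Map<Integer, List<ManifestFile>> groupBySpecId(Iterable<ManifestFile> manifests) {
        // reverse order so the highest (newest) spec ID is visited first
        Map<Integer, List<ManifestFile>> groups =
            Maps.newTreeMap(Comparator.<Integer>reverseOrder());
        for (ManifestFile manifest : manifests) {
          groups.computeIfAbsent(manifest.partitionSpecId(), id -> Lists.newArrayList())
              .add(manifest);
        }
        return groups;
      }
    }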
diff --git a/core/src/main/java/com/netflix/iceberg/OverwriteData.java b/core/src/main/java/com/netflix/iceberg/OverwriteData.java
index 404b440..3ebb725 100644
--- a/core/src/main/java/com/netflix/iceberg/OverwriteData.java
+++ b/core/src/main/java/com/netflix/iceberg/OverwriteData.java
@@ -52,7 +52,7 @@ public class OverwriteData extends MergingSnapshotUpdate implements OverwriteFil
   }
 
   @Override
-  public List<String> apply(TableMetadata base) {
+  public List<ManifestFile> apply(TableMetadata base) {
     if (validateAddedFiles) {
       PartitionSpec spec = writeSpec();
       Expression rowFilter = rowFilter();
diff --git a/core/src/main/java/com/netflix/iceberg/PartitionSummary.java b/core/src/main/java/com/netflix/iceberg/PartitionSummary.java
new file mode 100644
index 0000000..52a2b4a
--- /dev/null
+++ b/core/src/main/java/com/netflix/iceberg/PartitionSummary.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.netflix.iceberg;
+
+import com.google.common.collect.Lists;
+import com.netflix.iceberg.ManifestFile.PartitionFieldSummary;
+import com.netflix.iceberg.types.Comparators;
+import com.netflix.iceberg.types.Conversions;
+import com.netflix.iceberg.types.Type;
+import com.netflix.iceberg.types.Types;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+
+class PartitionSummary {
+  private final PartitionFieldStats<?>[] fields;
+  private final Class<?>[] javaClasses;
+
+  PartitionSummary(PartitionSpec spec) {
+    this.javaClasses = spec.javaClasses();
+    this.fields = new PartitionFieldStats[javaClasses.length];
+    List<Types.NestedField> partitionFields = spec.partitionType().fields();
+    for (int i = 0; i < fields.length; i += 1) {
+      this.fields[i] = new PartitionFieldStats<>(partitionFields.get(i).type());
+    }
+  }
+
+  List<PartitionFieldSummary> summaries() {
+    return Lists.transform(Arrays.asList(fields), PartitionFieldStats::toSummary);
+  }
+
+  public void update(StructLike partitionKey) {
+    updateFields(partitionKey);
+  }
+
+  @SuppressWarnings("unchecked")
+  private <T> void updateFields(StructLike key) {
+    for (int i = 0; i < javaClasses.length; i += 1) {
+      PartitionFieldStats<T> stats = (PartitionFieldStats<T>) fields[i];
+      Class<T> javaClass = (Class<T>) javaClasses[i];
+      stats.update(key.get(i, javaClass));
+    }
+  }
+
+  private static class PartitionFieldStats<T> {
+    private final Type type;
+    private final Comparator<T> comparator;
+
+    private boolean containsNull = false;
+    private T min = null;
+    private T max = null;
+
+    private PartitionFieldStats(Type type) {
+      this.type = type;
+      this.comparator = Comparators.forType(type.asPrimitiveType());
+    }
+
+    public PartitionFieldSummary toSummary() {
+      return new GenericPartitionFieldSummary(containsNull,
+          min != null ? Conversions.toByteBuffer(type, min) : null,
+          max != null ? Conversions.toByteBuffer(type, max) : null);
+    }
+
+    void update(T value) {
+      if (value == null) {
+        this.containsNull = true;
+      } else if (min == null) {
+        this.min = value;
+        this.max = value;
+      } else {
+        if (comparator.compare(value, min) < 0) {
+          this.min = value;
+        }
+        if (comparator.compare(max, value) < 0) {
+          this.max = value;
+        }
+      }
+    }
+  }
+}
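
PartitionSummary tracks, for each partition field, whether a null value was seen and the min/max of the non-null values using a type-aware comparator. Since PartitionFieldStats is private, here is a standalone sketch of the same update logic for a single Integer-typed field (illustration only, not the commit's class):

    class IntegerFieldStatsSketch {
      private boolean containsNull = false;
      private Integer min = null;
      private Integer max = null;

      void update(Integer value) {
        if (value == null) {
          this.containsNull = true;   // nulls only set the flag, never the bounds
        } else if (min == null) {
          this.min = value;           // first non-null value seeds both bounds
          this.max = value;
        } else {
          if (value.compareTo(min) < 0) {
            this.min = value;
          }
          if (value.compareTo(max) > 0) {
            this.max = value;
          }
        }
      }
    }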
diff --git a/core/src/main/java/com/netflix/iceberg/RemoveSnapshots.java b/core/src/main/java/com/netflix/iceberg/RemoveSnapshots.java
index b097674..8f473b3 100644
--- a/core/src/main/java/com/netflix/iceberg/RemoveSnapshots.java
+++ b/core/src/main/java/com/netflix/iceberg/RemoveSnapshots.java
@@ -130,22 +130,22 @@ class RemoveSnapshots implements ExpireSnapshots {
 
     TableMetadata current = ops.refresh();
     Set<Long> currentIds = Sets.newHashSet();
-    Set<String> currentManifests = Sets.newHashSet();
+    Set<ManifestFile> currentManifests = Sets.newHashSet();
     for (Snapshot snapshot : current.snapshots()) {
       currentIds.add(snapshot.snapshotId());
       currentManifests.addAll(snapshot.manifests());
     }
 
-    Set<String> allManifests = Sets.newHashSet(currentManifests);
+    Set<ManifestFile> allManifests = Sets.newHashSet(currentManifests);
     Set<String> manifestsToDelete = Sets.newHashSet();
     for (Snapshot snapshot : base.snapshots()) {
       long snapshotId = snapshot.snapshotId();
       if (!currentIds.contains(snapshotId)) {
         // the snapshot was removed, find any manifests that are no longer needed
         LOG.info("Removing snapshot: {}", snapshot);
-        for (String manifest : snapshot.manifests()) {
+        for (ManifestFile manifest : snapshot.manifests()) {
           if (!currentManifests.contains(manifest)) {
-            manifestsToDelete.add(manifest);
+            manifestsToDelete.add(manifest.path());
             allManifests.add(manifest);
           }
         }
@@ -161,7 +161,7 @@ class RemoveSnapshots implements ExpireSnapshots {
         ).run(manifest -> {
           // even if the manifest is still used, it may contain files that can be deleted
           // TODO: eliminate manifests with no deletes without scanning
-          try (ManifestReader reader = ManifestReader.read(ops.newInputFile(manifest))) {
+          try (ManifestReader reader = ManifestReader.read(ops.newInputFile(manifest.path()))) {
             for (ManifestEntry entry : reader.entries()) {
               // if the snapshot ID of the DELETE entry is no longer valid, the data can be deleted
               if (entry.status() == ManifestEntry.Status.DELETED &&
@@ -171,7 +171,7 @@ class RemoveSnapshots implements ExpireSnapshots {
               }
             }
           } catch (IOException e) {
-            throw new RuntimeIOException(e, "Failed to read manifest: " + manifest);
+            throw new RuntimeIOException(e, "Failed to read manifest file: " + manifest.path());
           }
     });
 
diff --git a/core/src/main/java/com/netflix/iceberg/ReplacePartitionsOperation.java b/core/src/main/java/com/netflix/iceberg/ReplacePartitionsOperation.java
index 9d0db6c..4fb6aa8 100644
--- a/core/src/main/java/com/netflix/iceberg/ReplacePartitionsOperation.java
+++ b/core/src/main/java/com/netflix/iceberg/ReplacePartitionsOperation.java
@@ -42,7 +42,7 @@ public class ReplacePartitionsOperation extends MergingSnapshotUpdate implements
   }
 
   @Override
-  public List<String> apply(TableMetadata base) {
+  public List<ManifestFile> apply(TableMetadata base) {
     if (writeSpec().fields().size() <= 0) {
       // replace all data in an unpartitioned table
       deleteByRowFilter(Expressions.alwaysTrue());
diff --git a/core/src/main/java/com/netflix/iceberg/ScanSummary.java b/core/src/main/java/com/netflix/iceberg/ScanSummary.java
index b19ab38..7315786 100644
--- a/core/src/main/java/com/netflix/iceberg/ScanSummary.java
+++ b/core/src/main/java/com/netflix/iceberg/ScanSummary.java
@@ -22,6 +22,7 @@ package com.netflix.iceberg;
 import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -90,11 +91,21 @@ public class ScanSummary {
       timeFilters.add(filter);
     }
 
+    public Builder after(String timestamp) {
+      Literal<Long> tsLiteral = Literal.of(timestamp).to(Types.TimestampType.withoutZone());
+      return after(tsLiteral.value() / 1000);
+    }
+
     public Builder after(long timestampMillis) {
       addTimestampFilter(Expressions.greaterThanOrEqual("timestamp_ms", timestampMillis));
       return this;
     }
 
+    public Builder before(String timestamp) {
+      Literal<Long> tsLiteral = Literal.of(timestamp).to(Types.TimestampType.withoutZone());
+      return before(tsLiteral.value() / 1000);
+    }
+
     public Builder before(long timestampMillis) {
       addTimestampFilter(Expressions.lessThanOrEqual("timestamp_ms", timestampMillis));
       return this;
@@ -145,29 +156,45 @@ public class ScanSummary {
       removeTimeFilters(filters, Expressions.rewriteNot(scan.filter()));
       Expression rowFilter = joinFilters(filters);
 
-      long minTimestamp = Long.MIN_VALUE;
-      long maxTimestamp = Long.MAX_VALUE;
+      Iterable<ManifestFile> manifests = table.currentSnapshot().manifests();
+
       boolean filterByTimestamp = !timeFilters.isEmpty();
+      Set<Long> snapshotsInTimeRange = Sets.newHashSet();
       if (filterByTimestamp) {
         Pair<Long, Long> range = timestampRange(timeFilters);
-        minTimestamp = range.first();
-        maxTimestamp = range.second();
+        long minTimestamp = range.first();
+        long maxTimestamp = range.second();
+
+        for (Map.Entry<Long, Long> entry : snapshotTimestamps.entrySet()) {
+          long snapshotId = entry.getKey();
+          long timestamp = entry.getValue();
+          if (timestamp >= minTimestamp && timestamp <= maxTimestamp) {
+            snapshotsInTimeRange.add(snapshotId);
+          }
+        }
+
+        // when filtering by dateCreated or lastUpdated timestamp, this matches the set of files
+        // that were added in the time range. files are added in new snapshots, so to get the new
+        // files, this only needs to scan new manifests in the set of snapshots that match the
+        // filter. ManifestFile.snapshotId() returns the snapshot when the manifest was added, so
+        // the only manifests that need to be scanned are those with snapshotId() in the timestamp
+        // range, or those that don't have a snapshot ID.
+        manifests = Iterables.filter(manifests, manifest ->
+            manifest.snapshotId() == null || snapshotsInTimeRange.contains(manifest.snapshotId()));
       }
 
-      try (CloseableIterable<ManifestEntry> entries =
-               new ManifestGroup(ops, table.currentSnapshot().manifests())
-                   .filterData(rowFilter)
-                   .ignoreDeleted()
-                   .select(SCAN_SUMMARY_COLUMNS)
-                   .entries()) {
+      try (CloseableIterable<ManifestEntry> entries = new ManifestGroup(ops, manifests)
+          .filterData(rowFilter)
+          .ignoreDeleted()
+          .select(SCAN_SUMMARY_COLUMNS)
+          .entries()) {
 
         PartitionSpec spec = table.spec();
         for (ManifestEntry entry : entries) {
           Long timestamp = snapshotTimestamps.get(entry.snapshotId());
 
           // if filtering, skip timestamps that are outside the range
-          if (filterByTimestamp &&
-              (timestamp == null || timestamp < minTimestamp || timestamp > maxTimestamp)) {
+          if (filterByTimestamp && !snapshotsInTimeRange.contains(entry.snapshotId())) {
             continue;
           }
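
The new Builder.after(String) and before(String) helpers above parse an ISO-8601 local timestamp
into a microsecond timestamp literal and divide by 1000 to obtain the millisecond value fed to the
existing timestamp_ms filter. A minimal sketch of that conversion (the timestamp value and the
class name are illustrative only):

    import com.netflix.iceberg.expressions.Literal;
    import com.netflix.iceberg.types.Types;

    public class TimestampFilterSketch {
      public static void main(String[] args) {
        // parse an ISO-8601 local timestamp into a microsecond-precision literal
        Literal<Long> tsMicros = Literal.of("2018-12-05T17:20:06")
            .to(Types.TimestampType.withoutZone());
        // the builder passes milliseconds on to after(long) / before(long)
        long tsMillis = tsMicros.value() / 1000;
        System.out.println("timestamp_ms filter value: " + tsMillis);
      }
    }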
 
diff --git a/core/src/main/java/com/netflix/iceberg/SnapshotParser.java b/core/src/main/java/com/netflix/iceberg/SnapshotParser.java
index cf04bec..a5ce08c 100644
--- a/core/src/main/java/com/netflix/iceberg/SnapshotParser.java
+++ b/core/src/main/java/com/netflix/iceberg/SnapshotParser.java
@@ -22,8 +22,12 @@ package com.netflix.iceberg;
 import com.fasterxml.jackson.core.JsonGenerator;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
 import com.netflix.iceberg.exceptions.RuntimeIOException;
 import com.netflix.iceberg.util.JsonUtil;
+import com.netflix.iceberg.util.Tasks;
+import com.netflix.iceberg.util.ThreadPools;
 import java.io.IOException;
 import java.io.StringWriter;
 import java.util.List;
@@ -34,19 +38,30 @@ public class SnapshotParser {
   private static final String PARENT_SNAPSHOT_ID = "parent-snapshot-id";
   private static final String TIMESTAMP_MS = "timestamp-ms";
   private static final String MANIFESTS = "manifests";
+  private static final String MANIFEST_LIST = "manifest-list";
 
-  static void toJson(Snapshot snapshot, JsonGenerator generator) throws IOException {
+  static void toJson(Snapshot snapshot, JsonGenerator generator)
+      throws IOException {
     generator.writeStartObject();
     generator.writeNumberField(SNAPSHOT_ID, snapshot.snapshotId());
     if (snapshot.parentId() != null) {
       generator.writeNumberField(PARENT_SNAPSHOT_ID, snapshot.parentId());
     }
     generator.writeNumberField(TIMESTAMP_MS, snapshot.timestampMillis());
-    generator.writeArrayFieldStart(MANIFESTS);
-    for (String file : snapshot.manifests()) {
-      generator.writeString(file);
+
+    String manifestList = snapshot.manifestListLocation();
+    if (manifestList != null) {
+      // write just the location. manifests should not be embedded in JSON along with a list
+      generator.writeStringField(MANIFEST_LIST, manifestList);
+    } else {
+      // embed the manifest list in the JSON
+      generator.writeArrayFieldStart(MANIFESTS);
+      for (ManifestFile file : snapshot.manifests()) {
+        generator.writeString(file.path());
+      }
+      generator.writeEndArray();
     }
-    generator.writeEndArray();
+
     generator.writeEndObject();
   }
 
@@ -73,9 +88,19 @@ public class SnapshotParser {
       parentId = JsonUtil.getLong(PARENT_SNAPSHOT_ID, node);
     }
     long timestamp = JsonUtil.getLong(TIMESTAMP_MS, node);
-    List<String> manifests = JsonUtil.getStringList(MANIFESTS, node);
 
-    return new BaseSnapshot(ops, versionId, parentId, timestamp, manifests);
+    if (node.has(MANIFEST_LIST)) {
+      // the manifest list is stored in a manifest list file
+      String manifestList = JsonUtil.getString(MANIFEST_LIST, node);
+      return new BaseSnapshot(ops, versionId, parentId, timestamp, ops.newInputFile(manifestList));
+
+    } else {
+      // fall back to an embedded manifest list. pass in the manifest's InputFile so length can be
+      // loaded lazily, if it is needed
+      List<ManifestFile> manifests = Lists.transform(JsonUtil.getStringList(MANIFESTS, node),
+          location -> new GenericManifestFile(ops.newInputFile(location), 0));
+      return new BaseSnapshot(ops, versionId, parentId, timestamp, manifests);
+    }
   }
 
   public static Snapshot fromJson(TableOperations ops, String json) {
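
For reference, the two snapshot JSON shapes produced by toJson above look roughly like the
following. The paths and numeric values are illustrative, and the exact spelling of the
snapshot-id key is assumed; the other keys follow the constants shown above. With a manifest
list file:

    {
      "snapshot-id": 2,
      "parent-snapshot-id": 1,
      "timestamp-ms": 1544059206000,
      "manifest-list": "s3://bucket/metadata/snap-2-<uuid>.avro"
    }

and with the embedded fallback when manifest lists are disabled:

    {
      "snapshot-id": 2,
      "parent-snapshot-id": 1,
      "timestamp-ms": 1544059206000,
      "manifests": ["s3://bucket/metadata/<uuid>-m0.avro"]
    }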
diff --git a/core/src/main/java/com/netflix/iceberg/SnapshotUpdate.java b/core/src/main/java/com/netflix/iceberg/SnapshotUpdate.java
index 8fe0d81..54c0483 100644
--- a/core/src/main/java/com/netflix/iceberg/SnapshotUpdate.java
+++ b/core/src/main/java/com/netflix/iceberg/SnapshotUpdate.java
@@ -19,13 +19,19 @@
 
 package com.netflix.iceberg;
 
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
 import com.google.common.collect.Sets;
 import com.netflix.iceberg.exceptions.CommitFailedException;
+import com.netflix.iceberg.exceptions.RuntimeIOException;
 import com.netflix.iceberg.io.OutputFile;
 import com.netflix.iceberg.util.Exceptions;
 import com.netflix.iceberg.util.Tasks;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
 import java.util.UUID;
@@ -39,10 +45,28 @@ import static com.netflix.iceberg.TableProperties.COMMIT_NUM_RETRIES;
 import static com.netflix.iceberg.TableProperties.COMMIT_NUM_RETRIES_DEFAULT;
 import static com.netflix.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS;
 import static com.netflix.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT;
+import static com.netflix.iceberg.TableProperties.MANIFEST_LISTS_ENABLED;
+import static com.netflix.iceberg.TableProperties.MANIFEST_LISTS_ENABLED_DEFAULT;
+import static com.netflix.iceberg.util.ThreadPools.getWorkerPool;
 
 abstract class SnapshotUpdate implements PendingUpdate<Snapshot> {
   private static final Logger LOG = LoggerFactory.getLogger(SnapshotUpdate.class);
-  static final Set<String> EMPTY_SET = Sets.newHashSet();
+  static final Set<ManifestFile> EMPTY_SET = Sets.newHashSet();
+
+  /**
+   * Cache used to enrich ManifestFile instances that are written to a ManifestListWriter.
+   */
+  private final LoadingCache<ManifestFile, ManifestFile> manifestsWithMetadata = CacheBuilder
+      .newBuilder()
+      .build(new CacheLoader<ManifestFile, ManifestFile>() {
+        @Override
+        public ManifestFile load(ManifestFile file) {
+          if (file.snapshotId() != null) {
+            return file;
+          }
+          return addMetadata(ops, file);
+        }
+      });
 
   private final TableOperations ops;
   private final String commitUUID = UUID.randomUUID().toString();
@@ -60,7 +84,7 @@ abstract class SnapshotUpdate implements PendingUpdate<Snapshot> {
    * @param base the base table metadata to apply changes to
    * @return a manifest list for the new snapshot.
    */
-  protected abstract List<String> apply(TableMetadata base);
+  protected abstract List<ManifestFile> apply(TableMetadata base);
 
   /**
    * Clean up any uncommitted manifests that were created.
@@ -72,16 +96,48 @@ abstract class SnapshotUpdate implements PendingUpdate<Snapshot> {
    *
    * @param committed a set of manifest paths that were actually committed
    */
-  protected abstract void cleanUncommitted(Set<String> committed);
+  protected abstract void cleanUncommitted(Set<ManifestFile> committed);
 
   @Override
   public Snapshot apply() {
     this.base = ops.refresh();
-    List<String> manifests = apply(base);
-    Long currentSnapshotId = base.currentSnapshot() != null ?
+    Long parentSnapshotId = base.currentSnapshot() != null ?
         base.currentSnapshot().snapshotId() : null;
-    return new BaseSnapshot(ops,
-        snapshotId(), currentSnapshotId, System.currentTimeMillis(), manifests);
+
+    List<ManifestFile> manifests = apply(base);
+
+    if (base.propertyAsBoolean(MANIFEST_LISTS_ENABLED, MANIFEST_LISTS_ENABLED_DEFAULT)) {
+      OutputFile manifestList = manifestListPath();
+
+      try (ManifestListWriter writer = new ManifestListWriter(
+          manifestListPath(), snapshotId(), parentSnapshotId)) {
+        ManifestFile[] manifestFiles = new ManifestFile[manifests.size()];
+
+        Tasks.range(manifestFiles.length)
+            .stopOnFailure().throwFailureWhenFinished()
+            .retry(4).exponentialBackoff(
+                base.propertyAsInt(COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
+                base.propertyAsInt(COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
+                base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
+                2.0 /* exponential */ )
+            .executeWith(getWorkerPool())
+            .run(index ->
+                manifestFiles[index] = manifestsWithMetadata.getUnchecked(manifests.get(index)));
+
+        writer.addAll(Arrays.asList(manifestFiles));
+
+      } catch (IOException e) {
+        throw new RuntimeIOException(e, "Failed to write manifest list file");
+      }
+
+      return new BaseSnapshot(ops,
+          snapshotId(), parentSnapshotId, System.currentTimeMillis(),
+          ops.newInputFile(manifestList.location()));
+
+    } else {
+      return new BaseSnapshot(ops,
+          snapshotId(), parentSnapshotId, System.currentTimeMillis(), manifests);
+    }
   }
 
   @Override
@@ -123,7 +179,7 @@ abstract class SnapshotUpdate implements PendingUpdate<Snapshot> {
       }
 
     } catch (RuntimeException e) {
-      LOG.info("Failed to load committed table metadata, skipping manifest clean-up");
+      LOG.info("Failed to load committed table metadata, skipping manifest clean-up", e);
     }
   }
 
@@ -135,6 +191,11 @@ abstract class SnapshotUpdate implements PendingUpdate<Snapshot> {
     ops.deleteFile(path);
   }
 
+  protected OutputFile manifestListPath() {
+    return ops.newMetadataFile(FileFormat.AVRO.addExtension(
+        String.format("snap-%d-%s", snapshotId(), commitUUID)));
+  }
+
   protected OutputFile manifestPath(int i) {
     return ops.newMetadataFile(FileFormat.AVRO.addExtension(commitUUID + "-m" + i));
   }
@@ -145,4 +206,52 @@ abstract class SnapshotUpdate implements PendingUpdate<Snapshot> {
     }
     return snapshotId;
   }
+
+  private static ManifestFile addMetadata(TableOperations ops, ManifestFile manifest) {
+    try (ManifestReader reader = ManifestReader.read(ops.newInputFile(manifest.path()))) {
+      PartitionSummary stats = new PartitionSummary(ops.current().spec(manifest.partitionSpecId()));
+      int addedFiles = 0;
+      int existingFiles = 0;
+      int deletedFiles = 0;
+
+      Long snapshotId = null;
+      long maxSnapshotId = Long.MIN_VALUE;
+      for (ManifestEntry entry : reader.entries()) {
+        if (entry.snapshotId() > maxSnapshotId) {
+          maxSnapshotId = entry.snapshotId();
+        }
+
+        switch (entry.status()) {
+          case ADDED:
+            addedFiles += 1;
+            if (snapshotId == null) {
+              snapshotId = entry.snapshotId();
+            }
+            break;
+          case EXISTING:
+            existingFiles += 1;
+            break;
+          case DELETED:
+            deletedFiles += 1;
+            if (snapshotId == null) {
+              snapshotId = entry.snapshotId();
+            }
+            break;
+        }
+
+        stats.update(entry.file().partition());
+      }
+
+      if (snapshotId == null) {
+        // if no files were added or deleted, use the largest snapshot ID in the manifest
+        snapshotId = maxSnapshotId;
+      }
+
+      return new GenericManifestFile(manifest.path(), manifest.length(), manifest.partitionSpecId(),
+          snapshotId, addedFiles, existingFiles, deletedFiles, stats.summaries());
+
+    } catch (IOException e) {
+      throw new RuntimeIOException(e, "Failed to read manifest: %s", manifest.path());
+    }
+  }
 }
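
A minimal end-to-end sketch of the new manifest list path, assuming an existing Table, an
already-built DataFile (both hypothetical here), and the standard UpdateProperties API. When the
property is enabled, the committed snapshot records a manifest-list location instead of embedding
manifest paths:

    import com.netflix.iceberg.DataFile;
    import com.netflix.iceberg.Snapshot;
    import com.netflix.iceberg.Table;
    import com.netflix.iceberg.TableProperties;

    public class ManifestListSketch {
      // table and dataFile are assumed to exist; this only sketches the call sequence
      static void appendWithManifestList(Table table, DataFile dataFile) {
        table.updateProperties()
            .set(TableProperties.MANIFEST_LISTS_ENABLED, "true")
            .commit();

        table.newAppend()
            .appendFile(dataFile)
            .commit();

        Snapshot snap = table.currentSnapshot();
        // non-null when write.manifest-lists.enabled=true; manifests() still returns ManifestFile entries
        System.out.println("manifest list: " + snap.manifestListLocation());
      }
    }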
diff --git a/core/src/main/java/com/netflix/iceberg/TableMetadata.java b/core/src/main/java/com/netflix/iceberg/TableMetadata.java
index 05c3392..c949f13 100644
--- a/core/src/main/java/com/netflix/iceberg/TableMetadata.java
+++ b/core/src/main/java/com/netflix/iceberg/TableMetadata.java
@@ -221,6 +221,14 @@ public class TableMetadata {
     return properties;
   }
 
+  public boolean propertyAsBoolean(String property, boolean defaultValue) {
+    String value = properties.get(property);
+    if (value != null) {
+      return Boolean.parseBoolean(properties.get(property));
+    }
+    return defaultValue;
+  }
+
   public int propertyAsInt(String property, int defaultValue) {
     String value = properties.get(property);
     if (value != null) {
diff --git a/core/src/main/java/com/netflix/iceberg/TableProperties.java b/core/src/main/java/com/netflix/iceberg/TableProperties.java
index 6ca09a5..e522f84 100644
--- a/core/src/main/java/com/netflix/iceberg/TableProperties.java
+++ b/core/src/main/java/com/netflix/iceberg/TableProperties.java
@@ -66,4 +66,7 @@ public class TableProperties {
   public static final boolean OBJECT_STORE_ENABLED_DEFAULT = false;
 
   public static final String OBJECT_STORE_PATH = "write.object-storage.path";
+
+  public static final String MANIFEST_LISTS_ENABLED = "write.manifest-lists.enabled";
+  public static final boolean MANIFEST_LISTS_ENABLED_DEFAULT = false;
 }
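
A short sketch of how the new propertyAsBoolean helper and the property constants above combine,
mirroring the check in SnapshotUpdate.apply; the wrapper class is illustrative only:

    import com.netflix.iceberg.TableMetadata;
    import com.netflix.iceberg.TableProperties;

    class ManifestListCheck {
      // returns false unless write.manifest-lists.enabled is set to "true" (case-insensitive)
      static boolean manifestListsEnabled(TableMetadata base) {
        return base.propertyAsBoolean(
            TableProperties.MANIFEST_LISTS_ENABLED,
            TableProperties.MANIFEST_LISTS_ENABLED_DEFAULT);
      }
    }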
diff --git a/core/src/main/java/com/netflix/iceberg/avro/Avro.java b/core/src/main/java/com/netflix/iceberg/avro/Avro.java
index d58bfbd..b08b5ff 100644
--- a/core/src/main/java/com/netflix/iceberg/avro/Avro.java
+++ b/core/src/main/java/com/netflix/iceberg/avro/Avro.java
@@ -22,7 +22,6 @@ package com.netflix.iceberg.avro;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
 import com.netflix.iceberg.SchemaParser;
-import com.netflix.iceberg.io.CloseableIterable;
 import com.netflix.iceberg.io.InputFile;
 import com.netflix.iceberg.io.OutputFile;
 import org.apache.avro.Conversions;
@@ -118,6 +117,11 @@ public class Avro {
       return this;
     }
 
+    public WriteBuilder meta(Map<String, String> properties) {
+      metadata.putAll(properties);
+      return this;
+    }
+
     private CodecFactory codec() {
       String codec = config.getOrDefault(AVRO_COMPRESSION, AVRO_COMPRESSION_DEFAULT);
       try {
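
A hedged sketch of the new bulk meta(Map) call on the Avro write builder. Only meta(Map) is added
in this commit, so the write(...), schema(...), named(...) and build() calls, the FileAppender
type, and the example schema and key/value pairs are assumptions for illustration:

    import com.google.common.collect.ImmutableMap;
    import com.netflix.iceberg.Schema;
    import com.netflix.iceberg.avro.Avro;
    import com.netflix.iceberg.io.FileAppender;
    import com.netflix.iceberg.io.OutputFile;
    import com.netflix.iceberg.types.Types;
    import org.apache.avro.generic.GenericData.Record;
    import java.io.IOException;

    class AvroMetaSketch {
      // outputFile is assumed to exist; the schema and metadata entries are illustrative
      static FileAppender<Record> newAppender(OutputFile outputFile) throws IOException {
        Schema schema = new Schema(Types.NestedField.required(1, "id", Types.LongType.get()));
        return Avro.write(outputFile)
            .schema(schema)
            .named("example")
            .meta(ImmutableMap.of("snapshot-id", "2", "parent-snapshot-id", "1"))
            .build();
      }
    }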
diff --git a/core/src/main/java/com/netflix/iceberg/hadoop/HadoopOutputFile.java b/core/src/main/java/com/netflix/iceberg/hadoop/HadoopOutputFile.java
index 9edea0d..2cf23ce 100644
--- a/core/src/main/java/com/netflix/iceberg/hadoop/HadoopOutputFile.java
+++ b/core/src/main/java/com/netflix/iceberg/hadoop/HadoopOutputFile.java
@@ -21,6 +21,7 @@ package com.netflix.iceberg.hadoop;
 
 import com.netflix.iceberg.exceptions.AlreadyExistsException;
 import com.netflix.iceberg.exceptions.RuntimeIOException;
+import com.netflix.iceberg.io.InputFile;
 import com.netflix.iceberg.io.OutputFile;
 import com.netflix.iceberg.io.PositionOutputStream;
 import org.apache.hadoop.conf.Configuration;
@@ -81,6 +82,11 @@ public class HadoopOutputFile implements OutputFile {
   }
 
   @Override
+  public InputFile toInputFile() {
+    return HadoopInputFile.fromPath(path, conf);
+  }
+
+  @Override
   public String toString() {
     return location();
   }
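
A small sketch of the new toInputFile() round trip, which lets a freshly written file (for
example a manifest list) be reopened for reading; the fromPath factory, the Configuration, and
the path are assumptions for illustration:

    import com.netflix.iceberg.hadoop.HadoopOutputFile;
    import com.netflix.iceberg.io.InputFile;
    import com.netflix.iceberg.io.OutputFile;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;

    class ToInputFileSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // fromPath(...) is assumed from the existing HadoopOutputFile API; the path is illustrative
        OutputFile out = HadoopOutputFile.fromPath(
            new Path("file:/tmp/example/metadata/manifest-list.avro"), conf);
        // reopen the same location for reading
        InputFile in = out.toInputFile();
        System.out.println(in.location());
      }
    }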
diff --git a/core/src/test/java/com/netflix/iceberg/LocalTableOperations.java b/core/src/test/java/com/netflix/iceberg/LocalTableOperations.java
new file mode 100644
index 0000000..27a01fc
--- /dev/null
+++ b/core/src/test/java/com/netflix/iceberg/LocalTableOperations.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.netflix.iceberg;
+
+import com.netflix.iceberg.exceptions.RuntimeIOException;
+import com.netflix.iceberg.io.InputFile;
+import com.netflix.iceberg.io.OutputFile;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+
+import static com.netflix.iceberg.Files.localInput;
+
+class LocalTableOperations implements TableOperations {
+  private final TemporaryFolder temp;
+
+  LocalTableOperations(TemporaryFolder temp) {
+    this.temp = temp;
+  }
+
+  @Override
+  public TableMetadata current() {
+    throw new UnsupportedOperationException("Not implemented for tests");
+  }
+
+  @Override
+  public TableMetadata refresh() {
+    throw new UnsupportedOperationException("Not implemented for tests");
+  }
+
+  @Override
+  public void commit(TableMetadata base, TableMetadata metadata) {
+    throw new UnsupportedOperationException("Not implemented for tests");
+  }
+
+  @Override
+  public InputFile newInputFile(String path) {
+    return localInput(path);
+  }
+
+  @Override
+  public OutputFile newMetadataFile(String filename) {
+    try {
+      File metadataFile = temp.newFile(filename);
+      metadataFile.delete();
+      metadataFile.deleteOnExit();
+      return Files.localOutput(metadataFile);
+    } catch (IOException e) {
+      throw new RuntimeIOException(e);
+    }
+  }
+
+  @Override
+  public void deleteFile(String path) {
+    new File(path).delete();
+  }
+
+  @Override
+  public long newSnapshotId() {
+    throw new UnsupportedOperationException("Not implemented for tests");
+  }
+}
diff --git a/core/src/test/java/com/netflix/iceberg/TableTestBase.java b/core/src/test/java/com/netflix/iceberg/TableTestBase.java
index cafb440..c723daa 100644
--- a/core/src/test/java/com/netflix/iceberg/TableTestBase.java
+++ b/core/src/test/java/com/netflix/iceberg/TableTestBase.java
@@ -120,23 +120,23 @@ public class TableTestBase {
   }
 
   void validateSnapshot(Snapshot old, Snapshot snap, DataFile... newFiles) {
-    List<String> oldManifests = old != null ? old.manifests() : ImmutableList.of();
+    List<ManifestFile> oldManifests = old != null ? old.manifests() : ImmutableList.of();
 
     // copy the manifests to a modifiable list and remove the existing manifests
-    List<String> newManifests = Lists.newArrayList(snap.manifests());
-    for (String oldManifest : oldManifests) {
+    List<ManifestFile> newManifests = Lists.newArrayList(snap.manifests());
+    for (ManifestFile oldManifest : oldManifests) {
       Assert.assertTrue("New snapshot should contain old manifests",
           newManifests.remove(oldManifest));
     }
 
     Assert.assertEquals("Should create 1 new manifest and reuse old manifests",
         1, newManifests.size());
-    String manifest = newManifests.get(0);
+    ManifestFile manifest = newManifests.get(0);
 
     long id = snap.snapshotId();
     Iterator<String> newPaths = paths(newFiles).iterator();
 
-    for (ManifestEntry entry : ManifestReader.read(localInput(manifest)).entries()) {
+    for (ManifestEntry entry : ManifestReader.read(localInput(manifest.path())).entries()) {
       DataFile file = entry.file();
       Assert.assertEquals("Path should match expected", newPaths.next(), file.path().toString());
       Assert.assertEquals("File's snapshot ID should match", id, entry.snapshotId());
@@ -153,9 +153,15 @@ public class TableTestBase {
     return paths;
   }
 
+  static void validateManifest(ManifestFile manifest,
+                               Iterator<Long> ids,
+                               Iterator<DataFile> expectedFiles) {
+    validateManifest(manifest.path(), ids, expectedFiles);
+  }
+
   static void validateManifest(String manifest,
-                                Iterator<Long> ids,
-                                Iterator<DataFile> expectedFiles) {
+                               Iterator<Long> ids,
+                               Iterator<DataFile> expectedFiles) {
     for (ManifestEntry entry : ManifestReader.read(localInput(manifest)).entries()) {
       DataFile file = entry.file();
       DataFile expected = expectedFiles.next();
@@ -168,6 +174,13 @@ public class TableTestBase {
     Assert.assertFalse("Should find all files in the manifest", expectedFiles.hasNext());
   }
 
+  static void validateManifestEntries(ManifestFile manifest,
+                                      Iterator<Long> ids,
+                                      Iterator<DataFile> expectedFiles,
+                                      Iterator<ManifestEntry.Status> expectedStatuses) {
+    validateManifestEntries(manifest.path(), ids, expectedFiles, expectedStatuses);
+  }
+
   static void validateManifestEntries(String manifest,
                                       Iterator<Long> ids,
                                       Iterator<DataFile> expectedFiles,
@@ -199,7 +212,7 @@ public class TableTestBase {
     return Iterators.forArray(files);
   }
 
-  static Iterator<DataFile> files(String manifest) {
-    return ManifestReader.read(localInput(manifest)).iterator();
+  static Iterator<DataFile> files(ManifestFile manifest) {
+    return ManifestReader.read(localInput(manifest.path())).iterator();
   }
 }
diff --git a/core/src/test/java/com/netflix/iceberg/TestFastAppend.java b/core/src/test/java/com/netflix/iceberg/TestFastAppend.java
index 9cac989..4d9e174 100644
--- a/core/src/test/java/com/netflix/iceberg/TestFastAppend.java
+++ b/core/src/test/java/com/netflix/iceberg/TestFastAppend.java
@@ -54,7 +54,7 @@ public class TestFastAppend extends TableTestBase {
 
     TableMetadata base = readMetadata();
     Assert.assertNotNull("Should have a current snapshot", base.currentSnapshot());
-    List<String> v2manifests = base.currentSnapshot().manifests();
+    List<ManifestFile> v2manifests = base.currentSnapshot().manifests();
     Assert.assertEquals("Should have one existing manifest", 1, v2manifests.size());
 
     // prepare a new append
@@ -80,7 +80,7 @@ public class TestFastAppend extends TableTestBase {
 
     TableMetadata base = readMetadata();
     Assert.assertNotNull("Should have a current snapshot", base.currentSnapshot());
-    List<String> v3manifests = base.currentSnapshot().manifests();
+    List<ManifestFile> v3manifests = base.currentSnapshot().manifests();
     Assert.assertEquals("Should have 2 existing manifests", 2, v3manifests.size());
 
     // prepare a new append
@@ -110,7 +110,7 @@ public class TestFastAppend extends TableTestBase {
 
     TableMetadata base = readMetadata();
     Assert.assertNotNull("Should have a current snapshot", base.currentSnapshot());
-    List<String> v2manifests = base.currentSnapshot().manifests();
+    List<ManifestFile> v2manifests = base.currentSnapshot().manifests();
     Assert.assertEquals("Should have 1 existing manifest", 1, v2manifests.size());
 
     // commit from the stale table
@@ -137,7 +137,7 @@ public class TestFastAppend extends TableTestBase {
 
     TableMetadata base = readMetadata();
     Assert.assertNotNull("Should have a current snapshot", base.currentSnapshot());
-    List<String> v2manifests = base.currentSnapshot().manifests();
+    List<ManifestFile> v2manifests = base.currentSnapshot().manifests();
     Assert.assertEquals("Should have 1 existing manifest", 1, v2manifests.size());
 
     append.commit();
@@ -147,7 +147,7 @@ public class TestFastAppend extends TableTestBase {
     // apply was called before the conflicting commit, but the commit was still consistent
     validateSnapshot(base.currentSnapshot(), committed.currentSnapshot(), FILE_D);
 
-    List<String> committedManifests = Lists.newArrayList(committed.currentSnapshot().manifests());
+    List<ManifestFile> committedManifests = Lists.newArrayList(committed.currentSnapshot().manifests());
     committedManifests.removeAll(base.currentSnapshot().manifests());
     Assert.assertEquals("Should reused manifest created by apply",
         pending.manifests().get(0), committedManifests.get(0));
@@ -161,13 +161,13 @@ public class TestFastAppend extends TableTestBase {
 
     AppendFiles append = table.newFastAppend().appendFile(FILE_B);
     Snapshot pending = append.apply();
-    String newManifest = pending.manifests().get(0);
-    Assert.assertTrue("Should create new manifest", new File(newManifest).exists());
+    ManifestFile newManifest = pending.manifests().get(0);
+    Assert.assertTrue("Should create new manifest", new File(newManifest.path()).exists());
 
     AssertHelpers.assertThrows("Should retry 4 times and throw last failure",
         CommitFailedException.class, "Injected failure", append::commit);
 
-    Assert.assertFalse("Should clean up new manifest", new File(newManifest).exists());
+    Assert.assertFalse("Should clean up new manifest", new File(newManifest.path()).exists());
   }
 
   @Test
@@ -178,15 +178,15 @@ public class TestFastAppend extends TableTestBase {
 
     AppendFiles append = table.newFastAppend().appendFile(FILE_B);
     Snapshot pending = append.apply();
-    String newManifest = pending.manifests().get(0);
-    Assert.assertTrue("Should create new manifest", new File(newManifest).exists());
+    ManifestFile newManifest = pending.manifests().get(0);
+    Assert.assertTrue("Should create new manifest", new File(newManifest.path()).exists());
 
     append.commit();
 
     TableMetadata metadata = readMetadata();
 
     validateSnapshot(null, metadata.currentSnapshot(), FILE_B);
-    Assert.assertTrue("Should commit same new manifest", new File(newManifest).exists());
+    Assert.assertTrue("Should commit same new manifest", new File(newManifest.path()).exists());
     Assert.assertTrue("Should commit the same new manifest",
         metadata.currentSnapshot().manifests().contains(newManifest));
   }
diff --git a/core/src/test/java/com/netflix/iceberg/TestMergeAppend.java b/core/src/test/java/com/netflix/iceberg/TestMergeAppend.java
index a1e28bc..6b78c63 100644
--- a/core/src/test/java/com/netflix/iceberg/TestMergeAppend.java
+++ b/core/src/test/java/com/netflix/iceberg/TestMergeAppend.java
@@ -67,7 +67,7 @@ public class TestMergeAppend extends TableTestBase {
     long baseId = base.currentSnapshot().snapshotId();
     Assert.assertEquals("Should create 1 manifest for initial write",
         1, base.currentSnapshot().manifests().size());
-    String initialManifest = base.currentSnapshot().manifests().get(0);
+    ManifestFile initialManifest = base.currentSnapshot().manifests().get(0);
 
     Snapshot pending = table.newAppend()
         .appendFile(FILE_C)
@@ -76,7 +76,7 @@ public class TestMergeAppend extends TableTestBase {
 
     Assert.assertEquals("Should contain 1 merged manifest for second write",
         1, pending.manifests().size());
-    String newManifest = pending.manifests().get(0);
+    ManifestFile newManifest = pending.manifests().get(0);
     Assert.assertNotEquals("Should not contain manifest from initial write",
         initialManifest, newManifest);
 
@@ -103,7 +103,7 @@ public class TestMergeAppend extends TableTestBase {
     long baseId = base.currentSnapshot().snapshotId();
     Assert.assertEquals("Should create 1 manifest for initial write",
         1, base.currentSnapshot().manifests().size());
-    String initialManifest = base.currentSnapshot().manifests().get(0);
+    ManifestFile initialManifest = base.currentSnapshot().manifests().get(0);
 
     table.newDelete()
         .deleteFile(FILE_A)
@@ -113,7 +113,7 @@ public class TestMergeAppend extends TableTestBase {
     long deleteId = delete.currentSnapshot().snapshotId();
     Assert.assertEquals("Should create 1 filtered manifest for delete",
         1, delete.currentSnapshot().manifests().size());
-    String deleteManifest = delete.currentSnapshot().manifests().get(0);
+    ManifestFile deleteManifest = delete.currentSnapshot().manifests().get(0);
 
     validateManifestEntries(deleteManifest,
         ids(deleteId, baseId),
@@ -127,7 +127,7 @@ public class TestMergeAppend extends TableTestBase {
 
     Assert.assertEquals("Should contain 1 merged manifest for second write",
         1, pending.manifests().size());
-    String newManifest = pending.manifests().get(0);
+    ManifestFile newManifest = pending.manifests().get(0);
     Assert.assertNotEquals("Should not contain manifest from initial write",
         initialManifest, newManifest);
 
@@ -168,7 +168,7 @@ public class TestMergeAppend extends TableTestBase {
     TableMetadata base = readMetadata();
     Assert.assertEquals("Should have 3 unmerged manifests",
         3, base.currentSnapshot().manifests().size());
-    Set<String> unmerged = Sets.newHashSet(base.currentSnapshot().manifests());
+    Set<ManifestFile> unmerged = Sets.newHashSet(base.currentSnapshot().manifests());
 
     Snapshot pending = table.newAppend()
         .appendFile(FILE_D)
@@ -176,7 +176,7 @@ public class TestMergeAppend extends TableTestBase {
 
     Assert.assertEquals("Should contain 1 merged manifest after the 4th write",
         1, pending.manifests().size());
-    String newManifest = pending.manifests().get(0);
+    ManifestFile newManifest = pending.manifests().get(0);
     Assert.assertFalse("Should not contain previous manifests", unmerged.contains(newManifest));
 
     long pendingId = pending.snapshotId();
@@ -204,7 +204,7 @@ public class TestMergeAppend extends TableTestBase {
     long baseId = base.currentSnapshot().snapshotId();
     Assert.assertEquals("Should create 1 manifest for initial write",
         1, base.currentSnapshot().manifests().size());
-    String initialManifest = base.currentSnapshot().manifests().get(0);
+    ManifestFile initialManifest = base.currentSnapshot().manifests().get(0);
 
     Snapshot pending = table.newAppend()
         .appendFile(FILE_C)
@@ -213,7 +213,7 @@ public class TestMergeAppend extends TableTestBase {
 
     Assert.assertEquals("Should contain 2 unmerged manifests after second write",
         2, pending.manifests().size());
-    String newManifest = pending.manifests().get(0);
+    ManifestFile newManifest = pending.manifests().get(0);
     Assert.assertNotEquals("Should not contain manifest from initial write",
         initialManifest, newManifest);
 
@@ -233,7 +233,7 @@ public class TestMergeAppend extends TableTestBase {
     TableMetadata base = readMetadata();
     Assert.assertEquals("Should create 1 manifest for initial write",
         1, base.currentSnapshot().manifests().size());
-    String initialManifest = base.currentSnapshot().manifests().get(0);
+    ManifestFile initialManifest = base.currentSnapshot().manifests().get(0);
 
     // build the new spec using the table's schema, which uses fresh IDs
     PartitionSpec newSpec = PartitionSpec.builderFor(base.schema())
@@ -279,7 +279,7 @@ public class TestMergeAppend extends TableTestBase {
     TableMetadata base = readMetadata();
     Assert.assertEquals("Should contain 2 manifests",
         2, base.currentSnapshot().manifests().size());
-    String manifest = base.currentSnapshot().manifests().get(0);
+    ManifestFile manifest = base.currentSnapshot().manifests().get(0);
 
     // build the new spec using the table's schema, which uses fresh IDs
     PartitionSpec newSpec = PartitionSpec.builderFor(base.schema())
@@ -319,7 +319,7 @@ public class TestMergeAppend extends TableTestBase {
 
     TableMetadata base = readMetadata();
     long baseId = base.currentSnapshot().snapshotId();
-    String initialManifest = base.currentSnapshot().manifests().get(0);
+    ManifestFile initialManifest = base.currentSnapshot().manifests().get(0);
 
     table.ops().failCommits(5);
 
@@ -327,9 +327,9 @@ public class TestMergeAppend extends TableTestBase {
     Snapshot pending = append.apply();
 
     Assert.assertEquals("Should merge to 1 manifest", 1, pending.manifests().size());
-    String newManifest = pending.manifests().get(0);
+    ManifestFile newManifest = pending.manifests().get(0);
 
-    Assert.assertTrue("Should create new manifest", new File(newManifest).exists());
+    Assert.assertTrue("Should create new manifest", new File(newManifest.path()).exists());
     validateManifest(newManifest,
         ids(pending.snapshotId(), baseId),
         concat(files(FILE_B), files(initialManifest)));
@@ -337,7 +337,7 @@ public class TestMergeAppend extends TableTestBase {
     AssertHelpers.assertThrows("Should retry 4 times and throw last failure",
         CommitFailedException.class, "Injected failure", append::commit);
 
-    Assert.assertFalse("Should clean up new manifest", new File(newManifest).exists());
+    Assert.assertFalse("Should clean up new manifest", new File(newManifest.path()).exists());
   }
 
   @Test
@@ -351,7 +351,7 @@ public class TestMergeAppend extends TableTestBase {
 
     TableMetadata base = readMetadata();
     long baseId = base.currentSnapshot().snapshotId();
-    String initialManifest = base.currentSnapshot().manifests().get(0);
+    ManifestFile initialManifest = base.currentSnapshot().manifests().get(0);
 
     table.ops().failCommits(3);
 
@@ -359,9 +359,9 @@ public class TestMergeAppend extends TableTestBase {
     Snapshot pending = append.apply();
 
     Assert.assertEquals("Should merge to 1 manifest", 1, pending.manifests().size());
-    String newManifest = pending.manifests().get(0);
+    ManifestFile newManifest = pending.manifests().get(0);
 
-    Assert.assertTrue("Should create new manifest", new File(newManifest).exists());
+    Assert.assertTrue("Should create new manifest", new File(newManifest.path()).exists());
     validateManifest(newManifest,
         ids(pending.snapshotId(), baseId),
         concat(files(FILE_B), files(initialManifest)));
@@ -369,7 +369,7 @@ public class TestMergeAppend extends TableTestBase {
     append.commit();
 
     TableMetadata metadata = readMetadata();
-    Assert.assertTrue("Should reuse the new manifest", new File(newManifest).exists());
+    Assert.assertTrue("Should reuse the new manifest", new File(newManifest.path()).exists());
     Assert.assertEquals("Should commit the same new manifest during retry",
         Lists.newArrayList(newManifest), metadata.currentSnapshot().manifests());
   }
diff --git a/core/src/test/java/com/netflix/iceberg/TestReplaceFiles.java b/core/src/test/java/com/netflix/iceberg/TestReplaceFiles.java
index d667294..032b680 100644
--- a/core/src/test/java/com/netflix/iceberg/TestReplaceFiles.java
+++ b/core/src/test/java/com/netflix/iceberg/TestReplaceFiles.java
@@ -87,7 +87,7 @@ public class TestReplaceFiles extends TableTestBase {
     long baseSnapshotId = base.currentSnapshot().snapshotId();
     Assert.assertEquals("Should create 1 manifest for initial write",
         1, base.currentSnapshot().manifests().size());
-    String initialManifest = base.currentSnapshot().manifests().get(0);
+    ManifestFile initialManifest = base.currentSnapshot().manifests().get(0);
 
     Snapshot pending = table.newRewrite()
         .rewriteFiles(Sets.newSet(FILE_A), Sets.newSet(FILE_C))
@@ -127,7 +127,7 @@ public class TestReplaceFiles extends TableTestBase {
     long baseSnapshotId = base.currentSnapshot().snapshotId();
     Assert.assertEquals("Should create 1 manifest for initial write",
         1, base.currentSnapshot().manifests().size());
-    String initialManifest = base.currentSnapshot().manifests().get(0);
+    ManifestFile initialManifest = base.currentSnapshot().manifests().get(0);
 
     Snapshot pending = table.newRewrite()
         .rewriteFiles(Sets.newSet(FILE_A), Sets.newSet(FILE_C))
@@ -167,8 +167,8 @@ public class TestReplaceFiles extends TableTestBase {
     Snapshot pending = rewrite.apply();
 
     Assert.assertEquals("Should produce 2 manifests", 2, pending.manifests().size());
-    String manifest1 = pending.manifests().get(0);
-    String manifest2 = pending.manifests().get(1);
+    ManifestFile manifest1 = pending.manifests().get(0);
+    ManifestFile manifest2 = pending.manifests().get(1);
 
     validateManifestEntries(manifest1,
         ids(pending.snapshotId()), files(FILE_B), statuses(ADDED));
@@ -178,8 +178,8 @@ public class TestReplaceFiles extends TableTestBase {
     AssertHelpers.assertThrows("Should retry 4 times and throw last failure",
         CommitFailedException.class, "Injected failure", rewrite::commit);
 
-    Assert.assertFalse("Should clean up new manifest", new File(manifest1).exists());
-    Assert.assertFalse("Should clean up new manifest", new File(manifest2).exists());
+    Assert.assertFalse("Should clean up new manifest", new File(manifest1.path()).exists());
+    Assert.assertFalse("Should clean up new manifest", new File(manifest2.path()).exists());
 
     // As commit failed all the manifests added with rewrite should be cleaned up
     Assert.assertEquals("Only 1 manifest should exist", 1, listMetadataFiles("avro").size());
@@ -197,8 +197,8 @@ public class TestReplaceFiles extends TableTestBase {
     Snapshot pending = rewrite.apply();
 
     Assert.assertEquals("Should produce 2 manifests", 2, pending.manifests().size());
-    String manifest1 = pending.manifests().get(0);
-    String manifest2 = pending.manifests().get(1);
+    ManifestFile manifest1 = pending.manifests().get(0);
+    ManifestFile manifest2 = pending.manifests().get(1);
 
     validateManifestEntries(manifest1,
         ids(pending.snapshotId()), files(FILE_B), statuses(ADDED));
@@ -207,8 +207,8 @@ public class TestReplaceFiles extends TableTestBase {
 
     rewrite.commit();
 
-    Assert.assertTrue("Should reuse the manifest for appends", new File(manifest1).exists());
-    Assert.assertTrue("Should reuse the manifest with deletes", new File(manifest2).exists());
+    Assert.assertTrue("Should reuse the manifest for appends", new File(manifest1.path()).exists());
+    Assert.assertTrue("Should reuse the manifest with deletes", new File(manifest2.path()).exists());
 
     TableMetadata metadata = readMetadata();
     Assert.assertTrue("Should commit the manifest for append",
diff --git a/core/src/test/java/com/netflix/iceberg/TestSnapshotJson.java b/core/src/test/java/com/netflix/iceberg/TestSnapshotJson.java
index 4c55c67..dbcc811 100644
--- a/core/src/test/java/com/netflix/iceberg/TestSnapshotJson.java
+++ b/core/src/test/java/com/netflix/iceberg/TestSnapshotJson.java
@@ -19,19 +19,72 @@
 
 package com.netflix.iceberg;
 
+import com.google.common.collect.ImmutableList;
 import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+import static com.netflix.iceberg.Files.localInput;
 
 public class TestSnapshotJson {
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  public TableOperations ops = new LocalTableOperations(temp);
+
   @Test
   public void testJsonConversion() {
-    Snapshot expected = new BaseSnapshot(null, System.currentTimeMillis(),
+    Snapshot expected = new BaseSnapshot(ops, System.currentTimeMillis(),
         "file:/tmp/manifest1.avro", "file:/tmp/manifest2.avro");
     String json = SnapshotParser.toJson(expected);
-    Snapshot snapshot = SnapshotParser.fromJson(null, json);
+    Snapshot snapshot = SnapshotParser.fromJson(ops, json);
+
+    Assert.assertEquals("Snapshot ID should match",
+        expected.snapshotId(), snapshot.snapshotId());
+    Assert.assertEquals("Files should match",
+        expected.manifests(), snapshot.manifests());
+  }
+
+  @Test
+  public void testJsonConversionWithManifestList() throws IOException {
+    long parentId = 1;
+    long id = 2;
+    List<ManifestFile> manifests = ImmutableList.of(
+        new GenericManifestFile(localInput("file:/tmp/manifest1.avro"), 0),
+        new GenericManifestFile(localInput("file:/tmp/manifest2.avro"), 0));
+
+    File manifestList = temp.newFile("manifests");
+    Assert.assertTrue(manifestList.delete());
+    manifestList.deleteOnExit();
+
+    try (ManifestListWriter writer = new ManifestListWriter(
+        Files.localOutput(manifestList), id, parentId)) {
+      writer.addAll(manifests);
+    }
+
+    Snapshot expected = new BaseSnapshot(
+        ops, id, parentId, System.currentTimeMillis(), localInput(manifestList));
+    Snapshot inMemory = new BaseSnapshot(
+        ops, id, parentId, expected.timestampMillis(), manifests);
+
+    Assert.assertEquals("Files should match in memory list",
+        inMemory.manifests(), expected.manifests());
+
+    String json = SnapshotParser.toJson(expected);
+    Snapshot snapshot = SnapshotParser.fromJson(ops, json);
 
     Assert.assertEquals("Snapshot ID should match",
         expected.snapshotId(), snapshot.snapshotId());
+    Assert.assertEquals("Timestamp should match",
+        expected.timestampMillis(), snapshot.timestampMillis());
+    Assert.assertEquals("Parent ID should match",
+        expected.parentId(), snapshot.parentId());
+    Assert.assertEquals("Manifest list should match",
+        expected.manifestListLocation(), snapshot.manifestListLocation());
     Assert.assertEquals("Files should match",
         expected.manifests(), snapshot.manifests());
   }
diff --git a/core/src/test/java/com/netflix/iceberg/TestTableMetadataJson.java b/core/src/test/java/com/netflix/iceberg/TestTableMetadataJson.java
index 21acdbd..0b04fac 100644
--- a/core/src/test/java/com/netflix/iceberg/TestTableMetadataJson.java
+++ b/core/src/test/java/com/netflix/iceberg/TestTableMetadataJson.java
@@ -29,7 +29,9 @@ import com.netflix.iceberg.exceptions.RuntimeIOException;
 import com.netflix.iceberg.types.Types;
 import com.netflix.iceberg.util.JsonUtil;
 import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import java.io.IOException;
 import java.io.StringWriter;
 import java.util.Arrays;
@@ -37,6 +39,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 
+import static com.netflix.iceberg.Files.localInput;
 import static com.netflix.iceberg.TableMetadataParser.CURRENT_SNAPSHOT_ID;
 import static com.netflix.iceberg.TableMetadataParser.FORMAT_VERSION;
 import static com.netflix.iceberg.TableMetadataParser.LAST_COLUMN_ID;
@@ -48,6 +51,11 @@ import static com.netflix.iceberg.TableMetadataParser.SCHEMA;
 import static com.netflix.iceberg.TableMetadataParser.SNAPSHOTS;
 
 public class TestTableMetadataJson {
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  public TableOperations ops = new LocalTableOperations(temp);
+
   @Test
   public void testJsonConversion() throws Exception {
     Schema schema = new Schema(
@@ -60,23 +68,25 @@ public class TestTableMetadataJson {
 
     long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600);
     Snapshot previousSnapshot = new BaseSnapshot(
-        null, previousSnapshotId, null, previousSnapshotId, ImmutableList.of("file:/tmp/manfiest.1.avro"));
+        null, previousSnapshotId, null, previousSnapshotId, ImmutableList.of(
+        new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), spec.specId())));
     long currentSnapshotId = System.currentTimeMillis();
     Snapshot currentSnapshot = new BaseSnapshot(
-        null, currentSnapshotId, previousSnapshotId, currentSnapshotId, ImmutableList.of("file:/tmp/manfiest.2.avro"));
+        null, currentSnapshotId, previousSnapshotId, currentSnapshotId, ImmutableList.of(
+        new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), spec.specId())));
 
     List<SnapshotLogEntry> snapshotLog = ImmutableList.<SnapshotLogEntry>builder()
         .add(new SnapshotLogEntry(previousSnapshot.timestampMillis(), previousSnapshot.snapshotId()))
         .add(new SnapshotLogEntry(currentSnapshot.timestampMillis(), currentSnapshot.snapshotId()))
         .build();
 
-    TableMetadata expected = new TableMetadata(null, null, "s3://bucket/test/location",
+    TableMetadata expected = new TableMetadata(ops, null, "s3://bucket/test/location",
         System.currentTimeMillis(), 3, schema, 5, ImmutableList.of(spec),
         ImmutableMap.of("property", "value"), currentSnapshotId,
         Arrays.asList(previousSnapshot, currentSnapshot), snapshotLog);
 
     String asJson = TableMetadataParser.toJson(expected);
-    TableMetadata metadata = TableMetadataParser.fromJson(null, null,
+    TableMetadata metadata = TableMetadataParser.fromJson(ops, null,
         JsonUtil.mapper().readValue(asJson, JsonNode.class));
 
     Assert.assertEquals("Table location should match",
@@ -120,14 +130,16 @@ public class TestTableMetadataJson {
 
     long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600);
     Snapshot previousSnapshot = new BaseSnapshot(
-        null, previousSnapshotId, null, previousSnapshotId, ImmutableList.of("file:/tmp/manfiest.1.avro"));
+        ops, previousSnapshotId, null, previousSnapshotId, ImmutableList.of(
+        new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), spec.specId())));
     long currentSnapshotId = System.currentTimeMillis();
     Snapshot currentSnapshot = new BaseSnapshot(
-        null, currentSnapshotId, previousSnapshotId, currentSnapshotId, ImmutableList.of("file:/tmp/manfiest.2.avro"));
+        ops, currentSnapshotId, previousSnapshotId, currentSnapshotId, ImmutableList.of(
+        new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), spec.specId())));
 
     List<SnapshotLogEntry> reversedSnapshotLog = Lists.newArrayList();
 
-    TableMetadata expected = new TableMetadata(null, null, "s3://bucket/test/location",
+    TableMetadata expected = new TableMetadata(ops, null, "s3://bucket/test/location",
         System.currentTimeMillis(), 3, schema, 5, ImmutableList.of(spec),
         ImmutableMap.of("property", "value"), currentSnapshotId,
         Arrays.asList(previousSnapshot, currentSnapshot), reversedSnapshotLog);
@@ -139,7 +151,7 @@ public class TestTableMetadataJson {
         new SnapshotLogEntry(previousSnapshot.timestampMillis(), previousSnapshot.snapshotId()));
 
     String asJson = TableMetadataParser.toJson(expected);
-    TableMetadata metadata = TableMetadataParser.fromJson(null, null,
+    TableMetadata metadata = TableMetadataParser.fromJson(ops, null,
         JsonUtil.mapper().readValue(asJson, JsonNode.class));
 
     List<SnapshotLogEntry> expectedSnapshotLog = ImmutableList.<SnapshotLogEntry>builder()
@@ -163,18 +175,20 @@ public class TestTableMetadataJson {
 
     long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600);
     Snapshot previousSnapshot = new BaseSnapshot(
-        null, previousSnapshotId, null, previousSnapshotId, ImmutableList.of("file:/tmp/manfiest.1.avro"));
+        ops, previousSnapshotId, null, previousSnapshotId, ImmutableList.of(
+        new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), spec.specId())));
     long currentSnapshotId = System.currentTimeMillis();
     Snapshot currentSnapshot = new BaseSnapshot(
-        null, currentSnapshotId, previousSnapshotId, currentSnapshotId, ImmutableList.of("file:/tmp/manfiest.2.avro"));
+        ops, currentSnapshotId, previousSnapshotId, currentSnapshotId, ImmutableList.of(
+        new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), spec.specId())));
 
-    TableMetadata expected = new TableMetadata(null, null, "s3://bucket/test/location",
+    TableMetadata expected = new TableMetadata(ops, null, "s3://bucket/test/location",
         System.currentTimeMillis(), 3, schema, 6, ImmutableList.of(spec),
         ImmutableMap.of("property", "value"), currentSnapshotId,
         Arrays.asList(previousSnapshot, currentSnapshot), ImmutableList.of());
 
     String asJson = toJsonWithoutSpecList(expected);
-    TableMetadata metadata = TableMetadataParser.fromJson(null, null,
+    TableMetadata metadata = TableMetadataParser.fromJson(ops, null,
         JsonUtil.mapper().readValue(asJson, JsonNode.class));
 
     Assert.assertEquals("Table location should match",
diff --git a/core/src/test/java/com/netflix/iceberg/TestTransaction.java b/core/src/test/java/com/netflix/iceberg/TestTransaction.java
index 317ad5c..bdc3ddd 100644
--- a/core/src/test/java/com/netflix/iceberg/TestTransaction.java
+++ b/core/src/test/java/com/netflix/iceberg/TestTransaction.java
@@ -274,7 +274,7 @@ public class TestTransaction extends TableTestBase {
         .appendFile(FILE_B)
         .commit();
 
-    Set<String> appendManifests = Sets.newHashSet(t.table().currentSnapshot().manifests());
+    Set<ManifestFile> appendManifests = Sets.newHashSet(t.table().currentSnapshot().manifests());
 
     Assert.assertSame("Base metadata should not change when commit is created",
         base, readMetadata());
@@ -313,7 +313,7 @@ public class TestTransaction extends TableTestBase {
         .appendFile(FILE_B)
         .commit();
 
-    Set<String> appendManifests = Sets.newHashSet(t.table().currentSnapshot().manifests());
+    Set<ManifestFile> appendManifests = Sets.newHashSet(t.table().currentSnapshot().manifests());
 
     Assert.assertSame("Base metadata should not change when commit is created",
         base, readMetadata());
@@ -327,13 +327,13 @@ public class TestTransaction extends TableTestBase {
 
     Assert.assertEquals("Table should be on version 2 after real append", 2, (int) version());
 
-    Set<String> conflictAppendManifests = Sets.newHashSet(table.currentSnapshot().manifests());
+    Set<ManifestFile> conflictAppendManifests = Sets.newHashSet(table.currentSnapshot().manifests());
 
     t.commitTransaction();
 
     Assert.assertEquals("Table should be on version 3 after commit", 3, (int) version());
 
-    Set<String> expectedManifests = Sets.newHashSet();
+    Set<ManifestFile> expectedManifests = Sets.newHashSet();
     expectedManifests.addAll(appendManifests);
     expectedManifests.addAll(conflictAppendManifests);
 
@@ -370,7 +370,7 @@ public class TestTransaction extends TableTestBase {
 
     Assert.assertEquals("Append should create one manifest",
         1, t.table().currentSnapshot().manifests().size());
-    String appendManifest = t.table().currentSnapshot().manifests().get(0);
+    ManifestFile appendManifest = t.table().currentSnapshot().manifests().get(0);
 
     Assert.assertSame("Base metadata should not change when commit is created",
         base, readMetadata());
@@ -384,13 +384,13 @@ public class TestTransaction extends TableTestBase {
 
     Assert.assertEquals("Table should be on version 2 after real append", 2, (int) version());
 
-    Set<String> conflictAppendManifests = Sets.newHashSet(table.currentSnapshot().manifests());
+    Set<ManifestFile> conflictAppendManifests = Sets.newHashSet(table.currentSnapshot().manifests());
 
     t.commitTransaction();
 
     Assert.assertEquals("Table should be on version 3 after commit", 3, (int) version());
 
-    Set<String> previousManifests = Sets.newHashSet();
+    Set<ManifestFile> previousManifests = Sets.newHashSet();
     previousManifests.add(appendManifest);
     previousManifests.addAll(conflictAppendManifests);
 
@@ -399,7 +399,7 @@ public class TestTransaction extends TableTestBase {
     Assert.assertFalse("Should merge both commit manifests into a new manifest",
         previousManifests.contains(table.currentSnapshot().manifests().get(0)));
 
-    Assert.assertFalse("Append manifest should be deleted", new File(appendManifest).exists());
+    Assert.assertFalse("Append manifest should be deleted", new File(appendManifest.path()).exists());
   }
 
   @Test
@@ -427,7 +427,7 @@ public class TestTransaction extends TableTestBase {
 
     Assert.assertEquals("Append should create one manifest",
         1, t.table().currentSnapshot().manifests().size());
-    String appendManifest = t.table().currentSnapshot().manifests().get(0);
+    ManifestFile appendManifest = t.table().currentSnapshot().manifests().get(0);
 
     Assert.assertSame("Base metadata should not change when commit is created",
         base, readMetadata());
@@ -441,13 +441,13 @@ public class TestTransaction extends TableTestBase {
 
     Assert.assertEquals("Table should be on version 2 after real append", 2, (int) version());
 
-    Set<String> conflictAppendManifests = Sets.newHashSet(table.currentSnapshot().manifests());
+    Set<ManifestFile> conflictAppendManifests = Sets.newHashSet(table.currentSnapshot().manifests());
 
     t.commitTransaction();
 
     Assert.assertEquals("Table should be on version 3 after commit", 3, (int) version());
 
-    Set<String> previousManifests = Sets.newHashSet();
+    Set<ManifestFile> previousManifests = Sets.newHashSet();
     previousManifests.add(appendManifest);
     previousManifests.addAll(conflictAppendManifests);
 
@@ -456,6 +456,6 @@ public class TestTransaction extends TableTestBase {
     Assert.assertFalse("Should merge both commit manifests into a new manifest",
         previousManifests.contains(table.currentSnapshot().manifests().get(0)));
 
-    Assert.assertFalse("Append manifest should be deleted", new File(appendManifest).exists());
+    Assert.assertFalse("Append manifest should be deleted", new File(appendManifest.path()).exists());
   }
 }