Posted to dev@iceberg.apache.org by GitBox <gi...@apache.org> on 2018/12/06 01:20:08 UTC

[GitHub] rdblue closed pull request #21: Add manifest listing files

rdblue closed pull request #21: Add manifest listing files
URL: https://github.com/apache/incubator-iceberg/pull/21

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/api/src/main/java/com/netflix/iceberg/Files.java b/api/src/main/java/com/netflix/iceberg/Files.java
index b227199..f739751 100644
--- a/api/src/main/java/com/netflix/iceberg/Files.java
+++ b/api/src/main/java/com/netflix/iceberg/Files.java
@@ -75,6 +75,11 @@ public String location() {
       return file.toString();
     }
 
+    @Override
+    public InputFile toInputFile() {
+      return localInput(file);
+    }
+
     @Override
     public String toString() {
       return location();
diff --git a/api/src/main/java/com/netflix/iceberg/ManifestFile.java b/api/src/main/java/com/netflix/iceberg/ManifestFile.java
new file mode 100644
index 0000000..b1d919b
--- /dev/null
+++ b/api/src/main/java/com/netflix/iceberg/ManifestFile.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.netflix.iceberg;
+
+import com.netflix.iceberg.types.Types;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import static com.netflix.iceberg.types.Types.NestedField.optional;
+import static com.netflix.iceberg.types.Types.NestedField.required;
+
+/**
+ * Represents a manifest file that can be scanned to find data files in a table.
+ */
+public interface ManifestFile {
+  Schema SCHEMA = new Schema(
+      required(500, "manifest_path", Types.StringType.get()),
+      required(501, "manifest_length", Types.LongType.get()),
+      required(502, "partition_spec_id", Types.IntegerType.get()),
+      optional(503, "added_snapshot_id", Types.LongType.get()),
+      optional(504, "added_data_files_count", Types.IntegerType.get()),
+      optional(505, "existing_data_files_count", Types.IntegerType.get()),
+      optional(506, "deleted_data_files_count", Types.IntegerType.get()),
+      optional(507, "partitions", Types.ListType.ofRequired(508, Types.StructType.of(
+          required(509, "contains_null", Types.BooleanType.get()),
+          optional(510, "lower_bound", Types.BinaryType.get()), // null if no non-null values
+          optional(511, "upper_bound", Types.BinaryType.get())
+      ))));
+
+  static Schema schema() {
+    return SCHEMA;
+  }
+
+  /**
+   * @return fully qualified path to the file, suitable for constructing a Hadoop Path
+   */
+  String path();
+
+  /**
+   * @return length of the manifest file
+   */
+  long length();
+
+  /**
+   * @return ID of the {@link PartitionSpec} used to write the manifest file
+   */
+  int partitionSpecId();
+
+  /**
+   * @return ID of the snapshot that added the manifest file to table metadata
+   */
+  Long snapshotId();
+
+  /**
+   * @return the number of data files with status ADDED in the manifest file
+   */
+  Integer addedFilesCount();
+
+  /**
+   * @return the number of data files with status EXISTING in the manifest file
+   */
+  Integer existingFilesCount();
+
+  /**
+   * @return the number of data files with status DELETED in the manifest file
+   */
+  Integer deletedFilesCount();
+
+  /**
+   * Returns a list of {@link PartitionFieldSummary partition field summaries}.
+   * <p>
+   * Each summary corresponds to a field in the manifest file's partition spec, by ordinal. For
+   * example, the partition spec [ ts_day=date(ts), type=identity(type) ] will have 2 summaries.
+   * The first summary is for the ts_day partition field and the second is for the type partition
+   * field.
+   *
+   * @return a list of partition field summaries, one for each field in the manifest's spec
+   */
+  List<PartitionFieldSummary> partitions();
+
+  /**
+   * Copies this {@link ManifestFile manifest file}. Readers can reuse manifest file instances; use
+   * this method to make defensive copies.
+   *
+   * @return a copy of this manifest file
+   */
+  ManifestFile copy();
+
+  /**
+   * Summarizes the values of one partition field stored in a manifest file.
+   */
+  interface PartitionFieldSummary {
+    Types.StructType TYPE = ManifestFile.schema()
+        .findType("partitions")
+        .asListType()
+        .elementType()
+        .asStructType();
+
+    static Types.StructType getType() {
+      return TYPE;
+    }
+
+    /**
+     * @return true if at least one data file in the manifest has a null value for the field
+     */
+    boolean containsNull();
+
+    /**
+     * @return a ByteBuffer that contains a serialized bound lower than or
+     * equal to all non-null values of the field
+     */
+    ByteBuffer lowerBound();
+
+    /**
+     * @return a ByteBuffer that contains a serialized bound higher than or
+     * equal to all non-null values of the field
+     */
+    ByteBuffer upperBound();
+
+    /**
+     * Copies this {@link PartitionFieldSummary summary}. Readers can reuse instances; use this
+     * method to make defensive copies.
+     *
+     * @return a copy of this partition field summary
+     */
+    PartitionFieldSummary copy();
+  }
+}
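
For illustration, a minimal sketch of how the ordinal mapping described in the
partitions() javadoc is consumed (manifest is an assumed ManifestFile instance
for a spec like [ ts_day=date(ts), type=identity(type) ]):

    // summaries are positional: index 0 is the first partition field
    List<ManifestFile.PartitionFieldSummary> summaries = manifest.partitions();
    ManifestFile.PartitionFieldSummary tsDayStats = summaries.get(0); // ts_day
    ManifestFile.PartitionFieldSummary typeStats = summaries.get(1);  // type
    if (tsDayStats.containsNull()) {
      // at least one data file in this manifest has a null ts_day value
    }
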
diff --git a/api/src/main/java/com/netflix/iceberg/Snapshot.java b/api/src/main/java/com/netflix/iceberg/Snapshot.java
index 7fc878b..89542dc 100644
--- a/api/src/main/java/com/netflix/iceberg/Snapshot.java
+++ b/api/src/main/java/com/netflix/iceberg/Snapshot.java
@@ -60,7 +60,7 @@
    *
    * @return a list of fully-qualified manifest locations
    */
-  List<String> manifests();
+  List<ManifestFile> manifests();
 
   /**
    * Return all files added to the table in this snapshot.
@@ -81,4 +81,11 @@
    * @return all files deleted from the table in this snapshot.
    */
   Iterable<DataFile> deletedFiles();
+
+  /**
+   * Return the location of this snapshot's manifest list, or null if the
+   * manifest list is not stored as a separate file.
+   *
+   * @return the location of the manifest list for this Snapshot
+   */
+  String manifestListLocation();
 }
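
A hedged sketch of how a caller might use the changed Snapshot API (table is an
assumed Table handle with a currentSnapshot() accessor):

    Snapshot snapshot = table.currentSnapshot();
    for (ManifestFile manifest : snapshot.manifests()) {
      // each entry now carries manifest metadata, not just a location string
      System.out.println(manifest.path() + " (" + manifest.length() + " bytes)");
    }
    // null when manifests are embedded in table metadata rather than stored in
    // a separate manifest list file
    String listLocation = snapshot.manifestListLocation();
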
diff --git a/api/src/main/java/com/netflix/iceberg/expressions/BoundReference.java b/api/src/main/java/com/netflix/iceberg/expressions/BoundReference.java
index 0106f35..5a83650 100644
--- a/api/src/main/java/com/netflix/iceberg/expressions/BoundReference.java
+++ b/api/src/main/java/com/netflix/iceberg/expressions/BoundReference.java
@@ -55,6 +55,10 @@ public int fieldId() {
     return fieldId;
   }
 
+  public int pos() {
+    return pos;
+  }
+
   public T get(StructLike struct) {
     return struct.get(pos, javaType());
   }
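
The new accessor lets positional stats be looked up directly from a bound
reference, as InclusiveManifestEvaluator does below (a two-line sketch; ref and
stats are assumed locals):

    // stats is a List<PartitionFieldSummary> ordered by partition ordinal
    PartitionFieldSummary fieldStats = stats.get(ref.pos());
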
diff --git a/api/src/main/java/com/netflix/iceberg/expressions/InclusiveManifestEvaluator.java b/api/src/main/java/com/netflix/iceberg/expressions/InclusiveManifestEvaluator.java
new file mode 100644
index 0000000..cac617d
--- /dev/null
+++ b/api/src/main/java/com/netflix/iceberg/expressions/InclusiveManifestEvaluator.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.netflix.iceberg.expressions;
+
+import com.netflix.iceberg.ManifestFile;
+import com.netflix.iceberg.ManifestFile.PartitionFieldSummary;
+import com.netflix.iceberg.PartitionSpec;
+import com.netflix.iceberg.expressions.ExpressionVisitors.BoundExpressionVisitor;
+import com.netflix.iceberg.types.Conversions;
+import com.netflix.iceberg.types.Types.StructType;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import static com.netflix.iceberg.expressions.Expressions.rewriteNot;
+
+/**
+ * Evaluates an {@link Expression} on a {@link ManifestFile} to test whether the file contains
+ * matching partitions.
+ * <p>
+ * This evaluation is inclusive: it returns true if a file may match and false if it cannot match.
+ * <p>
+ * Files are passed to {@link #eval(ManifestFile)}, which returns true if the manifest may contain
+ * data files that match the partition expression. Manifest files may be skipped if and only if the
+ * return value of {@code eval} is false.
+ */
+public class InclusiveManifestEvaluator {
+  private final StructType struct;
+  private final Expression expr;
+  private transient ThreadLocal<ManifestEvalVisitor> visitors = null;
+
+  private ManifestEvalVisitor visitor() {
+    if (visitors == null) {
+      this.visitors = ThreadLocal.withInitial(ManifestEvalVisitor::new);
+    }
+    return visitors.get();
+  }
+
+  public InclusiveManifestEvaluator(PartitionSpec spec, Expression rowFilter) {
+    this.struct = spec.partitionType();
+    this.expr = Binder.bind(struct, rewriteNot(Projections.inclusive(spec).project(rowFilter)));
+  }
+
+  /**
+   * Test whether the file may contain records that match the expression.
+   *
+   * @param manifest a manifest file
+   * @return false if the file cannot contain rows that match the expression, true otherwise.
+   */
+  public boolean eval(ManifestFile manifest) {
+    return visitor().eval(manifest);
+  }
+
+  private static final boolean ROWS_MIGHT_MATCH = true;
+  private static final boolean ROWS_CANNOT_MATCH = false;
+
+  private class ManifestEvalVisitor extends BoundExpressionVisitor<Boolean> {
+    private List<PartitionFieldSummary> stats = null;
+
+    private boolean eval(ManifestFile manifest) {
+      this.stats = manifest.partitions();
+      if (stats == null) {
+        return ROWS_MIGHT_MATCH;
+      }
+
+      return ExpressionVisitors.visit(expr, this);
+    }
+
+    @Override
+    public Boolean alwaysTrue() {
+      return ROWS_MIGHT_MATCH; // all rows match
+    }
+
+    @Override
+    public Boolean alwaysFalse() {
+      return ROWS_CANNOT_MATCH; // all rows fail
+    }
+
+    @Override
+    public Boolean not(Boolean result) {
+      return !result;
+    }
+
+    @Override
+    public Boolean and(Boolean leftResult, Boolean rightResult) {
+      return leftResult && rightResult;
+    }
+
+    @Override
+    public Boolean or(Boolean leftResult, Boolean rightResult) {
+      return leftResult || rightResult;
+    }
+
+    @Override
+    public <T> Boolean isNull(BoundReference<T> ref) {
+      // no need to check whether the field is required because binding evaluates that case
+      // if the column has no null values, the expression cannot match
+      if (!stats.get(ref.pos()).containsNull()) {
+        return ROWS_CANNOT_MATCH;
+      }
+
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public <T> Boolean notNull(BoundReference<T> ref) {
+      // containsNull encodes whether at least one partition value is null;
+      // lowerBound is null if all partition values are null.
+      ByteBuffer lowerBound = stats.get(ref.pos()).lowerBound();
+      if (lowerBound == null) {
+        return ROWS_CANNOT_MATCH; // all values are null
+      }
+
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public <T> Boolean lt(BoundReference<T> ref, Literal<T> lit) {
+      ByteBuffer lowerBound = stats.get(ref.pos()).lowerBound();
+      if (lowerBound == null) {
+        return ROWS_CANNOT_MATCH; // values are all null
+      }
+
+      T lower = Conversions.fromByteBuffer(ref.type(), lowerBound);
+
+      int cmp = lit.comparator().compare(lower, lit.value());
+      if (cmp >= 0) {
+        return ROWS_CANNOT_MATCH;
+      }
+
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public <T> Boolean ltEq(BoundReference<T> ref, Literal<T> lit) {
+      ByteBuffer lowerBound = stats.get(ref.pos()).lowerBound();
+      if (lowerBound == null) {
+        return ROWS_CANNOT_MATCH; // values are all null
+      }
+
+      T lower = Conversions.fromByteBuffer(ref.type(), lowerBound);
+
+      int cmp = lit.comparator().compare(lower, lit.value());
+      if (cmp > 0) {
+        return ROWS_CANNOT_MATCH;
+      }
+
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public <T> Boolean gt(BoundReference<T> ref, Literal<T> lit) {
+      ByteBuffer upperBound = stats.get(ref.pos()).upperBound();
+      if (upperBound == null) {
+        return ROWS_CANNOT_MATCH; // values are all null
+      }
+
+      T upper = Conversions.fromByteBuffer(ref.type(), upperBound);
+
+      int cmp = lit.comparator().compare(upper, lit.value());
+      if (cmp <= 0) {
+        return ROWS_CANNOT_MATCH;
+      }
+
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public <T> Boolean gtEq(BoundReference<T> ref, Literal<T> lit) {
+      ByteBuffer upperBound = stats.get(ref.pos()).upperBound();
+      if (upperBound == null) {
+        return ROWS_CANNOT_MATCH; // values are all null
+      }
+
+      T upper = Conversions.fromByteBuffer(ref.type(), upperBound);
+
+      int cmp = lit.comparator().compare(upper, lit.value());
+      if (cmp < 0) {
+        return ROWS_CANNOT_MATCH;
+      }
+
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public <T> Boolean eq(BoundReference<T> ref, Literal<T> lit) {
+      PartitionFieldSummary fieldStats = stats.get(ref.pos());
+      if (fieldStats.lowerBound() == null) {
+        return ROWS_CANNOT_MATCH; // values are all null and literal cannot contain null
+      }
+
+      T lower = Conversions.fromByteBuffer(ref.type(), fieldStats.lowerBound());
+      int cmp = lit.comparator().compare(lower, lit.value());
+      if (cmp > 0) {
+        return ROWS_CANNOT_MATCH;
+      }
+
+      T upper = Conversions.fromByteBuffer(ref.type(), fieldStats.upperBound());
+      cmp = lit.comparator().compare(upper, lit.value());
+      if (cmp < 0) {
+        return ROWS_CANNOT_MATCH;
+      }
+
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public <T> Boolean notEq(BoundReference<T> ref, Literal<T> lit) {
+      // because the bounds are not necessarily a min or max value, this cannot be answered using
+      // them. notEq(col, X) with bounds (X, Y) doesn't guarantee that X is a value in col.
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public <T> Boolean in(BoundReference<T> ref, Literal<T> lit) {
+      return ROWS_MIGHT_MATCH;
+    }
+
+    @Override
+    public <T> Boolean notIn(BoundReference<T> ref, Literal<T> lit) {
+      return ROWS_MIGHT_MATCH;
+    }
+  }
+}
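
Usage follows the class javadoc: build one evaluator per partition spec and use
it to decide which manifests to scan (a minimal sketch; spec and snapshot are
assumed to be in scope):

    InclusiveManifestEvaluator evaluator =
        new InclusiveManifestEvaluator(spec, Expressions.greaterThan("id", 5));
    for (ManifestFile manifest : snapshot.manifests()) {
      if (evaluator.eval(manifest)) {
        // the manifest may contain matching data files and must be scanned
      }
      // a false result means the manifest can be skipped safely
    }
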
diff --git a/api/src/main/java/com/netflix/iceberg/expressions/Literals.java b/api/src/main/java/com/netflix/iceberg/expressions/Literals.java
index 22ef41c..f4e5d4e 100644
--- a/api/src/main/java/com/netflix/iceberg/expressions/Literals.java
+++ b/api/src/main/java/com/netflix/iceberg/expressions/Literals.java
@@ -96,6 +96,7 @@ private Literals() {
     private final T value;
 
     BaseLiteral(T value) {
+      Preconditions.checkNotNull(value, "Literal values cannot be null");
       this.value = value;
     }
 
diff --git a/api/src/main/java/com/netflix/iceberg/io/OutputFile.java b/api/src/main/java/com/netflix/iceberg/io/OutputFile.java
index 9d75805..f0f48ee 100644
--- a/api/src/main/java/com/netflix/iceberg/io/OutputFile.java
+++ b/api/src/main/java/com/netflix/iceberg/io/OutputFile.java
@@ -58,4 +58,10 @@
    */
   String location();
 
+  /**
+   * Return an {@link InputFile} for the location of this output file.
+   *
+   * @return an input file for the location of this output file
+   */
+  InputFile toInputFile();
 }
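
A small sketch of the round-trip this enables (Files.localOutput is assumed
here as the local-file helper, matching the localInput change above):

    OutputFile out = Files.localOutput(new File("/tmp/manifest.avro"));
    // ... write through out ...
    InputFile in = out.toInputFile(); // re-open the same location for reading
    long length = in.getLength();
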
diff --git a/api/src/main/java/com/netflix/iceberg/types/Comparators.java b/api/src/main/java/com/netflix/iceberg/types/Comparators.java
index 9e2ce2d..6680f7d 100644
--- a/api/src/main/java/com/netflix/iceberg/types/Comparators.java
+++ b/api/src/main/java/com/netflix/iceberg/types/Comparators.java
@@ -19,10 +19,41 @@
 
 package com.netflix.iceberg.types;
 
+import com.google.common.collect.ImmutableMap;
 import java.nio.ByteBuffer;
 import java.util.Comparator;
 
 public class Comparators {
+  private static final ImmutableMap<Type.PrimitiveType, Comparator<?>> COMPARATORS = ImmutableMap
+      .<Type.PrimitiveType, Comparator<?>>builder()
+      .put(Types.BooleanType.get(), Comparator.naturalOrder())
+      .put(Types.IntegerType.get(), Comparator.naturalOrder())
+      .put(Types.LongType.get(), Comparator.naturalOrder())
+      .put(Types.FloatType.get(), Comparator.naturalOrder())
+      .put(Types.DoubleType.get(), Comparator.naturalOrder())
+      .put(Types.DateType.get(), Comparator.naturalOrder())
+      .put(Types.TimeType.get(), Comparator.naturalOrder())
+      .put(Types.TimestampType.withZone(), Comparator.naturalOrder())
+      .put(Types.TimestampType.withoutZone(), Comparator.naturalOrder())
+      .put(Types.StringType.get(), Comparators.charSequences())
+      .put(Types.UUIDType.get(), Comparator.naturalOrder())
+      .put(Types.BinaryType.get(), Comparators.unsignedBytes())
+      .build();
+
+  @SuppressWarnings("unchecked")
+  public static <T> Comparator<T> forType(Type.PrimitiveType type) {
+    Comparator<?> cmp = COMPARATORS.get(type);
+    if (cmp != null) {
+      return (Comparator<T>) cmp;
+    } else if (type instanceof Types.FixedType) {
+      return (Comparator<T>) Comparators.unsignedBytes();
+    } else if (type instanceof Types.DecimalType) {
+      return (Comparator<T>) Comparator.naturalOrder();
+    }
+
+    throw new UnsupportedOperationException("Cannot determine comparator for type: " + type);
+  }
+
   public static Comparator<ByteBuffer> unsignedBytes() {
     return UnsignedByteBufComparator.INSTANCE;
   }
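
For example, forType resolves the comparator that evaluators use for literal
comparisons (a minimal sketch):

    Comparator<CharSequence> strings = Comparators.forType(Types.StringType.get());
    int cmp = strings.compare("a", "z"); // negative: "a" sorts before "z"

    // fixed and binary types fall back to unsigned byte order
    Comparator<ByteBuffer> fixed = Comparators.forType(Types.FixedType.ofLength(16));
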
diff --git a/api/src/test/java/com/netflix/iceberg/TestHelpers.java b/api/src/test/java/com/netflix/iceberg/TestHelpers.java
index 118e11b..ceb1eed 100644
--- a/api/src/test/java/com/netflix/iceberg/TestHelpers.java
+++ b/api/src/test/java/com/netflix/iceberg/TestHelpers.java
@@ -176,6 +176,107 @@ private static void handleException(String message,
     }
   }
 
+  public static class TestFieldSummary implements ManifestFile.PartitionFieldSummary {
+    private final boolean containsNull;
+    private final ByteBuffer lowerBound;
+    private final ByteBuffer upperBound;
+
+    public TestFieldSummary(boolean containsNull, ByteBuffer lowerBound, ByteBuffer upperBound) {
+      this.containsNull = containsNull;
+      this.lowerBound = lowerBound;
+      this.upperBound = upperBound;
+    }
+
+    @Override
+    public boolean containsNull() {
+      return containsNull;
+    }
+
+    @Override
+    public ByteBuffer lowerBound() {
+      return lowerBound;
+    }
+
+    @Override
+    public ByteBuffer upperBound() {
+      return upperBound;
+    }
+
+    @Override
+    public ManifestFile.PartitionFieldSummary copy() {
+      return this;
+    }
+  }
+
+  public static class TestManifestFile implements ManifestFile {
+    private final String path;
+    private final long length;
+    private final int specId;
+    private final Long snapshotId;
+    private final Integer addedFiles;
+    private final Integer existingFiles;
+    private final Integer deletedFiles;
+    private final List<PartitionFieldSummary> partitions;
+
+    public TestManifestFile(String path, long length, int specId, Long snapshotId,
+                            Integer addedFiles, Integer existingFiles, Integer deletedFiles,
+                            List<PartitionFieldSummary> partitions) {
+      this.path = path;
+      this.length = length;
+      this.specId = specId;
+      this.snapshotId = snapshotId;
+      this.addedFiles = addedFiles;
+      this.existingFiles = existingFiles;
+      this.deletedFiles = deletedFiles;
+      this.partitions = partitions;
+    }
+
+    @Override
+    public String path() {
+      return path;
+    }
+
+    @Override
+    public long length() {
+      return length;
+    }
+
+    @Override
+    public int partitionSpecId() {
+      return specId;
+    }
+
+    @Override
+    public Long snapshotId() {
+      return snapshotId;
+    }
+
+    @Override
+    public Integer addedFilesCount() {
+      return addedFiles;
+    }
+
+    @Override
+    public Integer existingFilesCount() {
+      return existingFiles;
+    }
+
+    @Override
+    public Integer deletedFilesCount() {
+      return deletedFiles;
+    }
+
+    @Override
+    public List<PartitionFieldSummary> partitions() {
+      return partitions;
+    }
+
+    @Override
+    public ManifestFile copy() {
+      return this;
+    }
+  }
+
   public static class TestDataFile implements DataFile {
     private final String path;
     private final StructLike partition;
diff --git a/api/src/test/java/com/netflix/iceberg/TestPartitionPaths.java b/api/src/test/java/com/netflix/iceberg/TestPartitionPaths.java
index aab66a3..1253f1e 100644
--- a/api/src/test/java/com/netflix/iceberg/TestPartitionPaths.java
+++ b/api/src/test/java/com/netflix/iceberg/TestPartitionPaths.java
@@ -34,6 +34,7 @@
   );
 
   @Test
+  @SuppressWarnings("unchecked")
   public void testPartitionPath() {
     PartitionSpec spec = PartitionSpec.builderFor(SCHEMA)
         .hour("ts")
diff --git a/api/src/test/java/com/netflix/iceberg/expressions/TestInclusiveManifestEvaluator.java b/api/src/test/java/com/netflix/iceberg/expressions/TestInclusiveManifestEvaluator.java
new file mode 100644
index 0000000..f92f700
--- /dev/null
+++ b/api/src/test/java/com/netflix/iceberg/expressions/TestInclusiveManifestEvaluator.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.netflix.iceberg.expressions;
+
+import com.google.common.collect.ImmutableList;
+import com.netflix.iceberg.ManifestFile;
+import com.netflix.iceberg.PartitionSpec;
+import com.netflix.iceberg.Schema;
+import com.netflix.iceberg.TestHelpers;
+import com.netflix.iceberg.exceptions.ValidationException;
+import com.netflix.iceberg.types.Types;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+import static com.netflix.iceberg.expressions.Expressions.and;
+import static com.netflix.iceberg.expressions.Expressions.equal;
+import static com.netflix.iceberg.expressions.Expressions.greaterThan;
+import static com.netflix.iceberg.expressions.Expressions.greaterThanOrEqual;
+import static com.netflix.iceberg.expressions.Expressions.isNull;
+import static com.netflix.iceberg.expressions.Expressions.lessThan;
+import static com.netflix.iceberg.expressions.Expressions.lessThanOrEqual;
+import static com.netflix.iceberg.expressions.Expressions.not;
+import static com.netflix.iceberg.expressions.Expressions.notEqual;
+import static com.netflix.iceberg.expressions.Expressions.notNull;
+import static com.netflix.iceberg.expressions.Expressions.or;
+import static com.netflix.iceberg.types.Conversions.toByteBuffer;
+import static com.netflix.iceberg.types.Types.NestedField.optional;
+import static com.netflix.iceberg.types.Types.NestedField.required;
+
+public class TestInclusiveManifestEvaluator {
+  private static final Schema SCHEMA = new Schema(
+      required(1, "id", Types.IntegerType.get()),
+      optional(4, "all_nulls", Types.StringType.get()),
+      optional(5, "some_nulls", Types.StringType.get()),
+      optional(6, "no_nulls", Types.StringType.get())
+  );
+
+  private static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA)
+      .withSpecId(0)
+      .identity("id")
+      .identity("all_nulls")
+      .identity("some_nulls")
+      .identity("no_nulls")
+      .build();
+
+  private static final ByteBuffer INT_MIN = toByteBuffer(Types.IntegerType.get(), 30);
+  private static final ByteBuffer INT_MAX = toByteBuffer(Types.IntegerType.get(), 79);
+
+  private static final ByteBuffer STRING_MIN = toByteBuffer(Types.StringType.get(), "a");
+  private static final ByteBuffer STRING_MAX = toByteBuffer(Types.StringType.get(), "z");
+
+  private static final ManifestFile NO_STATS = new TestHelpers.TestManifestFile(
+      "manifest-list.avro", 1024, 0, System.currentTimeMillis(), null, null, null, null);
+
+  private static final ManifestFile FILE = new TestHelpers.TestManifestFile("manifest-list.avro",
+      1024, 0, System.currentTimeMillis(), 5, 10, 0, ImmutableList.of(
+          new TestHelpers.TestFieldSummary(false, INT_MIN, INT_MAX),
+          new TestHelpers.TestFieldSummary(true, null, null),
+          new TestHelpers.TestFieldSummary(true, STRING_MIN, STRING_MAX),
+          new TestHelpers.TestFieldSummary(false, STRING_MIN, STRING_MAX)));
+
+  @Test
+  public void testAllNulls() {
+    boolean shouldRead = new InclusiveManifestEvaluator(SPEC, notNull("all_nulls")).eval(FILE);
+    Assert.assertFalse("Should skip: no non-null value in all null column", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(SPEC, notNull("some_nulls")).eval(FILE);
+    Assert.assertTrue("Should read: column with some nulls contains a non-null value", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(SPEC, notNull("no_nulls")).eval(FILE);
+    Assert.assertTrue("Should read: non-null column contains a non-null value", shouldRead);
+  }
+
+  @Test
+  public void testNoNulls() {
+    boolean shouldRead = new InclusiveManifestEvaluator(SPEC, isNull("all_nulls")).eval(FILE);
+    Assert.assertTrue("Should read: at least one null value in all null column", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(SPEC, isNull("some_nulls")).eval(FILE);
+    Assert.assertTrue("Should read: column with some nulls contains a null value", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(SPEC, isNull("no_nulls")).eval(FILE);
+    Assert.assertFalse("Should skip: non-null column contains no null values", shouldRead);
+  }
+
+  @Test
+  public void testMissingColumn() {
+    TestHelpers.assertThrows("Should complain about missing column in expression",
+        ValidationException.class, "Cannot find field 'missing'",
+        () -> new InclusiveManifestEvaluator(SPEC, lessThan("missing", 5)).eval(FILE));
+  }
+
+  @Test
+  public void testMissingStats() {
+    Expression[] exprs = new Expression[] {
+        lessThan("id", 5), lessThanOrEqual("id", 30), equal("id", 70),
+        greaterThan("id", 78), greaterThanOrEqual("id", 90), notEqual("id", 101),
+        isNull("id"), notNull("id")
+    };
+
+    for (Expression expr : exprs) {
+      boolean shouldRead = new InclusiveManifestEvaluator(SPEC, expr).eval(NO_STATS);
+      Assert.assertTrue("Should read when missing stats for expr: " + expr, shouldRead);
+    }
+  }
+
+  @Test
+  public void testNot() {
+    // this test case must use a real predicate, not alwaysTrue(), or binding will simplify it out
+    boolean shouldRead = new InclusiveManifestEvaluator(SPEC, not(lessThan("id", 5))).eval(FILE);
+    Assert.assertTrue("Should read: not(false)", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(SPEC, not(greaterThan("id", 5))).eval(FILE);
+    Assert.assertFalse("Should skip: not(true)", shouldRead);
+  }
+
+  @Test
+  public void testAnd() {
+    // this test case must use a real predicate, not alwaysTrue(), or binding will simplify it out
+    boolean shouldRead = new InclusiveManifestEvaluator(
+        SPEC, and(lessThan("id", 5), greaterThanOrEqual("id", 0))).eval(FILE);
+    Assert.assertFalse("Should skip: and(false, false)", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(
+        SPEC, and(greaterThan("id", 5), lessThanOrEqual("id", 30))).eval(FILE);
+    Assert.assertTrue("Should read: and(true, true)", shouldRead);
+  }
+
+  @Test
+  public void testOr() {
+    // this test case must use a real predicate, not alwaysTrue(), or binding will simplify it out
+    boolean shouldRead = new InclusiveManifestEvaluator(
+        SPEC, or(lessThan("id", 5), greaterThanOrEqual("id", 80))).eval(FILE);
+    Assert.assertFalse("Should skip: or(false, false)", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(
+        SPEC, or(lessThan("id", 5), greaterThanOrEqual("id", 60))).eval(FILE);
+    Assert.assertTrue("Should read: or(false, true)", shouldRead);
+  }
+
+  @Test
+  public void testIntegerLt() {
+    boolean shouldRead = new InclusiveManifestEvaluator(SPEC, lessThan("id", 5)).eval(FILE);
+    Assert.assertFalse("Should not read: id range below lower bound (5 < 30)", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(SPEC, lessThan("id", 30)).eval(FILE);
+    Assert.assertFalse("Should not read: id range below lower bound (30 is not < 30)", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(SPEC, lessThan("id", 31)).eval(FILE);
+    Assert.assertTrue("Should read: one possible id", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(SPEC, lessThan("id", 79)).eval(FILE);
+    Assert.assertTrue("Should read: may possible ids", shouldRead);
+  }
+
+  @Test
+  public void testIntegerLtEq() {
+    boolean shouldRead = new InclusiveManifestEvaluator(SPEC, lessThanOrEqual("id", 5)).eval(FILE);
+    Assert.assertFalse("Should not read: id range below lower bound (5 < 30)", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(SPEC, lessThanOrEqual("id", 29)).eval(FILE);
+    Assert.assertFalse("Should not read: id range below lower bound (29 < 30)", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(SPEC, lessThanOrEqual("id", 30)).eval(FILE);
+    Assert.assertTrue("Should read: one possible id", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(SPEC, lessThanOrEqual("id", 79)).eval(FILE);
+    Assert.assertTrue("Should read: many possible ids", shouldRead);
+  }
+
+  @Test
+  public void testIntegerGt() {
+    boolean shouldRead = new InclusiveManifestEvaluator(SPEC, greaterThan("id", 85)).eval(FILE);
+    Assert.assertFalse("Should not read: id range above upper bound (85 < 79)", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(SPEC, greaterThan("id", 79)).eval(FILE);
+    Assert.assertFalse("Should not read: id range above upper bound (79 is not > 79)", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(SPEC, greaterThan("id", 78)).eval(FILE);
+    Assert.assertTrue("Should read: one possible id", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(SPEC, greaterThan("id", 75)).eval(FILE);
+    Assert.assertTrue("Should read: may possible ids", shouldRead);
+  }
+
+  @Test
+  public void testIntegerGtEq() {
+    boolean shouldRead = new InclusiveManifestEvaluator(
+        SPEC, greaterThanOrEqual("id", 85)).eval(FILE);
+    Assert.assertFalse("Should not read: id range above upper bound (85 < 79)", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(
+        SPEC, greaterThanOrEqual("id", 80)).eval(FILE);
+    Assert.assertFalse("Should not read: id range above upper bound (80 > 79)", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(
+        SPEC, greaterThanOrEqual("id", 79)).eval(FILE);
+    Assert.assertTrue("Should read: one possible id", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(
+        SPEC, greaterThanOrEqual("id", 75)).eval(FILE);
+    Assert.assertTrue("Should read: may possible ids", shouldRead);
+  }
+
+  @Test
+  public void testIntegerEq() {
+    boolean shouldRead = new InclusiveManifestEvaluator(SPEC, equal("id", 5)).eval(FILE);
+    Assert.assertFalse("Should not read: id below lower bound", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(SPEC, equal("id", 29)).eval(FILE);
+    Assert.assertFalse("Should not read: id below lower bound", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(SPEC, equal("id", 30)).eval(FILE);
+    Assert.assertTrue("Should read: id equal to lower bound", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(SPEC, equal("id", 75)).eval(FILE);
+    Assert.assertTrue("Should read: id between lower and upper bounds", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(SPEC, equal("id", 79)).eval(FILE);
+    Assert.assertTrue("Should read: id equal to upper bound", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(SPEC, equal("id", 80)).eval(FILE);
+    Assert.assertFalse("Should not read: id above upper bound", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(SPEC, equal("id", 85)).eval(FILE);
+    Assert.assertFalse("Should not read: id above upper bound", shouldRead);
+  }
+
+  @Test
+  public void testIntegerNotEq() {
+    boolean shouldRead = new InclusiveManifestEvaluator(SPEC, notEqual("id", 5)).eval(FILE);
+    Assert.assertTrue("Should read: id below lower bound", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(SPEC, notEqual("id", 29)).eval(FILE);
+    Assert.assertTrue("Should read: id below lower bound", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(SPEC, notEqual("id", 30)).eval(FILE);
+    Assert.assertTrue("Should read: id equal to lower bound", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(SPEC, notEqual("id", 75)).eval(FILE);
+    Assert.assertTrue("Should read: id between lower and upper bounds", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(SPEC, notEqual("id", 79)).eval(FILE);
+    Assert.assertTrue("Should read: id equal to upper bound", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(SPEC, notEqual("id", 80)).eval(FILE);
+    Assert.assertTrue("Should read: id above upper bound", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(SPEC, notEqual("id", 85)).eval(FILE);
+    Assert.assertTrue("Should read: id above upper bound", shouldRead);
+  }
+
+  @Test
+  public void testIntegerNotEqRewritten() {
+    boolean shouldRead = new InclusiveManifestEvaluator(SPEC, not(equal("id", 5))).eval(FILE);
+    Assert.assertTrue("Should read: id below lower bound", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(SPEC, not(equal("id", 29))).eval(FILE);
+    Assert.assertTrue("Should read: id below lower bound", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(SPEC, not(equal("id", 30))).eval(FILE);
+    Assert.assertTrue("Should read: id equal to lower bound", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(SPEC, not(equal("id", 75))).eval(FILE);
+    Assert.assertTrue("Should read: id between lower and upper bounds", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(SPEC, not(equal("id", 79))).eval(FILE);
+    Assert.assertTrue("Should read: id equal to upper bound", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(SPEC, not(equal("id", 80))).eval(FILE);
+    Assert.assertTrue("Should read: id above upper bound", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(SPEC, not(equal("id", 85))).eval(FILE);
+    Assert.assertTrue("Should read: id above upper bound", shouldRead);
+  }
+}
diff --git a/core/src/main/java/com/netflix/iceberg/BaseSnapshot.java b/core/src/main/java/com/netflix/iceberg/BaseSnapshot.java
index 5409b9a..945ddbb 100644
--- a/core/src/main/java/com/netflix/iceberg/BaseSnapshot.java
+++ b/core/src/main/java/com/netflix/iceberg/BaseSnapshot.java
@@ -20,21 +20,25 @@
 package com.netflix.iceberg;
 
 import com.google.common.base.Objects;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
+import com.netflix.iceberg.avro.Avro;
 import com.netflix.iceberg.exceptions.RuntimeIOException;
-import com.netflix.iceberg.io.CloseableGroup;
+import com.netflix.iceberg.io.CloseableIterable;
+import com.netflix.iceberg.io.InputFile;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
 
-class BaseSnapshot extends CloseableGroup implements Snapshot {
+class BaseSnapshot implements Snapshot {
   private final TableOperations ops;
   private final long snapshotId;
   private final Long parentId;
   private final long timestampMillis;
-  private final List<String> manifestFiles;
+  private final InputFile manifestList;
 
   // lazily initialized
+  private List<ManifestFile> manifests = null;
   private List<DataFile> adds = null;
   private List<DataFile> deletes = null;
 
@@ -44,19 +48,30 @@
   BaseSnapshot(TableOperations ops,
                long snapshotId,
                String... manifestFiles) {
-    this(ops, snapshotId, null, System.currentTimeMillis(), Arrays.asList(manifestFiles));
+    this(ops, snapshotId, null, System.currentTimeMillis(),
+        Lists.transform(Arrays.asList(manifestFiles),
+            path -> new GenericManifestFile(ops.newInputFile(path), 0)));
   }
 
   BaseSnapshot(TableOperations ops,
                long snapshotId,
                Long parentId,
                long timestampMillis,
-               List<String> manifestFiles) {
+               InputFile manifestList) {
     this.ops = ops;
     this.snapshotId = snapshotId;
     this.parentId = parentId;
     this.timestampMillis = timestampMillis;
-    this.manifestFiles = manifestFiles;
+    this.manifestList = manifestList;
+  }
+
+  BaseSnapshot(TableOperations ops,
+               long snapshotId,
+               Long parentId,
+               long timestampMillis,
+               List<ManifestFile> manifests) {
+    this(ops, snapshotId, parentId, timestampMillis, (InputFile) null);
+    this.manifests = manifests;
   }
 
   @Override
@@ -75,8 +90,25 @@ public long timestampMillis() {
   }
 
   @Override
-  public List<String> manifests() {
-    return manifestFiles;
+  public List<ManifestFile> manifests() {
+    if (manifests == null) {
+      // if manifests isn't set, the manifest list file is set and should be read to get the list
+      try (CloseableIterable<ManifestFile> files = Avro.read(manifestList)
+          .rename("manifest_file", GenericManifestFile.class.getName())
+          .rename("partitions", GenericPartitionFieldSummary.class.getName())
+          .rename("r508", GenericPartitionFieldSummary.class.getName())
+          .project(ManifestFile.schema())
+          .reuseContainers(false)
+          .build()) {
+
+        this.manifests = Lists.newLinkedList(files);
+
+      } catch (IOException e) {
+        throw new RuntimeIOException(e, "Cannot read snapshot file: %s", manifestList.location());
+      }
+    }
+
+    return manifests;
   }
 
   @Override
@@ -95,13 +127,18 @@ public long timestampMillis() {
     return deletes;
   }
 
+  @Override
+  public String manifestListLocation() {
+    return manifestList != null ? manifestList.location() : null;
+  }
+
   private void cacheChanges() {
     List<DataFile> adds = Lists.newArrayList();
     List<DataFile> deletes = Lists.newArrayList();
 
     // accumulate adds and deletes from all manifests.
     // because manifests can be reused in newer snapshots, filter the changes by snapshot id.
-    for (String manifest : manifestFiles) {
+    for (String manifest : Iterables.transform(manifests, ManifestFile::path)) {
       try (ManifestReader reader = ManifestReader.read(ops.newInputFile(manifest))) {
         for (ManifestEntry add : reader.addedFiles()) {
           if (add.snapshotId() == snapshotId) {
@@ -127,7 +164,7 @@ public String toString() {
     return Objects.toStringHelper(this)
         .add("id", snapshotId)
         .add("timestamp_ms", timestampMillis)
-        .add("manifests", manifestFiles)
+        .add("manifests", manifests)
         .toString();
   }
 }
diff --git a/core/src/main/java/com/netflix/iceberg/BaseTableScan.java b/core/src/main/java/com/netflix/iceberg/BaseTableScan.java
index e99889e..ad20780 100644
--- a/core/src/main/java/com/netflix/iceberg/BaseTableScan.java
+++ b/core/src/main/java/com/netflix/iceberg/BaseTableScan.java
@@ -21,6 +21,9 @@
 
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
@@ -30,6 +33,8 @@
 import com.netflix.iceberg.expressions.Binder;
 import com.netflix.iceberg.expressions.Expression;
 import com.netflix.iceberg.expressions.Expressions;
+import com.netflix.iceberg.expressions.InclusiveManifestEvaluator;
+import com.netflix.iceberg.expressions.Projections;
 import com.netflix.iceberg.expressions.ResidualEvaluator;
 import com.netflix.iceberg.io.CloseableIterable;
 import com.netflix.iceberg.types.TypeUtil;
@@ -141,6 +146,16 @@ public TableScan filter(Expression expr) {
     return new BaseTableScan(ops, table, snapshotId, schema, Expressions.and(rowFilter, expr));
   }
 
+  private final LoadingCache<Integer, InclusiveManifestEvaluator> EVAL_CACHE = CacheBuilder
+      .newBuilder()
+      .build(new CacheLoader<Integer, InclusiveManifestEvaluator>() {
+        @Override
+        public InclusiveManifestEvaluator load(Integer specId) {
+          PartitionSpec spec = ops.current().spec(specId);
+          return new InclusiveManifestEvaluator(spec, rowFilter);
+        }
+      });
+
   @Override
   public CloseableIterable<FileScanTask> planFiles() {
     Snapshot snapshot = snapshotId != null ?
@@ -155,11 +170,14 @@ public TableScan filter(Expression expr) {
       Listeners.notifyAll(
           new ScanEvent(table.toString(), snapshot.snapshotId(), rowFilter, schema));
 
+      Iterable<ManifestFile> matchingManifests = Iterables.filter(snapshot.manifests(),
+          manifest -> EVAL_CACHE.getUnchecked(manifest.partitionSpecId()).eval(manifest));
+
       ConcurrentLinkedQueue<Closeable> toClose = new ConcurrentLinkedQueue<>();
       Iterable<Iterable<FileScanTask>> readers = Iterables.transform(
-          snapshot.manifests(),
+          matchingManifests,
           manifest -> {
-            ManifestReader reader = ManifestReader.read(ops.newInputFile(manifest));
+            ManifestReader reader = ManifestReader.read(ops.newInputFile(manifest.path()));
             toClose.add(reader);
             String schemaString = SchemaParser.toJson(reader.spec().schema());
             String specString = PartitionSpecParser.toJson(reader.spec());
diff --git a/core/src/main/java/com/netflix/iceberg/FastAppend.java b/core/src/main/java/com/netflix/iceberg/FastAppend.java
index de34545..278f059 100644
--- a/core/src/main/java/com/netflix/iceberg/FastAppend.java
+++ b/core/src/main/java/com/netflix/iceberg/FastAppend.java
@@ -35,7 +35,7 @@
 class FastAppend extends SnapshotUpdate implements AppendFiles {
   private final PartitionSpec spec;
   private final List<DataFile> newFiles = Lists.newArrayList();
-  private String newManifestLocation = null;
+  private ManifestFile newManifest = null;
   private boolean hasNewFiles = false;
 
   FastAppend(TableOperations ops) {
@@ -51,11 +51,15 @@ public FastAppend appendFile(DataFile file) {
   }
 
   @Override
-  public List<String> apply(TableMetadata base) {
-    String location = writeManifest();
+  public List<ManifestFile> apply(TableMetadata base) {
+    List<ManifestFile> newManifests = Lists.newArrayList();
+
+    try {
+      newManifests.add(writeManifest());
+    } catch (IOException e) {
+      throw new RuntimeIOException(e, "Failed to write manifest");
+    }
 
-    List<String> newManifests = Lists.newArrayList();
-    newManifests.add(location);
     if (base.currentSnapshot() != null) {
       newManifests.addAll(base.currentSnapshot().manifests());
     }
@@ -64,33 +68,32 @@ public FastAppend appendFile(DataFile file) {
   }
 
   @Override
-  protected void cleanUncommitted(Set<String> committed) {
-    if (!committed.contains(newManifestLocation)) {
-      deleteFile(newManifestLocation);
+  protected void cleanUncommitted(Set<ManifestFile> committed) {
+    if (!committed.contains(newManifest)) {
+      deleteFile(newManifest.path());
     }
   }
 
-  private String writeManifest() {
-    if (hasNewFiles && newManifestLocation != null) {
-      deleteFile(newManifestLocation);
-      hasNewFiles = false;
-      newManifestLocation = null;
+  private ManifestFile writeManifest() throws IOException {
+    if (hasNewFiles && newManifest != null) {
+      deleteFile(newManifest.path());
+      newManifest = null;
     }
 
-    if (newManifestLocation == null) {
+    if (newManifest == null) {
       OutputFile out = manifestPath(0);
 
-      try (ManifestWriter writer = new ManifestWriter(spec, out, snapshotId())) {
-
+      ManifestWriter writer = new ManifestWriter(spec, out, snapshotId());
+      try {
         writer.addAll(newFiles);
-
-      } catch (IOException e) {
-        throw new RuntimeIOException(e, "Failed to write manifest: %s", out);
+      } finally {
+        writer.close();
       }
 
-      this.newManifestLocation = out.location();
+      this.newManifest = writer.toManifestFile();
+      hasNewFiles = false;
     }
 
-    return newManifestLocation;
+    return newManifest;
   }
 }
diff --git a/core/src/main/java/com/netflix/iceberg/FileHistory.java b/core/src/main/java/com/netflix/iceberg/FileHistory.java
index 3ed8006..60146b0 100644
--- a/core/src/main/java/com/netflix/iceberg/FileHistory.java
+++ b/core/src/main/java/com/netflix/iceberg/FileHistory.java
@@ -32,6 +32,9 @@
 import java.util.List;
 import java.util.Set;
 
+import static com.google.common.collect.Iterables.concat;
+import static com.google.common.collect.Iterables.transform;
+
 public class FileHistory {
   private static final List<String> HISTORY_COLUMNS = ImmutableList.of("file_path");
 
@@ -91,12 +94,17 @@ public Builder before(long timestampMillis) {
         snapshots = Iterables.filter(snapshots, snap -> snap.timestampMillis() <= endTime);
       }
 
+      // only use manifests that were added in the matching snapshots
+      Set<Long> matchingIds = Sets.newHashSet(transform(snapshots, snap -> snap.snapshotId()));
+      Iterable<ManifestFile> manifests = Iterables.filter(
+          concat(transform(snapshots, Snapshot::manifests)),
+          manifest -> manifest.snapshotId() == null || matchingIds.contains(manifest.snapshotId()));
+
       // a manifest group will only read each manifest once
-      ManifestGroup manifests = new ManifestGroup(((HasTableOperations) table).operations(),
-          Iterables.concat(Iterables.transform(snapshots, Snapshot::manifests)));
+      ManifestGroup group = new ManifestGroup(((HasTableOperations) table).operations(), manifests);
 
       List<ManifestEntry> results = Lists.newArrayList();
-      try (CloseableIterable<ManifestEntry> entries = manifests.select(HISTORY_COLUMNS).entries()) {
+      try (CloseableIterable<ManifestEntry> entries = group.select(HISTORY_COLUMNS).entries()) {
         // TODO: replace this with an IN predicate
         CharSequenceWrapper locationWrapper = CharSequenceWrapper.wrap(null);
         for (ManifestEntry entry : entries) {
diff --git a/core/src/main/java/com/netflix/iceberg/GenericManifestFile.java b/core/src/main/java/com/netflix/iceberg/GenericManifestFile.java
new file mode 100644
index 0000000..628515b
--- /dev/null
+++ b/core/src/main/java/com/netflix/iceberg/GenericManifestFile.java
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.netflix.iceberg;
+
+import com.google.common.base.Objects;
+import com.netflix.iceberg.avro.AvroSchemaUtil;
+import com.netflix.iceberg.io.InputFile;
+import com.netflix.iceberg.types.Types;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.specific.SpecificData.SchemaConstructable;
+import java.io.Serializable;
+import java.util.List;
+
+import static com.google.common.collect.ImmutableList.copyOf;
+import static com.google.common.collect.Iterables.transform;
+
+public class GenericManifestFile
+    implements ManifestFile, StructLike, IndexedRecord, SchemaConstructable, Serializable {
+  private static final Schema AVRO_SCHEMA = AvroSchemaUtil.convert(
+      ManifestFile.schema(), "manifest_file");
+
+  private transient Schema avroSchema; // not final for Java serialization
+  private int[] fromProjectionPos;
+
+  // data fields
+  private InputFile file = null;
+  private String manifestPath = null;
+  private Long length = null;
+  private int specId = -1;
+  private Long snapshotId = null;
+  private Integer addedFilesCount = null;
+  private Integer existingFilesCount = null;
+  private Integer deletedFilesCount = null;
+  private List<PartitionFieldSummary> partitions = null;
+
+  /**
+   * Used by Avro reflection to instantiate this class when reading manifest files.
+   */
+  public GenericManifestFile(org.apache.avro.Schema avroSchema) {
+    this.avroSchema = avroSchema;
+
+    List<Types.NestedField> fields = AvroSchemaUtil.convert(avroSchema)
+        .asNestedType()
+        .asStructType()
+        .fields();
+    List<Types.NestedField> allFields = ManifestFile.schema().asStruct().fields();
+
+    this.fromProjectionPos = new int[fields.size()];
+    for (int i = 0; i < fromProjectionPos.length; i += 1) {
+      boolean found = false;
+      for (int j = 0; j < allFields.size(); j += 1) {
+        if (fields.get(i).fieldId() == allFields.get(j).fieldId()) {
+          found = true;
+          fromProjectionPos[i] = j;
+        }
+      }
+
+      if (!found) {
+        throw new IllegalArgumentException("Cannot find projected field: " + fields.get(i));
+      }
+    }
+  }
+
+  GenericManifestFile(InputFile file, int specId) {
+    this.avroSchema = AVRO_SCHEMA;
+    this.file = file;
+    this.manifestPath = file.location();
+    this.length = null; // lazily loaded from file
+    this.specId = specId;
+    this.snapshotId = null;
+    this.addedFilesCount = null;
+    this.existingFilesCount = null;
+    this.deletedFilesCount = null;
+    this.partitions = null;
+    this.fromProjectionPos = null;
+  }
+
+  public GenericManifestFile(String path, long length, int specId, long snapshotId,
+                             int addedFilesCount, int existingFilesCount, int deletedFilesCount,
+                             List<PartitionFieldSummary> partitions) {
+    this.avroSchema = AVRO_SCHEMA;
+    this.manifestPath = path;
+    this.length = length;
+    this.specId = specId;
+    this.snapshotId = snapshotId;
+    this.addedFilesCount = addedFilesCount;
+    this.existingFilesCount = existingFilesCount;
+    this.deletedFilesCount = deletedFilesCount;
+    this.partitions = partitions;
+    this.fromProjectionPos = null;
+  }
+
+  /**
+   * Copy constructor.
+   *
+   * @param toCopy a generic manifest file to copy.
+   */
+  private GenericManifestFile(GenericManifestFile toCopy) {
+    this.avroSchema = toCopy.avroSchema;
+    this.manifestPath = toCopy.manifestPath;
+    this.length = toCopy.length;
+    this.specId = toCopy.specId;
+    this.snapshotId = toCopy.snapshotId;
+    this.addedFilesCount = toCopy.addedFilesCount;
+    this.existingFilesCount = toCopy.existingFilesCount;
+    this.deletedFilesCount = toCopy.deletedFilesCount;
+    this.partitions = copyOf(transform(toCopy.partitions, PartitionFieldSummary::copy));
+    this.fromProjectionPos = toCopy.fromProjectionPos;
+  }
+
+  /**
+   * Constructor for Java serialization.
+   */
+  GenericManifestFile() {
+  }
+
+  @Override
+  public String path() {
+    return manifestPath;
+  }
+
+  public Long lazyLength() {
+    if (length == null) {
+      if (file != null) {
+        // this was created from an input file and length is lazily loaded
+        this.length = file.getLength();
+      } else {
+        // this was loaded from a file without projecting length; it is unknown, so return null
+        return null;
+      }
+    }
+    return length;
+  }
+
+  @Override
+  public long length() {
+    return lazyLength();
+  }
+
+  @Override
+  public int partitionSpecId() {
+    return specId;
+  }
+
+  @Override
+  public Long snapshotId() {
+    return snapshotId;
+  }
+
+  @Override
+  public Integer addedFilesCount() {
+    return addedFilesCount;
+  }
+
+  @Override
+  public Integer existingFilesCount() {
+    return existingFilesCount;
+  }
+
+  @Override
+  public Integer deletedFilesCount() {
+    return deletedFilesCount;
+  }
+
+  @Override
+  public List<PartitionFieldSummary> partitions() {
+    return partitions;
+  }
+
+  @Override
+  public int size() {
+    return ManifestFile.schema().columns().size();
+  }
+
+  @Override
+  public <T> T get(int pos, Class<T> javaClass) {
+    return javaClass.cast(get(pos));
+  }
+
+  @Override
+  public void put(int i, Object v) {
+    set(i, v);
+  }
+
+  @Override
+  public Object get(int i) {
+    int pos = i;
+    // if the schema was projected, map the incoming ordinal to the expected one
+    if (fromProjectionPos != null) {
+      pos = fromProjectionPos[i];
+    }
+    switch (pos) {
+      case 0:
+        return manifestPath;
+      case 1:
+        return lazyLength();
+      case 2:
+        return specId;
+      case 3:
+        return snapshotId;
+      case 4:
+        return addedFilesCount;
+      case 5:
+        return existingFilesCount;
+      case 6:
+        return deletedFilesCount;
+      case 7:
+        return partitions;
+      default:
+        throw new UnsupportedOperationException("Unknown field ordinal: " + pos);
+    }
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public <T> void set(int i, T value) {
+    int pos = i;
+    // if the schema was projected, map the incoming ordinal to the expected one
+    if (fromProjectionPos != null) {
+      pos = fromProjectionPos[i];
+    }
+    switch (pos) {
+      case 0:
+        // always coerce to String for Serializable
+        this.manifestPath = value.toString();
+        return;
+      case 1:
+        this.length = (Long) value;
+        return;
+      case 2:
+        this.specId = (Integer) value;
+        return;
+      case 3:
+        this.snapshotId = (Long) value;
+        return;
+      case 4:
+        this.addedFilesCount = (Integer) value;
+        return;
+      case 5:
+        this.existingFilesCount = (Integer) value;
+        return;
+      case 6:
+        this.deletedFilesCount = (Integer) value;
+        return;
+      case 7:
+        this.partitions = (List<PartitionFieldSummary>) value;
+        return;
+      default:
+        // ignore the object, it must be from a newer version of the format
+    }
+  }
+
+  @Override
+  public ManifestFile copy() {
+    return new GenericManifestFile(this);
+  }
+
+  @Override
+  public Schema getSchema() {
+    return avroSchema;
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (this == other) {
+      return true;
+    }
+    if (other == null || getClass() != other.getClass()) {
+      return false;
+    }
+    GenericManifestFile that = (GenericManifestFile) other;
+    return Objects.equal(manifestPath, that.manifestPath);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(manifestPath);
+  }
+
+  @Override
+  public String toString() {
+    return Objects.toStringHelper(this)
+        .add("path", manifestPath)
+        .add("length", length)
+        .add("partition_spec_id", specId)
+        .add("added_snapshot_id", snapshotId)
+        .add("added_data_files_count", addedFilesCount)
+        .add("existing_data_files_count", existingFilesCount)
+        .add("deleted_data_files_count", deletedFilesCount)
+        .add("partitions", partitions)
+        .toString();
+  }
+}
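
Note that equals and hashCode above compare by path alone, which is what lets
ManifestFile instances serve as map and set keys in the MergingSnapshotUpdate
changes further down. A minimal sketch, assuming the eight-argument constructor
that ManifestWriter.toManifestFile uses later in this diff (values illustrative):

    import com.google.common.collect.ImmutableList;

    // args: path, length, specId, snapshotId, added, existing, deleted, partition summaries
    ManifestFile m1 = new GenericManifestFile(
        "s3://bucket/metadata/m1.avro", 1024L, 0, 1L, 10, 0, 0, ImmutableList.of());
    ManifestFile m2 = m1.copy();
    // equal by path alone, even if the other metadata were to differ
    assert m1.equals(m2) && m1.hashCode() == m2.hashCode();
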
diff --git a/core/src/main/java/com/netflix/iceberg/GenericPartitionFieldSummary.java b/core/src/main/java/com/netflix/iceberg/GenericPartitionFieldSummary.java
new file mode 100644
index 0000000..0c57cb3
--- /dev/null
+++ b/core/src/main/java/com/netflix/iceberg/GenericPartitionFieldSummary.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.netflix.iceberg;
+
+import com.google.common.base.Objects;
+import com.netflix.iceberg.ManifestFile.PartitionFieldSummary;
+import com.netflix.iceberg.avro.AvroSchemaUtil;
+import com.netflix.iceberg.types.Types;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.specific.SpecificData.SchemaConstructable;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+public class GenericPartitionFieldSummary
+    implements PartitionFieldSummary, StructLike, IndexedRecord, SchemaConstructable, Serializable {
+  private static final Schema AVRO_SCHEMA = AvroSchemaUtil.convert(PartitionFieldSummary.getType());
+
+  private transient Schema avroSchema; // not final for Java serialization
+  private int[] fromProjectionPos;
+
+  // data fields
+  private boolean containsNull = false;
+  private ByteBuffer lowerBound = null;
+  private ByteBuffer upperBound = null;
+
+  /**
+   * Used by Avro reflection to instantiate this class when reading manifest files.
+   */
+  public GenericPartitionFieldSummary(Schema avroSchema) {
+    this.avroSchema = avroSchema;
+
+    List<Types.NestedField> fields = AvroSchemaUtil.convert(avroSchema)
+        .asNestedType()
+        .asStructType()
+        .fields();
+    List<Types.NestedField> allFields = PartitionFieldSummary.getType().fields();
+
+    this.fromProjectionPos = new int[fields.size()];
+    for (int i = 0; i < fromProjectionPos.length; i += 1) {
+      boolean found = false;
+      for (int j = 0; j < allFields.size(); j += 1) {
+        if (fields.get(i).fieldId() == allFields.get(j).fieldId()) {
+          found = true;
+          fromProjectionPos[i] = j;
+        }
+      }
+
+      if (!found) {
+        throw new IllegalArgumentException("Cannot find projected field: " + fields.get(i));
+      }
+    }
+  }
+
+  public GenericPartitionFieldSummary(boolean containsNull, ByteBuffer lowerBound,
+                                      ByteBuffer upperBound) {
+    this.avroSchema = AVRO_SCHEMA;
+    this.containsNull = containsNull;
+    this.lowerBound = lowerBound;
+    this.upperBound = upperBound;
+    this.fromProjectionPos = null;
+  }
+
+  /**
+   * Copy constructor.
+   *
+   * @param toCopy a generic partition field summary to copy.
+   */
+  private GenericPartitionFieldSummary(GenericPartitionFieldSummary toCopy) {
+    this.avroSchema = toCopy.avroSchema;
+    this.containsNull = toCopy.containsNull;
+    this.lowerBound = toCopy.lowerBound;
+    this.upperBound = toCopy.upperBound;
+    this.fromProjectionPos = toCopy.fromProjectionPos;
+  }
+
+  /**
+   * Constructor for Java serialization.
+   */
+  GenericPartitionFieldSummary() {
+  }
+
+  @Override
+  public boolean containsNull() {
+    return containsNull;
+  }
+
+  @Override
+  public ByteBuffer lowerBound() {
+    return lowerBound;
+  }
+
+  @Override
+  public ByteBuffer upperBound() {
+    return upperBound;
+  }
+
+  @Override
+  public int size() {
+    return PartitionFieldSummary.getType().fields().size();
+  }
+
+  @Override
+  public <T> T get(int pos, Class<T> javaClass) {
+    return javaClass.cast(get(pos));
+  }
+
+  @Override
+  public void put(int i, Object v) {
+    set(i, v);
+  }
+
+  @Override
+  public Object get(int i) {
+    int pos = i;
+    // if the schema was projected, map the incoming ordinal to the expected one
+    if (fromProjectionPos != null) {
+      pos = fromProjectionPos[i];
+    }
+    switch (pos) {
+      case 0:
+        return containsNull;
+      case 1:
+        return lowerBound;
+      case 2:
+        return upperBound;
+      default:
+        throw new UnsupportedOperationException("Unknown field ordinal: " + pos);
+    }
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public <T> void set(int i, T value) {
+    int pos = i;
+    // if the schema was projected, map the incoming ordinal to the expected one
+    if (fromProjectionPos != null) {
+      pos = fromProjectionPos[i];
+    }
+    switch (pos) {
+      case 0:
+        this.containsNull = (Boolean) value;
+        return;
+      case 1:
+        this.lowerBound = (ByteBuffer) value;
+        return;
+      case 2:
+        this.upperBound = (ByteBuffer) value;
+        return;
+      default:
+        // ignore the object, it must be from a newer version of the format
+    }
+  }
+
+  @Override
+  public PartitionFieldSummary copy() {
+    return new GenericPartitionFieldSummary(this);
+  }
+
+  @Override
+  public Schema getSchema() {
+    return avroSchema;
+  }
+
+  @Override
+  public String toString() {
+    return Objects.toStringHelper(this)
+        .add("contains_null", containsNull)
+        .add("lower_bound", lowerBound)
+        .add("upper_bound", upperBound)
+        .toString();
+  }
+}
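
The bound buffers hold single serialized values, produced with
Conversions.toByteBuffer just as PartitionSummary does later in this diff, so a
planner can skip a manifest whose [lower_bound, upper_bound] range misses the
query. A short sketch for an integer partition field:

    import com.netflix.iceberg.types.Conversions;
    import com.netflix.iceberg.types.Types;
    import java.nio.ByteBuffer;

    Types.IntegerType type = Types.IntegerType.get();
    ByteBuffer lower = Conversions.toByteBuffer(type, 2018);  // min value seen
    ByteBuffer upper = Conversions.toByteBuffer(type, 2019);  // max value seen
    PartitionFieldSummary summary =
        new GenericPartitionFieldSummary(false /* containsNull */, lower, upper);
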
diff --git a/core/src/main/java/com/netflix/iceberg/ManifestGroup.java b/core/src/main/java/com/netflix/iceberg/ManifestGroup.java
index f343427..19d993f 100644
--- a/core/src/main/java/com/netflix/iceberg/ManifestGroup.java
+++ b/core/src/main/java/com/netflix/iceberg/ManifestGroup.java
@@ -37,18 +37,18 @@
   private static final Types.StructType EMPTY_STRUCT = Types.StructType.of();
 
   private final TableOperations ops;
-  private final Set<String> manifests;
+  private final Set<ManifestFile> manifests;
   private final Expression dataFilter;
   private final Expression fileFilter;
   private final boolean ignoreDeleted;
   private final List<String> columns;
 
-  ManifestGroup(TableOperations ops, Iterable<String> manifests) {
+  ManifestGroup(TableOperations ops, Iterable<ManifestFile> manifests) {
     this(ops, Sets.newHashSet(manifests), Expressions.alwaysTrue(), Expressions.alwaysTrue(),
         false, ImmutableList.of("*"));
   }
 
-  private ManifestGroup(TableOperations ops, Set<String> manifests,
+  private ManifestGroup(TableOperations ops, Set<ManifestFile> manifests,
                         Expression dataFilter, Expression fileFilter, boolean ignoreDeleted,
                         List<String> columns) {
     this.ops = ops;
@@ -94,10 +94,20 @@ public ManifestGroup select(String... columns) {
     Evaluator evaluator = new Evaluator(DataFile.getType(EMPTY_STRUCT), fileFilter);
     List<Closeable> toClose = Lists.newArrayList();
 
+    Iterable<ManifestFile> matchingManifests = manifests;
+
+    if (ignoreDeleted) {
+      // remove any manifests that don't have any existing or added files. if either the added or
+      // existing files count is missing, the manifest must be scanned.
+      matchingManifests = Iterables.filter(manifests, manifest ->
+          manifest.addedFilesCount() == null || manifest.existingFilesCount() == null ||
+              manifest.addedFilesCount() + manifest.existingFilesCount() > 0);
+    }
+
     Iterable<Iterable<ManifestEntry>> readers = Iterables.transform(
-        manifests,
+        matchingManifests,
         manifest -> {
-          ManifestReader reader = ManifestReader.read(ops.newInputFile(manifest));
+          ManifestReader reader = ManifestReader.read(ops.newInputFile(manifest.path()));
           FilteredManifest filtered = reader.filterRows(dataFilter).select(columns);
           toClose.add(reader);
           return Iterables.filter(
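
With that count-based filter in place, callers can hand over a snapshot's full
manifest list and let the group skip manifests with no live files. A usage
sketch, assuming ops and table come from a loaded table and that "day" is a
partition field:

    try (CloseableIterable<ManifestEntry> entries =
             new ManifestGroup(ops, table.currentSnapshot().manifests())
                 .filterData(Expressions.equal("day", 20))
                 .ignoreDeleted()
                 .entries()) {
      for (ManifestEntry entry : entries) {
        System.out.println(entry.file().path());
      }
    }
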
diff --git a/core/src/main/java/com/netflix/iceberg/ManifestListWriter.java b/core/src/main/java/com/netflix/iceberg/ManifestListWriter.java
new file mode 100644
index 0000000..98cdbbf
--- /dev/null
+++ b/core/src/main/java/com/netflix/iceberg/ManifestListWriter.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.netflix.iceberg;
+
+import com.google.common.collect.ImmutableMap;
+import com.netflix.iceberg.avro.Avro;
+import com.netflix.iceberg.exceptions.RuntimeIOException;
+import com.netflix.iceberg.io.FileAppender;
+import com.netflix.iceberg.io.OutputFile;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+
+class ManifestListWriter implements FileAppender<ManifestFile> {
+  private final FileAppender<ManifestFile> writer;
+
+  ManifestListWriter(OutputFile snapshotFile, long snapshotId, Long parentSnapshotId) {
+    this.writer = newAppender(snapshotFile, ImmutableMap.of(
+        "snapshot-id", String.valueOf(snapshotId),
+        "parent-snapshot-id", String.valueOf(parentSnapshotId)));
+  }
+
+  @Override
+  public void add(ManifestFile file) {
+    writer.add(file);
+  }
+
+  @Override
+  public void addAll(Iterator<ManifestFile> values) {
+    writer.addAll(values);
+  }
+
+  @Override
+  public void addAll(Iterable<ManifestFile> values) {
+    writer.addAll(values);
+  }
+
+  @Override
+  public Metrics metrics() {
+    return writer.metrics();
+  }
+
+  @Override
+  public void close() throws IOException {
+    writer.close();
+  }
+
+  private static FileAppender<ManifestFile> newAppender(OutputFile file, Map<String, String> meta) {
+    try {
+      return Avro.write(file)
+          .schema(ManifestFile.schema())
+          .named("manifest_file")
+          .meta(meta)
+          .build();
+
+    } catch (IOException e) {
+      throw new RuntimeIOException(e, "Failed to create snapshot list writer for path: " + file);
+    }
+  }
+}
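
SnapshotUpdate (later in this diff) is the only caller; the flow is simply
open, append, close. A sketch, where manifestListFile, snapshotId, and
parentSnapshotId are assumed inputs and manifests is the List<ManifestFile>
returned by apply(base):

    try (ManifestListWriter writer =
             new ManifestListWriter(manifestListFile, snapshotId, parentSnapshotId)) {
      writer.addAll(manifests);
    }
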
diff --git a/core/src/main/java/com/netflix/iceberg/ManifestWriter.java b/core/src/main/java/com/netflix/iceberg/ManifestWriter.java
index 28ba831..a59c100 100644
--- a/core/src/main/java/com/netflix/iceberg/ManifestWriter.java
+++ b/core/src/main/java/com/netflix/iceberg/ManifestWriter.java
@@ -19,6 +19,7 @@
 
 package com.netflix.iceberg;
 
+import com.google.common.base.Preconditions;
 import com.netflix.iceberg.avro.Avro;
 import com.netflix.iceberg.exceptions.RuntimeIOException;
 import com.netflix.iceberg.io.FileAppender;
@@ -35,14 +36,27 @@
 class ManifestWriter implements FileAppender<DataFile> {
   private static final Logger LOG = LoggerFactory.getLogger(ManifestWriter.class);
 
+  private final String location;
+  private final OutputFile file;
+  private final int specId;
   private final FileAppender<ManifestEntry> writer;
   private final long snapshotId;
-  private ManifestEntry reused = null;
+  private final ManifestEntry reused;
+  private final PartitionSummary stats;
+
+  private boolean closed = false;
+  private int addedFiles = 0;
+  private int existingFiles = 0;
+  private int deletedFiles = 0;
 
   ManifestWriter(PartitionSpec spec, OutputFile file, long snapshotId) {
+    this.location = file.location();
+    this.file = file;
+    this.specId = spec.specId();
     this.writer = newAppender(FileFormat.AVRO, spec, file);
     this.snapshotId = snapshotId;
     this.reused = new ManifestEntry(spec.partitionType());
+    this.stats = new PartitionSummary(spec);
   }
 
   public void addExisting(Iterable<ManifestEntry> entries) {
@@ -54,25 +68,37 @@ public void addExisting(Iterable<ManifestEntry> entries) {
   }
 
   public void addExisting(ManifestEntry entry) {
-    writer.add(reused.wrapExisting(entry.snapshotId(), entry.file()));
+    add(reused.wrapExisting(entry.snapshotId(), entry.file()));
   }
 
   public void addExisting(long snapshotId, DataFile file) {
-    writer.add(reused.wrapExisting(snapshotId, file));
+    add(reused.wrapExisting(snapshotId, file));
   }
 
   public void delete(ManifestEntry entry) {
     // Use the current Snapshot ID for the delete. It is safe to delete the data file from disk
     // when this Snapshot has been removed or when there are no Snapshots older than this one.
-    writer.add(reused.wrapDelete(snapshotId, entry.file()));
+    add(reused.wrapDelete(snapshotId, entry.file()));
   }
 
   public void delete(DataFile file) {
-    writer.add(reused.wrapDelete(snapshotId, file));
+    add(reused.wrapDelete(snapshotId, file));
   }
 
-  public void add(ManifestEntry file) {
-    writer.add(file);
+  public void add(ManifestEntry entry) {
+    switch (entry.status()) {
+      case ADDED:
+        addedFiles += 1;
+        break;
+      case EXISTING:
+        existingFiles += 1;
+        break;
+      case DELETED:
+        deletedFiles += 1;
+        break;
+    }
+    stats.update(entry.file().partition());
+    writer.add(entry);
   }
 
   public void addEntries(Iterable<ManifestEntry> entries) {
@@ -85,7 +111,7 @@ public void addEntries(Iterable<ManifestEntry> entries) {
   public void add(DataFile file) {
     // TODO: this assumes that file is a GenericDataFile that can be written directly to Avro
     // Eventually, this should check in case there are other DataFile implementations.
-    writer.add(reused.wrapAppend(snapshotId, file));
+    add(reused.wrapAppend(snapshotId, file));
   }
 
   @Override
@@ -93,8 +119,15 @@ public Metrics metrics() {
     return writer.metrics();
   }
 
+  public ManifestFile toManifestFile() {
+    Preconditions.checkState(closed, "Cannot build ManifestFile, writer is not closed");
+    return new GenericManifestFile(location, file.toInputFile().getLength(), specId, snapshotId,
+        addedFiles, existingFiles, deletedFiles, stats.summaries());
+  }
+
   @Override
   public void close() throws IOException {
+    this.closed = true;
     writer.close();
   }
 
diff --git a/core/src/main/java/com/netflix/iceberg/MergingSnapshotUpdate.java b/core/src/main/java/com/netflix/iceberg/MergingSnapshotUpdate.java
index f12ba04..8878d4c 100644
--- a/core/src/main/java/com/netflix/iceberg/MergingSnapshotUpdate.java
+++ b/core/src/main/java/com/netflix/iceberg/MergingSnapshotUpdate.java
@@ -25,7 +25,6 @@
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-import com.google.common.io.Closeables;
 import com.netflix.iceberg.ManifestEntry.Status;
 import com.netflix.iceberg.exceptions.RuntimeIOException;
 import com.netflix.iceberg.exceptions.ValidationException;
@@ -34,7 +33,6 @@
 import com.netflix.iceberg.expressions.Expressions;
 import com.netflix.iceberg.expressions.Projections;
 import com.netflix.iceberg.expressions.StrictMetricsEvaluator;
-import com.netflix.iceberg.io.InputFile;
 import com.netflix.iceberg.io.OutputFile;
 import com.netflix.iceberg.util.BinPacking.ListPacker;
 import com.netflix.iceberg.util.CharSequenceWrapper;
@@ -42,15 +40,12 @@
 import com.netflix.iceberg.util.Tasks;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import java.io.Closeable;
 import java.io.IOException;
 import java.lang.reflect.Array;
-import java.util.Collections;
-import java.util.Iterator;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static com.google.common.collect.Iterables.filter;
@@ -64,7 +59,6 @@
 abstract class MergingSnapshotUpdate extends SnapshotUpdate {
   private final Logger LOG = LoggerFactory.getLogger(getClass());
 
-  private static final long SIZE_PER_FILE = 100; // assume each file will be ~100 bytes
   private static final Joiner COMMA = Joiner.on(",");
 
   protected static class DeleteException extends ValidationException {
@@ -94,14 +88,18 @@ public String partition() {
   private boolean failAnyDelete = false;
   private boolean failMissingDeletePaths = false;
 
+  // cache the new manifest once it is written
+  private ManifestFile newManifest = null;
+  private boolean hasNewFiles = false;
+
   // cache merge results to reuse when retrying
-  private final Map<List<String>, String> mergeManifests = Maps.newConcurrentMap();
+  private final Map<List<ManifestFile>, ManifestFile> mergeManifests = Maps.newConcurrentMap();
 
   // cache filtered manifests to avoid extra work when commits fail.
-  private final Map<String, ManifestReader> filteredManifests = Maps.newConcurrentMap();
+  private final Map<ManifestFile, ManifestFile> filteredManifests = Maps.newConcurrentMap();
 
   // tracking where files were deleted to validate retries quickly
-  private final Map<String, Set<CharSequenceWrapper>> filteredManifestToDeletedFiles =
+  private final Map<ManifestFile, Set<CharSequenceWrapper>> filteredManifestToDeletedFiles =
       Maps.newConcurrentMap();
 
   private boolean filterUpdated = false; // used to clear caches of filtered and merged manifests
@@ -169,77 +167,70 @@ protected void delete(CharSequence path) {
    * Add a file to the new snapshot.
    */
   protected void add(DataFile file) {
+    hasNewFiles = true;
     newFiles.add(file);
   }
 
   @Override
-  public List<String> apply(TableMetadata base) {
+  public List<ManifestFile> apply(TableMetadata base) {
     if (filterUpdated) {
-      cleanUncommittedFilters(EMPTY_SET);
+      cleanUncommittedFilters(SnapshotUpdate.EMPTY_SET);
       this.filterUpdated = false;
     }
 
     Snapshot current = base.currentSnapshot();
-    List<PartitionSpec> specs = Lists.newArrayList();
-    List<List<ManifestReader>> groups = Lists.newArrayList();
+    Map<Integer, List<ManifestFile>> groups = Maps.newTreeMap(Comparator.<Integer>reverseOrder());
 
     // use a common metrics evaluator for all manifests because it is bound to the table schema
     StrictMetricsEvaluator metricsEvaluator = new StrictMetricsEvaluator(
         ops.current().schema(), deleteExpression);
 
     // add the current spec as the first group. files are added to the beginning.
-    if (newFiles.size() > 0) {
-      specs.add(spec);
-      groups.add(Lists.newArrayList());
-      groups.get(0).add(newFilesAsManifest());
-    }
-
-    ConcurrentLinkedQueue<ManifestReader> toClose = new ConcurrentLinkedQueue<>();
-    boolean threw = true;
     try {
+      if (newFiles.size() > 0) {
+        ManifestFile newManifest = newFilesAsManifest();
+        List<ManifestFile> manifestGroup = Lists.newArrayList();
+        manifestGroup.add(newManifest);
+        groups.put(newManifest.partitionSpecId(), manifestGroup);
+      }
+
       Set<CharSequenceWrapper> deletedFiles = Sets.newHashSet();
 
       // group manifests by compatible partition specs to be merged
       if (current != null) {
-        List<String> manifests = current.manifests();
-        ManifestReader[] readers = new ManifestReader[manifests.size()];
+        List<ManifestFile> manifests = current.manifests();
+        ManifestFile[] filtered = new ManifestFile[manifests.size()];
         // open all of the manifest files in parallel, use index to avoid reordering
-        Tasks.range(readers.length)
+        Tasks.range(filtered.length)
             .stopOnFailure().throwFailureWhenFinished()
             .executeWith(getWorkerPool())
             .run(index -> {
-              ManifestReader manifest = filterManifest(
-                  deleteExpression, metricsEvaluator, ops.newInputFile(manifests.get(index)));
-              readers[index] = manifest;
-              toClose.add(manifest);
-            });
-
-        for (ManifestReader reader : readers) {
-          if (reader.file() != null) {
-            String location = reader.file().location();
-            Set<CharSequenceWrapper> manifestDeletes = filteredManifestToDeletedFiles.get(location);
-            if (manifestDeletes != null) {
-              deletedFiles.addAll(manifestDeletes);
-            }
+              ManifestFile manifest = filterManifest(
+                  deleteExpression, metricsEvaluator,
+                  manifests.get(index));
+              filtered[index] = manifest;
+            }, IOException.class);
+
+        for (ManifestFile manifest : filtered) {
+          Set<CharSequenceWrapper> manifestDeletes = filteredManifestToDeletedFiles.get(manifest);
+          if (manifestDeletes != null) {
+            deletedFiles.addAll(manifestDeletes);
           }
 
-          int index = findMatch(specs, reader.spec());
-          if (index < 0) {
-            // not found, add a new one
-            List<ManifestReader> newList = Lists.<ManifestReader>newArrayList(reader);
-            specs.add(reader.spec());
-            groups.add(newList);
+          List<ManifestFile> group = groups.get(manifest.partitionSpecId());
+          if (group != null) {
+            group.add(manifest);
           } else {
-            // replace the reader spec with the later one
-            specs.set(index, reader.spec());
-            groups.get(index).add(reader);
+            group = Lists.newArrayList();
+            group.add(manifest);
+            groups.put(manifest.partitionSpecId(), group);
           }
         }
       }
 
-      List<String> manifests = Lists.newArrayList();
-      for (int i = 0; i < specs.size(); i += 1) {
-        for (String manifest : mergeGroup(specs.get(i), groups.get(i))) {
+      List<ManifestFile> manifests = Lists.newArrayList();
+      for (Map.Entry<Integer, List<ManifestFile>> entry : groups.entrySet()) {
+        for (ManifestFile manifest : mergeGroup(entry.getKey(), entry.getValue())) {
           manifests.add(manifest);
         }
       }
@@ -250,52 +241,56 @@ protected void add(DataFile file) {
               path -> !deletedFiles.contains(path)),
               CharSequenceWrapper::get)));
 
-      threw = false;
-
       return manifests;
 
-    } finally {
-      for (ManifestReader reader : toClose) {
-        try {
-          Closeables.close(reader, threw);
-        } catch (IOException e) {
-          throw new RuntimeIOException(e);
-        }
-      }
+    } catch (IOException e) {
+      throw new RuntimeIOException(e, "Failed to create snapshot manifest list");
     }
   }
 
-  private void cleanUncommittedMerges(Set<String> committed) {
-    List<Map.Entry<List<String>, String>> entries = Lists.newArrayList(mergeManifests.entrySet());
-    for (Map.Entry<List<String>, String> entry : entries) {
+  private void cleanUncommittedMerges(Set<ManifestFile> committed) {
+    // iterate over a copy of entries to avoid concurrent modification
+    List<Map.Entry<List<ManifestFile>, ManifestFile>> entries =
+        Lists.newArrayList(mergeManifests.entrySet());
+
+    for (Map.Entry<List<ManifestFile>, ManifestFile> entry : entries) {
       // delete any new merged manifests that aren't in the committed list
-      String merged = entry.getValue();
+      ManifestFile merged = entry.getValue();
       if (!committed.contains(merged)) {
-        deleteFile(merged);
+        deleteFile(merged.path());
         // remove the deleted file from the cache
         mergeManifests.remove(entry.getKey());
       }
     }
   }
 
-  private void cleanUncommittedFilters(Set<String> committed) {
-    List<Map.Entry<String, ManifestReader>> filterEntries = Lists.newArrayList(filteredManifests.entrySet());
-    for (Map.Entry<String, ManifestReader> entry : filterEntries) {
+  private void cleanUncommittedFilters(Set<ManifestFile> committed) {
+    // iterate over a copy of entries to avoid concurrent modification
+    List<Map.Entry<ManifestFile, ManifestFile>> filterEntries =
+        Lists.newArrayList(filteredManifests.entrySet());
+
+    for (Map.Entry<ManifestFile, ManifestFile> entry : filterEntries) {
       // remove any new filtered manifests that aren't in the committed list
-      String manifest = entry.getKey();
-      ManifestReader filtered = entry.getValue();
-      if (filtered != null) {
-        String location = filtered.file().location();
-        if (!manifest.equals(location) && !committed.contains(location)) {
-          filteredManifests.remove(manifest);
-          deleteFile(location);
+      ManifestFile manifest = entry.getKey();
+      ManifestFile filtered = entry.getValue();
+      if (!committed.contains(filtered)) {
+        // only delete if the filtered copy was created
+        if (!manifest.equals(filtered)) {
+          deleteFile(filtered.path());
         }
+
+        // remove the entry from the cache
+        filteredManifests.remove(manifest);
       }
     }
   }
 
   @Override
-  protected void cleanUncommitted(Set<String> committed) {
+  protected void cleanUncommitted(Set<ManifestFile> committed) {
+    if (newManifest != null && !committed.contains(newManifest)) {
+      deleteFile(newManifest.path());
+      this.newManifest = null;
+    }
     cleanUncommittedMerges(committed);
     cleanUncommittedFilters(committed);
   }
@@ -308,22 +303,20 @@ private boolean nothingToFilter() {
   /**
-   * @return a ManifestReader that is a filtered version of the input manifest.
+   * @return a ManifestFile for the filtered manifest, or the input manifest if nothing was filtered.
    */
-  private ManifestReader filterManifest(Expression deleteExpression,
-                                        StrictMetricsEvaluator metricsEvaluator,
-                                        InputFile manifest) {
+  private ManifestFile filterManifest(Expression deleteExpression,
+                                      StrictMetricsEvaluator metricsEvaluator,
+                                      ManifestFile manifest) throws IOException {
+                                        ManifestFile manifest) throws IOException {
+    ManifestFile cached = filteredManifests.get(manifest);
     if (cached != null) {
       return cached;
     }
 
-    ManifestReader reader = ManifestReader.read(manifest);
-
     if (nothingToFilter()) {
-      filteredManifests.put(manifest.location(), reader);
-      return reader;
+      filteredManifests.put(manifest, manifest);
+      return manifest;
     }
 
-    try {
+    try (ManifestReader reader = ManifestReader.read(ops.newInputFile(manifest.path()))) {
       Expression inclusiveExpr = Projections
           .inclusive(reader.spec())
           .project(deleteExpression);
@@ -344,42 +337,35 @@ private ManifestReader filterManifest(Expression deleteExpression,
       // manifest without copying data. if a manifest does have a file to remove, this will break
       // out of the loop and move on to filtering the manifest.
       boolean hasDeletedFiles = false;
-      Iterator<ManifestEntry> entries = reader.entries().iterator();
-      try {
-        while (entries.hasNext()) {
-          ManifestEntry entry = entries.next();
-          DataFile file = entry.file();
-          boolean fileDelete = (deletePaths.contains(pathWrapper.set(file.path())) ||
-              dropPartitions.contains(partitionWrapper.set(file.partition())));
-          if (fileDelete || inclusive.eval(file.partition())) {
-            ValidationException.check(
-                fileDelete || strict.eval(file.partition()) || metricsEvaluator.eval(file),
-                "Cannot delete file where some, but not all, rows match filter %s: %s",
-                this.deleteExpression, file.path());
-
-            hasDeletedFiles = true;
-            if (failAnyDelete) {
-              throw new DeleteException(writeSpec().partitionToPath(file.partition()));
-            }
-            break; // as soon as a deleted file is detected, stop scanning
+      for (ManifestEntry entry : reader.entries()) {
+        DataFile file = entry.file();
+        boolean fileDelete = (deletePaths.contains(pathWrapper.set(file.path())) ||
+            dropPartitions.contains(partitionWrapper.set(file.partition())));
+        if (fileDelete || inclusive.eval(file.partition())) {
+          ValidationException.check(
+              fileDelete || strict.eval(file.partition()) || metricsEvaluator.eval(file),
+              "Cannot delete file where some, but not all, rows match filter %s: %s",
+              this.deleteExpression, file.path());
+
+          hasDeletedFiles = true;
+          if (failAnyDelete) {
+            throw new DeleteException(writeSpec().partitionToPath(file.partition()));
           }
-        }
-      } finally {
-        // the loop may have exited early. ensure the iterator is closed.
-        if (entries instanceof Closeable) {
-          ((Closeable) entries).close();
+          break; // as soon as a deleted file is detected, stop scanning
         }
       }
 
       if (!hasDeletedFiles) {
-        return reader;
+        filteredManifests.put(manifest, manifest);
+        return manifest;
       }
 
       // when this point is reached, there is at least one file that will be deleted in the
       // manifest. produce a copy of the manifest with all deleted files removed.
       Set<CharSequenceWrapper> deletedPaths = Sets.newHashSet();
       OutputFile filteredCopy = manifestPath(manifestCount.getAndIncrement());
-      try (ManifestWriter writer = new ManifestWriter(reader.spec(), filteredCopy, snapshotId())) {
+      ManifestWriter writer = new ManifestWriter(reader.spec(), filteredCopy, snapshotId());
+      try {
         for (ManifestEntry entry : reader.entries()) {
           DataFile file = entry.file();
           boolean fileDelete = (deletePaths.contains(pathWrapper.set(file.path())) ||
@@ -396,7 +382,7 @@ private ManifestReader filterManifest(Expression deleteExpression,
               CharSequenceWrapper wrapper = CharSequenceWrapper.wrap(entry.file().path());
               if (deletedPaths.contains(wrapper)) {
                 LOG.warn("Deleting a duplicate path from manifest {}: {}",
-                    manifest.location(), wrapper.get());
+                    manifest.path(), wrapper.get());
               }
               deletedPaths.add(wrapper);
 
@@ -405,96 +391,79 @@ private ManifestReader filterManifest(Expression deleteExpression,
             }
           }
         }
+      } finally {
+        writer.close();
       }
 
-      // close the reader now that it is no longer used and will not be returned
-      reader.close();
-
-      // return the filtered manifest as a reader
-      ManifestReader filtered = ManifestReader.read(ops.newInputFile(filteredCopy.location()));
+      // return the ManifestFile for the filtered copy
+      ManifestFile filtered = writer.toManifestFile();
 
       // update caches
-      filteredManifests.put(manifest.location(), filtered);
-      filteredManifestToDeletedFiles.put(filteredCopy.location(), deletedPaths);
+      filteredManifests.put(manifest, filtered);
+      filteredManifestToDeletedFiles.put(filtered, deletedPaths);
 
       return filtered;
-
-    } catch (IOException e) {
-      throw new RuntimeIOException(e, "Failed to filter manifest: %s", reader.file().location());
     }
   }
 
   @SuppressWarnings("unchecked")
-  private Iterable<String> mergeGroup(PartitionSpec groupSpec, List<ManifestReader> group) {
+  private Iterable<ManifestFile> mergeGroup(int specId, List<ManifestFile> group)
+      throws IOException {
     // use a lookback of 1 to avoid reordering the manifests. using 1 also means this should pack
     // from the end so that the manifest that gets under-filled is the first one, which will be
     // merged the next time.
-    long newFilesSize = newFiles.size() * SIZE_PER_FILE;
-    ListPacker<ManifestReader> packer = new ListPacker<>(manifestTargetSizeBytes, 1);
-    List<List<ManifestReader>> bins = packer.packEnd(group,
-        reader -> reader.file() != null ? reader.file().getLength() : newFilesSize);
+    ListPacker<ManifestFile> packer = new ListPacker<>(manifestTargetSizeBytes, 1);
+    List<List<ManifestFile>> bins = packer.packEnd(group, manifest -> manifest.length());
 
     // process bins in parallel, but put results in the order of the bins into an array to preserve
     // the order of manifests and contents. preserving the order helps avoid random deletes when
     // data files are eventually aged off.
-    List<String>[] binResults = (List<String>[]) Array.newInstance(List.class, bins.size());
+    List<ManifestFile>[] binResults = (List<ManifestFile>[])
+        Array.newInstance(List.class, bins.size());
     Tasks.range(bins.size())
         .stopOnFailure().throwFailureWhenFinished()
         .executeWith(getWorkerPool())
         .run(index -> {
-          List<ManifestReader> bin = bins.get(index);
-          List<String> outputManifests = Lists.newArrayList();
+          List<ManifestFile> bin = bins.get(index);
+          List<ManifestFile> outputManifests = Lists.newArrayList();
           binResults[index] = outputManifests;
 
-          if (bin.size() == 1 && bin.get(0).file() != null) {
+          if (bin.size() == 1) {
             // no need to rewrite
-            outputManifests.add(bin.get(0).file().location());
+            outputManifests.add(bin.get(0));
             return;
           }
 
-          boolean hasInMemoryManifest = false;
-          for (ManifestReader reader : bin) {
-            if (reader.file() == null) {
-              hasInMemoryManifest = true;
-            }
-          }
-
-          // if the bin has an in-memory manifest (the new data) then only merge it if the number of
-          // manifests is above the minimum count. this is applied only to bins with an in-memory
-          // manifest so that large manifests don't prevent merging older groups.
+          // if the bin has a new manifest (the new data files) then only merge it if the number of
+          // manifests is above the minimum count. this is applied only to bins with the new
+          // manifest so that large manifests don't prevent merging older groups.
-          if (hasInMemoryManifest && bin.size() < minManifestsCountToMerge) {
-            for (ManifestReader reader : bin) {
-              if (reader.file() != null) {
-                outputManifests.add(reader.file().location());
-              } else {
-                // write the in-memory manifest
-                outputManifests.add(createManifest(groupSpec, Collections.singletonList(reader)));
-              }
-            }
+          if (bin.contains(newManifest) && bin.size() < minManifestsCountToMerge) {
+            // not enough to merge, add all manifest files to the output list
+            outputManifests.addAll(bin);
           } else {
-            outputManifests.add(createManifest(groupSpec, bin));
+            // merge the group
+            outputManifests.add(createManifest(specId, bin));
           }
-        });
+        }, IOException.class);
 
     return Iterables.concat(binResults);
   }
 
-  // NOTE: This assumes that any files that are added are in an in-memory manifest.
-  private String createManifest(PartitionSpec binSpec, List<ManifestReader> bin) {
-    List<String> key = cacheKey(bin);
+  private ManifestFile createManifest(int specId, List<ManifestFile> bin) throws IOException {
     // if this merge was already rewritten, use the existing file.
-    // if the new files are in this merge, the key is based on the number of new files so files
-    // added after the last merge will cause a cache miss.
-    if (mergeManifests.containsKey(key)) {
-      return mergeManifests.get(key);
+    // if the new files are in this merge, then the ManifestFile for the new files has changed and
+    // will be a cache miss.
+    if (mergeManifests.containsKey(bin)) {
+      return mergeManifests.get(bin);
     }
 
     OutputFile out = manifestPath(manifestCount.getAndIncrement());
 
-    try (ManifestWriter writer = new ManifestWriter(binSpec, out, snapshotId())) {
+    ManifestWriter writer = new ManifestWriter(ops.current().spec(specId), out, snapshotId());
+    try {
 
-      for (ManifestReader reader : bin) {
-        if (reader.file() != null) {
+      for (ManifestFile manifest : bin) {
+        try (ManifestReader reader = ManifestReader.read(ops.newInputFile(manifest.path()))) {
           for (ManifestEntry entry : reader.entries()) {
             if (entry.status() == Status.DELETED) {
               // suppress deletes from previous snapshots. only files deleted by this snapshot
@@ -502,72 +471,49 @@ private String createManifest(PartitionSpec binSpec, List<ManifestReader> bin) {
               if (entry.snapshotId() == snapshotId()) {
                 writer.add(entry);
               }
+            } else if (entry.status() == Status.ADDED && entry.snapshotId() == snapshotId()) {
+              // adds from this snapshot are still adds, otherwise they should be existing
+              writer.add(entry);
             } else {
               // add all files from the old manifest as existing files
               writer.addExisting(entry);
             }
           }
-        } else {
-          // if the files are in an in-memory manifest, then they are new
-          writer.addEntries(reader.entries());
         }
       }
 
-    } catch (IOException e) {
-      throw new RuntimeIOException(e, "Failed to write manifest: %s", out);
+    } finally {
+      writer.close();
     }
 
-    // update the cache
-    mergeManifests.put(key, out.location());
+    ManifestFile manifest = writer.toManifestFile();
 
-    return out.location();
-  }
+    // update the cache
+    mergeManifests.put(bin, manifest);
 
-  private ManifestReader newFilesAsManifest() {
-    long id = snapshotId();
-    ManifestEntry reused = new ManifestEntry(spec.partitionType());
-    return ManifestReader.inMemory(spec,
-        transform(newFiles, file -> {
-          reused.wrapAppend(id, file);
-          return reused;
-        }));
+    return manifest;
   }
 
-  private List<String> cacheKey(List<ManifestReader> group) {
-    List<String> key = Lists.newArrayList();
-
-    for (ManifestReader reader : group) {
-      if (reader.file() != null) {
-        key.add(reader.file().location());
-      } else {
-        // if the file is null, this is an in-memory reader
-        // use the size to avoid collisions if retries have added files
-        key.add("append-" + newFiles.size() + "-files");
-      }
+  private ManifestFile newFilesAsManifest() throws IOException {
+    if (hasNewFiles && newManifest != null) {
+      deleteFile(newManifest.path());
+      newManifest = null;
     }
 
-    return key;
-  }
+    if (newManifest == null) {
+      OutputFile out = manifestPath(manifestCount.getAndIncrement());
 
-  /**
-   * Helper method to group manifests by compatible partition spec.
-   * <p>
-   * When a match is found, this will replace the current spec for the group with the query spec.
-   * This is to produce manifests with the latest compatible spec.
-   *
-   * @param specs   a list of partition specs, corresponding to the groups of readers
-   * @param spec    spec to be matched to a group
-   * @return        group of readers files for this spec can be merged into
-   */
-  private static int findMatch(List<PartitionSpec> specs,
-                               PartitionSpec spec) {
-    // loop from last to first because later specs are most likely to match
-    for (int i = specs.size() - 1; i >= 0; i -= 1) {
-      if (specs.get(i).compatibleWith(spec)) {
-        return i;
+      ManifestWriter writer = new ManifestWriter(spec, out, snapshotId());
+      try {
+        writer.addAll(newFiles);
+      } finally {
+        writer.close();
       }
+
+      this.newManifest = writer.toManifestFile();
+      this.hasNewFiles = false;
     }
 
-    return -1;
+    return newManifest;
   }
 }
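
One detail worth calling out: mergeGroup packs with packEnd and a lookback of 1,
so bins fill from the end of the list and the under-filled bin is the oldest
one, which gets merged on a later commit. A toy illustration of that behavior,
assuming packEnd works as the comment above describes (weights in bytes, target
of 100):

    import java.util.Arrays;
    import java.util.List;

    ListPacker<Long> packer = new ListPacker<>(100L, 1);
    List<List<Long>> bins = packer.packEnd(Arrays.asList(40L, 40L, 40L), w -> w);
    // list order is preserved: [[40], [40, 40]] -- the first (oldest) bin is under-filled
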
diff --git a/core/src/main/java/com/netflix/iceberg/OverwriteData.java b/core/src/main/java/com/netflix/iceberg/OverwriteData.java
index 404b440..3ebb725 100644
--- a/core/src/main/java/com/netflix/iceberg/OverwriteData.java
+++ b/core/src/main/java/com/netflix/iceberg/OverwriteData.java
@@ -52,7 +52,7 @@ public OverwriteFiles validateAddedFiles() {
   }
 
   @Override
-  public List<String> apply(TableMetadata base) {
+  public List<ManifestFile> apply(TableMetadata base) {
     if (validateAddedFiles) {
       PartitionSpec spec = writeSpec();
       Expression rowFilter = rowFilter();
diff --git a/core/src/main/java/com/netflix/iceberg/PartitionSummary.java b/core/src/main/java/com/netflix/iceberg/PartitionSummary.java
new file mode 100644
index 0000000..52a2b4a
--- /dev/null
+++ b/core/src/main/java/com/netflix/iceberg/PartitionSummary.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.netflix.iceberg;
+
+import com.google.common.collect.Lists;
+import com.netflix.iceberg.ManifestFile.PartitionFieldSummary;
+import com.netflix.iceberg.types.Comparators;
+import com.netflix.iceberg.types.Conversions;
+import com.netflix.iceberg.types.Type;
+import com.netflix.iceberg.types.Types;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+
+class PartitionSummary {
+  private final PartitionFieldStats<?>[] fields;
+  private final Class<?>[] javaClasses;
+
+  PartitionSummary(PartitionSpec spec) {
+    this.javaClasses = spec.javaClasses();
+    this.fields = new PartitionFieldStats[javaClasses.length];
+    List<Types.NestedField> partitionFields = spec.partitionType().fields();
+    for (int i = 0; i < fields.length; i += 1) {
+      this.fields[i] = new PartitionFieldStats<>(partitionFields.get(i).type());
+    }
+  }
+
+  List<PartitionFieldSummary> summaries() {
+    return Lists.transform(Arrays.asList(fields), PartitionFieldStats::toSummary);
+  }
+
+  public void update(StructLike partitionKey) {
+    updateFields(partitionKey);
+  }
+
+  @SuppressWarnings("unchecked")
+  private <T> void updateFields(StructLike key) {
+    for (int i = 0; i < javaClasses.length; i += 1) {
+      PartitionFieldStats<T> stats = (PartitionFieldStats<T>) fields[i];
+      Class<T> javaClass = (Class<T>) javaClasses[i];
+      stats.update(key.get(i, javaClass));
+    }
+  }
+
+  private static class PartitionFieldStats<T> {
+    private final Type type;
+    private final Comparator<T> comparator;
+
+    private boolean containsNull = false;
+    private T min = null;
+    private T max = null;
+
+    private PartitionFieldStats(Type type) {
+      this.type = type;
+      this.comparator = Comparators.forType(type.asPrimitiveType());
+    }
+
+    public PartitionFieldSummary toSummary() {
+      return new GenericPartitionFieldSummary(containsNull,
+          min != null ? Conversions.toByteBuffer(type, min) : null,
+          max != null ? Conversions.toByteBuffer(type, max) : null);
+    }
+
+    void update(T value) {
+      if (value == null) {
+        this.containsNull = true;
+      } else if (min == null) {
+        this.min = value;
+        this.max = value;
+      } else {
+        if (comparator.compare(value, min) < 0) {
+          this.min = value;
+        }
+        if (comparator.compare(max, value) < 0) {
+          this.max = value;
+        }
+      }
+    }
+  }
+}
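
ManifestWriter.add feeds every entry's partition tuple through this class; the
same flow can be run in isolation. A sketch, with spec and dataFiles assumed:

    PartitionSummary stats = new PartitionSummary(spec);
    for (DataFile file : dataFiles) {
      stats.update(file.partition());  // records null-ness and min/max per partition field
    }
    List<ManifestFile.PartitionFieldSummary> summaries = stats.summaries();
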
diff --git a/core/src/main/java/com/netflix/iceberg/RemoveSnapshots.java b/core/src/main/java/com/netflix/iceberg/RemoveSnapshots.java
index b097674..8f473b3 100644
--- a/core/src/main/java/com/netflix/iceberg/RemoveSnapshots.java
+++ b/core/src/main/java/com/netflix/iceberg/RemoveSnapshots.java
@@ -130,22 +130,22 @@ public void commit() {
 
     TableMetadata current = ops.refresh();
     Set<Long> currentIds = Sets.newHashSet();
-    Set<String> currentManifests = Sets.newHashSet();
+    Set<ManifestFile> currentManifests = Sets.newHashSet();
     for (Snapshot snapshot : current.snapshots()) {
       currentIds.add(snapshot.snapshotId());
       currentManifests.addAll(snapshot.manifests());
     }
 
-    Set<String> allManifests = Sets.newHashSet(currentManifests);
+    Set<ManifestFile> allManifests = Sets.newHashSet(currentManifests);
     Set<String> manifestsToDelete = Sets.newHashSet();
     for (Snapshot snapshot : base.snapshots()) {
       long snapshotId = snapshot.snapshotId();
       if (!currentIds.contains(snapshotId)) {
         // the snapshot was removed, find any manifests that are no longer needed
         LOG.info("Removing snapshot: {}", snapshot);
-        for (String manifest : snapshot.manifests()) {
+        for (ManifestFile manifest : snapshot.manifests()) {
           if (!currentManifests.contains(manifest)) {
-            manifestsToDelete.add(manifest);
+            manifestsToDelete.add(manifest.path());
             allManifests.add(manifest);
           }
         }
@@ -161,7 +161,7 @@ public void commit() {
         ).run(manifest -> {
           // even if the manifest is still used, it may contain files that can be deleted
           // TODO: eliminate manifests with no deletes without scanning
-          try (ManifestReader reader = ManifestReader.read(ops.newInputFile(manifest))) {
+          try (ManifestReader reader = ManifestReader.read(ops.newInputFile(manifest.path()))) {
             for (ManifestEntry entry : reader.entries()) {
               // if the snapshot ID of the DELETE entry is no longer valid, the data can be deleted
               if (entry.status() == ManifestEntry.Status.DELETED &&
@@ -171,7 +171,7 @@ public void commit() {
               }
             }
           } catch (IOException e) {
-            throw new RuntimeIOException(e, "Failed to read manifest: " + manifest);
+            throw new RuntimeIOException(e, "Failed to read manifest file: " + manifest.path());
           }
     });
 
diff --git a/core/src/main/java/com/netflix/iceberg/ReplacePartitionsOperation.java b/core/src/main/java/com/netflix/iceberg/ReplacePartitionsOperation.java
index 9d0db6c..4fb6aa8 100644
--- a/core/src/main/java/com/netflix/iceberg/ReplacePartitionsOperation.java
+++ b/core/src/main/java/com/netflix/iceberg/ReplacePartitionsOperation.java
@@ -42,7 +42,7 @@ public ReplacePartitions validateAppendOnly() {
   }
 
   @Override
-  public List<String> apply(TableMetadata base) {
+  public List<ManifestFile> apply(TableMetadata base) {
     if (writeSpec().fields().size() <= 0) {
       // replace all data in an unpartitioned table
       deleteByRowFilter(Expressions.alwaysTrue());
diff --git a/core/src/main/java/com/netflix/iceberg/ScanSummary.java b/core/src/main/java/com/netflix/iceberg/ScanSummary.java
index b19ab38..7315786 100644
--- a/core/src/main/java/com/netflix/iceberg/ScanSummary.java
+++ b/core/src/main/java/com/netflix/iceberg/ScanSummary.java
@@ -22,6 +22,7 @@
 import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -90,11 +91,21 @@ private void addTimestampFilter(UnboundPredicate<Long> filter) {
       timeFilters.add(filter);
     }
 
+    public Builder after(String timestamp) {
+      Literal<Long> tsLiteral = Literal.of(timestamp).to(Types.TimestampType.withoutZone());
+      // timestamp literal values are in microseconds; after(long) expects milliseconds
+      return after(tsLiteral.value() / 1000);
+    }
+
     public Builder after(long timestampMillis) {
       addTimestampFilter(Expressions.greaterThanOrEqual("timestamp_ms", timestampMillis));
       return this;
     }
 
+    public Builder before(String timestamp) {
+      Literal<Long> tsLiteral = Literal.of(timestamp).to(Types.TimestampType.withoutZone());
+      // timestamp literal values are in microseconds; before(long) expects milliseconds
+      return before(tsLiteral.value() / 1000);
+    }
+
     public Builder before(long timestampMillis) {
       addTimestampFilter(Expressions.lessThanOrEqual("timestamp_ms", timestampMillis));
       return this;
@@ -145,29 +156,45 @@ private void removeTimeFilters(List<Expression> expressions, Expression expressi
       removeTimeFilters(filters, Expressions.rewriteNot(scan.filter()));
       Expression rowFilter = joinFilters(filters);
 
-      long minTimestamp = Long.MIN_VALUE;
-      long maxTimestamp = Long.MAX_VALUE;
+      Iterable<ManifestFile> manifests = table.currentSnapshot().manifests();
+
       boolean filterByTimestamp = !timeFilters.isEmpty();
+      Set<Long> snapshotsInTimeRange = Sets.newHashSet();
       if (filterByTimestamp) {
         Pair<Long, Long> range = timestampRange(timeFilters);
-        minTimestamp = range.first();
-        maxTimestamp = range.second();
+        long minTimestamp = range.first();
+        long maxTimestamp = range.second();
+
+        for (Map.Entry<Long, Long> entry : snapshotTimestamps.entrySet()) {
+          long snapshotId = entry.getKey();
+          long timestamp = entry.getValue();
+          if (timestamp >= minTimestamp && timestamp <= maxTimestamp) {
+            snapshotsInTimeRange.add(snapshotId);
+          }
+        }
+
+        // when filtering by dateCreated or lastUpdated timestamp, this matches the set of files
+        // that were added in the time range. files are added in new snapshots, so to get the new
+        // files, this only needs to scan new manifests in the set of snapshots that match the
+        // filter. ManifestFile.snapshotId() returns the snapshot when the manifest was added, so
+        // the only manifests that need to be scanned are those with snapshotId() in the timestamp
+        // range, or those that don't have a snapshot ID.
+        manifests = Iterables.filter(manifests, manifest ->
+            manifest.snapshotId() == null || snapshotsInTimeRange.contains(manifest.snapshotId()));
       }
 
-      try (CloseableIterable<ManifestEntry> entries =
-               new ManifestGroup(ops, table.currentSnapshot().manifests())
-                   .filterData(rowFilter)
-                   .ignoreDeleted()
-                   .select(SCAN_SUMMARY_COLUMNS)
-                   .entries()) {
+      try (CloseableIterable<ManifestEntry> entries = new ManifestGroup(ops, manifests)
+          .filterData(rowFilter)
+          .ignoreDeleted()
+          .select(SCAN_SUMMARY_COLUMNS)
+          .entries()) {
 
         PartitionSpec spec = table.spec();
         for (ManifestEntry entry : entries) {
           Long timestamp = snapshotTimestamps.get(entry.snapshotId());
 
           // if filtering, skip timestamps that are outside the range
-          if (filterByTimestamp &&
-              (timestamp == null || timestamp < minTimestamp || timestamp > maxTimestamp)) {
+          if (filterByTimestamp && !snapshotsInTimeRange.contains(entry.snapshotId())) {
             continue;
           }
 
diff --git a/core/src/main/java/com/netflix/iceberg/SnapshotParser.java b/core/src/main/java/com/netflix/iceberg/SnapshotParser.java
index cf04bec..a5ce08c 100644
--- a/core/src/main/java/com/netflix/iceberg/SnapshotParser.java
+++ b/core/src/main/java/com/netflix/iceberg/SnapshotParser.java
@@ -22,8 +22,12 @@
 import com.fasterxml.jackson.core.JsonGenerator;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
 import com.netflix.iceberg.exceptions.RuntimeIOException;
 import com.netflix.iceberg.util.JsonUtil;
+import com.netflix.iceberg.util.Tasks;
+import com.netflix.iceberg.util.ThreadPools;
 import java.io.IOException;
 import java.io.StringWriter;
 import java.util.List;
@@ -34,19 +38,30 @@
   private static final String PARENT_SNAPSHOT_ID = "parent-snapshot-id";
   private static final String TIMESTAMP_MS = "timestamp-ms";
   private static final String MANIFESTS = "manifests";
+  private static final String MANIFEST_LIST = "manifest-list";
 
-  static void toJson(Snapshot snapshot, JsonGenerator generator) throws IOException {
+  static void toJson(Snapshot snapshot, JsonGenerator generator)
+      throws IOException {
     generator.writeStartObject();
     generator.writeNumberField(SNAPSHOT_ID, snapshot.snapshotId());
     if (snapshot.parentId() != null) {
       generator.writeNumberField(PARENT_SNAPSHOT_ID, snapshot.parentId());
     }
     generator.writeNumberField(TIMESTAMP_MS, snapshot.timestampMillis());
-    generator.writeArrayFieldStart(MANIFESTS);
-    for (String file : snapshot.manifests()) {
-      generator.writeString(file);
+
+    String manifestList = snapshot.manifestListLocation();
+    if (manifestList != null) {
+      // write just the location; the manifests are stored in the manifest list file, not in the JSON
+      generator.writeStringField(MANIFEST_LIST, manifestList);
+    } else {
+      // embed the manifest list in the JSON
+      generator.writeArrayFieldStart(MANIFESTS);
+      for (ManifestFile file : snapshot.manifests()) {
+        generator.writeString(file.path());
+      }
+      generator.writeEndArray();
     }
-    generator.writeEndArray();
+
     generator.writeEndObject();
   }
 
@@ -73,9 +88,19 @@ static Snapshot fromJson(TableOperations ops, JsonNode node) {
       parentId = JsonUtil.getLong(PARENT_SNAPSHOT_ID, node);
     }
     long timestamp = JsonUtil.getLong(TIMESTAMP_MS, node);
-    List<String> manifests = JsonUtil.getStringList(MANIFESTS, node);
 
-    return new BaseSnapshot(ops, versionId, parentId, timestamp, manifests);
+    if (node.has(MANIFEST_LIST)) {
+      // the manifest list is stored in a manifest list file
+      String manifestList = JsonUtil.getString(MANIFEST_LIST, node);
+      return new BaseSnapshot(ops, versionId, parentId, timestamp, ops.newInputFile(manifestList));
+
+    } else {
+      // fall back to an embedded manifest list. pass in each manifest's InputFile so its length
+      // can be loaded lazily, if it is needed
+      List<ManifestFile> manifests = Lists.transform(JsonUtil.getStringList(MANIFESTS, node),
+          location -> new GenericManifestFile(ops.newInputFile(location), 0));
+      return new BaseSnapshot(ops, versionId, parentId, timestamp, manifests);
+    }
   }
 
   public static Snapshot fromJson(TableOperations ops, String json) {
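
For reference, the two snapshot encodings the parser now round-trips look
roughly like this (values illustrative):

    {"snapshot-id": 2, "parent-snapshot-id": 1, "timestamp-ms": 1544054408000,
     "manifest-list": "s3://bucket/metadata/snap-2.avro"}

    {"snapshot-id": 2, "parent-snapshot-id": 1, "timestamp-ms": 1544054408000,
     "manifests": ["s3://bucket/metadata/m1.avro", "s3://bucket/metadata/m2.avro"]}
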
diff --git a/core/src/main/java/com/netflix/iceberg/SnapshotUpdate.java b/core/src/main/java/com/netflix/iceberg/SnapshotUpdate.java
index 8fe0d81..54c0483 100644
--- a/core/src/main/java/com/netflix/iceberg/SnapshotUpdate.java
+++ b/core/src/main/java/com/netflix/iceberg/SnapshotUpdate.java
@@ -19,13 +19,19 @@
 
 package com.netflix.iceberg;
 
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
 import com.google.common.collect.Sets;
 import com.netflix.iceberg.exceptions.CommitFailedException;
+import com.netflix.iceberg.exceptions.RuntimeIOException;
 import com.netflix.iceberg.io.OutputFile;
 import com.netflix.iceberg.util.Exceptions;
 import com.netflix.iceberg.util.Tasks;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
 import java.util.UUID;
@@ -39,10 +45,28 @@
 import static com.netflix.iceberg.TableProperties.COMMIT_NUM_RETRIES_DEFAULT;
 import static com.netflix.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS;
 import static com.netflix.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT;
+import static com.netflix.iceberg.TableProperties.MANIFEST_LISTS_ENABLED;
+import static com.netflix.iceberg.TableProperties.MANIFEST_LISTS_ENABLED_DEFAULT;
+import static com.netflix.iceberg.util.ThreadPools.getWorkerPool;
 
 abstract class SnapshotUpdate implements PendingUpdate<Snapshot> {
   private static final Logger LOG = LoggerFactory.getLogger(SnapshotUpdate.class);
-  static final Set<String> EMPTY_SET = Sets.newHashSet();
+  static final Set<ManifestFile> EMPTY_SET = Sets.newHashSet();
+
+  /**
+   * Cache used to enrich ManifestFile instances that are written to a ManifestListWriter.
+   */
+  private final LoadingCache<ManifestFile, ManifestFile> manifestsWithMetadata = CacheBuilder
+      .newBuilder()
+      .build(new CacheLoader<ManifestFile, ManifestFile>() {
+        @Override
+        public ManifestFile load(ManifestFile file) {
+          if (file.snapshotId() != null) {
+            return file;
+          }
+          return addMetadata(ops, file);
+        }
+      });
 
   private final TableOperations ops;
   private final String commitUUID = UUID.randomUUID().toString();
@@ -60,7 +84,7 @@ protected SnapshotUpdate(TableOperations ops) {
    * @param base the base table metadata to apply changes to
    * @return a manifest list for the new snapshot.
    */
-  protected abstract List<String> apply(TableMetadata base);
+  protected abstract List<ManifestFile> apply(TableMetadata base);
 
   /**
    * Clean up any uncommitted manifests that were created.
@@ -72,16 +96,48 @@ protected SnapshotUpdate(TableOperations ops) {
    *
   * @param committed a set of manifest files that were actually committed
    */
-  protected abstract void cleanUncommitted(Set<String> committed);
+  protected abstract void cleanUncommitted(Set<ManifestFile> committed);
 
   @Override
   public Snapshot apply() {
     this.base = ops.refresh();
-    List<String> manifests = apply(base);
-    Long currentSnapshotId = base.currentSnapshot() != null ?
+    Long parentSnapshotId = base.currentSnapshot() != null ?
         base.currentSnapshot().snapshotId() : null;
-    return new BaseSnapshot(ops,
-        snapshotId(), currentSnapshotId, System.currentTimeMillis(), manifests);
+
+    List<ManifestFile> manifests = apply(base);
+
+    if (base.propertyAsBoolean(MANIFEST_LISTS_ENABLED, MANIFEST_LISTS_ENABLED_DEFAULT)) {
+      OutputFile manifestList = manifestListPath();
+
+      try (ManifestListWriter writer = new ManifestListWriter(
+          manifestList, snapshotId(), parentSnapshotId)) {
+        ManifestFile[] manifestFiles = new ManifestFile[manifests.size()];
+
+        Tasks.range(manifestFiles.length)
+            .stopOnFailure().throwFailureWhenFinished()
+            .retry(4).exponentialBackoff(
+                base.propertyAsInt(COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
+                base.propertyAsInt(COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
+                base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
+                2.0 /* exponential */ )
+            .executeWith(getWorkerPool())
+            .run(index ->
+                manifestFiles[index] = manifestsWithMetadata.getUnchecked(manifests.get(index)));
+
+        writer.addAll(Arrays.asList(manifestFiles));
+
+      } catch (IOException e) {
+        throw new RuntimeIOException(e, "Failed to write manifest list file");
+      }
+
+      return new BaseSnapshot(ops,
+          snapshotId(), parentSnapshotId, System.currentTimeMillis(),
+          ops.newInputFile(manifestList.location()));
+
+    } else {
+      return new BaseSnapshot(ops,
+          snapshotId(), parentSnapshotId, System.currentTimeMillis(), manifests);
+    }
   }
 
   @Override
@@ -123,7 +179,7 @@ public void commit() {
       }
 
     } catch (RuntimeException e) {
-      LOG.info("Failed to load committed table metadata, skipping manifest clean-up");
+      LOG.info("Failed to load committed table metadata, skipping manifest clean-up", e);
     }
   }
 
@@ -135,6 +191,11 @@ protected void deleteFile(String path) {
     ops.deleteFile(path);
   }
 
+  protected OutputFile manifestListPath() {
+    return ops.newMetadataFile(FileFormat.AVRO.addExtension(
+        String.format("snap-%d-%s", snapshotId(), commitUUID)));
+  }
+
   protected OutputFile manifestPath(int i) {
     return ops.newMetadataFile(FileFormat.AVRO.addExtension(commitUUID + "-m" + i));
   }
@@ -145,4 +206,52 @@ protected long snapshotId() {
     }
     return snapshotId;
   }
+
+  private static ManifestFile addMetadata(TableOperations ops, ManifestFile manifest) {
+    try (ManifestReader reader = ManifestReader.read(ops.newInputFile(manifest.path()))) {
+      PartitionSummary stats = new PartitionSummary(ops.current().spec(manifest.partitionSpecId()));
+      int addedFiles = 0;
+      int existingFiles = 0;
+      int deletedFiles = 0;
+
+      Long snapshotId = null;
+      long maxSnapshotId = Long.MIN_VALUE;
+      for (ManifestEntry entry : reader.entries()) {
+        if (entry.snapshotId() > maxSnapshotId) {
+          maxSnapshotId = entry.snapshotId();
+        }
+
+        switch (entry.status()) {
+          case ADDED:
+            addedFiles += 1;
+            if (snapshotId == null) {
+              snapshotId = entry.snapshotId();
+            }
+            break;
+          case EXISTING:
+            existingFiles += 1;
+            break;
+          case DELETED:
+            deletedFiles += 1;
+            if (snapshotId == null) {
+              snapshotId = entry.snapshotId();
+            }
+            break;
+        }
+
+        stats.update(entry.file().partition());
+      }
+
+      if (snapshotId == null) {
+        // if no files were added or deleted, use the largest snapshot ID in the manifest
+        snapshotId = maxSnapshotId;
+      }
+
+      return new GenericManifestFile(manifest.path(), manifest.length(), manifest.partitionSpecId(),
+          snapshotId, addedFiles, existingFiles, deletedFiles, stats.summaries());
+
+    } catch (IOException e) {
+      throw new RuntimeIOException(e, "Failed to read manifest: %s", manifest.path());
+    }
+  }
 }
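
The Tasks.range(...) block in apply() fans each manifest out to the shared
worker pool and writes the enriched result into a fixed-size array, so the
original manifest order is preserved regardless of completion order. Below
is a minimal sketch of the same pattern using plain java.util.concurrent
instead of Iceberg's internal Tasks/ThreadPools utilities; the enrich
function is a hypothetical stand-in for manifestsWithMetadata.getUnchecked,
and retries, backoff, and failure propagation are omitted for brevity:

    import com.netflix.iceberg.ManifestFile;
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    import java.util.function.UnaryOperator;
    import java.util.stream.IntStream;

    class EnrichSketch {
      static ManifestFile[] withMetadata(List<ManifestFile> manifests,
                                         UnaryOperator<ManifestFile> enrich)
          throws InterruptedException {
        ManifestFile[] enriched = new ManifestFile[manifests.size()];
        ExecutorService pool = Executors.newFixedThreadPool(4);
        // each task writes only its own slot, so manifest order is preserved
        IntStream.range(0, enriched.length).forEach(i ->
            pool.submit(() -> enriched[i] = enrich.apply(manifests.get(i))));
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
        return enriched;
      }
    }
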
diff --git a/core/src/main/java/com/netflix/iceberg/TableMetadata.java b/core/src/main/java/com/netflix/iceberg/TableMetadata.java
index 05c3392..c949f13 100644
--- a/core/src/main/java/com/netflix/iceberg/TableMetadata.java
+++ b/core/src/main/java/com/netflix/iceberg/TableMetadata.java
@@ -221,6 +221,14 @@ public String location() {
     return properties;
   }
 
+  public boolean propertyAsBoolean(String property, boolean defaultValue) {
+    String value = properties.get(property);
+    if (value != null) {
+      return Boolean.parseBoolean(value);
+    }
+    return defaultValue;
+  }
+
   public int propertyAsInt(String property, int defaultValue) {
     String value = properties.get(property);
     if (value != null) {
diff --git a/core/src/main/java/com/netflix/iceberg/TableProperties.java b/core/src/main/java/com/netflix/iceberg/TableProperties.java
index 6ca09a5..e522f84 100644
--- a/core/src/main/java/com/netflix/iceberg/TableProperties.java
+++ b/core/src/main/java/com/netflix/iceberg/TableProperties.java
@@ -66,4 +66,7 @@
   public static final boolean OBJECT_STORE_ENABLED_DEFAULT = false;
 
   public static final String OBJECT_STORE_PATH = "write.object-storage.path";
+
+  public static final String MANIFEST_LISTS_ENABLED = "write.manifest-lists.enabled";
+  public static final boolean MANIFEST_LISTS_ENABLED_DEFAULT = false;
 }
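
Manifest lists are disabled by default, so existing tables keep writing the
embedded list until they opt in. A usage sketch, assuming the standard
Table.updateProperties() API:

    import com.netflix.iceberg.Table;
    import static com.netflix.iceberg.TableProperties.MANIFEST_LISTS_ENABLED;

    class EnableManifestLists {
      // opt a table into writing a manifest list file with each new snapshot
      static void enable(Table table) {
        table.updateProperties()
            .set(MANIFEST_LISTS_ENABLED, "true")
            .commit();
      }
    }
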
diff --git a/core/src/main/java/com/netflix/iceberg/avro/Avro.java b/core/src/main/java/com/netflix/iceberg/avro/Avro.java
index d58bfbd..b08b5ff 100644
--- a/core/src/main/java/com/netflix/iceberg/avro/Avro.java
+++ b/core/src/main/java/com/netflix/iceberg/avro/Avro.java
@@ -22,7 +22,6 @@
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
 import com.netflix.iceberg.SchemaParser;
-import com.netflix.iceberg.io.CloseableIterable;
 import com.netflix.iceberg.io.InputFile;
 import com.netflix.iceberg.io.OutputFile;
 import org.apache.avro.Conversions;
@@ -118,6 +117,11 @@ public WriteBuilder meta(String property, String value) {
       return this;
     }
 
+    public WriteBuilder meta(Map<String, String> properties) {
+      metadata.putAll(properties);
+      return this;
+    }
+
     private CodecFactory codec() {
       String codec = config.getOrDefault(AVRO_COMPRESSION, AVRO_COMPRESSION_DEFAULT);
       try {
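
The new meta(Map) overload lets a caller attach a whole batch of key/value
pairs to the Avro file header in one call, which is what allows a manifest
list writer to stamp snapshot information into the file it produces. A
sketch under that assumption (the file path and metadata keys here are
illustrative, not the writer's actual keys):

    import com.google.common.collect.ImmutableMap;
    import com.netflix.iceberg.Files;
    import com.netflix.iceberg.avro.Avro;
    import java.io.File;

    class AvroMetaSketch {
      static Avro.WriteBuilder builderWithMeta() {
        // illustrative keys; a real writer supplies its own metadata map
        return Avro.write(Files.localOutput(new File("/tmp/example.avro")))
            .meta(ImmutableMap.of(
                "snapshot-id", "2",
                "parent-snapshot-id", "1"));
      }
    }
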
diff --git a/core/src/main/java/com/netflix/iceberg/hadoop/HadoopOutputFile.java b/core/src/main/java/com/netflix/iceberg/hadoop/HadoopOutputFile.java
index 9edea0d..2cf23ce 100644
--- a/core/src/main/java/com/netflix/iceberg/hadoop/HadoopOutputFile.java
+++ b/core/src/main/java/com/netflix/iceberg/hadoop/HadoopOutputFile.java
@@ -21,6 +21,7 @@
 
 import com.netflix.iceberg.exceptions.AlreadyExistsException;
 import com.netflix.iceberg.exceptions.RuntimeIOException;
+import com.netflix.iceberg.io.InputFile;
 import com.netflix.iceberg.io.OutputFile;
 import com.netflix.iceberg.io.PositionOutputStream;
 import org.apache.hadoop.conf.Configuration;
@@ -80,6 +81,11 @@ public String location() {
     return path.toString();
   }
 
+  @Override
+  public InputFile toInputFile() {
+    return HadoopInputFile.fromPath(path, conf);
+  }
+
   @Override
   public String toString() {
     return location();
diff --git a/core/src/test/java/com/netflix/iceberg/LocalTableOperations.java b/core/src/test/java/com/netflix/iceberg/LocalTableOperations.java
new file mode 100644
index 0000000..27a01fc
--- /dev/null
+++ b/core/src/test/java/com/netflix/iceberg/LocalTableOperations.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.netflix.iceberg;
+
+import com.netflix.iceberg.exceptions.RuntimeIOException;
+import com.netflix.iceberg.io.InputFile;
+import com.netflix.iceberg.io.OutputFile;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+
+import static com.netflix.iceberg.Files.localInput;
+
+class LocalTableOperations implements TableOperations {
+  private final TemporaryFolder temp;
+
+  LocalTableOperations(TemporaryFolder temp) {
+    this.temp = temp;
+  }
+
+  @Override
+  public TableMetadata current() {
+    throw new UnsupportedOperationException("Not implemented for tests");
+  }
+
+  @Override
+  public TableMetadata refresh() {
+    throw new UnsupportedOperationException("Not implemented for tests");
+  }
+
+  @Override
+  public void commit(TableMetadata base, TableMetadata metadata) {
+    throw new UnsupportedOperationException("Not implemented for tests");
+  }
+
+  @Override
+  public InputFile newInputFile(String path) {
+    return localInput(path);
+  }
+
+  @Override
+  public OutputFile newMetadataFile(String filename) {
+    try {
+      File metadataFile = temp.newFile(filename);
+      metadataFile.delete();
+      metadataFile.deleteOnExit();
+      return Files.localOutput(metadataFile);
+    } catch (IOException e) {
+      throw new RuntimeIOException(e);
+    }
+  }
+
+  @Override
+  public void deleteFile(String path) {
+    new File(path).delete();
+  }
+
+  @Override
+  public long newSnapshotId() {
+    throw new UnsupportedOperationException("Not implemented for tests");
+  }
+}
diff --git a/core/src/test/java/com/netflix/iceberg/TableTestBase.java b/core/src/test/java/com/netflix/iceberg/TableTestBase.java
index cafb440..c723daa 100644
--- a/core/src/test/java/com/netflix/iceberg/TableTestBase.java
+++ b/core/src/test/java/com/netflix/iceberg/TableTestBase.java
@@ -120,23 +120,23 @@ TableMetadata readMetadata() {
   }
 
   void validateSnapshot(Snapshot old, Snapshot snap, DataFile... newFiles) {
-    List<String> oldManifests = old != null ? old.manifests() : ImmutableList.of();
+    List<ManifestFile> oldManifests = old != null ? old.manifests() : ImmutableList.of();
 
     // copy the manifests to a modifiable list and remove the existing manifests
-    List<String> newManifests = Lists.newArrayList(snap.manifests());
-    for (String oldManifest : oldManifests) {
+    List<ManifestFile> newManifests = Lists.newArrayList(snap.manifests());
+    for (ManifestFile oldManifest : oldManifests) {
       Assert.assertTrue("New snapshot should contain old manifests",
           newManifests.remove(oldManifest));
     }
 
     Assert.assertEquals("Should create 1 new manifest and reuse old manifests",
         1, newManifests.size());
-    String manifest = newManifests.get(0);
+    ManifestFile manifest = newManifests.get(0);
 
     long id = snap.snapshotId();
     Iterator<String> newPaths = paths(newFiles).iterator();
 
-    for (ManifestEntry entry : ManifestReader.read(localInput(manifest)).entries()) {
+    for (ManifestEntry entry : ManifestReader.read(localInput(manifest.path())).entries()) {
       DataFile file = entry.file();
       Assert.assertEquals("Path should match expected", newPaths.next(), file.path().toString());
       Assert.assertEquals("File's snapshot ID should match", id, entry.snapshotId());
@@ -153,9 +153,15 @@ void validateSnapshot(Snapshot old, Snapshot snap, DataFile... newFiles) {
     return paths;
   }
 
+  static void validateManifest(ManifestFile manifest,
+                               Iterator<Long> ids,
+                               Iterator<DataFile> expectedFiles) {
+    validateManifest(manifest.path(), ids, expectedFiles);
+  }
+
   static void validateManifest(String manifest,
-                                Iterator<Long> ids,
-                                Iterator<DataFile> expectedFiles) {
+                               Iterator<Long> ids,
+                               Iterator<DataFile> expectedFiles) {
     for (ManifestEntry entry : ManifestReader.read(localInput(manifest)).entries()) {
       DataFile file = entry.file();
       DataFile expected = expectedFiles.next();
@@ -168,6 +174,13 @@ static void validateManifest(String manifest,
     Assert.assertFalse("Should find all files in the manifest", expectedFiles.hasNext());
   }
 
+  static void validateManifestEntries(ManifestFile manifest,
+                                      Iterator<Long> ids,
+                                      Iterator<DataFile> expectedFiles,
+                                      Iterator<ManifestEntry.Status> expectedStatuses) {
+    validateManifestEntries(manifest.path(), ids, expectedFiles, expectedStatuses);
+  }
+
   static void validateManifestEntries(String manifest,
                                       Iterator<Long> ids,
                                       Iterator<DataFile> expectedFiles,
@@ -199,7 +212,7 @@ static void validateManifestEntries(String manifest,
     return Iterators.forArray(files);
   }
 
-  static Iterator<DataFile> files(String manifest) {
-    return ManifestReader.read(localInput(manifest)).iterator();
+  static Iterator<DataFile> files(ManifestFile manifest) {
+    return ManifestReader.read(localInput(manifest.path())).iterator();
   }
 }
diff --git a/core/src/test/java/com/netflix/iceberg/TestFastAppend.java b/core/src/test/java/com/netflix/iceberg/TestFastAppend.java
index 9cac989..4d9e174 100644
--- a/core/src/test/java/com/netflix/iceberg/TestFastAppend.java
+++ b/core/src/test/java/com/netflix/iceberg/TestFastAppend.java
@@ -54,7 +54,7 @@ public void testNonEmptyTableAppend() {
 
     TableMetadata base = readMetadata();
     Assert.assertNotNull("Should have a current snapshot", base.currentSnapshot());
-    List<String> v2manifests = base.currentSnapshot().manifests();
+    List<ManifestFile> v2manifests = base.currentSnapshot().manifests();
     Assert.assertEquals("Should have one existing manifest", 1, v2manifests.size());
 
     // prepare a new append
@@ -80,7 +80,7 @@ public void testNoMerge() {
 
     TableMetadata base = readMetadata();
     Assert.assertNotNull("Should have a current snapshot", base.currentSnapshot());
-    List<String> v3manifests = base.currentSnapshot().manifests();
+    List<ManifestFile> v3manifests = base.currentSnapshot().manifests();
     Assert.assertEquals("Should have 2 existing manifests", 2, v3manifests.size());
 
     // prepare a new append
@@ -110,7 +110,7 @@ public void testRefreshBeforeApply() {
 
     TableMetadata base = readMetadata();
     Assert.assertNotNull("Should have a current snapshot", base.currentSnapshot());
-    List<String> v2manifests = base.currentSnapshot().manifests();
+    List<ManifestFile> v2manifests = base.currentSnapshot().manifests();
     Assert.assertEquals("Should have 1 existing manifest", 1, v2manifests.size());
 
     // commit from the stale table
@@ -137,7 +137,7 @@ public void testRefreshBeforeCommit() {
 
     TableMetadata base = readMetadata();
     Assert.assertNotNull("Should have a current snapshot", base.currentSnapshot());
-    List<String> v2manifests = base.currentSnapshot().manifests();
+    List<ManifestFile> v2manifests = base.currentSnapshot().manifests();
     Assert.assertEquals("Should have 1 existing manifest", 1, v2manifests.size());
 
     append.commit();
@@ -147,7 +147,7 @@ public void testRefreshBeforeCommit() {
     // apply was called before the conflicting commit, but the commit was still consistent
     validateSnapshot(base.currentSnapshot(), committed.currentSnapshot(), FILE_D);
 
-    List<String> committedManifests = Lists.newArrayList(committed.currentSnapshot().manifests());
+    List<ManifestFile> committedManifests = Lists.newArrayList(committed.currentSnapshot().manifests());
     committedManifests.removeAll(base.currentSnapshot().manifests());
     Assert.assertEquals("Should reused manifest created by apply",
         pending.manifests().get(0), committedManifests.get(0));
@@ -161,13 +161,13 @@ public void testFailure() {
 
     AppendFiles append = table.newFastAppend().appendFile(FILE_B);
     Snapshot pending = append.apply();
-    String newManifest = pending.manifests().get(0);
-    Assert.assertTrue("Should create new manifest", new File(newManifest).exists());
+    ManifestFile newManifest = pending.manifests().get(0);
+    Assert.assertTrue("Should create new manifest", new File(newManifest.path()).exists());
 
     AssertHelpers.assertThrows("Should retry 4 times and throw last failure",
         CommitFailedException.class, "Injected failure", append::commit);
 
-    Assert.assertFalse("Should clean up new manifest", new File(newManifest).exists());
+    Assert.assertFalse("Should clean up new manifest", new File(newManifest.path()).exists());
   }
 
   @Test
@@ -178,15 +178,15 @@ public void testRecovery() {
 
     AppendFiles append = table.newFastAppend().appendFile(FILE_B);
     Snapshot pending = append.apply();
-    String newManifest = pending.manifests().get(0);
-    Assert.assertTrue("Should create new manifest", new File(newManifest).exists());
+    ManifestFile newManifest = pending.manifests().get(0);
+    Assert.assertTrue("Should create new manifest", new File(newManifest.path()).exists());
 
     append.commit();
 
     TableMetadata metadata = readMetadata();
 
     validateSnapshot(null, metadata.currentSnapshot(), FILE_B);
-    Assert.assertTrue("Should commit same new manifest", new File(newManifest).exists());
+    Assert.assertTrue("Should commit same new manifest", new File(newManifest.path()).exists());
     Assert.assertTrue("Should commit the same new manifest",
         metadata.currentSnapshot().manifests().contains(newManifest));
   }
diff --git a/core/src/test/java/com/netflix/iceberg/TestMergeAppend.java b/core/src/test/java/com/netflix/iceberg/TestMergeAppend.java
index a1e28bc..6b78c63 100644
--- a/core/src/test/java/com/netflix/iceberg/TestMergeAppend.java
+++ b/core/src/test/java/com/netflix/iceberg/TestMergeAppend.java
@@ -67,7 +67,7 @@ public void testMergeWithExistingManifest() {
     long baseId = base.currentSnapshot().snapshotId();
     Assert.assertEquals("Should create 1 manifest for initial write",
         1, base.currentSnapshot().manifests().size());
-    String initialManifest = base.currentSnapshot().manifests().get(0);
+    ManifestFile initialManifest = base.currentSnapshot().manifests().get(0);
 
     Snapshot pending = table.newAppend()
         .appendFile(FILE_C)
@@ -76,7 +76,7 @@ public void testMergeWithExistingManifest() {
 
     Assert.assertEquals("Should contain 1 merged manifest for second write",
         1, pending.manifests().size());
-    String newManifest = pending.manifests().get(0);
+    ManifestFile newManifest = pending.manifests().get(0);
     Assert.assertNotEquals("Should not contain manifest from initial write",
         initialManifest, newManifest);
 
@@ -103,7 +103,7 @@ public void testMergeWithExistingManifestAfterDelete() {
     long baseId = base.currentSnapshot().snapshotId();
     Assert.assertEquals("Should create 1 manifest for initial write",
         1, base.currentSnapshot().manifests().size());
-    String initialManifest = base.currentSnapshot().manifests().get(0);
+    ManifestFile initialManifest = base.currentSnapshot().manifests().get(0);
 
     table.newDelete()
         .deleteFile(FILE_A)
@@ -113,7 +113,7 @@ public void testMergeWithExistingManifestAfterDelete() {
     long deleteId = delete.currentSnapshot().snapshotId();
     Assert.assertEquals("Should create 1 filtered manifest for delete",
         1, delete.currentSnapshot().manifests().size());
-    String deleteManifest = delete.currentSnapshot().manifests().get(0);
+    ManifestFile deleteManifest = delete.currentSnapshot().manifests().get(0);
 
     validateManifestEntries(deleteManifest,
         ids(deleteId, baseId),
@@ -127,7 +127,7 @@ public void testMergeWithExistingManifestAfterDelete() {
 
     Assert.assertEquals("Should contain 1 merged manifest for second write",
         1, pending.manifests().size());
-    String newManifest = pending.manifests().get(0);
+    ManifestFile newManifest = pending.manifests().get(0);
     Assert.assertNotEquals("Should not contain manifest from initial write",
         initialManifest, newManifest);
 
@@ -168,7 +168,7 @@ public void testMinMergeCount() {
     TableMetadata base = readMetadata();
     Assert.assertEquals("Should have 3 unmerged manifests",
         3, base.currentSnapshot().manifests().size());
-    Set<String> unmerged = Sets.newHashSet(base.currentSnapshot().manifests());
+    Set<ManifestFile> unmerged = Sets.newHashSet(base.currentSnapshot().manifests());
 
     Snapshot pending = table.newAppend()
         .appendFile(FILE_D)
@@ -176,7 +176,7 @@ public void testMinMergeCount() {
 
     Assert.assertEquals("Should contain 1 merged manifest after the 4th write",
         1, pending.manifests().size());
-    String newManifest = pending.manifests().get(0);
+    ManifestFile newManifest = pending.manifests().get(0);
     Assert.assertFalse("Should not contain previous manifests", unmerged.contains(newManifest));
 
     long pendingId = pending.snapshotId();
@@ -204,7 +204,7 @@ public void testMergeSizeTargetWithExistingManifest() {
     long baseId = base.currentSnapshot().snapshotId();
     Assert.assertEquals("Should create 1 manifest for initial write",
         1, base.currentSnapshot().manifests().size());
-    String initialManifest = base.currentSnapshot().manifests().get(0);
+    ManifestFile initialManifest = base.currentSnapshot().manifests().get(0);
 
     Snapshot pending = table.newAppend()
         .appendFile(FILE_C)
@@ -213,7 +213,7 @@ public void testMergeSizeTargetWithExistingManifest() {
 
     Assert.assertEquals("Should contain 2 unmerged manifests after second write",
         2, pending.manifests().size());
-    String newManifest = pending.manifests().get(0);
+    ManifestFile newManifest = pending.manifests().get(0);
     Assert.assertNotEquals("Should not contain manifest from initial write",
         initialManifest, newManifest);
 
@@ -233,7 +233,7 @@ public void testChangedPartitionSpec() {
     TableMetadata base = readMetadata();
     Assert.assertEquals("Should create 1 manifest for initial write",
         1, base.currentSnapshot().manifests().size());
-    String initialManifest = base.currentSnapshot().manifests().get(0);
+    ManifestFile initialManifest = base.currentSnapshot().manifests().get(0);
 
     // build the new spec using the table's schema, which uses fresh IDs
     PartitionSpec newSpec = PartitionSpec.builderFor(base.schema())
@@ -279,7 +279,7 @@ public void testChangedPartitionSpecMergeExisting() {
     TableMetadata base = readMetadata();
     Assert.assertEquals("Should contain 2 manifests",
         2, base.currentSnapshot().manifests().size());
-    String manifest = base.currentSnapshot().manifests().get(0);
+    ManifestFile manifest = base.currentSnapshot().manifests().get(0);
 
     // build the new spec using the table's schema, which uses fresh IDs
     PartitionSpec newSpec = PartitionSpec.builderFor(base.schema())
@@ -319,7 +319,7 @@ public void testFailure() {
 
     TableMetadata base = readMetadata();
     long baseId = base.currentSnapshot().snapshotId();
-    String initialManifest = base.currentSnapshot().manifests().get(0);
+    ManifestFile initialManifest = base.currentSnapshot().manifests().get(0);
 
     table.ops().failCommits(5);
 
@@ -327,9 +327,9 @@ public void testFailure() {
     Snapshot pending = append.apply();
 
     Assert.assertEquals("Should merge to 1 manifest", 1, pending.manifests().size());
-    String newManifest = pending.manifests().get(0);
+    ManifestFile newManifest = pending.manifests().get(0);
 
-    Assert.assertTrue("Should create new manifest", new File(newManifest).exists());
+    Assert.assertTrue("Should create new manifest", new File(newManifest.path()).exists());
     validateManifest(newManifest,
         ids(pending.snapshotId(), baseId),
         concat(files(FILE_B), files(initialManifest)));
@@ -337,7 +337,7 @@ public void testFailure() {
     AssertHelpers.assertThrows("Should retry 4 times and throw last failure",
         CommitFailedException.class, "Injected failure", append::commit);
 
-    Assert.assertFalse("Should clean up new manifest", new File(newManifest).exists());
+    Assert.assertFalse("Should clean up new manifest", new File(newManifest.path()).exists());
   }
 
   @Test
@@ -351,7 +351,7 @@ public void testRecovery() {
 
     TableMetadata base = readMetadata();
     long baseId = base.currentSnapshot().snapshotId();
-    String initialManifest = base.currentSnapshot().manifests().get(0);
+    ManifestFile initialManifest = base.currentSnapshot().manifests().get(0);
 
     table.ops().failCommits(3);
 
@@ -359,9 +359,9 @@ public void testRecovery() {
     Snapshot pending = append.apply();
 
     Assert.assertEquals("Should merge to 1 manifest", 1, pending.manifests().size());
-    String newManifest = pending.manifests().get(0);
+    ManifestFile newManifest = pending.manifests().get(0);
 
-    Assert.assertTrue("Should create new manifest", new File(newManifest).exists());
+    Assert.assertTrue("Should create new manifest", new File(newManifest.path()).exists());
     validateManifest(newManifest,
         ids(pending.snapshotId(), baseId),
         concat(files(FILE_B), files(initialManifest)));
@@ -369,7 +369,7 @@ public void testRecovery() {
     append.commit();
 
     TableMetadata metadata = readMetadata();
-    Assert.assertTrue("Should reuse the new manifest", new File(newManifest).exists());
+    Assert.assertTrue("Should reuse the new manifest", new File(newManifest.path()).exists());
     Assert.assertEquals("Should commit the same new manifest during retry",
         Lists.newArrayList(newManifest), metadata.currentSnapshot().manifests());
   }
diff --git a/core/src/test/java/com/netflix/iceberg/TestReplaceFiles.java b/core/src/test/java/com/netflix/iceberg/TestReplaceFiles.java
index d667294..032b680 100644
--- a/core/src/test/java/com/netflix/iceberg/TestReplaceFiles.java
+++ b/core/src/test/java/com/netflix/iceberg/TestReplaceFiles.java
@@ -87,7 +87,7 @@ public void testDeleteWithDuplicateEntriesInManifest() {
     long baseSnapshotId = base.currentSnapshot().snapshotId();
     Assert.assertEquals("Should create 1 manifest for initial write",
         1, base.currentSnapshot().manifests().size());
-    String initialManifest = base.currentSnapshot().manifests().get(0);
+    ManifestFile initialManifest = base.currentSnapshot().manifests().get(0);
 
     Snapshot pending = table.newRewrite()
         .rewriteFiles(Sets.newSet(FILE_A), Sets.newSet(FILE_C))
@@ -127,7 +127,7 @@ public void testAddAndDelete() {
     long baseSnapshotId = base.currentSnapshot().snapshotId();
     Assert.assertEquals("Should create 1 manifest for initial write",
         1, base.currentSnapshot().manifests().size());
-    String initialManifest = base.currentSnapshot().manifests().get(0);
+    ManifestFile initialManifest = base.currentSnapshot().manifests().get(0);
 
     Snapshot pending = table.newRewrite()
         .rewriteFiles(Sets.newSet(FILE_A), Sets.newSet(FILE_C))
@@ -167,8 +167,8 @@ public void testFailure() {
     Snapshot pending = rewrite.apply();
 
     Assert.assertEquals("Should produce 2 manifests", 2, pending.manifests().size());
-    String manifest1 = pending.manifests().get(0);
-    String manifest2 = pending.manifests().get(1);
+    ManifestFile manifest1 = pending.manifests().get(0);
+    ManifestFile manifest2 = pending.manifests().get(1);
 
     validateManifestEntries(manifest1,
         ids(pending.snapshotId()), files(FILE_B), statuses(ADDED));
@@ -178,8 +178,8 @@ public void testFailure() {
     AssertHelpers.assertThrows("Should retry 4 times and throw last failure",
         CommitFailedException.class, "Injected failure", rewrite::commit);
 
-    Assert.assertFalse("Should clean up new manifest", new File(manifest1).exists());
-    Assert.assertFalse("Should clean up new manifest", new File(manifest2).exists());
+    Assert.assertFalse("Should clean up new manifest", new File(manifest1.path()).exists());
+    Assert.assertFalse("Should clean up new manifest", new File(manifest2.path()).exists());
 
     // As commit failed all the manifests added with rewrite should be cleaned up
     Assert.assertEquals("Only 1 manifest should exist", 1, listMetadataFiles("avro").size());
@@ -197,8 +197,8 @@ public void testRecovery() {
     Snapshot pending = rewrite.apply();
 
     Assert.assertEquals("Should produce 2 manifests", 2, pending.manifests().size());
-    String manifest1 = pending.manifests().get(0);
-    String manifest2 = pending.manifests().get(1);
+    ManifestFile manifest1 = pending.manifests().get(0);
+    ManifestFile manifest2 = pending.manifests().get(1);
 
     validateManifestEntries(manifest1,
         ids(pending.snapshotId()), files(FILE_B), statuses(ADDED));
@@ -207,8 +207,8 @@ public void testRecovery() {
 
     rewrite.commit();
 
-    Assert.assertTrue("Should reuse the manifest for appends", new File(manifest1).exists());
-    Assert.assertTrue("Should reuse the manifest with deletes", new File(manifest2).exists());
+    Assert.assertTrue("Should reuse the manifest for appends", new File(manifest1.path()).exists());
+    Assert.assertTrue("Should reuse the manifest with deletes", new File(manifest2.path()).exists());
 
     TableMetadata metadata = readMetadata();
     Assert.assertTrue("Should commit the manifest for append",
diff --git a/core/src/test/java/com/netflix/iceberg/TestSnapshotJson.java b/core/src/test/java/com/netflix/iceberg/TestSnapshotJson.java
index 4c55c67..dbcc811 100644
--- a/core/src/test/java/com/netflix/iceberg/TestSnapshotJson.java
+++ b/core/src/test/java/com/netflix/iceberg/TestSnapshotJson.java
@@ -19,19 +19,72 @@
 
 package com.netflix.iceberg;
 
+import com.google.common.collect.ImmutableList;
 import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+import static com.netflix.iceberg.Files.localInput;
 
 public class TestSnapshotJson {
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  public TableOperations ops = new LocalTableOperations(temp);
+
   @Test
   public void testJsonConversion() {
-    Snapshot expected = new BaseSnapshot(null, System.currentTimeMillis(),
+    Snapshot expected = new BaseSnapshot(ops, System.currentTimeMillis(),
         "file:/tmp/manifest1.avro", "file:/tmp/manifest2.avro");
     String json = SnapshotParser.toJson(expected);
-    Snapshot snapshot = SnapshotParser.fromJson(null, json);
+    Snapshot snapshot = SnapshotParser.fromJson(ops, json);
+
+    Assert.assertEquals("Snapshot ID should match",
+        expected.snapshotId(), snapshot.snapshotId());
+    Assert.assertEquals("Files should match",
+        expected.manifests(), snapshot.manifests());
+  }
+
+  @Test
+  public void testJsonConversionWithManifestList() throws IOException {
+    long parentId = 1;
+    long id = 2;
+    List<ManifestFile> manifests = ImmutableList.of(
+        new GenericManifestFile(localInput("file:/tmp/manifest1.avro"), 0),
+        new GenericManifestFile(localInput("file:/tmp/manifest2.avro"), 0));
+
+    File manifestList = temp.newFile("manifests");
+    Assert.assertTrue(manifestList.delete());
+    manifestList.deleteOnExit();
+
+    try (ManifestListWriter writer = new ManifestListWriter(
+        Files.localOutput(manifestList), id, parentId)) {
+      writer.addAll(manifests);
+    }
+
+    Snapshot expected = new BaseSnapshot(
+        ops, id, parentId, System.currentTimeMillis(), localInput(manifestList));
+    Snapshot inMemory = new BaseSnapshot(
+        ops, id, parentId, expected.timestampMillis(), manifests);
+
+    Assert.assertEquals("Files should match in memory list",
+        inMemory.manifests(), expected.manifests());
+
+    String json = SnapshotParser.toJson(expected);
+    Snapshot snapshot = SnapshotParser.fromJson(ops, json);
 
     Assert.assertEquals("Snapshot ID should match",
         expected.snapshotId(), snapshot.snapshotId());
+    Assert.assertEquals("Timestamp should match",
+        expected.timestampMillis(), snapshot.timestampMillis());
+    Assert.assertEquals("Parent ID should match",
+        expected.parentId(), snapshot.parentId());
+    Assert.assertEquals("Manifest list should match",
+        expected.manifestListLocation(), snapshot.manifestListLocation());
     Assert.assertEquals("Files should match",
         expected.manifests(), snapshot.manifests());
   }
diff --git a/core/src/test/java/com/netflix/iceberg/TestTableMetadataJson.java b/core/src/test/java/com/netflix/iceberg/TestTableMetadataJson.java
index 21acdbd..0b04fac 100644
--- a/core/src/test/java/com/netflix/iceberg/TestTableMetadataJson.java
+++ b/core/src/test/java/com/netflix/iceberg/TestTableMetadataJson.java
@@ -29,7 +29,9 @@
 import com.netflix.iceberg.types.Types;
 import com.netflix.iceberg.util.JsonUtil;
 import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import java.io.IOException;
 import java.io.StringWriter;
 import java.util.Arrays;
@@ -37,6 +39,7 @@
 import java.util.Map;
 import java.util.Random;
 
+import static com.netflix.iceberg.Files.localInput;
 import static com.netflix.iceberg.TableMetadataParser.CURRENT_SNAPSHOT_ID;
 import static com.netflix.iceberg.TableMetadataParser.FORMAT_VERSION;
 import static com.netflix.iceberg.TableMetadataParser.LAST_COLUMN_ID;
@@ -48,6 +51,11 @@
 import static com.netflix.iceberg.TableMetadataParser.SNAPSHOTS;
 
 public class TestTableMetadataJson {
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  public TableOperations ops = new LocalTableOperations(temp);
+
   @Test
   public void testJsonConversion() throws Exception {
     Schema schema = new Schema(
@@ -60,23 +68,25 @@ public void testJsonConversion() throws Exception {
 
     long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600);
     Snapshot previousSnapshot = new BaseSnapshot(
-        null, previousSnapshotId, null, previousSnapshotId, ImmutableList.of("file:/tmp/manfiest.1.avro"));
+        null, previousSnapshotId, null, previousSnapshotId, ImmutableList.of(
+        new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), spec.specId())));
     long currentSnapshotId = System.currentTimeMillis();
     Snapshot currentSnapshot = new BaseSnapshot(
-        null, currentSnapshotId, previousSnapshotId, currentSnapshotId, ImmutableList.of("file:/tmp/manfiest.2.avro"));
+        null, currentSnapshotId, previousSnapshotId, currentSnapshotId, ImmutableList.of(
+        new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), spec.specId())));
 
     List<SnapshotLogEntry> snapshotLog = ImmutableList.<SnapshotLogEntry>builder()
         .add(new SnapshotLogEntry(previousSnapshot.timestampMillis(), previousSnapshot.snapshotId()))
         .add(new SnapshotLogEntry(currentSnapshot.timestampMillis(), currentSnapshot.snapshotId()))
         .build();
 
-    TableMetadata expected = new TableMetadata(null, null, "s3://bucket/test/location",
+    TableMetadata expected = new TableMetadata(ops, null, "s3://bucket/test/location",
         System.currentTimeMillis(), 3, schema, 5, ImmutableList.of(spec),
         ImmutableMap.of("property", "value"), currentSnapshotId,
         Arrays.asList(previousSnapshot, currentSnapshot), snapshotLog);
 
     String asJson = TableMetadataParser.toJson(expected);
-    TableMetadata metadata = TableMetadataParser.fromJson(null, null,
+    TableMetadata metadata = TableMetadataParser.fromJson(ops, null,
         JsonUtil.mapper().readValue(asJson, JsonNode.class));
 
     Assert.assertEquals("Table location should match",
@@ -120,14 +130,16 @@ public void testFromJsonSortsSnapshotLog() throws Exception {
 
     long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600);
     Snapshot previousSnapshot = new BaseSnapshot(
-        null, previousSnapshotId, null, previousSnapshotId, ImmutableList.of("file:/tmp/manfiest.1.avro"));
+        ops, previousSnapshotId, null, previousSnapshotId, ImmutableList.of(
+        new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), spec.specId())));
     long currentSnapshotId = System.currentTimeMillis();
     Snapshot currentSnapshot = new BaseSnapshot(
-        null, currentSnapshotId, previousSnapshotId, currentSnapshotId, ImmutableList.of("file:/tmp/manfiest.2.avro"));
+        ops, currentSnapshotId, previousSnapshotId, currentSnapshotId, ImmutableList.of(
+        new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), spec.specId())));
 
     List<SnapshotLogEntry> reversedSnapshotLog = Lists.newArrayList();
 
-    TableMetadata expected = new TableMetadata(null, null, "s3://bucket/test/location",
+    TableMetadata expected = new TableMetadata(ops, null, "s3://bucket/test/location",
         System.currentTimeMillis(), 3, schema, 5, ImmutableList.of(spec),
         ImmutableMap.of("property", "value"), currentSnapshotId,
         Arrays.asList(previousSnapshot, currentSnapshot), reversedSnapshotLog);
@@ -139,7 +151,7 @@ public void testFromJsonSortsSnapshotLog() throws Exception {
         new SnapshotLogEntry(previousSnapshot.timestampMillis(), previousSnapshot.snapshotId()));
 
     String asJson = TableMetadataParser.toJson(expected);
-    TableMetadata metadata = TableMetadataParser.fromJson(null, null,
+    TableMetadata metadata = TableMetadataParser.fromJson(ops, null,
         JsonUtil.mapper().readValue(asJson, JsonNode.class));
 
     List<SnapshotLogEntry> expectedSnapshotLog = ImmutableList.<SnapshotLogEntry>builder()
@@ -163,18 +175,20 @@ public void testBackwardCompatMissingPartitionSpecList() throws Exception {
 
     long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600);
     Snapshot previousSnapshot = new BaseSnapshot(
-        null, previousSnapshotId, null, previousSnapshotId, ImmutableList.of("file:/tmp/manfiest.1.avro"));
+        ops, previousSnapshotId, null, previousSnapshotId, ImmutableList.of(
+        new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), spec.specId())));
     long currentSnapshotId = System.currentTimeMillis();
     Snapshot currentSnapshot = new BaseSnapshot(
-        null, currentSnapshotId, previousSnapshotId, currentSnapshotId, ImmutableList.of("file:/tmp/manfiest.2.avro"));
+        ops, currentSnapshotId, previousSnapshotId, currentSnapshotId, ImmutableList.of(
+        new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), spec.specId())));
 
-    TableMetadata expected = new TableMetadata(null, null, "s3://bucket/test/location",
+    TableMetadata expected = new TableMetadata(ops, null, "s3://bucket/test/location",
         System.currentTimeMillis(), 3, schema, 6, ImmutableList.of(spec),
         ImmutableMap.of("property", "value"), currentSnapshotId,
         Arrays.asList(previousSnapshot, currentSnapshot), ImmutableList.of());
 
     String asJson = toJsonWithoutSpecList(expected);
-    TableMetadata metadata = TableMetadataParser.fromJson(null, null,
+    TableMetadata metadata = TableMetadataParser.fromJson(ops, null,
         JsonUtil.mapper().readValue(asJson, JsonNode.class));
 
     Assert.assertEquals("Table location should match",
diff --git a/core/src/test/java/com/netflix/iceberg/TestTransaction.java b/core/src/test/java/com/netflix/iceberg/TestTransaction.java
index 317ad5c..bdc3ddd 100644
--- a/core/src/test/java/com/netflix/iceberg/TestTransaction.java
+++ b/core/src/test/java/com/netflix/iceberg/TestTransaction.java
@@ -274,7 +274,7 @@ public void testTransactionRetry() {
         .appendFile(FILE_B)
         .commit();
 
-    Set<String> appendManifests = Sets.newHashSet(t.table().currentSnapshot().manifests());
+    Set<ManifestFile> appendManifests = Sets.newHashSet(t.table().currentSnapshot().manifests());
 
     Assert.assertSame("Base metadata should not change when commit is created",
         base, readMetadata());
@@ -313,7 +313,7 @@ public void testTransactionRetryMergeAppend() {
         .appendFile(FILE_B)
         .commit();
 
-    Set<String> appendManifests = Sets.newHashSet(t.table().currentSnapshot().manifests());
+    Set<ManifestFile> appendManifests = Sets.newHashSet(t.table().currentSnapshot().manifests());
 
     Assert.assertSame("Base metadata should not change when commit is created",
         base, readMetadata());
@@ -327,13 +327,13 @@ public void testTransactionRetryMergeAppend() {
 
     Assert.assertEquals("Table should be on version 2 after real append", 2, (int) version());
 
-    Set<String> conflictAppendManifests = Sets.newHashSet(table.currentSnapshot().manifests());
+    Set<ManifestFile> conflictAppendManifests = Sets.newHashSet(table.currentSnapshot().manifests());
 
     t.commitTransaction();
 
     Assert.assertEquals("Table should be on version 3 after commit", 3, (int) version());
 
-    Set<String> expectedManifests = Sets.newHashSet();
+    Set<ManifestFile> expectedManifests = Sets.newHashSet();
     expectedManifests.addAll(appendManifests);
     expectedManifests.addAll(conflictAppendManifests);
 
@@ -370,7 +370,7 @@ public void testMultipleUpdateTransactionRetryMergeCleanup() {
 
     Assert.assertEquals("Append should create one manifest",
         1, t.table().currentSnapshot().manifests().size());
-    String appendManifest = t.table().currentSnapshot().manifests().get(0);
+    ManifestFile appendManifest = t.table().currentSnapshot().manifests().get(0);
 
     Assert.assertSame("Base metadata should not change when commit is created",
         base, readMetadata());
@@ -384,13 +384,13 @@ public void testMultipleUpdateTransactionRetryMergeCleanup() {
 
     Assert.assertEquals("Table should be on version 2 after real append", 2, (int) version());
 
-    Set<String> conflictAppendManifests = Sets.newHashSet(table.currentSnapshot().manifests());
+    Set<ManifestFile> conflictAppendManifests = Sets.newHashSet(table.currentSnapshot().manifests());
 
     t.commitTransaction();
 
     Assert.assertEquals("Table should be on version 3 after commit", 3, (int) version());
 
-    Set<String> previousManifests = Sets.newHashSet();
+    Set<ManifestFile> previousManifests = Sets.newHashSet();
     previousManifests.add(appendManifest);
     previousManifests.addAll(conflictAppendManifests);
 
@@ -399,7 +399,7 @@ public void testMultipleUpdateTransactionRetryMergeCleanup() {
     Assert.assertFalse("Should merge both commit manifests into a new manifest",
         previousManifests.contains(table.currentSnapshot().manifests().get(0)));
 
-    Assert.assertFalse("Append manifest should be deleted", new File(appendManifest).exists());
+    Assert.assertFalse("Append manifest should be deleted", new File(appendManifest.path()).exists());
   }
 
   @Test
@@ -427,7 +427,7 @@ public void testTransactionRetryMergeCleanup() {
 
     Assert.assertEquals("Append should create one manifest",
         1, t.table().currentSnapshot().manifests().size());
-    String appendManifest = t.table().currentSnapshot().manifests().get(0);
+    ManifestFile appendManifest = t.table().currentSnapshot().manifests().get(0);
 
     Assert.assertSame("Base metadata should not change when commit is created",
         base, readMetadata());
@@ -441,13 +441,13 @@ public void testTransactionRetryMergeCleanup() {
 
     Assert.assertEquals("Table should be on version 2 after real append", 2, (int) version());
 
-    Set<String> conflictAppendManifests = Sets.newHashSet(table.currentSnapshot().manifests());
+    Set<ManifestFile> conflictAppendManifests = Sets.newHashSet(table.currentSnapshot().manifests());
 
     t.commitTransaction();
 
     Assert.assertEquals("Table should be on version 3 after commit", 3, (int) version());
 
-    Set<String> previousManifests = Sets.newHashSet();
+    Set<ManifestFile> previousManifests = Sets.newHashSet();
     previousManifests.add(appendManifest);
     previousManifests.addAll(conflictAppendManifests);
 
@@ -456,6 +456,6 @@ public void testTransactionRetryMergeCleanup() {
     Assert.assertFalse("Should merge both commit manifests into a new manifest",
         previousManifests.contains(table.currentSnapshot().manifests().get(0)));
 
-    Assert.assertFalse("Append manifest should be deleted", new File(appendManifest).exists());
+    Assert.assertFalse("Append manifest should be deleted", new File(appendManifest.path()).exists());
   }
 }


 
