You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/07/12 19:55:40 UTC

[incubator-iceberg] branch master updated: Add metadata tables (#252)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new e4c6fb9  Add metadata tables (#252)
e4c6fb9 is described below

commit e4c6fb979003b1629961b0c950ec6acec994de3b
Author: Ryan Blue <rd...@users.noreply.github.com>
AuthorDate: Fri Jul 12 12:55:36 2019 -0700

    Add metadata tables (#252)
    
    This commit adds metadata tables:
    * history -- the snapshot log
    * snapshots -- known snapshots
    * manifests -- manifest files for a snapshot
    * files -- data files for a snapshot
    * entries -- manifest entries for a snapshot
---
 .../iceberg/{ScanTask.java => DataTask.java}       |  30 +-
 .../main/java/org/apache/iceberg/FileFormat.java   |  22 +-
 .../iceberg/{ScanTask.java => HistoryEntry.java}   |  30 +-
 .../main/java/org/apache/iceberg/ManifestFile.java |  27 ++
 api/src/main/java/org/apache/iceberg/ScanTask.java |  15 +
 api/src/main/java/org/apache/iceberg/Table.java    |   8 +
 .../main/java/org/apache/iceberg/TableScan.java    |  10 +
 ...nifestEvaluator.java => ManifestEvaluator.java} |  23 +-
 .../iceberg/expressions/ResidualEvaluator.java     |  45 ++-
 .../org/apache/iceberg/io/CloseableIterable.java   |  11 +
 .../TestInclusiveManifestEvaluator.java            | 130 +++++----
 .../apache/iceberg/transforms/TestResiduals.java   |  25 +-
 .../{BaseTable.java => BaseMetadataTable.java}     |  98 +++----
 .../org/apache/iceberg/BaseMetastoreCatalog.java   |  49 +++-
 .../main/java/org/apache/iceberg/BaseTable.java    |   8 +-
 .../java/org/apache/iceberg/BaseTableScan.java     | 100 +++----
 .../java/org/apache/iceberg/BaseTransaction.java   |   5 +
 .../main/java/org/apache/iceberg/DataFiles.java    |  11 +
 .../java/org/apache/iceberg/DataFilesTable.java    | 161 +++++++++++
 .../java/org/apache/iceberg/DataTableScan.java     | 108 ++++++++
 .../java/org/apache/iceberg/FilteredManifest.java  |  12 +-
 .../main/java/org/apache/iceberg/HistoryTable.java | 105 +++++++
 .../org/apache/iceberg/ManifestEntriesTable.java   | 114 ++++++++
 .../java/org/apache/iceberg/ManifestGroup.java     | 108 ++++++--
 .../java/org/apache/iceberg/ManifestsTable.java    | 123 +++++++++
 .../java/org/apache/iceberg/SnapshotsTable.java    |  93 +++++++
 .../java/org/apache/iceberg/StaticDataTask.java    | 116 ++++++++
 .../java/org/apache/iceberg/StaticTableScan.java   |  63 +++++
 .../java/org/apache/iceberg/TableMetadata.java     |  26 +-
 .../org/apache/iceberg/TableMetadataParser.java    |   2 +-
 .../java/org/apache/iceberg/util/SnapshotUtil.java |  58 ++++
 ...stBaseTableScan.java => TestDataTableScan.java} |   2 +-
 .../apache/iceberg/TestEntriesMetadataTable.java   |  68 +++++
 .../org/apache/iceberg/TestTableMetadataJson.java  |   4 +-
 .../java/org/apache/iceberg/hive/HiveCatalog.java  |   2 +-
 .../org/apache/iceberg/spark/source/Reader.java    |  50 ++--
 .../iceberg/spark/source/StructInternalRow.java    | 276 +++++++++++++++++++
 .../org/apache/iceberg/spark/data/TestHelpers.java |   2 +-
 .../spark/source/TestIcebergSourceHiveTables.java  | 305 +++++++++++++++++++++
 39 files changed, 2107 insertions(+), 338 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/ScanTask.java b/api/src/main/java/org/apache/iceberg/DataTask.java
similarity index 53%
copy from api/src/main/java/org/apache/iceberg/ScanTask.java
copy to api/src/main/java/org/apache/iceberg/DataTask.java
index 7c7350e..329b7dc 100644
--- a/api/src/main/java/org/apache/iceberg/ScanTask.java
+++ b/api/src/main/java/org/apache/iceberg/DataTask.java
@@ -19,32 +19,24 @@
 
 package org.apache.iceberg;
 
-import java.io.Serializable;
+import org.apache.iceberg.io.CloseableIterable;
 
 /**
- * A scan task.
+ * A task that returns data as {@link StructLike rows} instead of where to read data.
  */
-public interface ScanTask extends Serializable {
-  /**
-   * @return true if this is a {@link FileScanTask}, false otherwise.
-   */
-  default boolean isFileScanTask() {
-    return false;
+public interface DataTask extends FileScanTask {
+  @Override
+  default boolean isDataTask() {
+    return true;
   }
 
-  /**
-   * @return this cast to {@link FileScanTask} if it is one
-   * @throws IllegalStateException if this is not a {@link FileScanTask}
-   */
-  default FileScanTask asFileScanTask() {
-    throw new IllegalStateException("Not a FileScanTask: " + this);
+  @Override
+  default DataTask asDataTask() {
+    return this;
   }
 
   /**
-   * @return this cast to {@link CombinedScanTask} if it is one
-   * @throws IllegalStateException if this is not a {@link CombinedScanTask}
+   * @return an iterable of {@link StructLike} rows
    */
-  default CombinedScanTask asCombinedScanTask() {
-    throw new IllegalStateException("Not a CombinedScanTask: " + this);
-  }
+  CloseableIterable<StructLike> rows();
 }
diff --git a/api/src/main/java/org/apache/iceberg/FileFormat.java b/api/src/main/java/org/apache/iceberg/FileFormat.java
index 27729af..6bcab8e 100644
--- a/api/src/main/java/org/apache/iceberg/FileFormat.java
+++ b/api/src/main/java/org/apache/iceberg/FileFormat.java
@@ -27,7 +27,8 @@ import org.apache.iceberg.types.Comparators;
 public enum FileFormat {
   ORC("orc", true),
   PARQUET("parquet", true),
-  AVRO("avro", true);
+  AVRO("avro", true),
+  METADATA("metadata.json", false);
 
   private final String ext;
   private final boolean splittable;
@@ -55,28 +56,13 @@ public enum FileFormat {
   }
 
   public static FileFormat fromFileName(CharSequence filename) {
-    int lastIndex = lastIndexOf('.', filename);
-    if (lastIndex < 0) {
-      return null;
-    }
-
-    CharSequence ext = filename.subSequence(lastIndex, filename.length());
-
     for (FileFormat format : FileFormat.values()) {
-      if (Comparators.charSequences().compare(format.ext, ext) == 0) {
+      int extStart = filename.length() - format.ext.length();
+      if (Comparators.charSequences().compare(format.ext, filename.subSequence(extStart, filename.length())) == 0) {
         return format;
       }
     }
 
     return null;
   }
-
-  private static int lastIndexOf(char ch, CharSequence seq) {
-    for (int i = seq.length() - 1; i >= 0; i -= 1) {
-      if (seq.charAt(i) == ch) {
-        return i;
-      }
-    }
-    return -1;
-  }
 }
diff --git a/api/src/main/java/org/apache/iceberg/ScanTask.java b/api/src/main/java/org/apache/iceberg/HistoryEntry.java
similarity index 53%
copy from api/src/main/java/org/apache/iceberg/ScanTask.java
copy to api/src/main/java/org/apache/iceberg/HistoryEntry.java
index 7c7350e..5b0b595 100644
--- a/api/src/main/java/org/apache/iceberg/ScanTask.java
+++ b/api/src/main/java/org/apache/iceberg/HistoryEntry.java
@@ -19,32 +19,20 @@
 
 package org.apache.iceberg;
 
-import java.io.Serializable;
-
 /**
- * A scan task.
+ * Table history entry.
+ * <p>
+ * An entry contains a change to the table state. At the given timestamp, the current snapshot was
+ * set to the given snapshot ID.
  */
-public interface ScanTask extends Serializable {
-  /**
-   * @return true if this is a {@link FileScanTask}, false otherwise.
-   */
-  default boolean isFileScanTask() {
-    return false;
-  }
-
+public interface HistoryEntry {
   /**
-   * @return this cast to {@link FileScanTask} if it is one
-   * @throws IllegalStateException if this is not a {@link FileScanTask}
+   * @return the timestamp in milliseconds of the change
    */
-  default FileScanTask asFileScanTask() {
-    throw new IllegalStateException("Not a FileScanTask: " + this);
-  }
+  long timestampMillis();
 
   /**
-   * @return this cast to {@link CombinedScanTask} if it is one
-   * @throws IllegalStateException if this is not a {@link CombinedScanTask}
+   * @return ID of the new current snapshot
    */
-  default CombinedScanTask asCombinedScanTask() {
-    throw new IllegalStateException("Not a CombinedScanTask: " + this);
-  }
+  long snapshotId();
 }
diff --git a/api/src/main/java/org/apache/iceberg/ManifestFile.java b/api/src/main/java/org/apache/iceberg/ManifestFile.java
index 2618037..7163980 100644
--- a/api/src/main/java/org/apache/iceberg/ManifestFile.java
+++ b/api/src/main/java/org/apache/iceberg/ManifestFile.java
@@ -69,16 +69,43 @@ public interface ManifestFile {
   Long snapshotId();
 
   /**
+   * Returns true if the manifest contains ADDED entries or if the count is not known.
+   *
+   * @return whether this manifest contains entries with ADDED status
+   */
+  default boolean hasAddedFiles() {
+    return addedFilesCount() == null || addedFilesCount() > 0;
+  }
+
+  /**
    * @return the number of data files with status ADDED in the manifest file
    */
   Integer addedFilesCount();
 
   /**
+   * Returns true if the manifest contains EXISTING entries or if the count is not known.
+   *
+   * @return whether this manifest contains entries with EXISTING status
+   */
+  default boolean hasExistingFiles() {
+    return existingFilesCount() == null || existingFilesCount() > 0;
+  }
+
+  /**
    * @return the number of data files with status EXISTING in the manifest file
    */
   Integer existingFilesCount();
 
   /**
+   * Returns true if the manifest contains DELETED entries or if the count is not known.
+   *
+   * @return whether this manifest contains entries with DELETED status
+   */
+  default boolean hasDeletedFiles() {
+    return deletedFilesCount() == null || deletedFilesCount() > 0;
+  }
+
+  /**
    * @return the number of data files with status DELETED in the manifest file
    */
   Integer deletedFilesCount();
diff --git a/api/src/main/java/org/apache/iceberg/ScanTask.java b/api/src/main/java/org/apache/iceberg/ScanTask.java
index 7c7350e..8a17144 100644
--- a/api/src/main/java/org/apache/iceberg/ScanTask.java
+++ b/api/src/main/java/org/apache/iceberg/ScanTask.java
@@ -41,6 +41,21 @@ public interface ScanTask extends Serializable {
   }
 
   /**
+   * @return true if this is a {@link DataTask}, false otherwise.
+   */
+  default boolean isDataTask() {
+    return false;
+  }
+
+  /**
+   * @return this cast to {@link DataTask} if it is one
+   * @throws IllegalStateException if this is not a {@link DataTask}
+   */
+  default DataTask asDataTask() {
+    throw new IllegalStateException("Not a DataTask: " + this);
+  }
+
+  /**
    * @return this cast to {@link CombinedScanTask} if it is one
    * @throws IllegalStateException if this is not a {@link CombinedScanTask}
    */
diff --git a/api/src/main/java/org/apache/iceberg/Table.java b/api/src/main/java/org/apache/iceberg/Table.java
index b957f2e..db15d0e 100644
--- a/api/src/main/java/org/apache/iceberg/Table.java
+++ b/api/src/main/java/org/apache/iceberg/Table.java
@@ -19,6 +19,7 @@
 
 package org.apache.iceberg;
 
+import java.util.List;
 import java.util.Map;
 import org.apache.iceberg.encryption.EncryptionManager;
 import org.apache.iceberg.io.FileIO;
@@ -94,6 +95,13 @@ public interface Table {
   Iterable<Snapshot> snapshots();
 
   /**
+   * Get the snapshot history of this table.
+   *
+   * @return a list of {@link HistoryEntry history entries}
+   */
+  List<HistoryEntry> history();
+
+  /**
    * Create a new {@link UpdateSchema} to alter the columns of this table and commit the change.
    *
    * @return a new {@link UpdateSchema}
diff --git a/api/src/main/java/org/apache/iceberg/TableScan.java b/api/src/main/java/org/apache/iceberg/TableScan.java
index 8636a94..8259fd1 100644
--- a/api/src/main/java/org/apache/iceberg/TableScan.java
+++ b/api/src/main/java/org/apache/iceberg/TableScan.java
@@ -156,6 +156,16 @@ public interface TableScan {
   Schema schema();
 
   /**
+   * Returns the {@link Snapshot} that will be used by this scan.
+   * <p>
+   * If the snapshot was not configured using {@link #asOfTime(long)} or {@link #useSnapshot(long)}, the current table
+   * snapshot will be used.
+   *
+   * @return the Snapshot this scan will use
+   */
+  Snapshot snapshot();
+
+  /**
    * Returns whether this scan should apply column name case sensitiveness as per {@link #caseSensitive(boolean)}.
    * @return true if case sensitive, false otherwise.
    */
diff --git a/api/src/main/java/org/apache/iceberg/expressions/InclusiveManifestEvaluator.java b/api/src/main/java/org/apache/iceberg/expressions/ManifestEvaluator.java
similarity index 90%
rename from api/src/main/java/org/apache/iceberg/expressions/InclusiveManifestEvaluator.java
rename to api/src/main/java/org/apache/iceberg/expressions/ManifestEvaluator.java
index 7550ea5..85bac1c 100644
--- a/api/src/main/java/org/apache/iceberg/expressions/InclusiveManifestEvaluator.java
+++ b/api/src/main/java/org/apache/iceberg/expressions/ManifestEvaluator.java
@@ -25,7 +25,6 @@ import org.apache.iceberg.Accessors;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.ManifestFile.PartitionFieldSummary;
 import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.Schema;
 import org.apache.iceberg.expressions.ExpressionVisitors.BoundExpressionVisitor;
 import org.apache.iceberg.types.Conversions;
 import org.apache.iceberg.types.Types.StructType;
@@ -36,15 +35,14 @@ import static org.apache.iceberg.expressions.Expressions.rewriteNot;
  * Evaluates an {@link Expression} on a {@link ManifestFile} to test whether the file contains
  * matching partitions.
  * <p>
- * This evaluation is inclusive: it returns true if a file may match and false if it cannot match.
+ * For row expressions, evaluation is inclusive: it returns true if a file may match and false if it cannot match.
  * <p>
  * Files are passed to {@link #eval(ManifestFile)}, which returns true if the manifest may contain
  * data files that match the partition expression. Manifest files may be skipped if and only if the
  * return value of {@code eval} is false.
  */
-public class InclusiveManifestEvaluator {
+public class ManifestEvaluator {
   private final StructType struct;
-  private final Schema schema;
   private final Expression expr;
   private transient ThreadLocal<ManifestEvalVisitor> visitors = null;
 
@@ -55,17 +53,18 @@ public class InclusiveManifestEvaluator {
     return visitors.get();
   }
 
-  public InclusiveManifestEvaluator(PartitionSpec spec, Expression rowFilter) {
-    this(spec, rowFilter, true);
+  public static ManifestEvaluator forRowFilter(Expression rowFilter, PartitionSpec spec, boolean caseSensitive) {
+    return new ManifestEvaluator(spec, Projections.inclusive(spec, caseSensitive).project(rowFilter), caseSensitive);
   }
 
-  public InclusiveManifestEvaluator(PartitionSpec spec, Expression rowFilter, boolean caseSensitive) {
+  public static ManifestEvaluator forPartitionFilter(
+      Expression partitionFilter, PartitionSpec spec, boolean caseSensitive) {
+    return new ManifestEvaluator(spec, partitionFilter, caseSensitive);
+  }
+
+  private ManifestEvaluator(PartitionSpec spec, Expression partitionFilter, boolean caseSensitive) {
     this.struct = spec.partitionType();
-    this.expr = Binder.bind(
-      struct,
-      rewriteNot(Projections.inclusive(spec, caseSensitive).project(rowFilter)),
-      caseSensitive);
-    this.schema = new Schema(struct.fields());
+    this.expr = Binder.bind(struct, rewriteNot(partitionFilter), caseSensitive);
   }
 
   /**
diff --git a/api/src/main/java/org/apache/iceberg/expressions/ResidualEvaluator.java b/api/src/main/java/org/apache/iceberg/expressions/ResidualEvaluator.java
index 4ffcc2f..0fae40d 100644
--- a/api/src/main/java/org/apache/iceberg/expressions/ResidualEvaluator.java
+++ b/api/src/main/java/org/apache/iceberg/expressions/ResidualEvaluator.java
@@ -51,6 +51,45 @@ import org.apache.iceberg.transforms.Transform;
  * This class is thread-safe.
  */
 public class ResidualEvaluator implements Serializable {
+  private static class UnpartitionedResidualEvaluator extends ResidualEvaluator {
+    private final Expression expr;
+
+    UnpartitionedResidualEvaluator(Expression expr) {
+      super(PartitionSpec.unpartitioned(), expr, false);
+      this.expr = expr;
+    }
+
+    @Override
+    public Expression residualFor(StructLike ignored) {
+      return expr;
+    }
+  }
+
+  /**
+   * Return a residual evaluator for an unpartitioned {@link PartitionSpec spec}.
+   *
+   * @param expr an expression
+   * @return a residual evaluator that always returns the expression
+   */
+  public static ResidualEvaluator unpartitioned(Expression expr) {
+    return new UnpartitionedResidualEvaluator(expr);
+  }
+
+  /**
+   * Return a residual evaluator for a {@link PartitionSpec spec} and {@link Expression expression}.
+   *
+   * @param spec a partition spec
+   * @param expr an expression
+   * @return a residual evaluator for the expression
+   */
+  public static ResidualEvaluator of(PartitionSpec spec, Expression expr, boolean caseSensitive) {
+    if (spec.fields().size() > 0) {
+      return new ResidualEvaluator(spec, expr, caseSensitive);
+    } else {
+      return unpartitioned(expr);
+    }
+  }
+
   private final PartitionSpec spec;
   private final Expression expr;
   private final boolean caseSensitive;
@@ -63,7 +102,7 @@ public class ResidualEvaluator implements Serializable {
     return visitors.get();
   }
 
-  public ResidualEvaluator(PartitionSpec spec, Expression expr, boolean caseSensitive) {
+  private ResidualEvaluator(PartitionSpec spec, Expression expr, boolean caseSensitive) {
     this.spec = spec;
     this.expr = expr;
     this.caseSensitive = caseSensitive;
@@ -82,8 +121,8 @@ public class ResidualEvaluator implements Serializable {
   private class ResidualVisitor extends ExpressionVisitors.BoundExpressionVisitor<Expression> {
     private StructLike struct;
 
-    private Expression eval(StructLike structLike) {
-      this.struct = structLike;
+    private Expression eval(StructLike dataStruct) {
+      this.struct = dataStruct;
       return ExpressionVisitors.visit(expr, this);
     }
 
diff --git a/api/src/main/java/org/apache/iceberg/io/CloseableIterable.java b/api/src/main/java/org/apache/iceberg/io/CloseableIterable.java
index 329bd4e..731d1ef 100644
--- a/api/src/main/java/org/apache/iceberg/io/CloseableIterable.java
+++ b/api/src/main/java/org/apache/iceberg/io/CloseableIterable.java
@@ -20,14 +20,21 @@
 package org.apache.iceberg.io;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.function.Function;
+import java.util.function.Predicate;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 
 public interface CloseableIterable<T> extends Iterable<T>, Closeable {
+  static <E> CloseableIterable<E> withNoopClose(E entry) {
+    return withNoopClose(ImmutableList.of(entry));
+  }
+
   static <E> CloseableIterable<E> withNoopClose(Iterable<E> iterable) {
     return new CloseableIterable<E>() {
       @Override
@@ -59,6 +66,10 @@ public interface CloseableIterable<T> extends Iterable<T>, Closeable {
     };
   }
 
+  static <E> CloseableIterable<E> filter(CloseableIterable<E> iterable, Predicate<E> pred) {
+    return combine(Iterables.filter(iterable, pred::test), iterable);
+  }
+
   static <I, O> CloseableIterable<O> transform(CloseableIterable<I> iterable, Function<I, O> transform) {
     Preconditions.checkNotNull(transform, "Cannot apply a null transform");
 
diff --git a/api/src/test/java/org/apache/iceberg/expressions/TestInclusiveManifestEvaluator.java b/api/src/test/java/org/apache/iceberg/expressions/TestInclusiveManifestEvaluator.java
index c62d32c..3859f00 100644
--- a/api/src/test/java/org/apache/iceberg/expressions/TestInclusiveManifestEvaluator.java
+++ b/api/src/test/java/org/apache/iceberg/expressions/TestInclusiveManifestEvaluator.java
@@ -79,25 +79,25 @@ public class TestInclusiveManifestEvaluator {
 
   @Test
   public void testAllNulls() {
-    boolean shouldRead = new InclusiveManifestEvaluator(SPEC, notNull("all_nulls")).eval(FILE);
+    boolean shouldRead = ManifestEvaluator.forRowFilter(notNull("all_nulls"), SPEC, true).eval(FILE);
     Assert.assertFalse("Should skip: no non-null value in all null column", shouldRead);
 
-    shouldRead = new InclusiveManifestEvaluator(SPEC, notNull("some_nulls")).eval(FILE);
+    shouldRead = ManifestEvaluator.forRowFilter(notNull("some_nulls"), SPEC, true).eval(FILE);
     Assert.assertTrue("Should read: column with some nulls contains a non-null value", shouldRead);
 
-    shouldRead = new InclusiveManifestEvaluator(SPEC, notNull("no_nulls")).eval(FILE);
+    shouldRead = ManifestEvaluator.forRowFilter(notNull("no_nulls"), SPEC, true).eval(FILE);
     Assert.assertTrue("Should read: non-null column contains a non-null value", shouldRead);
   }
 
   @Test
   public void testNoNulls() {
-    boolean shouldRead = new InclusiveManifestEvaluator(SPEC, isNull("all_nulls")).eval(FILE);
+    boolean shouldRead = ManifestEvaluator.forRowFilter(isNull("all_nulls"), SPEC, true).eval(FILE);
     Assert.assertTrue("Should read: at least one null value in all null column", shouldRead);
 
-    shouldRead = new InclusiveManifestEvaluator(SPEC, isNull("some_nulls")).eval(FILE);
+    shouldRead = ManifestEvaluator.forRowFilter(isNull("some_nulls"), SPEC, true).eval(FILE);
     Assert.assertTrue("Should read: column with some nulls contains a null value", shouldRead);
 
-    shouldRead = new InclusiveManifestEvaluator(SPEC, isNull("no_nulls")).eval(FILE);
+    shouldRead = ManifestEvaluator.forRowFilter(isNull("no_nulls"), SPEC, true).eval(FILE);
     Assert.assertFalse("Should skip: non-null column contains no null values", shouldRead);
   }
 
@@ -105,7 +105,7 @@ public class TestInclusiveManifestEvaluator {
   public void testMissingColumn() {
     TestHelpers.assertThrows("Should complain about missing column in expression",
         ValidationException.class, "Cannot find field 'missing'",
-        () -> new InclusiveManifestEvaluator(SPEC, lessThan("missing", 5)).eval(FILE));
+        () -> ManifestEvaluator.forRowFilter(lessThan("missing", 5), SPEC, true).eval(FILE));
   }
 
   @Test
@@ -117,7 +117,7 @@ public class TestInclusiveManifestEvaluator {
     };
 
     for (Expression expr : exprs) {
-      boolean shouldRead = new InclusiveManifestEvaluator(SPEC, expr).eval(NO_STATS);
+      boolean shouldRead = ManifestEvaluator.forRowFilter(expr, SPEC, true).eval(NO_STATS);
       Assert.assertTrue("Should read when missing stats for expr: " + expr, shouldRead);
     }
   }
@@ -125,194 +125,190 @@ public class TestInclusiveManifestEvaluator {
   @Test
   public void testNot() {
     // this test case must use a real predicate, not alwaysTrue(), or binding will simplify it out
-    boolean shouldRead = new InclusiveManifestEvaluator(SPEC, not(lessThan("id", 5))).eval(FILE);
+    boolean shouldRead = ManifestEvaluator.forRowFilter(not(lessThan("id", 5)), SPEC, true).eval(FILE);
     Assert.assertTrue("Should read: not(false)", shouldRead);
 
-    shouldRead = new InclusiveManifestEvaluator(SPEC, not(greaterThan("id", 5))).eval(FILE);
+    shouldRead = ManifestEvaluator.forRowFilter(not(greaterThan("id", 5)), SPEC, true).eval(FILE);
     Assert.assertFalse("Should skip: not(true)", shouldRead);
   }
 
   @Test
   public void testAnd() {
     // this test case must use a real predicate, not alwaysTrue(), or binding will simplify it out
-    boolean shouldRead = new InclusiveManifestEvaluator(
-        SPEC, and(lessThan("id", 5), greaterThanOrEqual("id", 0))).eval(FILE);
+    boolean shouldRead = ManifestEvaluator.forRowFilter(
+        and(lessThan("id", 5), greaterThanOrEqual("id", 0)), SPEC, true).eval(FILE);
     Assert.assertFalse("Should skip: and(false, false)", shouldRead);
 
-    shouldRead = new InclusiveManifestEvaluator(
-        SPEC, and(greaterThan("id", 5), lessThanOrEqual("id", 30))).eval(FILE);
+    shouldRead = ManifestEvaluator.forRowFilter(
+        and(greaterThan("id", 5), lessThanOrEqual("id", 30)), SPEC, true).eval(FILE);
     Assert.assertTrue("Should read: and(true, true)", shouldRead);
   }
 
   @Test
   public void testOr() {
     // this test case must use a real predicate, not alwaysTrue(), or binding will simplify it out
-    boolean shouldRead = new InclusiveManifestEvaluator(
-        SPEC, or(lessThan("id", 5), greaterThanOrEqual("id", 80))).eval(FILE);
+    boolean shouldRead = ManifestEvaluator.forRowFilter(
+        or(lessThan("id", 5), greaterThanOrEqual("id", 80)), SPEC, true).eval(FILE);
     Assert.assertFalse("Should skip: or(false, false)", shouldRead);
 
-    shouldRead = new InclusiveManifestEvaluator(
-        SPEC, or(lessThan("id", 5), greaterThanOrEqual("id", 60))).eval(FILE);
+    shouldRead = ManifestEvaluator.forRowFilter(
+        or(lessThan("id", 5), greaterThanOrEqual("id", 60)), SPEC, true).eval(FILE);
     Assert.assertTrue("Should read: or(false, true)", shouldRead);
   }
 
   @Test
   public void testIntegerLt() {
-    boolean shouldRead = new InclusiveManifestEvaluator(SPEC, lessThan("id", 5)).eval(FILE);
+    boolean shouldRead = ManifestEvaluator.forRowFilter(lessThan("id", 5), SPEC, true).eval(FILE);
     Assert.assertFalse("Should not read: id range below lower bound (5 < 30)", shouldRead);
 
-    shouldRead = new InclusiveManifestEvaluator(SPEC, lessThan("id", 30)).eval(FILE);
+    shouldRead = ManifestEvaluator.forRowFilter(lessThan("id", 30), SPEC, true).eval(FILE);
     Assert.assertFalse("Should not read: id range below lower bound (30 is not < 30)", shouldRead);
 
-    shouldRead = new InclusiveManifestEvaluator(SPEC, lessThan("id", 31)).eval(FILE);
+    shouldRead = ManifestEvaluator.forRowFilter(lessThan("id", 31), SPEC, true).eval(FILE);
     Assert.assertTrue("Should read: one possible id", shouldRead);
 
-    shouldRead = new InclusiveManifestEvaluator(SPEC, lessThan("id", 79)).eval(FILE);
+    shouldRead = ManifestEvaluator.forRowFilter(lessThan("id", 79), SPEC, true).eval(FILE);
     Assert.assertTrue("Should read: may possible ids", shouldRead);
   }
 
   @Test
   public void testIntegerLtEq() {
-    boolean shouldRead = new InclusiveManifestEvaluator(SPEC, lessThanOrEqual("id", 5)).eval(FILE);
+    boolean shouldRead = ManifestEvaluator.forRowFilter(lessThanOrEqual("id", 5), SPEC, true).eval(FILE);
     Assert.assertFalse("Should not read: id range below lower bound (5 < 30)", shouldRead);
 
-    shouldRead = new InclusiveManifestEvaluator(SPEC, lessThanOrEqual("id", 29)).eval(FILE);
+    shouldRead = ManifestEvaluator.forRowFilter(lessThanOrEqual("id", 29), SPEC, true).eval(FILE);
     Assert.assertFalse("Should not read: id range below lower bound (29 < 30)", shouldRead);
 
-    shouldRead = new InclusiveManifestEvaluator(SPEC, lessThanOrEqual("id", 30)).eval(FILE);
+    shouldRead = ManifestEvaluator.forRowFilter(lessThanOrEqual("id", 30), SPEC, true).eval(FILE);
     Assert.assertTrue("Should read: one possible id", shouldRead);
 
-    shouldRead = new InclusiveManifestEvaluator(SPEC, lessThanOrEqual("id", 79)).eval(FILE);
+    shouldRead = ManifestEvaluator.forRowFilter(lessThanOrEqual("id", 79), SPEC, true).eval(FILE);
     Assert.assertTrue("Should read: many possible ids", shouldRead);
   }
 
   @Test
   public void testIntegerGt() {
-    boolean shouldRead = new InclusiveManifestEvaluator(SPEC, greaterThan("id", 85)).eval(FILE);
+    boolean shouldRead = ManifestEvaluator.forRowFilter(greaterThan("id", 85), SPEC, true).eval(FILE);
     Assert.assertFalse("Should not read: id range above upper bound (85 < 79)", shouldRead);
 
-    shouldRead = new InclusiveManifestEvaluator(SPEC, greaterThan("id", 79)).eval(FILE);
+    shouldRead = ManifestEvaluator.forRowFilter(greaterThan("id", 79), SPEC, true).eval(FILE);
     Assert.assertFalse("Should not read: id range above upper bound (79 is not > 79)", shouldRead);
 
-    shouldRead = new InclusiveManifestEvaluator(SPEC, greaterThan("id", 78)).eval(FILE);
+    shouldRead = ManifestEvaluator.forRowFilter(greaterThan("id", 78), SPEC, true).eval(FILE);
     Assert.assertTrue("Should read: one possible id", shouldRead);
 
-    shouldRead = new InclusiveManifestEvaluator(SPEC, greaterThan("id", 75)).eval(FILE);
+    shouldRead = ManifestEvaluator.forRowFilter(greaterThan("id", 75), SPEC, true).eval(FILE);
     Assert.assertTrue("Should read: may possible ids", shouldRead);
   }
 
   @Test
   public void testIntegerGtEq() {
-    boolean shouldRead = new InclusiveManifestEvaluator(
-        SPEC, greaterThanOrEqual("id", 85)).eval(FILE);
+    boolean shouldRead = ManifestEvaluator.forRowFilter(greaterThanOrEqual("id", 85), SPEC, true).eval(FILE);
     Assert.assertFalse("Should not read: id range above upper bound (85 < 79)", shouldRead);
 
-    shouldRead = new InclusiveManifestEvaluator(
-        SPEC, greaterThanOrEqual("id", 80)).eval(FILE);
+    shouldRead = ManifestEvaluator.forRowFilter(greaterThanOrEqual("id", 80), SPEC, true).eval(FILE);
     Assert.assertFalse("Should not read: id range above upper bound (80 > 79)", shouldRead);
 
-    shouldRead = new InclusiveManifestEvaluator(
-        SPEC, greaterThanOrEqual("id", 79)).eval(FILE);
+    shouldRead = ManifestEvaluator.forRowFilter(greaterThanOrEqual("id", 79), SPEC, true).eval(FILE);
     Assert.assertTrue("Should read: one possible id", shouldRead);
 
-    shouldRead = new InclusiveManifestEvaluator(
-        SPEC, greaterThanOrEqual("id", 75)).eval(FILE);
+    shouldRead = ManifestEvaluator.forRowFilter(greaterThanOrEqual("id", 75), SPEC, true).eval(FILE);
     Assert.assertTrue("Should read: may possible ids", shouldRead);
   }
 
   @Test
   public void testIntegerEq() {
-    boolean shouldRead = new InclusiveManifestEvaluator(SPEC, equal("id", 5)).eval(FILE);
+    boolean shouldRead = ManifestEvaluator.forRowFilter(equal("id", 5), SPEC, true).eval(FILE);
     Assert.assertFalse("Should not read: id below lower bound", shouldRead);
 
-    shouldRead = new InclusiveManifestEvaluator(SPEC, equal("id", 29)).eval(FILE);
+    shouldRead = ManifestEvaluator.forRowFilter(equal("id", 29), SPEC, true).eval(FILE);
     Assert.assertFalse("Should not read: id below lower bound", shouldRead);
 
-    shouldRead = new InclusiveManifestEvaluator(SPEC, equal("id", 30)).eval(FILE);
+    shouldRead = ManifestEvaluator.forRowFilter(equal("id", 30), SPEC, true).eval(FILE);
     Assert.assertTrue("Should read: id equal to lower bound", shouldRead);
 
-    shouldRead = new InclusiveManifestEvaluator(SPEC, equal("id", 75)).eval(FILE);
+    shouldRead = ManifestEvaluator.forRowFilter(equal("id", 75), SPEC, true).eval(FILE);
     Assert.assertTrue("Should read: id between lower and upper bounds", shouldRead);
 
-    shouldRead = new InclusiveManifestEvaluator(SPEC, equal("id", 79)).eval(FILE);
+    shouldRead = ManifestEvaluator.forRowFilter(equal("id", 79), SPEC, true).eval(FILE);
     Assert.assertTrue("Should read: id equal to upper bound", shouldRead);
 
-    shouldRead = new InclusiveManifestEvaluator(SPEC, equal("id", 80)).eval(FILE);
+    shouldRead = ManifestEvaluator.forRowFilter(equal("id", 80), SPEC, true).eval(FILE);
     Assert.assertFalse("Should not read: id above upper bound", shouldRead);
 
-    shouldRead = new InclusiveManifestEvaluator(SPEC, equal("id", 85)).eval(FILE);
+    shouldRead = ManifestEvaluator.forRowFilter(equal("id", 85), SPEC, true).eval(FILE);
     Assert.assertFalse("Should not read: id above upper bound", shouldRead);
   }
 
   @Test
   public void testIntegerNotEq() {
-    boolean shouldRead = new InclusiveManifestEvaluator(SPEC, notEqual("id", 5)).eval(FILE);
+    boolean shouldRead = ManifestEvaluator.forRowFilter(notEqual("id", 5), SPEC, true).eval(FILE);
     Assert.assertTrue("Should read: id below lower bound", shouldRead);
 
-    shouldRead = new InclusiveManifestEvaluator(SPEC, notEqual("id", 29)).eval(FILE);
+    shouldRead = ManifestEvaluator.forRowFilter(notEqual("id", 29), SPEC, true).eval(FILE);
     Assert.assertTrue("Should read: id below lower bound", shouldRead);
 
-    shouldRead = new InclusiveManifestEvaluator(SPEC, notEqual("id", 30)).eval(FILE);
+    shouldRead = ManifestEvaluator.forRowFilter(notEqual("id", 30), SPEC, true).eval(FILE);
     Assert.assertTrue("Should read: id equal to lower bound", shouldRead);
 
-    shouldRead = new InclusiveManifestEvaluator(SPEC, notEqual("id", 75)).eval(FILE);
+    shouldRead = ManifestEvaluator.forRowFilter(notEqual("id", 75), SPEC, true).eval(FILE);
     Assert.assertTrue("Should read: id between lower and upper bounds", shouldRead);
 
-    shouldRead = new InclusiveManifestEvaluator(SPEC, notEqual("id", 79)).eval(FILE);
+    shouldRead = ManifestEvaluator.forRowFilter(notEqual("id", 79), SPEC, true).eval(FILE);
     Assert.assertTrue("Should read: id equal to upper bound", shouldRead);
 
-    shouldRead = new InclusiveManifestEvaluator(SPEC, notEqual("id", 80)).eval(FILE);
+    shouldRead = ManifestEvaluator.forRowFilter(notEqual("id", 80), SPEC, true).eval(FILE);
     Assert.assertTrue("Should read: id above upper bound", shouldRead);
 
-    shouldRead = new InclusiveManifestEvaluator(SPEC, notEqual("id", 85)).eval(FILE);
+    shouldRead = ManifestEvaluator.forRowFilter(notEqual("id", 85), SPEC, true).eval(FILE);
     Assert.assertTrue("Should read: id above upper bound", shouldRead);
   }
 
   @Test
   public void testIntegerNotEqRewritten() {
-    boolean shouldRead = new InclusiveManifestEvaluator(SPEC, not(equal("id", 5))).eval(FILE);
+    boolean shouldRead = ManifestEvaluator.forRowFilter(not(equal("id", 5)), SPEC, true).eval(FILE);
     Assert.assertTrue("Should read: id below lower bound", shouldRead);
 
-    shouldRead = new InclusiveManifestEvaluator(SPEC, not(equal("id", 29))).eval(FILE);
+    shouldRead = ManifestEvaluator.forRowFilter(not(equal("id", 29)), SPEC, true).eval(FILE);
     Assert.assertTrue("Should read: id below lower bound", shouldRead);
 
-    shouldRead = new InclusiveManifestEvaluator(SPEC, not(equal("id", 30))).eval(FILE);
+    shouldRead = ManifestEvaluator.forRowFilter(not(equal("id", 30)), SPEC, true).eval(FILE);
     Assert.assertTrue("Should read: id equal to lower bound", shouldRead);
 
-    shouldRead = new InclusiveManifestEvaluator(SPEC, not(equal("id", 75))).eval(FILE);
+    shouldRead = ManifestEvaluator.forRowFilter(not(equal("id", 75)), SPEC, true).eval(FILE);
     Assert.assertTrue("Should read: id between lower and upper bounds", shouldRead);
 
-    shouldRead = new InclusiveManifestEvaluator(SPEC, not(equal("id", 79))).eval(FILE);
+    shouldRead = ManifestEvaluator.forRowFilter(not(equal("id", 79)), SPEC, true).eval(FILE);
     Assert.assertTrue("Should read: id equal to upper bound", shouldRead);
 
-    shouldRead = new InclusiveManifestEvaluator(SPEC, not(equal("id", 80))).eval(FILE);
+    shouldRead = ManifestEvaluator.forRowFilter(not(equal("id", 80)), SPEC, true).eval(FILE);
     Assert.assertTrue("Should read: id above upper bound", shouldRead);
 
-    shouldRead = new InclusiveManifestEvaluator(SPEC, not(equal("id", 85))).eval(FILE);
+    shouldRead = ManifestEvaluator.forRowFilter(not(equal("id", 85)), SPEC, true).eval(FILE);
     Assert.assertTrue("Should read: id above upper bound", shouldRead);
   }
 
   @Test
   public void testCaseInsensitiveIntegerNotEqRewritten() {
-    boolean shouldRead = new InclusiveManifestEvaluator(SPEC, not(equal("ID", 5)), false).eval(FILE);
+    boolean shouldRead = ManifestEvaluator.forRowFilter(not(equal("ID", 5)), SPEC, false).eval(FILE);
     Assert.assertTrue("Should read: id below lower bound", shouldRead);
 
-    shouldRead = new InclusiveManifestEvaluator(SPEC, not(equal("ID", 29)), false).eval(FILE);
+    shouldRead = ManifestEvaluator.forRowFilter(not(equal("ID", 29)), SPEC, false).eval(FILE);
     Assert.assertTrue("Should read: id below lower bound", shouldRead);
 
-    shouldRead = new InclusiveManifestEvaluator(SPEC, not(equal("ID", 30)), false).eval(FILE);
+    shouldRead = ManifestEvaluator.forRowFilter(not(equal("ID", 30)), SPEC, false).eval(FILE);
     Assert.assertTrue("Should read: id equal to lower bound", shouldRead);
 
-    shouldRead = new InclusiveManifestEvaluator(SPEC, not(equal("ID", 75)), false).eval(FILE);
+    shouldRead = ManifestEvaluator.forRowFilter(not(equal("ID", 75)), SPEC, false).eval(FILE);
     Assert.assertTrue("Should read: id between lower and upper bounds", shouldRead);
 
-    shouldRead = new InclusiveManifestEvaluator(SPEC, not(equal("ID", 79)), false).eval(FILE);
+    shouldRead = ManifestEvaluator.forRowFilter(not(equal("ID", 79)), SPEC, false).eval(FILE);
     Assert.assertTrue("Should read: id equal to upper bound", shouldRead);
 
-    shouldRead = new InclusiveManifestEvaluator(SPEC, not(equal("ID", 80)), false).eval(FILE);
+    shouldRead = ManifestEvaluator.forRowFilter(not(equal("ID", 80)), SPEC, false).eval(FILE);
     Assert.assertTrue("Should read: id above upper bound", shouldRead);
 
-    shouldRead = new InclusiveManifestEvaluator(SPEC, not(equal("ID", 85)), false).eval(FILE);
+    shouldRead = ManifestEvaluator.forRowFilter(not(equal("ID", 85)), SPEC, false).eval(FILE);
     Assert.assertTrue("Should read: id above upper bound", shouldRead);
   }
 
@@ -320,6 +316,6 @@ public class TestInclusiveManifestEvaluator {
   public void testCaseSensitiveIntegerNotEqRewritten() {
     TestHelpers.assertThrows("Should complain about missing column in expression",
         ValidationException.class, "Cannot find field 'ID'",
-        () -> new InclusiveManifestEvaluator(SPEC, not(equal("ID", 5)), true).eval(FILE));
+        () -> ManifestEvaluator.forRowFilter(not(equal("ID", 5)), SPEC, true).eval(FILE));
   }
 }
diff --git a/api/src/test/java/org/apache/iceberg/transforms/TestResiduals.java b/api/src/test/java/org/apache/iceberg/transforms/TestResiduals.java
index 3b60f74..f3ca075 100644
--- a/api/src/test/java/org/apache/iceberg/transforms/TestResiduals.java
+++ b/api/src/test/java/org/apache/iceberg/transforms/TestResiduals.java
@@ -24,6 +24,7 @@ import org.apache.iceberg.Schema;
 import org.apache.iceberg.TestHelpers.Row;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.expressions.ResidualEvaluator;
 import org.apache.iceberg.expressions.UnboundPredicate;
 import org.apache.iceberg.types.Types;
@@ -53,7 +54,7 @@ public class TestResiduals {
         .identity("dateint")
         .build();
 
-    ResidualEvaluator resEval = new ResidualEvaluator(spec, or(or(
+    ResidualEvaluator resEval = ResidualEvaluator.of(spec, or(or(
         and(lessThan("dateint", 20170815), greaterThan("dateint", 20170801)),
         and(equal("dateint", 20170815), lessThan("hour", 12))),
         and(equal("dateint", 20170801), greaterThan("hour", 11))),
@@ -93,7 +94,7 @@ public class TestResiduals {
         .identity("dateint")
         .build();
 
-    ResidualEvaluator resEval = new ResidualEvaluator(spec, or(or(
+    ResidualEvaluator resEval = ResidualEvaluator.of(spec, or(or(
         and(lessThan("DATEINT", 20170815), greaterThan("dateint", 20170801)),
         and(equal("dateint", 20170815), lessThan("HOUR", 12))),
         and(equal("DateInt", 20170801), greaterThan("hOUr", 11))),
@@ -133,8 +134,26 @@ public class TestResiduals {
         .identity("dateint")
         .build();
 
-    ResidualEvaluator resEval = new ResidualEvaluator(spec, lessThan("DATEINT", 20170815), true);
+    ResidualEvaluator resEval = ResidualEvaluator.of(spec, lessThan("DATEINT", 20170815), true);
 
     resEval.residualFor(Row.of(20170815));
   }
+
+  @Test
+  public void testUnpartitionedResiduals() {
+    Expression[] expressions = new Expression[] {
+        Expressions.alwaysTrue(),
+        Expressions.alwaysFalse(),
+        Expressions.lessThan("a", 5),
+        Expressions.greaterThanOrEqual("b", 16),
+        Expressions.notNull("c"),
+        Expressions.isNull("d")
+    };
+
+    for (Expression expr : expressions) {
+      ResidualEvaluator residualEvaluator = ResidualEvaluator.of(PartitionSpec.unpartitioned(), expr, true);
+      Assert.assertEquals("Should return expression",
+          expr, residualEvaluator.residualFor(Row.of()));
+    }
+  }
 }
diff --git a/core/src/main/java/org/apache/iceberg/BaseTable.java b/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
similarity index 57%
copy from core/src/main/java/org/apache/iceberg/BaseTable.java
copy to core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
index 32182ab..4bebc13 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTable.java
+++ b/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
@@ -19,157 +19,129 @@
 
 package org.apache.iceberg;
 
+import com.google.common.collect.ImmutableMap;
+import java.util.List;
 import java.util.Map;
 import org.apache.iceberg.encryption.EncryptionManager;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.LocationProvider;
 
-/**
- * Base {@link Table} implementation.
- * <p>
- * This can be extended by providing a {@link TableOperations} to the constructor.
- */
-public class BaseTable implements Table, HasTableOperations {
-  private final TableOperations ops;
-  private final String name;
-
-  public BaseTable(TableOperations ops, String name) {
-    this.ops = ops;
-    this.name = name;
-  }
+abstract class BaseMetadataTable implements Table {
+  abstract Table table();
+  abstract String metadataTableName();
 
   @Override
-  public TableOperations operations() {
-    return ops;
+  public FileIO io() {
+    return table().io();
   }
 
   @Override
-  public void refresh() {
-    ops.refresh();
+  public EncryptionManager encryption() {
+    return table().encryption();
   }
 
   @Override
-  public TableScan newScan() {
-    return new BaseTableScan(ops, this);
+  public LocationProvider locationProvider() {
+    return table().locationProvider();
   }
 
   @Override
-  public Schema schema() {
-    return ops.current().schema();
+  public void refresh() {
+    table().refresh();
   }
 
   @Override
   public PartitionSpec spec() {
-    return ops.current().spec();
+    return PartitionSpec.unpartitioned();
   }
 
   @Override
   public Map<String, String> properties() {
-    return ops.current().properties();
+    return ImmutableMap.of();
   }
 
   @Override
-  public String location() {
-    return ops.current().location();
+  public Snapshot currentSnapshot() {
+    return table().currentSnapshot();
   }
 
   @Override
-  public Snapshot currentSnapshot() {
-    return ops.current().currentSnapshot();
+  public Iterable<Snapshot> snapshots() {
+    return table().snapshots();
   }
 
   @Override
   public Snapshot snapshot(long snapshotId) {
-    return ops.current().snapshot(snapshotId);
+    return table().snapshot(snapshotId);
   }
 
   @Override
-  public Iterable<Snapshot> snapshots() {
-    return ops.current().snapshots();
+  public List<HistoryEntry> history() {
+    return table().history();
   }
 
   @Override
   public UpdateSchema updateSchema() {
-    return new SchemaUpdate(ops);
+    throw new UnsupportedOperationException("Cannot update the schema of a metadata table");
   }
 
   @Override
   public UpdateProperties updateProperties() {
-    return new PropertiesUpdate(ops);
+    throw new UnsupportedOperationException("Cannot update the properties of a metadata table");
   }
 
   @Override
   public UpdateLocation updateLocation() {
-    return new SetLocation(ops);
+    throw new UnsupportedOperationException("Cannot update the location of a metadata table");
   }
 
   @Override
   public AppendFiles newAppend() {
-    return new MergeAppend(ops);
-  }
-
-  @Override
-  public AppendFiles newFastAppend() {
-    return new FastAppend(ops);
+    throw new UnsupportedOperationException("Cannot append to a metadata table");
   }
 
   @Override
   public RewriteFiles newRewrite() {
-    return new ReplaceFiles(ops);
+    throw new UnsupportedOperationException("Cannot rewrite in a metadata table");
   }
 
   @Override
   public RewriteManifests rewriteManifests() {
-    return new ReplaceManifests(ops);
+    throw new UnsupportedOperationException("Cannot rewrite manifests in a metadata table");
   }
 
   @Override
   public OverwriteFiles newOverwrite() {
-    return new OverwriteData(ops);
+    throw new UnsupportedOperationException("Cannot overwrite in a metadata table");
   }
 
   @Override
   public ReplacePartitions newReplacePartitions() {
-    return new ReplacePartitionsOperation(ops);
+    throw new UnsupportedOperationException("Cannot replace partitions in a metadata table");
   }
 
   @Override
   public DeleteFiles newDelete() {
-    return new StreamingDelete(ops);
+    throw new UnsupportedOperationException("Cannot delete from a metadata table");
   }
 
   @Override
   public ExpireSnapshots expireSnapshots() {
-    return new RemoveSnapshots(ops);
+    throw new UnsupportedOperationException("Cannot expire snapshots from a metadata table");
   }
 
   @Override
   public Rollback rollback() {
-    return new RollbackToSnapshot(ops);
+    throw new UnsupportedOperationException("Cannot roll back a metadata table");
   }
 
   @Override
   public Transaction newTransaction() {
-    return Transactions.newTransaction(ops);
-  }
-
-  @Override
-  public FileIO io() {
-    return operations().io();
-  }
-
-  @Override
-  public EncryptionManager encryption() {
-    return operations().encryption();
-  }
-
-  @Override
-  public LocationProvider locationProvider() {
-    return operations().locationProvider();
+    throw new UnsupportedOperationException("Cannot create transactions for a metadata table");
   }
 
   @Override
   public String toString() {
-    return name;
+    return table().toString() + "." + metadataTableName();
   }
 }
diff --git a/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java b/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java
index 9e2dcdd..06d0a1e 100644
--- a/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java
+++ b/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java
@@ -20,6 +20,7 @@
 package org.apache.iceberg;
 
 import com.google.common.collect.Maps;
+import java.util.Locale;
 import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.catalog.Catalog;
@@ -29,6 +30,22 @@ import org.apache.iceberg.exceptions.CommitFailedException;
 import org.apache.iceberg.exceptions.NoSuchTableException;
 
 public abstract class BaseMetastoreCatalog implements Catalog {
+  enum TableType {
+    ENTRIES,
+    FILES,
+    HISTORY,
+    SNAPSHOTS,
+    MANIFESTS;
+
+    static TableType from(String name) {
+      try {
+        return TableType.valueOf(name.toUpperCase(Locale.ROOT));
+      } catch (IllegalArgumentException ignored) {
+        return null;
+      }
+    }
+  }
+
   private final Configuration conf;
 
   protected BaseMetastoreCatalog(Configuration conf) {
@@ -70,12 +87,42 @@ public abstract class BaseMetastoreCatalog implements Catalog {
   public Table loadTable(TableIdentifier identifier) {
     TableOperations ops = newTableOps(conf, identifier);
     if (ops.current() == null) {
-      throw new NoSuchTableException("Table does not exist: " + identifier);
+      String name = identifier.name();
+      TableType type = TableType.from(name);
+      if (type != null) {
+        return loadMetadataTable(TableIdentifier.of(identifier.namespace().levels()), type);
+      } else {
+        throw new NoSuchTableException("Table does not exist: " + identifier);
+      }
     }
 
     return new BaseTable(ops, identifier.toString());
   }
 
+  private Table loadMetadataTable(TableIdentifier identifier, TableType type) {
+    TableOperations ops = newTableOps(conf, identifier);
+    if (ops.current() == null) {
+      throw new NoSuchTableException("Table does not exist: " + identifier);
+    }
+
+    Table baseTable = new BaseTable(ops, identifier.toString());
+
+    switch (type) {
+      case ENTRIES:
+        return new ManifestEntriesTable(ops, baseTable);
+      case FILES:
+        return new DataFilesTable(ops, baseTable);
+      case HISTORY:
+        return new HistoryTable(ops, baseTable);
+      case SNAPSHOTS:
+        return new SnapshotsTable(ops, baseTable);
+      case MANIFESTS:
+        return new ManifestsTable(ops, baseTable);
+      default:
+        throw new NoSuchTableException(String.format("Unknown metadata table type: %s for %s", type, identifier));
+    }
+  }
+
   protected abstract TableOperations newTableOps(Configuration newConf, TableIdentifier tableIdentifier);
 
   protected abstract String defaultWarehouseLocation(Configuration hadoopConf, TableIdentifier tableIdentifier);
diff --git a/core/src/main/java/org/apache/iceberg/BaseTable.java b/core/src/main/java/org/apache/iceberg/BaseTable.java
index 32182ab..7d4ef02 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTable.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTable.java
@@ -19,6 +19,7 @@
 
 package org.apache.iceberg;
 
+import java.util.List;
 import java.util.Map;
 import org.apache.iceberg.encryption.EncryptionManager;
 import org.apache.iceberg.io.FileIO;
@@ -50,7 +51,7 @@ public class BaseTable implements Table, HasTableOperations {
 
   @Override
   public TableScan newScan() {
-    return new BaseTableScan(ops, this);
+    return new DataTableScan(ops, this);
   }
 
   @Override
@@ -89,6 +90,11 @@ public class BaseTable implements Table, HasTableOperations {
   }
 
   @Override
+  public List<HistoryEntry> history() {
+    return ops.current().snapshotLog();
+  }
+
+  @Override
   public UpdateSchema updateSchema() {
     return new SchemaUpdate(ops);
   }
diff --git a/core/src/main/java/org/apache/iceberg/BaseTableScan.java b/core/src/main/java/org/apache/iceberg/BaseTableScan.java
index abbd4c7..b0c52d7 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTableScan.java
@@ -19,13 +19,9 @@
 
 package org.apache.iceberg;
 
-import com.github.benmanes.caffeine.cache.Caffeine;
-import com.github.benmanes.caffeine.cache.LoadingCache;
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
 import java.time.Instant;
 import java.time.LocalDateTime;
@@ -33,42 +29,27 @@ import java.time.ZoneId;
 import java.time.format.DateTimeFormatter;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.List;
 import java.util.Set;
 import java.util.function.Function;
-import org.apache.iceberg.TableMetadata.SnapshotLogEntry;
 import org.apache.iceberg.events.Listeners;
 import org.apache.iceberg.events.ScanEvent;
 import org.apache.iceberg.expressions.Binder;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
-import org.apache.iceberg.expressions.InclusiveManifestEvaluator;
-import org.apache.iceberg.expressions.ResidualEvaluator;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.util.BinPacking;
-import org.apache.iceberg.util.ParallelIterable;
-import org.apache.iceberg.util.ThreadPools;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * Base class for {@link TableScan} implementations.
  */
-class BaseTableScan implements TableScan {
+@SuppressWarnings("checkstyle:OverloadMethodsDeclarationOrder")
+abstract class BaseTableScan implements TableScan {
   private static final Logger LOG = LoggerFactory.getLogger(TableScan.class);
 
   private static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
-  private static final List<String> SCAN_COLUMNS = ImmutableList.of(
-      "snapshot_id", "file_path", "file_ordinal", "file_format", "block_size_in_bytes",
-      "file_size_in_bytes", "record_count", "partition"
-  );
-  private static final List<String> SCAN_WITH_STATS_COLUMNS = ImmutableList.<String>builder()
-      .addAll(SCAN_COLUMNS)
-      .add("value_counts", "null_value_counts", "lower_bounds", "upper_bounds")
-      .build();
-  private static final boolean PLAN_SCANS_WITH_WORKER_POOL =
-      SystemProperties.getBoolean(SystemProperties.SCAN_THREAD_POOL_ENABLED, true);
 
   private final TableOperations ops;
   private final Table table;
@@ -78,13 +59,12 @@ class BaseTableScan implements TableScan {
   private final boolean caseSensitive;
   private final boolean colStats;
   private final Collection<String> selectedColumns;
-  private final LoadingCache<Integer, InclusiveManifestEvaluator> evalCache;
 
-  BaseTableScan(TableOperations ops, Table table) {
-    this(ops, table, null, table.schema(), Expressions.alwaysTrue(), true, false, null);
+  protected BaseTableScan(TableOperations ops, Table table, Schema schema) {
+    this(ops, table, null, schema, Expressions.alwaysTrue(), true, false, null);
   }
 
-  private BaseTableScan(TableOperations ops, Table table, Long snapshotId, Schema schema,
+  protected BaseTableScan(TableOperations ops, Table table, Long snapshotId, Schema schema,
                         Expression rowFilter, boolean caseSensitive, boolean colStats,
                         Collection<String> selectedColumns) {
     this.ops = ops;
@@ -95,12 +75,20 @@ class BaseTableScan implements TableScan {
     this.caseSensitive = caseSensitive;
     this.colStats = colStats;
     this.selectedColumns = selectedColumns;
-    this.evalCache = Caffeine.newBuilder().build(specId -> {
-      PartitionSpec spec = ops.current().spec(specId);
-      return new InclusiveManifestEvaluator(spec, rowFilter, caseSensitive);
-    });
   }
 
+  @SuppressWarnings("checkstyle:HiddenField")
+  protected abstract long targetSplitSize(TableOperations ops);
+
+  @SuppressWarnings("checkstyle:HiddenField")
+  protected abstract TableScan newRefinedScan(
+      TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
+      boolean caseSensitive, boolean colStats, Collection<String> selectedColumns);
+
+  @SuppressWarnings("checkstyle:HiddenField")
+  protected abstract CloseableIterable<FileScanTask> planFiles(
+      TableOperations ops, Snapshot snapshot, Expression rowFilter, boolean caseSensitive, boolean colStats);
+
   @Override
   public Table table() {
     return table;
@@ -112,7 +100,7 @@ class BaseTableScan implements TableScan {
         "Cannot override snapshot, already set to id=%s", scanSnapshotId);
     Preconditions.checkArgument(ops.current().snapshot(scanSnapshotId) != null,
         "Cannot find snapshot with ID %s", scanSnapshotId);
-    return new BaseTableScan(ops, table, scanSnapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns);
+    return newRefinedScan(ops, table, scanSnapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns);
   }
 
   @Override
@@ -121,7 +109,7 @@ class BaseTableScan implements TableScan {
         "Cannot override snapshot, already set to id=%s", snapshotId);
 
     Long lastSnapshotId = null;
-    for (SnapshotLogEntry logEntry : ops.current().snapshotLog()) {
+    for (HistoryEntry logEntry : ops.current().snapshotLog()) {
       if (logEntry.timestampMillis() <= timestampMillis) {
         lastSnapshotId = logEntry.snapshotId();
       }
@@ -137,28 +125,28 @@ class BaseTableScan implements TableScan {
 
   @Override
   public TableScan project(Schema projectedSchema) {
-    return new BaseTableScan(
+    return newRefinedScan(
         ops, table, snapshotId, projectedSchema, rowFilter, caseSensitive, colStats, selectedColumns);
   }
 
   @Override
   public TableScan caseSensitive(boolean scanCaseSensitive) {
-    return new BaseTableScan(ops, table, snapshotId, schema, rowFilter, scanCaseSensitive, colStats, selectedColumns);
+    return newRefinedScan(ops, table, snapshotId, schema, rowFilter, scanCaseSensitive, colStats, selectedColumns);
   }
 
   @Override
   public TableScan includeColumnStats() {
-    return new BaseTableScan(ops, table, snapshotId, schema, rowFilter, caseSensitive, true, selectedColumns);
+    return newRefinedScan(ops, table, snapshotId, schema, rowFilter, caseSensitive, true, selectedColumns);
   }
 
   @Override
   public TableScan select(Collection<String> columns) {
-    return new BaseTableScan(ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, columns);
+    return newRefinedScan(ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, columns);
   }
 
   @Override
   public TableScan filter(Expression expr) {
-    return new BaseTableScan(
+    return newRefinedScan(
         ops, table, snapshotId, schema, Expressions.and(rowFilter, expr), caseSensitive, colStats, selectedColumns);
   }
 
@@ -169,10 +157,7 @@ class BaseTableScan implements TableScan {
 
   @Override
   public CloseableIterable<FileScanTask> planFiles() {
-    Snapshot snapshot = snapshotId != null ?
-        ops.current().snapshot(snapshotId) :
-        ops.current().currentSnapshot();
-
+    Snapshot snapshot = snapshot();
     if (snapshot != null) {
       LOG.info("Scanning table {} snapshot {} created at {} with filter {}", table,
           snapshot.snapshotId(), formatTimestampMillis(snapshot.timestampMillis()),
@@ -181,30 +166,7 @@ class BaseTableScan implements TableScan {
       Listeners.notifyAll(
           new ScanEvent(table.toString(), snapshot.snapshotId(), rowFilter, schema()));
 
-      Iterable<ManifestFile> matchingManifests = Iterables.filter(snapshot.manifests(),
-          manifest -> evalCache.get(manifest.partitionSpecId()).eval(manifest));
-
-      Iterable<CloseableIterable<FileScanTask>> readers = Iterables.transform(
-          matchingManifests,
-          manifest -> {
-              ManifestReader reader = ManifestReader
-                  .read(ops.io().newInputFile(manifest.path()), ops.current()::spec)
-                  .caseSensitive(caseSensitive);
-            PartitionSpec spec = ops.current().spec(manifest.partitionSpecId());
-            String schemaString = SchemaParser.toJson(spec.schema());
-            String specString = PartitionSpecParser.toJson(spec);
-            ResidualEvaluator residuals = new ResidualEvaluator(spec, rowFilter, caseSensitive);
-            return CloseableIterable.transform(
-                reader.filterRows(rowFilter).select(colStats ? SCAN_WITH_STATS_COLUMNS : SCAN_COLUMNS),
-                file -> new BaseFileScanTask(file, schemaString, specString, residuals)
-            );
-          });
-
-      if (PLAN_SCANS_WITH_WORKER_POOL && snapshot.manifests().size() > 1) {
-        return new ParallelIterable<>(readers, ThreadPools.getWorkerPool());
-      } else {
-        return CloseableIterable.concat(readers);
-      }
+      return planFiles(ops, snapshot, rowFilter, caseSensitive, colStats);
 
     } else {
       LOG.info("Scanning empty table {}", table);
@@ -214,8 +176,7 @@ class BaseTableScan implements TableScan {
 
   @Override
   public CloseableIterable<CombinedScanTask> planTasks() {
-    long splitSize = ops.current().propertyAsLong(
-        TableProperties.SPLIT_SIZE, TableProperties.SPLIT_SIZE_DEFAULT);
+    long splitSize = targetSplitSize(ops);
     int lookback = ops.current().propertyAsInt(
         TableProperties.SPLIT_LOOKBACK, TableProperties.SPLIT_LOOKBACK_DEFAULT);
     long openFileCost = ops.current().propertyAsLong(
@@ -237,6 +198,13 @@ class BaseTableScan implements TableScan {
   }
 
   @Override
+  public Snapshot snapshot() {
+    return snapshotId != null ?
+        ops.current().snapshot(snapshotId) :
+        ops.current().currentSnapshot();
+  }
+
+  @Override
   public boolean isCaseSensitive() {
     return caseSensitive;
   }
diff --git a/core/src/main/java/org/apache/iceberg/BaseTransaction.java b/core/src/main/java/org/apache/iceberg/BaseTransaction.java
index ee70433..1353bb7 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTransaction.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTransaction.java
@@ -367,6 +367,11 @@ class BaseTransaction implements Transaction {
     }
 
     @Override
+    public List<HistoryEntry> history() {
+      return current.snapshotLog();
+    }
+
+    @Override
     public UpdateSchema updateSchema() {
       return BaseTransaction.this.updateSchema();
     }
diff --git a/core/src/main/java/org/apache/iceberg/DataFiles.java b/core/src/main/java/org/apache/iceberg/DataFiles.java
index 1819b64..7478d2f 100644
--- a/core/src/main/java/org/apache/iceberg/DataFiles.java
+++ b/core/src/main/java/org/apache/iceberg/DataFiles.java
@@ -150,6 +150,17 @@ public class DataFiles {
         location, format, partition, file.getLength(), metrics, keyMetadata.buffer(), splitOffsets);
   }
 
+  public static DataFile fromManifest(ManifestFile manifest) {
+    Preconditions.checkArgument(
+        manifest.addedFilesCount() != null && manifest.existingFilesCount() != null,
+        "Cannot create data file from manifest: data file counts are missing.");
+
+    return new GenericDataFile(manifest.path(),
+        FileFormat.AVRO,
+        manifest.addedFilesCount() + manifest.existingFilesCount(),
+        manifest.length());
+  }
+
   public static Builder builder(PartitionSpec spec) {
     return new Builder(spec);
   }
diff --git a/core/src/main/java/org/apache/iceberg/DataFilesTable.java b/core/src/main/java/org/apache/iceberg/DataFilesTable.java
new file mode 100644
index 0000000..63ca855
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/DataFilesTable.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import com.google.common.collect.ImmutableList;
+import java.util.Collection;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+
+/**
+ * A {@link Table} implementation that exposes a table's data files as rows.
+ */
+class DataFilesTable extends BaseMetadataTable {
+  private final TableOperations ops;
+  private final Table table;
+
+  DataFilesTable(TableOperations ops, Table table) {
+    this.ops = ops;
+    this.table = table;
+  }
+
+  @Override
+  Table table() {
+    return table;
+  }
+
+  @Override
+  String metadataTableName() {
+    return "files";
+  }
+
+  @Override
+  public TableScan newScan() {
+    return new FilesTableScan(ops, table);
+  }
+
+  @Override
+  public Schema schema() {
+    return new Schema(DataFile.getType(table.spec().partitionType()).fields());
+  }
+
+  @Override
+  public String location() {
+    return table.currentSnapshot().manifestListLocation();
+  }
+
+  public static class FilesTableScan extends BaseTableScan {
+    private static final long TARGET_SPLIT_SIZE = 32 * 1024 * 1024; // 32 MB
+
+    FilesTableScan(TableOperations ops, Table table) {
+      super(ops, table, ManifestEntry.getSchema(table.spec().partitionType()));
+    }
+
+    private FilesTableScan(
+        TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
+        boolean caseSensitive, boolean colStats, Collection<String> selectedColumns) {
+      super(ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns);
+    }
+
+    @Override
+    protected TableScan newRefinedScan(
+        TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
+        boolean caseSensitive, boolean colStats, Collection<String> selectedColumns) {
+      return new FilesTableScan(
+          ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns);
+    }
+
+    @Override
+    protected long targetSplitSize(TableOperations ops) {
+      return TARGET_SPLIT_SIZE;
+    }
+
+    @Override
+    protected CloseableIterable<FileScanTask> planFiles(
+        TableOperations ops, Snapshot snapshot, Expression rowFilter, boolean caseSensitive, boolean colStats) {
+      CloseableIterable<ManifestFile> manifests = Avro
+          .read(ops.io().newInputFile(snapshot.manifestListLocation()))
+          .rename("manifest_file", GenericManifestFile.class.getName())
+          .rename("partitions", GenericPartitionFieldSummary.class.getName())
+          // 508 is the id used for the partition field, and r508 is the record name created for it in Avro schemas
+          .rename("r508", GenericPartitionFieldSummary.class.getName())
+          .project(ManifestFile.schema())
+          .reuseContainers(false)
+          .build();
+
+      String schemaString = SchemaParser.toJson(schema());
+      String specString = PartitionSpecParser.toJson(PartitionSpec.unpartitioned());
+
+      return CloseableIterable.transform(manifests, manifest ->
+          new ManifestReadTask(ops.io(), new BaseFileScanTask(
+              DataFiles.fromManifest(manifest), schemaString, specString, ResidualEvaluator.unpartitioned(rowFilter))));
+    }
+  }
+
+  private static class ManifestReadTask implements DataTask {
+    private final FileIO io;
+    private final FileScanTask manifestTask;
+
+    private ManifestReadTask(FileIO io, FileScanTask manifestTask) {
+      this.io = io;
+      this.manifestTask = manifestTask;
+    }
+
+    @Override
+    public CloseableIterable<StructLike> rows() {
+      return CloseableIterable.transform(
+          ManifestReader.read(io.newInputFile(manifestTask.file().path().toString())),
+          file -> (GenericDataFile) file);
+    }
+
+    @Override
+    public DataFile file() {
+      return manifestTask.file();
+    }
+
+    @Override
+    public PartitionSpec spec() {
+      return manifestTask.spec();
+    }
+
+    @Override
+    public long start() {
+      return 0;
+    }
+
+    @Override
+    public long length() {
+      return manifestTask.length();
+    }
+
+    @Override
+    public Expression residual() {
+      return manifestTask.residual();
+    }
+
+    @Override
+    public Iterable<FileScanTask> split(long splitSize) {
+      return ImmutableList.of(this); // don't split
+    }
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/DataTableScan.java b/core/src/main/java/org/apache/iceberg/DataTableScan.java
new file mode 100644
index 0000000..8ae9b31
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/DataTableScan.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import java.util.Collection;
+import java.util.List;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.ManifestEvaluator;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.util.ParallelIterable;
+import org.apache.iceberg.util.ThreadPools;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DataTableScan extends BaseTableScan {
+  private static final Logger LOG = LoggerFactory.getLogger(DataTableScan.class);
+
+  private static final List<String> SCAN_COLUMNS = ImmutableList.of(
+      "snapshot_id", "file_path", "file_ordinal", "file_format", "block_size_in_bytes",
+      "file_size_in_bytes", "record_count", "partition"
+  );
+  private static final List<String> SCAN_WITH_STATS_COLUMNS = ImmutableList.<String>builder()
+      .addAll(SCAN_COLUMNS)
+      .add("value_counts", "null_value_counts", "lower_bounds", "upper_bounds")
+      .build();
+  private static final boolean PLAN_SCANS_WITH_WORKER_POOL =
+      SystemProperties.getBoolean(SystemProperties.SCAN_THREAD_POOL_ENABLED, true);
+
+  public DataTableScan(TableOperations ops, Table table) {
+    super(ops, table, table.schema());
+  }
+
+  protected DataTableScan(TableOperations ops, Table table, Long snapshotId, Schema schema,
+                          Expression rowFilter, boolean caseSensitive, boolean colStats,
+                          Collection<String> selectedColumns) {
+    super(ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns);
+  }
+
+  @Override
+  protected TableScan newRefinedScan(
+      TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
+      boolean caseSensitive, boolean colStats, Collection<String> selectedColumns) {
+    return new DataTableScan(
+        ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns);
+  }
+
+  public CloseableIterable<FileScanTask> planFiles(TableOperations ops, Snapshot snapshot,
+                                                   Expression rowFilter, boolean caseSensitive, boolean colStats) {
+    LoadingCache<Integer, ManifestEvaluator> evalCache = Caffeine.newBuilder().build(specId -> {
+      PartitionSpec spec = ops.current().spec(specId);
+      return ManifestEvaluator.forRowFilter(rowFilter, spec, caseSensitive);
+    });
+
+    Iterable<ManifestFile> nonEmptyManifests = Iterables.filter(snapshot.manifests(),
+        manifest -> manifest.hasAddedFiles() || manifest.hasExistingFiles());
+    Iterable<ManifestFile> matchingManifests = Iterables.filter(nonEmptyManifests,
+        manifest -> evalCache.get(manifest.partitionSpecId()).eval(manifest));
+
+    Iterable<CloseableIterable<FileScanTask>> readers = Iterables.transform(
+        matchingManifests,
+        manifest -> {
+          ManifestReader reader = ManifestReader
+              .read(ops.io().newInputFile(manifest.path()), ops.current()::spec)
+              .caseSensitive(caseSensitive);
+          PartitionSpec spec = ops.current().spec(manifest.partitionSpecId());
+          String schemaString = SchemaParser.toJson(spec.schema());
+          String specString = PartitionSpecParser.toJson(spec);
+          ResidualEvaluator residuals = ResidualEvaluator.of(spec, rowFilter, caseSensitive);
+          return CloseableIterable.transform(
+              reader.filterRows(rowFilter).select(colStats ? SCAN_WITH_STATS_COLUMNS : SCAN_COLUMNS),
+              file -> new BaseFileScanTask(file, schemaString, specString, residuals)
+          );
+        });
+
+    if (PLAN_SCANS_WITH_WORKER_POOL && snapshot.manifests().size() > 1) {
+      return new ParallelIterable<>(readers, ThreadPools.getWorkerPool());
+    } else {
+      return CloseableIterable.concat(readers);
+    }
+  }
+
+  protected long targetSplitSize(TableOperations ops) {
+    return ops.current().propertyAsLong(
+        TableProperties.SPLIT_SIZE, TableProperties.SPLIT_SIZE_DEFAULT);
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/FilteredManifest.java b/core/src/main/java/org/apache/iceberg/FilteredManifest.java
index f4ce193..73bb86f 100644
--- a/core/src/main/java/org/apache/iceberg/FilteredManifest.java
+++ b/core/src/main/java/org/apache/iceberg/FilteredManifest.java
@@ -20,7 +20,6 @@
 package org.apache.iceberg;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
@@ -35,6 +34,7 @@ import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.expressions.InclusiveMetricsEvaluator;
 import org.apache.iceberg.expressions.Projections;
+import org.apache.iceberg.io.CloseableIterable;
 
 public class FilteredManifest implements Filterable<FilteredManifest> {
   private static final Set<String> STATS_COLUMNS = Sets.newHashSet(
@@ -84,13 +84,13 @@ public class FilteredManifest implements Filterable<FilteredManifest> {
         caseSensitive);
   }
 
-  Iterable<ManifestEntry> allEntries() {
+  CloseableIterable<ManifestEntry> allEntries() {
     if ((rowFilter != null && rowFilter != Expressions.alwaysTrue()) ||
         (partFilter != null && partFilter != Expressions.alwaysTrue())) {
       Evaluator evaluator = evaluator();
       InclusiveMetricsEvaluator metricsEvaluator = metricsEvaluator();
 
-      return Iterables.filter(reader.entries(columns),
+      return CloseableIterable.filter(reader.entries(columns),
           entry -> entry != null &&
               evaluator.eval(entry.file().partition()) &&
               metricsEvaluator.eval(entry.file()));
@@ -100,20 +100,20 @@ public class FilteredManifest implements Filterable<FilteredManifest> {
     }
   }
 
-  Iterable<ManifestEntry> liveEntries() {
+  CloseableIterable<ManifestEntry> liveEntries() {
     if ((rowFilter != null && rowFilter != Expressions.alwaysTrue()) ||
         (partFilter != null && partFilter != Expressions.alwaysTrue())) {
       Evaluator evaluator = evaluator();
       InclusiveMetricsEvaluator metricsEvaluator = metricsEvaluator();
 
-      return Iterables.filter(reader.entries(columns),
+      return CloseableIterable.filter(reader.entries(columns),
           entry -> entry != null &&
               entry.status() != Status.DELETED &&
               evaluator.eval(entry.file().partition()) &&
               metricsEvaluator.eval(entry.file()));
 
     } else {
-      return Iterables.filter(reader.entries(columns),
+      return CloseableIterable.filter(reader.entries(columns),
           entry -> entry != null && entry.status() != Status.DELETED);
     }
   }
diff --git a/core/src/main/java/org/apache/iceberg/HistoryTable.java b/core/src/main/java/org/apache/iceberg/HistoryTable.java
new file mode 100644
index 0000000..d7070f7
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/HistoryTable.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.SnapshotUtil;
+
+/**
+ * A {@link Table} implementation that exposes a table's history as rows.
+ * <p>
+ * History is based on the table's snapshot log, which logs each update to the table's current snapshot.
+ */
+class HistoryTable extends BaseMetadataTable {
+  private static final Schema HISTORY_SCHEMA = new Schema(
+      Types.NestedField.required(1, "made_current_at", Types.TimestampType.withZone()),
+      Types.NestedField.required(2, "snapshot_id", Types.LongType.get()),
+      Types.NestedField.optional(3, "parent_id", Types.LongType.get()),
+      Types.NestedField.required(4, "is_current_ancestor", Types.BooleanType.get())
+  );
+
+  private final TableOperations ops;
+  private final Table table;
+
+  HistoryTable(TableOperations ops, Table table) {
+    this.ops = ops;
+    this.table = table;
+  }
+
+  @Override
+  Table table() {
+    return table;
+  }
+
+  @Override
+  String metadataTableName() {
+    return "history";
+  }
+
+  @Override
+  public TableScan newScan() {
+    return new HistoryScan();
+  }
+
+  @Override
+  public String location() {
+    return ops.current().file().location();
+  }
+
+  @Override
+  public Schema schema() {
+    return HISTORY_SCHEMA;
+  }
+
+  private DataTask task(TableScan scan) {
+    return StaticDataTask.of(ops.current().file(), ops.current().snapshotLog(), convertHistoryEntryFunc(table));
+  }
+
+  private class HistoryScan extends StaticTableScan {
+    HistoryScan() {
+      super(ops, table, HISTORY_SCHEMA, HistoryTable.this::task);
+    }
+  }
+
+  private static Function<HistoryEntry, StaticDataTask.Row> convertHistoryEntryFunc(Table table) {
+    Map<Long, Snapshot> snapshots = Maps.newHashMap();
+    for (Snapshot snap : table.snapshots()) {
+      snapshots.put(snap.snapshotId(), snap);
+    }
+
+    Set<Long> ancestorIds = Sets.newHashSet(SnapshotUtil.currentAncestors(table));
+
+    return historyEntry -> {
+      long snapshotId = historyEntry.snapshotId();
+      Snapshot snap = snapshots.get(snapshotId);
+      return StaticDataTask.Row.of(
+          historyEntry.timestampMillis() * 1000,
+          historyEntry.snapshotId(),
+          snap != null ? snap.parentId() : null,
+          ancestorIds.contains(snapshotId)
+      );
+    };
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java b/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java
new file mode 100644
index 0000000..848b0c3
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.util.Collection;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.io.CloseableIterable;
+
+/**
+ * A {@link Table} implementation that exposes a table's manifest entries as rows.
+ * <p>
+ * WARNING: this table exposes internal details, like files that have been deleted. For a table of the live data files,
+ * use {@link DataFilesTable}.
+ */
+class ManifestEntriesTable extends BaseMetadataTable {
+  private final TableOperations ops;
+  private final Table table;
+
+  ManifestEntriesTable(TableOperations ops, Table table) {
+    this.ops = ops;
+    this.table = table;
+  }
+
+  @Override
+  Table table() {
+    return table;
+  }
+
+  @Override
+  String metadataTableName() {
+    return "entries";
+  }
+
+  @Override
+  public TableScan newScan() {
+    return new EntriesTableScan(ops, table);
+  }
+
+  @Override
+  public Schema schema() {
+    return ManifestEntry.getSchema(table.spec().partitionType());
+  }
+
+  @Override
+  public String location() {
+    return table.currentSnapshot().manifestListLocation();
+  }
+
+  private static class EntriesTableScan extends BaseTableScan {
+    private static final long TARGET_SPLIT_SIZE = 32 * 1024 * 1024; // 32 MB
+
+    EntriesTableScan(TableOperations ops, Table table) {
+      super(ops, table, ManifestEntry.getSchema(table.spec().partitionType()));
+    }
+
+    private EntriesTableScan(
+        TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
+        boolean caseSensitive, boolean colStats, Collection<String> selectedColumns) {
+      super(ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns);
+    }
+
+    @Override
+    protected TableScan newRefinedScan(
+        TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
+        boolean caseSensitive, boolean colStats, Collection<String> selectedColumns) {
+      return new EntriesTableScan(
+          ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns);
+    }
+
+    @Override
+    protected long targetSplitSize(TableOperations ops) {
+      return TARGET_SPLIT_SIZE;
+    }
+
+    @Override
+    protected CloseableIterable<FileScanTask> planFiles(
+        TableOperations ops, Snapshot snapshot, Expression rowFilter, boolean caseSensitive, boolean colStats) {
+      CloseableIterable<ManifestFile> manifests = Avro
+          .read(ops.io().newInputFile(snapshot.manifestListLocation()))
+          .rename("manifest_file", GenericManifestFile.class.getName())
+          .rename("partitions", GenericPartitionFieldSummary.class.getName())
+          // 508 is the id used for the partition field, and r508 is the record name created for it in Avro schemas
+          .rename("r508", GenericPartitionFieldSummary.class.getName())
+          .project(ManifestFile.schema())
+          .reuseContainers(false)
+          .build();
+
+      String schemaString = SchemaParser.toJson(schema());
+      String specString = PartitionSpecParser.toJson(PartitionSpec.unpartitioned());
+
+      return CloseableIterable.transform(manifests, manifest -> new BaseFileScanTask(
+          DataFiles.fromManifest(manifest), schemaString, specString, ResidualEvaluator.unpartitioned(rowFilter)));
+    }
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/ManifestGroup.java b/core/src/main/java/org/apache/iceberg/ManifestGroup.java
index 0d8e6c4..5381e8e 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestGroup.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestGroup.java
@@ -31,7 +31,8 @@ import java.util.Set;
 import org.apache.iceberg.expressions.Evaluator;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
-import org.apache.iceberg.expressions.InclusiveManifestEvaluator;
+import org.apache.iceberg.expressions.ManifestEvaluator;
+import org.apache.iceberg.expressions.Projections;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.types.Types;
 
@@ -42,52 +43,91 @@ class ManifestGroup {
   private final Set<ManifestFile> manifests;
   private final Expression dataFilter;
   private final Expression fileFilter;
+  private final Expression partitionFilter;
   private final boolean ignoreDeleted;
+  private final boolean ignoreExisting;
   private final List<String> columns;
+  private final boolean caseSensitive;
 
-  private final LoadingCache<Integer, InclusiveManifestEvaluator> evalCache;
+  private final LoadingCache<Integer, ManifestEvaluator> evalCache;
 
   ManifestGroup(TableOperations ops, Iterable<ManifestFile> manifests) {
     this(ops, Sets.newHashSet(manifests), Expressions.alwaysTrue(), Expressions.alwaysTrue(),
-        false, ImmutableList.of("*"));
+        Expressions.alwaysTrue(), false, false, ImmutableList.of("*"), true);
   }
 
   private ManifestGroup(TableOperations ops, Set<ManifestFile> manifests,
-                        Expression dataFilter, Expression fileFilter, boolean ignoreDeleted,
-                        List<String> columns) {
+                        Expression dataFilter, Expression fileFilter, Expression partitionFilter,
+                        boolean ignoreDeleted, boolean ignoreExisting, List<String> columns,
+                        boolean caseSensitive) {
     this.ops = ops;
     this.manifests = manifests;
     this.dataFilter = dataFilter;
     this.fileFilter = fileFilter;
+    this.partitionFilter = partitionFilter;
     this.ignoreDeleted = ignoreDeleted;
+    this.ignoreExisting = ignoreExisting;
     this.columns = columns;
+    this.caseSensitive = caseSensitive;
     this.evalCache = Caffeine.newBuilder().build(specId -> {
       PartitionSpec spec = ops.current().spec(specId);
-      return new InclusiveManifestEvaluator(spec, dataFilter);
+      return ManifestEvaluator.forPartitionFilter(
+          Expressions.and(partitionFilter, Projections.inclusive(spec).project(dataFilter)),
+          spec, caseSensitive);
     });
   }
 
+  public ManifestGroup caseSensitive(boolean filterCaseSensitive) {
+    return new ManifestGroup(ops, manifests, dataFilter, fileFilter, partitionFilter,
+        ignoreDeleted, ignoreExisting, columns, filterCaseSensitive);
+  }
+
   public ManifestGroup filterData(Expression expr) {
     return new ManifestGroup(
-        ops, manifests, Expressions.and(dataFilter, expr), fileFilter, ignoreDeleted, columns);
+        ops, manifests, Expressions.and(dataFilter, expr), fileFilter, partitionFilter,
+        ignoreDeleted, ignoreExisting, columns, caseSensitive);
   }
 
   public ManifestGroup filterFiles(Expression expr) {
     return new ManifestGroup(
-        ops, manifests, dataFilter, Expressions.and(fileFilter, expr), ignoreDeleted, columns);
+        ops, manifests, dataFilter, Expressions.and(fileFilter, expr), partitionFilter,
+        ignoreDeleted, ignoreExisting, columns, caseSensitive);
+  }
+
+  public ManifestGroup filterPartitions(Expression expr) {
+    return new ManifestGroup(
+        ops, manifests, dataFilter, fileFilter, Expressions.and(fileFilter, expr),
+        ignoreDeleted, ignoreExisting, columns, caseSensitive);
   }
 
   public ManifestGroup ignoreDeleted() {
-    return new ManifestGroup(ops, manifests, dataFilter, fileFilter, true, columns);
+    return new ManifestGroup(ops, manifests, dataFilter, fileFilter, partitionFilter, true,
+        ignoreExisting, columns, caseSensitive);
+  }
+
+  public ManifestGroup ignoreDeleted(boolean shouldIgnoreDeleted) {
+    return new ManifestGroup(ops, manifests, dataFilter, fileFilter, partitionFilter,
+        shouldIgnoreDeleted, ignoreExisting, columns, caseSensitive);
+  }
+
+  public ManifestGroup ignoreExisting() {
+    return new ManifestGroup(ops, manifests, dataFilter, fileFilter, partitionFilter,
+        ignoreDeleted, true, columns, caseSensitive);
   }
 
-  public ManifestGroup select(List<String> selectedColumns) {
+  public ManifestGroup ignoreExisting(boolean shouldIgnoreExisting) {
+    return new ManifestGroup(ops, manifests, dataFilter, fileFilter, partitionFilter,
+        ignoreDeleted, shouldIgnoreExisting, columns, caseSensitive);
+  }
+
+  public ManifestGroup select(List<String> columnNames) {
     return new ManifestGroup(
-        ops, manifests, dataFilter, fileFilter, ignoreDeleted, Lists.newArrayList(selectedColumns));
+        ops, manifests, dataFilter, fileFilter, partitionFilter, ignoreDeleted, ignoreExisting,
+        Lists.newArrayList(columnNames), caseSensitive);
   }
 
-  public ManifestGroup select(String... selectedColumns) {
-    return select(Arrays.asList(selectedColumns));
+  public ManifestGroup select(String... columnNames) {
+    return select(Arrays.asList(columnNames));
   }
 
   /**
@@ -105,11 +145,19 @@ class ManifestGroup {
         manifest -> evalCache.get(manifest.partitionSpecId()).eval(manifest));
 
     if (ignoreDeleted) {
+      // only scan manifests that have entries other than deletes
       // remove any manifests that don't have any existing or added files. if either the added or
       // existing files count is missing, the manifest must be scanned.
-      matchingManifests = Iterables.filter(manifests, manifest ->
-          manifest.addedFilesCount() == null || manifest.existingFilesCount() == null ||
-              manifest.addedFilesCount() + manifest.existingFilesCount() > 0);
+      matchingManifests = Iterables.filter(manifests,
+          manifest -> manifest.hasAddedFiles() || manifest.hasExistingFiles());
+    }
+
+    if (ignoreExisting) {
+      // only scan manifests that have entries other than existing
+      // remove any manifests that don't have any deleted or added files. if either the added or
+      // deleted files count is missing, the manifest must be scanned.
+      matchingManifests = Iterables.filter(manifests,
+          manifest -> manifest.hasAddedFiles() || manifest.hasDeletedFiles());
     }
 
     Iterable<CloseableIterable<ManifestEntry>> readers = Iterables.transform(
@@ -118,12 +166,28 @@ class ManifestGroup {
           ManifestReader reader = ManifestReader.read(
               ops.io().newInputFile(manifest.path()),
               ops.current()::spec);
-          FilteredManifest filtered = reader.filterRows(dataFilter).select(columns);
-          return CloseableIterable.combine(
-              Iterables.filter(
-                  ignoreDeleted ? filtered.liveEntries() : filtered.allEntries(),
-                  entry -> evaluator.eval((GenericDataFile) entry.file())),
-              reader);
+
+          FilteredManifest filtered = reader
+              .filterRows(dataFilter)
+              .filterPartitions(partitionFilter)
+              .select(columns);
+
+          CloseableIterable<ManifestEntry> entries = filtered.allEntries();
+          if (ignoreDeleted) {
+            entries = filtered.liveEntries();
+          }
+
+          if (ignoreExisting) {
+            entries = CloseableIterable.filter(entries,
+                entry -> entry.status() != ManifestEntry.Status.EXISTING);
+          }
+
+          if (fileFilter != null && fileFilter != Expressions.alwaysTrue()) {
+            entries = CloseableIterable.filter(entries,
+                entry -> evaluator.eval((GenericDataFile) entry.file()));
+          }
+
+          return entries;
         });
 
     return CloseableIterable.concat(readers);
diff --git a/core/src/main/java/org/apache/iceberg/ManifestsTable.java b/core/src/main/java/org/apache/iceberg/ManifestsTable.java
new file mode 100644
index 0000000..eb2875c
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/ManifestsTable.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import com.google.common.collect.Lists;
+import java.util.List;
+import org.apache.iceberg.types.Conversions;
+import org.apache.iceberg.types.Types;
+
+/**
+ * A {@link Table} implementation that exposes a table's manifest files as rows.
+ */
+class ManifestsTable extends BaseMetadataTable {
+  private static final Schema SNAPSHOT_SCHEMA = new Schema(
+      Types.NestedField.required(1, "path", Types.StringType.get()),
+      Types.NestedField.required(2, "length", Types.LongType.get()),
+      Types.NestedField.required(3, "partition_spec_id", Types.IntegerType.get()),
+      Types.NestedField.required(4, "added_snapshot_id", Types.LongType.get()),
+      Types.NestedField.required(5, "added_data_files_count", Types.IntegerType.get()),
+      Types.NestedField.required(6, "existing_data_files_count", Types.IntegerType.get()),
+      Types.NestedField.required(7, "deleted_data_files_count", Types.IntegerType.get()),
+      Types.NestedField.required(8, "partition_summaries", Types.ListType.ofRequired(9, Types.StructType.of(
+          Types.NestedField.required(10, "contains_null", Types.BooleanType.get()),
+          Types.NestedField.optional(11, "lower_bound", Types.StringType.get()),
+          Types.NestedField.optional(12, "upper_bound", Types.StringType.get())
+      )))
+  );
+
+  private final TableOperations ops;
+  private final Table table;
+  private final PartitionSpec spec;
+
+  ManifestsTable(TableOperations ops, Table table) {
+    this.ops = ops;
+    this.table = table;
+    this.spec = table.spec();
+  }
+
+  @Override
+  Table table() {
+    return table;
+  }
+
+  @Override
+  String metadataTableName() {
+    return "manifests";
+  }
+
+  @Override
+  public TableScan newScan() {
+    return new SnapshotsTableScan();
+  }
+
+  @Override
+  public String location() {
+    return ops.current().file().location();
+  }
+
+  @Override
+  public Schema schema() {
+    return SNAPSHOT_SCHEMA;
+  }
+
+  protected DataTask task(TableScan scan) {
+    return StaticDataTask.of(
+        ops.io().newInputFile(scan.snapshot().manifestListLocation()),
+        scan.snapshot().manifests(),
+        this::manifestFileToRow);
+  }
+
+  private class SnapshotsTableScan extends StaticTableScan {
+    SnapshotsTableScan() {
+      super(ops, table, SNAPSHOT_SCHEMA, ManifestsTable.this::task);
+    }
+  }
+
+  private StaticDataTask.Row manifestFileToRow(ManifestFile manifest) {
+    return StaticDataTask.Row.of(
+        manifest.path(),
+        manifest.length(),
+        manifest.partitionSpecId(),
+        manifest.snapshotId(),
+        manifest.addedFilesCount(),
+        manifest.existingFilesCount(),
+        manifest.deletedFilesCount(),
+        partitionSummariesToRows(manifest.partitions())
+    );
+  }
+
+  private List<StaticDataTask.Row> partitionSummariesToRows(List<ManifestFile.PartitionFieldSummary> summaries) {
+    List<StaticDataTask.Row> rows = Lists.newArrayList();
+
+    for (int i = 0; i < spec.fields().size(); i += 1) {
+      ManifestFile.PartitionFieldSummary summary = summaries.get(i);
+      rows.add(StaticDataTask.Row.of(
+          summary.containsNull(),
+          spec.fields().get(i).transform().toHumanString(
+              Conversions.fromByteBuffer(spec.partitionType().fields().get(i).type(), summary.lowerBound())),
+          spec.fields().get(i).transform().toHumanString(
+              Conversions.fromByteBuffer(spec.partitionType().fields().get(i).type(), summary.upperBound()))
+      ));
+    }
+
+    return rows;
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotsTable.java b/core/src/main/java/org/apache/iceberg/SnapshotsTable.java
new file mode 100644
index 0000000..0ac4377
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/SnapshotsTable.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import org.apache.iceberg.types.Types;
+
+/**
+ * A {@link Table} implementation that exposes a table's known snapshots as rows.
+ * <p>
+ * This does not include snapshots that have been expired using {@link ExpireSnapshots}.
+ */
+class SnapshotsTable extends BaseMetadataTable {
+  private static final Schema SNAPSHOT_SCHEMA = new Schema(
+      Types.NestedField.required(1, "committed_at", Types.TimestampType.withZone()),
+      Types.NestedField.required(2, "snapshot_id", Types.LongType.get()),
+      Types.NestedField.optional(3, "parent_id", Types.LongType.get()),
+      Types.NestedField.optional(4, "operation", Types.StringType.get()),
+      Types.NestedField.optional(5, "manifest_list", Types.StringType.get()),
+      Types.NestedField.optional(6, "summary",
+          Types.MapType.ofRequired(7, 8, Types.StringType.get(), Types.StringType.get()))
+  );
+
+  private final TableOperations ops;
+  private final Table table;
+
+  SnapshotsTable(TableOperations ops, Table table) {
+    this.ops = ops;
+    this.table = table;
+  }
+
+  @Override
+  Table table() {
+    return table;
+  }
+
+  @Override
+  String metadataTableName() {
+    return "snapshots";
+  }
+
+  @Override
+  public TableScan newScan() {
+    return new SnapshotsTableScan();
+  }
+
+  @Override
+  public String location() {
+    return ops.current().file().location();
+  }
+
+  @Override
+  public Schema schema() {
+    return SNAPSHOT_SCHEMA;
+  }
+
+  private DataTask task(BaseTableScan scan) {
+    return StaticDataTask.of(ops.current().file(), ops.current().snapshots(), SnapshotsTable::snapshotToRow);
+  }
+
+  private class SnapshotsTableScan extends StaticTableScan {
+    SnapshotsTableScan() {
+      super(ops, table, SNAPSHOT_SCHEMA, SnapshotsTable.this::task);
+    }
+  }
+
+  private static StaticDataTask.Row snapshotToRow(Snapshot snap) {
+    return StaticDataTask.Row.of(
+        snap.timestampMillis() * 1000,
+        snap.snapshotId(),
+        snap.parentId(),
+        snap.operation(),
+        snap.manifestListLocation(),
+        snap.summary()
+    );
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/StaticDataTask.java b/core/src/main/java/org/apache/iceberg/StaticDataTask.java
new file mode 100644
index 0000000..9daecd2
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/StaticDataTask.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.function.Function;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.InputFile;
+
+class StaticDataTask implements DataTask {
+
+  static <T> DataTask of(InputFile metadata, Iterable<T> values, Function<T, Row> transform) {
+    return new StaticDataTask(metadata,
+        Lists.newArrayList(Iterables.transform(values, transform::apply)).toArray(new Row[0]));
+  }
+
+  private final DataFile metadataFile;
+  private final StructLike[] rows;
+
+  private StaticDataTask(InputFile metadata, StructLike[] rows) {
+    this.metadataFile = DataFiles.builder()
+        .withInputFile(metadata)
+        .withRecordCount(rows.length)
+        .withFormat(FileFormat.METADATA)
+        .build();
+    this.rows = rows;
+  }
+
+  @Override
+  public CloseableIterable<StructLike> rows() {
+    return CloseableIterable.withNoopClose(Arrays.asList(rows));
+  }
+
+  @Override
+  public DataFile file() {
+    return metadataFile;
+  }
+
+  @Override
+  public PartitionSpec spec() {
+    return PartitionSpec.unpartitioned();
+  }
+
+  @Override
+  public long start() {
+    return 0;
+  }
+
+  @Override
+  public long length() {
+    return metadataFile.fileSizeInBytes();
+  }
+
+  @Override
+  public Expression residual() {
+    return Expressions.alwaysTrue();
+  }
+
+  @Override
+  public Iterable<FileScanTask> split(long splitSize) {
+    return ImmutableList.of(this);
+  }
+
+  /**
+   * Implements {@link StructLike#get} for passing static rows.
+   */
+  static class Row implements StructLike, Serializable {
+    public static Row of(Object... values) {
+      return new Row(values);
+    }
+
+    private final Object[] values;
+
+    private Row(Object... values) {
+      this.values = values;
+    }
+
+    @Override
+    public int size() {
+      return values.length;
+    }
+
+    @Override
+    public <T> T get(int pos, Class<T> javaClass) {
+      return javaClass.cast(values[pos]);
+    }
+
+    @Override
+    public <T> void set(int pos, T value) {
+      throw new UnsupportedOperationException("Setting values is not supported");
+    }
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/StaticTableScan.java b/core/src/main/java/org/apache/iceberg/StaticTableScan.java
new file mode 100644
index 0000000..c29c14b
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/StaticTableScan.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.util.Collection;
+import java.util.function.Function;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.io.CloseableIterable;
+
+class StaticTableScan extends BaseTableScan {
+  private static final long TARGET_SPLIT_SIZE = 32 * 1024 * 1024; // 32 MB
+
+  private final Function<StaticTableScan, DataTask> buildTask;
+
+  StaticTableScan(TableOperations ops, Table table, Schema schema, Function<StaticTableScan, DataTask> buildTask) {
+    super(ops, table, schema);
+    this.buildTask = buildTask;
+  }
+
+  private StaticTableScan(
+      TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
+      boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
+      Function<StaticTableScan, DataTask> buildTask) {
+    super(ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns);
+    this.buildTask = buildTask;
+  }
+
+  @Override
+  protected long targetSplitSize(TableOperations ops) {
+    return TARGET_SPLIT_SIZE;
+  }
+
+  @Override
+  protected TableScan newRefinedScan(
+      TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
+      boolean caseSensitive, boolean colStats, Collection<String> selectedColumns) {
+    return new StaticTableScan(
+        ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns, buildTask);
+  }
+
+  @Override
+  protected CloseableIterable<FileScanTask> planFiles(
+      TableOperations ops, Snapshot snapshot, Expression rowFilter, boolean caseSensitive, boolean colStats) {
+    return CloseableIterable.withNoopClose(buildTask.apply(this));
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java
index 342fb5a..ef289b4 100644
--- a/core/src/main/java/org/apache/iceberg/TableMetadata.java
+++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java
@@ -81,7 +81,7 @@ public class TableMetadata {
         ImmutableMap.copyOf(properties), -1, ImmutableList.of(), ImmutableList.of());
   }
 
-  public static class SnapshotLogEntry {
+  public static class SnapshotLogEntry implements HistoryEntry {
     private final long timestampMillis;
     private final long snapshotId;
 
@@ -90,10 +90,12 @@ public class TableMetadata {
       this.snapshotId = snapshotId;
     }
 
+    @Override
     public long timestampMillis() {
       return timestampMillis;
     }
 
+    @Override
     public long snapshotId() {
       return snapshotId;
     }
@@ -140,7 +142,7 @@ public class TableMetadata {
   private final List<Snapshot> snapshots;
   private final Map<Long, Snapshot> snapshotsById;
   private final Map<Integer, PartitionSpec> specsById;
-  private final List<SnapshotLogEntry> snapshotLog;
+  private final List<HistoryEntry> snapshotLog;
 
   TableMetadata(TableOperations ops,
                 InputFile file,
@@ -154,7 +156,7 @@ public class TableMetadata {
                 Map<String, String> properties,
                 long currentSnapshotId,
                 List<Snapshot> snapshots,
-                List<SnapshotLogEntry> snapshotLog) {
+                List<HistoryEntry> snapshotLog) {
     this.ops = ops;
     this.file = file;
     this.uuid = uuid;
@@ -172,8 +174,8 @@ public class TableMetadata {
     this.snapshotsById = indexSnapshots(snapshots);
     this.specsById = indexSpecs(specs);
 
-    SnapshotLogEntry last = null;
-    for (SnapshotLogEntry logEntry : snapshotLog) {
+    HistoryEntry last = null;
+    for (HistoryEntry logEntry : snapshotLog) {
       if (last != null) {
         Preconditions.checkArgument(
             (logEntry.timestampMillis() - last.timestampMillis()) >= 0,
@@ -259,7 +261,7 @@ public class TableMetadata {
     return snapshots;
   }
 
-  public List<SnapshotLogEntry> snapshotLog() {
+  public List<HistoryEntry> snapshotLog() {
     return snapshotLog;
   }
 
@@ -324,7 +326,7 @@ public class TableMetadata {
         .addAll(snapshots)
         .add(snapshot)
         .build();
-    List<SnapshotLogEntry> newSnapshotLog = ImmutableList.<SnapshotLogEntry>builder()
+    List<HistoryEntry> newSnapshotLog = ImmutableList.<HistoryEntry>builder()
         .addAll(snapshotLog)
         .add(new SnapshotLogEntry(snapshot.timestampMillis(), snapshot.snapshotId()))
         .build();
@@ -344,8 +346,8 @@ public class TableMetadata {
 
     // update the snapshot log
     Set<Long> validIds = Sets.newHashSet(Iterables.transform(filtered, Snapshot::snapshotId));
-    List<SnapshotLogEntry> newSnapshotLog = Lists.newArrayList();
-    for (SnapshotLogEntry logEntry : snapshotLog) {
+    List<HistoryEntry> newSnapshotLog = Lists.newArrayList();
+    for (HistoryEntry logEntry : snapshotLog) {
       if (validIds.contains(logEntry.snapshotId())) {
         // copy the log entries that are still valid
         newSnapshotLog.add(logEntry);
@@ -369,7 +371,7 @@ public class TableMetadata {
         "Cannot set current snapshot to unknown: %s", snapshot.snapshotId());
 
     long nowMillis = System.currentTimeMillis();
-    List<SnapshotLogEntry> newSnapshotLog = ImmutableList.<SnapshotLogEntry>builder()
+    List<HistoryEntry> newSnapshotLog = ImmutableList.<HistoryEntry>builder()
         .addAll(snapshotLog)
         .add(new SnapshotLogEntry(nowMillis, snapshot.snapshotId()))
         .build();
@@ -387,8 +389,8 @@ public class TableMetadata {
   }
 
   public TableMetadata removeSnapshotLogEntries(Set<Long> snapshotIds) {
-    List<SnapshotLogEntry> newSnapshotLog = Lists.newArrayList();
-    for (SnapshotLogEntry logEntry : snapshotLog) {
+    List<HistoryEntry> newSnapshotLog = Lists.newArrayList();
+    for (HistoryEntry logEntry : snapshotLog) {
       if (!snapshotIds.contains(logEntry.snapshotId())) {
         // copy the log entries that are still valid
         newSnapshotLog.add(logEntry);
diff --git a/core/src/main/java/org/apache/iceberg/TableMetadataParser.java b/core/src/main/java/org/apache/iceberg/TableMetadataParser.java
index 2df52a0..99bc1d7 100644
--- a/core/src/main/java/org/apache/iceberg/TableMetadataParser.java
+++ b/core/src/main/java/org/apache/iceberg/TableMetadataParser.java
@@ -168,7 +168,7 @@ public class TableMetadataParser {
     generator.writeEndArray();
 
     generator.writeArrayFieldStart(SNAPSHOT_LOG);
-    for (SnapshotLogEntry logEntry : metadata.snapshotLog()) {
+    for (HistoryEntry logEntry : metadata.snapshotLog()) {
       generator.writeStartObject();
       generator.writeNumberField(TIMESTAMP_MS, logEntry.timestampMillis());
       generator.writeNumberField(SNAPSHOT_ID, logEntry.snapshotId());
diff --git a/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java b/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
new file mode 100644
index 0000000..ac38a86
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.util;
+
+import com.google.common.collect.Lists;
+import java.util.List;
+import java.util.function.Function;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+
+public class SnapshotUtil {
+  private SnapshotUtil() {
+  }
+
+  /**
+   * Return the snapshot IDs for the ancestors of the current table state.
+   * <p>
+   * Ancestor IDs are ordered by commit time, descending. The first ID is the current snapshot, followed by its parent,
+   * and so on.
+   *
+   * @param table a {@link Table}
+   * @return a set of snapshot IDs of the known ancestor snapshots, including the current ID
+   */
+  public static List<Long> currentAncestors(Table table) {
+    return ancestorIds(table.currentSnapshot(), table::snapshot);
+  }
+
+  public static List<Long> ancestorIds(Snapshot snapshot, Function<Long, Snapshot> lookup) {
+    List<Long> ancestorIds = Lists.newArrayList();
+    Snapshot current = snapshot;
+    while (current != null) {
+      ancestorIds.add(current.snapshotId());
+      if (current.parentId() != null) {
+        current = lookup.apply(current.parentId());
+      } else {
+        current = null;
+      }
+    }
+    return ancestorIds;
+  }
+}
diff --git a/core/src/test/java/org/apache/iceberg/TestBaseTableScan.java b/core/src/test/java/org/apache/iceberg/TestDataTableScan.java
similarity index 98%
rename from core/src/test/java/org/apache/iceberg/TestBaseTableScan.java
rename to core/src/test/java/org/apache/iceberg/TestDataTableScan.java
index dd90467..5c3dc90 100644
--- a/core/src/test/java/org/apache/iceberg/TestBaseTableScan.java
+++ b/core/src/test/java/org/apache/iceberg/TestDataTableScan.java
@@ -31,7 +31,7 @@ import org.junit.rules.TemporaryFolder;
 import static org.apache.iceberg.types.Types.NestedField.required;
 import static org.junit.Assert.assertEquals;
 
-public class TestBaseTableScan {
+public class TestDataTableScan {
 
   @Rule
   public TemporaryFolder temp = new TemporaryFolder();
diff --git a/core/src/test/java/org/apache/iceberg/TestEntriesMetadataTable.java b/core/src/test/java/org/apache/iceberg/TestEntriesMetadataTable.java
new file mode 100644
index 0000000..7db931c
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/TestEntriesMetadataTable.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import com.google.common.collect.Iterables;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestEntriesMetadataTable extends TableTestBase {
+
+  @Test
+  public void testEntriesTable() {
+    table.newAppend()
+        .appendFile(FILE_A)
+        .appendFile(FILE_B)
+        .commit();
+
+    Table entriesTable = new ManifestEntriesTable(table.ops(), table);
+
+    Schema expectedSchema = ManifestEntry.getSchema(table.spec().partitionType());
+
+    assertEquals("A tableScan.select() should prune the schema",
+        expectedSchema.asStruct(),
+        entriesTable.schema().asStruct());
+  }
+
+  @Test
+  public void testEntriesTableScan() {
+    table.newAppend()
+        .appendFile(FILE_A)
+        .appendFile(FILE_B)
+        .commit();
+
+    Table entriesTable = new ManifestEntriesTable(table.ops(), table);
+    TableScan scan = entriesTable.newScan();
+
+    Schema expectedSchema = ManifestEntry.getSchema(table.spec().partitionType());
+
+    assertEquals("A tableScan.select() should prune the schema",
+        expectedSchema.asStruct(),
+        scan.schema().asStruct());
+
+    FileScanTask file = Iterables.getOnlyElement(scan.planFiles());
+    Assert.assertEquals("Data file should be the table's manifest",
+        Iterables.getOnlyElement(table.currentSnapshot().manifests()).path(), file.file().path());
+    Assert.assertEquals("Should contain 2 data file records", 2, file.file().recordCount());
+  }
+
+}
diff --git a/core/src/test/java/org/apache/iceberg/TestTableMetadataJson.java b/core/src/test/java/org/apache/iceberg/TestTableMetadataJson.java
index 1823122..d58bfe6 100644
--- a/core/src/test/java/org/apache/iceberg/TestTableMetadataJson.java
+++ b/core/src/test/java/org/apache/iceberg/TestTableMetadataJson.java
@@ -76,7 +76,7 @@ public class TestTableMetadataJson {
         ops, currentSnapshotId, previousSnapshotId, currentSnapshotId, null, null, ImmutableList.of(
         new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), spec.specId())));
 
-    List<SnapshotLogEntry> snapshotLog = ImmutableList.<SnapshotLogEntry>builder()
+    List<HistoryEntry> snapshotLog = ImmutableList.<HistoryEntry>builder()
         .add(new SnapshotLogEntry(previousSnapshot.timestampMillis(), previousSnapshot.snapshotId()))
         .add(new SnapshotLogEntry(currentSnapshot.timestampMillis(), currentSnapshot.snapshotId()))
         .build();
@@ -140,7 +140,7 @@ public class TestTableMetadataJson {
         ops, currentSnapshotId, previousSnapshotId, currentSnapshotId, null, null, ImmutableList.of(
         new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), spec.specId())));
 
-    List<SnapshotLogEntry> reversedSnapshotLog = Lists.newArrayList();
+    List<HistoryEntry> reversedSnapshotLog = Lists.newArrayList();
 
     TableMetadata expected = new TableMetadata(ops, null, UUID.randomUUID().toString(), "s3://bucket/test/location",
         System.currentTimeMillis(), 3, schema, 5, ImmutableList.of(spec),
diff --git a/hive/src/main/java/org/apache/iceberg/hive/HiveCatalog.java b/hive/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
index 837c589..b55521c 100644
--- a/hive/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
+++ b/hive/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
@@ -51,7 +51,7 @@ public class HiveCatalog extends BaseMetastoreCatalog implements Closeable {
 
   @Override
   public org.apache.iceberg.Table loadTable(TableIdentifier identifier) {
-    Preconditions.checkArgument(identifier.namespace().levels().length == 1,
+    Preconditions.checkArgument(identifier.namespace().levels().length >= 1,
         "Missing database in table identifier: %s", identifier);
     return super.loadTable(identifier);
   }
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
index 943db87..e0b2851 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
@@ -35,6 +35,7 @@ import java.util.Set;
 import java.util.function.Function;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataTask;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionSpec;
@@ -415,25 +416,31 @@ class Reader implements DataSourceReader, SupportsPushDownFilters, SupportsPushD
     }
 
     private Iterator<InternalRow> open(FileScanTask task, Schema readSchema) {
-      InputFile location = inputFiles.get(task.file().path().toString());
-      Preconditions.checkNotNull(location, "Could not find InputFile associated with FileScanTask");
       CloseableIterable<InternalRow> iter;
-      switch (task.file().format()) {
-        case PARQUET:
-          iter = newParquetIterable(location, task, readSchema);
-          break;
-
-        case AVRO:
-          iter = newAvroIterable(location, task, readSchema);
-          break;
-
-        case ORC:
-          iter = newOrcIterable(location, task, readSchema);
-          break;
-
-        default:
-          throw new UnsupportedOperationException(
-              "Cannot read unknown format: " + task.file().format());
+      if (task.isDataTask()) {
+        iter = newDataIterable(task.asDataTask(), readSchema);
+
+      } else {
+        InputFile location = inputFiles.get(task.file().path().toString());
+        Preconditions.checkNotNull(location, "Could not find InputFile associated with FileScanTask");
+
+        switch (task.file().format()) {
+          case PARQUET:
+            iter = newParquetIterable(location, task, readSchema);
+            break;
+
+          case AVRO:
+            iter = newAvroIterable(location, task, readSchema);
+            break;
+
+          case ORC:
+            iter = newOrcIterable(location, task, readSchema);
+            break;
+
+          default:
+            throw new UnsupportedOperationException(
+                "Cannot read unknown format: " + task.file().format());
+        }
       }
 
       this.currentCloseable = iter;
@@ -496,6 +503,13 @@ class Reader implements DataSourceReader, SupportsPushDownFilters, SupportsPushD
           .caseSensitive(caseSensitive)
           .build();
     }
+
+    private CloseableIterable<InternalRow> newDataIterable(DataTask task, Schema readSchema) {
+      StructInternalRow row = new StructInternalRow(tableSchema.asStruct());
+      Iterable<InternalRow> asSparkRows = Iterables.transform(task.asDataTask().rows(), row::setStruct);
+      return CloseableIterable.withNoopClose(
+          Iterables.transform(asSparkRows, APPLY_PROJECTION.bind(projection(readSchema, tableSchema))::invoke));
+    }
   }
 
   private static class PartitionRowConverter implements Function<StructLike, InternalRow> {
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/StructInternalRow.java b/spark/src/main/java/org/apache/iceberg/spark/source/StructInternalRow.java
new file mode 100644
index 0000000..1ed5a78
--- /dev/null
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/StructInternalRow.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.ByteBuffers;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.util.ArrayBasedMapData;
+import org.apache.spark.sql.catalyst.util.ArrayData;
+import org.apache.spark.sql.catalyst.util.GenericArrayData;
+import org.apache.spark.sql.catalyst.util.MapData;
+import org.apache.spark.sql.types.ArrayType;
+import org.apache.spark.sql.types.BinaryType;
+import org.apache.spark.sql.types.BooleanType;
+import org.apache.spark.sql.types.ByteType;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.DoubleType;
+import org.apache.spark.sql.types.FloatType;
+import org.apache.spark.sql.types.IntegerType;
+import org.apache.spark.sql.types.LongType;
+import org.apache.spark.sql.types.MapType;
+import org.apache.spark.sql.types.ShortType;
+import org.apache.spark.sql.types.StringType;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.CalendarInterval;
+import org.apache.spark.unsafe.types.UTF8String;
+
+class StructInternalRow extends InternalRow {
+  private final Types.StructType type;
+  private StructLike struct;
+
+  StructInternalRow(Types.StructType type) {
+    this.type = type;
+  }
+
+  private StructInternalRow(Types.StructType type, StructLike struct) {
+    this.type = type;
+    this.struct = struct;
+  }
+
+  public StructInternalRow setStruct(StructLike newStruct) {
+    this.struct = newStruct;
+    return this;
+  }
+
+  @Override
+  public int numFields() {
+    return struct.size();
+  }
+
+  @Override
+  public void setNullAt(int i) {
+    throw new UnsupportedOperationException("StructInternalRow is read-only");
+  }
+
+  @Override
+  public void update(int i, Object value) {
+    throw new UnsupportedOperationException("StructInternalRow is read-only");
+  }
+
+  @Override
+  public InternalRow copy() {
+    return this;
+  }
+
+  @Override
+  public boolean isNullAt(int ordinal) {
+    return struct.get(ordinal, Object.class) == null;
+  }
+
+  @Override
+  public boolean getBoolean(int ordinal) {
+    return struct.get(ordinal, Boolean.class);
+  }
+
+  @Override
+  public byte getByte(int ordinal) {
+    return (byte) (int) struct.get(ordinal, Integer.class);
+  }
+
+  @Override
+  public short getShort(int ordinal) {
+    return (short) (int) struct.get(ordinal, Integer.class);
+  }
+
+  @Override
+  public int getInt(int ordinal) {
+    return struct.get(ordinal, Integer.class);
+  }
+
+  @Override
+  public long getLong(int ordinal) {
+    return struct.get(ordinal, Long.class);
+  }
+
+  @Override
+  public float getFloat(int ordinal) {
+    return struct.get(ordinal, Float.class);
+  }
+
+  @Override
+  public double getDouble(int ordinal) {
+    return struct.get(ordinal, Double.class);
+  }
+
+  @Override
+  public Decimal getDecimal(int ordinal, int precision, int scale) {
+    return Decimal.apply(struct.get(ordinal, BigDecimal.class));
+  }
+
+  @Override
+  public UTF8String getUTF8String(int ordinal) {
+    CharSequence seq = struct.get(ordinal, CharSequence.class);
+    return UTF8String.fromString(seq.toString());
+  }
+
+  @Override
+  public byte[] getBinary(int ordinal) {
+    ByteBuffer bytes = struct.get(ordinal, ByteBuffer.class);
+    return ByteBuffers.toByteArray(bytes);
+  }
+
+  @Override
+  public CalendarInterval getInterval(int ordinal) {
+    throw new UnsupportedOperationException("Unsupported type: interval");
+  }
+
+  @Override
+  public InternalRow getStruct(int ordinal, int numFields) {
+    return new StructInternalRow(
+        type.fields().get(ordinal).type().asStructType(),
+        struct.get(ordinal, StructLike.class));
+  }
+
+  @Override
+  public ArrayData getArray(int ordinal) {
+    return collectionToArrayData(
+        type.fields().get(ordinal).type().asListType().elementType(),
+        struct.get(ordinal, Collection.class));
+  }
+
+  @Override
+  public MapData getMap(int ordinal) {
+    return mapToMapData(type.fields().get(ordinal).type().asMapType(), struct.get(ordinal, Map.class));
+  }
+
+  @Override
+  @SuppressWarnings("checkstyle:CyclomaticComplexity")
+  public Object get(int ordinal, DataType dataType) {
+    if (dataType instanceof IntegerType) {
+      return getInt(ordinal);
+    } else if (dataType instanceof LongType) {
+      return getLong(ordinal);
+    } else if (dataType instanceof StringType) {
+      return getUTF8String(ordinal);
+    } else if (dataType instanceof FloatType) {
+      return getFloat(ordinal);
+    } else if (dataType instanceof DoubleType) {
+      return getDouble(ordinal);
+    } else if (dataType instanceof DecimalType) {
+      DecimalType decimalType = (DecimalType) dataType;
+      return getDecimal(ordinal, decimalType.precision(), decimalType.scale());
+    } else if (dataType instanceof BinaryType) {
+      return getBinary(ordinal);
+    } else if (dataType instanceof StructType) {
+      return getStruct(ordinal, ((StructType) dataType).size());
+    } else if (dataType instanceof ArrayType) {
+      return getArray(ordinal);
+    } else if (dataType instanceof MapType) {
+      return getMap(ordinal);
+    } else if (dataType instanceof BooleanType) {
+      return getBoolean(ordinal);
+    } else if (dataType instanceof ByteType) {
+      return getByte(ordinal);
+    } else if (dataType instanceof ShortType) {
+      return getShort(ordinal);
+    }
+    return null;
+  }
+
+  private MapData mapToMapData(Types.MapType mapType, Map<?, ?> map) {
+    // make a defensive copy to ensure entries do not change
+    List<Map.Entry<?, ?>> entries = ImmutableList.copyOf(map.entrySet());
+    return new ArrayBasedMapData(
+        collectionToArrayData(mapType.keyType(), Lists.transform(entries, Map.Entry::getKey)),
+        collectionToArrayData(mapType.valueType(), Lists.transform(entries, Map.Entry::getValue)));
+  }
+
+  private ArrayData collectionToArrayData(Type elementType, Collection<?> values) {
+    switch (elementType.typeId()) {
+      case BOOLEAN:
+        return fillArray(values, array -> (BiConsumer<Integer, Boolean>) array::setBoolean);
+      case INTEGER:
+      case DATE:
+      case TIME:
+        return fillArray(values, array -> (BiConsumer<Integer, Integer>) array::setInt);
+      case LONG:
+      case TIMESTAMP:
+        return fillArray(values, array -> (BiConsumer<Integer, Long>) array::setLong);
+      case FLOAT:
+        return fillArray(values, array -> (BiConsumer<Integer, Float>) array::setFloat);
+      case DOUBLE:
+        return fillArray(values, array -> (BiConsumer<Integer, Double>) array::setDouble);
+      case STRING:
+        return fillArray(values, array ->
+            (BiConsumer<Integer, CharSequence>) (pos, seq) -> array.update(pos, UTF8String.fromString(seq.toString())));
+      case FIXED:
+      case BINARY:
+        return fillArray(values, array ->
+            (BiConsumer<Integer, ByteBuffer>) (pos, buf) -> array.update(pos, ByteBuffers.toByteArray(buf)));
+      case DECIMAL:
+        return fillArray(values, array ->
+            (BiConsumer<Integer, BigDecimal>) (pos, dec) -> array.update(pos, Decimal.apply(dec)));
+      case STRUCT:
+        return fillArray(values, array -> (BiConsumer<Integer, StructLike>) (pos, tuple) ->
+            array.update(pos, new StructInternalRow(elementType.asStructType(), tuple)));
+      case LIST:
+        return fillArray(values, array -> (BiConsumer<Integer, Collection<?>>) (pos, list) ->
+            array.update(pos, collectionToArrayData(elementType.asListType(), list)));
+      case MAP:
+        return fillArray(values, array -> (BiConsumer<Integer, Map<?, ?>>) (pos, map) ->
+            array.update(pos, mapToMapData(elementType.asMapType(), map)));
+      default:
+        throw new UnsupportedOperationException("Unsupported array element type: " + elementType);
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private <T> GenericArrayData fillArray(Collection<?> values, Function<ArrayData, BiConsumer<Integer, T>> makeSetter) {
+    GenericArrayData array = new GenericArrayData(new Object[values.size()]);
+    BiConsumer<Integer, T> setter = makeSetter.apply(array);
+
+    int index = 0;
+    for (Object value : values) {
+      if (value == null) {
+        array.setNullAt(index);
+      } else {
+        setter.accept(index, (T) value);
+      }
+
+      index += 1;
+    }
+
+    return array;
+  }
+}
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
index fd901c3..c20bc67 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
@@ -144,7 +144,7 @@ public class TestHelpers {
         break;
       case STRING:
         Assert.assertTrue("Should be a String", actual instanceof String);
-        Assert.assertEquals("Strings should be equal", expected, actual);
+        Assert.assertEquals("Strings should be equal", String.valueOf(expected), actual);
         break;
       case UUID:
         Assert.assertTrue("Should expect a UUID", expected instanceof UUID);
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java
index 72fd1b2..5a50731 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java
@@ -19,17 +19,30 @@
 
 package org.apache.iceberg.spark.source;
 
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.avro.AvroSchemaUtil;
 import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.hive.HiveCatalog;
 import org.apache.iceberg.hive.TestHiveMetastore;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.spark.data.TestHelpers;
 import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
@@ -114,4 +127,296 @@ public class TestIcebergSourceHiveTables {
       metastoreClient.dropTable(DB_NAME, TABLE_NAME);
     }
   }
+
+  @Test
+  public void testHiveEntriesTable() throws TException, IOException {
+    try (HiveCatalog catalog = new HiveCatalog(hiveConf)) {
+      Table table = catalog.createTable(TABLE_IDENTIFIER, SCHEMA, PartitionSpec.unpartitioned());
+      Table entriesTable = catalog.loadTable(TableIdentifier.of(DB_NAME, TABLE_NAME, "entries"));
+
+      List<SimpleRecord> records = Lists.newArrayList(new SimpleRecord(1, "1"));
+
+      Dataset<Row> inputDf = spark.createDataFrame(records, SimpleRecord.class);
+      inputDf.select("id", "data").write()
+          .format("iceberg")
+          .mode("append")
+          .save(TABLE_IDENTIFIER.toString());
+
+      table.refresh();
+
+      List<Row> actual = spark.read()
+          .format("iceberg")
+          .load(DB_NAME + "." + TABLE_NAME + ".entries")
+          .collectAsList();
+
+      Assert.assertEquals("Should only contain one manifest", 1, table.currentSnapshot().manifests().size());
+      InputFile manifest = table.io().newInputFile(table.currentSnapshot().manifests().get(0).path());
+      List<GenericData.Record> expected;
+      try (CloseableIterable<GenericData.Record> rows = Avro.read(manifest).project(entriesTable.schema()).build()) {
+        expected = Lists.newArrayList(rows);
+      }
+
+      Assert.assertEquals("Entries table should have one row", 1, expected.size());
+      Assert.assertEquals("Actual results should have one row", 1, actual.size());
+      TestHelpers.assertEqualsSafe(entriesTable.schema().asStruct(), expected.get(0), actual.get(0));
+
+    } finally {
+      metastoreClient.dropTable(DB_NAME, TABLE_NAME);
+    }
+  }
+
+  @Test
+  public void testHiveFilesTable() throws TException, IOException {
+    try (HiveCatalog catalog = new HiveCatalog(hiveConf)) {
+      Table table = catalog.createTable(TABLE_IDENTIFIER, SCHEMA,
+          PartitionSpec.builderFor(SCHEMA).identity("id").build());
+      Table entriesTable = catalog.loadTable(TableIdentifier.of(DB_NAME, TABLE_NAME, "entries"));
+      Table filesTable = catalog.loadTable(TableIdentifier.of(DB_NAME, TABLE_NAME, "files"));
+
+      Dataset<Row> df1 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
+      Dataset<Row> df2 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(2, "b")), SimpleRecord.class);
+
+      df1.select("id", "data").write()
+          .format("iceberg")
+          .mode("append")
+          .save(TABLE_IDENTIFIER.toString());
+
+      // add a second file
+      df2.select("id", "data").write()
+          .format("iceberg")
+          .mode("append")
+          .save(TABLE_IDENTIFIER.toString());
+
+      // delete the first file to test that only live files are listed
+      table.newDelete().deleteFromRowFilter(Expressions.equal("id", 1)).commit();
+
+      List<Row> actual = spark.read()
+          .format("iceberg")
+          .load(DB_NAME + "." + TABLE_NAME + ".files")
+          .collectAsList();
+
+      List<GenericData.Record> expected = Lists.newArrayList();
+      for (ManifestFile manifest : table.currentSnapshot().manifests()) {
+        InputFile in = table.io().newInputFile(manifest.path());
+        try (CloseableIterable<GenericData.Record> rows = Avro.read(in).project(entriesTable.schema()).build()) {
+          for (GenericData.Record record : rows) {
+            if ((Integer) record.get("status") < 2 /* added or existing */) {
+              expected.add((GenericData.Record) record.get("data_file"));
+            }
+          }
+        }
+      }
+
+      Assert.assertEquals("Files table should have one row", 1, expected.size());
+      Assert.assertEquals("Actual results should have one row", 1, actual.size());
+      TestHelpers.assertEqualsSafe(filesTable.schema().asStruct(), expected.get(0), actual.get(0));
+
+    } finally {
+      metastoreClient.dropTable(DB_NAME, TABLE_NAME);
+    }
+  }
+
+  @Test
+  public void testHiveHistoryTable() throws TException {
+    try (HiveCatalog catalog = new HiveCatalog(hiveConf)) {
+      Table table = catalog.createTable(TABLE_IDENTIFIER, SCHEMA, PartitionSpec.unpartitioned());
+      Table historyTable = catalog.loadTable(TableIdentifier.of(DB_NAME, TABLE_NAME, "history"));
+
+      List<SimpleRecord> records = Lists.newArrayList(new SimpleRecord(1, "1"));
+      Dataset<Row> inputDf = spark.createDataFrame(records, SimpleRecord.class);
+
+      inputDf.select("id", "data").write()
+          .format("iceberg")
+          .mode("append")
+          .save(TABLE_IDENTIFIER.toString());
+
+      table.refresh();
+      long firstSnapshotTimestamp = table.currentSnapshot().timestampMillis();
+      long firstSnapshotId = table.currentSnapshot().snapshotId();
+
+      inputDf.select("id", "data").write()
+          .format("iceberg")
+          .mode("append")
+          .save(TABLE_IDENTIFIER.toString());
+
+      table.refresh();
+      long secondSnapshotTimestamp = table.currentSnapshot().timestampMillis();
+      long secondSnapshotId = table.currentSnapshot().snapshotId();
+
+      // rollback the table state to the first snapshot
+      table.rollback().toSnapshotId(firstSnapshotId).commit();
+      long rollbackTimestamp = Iterables.getLast(table.history()).timestampMillis();
+
+      inputDf.select("id", "data").write()
+          .format("iceberg")
+          .mode("append")
+          .save(TABLE_IDENTIFIER.toString());
+
+      table.refresh();
+      long thirdSnapshotTimestamp = table.currentSnapshot().timestampMillis();
+      long thirdSnapshotId = table.currentSnapshot().snapshotId();
+
+      List<Row> actual = spark.read()
+          .format("iceberg")
+          .load(DB_NAME + "." + TABLE_NAME + ".history")
+          .collectAsList();
+
+      GenericRecordBuilder builder = new GenericRecordBuilder(AvroSchemaUtil.convert(historyTable.schema(), "history"));
+      List<GenericData.Record> expected = Lists.newArrayList(
+          builder.set("made_current_at", firstSnapshotTimestamp * 1000)
+              .set("snapshot_id", firstSnapshotId)
+              .set("parent_id", null)
+              .set("is_current_ancestor", true)
+              .build(),
+          builder.set("made_current_at", secondSnapshotTimestamp * 1000)
+              .set("snapshot_id", secondSnapshotId)
+              .set("parent_id", firstSnapshotId)
+              .set("is_current_ancestor", false) // commit rolled back, not an ancestor of the current table state
+              .build(),
+          builder.set("made_current_at", rollbackTimestamp * 1000)
+              .set("snapshot_id", firstSnapshotId)
+              .set("parent_id", null)
+              .set("is_current_ancestor", true)
+              .build(),
+          builder.set("made_current_at", thirdSnapshotTimestamp * 1000)
+              .set("snapshot_id", thirdSnapshotId)
+              .set("parent_id", firstSnapshotId)
+              .set("is_current_ancestor", true)
+              .build()
+      );
+
+      Assert.assertEquals("History table should have a row for each commit", 4, actual.size());
+      TestHelpers.assertEqualsSafe(historyTable.schema().asStruct(), expected.get(0), actual.get(0));
+      TestHelpers.assertEqualsSafe(historyTable.schema().asStruct(), expected.get(1), actual.get(1));
+      TestHelpers.assertEqualsSafe(historyTable.schema().asStruct(), expected.get(2), actual.get(2));
+
+    } finally {
+      metastoreClient.dropTable(DB_NAME, TABLE_NAME);
+    }
+  }
+
+  @Test
+  public void testHiveSnapshotsTable() throws TException {
+    try (HiveCatalog catalog = new HiveCatalog(hiveConf)) {
+      Table table = catalog.createTable(TABLE_IDENTIFIER, SCHEMA, PartitionSpec.unpartitioned());
+      Table snapTable = catalog.loadTable(TableIdentifier.of(DB_NAME, TABLE_NAME, "snapshots"));
+
+      List<SimpleRecord> records = Lists.newArrayList(new SimpleRecord(1, "1"));
+      Dataset<Row> inputDf = spark.createDataFrame(records, SimpleRecord.class);
+
+      inputDf.select("id", "data").write()
+          .format("iceberg")
+          .mode("append")
+          .save(TABLE_IDENTIFIER.toString());
+
+      table.refresh();
+      long firstSnapshotTimestamp = table.currentSnapshot().timestampMillis();
+      long firstSnapshotId = table.currentSnapshot().snapshotId();
+      String firstManifestList = table.currentSnapshot().manifestListLocation();
+
+      table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit();
+
+      long secondSnapshotTimestamp = table.currentSnapshot().timestampMillis();
+      long secondSnapshotId = table.currentSnapshot().snapshotId();
+      String secondManifestList = table.currentSnapshot().manifestListLocation();
+
+      // rollback the table state to the first snapshot
+      table.rollback().toSnapshotId(firstSnapshotId).commit();
+
+      List<Row> actual = spark.read()
+          .format("iceberg")
+          .load(DB_NAME + "." + TABLE_NAME + ".snapshots")
+          .collectAsList();
+
+      GenericRecordBuilder builder = new GenericRecordBuilder(AvroSchemaUtil.convert(snapTable.schema(), "snapshots"));
+      List<GenericData.Record> expected = Lists.newArrayList(
+          builder.set("committed_at", firstSnapshotTimestamp * 1000)
+              .set("snapshot_id", firstSnapshotId)
+              .set("parent_id", null)
+              .set("operation", "append")
+              .set("manifest_list", firstManifestList)
+              .set("summary", ImmutableMap.of(
+                  "added-records", "1",
+                  "added-data-files", "1",
+                  "changed-partition-count", "1",
+                  "total-data-files", "1",
+                  "total-records", "1"
+              ))
+              .build(),
+          builder.set("committed_at", secondSnapshotTimestamp * 1000)
+              .set("snapshot_id", secondSnapshotId)
+              .set("parent_id", firstSnapshotId)
+              .set("operation", "delete")
+              .set("manifest_list", secondManifestList)
+              .set("summary", ImmutableMap.of(
+                  "deleted-records", "1",
+                  "deleted-data-files", "1",
+                  "changed-partition-count", "1",
+                  "total-records", "0",
+                  "total-data-files", "0"
+              ))
+              .build()
+      );
+
+      Assert.assertEquals("Snapshots table should have a row for each snapshot", 2, actual.size());
+      TestHelpers.assertEqualsSafe(snapTable.schema().asStruct(), expected.get(0), actual.get(0));
+      TestHelpers.assertEqualsSafe(snapTable.schema().asStruct(), expected.get(1), actual.get(1));
+
+    } finally {
+      metastoreClient.dropTable(DB_NAME, TABLE_NAME);
+    }
+  }
+
+  @Test
+  public void testHiveManifestsTable() throws TException {
+    try (HiveCatalog catalog = new HiveCatalog(hiveConf)) {
+      Table table = catalog.createTable(
+          TABLE_IDENTIFIER,
+          SCHEMA,
+          PartitionSpec.builderFor(SCHEMA).identity("id").build());
+      Table manifestTable = catalog.loadTable(TableIdentifier.of(DB_NAME, TABLE_NAME, "manifests"));
+
+      Dataset<Row> df1 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
+
+      df1.select("id", "data").write()
+          .format("iceberg")
+          .mode("append")
+          .save(TABLE_IDENTIFIER.toString());
+
+      List<Row> actual = spark.read()
+          .format("iceberg")
+          .load(DB_NAME + "." + TABLE_NAME + ".manifests")
+          .collectAsList();
+
+      table.refresh();
+
+      GenericRecordBuilder builder = new GenericRecordBuilder(AvroSchemaUtil.convert(
+          manifestTable.schema(), "manifests"));
+      GenericRecordBuilder summaryBuilder = new GenericRecordBuilder(AvroSchemaUtil.convert(
+          manifestTable.schema().findType("partition_summaries.element").asStructType(), "partition_summary"));
+      List<GenericData.Record> expected = Lists.transform(table.currentSnapshot().manifests(), manifest ->
+          builder.set("path", manifest.path())
+              .set("length", manifest.length())
+              .set("partition_spec_id", manifest.partitionSpecId())
+              .set("added_snapshot_id", manifest.snapshotId())
+              .set("added_data_files_count", manifest.addedFilesCount())
+              .set("existing_data_files_count", manifest.existingFilesCount())
+              .set("deleted_data_files_count", manifest.deletedFilesCount())
+              .set("partition_summaries", Lists.transform(manifest.partitions(), partition ->
+                  summaryBuilder
+                      .set("contains_null", false)
+                      .set("lower_bound", "1")
+                      .set("upper_bound", "1")
+                      .build()
+                  ))
+              .build()
+      );
+
+      Assert.assertEquals("Manifests table should have one manifest row", 1, actual.size());
+      TestHelpers.assertEqualsSafe(manifestTable.schema().asStruct(), expected.get(0), actual.get(0));
+
+    } finally {
+      metastoreClient.dropTable(DB_NAME, TABLE_NAME);
+    }
+  }
 }