You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2020/06/05 16:17:46 UTC

[iceberg] branch master updated: Add ignoreResiduals option to TableScan (#1094)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new c299b45  Add ignoreResiduals option to TableScan (#1094)
c299b45 is described below

commit c299b45bb6ec9a0f2f18451a16136bde9110b5b0
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Fri Jun 5 09:17:36 2020 -0700

    Add ignoreResiduals option to TableScan (#1094)
---
 .../main/java/org/apache/iceberg/TableScan.java    |   7 +
 .../java/org/apache/iceberg/AllDataFilesTable.java |  19 ++-
 .../java/org/apache/iceberg/AllEntriesTable.java   |  20 ++-
 .../java/org/apache/iceberg/AllManifestsTable.java |  19 ++-
 .../apache/iceberg/BaseAllMetadataTableScan.java   |   8 +-
 .../java/org/apache/iceberg/BaseTableScan.java     |  52 ++++--
 .../java/org/apache/iceberg/DataFilesTable.java    |  20 ++-
 .../java/org/apache/iceberg/DataTableScan.java     |  20 ++-
 .../apache/iceberg/IncrementalDataTableScan.java   |  17 +-
 .../org/apache/iceberg/ManifestEntriesTable.java   |  18 ++-
 .../java/org/apache/iceberg/ManifestGroup.java     |  10 +-
 .../java/org/apache/iceberg/StaticTableScan.java   |  16 +-
 .../java/org/apache/iceberg/TableTestBase.java     |   8 +-
 .../java/org/apache/iceberg/TestDataTableScan.java |  37 +++++
 .../iceberg/TestIncrementalDataTableScan.java      |  39 ++++-
 .../org/apache/iceberg/TestMetadataTableScans.java | 175 +++++++++++++++++++++
 16 files changed, 412 insertions(+), 73 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/TableScan.java b/api/src/main/java/org/apache/iceberg/TableScan.java
index 3377bc1..4537c7d 100644
--- a/api/src/main/java/org/apache/iceberg/TableScan.java
+++ b/api/src/main/java/org/apache/iceberg/TableScan.java
@@ -132,6 +132,13 @@ public interface TableScan {
   Expression filter();
 
   /**
+   * Create a new {@link TableScan} from this scan that filters data files but does not filter rows in those files.
+   *
+   * @return a new scan based on this scan that does not filter rows within files.
+   */
+  TableScan ignoreResiduals();
+
+  /**
    * Create a new {@link TableScan} to read appended data from {@code fromSnapshotId} exclusive to {@code toSnapshotId}
    * inclusive.
    *
diff --git a/core/src/main/java/org/apache/iceberg/AllDataFilesTable.java b/core/src/main/java/org/apache/iceberg/AllDataFilesTable.java
index c6f4927..cc3033e 100644
--- a/core/src/main/java/org/apache/iceberg/AllDataFilesTable.java
+++ b/core/src/main/java/org/apache/iceberg/AllDataFilesTable.java
@@ -24,6 +24,7 @@ import java.util.Collection;
 import java.util.List;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.expressions.ResidualEvaluator;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -85,19 +86,23 @@ public class AllDataFilesTable extends BaseMetadataTable {
 
     private AllDataFilesTableScan(
         TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
-        boolean caseSensitive, boolean colStats, Collection<String> selectedColumns, Schema fileSchema,
+        boolean ignoreResiduals, boolean caseSensitive, boolean colStats,
+        Collection<String> selectedColumns, Schema fileSchema,
         ImmutableMap<String, String> options) {
-      super(ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns, options);
+      super(
+          ops, table, snapshotId, schema, rowFilter, ignoreResiduals,
+          caseSensitive, colStats, selectedColumns, options);
       this.fileSchema = fileSchema;
     }
 
     @Override
     protected TableScan newRefinedScan(
         TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
-        boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
+        boolean ignoreResiduals, boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
         ImmutableMap<String, String> options) {
       return new AllDataFilesTableScan(
-          ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns, fileSchema, options);
+          ops, table, snapshotId, schema, rowFilter, ignoreResiduals,
+          caseSensitive, colStats, selectedColumns, fileSchema, options);
     }
 
     @Override
@@ -118,11 +123,13 @@ public class AllDataFilesTable extends BaseMetadataTable {
 
     @Override
     protected CloseableIterable<FileScanTask> planFiles(
-        TableOperations ops, Snapshot snapshot, Expression rowFilter, boolean caseSensitive, boolean colStats) {
+        TableOperations ops, Snapshot snapshot, Expression rowFilter,
+        boolean ignoreResiduals, boolean caseSensitive, boolean colStats) {
       CloseableIterable<ManifestFile> manifests = allDataManifestFiles(ops.current().snapshots());
       String schemaString = SchemaParser.toJson(schema());
       String specString = PartitionSpecParser.toJson(PartitionSpec.unpartitioned());
-      ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(rowFilter);
+      Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : rowFilter;
+      ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter);
 
       // Data tasks produce the table schema, not the projection schema and projection is done by processing engines.
       // This data task needs to use the table schema, which may not include a partition schema to avoid having an
diff --git a/core/src/main/java/org/apache/iceberg/AllEntriesTable.java b/core/src/main/java/org/apache/iceberg/AllEntriesTable.java
index 3d84373..4734c85 100644
--- a/core/src/main/java/org/apache/iceberg/AllEntriesTable.java
+++ b/core/src/main/java/org/apache/iceberg/AllEntriesTable.java
@@ -24,6 +24,7 @@ import java.util.Collection;
 import java.util.List;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.expressions.ResidualEvaluator;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -82,18 +83,21 @@ public class AllEntriesTable extends BaseMetadataTable {
 
     private Scan(
         TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
-        boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
-        ImmutableMap<String, String> options) {
-      super(ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns, options);
+        boolean ignoreResiduals, boolean caseSensitive, boolean colStats,
+        Collection<String> selectedColumns, ImmutableMap<String, String> options) {
+      super(
+          ops, table, snapshotId, schema, rowFilter, ignoreResiduals,
+          caseSensitive, colStats, selectedColumns, options);
     }
 
     @Override
     protected TableScan newRefinedScan(
         TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
-        boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
+        boolean ignoreResiduals, boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
         ImmutableMap<String, String> options) {
       return new Scan(
-          ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns, options);
+          ops, table, snapshotId, schema, rowFilter, ignoreResiduals,
+          caseSensitive, colStats, selectedColumns, options);
     }
 
     @Override
@@ -104,12 +108,14 @@ public class AllEntriesTable extends BaseMetadataTable {
 
     @Override
     protected CloseableIterable<FileScanTask> planFiles(
-        TableOperations ops, Snapshot snapshot, Expression rowFilter, boolean caseSensitive, boolean colStats) {
+        TableOperations ops, Snapshot snapshot, Expression rowFilter,
+        boolean ignoreResiduals, boolean caseSensitive, boolean colStats) {
       CloseableIterable<ManifestFile> manifests = allManifestFiles(ops.current().snapshots());
       Schema fileSchema = new Schema(schema().findType("data_file").asStructType().fields());
       String schemaString = SchemaParser.toJson(schema());
       String specString = PartitionSpecParser.toJson(PartitionSpec.unpartitioned());
-      ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(rowFilter);
+      Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : rowFilter;
+      ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter);
 
       return CloseableIterable.transform(manifests, manifest -> new ManifestEntriesTable.ManifestReadTask(
           ops.io(), manifest, fileSchema, schemaString, specString, residuals));
diff --git a/core/src/main/java/org/apache/iceberg/AllManifestsTable.java b/core/src/main/java/org/apache/iceberg/AllManifestsTable.java
index e337ebd..2de065a 100644
--- a/core/src/main/java/org/apache/iceberg/AllManifestsTable.java
+++ b/core/src/main/java/org/apache/iceberg/AllManifestsTable.java
@@ -24,6 +24,7 @@ import java.util.Collection;
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.expressions.ResidualEvaluator;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileIO;
@@ -91,18 +92,21 @@ public class AllManifestsTable extends BaseMetadataTable {
 
     private AllManifestsTableScan(
         TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
-        boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
+        boolean ignoreResiduals, boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
         ImmutableMap<String, String> options) {
-      super(ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns, options);
+      super(
+          ops, table, snapshotId, schema, rowFilter, ignoreResiduals,
+          caseSensitive, colStats, selectedColumns, options);
     }
 
     @Override
     protected TableScan newRefinedScan(
         TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
-        boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
+        boolean ignoreResiduals, boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
         ImmutableMap<String, String> options) {
       return new AllManifestsTableScan(
-          ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns, options);
+          ops, table, snapshotId, schema, rowFilter, ignoreResiduals,
+          caseSensitive, colStats, selectedColumns, options);
     }
 
     @Override
@@ -123,16 +127,19 @@ public class AllManifestsTable extends BaseMetadataTable {
 
     @Override
     protected CloseableIterable<FileScanTask> planFiles(
-        TableOperations ops, Snapshot snapshot, Expression rowFilter, boolean caseSensitive, boolean colStats) {
+        TableOperations ops, Snapshot snapshot, Expression rowFilter,
+        boolean ignoreResiduals, boolean caseSensitive, boolean colStats) {
       String schemaString = SchemaParser.toJson(schema());
       String specString = PartitionSpecParser.toJson(PartitionSpec.unpartitioned());
 
       // Data tasks produce the table schema, not the projection schema and projection is done by processing engines.
       return CloseableIterable.withNoopClose(Iterables.transform(ops.current().snapshots(), snap -> {
         if (snap.manifestListLocation() != null) {
+          Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : rowFilter;
+          ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter);
           return new ManifestListReadTask(ops.io(), table().spec(), new BaseFileScanTask(
               DataFiles.fromManifestList(ops.io().newInputFile(snap.manifestListLocation())),
-              schemaString, specString, ResidualEvaluator.unpartitioned(rowFilter)));
+              schemaString, specString, residuals));
         } else {
           return StaticDataTask.of(
               ops.io().newInputFile(ops.current().file().location()),
diff --git a/core/src/main/java/org/apache/iceberg/BaseAllMetadataTableScan.java b/core/src/main/java/org/apache/iceberg/BaseAllMetadataTableScan.java
index 3f7139d..2560c05 100644
--- a/core/src/main/java/org/apache/iceberg/BaseAllMetadataTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/BaseAllMetadataTableScan.java
@@ -37,9 +37,11 @@ abstract class BaseAllMetadataTableScan extends BaseTableScan {
 
   BaseAllMetadataTableScan(
       TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
-      boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
+      boolean ignoreResiduals, boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
       ImmutableMap<String, String> options) {
-    super(ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns, options);
+    super(
+        ops, table, snapshotId, schema, rowFilter, ignoreResiduals,
+        caseSensitive, colStats, selectedColumns, options);
   }
 
   @Override
@@ -47,6 +49,6 @@ abstract class BaseAllMetadataTableScan extends BaseTableScan {
     LOG.info("Scanning metadata table {} with filter {}.", table(), filter());
     Listeners.notifyAll(new ScanEvent(table().toString(), 0L, filter(), schema()));
 
-    return planFiles(tableOps(), snapshot(), filter(), isCaseSensitive(), colStats());
+    return planFiles(tableOps(), snapshot(), filter(), shouldIgnoreResiduals(), isCaseSensitive(), colStats());
   }
 }
diff --git a/core/src/main/java/org/apache/iceberg/BaseTableScan.java b/core/src/main/java/org/apache/iceberg/BaseTableScan.java
index 9e55e0e..27730e7 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTableScan.java
@@ -57,23 +57,25 @@ abstract class BaseTableScan implements TableScan {
   private final Long snapshotId;
   private final Schema schema;
   private final Expression rowFilter;
+  private final boolean ignoreResiduals;
   private final boolean caseSensitive;
   private final boolean colStats;
   private final Collection<String> selectedColumns;
   private final ImmutableMap<String, String> options;
 
   protected BaseTableScan(TableOperations ops, Table table, Schema schema) {
-    this(ops, table, null, schema, Expressions.alwaysTrue(), true, false, null, ImmutableMap.of());
+    this(ops, table, null, schema, Expressions.alwaysTrue(), false, true, false, null, ImmutableMap.of());
   }
 
   protected BaseTableScan(TableOperations ops, Table table, Long snapshotId, Schema schema,
-                        Expression rowFilter, boolean caseSensitive, boolean colStats,
-                        Collection<String> selectedColumns, ImmutableMap<String, String> options) {
+                          Expression rowFilter, boolean ignoreResiduals, boolean caseSensitive, boolean colStats,
+                          Collection<String> selectedColumns, ImmutableMap<String, String> options) {
     this.ops = ops;
     this.table = table;
     this.snapshotId = snapshotId;
     this.schema = schema;
     this.rowFilter = rowFilter;
+    this.ignoreResiduals = ignoreResiduals;
     this.caseSensitive = caseSensitive;
     this.colStats = colStats;
     this.selectedColumns = selectedColumns;
@@ -92,6 +94,10 @@ abstract class BaseTableScan implements TableScan {
     return colStats;
   }
 
+  protected boolean shouldIgnoreResiduals() {
+    return ignoreResiduals;
+  }
+
   protected Collection<String> selectedColumns() {
     return selectedColumns;
   }
@@ -106,12 +112,13 @@ abstract class BaseTableScan implements TableScan {
   @SuppressWarnings("checkstyle:HiddenField")
   protected abstract TableScan newRefinedScan(
       TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
-      boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
+      boolean ignoreResiduals, boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
       ImmutableMap<String, String> options);
 
   @SuppressWarnings("checkstyle:HiddenField")
   protected abstract CloseableIterable<FileScanTask> planFiles(
-      TableOperations ops, Snapshot snapshot, Expression rowFilter, boolean caseSensitive, boolean colStats);
+      TableOperations ops, Snapshot snapshot, Expression rowFilter,
+      boolean ignoreResiduals, boolean caseSensitive, boolean colStats);
 
   @Override
   public Table table() {
@@ -135,7 +142,8 @@ abstract class BaseTableScan implements TableScan {
     Preconditions.checkArgument(ops.current().snapshot(scanSnapshotId) != null,
         "Cannot find snapshot with ID %s", scanSnapshotId);
     return newRefinedScan(
-        ops, table, scanSnapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns, options);
+        ops, table, scanSnapshotId, schema, rowFilter, ignoreResiduals,
+        caseSensitive, colStats, selectedColumns, options);
   }
 
   @Override
@@ -165,35 +173,43 @@ abstract class BaseTableScan implements TableScan {
     builder.put(property, value);
 
     return newRefinedScan(
-        ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns, builder.build());
+        ops, table, snapshotId, schema, rowFilter, ignoreResiduals,
+        caseSensitive, colStats, selectedColumns, builder.build());
   }
 
   @Override
   public TableScan project(Schema projectedSchema) {
     return newRefinedScan(
-        ops, table, snapshotId, projectedSchema, rowFilter, caseSensitive, colStats, selectedColumns, options);
+        ops, table, snapshotId, projectedSchema, rowFilter, ignoreResiduals,
+        caseSensitive, colStats, selectedColumns, options);
   }
 
   @Override
   public TableScan caseSensitive(boolean scanCaseSensitive) {
     return newRefinedScan(
-        ops, table, snapshotId, schema, rowFilter, scanCaseSensitive, colStats, selectedColumns, options);
+        ops, table, snapshotId, schema, rowFilter, ignoreResiduals,
+        scanCaseSensitive, colStats, selectedColumns, options);
   }
 
   @Override
   public TableScan includeColumnStats() {
-    return newRefinedScan(ops, table, snapshotId, schema, rowFilter, caseSensitive, true, selectedColumns, options);
+    return newRefinedScan(
+        ops, table, snapshotId, schema, rowFilter, ignoreResiduals,
+        caseSensitive, true, selectedColumns, options);
   }
 
   @Override
   public TableScan select(Collection<String> columns) {
-    return newRefinedScan(ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, columns, options);
+    return newRefinedScan(
+        ops, table, snapshotId, schema, rowFilter, ignoreResiduals,
+        caseSensitive, colStats, columns, options);
   }
 
   @Override
   public TableScan filter(Expression expr) {
-    return newRefinedScan(ops, table, snapshotId, schema, Expressions.and(rowFilter, expr), caseSensitive, colStats,
-        selectedColumns, options);
+    return newRefinedScan(
+        ops, table, snapshotId, schema, Expressions.and(rowFilter, expr),
+        ignoreResiduals, caseSensitive, colStats, selectedColumns, options);
   }
 
   @Override
@@ -202,6 +218,13 @@ abstract class BaseTableScan implements TableScan {
   }
 
   @Override
+  public TableScan ignoreResiduals() {
+    return newRefinedScan(
+        ops, table, snapshotId, schema, rowFilter, true,
+        caseSensitive, colStats, selectedColumns, options);
+  }
+
+  @Override
   public CloseableIterable<FileScanTask> planFiles() {
     Snapshot snapshot = snapshot();
     if (snapshot != null) {
@@ -212,7 +235,7 @@ abstract class BaseTableScan implements TableScan {
       Listeners.notifyAll(
           new ScanEvent(table.toString(), snapshot.snapshotId(), rowFilter, schema()));
 
-      return planFiles(ops, snapshot, rowFilter, caseSensitive, colStats);
+      return planFiles(ops, snapshot, rowFilter, ignoreResiduals, caseSensitive, colStats);
 
     } else {
       LOG.info("Scanning empty table {}", table);
@@ -276,6 +299,7 @@ abstract class BaseTableScan implements TableScan {
         .add("table", table)
         .add("projection", schema().asStruct())
         .add("filter", rowFilter)
+        .add("ignoreResiduals", ignoreResiduals)
         .add("caseSensitive", caseSensitive)
         .toString();
   }
diff --git a/core/src/main/java/org/apache/iceberg/DataFilesTable.java b/core/src/main/java/org/apache/iceberg/DataFilesTable.java
index dcfc70f..db0e085 100644
--- a/core/src/main/java/org/apache/iceberg/DataFilesTable.java
+++ b/core/src/main/java/org/apache/iceberg/DataFilesTable.java
@@ -21,6 +21,7 @@ package org.apache.iceberg;
 
 import java.util.Collection;
 import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.expressions.ResidualEvaluator;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileIO;
@@ -77,19 +78,22 @@ public class DataFilesTable extends BaseMetadataTable {
 
     private FilesTableScan(
         TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
-        boolean caseSensitive, boolean colStats, Collection<String> selectedColumns, Schema fileSchema,
-        ImmutableMap<String, String> options) {
-      super(ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns, options);
+        boolean ignoreResiduals, boolean caseSensitive, boolean colStats,
+        Collection<String> selectedColumns, Schema fileSchema, ImmutableMap<String, String> options) {
+      super(
+          ops, table, snapshotId, schema, rowFilter, ignoreResiduals,
+          caseSensitive, colStats, selectedColumns, options);
       this.fileSchema = fileSchema;
     }
 
     @Override
     protected TableScan newRefinedScan(
         TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
-        boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
+        boolean ignoreResiduals, boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
         ImmutableMap<String, String> options) {
       return new FilesTableScan(
-          ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns, fileSchema, options);
+          ops, table, snapshotId, schema, rowFilter, ignoreResiduals,
+          caseSensitive, colStats, selectedColumns, fileSchema, options);
     }
 
     @Override
@@ -100,11 +104,13 @@ public class DataFilesTable extends BaseMetadataTable {
 
     @Override
     protected CloseableIterable<FileScanTask> planFiles(
-        TableOperations ops, Snapshot snapshot, Expression rowFilter, boolean caseSensitive, boolean colStats) {
+        TableOperations ops, Snapshot snapshot, Expression rowFilter,
+        boolean ignoreResiduals, boolean caseSensitive, boolean colStats) {
       CloseableIterable<ManifestFile> manifests = CloseableIterable.withNoopClose(snapshot.dataManifests());
       String schemaString = SchemaParser.toJson(schema());
       String specString = PartitionSpecParser.toJson(PartitionSpec.unpartitioned());
-      ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(rowFilter);
+      Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : rowFilter;
+      ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter);
 
       // Data tasks produce the table schema, not the projection schema and projection is done by processing engines.
       // This data task needs to use the table schema, which may not include a partition schema to avoid having an
diff --git a/core/src/main/java/org/apache/iceberg/DataTableScan.java b/core/src/main/java/org/apache/iceberg/DataTableScan.java
index b098bdd..fc06342 100644
--- a/core/src/main/java/org/apache/iceberg/DataTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/DataTableScan.java
@@ -44,9 +44,11 @@ public class DataTableScan extends BaseTableScan {
   }
 
   protected DataTableScan(TableOperations ops, Table table, Long snapshotId, Schema schema,
-                          Expression rowFilter, boolean caseSensitive, boolean colStats,
+                          Expression rowFilter, boolean ignoreResiduals, boolean caseSensitive, boolean colStats,
                           Collection<String> selectedColumns, ImmutableMap<String, String> options) {
-    super(ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns, options);
+    super(
+        ops, table, snapshotId, schema, rowFilter, ignoreResiduals,
+        caseSensitive, colStats, selectedColumns, options);
   }
 
   @Override
@@ -55,7 +57,8 @@ public class DataTableScan extends BaseTableScan {
     Preconditions.checkState(scanSnapshotId == null,
         "Cannot enable incremental scan, scan-snapshot set to id=%s", scanSnapshotId);
     return new IncrementalDataTableScan(
-        tableOps(), table(), schema(), filter(), isCaseSensitive(), colStats(), selectedColumns(), options(),
+        tableOps(), table(), schema(), filter(), shouldIgnoreResiduals(),
+        isCaseSensitive(), colStats(), selectedColumns(), options(),
         fromSnapshotId, toSnapshotId);
   }
 
@@ -70,15 +73,16 @@ public class DataTableScan extends BaseTableScan {
   @Override
   protected TableScan newRefinedScan(
       TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
-      boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
+      boolean ignoreResiduals, boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
       ImmutableMap<String, String> options) {
     return new DataTableScan(
-        ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns, options);
+        ops, table, snapshotId, schema, rowFilter, ignoreResiduals, caseSensitive, colStats, selectedColumns, options);
   }
 
   @Override
   public CloseableIterable<FileScanTask> planFiles(TableOperations ops, Snapshot snapshot,
-                                                   Expression rowFilter, boolean caseSensitive, boolean colStats) {
+                                                   Expression rowFilter, boolean ignoreResiduals,
+                                                   boolean caseSensitive, boolean colStats) {
     ManifestGroup manifestGroup = new ManifestGroup(ops.io(), snapshot.dataManifests())
         .caseSensitive(caseSensitive)
         .select(colStats ? SCAN_WITH_STATS_COLUMNS : SCAN_COLUMNS)
@@ -86,6 +90,10 @@ public class DataTableScan extends BaseTableScan {
         .specsById(ops.current().specsById())
         .ignoreDeleted();
 
+    if (ignoreResiduals) {
+      manifestGroup = manifestGroup.ignoreResiduals();
+    }
+
     if (PLAN_SCANS_WITH_WORKER_POOL && snapshot.dataManifests().size() > 1) {
       manifestGroup = manifestGroup.planWith(ThreadPools.getWorkerPool());
     }
diff --git a/core/src/main/java/org/apache/iceberg/IncrementalDataTableScan.java b/core/src/main/java/org/apache/iceberg/IncrementalDataTableScan.java
index 0d4065b..6d70362 100644
--- a/core/src/main/java/org/apache/iceberg/IncrementalDataTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/IncrementalDataTableScan.java
@@ -37,11 +37,11 @@ class IncrementalDataTableScan extends DataTableScan {
   private long fromSnapshotId;
   private long toSnapshotId;
 
-  IncrementalDataTableScan(TableOperations ops, Table table, Schema schema,
-                           Expression rowFilter, boolean caseSensitive, boolean colStats,
+  IncrementalDataTableScan(TableOperations ops, Table table, Schema schema, Expression rowFilter,
+                           boolean ignoreResiduals, boolean caseSensitive, boolean colStats,
                            Collection<String> selectedColumns, ImmutableMap<String, String> options,
                            long fromSnapshotId, long toSnapshotId) {
-    super(ops, table, null, schema, rowFilter, caseSensitive, colStats, selectedColumns, options);
+    super(ops, table, null, schema, rowFilter, ignoreResiduals, caseSensitive, colStats, selectedColumns, options);
     validateSnapshotIds(table, fromSnapshotId, toSnapshotId);
     this.fromSnapshotId = fromSnapshotId;
     this.toSnapshotId = toSnapshotId;
@@ -65,7 +65,8 @@ class IncrementalDataTableScan extends DataTableScan {
   public TableScan appendsBetween(long newFromSnapshotId, long newToSnapshotId) {
     validateSnapshotIdsRefinement(newFromSnapshotId, newToSnapshotId);
     return new IncrementalDataTableScan(
-        tableOps(), table(), schema(), filter(), isCaseSensitive(), colStats(), selectedColumns(), options(),
+        tableOps(), table(), schema(), filter(), shouldIgnoreResiduals(),
+        isCaseSensitive(), colStats(), selectedColumns(), options(),
         newFromSnapshotId, newToSnapshotId);
   }
 
@@ -99,6 +100,10 @@ class IncrementalDataTableScan extends DataTableScan {
         .specsById(tableOps().current().specsById())
         .ignoreDeleted();
 
+    if (shouldIgnoreResiduals()) {
+      manifestGroup = manifestGroup.ignoreResiduals();
+    }
+
     if (PLAN_SCANS_WITH_WORKER_POOL && manifests.size() > 1) {
       manifestGroup = manifestGroup.planWith(ThreadPools.getWorkerPool());
     }
@@ -110,10 +115,10 @@ class IncrementalDataTableScan extends DataTableScan {
   @SuppressWarnings("checkstyle:HiddenField")
   protected TableScan newRefinedScan(
           TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
-          boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
+          boolean ignoreResiduals, boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
           ImmutableMap<String, String> options) {
     return new IncrementalDataTableScan(
-            ops, table, schema, rowFilter, caseSensitive, colStats, selectedColumns, options,
+            ops, table, schema, rowFilter, ignoreResiduals, caseSensitive, colStats, selectedColumns, options,
             fromSnapshotId, toSnapshotId);
   }
 
diff --git a/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java b/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java
index f2536ec..520f113 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java
@@ -21,6 +21,7 @@ package org.apache.iceberg;
 
 import java.util.Collection;
 import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.expressions.ResidualEvaluator;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileIO;
@@ -78,18 +79,21 @@ public class ManifestEntriesTable extends BaseMetadataTable {
 
     private EntriesTableScan(
         TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
-        boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
+        boolean ignoreResiduals, boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
         ImmutableMap<String, String> options) {
-      super(ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns, options);
+      super(
+          ops, table, snapshotId, schema, rowFilter, ignoreResiduals,
+          caseSensitive, colStats, selectedColumns, options);
     }
 
     @Override
     protected TableScan newRefinedScan(
         TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
-        boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
+        boolean ignoreResiduals, boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
         ImmutableMap<String, String> options) {
       return new EntriesTableScan(
-          ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns, options);
+          ops, table, snapshotId, schema, rowFilter, ignoreResiduals,
+          caseSensitive, colStats, selectedColumns, options);
     }
 
     @Override
@@ -100,13 +104,15 @@ public class ManifestEntriesTable extends BaseMetadataTable {
 
     @Override
     protected CloseableIterable<FileScanTask> planFiles(
-        TableOperations ops, Snapshot snapshot, Expression rowFilter, boolean caseSensitive, boolean colStats) {
+        TableOperations ops, Snapshot snapshot, Expression rowFilter,
+        boolean ignoreResiduals, boolean caseSensitive, boolean colStats) {
       // return entries from both data and delete manifests
       CloseableIterable<ManifestFile> manifests = CloseableIterable.withNoopClose(snapshot.allManifests());
       Schema fileSchema = new Schema(schema().findType("data_file").asStructType().fields());
       String schemaString = SchemaParser.toJson(schema());
       String specString = PartitionSpecParser.toJson(PartitionSpec.unpartitioned());
-      ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(rowFilter);
+      Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : rowFilter;
+      ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter);
 
       return CloseableIterable.transform(manifests, manifest ->
           new ManifestReadTask(ops.io(), manifest, fileSchema, schemaString, specString, residuals));
diff --git a/core/src/main/java/org/apache/iceberg/ManifestGroup.java b/core/src/main/java/org/apache/iceberg/ManifestGroup.java
index 3aca5f9..6308236 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestGroup.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestGroup.java
@@ -54,6 +54,7 @@ class ManifestGroup {
   private Expression partitionFilter;
   private boolean ignoreDeleted;
   private boolean ignoreExisting;
+  private boolean ignoreResiduals;
   private List<String> columns;
   private boolean caseSensitive;
   private ExecutorService executorService;
@@ -66,6 +67,7 @@ class ManifestGroup {
     this.partitionFilter = Expressions.alwaysTrue();
     this.ignoreDeleted = false;
     this.ignoreExisting = false;
+    this.ignoreResiduals = false;
     this.columns = BaseManifestReader.ALL_COLUMNS;
     this.caseSensitive = true;
     this.manifestPredicate = m -> true;
@@ -112,6 +114,11 @@ class ManifestGroup {
     return this;
   }
 
+  ManifestGroup ignoreResiduals() {
+    this.ignoreResiduals = true; // planFiles() then builds residual evaluators from alwaysTrue()
+    return this;
+  }
+
   ManifestGroup select(List<String> newColumns) {
     this.columns = Lists.newArrayList(newColumns);
     return this;
@@ -136,7 +143,8 @@ class ManifestGroup {
   public CloseableIterable<FileScanTask> planFiles() {
     LoadingCache<Integer, ResidualEvaluator> residualCache = Caffeine.newBuilder().build(specId -> {
       PartitionSpec spec = specsById.get(specId);
-      return ResidualEvaluator.of(spec, dataFilter, caseSensitive);
+      Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : dataFilter;
+      return ResidualEvaluator.of(spec, filter, caseSensitive);
     });
     boolean dropStats = BaseManifestReader.dropStats(dataFilter, columns);
     Iterable<CloseableIterable<FileScanTask>> tasks = entries((manifest, entries) -> {
diff --git a/core/src/main/java/org/apache/iceberg/StaticTableScan.java b/core/src/main/java/org/apache/iceberg/StaticTableScan.java
index 88c751c..14401ec 100644
--- a/core/src/main/java/org/apache/iceberg/StaticTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/StaticTableScan.java
@@ -35,9 +35,11 @@ class StaticTableScan extends BaseTableScan {
 
   private StaticTableScan(
       TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
-      boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
+      boolean ignoreResiduals, boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
       Function<StaticTableScan, DataTask> buildTask, ImmutableMap<String, String> options) {
-    super(ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns, options);
+    super(
+        ops, table, snapshotId, schema, rowFilter, ignoreResiduals,
+        caseSensitive, colStats, selectedColumns, options);
     this.buildTask = buildTask;
   }
 
@@ -50,15 +52,17 @@ class StaticTableScan extends BaseTableScan {
   @Override
   protected TableScan newRefinedScan(
       TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
-      boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
-      ImmutableMap<String, String> options) {
+      boolean ignoreResiduals, boolean caseSensitive, boolean colStats,
+      Collection<String> selectedColumns, ImmutableMap<String, String> options) {
     return new StaticTableScan(
-        ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns, buildTask, options);
+        ops, table, snapshotId, schema, rowFilter, ignoreResiduals,
+        caseSensitive, colStats, selectedColumns, buildTask, options);
   }
 
   @Override
   protected CloseableIterable<FileScanTask> planFiles(
-      TableOperations ops, Snapshot snapshot, Expression rowFilter, boolean caseSensitive, boolean colStats) {
+      TableOperations ops, Snapshot snapshot, Expression rowFilter,
+      boolean ignoreResiduals, boolean caseSensitive, boolean colStats) {
     return CloseableIterable.withNoopClose(buildTask.apply(this));
   }
 }
diff --git a/core/src/test/java/org/apache/iceberg/TableTestBase.java b/core/src/test/java/org/apache/iceberg/TableTestBase.java
index e115675..8ef9732 100644
--- a/core/src/test/java/org/apache/iceberg/TableTestBase.java
+++ b/core/src/test/java/org/apache/iceberg/TableTestBase.java
@@ -56,25 +56,25 @@ public class TableTestBase {
 
   static final DataFile FILE_A = DataFiles.builder(SPEC)
       .withPath("/path/to/data-a.parquet")
-      .withFileSizeInBytes(0)
+      .withFileSizeInBytes(10)
       .withPartitionPath("data_bucket=0") // easy way to set partition data for now
       .withRecordCount(1)
       .build();
   static final DataFile FILE_B = DataFiles.builder(SPEC)
       .withPath("/path/to/data-b.parquet")
-      .withFileSizeInBytes(0)
+      .withFileSizeInBytes(10)
       .withPartitionPath("data_bucket=1") // easy way to set partition data for now
       .withRecordCount(1)
       .build();
   static final DataFile FILE_C = DataFiles.builder(SPEC)
       .withPath("/path/to/data-c.parquet")
-      .withFileSizeInBytes(0)
+      .withFileSizeInBytes(10)
       .withPartitionPath("data_bucket=2") // easy way to set partition data for now
       .withRecordCount(1)
       .build();
   static final DataFile FILE_D = DataFiles.builder(SPEC)
       .withPath("/path/to/data-d.parquet")
-      .withFileSizeInBytes(0)
+      .withFileSizeInBytes(10)
       .withPartitionPath("data_bucket=3") // easy way to set partition data for now
       .withRecordCount(1)
       .build();
diff --git a/core/src/test/java/org/apache/iceberg/TestDataTableScan.java b/core/src/test/java/org/apache/iceberg/TestDataTableScan.java
index ff75d52..a685cf6 100644
--- a/core/src/test/java/org/apache/iceberg/TestDataTableScan.java
+++ b/core/src/test/java/org/apache/iceberg/TestDataTableScan.java
@@ -19,7 +19,12 @@
 
 package org.apache.iceberg;
 
+import java.io.IOException;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.types.Types;
+import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -69,4 +74,36 @@ public class TestDataTableScan extends TableTestBase {
         scan2.schema().asStruct());
   }
 
+  @Test
+  public void testTableScanHonorsIgnoreResiduals() throws IOException {
+    table.newFastAppend()
+        .appendFile(FILE_A)
+        .appendFile(FILE_B)
+        .commit();
+
+    TableScan scan1 = table.newScan()
+        .filter(Expressions.equal("id", 5));
+    // Baseline: without ignoreResiduals(), no planned file may report alwaysTrue() as its residual.
+    try (CloseableIterable<CombinedScanTask> tasks = scan1.planTasks()) {
+      Assert.assertTrue("Tasks should not be empty", Iterables.size(tasks) > 0);
+      for (CombinedScanTask combinedScanTask : tasks) {
+        for (FileScanTask fileScanTask : combinedScanTask.files()) {
+          Assert.assertNotEquals("Residuals must be preserved", Expressions.alwaysTrue(), fileScanTask.residual());
+        }
+      }
+    }
+
+    TableScan scan2 = table.newScan()
+        .filter(Expressions.equal("id", 5))
+        .ignoreResiduals();
+    // Opt-out: with ignoreResiduals(), every planned file must report alwaysTrue() as its residual.
+    try (CloseableIterable<CombinedScanTask> tasks = scan2.planTasks()) {
+      Assert.assertTrue("Tasks should not be empty", Iterables.size(tasks) > 0);
+      for (CombinedScanTask combinedScanTask : tasks) {
+        for (FileScanTask fileScanTask : combinedScanTask.files()) {
+          Assert.assertEquals("Residuals must be ignored", Expressions.alwaysTrue(), fileScanTask.residual());
+        }
+      }
+    }
+  }
 }
diff --git a/core/src/test/java/org/apache/iceberg/TestIncrementalDataTableScan.java b/core/src/test/java/org/apache/iceberg/TestIncrementalDataTableScan.java
index bcbb894..a49978b 100644
--- a/core/src/test/java/org/apache/iceberg/TestIncrementalDataTableScan.java
+++ b/core/src/test/java/org/apache/iceberg/TestIncrementalDataTableScan.java
@@ -19,8 +19,11 @@
 
 package org.apache.iceberg;
 
+import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
@@ -174,10 +177,44 @@ public class TestIncrementalDataTableScan extends TableTestBase {
     filesMatch(Lists.newArrayList("B", "D", "E"), appendsAfterScan(1));
   }
 
+  @Test
+  public void testIgnoreResiduals() throws IOException {
+    add(table.newAppend(), files("A"));
+    add(table.newAppend(), files("B"));
+    add(table.newAppend(), files("C"));
+
+    TableScan scan1 = table.newScan()
+        .filter(Expressions.equal("id", 5))
+        .appendsBetween(1, 3);
+    // Baseline: without ignoreResiduals(), no planned file may report alwaysTrue() as its residual.
+    try (CloseableIterable<CombinedScanTask> tasks = scan1.planTasks()) {
+      Assert.assertTrue("Tasks should not be empty", Iterables.size(tasks) > 0);
+      for (CombinedScanTask combinedScanTask : tasks) {
+        for (FileScanTask fileScanTask : combinedScanTask.files()) {
+          Assert.assertNotEquals("Residuals must be preserved", Expressions.alwaysTrue(), fileScanTask.residual());
+        }
+      }
+    }
+
+    TableScan scan2 = table.newScan()
+        .filter(Expressions.equal("id", 5))
+        .appendsBetween(1, 3)
+        .ignoreResiduals();
+    // Opt-out: with ignoreResiduals(), every planned file must report alwaysTrue() as its residual.
+    try (CloseableIterable<CombinedScanTask> tasks = scan2.planTasks()) {
+      Assert.assertTrue("Tasks should not be empty", Iterables.size(tasks) > 0);
+      for (CombinedScanTask combinedScanTask : tasks) {
+        for (FileScanTask fileScanTask : combinedScanTask.files()) {
+          Assert.assertEquals("Residuals must be ignored", Expressions.alwaysTrue(), fileScanTask.residual());
+        }
+      }
+    }
+  }
+
   private static DataFile file(String name) {
     return DataFiles.builder(SPEC)
             .withPath(name + ".parquet")
-            .withFileSizeInBytes(0)
+            .withFileSizeInBytes(10)
             .withPartitionPath("data_bucket=0") // easy way to set partition data for now
             .withRecordCount(1)
             .build();
diff --git a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
new file mode 100644
index 0000000..59fca14
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.IOException;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestMetadataTableScans extends TableTestBase {
+
+  @Parameterized.Parameters
+  public static Object[][] parameters() {
+    return new Object[][] {
+        new Object[] { 1 },
+        new Object[] { 2 },
+    };
+  }
+
+  public TestMetadataTableScans(int formatVersion) {
+    super(formatVersion);
+  }
+
+  @Test
+  public void testManifestsTableAlwaysIgnoresResiduals() throws IOException {
+    table.newFastAppend()
+        .appendFile(FILE_A)
+        .appendFile(FILE_B)
+        .commit();
+
+    Table manifestsTable = new ManifestsTable(table.ops(), table);
+    // Note: ignoreResiduals() is never called on this scan, yet every task must report alwaysTrue().
+    TableScan scan = manifestsTable.newScan()
+        .filter(Expressions.equal("id", 5));
+
+    try (CloseableIterable<FileScanTask> tasks = scan.planFiles()) {
+      Assert.assertTrue("Tasks should not be empty", Iterables.size(tasks) > 0);
+      for (FileScanTask task : tasks) {
+        Assert.assertEquals("Residuals must be ignored", Expressions.alwaysTrue(), task.residual());
+      }
+    }
+  }
+
+  @Test
+  public void testDataFilesTableHonorsIgnoreResiduals() throws IOException {
+    table.newFastAppend()
+        .appendFile(FILE_A)
+        .appendFile(FILE_B)
+        .commit();
+
+    Table dataFilesTable = new DataFilesTable(table.ops(), table);
+
+    TableScan scan1 = dataFilesTable.newScan()
+        .filter(Expressions.equal("id", 5));
+    validateTaskScanResiduals(scan1, false);
+
+    TableScan scan2 = dataFilesTable.newScan()
+        .filter(Expressions.equal("id", 5))
+        .ignoreResiduals();
+    validateTaskScanResiduals(scan2, true);
+  }
+
+  @Test
+  public void testManifestEntriesTableHonorsIgnoreResiduals() throws IOException {
+    table.newFastAppend()
+        .appendFile(FILE_A)
+        .appendFile(FILE_B)
+        .commit();
+
+    Table manifestEntriesTable = new ManifestEntriesTable(table.ops(), table);
+
+    TableScan scan1 = manifestEntriesTable.newScan()
+        .filter(Expressions.equal("id", 5));
+    validateTaskScanResiduals(scan1, false);
+
+    TableScan scan2 = manifestEntriesTable.newScan()
+        .filter(Expressions.equal("id", 5))
+        .ignoreResiduals();
+    validateTaskScanResiduals(scan2, true);
+  }
+
+  @Test
+  public void testAllDataFilesTableHonorsIgnoreResiduals() throws IOException {
+    table.newFastAppend()
+        .appendFile(FILE_A)
+        .appendFile(FILE_B)
+        .commit();
+
+    Table allDataFilesTable = new AllDataFilesTable(table.ops(), table);
+
+    TableScan scan1 = allDataFilesTable.newScan()
+        .filter(Expressions.equal("id", 5));
+    validateTaskScanResiduals(scan1, false);
+
+    TableScan scan2 = allDataFilesTable.newScan()
+        .filter(Expressions.equal("id", 5))
+        .ignoreResiduals();
+    validateTaskScanResiduals(scan2, true);
+  }
+
+  @Test
+  public void testAllEntriesTableHonorsIgnoreResiduals() throws IOException {
+    table.newFastAppend()
+        .appendFile(FILE_A)
+        .appendFile(FILE_B)
+        .commit();
+
+    Table allEntriesTable = new AllEntriesTable(table.ops(), table);
+
+    TableScan scan1 = allEntriesTable.newScan()
+        .filter(Expressions.equal("id", 5));
+    validateTaskScanResiduals(scan1, false);
+
+    TableScan scan2 = allEntriesTable.newScan()
+        .filter(Expressions.equal("id", 5))
+        .ignoreResiduals();
+    validateTaskScanResiduals(scan2, true);
+  }
+
+  @Test
+  public void testAllManifestsTableHonorsIgnoreResiduals() throws IOException {
+    table.newFastAppend()
+        .appendFile(FILE_A)
+        .appendFile(FILE_B)
+        .commit();
+
+    Table allManifestsTable = new AllManifestsTable(table.ops(), table);
+
+    TableScan scan1 = allManifestsTable.newScan()
+        .filter(Expressions.equal("id", 5));
+    validateTaskScanResiduals(scan1, false);
+
+    TableScan scan2 = allManifestsTable.newScan()
+        .filter(Expressions.equal("id", 5))
+        .ignoreResiduals();
+    validateTaskScanResiduals(scan2, true);
+  }
+
+  private void validateTaskScanResiduals(TableScan scan, boolean ignoreResiduals) throws IOException {
+    try (CloseableIterable<CombinedScanTask> tasks = scan.planTasks()) {
+      Assert.assertTrue("Tasks should not be empty", Iterables.size(tasks) > 0); // else the loops check nothing
+      for (CombinedScanTask combinedScanTask : tasks) {
+        for (FileScanTask fileScanTask : combinedScanTask.files()) {
+          if (ignoreResiduals) { // residual must equal alwaysTrue()
+            Assert.assertEquals("Residuals must be ignored", Expressions.alwaysTrue(), fileScanTask.residual());
+          } else { // residual must be anything other than alwaysTrue()
+            Assert.assertNotEquals("Residuals must be preserved", Expressions.alwaysTrue(), fileScanTask.residual());
+          }
+        }
+      }
+    }
+  }
+}