You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2020/06/05 16:17:46 UTC
[iceberg] branch master updated: Add ignoreResiduals option to TableScan (#1094)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new c299b45 Add ignoreResiduals option to TableScan (#1094)
c299b45 is described below
commit c299b45bb6ec9a0f2f18451a16136bde9110b5b0
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Fri Jun 5 09:17:36 2020 -0700
Add ignoreResiduals option to TableScan (#1094)
---
.../main/java/org/apache/iceberg/TableScan.java | 7 +
.../java/org/apache/iceberg/AllDataFilesTable.java | 19 ++-
.../java/org/apache/iceberg/AllEntriesTable.java | 20 ++-
.../java/org/apache/iceberg/AllManifestsTable.java | 19 ++-
.../apache/iceberg/BaseAllMetadataTableScan.java | 8 +-
.../java/org/apache/iceberg/BaseTableScan.java | 52 ++++--
.../java/org/apache/iceberg/DataFilesTable.java | 20 ++-
.../java/org/apache/iceberg/DataTableScan.java | 20 ++-
.../apache/iceberg/IncrementalDataTableScan.java | 17 +-
.../org/apache/iceberg/ManifestEntriesTable.java | 18 ++-
.../java/org/apache/iceberg/ManifestGroup.java | 10 +-
.../java/org/apache/iceberg/StaticTableScan.java | 16 +-
.../java/org/apache/iceberg/TableTestBase.java | 8 +-
.../java/org/apache/iceberg/TestDataTableScan.java | 37 +++++
.../iceberg/TestIncrementalDataTableScan.java | 39 ++++-
.../org/apache/iceberg/TestMetadataTableScans.java | 175 +++++++++++++++++++++
16 files changed, 412 insertions(+), 73 deletions(-)
diff --git a/api/src/main/java/org/apache/iceberg/TableScan.java b/api/src/main/java/org/apache/iceberg/TableScan.java
index 3377bc1..4537c7d 100644
--- a/api/src/main/java/org/apache/iceberg/TableScan.java
+++ b/api/src/main/java/org/apache/iceberg/TableScan.java
@@ -132,6 +132,13 @@ public interface TableScan {
Expression filter();
/**
+ * Create a new {@link TableScan} from this that applies data filtering to files but not to rows in those files.
+ *
+ * @return a new scan based on this that does not filter rows in files.
+ */
+ TableScan ignoreResiduals();
+
+ /**
* Create a new {@link TableScan} to read appended data from {@code fromSnapshotId} exclusive to {@code toSnapshotId}
* inclusive.
*
diff --git a/core/src/main/java/org/apache/iceberg/AllDataFilesTable.java b/core/src/main/java/org/apache/iceberg/AllDataFilesTable.java
index c6f4927..cc3033e 100644
--- a/core/src/main/java/org/apache/iceberg/AllDataFilesTable.java
+++ b/core/src/main/java/org/apache/iceberg/AllDataFilesTable.java
@@ -24,6 +24,7 @@ import java.util.Collection;
import java.util.List;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.ResidualEvaluator;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -85,19 +86,23 @@ public class AllDataFilesTable extends BaseMetadataTable {
private AllDataFilesTableScan(
TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
- boolean caseSensitive, boolean colStats, Collection<String> selectedColumns, Schema fileSchema,
+ boolean ignoreResiduals, boolean caseSensitive, boolean colStats,
+ Collection<String> selectedColumns, Schema fileSchema,
ImmutableMap<String, String> options) {
- super(ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns, options);
+ super(
+ ops, table, snapshotId, schema, rowFilter, ignoreResiduals,
+ caseSensitive, colStats, selectedColumns, options);
this.fileSchema = fileSchema;
}
@Override
protected TableScan newRefinedScan(
TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
- boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
+ boolean ignoreResiduals, boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
ImmutableMap<String, String> options) {
return new AllDataFilesTableScan(
- ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns, fileSchema, options);
+ ops, table, snapshotId, schema, rowFilter, ignoreResiduals,
+ caseSensitive, colStats, selectedColumns, fileSchema, options);
}
@Override
@@ -118,11 +123,13 @@ public class AllDataFilesTable extends BaseMetadataTable {
@Override
protected CloseableIterable<FileScanTask> planFiles(
- TableOperations ops, Snapshot snapshot, Expression rowFilter, boolean caseSensitive, boolean colStats) {
+ TableOperations ops, Snapshot snapshot, Expression rowFilter,
+ boolean ignoreResiduals, boolean caseSensitive, boolean colStats) {
CloseableIterable<ManifestFile> manifests = allDataManifestFiles(ops.current().snapshots());
String schemaString = SchemaParser.toJson(schema());
String specString = PartitionSpecParser.toJson(PartitionSpec.unpartitioned());
- ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(rowFilter);
+ Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : rowFilter;
+ ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter);
// Data tasks produce the table schema, not the projection schema and projection is done by processing engines.
// This data task needs to use the table schema, which may not include a partition schema to avoid having an
diff --git a/core/src/main/java/org/apache/iceberg/AllEntriesTable.java b/core/src/main/java/org/apache/iceberg/AllEntriesTable.java
index 3d84373..4734c85 100644
--- a/core/src/main/java/org/apache/iceberg/AllEntriesTable.java
+++ b/core/src/main/java/org/apache/iceberg/AllEntriesTable.java
@@ -24,6 +24,7 @@ import java.util.Collection;
import java.util.List;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.ResidualEvaluator;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -82,18 +83,21 @@ public class AllEntriesTable extends BaseMetadataTable {
private Scan(
TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
- boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
- ImmutableMap<String, String> options) {
- super(ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns, options);
+ boolean ignoreResiduals, boolean caseSensitive, boolean colStats,
+ Collection<String> selectedColumns, ImmutableMap<String, String> options) {
+ super(
+ ops, table, snapshotId, schema, rowFilter, ignoreResiduals,
+ caseSensitive, colStats, selectedColumns, options);
}
@Override
protected TableScan newRefinedScan(
TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
- boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
+ boolean ignoreResiduals, boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
ImmutableMap<String, String> options) {
return new Scan(
- ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns, options);
+ ops, table, snapshotId, schema, rowFilter, ignoreResiduals,
+ caseSensitive, colStats, selectedColumns, options);
}
@Override
@@ -104,12 +108,14 @@ public class AllEntriesTable extends BaseMetadataTable {
@Override
protected CloseableIterable<FileScanTask> planFiles(
- TableOperations ops, Snapshot snapshot, Expression rowFilter, boolean caseSensitive, boolean colStats) {
+ TableOperations ops, Snapshot snapshot, Expression rowFilter,
+ boolean ignoreResiduals, boolean caseSensitive, boolean colStats) {
CloseableIterable<ManifestFile> manifests = allManifestFiles(ops.current().snapshots());
Schema fileSchema = new Schema(schema().findType("data_file").asStructType().fields());
String schemaString = SchemaParser.toJson(schema());
String specString = PartitionSpecParser.toJson(PartitionSpec.unpartitioned());
- ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(rowFilter);
+ Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : rowFilter;
+ ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter);
return CloseableIterable.transform(manifests, manifest -> new ManifestEntriesTable.ManifestReadTask(
ops.io(), manifest, fileSchema, schemaString, specString, residuals));
diff --git a/core/src/main/java/org/apache/iceberg/AllManifestsTable.java b/core/src/main/java/org/apache/iceberg/AllManifestsTable.java
index e337ebd..2de065a 100644
--- a/core/src/main/java/org/apache/iceberg/AllManifestsTable.java
+++ b/core/src/main/java/org/apache/iceberg/AllManifestsTable.java
@@ -24,6 +24,7 @@ import java.util.Collection;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.ResidualEvaluator;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
@@ -91,18 +92,21 @@ public class AllManifestsTable extends BaseMetadataTable {
private AllManifestsTableScan(
TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
- boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
+ boolean ignoreResiduals, boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
ImmutableMap<String, String> options) {
- super(ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns, options);
+ super(
+ ops, table, snapshotId, schema, rowFilter, ignoreResiduals,
+ caseSensitive, colStats, selectedColumns, options);
}
@Override
protected TableScan newRefinedScan(
TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
- boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
+ boolean ignoreResiduals, boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
ImmutableMap<String, String> options) {
return new AllManifestsTableScan(
- ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns, options);
+ ops, table, snapshotId, schema, rowFilter, ignoreResiduals,
+ caseSensitive, colStats, selectedColumns, options);
}
@Override
@@ -123,16 +127,19 @@ public class AllManifestsTable extends BaseMetadataTable {
@Override
protected CloseableIterable<FileScanTask> planFiles(
- TableOperations ops, Snapshot snapshot, Expression rowFilter, boolean caseSensitive, boolean colStats) {
+ TableOperations ops, Snapshot snapshot, Expression rowFilter,
+ boolean ignoreResiduals, boolean caseSensitive, boolean colStats) {
String schemaString = SchemaParser.toJson(schema());
String specString = PartitionSpecParser.toJson(PartitionSpec.unpartitioned());
// Data tasks produce the table schema, not the projection schema and projection is done by processing engines.
return CloseableIterable.withNoopClose(Iterables.transform(ops.current().snapshots(), snap -> {
if (snap.manifestListLocation() != null) {
+ Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : rowFilter;
+ ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter);
return new ManifestListReadTask(ops.io(), table().spec(), new BaseFileScanTask(
DataFiles.fromManifestList(ops.io().newInputFile(snap.manifestListLocation())),
- schemaString, specString, ResidualEvaluator.unpartitioned(rowFilter)));
+ schemaString, specString, residuals));
} else {
return StaticDataTask.of(
ops.io().newInputFile(ops.current().file().location()),
diff --git a/core/src/main/java/org/apache/iceberg/BaseAllMetadataTableScan.java b/core/src/main/java/org/apache/iceberg/BaseAllMetadataTableScan.java
index 3f7139d..2560c05 100644
--- a/core/src/main/java/org/apache/iceberg/BaseAllMetadataTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/BaseAllMetadataTableScan.java
@@ -37,9 +37,11 @@ abstract class BaseAllMetadataTableScan extends BaseTableScan {
BaseAllMetadataTableScan(
TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
- boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
+ boolean ignoreResiduals, boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
ImmutableMap<String, String> options) {
- super(ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns, options);
+ super(
+ ops, table, snapshotId, schema, rowFilter, ignoreResiduals,
+ caseSensitive, colStats, selectedColumns, options);
}
@Override
@@ -47,6 +49,6 @@ abstract class BaseAllMetadataTableScan extends BaseTableScan {
LOG.info("Scanning metadata table {} with filter {}.", table(), filter());
Listeners.notifyAll(new ScanEvent(table().toString(), 0L, filter(), schema()));
- return planFiles(tableOps(), snapshot(), filter(), isCaseSensitive(), colStats());
+ return planFiles(tableOps(), snapshot(), filter(), shouldIgnoreResiduals(), isCaseSensitive(), colStats());
}
}
diff --git a/core/src/main/java/org/apache/iceberg/BaseTableScan.java b/core/src/main/java/org/apache/iceberg/BaseTableScan.java
index 9e55e0e..27730e7 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTableScan.java
@@ -57,23 +57,25 @@ abstract class BaseTableScan implements TableScan {
private final Long snapshotId;
private final Schema schema;
private final Expression rowFilter;
+ private final boolean ignoreResiduals;
private final boolean caseSensitive;
private final boolean colStats;
private final Collection<String> selectedColumns;
private final ImmutableMap<String, String> options;
protected BaseTableScan(TableOperations ops, Table table, Schema schema) {
- this(ops, table, null, schema, Expressions.alwaysTrue(), true, false, null, ImmutableMap.of());
+ this(ops, table, null, schema, Expressions.alwaysTrue(), false, true, false, null, ImmutableMap.of());
}
protected BaseTableScan(TableOperations ops, Table table, Long snapshotId, Schema schema,
- Expression rowFilter, boolean caseSensitive, boolean colStats,
- Collection<String> selectedColumns, ImmutableMap<String, String> options) {
+ Expression rowFilter, boolean ignoreResiduals, boolean caseSensitive, boolean colStats,
+ Collection<String> selectedColumns, ImmutableMap<String, String> options) {
this.ops = ops;
this.table = table;
this.snapshotId = snapshotId;
this.schema = schema;
this.rowFilter = rowFilter;
+ this.ignoreResiduals = ignoreResiduals;
this.caseSensitive = caseSensitive;
this.colStats = colStats;
this.selectedColumns = selectedColumns;
@@ -92,6 +94,10 @@ abstract class BaseTableScan implements TableScan {
return colStats;
}
+ protected boolean shouldIgnoreResiduals() {
+ return ignoreResiduals;
+ }
+
protected Collection<String> selectedColumns() {
return selectedColumns;
}
@@ -106,12 +112,13 @@ abstract class BaseTableScan implements TableScan {
@SuppressWarnings("checkstyle:HiddenField")
protected abstract TableScan newRefinedScan(
TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
- boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
+ boolean ignoreResiduals, boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
ImmutableMap<String, String> options);
@SuppressWarnings("checkstyle:HiddenField")
protected abstract CloseableIterable<FileScanTask> planFiles(
- TableOperations ops, Snapshot snapshot, Expression rowFilter, boolean caseSensitive, boolean colStats);
+ TableOperations ops, Snapshot snapshot, Expression rowFilter,
+ boolean ignoreResiduals, boolean caseSensitive, boolean colStats);
@Override
public Table table() {
@@ -135,7 +142,8 @@ abstract class BaseTableScan implements TableScan {
Preconditions.checkArgument(ops.current().snapshot(scanSnapshotId) != null,
"Cannot find snapshot with ID %s", scanSnapshotId);
return newRefinedScan(
- ops, table, scanSnapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns, options);
+ ops, table, scanSnapshotId, schema, rowFilter, ignoreResiduals,
+ caseSensitive, colStats, selectedColumns, options);
}
@Override
@@ -165,35 +173,43 @@ abstract class BaseTableScan implements TableScan {
builder.put(property, value);
return newRefinedScan(
- ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns, builder.build());
+ ops, table, snapshotId, schema, rowFilter, ignoreResiduals,
+ caseSensitive, colStats, selectedColumns, builder.build());
}
@Override
public TableScan project(Schema projectedSchema) {
return newRefinedScan(
- ops, table, snapshotId, projectedSchema, rowFilter, caseSensitive, colStats, selectedColumns, options);
+ ops, table, snapshotId, projectedSchema, rowFilter, ignoreResiduals,
+ caseSensitive, colStats, selectedColumns, options);
}
@Override
public TableScan caseSensitive(boolean scanCaseSensitive) {
return newRefinedScan(
- ops, table, snapshotId, schema, rowFilter, scanCaseSensitive, colStats, selectedColumns, options);
+ ops, table, snapshotId, schema, rowFilter, ignoreResiduals,
+ scanCaseSensitive, colStats, selectedColumns, options);
}
@Override
public TableScan includeColumnStats() {
- return newRefinedScan(ops, table, snapshotId, schema, rowFilter, caseSensitive, true, selectedColumns, options);
+ return newRefinedScan(
+ ops, table, snapshotId, schema, rowFilter, ignoreResiduals,
+ caseSensitive, true, selectedColumns, options);
}
@Override
public TableScan select(Collection<String> columns) {
- return newRefinedScan(ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, columns, options);
+ return newRefinedScan(
+ ops, table, snapshotId, schema, rowFilter, ignoreResiduals,
+ caseSensitive, colStats, columns, options);
}
@Override
public TableScan filter(Expression expr) {
- return newRefinedScan(ops, table, snapshotId, schema, Expressions.and(rowFilter, expr), caseSensitive, colStats,
- selectedColumns, options);
+ return newRefinedScan(
+ ops, table, snapshotId, schema, Expressions.and(rowFilter, expr),
+ ignoreResiduals, caseSensitive, colStats, selectedColumns, options);
}
@Override
@@ -202,6 +218,13 @@ abstract class BaseTableScan implements TableScan {
}
@Override
+ public TableScan ignoreResiduals() {
+ return newRefinedScan(
+ ops, table, snapshotId, schema, rowFilter, true,
+ caseSensitive, colStats, selectedColumns, options);
+ }
+
+ @Override
public CloseableIterable<FileScanTask> planFiles() {
Snapshot snapshot = snapshot();
if (snapshot != null) {
@@ -212,7 +235,7 @@ abstract class BaseTableScan implements TableScan {
Listeners.notifyAll(
new ScanEvent(table.toString(), snapshot.snapshotId(), rowFilter, schema()));
- return planFiles(ops, snapshot, rowFilter, caseSensitive, colStats);
+ return planFiles(ops, snapshot, rowFilter, ignoreResiduals, caseSensitive, colStats);
} else {
LOG.info("Scanning empty table {}", table);
@@ -276,6 +299,7 @@ abstract class BaseTableScan implements TableScan {
.add("table", table)
.add("projection", schema().asStruct())
.add("filter", rowFilter)
+ .add("ignoreResiduals", ignoreResiduals)
.add("caseSensitive", caseSensitive)
.toString();
}
diff --git a/core/src/main/java/org/apache/iceberg/DataFilesTable.java b/core/src/main/java/org/apache/iceberg/DataFilesTable.java
index dcfc70f..db0e085 100644
--- a/core/src/main/java/org/apache/iceberg/DataFilesTable.java
+++ b/core/src/main/java/org/apache/iceberg/DataFilesTable.java
@@ -21,6 +21,7 @@ package org.apache.iceberg;
import java.util.Collection;
import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.ResidualEvaluator;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
@@ -77,19 +78,22 @@ public class DataFilesTable extends BaseMetadataTable {
private FilesTableScan(
TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
- boolean caseSensitive, boolean colStats, Collection<String> selectedColumns, Schema fileSchema,
- ImmutableMap<String, String> options) {
- super(ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns, options);
+ boolean ignoreResiduals, boolean caseSensitive, boolean colStats,
+ Collection<String> selectedColumns, Schema fileSchema, ImmutableMap<String, String> options) {
+ super(
+ ops, table, snapshotId, schema, rowFilter, ignoreResiduals,
+ caseSensitive, colStats, selectedColumns, options);
this.fileSchema = fileSchema;
}
@Override
protected TableScan newRefinedScan(
TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
- boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
+ boolean ignoreResiduals, boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
ImmutableMap<String, String> options) {
return new FilesTableScan(
- ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns, fileSchema, options);
+ ops, table, snapshotId, schema, rowFilter, ignoreResiduals,
+ caseSensitive, colStats, selectedColumns, fileSchema, options);
}
@Override
@@ -100,11 +104,13 @@ public class DataFilesTable extends BaseMetadataTable {
@Override
protected CloseableIterable<FileScanTask> planFiles(
- TableOperations ops, Snapshot snapshot, Expression rowFilter, boolean caseSensitive, boolean colStats) {
+ TableOperations ops, Snapshot snapshot, Expression rowFilter,
+ boolean ignoreResiduals, boolean caseSensitive, boolean colStats) {
CloseableIterable<ManifestFile> manifests = CloseableIterable.withNoopClose(snapshot.dataManifests());
String schemaString = SchemaParser.toJson(schema());
String specString = PartitionSpecParser.toJson(PartitionSpec.unpartitioned());
- ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(rowFilter);
+ Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : rowFilter;
+ ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter);
// Data tasks produce the table schema, not the projection schema and projection is done by processing engines.
// This data task needs to use the table schema, which may not include a partition schema to avoid having an
diff --git a/core/src/main/java/org/apache/iceberg/DataTableScan.java b/core/src/main/java/org/apache/iceberg/DataTableScan.java
index b098bdd..fc06342 100644
--- a/core/src/main/java/org/apache/iceberg/DataTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/DataTableScan.java
@@ -44,9 +44,11 @@ public class DataTableScan extends BaseTableScan {
}
protected DataTableScan(TableOperations ops, Table table, Long snapshotId, Schema schema,
- Expression rowFilter, boolean caseSensitive, boolean colStats,
+ Expression rowFilter, boolean ignoreResiduals, boolean caseSensitive, boolean colStats,
Collection<String> selectedColumns, ImmutableMap<String, String> options) {
- super(ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns, options);
+ super(
+ ops, table, snapshotId, schema, rowFilter, ignoreResiduals,
+ caseSensitive, colStats, selectedColumns, options);
}
@Override
@@ -55,7 +57,8 @@ public class DataTableScan extends BaseTableScan {
Preconditions.checkState(scanSnapshotId == null,
"Cannot enable incremental scan, scan-snapshot set to id=%s", scanSnapshotId);
return new IncrementalDataTableScan(
- tableOps(), table(), schema(), filter(), isCaseSensitive(), colStats(), selectedColumns(), options(),
+ tableOps(), table(), schema(), filter(), shouldIgnoreResiduals(),
+ isCaseSensitive(), colStats(), selectedColumns(), options(),
fromSnapshotId, toSnapshotId);
}
@@ -70,15 +73,16 @@ public class DataTableScan extends BaseTableScan {
@Override
protected TableScan newRefinedScan(
TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
- boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
+ boolean ignoreResiduals, boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
ImmutableMap<String, String> options) {
return new DataTableScan(
- ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns, options);
+ ops, table, snapshotId, schema, rowFilter, ignoreResiduals, caseSensitive, colStats, selectedColumns, options);
}
@Override
public CloseableIterable<FileScanTask> planFiles(TableOperations ops, Snapshot snapshot,
- Expression rowFilter, boolean caseSensitive, boolean colStats) {
+ Expression rowFilter, boolean ignoreResiduals,
+ boolean caseSensitive, boolean colStats) {
ManifestGroup manifestGroup = new ManifestGroup(ops.io(), snapshot.dataManifests())
.caseSensitive(caseSensitive)
.select(colStats ? SCAN_WITH_STATS_COLUMNS : SCAN_COLUMNS)
@@ -86,6 +90,10 @@ public class DataTableScan extends BaseTableScan {
.specsById(ops.current().specsById())
.ignoreDeleted();
+ if (ignoreResiduals) {
+ manifestGroup = manifestGroup.ignoreResiduals();
+ }
+
if (PLAN_SCANS_WITH_WORKER_POOL && snapshot.dataManifests().size() > 1) {
manifestGroup = manifestGroup.planWith(ThreadPools.getWorkerPool());
}
diff --git a/core/src/main/java/org/apache/iceberg/IncrementalDataTableScan.java b/core/src/main/java/org/apache/iceberg/IncrementalDataTableScan.java
index 0d4065b..6d70362 100644
--- a/core/src/main/java/org/apache/iceberg/IncrementalDataTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/IncrementalDataTableScan.java
@@ -37,11 +37,11 @@ class IncrementalDataTableScan extends DataTableScan {
private long fromSnapshotId;
private long toSnapshotId;
- IncrementalDataTableScan(TableOperations ops, Table table, Schema schema,
- Expression rowFilter, boolean caseSensitive, boolean colStats,
+ IncrementalDataTableScan(TableOperations ops, Table table, Schema schema, Expression rowFilter,
+ boolean ignoreResiduals, boolean caseSensitive, boolean colStats,
Collection<String> selectedColumns, ImmutableMap<String, String> options,
long fromSnapshotId, long toSnapshotId) {
- super(ops, table, null, schema, rowFilter, caseSensitive, colStats, selectedColumns, options);
+ super(ops, table, null, schema, rowFilter, ignoreResiduals, caseSensitive, colStats, selectedColumns, options);
validateSnapshotIds(table, fromSnapshotId, toSnapshotId);
this.fromSnapshotId = fromSnapshotId;
this.toSnapshotId = toSnapshotId;
@@ -65,7 +65,8 @@ class IncrementalDataTableScan extends DataTableScan {
public TableScan appendsBetween(long newFromSnapshotId, long newToSnapshotId) {
validateSnapshotIdsRefinement(newFromSnapshotId, newToSnapshotId);
return new IncrementalDataTableScan(
- tableOps(), table(), schema(), filter(), isCaseSensitive(), colStats(), selectedColumns(), options(),
+ tableOps(), table(), schema(), filter(), shouldIgnoreResiduals(),
+ isCaseSensitive(), colStats(), selectedColumns(), options(),
newFromSnapshotId, newToSnapshotId);
}
@@ -99,6 +100,10 @@ class IncrementalDataTableScan extends DataTableScan {
.specsById(tableOps().current().specsById())
.ignoreDeleted();
+ if (shouldIgnoreResiduals()) {
+ manifestGroup = manifestGroup.ignoreResiduals();
+ }
+
if (PLAN_SCANS_WITH_WORKER_POOL && manifests.size() > 1) {
manifestGroup = manifestGroup.planWith(ThreadPools.getWorkerPool());
}
@@ -110,10 +115,10 @@ class IncrementalDataTableScan extends DataTableScan {
@SuppressWarnings("checkstyle:HiddenField")
protected TableScan newRefinedScan(
TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
- boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
+ boolean ignoreResiduals, boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
ImmutableMap<String, String> options) {
return new IncrementalDataTableScan(
- ops, table, schema, rowFilter, caseSensitive, colStats, selectedColumns, options,
+ ops, table, schema, rowFilter, ignoreResiduals, caseSensitive, colStats, selectedColumns, options,
fromSnapshotId, toSnapshotId);
}
diff --git a/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java b/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java
index f2536ec..520f113 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java
@@ -21,6 +21,7 @@ package org.apache.iceberg;
import java.util.Collection;
import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.ResidualEvaluator;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
@@ -78,18 +79,21 @@ public class ManifestEntriesTable extends BaseMetadataTable {
private EntriesTableScan(
TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
- boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
+ boolean ignoreResiduals, boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
ImmutableMap<String, String> options) {
- super(ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns, options);
+ super(
+ ops, table, snapshotId, schema, rowFilter, ignoreResiduals,
+ caseSensitive, colStats, selectedColumns, options);
}
@Override
protected TableScan newRefinedScan(
TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
- boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
+ boolean ignoreResiduals, boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
ImmutableMap<String, String> options) {
return new EntriesTableScan(
- ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns, options);
+ ops, table, snapshotId, schema, rowFilter, ignoreResiduals,
+ caseSensitive, colStats, selectedColumns, options);
}
@Override
@@ -100,13 +104,15 @@ public class ManifestEntriesTable extends BaseMetadataTable {
@Override
protected CloseableIterable<FileScanTask> planFiles(
- TableOperations ops, Snapshot snapshot, Expression rowFilter, boolean caseSensitive, boolean colStats) {
+ TableOperations ops, Snapshot snapshot, Expression rowFilter,
+ boolean ignoreResiduals, boolean caseSensitive, boolean colStats) {
// return entries from both data and delete manifests
CloseableIterable<ManifestFile> manifests = CloseableIterable.withNoopClose(snapshot.allManifests());
Schema fileSchema = new Schema(schema().findType("data_file").asStructType().fields());
String schemaString = SchemaParser.toJson(schema());
String specString = PartitionSpecParser.toJson(PartitionSpec.unpartitioned());
- ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(rowFilter);
+ Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : rowFilter;
+ ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter);
return CloseableIterable.transform(manifests, manifest ->
new ManifestReadTask(ops.io(), manifest, fileSchema, schemaString, specString, residuals));
diff --git a/core/src/main/java/org/apache/iceberg/ManifestGroup.java b/core/src/main/java/org/apache/iceberg/ManifestGroup.java
index 3aca5f9..6308236 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestGroup.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestGroup.java
@@ -54,6 +54,7 @@ class ManifestGroup {
private Expression partitionFilter;
private boolean ignoreDeleted;
private boolean ignoreExisting;
+ private boolean ignoreResiduals;
private List<String> columns;
private boolean caseSensitive;
private ExecutorService executorService;
@@ -66,6 +67,7 @@ class ManifestGroup {
this.partitionFilter = Expressions.alwaysTrue();
this.ignoreDeleted = false;
this.ignoreExisting = false;
+ this.ignoreResiduals = false;
this.columns = BaseManifestReader.ALL_COLUMNS;
this.caseSensitive = true;
this.manifestPredicate = m -> true;
@@ -112,6 +114,11 @@ class ManifestGroup {
return this;
}
+ ManifestGroup ignoreResiduals() {
+ this.ignoreResiduals = true;
+ return this;
+ }
+
ManifestGroup select(List<String> newColumns) {
this.columns = Lists.newArrayList(newColumns);
return this;
@@ -136,7 +143,8 @@ class ManifestGroup {
public CloseableIterable<FileScanTask> planFiles() {
LoadingCache<Integer, ResidualEvaluator> residualCache = Caffeine.newBuilder().build(specId -> {
PartitionSpec spec = specsById.get(specId);
- return ResidualEvaluator.of(spec, dataFilter, caseSensitive);
+ Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : dataFilter;
+ return ResidualEvaluator.of(spec, filter, caseSensitive);
});
boolean dropStats = BaseManifestReader.dropStats(dataFilter, columns);
Iterable<CloseableIterable<FileScanTask>> tasks = entries((manifest, entries) -> {
diff --git a/core/src/main/java/org/apache/iceberg/StaticTableScan.java b/core/src/main/java/org/apache/iceberg/StaticTableScan.java
index 88c751c..14401ec 100644
--- a/core/src/main/java/org/apache/iceberg/StaticTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/StaticTableScan.java
@@ -35,9 +35,11 @@ class StaticTableScan extends BaseTableScan {
private StaticTableScan(
TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
- boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
+ boolean ignoreResiduals, boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
Function<StaticTableScan, DataTask> buildTask, ImmutableMap<String, String> options) {
- super(ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns, options);
+ super(
+ ops, table, snapshotId, schema, rowFilter, ignoreResiduals,
+ caseSensitive, colStats, selectedColumns, options);
this.buildTask = buildTask;
}
@@ -50,15 +52,17 @@ class StaticTableScan extends BaseTableScan {
@Override
protected TableScan newRefinedScan(
TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
- boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
- ImmutableMap<String, String> options) {
+ boolean ignoreResiduals, boolean caseSensitive, boolean colStats,
+ Collection<String> selectedColumns, ImmutableMap<String, String> options) {
return new StaticTableScan(
- ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns, buildTask, options);
+ ops, table, snapshotId, schema, rowFilter, ignoreResiduals,
+ caseSensitive, colStats, selectedColumns, buildTask, options);
}
@Override
protected CloseableIterable<FileScanTask> planFiles(
- TableOperations ops, Snapshot snapshot, Expression rowFilter, boolean caseSensitive, boolean colStats) {
+ TableOperations ops, Snapshot snapshot, Expression rowFilter,
+ boolean ignoreResiduals, boolean caseSensitive, boolean colStats) {
return CloseableIterable.withNoopClose(buildTask.apply(this));
}
}
diff --git a/core/src/test/java/org/apache/iceberg/TableTestBase.java b/core/src/test/java/org/apache/iceberg/TableTestBase.java
index e115675..8ef9732 100644
--- a/core/src/test/java/org/apache/iceberg/TableTestBase.java
+++ b/core/src/test/java/org/apache/iceberg/TableTestBase.java
@@ -56,25 +56,25 @@ public class TableTestBase {
static final DataFile FILE_A = DataFiles.builder(SPEC)
.withPath("/path/to/data-a.parquet")
- .withFileSizeInBytes(0)
+ .withFileSizeInBytes(10)
.withPartitionPath("data_bucket=0") // easy way to set partition data for now
.withRecordCount(1)
.build();
static final DataFile FILE_B = DataFiles.builder(SPEC)
.withPath("/path/to/data-b.parquet")
- .withFileSizeInBytes(0)
+ .withFileSizeInBytes(10)
.withPartitionPath("data_bucket=1") // easy way to set partition data for now
.withRecordCount(1)
.build();
static final DataFile FILE_C = DataFiles.builder(SPEC)
.withPath("/path/to/data-c.parquet")
- .withFileSizeInBytes(0)
+ .withFileSizeInBytes(10)
.withPartitionPath("data_bucket=2") // easy way to set partition data for now
.withRecordCount(1)
.build();
static final DataFile FILE_D = DataFiles.builder(SPEC)
.withPath("/path/to/data-d.parquet")
- .withFileSizeInBytes(0)
+ .withFileSizeInBytes(10)
.withPartitionPath("data_bucket=3") // easy way to set partition data for now
.withRecordCount(1)
.build();
diff --git a/core/src/test/java/org/apache/iceberg/TestDataTableScan.java b/core/src/test/java/org/apache/iceberg/TestDataTableScan.java
index ff75d52..a685cf6 100644
--- a/core/src/test/java/org/apache/iceberg/TestDataTableScan.java
+++ b/core/src/test/java/org/apache/iceberg/TestDataTableScan.java
@@ -19,7 +19,12 @@
package org.apache.iceberg;
+import java.io.IOException;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.types.Types;
+import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -69,4 +74,41 @@ public class TestDataTableScan extends TableTestBase {
scan2.schema().asStruct());
}
+ @Test
+ public void testTableScanHonorsIgnoreResiduals() throws IOException {
+ table.newFastAppend()
+ .appendFile(FILE_A)
+ .appendFile(FILE_B)
+ .commit();
+
+ TableScan preservingScan = table.newScan()
+ .filter(Expressions.equal("id", 5));
+ assertResiduals(preservingScan, false);
+
+ TableScan ignoringScan = table.newScan()
+ .filter(Expressions.equal("id", 5))
+ .ignoreResiduals();
+ assertResiduals(ignoringScan, true);
+ }
+
+ /**
+ * Plans combined tasks for {@code scan} and checks the residual of every file task.
+ *
+ * @param scan the scan to plan
+ * @param expectIgnored if true, every residual must be alwaysTrue(); otherwise none may be
+ */
+ private void assertResiduals(TableScan scan, boolean expectIgnored) throws IOException {
+ try (CloseableIterable<CombinedScanTask> tasks = scan.planTasks()) {
+ Assert.assertTrue("Tasks should not be empty", Iterables.size(tasks) > 0);
+ for (CombinedScanTask combinedScanTask : tasks) {
+ for (FileScanTask fileScanTask : combinedScanTask.files()) {
+ if (expectIgnored) {
+ Assert.assertEquals("Residuals must be ignored", Expressions.alwaysTrue(), fileScanTask.residual());
+ } else {
+ Assert.assertNotEquals("Residuals must be preserved", Expressions.alwaysTrue(), fileScanTask.residual());
+ }
+ }
+ }
+ }
+ }
}
diff --git a/core/src/test/java/org/apache/iceberg/TestIncrementalDataTableScan.java b/core/src/test/java/org/apache/iceberg/TestIncrementalDataTableScan.java
index bcbb894..a49978b 100644
--- a/core/src/test/java/org/apache/iceberg/TestIncrementalDataTableScan.java
+++ b/core/src/test/java/org/apache/iceberg/TestIncrementalDataTableScan.java
@@ -19,8 +19,11 @@
package org.apache.iceberg;
+import java.io.IOException;
import java.util.Collections;
import java.util.List;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
@@ -174,10 +177,48 @@ public class TestIncrementalDataTableScan extends TableTestBase {
filesMatch(Lists.newArrayList("B", "D", "E"), appendsAfterScan(1));
}
+ /**
+ * Incremental scans (appendsBetween) must keep the residual filter by default and
+ * report alwaysTrue() residuals once ignoreResiduals() is requested.
+ */
+ @Test
+ public void testIgnoreResiduals() throws IOException {
+ add(table.newAppend(), files("A"));
+ add(table.newAppend(), files("B"));
+ add(table.newAppend(), files("C"));
+
+ TableScan scan1 = table.newScan()
+ .filter(Expressions.equal("id", 5))
+ .appendsBetween(1, 3);
+
+ try (CloseableIterable<CombinedScanTask> tasks = scan1.planTasks()) {
+ Assert.assertTrue("Tasks should not be empty", Iterables.size(tasks) > 0);
+ for (CombinedScanTask combinedScanTask : tasks) {
+ for (FileScanTask fileScanTask : combinedScanTask.files()) {
+ Assert.assertNotEquals("Residuals must be preserved", Expressions.alwaysTrue(), fileScanTask.residual());
+ }
+ }
+ }
+
+ TableScan scan2 = table.newScan()
+ .filter(Expressions.equal("id", 5))
+ .appendsBetween(1, 3)
+ .ignoreResiduals();
+
+ try (CloseableIterable<CombinedScanTask> tasks = scan2.planTasks()) {
+ Assert.assertTrue("Tasks should not be empty", Iterables.size(tasks) > 0);
+ for (CombinedScanTask combinedScanTask : tasks) {
+ for (FileScanTask fileScanTask : combinedScanTask.files()) {
+ Assert.assertEquals("Residuals must be ignored", Expressions.alwaysTrue(), fileScanTask.residual());
+ }
+ }
+ }
+ }
+
private static DataFile file(String name) {
return DataFiles.builder(SPEC)
.withPath(name + ".parquet")
- .withFileSizeInBytes(0)
+ .withFileSizeInBytes(10)
.withPartitionPath("data_bucket=0") // easy way to set partition data for now
.withRecordCount(1)
.build();
diff --git a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
new file mode 100644
index 0000000..59fca14
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.IOException;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Verifies how scans over metadata tables treat residual filters, both by default
+ * and after {@code ignoreResiduals()}, for table format versions 1 and 2.
+ */
+@RunWith(Parameterized.class)
+public class TestMetadataTableScans extends TableTestBase {
+
+ @Parameterized.Parameters
+ public static Object[][] parameters() {
+ return new Object[][] {
+ new Object[] { 1 },
+ new Object[] { 2 },
+ };
+ }
+
+ public TestMetadataTableScans(int formatVersion) {
+ super(formatVersion);
+ }
+
+ @Test
+ public void testManifestsTableAlwaysIgnoresResiduals() throws IOException {
+ commitFilesAandB();
+
+ Table manifestsTable = new ManifestsTable(table.ops(), table);
+ TableScan scan = manifestsTable.newScan()
+ .filter(Expressions.equal("id", 5));
+
+ // no ignoreResiduals() call here: residuals are still expected to be alwaysTrue()
+ try (CloseableIterable<FileScanTask> fileTasks = scan.planFiles()) {
+ Assert.assertTrue("Tasks should not be empty", Iterables.size(fileTasks) > 0);
+ for (FileScanTask fileTask : fileTasks) {
+ Assert.assertEquals("Residuals must be ignored", Expressions.alwaysTrue(), fileTask.residual());
+ }
+ }
+ }
+
+ @Test
+ public void testDataFilesTableHonorsIgnoreResiduals() throws IOException {
+ commitFilesAandB();
+ checkHonorsIgnoreResiduals(new DataFilesTable(table.ops(), table));
+ }
+
+ @Test
+ public void testManifestEntriesTableHonorsIgnoreResiduals() throws IOException {
+ commitFilesAandB();
+ checkHonorsIgnoreResiduals(new ManifestEntriesTable(table.ops(), table));
+ }
+
+ @Test
+ public void testAllDataFilesTableHonorsIgnoreResiduals() throws IOException {
+ commitFilesAandB();
+ checkHonorsIgnoreResiduals(new AllDataFilesTable(table.ops(), table));
+ }
+
+ @Test
+ public void testAllEntriesTableHonorsIgnoreResiduals() throws IOException {
+ commitFilesAandB();
+ checkHonorsIgnoreResiduals(new AllEntriesTable(table.ops(), table));
+ }
+
+ @Test
+ public void testAllManifestsTableHonorsIgnoreResiduals() throws IOException {
+ commitFilesAandB();
+ checkHonorsIgnoreResiduals(new AllManifestsTable(table.ops(), table));
+ }
+
+ /** Commits FILE_A and FILE_B to the test table in a single fast append. */
+ private void commitFilesAandB() {
+ table.newFastAppend()
+ .appendFile(FILE_A)
+ .appendFile(FILE_B)
+ .commit();
+ }
+
+ /**
+ * Scans {@code metadataTable} twice with an {@code id = 5} filter, first keeping residuals
+ * and then with {@code ignoreResiduals()}, and validates the planned tasks of each scan.
+ */
+ private void checkHonorsIgnoreResiduals(Table metadataTable) throws IOException {
+ TableScan preservingScan = metadataTable.newScan()
+ .filter(Expressions.equal("id", 5));
+ validateTaskScanResiduals(preservingScan, false);
+
+ TableScan ignoringScan = metadataTable.newScan()
+ .filter(Expressions.equal("id", 5))
+ .ignoreResiduals();
+ validateTaskScanResiduals(ignoringScan, true);
+ }
+
+ /**
+ * Plans combined tasks for {@code scan} and asserts that at least one task is produced and
+ * that every file task's residual is alwaysTrue() when {@code ignoreResiduals} is set, and
+ * something other than alwaysTrue() otherwise.
+ */
+ private void validateTaskScanResiduals(TableScan scan, boolean ignoreResiduals) throws IOException {
+ try (CloseableIterable<CombinedScanTask> combinedTasks = scan.planTasks()) {
+ Assert.assertTrue("Tasks should not be empty", Iterables.size(combinedTasks) > 0);
+ for (CombinedScanTask combined : combinedTasks) {
+ for (FileScanTask fileTask : combined.files()) {
+ if (ignoreResiduals) {
+ Assert.assertEquals("Residuals must be ignored", Expressions.alwaysTrue(), fileTask.residual());
+ } else {
+ Assert.assertNotEquals("Residuals must be preserved", Expressions.alwaysTrue(), fileTask.residual());
+ }
+ }
+ }
+ }
+ }
+}