You are viewing a plain-text version of this content. The hyperlink to the canonical version was not preserved in this plain-text rendering.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2020/06/17 00:12:14 UTC
[iceberg] branch master updated: Refactor TableScan optional arguments into an immutable context object (#1115)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new e2ef87c Refactor TableScan optional arguments into an immutable context object (#1115)
e2ef87c is described below
commit e2ef87c4c66c5ed6be13a335d4e03558bac5ec1a
Author: Edgar Rodriguez <26...@users.noreply.github.com>
AuthorDate: Tue Jun 16 17:12:05 2020 -0700
Refactor TableScan optional arguments into an immutable context object (#1115)
---
.../java/org/apache/iceberg/AllDataFilesTable.java | 22 +--
.../java/org/apache/iceberg/AllEntriesTable.java | 21 +--
.../java/org/apache/iceberg/AllManifestsTable.java | 22 +--
.../apache/iceberg/BaseAllMetadataTableScan.java | 12 +-
.../java/org/apache/iceberg/BaseTableScan.java | 110 ++++++---------
.../java/org/apache/iceberg/DataFilesTable.java | 21 +--
.../java/org/apache/iceberg/DataTableScan.java | 24 +---
.../apache/iceberg/IncrementalDataTableScan.java | 48 +++----
.../org/apache/iceberg/ManifestEntriesTable.java | 21 +--
.../java/org/apache/iceberg/StaticTableScan.java | 20 +--
.../java/org/apache/iceberg/TableScanContext.java | 151 +++++++++++++++++++++
11 files changed, 251 insertions(+), 221 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/AllDataFilesTable.java b/core/src/main/java/org/apache/iceberg/AllDataFilesTable.java
index cc3033e..8bd08bf 100644
--- a/core/src/main/java/org/apache/iceberg/AllDataFilesTable.java
+++ b/core/src/main/java/org/apache/iceberg/AllDataFilesTable.java
@@ -20,14 +20,12 @@
package org.apache.iceberg;
import java.io.IOException;
-import java.util.Collection;
import java.util.List;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.ResidualEvaluator;
import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
@@ -84,25 +82,15 @@ public class AllDataFilesTable extends BaseMetadataTable {
this.fileSchema = fileSchema;
}
- private AllDataFilesTableScan(
- TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
- boolean ignoreResiduals, boolean caseSensitive, boolean colStats,
- Collection<String> selectedColumns, Schema fileSchema,
- ImmutableMap<String, String> options) {
- super(
- ops, table, snapshotId, schema, rowFilter, ignoreResiduals,
- caseSensitive, colStats, selectedColumns, options);
+ private AllDataFilesTableScan(TableOperations ops, Table table, Schema schema, Schema fileSchema,
+ TableScanContext context) {
+ super(ops, table, schema, context);
this.fileSchema = fileSchema;
}
@Override
- protected TableScan newRefinedScan(
- TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
- boolean ignoreResiduals, boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
- ImmutableMap<String, String> options) {
- return new AllDataFilesTableScan(
- ops, table, snapshotId, schema, rowFilter, ignoreResiduals,
- caseSensitive, colStats, selectedColumns, fileSchema, options);
+ protected TableScan newRefinedScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
+ return new AllDataFilesTableScan(ops, table, schema, fileSchema, context);
}
@Override
diff --git a/core/src/main/java/org/apache/iceberg/AllEntriesTable.java b/core/src/main/java/org/apache/iceberg/AllEntriesTable.java
index 4734c85..bb19c41 100644
--- a/core/src/main/java/org/apache/iceberg/AllEntriesTable.java
+++ b/core/src/main/java/org/apache/iceberg/AllEntriesTable.java
@@ -20,14 +20,12 @@
package org.apache.iceberg;
import java.io.IOException;
-import java.util.Collection;
import java.util.List;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.ResidualEvaluator;
import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
@@ -81,23 +79,14 @@ public class AllEntriesTable extends BaseMetadataTable {
super(ops, table, schema);
}
- private Scan(
- TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
- boolean ignoreResiduals, boolean caseSensitive, boolean colStats,
- Collection<String> selectedColumns, ImmutableMap<String, String> options) {
- super(
- ops, table, snapshotId, schema, rowFilter, ignoreResiduals,
- caseSensitive, colStats, selectedColumns, options);
+ private Scan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
+ super(ops, table, schema, context);
}
@Override
- protected TableScan newRefinedScan(
- TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
- boolean ignoreResiduals, boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
- ImmutableMap<String, String> options) {
- return new Scan(
- ops, table, snapshotId, schema, rowFilter, ignoreResiduals,
- caseSensitive, colStats, selectedColumns, options);
+ protected TableScan newRefinedScan(TableOperations ops, Table table, Schema schema,
+ TableScanContext context) {
+ return new Scan(ops, table, schema, context);
}
@Override
diff --git a/core/src/main/java/org/apache/iceberg/AllManifestsTable.java b/core/src/main/java/org/apache/iceberg/AllManifestsTable.java
index 20f442e..9be9999 100644
--- a/core/src/main/java/org/apache/iceberg/AllManifestsTable.java
+++ b/core/src/main/java/org/apache/iceberg/AllManifestsTable.java
@@ -20,7 +20,6 @@
package org.apache.iceberg;
import java.io.IOException;
-import java.util.Collection;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.expressions.Expression;
@@ -29,7 +28,6 @@ import org.apache.iceberg.expressions.ResidualEvaluator;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.types.Types;
@@ -90,23 +88,15 @@ public class AllManifestsTable extends BaseMetadataTable {
super(ops, table, fileSchema);
}
- private AllManifestsTableScan(
- TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
- boolean ignoreResiduals, boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
- ImmutableMap<String, String> options) {
- super(
- ops, table, snapshotId, schema, rowFilter, ignoreResiduals,
- caseSensitive, colStats, selectedColumns, options);
+ private AllManifestsTableScan(TableOperations ops, Table table, Schema schema,
+ TableScanContext context) {
+ super(ops, table, schema, context);
}
@Override
- protected TableScan newRefinedScan(
- TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
- boolean ignoreResiduals, boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
- ImmutableMap<String, String> options) {
- return new AllManifestsTableScan(
- ops, table, snapshotId, schema, rowFilter, ignoreResiduals,
- caseSensitive, colStats, selectedColumns, options);
+ protected TableScan newRefinedScan(TableOperations ops, Table table, Schema schema,
+ TableScanContext context) {
+ return new AllManifestsTableScan(ops, table, schema, context);
}
@Override
diff --git a/core/src/main/java/org/apache/iceberg/BaseAllMetadataTableScan.java b/core/src/main/java/org/apache/iceberg/BaseAllMetadataTableScan.java
index 2560c05..e93cd5e 100644
--- a/core/src/main/java/org/apache/iceberg/BaseAllMetadataTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/BaseAllMetadataTableScan.java
@@ -19,12 +19,9 @@
package org.apache.iceberg;
-import java.util.Collection;
import org.apache.iceberg.events.Listeners;
import org.apache.iceberg.events.ScanEvent;
-import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,13 +32,8 @@ abstract class BaseAllMetadataTableScan extends BaseTableScan {
super(ops, table, fileSchema);
}
- BaseAllMetadataTableScan(
- TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
- boolean ignoreResiduals, boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
- ImmutableMap<String, String> options) {
- super(
- ops, table, snapshotId, schema, rowFilter, ignoreResiduals,
- caseSensitive, colStats, selectedColumns, options);
+ BaseAllMetadataTableScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
+ super(ops, table, schema, context);
}
@Override
diff --git a/core/src/main/java/org/apache/iceberg/BaseTableScan.java b/core/src/main/java/org/apache/iceberg/BaseTableScan.java
index f36d7ab..da1ff63 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTableScan.java
@@ -25,6 +25,7 @@ import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Collection;
import java.util.Collections;
+import java.util.Map;
import java.util.Set;
import org.apache.iceberg.events.Listeners;
import org.apache.iceberg.events.ScanEvent;
@@ -34,7 +35,6 @@ import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.TableScanUtil;
@@ -52,32 +52,18 @@ abstract class BaseTableScan implements TableScan {
private final TableOperations ops;
private final Table table;
- private final Long snapshotId;
private final Schema schema;
- private final Expression rowFilter;
- private final boolean ignoreResiduals;
- private final boolean caseSensitive;
- private final boolean colStats;
- private final Collection<String> selectedColumns;
- private final ImmutableMap<String, String> options;
+ private final TableScanContext context;
protected BaseTableScan(TableOperations ops, Table table, Schema schema) {
- this(ops, table, null, schema, Expressions.alwaysTrue(), false, true, false, null, ImmutableMap.of());
+ this(ops, table, schema, new TableScanContext());
}
- protected BaseTableScan(TableOperations ops, Table table, Long snapshotId, Schema schema,
- Expression rowFilter, boolean ignoreResiduals, boolean caseSensitive, boolean colStats,
- Collection<String> selectedColumns, ImmutableMap<String, String> options) {
+ protected BaseTableScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
this.ops = ops;
this.table = table;
- this.snapshotId = snapshotId;
this.schema = schema;
- this.rowFilter = rowFilter;
- this.ignoreResiduals = ignoreResiduals;
- this.caseSensitive = caseSensitive;
- this.colStats = colStats;
- this.selectedColumns = selectedColumns;
- this.options = options != null ? options : ImmutableMap.of();
+ this.context = context;
}
protected TableOperations tableOps() {
@@ -85,23 +71,27 @@ abstract class BaseTableScan implements TableScan {
}
protected Long snapshotId() {
- return snapshotId;
+ return context.snapshotId();
}
protected boolean colStats() {
- return colStats;
+ return context.returnColumnStats();
}
protected boolean shouldIgnoreResiduals() {
- return ignoreResiduals;
+ return context.ignoreResiduals();
}
protected Collection<String> selectedColumns() {
- return selectedColumns;
+ return context.selectedColumns();
}
- protected ImmutableMap<String, String> options() {
- return options;
+ protected Map<String, String> options() {
+ return context.options();
+ }
+
+ protected TableScanContext context() {
+ return context;
}
@SuppressWarnings("checkstyle:HiddenField")
@@ -109,9 +99,7 @@ abstract class BaseTableScan implements TableScan {
@SuppressWarnings("checkstyle:HiddenField")
protected abstract TableScan newRefinedScan(
- TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
- boolean ignoreResiduals, boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
- ImmutableMap<String, String> options);
+ TableOperations ops, Table table, Schema schema, TableScanContext context);
@SuppressWarnings("checkstyle:HiddenField")
protected abstract CloseableIterable<FileScanTask> planFiles(
@@ -135,19 +123,18 @@ abstract class BaseTableScan implements TableScan {
@Override
public TableScan useSnapshot(long scanSnapshotId) {
- Preconditions.checkArgument(this.snapshotId == null,
- "Cannot override snapshot, already set to id=%s", snapshotId);
+ Preconditions.checkArgument(context.snapshotId() == null,
+ "Cannot override snapshot, already set to id=%s", context.snapshotId());
Preconditions.checkArgument(ops.current().snapshot(scanSnapshotId) != null,
"Cannot find snapshot with ID %s", scanSnapshotId);
return newRefinedScan(
- ops, table, scanSnapshotId, schema, rowFilter, ignoreResiduals,
- caseSensitive, colStats, selectedColumns, options);
+ ops, table, schema, context.useSnapshotId(scanSnapshotId));
}
@Override
public TableScan asOfTime(long timestampMillis) {
- Preconditions.checkArgument(this.snapshotId == null,
- "Cannot override snapshot, already set to id=%s", snapshotId);
+ Preconditions.checkArgument(context.snapshotId() == null,
+ "Cannot override snapshot, already set to id=%s", context.snapshotId());
Long lastSnapshotId = null;
for (HistoryEntry logEntry : ops.current().snapshotLog()) {
@@ -166,60 +153,49 @@ abstract class BaseTableScan implements TableScan {
@Override
public TableScan option(String property, String value) {
- ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
- builder.putAll(options);
- builder.put(property, value);
-
return newRefinedScan(
- ops, table, snapshotId, schema, rowFilter, ignoreResiduals,
- caseSensitive, colStats, selectedColumns, builder.build());
+ ops, table, schema, context.withOption(property, value));
}
@Override
public TableScan project(Schema projectedSchema) {
return newRefinedScan(
- ops, table, snapshotId, projectedSchema, rowFilter, ignoreResiduals,
- caseSensitive, colStats, selectedColumns, options);
+ ops, table, projectedSchema, context);
}
@Override
public TableScan caseSensitive(boolean scanCaseSensitive) {
return newRefinedScan(
- ops, table, snapshotId, schema, rowFilter, ignoreResiduals,
- scanCaseSensitive, colStats, selectedColumns, options);
+ ops, table, schema, context.setCaseSensitive(scanCaseSensitive));
}
@Override
public TableScan includeColumnStats() {
return newRefinedScan(
- ops, table, snapshotId, schema, rowFilter, ignoreResiduals,
- caseSensitive, true, selectedColumns, options);
+ ops, table, schema, context.shouldReturnColumnStats(true));
}
@Override
public TableScan select(Collection<String> columns) {
return newRefinedScan(
- ops, table, snapshotId, schema, rowFilter, ignoreResiduals,
- caseSensitive, colStats, columns, options);
+ ops, table, schema, context.selectColumns(columns));
}
@Override
public TableScan filter(Expression expr) {
- return newRefinedScan(
- ops, table, snapshotId, schema, Expressions.and(rowFilter, expr),
- ignoreResiduals, caseSensitive, colStats, selectedColumns, options);
+ return newRefinedScan(ops, table, schema,
+ context.filterRows(Expressions.and(context.rowFilter(), expr)));
}
@Override
public Expression filter() {
- return rowFilter;
+ return context.rowFilter();
}
@Override
public TableScan ignoreResiduals() {
return newRefinedScan(
- ops, table, snapshotId, schema, rowFilter, true,
- caseSensitive, colStats, selectedColumns, options);
+ ops, table, schema, context.ignoreResiduals(true));
}
@Override
@@ -228,12 +204,13 @@ abstract class BaseTableScan implements TableScan {
if (snapshot != null) {
LOG.info("Scanning table {} snapshot {} created at {} with filter {}", table,
snapshot.snapshotId(), formatTimestampMillis(snapshot.timestampMillis()),
- rowFilter);
+ context.rowFilter());
Listeners.notifyAll(
- new ScanEvent(table.toString(), snapshot.snapshotId(), rowFilter, schema()));
+ new ScanEvent(table.toString(), snapshot.snapshotId(), context.rowFilter(), schema()));
- return planFiles(ops, snapshot, rowFilter, ignoreResiduals, caseSensitive, colStats);
+ return planFiles(ops, snapshot,
+ context.rowFilter(), context.ignoreResiduals(), context.caseSensitive(), context.returnColumnStats());
} else {
LOG.info("Scanning empty table {}", table);
@@ -243,6 +220,7 @@ abstract class BaseTableScan implements TableScan {
@Override
public CloseableIterable<CombinedScanTask> planTasks() {
+ Map<String, String> options = context.options();
long splitSize;
if (options.containsKey(TableProperties.SPLIT_SIZE)) {
splitSize = Long.parseLong(options.get(TableProperties.SPLIT_SIZE));
@@ -276,14 +254,14 @@ abstract class BaseTableScan implements TableScan {
@Override
public Snapshot snapshot() {
- return snapshotId != null ?
- ops.current().snapshot(snapshotId) :
+ return context.snapshotId() != null ?
+ ops.current().snapshot(context.snapshotId()) :
ops.current().currentSnapshot();
}
@Override
public boolean isCaseSensitive() {
- return caseSensitive;
+ return context.caseSensitive();
}
@Override
@@ -291,9 +269,9 @@ abstract class BaseTableScan implements TableScan {
return MoreObjects.toStringHelper(this)
.add("table", table)
.add("projection", schema().asStruct())
- .add("filter", rowFilter)
- .add("ignoreResiduals", ignoreResiduals)
- .add("caseSensitive", caseSensitive)
+ .add("filter", context.rowFilter())
+ .add("ignoreResiduals", context.ignoreResiduals())
+ .add("caseSensitive", context.caseSensitive())
.toString();
}
@@ -304,16 +282,18 @@ abstract class BaseTableScan implements TableScan {
* @return the Schema to project
*/
private Schema lazyColumnProjection() {
+ Collection<String> selectedColumns = context.selectedColumns();
if (selectedColumns != null) {
Set<Integer> requiredFieldIds = Sets.newHashSet();
// all of the filter columns are required
requiredFieldIds.addAll(
- Binder.boundReferences(table.schema().asStruct(), Collections.singletonList(rowFilter), caseSensitive));
+ Binder.boundReferences(table.schema().asStruct(),
+ Collections.singletonList(context.rowFilter()), context.caseSensitive()));
// all of the projection columns are required
Set<Integer> selectedIds;
- if (caseSensitive) {
+ if (context.caseSensitive()) {
selectedIds = TypeUtil.getProjectedIds(table.schema().select(selectedColumns));
} else {
selectedIds = TypeUtil.getProjectedIds(table.schema().caseInsensitiveSelect(selectedColumns));
diff --git a/core/src/main/java/org/apache/iceberg/DataFilesTable.java b/core/src/main/java/org/apache/iceberg/DataFilesTable.java
index db0e085..97d9fb9 100644
--- a/core/src/main/java/org/apache/iceberg/DataFilesTable.java
+++ b/core/src/main/java/org/apache/iceberg/DataFilesTable.java
@@ -19,14 +19,12 @@
package org.apache.iceberg;
-import java.util.Collection;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.ResidualEvaluator;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
@@ -76,24 +74,15 @@ public class DataFilesTable extends BaseMetadataTable {
this.fileSchema = fileSchema;
}
- private FilesTableScan(
- TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
- boolean ignoreResiduals, boolean caseSensitive, boolean colStats,
- Collection<String> selectedColumns, Schema fileSchema, ImmutableMap<String, String> options) {
- super(
- ops, table, snapshotId, schema, rowFilter, ignoreResiduals,
- caseSensitive, colStats, selectedColumns, options);
+ private FilesTableScan(TableOperations ops, Table table, Schema schema, Schema fileSchema,
+ TableScanContext context) {
+ super(ops, table, schema, context);
this.fileSchema = fileSchema;
}
@Override
- protected TableScan newRefinedScan(
- TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
- boolean ignoreResiduals, boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
- ImmutableMap<String, String> options) {
- return new FilesTableScan(
- ops, table, snapshotId, schema, rowFilter, ignoreResiduals,
- caseSensitive, colStats, selectedColumns, fileSchema, options);
+ protected TableScan newRefinedScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
+ return new FilesTableScan(ops, table, schema, fileSchema, context);
}
@Override
diff --git a/core/src/main/java/org/apache/iceberg/DataTableScan.java b/core/src/main/java/org/apache/iceberg/DataTableScan.java
index fc06342..694075f 100644
--- a/core/src/main/java/org/apache/iceberg/DataTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/DataTableScan.java
@@ -19,12 +19,10 @@
package org.apache.iceberg;
-import java.util.Collection;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.util.ThreadPools;
public class DataTableScan extends BaseTableScan {
@@ -43,12 +41,8 @@ public class DataTableScan extends BaseTableScan {
super(ops, table, table.schema());
}
- protected DataTableScan(TableOperations ops, Table table, Long snapshotId, Schema schema,
- Expression rowFilter, boolean ignoreResiduals, boolean caseSensitive, boolean colStats,
- Collection<String> selectedColumns, ImmutableMap<String, String> options) {
- super(
- ops, table, snapshotId, schema, rowFilter, ignoreResiduals,
- caseSensitive, colStats, selectedColumns, options);
+ protected DataTableScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
+ super(ops, table, schema, context);
}
@Override
@@ -56,10 +50,8 @@ public class DataTableScan extends BaseTableScan {
Long scanSnapshotId = snapshotId();
Preconditions.checkState(scanSnapshotId == null,
"Cannot enable incremental scan, scan-snapshot set to id=%s", scanSnapshotId);
- return new IncrementalDataTableScan(
- tableOps(), table(), schema(), filter(), shouldIgnoreResiduals(),
- isCaseSensitive(), colStats(), selectedColumns(), options(),
- fromSnapshotId, toSnapshotId);
+ return new IncrementalDataTableScan(tableOps(), table(), schema(),
+ context().fromSnapshotId(fromSnapshotId).toSnapshotId(toSnapshotId));
}
@Override
@@ -71,12 +63,8 @@ public class DataTableScan extends BaseTableScan {
}
@Override
- protected TableScan newRefinedScan(
- TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
- boolean ignoreResiduals, boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
- ImmutableMap<String, String> options) {
- return new DataTableScan(
- ops, table, snapshotId, schema, rowFilter, ignoreResiduals, caseSensitive, colStats, selectedColumns, options);
+ protected TableScan newRefinedScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
+ return new DataTableScan(ops, table, schema, context);
}
@Override
diff --git a/core/src/main/java/org/apache/iceberg/IncrementalDataTableScan.java b/core/src/main/java/org/apache/iceberg/IncrementalDataTableScan.java
index 6d70362..89f33e8 100644
--- a/core/src/main/java/org/apache/iceberg/IncrementalDataTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/IncrementalDataTableScan.java
@@ -19,14 +19,11 @@
package org.apache.iceberg;
-import java.util.Collection;
import java.util.List;
import java.util.Set;
-import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
@@ -34,40 +31,31 @@ import org.apache.iceberg.util.SnapshotUtil;
import org.apache.iceberg.util.ThreadPools;
class IncrementalDataTableScan extends DataTableScan {
- private long fromSnapshotId;
- private long toSnapshotId;
-
- IncrementalDataTableScan(TableOperations ops, Table table, Schema schema, Expression rowFilter,
- boolean ignoreResiduals, boolean caseSensitive, boolean colStats,
- Collection<String> selectedColumns, ImmutableMap<String, String> options,
- long fromSnapshotId, long toSnapshotId) {
- super(ops, table, null, schema, rowFilter, ignoreResiduals, caseSensitive, colStats, selectedColumns, options);
- validateSnapshotIds(table, fromSnapshotId, toSnapshotId);
- this.fromSnapshotId = fromSnapshotId;
- this.toSnapshotId = toSnapshotId;
+
+ IncrementalDataTableScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
+ super(ops, table, schema, context.useSnapshotId(null));
+ validateSnapshotIds(table, context.fromSnapshotId(), context.toSnapshotId());
}
@Override
public TableScan asOfTime(long timestampMillis) {
throw new UnsupportedOperationException(String.format(
"Cannot scan table as of time %s: configured for incremental data in snapshots (%s, %s]",
- timestampMillis, fromSnapshotId, toSnapshotId));
+ timestampMillis, context().fromSnapshotId(), context().toSnapshotId()));
}
@Override
public TableScan useSnapshot(long scanSnapshotId) {
throw new UnsupportedOperationException(String.format(
"Cannot scan table using scan snapshot id %s: configured for incremental data in snapshots (%s, %s]",
- scanSnapshotId, fromSnapshotId, toSnapshotId));
+ scanSnapshotId, context().fromSnapshotId(), context().toSnapshotId()));
}
@Override
public TableScan appendsBetween(long newFromSnapshotId, long newToSnapshotId) {
validateSnapshotIdsRefinement(newFromSnapshotId, newToSnapshotId);
- return new IncrementalDataTableScan(
- tableOps(), table(), schema(), filter(), shouldIgnoreResiduals(),
- isCaseSensitive(), colStats(), selectedColumns(), options(),
- newFromSnapshotId, newToSnapshotId);
+ return new IncrementalDataTableScan(tableOps(), table(), schema(),
+ context().fromSnapshotId(newFromSnapshotId).toSnapshotId(newToSnapshotId));
}
@Override
@@ -81,7 +69,8 @@ class IncrementalDataTableScan extends DataTableScan {
@Override
public CloseableIterable<FileScanTask> planFiles() {
//TODO publish an incremental appends scan event
- List<Snapshot> snapshots = snapshotsWithin(table(), fromSnapshotId, toSnapshotId);
+ List<Snapshot> snapshots = snapshotsWithin(table(),
+ context().fromSnapshotId(), context().toSnapshotId());
Set<Long> snapshotIds = Sets.newHashSet(Iterables.transform(snapshots, Snapshot::snapshotId));
Set<ManifestFile> manifests = FluentIterable
.from(snapshots)
@@ -113,13 +102,8 @@ class IncrementalDataTableScan extends DataTableScan {
@Override
@SuppressWarnings("checkstyle:HiddenField")
- protected TableScan newRefinedScan(
- TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
- boolean ignoreResiduals, boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
- ImmutableMap<String, String> options) {
- return new IncrementalDataTableScan(
- ops, table, schema, rowFilter, ignoreResiduals, caseSensitive, colStats, selectedColumns, options,
- fromSnapshotId, toSnapshotId);
+ protected TableScan newRefinedScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
+ return new IncrementalDataTableScan(ops, table, schema, context);
}
private static List<Snapshot> snapshotsWithin(Table table, long fromSnapshotId, long toSnapshotId) {
@@ -141,17 +125,17 @@ class IncrementalDataTableScan extends DataTableScan {
private void validateSnapshotIdsRefinement(long newFromSnapshotId, long newToSnapshotId) {
Set<Long> snapshotIdsRange = Sets.newHashSet(
- SnapshotUtil.snapshotIdsBetween(table(), fromSnapshotId, toSnapshotId));
+ SnapshotUtil.snapshotIdsBetween(table(), context().fromSnapshotId(), context().toSnapshotId()));
// since snapshotIdsBetween return ids in range (fromSnapshotId, toSnapshotId]
- snapshotIdsRange.add(fromSnapshotId);
+ snapshotIdsRange.add(context().fromSnapshotId());
Preconditions.checkArgument(
snapshotIdsRange.contains(newFromSnapshotId),
"from snapshot id %s not in existing snapshot ids range (%s, %s]",
- newFromSnapshotId, fromSnapshotId, newToSnapshotId);
+ newFromSnapshotId, context().fromSnapshotId(), newToSnapshotId);
Preconditions.checkArgument(
snapshotIdsRange.contains(newToSnapshotId),
"to snapshot id %s not in existing snapshot ids range (%s, %s]",
- newToSnapshotId, fromSnapshotId, toSnapshotId);
+ newToSnapshotId, context().fromSnapshotId(), context().toSnapshotId());
}
private static void validateSnapshotIds(Table table, long fromSnapshotId, long toSnapshotId) {
diff --git a/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java b/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java
index 520f113..e4288c0 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java
@@ -19,14 +19,12 @@
package org.apache.iceberg;
-import java.util.Collection;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.ResidualEvaluator;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
@@ -77,23 +75,14 @@ public class ManifestEntriesTable extends BaseMetadataTable {
super(ops, table, schema);
}
- private EntriesTableScan(
- TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
- boolean ignoreResiduals, boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
- ImmutableMap<String, String> options) {
- super(
- ops, table, snapshotId, schema, rowFilter, ignoreResiduals,
- caseSensitive, colStats, selectedColumns, options);
+ private EntriesTableScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
+ super(ops, table, schema, context);
}
@Override
- protected TableScan newRefinedScan(
- TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
- boolean ignoreResiduals, boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
- ImmutableMap<String, String> options) {
- return new EntriesTableScan(
- ops, table, snapshotId, schema, rowFilter, ignoreResiduals,
- caseSensitive, colStats, selectedColumns, options);
+ protected TableScan newRefinedScan(TableOperations ops, Table table, Schema schema,
+ TableScanContext context) {
+ return new EntriesTableScan(ops, table, schema, context); // returns a new scan; this one is not modified
}
@Override
diff --git a/core/src/main/java/org/apache/iceberg/StaticTableScan.java b/core/src/main/java/org/apache/iceberg/StaticTableScan.java
index 14401ec..ba97574 100644
--- a/core/src/main/java/org/apache/iceberg/StaticTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/StaticTableScan.java
@@ -19,11 +19,9 @@
package org.apache.iceberg;
-import java.util.Collection;
import java.util.function.Function;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
class StaticTableScan extends BaseTableScan {
private final Function<StaticTableScan, DataTask> buildTask;
@@ -33,13 +31,9 @@ class StaticTableScan extends BaseTableScan {
this.buildTask = buildTask;
}
- private StaticTableScan(
- TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
- boolean ignoreResiduals, boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
- Function<StaticTableScan, DataTask> buildTask, ImmutableMap<String, String> options) {
- super(
- ops, table, snapshotId, schema, rowFilter, ignoreResiduals,
- caseSensitive, colStats, selectedColumns, options);
+ private StaticTableScan(TableOperations ops, Table table, Schema schema,
+ Function<StaticTableScan, DataTask> buildTask, TableScanContext context) {
+ super(ops, table, schema, context); // also called by newRefinedScan, which passes this scan's buildTask along
this.buildTask = buildTask;
}
@@ -50,13 +44,9 @@ class StaticTableScan extends BaseTableScan {
}
@Override
- protected TableScan newRefinedScan(
- TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
- boolean ignoreResiduals, boolean caseSensitive, boolean colStats,
- Collection<String> selectedColumns, ImmutableMap<String, String> options) {
+ protected TableScan newRefinedScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
return new StaticTableScan(
- ops, table, snapshotId, schema, rowFilter, ignoreResiduals,
- caseSensitive, colStats, selectedColumns, buildTask, options);
+ ops, table, schema, buildTask, context); // reuse this scan's buildTask; refined settings come from context
}
@Override
diff --git a/core/src/main/java/org/apache/iceberg/TableScanContext.java b/core/src/main/java/org/apache/iceberg/TableScanContext.java
new file mode 100644
index 0000000..f4477fa
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/TableScanContext.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.util.Collection;
+import java.util.Map;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+
+/**
+ * Context object with optional arguments for a TableScan.
+ * <p>
+ * Instances are never modified after construction: each method that changes a setting returns a new
+ * context and leaves this one unchanged.
+ */
+final class TableScanContext {
+  private final Long snapshotId;
+  private final Expression rowFilter;
+  private final boolean ignoreResiduals;
+  private final boolean caseSensitive;
+  private final boolean colStats;
+  private final Collection<String> selectedColumns;
+  private final ImmutableMap<String, String> options;
+  private final Long fromSnapshotId;
+  private final Long toSnapshotId;
+
+  /**
+   * Creates a context holding the defaults: no snapshot id, an always-true row filter, residuals not
+   * ignored, case sensitive, no column stats, no selected columns, no options, and no from/to snapshot ids.
+   */
+  TableScanContext() {
+    this(null, Expressions.alwaysTrue(), false, true, false, null, ImmutableMap.of(), null, null);
+  }
+
+  private TableScanContext(Long snapshotId, Expression rowFilter, boolean ignoreResiduals,
+      boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
+      ImmutableMap<String, String> options, Long fromSnapshotId, Long toSnapshotId) {
+    this.snapshotId = snapshotId;
+    this.rowFilter = rowFilter;
+    this.ignoreResiduals = ignoreResiduals;
+    this.caseSensitive = caseSensitive;
+    this.colStats = colStats;
+    this.selectedColumns = selectedColumns;
+    this.options = options;
+    this.fromSnapshotId = fromSnapshotId;
+    this.toSnapshotId = toSnapshotId;
+  }
+
+  /** Returns the snapshot id to scan, or null if none was set. */
+  Long snapshotId() {
+    return snapshotId;
+  }
+
+  /** Returns a copy of this context that uses the given snapshot id. */
+  TableScanContext useSnapshotId(Long scanSnapshotId) {
+    return new TableScanContext(scanSnapshotId, rowFilter, ignoreResiduals,
+        caseSensitive, colStats, selectedColumns, options, fromSnapshotId, toSnapshotId);
+  }
+
+  /** Returns the row filter expression. */
+  Expression rowFilter() {
+    return rowFilter;
+  }
+
+  /** Returns a copy of this context with the row filter replaced (not combined) by {@code filter}. */
+  TableScanContext filterRows(Expression filter) {
+    return new TableScanContext(snapshotId, filter, ignoreResiduals,
+        caseSensitive, colStats, selectedColumns, options, fromSnapshotId, toSnapshotId);
+  }
+
+  /** Returns whether residual expressions should be ignored. */
+  boolean ignoreResiduals() {
+    return ignoreResiduals;
+  }
+
+  /** Returns a copy of this context with the given ignore-residuals flag. */
+  TableScanContext ignoreResiduals(boolean shouldIgnoreResiduals) {
+    return new TableScanContext(snapshotId, rowFilter, shouldIgnoreResiduals,
+        caseSensitive, colStats, selectedColumns, options, fromSnapshotId, toSnapshotId);
+  }
+
+  /** Returns whether the scan is case sensitive. */
+  boolean caseSensitive() {
+    return caseSensitive;
+  }
+
+  /** Returns a copy of this context with the given case-sensitivity flag. */
+  TableScanContext setCaseSensitive(boolean isCaseSensitive) {
+    return new TableScanContext(snapshotId, rowFilter, ignoreResiduals,
+        isCaseSensitive, colStats, selectedColumns, options, fromSnapshotId, toSnapshotId);
+  }
+
+  /** Returns whether column stats should be returned. */
+  boolean returnColumnStats() {
+    return colStats;
+  }
+
+  /** Returns a copy of this context with the given column-stats flag. */
+  TableScanContext shouldReturnColumnStats(boolean returnColumnStats) {
+    return new TableScanContext(snapshotId, rowFilter, ignoreResiduals,
+        caseSensitive, returnColumnStats, selectedColumns, options, fromSnapshotId, toSnapshotId);
+  }
+
+  /** Returns the selected column names, or null if no selection was made. */
+  Collection<String> selectedColumns() {
+    return selectedColumns;
+  }
+
+  /**
+   * Returns a copy of this context with the given selected columns.
+   * <p>
+   * The collection is kept by reference, not copied, so callers should not modify it afterwards.
+   */
+  TableScanContext selectColumns(Collection<String> columns) {
+    return new TableScanContext(snapshotId, rowFilter, ignoreResiduals,
+        caseSensitive, colStats, columns, options, fromSnapshotId, toSnapshotId);
+  }
+
+  /** Returns the scan options; the returned map is immutable. */
+  Map<String, String> options() {
+    return options;
+  }
+
+  /**
+   * Returns a copy of this context with {@code property} set to {@code value}.
+   * <p>
+   * An existing value for {@code property} is replaced. The old entry is skipped explicitly because
+   * {@code ImmutableMap.Builder#build()} rejects duplicate keys, so setting the same option twice would
+   * otherwise fail with an IllegalArgumentException.
+   */
+  TableScanContext withOption(String property, String value) {
+    ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
+    for (Map.Entry<String, String> entry : options.entrySet()) {
+      if (!entry.getKey().equals(property)) {
+        builder.put(entry);
+      }
+    }
+    builder.put(property, value);
+    return new TableScanContext(snapshotId, rowFilter, ignoreResiduals,
+        caseSensitive, colStats, selectedColumns, builder.build(), fromSnapshotId, toSnapshotId);
+  }
+
+  /** Returns the from-snapshot id, or null if none was set. */
+  Long fromSnapshotId() {
+    return fromSnapshotId;
+  }
+
+  /** Returns a copy of this context with the given from-snapshot id. */
+  TableScanContext fromSnapshotId(long id) {
+    return new TableScanContext(snapshotId, rowFilter, ignoreResiduals,
+        caseSensitive, colStats, selectedColumns, options, id, toSnapshotId);
+  }
+
+  /** Returns the to-snapshot id, or null if none was set. */
+  Long toSnapshotId() {
+    return toSnapshotId;
+  }
+
+  /** Returns a copy of this context with the given to-snapshot id. */
+  TableScanContext toSnapshotId(long id) {
+    return new TableScanContext(snapshotId, rowFilter, ignoreResiduals,
+        caseSensitive, colStats, selectedColumns, options, fromSnapshotId, id);
+  }
+}