You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2020/06/17 00:12:14 UTC

[iceberg] branch master updated: Refactor TableScan optional arguments into an immutable context object (#1115)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new e2ef87c  Refactor TableScan optional arguments into an immutable context object (#1115)
e2ef87c is described below

commit e2ef87c4c66c5ed6be13a335d4e03558bac5ec1a
Author: Edgar Rodriguez <26...@users.noreply.github.com>
AuthorDate: Tue Jun 16 17:12:05 2020 -0700

    Refactor TableScan optional arguments into an immutable context object (#1115)
---
 .../java/org/apache/iceberg/AllDataFilesTable.java |  22 +--
 .../java/org/apache/iceberg/AllEntriesTable.java   |  21 +--
 .../java/org/apache/iceberg/AllManifestsTable.java |  22 +--
 .../apache/iceberg/BaseAllMetadataTableScan.java   |  12 +-
 .../java/org/apache/iceberg/BaseTableScan.java     | 110 ++++++---------
 .../java/org/apache/iceberg/DataFilesTable.java    |  21 +--
 .../java/org/apache/iceberg/DataTableScan.java     |  24 +---
 .../apache/iceberg/IncrementalDataTableScan.java   |  48 +++----
 .../org/apache/iceberg/ManifestEntriesTable.java   |  21 +--
 .../java/org/apache/iceberg/StaticTableScan.java   |  20 +--
 .../java/org/apache/iceberg/TableScanContext.java  | 151 +++++++++++++++++++++
 11 files changed, 251 insertions(+), 221 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/AllDataFilesTable.java b/core/src/main/java/org/apache/iceberg/AllDataFilesTable.java
index cc3033e..8bd08bf 100644
--- a/core/src/main/java/org/apache/iceberg/AllDataFilesTable.java
+++ b/core/src/main/java/org/apache/iceberg/AllDataFilesTable.java
@@ -20,14 +20,12 @@
 package org.apache.iceberg;
 
 import java.io.IOException;
-import java.util.Collection;
 import java.util.List;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.expressions.ResidualEvaluator;
 import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.TypeUtil;
@@ -84,25 +82,15 @@ public class AllDataFilesTable extends BaseMetadataTable {
       this.fileSchema = fileSchema;
     }
 
-    private AllDataFilesTableScan(
-        TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
-        boolean ignoreResiduals, boolean caseSensitive, boolean colStats,
-        Collection<String> selectedColumns, Schema fileSchema,
-        ImmutableMap<String, String> options) {
-      super(
-          ops, table, snapshotId, schema, rowFilter, ignoreResiduals,
-          caseSensitive, colStats, selectedColumns, options);
+    private AllDataFilesTableScan(TableOperations ops, Table table, Schema schema, Schema fileSchema,
+                                  TableScanContext context) {
+      super(ops, table, schema, context);
       this.fileSchema = fileSchema;
     }
 
     @Override
-    protected TableScan newRefinedScan(
-        TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
-        boolean ignoreResiduals, boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
-        ImmutableMap<String, String> options) {
-      return new AllDataFilesTableScan(
-          ops, table, snapshotId, schema, rowFilter, ignoreResiduals,
-          caseSensitive, colStats, selectedColumns, fileSchema, options);
+    protected TableScan newRefinedScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
+      return new AllDataFilesTableScan(ops, table, schema, fileSchema, context);
     }
 
     @Override
diff --git a/core/src/main/java/org/apache/iceberg/AllEntriesTable.java b/core/src/main/java/org/apache/iceberg/AllEntriesTable.java
index 4734c85..bb19c41 100644
--- a/core/src/main/java/org/apache/iceberg/AllEntriesTable.java
+++ b/core/src/main/java/org/apache/iceberg/AllEntriesTable.java
@@ -20,14 +20,12 @@
 package org.apache.iceberg;
 
 import java.io.IOException;
-import java.util.Collection;
 import java.util.List;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.expressions.ResidualEvaluator;
 import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.TypeUtil;
@@ -81,23 +79,14 @@ public class AllEntriesTable extends BaseMetadataTable {
       super(ops, table, schema);
     }
 
-    private Scan(
-        TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
-        boolean ignoreResiduals, boolean caseSensitive, boolean colStats,
-        Collection<String> selectedColumns, ImmutableMap<String, String> options) {
-      super(
-          ops, table, snapshotId, schema, rowFilter, ignoreResiduals,
-          caseSensitive, colStats, selectedColumns, options);
+    private Scan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
+      super(ops, table, schema, context);
     }
 
     @Override
-    protected TableScan newRefinedScan(
-        TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
-        boolean ignoreResiduals, boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
-        ImmutableMap<String, String> options) {
-      return new Scan(
-          ops, table, snapshotId, schema, rowFilter, ignoreResiduals,
-          caseSensitive, colStats, selectedColumns, options);
+    protected TableScan newRefinedScan(TableOperations ops, Table table, Schema schema,
+                                       TableScanContext context) {
+      return new Scan(ops, table, schema, context);
     }
 
     @Override
diff --git a/core/src/main/java/org/apache/iceberg/AllManifestsTable.java b/core/src/main/java/org/apache/iceberg/AllManifestsTable.java
index 20f442e..9be9999 100644
--- a/core/src/main/java/org/apache/iceberg/AllManifestsTable.java
+++ b/core/src/main/java/org/apache/iceberg/AllManifestsTable.java
@@ -20,7 +20,6 @@
 package org.apache.iceberg;
 
 import java.io.IOException;
-import java.util.Collection;
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.expressions.Expression;
@@ -29,7 +28,6 @@ import org.apache.iceberg.expressions.ResidualEvaluator;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.types.Types;
 
@@ -90,23 +88,15 @@ public class AllManifestsTable extends BaseMetadataTable {
       super(ops, table, fileSchema);
     }
 
-    private AllManifestsTableScan(
-        TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
-        boolean ignoreResiduals, boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
-        ImmutableMap<String, String> options) {
-      super(
-          ops, table, snapshotId, schema, rowFilter, ignoreResiduals,
-          caseSensitive, colStats, selectedColumns, options);
+    private AllManifestsTableScan(TableOperations ops, Table table, Schema schema,
+                                  TableScanContext context) {
+      super(ops, table, schema, context);
     }
 
     @Override
-    protected TableScan newRefinedScan(
-        TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
-        boolean ignoreResiduals, boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
-        ImmutableMap<String, String> options) {
-      return new AllManifestsTableScan(
-          ops, table, snapshotId, schema, rowFilter, ignoreResiduals,
-          caseSensitive, colStats, selectedColumns, options);
+    protected TableScan newRefinedScan(TableOperations ops, Table table, Schema schema,
+                                       TableScanContext context) {
+      return new AllManifestsTableScan(ops, table, schema, context);
     }
 
     @Override
diff --git a/core/src/main/java/org/apache/iceberg/BaseAllMetadataTableScan.java b/core/src/main/java/org/apache/iceberg/BaseAllMetadataTableScan.java
index 2560c05..e93cd5e 100644
--- a/core/src/main/java/org/apache/iceberg/BaseAllMetadataTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/BaseAllMetadataTableScan.java
@@ -19,12 +19,9 @@
 
 package org.apache.iceberg;
 
-import java.util.Collection;
 import org.apache.iceberg.events.Listeners;
 import org.apache.iceberg.events.ScanEvent;
-import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,13 +32,8 @@ abstract class BaseAllMetadataTableScan extends BaseTableScan {
     super(ops, table, fileSchema);
   }
 
-  BaseAllMetadataTableScan(
-      TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
-      boolean ignoreResiduals, boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
-      ImmutableMap<String, String> options) {
-    super(
-        ops, table, snapshotId, schema, rowFilter, ignoreResiduals,
-        caseSensitive, colStats, selectedColumns, options);
+  BaseAllMetadataTableScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    super(ops, table, schema, context);
   }
 
   @Override
diff --git a/core/src/main/java/org/apache/iceberg/BaseTableScan.java b/core/src/main/java/org/apache/iceberg/BaseTableScan.java
index f36d7ab..da1ff63 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTableScan.java
@@ -25,6 +25,7 @@ import java.time.ZoneId;
 import java.time.format.DateTimeFormatter;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Map;
 import java.util.Set;
 import org.apache.iceberg.events.Listeners;
 import org.apache.iceberg.events.ScanEvent;
@@ -34,7 +35,6 @@ import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.util.TableScanUtil;
@@ -52,32 +52,18 @@ abstract class BaseTableScan implements TableScan {
 
   private final TableOperations ops;
   private final Table table;
-  private final Long snapshotId;
   private final Schema schema;
-  private final Expression rowFilter;
-  private final boolean ignoreResiduals;
-  private final boolean caseSensitive;
-  private final boolean colStats;
-  private final Collection<String> selectedColumns;
-  private final ImmutableMap<String, String> options;
+  private final TableScanContext context;
 
   protected BaseTableScan(TableOperations ops, Table table, Schema schema) {
-    this(ops, table, null, schema, Expressions.alwaysTrue(), false, true, false, null, ImmutableMap.of());
+    this(ops, table, schema, new TableScanContext());
   }
 
-  protected BaseTableScan(TableOperations ops, Table table, Long snapshotId, Schema schema,
-                          Expression rowFilter, boolean ignoreResiduals, boolean caseSensitive, boolean colStats,
-                          Collection<String> selectedColumns, ImmutableMap<String, String> options) {
+  protected BaseTableScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
     this.ops = ops;
     this.table = table;
-    this.snapshotId = snapshotId;
     this.schema = schema;
-    this.rowFilter = rowFilter;
-    this.ignoreResiduals = ignoreResiduals;
-    this.caseSensitive = caseSensitive;
-    this.colStats = colStats;
-    this.selectedColumns = selectedColumns;
-    this.options = options != null ? options : ImmutableMap.of();
+    this.context = context;
   }
 
   protected TableOperations tableOps() {
@@ -85,23 +71,27 @@ abstract class BaseTableScan implements TableScan {
   }
 
   protected Long snapshotId() {
-    return snapshotId;
+    return context.snapshotId();
   }
 
   protected boolean colStats() {
-    return colStats;
+    return context.returnColumnStats();
   }
 
   protected boolean shouldIgnoreResiduals() {
-    return ignoreResiduals;
+    return context.ignoreResiduals();
   }
 
   protected Collection<String> selectedColumns() {
-    return selectedColumns;
+    return context.selectedColumns();
   }
 
-  protected ImmutableMap<String, String> options() {
-    return options;
+  protected Map<String, String> options() {
+    return context.options();
+  }
+
+  protected  TableScanContext context() {
+    return context;
   }
 
   @SuppressWarnings("checkstyle:HiddenField")
@@ -109,9 +99,7 @@ abstract class BaseTableScan implements TableScan {
 
   @SuppressWarnings("checkstyle:HiddenField")
   protected abstract TableScan newRefinedScan(
-      TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
-      boolean ignoreResiduals, boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
-      ImmutableMap<String, String> options);
+      TableOperations ops, Table table, Schema schema, TableScanContext context);
 
   @SuppressWarnings("checkstyle:HiddenField")
   protected abstract CloseableIterable<FileScanTask> planFiles(
@@ -135,19 +123,18 @@ abstract class BaseTableScan implements TableScan {
 
   @Override
   public TableScan useSnapshot(long scanSnapshotId) {
-    Preconditions.checkArgument(this.snapshotId == null,
-        "Cannot override snapshot, already set to id=%s", snapshotId);
+    Preconditions.checkArgument(context.snapshotId() == null,
+        "Cannot override snapshot, already set to id=%s", context.snapshotId());
     Preconditions.checkArgument(ops.current().snapshot(scanSnapshotId) != null,
         "Cannot find snapshot with ID %s", scanSnapshotId);
     return newRefinedScan(
-        ops, table, scanSnapshotId, schema, rowFilter, ignoreResiduals,
-        caseSensitive, colStats, selectedColumns, options);
+        ops, table, schema, context.useSnapshotId(scanSnapshotId));
   }
 
   @Override
   public TableScan asOfTime(long timestampMillis) {
-    Preconditions.checkArgument(this.snapshotId == null,
-        "Cannot override snapshot, already set to id=%s", snapshotId);
+    Preconditions.checkArgument(context.snapshotId() == null,
+        "Cannot override snapshot, already set to id=%s", context.snapshotId());
 
     Long lastSnapshotId = null;
     for (HistoryEntry logEntry : ops.current().snapshotLog()) {
@@ -166,60 +153,49 @@ abstract class BaseTableScan implements TableScan {
 
   @Override
   public TableScan option(String property, String value) {
-    ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
-    builder.putAll(options);
-    builder.put(property, value);
-
     return newRefinedScan(
-        ops, table, snapshotId, schema, rowFilter, ignoreResiduals,
-        caseSensitive, colStats, selectedColumns, builder.build());
+        ops, table, schema, context.withOption(property, value));
   }
 
   @Override
   public TableScan project(Schema projectedSchema) {
     return newRefinedScan(
-        ops, table, snapshotId, projectedSchema, rowFilter, ignoreResiduals,
-        caseSensitive, colStats, selectedColumns, options);
+        ops, table, projectedSchema, context);
   }
 
   @Override
   public TableScan caseSensitive(boolean scanCaseSensitive) {
     return newRefinedScan(
-        ops, table, snapshotId, schema, rowFilter, ignoreResiduals,
-        scanCaseSensitive, colStats, selectedColumns, options);
+        ops, table, schema, context.setCaseSensitive(scanCaseSensitive));
   }
 
   @Override
   public TableScan includeColumnStats() {
     return newRefinedScan(
-        ops, table, snapshotId, schema, rowFilter, ignoreResiduals,
-        caseSensitive, true, selectedColumns, options);
+        ops, table, schema, context.shouldReturnColumnStats(true));
   }
 
   @Override
   public TableScan select(Collection<String> columns) {
     return newRefinedScan(
-        ops, table, snapshotId, schema, rowFilter, ignoreResiduals,
-        caseSensitive, colStats, columns, options);
+        ops, table, schema, context.selectColumns(columns));
   }
 
   @Override
   public TableScan filter(Expression expr) {
-    return newRefinedScan(
-        ops, table, snapshotId, schema, Expressions.and(rowFilter, expr),
-        ignoreResiduals, caseSensitive, colStats, selectedColumns, options);
+    return newRefinedScan(ops, table, schema,
+        context.filterRows(Expressions.and(context.rowFilter(), expr)));
   }
 
   @Override
   public Expression filter() {
-    return rowFilter;
+    return context.rowFilter();
   }
 
   @Override
   public TableScan ignoreResiduals() {
     return newRefinedScan(
-        ops, table, snapshotId, schema, rowFilter, true,
-        caseSensitive, colStats, selectedColumns, options);
+        ops, table, schema, context.ignoreResiduals(true));
   }
 
   @Override
@@ -228,12 +204,13 @@ abstract class BaseTableScan implements TableScan {
     if (snapshot != null) {
       LOG.info("Scanning table {} snapshot {} created at {} with filter {}", table,
           snapshot.snapshotId(), formatTimestampMillis(snapshot.timestampMillis()),
-          rowFilter);
+          context.rowFilter());
 
       Listeners.notifyAll(
-          new ScanEvent(table.toString(), snapshot.snapshotId(), rowFilter, schema()));
+          new ScanEvent(table.toString(), snapshot.snapshotId(), context.rowFilter(), schema()));
 
-      return planFiles(ops, snapshot, rowFilter, ignoreResiduals, caseSensitive, colStats);
+      return planFiles(ops, snapshot,
+          context.rowFilter(), context.ignoreResiduals(), context.caseSensitive(), context.returnColumnStats());
 
     } else {
       LOG.info("Scanning empty table {}", table);
@@ -243,6 +220,7 @@ abstract class BaseTableScan implements TableScan {
 
   @Override
   public CloseableIterable<CombinedScanTask> planTasks() {
+    Map<String, String> options = context.options();
     long splitSize;
     if (options.containsKey(TableProperties.SPLIT_SIZE)) {
       splitSize = Long.parseLong(options.get(TableProperties.SPLIT_SIZE));
@@ -276,14 +254,14 @@ abstract class BaseTableScan implements TableScan {
 
   @Override
   public Snapshot snapshot() {
-    return snapshotId != null ?
-        ops.current().snapshot(snapshotId) :
+    return context.snapshotId() != null ?
+        ops.current().snapshot(context.snapshotId()) :
         ops.current().currentSnapshot();
   }
 
   @Override
   public boolean isCaseSensitive() {
-    return caseSensitive;
+    return context.caseSensitive();
   }
 
   @Override
@@ -291,9 +269,9 @@ abstract class BaseTableScan implements TableScan {
     return MoreObjects.toStringHelper(this)
         .add("table", table)
         .add("projection", schema().asStruct())
-        .add("filter", rowFilter)
-        .add("ignoreResiduals", ignoreResiduals)
-        .add("caseSensitive", caseSensitive)
+        .add("filter", context.rowFilter())
+        .add("ignoreResiduals", context.ignoreResiduals())
+        .add("caseSensitive", context.caseSensitive())
         .toString();
   }
 
@@ -304,16 +282,18 @@ abstract class BaseTableScan implements TableScan {
    * @return the Schema to project
    */
   private Schema lazyColumnProjection() {
+    Collection<String> selectedColumns = context.selectedColumns();
     if (selectedColumns != null) {
       Set<Integer> requiredFieldIds = Sets.newHashSet();
 
       // all of the filter columns are required
       requiredFieldIds.addAll(
-          Binder.boundReferences(table.schema().asStruct(), Collections.singletonList(rowFilter), caseSensitive));
+          Binder.boundReferences(table.schema().asStruct(),
+              Collections.singletonList(context.rowFilter()), context.caseSensitive()));
 
       // all of the projection columns are required
       Set<Integer> selectedIds;
-      if (caseSensitive) {
+      if (context.caseSensitive()) {
         selectedIds = TypeUtil.getProjectedIds(table.schema().select(selectedColumns));
       } else {
         selectedIds = TypeUtil.getProjectedIds(table.schema().caseInsensitiveSelect(selectedColumns));
diff --git a/core/src/main/java/org/apache/iceberg/DataFilesTable.java b/core/src/main/java/org/apache/iceberg/DataFilesTable.java
index db0e085..97d9fb9 100644
--- a/core/src/main/java/org/apache/iceberg/DataFilesTable.java
+++ b/core/src/main/java/org/apache/iceberg/DataFilesTable.java
@@ -19,14 +19,12 @@
 
 package org.apache.iceberg;
 
-import java.util.Collection;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.expressions.ResidualEvaluator;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.TypeUtil;
 
@@ -76,24 +74,15 @@ public class DataFilesTable extends BaseMetadataTable {
       this.fileSchema = fileSchema;
     }
 
-    private FilesTableScan(
-        TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
-        boolean ignoreResiduals, boolean caseSensitive, boolean colStats,
-        Collection<String> selectedColumns, Schema fileSchema, ImmutableMap<String, String> options) {
-      super(
-          ops, table, snapshotId, schema, rowFilter, ignoreResiduals,
-          caseSensitive, colStats, selectedColumns, options);
+    private FilesTableScan(TableOperations ops, Table table, Schema schema, Schema fileSchema,
+                           TableScanContext context) {
+      super(ops, table, schema, context);
       this.fileSchema = fileSchema;
     }
 
     @Override
-    protected TableScan newRefinedScan(
-        TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
-        boolean ignoreResiduals, boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
-        ImmutableMap<String, String> options) {
-      return new FilesTableScan(
-          ops, table, snapshotId, schema, rowFilter, ignoreResiduals,
-          caseSensitive, colStats, selectedColumns, fileSchema, options);
+    protected TableScan newRefinedScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
+      return new FilesTableScan(ops, table, schema, fileSchema, context);
     }
 
     @Override
diff --git a/core/src/main/java/org/apache/iceberg/DataTableScan.java b/core/src/main/java/org/apache/iceberg/DataTableScan.java
index fc06342..694075f 100644
--- a/core/src/main/java/org/apache/iceberg/DataTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/DataTableScan.java
@@ -19,12 +19,10 @@
 
 package org.apache.iceberg;
 
-import java.util.Collection;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.util.ThreadPools;
 
 public class DataTableScan extends BaseTableScan {
@@ -43,12 +41,8 @@ public class DataTableScan extends BaseTableScan {
     super(ops, table, table.schema());
   }
 
-  protected DataTableScan(TableOperations ops, Table table, Long snapshotId, Schema schema,
-                          Expression rowFilter, boolean ignoreResiduals, boolean caseSensitive, boolean colStats,
-                          Collection<String> selectedColumns, ImmutableMap<String, String> options) {
-    super(
-        ops, table, snapshotId, schema, rowFilter, ignoreResiduals,
-        caseSensitive, colStats, selectedColumns, options);
+  protected DataTableScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    super(ops, table, schema, context);
   }
 
   @Override
@@ -56,10 +50,8 @@ public class DataTableScan extends BaseTableScan {
     Long scanSnapshotId = snapshotId();
     Preconditions.checkState(scanSnapshotId == null,
         "Cannot enable incremental scan, scan-snapshot set to id=%s", scanSnapshotId);
-    return new IncrementalDataTableScan(
-        tableOps(), table(), schema(), filter(), shouldIgnoreResiduals(),
-        isCaseSensitive(), colStats(), selectedColumns(), options(),
-        fromSnapshotId, toSnapshotId);
+    return new IncrementalDataTableScan(tableOps(), table(), schema(),
+        context().fromSnapshotId(fromSnapshotId).toSnapshotId(toSnapshotId));
   }
 
   @Override
@@ -71,12 +63,8 @@ public class DataTableScan extends BaseTableScan {
   }
 
   @Override
-  protected TableScan newRefinedScan(
-      TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
-      boolean ignoreResiduals, boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
-      ImmutableMap<String, String> options) {
-    return new DataTableScan(
-        ops, table, snapshotId, schema, rowFilter, ignoreResiduals, caseSensitive, colStats, selectedColumns, options);
+  protected TableScan newRefinedScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    return new DataTableScan(ops, table, schema, context);
   }
 
   @Override
diff --git a/core/src/main/java/org/apache/iceberg/IncrementalDataTableScan.java b/core/src/main/java/org/apache/iceberg/IncrementalDataTableScan.java
index 6d70362..89f33e8 100644
--- a/core/src/main/java/org/apache/iceberg/IncrementalDataTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/IncrementalDataTableScan.java
@@ -19,14 +19,11 @@
 
 package org.apache.iceberg;
 
-import java.util.Collection;
 import java.util.List;
 import java.util.Set;
-import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
@@ -34,40 +31,31 @@ import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.iceberg.util.ThreadPools;
 
 class IncrementalDataTableScan extends DataTableScan {
-  private long fromSnapshotId;
-  private long toSnapshotId;
-
-  IncrementalDataTableScan(TableOperations ops, Table table, Schema schema, Expression rowFilter,
-                           boolean ignoreResiduals, boolean caseSensitive, boolean colStats,
-                           Collection<String> selectedColumns, ImmutableMap<String, String> options,
-                           long fromSnapshotId, long toSnapshotId) {
-    super(ops, table, null, schema, rowFilter, ignoreResiduals, caseSensitive, colStats, selectedColumns, options);
-    validateSnapshotIds(table, fromSnapshotId, toSnapshotId);
-    this.fromSnapshotId = fromSnapshotId;
-    this.toSnapshotId = toSnapshotId;
+
+  IncrementalDataTableScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    super(ops, table, schema, context.useSnapshotId(null));
+    validateSnapshotIds(table, context.fromSnapshotId(), context.toSnapshotId());
   }
 
   @Override
   public TableScan asOfTime(long timestampMillis) {
     throw new UnsupportedOperationException(String.format(
         "Cannot scan table as of time %s: configured for incremental data in snapshots (%s, %s]",
-        timestampMillis, fromSnapshotId, toSnapshotId));
+        timestampMillis, context().fromSnapshotId(), context().toSnapshotId()));
   }
 
   @Override
   public TableScan useSnapshot(long scanSnapshotId) {
     throw new UnsupportedOperationException(String.format(
         "Cannot scan table using scan snapshot id %s: configured for incremental data in snapshots (%s, %s]",
-        scanSnapshotId, fromSnapshotId, toSnapshotId));
+        scanSnapshotId, context().fromSnapshotId(), context().toSnapshotId()));
   }
 
   @Override
   public TableScan appendsBetween(long newFromSnapshotId, long newToSnapshotId) {
     validateSnapshotIdsRefinement(newFromSnapshotId, newToSnapshotId);
-    return new IncrementalDataTableScan(
-        tableOps(), table(), schema(), filter(), shouldIgnoreResiduals(),
-        isCaseSensitive(), colStats(), selectedColumns(), options(),
-        newFromSnapshotId, newToSnapshotId);
+    return new IncrementalDataTableScan(tableOps(), table(), schema(),
+        context().fromSnapshotId(newFromSnapshotId).toSnapshotId(newToSnapshotId));
   }
 
   @Override
@@ -81,7 +69,8 @@ class IncrementalDataTableScan extends DataTableScan {
   @Override
   public CloseableIterable<FileScanTask> planFiles() {
     //TODO publish an incremental appends scan event
-    List<Snapshot> snapshots = snapshotsWithin(table(), fromSnapshotId, toSnapshotId);
+    List<Snapshot> snapshots = snapshotsWithin(table(),
+        context().fromSnapshotId(), context().toSnapshotId());
     Set<Long> snapshotIds = Sets.newHashSet(Iterables.transform(snapshots, Snapshot::snapshotId));
     Set<ManifestFile> manifests = FluentIterable
         .from(snapshots)
@@ -113,13 +102,8 @@ class IncrementalDataTableScan extends DataTableScan {
 
   @Override
   @SuppressWarnings("checkstyle:HiddenField")
-  protected TableScan newRefinedScan(
-          TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
-          boolean ignoreResiduals, boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
-          ImmutableMap<String, String> options) {
-    return new IncrementalDataTableScan(
-            ops, table, schema, rowFilter, ignoreResiduals, caseSensitive, colStats, selectedColumns, options,
-            fromSnapshotId, toSnapshotId);
+  protected TableScan newRefinedScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    return new IncrementalDataTableScan(ops, table, schema, context);
   }
 
   private static List<Snapshot> snapshotsWithin(Table table, long fromSnapshotId, long toSnapshotId) {
@@ -141,17 +125,17 @@ class IncrementalDataTableScan extends DataTableScan {
 
   private void validateSnapshotIdsRefinement(long newFromSnapshotId, long newToSnapshotId) {
     Set<Long> snapshotIdsRange = Sets.newHashSet(
-        SnapshotUtil.snapshotIdsBetween(table(), fromSnapshotId, toSnapshotId));
+        SnapshotUtil.snapshotIdsBetween(table(), context().fromSnapshotId(), context().toSnapshotId()));
     // since snapshotIdsBetween return ids in range (fromSnapshotId, toSnapshotId]
-    snapshotIdsRange.add(fromSnapshotId);
+    snapshotIdsRange.add(context().fromSnapshotId());
     Preconditions.checkArgument(
         snapshotIdsRange.contains(newFromSnapshotId),
         "from snapshot id %s not in existing snapshot ids range (%s, %s]",
-        newFromSnapshotId, fromSnapshotId, newToSnapshotId);
+        newFromSnapshotId, context().fromSnapshotId(), newToSnapshotId);
     Preconditions.checkArgument(
         snapshotIdsRange.contains(newToSnapshotId),
         "to snapshot id %s not in existing snapshot ids range (%s, %s]",
-        newToSnapshotId, fromSnapshotId, toSnapshotId);
+        newToSnapshotId, context().fromSnapshotId(), context().toSnapshotId());
   }
 
   private static void validateSnapshotIds(Table table, long fromSnapshotId, long toSnapshotId) {
diff --git a/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java b/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java
index 520f113..e4288c0 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java
@@ -19,14 +19,12 @@
 
 package org.apache.iceberg;
 
-import java.util.Collection;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.expressions.ResidualEvaluator;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.TypeUtil;
 
@@ -77,23 +75,14 @@ public class ManifestEntriesTable extends BaseMetadataTable {
       super(ops, table, schema);
     }
 
-    private EntriesTableScan(
-        TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
-        boolean ignoreResiduals, boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
-        ImmutableMap<String, String> options) {
-      super(
-          ops, table, snapshotId, schema, rowFilter, ignoreResiduals,
-          caseSensitive, colStats, selectedColumns, options);
+    private EntriesTableScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
+      super(ops, table, schema, context);
     }
 
     @Override
-    protected TableScan newRefinedScan(
-        TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
-        boolean ignoreResiduals, boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
-        ImmutableMap<String, String> options) {
-      return new EntriesTableScan(
-          ops, table, snapshotId, schema, rowFilter, ignoreResiduals,
-          caseSensitive, colStats, selectedColumns, options);
+    protected TableScan newRefinedScan(TableOperations ops, Table table, Schema schema,
+                                       TableScanContext context) {
+      return new EntriesTableScan(ops, table, schema, context);
     }
 
     @Override
diff --git a/core/src/main/java/org/apache/iceberg/StaticTableScan.java b/core/src/main/java/org/apache/iceberg/StaticTableScan.java
index 14401ec..ba97574 100644
--- a/core/src/main/java/org/apache/iceberg/StaticTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/StaticTableScan.java
@@ -19,11 +19,9 @@
 
 package org.apache.iceberg;
 
-import java.util.Collection;
 import java.util.function.Function;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 
 class StaticTableScan extends BaseTableScan {
   private final Function<StaticTableScan, DataTask> buildTask;
@@ -33,13 +31,9 @@ class StaticTableScan extends BaseTableScan {
     this.buildTask = buildTask;
   }
 
-  private StaticTableScan(
-      TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
-      boolean ignoreResiduals, boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
-      Function<StaticTableScan, DataTask> buildTask, ImmutableMap<String, String> options) {
-    super(
-        ops, table, snapshotId, schema, rowFilter, ignoreResiduals,
-        caseSensitive, colStats, selectedColumns, options);
+  private StaticTableScan(TableOperations ops, Table table, Schema schema,
+                          Function<StaticTableScan, DataTask> buildTask, TableScanContext context) {
+    super(ops, table, schema, context);
     this.buildTask = buildTask;
   }
 
@@ -50,13 +44,9 @@ class StaticTableScan extends BaseTableScan {
   }
 
   @Override
-  protected TableScan newRefinedScan(
-      TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
-      boolean ignoreResiduals, boolean caseSensitive, boolean colStats,
-      Collection<String> selectedColumns, ImmutableMap<String, String> options) {
+  protected TableScan newRefinedScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
     return new StaticTableScan(
-        ops, table, snapshotId, schema, rowFilter, ignoreResiduals,
-        caseSensitive, colStats, selectedColumns, buildTask, options);
+        ops, table, schema, buildTask, context);
   }
 
   @Override
diff --git a/core/src/main/java/org/apache/iceberg/TableScanContext.java b/core/src/main/java/org/apache/iceberg/TableScanContext.java
new file mode 100644
index 0000000..f4477fa
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/TableScanContext.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.util.Collection;
+import java.util.Map;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+
+/**
+ * Context object with optional arguments for a TableScan.
+ */
+final class TableScanContext {
+  private final Long snapshotId;
+  private final Expression rowFilter;
+  private final boolean ignoreResiduals;
+  private final boolean caseSensitive;
+  private final boolean colStats;
+  private final Collection<String> selectedColumns;
+  private final ImmutableMap<String, String> options;
+  private final Long fromSnapshotId;
+  private final Long toSnapshotId;
+
+  TableScanContext() {
+    this.snapshotId = null;
+    this.rowFilter = Expressions.alwaysTrue();
+    this.ignoreResiduals = false;
+    this.caseSensitive = true;
+    this.colStats = false;
+    this.selectedColumns = null;
+    this.options = ImmutableMap.of();
+    this.fromSnapshotId = null;
+    this.toSnapshotId = null;
+  }
+
+  private TableScanContext(Long snapshotId, Expression rowFilter, boolean ignoreResiduals,
+                           boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
+                           ImmutableMap<String, String> options, Long fromSnapshotId, Long toSnapshotId) {
+    this.snapshotId = snapshotId;
+    this.rowFilter = rowFilter;
+    this.ignoreResiduals = ignoreResiduals;
+    this.caseSensitive = caseSensitive;
+    this.colStats = colStats;
+    this.selectedColumns = selectedColumns;
+    this.options = options;
+    this.fromSnapshotId = fromSnapshotId;
+    this.toSnapshotId = toSnapshotId;
+  }
+
+  Long snapshotId() {
+    return snapshotId;
+  }
+
+  TableScanContext useSnapshotId(Long scanSnapshotId) {
+    return new TableScanContext(scanSnapshotId, rowFilter, ignoreResiduals,
+        caseSensitive, colStats, selectedColumns, options, fromSnapshotId, toSnapshotId);
+  }
+
+  Expression rowFilter() {
+    return rowFilter;
+  }
+
+  TableScanContext filterRows(Expression filter) {
+    return new TableScanContext(snapshotId, filter, ignoreResiduals,
+        caseSensitive, colStats, selectedColumns, options, fromSnapshotId, toSnapshotId);
+  }
+
+  boolean ignoreResiduals() {
+    return ignoreResiduals;
+  }
+
+  TableScanContext ignoreResiduals(boolean shouldIgnoreResiduals) {
+    return new TableScanContext(snapshotId, rowFilter, shouldIgnoreResiduals,
+        caseSensitive, colStats, selectedColumns, options, fromSnapshotId, toSnapshotId);
+  }
+
+  boolean caseSensitive() {
+    return caseSensitive;
+  }
+
+  TableScanContext setCaseSensitive(boolean isCaseSensitive) {
+    return new TableScanContext(snapshotId, rowFilter, ignoreResiduals,
+        isCaseSensitive, colStats, selectedColumns, options, fromSnapshotId, toSnapshotId);
+  }
+
+  boolean returnColumnStats() {
+    return colStats;
+  }
+
+  TableScanContext shouldReturnColumnStats(boolean returnColumnStats) {
+    return new TableScanContext(snapshotId, rowFilter, ignoreResiduals,
+        caseSensitive, returnColumnStats, selectedColumns, options, fromSnapshotId, toSnapshotId);
+  }
+
+  Collection<String> selectedColumns() {
+    return selectedColumns;
+  }
+
+  TableScanContext selectColumns(Collection<String> columns) {
+    return new TableScanContext(snapshotId, rowFilter, ignoreResiduals,
+        caseSensitive, colStats, columns, options, fromSnapshotId, toSnapshotId);
+  }
+
+  Map<String, String> options() {
+    return options;
+  }
+
+  TableScanContext withOption(String property, String value) {
+    ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
+    builder.putAll(options);
+    builder.put(property, value);
+    return new TableScanContext(snapshotId, rowFilter, ignoreResiduals,
+        caseSensitive, colStats, selectedColumns, builder.build(), fromSnapshotId, toSnapshotId);
+  }
+
+  Long fromSnapshotId() {
+    return fromSnapshotId;
+  }
+
+  TableScanContext fromSnapshotId(long id) {
+    return new TableScanContext(snapshotId, rowFilter, ignoreResiduals,
+        caseSensitive, colStats, selectedColumns, options, id, toSnapshotId);
+  }
+
+  Long toSnapshotId() {
+    return toSnapshotId;
+  }
+
+  TableScanContext toSnapshotId(long id) {
+    return new TableScanContext(snapshotId, rowFilter, ignoreResiduals,
+        caseSensitive, colStats, selectedColumns, options, fromSnapshotId, id);
+  }
+}