You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by ja...@apache.org on 2023/04/27 03:44:27 UTC

[iceberg] branch master updated: Core: Make TableScanContext immutable (#5985)

This is an automated email from the ASF dual-hosted git repository.

jackye pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 882459d488 Core: Make TableScanContext immutable (#5985)
882459d488 is described below

commit 882459d488a3fae73eda5b1f09f18d1af9fe6f51
Author: Eduard Tudenhoefner <et...@gmail.com>
AuthorDate: Thu Apr 27 05:44:20 2023 +0200

    Core: Make TableScanContext immutable (#5985)
    
    Co-authored-by: Liwei Li <hi...@gmail.com>
---
 .../iceberg/BaseIncrementalChangelogScan.java      |   2 +-
 .../org/apache/iceberg/BaseMetadataTableScan.java  |   2 +-
 .../main/java/org/apache/iceberg/BaseTable.java    |   5 +-
 .../org/apache/iceberg/PositionDeletesTable.java   |   2 +-
 .../java/org/apache/iceberg/TableScanContext.java  | 382 +++++----------------
 .../iceberg/TestScanPlanningAndReporting.java      |   2 +-
 6 files changed, 98 insertions(+), 297 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java b/core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java
index c2e39baefa..b644b2d398 100644
--- a/core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java
+++ b/core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java
@@ -39,7 +39,7 @@ class BaseIncrementalChangelogScan
     implements IncrementalChangelogScan {
 
   BaseIncrementalChangelogScan(Table table) {
-    this(table, table.schema(), new TableScanContext());
+    this(table, table.schema(), TableScanContext.empty());
   }
 
   private BaseIncrementalChangelogScan(Table table, Schema schema, TableScanContext context) {
diff --git a/core/src/main/java/org/apache/iceberg/BaseMetadataTableScan.java b/core/src/main/java/org/apache/iceberg/BaseMetadataTableScan.java
index 5fd211f9c1..49c05e8298 100644
--- a/core/src/main/java/org/apache/iceberg/BaseMetadataTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/BaseMetadataTableScan.java
@@ -25,7 +25,7 @@ abstract class BaseMetadataTableScan extends BaseTableScan {
   private final MetadataTableType tableType;
 
   protected BaseMetadataTableScan(Table table, Schema schema, MetadataTableType tableType) {
-    super(table, schema, new TableScanContext());
+    super(table, schema, TableScanContext.empty());
     this.tableType = tableType;
   }
 
diff --git a/core/src/main/java/org/apache/iceberg/BaseTable.java b/core/src/main/java/org/apache/iceberg/BaseTable.java
index bb2e534ae7..b9ed4f8d67 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTable.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTable.java
@@ -70,13 +70,14 @@ public class BaseTable implements Table, HasTableOperations, Serializable {
 
   @Override
   public TableScan newScan() {
-    return new DataTableScan(this, schema(), new TableScanContext().reportWith(reporter));
+    return new DataTableScan(
+        this, schema(), ImmutableTableScanContext.builder().metricsReporter(reporter).build());
   }
 
   @Override
   public IncrementalAppendScan newIncrementalAppendScan() {
     return new BaseIncrementalAppendScan(
-        this, schema(), new TableScanContext().reportWith(reporter));
+        this, schema(), ImmutableTableScanContext.builder().metricsReporter(reporter).build());
   }
 
   @Override
diff --git a/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java b/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java
index 1983e0ddfc..39b43cc413 100644
--- a/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java
+++ b/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java
@@ -131,7 +131,7 @@ public class PositionDeletesTable extends BaseMetadataTable {
       extends SnapshotScan<BatchScan, ScanTask, ScanTaskGroup<ScanTask>> implements BatchScan {
 
     protected PositionDeletesBatchScan(Table table, Schema schema) {
-      super(table, schema, new TableScanContext());
+      super(table, schema, TableScanContext.empty());
     }
 
     protected PositionDeletesBatchScan(Table table, Schema schema, TableScanContext context) {
diff --git a/core/src/main/java/org/apache/iceberg/TableScanContext.java b/core/src/main/java/org/apache/iceberg/TableScanContext.java
index e12c2a8fc4..87a2f59f6c 100644
--- a/core/src/main/java/org/apache/iceberg/TableScanContext.java
+++ b/core/src/main/java/org/apache/iceberg/TableScanContext.java
@@ -20,8 +20,8 @@ package org.apache.iceberg;
 
 import java.util.Collection;
 import java.util.Map;
-import java.util.Optional;
 import java.util.concurrent.ExecutorService;
+import javax.annotation.Nullable;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.metrics.LoggingMetricsReporter;
@@ -30,349 +30,149 @@ import org.apache.iceberg.metrics.MetricsReporters;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.util.ThreadPools;
+import org.immutables.value.Value;
 
 /** Context object with optional arguments for a TableScan. */
-final class TableScanContext {
-  private final Long snapshotId;
-  private final Expression rowFilter;
-  private final boolean ignoreResiduals;
-  private final boolean caseSensitive;
-  private final boolean colStats;
-  private final Schema projectedSchema;
-  private final Collection<String> selectedColumns;
-  private final ImmutableMap<String, String> options;
-  private final Long fromSnapshotId;
-  private final Long toSnapshotId;
-  private final ExecutorService planExecutor;
-  private final boolean fromSnapshotInclusive;
-  private final MetricsReporter metricsReporter;
+@Value.Immutable
+abstract class TableScanContext {
 
-  TableScanContext() {
-    this.snapshotId = null;
-    this.rowFilter = Expressions.alwaysTrue();
-    this.ignoreResiduals = false;
-    this.caseSensitive = true;
-    this.colStats = false;
-    this.projectedSchema = null;
-    this.selectedColumns = null;
-    this.options = ImmutableMap.of();
-    this.fromSnapshotId = null;
-    this.toSnapshotId = null;
-    this.planExecutor = null;
-    this.fromSnapshotInclusive = false;
-    this.metricsReporter = null;
-  }
+  @Nullable
+  public abstract Long snapshotId();
 
-  private TableScanContext(
-      Long snapshotId,
-      Expression rowFilter,
-      boolean ignoreResiduals,
-      boolean caseSensitive,
-      boolean colStats,
-      Schema projectedSchema,
-      Collection<String> selectedColumns,
-      ImmutableMap<String, String> options,
-      Long fromSnapshotId,
-      Long toSnapshotId,
-      ExecutorService planExecutor,
-      boolean fromSnapshotInclusive,
-      MetricsReporter metricsReporter) {
-    this.snapshotId = snapshotId;
-    this.rowFilter = rowFilter;
-    this.ignoreResiduals = ignoreResiduals;
-    this.caseSensitive = caseSensitive;
-    this.colStats = colStats;
-    this.projectedSchema = projectedSchema;
-    this.selectedColumns = selectedColumns;
-    this.options = options;
-    this.fromSnapshotId = fromSnapshotId;
-    this.toSnapshotId = toSnapshotId;
-    this.planExecutor = planExecutor;
-    this.fromSnapshotInclusive = fromSnapshotInclusive;
-    this.metricsReporter = metricsReporter;
+  @Value.Default
+  public Expression rowFilter() {
+    return Expressions.alwaysTrue();
   }
 
-  Long snapshotId() {
-    return snapshotId;
+  @Value.Default
+  public boolean ignoreResiduals() {
+    return false;
   }
 
-  TableScanContext useSnapshotId(Long scanSnapshotId) {
-    return new TableScanContext(
-        scanSnapshotId,
-        rowFilter,
-        ignoreResiduals,
-        caseSensitive,
-        colStats,
-        projectedSchema,
-        selectedColumns,
-        options,
-        fromSnapshotId,
-        toSnapshotId,
-        planExecutor,
-        fromSnapshotInclusive,
-        metricsReporter);
+  @Value.Default
+  public boolean caseSensitive() {
+    return true;
   }
 
-  Expression rowFilter() {
-    return rowFilter;
+  @Value.Default
+  public boolean returnColumnStats() {
+    return false;
   }
 
-  TableScanContext filterRows(Expression filter) {
-    return new TableScanContext(
-        snapshotId,
-        filter,
-        ignoreResiduals,
-        caseSensitive,
-        colStats,
-        projectedSchema,
-        selectedColumns,
-        options,
-        fromSnapshotId,
-        toSnapshotId,
-        planExecutor,
-        fromSnapshotInclusive,
-        metricsReporter);
+  @Nullable
+  public abstract Collection<String> selectedColumns();
+
+  @Nullable
+  public abstract Schema projectedSchema();
+
+  @Value.Default
+  public Map<String, String> options() {
+    return ImmutableMap.of();
   }
 
-  boolean ignoreResiduals() {
-    return ignoreResiduals;
+  @Nullable
+  public abstract Long fromSnapshotId();
+
+  @Value.Default
+  public boolean fromSnapshotInclusive() {
+    return false;
   }
 
-  TableScanContext ignoreResiduals(boolean shouldIgnoreResiduals) {
-    return new TableScanContext(
-        snapshotId,
-        rowFilter,
-        shouldIgnoreResiduals,
-        caseSensitive,
-        colStats,
-        projectedSchema,
-        selectedColumns,
-        options,
-        fromSnapshotId,
-        toSnapshotId,
-        planExecutor,
-        fromSnapshotInclusive,
-        metricsReporter);
+  @Nullable
+  public abstract Long toSnapshotId();
+
+  @Value.Default
+  public ExecutorService planExecutor() {
+    return ThreadPools.getWorkerPool();
   }
 
-  boolean caseSensitive() {
-    return caseSensitive;
+  @Value.Derived
+  boolean planWithCustomizedExecutor() {
+    return !planExecutor().equals(ThreadPools.getWorkerPool());
   }
 
-  TableScanContext setCaseSensitive(boolean isCaseSensitive) {
-    return new TableScanContext(
-        snapshotId,
-        rowFilter,
-        ignoreResiduals,
-        isCaseSensitive,
-        colStats,
-        projectedSchema,
-        selectedColumns,
-        options,
-        fromSnapshotId,
-        toSnapshotId,
-        planExecutor,
-        fromSnapshotInclusive,
-        metricsReporter);
+  @Value.Default
+  public MetricsReporter metricsReporter() {
+    return LoggingMetricsReporter.instance();
   }
 
-  boolean returnColumnStats() {
-    return colStats;
+  TableScanContext useSnapshotId(Long scanSnapshotId) {
+    return ImmutableTableScanContext.builder().from(this).snapshotId(scanSnapshotId).build();
   }
 
-  TableScanContext shouldReturnColumnStats(boolean returnColumnStats) {
-    return new TableScanContext(
-        snapshotId,
-        rowFilter,
-        ignoreResiduals,
-        caseSensitive,
-        returnColumnStats,
-        projectedSchema,
-        selectedColumns,
-        options,
-        fromSnapshotId,
-        toSnapshotId,
-        planExecutor,
-        fromSnapshotInclusive,
-        metricsReporter);
+  TableScanContext filterRows(Expression filter) {
+    return ImmutableTableScanContext.builder().from(this).rowFilter(filter).build();
   }
 
-  Collection<String> selectedColumns() {
-    return selectedColumns;
+  TableScanContext ignoreResiduals(boolean shouldIgnoreResiduals) {
+    return ImmutableTableScanContext.builder()
+        .from(this)
+        .ignoreResiduals(shouldIgnoreResiduals)
+        .build();
   }
 
-  TableScanContext selectColumns(Collection<String> columns) {
-    Preconditions.checkState(
-        projectedSchema == null, "Cannot select columns when projection schema is set");
-    return new TableScanContext(
-        snapshotId,
-        rowFilter,
-        ignoreResiduals,
-        caseSensitive,
-        colStats,
-        null,
-        columns,
-        options,
-        fromSnapshotId,
-        toSnapshotId,
-        planExecutor,
-        fromSnapshotInclusive,
-        metricsReporter);
+  TableScanContext setCaseSensitive(boolean isCaseSensitive) {
+    return ImmutableTableScanContext.builder().from(this).caseSensitive(isCaseSensitive).build();
   }
 
-  Schema projectedSchema() {
-    return projectedSchema;
+  TableScanContext shouldReturnColumnStats(boolean returnColumnStats) {
+    return ImmutableTableScanContext.builder()
+        .from(this)
+        .returnColumnStats(returnColumnStats)
+        .build();
   }
 
-  TableScanContext project(Schema schema) {
+  TableScanContext selectColumns(Collection<String> columns) {
     Preconditions.checkState(
-        selectedColumns == null, "Cannot set projection schema when columns are selected");
-    return new TableScanContext(
-        snapshotId,
-        rowFilter,
-        ignoreResiduals,
-        caseSensitive,
-        colStats,
-        schema,
-        null,
-        options,
-        fromSnapshotId,
-        toSnapshotId,
-        planExecutor,
-        fromSnapshotInclusive,
-        metricsReporter);
+        projectedSchema() == null, "Cannot select columns when projection schema is set");
+    return ImmutableTableScanContext.builder().from(this).selectedColumns(columns).build();
   }
 
-  Map<String, String> options() {
-    return options;
+  TableScanContext project(Schema schema) {
+    Preconditions.checkState(
+        selectedColumns() == null, "Cannot set projection schema when columns are selected");
+    return ImmutableTableScanContext.builder().from(this).projectedSchema(schema).build();
   }
 
   TableScanContext withOption(String property, String value) {
-    ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
-    builder.putAll(options);
-    builder.put(property, value);
-    return new TableScanContext(
-        snapshotId,
-        rowFilter,
-        ignoreResiduals,
-        caseSensitive,
-        colStats,
-        projectedSchema,
-        selectedColumns,
-        builder.build(),
-        fromSnapshotId,
-        toSnapshotId,
-        planExecutor,
-        fromSnapshotInclusive,
-        metricsReporter);
-  }
-
-  Long fromSnapshotId() {
-    return fromSnapshotId;
+    return ImmutableTableScanContext.builder().from(this).putOptions(property, value).build();
   }
 
   TableScanContext fromSnapshotIdExclusive(long id) {
-    return new TableScanContext(
-        snapshotId,
-        rowFilter,
-        ignoreResiduals,
-        caseSensitive,
-        colStats,
-        projectedSchema,
-        selectedColumns,
-        options,
-        id,
-        toSnapshotId,
-        planExecutor,
-        false,
-        metricsReporter);
+    return ImmutableTableScanContext.builder()
+        .from(this)
+        .fromSnapshotId(id)
+        .fromSnapshotInclusive(false)
+        .build();
   }
 
   TableScanContext fromSnapshotIdInclusive(long id) {
-    return new TableScanContext(
-        snapshotId,
-        rowFilter,
-        ignoreResiduals,
-        caseSensitive,
-        colStats,
-        projectedSchema,
-        selectedColumns,
-        options,
-        id,
-        toSnapshotId,
-        planExecutor,
-        true,
-        metricsReporter);
-  }
-
-  boolean fromSnapshotInclusive() {
-    return fromSnapshotInclusive;
-  }
-
-  Long toSnapshotId() {
-    return toSnapshotId;
+    return ImmutableTableScanContext.builder()
+        .from(this)
+        .fromSnapshotId(id)
+        .fromSnapshotInclusive(true)
+        .build();
   }
 
   TableScanContext toSnapshotId(long id) {
-    return new TableScanContext(
-        snapshotId,
-        rowFilter,
-        ignoreResiduals,
-        caseSensitive,
-        colStats,
-        projectedSchema,
-        selectedColumns,
-        options,
-        fromSnapshotId,
-        id,
-        planExecutor,
-        fromSnapshotInclusive,
-        metricsReporter);
-  }
-
-  ExecutorService planExecutor() {
-    return Optional.ofNullable(planExecutor).orElseGet(ThreadPools::getWorkerPool);
-  }
-
-  boolean planWithCustomizedExecutor() {
-    return planExecutor != null;
+    return ImmutableTableScanContext.builder().from(this).toSnapshotId(id).build();
   }
 
   TableScanContext planWith(ExecutorService executor) {
-    return new TableScanContext(
-        snapshotId,
-        rowFilter,
-        ignoreResiduals,
-        caseSensitive,
-        colStats,
-        projectedSchema,
-        selectedColumns,
-        options,
-        fromSnapshotId,
-        toSnapshotId,
-        executor,
-        fromSnapshotInclusive,
-        metricsReporter);
+    return ImmutableTableScanContext.builder().from(this).planExecutor(executor).build();
   }
 
-  MetricsReporter metricsReporter() {
-    return null == metricsReporter ? LoggingMetricsReporter.instance() : metricsReporter;
+  TableScanContext reportWith(MetricsReporter reporter) {
+    return ImmutableTableScanContext.builder()
+        .from(this)
+        .metricsReporter(
+            metricsReporter() instanceof LoggingMetricsReporter
+                ? reporter
+                : MetricsReporters.combine(metricsReporter(), reporter))
+        .build();
   }
 
-  TableScanContext reportWith(MetricsReporter reporter) {
-    return new TableScanContext(
-        snapshotId,
-        rowFilter,
-        ignoreResiduals,
-        caseSensitive,
-        colStats,
-        projectedSchema,
-        selectedColumns,
-        options,
-        fromSnapshotId,
-        toSnapshotId,
-        planExecutor,
-        fromSnapshotInclusive,
-        MetricsReporters.combine(metricsReporter, reporter));
+  public static TableScanContext empty() {
+    return ImmutableTableScanContext.builder().build();
   }
 }
diff --git a/core/src/test/java/org/apache/iceberg/TestScanPlanningAndReporting.java b/core/src/test/java/org/apache/iceberg/TestScanPlanningAndReporting.java
index dfc14faa0b..8dbdd9cf6b 100644
--- a/core/src/test/java/org/apache/iceberg/TestScanPlanningAndReporting.java
+++ b/core/src/test/java/org/apache/iceberg/TestScanPlanningAndReporting.java
@@ -46,7 +46,7 @@ public class TestScanPlanningAndReporting extends TableTestBase {
 
   @Test
   public void noDuplicatesInScanContext() {
-    TableScanContext context = new TableScanContext();
+    TableScanContext context = TableScanContext.empty();
     assertThat(context.metricsReporter()).isInstanceOf(LoggingMetricsReporter.class);
 
     MetricsReporter first = report -> {};