You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text version.
Posted to commits@iceberg.apache.org by ja...@apache.org on 2023/04/27 03:44:27 UTC
[iceberg] branch master updated: Core: Make TableScanContext immutable (#5985)
This is an automated email from the ASF dual-hosted git repository.
jackye pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 882459d488 Core: Make TableScanContext immutable (#5985)
882459d488 is described below
commit 882459d488a3fae73eda5b1f09f18d1af9fe6f51
Author: Eduard Tudenhoefner <et...@gmail.com>
AuthorDate: Thu Apr 27 05:44:20 2023 +0200
Core: Make TableScanContext immutable (#5985)
Co-authored-by: Liwei Li <hi...@gmail.com>
---
.../iceberg/BaseIncrementalChangelogScan.java | 2 +-
.../org/apache/iceberg/BaseMetadataTableScan.java | 2 +-
.../main/java/org/apache/iceberg/BaseTable.java | 5 +-
.../org/apache/iceberg/PositionDeletesTable.java | 2 +-
.../java/org/apache/iceberg/TableScanContext.java | 382 +++++----------------
.../iceberg/TestScanPlanningAndReporting.java | 2 +-
6 files changed, 98 insertions(+), 297 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java b/core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java
index c2e39baefa..b644b2d398 100644
--- a/core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java
+++ b/core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java
@@ -39,7 +39,7 @@ class BaseIncrementalChangelogScan
implements IncrementalChangelogScan {
BaseIncrementalChangelogScan(Table table) {
- this(table, table.schema(), new TableScanContext());
+ this(table, table.schema(), TableScanContext.empty());
}
private BaseIncrementalChangelogScan(Table table, Schema schema, TableScanContext context) {
diff --git a/core/src/main/java/org/apache/iceberg/BaseMetadataTableScan.java b/core/src/main/java/org/apache/iceberg/BaseMetadataTableScan.java
index 5fd211f9c1..49c05e8298 100644
--- a/core/src/main/java/org/apache/iceberg/BaseMetadataTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/BaseMetadataTableScan.java
@@ -25,7 +25,7 @@ abstract class BaseMetadataTableScan extends BaseTableScan {
private final MetadataTableType tableType;
protected BaseMetadataTableScan(Table table, Schema schema, MetadataTableType tableType) {
- super(table, schema, new TableScanContext());
+ super(table, schema, TableScanContext.empty());
this.tableType = tableType;
}
diff --git a/core/src/main/java/org/apache/iceberg/BaseTable.java b/core/src/main/java/org/apache/iceberg/BaseTable.java
index bb2e534ae7..b9ed4f8d67 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTable.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTable.java
@@ -70,13 +70,14 @@ public class BaseTable implements Table, HasTableOperations, Serializable {
@Override
public TableScan newScan() {
- return new DataTableScan(this, schema(), new TableScanContext().reportWith(reporter));
+ return new DataTableScan(
+ this, schema(), ImmutableTableScanContext.builder().metricsReporter(reporter).build());
}
@Override
public IncrementalAppendScan newIncrementalAppendScan() {
return new BaseIncrementalAppendScan(
- this, schema(), new TableScanContext().reportWith(reporter));
+ this, schema(), ImmutableTableScanContext.builder().metricsReporter(reporter).build());
}
@Override
diff --git a/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java b/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java
index 1983e0ddfc..39b43cc413 100644
--- a/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java
+++ b/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java
@@ -131,7 +131,7 @@ public class PositionDeletesTable extends BaseMetadataTable {
extends SnapshotScan<BatchScan, ScanTask, ScanTaskGroup<ScanTask>> implements BatchScan {
protected PositionDeletesBatchScan(Table table, Schema schema) {
- super(table, schema, new TableScanContext());
+ super(table, schema, TableScanContext.empty());
}
protected PositionDeletesBatchScan(Table table, Schema schema, TableScanContext context) {
diff --git a/core/src/main/java/org/apache/iceberg/TableScanContext.java b/core/src/main/java/org/apache/iceberg/TableScanContext.java
index e12c2a8fc4..87a2f59f6c 100644
--- a/core/src/main/java/org/apache/iceberg/TableScanContext.java
+++ b/core/src/main/java/org/apache/iceberg/TableScanContext.java
@@ -20,8 +20,8 @@ package org.apache.iceberg;
import java.util.Collection;
import java.util.Map;
-import java.util.Optional;
import java.util.concurrent.ExecutorService;
+import javax.annotation.Nullable;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.metrics.LoggingMetricsReporter;
@@ -30,349 +30,149 @@ import org.apache.iceberg.metrics.MetricsReporters;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.util.ThreadPools;
+import org.immutables.value.Value;
/** Context object with optional arguments for a TableScan. */
-final class TableScanContext {
- private final Long snapshotId;
- private final Expression rowFilter;
- private final boolean ignoreResiduals;
- private final boolean caseSensitive;
- private final boolean colStats;
- private final Schema projectedSchema;
- private final Collection<String> selectedColumns;
- private final ImmutableMap<String, String> options;
- private final Long fromSnapshotId;
- private final Long toSnapshotId;
- private final ExecutorService planExecutor;
- private final boolean fromSnapshotInclusive;
- private final MetricsReporter metricsReporter;
+@Value.Immutable
+abstract class TableScanContext {
- TableScanContext() {
- this.snapshotId = null;
- this.rowFilter = Expressions.alwaysTrue();
- this.ignoreResiduals = false;
- this.caseSensitive = true;
- this.colStats = false;
- this.projectedSchema = null;
- this.selectedColumns = null;
- this.options = ImmutableMap.of();
- this.fromSnapshotId = null;
- this.toSnapshotId = null;
- this.planExecutor = null;
- this.fromSnapshotInclusive = false;
- this.metricsReporter = null;
- }
+ @Nullable
+ public abstract Long snapshotId();
- private TableScanContext(
- Long snapshotId,
- Expression rowFilter,
- boolean ignoreResiduals,
- boolean caseSensitive,
- boolean colStats,
- Schema projectedSchema,
- Collection<String> selectedColumns,
- ImmutableMap<String, String> options,
- Long fromSnapshotId,
- Long toSnapshotId,
- ExecutorService planExecutor,
- boolean fromSnapshotInclusive,
- MetricsReporter metricsReporter) {
- this.snapshotId = snapshotId;
- this.rowFilter = rowFilter;
- this.ignoreResiduals = ignoreResiduals;
- this.caseSensitive = caseSensitive;
- this.colStats = colStats;
- this.projectedSchema = projectedSchema;
- this.selectedColumns = selectedColumns;
- this.options = options;
- this.fromSnapshotId = fromSnapshotId;
- this.toSnapshotId = toSnapshotId;
- this.planExecutor = planExecutor;
- this.fromSnapshotInclusive = fromSnapshotInclusive;
- this.metricsReporter = metricsReporter;
+ @Value.Default
+ public Expression rowFilter() {
+ return Expressions.alwaysTrue();
}
- Long snapshotId() {
- return snapshotId;
+ @Value.Default
+ public boolean ignoreResiduals() {
+ return false;
}
- TableScanContext useSnapshotId(Long scanSnapshotId) {
- return new TableScanContext(
- scanSnapshotId,
- rowFilter,
- ignoreResiduals,
- caseSensitive,
- colStats,
- projectedSchema,
- selectedColumns,
- options,
- fromSnapshotId,
- toSnapshotId,
- planExecutor,
- fromSnapshotInclusive,
- metricsReporter);
+ @Value.Default
+ public boolean caseSensitive() {
+ return true;
}
- Expression rowFilter() {
- return rowFilter;
+ @Value.Default
+ public boolean returnColumnStats() {
+ return false;
}
- TableScanContext filterRows(Expression filter) {
- return new TableScanContext(
- snapshotId,
- filter,
- ignoreResiduals,
- caseSensitive,
- colStats,
- projectedSchema,
- selectedColumns,
- options,
- fromSnapshotId,
- toSnapshotId,
- planExecutor,
- fromSnapshotInclusive,
- metricsReporter);
+ @Nullable
+ public abstract Collection<String> selectedColumns();
+
+ @Nullable
+ public abstract Schema projectedSchema();
+
+ @Value.Default
+ public Map<String, String> options() {
+ return ImmutableMap.of();
}
- boolean ignoreResiduals() {
- return ignoreResiduals;
+ @Nullable
+ public abstract Long fromSnapshotId();
+
+ @Value.Default
+ public boolean fromSnapshotInclusive() {
+ return false;
}
- TableScanContext ignoreResiduals(boolean shouldIgnoreResiduals) {
- return new TableScanContext(
- snapshotId,
- rowFilter,
- shouldIgnoreResiduals,
- caseSensitive,
- colStats,
- projectedSchema,
- selectedColumns,
- options,
- fromSnapshotId,
- toSnapshotId,
- planExecutor,
- fromSnapshotInclusive,
- metricsReporter);
+ @Nullable
+ public abstract Long toSnapshotId();
+
+ @Value.Default
+ public ExecutorService planExecutor() {
+ return ThreadPools.getWorkerPool();
}
- boolean caseSensitive() {
- return caseSensitive;
+ @Value.Derived
+ boolean planWithCustomizedExecutor() {
+ return !planExecutor().equals(ThreadPools.getWorkerPool());
}
- TableScanContext setCaseSensitive(boolean isCaseSensitive) {
- return new TableScanContext(
- snapshotId,
- rowFilter,
- ignoreResiduals,
- isCaseSensitive,
- colStats,
- projectedSchema,
- selectedColumns,
- options,
- fromSnapshotId,
- toSnapshotId,
- planExecutor,
- fromSnapshotInclusive,
- metricsReporter);
+ @Value.Default
+ public MetricsReporter metricsReporter() {
+ return LoggingMetricsReporter.instance();
}
- boolean returnColumnStats() {
- return colStats;
+ TableScanContext useSnapshotId(Long scanSnapshotId) {
+ return ImmutableTableScanContext.builder().from(this).snapshotId(scanSnapshotId).build();
}
- TableScanContext shouldReturnColumnStats(boolean returnColumnStats) {
- return new TableScanContext(
- snapshotId,
- rowFilter,
- ignoreResiduals,
- caseSensitive,
- returnColumnStats,
- projectedSchema,
- selectedColumns,
- options,
- fromSnapshotId,
- toSnapshotId,
- planExecutor,
- fromSnapshotInclusive,
- metricsReporter);
+ TableScanContext filterRows(Expression filter) {
+ return ImmutableTableScanContext.builder().from(this).rowFilter(filter).build();
}
- Collection<String> selectedColumns() {
- return selectedColumns;
+ TableScanContext ignoreResiduals(boolean shouldIgnoreResiduals) {
+ return ImmutableTableScanContext.builder()
+ .from(this)
+ .ignoreResiduals(shouldIgnoreResiduals)
+ .build();
}
- TableScanContext selectColumns(Collection<String> columns) {
- Preconditions.checkState(
- projectedSchema == null, "Cannot select columns when projection schema is set");
- return new TableScanContext(
- snapshotId,
- rowFilter,
- ignoreResiduals,
- caseSensitive,
- colStats,
- null,
- columns,
- options,
- fromSnapshotId,
- toSnapshotId,
- planExecutor,
- fromSnapshotInclusive,
- metricsReporter);
+ TableScanContext setCaseSensitive(boolean isCaseSensitive) {
+ return ImmutableTableScanContext.builder().from(this).caseSensitive(isCaseSensitive).build();
}
- Schema projectedSchema() {
- return projectedSchema;
+ TableScanContext shouldReturnColumnStats(boolean returnColumnStats) {
+ return ImmutableTableScanContext.builder()
+ .from(this)
+ .returnColumnStats(returnColumnStats)
+ .build();
}
- TableScanContext project(Schema schema) {
+ TableScanContext selectColumns(Collection<String> columns) {
Preconditions.checkState(
- selectedColumns == null, "Cannot set projection schema when columns are selected");
- return new TableScanContext(
- snapshotId,
- rowFilter,
- ignoreResiduals,
- caseSensitive,
- colStats,
- schema,
- null,
- options,
- fromSnapshotId,
- toSnapshotId,
- planExecutor,
- fromSnapshotInclusive,
- metricsReporter);
+ projectedSchema() == null, "Cannot select columns when projection schema is set");
+ return ImmutableTableScanContext.builder().from(this).selectedColumns(columns).build();
}
- Map<String, String> options() {
- return options;
+ TableScanContext project(Schema schema) {
+ Preconditions.checkState(
+ selectedColumns() == null, "Cannot set projection schema when columns are selected");
+ return ImmutableTableScanContext.builder().from(this).projectedSchema(schema).build();
}
TableScanContext withOption(String property, String value) {
- ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
- builder.putAll(options);
- builder.put(property, value);
- return new TableScanContext(
- snapshotId,
- rowFilter,
- ignoreResiduals,
- caseSensitive,
- colStats,
- projectedSchema,
- selectedColumns,
- builder.build(),
- fromSnapshotId,
- toSnapshotId,
- planExecutor,
- fromSnapshotInclusive,
- metricsReporter);
- }
-
- Long fromSnapshotId() {
- return fromSnapshotId;
+ return ImmutableTableScanContext.builder().from(this).putOptions(property, value).build();
}
TableScanContext fromSnapshotIdExclusive(long id) {
- return new TableScanContext(
- snapshotId,
- rowFilter,
- ignoreResiduals,
- caseSensitive,
- colStats,
- projectedSchema,
- selectedColumns,
- options,
- id,
- toSnapshotId,
- planExecutor,
- false,
- metricsReporter);
+ return ImmutableTableScanContext.builder()
+ .from(this)
+ .fromSnapshotId(id)
+ .fromSnapshotInclusive(false)
+ .build();
}
TableScanContext fromSnapshotIdInclusive(long id) {
- return new TableScanContext(
- snapshotId,
- rowFilter,
- ignoreResiduals,
- caseSensitive,
- colStats,
- projectedSchema,
- selectedColumns,
- options,
- id,
- toSnapshotId,
- planExecutor,
- true,
- metricsReporter);
- }
-
- boolean fromSnapshotInclusive() {
- return fromSnapshotInclusive;
- }
-
- Long toSnapshotId() {
- return toSnapshotId;
+ return ImmutableTableScanContext.builder()
+ .from(this)
+ .fromSnapshotId(id)
+ .fromSnapshotInclusive(true)
+ .build();
}
TableScanContext toSnapshotId(long id) {
- return new TableScanContext(
- snapshotId,
- rowFilter,
- ignoreResiduals,
- caseSensitive,
- colStats,
- projectedSchema,
- selectedColumns,
- options,
- fromSnapshotId,
- id,
- planExecutor,
- fromSnapshotInclusive,
- metricsReporter);
- }
-
- ExecutorService planExecutor() {
- return Optional.ofNullable(planExecutor).orElseGet(ThreadPools::getWorkerPool);
- }
-
- boolean planWithCustomizedExecutor() {
- return planExecutor != null;
+ return ImmutableTableScanContext.builder().from(this).toSnapshotId(id).build();
}
TableScanContext planWith(ExecutorService executor) {
- return new TableScanContext(
- snapshotId,
- rowFilter,
- ignoreResiduals,
- caseSensitive,
- colStats,
- projectedSchema,
- selectedColumns,
- options,
- fromSnapshotId,
- toSnapshotId,
- executor,
- fromSnapshotInclusive,
- metricsReporter);
+ return ImmutableTableScanContext.builder().from(this).planExecutor(executor).build();
}
- MetricsReporter metricsReporter() {
- return null == metricsReporter ? LoggingMetricsReporter.instance() : metricsReporter;
+ TableScanContext reportWith(MetricsReporter reporter) {
+ return ImmutableTableScanContext.builder()
+ .from(this)
+ .metricsReporter(
+ metricsReporter() instanceof LoggingMetricsReporter
+ ? reporter
+ : MetricsReporters.combine(metricsReporter(), reporter))
+ .build();
}
- TableScanContext reportWith(MetricsReporter reporter) {
- return new TableScanContext(
- snapshotId,
- rowFilter,
- ignoreResiduals,
- caseSensitive,
- colStats,
- projectedSchema,
- selectedColumns,
- options,
- fromSnapshotId,
- toSnapshotId,
- planExecutor,
- fromSnapshotInclusive,
- MetricsReporters.combine(metricsReporter, reporter));
+ public static TableScanContext empty() {
+ return ImmutableTableScanContext.builder().build();
}
}
diff --git a/core/src/test/java/org/apache/iceberg/TestScanPlanningAndReporting.java b/core/src/test/java/org/apache/iceberg/TestScanPlanningAndReporting.java
index dfc14faa0b..8dbdd9cf6b 100644
--- a/core/src/test/java/org/apache/iceberg/TestScanPlanningAndReporting.java
+++ b/core/src/test/java/org/apache/iceberg/TestScanPlanningAndReporting.java
@@ -46,7 +46,7 @@ public class TestScanPlanningAndReporting extends TableTestBase {
@Test
public void noDuplicatesInScanContext() {
- TableScanContext context = new TableScanContext();
+ TableScanContext context = TableScanContext.empty();
assertThat(context.metricsReporter()).isInstanceOf(LoggingMetricsReporter.class);
MetricsReporter first = report -> {};