You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/12/20 16:38:12 UTC

[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #6365: Core: Add position deletes metadata table

aokolnychyi commented on code in PR #6365:
URL: https://github.com/apache/iceberg/pull/6365#discussion_r1053426484


##########
core/src/main/java/org/apache/iceberg/AbstractTableScan.java:
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.iceberg.events.Listeners;
+import org.apache.iceberg.events.ScanEvent;
+import org.apache.iceberg.expressions.ExpressionUtil;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.metrics.DefaultMetricsContext;
+import org.apache.iceberg.metrics.ImmutableScanReport;
+import org.apache.iceberg.metrics.ScanMetrics;
+import org.apache.iceberg.metrics.ScanMetricsResult;
+import org.apache.iceberg.metrics.ScanReport;
+import org.apache.iceberg.metrics.Timer;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.DateTimeUtil;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractTableScan<ThisT, T extends ScanTask, C extends ScanTaskGroup<T>>
+    extends BaseScan<ThisT, T, C> {
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractTableScan.class);
+  private ScanMetrics scanMetrics;

Review Comment:
   nit: What about an empty line between static and instance variables? I'd also consider adding one before static vars since the class definition is split between multiple lines. Up to you, though, I see it was copied from another place.



##########
core/src/main/java/org/apache/iceberg/AbstractTableScan.java:
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.iceberg.events.Listeners;
+import org.apache.iceberg.events.ScanEvent;
+import org.apache.iceberg.expressions.ExpressionUtil;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.metrics.DefaultMetricsContext;
+import org.apache.iceberg.metrics.ImmutableScanReport;
+import org.apache.iceberg.metrics.ScanMetrics;
+import org.apache.iceberg.metrics.ScanMetricsResult;
+import org.apache.iceberg.metrics.ScanReport;
+import org.apache.iceberg.metrics.Timer;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.DateTimeUtil;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractTableScan<ThisT, T extends ScanTask, C extends ScanTaskGroup<T>>
+    extends BaseScan<ThisT, T, C> {
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractTableScan.class);
+  private ScanMetrics scanMetrics;
+
+  protected AbstractTableScan(
+      TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    super(ops, table, schema, context);
+  }
+
+  protected Long snapshotId() {
+    return context().snapshotId();
+  }
+
+  protected Map<String, String> options() {
+    return context().options();
+  }
+
+  protected abstract CloseableIterable<T> doPlanFiles();
+
+  protected ScanMetrics scanMetrics() {
+    if (scanMetrics == null) {
+      this.scanMetrics = ScanMetrics.of(new DefaultMetricsContext());
+    }
+
+    return scanMetrics;
+  }
+
+  @Override
+  public Table table() {
+    return super.table();
+  }
+
+  public ThisT useSnapshot(long scanSnapshotId) {
+    Preconditions.checkArgument(
+        snapshotId() == null, "Cannot override snapshot, already set snapshot id=%s", snapshotId());
+    Preconditions.checkArgument(
+        tableOps().current().snapshot(scanSnapshotId) != null,
+        "Cannot find snapshot with ID %s",
+        scanSnapshotId);
+    return newRefinedScan(
+        tableOps(), table(), tableSchema(), context().useSnapshotId(scanSnapshotId));
+  }
+
+  public ThisT useRef(String name) {
+    Preconditions.checkArgument(
+        snapshotId() == null, "Cannot override ref, already set snapshot id=%s", snapshotId());
+    Snapshot snapshot = table().snapshot(name);
+    Preconditions.checkArgument(snapshot != null, "Cannot find ref %s", name);
+    return newRefinedScan(

Review Comment:
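   nit: Same here: a temp var for the new context would avoid splitting this `newRefinedScan` call into multiple lines.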



##########
core/src/main/java/org/apache/iceberg/AbstractTableScan.java:
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.iceberg.events.Listeners;
+import org.apache.iceberg.events.ScanEvent;
+import org.apache.iceberg.expressions.ExpressionUtil;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.metrics.DefaultMetricsContext;
+import org.apache.iceberg.metrics.ImmutableScanReport;
+import org.apache.iceberg.metrics.ScanMetrics;
+import org.apache.iceberg.metrics.ScanMetricsResult;
+import org.apache.iceberg.metrics.ScanReport;
+import org.apache.iceberg.metrics.Timer;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.DateTimeUtil;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractTableScan<ThisT, T extends ScanTask, C extends ScanTaskGroup<T>>
+    extends BaseScan<ThisT, T, C> {
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractTableScan.class);
+  private ScanMetrics scanMetrics;
+
+  protected AbstractTableScan(
+      TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    super(ops, table, schema, context);
+  }
+
+  protected Long snapshotId() {
+    return context().snapshotId();
+  }
+
+  protected Map<String, String> options() {

Review Comment:
   Seems like something that may be handy in other scans too? If so, what about moving to `BaseScan`?



##########
core/src/main/java/org/apache/iceberg/AbstractTableScan.java:
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.iceberg.events.Listeners;
+import org.apache.iceberg.events.ScanEvent;
+import org.apache.iceberg.expressions.ExpressionUtil;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.metrics.DefaultMetricsContext;
+import org.apache.iceberg.metrics.ImmutableScanReport;
+import org.apache.iceberg.metrics.ScanMetrics;
+import org.apache.iceberg.metrics.ScanMetricsResult;
+import org.apache.iceberg.metrics.ScanReport;
+import org.apache.iceberg.metrics.Timer;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.DateTimeUtil;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractTableScan<ThisT, T extends ScanTask, C extends ScanTaskGroup<T>>

Review Comment:
   Does `C` represent a particular name? I think we call it `G` in `Scan`, short for groups.



##########
core/src/main/java/org/apache/iceberg/AbstractTableScan.java:
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.iceberg.events.Listeners;
+import org.apache.iceberg.events.ScanEvent;
+import org.apache.iceberg.expressions.ExpressionUtil;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.metrics.DefaultMetricsContext;
+import org.apache.iceberg.metrics.ImmutableScanReport;
+import org.apache.iceberg.metrics.ScanMetrics;
+import org.apache.iceberg.metrics.ScanMetricsResult;
+import org.apache.iceberg.metrics.ScanReport;
+import org.apache.iceberg.metrics.Timer;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.DateTimeUtil;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractTableScan<ThisT, T extends ScanTask, C extends ScanTaskGroup<T>>
+    extends BaseScan<ThisT, T, C> {
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractTableScan.class);
+  private ScanMetrics scanMetrics;
+
+  protected AbstractTableScan(
+      TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    super(ops, table, schema, context);
+  }
+
+  protected Long snapshotId() {
+    return context().snapshotId();
+  }
+
+  protected Map<String, String> options() {
+    return context().options();
+  }
+
+  protected abstract CloseableIterable<T> doPlanFiles();
+
+  protected ScanMetrics scanMetrics() {
+    if (scanMetrics == null) {
+      this.scanMetrics = ScanMetrics.of(new DefaultMetricsContext());
+    }
+
+    return scanMetrics;
+  }
+
+  @Override
+  public Table table() {
+    return super.table();
+  }
+
+  public ThisT useSnapshot(long scanSnapshotId) {
+    Preconditions.checkArgument(
+        snapshotId() == null, "Cannot override snapshot, already set snapshot id=%s", snapshotId());
+    Preconditions.checkArgument(
+        tableOps().current().snapshot(scanSnapshotId) != null,
+        "Cannot find snapshot with ID %s",
+        scanSnapshotId);
+    return newRefinedScan(
+        tableOps(), table(), tableSchema(), context().useSnapshotId(scanSnapshotId));

Review Comment:
   nit: One of the things we did in `BaseIncrementalScan` is to create a temp var for the new context to avoid splitting this call into multiple lines.
   
   ```
   TableScanContext newContext = context().useSnapshotId(scanSnapshotId);
   return newRefinedScan(tableOps(), table(), tableSchema(), newContext);
   ```



##########
core/src/main/java/org/apache/iceberg/AbstractTableScan.java:
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.iceberg.events.Listeners;
+import org.apache.iceberg.events.ScanEvent;
+import org.apache.iceberg.expressions.ExpressionUtil;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.metrics.DefaultMetricsContext;
+import org.apache.iceberg.metrics.ImmutableScanReport;
+import org.apache.iceberg.metrics.ScanMetrics;
+import org.apache.iceberg.metrics.ScanMetricsResult;
+import org.apache.iceberg.metrics.ScanReport;
+import org.apache.iceberg.metrics.Timer;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.DateTimeUtil;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractTableScan<ThisT, T extends ScanTask, C extends ScanTaskGroup<T>>
+    extends BaseScan<ThisT, T, C> {
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractTableScan.class);
+  private ScanMetrics scanMetrics;
+
+  protected AbstractTableScan(
+      TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    super(ops, table, schema, context);
+  }
+
+  protected Long snapshotId() {
+    return context().snapshotId();
+  }
+
+  protected Map<String, String> options() {
+    return context().options();
+  }
+
+  protected abstract CloseableIterable<T> doPlanFiles();
+
+  protected ScanMetrics scanMetrics() {
+    if (scanMetrics == null) {
+      this.scanMetrics = ScanMetrics.of(new DefaultMetricsContext());
+    }
+
+    return scanMetrics;
+  }
+
+  @Override
+  public Table table() {
+    return super.table();
+  }
+
+  public ThisT useSnapshot(long scanSnapshotId) {
+    Preconditions.checkArgument(
+        snapshotId() == null, "Cannot override snapshot, already set snapshot id=%s", snapshotId());
+    Preconditions.checkArgument(
+        tableOps().current().snapshot(scanSnapshotId) != null,
+        "Cannot find snapshot with ID %s",
+        scanSnapshotId);
+    return newRefinedScan(
+        tableOps(), table(), tableSchema(), context().useSnapshotId(scanSnapshotId));
+  }
+
+  public ThisT useRef(String name) {
+    Preconditions.checkArgument(
+        snapshotId() == null, "Cannot override ref, already set snapshot id=%s", snapshotId());
+    Snapshot snapshot = table().snapshot(name);
+    Preconditions.checkArgument(snapshot != null, "Cannot find ref %s", name);
+    return newRefinedScan(
+        tableOps(), table(), tableSchema(), context().useSnapshotId(snapshot.snapshotId()));
+  }
+
+  public ThisT asOfTime(long timestampMillis) {
+    Preconditions.checkArgument(
+        snapshotId() == null, "Cannot override snapshot, already set snapshot id=%s", snapshotId());
+
+    return useSnapshot(SnapshotUtil.snapshotIdAsOfTime(table(), timestampMillis));
+  }
+
+  @Override
+  public CloseableIterable<T> planFiles() {
+    Snapshot snapshot = snapshot();
+    if (snapshot != null) {
+      LOG.info(
+          "Scanning table {} snapshot {} created at {} with filter {}",
+          table(),
+          snapshot.snapshotId(),
+          DateTimeUtil.formatTimestampMillis(snapshot.timestampMillis()),
+          ExpressionUtil.toSanitizedString(filter()));
+
+      Listeners.notifyAll(new ScanEvent(table().name(), snapshot.snapshotId(), filter(), schema()));
+      List<Integer> projectedFieldIds = Lists.newArrayList(TypeUtil.getProjectedIds(schema()));
+      List<String> projectedFieldNames =
+          projectedFieldIds.stream().map(schema()::findColumnName).collect(Collectors.toList());
+
+      Timer.Timed planningDuration = scanMetrics().totalPlanningDuration().start();
+
+      return CloseableIterable.whenComplete(
+          doPlanFiles(),
+          () -> {
+            planningDuration.stop();
+            Map<String, String> metadata = Maps.newHashMap(context().options());
+            metadata.putAll(EnvironmentContext.get());
+            ScanReport scanReport =
+                ImmutableScanReport.builder()
+                    .schemaId(schema().schemaId())
+                    .projectedFieldIds(projectedFieldIds)
+                    .projectedFieldNames(projectedFieldNames)
+                    .tableName(table().name())
+                    .snapshotId(snapshot.snapshotId())
+                    .filter(ExpressionUtil.sanitize(filter()))
+                    .scanMetrics(ScanMetricsResult.fromScanMetrics(scanMetrics()))
+                    .metadata(metadata)
+                    .build();
+            context().metricsReporter().report(scanReport);
+          });
+    } else {

Review Comment:
   I know it was copied, but I think handling the null-snapshot case first as a shortcut would be more obvious and may avoid one level of indentation. Up to you.
   
   ```
   Snapshot snapshot = snapshot();
   
   if (snapshot == null) {
     LOG.info("Scanning empty table {}", table());
     return CloseableIterable.empty();
   }
   
   ...
   ```
   
   



##########
core/src/main/java/org/apache/iceberg/AbstractTableScan.java:
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.iceberg.events.Listeners;
+import org.apache.iceberg.events.ScanEvent;
+import org.apache.iceberg.expressions.ExpressionUtil;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.metrics.DefaultMetricsContext;
+import org.apache.iceberg.metrics.ImmutableScanReport;
+import org.apache.iceberg.metrics.ScanMetrics;
+import org.apache.iceberg.metrics.ScanMetricsResult;
+import org.apache.iceberg.metrics.ScanReport;
+import org.apache.iceberg.metrics.Timer;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.DateTimeUtil;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractTableScan<ThisT, T extends ScanTask, C extends ScanTaskGroup<T>>

Review Comment:
   Yeah, a short doc explaining that this class exists to share code between `TableScan` and `BatchScan` implementations would be nice.



##########
core/src/main/java/org/apache/iceberg/BaseTableScan.java:
##########
@@ -18,32 +18,14 @@
  */
 package org.apache.iceberg;
 
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-import org.apache.iceberg.events.Listeners;
-import org.apache.iceberg.events.ScanEvent;
-import org.apache.iceberg.expressions.ExpressionUtil;
 import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.metrics.DefaultMetricsContext;
-import org.apache.iceberg.metrics.ImmutableScanReport;
 import org.apache.iceberg.metrics.ScanMetrics;
-import org.apache.iceberg.metrics.ScanMetricsResult;
-import org.apache.iceberg.metrics.ScanReport;
-import org.apache.iceberg.metrics.Timer;
-import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.types.TypeUtil;
-import org.apache.iceberg.util.DateTimeUtil;
-import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.iceberg.util.TableScanUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /** Base class for {@link TableScan} implementations. */
-abstract class BaseTableScan extends BaseScan<TableScan, FileScanTask, CombinedScanTask>
+abstract class BaseTableScan extends AbstractTableScan<TableScan, FileScanTask, CombinedScanTask>
     implements TableScan {
   private static final Logger LOG = LoggerFactory.getLogger(BaseTableScan.class);

Review Comment:
   Are these still in use?



##########
core/src/main/java/org/apache/iceberg/MetadataColumns.java:
##########
@@ -107,8 +111,15 @@ private MetadataColumns() {}
           ROW_POSITION.fieldId(),
           IS_DELETED.fieldId(),
           SPEC_ID.fieldId(),
-          PARTITION_COLUMN_ID);
+          PARTITION_COLUMN_ID,
+          POSITION_DELETE_TABLE_PARTITION_FIELD_ID,

Review Comment:
   Do we even have to make them metadata columns then? I thought they would be just regular columns in a table. I don't think they should be added to `META_COLUMNS`. I think metadata columns should be only about columns we can project on demand. That's why we did not add changelog columns here.
   
   Let me also think about reserving field IDs for them. It is a similar yet different use case compared to changelog columns, as there is no changelog table as such.



##########
core/src/main/java/org/apache/iceberg/AbstractTableScan.java:
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.iceberg.events.Listeners;
+import org.apache.iceberg.events.ScanEvent;
+import org.apache.iceberg.expressions.ExpressionUtil;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.metrics.DefaultMetricsContext;
+import org.apache.iceberg.metrics.ImmutableScanReport;
+import org.apache.iceberg.metrics.ScanMetrics;
+import org.apache.iceberg.metrics.ScanMetricsResult;
+import org.apache.iceberg.metrics.ScanReport;
+import org.apache.iceberg.metrics.Timer;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.DateTimeUtil;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractTableScan<ThisT, T extends ScanTask, C extends ScanTaskGroup<T>>

Review Comment:
   Seems like the logic here mostly revolves around a snapshot. We could call it `SnapshotScan`. It isn't perfect because there are children that scan across all snapshots. However, it is a little bit more specific and avoids the confusion of abstract vs base.



##########
core/src/main/java/org/apache/iceberg/AbstractTableScan.java:
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.iceberg.events.Listeners;
+import org.apache.iceberg.events.ScanEvent;
+import org.apache.iceberg.expressions.ExpressionUtil;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.metrics.DefaultMetricsContext;
+import org.apache.iceberg.metrics.ImmutableScanReport;
+import org.apache.iceberg.metrics.ScanMetrics;
+import org.apache.iceberg.metrics.ScanMetricsResult;
+import org.apache.iceberg.metrics.ScanReport;
+import org.apache.iceberg.metrics.Timer;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.DateTimeUtil;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractTableScan<ThisT, T extends ScanTask, C extends ScanTaskGroup<T>>
+    extends BaseScan<ThisT, T, C> {
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractTableScan.class);
+  private ScanMetrics scanMetrics;
+
+  protected AbstractTableScan(
+      TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    super(ops, table, schema, context);
+  }
+
+  protected Long snapshotId() {
+    return context().snapshotId();
+  }
+
+  protected Map<String, String> options() {
+    return context().options();
+  }
+
+  protected abstract CloseableIterable<T> doPlanFiles();
+
+  protected ScanMetrics scanMetrics() {
+    if (scanMetrics == null) {
+      this.scanMetrics = ScanMetrics.of(new DefaultMetricsContext());
+    }
+
+    return scanMetrics;
+  }
+
+  @Override
+  public Table table() {
+    return super.table();
+  }
+
+  public ThisT useSnapshot(long scanSnapshotId) {
+    Preconditions.checkArgument(
+        snapshotId() == null, "Cannot override snapshot, already set snapshot id=%s", snapshotId());
+    Preconditions.checkArgument(
+        tableOps().current().snapshot(scanSnapshotId) != null,
+        "Cannot find snapshot with ID %s",
+        scanSnapshotId);
+    return newRefinedScan(
+        tableOps(), table(), tableSchema(), context().useSnapshotId(scanSnapshotId));
+  }
+
+  public ThisT useRef(String name) {
+    Preconditions.checkArgument(
+        snapshotId() == null, "Cannot override ref, already set snapshot id=%s", snapshotId());
+    Snapshot snapshot = table().snapshot(name);
+    Preconditions.checkArgument(snapshot != null, "Cannot find ref %s", name);
+    return newRefinedScan(
+        tableOps(), table(), tableSchema(), context().useSnapshotId(snapshot.snapshotId()));
+  }
+
+  public ThisT asOfTime(long timestampMillis) {
+    Preconditions.checkArgument(
+        snapshotId() == null, "Cannot override snapshot, already set snapshot id=%s", snapshotId());
+
+    return useSnapshot(SnapshotUtil.snapshotIdAsOfTime(table(), timestampMillis));
+  }
+
+  @Override
+  public CloseableIterable<T> planFiles() {
+    Snapshot snapshot = snapshot();
+    if (snapshot != null) {
+      LOG.info(
+          "Scanning table {} snapshot {} created at {} with filter {}",
+          table(),
+          snapshot.snapshotId(),
+          DateTimeUtil.formatTimestampMillis(snapshot.timestampMillis()),
+          ExpressionUtil.toSanitizedString(filter()));
+
+      Listeners.notifyAll(new ScanEvent(table().name(), snapshot.snapshotId(), filter(), schema()));
+      List<Integer> projectedFieldIds = Lists.newArrayList(TypeUtil.getProjectedIds(schema()));
+      List<String> projectedFieldNames =
+          projectedFieldIds.stream().map(schema()::findColumnName).collect(Collectors.toList());
+
+      Timer.Timed planningDuration = scanMetrics().totalPlanningDuration().start();
+
+      return CloseableIterable.whenComplete(
+          doPlanFiles(),
+          () -> {
+            planningDuration.stop();
+            Map<String, String> metadata = Maps.newHashMap(context().options());
+            metadata.putAll(EnvironmentContext.get());
+            ScanReport scanReport =
+                ImmutableScanReport.builder()
+                    .schemaId(schema().schemaId())
+                    .projectedFieldIds(projectedFieldIds)
+                    .projectedFieldNames(projectedFieldNames)
+                    .tableName(table().name())
+                    .snapshotId(snapshot.snapshotId())
+                    .filter(ExpressionUtil.sanitize(filter()))
+                    .scanMetrics(ScanMetricsResult.fromScanMetrics(scanMetrics()))
+                    .metadata(metadata)
+                    .build();
+            context().metricsReporter().report(scanReport);
+          });
+    } else {
+      LOG.info("Scanning empty table {}", table());
+      return CloseableIterable.empty();
+    }
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public CloseableIterable<C> planTasks() {

Review Comment:
   I don't think this cast is generally safe: we don't know what `C` is actually bound to, while `planTaskGroups` always creates one specific type of task group. I'd remove this method from here and implement it in the position deletes table scan itself. If we have more use cases later, we can add a common class for all batch scans, just like we have one for all table scans now. That way, you also avoid the cast completely.



##########
core/src/main/java/org/apache/iceberg/PositionDeletesTable.java:
##########
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.apache.iceberg.MetadataColumns.POSITION_DELETE_TABLE_FILE_PATH;
+import static org.apache.iceberg.MetadataColumns.POSITION_DELETE_TABLE_PARTITION_FIELD_ID;
+import static org.apache.iceberg.MetadataColumns.POSITION_DELETE_TABLE_SPEC_ID;
+
+import java.util.Map;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.ParallelIterable;
+import org.apache.iceberg.util.PartitionUtil;
+
+public class PositionDeletesTable extends BaseTable {
+
+  private final Table table;
+
+  PositionDeletesTable(TableOperations ops, Table table) {
+    super(ops, table.name() + ".position_deletes");
+    this.table = table;
+  }
+
+  PositionDeletesTable(TableOperations ops, Table table, String name) {
+    super(ops, name);
+    this.table = table;
+  }
+
+  protected Table table() {
+    return table;
+  }
+
+  @Override
+  public TableScan newScan() {
+    throw new UnsupportedOperationException(
+        "Cannot create TableScan from table of type POSITION_DELETES");
+  }
+
+  @Override
+  public BatchScan newBatchScan() {
+    return new PositionDeletesTableScan(operations(), table(), schema());
+  }
+
+  @Override
+  public Schema schema() {
+    return PositionDeletesTable.schema(table(), Partitioning.partitionType(table()));
+  }
+
+  public static Schema schema(Table table, Types.StructType partitionType) {
+    Schema result =
+        new Schema(
+            MetadataColumns.DELETE_FILE_PATH,
+            MetadataColumns.DELETE_FILE_POS,
+            Types.NestedField.optional(
+                MetadataColumns.DELETE_FILE_ROW_FIELD_ID,
+                "row",
+                table.schema().asStruct(),
+                MetadataColumns.DELETE_FILE_ROW_DOC),
+            Types.NestedField.required(
+                POSITION_DELETE_TABLE_PARTITION_FIELD_ID,
+                "partition",
+                partitionType,
+                "Partition that position delete row belongs to"),
+            Types.NestedField.required(
+                POSITION_DELETE_TABLE_SPEC_ID,
+                "spec_id",
+                Types.IntegerType.get(),
+                "Spec ID of the file that the position delete row belongs to"),
+            Types.NestedField.required(
+                POSITION_DELETE_TABLE_FILE_PATH,
+                "delete_file_path",
+                Types.StringType.get(),
+                "Spec ID of the file that the position delete row belongs to"));
+
+    if (partitionType.fields().size() > 0) {
+      return result;
+    } else {
+      // avoid returning an empty struct, which is not always supported. instead, drop the partition
+      // field
+      return TypeUtil.selectNot(result, Sets.newHashSet(POSITION_DELETE_TABLE_PARTITION_FIELD_ID));
+    }
+  }
+
+  public static class PositionDeletesTableScan
+      extends AbstractTableScan<
+          BatchScan, org.apache.iceberg.ScanTask, ScanTaskGroup<org.apache.iceberg.ScanTask>>

Review Comment:
   Can we give `ScanTask` in this class a more specific name so that we can avoid the fully qualified `org.apache.iceberg.ScanTask` references here?



##########
core/src/main/java/org/apache/iceberg/AbstractTableScan.java:
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.iceberg.events.Listeners;
+import org.apache.iceberg.events.ScanEvent;
+import org.apache.iceberg.expressions.ExpressionUtil;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.metrics.DefaultMetricsContext;
+import org.apache.iceberg.metrics.ImmutableScanReport;
+import org.apache.iceberg.metrics.ScanMetrics;
+import org.apache.iceberg.metrics.ScanMetricsResult;
+import org.apache.iceberg.metrics.ScanReport;
+import org.apache.iceberg.metrics.Timer;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.DateTimeUtil;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractTableScan<ThisT, T extends ScanTask, C extends ScanTaskGroup<T>>
+    extends BaseScan<ThisT, T, C> {
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractTableScan.class);
+  private ScanMetrics scanMetrics;
+
+  protected AbstractTableScan(
+      TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    super(ops, table, schema, context);
+  }
+
+  protected Long snapshotId() {
+    return context().snapshotId();
+  }
+
+  protected Map<String, String> options() {
+    return context().options();
+  }
+
+  protected abstract CloseableIterable<T> doPlanFiles();
+
+  protected ScanMetrics scanMetrics() {
+    if (scanMetrics == null) {
+      this.scanMetrics = ScanMetrics.of(new DefaultMetricsContext());
+    }
+
+    return scanMetrics;
+  }
+
+  @Override
+  public Table table() {

Review Comment:
   Can we simply make the parent method public in `BaseScan`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org