Posted to issues@iceberg.apache.org by "szehon-ho (via GitHub)" <gi...@apache.org> on 2023/02/01 05:18:13 UTC

[GitHub] [iceberg] szehon-ho opened a new pull request, #6716: Spark 3.3: Implement Position Deletes Table

szehon-ho opened a new pull request, #6716:
URL: https://github.com/apache/iceberg/pull/6716

   These are the Spark-side changes for:  https://github.com/apache/iceberg/commits/master
   
   Some explanations:
   - Because RowReader is instantiated with PositionDeletesTable but needs the base table's schema, this PR exposes some APIs that give access to the base table.
   - ORC and Parquet readers are fixed to handle constant column pushdown.  (Previously this code path was not exercised because metadata columns are not pushed down by Spark.)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #6716: Spark 3.3: Implement Position Deletes Table

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #6716:
URL: https://github.com/apache/iceberg/pull/6716#discussion_r1109113796


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.PositionDeletesScanTask;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.ExpressionUtil;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.primitives.Ints;
+import org.apache.spark.rdd.InputFileBlockHolder;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.read.PartitionReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class PositionDeletesRowReader extends BaseRowReader<PositionDeletesScanTask>
+    implements PartitionReader<InternalRow> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(PositionDeletesRowReader.class);
+
+  PositionDeletesRowReader(SparkInputPartition partition) {
+    this(
+        partition.table(),
+        partition.taskGroup(),
+        partition.expectedSchema(),
+        partition.isCaseSensitive());
+  }
+
+  PositionDeletesRowReader(
+      Table table,
+      ScanTaskGroup<PositionDeletesScanTask> taskGroup,
+      Schema expectedSchema,
+      boolean caseSensitive) {
+
+    super(table, taskGroup, expectedSchema, caseSensitive);
+
+    int numSplits = taskGroup.tasks().size();
+    LOG.debug("Reading {} position delete file split(s) for table {}", numSplits, table.name());
+  }
+
+  @Override
+  protected Stream<ContentFile<?>> referencedFiles(PositionDeletesScanTask task) {
+    return Stream.of(task.file());
+  }
+
+  @Override
+  protected CloseableIterator<InternalRow> open(PositionDeletesScanTask task) {
+    Schema schema = expectedSchema();
+    Map<Integer, ?> idToConstant = constantsMap(task, schema);
+    String filePath = task.file().path().toString();
+    LOG.debug("Opening position delete file {}", filePath);
+
+    // update the current file for Spark's filename() function
+    InputFileBlockHolder.set(filePath, task.start(), task.length());
+
+    InputFile inputFile = getInputFile(task.file().path().toString());
+    Preconditions.checkNotNull(

Review Comment:
   Done





[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #6716: Spark 3.3: Implement Position Deletes Table

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #6716:
URL: https://github.com/apache/iceberg/pull/6716#discussion_r1101965485


##########
core/src/main/java/org/apache/iceberg/MetadataTable.java:
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+/** Interface representing a metadata table. */
+public interface MetadataTable {

Review Comment:
   It is kind of questionable, I guess. On one hand, the position deletes table schema has data columns and delete files use correct spec IDs. On the other hand, it is still a bit different compared to base tables. Let me think.





[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #6716: Spark 3.3: Implement Position Deletes Table

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #6716:
URL: https://github.com/apache/iceberg/pull/6716#discussion_r1094981997


##########
core/src/main/java/org/apache/iceberg/MetadataTable.java:
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+/** Interface representing a metadata table. */
+public interface MetadataTable {

Review Comment:
   This may not be enough. We may need to check `spec.schema()`. But the overall idea still seems valid.





[GitHub] [iceberg] szehon-ho commented on a diff in pull request #6716: Spark 3.3: Implement Position Deletes Table

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #6716:
URL: https://github.com/apache/iceberg/pull/6716#discussion_r1103198268


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeleteRowReader.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import java.util.stream.Stream;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.ContentScanTask;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.Partitioning;
+import org.apache.iceberg.PositionDeletesScanTask;
+import org.apache.iceberg.PositionDeletesTable;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.PartitionUtil;
+import org.apache.spark.rdd.InputFileBlockHolder;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.read.PartitionReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class PositionDeleteRowReader extends BaseRowReader<PositionDeletesScanTask>

Review Comment:
   I agree though, done.



##########
core/src/main/java/org/apache/iceberg/SerializableTable.java:
##########
@@ -116,7 +116,7 @@ private FileIO fileIO(Table table) {
     return table.io();
   }
 
-  private Table lazyTable() {
+  protected Table lazyTable() {

Review Comment:
   Reverted





[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #6716: Spark 3.3: Implement Position Deletes Table

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #6716:
URL: https://github.com/apache/iceberg/pull/6716#discussion_r1101959806


##########
core/src/main/java/org/apache/iceberg/MetadataTable.java:
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+/** Interface representing a metadata table. */
+public interface MetadataTable {

Review Comment:
   Well, my original idea was that the position deletes table is actually partitioned in the same way as the main table as data and delete files share the same spec IDs. Let me think for a bit if that's really the case.





[GitHub] [iceberg] szehon-ho commented on a diff in pull request #6716: Spark 3.3: Implement Position Deletes Table

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #6716:
URL: https://github.com/apache/iceberg/pull/6716#discussion_r1103197152


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeleteRowReader.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import java.util.stream.Stream;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.ContentScanTask;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.Partitioning;
+import org.apache.iceberg.PositionDeletesScanTask;
+import org.apache.iceberg.PositionDeletesTable;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.PartitionUtil;
+import org.apache.spark.rdd.InputFileBlockHolder;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.read.PartitionReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class PositionDeleteRowReader extends BaseRowReader<PositionDeletesScanTask>
+    implements PartitionReader<InternalRow> {
+  private static final Logger LOG = LoggerFactory.getLogger(PositionDeleteRowReader.class);
+
+  PositionDeleteRowReader(SparkInputPartition partition) {
+    this(
+        partition.table(),
+        partition.taskGroup(),
+        partition.expectedSchema(),
+        partition.isCaseSensitive());
+  }
+
+  PositionDeleteRowReader(
+      Table table,
+      ScanTaskGroup<PositionDeletesScanTask> taskGroup,
+      Schema expectedSchema,
+      boolean caseSensitive) {
+
+    super(table, taskGroup, expectedSchema, caseSensitive);
+
+    int numSplits = taskGroup.tasks().size();
+    LOG.debug("Reading {} position delete file split(s) for table {}", numSplits, table.name());
+  }
+
+  @Override
+  protected Stream<ContentFile<?>> referencedFiles(PositionDeletesScanTask task) {
+    return Stream.of(task.file());
+  }
+
+  @Override
+  protected CloseableIterator<InternalRow> open(PositionDeletesScanTask task) {
+    Preconditions.checkArgument(
+        table() instanceof PositionDeletesTable
+            || (table() instanceof SerializableTable.SerializableMetadataTable
+                && ((SerializableTable.SerializableMetadataTable) table())
+                    .type()
+                    .equals(MetadataTableType.POSITION_DELETES)),
+        "PositionDeleteRowReader is only supported for PositionDeletesTable");
+
+    Types.StructType partitionType = Partitioning.partitionType(table());
+    Map<Integer, ?> idToConstant = constantsMap(task, expectedSchema(), partitionType);
+    String filePath = task.file().path().toString();
+    LOG.debug("Opening position delete file {}", filePath);
+
+    // update the current file for Spark's filename() function
+    InputFileBlockHolder.set(filePath, task.start(), task.length());
+
+    InputFile inputFile = getInputFile(task.file().path().toString());
+    Preconditions.checkNotNull(
+        inputFile, "Could not find InputFile associated with PositionDeleteScanTask");
+    return newIterable(
+            inputFile,
+            task.file().format(),
+            task.start(),
+            task.length(),
+            task.residual(),
+            expectedSchema(),
+            idToConstant)
+        .iterator();
+  }
+
+  protected Map<Integer, ?> constantsMap(

Review Comment:
   Yes, changed



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeleteRowReader.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import java.util.stream.Stream;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.ContentScanTask;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.Partitioning;
+import org.apache.iceberg.PositionDeletesScanTask;
+import org.apache.iceberg.PositionDeletesTable;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.PartitionUtil;
+import org.apache.spark.rdd.InputFileBlockHolder;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.read.PartitionReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class PositionDeleteRowReader extends BaseRowReader<PositionDeletesScanTask>
+    implements PartitionReader<InternalRow> {
+  private static final Logger LOG = LoggerFactory.getLogger(PositionDeleteRowReader.class);
+
+  PositionDeleteRowReader(SparkInputPartition partition) {
+    this(
+        partition.table(),
+        partition.taskGroup(),
+        partition.expectedSchema(),
+        partition.isCaseSensitive());
+  }
+
+  PositionDeleteRowReader(
+      Table table,
+      ScanTaskGroup<PositionDeletesScanTask> taskGroup,
+      Schema expectedSchema,
+      boolean caseSensitive) {
+
+    super(table, taskGroup, expectedSchema, caseSensitive);
+
+    int numSplits = taskGroup.tasks().size();
+    LOG.debug("Reading {} position delete file split(s) for table {}", numSplits, table.name());
+  }
+
+  @Override
+  protected Stream<ContentFile<?>> referencedFiles(PositionDeletesScanTask task) {
+    return Stream.of(task.file());
+  }
+
+  @Override
+  protected CloseableIterator<InternalRow> open(PositionDeletesScanTask task) {
+    Preconditions.checkArgument(

Review Comment:
   Removed check



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeleteRowReader.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import java.util.stream.Stream;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.ContentScanTask;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.Partitioning;
+import org.apache.iceberg.PositionDeletesScanTask;
+import org.apache.iceberg.PositionDeletesTable;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.PartitionUtil;
+import org.apache.spark.rdd.InputFileBlockHolder;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.read.PartitionReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class PositionDeleteRowReader extends BaseRowReader<PositionDeletesScanTask>
+    implements PartitionReader<InternalRow> {
+  private static final Logger LOG = LoggerFactory.getLogger(PositionDeleteRowReader.class);

Review Comment:
   Done





[GitHub] [iceberg] szehon-ho commented on a diff in pull request #6716: Spark 3.3: Implement Position Deletes Table

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #6716:
URL: https://github.com/apache/iceberg/pull/6716#discussion_r1094106234


##########
parquet/src/main/java/org/apache/iceberg/parquet/ParquetMetricsRowGroupFilter.java:
##########
@@ -50,15 +51,22 @@ public class ParquetMetricsRowGroupFilter {
 
   private final Schema schema;
   private final Expression expr;
+  private final Set<Integer> constantFieldIds;
 
   public ParquetMetricsRowGroupFilter(Schema schema, Expression unbound) {
-    this(schema, unbound, true);
+    this(schema, unbound, true, ImmutableSet.of());

Review Comment:
   Without this, any column whose value count is not present evaluates to ROWS_CANNOT_MATCH.  Thus any filter on a constant column of the deletes table leads to 0 results.
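
A minimal sketch of the behavior described in this comment, using a simplified stand-in rather than the real `ParquetMetricsRowGroupFilter`. The class `ConstantAwareRowGroupFilter`, its `shouldRead` signature, and the field IDs are hypothetical, and the sketch assumes the fix amounts to skipping the value-count check for constant fields:

```java
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for a metrics-based row group filter; not the Iceberg API.
final class ConstantAwareRowGroupFilter {
  static final boolean ROWS_MIGHT_MATCH = true;
  static final boolean ROWS_CANNOT_MATCH = false;

  // IDs of fields whose values come from metadata (constants), not from the data file.
  private final Set<Integer> constantFieldIds;

  ConstantAwareRowGroupFilter(Set<Integer> constantFieldIds) {
    this.constantFieldIds = constantFieldIds;
  }

  /** Decides whether a row group may contain rows matching a predicate on fieldId. */
  boolean shouldRead(int fieldId, Map<Integer, Long> valueCounts) {
    if (constantFieldIds.contains(fieldId)) {
      // The file stores no metrics for a constant column, so metrics cannot rule it out.
      return ROWS_MIGHT_MATCH;
    }

    Long valueCount = valueCounts.get(fieldId);
    if (valueCount == null) {
      // No value count: the column is treated as absent from the file.
      return ROWS_CANNOT_MATCH;
    }

    return ROWS_MIGHT_MATCH;
  }
}
```

With an empty constant set, a predicate on a field that has no value count is rejected for every row group, which is the "0 results" symptom; registering that field ID as a constant lets the row group through so the predicate can be evaluated later.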





[GitHub] [iceberg] szehon-ho commented on a diff in pull request #6716: Spark 3.3: Implement Position Deletes Table

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #6716:
URL: https://github.com/apache/iceberg/pull/6716#discussion_r1094108930


##########
orc/src/main/java/org/apache/iceberg/orc/OrcIterable.java:
##########
@@ -84,15 +91,18 @@ public CloseableIterator<T> iterator() {
     addCloseable(orcFileReader);
 
     TypeDescription fileSchema = orcFileReader.getSchema();
+    Schema schemaWithoutConstantFields =

Review Comment:
   Previously, a schema already pruned of constant columns was passed into OrcIterable for its filter logic.
   
   However, this fails here when trying to bind the constant column filters to it (as they are not in the schema).  This change prunes out constant columns only where we need to, and keeps the original schema for the binding.
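
The distinction between the two schemas can be sketched as follows. This is an illustration under simplifying assumptions: a schema is modeled as a map from field ID to column name, and `ConstantColumnPruning`, `withoutConstantFields`, `bind`, and the field IDs are hypothetical names, not the actual `OrcIterable` code:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical model: a schema is an ordered map of field ID -> column name.
final class ConstantColumnPruning {
  private ConstantColumnPruning() {}

  /** Returns a copy of the schema without the constant fields; the input is left untouched. */
  static Map<Integer, String> withoutConstantFields(
      Map<Integer, String> schema, Set<Integer> constantFieldIds) {
    Map<Integer, String> pruned = new LinkedHashMap<>(schema);
    pruned.keySet().removeAll(constantFieldIds);
    return pruned;
  }

  /** Resolves a filter's column name to a field ID, failing if the schema lacks the column. */
  static int bind(Map<Integer, String> schema, String column) {
    for (Map.Entry<Integer, String> field : schema.entrySet()) {
      if (field.getValue().equals(column)) {
        return field.getKey();
      }
    }
    throw new IllegalArgumentException("Cannot find field '" + column + "' in schema");
  }
}
```

Binding a filter on a constant column succeeds against the original schema and throws against the pruned one, so the pruned copy is only suitable for deciding which columns to read from the file.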





[GitHub] [iceberg] szehon-ho commented on a diff in pull request #6716: Spark 3.3: Implement Position Deletes Table

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #6716:
URL: https://github.com/apache/iceberg/pull/6716#discussion_r1093922634


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeleteRowReader.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import java.util.stream.Stream;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.ContentScanTask;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.MetadataTable;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.Partitioning;
+import org.apache.iceberg.PositionDeletesScanTask;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.PartitionUtil;
+import org.apache.spark.rdd.InputFileBlockHolder;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.read.PartitionReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class PositionDeleteRowReader extends BaseRowReader<PositionDeletesScanTask>
+    implements PartitionReader<InternalRow> {
+  private static final Logger LOG = LoggerFactory.getLogger(PositionDeleteRowReader.class);
+
+  PositionDeleteRowReader(SparkInputPartition partition) {
+    this(
+        partition.table(),
+        partition.taskGroup(),
+        partition.expectedSchema(),
+        partition.isCaseSensitive());
+  }
+
+  PositionDeleteRowReader(
+      Table table,
+      ScanTaskGroup<PositionDeletesScanTask> taskGroup,
+      Schema expectedSchema,
+      boolean caseSensitive) {
+
+    super(table, taskGroup, expectedSchema, caseSensitive);
+
+    int numSplits = taskGroup.tasks().size();
+    LOG.debug("Reading {} position delete file split(s) for table {}", numSplits, table.name());
+  }
+
+  @Override
+  protected Stream<ContentFile<?>> referencedFiles(PositionDeletesScanTask task) {
+    return Stream.of(task.file());
+  }
+
+  @Override
+  protected CloseableIterator<InternalRow> open(PositionDeletesScanTask task) {
+    Table deletesTable = table();
+    Preconditions.checkArgument(
+        deletesTable instanceof MetadataTable,
+        "PositionDeleteRowReader is only supported for PositionDeletesTable");
+    Preconditions.checkArgument(
+        ((MetadataTable) deletesTable).type().equals(MetadataTableType.POSITION_DELETES),
+        "PositionDeleteRowReader is only supported for PositionDeletesTable");
+    Table baseTable = ((MetadataTable) deletesTable).baseTable();

Review Comment:
   RowReader takes the PositionDeletesTable, but we actually need its base table's schema to calculate the partition type here.
   
   Hence, I added a `MetadataTable` interface, implemented by both the serializable and non-serializable position deletes tables, to get this.
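
   A minimal sketch of the lookup described above, assuming stand-in types: `Table`, `MetadataTable`, and `MetadataTableType` below are hypothetical simplifications written for illustration, not the real Iceberg classes. Only the `type()` and `baseTable()` accessors are taken from the diff.

```java
import java.util.Locale;

public class BaseTableLookupSketch {
  // Hypothetical stand-ins for the Iceberg types named in the diff; not the real classes.
  public interface Table {
    String name();
  }

  public enum MetadataTableType {
    POSITION_DELETES,
    FILES
  }

  // Mirrors the two accessors used in the diff: type() and baseTable().
  public interface MetadataTable extends Table {
    MetadataTableType type();

    Table baseTable();
  }

  public static final class SimpleTable implements Table {
    private final String name;

    public SimpleTable(String name) {
      this.name = name;
    }

    @Override
    public String name() {
      return name;
    }
  }

  public static final class SimpleMetadataTable implements MetadataTable {
    private final Table baseTable;
    private final MetadataTableType type;

    public SimpleMetadataTable(Table baseTable, MetadataTableType type) {
      this.baseTable = baseTable;
      this.type = type;
    }

    @Override
    public String name() {
      return baseTable.name() + "." + type.name().toLowerCase(Locale.ROOT);
    }

    @Override
    public MetadataTableType type() {
      return type;
    }

    @Override
    public Table baseTable() {
      return baseTable;
    }
  }

  // Returns the base table, rejecting anything that is not a position deletes table.
  public static Table baseTableOf(Table table) {
    if (!(table instanceof MetadataTable)) {
      throw new IllegalArgumentException("Only supported for position deletes tables");
    }
    MetadataTable metadataTable = (MetadataTable) table;
    if (metadataTable.type() != MetadataTableType.POSITION_DELETES) {
      throw new IllegalArgumentException("Only supported for position deletes tables");
    }
    return metadataTable.baseTable();
  }

  public static void main(String[] args) {
    Table base = new SimpleTable("db.tbl");
    Table deletes = new SimpleMetadataTable(base, MetadataTableType.POSITION_DELETES);
    System.out.println(baseTableOf(deletes).name());
  }
}
```

   The point of the shared interface is that the reader needs a single cast, regardless of whether it was handed the serializable or the non-serializable variant.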





[GitHub] [iceberg] szehon-ho commented on a diff in pull request #6716: Spark 3.3: Implement Position Deletes Table

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #6716:
URL: https://github.com/apache/iceberg/pull/6716#discussion_r1103198460


##########
core/src/main/java/org/apache/iceberg/PositionDeletesTable.java:
##########
@@ -43,15 +43,21 @@
 public class PositionDeletesTable extends BaseMetadataTable {
 
   private final Schema schema;
+  private final int defaultSpecId;
+  private final Map<Integer, PartitionSpec> specs;
 
   PositionDeletesTable(Table table) {

Review Comment:
   Good point, done





[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #6716: Spark 3.3: Implement Position Deletes Table

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #6716:
URL: https://github.com/apache/iceberg/pull/6716#discussion_r1108041172


##########
core/src/main/java/org/apache/iceberg/PositionDeletesTable.java:
##########
@@ -43,15 +42,18 @@
 public class PositionDeletesTable extends BaseMetadataTable {

Review Comment:
   This looks good to me.





[GitHub] [iceberg] aokolnychyi merged pull request #6716: Spark 3.3: Implement Position Deletes Table

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi merged PR #6716:
URL: https://github.com/apache/iceberg/pull/6716




[GitHub] [iceberg] szehon-ho commented on a diff in pull request #6716: Spark 3.3: Implement Position Deletes Table

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #6716:
URL: https://github.com/apache/iceberg/pull/6716#discussion_r1102154369


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeleteRowReader.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import java.util.stream.Stream;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.ContentScanTask;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.Partitioning;
+import org.apache.iceberg.PositionDeletesScanTask;
+import org.apache.iceberg.PositionDeletesTable;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.PartitionUtil;
+import org.apache.spark.rdd.InputFileBlockHolder;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.read.PartitionReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class PositionDeleteRowReader extends BaseRowReader<PositionDeletesScanTask>

Review Comment:
   Should we change EqualityDeleteRowReader too?





[GitHub] [iceberg] szehon-ho commented on a diff in pull request #6716: Spark 3.3: Implement Position Deletes Table

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #6716:
URL: https://github.com/apache/iceberg/pull/6716#discussion_r1092774992


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeleteRowReader.java:
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import java.util.stream.Stream;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.ContentScanTask;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Partitioning;
+import org.apache.iceberg.PositionDeletesScanTask;
+import org.apache.iceberg.PositionDeletesTable;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.PartitionUtil;
+import org.apache.spark.rdd.InputFileBlockHolder;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.read.PartitionReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class PositionDeleteRowReader extends BaseRowReader<PositionDeletesScanTask>
+    implements PartitionReader<InternalRow> {
+  private static final Logger LOG = LoggerFactory.getLogger(PositionDeleteRowReader.class);
+
+  PositionDeleteRowReader(SparkInputPartition partition) {
+    this(
+        partition.table(),
+        partition.taskGroup(),
+        partition.expectedSchema(),
+        partition.isCaseSensitive());
+  }
+
+  PositionDeleteRowReader(
+      Table table,
+      ScanTaskGroup<PositionDeletesScanTask> taskGroup,
+      Schema expectedSchema,
+      boolean caseSensitive) {
+
+    super(table, taskGroup, expectedSchema, caseSensitive);
+
+    int numSplits = taskGroup.tasks().size();
+    LOG.debug("Reading {} position delete file split(s) for table {}", numSplits, table.name());
+  }
+
+  @Override
+  protected Stream<ContentFile<?>> referencedFiles(PositionDeletesScanTask task) {
+    return Stream.of(task.file());
+  }
+
+  @Override
+  protected CloseableIterator<InternalRow> open(PositionDeletesScanTask task) {
+    Table deletesTable = table();
+    Table baseTable;
+    if (deletesTable instanceof SerializableTable.SerializableMetadataTable) {

Review Comment:
   Because RowReader's table is PositionDeletesTable, we need the base table's schema to calculate the partition type here.  There was no easy API to get this.





[GitHub] [iceberg] szehon-ho commented on a diff in pull request #6716: Spark 3.3: Implement Position Deletes Table

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #6716:
URL: https://github.com/apache/iceberg/pull/6716#discussion_r1094106234


##########
parquet/src/main/java/org/apache/iceberg/parquet/ParquetMetricsRowGroupFilter.java:
##########
@@ -50,15 +51,22 @@ public class ParquetMetricsRowGroupFilter {
 
   private final Schema schema;
   private final Expression expr;
+  private final Set<Integer> constantFieldIds;
 
   public ParquetMetricsRowGroupFilter(Schema schema, Expression unbound) {
-    this(schema, unbound, true);
+    this(schema, unbound, true, ImmutableSet.of());

Review Comment:
   Without the extra check, any column whose stats are null goes to ROWS_CANNOT_MATCH.  Thus any filter on a constant column of the deletes table will return 0 results.
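
   A minimal sketch of that check, under stated assumptions: `Range`, `shouldRead`, and the equality-only predicate are hypothetical simplifications for illustration and do not reproduce the actual `ParquetMetricsRowGroupFilter` logic. It only shows why a constant column, which has no stats in the file, must bypass the null-stats branch.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class ConstantColumnFilterSketch {
  public static final boolean ROWS_MIGHT_MATCH = true;
  public static final boolean ROWS_CANNOT_MATCH = false;

  // Hypothetical min/max stats for one column in one row group.
  public static final class Range {
    final long min;
    final long max;

    public Range(long min, long max) {
      this.min = min;
      this.max = max;
    }
  }

  // Evaluates "field == value" against row group stats.
  public static boolean shouldRead(
      Map<Integer, Range> statsByFieldId, Set<Integer> constantFieldIds, int fieldId, long value) {
    // Constant columns are not stored in the file, so they never have stats.
    // Stats cannot prune on them; the row group must be kept.
    if (constantFieldIds.contains(fieldId)) {
      return ROWS_MIGHT_MATCH;
    }

    Range range = statsByFieldId.get(fieldId);
    if (range == null) {
      // No stats: in this sketch the column is treated as absent, so nothing can match.
      return ROWS_CANNOT_MATCH;
    }

    return range.min <= value && value <= range.max;
  }

  public static void main(String[] args) {
    Map<Integer, Range> stats = new HashMap<>();
    stats.put(1, new Range(0L, 100L));
    int constantId = 99; // hypothetical field ID for a constant column

    // Without registering the constant field ID, the filter drops the row group.
    System.out.println(shouldRead(stats, Collections.<Integer>emptySet(), constantId, 5L));
    // With it registered, the row group is kept.
    System.out.println(shouldRead(stats, Collections.singleton(constantId), constantId, 5L));
  }
}
```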





[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #6716: Spark 3.3: Implement Position Deletes Table

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #6716:
URL: https://github.com/apache/iceberg/pull/6716#discussion_r1108041584


##########
core/src/main/java/org/apache/iceberg/BaseMetadataTable.java:
##########
@@ -83,6 +84,24 @@ static PartitionSpec transformSpec(Schema metadataTableSchema, PartitionSpec spe
     return builder.build();
   }
 
+  /**
+   * This method transforms the given partition specs to specs that are used to rewrite the
+   * user-provided filter expression against the given metadata table.
+   *
+   * <p>See: {@link #transformSpec(Schema, PartitionSpec)}
+   *
+   * @param metadataTableSchema schema of the metadata table
+   * @param specs specs on which the metadata table schema is based
+   * @return specs used to rewrite the metadata table filters to partition filters using an
+   *     inclusive projection
+   */
+  static Map<Integer, PartitionSpec> transformSpecs(

Review Comment:
   Looks good too.





[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #6716: Spark 3.3: Implement Position Deletes Table

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #6716:
URL: https://github.com/apache/iceberg/pull/6716#discussion_r1105117600


##########
core/src/main/java/org/apache/iceberg/PositionDeletesTable.java:
##########
@@ -43,15 +43,21 @@
 public class PositionDeletesTable extends BaseMetadataTable {
 
   private final Schema schema;
+  private final int defaultSpecId;
+  private final Map<Integer, PartitionSpec> specs;
 
   PositionDeletesTable(Table table) {
-    super(table, table.name() + ".position_deletes");
-    this.schema = calculateSchema();
+    this(table, ".position_deletes");

Review Comment:
   Is this supposed to be `table.name() + ".position_deletes"`?



##########
core/src/main/java/org/apache/iceberg/PositionDeletesTable.java:
##########
@@ -43,15 +43,21 @@
 public class PositionDeletesTable extends BaseMetadataTable {
 
   private final Schema schema;
+  private final int defaultSpecId;
+  private final Map<Integer, PartitionSpec> specs;
 
   PositionDeletesTable(Table table) {
-    super(table, table.name() + ".position_deletes");
-    this.schema = calculateSchema();
+    this(table, ".position_deletes");
   }
 
   PositionDeletesTable(Table table, String name) {
     super(table, name);
     this.schema = calculateSchema();
+    this.defaultSpecId = table.spec().specId();
+    this.specs =

Review Comment:
   What about adding `transformSpecs` to `BaseMetadataTable` that would take a map of original specs? I think you need the same code below in the scan.
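
   A rough sketch of the shape such a helper could take, assuming a generic stand-in: `transformAll` and the type parameter `S` are hypothetical and stand in for a per-spec transform applied over a map of specs keyed by spec ID. It is not the actual `BaseMetadataTable` code.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

public class TransformSpecsSketch {
  // Applies a single-spec transform to every entry, keeping the spec IDs as keys.
  public static <S> Map<Integer, S> transformAll(
      Map<Integer, S> specsById, UnaryOperator<S> transformOne) {
    Map<Integer, S> transformed = new LinkedHashMap<>();
    for (Map.Entry<Integer, S> entry : specsById.entrySet()) {
      transformed.put(entry.getKey(), transformOne.apply(entry.getValue()));
    }
    return transformed;
  }

  public static void main(String[] args) {
    // Strings stand in for partition specs here; the transform is arbitrary.
    Map<Integer, String> specs = new LinkedHashMap<>();
    specs.put(0, "bucket");
    specs.put(1, "day");
    System.out.println(transformAll(specs, s -> "partition." + s));
  }
}
```

   Keeping the map-level helper next to the single-spec transform lets both the table constructor and the scan share one code path.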





[GitHub] [iceberg] szehon-ho commented on a diff in pull request #6716: Spark 3.3: Implement Position Deletes Table

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #6716:
URL: https://github.com/apache/iceberg/pull/6716#discussion_r1109113493


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.PositionDeletesScanTask;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.ExpressionUtil;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.primitives.Ints;
+import org.apache.spark.rdd.InputFileBlockHolder;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.read.PartitionReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class PositionDeletesRowReader extends BaseRowReader<PositionDeletesScanTask>
+    implements PartitionReader<InternalRow> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(PositionDeletesRowReader.class);
+
+  PositionDeletesRowReader(SparkInputPartition partition) {
+    this(
+        partition.table(),
+        partition.taskGroup(),
+        partition.expectedSchema(),
+        partition.isCaseSensitive());
+  }
+
+  PositionDeletesRowReader(
+      Table table,
+      ScanTaskGroup<PositionDeletesScanTask> taskGroup,
+      Schema expectedSchema,
+      boolean caseSensitive) {
+
+    super(table, taskGroup, expectedSchema, caseSensitive);
+
+    int numSplits = taskGroup.tasks().size();
+    LOG.debug("Reading {} position delete file split(s) for table {}", numSplits, table.name());
+  }
+
+  @Override
+  protected Stream<ContentFile<?>> referencedFiles(PositionDeletesScanTask task) {
+    return Stream.of(task.file());
+  }
+
+  @Override
+  protected CloseableIterator<InternalRow> open(PositionDeletesScanTask task) {
+    Schema schema = expectedSchema();

Review Comment:
   Yeah, I was trying to save a few characters every time I referenced it; not sure it's worth it in the end.



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.PositionDeletesScanTask;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.ExpressionUtil;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.primitives.Ints;
+import org.apache.spark.rdd.InputFileBlockHolder;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.read.PartitionReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class PositionDeletesRowReader extends BaseRowReader<PositionDeletesScanTask>
+    implements PartitionReader<InternalRow> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(PositionDeletesRowReader.class);
+
+  PositionDeletesRowReader(SparkInputPartition partition) {
+    this(
+        partition.table(),
+        partition.taskGroup(),
+        partition.expectedSchema(),
+        partition.isCaseSensitive());
+  }
+
+  PositionDeletesRowReader(
+      Table table,
+      ScanTaskGroup<PositionDeletesScanTask> taskGroup,
+      Schema expectedSchema,
+      boolean caseSensitive) {
+
+    super(table, taskGroup, expectedSchema, caseSensitive);
+
+    int numSplits = taskGroup.tasks().size();
+    LOG.debug("Reading {} position delete file split(s) for table {}", numSplits, table.name());
+  }
+
+  @Override
+  protected Stream<ContentFile<?>> referencedFiles(PositionDeletesScanTask task) {
+    return Stream.of(task.file());
+  }
+
+  @Override
+  protected CloseableIterator<InternalRow> open(PositionDeletesScanTask task) {
+    Schema schema = expectedSchema();
+    Map<Integer, ?> idToConstant = constantsMap(task, schema);

Review Comment:
   Good point, moved.





[GitHub] [iceberg] szehon-ho commented on pull request #6716: Spark 3.3: Implement Position Deletes Table

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on PR #6716:
URL: https://github.com/apache/iceberg/pull/6716#issuecomment-1442632462

   Thanks, I filed a follow-up issue, https://github.com/apache/iceberg/issues/6925, to implement pushdown optimization for queries that filter on constant columns (i.e., spec_id, delete_file_path).




[GitHub] [iceberg] szehon-ho commented on a diff in pull request #6716: Spark 3.3: Implement Position Deletes Table

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #6716:
URL: https://github.com/apache/iceberg/pull/6716#discussion_r1099053130


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeleteRowReader.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import java.util.stream.Stream;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.ContentScanTask;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.MetadataTable;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.Partitioning;
+import org.apache.iceberg.PositionDeletesScanTask;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.PartitionUtil;
+import org.apache.spark.rdd.InputFileBlockHolder;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.read.PartitionReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class PositionDeleteRowReader extends BaseRowReader<PositionDeletesScanTask>
+    implements PartitionReader<InternalRow> {
+  private static final Logger LOG = LoggerFactory.getLogger(PositionDeleteRowReader.class);
+
+  PositionDeleteRowReader(SparkInputPartition partition) {
+    this(
+        partition.table(),
+        partition.taskGroup(),
+        partition.expectedSchema(),
+        partition.isCaseSensitive());
+  }
+
+  PositionDeleteRowReader(
+      Table table,
+      ScanTaskGroup<PositionDeletesScanTask> taskGroup,
+      Schema expectedSchema,
+      boolean caseSensitive) {
+
+    super(table, taskGroup, expectedSchema, caseSensitive);
+
+    int numSplits = taskGroup.tasks().size();
+    LOG.debug("Reading {} position delete file split(s) for table {}", numSplits, table.name());
+  }
+
+  @Override
+  protected Stream<ContentFile<?>> referencedFiles(PositionDeletesScanTask task) {
+    return Stream.of(task.file());
+  }
+
+  @Override
+  protected CloseableIterator<InternalRow> open(PositionDeletesScanTask task) {
+    Table deletesTable = table();
+    Preconditions.checkArgument(
+        deletesTable instanceof MetadataTable,
+        "PositionDeleteRowReader is only supported for PositionDeletesTable");
+    Preconditions.checkArgument(
+        ((MetadataTable) deletesTable).type().equals(MetadataTableType.POSITION_DELETES),
+        "PositionDeleteRowReader is only supported for PositionDeletesTable");
+    Table baseTable = ((MetadataTable) deletesTable).baseTable();

Review Comment:
   Updated





[GitHub] [iceberg] szehon-ho commented on a diff in pull request #6716: Spark 3.3: Implement Position Deletes Table

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #6716:
URL: https://github.com/apache/iceberg/pull/6716#discussion_r1099052715


##########
core/src/main/java/org/apache/iceberg/MetadataTable.java:
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+/** Interface representing a metadata table. */
+public interface MetadataTable {

Review Comment:
   I still have some doubts, but I made the change.



##########
core/src/main/java/org/apache/iceberg/PositionDeletesTable.java:
##########
@@ -75,16 +75,15 @@ public Schema schema() {
     return schema;
   }
 
-  private Schema calculateSchema() {
-    Types.StructType partitionType = Partitioning.partitionType(table());
+  public static Schema schema(Schema schema, Types.StructType partitionType) {

Review Comment:
   Yes, I removed the public method and fixed the test.





[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #6716: Spark 3.3: Implement Position Deletes Table

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #6716:
URL: https://github.com/apache/iceberg/pull/6716#discussion_r1101949695


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeleteRowReader.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import java.util.stream.Stream;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.ContentScanTask;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.Partitioning;
+import org.apache.iceberg.PositionDeletesScanTask;
+import org.apache.iceberg.PositionDeletesTable;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.PartitionUtil;
+import org.apache.spark.rdd.InputFileBlockHolder;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.read.PartitionReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class PositionDeleteRowReader extends BaseRowReader<PositionDeletesScanTask>
+    implements PartitionReader<InternalRow> {
+  private static final Logger LOG = LoggerFactory.getLogger(PositionDeleteRowReader.class);
+
+  PositionDeleteRowReader(SparkInputPartition partition) {
+    this(
+        partition.table(),
+        partition.taskGroup(),
+        partition.expectedSchema(),
+        partition.isCaseSensitive());
+  }
+
+  PositionDeleteRowReader(
+      Table table,
+      ScanTaskGroup<PositionDeletesScanTask> taskGroup,
+      Schema expectedSchema,
+      boolean caseSensitive) {
+
+    super(table, taskGroup, expectedSchema, caseSensitive);
+
+    int numSplits = taskGroup.tasks().size();
+    LOG.debug("Reading {} position delete file split(s) for table {}", numSplits, table.name());
+  }
+
+  @Override
+  protected Stream<ContentFile<?>> referencedFiles(PositionDeletesScanTask task) {
+    return Stream.of(task.file());
+  }
+
+  @Override
+  protected CloseableIterator<InternalRow> open(PositionDeletesScanTask task) {
+    Preconditions.checkArgument(
+        table() instanceof PositionDeletesTable
+            || (table() instanceof SerializableTable.SerializableMetadataTable
+                && ((SerializableTable.SerializableMetadataTable) table())
+                    .type()
+                    .equals(MetadataTableType.POSITION_DELETES)),
+        "PositionDeleteRowReader is only supported for PositionDeletesTable");
+
+    Types.StructType partitionType = Partitioning.partitionType(table());
+    Map<Integer, ?> idToConstant = constantsMap(task, expectedSchema(), partitionType);
+    String filePath = task.file().path().toString();
+    LOG.debug("Opening position delete file {}", filePath);
+
+    // update the current file for Spark's filename() function
+    InputFileBlockHolder.set(filePath, task.start(), task.length());

Review Comment:
   Do we have a test that verifies that `input_file_name()` actually works? We would need to check that there are multiple files and their names are populated correctly.





[GitHub] [iceberg] szehon-ho commented on a diff in pull request #6716: Spark 3.3: Implement Position Deletes Table

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #6716:
URL: https://github.com/apache/iceberg/pull/6716#discussion_r1096506568


##########
core/src/main/java/org/apache/iceberg/MetadataTable.java:
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+/** Interface representing a metadata table. */
+public interface MetadataTable {

Review Comment:
   Yes, I need the table because `Partitioning.partitionType(table)` takes a table. I could add a `Partitioning.partitionType(specs)` overload instead. That said, it seems a bit misleading to me that `spec()` of `PositionDeletesTable` returns the spec of the underlying table, because the position deletes table itself is partitioned differently from the underlying table. I am not sure whether that is a valid concern.





[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #6716: Spark 3.3: Implement Position Deletes Table

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #6716:
URL: https://github.com/apache/iceberg/pull/6716#discussion_r1108060309


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.PositionDeletesScanTask;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.ExpressionUtil;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.primitives.Ints;
+import org.apache.spark.rdd.InputFileBlockHolder;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.read.PartitionReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class PositionDeletesRowReader extends BaseRowReader<PositionDeletesScanTask>
+    implements PartitionReader<InternalRow> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(PositionDeletesRowReader.class);
+
+  PositionDeletesRowReader(SparkInputPartition partition) {
+    this(
+        partition.table(),
+        partition.taskGroup(),
+        partition.expectedSchema(),
+        partition.isCaseSensitive());
+  }
+
+  PositionDeletesRowReader(
+      Table table,
+      ScanTaskGroup<PositionDeletesScanTask> taskGroup,
+      Schema expectedSchema,
+      boolean caseSensitive) {
+
+    super(table, taskGroup, expectedSchema, caseSensitive);
+
+    int numSplits = taskGroup.tasks().size();
+    LOG.debug("Reading {} position delete file split(s) for table {}", numSplits, table.name());
+  }
+
+  @Override
+  protected Stream<ContentFile<?>> referencedFiles(PositionDeletesScanTask task) {
+    return Stream.of(task.file());
+  }
+
+  @Override
+  protected CloseableIterator<InternalRow> open(PositionDeletesScanTask task) {
+    Schema schema = expectedSchema();
+    Map<Integer, ?> idToConstant = constantsMap(task, schema);
+    String filePath = task.file().path().toString();
+    LOG.debug("Opening position delete file {}", filePath);
+
+    // update the current file for Spark's filename() function
+    InputFileBlockHolder.set(filePath, task.start(), task.length());
+
+    InputFile inputFile = getInputFile(task.file().path().toString());
+    Preconditions.checkNotNull(
+        inputFile, "Could not find InputFile associated with PositionDeleteScanTask");
+
+    // select out constant fields when pushing down filter to row reader

Review Comment:
   What about defining a helper method with a reasonable name to abstract this logic away?
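
A minimal sketch of what such a helper might look like, using plain Java collections in place of the Iceberg types so that it runs on its own. The class `NonConstantFieldsSketch`, the method name `nonConstantFieldIds`, and both parameters are hypothetical and are not taken from the PR; in the reader, the first argument would correspond to the IDs of the primitive fields of the expected schema and the second to `idToConstant`.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

class NonConstantFieldsSketch {

  // Hypothetical helper (not from the PR): returns the IDs in primitiveFieldIds
  // that have no entry in idToConstant, i.e. the fields whose values must
  // actually be read from the delete file rather than filled in as constants.
  static Set<Integer> nonConstantFieldIds(
      Set<Integer> primitiveFieldIds, Map<Integer, ?> idToConstant) {
    // Copy first so the caller's set is never modified.
    Set<Integer> result = new TreeSet<>(primitiveFieldIds);
    result.removeAll(idToConstant.keySet());
    return result;
  }

  public static void main(String[] args) {
    // Assumed example data: fields 1..3 are primitive, field 3 is a constant.
    Set<Integer> primitiveFieldIds = Set.of(1, 2, 3);
    Map<Integer, Object> idToConstant = Map.of(3, "constant-value");
    System.out.println(nonConstantFieldIds(primitiveFieldIds, idToConstant)); // prints [1, 2]
  }
}
```

Pulling the set arithmetic into one named method would leave `open` with a single call before the residual expression is built.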



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.PositionDeletesScanTask;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.ExpressionUtil;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.primitives.Ints;
+import org.apache.spark.rdd.InputFileBlockHolder;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.read.PartitionReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class PositionDeletesRowReader extends BaseRowReader<PositionDeletesScanTask>
+    implements PartitionReader<InternalRow> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(PositionDeletesRowReader.class);
+
+  PositionDeletesRowReader(SparkInputPartition partition) {
+    this(
+        partition.table(),
+        partition.taskGroup(),
+        partition.expectedSchema(),
+        partition.isCaseSensitive());
+  }
+
+  PositionDeletesRowReader(
+      Table table,
+      ScanTaskGroup<PositionDeletesScanTask> taskGroup,
+      Schema expectedSchema,
+      boolean caseSensitive) {
+
+    super(table, taskGroup, expectedSchema, caseSensitive);
+
+    int numSplits = taskGroup.tasks().size();
+    LOG.debug("Reading {} position delete file split(s) for table {}", numSplits, table.name());
+  }
+
+  @Override
+  protected Stream<ContentFile<?>> referencedFiles(PositionDeletesScanTask task) {
+    return Stream.of(task.file());
+  }
+
+  @Override
+  protected CloseableIterator<InternalRow> open(PositionDeletesScanTask task) {
+    Schema schema = expectedSchema();

Review Comment:
   nit: Do we gain much by defining this var? Isn't `expectedSchema()` simply a getter?



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.PositionDeletesScanTask;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.ExpressionUtil;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.primitives.Ints;
+import org.apache.spark.rdd.InputFileBlockHolder;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.read.PartitionReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class PositionDeletesRowReader extends BaseRowReader<PositionDeletesScanTask>
+    implements PartitionReader<InternalRow> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(PositionDeletesRowReader.class);
+
+  PositionDeletesRowReader(SparkInputPartition partition) {
+    this(
+        partition.table(),
+        partition.taskGroup(),
+        partition.expectedSchema(),
+        partition.isCaseSensitive());
+  }
+
+  PositionDeletesRowReader(
+      Table table,
+      ScanTaskGroup<PositionDeletesScanTask> taskGroup,
+      Schema expectedSchema,
+      boolean caseSensitive) {
+
+    super(table, taskGroup, expectedSchema, caseSensitive);
+
+    int numSplits = taskGroup.tasks().size();
+    LOG.debug("Reading {} position delete file split(s) for table {}", numSplits, table.name());
+  }
+
+  @Override
+  protected Stream<ContentFile<?>> referencedFiles(PositionDeletesScanTask task) {
+    return Stream.of(task.file());
+  }
+
+  @Override
+  protected CloseableIterator<InternalRow> open(PositionDeletesScanTask task) {
+    Schema schema = expectedSchema();
+    Map<Integer, ?> idToConstant = constantsMap(task, schema);

Review Comment:
   nit: What about moving `idToConstant` after `getInputFile`, since it is first used only 30 lines later?



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.PositionDeletesScanTask;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.ExpressionUtil;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.primitives.Ints;
+import org.apache.spark.rdd.InputFileBlockHolder;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.read.PartitionReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class PositionDeletesRowReader extends BaseRowReader<PositionDeletesScanTask>
+    implements PartitionReader<InternalRow> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(PositionDeletesRowReader.class);
+
+  PositionDeletesRowReader(SparkInputPartition partition) {
+    this(
+        partition.table(),
+        partition.taskGroup(),
+        partition.expectedSchema(),
+        partition.isCaseSensitive());
+  }
+
+  PositionDeletesRowReader(
+      Table table,
+      ScanTaskGroup<PositionDeletesScanTask> taskGroup,
+      Schema expectedSchema,
+      boolean caseSensitive) {
+
+    super(table, taskGroup, expectedSchema, caseSensitive);
+
+    int numSplits = taskGroup.tasks().size();
+    LOG.debug("Reading {} position delete file split(s) for table {}", numSplits, table.name());
+  }
+
+  @Override
+  protected Stream<ContentFile<?>> referencedFiles(PositionDeletesScanTask task) {
+    return Stream.of(task.file());
+  }
+
+  @Override
+  protected CloseableIterator<InternalRow> open(PositionDeletesScanTask task) {
+    Schema schema = expectedSchema();
+    Map<Integer, ?> idToConstant = constantsMap(task, schema);
+    String filePath = task.file().path().toString();
+    LOG.debug("Opening position delete file {}", filePath);
+
+    // update the current file for Spark's filename() function
+    InputFileBlockHolder.set(filePath, task.start(), task.length());
+
+    InputFile inputFile = getInputFile(task.file().path().toString());
+    Preconditions.checkNotNull(
+        inputFile, "Could not find InputFile associated with PositionDeleteScanTask");
+
+    // select out constant fields when pushing down filter to row reader
+    Set<Integer> fields = schema.idToName().keySet();
+    Set<Integer> nonConstantFields =
+        fields.stream()
+            .filter(id -> schema.findField(id).type().isPrimitiveType())
+            .collect(Collectors.toSet());
+    nonConstantFields.removeAll(idToConstant.keySet());
+    Expression residualWithoutConstants =
+        ExpressionUtil.extractByIdInclusive(
+            task.residual(),
+            task.spec().schema(),

Review Comment:
   Question: Shouldn't this be `expectedSchema()`?



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.PositionDeletesScanTask;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.ExpressionUtil;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.primitives.Ints;
+import org.apache.spark.rdd.InputFileBlockHolder;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.read.PartitionReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class PositionDeletesRowReader extends BaseRowReader<PositionDeletesScanTask>
+    implements PartitionReader<InternalRow> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(PositionDeletesRowReader.class);
+
+  PositionDeletesRowReader(SparkInputPartition partition) {
+    this(
+        partition.table(),
+        partition.taskGroup(),
+        partition.expectedSchema(),
+        partition.isCaseSensitive());
+  }
+
+  PositionDeletesRowReader(
+      Table table,
+      ScanTaskGroup<PositionDeletesScanTask> taskGroup,
+      Schema expectedSchema,
+      boolean caseSensitive) {
+
+    super(table, taskGroup, expectedSchema, caseSensitive);
+
+    int numSplits = taskGroup.tasks().size();
+    LOG.debug("Reading {} position delete file split(s) for table {}", numSplits, table.name());
+  }
+
+  @Override
+  protected Stream<ContentFile<?>> referencedFiles(PositionDeletesScanTask task) {
+    return Stream.of(task.file());
+  }
+
+  @Override
+  protected CloseableIterator<InternalRow> open(PositionDeletesScanTask task) {
+    Schema schema = expectedSchema();
+    Map<Integer, ?> idToConstant = constantsMap(task, schema);
+    String filePath = task.file().path().toString();
+    LOG.debug("Opening position delete file {}", filePath);
+
+    // update the current file for Spark's filename() function
+    InputFileBlockHolder.set(filePath, task.start(), task.length());
+
+    InputFile inputFile = getInputFile(task.file().path().toString());
+    Preconditions.checkNotNull(

Review Comment:
   nit: I know we do this in other readers, but what about the following so that it stays on one line?
   
   ```
   Preconditions.checkNotNull(inputFile, "Could not find InputFile associated with %s", task);
   ```
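
The one-line form above passes the task as a template argument, so the failure message names the task instead of a fixed class name. The sketch below illustrates the same idea with the JDK only: `Objects.requireNonNull` with a message supplier stands in for the relocated Guava `Preconditions.checkNotNull`, and `InputFileCheckSketch` and `requireInputFile` are hypothetical names introduced for illustration.

```java
import java.util.Objects;

class InputFileCheckSketch {

  // Hypothetical stand-in for the reader's null check: throws a
  // NullPointerException whose message includes the task description.
  static <T> T requireInputFile(T inputFile, Object task) {
    return Objects.requireNonNull(
        inputFile, () -> String.format("Could not find InputFile associated with %s", task));
  }

  public static void main(String[] args) {
    try {
      // Assumed example: the lookup returned null for a task labelled "task-1".
      requireInputFile(null, "task-1");
    } catch (NullPointerException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

With the task included in the message, a failure identifies which scan task had no input file.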



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.PositionDeletesScanTask;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.ExpressionUtil;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.primitives.Ints;
+import org.apache.spark.rdd.InputFileBlockHolder;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.read.PartitionReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class PositionDeletesRowReader extends BaseRowReader<PositionDeletesScanTask>
+    implements PartitionReader<InternalRow> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(PositionDeletesRowReader.class);
+
+  PositionDeletesRowReader(SparkInputPartition partition) {
+    this(
+        partition.table(),
+        partition.taskGroup(),
+        partition.expectedSchema(),
+        partition.isCaseSensitive());
+  }
+
+  PositionDeletesRowReader(
+      Table table,
+      ScanTaskGroup<PositionDeletesScanTask> taskGroup,
+      Schema expectedSchema,
+      boolean caseSensitive) {
+
+    super(table, taskGroup, expectedSchema, caseSensitive);
+
+    int numSplits = taskGroup.tasks().size();
+    LOG.debug("Reading {} position delete file split(s) for table {}", numSplits, table.name());
+  }
+
+  @Override
+  protected Stream<ContentFile<?>> referencedFiles(PositionDeletesScanTask task) {
+    return Stream.of(task.file());
+  }
+
+  @Override
+  protected CloseableIterator<InternalRow> open(PositionDeletesScanTask task) {
+    Schema schema = expectedSchema();
+    Map<Integer, ?> idToConstant = constantsMap(task, schema);
+    String filePath = task.file().path().toString();
+    LOG.debug("Opening position delete file {}", filePath);
+
+    // update the current file for Spark's filename() function
+    InputFileBlockHolder.set(filePath, task.start(), task.length());
+
+    InputFile inputFile = getInputFile(task.file().path().toString());
+    Preconditions.checkNotNull(
+        inputFile, "Could not find InputFile associated with PositionDeleteScanTask");
+
+    // select out constant fields when pushing down filter to row reader
+    Set<Integer> fields = schema.idToName().keySet();
+    Set<Integer> nonConstantFields =

Review Comment:
   nit: Should this be called `nonConstantFieldIds` as it holds IDs?



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.PositionDeletesScanTask;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.ExpressionUtil;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.primitives.Ints;
+import org.apache.spark.rdd.InputFileBlockHolder;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.read.PartitionReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class PositionDeletesRowReader extends BaseRowReader<PositionDeletesScanTask>
+    implements PartitionReader<InternalRow> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(PositionDeletesRowReader.class);
+
+  PositionDeletesRowReader(SparkInputPartition partition) {
+    this(
+        partition.table(),
+        partition.taskGroup(),
+        partition.expectedSchema(),
+        partition.isCaseSensitive());
+  }
+
+  PositionDeletesRowReader(
+      Table table,
+      ScanTaskGroup<PositionDeletesScanTask> taskGroup,
+      Schema expectedSchema,
+      boolean caseSensitive) {
+
+    super(table, taskGroup, expectedSchema, caseSensitive);
+
+    int numSplits = taskGroup.tasks().size();
+    LOG.debug("Reading {} position delete file split(s) for table {}", numSplits, table.name());
+  }
+
+  @Override
+  protected Stream<ContentFile<?>> referencedFiles(PositionDeletesScanTask task) {
+    return Stream.of(task.file());
+  }
+
+  @Override
+  protected CloseableIterator<InternalRow> open(PositionDeletesScanTask task) {
+    Schema schema = expectedSchema();
+    Map<Integer, ?> idToConstant = constantsMap(task, schema);
+    String filePath = task.file().path().toString();
+    LOG.debug("Opening position delete file {}", filePath);
+
+    // update the current file for Spark's filename() function
+    InputFileBlockHolder.set(filePath, task.start(), task.length());
+
+    InputFile inputFile = getInputFile(task.file().path().toString());
+    Preconditions.checkNotNull(
+        inputFile, "Could not find InputFile associated with PositionDeleteScanTask");
+
+    // select out constant fields when pushing down filter to row reader
+    Set<Integer> fields = schema.idToName().keySet();
+    Set<Integer> nonConstantFields =
+        fields.stream()
+            .filter(id -> schema.findField(id).type().isPrimitiveType())
+            .collect(Collectors.toSet());
+    nonConstantFields.removeAll(idToConstant.keySet());

Review Comment:
   Would it be more readable to have two filter predicates in the stream instead of a temp set?
   
   ```
   Set<Integer> nonConstantFieldIds =
       expectedSchema().idToName().keySet().stream()
           .filter(id -> expectedSchema().findField(id).type().isPrimitiveType())
           .filter(id -> !idToConstant.containsKey(id))
           .collect(Collectors.toSet());
   ```
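
To make the comparison concrete, here is a minimal, self-contained sketch of both shapes using plain Java collections. The class `NonConstantFieldIds` and the `primitiveById` map are hypothetical stand-ins for `expectedSchema()` and its field types; they are not part of the PR. One side effect worth noting: the two-filter form never mutates the collected set, so it does not rely on `Collectors.toSet()` returning a mutable set, which its Javadoc does not guarantee.

```java
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical stand-in: maps a field ID to whether its type is primitive.
// In the PR this information would come from the expected schema.
class NonConstantFieldIds {

  // Shape used in the diff: collect primitive IDs, then remove constant IDs.
  static Set<Integer> withTempSet(
      Map<Integer, Boolean> primitiveById, Map<Integer, ?> idToConstant) {
    Set<Integer> ids =
        primitiveById.keySet().stream()
            .filter(id -> primitiveById.get(id))
            .collect(Collectors.toSet());
    ids.removeAll(idToConstant.keySet());
    return ids;
  }

  // Shape suggested in the review: express both conditions as stream filters.
  static Set<Integer> withTwoFilters(
      Map<Integer, Boolean> primitiveById, Map<Integer, ?> idToConstant) {
    return primitiveById.keySet().stream()
        .filter(id -> primitiveById.get(id))
        .filter(id -> !idToConstant.containsKey(id))
        .collect(Collectors.toSet());
  }
}
```

Both methods are intended to return the same set for the same inputs; the choice between them is about readability.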





[GitHub] [iceberg] szehon-ho commented on a diff in pull request #6716: Spark 3.3: Implement Position Deletes Table

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #6716:
URL: https://github.com/apache/iceberg/pull/6716#discussion_r1106238037


##########
core/src/main/java/org/apache/iceberg/PositionDeletesTable.java:
##########
@@ -43,15 +43,21 @@
 public class PositionDeletesTable extends BaseMetadataTable {
 
   private final Schema schema;
+  private final int defaultSpecId;
+  private final Map<Integer, PartitionSpec> specs;
 
   PositionDeletesTable(Table table) {
-    super(table, table.name() + ".position_deletes");
-    this.schema = calculateSchema();
+    this(table, ".position_deletes");
   }
 
   PositionDeletesTable(Table table, String name) {
     super(table, name);
     this.schema = calculateSchema();
+    this.defaultSpecId = table.spec().specId();
+    this.specs =

Review Comment:
   Done, hope it's what you meant.



##########
core/src/main/java/org/apache/iceberg/PositionDeletesTable.java:
##########
@@ -43,15 +43,21 @@
 public class PositionDeletesTable extends BaseMetadataTable {
 
   private final Schema schema;
+  private final int defaultSpecId;
+  private final Map<Integer, PartitionSpec> specs;
 
   PositionDeletesTable(Table table) {
-    super(table, table.name() + ".position_deletes");
-    this.schema = calculateSchema();
+    this(table, ".position_deletes");

Review Comment:
   Good catch, fixed.





[GitHub] [iceberg] aokolnychyi commented on pull request #6716: Spark 3.3: Implement Position Deletes Table

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on PR #6716:
URL: https://github.com/apache/iceberg/pull/6716#issuecomment-1435265950

   Thanks, @szehon-ho! It is nice to have this done. I also added @rdblue as a co-author since this PR includes some logic from PR #6599.




[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #6716: Spark 3.3: Implement Position Deletes Table

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #6716:
URL: https://github.com/apache/iceberg/pull/6716#discussion_r1094892920


##########
core/src/main/java/org/apache/iceberg/PositionDeletesTable.java:
##########
@@ -75,16 +75,15 @@ public Schema schema() {
     return schema;
   }
 
-  private Schema calculateSchema() {
-    Types.StructType partitionType = Partitioning.partitionType(table());
+  public static Schema schema(Schema schema, Types.StructType partitionType) {

Review Comment:
   Are we opening this up purely for testing? If so, can we simply construct a table there and call `schema()`? I don't mind having package-private methods visible for testing, but it would be nice to avoid extra public methods. We are trying to be more careful with what is exposed, even to developers.



##########
parquet/src/main/java/org/apache/iceberg/parquet/ParquetMetricsRowGroupFilter.java:
##########
@@ -50,15 +51,22 @@ public class ParquetMetricsRowGroupFilter {
 
   private final Schema schema;
   private final Expression expr;
+  private final Set<Integer> constantFieldIds;
 
   public ParquetMetricsRowGroupFilter(Schema schema, Expression unbound) {
-    this(schema, unbound, true);
+    this(schema, unbound, true, ImmutableSet.of());

Review Comment:
   The fix here seems correct. I think we also have this issue in CDC. Let me check there.



##########
core/src/main/java/org/apache/iceberg/MetadataTable.java:
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+/** Interface representing a metadata table. */
+public interface MetadataTable {

Review Comment:
   If I understand correctly, this interface exists only to build a correct partition map with constants. What about making our position deletes table expose the correct partitioning of the base table instead? Would it be fair to say it is partitioned in the same way as the main table? Delete files are also annotated with spec IDs.
   
   If so, it will be a matter of adding the following logic in `PositionDeletesTable`:
   ```
   private final int defaultSpecId;
   private final Map<Integer, PartitionSpec> specs;
   
   ...
   
   PositionDeletesTable(Table table, String name) {
     super(table, name);
     this.schema = schema(table().schema(), Partitioning.partitionType(table()));
     this.defaultSpecId = table.spec().specId();
     this.specs = table.specs();
   }
   
   ...
   
   @Override
   public PartitionSpec spec() {
     return specs.get(defaultSpecId);
   }
   
   @Override
   public Map<Integer, PartitionSpec> specs() {
     return specs;
   }
   ```
   
   After that, we should be able to remove this interface, simply use `BaseReader#constantsMap`, and remove most of the custom logic in `PositionDeleteRowReader`.
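
As a rough illustration of why exposing the base table's specs helps, the sketch below uses simplified, hypothetical stand-ins (`SpecStub`, `TableStub`, `PositionDeletesTableSpecsSketch`) rather than the real Iceberg types. It assumes, as the comment above states, that each delete file carries the ID of the spec it was written with, so a reader holding only the metadata table can resolve that spec without reaching for the base table.

```java
import java.util.Map;

// Hypothetical stand-in for a partition spec; only the ID matters here.
final class SpecStub {
  private final int specId;

  SpecStub(int specId) {
    this.specId = specId;
  }

  int specId() {
    return specId;
  }
}

// Hypothetical stand-in for the two table methods discussed in the review.
interface TableStub {
  SpecStub spec();

  Map<Integer, SpecStub> specs();
}

// Sketch of a metadata table that reports the base table's partitioning.
class PositionDeletesTableSpecsSketch implements TableStub {
  private final int defaultSpecId;
  private final Map<Integer, SpecStub> specs;

  PositionDeletesTableSpecsSketch(TableStub baseTable) {
    this.defaultSpecId = baseTable.spec().specId();
    this.specs = baseTable.specs();
  }

  @Override
  public SpecStub spec() {
    return specs.get(defaultSpecId);
  }

  @Override
  public Map<Integer, SpecStub> specs() {
    return specs;
  }

  // Resolve the spec a delete file was written with, using the spec ID on the file.
  static SpecStub specFor(TableStub table, int fileSpecId) {
    SpecStub spec = table.specs().get(fileSpecId);
    if (spec == null) {
      throw new IllegalArgumentException("Unknown spec ID: " + fileSpecId);
    }
    return spec;
  }
}
```

With this shape, generic reader code that only knows the `TableStub` contract can call `specFor(table, fileSpecId)` on either the base table or the metadata table and get the same answer.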



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeleteRowReader.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import java.util.stream.Stream;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.ContentScanTask;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.MetadataTable;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.Partitioning;
+import org.apache.iceberg.PositionDeletesScanTask;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.PartitionUtil;
+import org.apache.spark.rdd.InputFileBlockHolder;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.read.PartitionReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class PositionDeleteRowReader extends BaseRowReader<PositionDeletesScanTask>
+    implements PartitionReader<InternalRow> {
+  private static final Logger LOG = LoggerFactory.getLogger(PositionDeleteRowReader.class);
+
+  PositionDeleteRowReader(SparkInputPartition partition) {
+    this(
+        partition.table(),
+        partition.taskGroup(),
+        partition.expectedSchema(),
+        partition.isCaseSensitive());
+  }
+
+  PositionDeleteRowReader(
+      Table table,
+      ScanTaskGroup<PositionDeletesScanTask> taskGroup,
+      Schema expectedSchema,
+      boolean caseSensitive) {
+
+    super(table, taskGroup, expectedSchema, caseSensitive);
+
+    int numSplits = taskGroup.tasks().size();
+    LOG.debug("Reading {} position delete file split(s) for table {}", numSplits, table.name());
+  }
+
+  @Override
+  protected Stream<ContentFile<?>> referencedFiles(PositionDeletesScanTask task) {
+    return Stream.of(task.file());
+  }
+
+  @Override
+  protected CloseableIterator<InternalRow> open(PositionDeletesScanTask task) {
+    Table deletesTable = table();
+    Preconditions.checkArgument(
+        deletesTable instanceof MetadataTable,
+        "PositionDeleteRowReader is only supported for PositionDeletesTable");
+    Preconditions.checkArgument(
+        ((MetadataTable) deletesTable).type().equals(MetadataTableType.POSITION_DELETES),
+        "PositionDeleteRowReader is only supported for PositionDeletesTable");
+    Table baseTable = ((MetadataTable) deletesTable).baseTable();

Review Comment:
   I feel this logic won't be needed if we expose correct specs in the position deletes metadata table, like I mentioned above.





[GitHub] [iceberg] aokolnychyi commented on pull request #6716: Spark 3.3: Implement Position Deletes Table

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on PR #6716:
URL: https://github.com/apache/iceberg/pull/6716#issuecomment-1428720815

   Will take another look today.




[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #6716: Spark 3.3: Implement Position Deletes Table

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #6716:
URL: https://github.com/apache/iceberg/pull/6716#discussion_r1105178658


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeleteRowReader.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import java.util.stream.Stream;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.ContentScanTask;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.Partitioning;
+import org.apache.iceberg.PositionDeletesScanTask;
+import org.apache.iceberg.PositionDeletesTable;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.PartitionUtil;
+import org.apache.spark.rdd.InputFileBlockHolder;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.read.PartitionReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class PositionDeleteRowReader extends BaseRowReader<PositionDeletesScanTask>

Review Comment:
   I wonder whether we should deprecate and drop `EqualityDeleteRowReader`. It is not used anywhere.





[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #6716: Spark 3.3: Implement Position Deletes Table

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #6716:
URL: https://github.com/apache/iceberg/pull/6716#discussion_r1101938158


##########
core/src/main/java/org/apache/iceberg/PositionDeletesTable.java:
##########
@@ -43,15 +43,21 @@
 public class PositionDeletesTable extends BaseMetadataTable {
 
   private final Schema schema;
+  private final int defaultSpecId;
+  private final Map<Integer, PartitionSpec> specs;
 
   PositionDeletesTable(Table table) {

Review Comment:
   The constructors seem repetitive. Can we just call the other one here?
   
   ```
   this(table, table.name() + ".position_deletes");
   ```
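
A minimal sketch of the delegation pattern, using hypothetical simplified classes (`BaseTableStub`, `PositionDeletesTableSketch`) in place of the real ones. The point it tries to show is that the delegating constructor must still build the full name from the base table; passing only the suffix would drop the base table name.

```java
// Hypothetical stand-in for the base table; only name() is modeled.
class BaseTableStub {
  private final String name;

  BaseTableStub(String name) {
    this.name = name;
  }

  String name() {
    return name;
  }
}

class PositionDeletesTableSketch {
  private final BaseTableStub table;
  private final String name;

  PositionDeletesTableSketch(BaseTableStub table) {
    // Delegate to the two-argument constructor with the derived default name.
    this(table, table.name() + ".position_deletes");
  }

  PositionDeletesTableSketch(BaseTableStub table, String name) {
    // All field initialization lives in one place.
    this.table = table;
    this.name = name;
  }

  String name() {
    return name;
  }
}
```

For example, `new PositionDeletesTableSketch(new BaseTableStub("db.tbl")).name()` yields `db.tbl.position_deletes` in this sketch.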



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeleteRowReader.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import java.util.stream.Stream;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.ContentScanTask;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.Partitioning;
+import org.apache.iceberg.PositionDeletesScanTask;
+import org.apache.iceberg.PositionDeletesTable;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.PartitionUtil;
+import org.apache.spark.rdd.InputFileBlockHolder;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.read.PartitionReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class PositionDeleteRowReader extends BaseRowReader<PositionDeletesScanTask>

Review Comment:
   nit: `PositionDeleteRowReader` -> `PositionDeletesRowReader` (plural)?



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeleteRowReader.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import java.util.stream.Stream;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.ContentScanTask;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.Partitioning;
+import org.apache.iceberg.PositionDeletesScanTask;
+import org.apache.iceberg.PositionDeletesTable;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.PartitionUtil;
+import org.apache.spark.rdd.InputFileBlockHolder;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.read.PartitionReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class PositionDeleteRowReader extends BaseRowReader<PositionDeletesScanTask>
+    implements PartitionReader<InternalRow> {
+  private static final Logger LOG = LoggerFactory.getLogger(PositionDeleteRowReader.class);

Review Comment:
   nit: What about an empty line before this as the class definition is split into multiple lines?



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeleteRowReader.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import java.util.stream.Stream;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.ContentScanTask;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.Partitioning;
+import org.apache.iceberg.PositionDeletesScanTask;
+import org.apache.iceberg.PositionDeletesTable;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.PartitionUtil;
+import org.apache.spark.rdd.InputFileBlockHolder;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.read.PartitionReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class PositionDeleteRowReader extends BaseRowReader<PositionDeletesScanTask>
+    implements PartitionReader<InternalRow> {
+  private static final Logger LOG = LoggerFactory.getLogger(PositionDeleteRowReader.class);
+
+  PositionDeleteRowReader(SparkInputPartition partition) {
+    this(
+        partition.table(),
+        partition.taskGroup(),
+        partition.expectedSchema(),
+        partition.isCaseSensitive());
+  }
+
+  PositionDeleteRowReader(
+      Table table,
+      ScanTaskGroup<PositionDeletesScanTask> taskGroup,
+      Schema expectedSchema,
+      boolean caseSensitive) {
+
+    super(table, taskGroup, expectedSchema, caseSensitive);
+
+    int numSplits = taskGroup.tasks().size();
+    LOG.debug("Reading {} position delete file split(s) for table {}", numSplits, table.name());
+  }
+
+  @Override
+  protected Stream<ContentFile<?>> referencedFiles(PositionDeletesScanTask task) {
+    return Stream.of(task.file());
+  }
+
+  @Override
+  protected CloseableIterator<InternalRow> open(PositionDeletesScanTask task) {
+    Preconditions.checkArgument(

Review Comment:
   Do we really need this check? Isn't it sufficient to know we got `PositionDeletesScanTask`? I doubt it will be false at any point. Even if someone passes a custom table implementation, the code should work as we just need to follow the contract of the task and table.



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeleteRowReader.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import java.util.stream.Stream;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.ContentScanTask;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.Partitioning;
+import org.apache.iceberg.PositionDeletesScanTask;
+import org.apache.iceberg.PositionDeletesTable;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.PartitionUtil;
+import org.apache.spark.rdd.InputFileBlockHolder;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.read.PartitionReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class PositionDeleteRowReader extends BaseRowReader<PositionDeletesScanTask>
+    implements PartitionReader<InternalRow> {
+  private static final Logger LOG = LoggerFactory.getLogger(PositionDeleteRowReader.class);
+
+  PositionDeleteRowReader(SparkInputPartition partition) {
+    this(
+        partition.table(),
+        partition.taskGroup(),
+        partition.expectedSchema(),
+        partition.isCaseSensitive());
+  }
+
+  PositionDeleteRowReader(
+      Table table,
+      ScanTaskGroup<PositionDeletesScanTask> taskGroup,
+      Schema expectedSchema,
+      boolean caseSensitive) {
+
+    super(table, taskGroup, expectedSchema, caseSensitive);
+
+    int numSplits = taskGroup.tasks().size();
+    LOG.debug("Reading {} position delete file split(s) for table {}", numSplits, table.name());
+  }
+
+  @Override
+  protected Stream<ContentFile<?>> referencedFiles(PositionDeletesScanTask task) {
+    return Stream.of(task.file());
+  }
+
+  @Override
+  protected CloseableIterator<InternalRow> open(PositionDeletesScanTask task) {
+    Preconditions.checkArgument(
+        table() instanceof PositionDeletesTable
+            || (table() instanceof SerializableTable.SerializableMetadataTable
+                && ((SerializableTable.SerializableMetadataTable) table())
+                    .type()
+                    .equals(MetadataTableType.POSITION_DELETES)),
+        "PositionDeleteRowReader is only supported for PositionDeletesTable");
+
+    Types.StructType partitionType = Partitioning.partitionType(table());
+    Map<Integer, ?> idToConstant = constantsMap(task, expectedSchema(), partitionType);
+    String filePath = task.file().path().toString();
+    LOG.debug("Opening position delete file {}", filePath);
+
+    // update the current file for Spark's filename() function
+    InputFileBlockHolder.set(filePath, task.start(), task.length());
+
+    InputFile inputFile = getInputFile(task.file().path().toString());
+    Preconditions.checkNotNull(
+        inputFile, "Could not find InputFile associated with PositionDeleteScanTask");
+    return newIterable(
+            inputFile,
+            task.file().format(),
+            task.start(),
+            task.length(),
+            task.residual(),
+            expectedSchema(),
+            idToConstant)
+        .iterator();
+  }
+
+  protected Map<Integer, ?> constantsMap(

Review Comment:
   Can't we use `constantsMap()` from `BaseReader` now?



##########
core/src/main/java/org/apache/iceberg/SerializableTable.java:
##########
@@ -116,7 +116,7 @@ private FileIO fileIO(Table table) {
     return table.io();
   }
 
-  private Table lazyTable() {
+  protected Table lazyTable() {

Review Comment:
   Is this still needed?



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeleteRowReader.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import java.util.stream.Stream;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.ContentScanTask;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.Partitioning;
+import org.apache.iceberg.PositionDeletesScanTask;
+import org.apache.iceberg.PositionDeletesTable;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.PartitionUtil;
+import org.apache.spark.rdd.InputFileBlockHolder;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.read.PartitionReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class PositionDeleteRowReader extends BaseRowReader<PositionDeletesScanTask>
+    implements PartitionReader<InternalRow> {
+  private static final Logger LOG = LoggerFactory.getLogger(PositionDeleteRowReader.class);
+
+  PositionDeleteRowReader(SparkInputPartition partition) {
+    this(
+        partition.table(),
+        partition.taskGroup(),
+        partition.expectedSchema(),
+        partition.isCaseSensitive());
+  }
+
+  PositionDeleteRowReader(
+      Table table,
+      ScanTaskGroup<PositionDeletesScanTask> taskGroup,
+      Schema expectedSchema,
+      boolean caseSensitive) {
+
+    super(table, taskGroup, expectedSchema, caseSensitive);
+
+    int numSplits = taskGroup.tasks().size();
+    LOG.debug("Reading {} position delete file split(s) for table {}", numSplits, table.name());
+  }
+
+  @Override
+  protected Stream<ContentFile<?>> referencedFiles(PositionDeletesScanTask task) {
+    return Stream.of(task.file());
+  }
+
+  @Override
+  protected CloseableIterator<InternalRow> open(PositionDeletesScanTask task) {
+    Preconditions.checkArgument(
+        table() instanceof PositionDeletesTable
+            || (table() instanceof SerializableTable.SerializableMetadataTable
+                && ((SerializableTable.SerializableMetadataTable) table())
+                    .type()
+                    .equals(MetadataTableType.POSITION_DELETES)),
+        "PositionDeleteRowReader is only supported for PositionDeletesTable");
+
+    Types.StructType partitionType = Partitioning.partitionType(table());
+    Map<Integer, ?> idToConstant = constantsMap(task, expectedSchema(), partitionType);
+    String filePath = task.file().path().toString();
+    LOG.debug("Opening position delete file {}", filePath);
+
+    // update the current file for Spark's filename() function
+    InputFileBlockHolder.set(filePath, task.start(), task.length());

Review Comment:
   Do we have a test that verifies that `input_file_name()` actually works?



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeleteRowReader.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import java.util.stream.Stream;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.ContentScanTask;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.Partitioning;
+import org.apache.iceberg.PositionDeletesScanTask;
+import org.apache.iceberg.PositionDeletesTable;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.PartitionUtil;
+import org.apache.spark.rdd.InputFileBlockHolder;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.read.PartitionReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class PositionDeleteRowReader extends BaseRowReader<PositionDeletesScanTask>
+    implements PartitionReader<InternalRow> {
+  private static final Logger LOG = LoggerFactory.getLogger(PositionDeleteRowReader.class);
+
+  PositionDeleteRowReader(SparkInputPartition partition) {
+    this(
+        partition.table(),
+        partition.taskGroup(),
+        partition.expectedSchema(),
+        partition.isCaseSensitive());
+  }
+
+  PositionDeleteRowReader(
+      Table table,
+      ScanTaskGroup<PositionDeletesScanTask> taskGroup,
+      Schema expectedSchema,
+      boolean caseSensitive) {
+
+    super(table, taskGroup, expectedSchema, caseSensitive);
+
+    int numSplits = taskGroup.tasks().size();
+    LOG.debug("Reading {} position delete file split(s) for table {}", numSplits, table.name());
+  }
+
+  @Override
+  protected Stream<ContentFile<?>> referencedFiles(PositionDeletesScanTask task) {
+    return Stream.of(task.file());
+  }
+
+  @Override
+  protected CloseableIterator<InternalRow> open(PositionDeletesScanTask task) {
+    Preconditions.checkArgument(
+        table() instanceof PositionDeletesTable
+            || (table() instanceof SerializableTable.SerializableMetadataTable
+                && ((SerializableTable.SerializableMetadataTable) table())
+                    .type()
+                    .equals(MetadataTableType.POSITION_DELETES)),
+        "PositionDeleteRowReader is only supported for PositionDeletesTable");
+
+    Types.StructType partitionType = Partitioning.partitionType(table());
+    Map<Integer, ?> idToConstant = constantsMap(task, expectedSchema(), partitionType);
+    String filePath = task.file().path().toString();
+    LOG.debug("Opening position delete file {}", filePath);
+
+    // update the current file for Spark's filename() function
+    InputFileBlockHolder.set(filePath, task.start(), task.length());
+
+    InputFile inputFile = getInputFile(task.file().path().toString());
+    Preconditions.checkNotNull(
+        inputFile, "Could not find InputFile associated with PositionDeleteScanTask");
+    return newIterable(

Review Comment:
   nit: What about an empty line before `return`?





[GitHub] [iceberg] szehon-ho commented on a diff in pull request #6716: Spark 3.3: Implement Position Deletes Table

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #6716:
URL: https://github.com/apache/iceberg/pull/6716#discussion_r1103197152


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeleteRowReader.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import java.util.stream.Stream;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.ContentScanTask;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.Partitioning;
+import org.apache.iceberg.PositionDeletesScanTask;
+import org.apache.iceberg.PositionDeletesTable;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.PartitionUtil;
+import org.apache.spark.rdd.InputFileBlockHolder;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.read.PartitionReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class PositionDeleteRowReader extends BaseRowReader<PositionDeletesScanTask>
+    implements PartitionReader<InternalRow> {
+  private static final Logger LOG = LoggerFactory.getLogger(PositionDeleteRowReader.class);
+
+  PositionDeleteRowReader(SparkInputPartition partition) {
+    this(
+        partition.table(),
+        partition.taskGroup(),
+        partition.expectedSchema(),
+        partition.isCaseSensitive());
+  }
+
+  PositionDeleteRowReader(
+      Table table,
+      ScanTaskGroup<PositionDeletesScanTask> taskGroup,
+      Schema expectedSchema,
+      boolean caseSensitive) {
+
+    super(table, taskGroup, expectedSchema, caseSensitive);
+
+    int numSplits = taskGroup.tasks().size();
+    LOG.debug("Reading {} position delete file split(s) for table {}", numSplits, table.name());
+  }
+
+  @Override
+  protected Stream<ContentFile<?>> referencedFiles(PositionDeletesScanTask task) {
+    return Stream.of(task.file());
+  }
+
+  @Override
+  protected CloseableIterator<InternalRow> open(PositionDeletesScanTask task) {
+    Preconditions.checkArgument(
+        table() instanceof PositionDeletesTable
+            || (table() instanceof SerializableTable.SerializableMetadataTable
+                && ((SerializableTable.SerializableMetadataTable) table())
+                    .type()
+                    .equals(MetadataTableType.POSITION_DELETES)),
+        "PositionDeleteRowReader is only supported for PositionDeletesTable");
+
+    Types.StructType partitionType = Partitioning.partitionType(table());
+    Map<Integer, ?> idToConstant = constantsMap(task, expectedSchema(), partitionType);
+    String filePath = task.file().path().toString();
+    LOG.debug("Opening position delete file {}", filePath);
+
+    // update the current file for Spark's filename() function
+    InputFileBlockHolder.set(filePath, task.start(), task.length());
+
+    InputFile inputFile = getInputFile(task.file().path().toString());
+    Preconditions.checkNotNull(
+        inputFile, "Could not find InputFile associated with PositionDeleteScanTask");
+    return newIterable(
+            inputFile,
+            task.file().format(),
+            task.start(),
+            task.length(),
+            task.residual(),
+            expectedSchema(),
+            idToConstant)
+        .iterator();
+  }
+
+  protected Map<Integer, ?> constantsMap(

Review Comment:
   Good point, done





[GitHub] [iceberg] szehon-ho commented on a diff in pull request #6716: Spark 3.3: Implement Position Deletes Table

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #6716:
URL: https://github.com/apache/iceberg/pull/6716#discussion_r1106568444


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.PositionDeletesScanTask;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.ExpressionUtil;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.primitives.Ints;
+import org.apache.spark.rdd.InputFileBlockHolder;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.read.PartitionReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class PositionDeletesRowReader extends BaseRowReader<PositionDeletesScanTask>
+    implements PartitionReader<InternalRow> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(PositionDeletesRowReader.class);
+
+  PositionDeletesRowReader(SparkInputPartition partition) {
+    this(
+        partition.table(),
+        partition.taskGroup(),
+        partition.expectedSchema(),
+        partition.isCaseSensitive());
+  }
+
+  PositionDeletesRowReader(
+      Table table,
+      ScanTaskGroup<PositionDeletesScanTask> taskGroup,
+      Schema expectedSchema,
+      boolean caseSensitive) {
+
+    super(table, taskGroup, expectedSchema, caseSensitive);
+
+    int numSplits = taskGroup.tasks().size();
+    LOG.debug("Reading {} position delete file split(s) for table {}", numSplits, table.name());
+  }
+
+  @Override
+  protected Stream<ContentFile<?>> referencedFiles(PositionDeletesScanTask task) {
+    return Stream.of(task.file());
+  }
+
+  @Override
+  protected CloseableIterator<InternalRow> open(PositionDeletesScanTask task) {
+    Schema schema = expectedSchema();
+    Map<Integer, ?> idToConstant = constantsMap(task, schema);
+    String filePath = task.file().path().toString();
+    LOG.debug("Opening position delete file {}", filePath);
+
+    // update the current file for Spark's filename() function
+    InputFileBlockHolder.set(filePath, task.start(), task.length());
+
+    InputFile inputFile = getInputFile(task.file().path().toString());
+    Preconditions.checkNotNull(
+        inputFile, "Could not find InputFile associated with PositionDeleteScanTask");
+
+    // select out constant fields when pushing down filter to row reader
+    Set<Integer> fields = schema.idToName().keySet();
+    Set<Integer> nonConstantFields =
+        fields.stream()
+            .filter(id -> schema.findField(id).type().isPrimitiveType())

Review Comment:
   We need to pass in only primitive field IDs, because `ExpressionUtil.extractByIdInclusive` internally uses them to build a dummy identity partition spec, and identity partition fields must be primitive.
   
   Alternatively, we could push this filtering down into the method itself, but I am leaving it here so that the method stays the same as in: https://github.com/apache/iceberg/pull/6599
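
A minimal sketch of the primitive-only selection described above. It does not use the Iceberg API: the schema is modeled as a hypothetical map from field ID to a flag saying whether the field's type is primitive, and `PrimitiveFieldIds` is an illustrative name, not a class from the PR.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;

// Illustrative stand-in: the "schema" is just field id -> isPrimitive (an assumption).
class PrimitiveFieldIds {
  // Keeps only the ids whose type is primitive, mirroring the stream filter in the diff.
  static Set<Integer> select(Map<Integer, Boolean> isPrimitiveById) {
    return isPrimitiveById.entrySet().stream()
        .filter(Map.Entry::getValue)
        .map(Map.Entry::getKey)
        .collect(Collectors.toCollection(TreeSet::new));
  }
}
```

With a struct-typed field in the map (flag `false`), its ID is dropped and only the primitive IDs remain, which is the set that would then be handed to the identity-spec-based helper.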





[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #6716: Spark 3.3: Implement Position Deletes Table

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #6716:
URL: https://github.com/apache/iceberg/pull/6716#discussion_r1102025442


##########
core/src/main/java/org/apache/iceberg/MetadataTable.java:
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+/** Interface representing a metadata table. */
+public interface MetadataTable {

Review Comment:
   After thinking more about it, would it be a good idea to expose transformed specs? I mean those that we use during planning by calling `BaseMetadataTable.transformSpec`? Would those be correct specs for the table?







[GitHub] [iceberg] szehon-ho commented on a diff in pull request #6716: Spark 3.3: Implement Position Deletes Table

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #6716:
URL: https://github.com/apache/iceberg/pull/6716#discussion_r1093922634


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeleteRowReader.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import java.util.stream.Stream;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.ContentScanTask;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.MetadataTable;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.Partitioning;
+import org.apache.iceberg.PositionDeletesScanTask;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.PartitionUtil;
+import org.apache.spark.rdd.InputFileBlockHolder;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.read.PartitionReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class PositionDeleteRowReader extends BaseRowReader<PositionDeletesScanTask>
+    implements PartitionReader<InternalRow> {
+  private static final Logger LOG = LoggerFactory.getLogger(PositionDeleteRowReader.class);
+
+  PositionDeleteRowReader(SparkInputPartition partition) {
+    this(
+        partition.table(),
+        partition.taskGroup(),
+        partition.expectedSchema(),
+        partition.isCaseSensitive());
+  }
+
+  PositionDeleteRowReader(
+      Table table,
+      ScanTaskGroup<PositionDeletesScanTask> taskGroup,
+      Schema expectedSchema,
+      boolean caseSensitive) {
+
+    super(table, taskGroup, expectedSchema, caseSensitive);
+
+    int numSplits = taskGroup.tasks().size();
+    LOG.debug("Reading {} position delete file split(s) for table {}", numSplits, table.name());
+  }
+
+  @Override
+  protected Stream<ContentFile<?>> referencedFiles(PositionDeletesScanTask task) {
+    return Stream.of(task.file());
+  }
+
+  @Override
+  protected CloseableIterator<InternalRow> open(PositionDeletesScanTask task) {
+    Table deletesTable = table();
+    Preconditions.checkArgument(
+        deletesTable instanceof MetadataTable,
+        "PositionDeleteRowReader is only supported for PositionDeletesTable");
+    Preconditions.checkArgument(
+        ((MetadataTable) deletesTable).type().equals(MetadataTableType.POSITION_DELETES),
+        "PositionDeleteRowReader is only supported for PositionDeletesTable");
+    Table baseTable = ((MetadataTable) deletesTable).baseTable();

Review Comment:
   Because the RowReader's table is a PositionDeletesTable, we need the base table's schema to calculate the partition type here.
   
   Hence, I added a `MetadataTable` interface, implemented by both the serializable and non-serializable position deletes tables, to get the base table.
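
A rough sketch of the pattern described above, using stand-in types rather than the real Iceberg classes. The `type()` and `baseTable()` methods follow the calls visible in the diff; every `Sketch*` name and `BaseTableResolver` are hypothetical.

```java
// Stand-in types for illustration only; they are not the Iceberg classes.
interface SketchTable {
  String name();
}

enum SketchMetadataTableType {
  POSITION_DELETES,
  FILES
}

// Assumed shape of the interface: exposes the metadata table type and the base table.
interface SketchMetadataTable {
  SketchMetadataTableType type();

  SketchTable baseTable();
}

class SketchBaseTable implements SketchTable {
  private final String name;

  SketchBaseTable(String name) {
    this.name = name;
  }

  @Override
  public String name() {
    return name;
  }
}

// Plays the role of either the serializable or the non-serializable deletes table.
class SketchPositionDeletesTable implements SketchTable, SketchMetadataTable {
  private final SketchTable base;

  SketchPositionDeletesTable(SketchTable base) {
    this.base = base;
  }

  @Override
  public String name() {
    return base.name() + ".position_deletes";
  }

  @Override
  public SketchMetadataTableType type() {
    return SketchMetadataTableType.POSITION_DELETES;
  }

  @Override
  public SketchTable baseTable() {
    return base;
  }
}

class BaseTableResolver {
  // Validates the table the reader was created with, then returns its base table.
  static SketchTable resolve(SketchTable table) {
    if (!(table instanceof SketchMetadataTable)) {
      throw new IllegalArgumentException("Expected a metadata table: " + table.name());
    }
    SketchMetadataTable metadataTable = (SketchMetadataTable) table;
    if (metadataTable.type() != SketchMetadataTableType.POSITION_DELETES) {
      throw new IllegalArgumentException("Expected a position deletes table: " + table.name());
    }
    return metadataTable.baseTable();
  }
}
```

The reader only depends on the small interface, so it does not need to know which concrete table implementation it was handed.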





[GitHub] [iceberg] szehon-ho commented on a diff in pull request #6716: Spark 3.3: Implement Position Deletes Table

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #6716:
URL: https://github.com/apache/iceberg/pull/6716#discussion_r1094108930


##########
orc/src/main/java/org/apache/iceberg/orc/OrcIterable.java:
##########
@@ -84,15 +91,18 @@ public CloseableIterator<T> iterator() {
     addCloseable(orcFileReader);
 
     TypeDescription fileSchema = orcFileReader.getSchema();
+    Schema schemaWithoutConstantFields =

Review Comment:
   Previously, a schema with the constant columns already pruned was passed into `OrcIterable` for its filter logic.
   
   However, that makes binding filters on constant columns fail. With this change, we prune the schema only where the pruned version is needed, and keep the full schema when binding filters on constant columns.
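
To illustrate the idea in isolation, here is a simplified sketch that does not use the ORC or Iceberg APIs. The schema is modeled as a hypothetical map from field ID to column name, "binding" is reduced to checking that the column exists, and the column names in the usage note are examples only.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Illustrative model only: schema = field id -> column name (an assumption).
class ConstantAwareSchema {
  // Drops constant fields; this pruned view is what would be used to read the file.
  static Map<Integer, String> withoutConstants(
      Map<Integer, String> schema, Set<Integer> constantIds) {
    Map<Integer, String> pruned = new LinkedHashMap<>();
    for (Map.Entry<Integer, String> field : schema.entrySet()) {
      if (!constantIds.contains(field.getKey())) {
        pruned.put(field.getKey(), field.getValue());
      }
    }
    return pruned;
  }

  // In this sketch a filter "binds" only if the schema still contains its column.
  static boolean canBind(String column, Map<Integer, String> schema) {
    return schema.containsValue(column);
  }
}
```

If `partition` is a constant column, `canBind("partition", ...)` fails against the pruned schema but succeeds against the full one, which is why the full schema is kept for the binding step.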





[GitHub] [iceberg] szehon-ho commented on a diff in pull request #6716: Spark 3.3: Implement Position Deletes Table

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #6716:
URL: https://github.com/apache/iceberg/pull/6716#discussion_r1094106234


##########
parquet/src/main/java/org/apache/iceberg/parquet/ParquetMetricsRowGroupFilter.java:
##########
@@ -50,15 +51,22 @@ public class ParquetMetricsRowGroupFilter {
 
   private final Schema schema;
   private final Expression expr;
+  private final Set<Integer> constantFieldIds;
 
   public ParquetMetricsRowGroupFilter(Schema schema, Expression unbound) {
-    this(schema, unbound, true);
+    this(schema, unbound, true, ImmutableSet.of());

Review Comment:
   Without this, any column whose stats are null goes to ROWS_CANNOT_MATCH. Thus, any filter on a constant column of the deletes table would return 0 results.
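
A hedged sketch of the behavior described above, not the actual `ParquetMetricsRowGroupFilter` logic. It assumes that a field listed in `constantFieldIds` is simply treated as "might match" before the stats are consulted; how the PR uses the set internally is not shown in this excerpt. `SketchRowGroupFilter` and the `valueCounts` map are illustrative.

```java
import java.util.Map;
import java.util.Set;

// Simplified model of a stats-based row group check for an equality predicate.
class SketchRowGroupFilter {
  static final boolean ROWS_MIGHT_MATCH = true;
  static final boolean ROWS_CANNOT_MATCH = false;

  static boolean eq(int fieldId, Map<Integer, Long> valueCounts, Set<Integer> constantFieldIds) {
    // Assumption: constant columns are not stored in the file, so stats cannot rule them out.
    if (constantFieldIds.contains(fieldId)) {
      return ROWS_MIGHT_MATCH;
    }

    // No stats for the column: treated as absent from the file, so nothing can match.
    if (!valueCounts.containsKey(fieldId)) {
      return ROWS_CANNOT_MATCH;
    }

    // Stats exist; a real filter would compare lower and upper bounds here.
    return ROWS_MIGHT_MATCH;
  }
}
```

With an empty `constantFieldIds` set, a predicate on a constant column falls into the missing-stats branch and prunes every row group, which matches the "0 results" symptom described in the comment.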





[GitHub] [iceberg] szehon-ho commented on a diff in pull request #6716: Spark 3.3: Implement Position Deletes Table

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #6716:
URL: https://github.com/apache/iceberg/pull/6716#discussion_r1103196828


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeleteRowReader.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import java.util.stream.Stream;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.ContentScanTask;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.Partitioning;
+import org.apache.iceberg.PositionDeletesScanTask;
+import org.apache.iceberg.PositionDeletesTable;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.PartitionUtil;
+import org.apache.spark.rdd.InputFileBlockHolder;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.read.PartitionReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class PositionDeleteRowReader extends BaseRowReader<PositionDeletesScanTask>
+    implements PartitionReader<InternalRow> {
+  private static final Logger LOG = LoggerFactory.getLogger(PositionDeleteRowReader.class);
+
+  PositionDeleteRowReader(SparkInputPartition partition) {
+    this(
+        partition.table(),
+        partition.taskGroup(),
+        partition.expectedSchema(),
+        partition.isCaseSensitive());
+  }
+
+  PositionDeleteRowReader(
+      Table table,
+      ScanTaskGroup<PositionDeletesScanTask> taskGroup,
+      Schema expectedSchema,
+      boolean caseSensitive) {
+
+    super(table, taskGroup, expectedSchema, caseSensitive);
+
+    int numSplits = taskGroup.tasks().size();
+    LOG.debug("Reading {} position delete file split(s) for table {}", numSplits, table.name());
+  }
+
+  @Override
+  protected Stream<ContentFile<?>> referencedFiles(PositionDeletesScanTask task) {
+    return Stream.of(task.file());
+  }
+
+  @Override
+  protected CloseableIterator<InternalRow> open(PositionDeletesScanTask task) {
+    Preconditions.checkArgument(
+        table() instanceof PositionDeletesTable
+            || (table() instanceof SerializableTable.SerializableMetadataTable
+                && ((SerializableTable.SerializableMetadataTable) table())
+                    .type()
+                    .equals(MetadataTableType.POSITION_DELETES)),
+        "PositionDeleteRowReader is only supported for PositionDeletesTable");
+
+    Types.StructType partitionType = Partitioning.partitionType(table());
+    Map<Integer, ?> idToConstant = constantsMap(task, expectedSchema(), partitionType);
+    String filePath = task.file().path().toString();
+    LOG.debug("Opening position delete file {}", filePath);
+
+    // update the current file for Spark's filename() function
+    InputFileBlockHolder.set(filePath, task.start(), task.length());
+
+    InputFile inputFile = getInputFile(task.file().path().toString());
+    Preconditions.checkNotNull(
+        inputFile, "Could not find InputFile associated with PositionDeleteScanTask");
+    return newIterable(

Review Comment:
   Done



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeleteRowReader.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import java.util.stream.Stream;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.ContentScanTask;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.Partitioning;
+import org.apache.iceberg.PositionDeletesScanTask;
+import org.apache.iceberg.PositionDeletesTable;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.PartitionUtil;
+import org.apache.spark.rdd.InputFileBlockHolder;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.read.PartitionReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class PositionDeleteRowReader extends BaseRowReader<PositionDeletesScanTask>
+    implements PartitionReader<InternalRow> {
+  private static final Logger LOG = LoggerFactory.getLogger(PositionDeleteRowReader.class);
+
+  PositionDeleteRowReader(SparkInputPartition partition) {
+    this(
+        partition.table(),
+        partition.taskGroup(),
+        partition.expectedSchema(),
+        partition.isCaseSensitive());
+  }
+
+  PositionDeleteRowReader(
+      Table table,
+      ScanTaskGroup<PositionDeletesScanTask> taskGroup,
+      Schema expectedSchema,
+      boolean caseSensitive) {
+
+    super(table, taskGroup, expectedSchema, caseSensitive);
+
+    int numSplits = taskGroup.tasks().size();
+    LOG.debug("Reading {} position delete file split(s) for table {}", numSplits, table.name());
+  }
+
+  @Override
+  protected Stream<ContentFile<?>> referencedFiles(PositionDeletesScanTask task) {
+    return Stream.of(task.file());
+  }
+
+  @Override
+  protected CloseableIterator<InternalRow> open(PositionDeletesScanTask task) {
+    Preconditions.checkArgument(
+        table() instanceof PositionDeletesTable
+            || (table() instanceof SerializableTable.SerializableMetadataTable
+                && ((SerializableTable.SerializableMetadataTable) table())
+                    .type()
+                    .equals(MetadataTableType.POSITION_DELETES)),
+        "PositionDeleteRowReader is only supported for PositionDeletesTable");
+
+    Types.StructType partitionType = Partitioning.partitionType(table());
+    Map<Integer, ?> idToConstant = constantsMap(task, expectedSchema(), partitionType);
+    String filePath = task.file().path().toString();
+    LOG.debug("Opening position delete file {}", filePath);
+
+    // update the current file for Spark's filename() function
+    InputFileBlockHolder.set(filePath, task.start(), task.length());

Review Comment:
   Added in 'testSelect'





[GitHub] [iceberg] szehon-ho commented on a diff in pull request #6716: Spark 3.3: Implement Position Deletes Table

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #6716:
URL: https://github.com/apache/iceberg/pull/6716#discussion_r1109113625


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.PositionDeletesScanTask;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.ExpressionUtil;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.primitives.Ints;
+import org.apache.spark.rdd.InputFileBlockHolder;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.read.PartitionReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class PositionDeletesRowReader extends BaseRowReader<PositionDeletesScanTask>
+    implements PartitionReader<InternalRow> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(PositionDeletesRowReader.class);
+
+  PositionDeletesRowReader(SparkInputPartition partition) {
+    this(
+        partition.table(),
+        partition.taskGroup(),
+        partition.expectedSchema(),
+        partition.isCaseSensitive());
+  }
+
+  PositionDeletesRowReader(
+      Table table,
+      ScanTaskGroup<PositionDeletesScanTask> taskGroup,
+      Schema expectedSchema,
+      boolean caseSensitive) {
+
+    super(table, taskGroup, expectedSchema, caseSensitive);
+
+    int numSplits = taskGroup.tasks().size();
+    LOG.debug("Reading {} position delete file split(s) for table {}", numSplits, table.name());
+  }
+
+  @Override
+  protected Stream<ContentFile<?>> referencedFiles(PositionDeletesScanTask task) {
+    return Stream.of(task.file());
+  }
+
+  @Override
+  protected CloseableIterator<InternalRow> open(PositionDeletesScanTask task) {
+    Schema schema = expectedSchema();
+    Map<Integer, ?> idToConstant = constantsMap(task, schema);
+    String filePath = task.file().path().toString();
+    LOG.debug("Opening position delete file {}", filePath);
+
+    // update the current file for Spark's filename() function
+    InputFileBlockHolder.set(filePath, task.start(), task.length());
+
+    InputFile inputFile = getInputFile(task.file().path().toString());
+    Preconditions.checkNotNull(
+        inputFile, "Could not find InputFile associated with PositionDeleteScanTask");
+
+    // select out constant fields when pushing down filter to row reader

Review Comment:
   Done



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.PositionDeletesScanTask;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.ExpressionUtil;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.primitives.Ints;
+import org.apache.spark.rdd.InputFileBlockHolder;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.read.PartitionReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class PositionDeletesRowReader extends BaseRowReader<PositionDeletesScanTask>
+    implements PartitionReader<InternalRow> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(PositionDeletesRowReader.class);
+
+  PositionDeletesRowReader(SparkInputPartition partition) {
+    this(
+        partition.table(),
+        partition.taskGroup(),
+        partition.expectedSchema(),
+        partition.isCaseSensitive());
+  }
+
+  PositionDeletesRowReader(
+      Table table,
+      ScanTaskGroup<PositionDeletesScanTask> taskGroup,
+      Schema expectedSchema,
+      boolean caseSensitive) {
+
+    super(table, taskGroup, expectedSchema, caseSensitive);
+
+    int numSplits = taskGroup.tasks().size();
+    LOG.debug("Reading {} position delete file split(s) for table {}", numSplits, table.name());
+  }
+
+  @Override
+  protected Stream<ContentFile<?>> referencedFiles(PositionDeletesScanTask task) {
+    return Stream.of(task.file());
+  }
+
+  @Override
+  protected CloseableIterator<InternalRow> open(PositionDeletesScanTask task) {
+    Schema schema = expectedSchema();
+    Map<Integer, ?> idToConstant = constantsMap(task, schema);
+    String filePath = task.file().path().toString();
+    LOG.debug("Opening position delete file {}", filePath);
+
+    // update the current file for Spark's filename() function
+    InputFileBlockHolder.set(filePath, task.start(), task.length());
+
+    InputFile inputFile = getInputFile(task.file().path().toString());
+    Preconditions.checkNotNull(
+        inputFile, "Could not find InputFile associated with PositionDeleteScanTask");
+
+    // select out constant fields when pushing down filter to row reader
+    Set<Integer> fields = schema.idToName().keySet();
+    Set<Integer> nonConstantFields =

Review Comment:
   Done



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.PositionDeletesScanTask;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.ExpressionUtil;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.primitives.Ints;
+import org.apache.spark.rdd.InputFileBlockHolder;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.read.PartitionReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class PositionDeletesRowReader extends BaseRowReader<PositionDeletesScanTask>
+    implements PartitionReader<InternalRow> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(PositionDeletesRowReader.class);
+
+  PositionDeletesRowReader(SparkInputPartition partition) {
+    this(
+        partition.table(),
+        partition.taskGroup(),
+        partition.expectedSchema(),
+        partition.isCaseSensitive());
+  }
+
+  PositionDeletesRowReader(
+      Table table,
+      ScanTaskGroup<PositionDeletesScanTask> taskGroup,
+      Schema expectedSchema,
+      boolean caseSensitive) {
+
+    super(table, taskGroup, expectedSchema, caseSensitive);
+
+    int numSplits = taskGroup.tasks().size();
+    LOG.debug("Reading {} position delete file split(s) for table {}", numSplits, table.name());
+  }
+
+  @Override
+  protected Stream<ContentFile<?>> referencedFiles(PositionDeletesScanTask task) {
+    return Stream.of(task.file());
+  }
+
+  @Override
+  protected CloseableIterator<InternalRow> open(PositionDeletesScanTask task) {
+    Schema schema = expectedSchema();
+    Map<Integer, ?> idToConstant = constantsMap(task, schema);
+    String filePath = task.file().path().toString();
+    LOG.debug("Opening position delete file {}", filePath);
+
+    // update the current file for Spark's filename() function
+    InputFileBlockHolder.set(filePath, task.start(), task.length());
+
+    InputFile inputFile = getInputFile(task.file().path().toString());
+    Preconditions.checkNotNull(
+        inputFile, "Could not find InputFile associated with PositionDeleteScanTask");
+
+    // select out constant fields when pushing down filter to row reader
+    Set<Integer> fields = schema.idToName().keySet();
+    Set<Integer> nonConstantFields =
+        fields.stream()
+            .filter(id -> schema.findField(id).type().isPrimitiveType())
+            .collect(Collectors.toSet());
+    nonConstantFields.removeAll(idToConstant.keySet());

Review Comment:
   Good point, done.



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.PositionDeletesScanTask;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.ExpressionUtil;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.primitives.Ints;
+import org.apache.spark.rdd.InputFileBlockHolder;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.read.PartitionReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class PositionDeletesRowReader extends BaseRowReader<PositionDeletesScanTask>
+    implements PartitionReader<InternalRow> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(PositionDeletesRowReader.class);
+
+  PositionDeletesRowReader(SparkInputPartition partition) {
+    this(
+        partition.table(),
+        partition.taskGroup(),
+        partition.expectedSchema(),
+        partition.isCaseSensitive());
+  }
+
+  PositionDeletesRowReader(
+      Table table,
+      ScanTaskGroup<PositionDeletesScanTask> taskGroup,
+      Schema expectedSchema,
+      boolean caseSensitive) {
+
+    super(table, taskGroup, expectedSchema, caseSensitive);
+
+    int numSplits = taskGroup.tasks().size();
+    LOG.debug("Reading {} position delete file split(s) for table {}", numSplits, table.name());
+  }
+
+  @Override
+  protected Stream<ContentFile<?>> referencedFiles(PositionDeletesScanTask task) {
+    return Stream.of(task.file());
+  }
+
+  @Override
+  protected CloseableIterator<InternalRow> open(PositionDeletesScanTask task) {
+    Schema schema = expectedSchema();
+    Map<Integer, ?> idToConstant = constantsMap(task, schema);
+    String filePath = task.file().path().toString();
+    LOG.debug("Opening position delete file {}", filePath);
+
+    // update the current file for Spark's filename() function
+    InputFileBlockHolder.set(filePath, task.start(), task.length());
+
+    InputFile inputFile = getInputFile(task.file().path().toString());
+    Preconditions.checkNotNull(
+        inputFile, "Could not find InputFile associated with PositionDeleteScanTask");
+
+    // select out constant fields when pushing down filter to row reader
+    Set<Integer> fields = schema.idToName().keySet();
+    Set<Integer> nonConstantFields =
+        fields.stream()
+            .filter(id -> schema.findField(id).type().isPrimitiveType())
+            .collect(Collectors.toSet());
+    nonConstantFields.removeAll(idToConstant.keySet());
+    Expression residualWithoutConstants =
+        ExpressionUtil.extractByIdInclusive(
+            task.residual(),
+            task.spec().schema(),

Review Comment:
   Done


