You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/05/18 21:53:42 UTC

[GitHub] [iceberg] szehon-ho opened a new pull request, #4812: Spark: Support reading position deletes

szehon-ho opened a new pull request, #4812:
URL: https://github.com/apache/iceberg/pull/4812

   This exposes position deletes as a metadata table "position_deletes", with schema:  file, pos, row.
   
   This will be useful when trying to implement "RewritePositionDeleteFiles", as we will read positional deletes from Spark and then write it (in a follow-up).  But this will also be useful for users to get more insights into their positional delete files as a table via SQL.
   
   Notes:
   
   1. Design choice:  Why via metadata table?  Initially I tried to implement it as a Spark Read config, but  SparkCatalog.loadTable didnt support read configuration options to load SparkTable with an alternate schema.  So hence chose metadata table path.
   2. Implementation: Most of the changes here are adding the concept of DeleteFileScanTask.  The FileScanTask is today bound to DataFiles (FileScanTask.file() returns DataFile) and we can't really change that as it's used in hundreds of places, so added a contentFile() method that returns the DataFile for DataFileScanTask or DeleteFile in case of DeleteFileScanTask.  To support position delete file scan, change code calls that need to scan delete files from FileScanTask.file() => FileScanTask.contentFile(), as all logics should work equally with both DeleteFile and DataFile.
   3. Todo: Currently this scans the whole table's delete files.  Implementing filter push-down will be an important follow up.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#issuecomment-1164982089

   Added partition column and got filter push down working, with all the partition spec evolution scenarios.  Added a lot of tests in dedicated file (TestPositionDeletesTable).
   
   Will look to see how to incorporate #5077 is in.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r923950980


##########
core/src/main/java/org/apache/iceberg/MetadataColumns.java:
##########
@@ -53,6 +53,8 @@ private MetadataColumns() {
   public static final String DELETE_FILE_ROW_FIELD_NAME = "row";
   public static final int DELETE_FILE_ROW_FIELD_ID = Integer.MAX_VALUE - 103;
   public static final String DELETE_FILE_ROW_DOC = "Deleted row values";
+  public static final int POSITION_DELETE_TABLE_PARTITION_FIELD_ID = Integer.MAX_VALUE - 104;

Review Comment:
   Hi guys, I hit the first issue , as  the code for PositionDeletesTable/ DataTask is in core module, there is no way currently to access Parquet, ORC, and the file readers to implement DataTask::rows().  I mean Spark could pass in a function that returns a CloseableIterable<Row> but then it seems a bit silly to use the DataTask (except that DataTask can add the static columns like partition and partition_spec_id)
   
   Another thing that needs to be passed in is encryption (like in DeleteFilter::inputFile(), it seems to be different in Spark/Flink.  Let me know if you have any thoughts?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r923950980


##########
core/src/main/java/org/apache/iceberg/MetadataColumns.java:
##########
@@ -53,6 +53,8 @@ private MetadataColumns() {
   public static final String DELETE_FILE_ROW_FIELD_NAME = "row";
   public static final int DELETE_FILE_ROW_FIELD_ID = Integer.MAX_VALUE - 103;
   public static final String DELETE_FILE_ROW_DOC = "Deleted row values";
+  public static final int POSITION_DELETE_TABLE_PARTITION_FIELD_ID = Integer.MAX_VALUE - 104;

Review Comment:
   Hi guys, I hit the first issue , as StaticDataTask is in core module, there is no way currently for it to access Parquet, ORC, and the file readers.  So maybe engines, like Spark, can pass it in?  This would seem to require an API change in any case on DataTask.
   
   So something like, adding a new method DataTask::rows(Function<DeleteFile, Schema, Expression> filter readerFunction)?  
   
   This can also take care of encryption, which seems to also be a callback (for example DeleteFilter::inputFile(), which is implemented by Spark/Flink).
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r919547379


##########
core/src/main/java/org/apache/iceberg/DataFiles.java:
##########
@@ -108,6 +108,29 @@ public static DataFile fromManifest(ManifestFile manifest) {
         .build();
   }
 
+  public static DataFile fromPositionDelete(DeleteFile deleteFile, PartitionSpec spec) {

Review Comment:
   In the end, decided to make the method work for both kind of delete files, as the DeleteFile object should not be different:  https://github.com/apache/iceberg/pull/4812#discussion_r918441075



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r919473652


##########
core/src/main/java/org/apache/iceberg/io/DeleteSchemaUtil.java:
##########
@@ -20,13 +20,38 @@
 package org.apache.iceberg.io;
 
 import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Partitioning;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.types.Types;
 
 public class DeleteSchemaUtil {
   private DeleteSchemaUtil() {
   }
 
+  public static Schema metadataTableSchema(Table table) {
+    return metadataTableSchema(table, Partitioning.partitionType(table));
+  }
+
+  public static Schema metadataTableSchema(Table table, Types.StructType partitionType) {
+    Schema result = new Schema(
+        MetadataColumns.DELETE_FILE_PATH,

Review Comment:
   One of the tricky things is to pick a column field id, because here it may conflict with the user field id.  (the row of position deletes is a exact struct of the user's schema).  So I will still put these constants on the class MetadataColumns, but as reserved columns instead of metadata columns.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r876421452


##########
core/src/main/java/org/apache/iceberg/FixedSizeSplitScanTaskIterator.java:
##########
@@ -0,0 +1,50 @@
+/*

Review Comment:
   Note: moved from BaseFileScanTask, no changes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r893716028


##########
core/src/main/java/org/apache/iceberg/PositionDeletesTable.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import org.apache.iceberg.io.DeleteSchemaUtil;
+
+public class PositionDeletesTable extends BaseMetadataTable {
+
+  PositionDeletesTable(TableOperations ops, Table table, String name) {
+    super(ops, table, name);
+  }
+
+  @Override
+  MetadataTableType metadataTableType() {
+    return MetadataTableType.POSITION_DELETES;
+  }
+
+  @Override
+  public TableScan newScan() {
+    return new PositionDeletesTableScan(operations(), table(), schema());
+  }
+
+  @Override
+  public Schema schema() {

Review Comment:
   @aokolnychyi i added a test to verify all the metadata columns like _file_path work as expected.  So I would say, users can join position_delete._file_path column with delete_files table to get snapshot-id, sequence-number as they wish.  
   
   Adding these columns directly to PositionDeletes metadata table and having filtering pushdown will be trickier.  This is because FileScanTask interface as is does not contain sequence number, snapshot-id, and we would need to probably subclass it to contain this information.  And we would need to add metadata columns just for this metadata table, if so, to expose it.   (currently the only way to add columns to tables that use Spark RowReader is by metadata columns) 
   
   It to me a bit of a complicated optimization and maybe not worth the extra code complexity, Not sure if this is a very common query of filtering out certain sequence numbers, as in general delete file entries will always be against data files of lower sequence numbers and thus always apply.  In any case , the user can accomplish it via join a little more expensively.
   
   On the other hand, I could perhaps look at adding partition filtering push down.  There's already metadata column _partition and I think it will be a more common query to optimize.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r893716028


##########
core/src/main/java/org/apache/iceberg/PositionDeletesTable.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import org.apache.iceberg.io.DeleteSchemaUtil;
+
+public class PositionDeletesTable extends BaseMetadataTable {
+
+  PositionDeletesTable(TableOperations ops, Table table, String name) {
+    super(ops, table, name);
+  }
+
+  @Override
+  MetadataTableType metadataTableType() {
+    return MetadataTableType.POSITION_DELETES;
+  }
+
+  @Override
+  public TableScan newScan() {
+    return new PositionDeletesTableScan(operations(), table(), schema());
+  }
+
+  @Override
+  public Schema schema() {

Review Comment:
   @aokolnychyi i added a test to verify all the metadata columns like _file_path work as expected.  So I would say, users can join position_delete._file_path column with delete_files table to get snapshot-id, sequence-number as they wish.  
   
   Though filtering pushdown will be trickier for this.  This is because FileScanTask interface as is does not contain sequence number, snapshot-id, and we would need to first subclass it again to contain this information as in the first version of the pr.  And we would need to add metadata columns just for this metadata table, if so, to expose it.   (currently the only way to add columns to tables that use Spark RowReader is by metadata columns) 
   
   It seems a bit complicated optimization and maybe not worth the extra complexity, Not sure if there's a very common query of filtering out certain sequence numbers, as in general delete file entries will always be against data files of lower sequence numbers and thus always apply.
   
   On the other hand, I could perhaps look at adding partition filtering push down.  There's already metadata column _partition and I think it will be a more common query.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#issuecomment-1144253900

   I feel it is reasonable to have a metadata table like this. I am not sure about the change in `FileScanTask`, though. I also have a few questions related to `FileScanTask` in #4870. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] kbendick commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
kbendick commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r876589072


##########
core/src/main/java/org/apache/iceberg/BaseFileScanTask.java:
##########
@@ -100,167 +101,4 @@ public String toString() {
         .add("residual", residual())
         .toString();
   }
-
-  /**
-   * This iterator returns {@link FileScanTask} using guidance provided by split offsets.
-   */
-  @VisibleForTesting
-  static final class OffsetsAwareTargetSplitSizeScanTaskIterator implements Iterator<FileScanTask> {
-    private final List<Long> offsets;
-    private final List<Long> splitSizes;
-    private final FileScanTask parentScanTask;
-    private int sizeIdx = 0;
-
-    OffsetsAwareTargetSplitSizeScanTaskIterator(List<Long> offsetList, FileScanTask parentScanTask) {
-      this.offsets = ImmutableList.copyOf(offsetList);
-      this.parentScanTask = parentScanTask;
-      this.splitSizes = Lists.newArrayListWithCapacity(offsets.size());
-      if (offsets.size() > 0) {

Review Comment:
   Is this a situation that can be encountered? And if it can, do we need to ensure that `parentScanTask` is in fact empty?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r922355758


##########
core/src/main/java/org/apache/iceberg/MetadataColumns.java:
##########
@@ -53,6 +53,8 @@ private MetadataColumns() {
   public static final String DELETE_FILE_ROW_FIELD_NAME = "row";
   public static final int DELETE_FILE_ROW_FIELD_ID = Integer.MAX_VALUE - 103;
   public static final String DELETE_FILE_ROW_DOC = "Deleted row values";
+  public static final int POSITION_DELETE_TABLE_PARTITION_FIELD_ID = Integer.MAX_VALUE - 104;

Review Comment:
   Thanks for taking a look.  It's getting a bit more messy than anticipated.
   
   Was spending a lot of time yesterday looking at the marker FileScanTask approach and it does get messy.  Especially  trying to add another constant column (spec_id) to this table, not only is it another use of metadata column  to populate it, we have to figure out how to make a residual expression without the constant column as otherwise the default file read code tries to filter too aggressively if there is a spec_id filter (as the stats do not exist on the file and the pruning code skips it) .  In short , adding any more constant columns to this table, gets a bit messy.
   
   Makes sense to explore using some kind of StaticDataTask.  I think the problem for that, currently it would not do split() and implement any file residual filtering on position-delete row values if there are any filters there, and of course vectorization, and it'll be re-implementing these if we want it.  But maybe I am over-optimizing this and in the first cut we can live without those and add them later.  Can look at this approach, and see if I hit any major blockers.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r921596928


##########
core/src/main/java/org/apache/iceberg/MetadataColumns.java:
##########
@@ -53,6 +53,8 @@ private MetadataColumns() {
   public static final String DELETE_FILE_ROW_FIELD_NAME = "row";
   public static final int DELETE_FILE_ROW_FIELD_ID = Integer.MAX_VALUE - 103;
   public static final String DELETE_FILE_ROW_DOC = "Deleted row values";
+  public static final int POSITION_DELETE_TABLE_PARTITION_FIELD_ID = Integer.MAX_VALUE - 104;

Review Comment:
   We don't necessarily have to use `DataTask` if we decide to go with static columns. We can add a new task type that will leverage another way to set these constant values instead of hacking `PartitionUtil$constantsMap`.
   
   The new task will have to extend `FileScanTask`, though, as it will be part of `TableScan`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r910268613


##########
core/src/main/java/org/apache/iceberg/PositionDeletesTableScan.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.util.PropertyUtil;
+
+public class PositionDeletesTableScan extends BaseMetadataTableScan {
+
+  public PositionDeletesTableScan(TableOperations ops, Table table, Schema schema) {
+    super(ops, table, schema, MetadataTableType.POSITION_DELETES);
+  }
+
+  PositionDeletesTableScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    super(ops, table, schema, MetadataTableType.POSITION_DELETES, context);
+  }
+
+  @Override
+  protected TableScan newRefinedScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    return new PositionDeletesTableScan(ops, table, schema, context);
+  }
+
+  @Override
+  protected CloseableIterable<FileScanTask> doPlanFiles() {
+    // TODO- add partition column and implement filter push down
+    boolean ignoreResiduals = context().ignoreResiduals();
+    String schemaString = SchemaParser.toJson(tableSchema());
+    Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : context().rowFilter();
+    ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter);
+    DeleteFileIndex deleteIndex = DeleteFileIndex.builderFor(tableOps().io(), snapshot().deleteManifests())
+        .specsById(table().specs())
+        .build();
+
+    CloseableIterable<DeleteFile> deleteFiles = CloseableIterable.withNoopClose(deleteIndex.referencedDeleteFiles());
+    CloseableIterable<DeleteFile> positionDeleteFiles = CloseableIterable.filter(deleteFiles,
+        df -> df.content().equals(FileContent.POSITION_DELETES));
+    return CloseableIterable.transform(positionDeleteFiles, f -> {
+      PartitionSpec spec = table().specs().get(f.specId());
+      String specString = PartitionSpecParser.toJson(spec);
+      return new BaseFileScanTask(DataFiles.fromPositionDelete(f, spec),

Review Comment:
   So I spent a bit of time on this to implement a PositionDeletesScanTask, but it still does not work yet.  TableScan still returns FileScanTask, so unless we change the interface I am not sure how this will get passed to Spark (Table.newScan is used everywhere to get TableScans).  
   
   I think this is possible like in my first commit https://github.com/apache/iceberg/pull/4812/files/051014682a2e876d2d5a7cb0b545bb5c1f8fa171 by making a new method on FileScanTask (alternatively, maybe we can add a new method on Table to return a parameterized Scan), but I guess it's not as clean.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r919546780


##########
core/src/main/java/org/apache/iceberg/PositionDeletesTableScan.java:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.ParallelIterable;
+import org.apache.iceberg.util.PropertyUtil;
+
+public class PositionDeletesTableScan extends BaseMetadataTableScan {
+
+  public PositionDeletesTableScan(TableOperations ops, Table table, Schema schema) {
+    super(ops, table, schema, MetadataTableType.POSITION_DELETES);
+  }
+
+  PositionDeletesTableScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    super(ops, table, schema, MetadataTableType.POSITION_DELETES, context);
+  }
+
+  @Override
+  protected TableScan newRefinedScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    return new PositionDeletesTableScan(ops, table, schema, context);
+  }
+
+  @Override
+  protected CloseableIterable<FileScanTask> doPlanFiles() {
+    Expression rowFilter = context().rowFilter();
+    String schemaString = SchemaParser.toJson(tableSchema());
+    boolean ignoreResiduals = context().ignoreResiduals();
+    Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : rowFilter;
+
+    Map<Integer, PartitionSpec> transformedSpecs = table().specs()
+        .entrySet()
+        .stream()
+        .map(e -> Pair.of(e.getKey(), BaseMetadataTable.transformSpec(tableSchema(), e.getValue())))
+        .collect(Collectors.toMap(Pair::first, Pair::second));
+
+    CloseableIterable<ManifestFile> deleteManifests = CloseableIterable.withNoopClose(
+        snapshot().deleteManifests(tableOps().io()));
+    CloseableIterable<CloseableIterable<FileScanTask>> results = CloseableIterable.transform(deleteManifests, m -> {
+
+      // Filter partitions
+      CloseableIterable<ManifestEntry<DeleteFile>> deleteFileEntries = ManifestFiles
+          .readDeleteManifest(m, tableOps().io(), transformedSpecs)
+          .filterRows(rowFilter)
+          .liveEntries();
+
+      // Filter delete file type
+      CloseableIterable<ManifestEntry<DeleteFile>> positionDeleteEntries = CloseableIterable.filter(deleteFileEntries,
+          entry -> entry.file().content().equals(FileContent.POSITION_DELETES));
+
+      return CloseableIterable.transform(positionDeleteEntries, entry -> {
+        PartitionSpec spec = transformedSpecs.get(entry.file().specId());
+        ResidualEvaluator residuals = ResidualEvaluator.of(spec, filter, context().caseSensitive());

Review Comment:
   Good point, done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r903028109


##########
core/src/main/java/org/apache/iceberg/PositionDeletesTableScan.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.util.PropertyUtil;
+
+public class PositionDeletesTableScan extends BaseMetadataTableScan {
+
+  public PositionDeletesTableScan(TableOperations ops, Table table, Schema schema) {
+    super(ops, table, schema, MetadataTableType.POSITION_DELETES);
+  }
+
+  PositionDeletesTableScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    super(ops, table, schema, MetadataTableType.POSITION_DELETES, context);
+  }
+
+  @Override
+  protected TableScan newRefinedScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    return new PositionDeletesTableScan(ops, table, schema, context);
+  }
+
+  @Override
+  protected CloseableIterable<FileScanTask> doPlanFiles() {
+    // TODO- add partition column and implement filter push down
+    boolean ignoreResiduals = context().ignoreResiduals();
+    String schemaString = SchemaParser.toJson(tableSchema());
+    Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : context().rowFilter();
+    ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter);
+    DeleteFileIndex deleteIndex = DeleteFileIndex.builderFor(tableOps().io(), snapshot().deleteManifests())
+        .specsById(table().specs())
+        .build();
+
+    CloseableIterable<DeleteFile> deleteFiles = CloseableIterable.withNoopClose(deleteIndex.referencedDeleteFiles());
+    CloseableIterable<DeleteFile> positionDeleteFiles = CloseableIterable.filter(deleteFiles,
+        df -> df.content().equals(FileContent.POSITION_DELETES));
+    return CloseableIterable.transform(positionDeleteFiles, f -> {
+      PartitionSpec spec = table().specs().get(f.specId());
+      String specString = PartitionSpecParser.toJson(spec);
+      return new BaseFileScanTask(DataFiles.fromPositionDelete(f, spec),
+          null, /* Deletes */
+          schemaString,
+          specString,
+          residuals);
+    });
+  }
+
+  @Override
+  public long targetSplitSize() {
+    long tableValue = tableOps().current().propertyAsLong(

Review Comment:
   I think we may need something here to deal with splittable vs un-splittable files? Currently all of our delete files are parquet (I believe) which means we have to take into effect our offsets when planning and combining splits.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r920367207


##########
core/src/main/java/org/apache/iceberg/io/DeleteSchemaUtil.java:
##########
@@ -20,13 +20,38 @@
 package org.apache.iceberg.io;
 
 import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Partitioning;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.types.Types;
 
 public class DeleteSchemaUtil {
   private DeleteSchemaUtil() {
   }
 
+  public static Schema metadataTableSchema(Table table) {
+    return metadataTableSchema(table, Partitioning.partitionType(table));
+  }
+
+  public static Schema metadataTableSchema(Table table, Types.StructType partitionType) {
+    Schema result = new Schema(
+        MetadataColumns.DELETE_FILE_PATH,

Review Comment:
   Also adding some background:  I had tried earlier to use the existing _partition metadata column for this table (which is actually populated), but the problem is that pushdown does not work for metadata columns.    It seems the pruneSchema() function is called after filter() function on SparkScanBuilder, so that the binding in filter will fail.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#issuecomment-1208830953

   Some test was cancelled, re-triggering


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] kbendick commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
kbendick commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r887419779


##########
core/src/main/java/org/apache/iceberg/BaseFileScanTask.java:
##########
@@ -100,167 +101,4 @@ public String toString() {
         .add("residual", residual())
         .toString();
   }
-
-  /**
-   * This iterator returns {@link FileScanTask} using guidance provided by split offsets.
-   */
-  @VisibleForTesting
-  static final class OffsetsAwareTargetSplitSizeScanTaskIterator implements Iterator<FileScanTask> {
-    private final List<Long> offsets;
-    private final List<Long> splitSizes;
-    private final FileScanTask parentScanTask;
-    private int sizeIdx = 0;
-
-    OffsetsAwareTargetSplitSizeScanTaskIterator(List<Long> offsetList, FileScanTask parentScanTask) {
-      this.offsets = ImmutableList.copyOf(offsetList);
-      this.parentScanTask = parentScanTask;
-      this.splitSizes = Lists.newArrayListWithCapacity(offsets.size());
-      if (offsets.size() > 0) {

Review Comment:
   Works for me.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r887394257


##########
api/src/main/java/org/apache/iceberg/FileScanTask.java:
##########
@@ -26,15 +26,21 @@
  * A scan task over a range of a single file.
  */
 public interface FileScanTask extends ScanTask {
+
+  /**
+   * @return the file to scan
+   */
+  ContentFile<?> contentFile();

Review Comment:
   Yea I think that works although it felt not so clean,  DeleteDataFile?   I though originally it would have been better to have ContentFileScanTask if we started fresh, as all the DataFile fields are from there anyway.  Agree that this way will definitely be less changes in code, so I'm ok if everyone agrees
   
   Will try to follow on #4870 as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#issuecomment-1131938662

   FYI @aokolnychyi @RussellSpitzer @rdblue, if you also have time to take a look


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r919473652


##########
core/src/main/java/org/apache/iceberg/io/DeleteSchemaUtil.java:
##########
@@ -20,13 +20,38 @@
 package org.apache.iceberg.io;
 
 import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Partitioning;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.types.Types;
 
 public class DeleteSchemaUtil {
   private DeleteSchemaUtil() {
   }
 
+  public static Schema metadataTableSchema(Table table) {
+    return metadataTableSchema(table, Partitioning.partitionType(table));
+  }
+
+  public static Schema metadataTableSchema(Table table, Types.StructType partitionType) {
+    Schema result = new Schema(
+        MetadataColumns.DELETE_FILE_PATH,

Review Comment:
   One of the tricky things is to pick a column field id, because here it may conflict with the user field id.  (the "row" column of position deletes is a struct that reflects the user schema).  So I will still put these constants on the class MetadataColumns, but as reserved columns instead of metadata columns.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r919547475


##########
core/src/main/java/org/apache/iceberg/BaseMetadataTable.java:
##########
@@ -60,8 +61,12 @@ protected BaseMetadataTable(TableOperations ops, Table table, String name) {
    * @return a spec used to rewrite the metadata table filters to partition filters using an inclusive projection
    */
   static PartitionSpec transformSpec(Schema metadataTableSchema, PartitionSpec spec) {
+    return transformSpec(metadataTableSchema, spec.partitionType());
+  }
+
+  static PartitionSpec transformSpec(Schema metadataTableSchema, Types.StructType partitionType) {
     PartitionSpec.Builder identitySpecBuilder = PartitionSpec.builderFor(metadataTableSchema).checkConflicts(false);
-    spec.fields().forEach(pf -> identitySpecBuilder.add(pf.fieldId(), pf.name(), "identity"));
+    partitionType.fields().forEach(pf -> identitySpecBuilder.add(pf.fieldId(), pf.fieldId(), pf.name(), "identity"));

Review Comment:
   Changed it back.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r919443384


##########
core/src/main/java/org/apache/iceberg/DataFiles.java:
##########
@@ -108,6 +108,31 @@ public static DataFile fromManifest(ManifestFile manifest) {
         .build();
   }
 
+  public static DataFile fromPositionDelete(DeleteFile deleteFile, PartitionSpec spec) {
+    Preconditions.checkArgument(deleteFile.content().equals(FileContent.POSITION_DELETES),
+        "Expected positional delete file, found %s", deleteFile.content());
+    Metrics metrics = new Metrics(
+        deleteFile.recordCount(),
+        deleteFile.columnSizes(),
+        deleteFile.valueCounts(),
+        deleteFile.nullValueCounts(),
+        deleteFile.nanValueCounts(),
+        deleteFile.lowerBounds(),
+        deleteFile.upperBounds()
+    );
+
+    return DataFiles.builder(spec)
+        .withEncryptionKeyMetadata(deleteFile.keyMetadata())
+        .withFormat(deleteFile.format())
+        .withFileSizeInBytes(deleteFile.fileSizeInBytes())
+        .withPath(deleteFile.path().toString())
+        .withPartition(deleteFile.partition())
+        .withRecordCount(deleteFile.recordCount())
+        .withSortOrder(SortOrder.unsorted())

Review Comment:
   In my test, sort order id doesnt seem to be populated on delete files (returns null), but added a conditional set in case it is populated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r923950980


##########
core/src/main/java/org/apache/iceberg/MetadataColumns.java:
##########
@@ -53,6 +53,8 @@ private MetadataColumns() {
   public static final String DELETE_FILE_ROW_FIELD_NAME = "row";
   public static final int DELETE_FILE_ROW_FIELD_ID = Integer.MAX_VALUE - 103;
   public static final String DELETE_FILE_ROW_DOC = "Deleted row values";
+  public static final int POSITION_DELETE_TABLE_PARTITION_FIELD_ID = Integer.MAX_VALUE - 104;

Review Comment:
   Hi guys, I hit the first issue , as all the PositionDeletesTable, MetadataTables, StaticDataTask is in core module, there is no way currently to access Parquet, ORC, and the file readers.  I mean Spark could pass in a function that returns a CloseableIterable<Row> but then it seems a bit silly to use the DataTask (except that DataTask can add the static columns like partition and partition_spec_id)
   
   Another thing that needs to be passed in is encryption (like in DeleteFilter::inputFile(), it seems to be different in Spark/Flink.  
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho closed pull request #4812: Spark 3.2: Support reading position deletes

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho closed pull request #4812: Spark 3.2: Support reading position deletes
URL: https://github.com/apache/iceberg/pull/4812


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r940546212


##########
core/src/main/java/org/apache/iceberg/PositionDeletesTable.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.util.Map;
+import org.apache.iceberg.io.DeleteSchemaUtil;
+
+public class PositionDeletesTable extends BaseMetadataTable {
+
+  PositionDeletesTable(TableOperations ops, Table table) {
+    super(ops, table, table.name() + ".position_deletes");
+  }
+
+  PositionDeletesTable(TableOperations ops, Table table, String name) {
+    super(ops, table, name);
+  }
+
+  @Override
+  MetadataTableType metadataTableType() {
+    return MetadataTableType.POSITION_DELETES;
+  }
+
+  @Override
+  public TableScan newScan() {
+    return new PositionDeletesTableScan(operations(), table(), schema());
+  }
+
+  @Override
+  public Schema schema() {
+    return DeleteSchemaUtil.metadataTableSchema(table());
+  }
+
+  @Override
+  public PartitionSpec spec() {
+    return table().spec();

Review Comment:
   Update, in latest implementation, removed partition spec from table, as original comment.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r876602325


##########
core/src/main/java/org/apache/iceberg/BaseFileScanTask.java:
##########
@@ -100,167 +101,4 @@ public String toString() {
         .add("residual", residual())
         .toString();
   }
-
-  /**
-   * This iterator returns {@link FileScanTask} using guidance provided by split offsets.
-   */
-  @VisibleForTesting
-  static final class OffsetsAwareTargetSplitSizeScanTaskIterator implements Iterator<FileScanTask> {
-    private final List<Long> offsets;
-    private final List<Long> splitSizes;
-    private final FileScanTask parentScanTask;
-    private int sizeIdx = 0;
-
-    OffsetsAwareTargetSplitSizeScanTaskIterator(List<Long> offsetList, FileScanTask parentScanTask) {
-      this.offsets = ImmutableList.copyOf(offsetList);
-      this.parentScanTask = parentScanTask;
-      this.splitSizes = Lists.newArrayListWithCapacity(offsets.size());
-      if (offsets.size() > 0) {

Review Comment:
   I think sometimes it happens?  (This part is the original code, I just moved it)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r893713128


##########
api/src/main/java/org/apache/iceberg/FileScanTask.java:
##########
@@ -26,15 +26,21 @@
  * A scan task over a range of a single file.
  */
 public interface FileScanTask extends ScanTask {
+
+  /**
+   * @return the file to scan
+   */
+  ContentFile<?> contentFile();

Review Comment:
   @aokolnychyi reverted the refactor to move this forward.  As over there it seems there is no need to represent EqualityDelete as DataFile, it seems this becomes the only weird case (outside of course the metadata tables).  If we want to do it later, its still open.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r921596928


##########
core/src/main/java/org/apache/iceberg/MetadataColumns.java:
##########
@@ -53,6 +53,8 @@ private MetadataColumns() {
   public static final String DELETE_FILE_ROW_FIELD_NAME = "row";
   public static final int DELETE_FILE_ROW_FIELD_ID = Integer.MAX_VALUE - 103;
   public static final String DELETE_FILE_ROW_DOC = "Deleted row values";
+  public static final int POSITION_DELETE_TABLE_PARTITION_FIELD_ID = Integer.MAX_VALUE - 104;

Review Comment:
   We don't necessarily have to use `DataTask` if we decide to go with static columns. We can add a new task type that will leverage another way to set these constant values instead of hacking `PartitionUtil$constantsMap`. The new task will have to extend `FileScanTask`, though, as it will be part of `TableScan`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r920367207


##########
core/src/main/java/org/apache/iceberg/io/DeleteSchemaUtil.java:
##########
@@ -20,13 +20,38 @@
 package org.apache.iceberg.io;
 
 import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Partitioning;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.types.Types;
 
 public class DeleteSchemaUtil {
   private DeleteSchemaUtil() {
   }
 
+  public static Schema metadataTableSchema(Table table) {
+    return metadataTableSchema(table, Partitioning.partitionType(table));
+  }
+
+  public static Schema metadataTableSchema(Table table, Types.StructType partitionType) {
+    Schema result = new Schema(
+        MetadataColumns.DELETE_FILE_PATH,

Review Comment:
   Also adding some background:  I had tried earlier to use the existing metadata column for this table (which is populated), but then pushdown does not work for metadata columns.    It seems the pruneSchema() function is called after filter() function on SparkScanBuilder, so that the binding in filter will fail.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r902997052


##########
core/src/main/java/org/apache/iceberg/PositionDeletesTableScan.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.util.PropertyUtil;
+
+public class PositionDeletesTableScan extends BaseMetadataTableScan {
+
+  public PositionDeletesTableScan(TableOperations ops, Table table, Schema schema) {
+    super(ops, table, schema, MetadataTableType.POSITION_DELETES);
+  }
+
+  PositionDeletesTableScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    super(ops, table, schema, MetadataTableType.POSITION_DELETES, context);
+  }
+
+  @Override
+  protected TableScan newRefinedScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    return new PositionDeletesTableScan(ops, table, schema, context);
+  }
+
+  @Override
+  protected CloseableIterable<FileScanTask> doPlanFiles() {
+    // TODO- add partition column and implement filter push down
+    boolean ignoreResiduals = context().ignoreResiduals();
+    String schemaString = SchemaParser.toJson(tableSchema());
+    Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : context().rowFilter();
+    ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter);
+    DeleteFileIndex deleteIndex = DeleteFileIndex.builderFor(tableOps().io(), snapshot().deleteManifests())
+        .specsById(table().specs())
+        .build();
+
+    CloseableIterable<DeleteFile> deleteFiles = CloseableIterable.withNoopClose(deleteIndex.referencedDeleteFiles());
+    CloseableIterable<DeleteFile> positionDeleteFiles = CloseableIterable.filter(deleteFiles,
+        df -> df.content().equals(FileContent.POSITION_DELETES));
+    return CloseableIterable.transform(positionDeleteFiles, f -> {

Review Comment:
   nit: avoid one letter function args, could just be a "file"  or deleteFile unless we are out of o line length.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r940819083


##########
core/src/main/java/org/apache/iceberg/io/DeleteSchemaUtil.java:
##########
@@ -20,13 +20,38 @@
 package org.apache.iceberg.io;
 
 import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Partitioning;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.types.Types;
 
 public class DeleteSchemaUtil {
   private DeleteSchemaUtil() {
   }
 
+  public static Schema metadataTableSchema(Table table) {
+    return metadataTableSchema(table, Partitioning.partitionType(table));
+  }
+
+  public static Schema metadataTableSchema(Table table, Types.StructType partitionType) {
+    Schema result = new Schema(
+        MetadataColumns.DELETE_FILE_PATH,

Review Comment:
   Update, this should be resolved, do not re-use metadata column ids anymore.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r940544376


##########
core/src/main/java/org/apache/iceberg/MetadataColumns.java:
##########
@@ -53,6 +53,8 @@ private MetadataColumns() {
   public static final String DELETE_FILE_ROW_FIELD_NAME = "row";
   public static final int DELETE_FILE_ROW_FIELD_ID = Integer.MAX_VALUE - 103;
   public static final String DELETE_FILE_ROW_DOC = "Deleted row values";
+  public static final int POSITION_DELETE_TABLE_PARTITION_FIELD_ID = Integer.MAX_VALUE - 104;

Review Comment:
   Hey guys I really tried hard on this, but couldn't find a way.  We looked with @RussellSpitzer  and looks like Trino actually wont break if we had a new table (they support each one manually, and actually i think a good portion  of their metadata tables don't even use our MetadataTable classes or MetadataTableScans classes altogether).   Discussed a bit with @aokolnychyi as well and coudn't find a good way.  Made a latest pr with a least bad way, piggy backing off of some of the latest RowReader refactoring



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r940562715


##########
core/src/main/java/org/apache/iceberg/MetadataColumns.java:
##########
@@ -89,8 +92,15 @@ private MetadataColumns() {}
           ROW_POSITION.fieldId(),
           IS_DELETED.fieldId(),
           SPEC_ID.fieldId(),
-          PARTITION_COLUMN_ID);
+          PARTITION_COLUMN_ID,

Review Comment:
   This is to avoid an issue when trying to read these constant columns via Avro reader, which supports but only if they are defined in this set.
   
   ```
   Caused by: java.lang.IllegalArgumentException: Missing required field: partition
   	at org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkArgument(Preconditions.java:220)
   	at org.apache.iceberg.avro.BuildAvroProjection.record(BuildAvroProjection.java:104)
   	at org.apache.iceberg.avro.BuildAvroProjection.record(BuildAvroProjection.java:42)
   	at org.apache.iceberg.avro.AvroCustomOrderSchemaVisitor.visit(AvroCustomOrderSchemaVisitor.java:50)
   	at org.apache.iceberg.avro.AvroSchemaUtil.buildAvroProjection(AvroSchemaUtil.java:126)
   	at org.apache.iceberg.avro.ProjectionDatumReader.setSchema(ProjectionDatumReader.java:68)
   	at org.apache.avro.file.DataFileStream.initialize(DataFileStream.java:133)
   	at org.apache.avro.file.DataFileReader.<init>(DataFileReader.java:139)
   	at org.apache.avro.file.DataFileReader.<init>(DataFileReader.java:131)
   	at org.apache.avro.file.DataFileReader.openReader(DataFileReader.java:75)
   	at org.apache.iceberg.avro.AvroIterable.newFileReader(AvroIterable.java:100)
   	at org.apache.iceberg.avro.AvroIterable.iterator(AvroIterable.java:76)
   	at org.apache.iceberg.io.CloseableIterable.lambda$filter$0(CloseableIterable.java:110)
   	at org.apache.iceberg.io.CloseableIterable$2.iterator(CloseableIterable.java:72)
   	at org.apache.iceberg.spark.source.RowDataReader.open(RowDataReader.java:61)
   	at org.apache.iceberg.spark.source.RowDataReader.open(RowDataReader.java:38)
   	at org.apache.iceberg.spark.source.BaseReader.next(BaseReader.java:128)
   	at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:119)
   	at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:156)
   	at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1(DataSourceRDD.scala:63)
   	at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1$adapted(DataSourceRDD.scala:63)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r940544376


##########
core/src/main/java/org/apache/iceberg/MetadataColumns.java:
##########
@@ -53,6 +53,8 @@ private MetadataColumns() {
   public static final String DELETE_FILE_ROW_FIELD_NAME = "row";
   public static final int DELETE_FILE_ROW_FIELD_ID = Integer.MAX_VALUE - 103;
   public static final String DELETE_FILE_ROW_DOC = "Deleted row values";
+  public static final int POSITION_DELETE_TABLE_PARTITION_FIELD_ID = Integer.MAX_VALUE - 104;

Review Comment:
   Hey guys I really tried hard on this, but couldn't find a way.  We looked with @RussellSpitzer  and looks like Trino actually wont break if we had a new table (they support each one manually, and actually i think a good portion  of their metadata tables don't even use our MetadataTable class altogether).   Discussed a bit with @aokolnychyi as well and coudn't find a good way.  Made a latest pr with a least bad way, piggy backing off of some of the latest RowReader refactoring



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r923961118


##########
core/src/main/java/org/apache/iceberg/MetadataColumns.java:
##########
@@ -53,6 +53,8 @@ private MetadataColumns() {
   public static final String DELETE_FILE_ROW_FIELD_NAME = "row";
   public static final int DELETE_FILE_ROW_FIELD_ID = Integer.MAX_VALUE - 103;
   public static final String DELETE_FILE_ROW_DOC = "Deleted row values";
+  public static final int POSITION_DELETE_TABLE_PARTITION_FIELD_ID = Integer.MAX_VALUE - 104;

Review Comment:
   I guess the only thing I can think to do is to provide a shared implementation of positionDeleteReader in data module which has access to the Parquet/ORC modules, but each engine still needs to check the type of table (or the marker FileScanTask) and then manually plug this in to the DataTask, for the positionDeletes table.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r919500148


##########
core/src/main/java/org/apache/iceberg/io/DeleteSchemaUtil.java:
##########
@@ -20,13 +20,38 @@
 package org.apache.iceberg.io;
 
 import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Partitioning;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.types.Types;
 
 public class DeleteSchemaUtil {
   private DeleteSchemaUtil() {
   }
 
+  public static Schema metadataTableSchema(Table table) {
+    return metadataTableSchema(table, Partitioning.partitionType(table));
+  }
+
+  public static Schema metadataTableSchema(Table table, Types.StructType partitionType) {
+    Schema result = new Schema(
+        MetadataColumns.DELETE_FILE_PATH,

Review Comment:
   Actually looking into this, if I don't re-use the metadata column I have to change a lot of codes, like:
   
   - BaseDataReader.constantsMap will need new method to populate partition based on new partition id, which I'll have to pass in based on what field is selected.  (Currently since both id's are the same it works)
   - All the code in file-type Readers to remove the metadata columns from selected columns and do error checks.  Currently this uses MetadataColumns.metadataFieldIds(), which will need to be now something like MetadataColumns.metadataAndConstantFieldIds()  to take into account new column.  Even though this is not a metadata column per se, it still is a constant column that should not be selected from the content file.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r919505235


##########
core/src/main/java/org/apache/iceberg/io/DeleteSchemaUtil.java:
##########
@@ -20,13 +20,38 @@
 package org.apache.iceberg.io;
 
 import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Partitioning;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.types.Types;
 
 public class DeleteSchemaUtil {
   private DeleteSchemaUtil() {
   }
 
+  public static Schema metadataTableSchema(Table table) {
+    return metadataTableSchema(table, Partitioning.partitionType(table));
+  }
+
+  public static Schema metadataTableSchema(Table table, Types.StructType partitionType) {
+    Schema result = new Schema(
+        MetadataColumns.DELETE_FILE_PATH,
+        MetadataColumns.DELETE_FILE_POS,
+        Types.NestedField.optional(
+            MetadataColumns.DELETE_FILE_ROW_FIELD_ID, "row", table.schema().asStruct(),
+            MetadataColumns.DELETE_FILE_ROW_DOC));
+
+    if (partitionType.fields().size() > 0) {

Review Comment:
   This was emulated from other metadata table like BaseFilesTable, but good point.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r919526483


##########
core/src/main/java/org/apache/iceberg/PositionDeletesTableScan.java:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.ParallelIterable;
+import org.apache.iceberg.util.PropertyUtil;
+
+public class PositionDeletesTableScan extends BaseMetadataTableScan {
+
+  public PositionDeletesTableScan(TableOperations ops, Table table, Schema schema) {
+    super(ops, table, schema, MetadataTableType.POSITION_DELETES);
+  }
+
+  PositionDeletesTableScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    super(ops, table, schema, MetadataTableType.POSITION_DELETES, context);
+  }
+
+  @Override
+  protected TableScan newRefinedScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    return new PositionDeletesTableScan(ops, table, schema, context);
+  }
+
+  @Override
+  protected CloseableIterable<FileScanTask> doPlanFiles() {
+    Expression rowFilter = context().rowFilter();
+    String schemaString = SchemaParser.toJson(tableSchema());
+    boolean ignoreResiduals = context().ignoreResiduals();
+    Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : rowFilter;
+
+    Map<Integer, PartitionSpec> transformedSpecs = table().specs()
+        .entrySet()
+        .stream()
+        .map(e -> Pair.of(e.getKey(), BaseMetadataTable.transformSpec(tableSchema(), e.getValue())))
+        .collect(Collectors.toMap(Pair::first, Pair::second));
+
+    CloseableIterable<ManifestFile> deleteManifests = CloseableIterable.withNoopClose(
+        snapshot().deleteManifests(tableOps().io()));
+    CloseableIterable<CloseableIterable<FileScanTask>> results = CloseableIterable.transform(deleteManifests, m -> {
+
+      // Filter partitions
+      CloseableIterable<ManifestEntry<DeleteFile>> deleteFileEntries = ManifestFiles

Review Comment:
   So I played with selecting some manifest fields, but actually we need all of them (even column stats).  Because we need to evaluate the filters on the actual delete file, we need to propagate this down.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r919279775


##########
core/src/main/java/org/apache/iceberg/BaseMetadataTable.java:
##########
@@ -60,8 +61,12 @@ protected BaseMetadataTable(TableOperations ops, Table table, String name) {
    * @return a spec used to rewrite the metadata table filters to partition filters using an inclusive projection
    */
   static PartitionSpec transformSpec(Schema metadataTableSchema, PartitionSpec spec) {
+    return transformSpec(metadataTableSchema, spec.partitionType());
+  }
+
+  static PartitionSpec transformSpec(Schema metadataTableSchema, Types.StructType partitionType) {

Review Comment:
   Whoops you are right, this is from some older fix attempt I was trying and never cleaned it up.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r919547715


##########
core/src/main/java/org/apache/iceberg/PositionDeletesTableScan.java:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.ParallelIterable;
+import org.apache.iceberg.util.PropertyUtil;
+
+public class PositionDeletesTableScan extends BaseMetadataTableScan {

Review Comment:
   Yea initially when writing I thought it would be big, but in the end it is ok, so made it a nested class.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r921638925


##########
core/src/main/java/org/apache/iceberg/MetadataColumns.java:
##########
@@ -53,6 +53,8 @@ private MetadataColumns() {
   public static final String DELETE_FILE_ROW_FIELD_NAME = "row";
   public static final int DELETE_FILE_ROW_FIELD_ID = Integer.MAX_VALUE - 103;
   public static final String DELETE_FILE_ROW_DOC = "Deleted row values";
+  public static final int POSITION_DELETE_TABLE_PARTITION_FIELD_ID = Integer.MAX_VALUE - 104;

Review Comment:
   My concern with the marker `FileScanTask` is that if an engine is implementing metadata tables like normal reads, then we've introduced a correctness problem because it doesn't know to read the new task differently. I think the cleanest way is probably to use `DataTask`.
   
   I think the concern about using `DataTask` is valid, since it exposes a row-based interface and isn't intended for large uses that benefit from vectorization. It was originally intended for small tables, like `snapshots`.
   
   However, I've been thinking for a while that a significant improvement is to adapt Arrow record batches into rows, so we can take advantage of vectorized reads in all cases, not just when the engine supports a vectorized format. That is probably way faster, so we could explore doing that here and using a joined row.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#issuecomment-1146889848

   Regarding the metadata table, why is it only limited to position deletes? do we anticipate a separate equality deletes metadata table too? This metadata table is at row level, like a source. For row-level deletes metadata, I am wondering if `delete_files` summary is good enough, which is similar to the `files` metadata table.
   
   I assume this position_deletes metadata table is to not related to the CDC read like Ryan suggested in the CDC. Otherwise, the metadata table should cover inserts, position deletes, and equality deletes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] kbendick commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
kbendick commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r876586822


##########
.palantir/revapi.yml:
##########
@@ -27,6 +27,12 @@ acceptedBreaks:
     - code: "java.method.addedToInterface"
       new: "method ThisT org.apache.iceberg.SnapshotUpdate<ThisT>::scanManifestsWith(java.util.concurrent.ExecutorService)"
       justification: "Accept all changes prior to introducing API compatibility checks"
+    - code: "java.method.addedToInterface"
+      new: "method org.apache.iceberg.ContentFile<?> org.apache.iceberg.FileScanTask::contentFile()"
+      justification: "New method added to FileScanTask.  Should be not many implementations\
+        \ outside Iceberg, if so it's easy to fill in new method (especially if they\
+        \ inherit BaseFileScanTask).  This is easiest way to add DeleteFileScanTask,\
+        \ otherwise even bigger interface change is necessary"

Review Comment:
   Side note: I'm looking for a way to suppress `addedToInterface` in addition to a few other non-breaking changes.
   
   The way we release Iceberg, it's not really something that we need to worry about much.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] kbendick commented on pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
kbendick commented on PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#issuecomment-1144319509

   +1 for using a metadata table for this. It can then be used in numerous places and for various reasons.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r919500148


##########
core/src/main/java/org/apache/iceberg/io/DeleteSchemaUtil.java:
##########
@@ -20,13 +20,38 @@
 package org.apache.iceberg.io;
 
 import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Partitioning;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.types.Types;
 
 public class DeleteSchemaUtil {
   private DeleteSchemaUtil() {
   }
 
+  public static Schema metadataTableSchema(Table table) {
+    return metadataTableSchema(table, Partitioning.partitionType(table));
+  }
+
+  public static Schema metadataTableSchema(Table table, Types.StructType partitionType) {
+    Schema result = new Schema(
+        MetadataColumns.DELETE_FILE_PATH,

Review Comment:
   Actually looking into this, if I don't re-use the metadata column I have to change a lot of codes, like:
   
   - BaseDataReader.constantsMap will need new method to populate partition based on new partition id, which I'll have to pass in based on what field is selected.  (Currently since both id's are the same it works)
   - All the error checks for whether selected columns are in metadata columns need to take this into account.  Currently this uses MetadataColumns.metadataFieldIds(), which will need to be now something like MetadataColumns.metadataAndConstantFieldIds()  to take into account new column.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r921571789


##########
core/src/main/java/org/apache/iceberg/MetadataColumns.java:
##########
@@ -53,6 +53,8 @@ private MetadataColumns() {
   public static final String DELETE_FILE_ROW_FIELD_NAME = "row";
   public static final int DELETE_FILE_ROW_FIELD_ID = Integer.MAX_VALUE - 103;
   public static final String DELETE_FILE_ROW_DOC = "Deleted row values";
+  public static final int POSITION_DELETE_TABLE_PARTITION_FIELD_ID = Integer.MAX_VALUE - 104;

Review Comment:
    It seems we have a static column in the metadata table that we plan to populate via the mechanism for metadata columns. It looks a little bit suspicious to me.
   
   I feel we should pick one of these options:
   - Have only `path`, `pos`, `row` columns in the table and use `_partition` and `_spec_id` metadata columns. That will mean we have to support filter pushdown on metadata columns. It is easy to handle this on the Spark side but we will also have to adapt ALL of our binding code to allow binding predicates with metadata columns. The last part will be a big change.
   - Make `partition` and `spec_id` columns static and use `DataTask`.
   
   I kind of like using `FileScanTask` for this effort to support vectorized reads so the first option seems preferable.
   
   Thoughts, @szehon-ho @RussellSpitzer @rdblue?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#issuecomment-1182800243

   Rebase and support spark 3.3, resolved most comments.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r923961118


##########
core/src/main/java/org/apache/iceberg/MetadataColumns.java:
##########
@@ -53,6 +53,8 @@ private MetadataColumns() {
   public static final String DELETE_FILE_ROW_FIELD_NAME = "row";
   public static final int DELETE_FILE_ROW_FIELD_ID = Integer.MAX_VALUE - 103;
   public static final String DELETE_FILE_ROW_DOC = "Deleted row values";
+  public static final int POSITION_DELETE_TABLE_PARTITION_FIELD_ID = Integer.MAX_VALUE - 104;

Review Comment:
   I guess the only thing I can think to do is to provide a shared implementation of positionDeleteReader in data module, but each engine still needs to check the type of table (or FileScanTask) and then manually plug this in to the DataTask.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#issuecomment-1147766405

   @stevenzwu yea I think equality deletes will be different table because the schema is not the same.
   
   @aokolnychyi yea the idea wrt RewritePositionDeleteFiles was to use the building blocks in conjunction with file-scan-task-set-id, for the compaction case.  This pr should have most of the pieces though, and hopefully we just need to add some wiring.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r905558549


##########
core/src/main/java/org/apache/iceberg/PositionDeletesTableScan.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.util.PropertyUtil;
+
+public class PositionDeletesTableScan extends BaseMetadataTableScan {
+
+  public PositionDeletesTableScan(TableOperations ops, Table table, Schema schema) {
+    super(ops, table, schema, MetadataTableType.POSITION_DELETES);
+  }
+
+  PositionDeletesTableScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    super(ops, table, schema, MetadataTableType.POSITION_DELETES, context);
+  }
+
+  @Override
+  protected TableScan newRefinedScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    return new PositionDeletesTableScan(ops, table, schema, context);
+  }
+
+  @Override
+  protected CloseableIterable<FileScanTask> doPlanFiles() {
+    // TODO- add partition column and implement filter push down
+    boolean ignoreResiduals = context().ignoreResiduals();
+    String schemaString = SchemaParser.toJson(tableSchema());
+    Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : context().rowFilter();
+    ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter);
+    DeleteFileIndex deleteIndex = DeleteFileIndex.builderFor(tableOps().io(), snapshot().deleteManifests())
+        .specsById(table().specs())
+        .build();
+
+    CloseableIterable<DeleteFile> deleteFiles = CloseableIterable.withNoopClose(deleteIndex.referencedDeleteFiles());
+    CloseableIterable<DeleteFile> positionDeleteFiles = CloseableIterable.filter(deleteFiles,
+        df -> df.content().equals(FileContent.POSITION_DELETES));
+    return CloseableIterable.transform(positionDeleteFiles, f -> {
+      PartitionSpec spec = table().specs().get(f.specId());
+      String specString = PartitionSpecParser.toJson(spec);
+      return new BaseFileScanTask(DataFiles.fromPositionDelete(f, spec),
+          null, /* Deletes */
+          schemaString,
+          specString,
+          residuals);
+    });
+  }
+
+  @Override
+  public long targetSplitSize() {
+    long tableValue = tableOps().current().propertyAsLong(

Review Comment:
   I think this should be fine, currently BaseFileScanTask checks if the file type is splittalbe before using this value.  I also added test in latest pr against all three file formats, against one test method that purposely hits the split limit. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r905560701


##########
core/src/main/java/org/apache/iceberg/BaseMetadataTable.java:
##########
@@ -60,8 +61,12 @@ protected BaseMetadataTable(TableOperations ops, Table table, String name) {
    * @return a spec used to rewrite the metadata table filters to partition filters using an inclusive projection
    */
   static PartitionSpec transformSpec(Schema metadataTableSchema, PartitionSpec spec) {
+    return transformSpec(metadataTableSchema, spec.partitionType());
+  }
+
+  static PartitionSpec transformSpec(Schema metadataTableSchema, Types.StructType partitionType) {

Review Comment:
   There is a tricky bug here in partition filter push down that took me awhile to find.
   
   In most metadata tables, the transformed partition spec used only in ManifestGroup to filter out matching manifests.  In that case, it seems the generated fieldId is not important.  However, the position delete metadata table needs to go one level further and pushes filter to ManifestReader to filter matching manifest-entries.   A wrong fieldId causes the DeleteFile to be instantiated with wrong partition information (as the field id is used to project onto the partition struct to lookup the value), leading to various bugs in filtering out the wrong files.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r896127989


##########
core/src/main/java/org/apache/iceberg/PositionDeletesTable.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import org.apache.iceberg.io.DeleteSchemaUtil;
+
+public class PositionDeletesTable extends BaseMetadataTable {
+
+  PositionDeletesTable(TableOperations ops, Table table, String name) {
+    super(ops, table, name);
+  }
+
+  @Override
+  MetadataTableType metadataTableType() {
+    return MetadataTableType.POSITION_DELETES;
+  }
+
+  @Override
+  public TableScan newScan() {
+    return new PositionDeletesTableScan(operations(), table(), schema());
+  }
+
+  @Override
+  public Schema schema() {

Review Comment:
   Update: metadata column works but unfortunately does not get pushed down for filtering by Iceberg in general:  SparkScanBuilder.pushFilters().  Seems like it will be useful but orthogonal change.  
   
   Investigating making partition column as non-metadata column, though its a shame as there already exists the _partition metadata column.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r902998351


##########
core/src/main/java/org/apache/iceberg/PositionDeletesTableScan.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.util.PropertyUtil;
+
+public class PositionDeletesTableScan extends BaseMetadataTableScan {
+
+  public PositionDeletesTableScan(TableOperations ops, Table table, Schema schema) {
+    super(ops, table, schema, MetadataTableType.POSITION_DELETES);
+  }
+
+  PositionDeletesTableScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    super(ops, table, schema, MetadataTableType.POSITION_DELETES, context);
+  }
+
+  @Override
+  protected TableScan newRefinedScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    return new PositionDeletesTableScan(ops, table, schema, context);
+  }
+
+  @Override
+  protected CloseableIterable<FileScanTask> doPlanFiles() {
+    // TODO- add partition column and implement filter push down
+    boolean ignoreResiduals = context().ignoreResiduals();
+    String schemaString = SchemaParser.toJson(tableSchema());
+    Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : context().rowFilter();
+    ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter);
+    DeleteFileIndex deleteIndex = DeleteFileIndex.builderFor(tableOps().io(), snapshot().deleteManifests())
+        .specsById(table().specs())
+        .build();
+
+    CloseableIterable<DeleteFile> deleteFiles = CloseableIterable.withNoopClose(deleteIndex.referencedDeleteFiles());
+    CloseableIterable<DeleteFile> positionDeleteFiles = CloseableIterable.filter(deleteFiles,
+        df -> df.content().equals(FileContent.POSITION_DELETES));
+    return CloseableIterable.transform(positionDeleteFiles, f -> {
+      PartitionSpec spec = table().specs().get(f.specId());
+      String specString = PartitionSpecParser.toJson(spec);
+      return new BaseFileScanTask(DataFiles.fromPositionDelete(f, spec),

Review Comment:
   We'll probably want a new ScanTask type here once #5077 is in?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r923718483


##########
core/src/main/java/org/apache/iceberg/MetadataColumns.java:
##########
@@ -53,6 +53,8 @@ private MetadataColumns() {
   public static final String DELETE_FILE_ROW_FIELD_NAME = "row";
   public static final int DELETE_FILE_ROW_FIELD_ID = Integer.MAX_VALUE - 103;
   public static final String DELETE_FILE_ROW_DOC = "Deleted row values";
+  public static final int POSITION_DELETE_TABLE_PARTITION_FIELD_ID = Integer.MAX_VALUE - 104;

Review Comment:
   Yeah, +1 for getting a working implementation with `DataTask` and optimizing as we go.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r910268613


##########
core/src/main/java/org/apache/iceberg/PositionDeletesTableScan.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.util.PropertyUtil;
+
+public class PositionDeletesTableScan extends BaseMetadataTableScan {
+
+  public PositionDeletesTableScan(TableOperations ops, Table table, Schema schema) {
+    super(ops, table, schema, MetadataTableType.POSITION_DELETES);
+  }
+
+  PositionDeletesTableScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    super(ops, table, schema, MetadataTableType.POSITION_DELETES, context);
+  }
+
+  @Override
+  protected TableScan newRefinedScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    return new PositionDeletesTableScan(ops, table, schema, context);
+  }
+
+  @Override
+  protected CloseableIterable<FileScanTask> doPlanFiles() {
+    // TODO- add partition column and implement filter push down
+    boolean ignoreResiduals = context().ignoreResiduals();
+    String schemaString = SchemaParser.toJson(tableSchema());
+    Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : context().rowFilter();
+    ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter);
+    DeleteFileIndex deleteIndex = DeleteFileIndex.builderFor(tableOps().io(), snapshot().deleteManifests())
+        .specsById(table().specs())
+        .build();
+
+    CloseableIterable<DeleteFile> deleteFiles = CloseableIterable.withNoopClose(deleteIndex.referencedDeleteFiles());
+    CloseableIterable<DeleteFile> positionDeleteFiles = CloseableIterable.filter(deleteFiles,
+        df -> df.content().equals(FileContent.POSITION_DELETES));
+    return CloseableIterable.transform(positionDeleteFiles, f -> {
+      PartitionSpec spec = table().specs().get(f.specId());
+      String specString = PartitionSpecParser.toJson(spec);
+      return new BaseFileScanTask(DataFiles.fromPositionDelete(f, spec),

Review Comment:
   So I spent a bit of time on this to implement a PositionDeletesScanTask, but it still does not work yet.  TableScan still returns FileScanTask, so unless we change the interface I am not sure how this can work in Spark (Table.newScan is used everywhere to get TableScans).  
   
   I think this is possible like in my first commit https://github.com/apache/iceberg/pull/4812/files/051014682a2e876d2d5a7cb0b545bb5c1f8fa171 by making a new method on FileScanTask (alternatively, maybe we can add a new method on Table to return a parameterized Scan), but I guess it's not as clean.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r940562715


##########
core/src/main/java/org/apache/iceberg/MetadataColumns.java:
##########
@@ -89,8 +92,15 @@ private MetadataColumns() {}
           ROW_POSITION.fieldId(),
           IS_DELETED.fieldId(),
           SPEC_ID.fieldId(),
-          PARTITION_COLUMN_ID);
+          PARTITION_COLUMN_ID,

Review Comment:
   This is to avoid an issue when trying to read these new constant columns via Avro reader, which is supported but only if they are defined in this set.  Note, we chose explicitly to not re-use the existing metadata column ids for these constants, as it might cause confusion (metadata columns being a concept for data tables). 
   
   ```
   Caused by: java.lang.IllegalArgumentException: Missing required field: partition
   	at org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkArgument(Preconditions.java:220)
   	at org.apache.iceberg.avro.BuildAvroProjection.record(BuildAvroProjection.java:104)
   	at org.apache.iceberg.avro.BuildAvroProjection.record(BuildAvroProjection.java:42)
   	at org.apache.iceberg.avro.AvroCustomOrderSchemaVisitor.visit(AvroCustomOrderSchemaVisitor.java:50)
   	at org.apache.iceberg.avro.AvroSchemaUtil.buildAvroProjection(AvroSchemaUtil.java:126)
   	at org.apache.iceberg.avro.ProjectionDatumReader.setSchema(ProjectionDatumReader.java:68)
   	at org.apache.avro.file.DataFileStream.initialize(DataFileStream.java:133)
   	at org.apache.avro.file.DataFileReader.<init>(DataFileReader.java:139)
   	at org.apache.avro.file.DataFileReader.<init>(DataFileReader.java:131)
   	at org.apache.avro.file.DataFileReader.openReader(DataFileReader.java:75)
   	at org.apache.iceberg.avro.AvroIterable.newFileReader(AvroIterable.java:100)
   	at org.apache.iceberg.avro.AvroIterable.iterator(AvroIterable.java:76)
   	at org.apache.iceberg.io.CloseableIterable.lambda$filter$0(CloseableIterable.java:110)
   	at org.apache.iceberg.io.CloseableIterable$2.iterator(CloseableIterable.java:72)
   	at org.apache.iceberg.spark.source.RowDataReader.open(RowDataReader.java:61)
   	at org.apache.iceberg.spark.source.RowDataReader.open(RowDataReader.java:38)
   	at org.apache.iceberg.spark.source.BaseReader.next(BaseReader.java:128)
   	at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:119)
   	at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:156)
   	at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1(DataSourceRDD.scala:63)
   	at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1$adapted(DataSourceRDD.scala:63)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho closed pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho closed pull request #4812: Spark 3.2: Support reading position deletes
URL: https://github.com/apache/iceberg/pull/4812


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r919541969


##########
core/src/main/java/org/apache/iceberg/PositionDeletesTableScan.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.util.PropertyUtil;
+
+public class PositionDeletesTableScan extends BaseMetadataTableScan {
+
+  public PositionDeletesTableScan(TableOperations ops, Table table, Schema schema) {
+    super(ops, table, schema, MetadataTableType.POSITION_DELETES);
+  }
+
+  PositionDeletesTableScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    super(ops, table, schema, MetadataTableType.POSITION_DELETES, context);
+  }
+
+  @Override
+  protected TableScan newRefinedScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    return new PositionDeletesTableScan(ops, table, schema, context);
+  }
+
+  @Override
+  protected CloseableIterable<FileScanTask> doPlanFiles() {
+    // TODO- add partition column and implement filter push down
+    boolean ignoreResiduals = context().ignoreResiduals();
+    String schemaString = SchemaParser.toJson(tableSchema());
+    Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : context().rowFilter();
+    ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter);
+    DeleteFileIndex deleteIndex = DeleteFileIndex.builderFor(tableOps().io(), snapshot().deleteManifests())
+        .specsById(table().specs())
+        .build();
+
+    CloseableIterable<DeleteFile> deleteFiles = CloseableIterable.withNoopClose(deleteIndex.referencedDeleteFiles());
+    CloseableIterable<DeleteFile> positionDeleteFiles = CloseableIterable.filter(deleteFiles,
+        df -> df.content().equals(FileContent.POSITION_DELETES));
+    return CloseableIterable.transform(positionDeleteFiles, f -> {
+      PartitionSpec spec = table().specs().get(f.specId());
+      String specString = PartitionSpecParser.toJson(spec);
+      return new BaseFileScanTask(DataFiles.fromPositionDelete(f, spec),
+          null, /* Deletes */
+          schemaString,
+          specString,
+          residuals);
+    });
+  }
+
+  @Override
+  public long targetSplitSize() {
+    long tableValue = tableOps().current().propertyAsLong(

Review Comment:
   The default metadata split size is 32MB vs data file split size is 128 MB.  I thought initially the metadata split size is more suitable to read metadata than content files (which could be big?) but I can change it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r919500148


##########
core/src/main/java/org/apache/iceberg/io/DeleteSchemaUtil.java:
##########
@@ -20,13 +20,38 @@
 package org.apache.iceberg.io;
 
 import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Partitioning;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.types.Types;
 
 public class DeleteSchemaUtil {
   private DeleteSchemaUtil() {
   }
 
+  public static Schema metadataTableSchema(Table table) {
+    return metadataTableSchema(table, Partitioning.partitionType(table));
+  }
+
+  public static Schema metadataTableSchema(Table table, Types.StructType partitionType) {
+    Schema result = new Schema(
+        MetadataColumns.DELETE_FILE_PATH,

Review Comment:
   Actually looking into this, if I don't re-use the metadata column id I have to change a lot of codes, like:
   
   - BaseDataReader.constantsMap will need new method to populate partition based on new partition id, which I'll have to pass in based on what field is selected.  (Currently since both id's are the same it works)
   - All the code in file-type Readers to remove the metadata columns from selected columns and do error checks.  Currently this uses MetadataColumns.metadataFieldIds(), which will need to be now something like MetadataColumns.metadataAndConstantFieldIds()  to take into account new column.  Even though this is not a metadata column per se, it still is a constant column that should not be selected from the content file.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r903019191


##########
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java:
##########
@@ -167,6 +181,70 @@ public StructLikeSet rowSet(String name, Types.StructType projection, String...
     return set;
   }
 
+  private StructLikeSet actualPositionDeleteRowSet(String tableName, Table table) {
+    return actualPositionDeleteRowSet(tableName, table, null, null);
+  }
+
+  private StructLikeSet actualPositionDeleteRowSet(String tableName, Table table, String filter) {
+    return actualPositionDeleteRowSet(tableName, table, filter, null);
+  }
+
+  private StructLikeSet actualPositionDeleteRowSet(String tableName,
+                                                   Table table,
+                                                   Types.StructType selectSchema) {
+    return actualPositionDeleteRowSet(tableName, table, null, selectSchema);
+  }
+
+  private StructLikeSet actualPositionDeleteRowSet(String tableName,
+                                                   Table table,
+                                                   String filter,
+                                                   Types.StructType selectSchema) {
+    Dataset<Row> df = spark.read()
+        .format("iceberg")
+        .load("default." + tableName + ".position_deletes");
+    if (selectSchema != null) {
+      df = df.select(
+          selectSchema.fields().stream().map(

Review Comment:
   looks like odd formatting here? maybe put the whole map on the following line?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r876421517


##########
core/src/main/java/org/apache/iceberg/OffsetsAwareTargetSplitSizeScanTaskIterator.java:
##########
@@ -0,0 +1,66 @@
+/*

Review Comment:
   Note: moved from BaseFileScanTask, no changes



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r876601769


##########
.palantir/revapi.yml:
##########
@@ -27,6 +27,12 @@ acceptedBreaks:
     - code: "java.method.addedToInterface"
       new: "method ThisT org.apache.iceberg.SnapshotUpdate<ThisT>::scanManifestsWith(java.util.concurrent.ExecutorService)"
       justification: "Accept all changes prior to introducing API compatibility checks"
+    - code: "java.method.addedToInterface"
+      new: "method org.apache.iceberg.ContentFile<?> org.apache.iceberg.FileScanTask::contentFile()"
+      justification: "New method added to FileScanTask.  Should be not many implementations\
+        \ outside Iceberg, if so it's easy to fill in new method (especially if they\
+        \ inherit BaseFileScanTask).  This is easiest way to add DeleteFileScanTask,\
+        \ otherwise even bigger interface change is necessary"

Review Comment:
   Yea I didn't know the new policy, thanks, good to know.   (was thinking to try adding a default too).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r876421452


##########
core/src/main/java/org/apache/iceberg/FixedSizeSplitScanTaskIterator.java:
##########
@@ -0,0 +1,50 @@
+/*

Review Comment:
   Note: moved from BaseFileScanTask, changes : FileScanTask.file() => FileScanTask.contentFile()



##########
core/src/main/java/org/apache/iceberg/SplitScanTask.java:
##########
@@ -0,0 +1,122 @@
+/*

Review Comment:
   Note: moved from BaseFileScanTask, changes : FileScanTask.file() => FileScanTask.contentFile()



##########
core/src/main/java/org/apache/iceberg/OffsetsAwareTargetSplitSizeScanTaskIterator.java:
##########
@@ -0,0 +1,66 @@
+/*

Review Comment:
   Note: moved from BaseFileScanTask, changes : FileScanTask.file() => FileScanTask.contentFile()



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r887384899


##########
core/src/main/java/org/apache/iceberg/PositionDeletesTable.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import org.apache.iceberg.io.DeleteSchemaUtil;
+
+public class PositionDeletesTable extends BaseMetadataTable {
+
+  PositionDeletesTable(TableOperations ops, Table table, String name) {
+    super(ops, table, name);
+  }
+
+  @Override
+  MetadataTableType metadataTableType() {
+    return MetadataTableType.POSITION_DELETES;
+  }
+
+  @Override
+  public TableScan newScan() {
+    return new PositionDeletesTableScan(operations(), table(), schema());
+  }
+
+  @Override
+  public Schema schema() {

Review Comment:
   It would be nice to include a column that would indicate either the snapshot ID when it was added or the sequence number of the row. We can then have filter pushdown on those columns.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#issuecomment-1146265649

   While I think a metadata table like this would be helpful, I am not sure it is something we can leverage during compaction. We will need to have access to a list of `DeleteFile` we compacted at the commit time so that we can replace them with the newly added files. That's why the data compaction action uses the staging approach.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#issuecomment-1144231896

   Let me take a look.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r893716028


##########
core/src/main/java/org/apache/iceberg/PositionDeletesTable.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import org.apache.iceberg.io.DeleteSchemaUtil;
+
+public class PositionDeletesTable extends BaseMetadataTable {
+
+  PositionDeletesTable(TableOperations ops, Table table, String name) {
+    super(ops, table, name);
+  }
+
+  @Override
+  MetadataTableType metadataTableType() {
+    return MetadataTableType.POSITION_DELETES;
+  }
+
+  @Override
+  public TableScan newScan() {
+    return new PositionDeletesTableScan(operations(), table(), schema());
+  }
+
+  @Override
+  public Schema schema() {

Review Comment:
   @aokolnychyi i added a test to verify all the metadata columns like _file_path work as expected.  Users can join position_delete._file_path column with delete_files table to get snapshot-id, sequence-number as they wish.  
   
   Though filtering pushdown will be trickier for this.  This is because FileScanTask interface as is does not contain sequence number, snapshot-id, and we would need to first subclass it again to contain this information as in the first version of the pr.  And potentially add a metadata column if so to expose it (see below).
   
   I could perhaps look at adding a partition column for partition filtering.  This would again probably be a MetadataColumns as it seems the only supported way to pass extra columns to RowDataReader.  FileScanTask's DataFile already contains this so it may work.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r893716028


##########
core/src/main/java/org/apache/iceberg/PositionDeletesTable.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import org.apache.iceberg.io.DeleteSchemaUtil;
+
+public class PositionDeletesTable extends BaseMetadataTable {
+
+  PositionDeletesTable(TableOperations ops, Table table, String name) {
+    super(ops, table, name);
+  }
+
+  @Override
+  MetadataTableType metadataTableType() {
+    return MetadataTableType.POSITION_DELETES;
+  }
+
+  @Override
+  public TableScan newScan() {
+    return new PositionDeletesTableScan(operations(), table(), schema());
+  }
+
+  @Override
+  public Schema schema() {

Review Comment:
   @aokolnychyi i added a test to verify all the metadata columns like _file_path work as expected.  So I would say, users can join position_delete._file_path column with delete_files table to get snapshot-id, sequence-number as they wish.  
   
   Though filtering pushdown will be trickier for this.  This is because FileScanTask interface as is does not contain sequence number, snapshot-id, and we would need to first subclass it again to contain this information as in the first version of the pr.  And we would need to add metadata columns just for this metadata table, if so, to expose it.   (currently the only way to add columns to tables that use Spark RowReader is by metadata columns) 
   
   It seems a bit complicated optimization and maybe not worth the extra complexity, Not sure if there's a very common query of filtering out certain sequence numbers, as in general delete file entries will always be against data files of lower sequence numbers and thus always apply.
   
   On the other hand, I could perhaps look at adding partition filtering push down.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r905560701


##########
core/src/main/java/org/apache/iceberg/BaseMetadataTable.java:
##########
@@ -60,8 +61,12 @@ protected BaseMetadataTable(TableOperations ops, Table table, String name) {
    * @return a spec used to rewrite the metadata table filters to partition filters using an inclusive projection
    */
   static PartitionSpec transformSpec(Schema metadataTableSchema, PartitionSpec spec) {
+    return transformSpec(metadataTableSchema, spec.partitionType());
+  }
+
+  static PartitionSpec transformSpec(Schema metadataTableSchema, Types.StructType partitionType) {

Review Comment:
   There is a tricky bug here in partition filter push down that took me awhile to find.
   
   In most metadata tables, the transformed partition spec used only in ManifestGroup to filter out matching manifests.  In that case, it seems the generated fieldId is not important.  However, the position delete metadata table needs to filter out matching manifest-entries (as it actually reads delete files).   A wrong fieldId causes the DeleteFile to be instantiated with wrong partition information (as the field id is used to project onto the partition struct to lookup the value), leading to various bugs in filtering out the wrong files.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r921596928


##########
core/src/main/java/org/apache/iceberg/MetadataColumns.java:
##########
@@ -53,6 +53,8 @@ private MetadataColumns() {
   public static final String DELETE_FILE_ROW_FIELD_NAME = "row";
   public static final int DELETE_FILE_ROW_FIELD_ID = Integer.MAX_VALUE - 103;
   public static final String DELETE_FILE_ROW_DOC = "Deleted row values";
+  public static final int POSITION_DELETE_TABLE_PARTITION_FIELD_ID = Integer.MAX_VALUE - 104;

Review Comment:
   We don't necessarily have to use `DataTask` if we decide to go with static columns. We can add a new task type that will leverage another way to set these constant values instead of hacking `PartitionUtil` and metadata columns. We should be able to reuse most code in our readers and support vectorized reads as well.
   
   The new task will have to extend `FileScanTask`, though, as it will be part of `TableScan`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r920308505


##########
core/src/main/java/org/apache/iceberg/PositionDeletesTableScan.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.util.PropertyUtil;
+
+public class PositionDeletesTableScan extends BaseMetadataTableScan {
+
+  public PositionDeletesTableScan(TableOperations ops, Table table, Schema schema) {
+    super(ops, table, schema, MetadataTableType.POSITION_DELETES);
+  }
+
+  PositionDeletesTableScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    super(ops, table, schema, MetadataTableType.POSITION_DELETES, context);
+  }
+
+  @Override
+  protected TableScan newRefinedScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    return new PositionDeletesTableScan(ops, table, schema, context);
+  }
+
+  @Override
+  protected CloseableIterable<FileScanTask> doPlanFiles() {
+    // TODO- add partition column and implement filter push down
+    boolean ignoreResiduals = context().ignoreResiduals();
+    String schemaString = SchemaParser.toJson(tableSchema());
+    Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : context().rowFilter();
+    ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter);
+    DeleteFileIndex deleteIndex = DeleteFileIndex.builderFor(tableOps().io(), snapshot().deleteManifests())
+        .specsById(table().specs())
+        .build();
+
+    CloseableIterable<DeleteFile> deleteFiles = CloseableIterable.withNoopClose(deleteIndex.referencedDeleteFiles());
+    CloseableIterable<DeleteFile> positionDeleteFiles = CloseableIterable.filter(deleteFiles,
+        df -> df.content().equals(FileContent.POSITION_DELETES));
+    return CloseableIterable.transform(positionDeleteFiles, f -> {
+      PartitionSpec spec = table().specs().get(f.specId());
+      String specString = PartitionSpecParser.toJson(spec);
+      return new BaseFileScanTask(DataFiles.fromPositionDelete(f, spec),
+          null, /* Deletes */
+          schemaString,
+          specString,
+          residuals);
+    });
+  }
+
+  @Override
+  public long targetSplitSize() {
+    long tableValue = tableOps().current().propertyAsLong(

Review Comment:
   Changed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r924780140


##########
core/src/main/java/org/apache/iceberg/MetadataColumns.java:
##########
@@ -53,6 +53,8 @@ private MetadataColumns() {
   public static final String DELETE_FILE_ROW_FIELD_NAME = "row";
   public static final int DELETE_FILE_ROW_FIELD_ID = Integer.MAX_VALUE - 103;
   public static final String DELETE_FILE_ROW_DOC = "Deleted row values";
+  public static final int POSITION_DELETE_TABLE_PARTITION_FIELD_ID = Integer.MAX_VALUE - 104;

Review Comment:
   cc @rdblue @aokolnychyi @rdblue appreciate any thoughts, I think its a good idea to try to make this table usable across engines using existing DataTask,  without extra if-else handling, but not sure we can do it as of today due to the dependency.  Will try to explore the above , but it  still will not be as clean



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r918441075


##########
core/src/main/java/org/apache/iceberg/DataFiles.java:
##########
@@ -108,6 +108,31 @@ public static DataFile fromManifest(ManifestFile manifest) {
         .build();
   }
 
+  public static DataFile fromPositionDelete(DeleteFile deleteFile, PartitionSpec spec) {

Review Comment:
   Does this have to be specific to position deletes? Will constructing `DataFile` from an equality delete file be any different?



##########
core/src/main/java/org/apache/iceberg/DataFiles.java:
##########
@@ -108,6 +108,31 @@ public static DataFile fromManifest(ManifestFile manifest) {
         .build();
   }
 
+  public static DataFile fromPositionDelete(DeleteFile deleteFile, PartitionSpec spec) {
+    Preconditions.checkArgument(deleteFile.content().equals(FileContent.POSITION_DELETES),
+        "Expected positional delete file, found %s", deleteFile.content());
+    Metrics metrics = new Metrics(
+        deleteFile.recordCount(),
+        deleteFile.columnSizes(),
+        deleteFile.valueCounts(),
+        deleteFile.nullValueCounts(),
+        deleteFile.nanValueCounts(),
+        deleteFile.lowerBounds(),
+        deleteFile.upperBounds()
+    );
+
+    return DataFiles.builder(spec)
+        .withEncryptionKeyMetadata(deleteFile.keyMetadata())
+        .withFormat(deleteFile.format())
+        .withFileSizeInBytes(deleteFile.fileSizeInBytes())
+        .withPath(deleteFile.path().toString())
+        .withPartition(deleteFile.partition())
+        .withRecordCount(deleteFile.recordCount())
+        .withSortOrder(SortOrder.unsorted())

Review Comment:
   Shall we propagate the sort order? We can add `withSortOrderId` to the builder.



##########
core/src/main/java/org/apache/iceberg/PositionDeletesTable.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.util.Map;
+import org.apache.iceberg.io.DeleteSchemaUtil;
+
+public class PositionDeletesTable extends BaseMetadataTable {
+
+  PositionDeletesTable(TableOperations ops, Table table) {
+    super(ops, table, table.name() + ".position_deletes");
+  }
+
+  PositionDeletesTable(TableOperations ops, Table table, String name) {
+    super(ops, table, name);
+  }
+
+  @Override
+  MetadataTableType metadataTableType() {
+    return MetadataTableType.POSITION_DELETES;
+  }
+
+  @Override
+  public TableScan newScan() {
+    return new PositionDeletesTableScan(operations(), table(), schema());
+  }
+
+  @Override
+  public Schema schema() {
+    return DeleteSchemaUtil.metadataTableSchema(table());
+  }
+
+  @Override
+  public PartitionSpec spec() {
+    return table().spec();

Review Comment:
   I am not sure this is correct. All other metadata tables are unpartitioned and there is logic in the scan builder that assumes all metadata tables are unpartitioned. Do we need to make this one partitioned for a reason?



##########
core/src/main/java/org/apache/iceberg/PositionDeletesTableScan.java:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.ParallelIterable;
+import org.apache.iceberg.util.PropertyUtil;
+
+public class PositionDeletesTableScan extends BaseMetadataTableScan {
+
+  public PositionDeletesTableScan(TableOperations ops, Table table, Schema schema) {
+    super(ops, table, schema, MetadataTableType.POSITION_DELETES);
+  }
+
+  PositionDeletesTableScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    super(ops, table, schema, MetadataTableType.POSITION_DELETES, context);
+  }
+
+  @Override
+  protected TableScan newRefinedScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    return new PositionDeletesTableScan(ops, table, schema, context);
+  }
+
+  @Override
+  protected CloseableIterable<FileScanTask> doPlanFiles() {
+    Expression rowFilter = context().rowFilter();
+    String schemaString = SchemaParser.toJson(tableSchema());
+    boolean ignoreResiduals = context().ignoreResiduals();
+    Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : rowFilter;

Review Comment:
   nit: use shouldIgnoreResiduals() directly?
   
   ```
   Expression filter = shouldIgnoreResiduals() ? Expressions.alwaysTrue() : rowFilter;  
   ```



##########
core/src/main/java/org/apache/iceberg/PositionDeletesTableScan.java:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.ParallelIterable;
+import org.apache.iceberg.util.PropertyUtil;
+
+public class PositionDeletesTableScan extends BaseMetadataTableScan {
+
+  public PositionDeletesTableScan(TableOperations ops, Table table, Schema schema) {
+    super(ops, table, schema, MetadataTableType.POSITION_DELETES);
+  }
+
+  PositionDeletesTableScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    super(ops, table, schema, MetadataTableType.POSITION_DELETES, context);
+  }
+
+  @Override
+  protected TableScan newRefinedScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    return new PositionDeletesTableScan(ops, table, schema, context);
+  }
+
+  @Override
+  protected CloseableIterable<FileScanTask> doPlanFiles() {
+    Expression rowFilter = context().rowFilter();
+    String schemaString = SchemaParser.toJson(tableSchema());
+    boolean ignoreResiduals = context().ignoreResiduals();
+    Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : rowFilter;
+
+    Map<Integer, PartitionSpec> transformedSpecs = table().specs()
+        .entrySet()
+        .stream()
+        .map(e -> Pair.of(e.getKey(), BaseMetadataTable.transformSpec(tableSchema(), e.getValue())))
+        .collect(Collectors.toMap(Pair::first, Pair::second));
+
+    CloseableIterable<ManifestFile> deleteManifests = CloseableIterable.withNoopClose(
+        snapshot().deleteManifests(tableOps().io()));
+    CloseableIterable<CloseableIterable<FileScanTask>> results = CloseableIterable.transform(deleteManifests, m -> {
+
+      // Filter partitions
+      CloseableIterable<ManifestEntry<DeleteFile>> deleteFileEntries = ManifestFiles

Review Comment:
   Respect case sensitivity configured for this scan in `isCaseSensitive()`?



##########
core/src/main/java/org/apache/iceberg/PositionDeletesTableScan.java:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.ParallelIterable;
+import org.apache.iceberg.util.PropertyUtil;
+
+public class PositionDeletesTableScan extends BaseMetadataTableScan {
+
+  public PositionDeletesTableScan(TableOperations ops, Table table, Schema schema) {

Review Comment:
   nit: can be package-private?



##########
core/src/main/java/org/apache/iceberg/BaseMetadataTable.java:
##########
@@ -60,8 +61,12 @@ protected BaseMetadataTable(TableOperations ops, Table table, String name) {
    * @return a spec used to rewrite the metadata table filters to partition filters using an inclusive projection
    */
   static PartitionSpec transformSpec(Schema metadataTableSchema, PartitionSpec spec) {
+    return transformSpec(metadataTableSchema, spec.partitionType());
+  }
+
+  static PartitionSpec transformSpec(Schema metadataTableSchema, Types.StructType partitionType) {

Review Comment:
   Is there a reason to add an overloaded method that accepts `StructType`?
   Seems like it is not being used right now.



##########
core/src/main/java/org/apache/iceberg/io/DeleteSchemaUtil.java:
##########
@@ -20,13 +20,38 @@
 package org.apache.iceberg.io;
 
 import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Partitioning;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.types.Types;
 
 public class DeleteSchemaUtil {
   private DeleteSchemaUtil() {
   }
 
+  public static Schema metadataTableSchema(Table table) {

Review Comment:
   Are we sure this logic belongs here? Up to you but `PositionDeletesTable` seemed more appropriate.



##########
core/src/main/java/org/apache/iceberg/PositionDeletesTableScan.java:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.ParallelIterable;
+import org.apache.iceberg.util.PropertyUtil;
+
+public class PositionDeletesTableScan extends BaseMetadataTableScan {
+
+  public PositionDeletesTableScan(TableOperations ops, Table table, Schema schema) {
+    super(ops, table, schema, MetadataTableType.POSITION_DELETES);
+  }
+
+  PositionDeletesTableScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    super(ops, table, schema, MetadataTableType.POSITION_DELETES, context);
+  }
+
+  @Override
+  protected TableScan newRefinedScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    return new PositionDeletesTableScan(ops, table, schema, context);
+  }
+
+  @Override
+  protected CloseableIterable<FileScanTask> doPlanFiles() {
+    Expression rowFilter = context().rowFilter();
+    String schemaString = SchemaParser.toJson(tableSchema());
+    boolean ignoreResiduals = context().ignoreResiduals();
+    Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : rowFilter;
+
+    Map<Integer, PartitionSpec> transformedSpecs = table().specs()
+        .entrySet()
+        .stream()
+        .map(e -> Pair.of(e.getKey(), BaseMetadataTable.transformSpec(tableSchema(), e.getValue())))
+        .collect(Collectors.toMap(Pair::first, Pair::second));
+
+    CloseableIterable<ManifestFile> deleteManifests = CloseableIterable.withNoopClose(
+        snapshot().deleteManifests(tableOps().io()));
+    CloseableIterable<CloseableIterable<FileScanTask>> results = CloseableIterable.transform(deleteManifests, m -> {
+
+      // Filter partitions
+      CloseableIterable<ManifestEntry<DeleteFile>> deleteFileEntries = ManifestFiles

Review Comment:
   What about projection?



##########
core/src/main/java/org/apache/iceberg/PositionDeletesTableScan.java:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.ParallelIterable;
+import org.apache.iceberg.util.PropertyUtil;
+
+public class PositionDeletesTableScan extends BaseMetadataTableScan {

Review Comment:
   Why not make it a nested class like in other metadata tables? Because it is too big?
   Just out of curiosity.



##########
core/src/main/java/org/apache/iceberg/PositionDeletesTable.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.util.Map;
+import org.apache.iceberg.io.DeleteSchemaUtil;
+
+public class PositionDeletesTable extends BaseMetadataTable {
+
+  PositionDeletesTable(TableOperations ops, Table table) {
+    super(ops, table, table.name() + ".position_deletes");
+  }
+
+  PositionDeletesTable(TableOperations ops, Table table, String name) {
+    super(ops, table, name);
+  }
+
+  @Override
+  MetadataTableType metadataTableType() {
+    return MetadataTableType.POSITION_DELETES;
+  }
+
+  @Override
+  public TableScan newScan() {
+    return new PositionDeletesTableScan(operations(), table(), schema());
+  }
+
+  @Override
+  public Schema schema() {
+    return DeleteSchemaUtil.metadataTableSchema(table());
+  }
+
+  @Override
+  public PartitionSpec spec() {
+    return table().spec();
+  }
+
+  @Override
+  public Map<Integer, PartitionSpec> specs() {

Review Comment:
   Same here.



##########
core/src/main/java/org/apache/iceberg/io/DeleteSchemaUtil.java:
##########
@@ -20,13 +20,38 @@
 package org.apache.iceberg.io;
 
 import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Partitioning;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.types.Types;
 
 public class DeleteSchemaUtil {
   private DeleteSchemaUtil() {
   }
 
+  public static Schema metadataTableSchema(Table table) {
+    return metadataTableSchema(table, Partitioning.partitionType(table));
+  }
+
+  public static Schema metadataTableSchema(Table table, Types.StructType partitionType) {
+    Schema result = new Schema(
+        MetadataColumns.DELETE_FILE_PATH,
+        MetadataColumns.DELETE_FILE_POS,
+        Types.NestedField.optional(
+            MetadataColumns.DELETE_FILE_ROW_FIELD_ID, "row", table.schema().asStruct(),
+            MetadataColumns.DELETE_FILE_ROW_DOC));
+
+    if (partitionType.fields().size() > 0) {

Review Comment:
   I feel like using `TypeUtil.selectNot` will make it easier to read:
   
   ```
   Types.StructType partitionType = Partitioning.partitionType(table);
   Schema schema = new Schema(...);
   
   if (partitionType.fields().size() < 1) {
     // avoid returning an empty struct, which is not always supported. instead, drop the partition field
     return TypeUtil.selectNot(schema, Sets.newHashSet(PARTITION_ID));
   } else {
     return schema;
   }
   ```



##########
core/src/main/java/org/apache/iceberg/PositionDeletesTableScan.java:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.ParallelIterable;
+import org.apache.iceberg.util.PropertyUtil;
+
+public class PositionDeletesTableScan extends BaseMetadataTableScan {
+
+  public PositionDeletesTableScan(TableOperations ops, Table table, Schema schema) {
+    super(ops, table, schema, MetadataTableType.POSITION_DELETES);
+  }
+
+  PositionDeletesTableScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    super(ops, table, schema, MetadataTableType.POSITION_DELETES, context);
+  }
+
+  @Override
+  protected TableScan newRefinedScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    return new PositionDeletesTableScan(ops, table, schema, context);
+  }
+
+  @Override
+  protected CloseableIterable<FileScanTask> doPlanFiles() {
+    Expression rowFilter = context().rowFilter();
+    String schemaString = SchemaParser.toJson(tableSchema());
+    boolean ignoreResiduals = context().ignoreResiduals();
+    Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : rowFilter;
+
+    Map<Integer, PartitionSpec> transformedSpecs = table().specs()
+        .entrySet()
+        .stream()
+        .map(e -> Pair.of(e.getKey(), BaseMetadataTable.transformSpec(tableSchema(), e.getValue())))
+        .collect(Collectors.toMap(Pair::first, Pair::second));
+
+    CloseableIterable<ManifestFile> deleteManifests = CloseableIterable.withNoopClose(
+        snapshot().deleteManifests(tableOps().io()));
+    CloseableIterable<CloseableIterable<FileScanTask>> results = CloseableIterable.transform(deleteManifests, m -> {
+
+      // Filter partitions
+      CloseableIterable<ManifestEntry<DeleteFile>> deleteFileEntries = ManifestFiles
+          .readDeleteManifest(m, tableOps().io(), transformedSpecs)
+          .filterRows(rowFilter)
+          .liveEntries();
+
+      // Filter delete file type
+      CloseableIterable<ManifestEntry<DeleteFile>> positionDeleteEntries = CloseableIterable.filter(deleteFileEntries,
+          entry -> entry.file().content().equals(FileContent.POSITION_DELETES));
+
+      return CloseableIterable.transform(positionDeleteEntries, entry -> {
+        PartitionSpec spec = transformedSpecs.get(entry.file().specId());
+        ResidualEvaluator residuals = ResidualEvaluator.of(spec, filter, context().caseSensitive());

Review Comment:
   nit: `context().caseSensitive()` -> `isCaseSensitive()`



##########
core/src/main/java/org/apache/iceberg/io/DeleteSchemaUtil.java:
##########
@@ -20,13 +20,38 @@
 package org.apache.iceberg.io;
 
 import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Partitioning;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.types.Types;
 
 public class DeleteSchemaUtil {
   private DeleteSchemaUtil() {
   }
 
+  public static Schema metadataTableSchema(Table table) {
+    return metadataTableSchema(table, Partitioning.partitionType(table));
+  }
+
+  public static Schema metadataTableSchema(Table table, Types.StructType partitionType) {
+    Schema result = new Schema(
+        MetadataColumns.DELETE_FILE_PATH,

Review Comment:
   Are these real metadata columns or just columns that happen to have the same name? I don't think we should reuse column IDs as metadata columns are using reserved ones.



##########
core/src/main/java/org/apache/iceberg/PositionDeletesTableScan.java:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.ParallelIterable;
+import org.apache.iceberg.util.PropertyUtil;
+
+public class PositionDeletesTableScan extends BaseMetadataTableScan {
+
+  public PositionDeletesTableScan(TableOperations ops, Table table, Schema schema) {
+    super(ops, table, schema, MetadataTableType.POSITION_DELETES);
+  }
+
+  PositionDeletesTableScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    super(ops, table, schema, MetadataTableType.POSITION_DELETES, context);
+  }
+
+  @Override
+  protected TableScan newRefinedScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    return new PositionDeletesTableScan(ops, table, schema, context);
+  }
+
+  @Override
+  protected CloseableIterable<FileScanTask> doPlanFiles() {
+    Expression rowFilter = context().rowFilter();
+    String schemaString = SchemaParser.toJson(tableSchema());
+    boolean ignoreResiduals = context().ignoreResiduals();
+    Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : rowFilter;
+
+    Map<Integer, PartitionSpec> transformedSpecs = table().specs()
+        .entrySet()
+        .stream()
+        .map(e -> Pair.of(e.getKey(), BaseMetadataTable.transformSpec(tableSchema(), e.getValue())))
+        .collect(Collectors.toMap(Pair::first, Pair::second));
+
+    CloseableIterable<ManifestFile> deleteManifests = CloseableIterable.withNoopClose(
+        snapshot().deleteManifests(tableOps().io()));
+    CloseableIterable<CloseableIterable<FileScanTask>> results = CloseableIterable.transform(deleteManifests, m -> {
+
+      // Filter partitions
+      CloseableIterable<ManifestEntry<DeleteFile>> deleteFileEntries = ManifestFiles
+          .readDeleteManifest(m, tableOps().io(), transformedSpecs)
+          .filterRows(rowFilter)
+          .liveEntries();
+
+      // Filter delete file type
+      CloseableIterable<ManifestEntry<DeleteFile>> positionDeleteEntries = CloseableIterable.filter(deleteFileEntries,
+          entry -> entry.file().content().equals(FileContent.POSITION_DELETES));
+
+      return CloseableIterable.transform(positionDeleteEntries, entry -> {
+        PartitionSpec spec = transformedSpecs.get(entry.file().specId());
+        ResidualEvaluator residuals = ResidualEvaluator.of(spec, filter, context().caseSensitive());

Review Comment:
   Seems like we are doing repetitive work here for each task/spec.
   Some sort of `LoadingCache<Integer, ResidualEvaluator> residualCache` would make sense here like in a few other places. 



##########
core/src/main/java/org/apache/iceberg/PositionDeletesTableScan.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.util.PropertyUtil;
+
+public class PositionDeletesTableScan extends BaseMetadataTableScan {
+
+  public PositionDeletesTableScan(TableOperations ops, Table table, Schema schema) {
+    super(ops, table, schema, MetadataTableType.POSITION_DELETES);
+  }
+
+  PositionDeletesTableScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    super(ops, table, schema, MetadataTableType.POSITION_DELETES, context);
+  }
+
+  @Override
+  protected TableScan newRefinedScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    return new PositionDeletesTableScan(ops, table, schema, context);
+  }
+
+  @Override
+  protected CloseableIterable<FileScanTask> doPlanFiles() {
+    // TODO- add partition column and implement filter push down
+    boolean ignoreResiduals = context().ignoreResiduals();
+    String schemaString = SchemaParser.toJson(tableSchema());
+    Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : context().rowFilter();
+    ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter);
+    DeleteFileIndex deleteIndex = DeleteFileIndex.builderFor(tableOps().io(), snapshot().deleteManifests())
+        .specsById(table().specs())
+        .build();
+
+    CloseableIterable<DeleteFile> deleteFiles = CloseableIterable.withNoopClose(deleteIndex.referencedDeleteFiles());
+    CloseableIterable<DeleteFile> positionDeleteFiles = CloseableIterable.filter(deleteFiles,
+        df -> df.content().equals(FileContent.POSITION_DELETES));
+    return CloseableIterable.transform(positionDeleteFiles, f -> {
+      PartitionSpec spec = table().specs().get(f.specId());
+      String specString = PartitionSpecParser.toJson(spec);
+      return new BaseFileScanTask(DataFiles.fromPositionDelete(f, spec),
+          null, /* Deletes */
+          schemaString,
+          specString,
+          residuals);
+    });
+  }
+
+  @Override
+  public long targetSplitSize() {
+    long tableValue = tableOps().current().propertyAsLong(

Review Comment:
   Hm, I'd be surprised if a metadata table does not respect the metadata split size.



##########
core/src/main/java/org/apache/iceberg/BaseMetadataTable.java:
##########
@@ -60,8 +61,12 @@ protected BaseMetadataTable(TableOperations ops, Table table, String name) {
    * @return a spec used to rewrite the metadata table filters to partition filters using an inclusive projection
    */
   static PartitionSpec transformSpec(Schema metadataTableSchema, PartitionSpec spec) {
+    return transformSpec(metadataTableSchema, spec.partitionType());
+  }
+
+  static PartitionSpec transformSpec(Schema metadataTableSchema, Types.StructType partitionType) {
     PartitionSpec.Builder identitySpecBuilder = PartitionSpec.builderFor(metadataTableSchema).checkConflicts(false);
-    spec.fields().forEach(pf -> identitySpecBuilder.add(pf.fieldId(), pf.name(), "identity"));
+    partitionType.fields().forEach(pf -> identitySpecBuilder.add(pf.fieldId(), pf.fieldId(), pf.name(), "identity"));

Review Comment:
   Was the switch from `spec.fields()` to `spec.partitionType().fields()` deliberate?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r919441960


##########
core/src/main/java/org/apache/iceberg/PositionDeletesTable.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.util.Map;
+import org.apache.iceberg.io.DeleteSchemaUtil;
+
+public class PositionDeletesTable extends BaseMetadataTable {
+
+  PositionDeletesTable(TableOperations ops, Table table) {
+    super(ops, table, table.name() + ".position_deletes");
+  }
+
+  PositionDeletesTable(TableOperations ops, Table table, String name) {
+    super(ops, table, name);
+  }
+
+  @Override
+  MetadataTableType metadataTableType() {
+    return MetadataTableType.POSITION_DELETES;
+  }
+
+  @Override
+  public TableScan newScan() {
+    return new PositionDeletesTableScan(operations(), table(), schema());
+  }
+
+  @Override
+  public Schema schema() {
+    return DeleteSchemaUtil.metadataTableSchema(table());
+  }
+
+  @Override
+  public PartitionSpec spec() {
+    return table().spec();

Review Comment:
   Actually this crashes if it is not there, given we added a partition column now.  
   
   See this code BaseDataReader::constantsMap, where partition value is populated, it will know to get it from the constants (FileScanTask) else it gets it will try to read the partition column from the file which then crashes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r919526125


##########
core/src/main/java/org/apache/iceberg/DataFiles.java:
##########
@@ -108,6 +108,31 @@ public static DataFile fromManifest(ManifestFile manifest) {
         .build();
   }
 
+  public static DataFile fromPositionDelete(DeleteFile deleteFile, PartitionSpec spec) {

Review Comment:
   Good point



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r903020569


##########
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java:
##########
@@ -167,6 +181,70 @@ public StructLikeSet rowSet(String name, Types.StructType projection, String...
     return set;
   }
 
+  private StructLikeSet actualPositionDeleteRowSet(String tableName, Table table) {
+    return actualPositionDeleteRowSet(tableName, table, null, null);
+  }
+
+  private StructLikeSet actualPositionDeleteRowSet(String tableName, Table table, String filter) {
+    return actualPositionDeleteRowSet(tableName, table, filter, null);
+  }
+
+  private StructLikeSet actualPositionDeleteRowSet(String tableName,

Review Comment:
   this and the following have formatting issues I think? We generally don't do 1 arg per line.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r893716028


##########
core/src/main/java/org/apache/iceberg/PositionDeletesTable.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import org.apache.iceberg.io.DeleteSchemaUtil;
+
+public class PositionDeletesTable extends BaseMetadataTable {
+
+  PositionDeletesTable(TableOperations ops, Table table, String name) {
+    super(ops, table, name);
+  }
+
+  @Override
+  MetadataTableType metadataTableType() {
+    return MetadataTableType.POSITION_DELETES;
+  }
+
+  @Override
+  public TableScan newScan() {
+    return new PositionDeletesTableScan(operations(), table(), schema());
+  }
+
+  @Override
+  public Schema schema() {

Review Comment:
   @aokolnychyi i added a test to verify all the metadata columns like _file_path work as expected.  Users can join position_delete._file_path column with delete_files table to get snapshot-id, sequence-number as they wish.  Though it is not for filtering currently.
   
   I can take a look at adding a partition or other filter column.  It looks not very clean (need to add them as MetadataColumns as that's probably the only supported way now to get this to work with existing RowDataReader). 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r905334979


##########
core/src/main/java/org/apache/iceberg/PositionDeletesTable.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import org.apache.iceberg.io.DeleteSchemaUtil;
+
+public class PositionDeletesTable extends BaseMetadataTable {
+
+  PositionDeletesTable(TableOperations ops, Table table, String name) {
+    super(ops, table, name);
+  }
+
+  @Override
+  MetadataTableType metadataTableType() {
+    return MetadataTableType.POSITION_DELETES;
+  }
+
+  @Override
+  public TableScan newScan() {
+    return new PositionDeletesTableScan(operations(), table(), schema());
+  }
+
+  @Override
+  public Schema schema() {

Review Comment:
   Update 2:  Successfully made a non-metadata column partition column, and partition filter based on it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r876601769


##########
.palantir/revapi.yml:
##########
@@ -27,6 +27,12 @@ acceptedBreaks:
     - code: "java.method.addedToInterface"
       new: "method ThisT org.apache.iceberg.SnapshotUpdate<ThisT>::scanManifestsWith(java.util.concurrent.ExecutorService)"
       justification: "Accept all changes prior to introducing API compatibility checks"
+    - code: "java.method.addedToInterface"
+      new: "method org.apache.iceberg.ContentFile<?> org.apache.iceberg.FileScanTask::contentFile()"
+      justification: "New method added to FileScanTask.  Should be not many implementations\
+        \ outside Iceberg, if so it's easy to fill in new method (especially if they\
+        \ inherit BaseFileScanTask).  This is easiest way to add DeleteFileScanTask,\
+        \ otherwise even bigger interface change is necessary"

Review Comment:
   Yea I didn't know the new policy, thanks, good to know.   (was thinking to try adding a default too, though it might be unnecessary).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r940544376


##########
core/src/main/java/org/apache/iceberg/MetadataColumns.java:
##########
@@ -53,6 +53,8 @@ private MetadataColumns() {
   public static final String DELETE_FILE_ROW_FIELD_NAME = "row";
   public static final int DELETE_FILE_ROW_FIELD_ID = Integer.MAX_VALUE - 103;
   public static final String DELETE_FILE_ROW_DOC = "Deleted row values";
+  public static final int POSITION_DELETE_TABLE_PARTITION_FIELD_ID = Integer.MAX_VALUE - 104;

Review Comment:
   Hey guys I really tried hard on this, but couldn't find a way.  We looked with @RussellSpitzer  and looks like Trino actually wont break if we had a new table (they support each one manually, and actually i think a good portion don't even use our MetadataTable class altogether).   Discussed a bit with @aokolnychyi as well and coudn't find a good way.  Made a latest pr with a least bad way, piggy backing off of some of the latest RowReader refactoring



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r910268613


##########
core/src/main/java/org/apache/iceberg/PositionDeletesTableScan.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.util.PropertyUtil;
+
+public class PositionDeletesTableScan extends BaseMetadataTableScan {
+
+  public PositionDeletesTableScan(TableOperations ops, Table table, Schema schema) {
+    super(ops, table, schema, MetadataTableType.POSITION_DELETES);
+  }
+
+  PositionDeletesTableScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    super(ops, table, schema, MetadataTableType.POSITION_DELETES, context);
+  }
+
+  @Override
+  protected TableScan newRefinedScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    return new PositionDeletesTableScan(ops, table, schema, context);
+  }
+
+  @Override
+  protected CloseableIterable<FileScanTask> doPlanFiles() {
+    // TODO- add partition column and implement filter push down
+    boolean ignoreResiduals = context().ignoreResiduals();
+    String schemaString = SchemaParser.toJson(tableSchema());
+    Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : context().rowFilter();
+    ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter);
+    DeleteFileIndex deleteIndex = DeleteFileIndex.builderFor(tableOps().io(), snapshot().deleteManifests())
+        .specsById(table().specs())
+        .build();
+
+    CloseableIterable<DeleteFile> deleteFiles = CloseableIterable.withNoopClose(deleteIndex.referencedDeleteFiles());
+    CloseableIterable<DeleteFile> positionDeleteFiles = CloseableIterable.filter(deleteFiles,
+        df -> df.content().equals(FileContent.POSITION_DELETES));
+    return CloseableIterable.transform(positionDeleteFiles, f -> {
+      PartitionSpec spec = table().specs().get(f.specId());
+      String specString = PartitionSpecParser.toJson(spec);
+      return new BaseFileScanTask(DataFiles.fromPositionDelete(f, spec),

Review Comment:
   So I spent a bit of time on this, but it still is not compatible yet.  TableScan still returns FileScanTask so unless we change the interface I am not sure how this will get passed to Spark (Table.newScan is used everywhere to get TableScans).  
   
   I think this is possible in https://github.com/apache/iceberg/pull/4812/files/051014682a2e876d2d5a7cb0b545bb5c1f8fa171 by making a new method on FileScanTask (alternatively, maybe we can add a new method on Table to return a generic Scan), but I guess it's not as clean.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r910275364


##########
core/src/main/java/org/apache/iceberg/PositionDeletesTableScan.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.util.PropertyUtil;
+
+public class PositionDeletesTableScan extends BaseMetadataTableScan {
+
+  public PositionDeletesTableScan(TableOperations ops, Table table, Schema schema) {
+    super(ops, table, schema, MetadataTableType.POSITION_DELETES);
+  }
+
+  PositionDeletesTableScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    super(ops, table, schema, MetadataTableType.POSITION_DELETES, context);
+  }
+
+  @Override
+  protected TableScan newRefinedScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    return new PositionDeletesTableScan(ops, table, schema, context);
+  }
+
+  @Override
+  protected CloseableIterable<FileScanTask> doPlanFiles() {
+    // TODO- add partition column and implement filter push down
+    boolean ignoreResiduals = context().ignoreResiduals();
+    String schemaString = SchemaParser.toJson(tableSchema());
+    Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : context().rowFilter();
+    ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter);
+    DeleteFileIndex deleteIndex = DeleteFileIndex.builderFor(tableOps().io(), snapshot().deleteManifests())
+        .specsById(table().specs())
+        .build();
+
+    CloseableIterable<DeleteFile> deleteFiles = CloseableIterable.withNoopClose(deleteIndex.referencedDeleteFiles());
+    CloseableIterable<DeleteFile> positionDeleteFiles = CloseableIterable.filter(deleteFiles,
+        df -> df.content().equals(FileContent.POSITION_DELETES));
+    return CloseableIterable.transform(positionDeleteFiles, f -> {
+      PartitionSpec spec = table().specs().get(f.specId());
+      String specString = PartitionSpecParser.toJson(spec);
+      return new BaseFileScanTask(DataFiles.fromPositionDelete(f, spec),

Review Comment:
   FYI @aokolnychyi 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r923950980


##########
core/src/main/java/org/apache/iceberg/MetadataColumns.java:
##########
@@ -53,6 +53,8 @@ private MetadataColumns() {
   public static final String DELETE_FILE_ROW_FIELD_NAME = "row";
   public static final int DELETE_FILE_ROW_FIELD_ID = Integer.MAX_VALUE - 103;
   public static final String DELETE_FILE_ROW_DOC = "Deleted row values";
+  public static final int POSITION_DELETE_TABLE_PARTITION_FIELD_ID = Integer.MAX_VALUE - 104;

Review Comment:
   Hi guys, I hit the first issue , as  the code for PositionDeletesTable/ DataTask is in core module, there is no way currently to access Parquet, ORC, and the file readers to implement DataTask::rows().  I mean Spark could pass in a positionDeleteReader that returns a CloseableIterable<Row> but then it seems a bit silly to use the DataTask (except that DataTask can add the static columns like partition and partition_spec_id)
   
   Another thing that needs to be passed in is encryption (like in DeleteFilter::inputFile(), it seems to be different in Spark/Flink.  Let me know if you have any thoughts?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r925086785


##########
core/src/main/java/org/apache/iceberg/PositionDeletesTable.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.util.Map;
+import org.apache.iceberg.io.DeleteSchemaUtil;
+
+public class PositionDeletesTable extends BaseMetadataTable {
+
+  PositionDeletesTable(TableOperations ops, Table table) {
+    super(ops, table, table.name() + ".position_deletes");
+  }
+
+  PositionDeletesTable(TableOperations ops, Table table, String name) {
+    super(ops, table, name);
+  }
+
+  @Override
+  MetadataTableType metadataTableType() {
+    return MetadataTableType.POSITION_DELETES;
+  }
+
+  @Override
+  public TableScan newScan() {
+    return new PositionDeletesTableScan(operations(), table(), schema());
+  }
+
+  @Override
+  public Schema schema() {
+    return DeleteSchemaUtil.metadataTableSchema(table());
+  }
+
+  @Override
+  public PartitionSpec spec() {
+    return table().spec();

Review Comment:
   What happens here if the table spec has evolved? Shouldn't this be the unified partition spec of the table?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r919441960


##########
core/src/main/java/org/apache/iceberg/PositionDeletesTable.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.util.Map;
+import org.apache.iceberg.io.DeleteSchemaUtil;
+
+public class PositionDeletesTable extends BaseMetadataTable {
+
+  PositionDeletesTable(TableOperations ops, Table table) {
+    super(ops, table, table.name() + ".position_deletes");
+  }
+
+  PositionDeletesTable(TableOperations ops, Table table, String name) {
+    super(ops, table, name);
+  }
+
+  @Override
+  MetadataTableType metadataTableType() {
+    return MetadataTableType.POSITION_DELETES;
+  }
+
+  @Override
+  public TableScan newScan() {
+    return new PositionDeletesTableScan(operations(), table(), schema());
+  }
+
+  @Override
+  public Schema schema() {
+    return DeleteSchemaUtil.metadataTableSchema(table());
+  }
+
+  @Override
+  public PartitionSpec spec() {
+    return table().spec();

Review Comment:
   Actually this crashes if it is not there.  See this code BaseDataReader::constantsMap, where partition field is populated, it will know to get it from the constants (FileScanTask) else it gets it from 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r919438125


##########
core/src/main/java/org/apache/iceberg/PositionDeletesTableScan.java:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.ParallelIterable;
+import org.apache.iceberg.util.PropertyUtil;
+
+public class PositionDeletesTableScan extends BaseMetadataTableScan {
+
+  public PositionDeletesTableScan(TableOperations ops, Table table, Schema schema) {

Review Comment:
   Was using it in a test, but changed the test.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r919548275


##########
core/src/main/java/org/apache/iceberg/io/DeleteSchemaUtil.java:
##########
@@ -20,13 +20,38 @@
 package org.apache.iceberg.io;
 
 import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Partitioning;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.types.Types;
 
 public class DeleteSchemaUtil {
   private DeleteSchemaUtil() {
   }
 
+  public static Schema metadataTableSchema(Table table) {
+    return metadataTableSchema(table, Partitioning.partitionType(table));
+  }
+
+  public static Schema metadataTableSchema(Table table, Types.StructType partitionType) {
+    Schema result = new Schema(
+        MetadataColumns.DELETE_FILE_PATH,

Review Comment:
   Did the above, feel free to see if it is better or if its messier.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r887394257


##########
api/src/main/java/org/apache/iceberg/FileScanTask.java:
##########
@@ -26,15 +26,21 @@
  * A scan task over a range of a single file.
  */
 public interface FileScanTask extends ScanTask {
+
+  /**
+   * @return the file to scan
+   */
+  ContentFile<?> contentFile();

Review Comment:
   Yea I think that works although it felt not so clean,  DeleteDataFile?   It will definitely be less changes in code, I'm ok though if everyone agrees.  Will try to follow there as well



##########
core/src/main/java/org/apache/iceberg/PositionDeletesTable.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import org.apache.iceberg.io.DeleteSchemaUtil;
+
+public class PositionDeletesTable extends BaseMetadataTable {
+
+  PositionDeletesTable(TableOperations ops, Table table, String name) {
+    super(ops, table, name);
+  }
+
+  @Override
+  MetadataTableType metadataTableType() {
+    return MetadataTableType.POSITION_DELETES;
+  }
+
+  @Override
+  public TableScan newScan() {
+    return new PositionDeletesTableScan(operations(), table(), schema());
+  }
+
+  @Override
+  public Schema schema() {

Review Comment:
   Good idea



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r887384420


##########
api/src/main/java/org/apache/iceberg/FileScanTask.java:
##########
@@ -26,15 +26,21 @@
  * A scan task over a range of a single file.
  */
 public interface FileScanTask extends ScanTask {
+
+  /**
+   * @return the file to scan
+   */
+  ContentFile<?> contentFile();

Review Comment:
   This PR raises a somehow similar question about the future of `FileScanTask` as in #4870. Maybe, we can discuss there a bit.
   
   Instead of adding a new method like `contentFile`, what about wrapping a delete file as `DataFile` and using the task without any changes?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r902986896


##########
core/src/main/java/org/apache/iceberg/DataFiles.java:
##########
@@ -108,6 +108,29 @@ public static DataFile fromManifest(ManifestFile manifest) {
         .build();
   }
 
+  public static DataFile fromPositionDelete(DeleteFile deleteFile, PartitionSpec spec) {

Review Comment:
   Do we need an assert here to make sure someone doesn't pass through an Equality delete?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r910268613


##########
core/src/main/java/org/apache/iceberg/PositionDeletesTableScan.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.util.PropertyUtil;
+
+public class PositionDeletesTableScan extends BaseMetadataTableScan {
+
+  public PositionDeletesTableScan(TableOperations ops, Table table, Schema schema) {
+    super(ops, table, schema, MetadataTableType.POSITION_DELETES);
+  }
+
+  PositionDeletesTableScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    super(ops, table, schema, MetadataTableType.POSITION_DELETES, context);
+  }
+
+  @Override
+  protected TableScan newRefinedScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    return new PositionDeletesTableScan(ops, table, schema, context);
+  }
+
+  @Override
+  protected CloseableIterable<FileScanTask> doPlanFiles() {
+    // TODO- add partition column and implement filter push down
+    boolean ignoreResiduals = context().ignoreResiduals();
+    String schemaString = SchemaParser.toJson(tableSchema());
+    Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : context().rowFilter();
+    ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter);
+    DeleteFileIndex deleteIndex = DeleteFileIndex.builderFor(tableOps().io(), snapshot().deleteManifests())
+        .specsById(table().specs())
+        .build();
+
+    CloseableIterable<DeleteFile> deleteFiles = CloseableIterable.withNoopClose(deleteIndex.referencedDeleteFiles());
+    CloseableIterable<DeleteFile> positionDeleteFiles = CloseableIterable.filter(deleteFiles,
+        df -> df.content().equals(FileContent.POSITION_DELETES));
+    return CloseableIterable.transform(positionDeleteFiles, f -> {
+      PartitionSpec spec = table().specs().get(f.specId());
+      String specString = PartitionSpecParser.toJson(spec);
+      return new BaseFileScanTask(DataFiles.fromPositionDelete(f, spec),

Review Comment:
   So I spent a bit of time on this to implement a PositionDeletesScanTask, but it still does not work yet.  TableScan still returns FileScanTask (which is tied to DataFile), so unless we change the interface I am not sure how this will get passed to Spark (Table.newScan is used everywhere to get TableScans).  
   
   I think this is possible in https://github.com/apache/iceberg/pull/4812/files/051014682a2e876d2d5a7cb0b545bb5c1f8fa171 by making a new method on FileScanTask (alternatively, maybe we can add a new method on Table to return a generic Scan), but I guess it's not as clean.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r910268613


##########
core/src/main/java/org/apache/iceberg/PositionDeletesTableScan.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.util.PropertyUtil;
+
+public class PositionDeletesTableScan extends BaseMetadataTableScan {
+
+  public PositionDeletesTableScan(TableOperations ops, Table table, Schema schema) {
+    super(ops, table, schema, MetadataTableType.POSITION_DELETES);
+  }
+
+  PositionDeletesTableScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    super(ops, table, schema, MetadataTableType.POSITION_DELETES, context);
+  }
+
+  @Override
+  protected TableScan newRefinedScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    return new PositionDeletesTableScan(ops, table, schema, context);
+  }
+
+  @Override
+  protected CloseableIterable<FileScanTask> doPlanFiles() {
+    // TODO- add partition column and implement filter push down
+    boolean ignoreResiduals = context().ignoreResiduals();
+    String schemaString = SchemaParser.toJson(tableSchema());
+    Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : context().rowFilter();
+    ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter);
+    DeleteFileIndex deleteIndex = DeleteFileIndex.builderFor(tableOps().io(), snapshot().deleteManifests())
+        .specsById(table().specs())
+        .build();
+
+    CloseableIterable<DeleteFile> deleteFiles = CloseableIterable.withNoopClose(deleteIndex.referencedDeleteFiles());
+    CloseableIterable<DeleteFile> positionDeleteFiles = CloseableIterable.filter(deleteFiles,
+        df -> df.content().equals(FileContent.POSITION_DELETES));
+    return CloseableIterable.transform(positionDeleteFiles, f -> {
+      PartitionSpec spec = table().specs().get(f.specId());
+      String specString = PartitionSpecParser.toJson(spec);
+      return new BaseFileScanTask(DataFiles.fromPositionDelete(f, spec),

Review Comment:
   So I spent a bit of time on this to implement a PositionDeletesScanTask, but it still does not work yet.  TableScan still returns FileScanTask so unless we change the interface I am not sure how this will get passed to Spark (Table.newScan is used everywhere to get TableScans).  
   
   I think this is possible in https://github.com/apache/iceberg/pull/4812/files/051014682a2e876d2d5a7cb0b545bb5c1f8fa171 by making a new method on FileScanTask (alternatively, maybe we can add a new method on Table to return a generic Scan), but I guess it's not as clean.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r922355758


##########
core/src/main/java/org/apache/iceberg/MetadataColumns.java:
##########
@@ -53,6 +53,8 @@ private MetadataColumns() {
   public static final String DELETE_FILE_ROW_FIELD_NAME = "row";
   public static final int DELETE_FILE_ROW_FIELD_ID = Integer.MAX_VALUE - 103;
   public static final String DELETE_FILE_ROW_DOC = "Deleted row values";
+  public static final int POSITION_DELETE_TABLE_PARTITION_FIELD_ID = Integer.MAX_VALUE - 104;

Review Comment:
   Thanks for taking a look.  It's getting a bit more messy than anticipated.
   
   Was spending a lot of time yesterday looking at the marker FileScanTask approach and it does get messy.  Especially  trying to add another constant column (spec_id) to this table, not only is it another use of metadata column  to populate it, we have to figure out how to make a residual expression without the constant column as otherwise the default file read code tries to filter too aggressively if there is a spec_id filter (as the stats do not exist on the file and the pruning code skips it) .  In short , adding any constant columns to this table, gets a bit messy.
   
   Makes sense to explore using some kind of StaticDataTask.  I think the problem for that, currently it would not do split() and implement any file residual filtering on position-delete row values if there are any filters there, and of course vectorization, and it'll be re-implementing these if we want it.  But maybe I am over-optimizing this and in the first cut we can live without those and add them later.  Can look at this approach, and see if I hit any major blockers.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r921603083


##########
core/src/main/java/org/apache/iceberg/MetadataColumns.java:
##########
@@ -53,6 +53,8 @@ private MetadataColumns() {
   public static final String DELETE_FILE_ROW_FIELD_NAME = "row";
   public static final int DELETE_FILE_ROW_FIELD_ID = Integer.MAX_VALUE - 103;
   public static final String DELETE_FILE_ROW_DOC = "Deleted row values";
+  public static final int POSITION_DELETE_TABLE_PARTITION_FIELD_ID = Integer.MAX_VALUE - 104;

Review Comment:
   +1 yea I prefer static (declared) columns in my opinion, I can give a try to see if adding a new kind of marker FileScanTask helps clean it up. 
   
   Partition/spec_id will be important columns for future things we want to do with this table (ie, compact position deletes, remove dangling position deletes), so having them be a metadata column of metadata table is a bit too meta!  Though the population mechanism will be similar.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r919441960


##########
core/src/main/java/org/apache/iceberg/PositionDeletesTable.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.util.Map;
+import org.apache.iceberg.io.DeleteSchemaUtil;
+
+public class PositionDeletesTable extends BaseMetadataTable {
+
+  PositionDeletesTable(TableOperations ops, Table table) {
+    super(ops, table, table.name() + ".position_deletes");
+  }
+
+  PositionDeletesTable(TableOperations ops, Table table, String name) {
+    super(ops, table, name);
+  }
+
+  @Override
+  MetadataTableType metadataTableType() {
+    return MetadataTableType.POSITION_DELETES;
+  }
+
+  @Override
+  public TableScan newScan() {
+    return new PositionDeletesTableScan(operations(), table(), schema());
+  }
+
+  @Override
+  public Schema schema() {
+    return DeleteSchemaUtil.metadataTableSchema(table());
+  }
+
+  @Override
+  public PartitionSpec spec() {
+    return table().spec();

Review Comment:
   Actually this crashes if it is not there, given we added a partition column now.  
   
   See this code BaseDataReader::constantsMap, where partition value is populated, it uses the fact that the table has a partition spec to know to get it from the constants (FileScanTask) else it gets it will try to read the partition column from the file which then crashes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r923961118


##########
core/src/main/java/org/apache/iceberg/MetadataColumns.java:
##########
@@ -53,6 +53,8 @@ private MetadataColumns() {
   public static final String DELETE_FILE_ROW_FIELD_NAME = "row";
   public static final int DELETE_FILE_ROW_FIELD_ID = Integer.MAX_VALUE - 103;
   public static final String DELETE_FILE_ROW_DOC = "Deleted row values";
+  public static final int POSITION_DELETE_TABLE_PARTITION_FIELD_ID = Integer.MAX_VALUE - 104;

Review Comment:
   I guess the only thing I can think to do is to provide a shared implementation of positionDeleteReader in data module, but each engine still needs to check the type of table (or the marker FileScanTask) and then manually plug this in to the DataTask.



##########
core/src/main/java/org/apache/iceberg/MetadataColumns.java:
##########
@@ -53,6 +53,8 @@ private MetadataColumns() {
   public static final String DELETE_FILE_ROW_FIELD_NAME = "row";
   public static final int DELETE_FILE_ROW_FIELD_ID = Integer.MAX_VALUE - 103;
   public static final String DELETE_FILE_ROW_DOC = "Deleted row values";
+  public static final int POSITION_DELETE_TABLE_PARTITION_FIELD_ID = Integer.MAX_VALUE - 104;

Review Comment:
   I guess the only thing I can think to do is to provide a shared implementation of positionDeleteReader in data module, but each engine still needs to check the type of table (or the marker FileScanTask) and then manually plug this in to the DataTask, for the positionDeletes table.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r925096463


##########
core/src/main/java/org/apache/iceberg/PositionDeletesTable.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.util.Map;
+import org.apache.iceberg.io.DeleteSchemaUtil;
+
+public class PositionDeletesTable extends BaseMetadataTable {
+
+  PositionDeletesTable(TableOperations ops, Table table) {
+    super(ops, table, table.name() + ".position_deletes");
+  }
+
+  PositionDeletesTable(TableOperations ops, Table table, String name) {
+    super(ops, table, name);
+  }
+
+  @Override
+  MetadataTableType metadataTableType() {
+    return MetadataTableType.POSITION_DELETES;
+  }
+
+  @Override
+  public TableScan newScan() {
+    return new PositionDeletesTableScan(operations(), table(), schema());
+  }
+
+  @Override
+  public Schema schema() {
+    return DeleteSchemaUtil.metadataTableSchema(table());
+  }
+
+  @Override
+  public PartitionSpec spec() {
+    return table().spec();

Review Comment:
   Yea I think you are right about that.  Anyway this may change if we go with DataTask or some related approach



##########
core/src/main/java/org/apache/iceberg/PositionDeletesTable.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.util.Map;
+import org.apache.iceberg.io.DeleteSchemaUtil;
+
+public class PositionDeletesTable extends BaseMetadataTable {
+
+  PositionDeletesTable(TableOperations ops, Table table) {
+    super(ops, table, table.name() + ".position_deletes");
+  }
+
+  PositionDeletesTable(TableOperations ops, Table table, String name) {
+    super(ops, table, name);
+  }
+
+  @Override
+  MetadataTableType metadataTableType() {
+    return MetadataTableType.POSITION_DELETES;
+  }
+
+  @Override
+  public TableScan newScan() {
+    return new PositionDeletesTableScan(operations(), table(), schema());
+  }
+
+  @Override
+  public Schema schema() {
+    return DeleteSchemaUtil.metadataTableSchema(table());
+  }
+
+  @Override
+  public PartitionSpec spec() {
+    return table().spec();

Review Comment:
   Yea I think you are right about that.  Anyway I guess it may change if we go with DataTask or some related approach



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r893716028


##########
core/src/main/java/org/apache/iceberg/PositionDeletesTable.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import org.apache.iceberg.io.DeleteSchemaUtil;
+
+public class PositionDeletesTable extends BaseMetadataTable {
+
+  PositionDeletesTable(TableOperations ops, Table table, String name) {
+    super(ops, table, name);
+  }
+
+  @Override
+  MetadataTableType metadataTableType() {
+    return MetadataTableType.POSITION_DELETES;
+  }
+
+  @Override
+  public TableScan newScan() {
+    return new PositionDeletesTableScan(operations(), table(), schema());
+  }
+
+  @Override
+  public Schema schema() {

Review Comment:
   @aokolnychyi i added a test to verify all the metadata columns like _file_path work as expected.  So I would say, users can join position_delete._file_path column with delete_files table to get snapshot-id, sequence-number as they wish.  
   
   Though filtering pushdown will be trickier for this.  This is because FileScanTask interface as is does not contain sequence number, snapshot-id, and we would need to first subclass it again to contain this information as in the first version of the pr.  And potentially add a metadata column if so to expose it (see below).  That may be ok as I am not sure if we need to really optimize this more complex query?
   
   On the other hand, I could perhaps look at adding a partition column for partition filtering.  This would again probably be a MetadataColumns as it seems the only supported way to pass extra columns to RowDataReader.  FileScanTask's DataFile already contains this so it may work.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#issuecomment-1151344028

   Addressed the review comments.  As mentioned in second comment , adding some filter columns like partition may require a bit of discussion on the implementation, may add this here or in a second pr.  But in any case, this should be ready as is.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#issuecomment-1147871867

   > I thought this would be useful as we are lacking any way for user to see contents of delete files easily. 
   
   Thanks for confirming.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #4812: Spark 3.2: Support reading position deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#discussion_r940562715


##########
core/src/main/java/org/apache/iceberg/MetadataColumns.java:
##########
@@ -89,8 +92,15 @@ private MetadataColumns() {}
           ROW_POSITION.fieldId(),
           IS_DELETED.fieldId(),
           SPEC_ID.fieldId(),
-          PARTITION_COLUMN_ID);
+          PARTITION_COLUMN_ID,

Review Comment:
   This is to avoid an issue when trying to read these constant columns via Avro reader, which supports but only if they are defined here..
   
   ```
   Caused by: java.lang.IllegalArgumentException: Missing required field: partition
   	at org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkArgument(Preconditions.java:220)
   	at org.apache.iceberg.avro.BuildAvroProjection.record(BuildAvroProjection.java:104)
   	at org.apache.iceberg.avro.BuildAvroProjection.record(BuildAvroProjection.java:42)
   	at org.apache.iceberg.avro.AvroCustomOrderSchemaVisitor.visit(AvroCustomOrderSchemaVisitor.java:50)
   	at org.apache.iceberg.avro.AvroSchemaUtil.buildAvroProjection(AvroSchemaUtil.java:126)
   	at org.apache.iceberg.avro.ProjectionDatumReader.setSchema(ProjectionDatumReader.java:68)
   	at org.apache.avro.file.DataFileStream.initialize(DataFileStream.java:133)
   	at org.apache.avro.file.DataFileReader.<init>(DataFileReader.java:139)
   	at org.apache.avro.file.DataFileReader.<init>(DataFileReader.java:131)
   	at org.apache.avro.file.DataFileReader.openReader(DataFileReader.java:75)
   	at org.apache.iceberg.avro.AvroIterable.newFileReader(AvroIterable.java:100)
   	at org.apache.iceberg.avro.AvroIterable.iterator(AvroIterable.java:76)
   	at org.apache.iceberg.io.CloseableIterable.lambda$filter$0(CloseableIterable.java:110)
   	at org.apache.iceberg.io.CloseableIterable$2.iterator(CloseableIterable.java:72)
   	at org.apache.iceberg.spark.source.RowDataReader.open(RowDataReader.java:61)
   	at org.apache.iceberg.spark.source.RowDataReader.open(RowDataReader.java:38)
   	at org.apache.iceberg.spark.source.BaseReader.next(BaseReader.java:128)
   	at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:119)
   	at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:156)
   	at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1(DataSourceRDD.scala:63)
   	at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1$adapted(DataSourceRDD.scala:63)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on pull request #4812: Spark 3.2: Support reading position deletes

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on PR #4812:
URL: https://github.com/apache/iceberg/pull/4812#issuecomment-1412455363

   Closing as it's now broken into smaller prs


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org