Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/03/24 11:51:18 UTC

[GitHub] [iceberg] chenjunjiedada opened a new pull request #2372: Spark: add position delete row reader

chenjunjiedada opened a new pull request #2372:
URL: https://github.com/apache/iceberg/pull/2372


   This adds a Spark reader for position delete rows. It also changes the predicate logic of the delete filter.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] jackye1995 commented on a change in pull request #2372: Spark: add position delete row reader

Posted by GitBox <gi...@apache.org>.
jackye1995 commented on a change in pull request #2372:
URL: https://github.com/apache/iceberg/pull/2372#discussion_r636510383



##########
File path: data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
##########
@@ -96,6 +98,29 @@ public Schema requiredSchema() {
     return requiredSchema;
   }
 
+  protected int deleteMarkerIndex() {
+    if (deleteMarkerIndex != null) {
+      return deleteMarkerIndex;
+    }
+
+    int index = 0;
+    for (Types.NestedField field : requiredSchema().columns()) {
+      if (field.fieldId() != MetadataColumns.DELETE_MARK.fieldId()) {
+        index = index + 1;

Review comment:
       This feels a bit fragile to me. It looks like it assumes the delete marker is the last metadata column, so it would break if we add another column in the future.
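
One way to avoid depending on column order, sketched here under assumptions, is to look the marker up by field ID and fail loudly when it is missing. `Field`, `DELETE_MARK_ID`, and `indexOfFieldId` are hypothetical stand-ins for illustration only; they are not Iceberg classes or constants.

```java
import java.util.Arrays;
import java.util.List;

class DeleteMarkerIndexSketch {
  // Hypothetical stand-in for a schema column; only the ID and name are modeled.
  static final class Field {
    final int id;
    final String name;

    Field(int id, String name) {
      this.id = id;
      this.name = name;
    }
  }

  // Assumed ID for the delete-marker column, chosen only for this example.
  static final int DELETE_MARK_ID = 9001;

  // Returns the position of the column with the given ID, wherever it sits in the list.
  static int indexOfFieldId(List<Field> columns, int fieldId) {
    for (int i = 0; i < columns.size(); i++) {
      if (columns.get(i).id == fieldId) {
        return i;
      }
    }
    throw new IllegalArgumentException("No column with field ID " + fieldId);
  }

  public static void main(String[] args) {
    // The marker is deliberately not the last column here.
    List<Field> columns = Arrays.asList(
        new Field(1, "id"),
        new Field(DELETE_MARK_ID, "_deleted"),
        new Field(9002, "_other_metadata"));
    System.out.println(indexOfFieldId(columns, DELETE_MARK_ID)); // prints 1
  }
}
```

Because the lookup matches on ID rather than position, adding another metadata column after the marker does not change the result.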






[GitHub] [iceberg] rdblue commented on a change in pull request #2372: Spark: add position delete row reader

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2372:
URL: https://github.com/apache/iceberg/pull/2372#discussion_r600812746



##########
File path: core/src/main/java/org/apache/iceberg/deletes/Deletes.java
##########
@@ -203,6 +206,10 @@ protected PositionFilterIterator(CloseableIterator<T> items, CloseableIterator<L
 
       @Override
       protected boolean shouldKeep(T row) {
+        return keepDeleteRows != filter(row);
+      }
+
+      private boolean filter(T row) {

Review comment:
       I think it would be better to have a subclass of this one that overrides `shouldKeep` instead:
   
   ```java
   class PositionStreamDeletedRowSelector extends PositionStreamDeleteFilter {
     ...
     @Override
     protected boolean shouldKeep(T row) {
       return !super.shouldKeep(row);
     }
   }
   ```






[GitHub] [iceberg] openinx commented on a change in pull request #2372: Spark: add position delete row reader

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2372:
URL: https://github.com/apache/iceberg/pull/2372#discussion_r607683442



##########
File path: core/src/main/java/org/apache/iceberg/deletes/Deletes.java
##########
@@ -233,6 +246,37 @@ public void close() {
     }
   }
 
+  static class PositionStreamDeletedRowMarker<T> extends PositionStreamDeleteFilter<T> {
+    private final Consumer<T> deleteMarker;
+
+    private PositionStreamDeletedRowMarker(CloseableIterable<T> rows, Function<T, Long> extractPos,
+                                           CloseableIterable<Long> deletePositions,
+                                           Consumer<T> deleteMarker) {
+      super(rows, extractPos, deletePositions);
+      this.deleteMarker = deleteMarker;
+    }
+
+    @Override
+    protected FilterIterator<T> positionIterator(CloseableIterator<T> items,
+                                                 CloseableIterator<Long> deletePositions) {
+      return new PositionMarkerIterator(items, deletePositions);
+    }
+
+    private class PositionMarkerIterator extends PositionFilterIterator {
+      protected PositionMarkerIterator(CloseableIterator<T> items, CloseableIterator<Long> deletePositions) {
+        super(items, deletePositions);
+      }
+
+      @Override
+      protected boolean shouldKeep(T row) {
+        if (!super.shouldKeep(row)) {
+          deleteMarker.accept(row);

Review comment:
       Personally, I find it strange to modify the original row (setting the `_deleted` flag to true) inside a `shouldKeep` method, because that should be a pure test method that does not modify any state of the record. Otherwise it is easy to cause confusion: after a record goes through a different sequence of `shouldKeep` tests, the final result differs, and even the original record is modified.
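
A minimal sketch of the separation being asked for, assuming a simplified row type: one explicit marking pass mutates rows, and the test used for selection only reads the flag. `Row`, `markDeleted`, `IS_DELETED`, and `select` are hypothetical names for illustration and are not part of the PR or of Iceberg.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

class MarkThenSelectSketch {
  // Hypothetical row type: a file position plus a mutable deleted flag.
  static final class Row {
    final long pos;
    boolean deleted;

    Row(long pos) {
      this.pos = pos;
    }
  }

  // Marking pass: the only code in this sketch that mutates rows.
  static void markDeleted(List<Row> rows, Set<Long> deletedPositions) {
    for (Row row : rows) {
      if (deletedPositions.contains(row.pos)) {
        row.deleted = true;
      }
    }
  }

  // Pure test: reads the flag and never changes the row.
  static final Predicate<Row> IS_DELETED = row -> row.deleted;

  // Generic selection step that keeps the rows accepted by the predicate.
  static List<Row> select(List<Row> rows, Predicate<Row> keep) {
    List<Row> kept = new ArrayList<>();
    for (Row row : rows) {
      if (keep.test(row)) {
        kept.add(row);
      }
    }
    return kept;
  }

  public static void main(String[] args) {
    List<Row> rows = Arrays.asList(new Row(0), new Row(1), new Row(2), new Row(3));
    markDeleted(rows, new HashSet<>(Arrays.asList(1L, 3L)));
    System.out.println(select(rows, IS_DELETED).size());          // deleted rows
    System.out.println(select(rows, IS_DELETED.negate()).size()); // live rows
  }
}
```

With this split, applying the test any number of times, or in any order, gives the same answer for a row, since only the marking pass writes to it.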






[GitHub] [iceberg] chenjunjiedada commented on pull request #2372: Spark: add position delete row reader

Posted by GitBox <gi...@apache.org>.
chenjunjiedada commented on pull request #2372:
URL: https://github.com/apache/iceberg/pull/2372#issuecomment-892750271


   @openinx @rdblue Could you please take another look when you are free?




[GitHub] [iceberg] chenjunjiedada commented on pull request #2372: Spark: add position delete row reader

Posted by GitBox <gi...@apache.org>.
chenjunjiedada commented on pull request #2372:
URL: https://github.com/apache/iceberg/pull/2372#issuecomment-889841931


   @openinx I updated the PR according to your comments in #2364. This PR is independent and does not depend on the others, so I think we can handle it first.
   
   cc @rdblue @jackye1995




[GitHub] [iceberg] flyrain commented on pull request #2372: Spark: update delete row reader to read position deletes

Posted by GitBox <gi...@apache.org>.
flyrain commented on pull request #2372:
URL: https://github.com/apache/iceberg/pull/2372#issuecomment-1045389645


   Hi @chenjunjiedada, are we still pursuing this? Is there an ETA to share?




[GitHub] [iceberg] openinx commented on a change in pull request #2372: Spark: add position delete row reader

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2372:
URL: https://github.com/apache/iceberg/pull/2372#discussion_r607650826



##########
File path: data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
##########
@@ -139,43 +158,142 @@ protected long pos(T record) {
           CloseableIterable.transform(CloseableIterable.concat(deleteRecords), Record::copy),
           deleteSchema.asStruct());
 
-      Predicate<T> isInDeleteSet = record -> deleteSet.contains(projectRow.wrap(asStructLike(record)));
-      isInDeleteSets.add(isInDeleteSet);
+      isDeleted = isDeleted == null ? record -> deleteSet.contains(projectRow.wrap(asStructLike(record))) :
+              isDeleted.or(record -> deleteSet.contains(projectRow.wrap(asStructLike(record))));
     }
 
-    return isInDeleteSets;
+    return isDeleted;
   }
 
-  public CloseableIterable<T> findEqualityDeleteRows(CloseableIterable<T> records) {
-    // Predicate to test whether a row has been deleted by equality deletions.
-    Predicate<T> deletedRows = applyEqDeletes().stream()
-        .reduce(Predicate::or)
-        .orElse(t -> false);
+  private Predicate<T> buildPosDeletePredicate() {
+    if (posDeletes.isEmpty()) {
+      return null;
+    }
+
+    List<CloseableIterable<Record>> deletes = Lists.transform(posDeletes, this::openPosDeletes);
+    Set<Long> deleteSet = Deletes.toPositionSet(dataFile.path(), CloseableIterable.concat(deletes));
+    if (deleteSet.isEmpty()) {
+      return null;
+    }
+
+    return record -> deleteSet.contains(pos(record));
+  }
 
-    Filter<T> deletedRowsFilter = new Filter<T>() {
+  public CloseableIterable<T> keepRowsFromDeletes(CloseableIterable<T> records) {
+    Predicate<T> isDeletedFromPosDeletes = buildPosDeletePredicate();
+    if (isDeletedFromPosDeletes == null) {
+      return keepRowsFromEqualityDeletes(records);
+    }
+
+    Predicate<T> isDeletedFromEqDeletes = buildEqDeletePredicate();
+    if (isDeletedFromEqDeletes == null) {
+      return keepRowsFromPosDeletes(records);
+    }
+
+    CloseableIterable<T> markedRecords;
+
+    if (posDeletes.stream().mapToLong(DeleteFile::recordCount).sum() < setFilterThreshold) {
+      markedRecords = CloseableIterable.transform(records, record -> {
+        if (isDeletedFromPosDeletes.test(record) || isDeletedFromEqDeletes.test(record)) {
+          deleteMarker().accept(record);
+        }
+        return record;
+      });
+
+    } else {
+      List<CloseableIterable<Record>> deletes = Lists.transform(posDeletes, this::openPosDeletes);
+      markedRecords = CloseableIterable.transform(Deletes.streamingDeletedRowMarker(records, this::pos,
+          Deletes.deletePositions(dataFile.path(), deletes), deleteMarker()), record -> {
+          if (isDeletedFromEqDeletes.test(record)) {
+            deleteMarker().accept(record);
+          }
+          return record;
+        });
+    }
+
+    Filter<T> deletedRowsSelector = new Filter<T>() {

Review comment:
       As I understand it, we have three kinds of delete reader:
   
   1. `keepRowsFromDeletes` returns an iterator over the rows that have been deleted by eq-deletes or pos-deletes;
   
   2. `keepRowsFromEqualityDeletes` returns an iterator over the rows that have been deleted by equality deletes only;
   
   3. `keepRowsFromPosDeletes` returns an iterator over the rows that have been deleted by pos-deletes only.
   
   The difference is that we provide different ways to generate the iterator that produces the mix of deleted and live rows; in the end, all three kinds of `Iterable` should be filtered by the same `deletedRowsSelector`. So I think we could share the same `deletedRowsSelector`.






[GitHub] [iceberg] rdblue commented on a change in pull request #2372: Spark: add position delete row reader

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2372:
URL: https://github.com/apache/iceberg/pull/2372#discussion_r603699830



##########
File path: core/src/main/java/org/apache/iceberg/deletes/Deletes.java
##########
@@ -113,6 +113,12 @@ public static StructLikeSet toEqualitySet(CloseableIterable<StructLike> eqDelete
     return new PositionStreamDeleteFilter<>(rows, rowToPosition, posDeletes);
   }
 
+  public static <T> CloseableIterable<T> streamingSelector(CloseableIterable<T> rows,

Review comment:
       This name doesn't look quite clear enough. How about `streamingDeletedRowFilter`? I think that's clear.






[GitHub] [iceberg] chenjunjiedada commented on pull request #2372: Spark: add position delete row reader

Posted by GitBox <gi...@apache.org>.
chenjunjiedada commented on pull request #2372:
URL: https://github.com/apache/iceberg/pull/2372#issuecomment-811131816


   @rdblue @openinx, I think the goal here is to provide more fine-grained compaction actions. Let me give more background.
   
   We have many internal Flink jobs that consume tens of billions of messages from the MQ system and sink them to Iceberg every day. Since users want to see data ASAP, they usually set the checkpoint interval to a minute or less. As a result, the jobs produce a huge number of small files on HDFS. To optimize read performance, we have to compact or cluster the small files, but compaction and clustering themselves need resources and add overhead to the cluster. To reduce the overhead on the name node and save resources for users, we split the compaction action into fine-grained actions with a predicate and grouping by partition.
   
   As we are going to support consuming CDC streaming data, I expect there will be a lot of equality delete and position delete files. So we need more fine-grained actions to optimize the read path, like what we did for data file compaction. Actually, we have four kinds of compaction for deletes.
   
   1. Convert all equality deletes to position deletes.
   2. Cluster all position deletes into one.
   3. Convert all equality deletes and position deletes into one set of position deletes.
   4. Remove all deletes.
   
   From my understanding, the first three are minor compactions and the last is a major one. The first and second need only a few compute and IO resources, and running the first and then the second achieves almost the same optimization effect. Of course, we could eventually implement the third as well. The point is that we want to provide fine-grained options so that users can apply strategies according to the cluster's situation.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2372: Spark: add position delete row reader

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2372:
URL: https://github.com/apache/iceberg/pull/2372#discussion_r603699830



##########
File path: core/src/main/java/org/apache/iceberg/deletes/Deletes.java
##########
@@ -113,6 +113,12 @@ public static StructLikeSet toEqualitySet(CloseableIterable<StructLike> eqDelete
     return new PositionStreamDeleteFilter<>(rows, rowToPosition, posDeletes);
   }
 
+  public static <T> CloseableIterable<T> streamingSelector(CloseableIterable<T> rows,

Review comment:
       This name doesn't look quite clear enough. How about `streamingDeletedRowSelector`? I think that's clear.






[GitHub] [iceberg] rdblue commented on a change in pull request #2372: Spark: add position delete row reader

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2372:
URL: https://github.com/apache/iceberg/pull/2372#discussion_r603701356



##########
File path: data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
##########
@@ -139,39 +139,80 @@ protected long pos(T record) {
           CloseableIterable.transform(CloseableIterable.concat(deleteRecords), Record::copy),
           deleteSchema.asStruct());
 
-      Predicate<T> isInDeleteSet = record -> deleteSet.contains(projectRow.wrap(asStructLike(record)));
-      isInDeleteSets.add(isInDeleteSet);
+      isDeleted = isDeleted.or(record -> deleteSet.contains(projectRow.wrap(asStructLike(record))));
     }
 
-    return isInDeleteSets;
+    return isDeleted;
   }
 
-  public CloseableIterable<T> findEqualityDeleteRows(CloseableIterable<T> records) {
+  private Predicate<T> buildPosDeletePredicate() {
+    if (posDeletes.isEmpty()) {
+      return null;
+    }
+
+    List<CloseableIterable<Record>> deletes = Lists.transform(posDeletes, this::openPosDeletes);
+    Set<Long> deleteSet = Deletes.toPositionSet(dataFile.path(), CloseableIterable.concat(deletes));
+    if (deleteSet.isEmpty()) {
+      return null;
+    }
+
+    return record -> deleteSet.contains(pos(record));
+  }
+
+  public CloseableIterable<T> keepRowsFromDeletes(CloseableIterable<T> records) {
+    return CloseableIterable.concat(Lists.newArrayList(keepRowsFromPosDeletes(records),
+        keepRowsFromEqualityDeletes(records)));
+  }
+
+  public CloseableIterable<T> keepRowsFromEqualityDeletes(CloseableIterable<T> records) {
     // Predicate to test whether a row has been deleted by equality deletions.
-    Predicate<T> deletedRows = applyEqDeletes().stream()
-        .reduce(Predicate::or)
-        .orElse(t -> false);
+    Predicate<T> predicate = buildEqDeletePredicate();
+    if (predicate == null) {
+      return CloseableIterable.empty();
+    }
 
     Filter<T> deletedRowsFilter = new Filter<T>() {
       @Override
       protected boolean shouldKeep(T item) {
-        return deletedRows.test(item);
+        return predicate.test(item);
       }
     };
     return deletedRowsFilter.filter(records);
   }
 
+  public CloseableIterable<T> keepRowsFromPosDeletes(CloseableIterable<T> records) {
+    // if there are fewer deletes than a reasonable number to keep in memory, use a set
+    if (posDeletes.stream().mapToLong(DeleteFile::recordCount).sum() < setFilterThreshold) {
+      // Predicate to test whether a row has been deleted by equality deletions.
+      Predicate<T> predicate = buildPosDeletePredicate();
+      if (predicate == null) {
+        return CloseableIterable.empty();
+      }
+
+      Filter<T> deletedRowsFilter = new Filter<T>() {
+        @Override
+        protected boolean shouldKeep(T item) {
+          return predicate.test(item);
+        }
+      };
+      return deletedRowsFilter.filter(records);
+    } else {
+      List<CloseableIterable<Record>> deletes = Lists.transform(posDeletes, this::openPosDeletes);
+      return Deletes.streamingSelector(records, this::pos, Deletes.deletePositions(dataFile.path(), deletes));
+    }
+  }
+
   private CloseableIterable<T> applyEqDeletes(CloseableIterable<T> records) {
     // Predicate to test whether a row should be visible to user after applying equality deletions.
-    Predicate<T> remainingRows = applyEqDeletes().stream()
-        .map(Predicate::negate)
-        .reduce(Predicate::and)
-        .orElse(t -> true);
+    Predicate<T> predicate = buildEqDeletePredicate();

Review comment:
       I think it would be more clear if this were named `isDeleted`.






[GitHub] [iceberg] chenjunjiedada commented on pull request #2372: Spark: add position delete row reader

Posted by GitBox <gi...@apache.org>.
chenjunjiedada commented on pull request #2372:
URL: https://github.com/apache/iceberg/pull/2372#issuecomment-809846246


   Thanks for the review and comments!
   
   The original thought was to handle equality deletes and position deletes separately, which I called different levels of minor compaction. Separate compactions let users control the file scan at a finer granularity, which reduces overhead on the name node. For example, users could monitor the number of equality deletes and position deletes from the snapshot summary and run a Spark or Flink action to do the specific compaction.
   
   I didn't consider reading all deleted rows because I thought of that as major compaction, and it may be similar to the action that removes all deletes. If we want to support one more level of compaction that reads all deletes and rewrites them to position deletes, I think your suggestion definitely works.
   
   So I think it would be better to remove the logic for reading all deleted rows from this PR, implement it in the suggested way, and add an action for it. Meanwhile, I'd like to keep the current separate compaction actions for fine-grained usage. Does that make sense to you?




[GitHub] [iceberg] rdblue commented on a change in pull request #2372: Spark: add position delete row reader

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2372:
URL: https://github.com/apache/iceberg/pull/2372#discussion_r600807180



##########
File path: spark/src/main/java/org/apache/iceberg/spark/source/DeleteRowReader.java
##########
@@ -31,13 +32,16 @@
 import org.apache.spark.rdd.InputFileBlockHolder;
 import org.apache.spark.sql.catalyst.InternalRow;
 
-public class EqualityDeleteRowReader extends RowDataReader {
+public class DeleteRowReader extends RowDataReader {
   private final Schema expectedSchema;
+  private final FileContent deleteContent;
 
-  public EqualityDeleteRowReader(CombinedScanTask task, Schema schema, Schema expectedSchema, String nameMapping,
-                                 FileIO io, EncryptionManager encryptionManager, boolean caseSensitive) {
+  public DeleteRowReader(CombinedScanTask task, Schema schema, Schema expectedSchema, String nameMapping,
+                         FileIO io, EncryptionManager encryptionManager, boolean caseSensitive,
+                         FileContent deleteContent) {

Review comment:
       There is no guarantee that the `CombinedScanTask` doesn't have both position and equality deletes to apply, so it doesn't make sense to add this argument and handle just one. I think that this reader should return all deleted rows from a file, no matter which kind of delete was encoded. Right?
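
To illustrate the point with a sketch under simplifying assumptions: a single predicate can report a row as deleted regardless of which kind of delete removed it, so the reader needs no argument to choose between them. `Row`, the string `key`, and `isDeleted` are hypothetical; the real code works with delete files and `StructLike` projections rather than in-memory sets.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Predicate;

class CombinedDeletePredicateSketch {
  // Hypothetical row: its position in the data file plus one equality key.
  static final class Row {
    final long pos;
    final String key;

    Row(long pos, String key) {
      this.pos = pos;
      this.key = key;
    }
  }

  // A row counts as deleted if either kind of delete matches it.
  static Predicate<Row> isDeleted(Set<Long> posDeletes, Set<String> eqDeleteKeys) {
    Predicate<Row> byPosition = row -> posDeletes.contains(row.pos);
    Predicate<Row> byEquality = row -> eqDeleteKeys.contains(row.key);
    return byPosition.or(byEquality);
  }

  public static void main(String[] args) {
    Predicate<Row> deleted = isDeleted(
        new HashSet<>(Arrays.asList(0L)),
        Collections.singleton("b"));
    System.out.println(deleted.test(new Row(0, "a"))); // matched by position
    System.out.println(deleted.test(new Row(1, "b"))); // matched by equality key
    System.out.println(deleted.test(new Row(2, "c"))); // matched by neither
  }
}
```

An empty set on either side simply never matches, so a task that carries only one kind of delete is handled by the same predicate.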






[GitHub] [iceberg] rdblue commented on a change in pull request #2372: Spark: add position delete row reader

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2372:
URL: https://github.com/apache/iceberg/pull/2372#discussion_r603698456



##########
File path: core/src/main/java/org/apache/iceberg/deletes/Deletes.java
##########
@@ -233,6 +244,30 @@ public void close() {
     }
   }
 
+  static class PositionStreamDeletedRowSelector<T> extends PositionStreamDeleteFilter<T> {
+    private PositionStreamDeletedRowSelector(CloseableIterable<T> rows, Function<T, Long> extractPos,
+                                             CloseableIterable<Long> deletePositions) {
+      super(rows, extractPos, deletePositions);
+    }
+
+    @Override
+    protected FilterIterator<T> getPositionIterator(CloseableIterator<T> items,

Review comment:
       Iceberg's style is to omit `get` from method names.






[GitHub] [iceberg] openinx commented on pull request #2372: Spark: add position delete row reader

Posted by GitBox <gi...@apache.org>.
openinx commented on pull request #2372:
URL: https://github.com/apache/iceberg/pull/2372#issuecomment-809988883


   As for producing records for streaming (both inserted and deleted), I'm not quite sure whether it will work, because people usually consume delta files between two snapshots incrementally. The equality deletes from the delta files need to be applied to the downstream consumer first because they delete records committed in previous transactions, while pos-deletes delete records committed in the current transaction. Applying rows marked `_is_deleted` directly to the downstream table may cause the upstream's pos-deletes to delete data that should not be deleted downstream.




[GitHub] [iceberg] openinx commented on pull request #2372: Spark: add position delete row reader

Posted by GitBox <gi...@apache.org>.
openinx commented on pull request #2372:
URL: https://github.com/apache/iceberg/pull/2372#issuecomment-810092443


   > Instead, I think that the way to do this is to select all rows and set a metadata column to indicate whether or not the row is deleted.
   
   I've tried to think about how to add the `_is_deleted` metadata column to each record read by the Parquet/ORC readers. The workflow should be:
   
   1.  Add a boolean reader at the tail when constructing the Parquet/ORC readers for the given Iceberg schema. The boolean reader just fills in a default value of `false` for each record. The real value is filled in after checking the equality delete files and pos delete files iteratively;
   2.  The struct Parquet/ORC reader reads the whole row; at this point the `_is_deleted` value is `false` by default;
   3.  Check the equality delete files and position delete files, and set `_is_deleted` to `true` if the row has been deleted. This requires Flink `RowData` and Spark `InternalRow` to provide a `setValue(pos, value)` interface to update the real value of `_is_deleted`.
   4.  Return `Iterable<Row>`.
   
   Most of the complexity is in the third step, because we would need to refactor the whole `Deletes#filter` path to return a boolean flag for a row rather than just returning the filtered `Iterable<T>`. This means refactoring almost all of the logic related to the delete filter. Now I am a little hesitant about whether it is necessary to do this.
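
The four steps can be sketched roughly as follows, assuming rows are plain `Object[]` arrays and deletes are in-memory sets. `readRow`, `markIfDeleted`, and `readAll` are hypothetical helpers, and treating column 0 as the equality key is an assumption of this example; none of this is the Parquet/ORC reader API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class IsDeletedColumnSketch {
  // Steps 1 and 2: read the projected values and append a default `false` flag at the tail.
  static Object[] readRow(Object[] projected) {
    Object[] row = Arrays.copyOf(projected, projected.length + 1);
    row[row.length - 1] = Boolean.FALSE;
    return row;
  }

  // Step 3: flip the trailing flag if a position delete or an equality delete matches.
  // Column 0 is assumed to be the equality key in this example.
  static void markIfDeleted(Object[] row, long position, Set<Long> posDeletes, Set<Object> eqDeletes) {
    if (posDeletes.contains(position) || eqDeletes.contains(row[0])) {
      row[row.length - 1] = Boolean.TRUE;
    }
  }

  // Step 4: return every row, deleted or not, with the flag filled in.
  static List<Object[]> readAll(List<Object[]> file, Set<Long> posDeletes, Set<Object> eqDeletes) {
    List<Object[]> rows = new ArrayList<>();
    long position = 0;
    for (Object[] projected : file) {
      Object[] row = readRow(projected);
      markIfDeleted(row, position, posDeletes, eqDeletes);
      rows.add(row);
      position++;
    }
    return rows;
  }

  public static void main(String[] args) {
    List<Object[]> file = new ArrayList<>();
    file.add(new Object[] {"a", 10});
    file.add(new Object[] {"b", 20});
    file.add(new Object[] {"c", 30});
    Set<Object> eqDeletes = new HashSet<>();
    eqDeletes.add("a");
    for (Object[] row : readAll(file, Collections.singleton(2L), eqDeletes)) {
      System.out.println(Arrays.toString(row));
    }
  }
}
```

Note that this sketch writes the flag in a dedicated marking step rather than inside a filter, which is where the refactoring cost described above would come from in the real delete-filter path.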
   
   




[GitHub] [iceberg] chenjunjiedada commented on pull request #2372: Spark: add position delete row reader

Posted by GitBox <gi...@apache.org>.
chenjunjiedada commented on pull request #2372:
URL: https://github.com/apache/iceberg/pull/2372#issuecomment-892750271


   @openinx @rdblue Could you please take another look when you are free?




[GitHub] [iceberg] chenjunjiedada commented on pull request #2372: Spark: add position delete row reader

Posted by GitBox <gi...@apache.org>.
chenjunjiedada commented on pull request #2372:
URL: https://github.com/apache/iceberg/pull/2372#issuecomment-813129332


   I just built a Parquet implementation on top of the metadata column approach. @rdblue @openinx, you might want to take a look. I will try to build the metadata column for ORC and Avro as well.




[GitHub] [iceberg] chenjunjiedada commented on a change in pull request #2372: Spark: add position delete row reader

Posted by GitBox <gi...@apache.org>.
chenjunjiedada commented on a change in pull request #2372:
URL: https://github.com/apache/iceberg/pull/2372#discussion_r601025876



##########
File path: spark/src/main/java/org/apache/iceberg/spark/source/DeleteRowReader.java
##########
@@ -31,13 +32,16 @@
 import org.apache.spark.rdd.InputFileBlockHolder;
 import org.apache.spark.sql.catalyst.InternalRow;
 
-public class EqualityDeleteRowReader extends RowDataReader {
+public class DeleteRowReader extends RowDataReader {
   private final Schema expectedSchema;
+  private final FileContent deleteContent;
 
-  public EqualityDeleteRowReader(CombinedScanTask task, Schema schema, Schema expectedSchema, String nameMapping,
-                                 FileIO io, EncryptionManager encryptionManager, boolean caseSensitive) {
+  public DeleteRowReader(CombinedScanTask task, Schema schema, Schema expectedSchema, String nameMapping,
+                         FileIO io, EncryptionManager encryptionManager, boolean caseSensitive,
+                         FileContent deleteContent) {

Review comment:
       Yes, there is no guarantee about what kind of deletes a `CombinedScanTask` contains. Here I'd like to give the user the option to select one kind of delete to rewrite. How about returning all deleted rows when `deleteContent` is not passed? I think that should be a valid option.
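
One way to read the proposal above, as a hedged sketch only: the delete kind becomes optional, and an absent value means "rows removed by any delete". The names here (`DeleteKind`, `DeleteContentSelectorSketch`, `deletedRows`) are hypothetical and are not part of the PR or of Iceberg's API; the two predicates stand in for the position and equality delete checks.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical stand-in for the delete values of FileContent.
enum DeleteKind { POSITION_DELETES, EQUALITY_DELETES }

final class DeleteContentSelectorSketch {
  // Returns the rows removed by the requested kind of delete, or by any delete
  // when no kind is given. Each row is tested once, so it appears at most once.
  static <T> List<T> deletedRows(
      List<T> rows,
      Predicate<T> deletedByPosition,
      Predicate<T> deletedByEquality,
      Optional<DeleteKind> kind) {
    Predicate<T> selected;
    if (!kind.isPresent()) {
      selected = deletedByPosition.or(deletedByEquality);
    } else if (kind.get() == DeleteKind.POSITION_DELETES) {
      selected = deletedByPosition;
    } else {
      selected = deletedByEquality;
    }
    return rows.stream().filter(selected).collect(Collectors.toList());
  }
}
```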






[GitHub] [iceberg] chenjunjiedada commented on pull request #2372: Spark: add position delete row reader

Posted by GitBox <gi...@apache.org>.
chenjunjiedada commented on pull request #2372:
URL: https://github.com/apache/iceberg/pull/2372#issuecomment-881112016


   I rebased this on top of the IS_DELETED metadata column change. This is now ready for review, @rdblue @jackye1995 @openinx.
   
   @jackye1995 I'm working on the API and action to accept different rewrite strategies, but this should be needed either way.




[GitHub] [iceberg] rdblue commented on a change in pull request #2372: Spark: add position delete row reader

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2372:
URL: https://github.com/apache/iceberg/pull/2372#discussion_r603701182



##########
File path: data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
##########
@@ -139,39 +139,80 @@ protected long pos(T record) {
           CloseableIterable.transform(CloseableIterable.concat(deleteRecords), Record::copy),
           deleteSchema.asStruct());
 
-      Predicate<T> isInDeleteSet = record -> deleteSet.contains(projectRow.wrap(asStructLike(record)));
-      isInDeleteSets.add(isInDeleteSet);
+      isDeleted = isDeleted.or(record -> deleteSet.contains(projectRow.wrap(asStructLike(record))));
     }
 
-    return isInDeleteSets;
+    return isDeleted;
   }
 
-  public CloseableIterable<T> findEqualityDeleteRows(CloseableIterable<T> records) {
+  private Predicate<T> buildPosDeletePredicate() {
+    if (posDeletes.isEmpty()) {
+      return null;
+    }
+
+    List<CloseableIterable<Record>> deletes = Lists.transform(posDeletes, this::openPosDeletes);
+    Set<Long> deleteSet = Deletes.toPositionSet(dataFile.path(), CloseableIterable.concat(deletes));
+    if (deleteSet.isEmpty()) {
+      return null;
+    }
+
+    return record -> deleteSet.contains(pos(record));
+  }
+
+  public CloseableIterable<T> keepRowsFromDeletes(CloseableIterable<T> records) {
+    return CloseableIterable.concat(Lists.newArrayList(keepRowsFromPosDeletes(records),
+        keepRowsFromEqualityDeletes(records)));
+  }
+
+  public CloseableIterable<T> keepRowsFromEqualityDeletes(CloseableIterable<T> records) {
     // Predicate to test whether a row has been deleted by equality deletions.
-    Predicate<T> deletedRows = applyEqDeletes().stream()
-        .reduce(Predicate::or)
-        .orElse(t -> false);
+    Predicate<T> predicate = buildEqDeletePredicate();
+    if (predicate == null) {
+      return CloseableIterable.empty();
+    }
 
     Filter<T> deletedRowsFilter = new Filter<T>() {
       @Override
       protected boolean shouldKeep(T item) {
-        return deletedRows.test(item);
+        return predicate.test(item);
       }
     };
     return deletedRowsFilter.filter(records);
   }
 
+  public CloseableIterable<T> keepRowsFromPosDeletes(CloseableIterable<T> records) {

Review comment:
       Is there a need to make these methods public? Or will rows only be read using `keepRowsFromDeletes`? What is the use case for these changes?






[GitHub] [iceberg] chenjunjiedada closed pull request #2372: Spark: add position delete row reader

Posted by GitBox <gi...@apache.org>.
chenjunjiedada closed pull request #2372:
URL: https://github.com/apache/iceberg/pull/2372


   




[GitHub] [iceberg] openinx commented on pull request #2372: Spark: add position delete row reader

Posted by GitBox <gi...@apache.org>.
openinx commented on pull request #2372:
URL: https://github.com/apache/iceberg/pull/2372#issuecomment-810716515


   > because we don't need to do this unless we are projecting deleted rows
   
   I think we need to figure out the specific implementation approach; I will try to publish a PR for this if possible.
   
   > can you confirm why you wanted to be able to read equality-deleted rows like this?
   
   I think the core reason is that it's simple to implement. The current approach only needs to translate all the eq-deletes into pos-deletes, without considering duplicate pos-deletes. We could also share most of the code path when planning tasks.




[GitHub] [iceberg] openinx commented on pull request #2372: Spark: add position delete row reader

Posted by GitBox <gi...@apache.org>.
openinx commented on pull request #2372:
URL: https://github.com/apache/iceberg/pull/2372#issuecomment-809976337


   As I originally saw it, there are two kinds of compaction:
   
   a. Convert all equality deletes into position deletes. As for whether we should eliminate duplicate position deletes at the same time: if the duplicate pos-deletes are removed during the rewrite, reads will be more efficient; if not, reads will be less efficient. Generally speaking, I think this is a performance trade-off, and both options seem acceptable to me.
   
   b. Eliminate all deletes (both pos-deletes and equality deletes). This is well suited to the situation where deletes make up a high proportion of the whole table. On the one hand, we save a lot of unnecessary storage, and on the other hand, we avoid a lot of inefficient joins when reading data. [This](https://github.com/apache/iceberg/pull/2303/files#diff-605d0d98a73f67629cddbceb9a566e8655844a3cdf46b4dbcebd0e19102e82b4R128) is simpler to implement than case a.
   
   After reading @rdblue's [comment](https://github.com/apache/iceberg/pull/2372#issuecomment-809823407), what I find most valuable is that we can use the metadata column abstraction to unify the code for case a, case b, and the normal read path. Say we have an `iterable=Iterable<Row>` with an `_is_deleted` flag inside each row:
   
   For case a, we could just use `Iterables.transform(Iterables.filter(iterable, row -> row.isDeleted()), row -> (row.file(), row.pos()))` to generate all the pos-deletes.
   
   For case b, we could just use `Iterables.filter(iterable, row -> !row.isDeleted())` to get all remaining rows.
   
   The normal read path is the same as case b.
   
   This implementation greatly reduces the complexity of the various paths, so I think we can try it.
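
To make the two cases concrete, here is a minimal sketch that uses `java.util.stream` instead of Guava's `Iterables` so that it is self-contained. `MarkedRow`, `FilePosition`, and `CompactionPathsSketch` are hypothetical types introduced only for illustration; the real rows would be engine rows carrying the `_is_deleted` metadata column.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical row that already carries its `_is_deleted` flag.
final class MarkedRow {
  final String file;
  final long pos;
  final boolean deleted;

  MarkedRow(String file, long pos, boolean deleted) {
    this.file = file;
    this.pos = pos;
    this.deleted = deleted;
  }
}

// Hypothetical (file, pos) pair, i.e. the content of one position delete.
final class FilePosition {
  final String file;
  final long pos;

  FilePosition(String file, long pos) {
    this.file = file;
    this.pos = pos;
  }
}

final class CompactionPathsSketch {
  // Case a: keep only deleted rows and project them to (file, pos).
  static List<FilePosition> toPositionDeletes(List<MarkedRow> rows) {
    return rows.stream()
        .filter(row -> row.deleted)
        .map(row -> new FilePosition(row.file, row.pos))
        .collect(Collectors.toList());
  }

  // Case b and the normal read path: keep only rows that are not deleted.
  static List<MarkedRow> liveRows(List<MarkedRow> rows) {
    return rows.stream().filter(row -> !row.deleted).collect(Collectors.toList());
  }
}
```

Both paths consume the same marked input and differ only in the final filter.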
   
   
   
   
   
   




[GitHub] [iceberg] chenjunjiedada commented on pull request #2372: Spark: add position delete row reader

Posted by GitBox <gi...@apache.org>.
chenjunjiedada commented on pull request #2372:
URL: https://github.com/apache/iceberg/pull/2372#issuecomment-815037809


   @openinx, Thanks for the comments, I will address them ASAP. 




[GitHub] [iceberg] chenjunjiedada commented on pull request #2372: Spark: add position delete row reader

Posted by GitBox <gi...@apache.org>.
chenjunjiedada commented on pull request #2372:
URL: https://github.com/apache/iceberg/pull/2372#issuecomment-884613231


   @rdblue @openinx @jackye1995 Any other comments on this? This is needed for the rewrite delete action.




[GitHub] [iceberg] chenjunjiedada commented on pull request #2372: Spark: add position delete row reader

Posted by GitBox <gi...@apache.org>.
chenjunjiedada commented on pull request #2372:
URL: https://github.com/apache/iceberg/pull/2372#issuecomment-812240813


   @rblue, what do you think about these use cases? Should we continue with these minor compactions? I want to refactor them and implement a Flink action as well.




[GitHub] [iceberg] openinx commented on a change in pull request #2372: Spark: add position delete row reader

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2372:
URL: https://github.com/apache/iceberg/pull/2372#discussion_r608625922



##########
File path: core/src/main/java/org/apache/iceberg/deletes/Deletes.java
##########
@@ -233,6 +246,37 @@ public void close() {
     }
   }
 
+  static class PositionStreamDeletedRowMarker<T> extends PositionStreamDeleteFilter<T> {
+    private final Consumer<T> deleteMarker;
+
+    private PositionStreamDeletedRowMarker(CloseableIterable<T> rows, Function<T, Long> extractPos,
+                                           CloseableIterable<Long> deletePositions,
+                                           Consumer<T> deleteMarker) {
+      super(rows, extractPos, deletePositions);
+      this.deleteMarker = deleteMarker;
+    }
+
+    @Override
+    protected FilterIterator<T> positionIterator(CloseableIterator<T> items,
+                                                 CloseableIterator<Long> deletePositions) {
+      return new PositionMarkerIterator(items, deletePositions);
+    }
+
+    private class PositionMarkerIterator extends PositionFilterIterator {
+      protected PositionMarkerIterator(CloseableIterator<T> items, CloseableIterator<Long> deletePositions) {
+        super(items, deletePositions);
+      }
+
+      @Override
+      protected boolean shouldKeep(T row) {
+        if (!super.shouldKeep(row)) {
+          deleteMarker.accept(row);

Review comment:
       I'm trying to add a `DeleteMarker` to set the `_is_deleted` flag iteratively; please see PR https://github.com/apache/iceberg/pull/2434






[GitHub] [iceberg] openinx commented on a change in pull request #2372: Spark: add position delete row reader

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2372:
URL: https://github.com/apache/iceberg/pull/2372#discussion_r607599844



##########
File path: core/src/main/java/org/apache/iceberg/deletes/Deletes.java
##########
@@ -233,6 +246,37 @@ public void close() {
     }
   }
 
+  static class PositionStreamDeletedRowMarker<T> extends PositionStreamDeleteFilter<T> {
+    private final Consumer<T> deleteMarker;
+
+    private PositionStreamDeletedRowMarker(CloseableIterable<T> rows, Function<T, Long> extractPos,
+                                           CloseableIterable<Long> deletePositions,
+                                           Consumer<T> deleteMarker) {
+      super(rows, extractPos, deletePositions);
+      this.deleteMarker = deleteMarker;
+    }
+
+    @Override
+    protected FilterIterator<T> positionIterator(CloseableIterator<T> items,
+                                                 CloseableIterator<Long> deletePositions) {
+      return new PositionMarkerIterator(items, deletePositions);
+    }
+
+    private class PositionMarkerIterator extends PositionFilterIterator {
+      protected PositionMarkerIterator(CloseableIterator<T> items, CloseableIterator<Long> deletePositions) {

Review comment:
       Nit:  this could be `package-private`.

##########
File path: data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
##########
@@ -96,6 +98,23 @@ public Schema requiredSchema() {
     return requiredSchema;
   }
 
+  protected int deleteMarkerIndex() {
+    int index = 0;
+    for (Types.NestedField field : requiredSchema().columns()) {
+      if (field.fieldId() != MetadataColumns.DELETE_MARK.fieldId()) {
+        index = index + 1;
+      } else {
+        break;
+      }
+    }
+
+    return index;
+  }
+
+  protected abstract Consumer<T> deleteMarker();
+
+  protected abstract Function<T, Boolean> deleteChecker();

Review comment:
       I checked all the usages, and it seems we don't have to return a `Function<T, Boolean>`. It's simpler to define a method like:
   
   ```java
     protected abstract boolean isDeletedRow(T row);
   ```

##########
File path: data/src/main/java/org/apache/iceberg/data/GenericDeleteFilter.java
##########
@@ -40,6 +42,16 @@ protected long pos(Record record) {
     return (Long) posAccessor().get(record);
   }
 
+  @Override
+  protected Consumer<Record> deleteMarker() {
+    return record -> record.set(deleteMarkerIndex(), true);
+  }
+
+  @Override
+  protected Function<Record, Boolean> deleteChecker() {
+    return record -> record.get(deleteMarkerIndex(), Boolean.class);

Review comment:
       Could we use a lazy approach to access the index of `_deleted`? This is a very hot code path because each record will need to search for the deleted column index.
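
A lazy lookup of this kind might look like the sketch below: the column scan runs on first use and the result is cached, so per-record calls only read a field. `LazyDeleteMarkerIndex` is a hypothetical class, the schema is modeled as a plain list of field IDs, and the field ID values used with it are made up for illustration.

```java
import java.util.List;

final class LazyDeleteMarkerIndex {
  private final List<Integer> fieldIds;    // field IDs of the projected columns, in order
  private final int deleteMarkerFieldId;   // ID of the delete marker column (assumed known)
  private Integer cachedIndex = null;      // null until the first lookup
  int scans = 0;                           // number of column scans, exposed for illustration

  LazyDeleteMarkerIndex(List<Integer> fieldIds, int deleteMarkerFieldId) {
    this.fieldIds = fieldIds;
    this.deleteMarkerFieldId = deleteMarkerFieldId;
  }

  // Scans the columns once; later calls return the cached value.
  // Returns -1 if the marker column is not projected.
  int index() {
    if (cachedIndex == null) {
      scans++;
      cachedIndex = fieldIds.indexOf(deleteMarkerFieldId);
    }
    return cachedIndex;
  }
}
```

The sketch is not thread-safe; that is acceptable only if each reader owns its own instance, which is an assumption here.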

##########
File path: data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
##########
@@ -139,43 +158,142 @@ protected long pos(T record) {
           CloseableIterable.transform(CloseableIterable.concat(deleteRecords), Record::copy),
           deleteSchema.asStruct());
 
-      Predicate<T> isInDeleteSet = record -> deleteSet.contains(projectRow.wrap(asStructLike(record)));
-      isInDeleteSets.add(isInDeleteSet);
+      isDeleted = isDeleted == null ? record -> deleteSet.contains(projectRow.wrap(asStructLike(record))) :
+              isDeleted.or(record -> deleteSet.contains(projectRow.wrap(asStructLike(record))));
     }
 
-    return isInDeleteSets;
+    return isDeleted;
   }
 
-  public CloseableIterable<T> findEqualityDeleteRows(CloseableIterable<T> records) {
-    // Predicate to test whether a row has been deleted by equality deletions.
-    Predicate<T> deletedRows = applyEqDeletes().stream()
-        .reduce(Predicate::or)
-        .orElse(t -> false);
+  private Predicate<T> buildPosDeletePredicate() {
+    if (posDeletes.isEmpty()) {
+      return null;
+    }
+
+    List<CloseableIterable<Record>> deletes = Lists.transform(posDeletes, this::openPosDeletes);
+    Set<Long> deleteSet = Deletes.toPositionSet(dataFile.path(), CloseableIterable.concat(deletes));
+    if (deleteSet.isEmpty()) {
+      return null;
+    }
+
+    return record -> deleteSet.contains(pos(record));
+  }
 
-    Filter<T> deletedRowsFilter = new Filter<T>() {
+  public CloseableIterable<T> keepRowsFromDeletes(CloseableIterable<T> records) {
+    Predicate<T> isDeletedFromPosDeletes = buildPosDeletePredicate();
+    if (isDeletedFromPosDeletes == null) {
+      return keepRowsFromEqualityDeletes(records);
+    }
+
+    Predicate<T> isDeletedFromEqDeletes = buildEqDeletePredicate();
+    if (isDeletedFromEqDeletes == null) {
+      return keepRowsFromPosDeletes(records);
+    }
+
+    CloseableIterable<T> markedRecords;
+
+    if (posDeletes.stream().mapToLong(DeleteFile::recordCount).sum() < setFilterThreshold) {
+      markedRecords = CloseableIterable.transform(records, record -> {
+        if (isDeletedFromPosDeletes.test(record) || isDeletedFromEqDeletes.test(record)) {
+          deleteMarker().accept(record);
+        }
+        return record;
+      });
+
+    } else {
+      List<CloseableIterable<Record>> deletes = Lists.transform(posDeletes, this::openPosDeletes);
+      markedRecords = CloseableIterable.transform(Deletes.streamingDeletedRowMarker(records, this::pos,
+          Deletes.deletePositions(dataFile.path(), deletes), deleteMarker()), record -> {
+          if (isDeletedFromEqDeletes.test(record)) {

Review comment:
       If the row has already been marked as `deleted` in the `streamingDeletedRowMarker`, then I think we don't have to check it again against the equality delete files. The double check will affect read performance a lot.
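
The short-circuit suggested here can be sketched as follows. `MarkableRow` and `ShortCircuitMarkerSketch` are hypothetical; the returned counter exists only to show how many equality lookups are actually performed.

```java
import java.util.List;
import java.util.Set;

// Hypothetical row whose flag may already have been set by the position-delete pass.
final class MarkableRow {
  final String key;
  boolean deleted;

  MarkableRow(String key, boolean deleted) {
    this.key = key;
    this.deleted = deleted;
  }
}

final class ShortCircuitMarkerSketch {
  // Marks rows matched by equality deletes and returns the number of set lookups made.
  static int markByEquality(List<MarkableRow> rows, Set<String> deletedKeys) {
    int equalityChecks = 0;
    for (MarkableRow row : rows) {
      if (row.deleted) {
        continue; // already marked by position deletes, skip the equality lookup
      }
      equalityChecks++;
      if (deletedKeys.contains(row.key)) {
        row.deleted = true;
      }
    }
    return equalityChecks;
  }
}
```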






[GitHub] [iceberg] rdblue commented on pull request #2372: Spark: add position delete row reader

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #2372:
URL: https://github.com/apache/iceberg/pull/2372#issuecomment-810458594


   > The most complicated work occurs in the third step, because we will need to refactor all the `Deletes#filter` path to return a boolean flag for a row, rather than just returning the filtered `Iterable<T>`. This will mean that we have almost refactored the logic related to delete filter. Now I am a little hesitant whether it is necessary to do this.
   
   I think we can leave most of the existing read path as-is because we don't need to do this unless we are projecting deleted rows, but I agree with you that this would be a bit more work. That's why we need to consider the purpose of this change. It sounds like the goal is to rewrite equality deletes as position deletes, but it isn't quite clear. @chenjunjiedada or @openinx, can you confirm why you wanted to be able to read equality-deleted rows like this?




[GitHub] [iceberg] rdblue commented on a change in pull request #2372: Spark: add position delete row reader

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2372:
URL: https://github.com/apache/iceberg/pull/2372#discussion_r603697954



##########
File path: core/src/main/java/org/apache/iceberg/deletes/Deletes.java
##########
@@ -113,6 +113,12 @@ public static StructLikeSet toEqualitySet(CloseableIterable<StructLike> eqDelete
     return new PositionStreamDeleteFilter<>(rows, rowToPosition, posDeletes);
   }
 
+  public static <T> CloseableIterable<T> streamingSelector(CloseableIterable<T> rows,
+                                                         Function<T, Long> rowToPosition,
+                                                         CloseableIterable<Long> posDeletes) {

Review comment:
       Nit: indentation is off.






[GitHub] [iceberg] rdblue commented on a change in pull request #2372: Spark: add position delete row reader

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2372:
URL: https://github.com/apache/iceberg/pull/2372#discussion_r603700908



##########
File path: data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
##########
@@ -139,39 +139,80 @@ protected long pos(T record) {
           CloseableIterable.transform(CloseableIterable.concat(deleteRecords), Record::copy),
           deleteSchema.asStruct());
 
-      Predicate<T> isInDeleteSet = record -> deleteSet.contains(projectRow.wrap(asStructLike(record)));
-      isInDeleteSets.add(isInDeleteSet);
+      isDeleted = isDeleted.or(record -> deleteSet.contains(projectRow.wrap(asStructLike(record))));
     }
 
-    return isInDeleteSets;
+    return isDeleted;
   }
 
-  public CloseableIterable<T> findEqualityDeleteRows(CloseableIterable<T> records) {
+  private Predicate<T> buildPosDeletePredicate() {
+    if (posDeletes.isEmpty()) {
+      return null;
+    }
+
+    List<CloseableIterable<Record>> deletes = Lists.transform(posDeletes, this::openPosDeletes);
+    Set<Long> deleteSet = Deletes.toPositionSet(dataFile.path(), CloseableIterable.concat(deletes));
+    if (deleteSet.isEmpty()) {
+      return null;
+    }
+
+    return record -> deleteSet.contains(pos(record));
+  }
+
+  public CloseableIterable<T> keepRowsFromDeletes(CloseableIterable<T> records) {
+    return CloseableIterable.concat(Lists.newArrayList(keepRowsFromPosDeletes(records),
+        keepRowsFromEqualityDeletes(records)));

Review comment:
       This isn't correct because a row may be deleted by both position and equality deletes. If that happened, then this would return the same row twice. I think this needs to be implemented so that the deleted rows are returned just once.
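
The duplication described in this comment is easy to reproduce in isolation. The sketch below is plain Java, not the PR's code: `concatOfTwoFilters` mirrors the shape of concatenating two separately filtered views, while `singlePass` tests each row once against the OR of both predicates. Both method names and the class name are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

final class DeletedRowsOnceSketch {
  // Concatenates two filtered views; a row matching both predicates appears twice.
  static <T> List<T> concatOfTwoFilters(
      List<T> rows, Predicate<T> deletedByPosition, Predicate<T> deletedByEquality) {
    List<T> result = new ArrayList<>();
    rows.stream().filter(deletedByPosition).forEach(result::add);
    rows.stream().filter(deletedByEquality).forEach(result::add);
    return result;
  }

  // Tests each row once against either predicate; every deleted row appears once.
  static <T> List<T> singlePass(
      List<T> rows, Predicate<T> deletedByPosition, Predicate<T> deletedByEquality) {
    return rows.stream()
        .filter(deletedByPosition.or(deletedByEquality))
        .collect(Collectors.toList());
  }
}
```

The single-pass version also keeps the rows in their original order, whereas the concatenation groups them by delete kind.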






[GitHub] [iceberg] jackye1995 commented on pull request #2372: Spark: add position delete row reader

Posted by GitBox <gi...@apache.org>.
jackye1995 commented on pull request #2372:
URL: https://github.com/apache/iceberg/pull/2372#issuecomment-845514501


   I finally got some time to catch up with all the delete work. In general I agree the delete marker sounds like the right way to go forward. Regarding the 4 situations that Junjie described for his use cases:
   
   1. Convert all equality deletes to position deletes.
   2. Cluster all position deletes into one.
   3. Convert all equality deletes and position deletes into one position delete file.
   4. Remove all deletes.
   
   These are based on the assumptions that:
   
   1. we should always move from equality deletes to position deletes to data files
   2. we should have as few delete files as possible
   
   These are not 100% true in all situations. As an example against 1, if we have tables that are well partitioned and sorted, and deletes are issued based on those partition and sort columns, then equality deletes can actually consume far less memory and also perform better. As an example against 2, having a single delete file means it has to be included in every `FileScanTask`; those tasks might be executed by different workers and cannot share any cache. If we split the delete files, far fewer delete rows have to be read in each task. This also removes the bottleneck of reading a single file with high parallelism, which causes throttling in cloud storage.
   
   For major compaction, I think there is no doubt: it is the removal of all delete files, and the `RewriteDataFiles` work that Russell is doing should cover the major compaction use case.
   
   But I feel everyone has a similar but slightly different definition of minor compaction. I totally agree with Junjie that we should allow fine-grained control so people can run a flexible set of actions based on the use case. Here are the definitions I have in mind:
   
   Major compaction: an action that takes all files in a snapshot and produces only data files
   Minor compaction: an action that takes all files in a snapshot and produces only delete files that are applied on top of the existing data files
   
   It seems to me that we should add an action similar to `RewriteDataFiles` and make another action framework, and we can implement different strategies for that action to fulfill different use cases described. What do you think?




[GitHub] [iceberg] rdblue commented on a change in pull request #2372: Spark: add position delete row reader

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2372:
URL: https://github.com/apache/iceberg/pull/2372#discussion_r600809072



##########
File path: spark/src/main/java/org/apache/iceberg/spark/source/DeleteRowReader.java
##########
@@ -52,6 +56,10 @@ public EqualityDeleteRowReader(CombinedScanTask task, Schema schema, Schema expe
     // update the current file for Spark's filename() function
     InputFileBlockHolder.set(file.path().toString(), task.start(), task.length());
 
-    return matches.findEqualityDeleteRows(open(task, requiredSchema, idToConstant)).iterator();
+    if (deleteContent.equals(FileContent.EQUALITY_DELETES)) {

Review comment:
       I think that this should use a combined `keepDeletedRows` method instead of methods specific to equality or position deletes.






[GitHub] [iceberg] chenjunjiedada commented on pull request #2372: Spark: add position delete row reader

Posted by GitBox <gi...@apache.org>.
chenjunjiedada commented on pull request #2372:
URL: https://github.com/apache/iceberg/pull/2372#issuecomment-817335769


   @openinx I didn't change the DeleteMarker since you have one. Thanks for the PR!
   
   @rdblue, would you please take another look? I changed this to use the metadata column. The PR now contains several separate pieces of logic; I'd like to split them up if this is going in the right direction.




[GitHub] [iceberg] rdblue commented on pull request #2372: Spark: add position delete row reader

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #2372:
URL: https://github.com/apache/iceberg/pull/2372#issuecomment-809823407


   @chenjunjiedada, I started reviewing this again, but I think we should reconsider the direction that this is taking.
   
   My initial review comments were based on this change in isolation, which left out position deletes. Adding position deletes is harder: you can't simply union the rows deleted by position with the rows deleted by equality, because a row may have been deleted by both. That happens when a position delete is encoded and is followed by an equality delete that applies to the same data file. You could update this to avoid the duplicates, but I think that would result in substantial changes and wouldn't actually get us closer to what you're trying to do.
   
   If I understand correctly, what you're trying to do is to create a Spark `DataFrame` of deleted rows. That way, you could use Spark to project `_file` and `_pos`, sort it by those fields, and then write the position delete files from the resulting `DataFrame`. That's probably why you didn't consider position-based deletes in the initial PR. Is this correct?
   
   If so, I think that the approach should be slightly different. Updating the filter supports the original goal of rewriting equality deletes, but is strangely specific and doesn't easily support other uses. Instead, I think that the way to do this is to select _all_ rows and set a metadata column to indicate whether or not the row is deleted. That's an easy way to guarantee that the deleted rows are returned just once because every row is returned once. The filtering may set the same "_is_deleted" field on the record but that's okay. Then we can use the resulting DataFrame for more operations, like inspecting row-level deletes or producing records for streaming (both inserted and deleted).
   
   What do you think?
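
A minimal sketch of the difference between the two approaches discussed in this comment, using plain Java collections rather than Iceberg classes. `Row`, `markDeleted` and `concatDeleted` are hypothetical names introduced only for illustration, and the boolean field merely stands in for an `_is_deleted` metadata column.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: none of these names are Iceberg APIs.
final class IsDeletedMarkerSketch {

  // A row with its position in the data file, one key column, and a flag
  // standing in for an "_is_deleted" metadata column.
  static final class Row {
    final long pos;
    final String id;
    boolean isDeleted;

    Row(long pos, String id) {
      this.pos = pos;
      this.id = id;
    }
  }

  // Marker approach: every input row is returned exactly once.
  static List<Row> markDeleted(List<Row> rows, Set<Long> deletedPositions, Set<String> deletedIds) {
    List<Row> result = new ArrayList<>(rows.size());
    for (Row row : rows) {
      // Both checks may set the same flag; setting it twice is harmless.
      if (deletedPositions.contains(row.pos)) {
        row.isDeleted = true;
      }
      if (deletedIds.contains(row.id)) {
        row.isDeleted = true;
      }
      result.add(row);
    }
    return result;
  }

  // Concatenation approach: a row matched by both kinds of delete appears twice.
  static List<Row> concatDeleted(List<Row> rows, Set<Long> deletedPositions, Set<String> deletedIds) {
    List<Row> result = new ArrayList<>();
    for (Row row : rows) {
      if (deletedPositions.contains(row.pos)) {
        result.add(row);
      }
    }
    for (Row row : rows) {
      if (deletedIds.contains(row.id)) {
        result.add(row);
      }
    }
    return result;
  }
}
```

With the marker approach, a caller that only wants deleted rows can filter on the flag afterwards, and the same output can also serve readers that want both live and deleted rows.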




[GitHub] [iceberg] chenjunjiedada commented on a change in pull request #2372: Spark: add position delete row reader

Posted by GitBox <gi...@apache.org>.
chenjunjiedada commented on a change in pull request #2372:
URL: https://github.com/apache/iceberg/pull/2372#discussion_r605007622



##########
File path: data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
##########
@@ -139,39 +139,80 @@ protected long pos(T record) {
           CloseableIterable.transform(CloseableIterable.concat(deleteRecords), Record::copy),
           deleteSchema.asStruct());
 
-      Predicate<T> isInDeleteSet = record -> deleteSet.contains(projectRow.wrap(asStructLike(record)));
-      isInDeleteSets.add(isInDeleteSet);
+      isDeleted = isDeleted.or(record -> deleteSet.contains(projectRow.wrap(asStructLike(record))));
     }
 
-    return isInDeleteSets;
+    return isDeleted;
   }
 
-  public CloseableIterable<T> findEqualityDeleteRows(CloseableIterable<T> records) {
+  private Predicate<T> buildPosDeletePredicate() {
+    if (posDeletes.isEmpty()) {
+      return null;
+    }
+
+    List<CloseableIterable<Record>> deletes = Lists.transform(posDeletes, this::openPosDeletes);
+    Set<Long> deleteSet = Deletes.toPositionSet(dataFile.path(), CloseableIterable.concat(deletes));
+    if (deleteSet.isEmpty()) {
+      return null;
+    }
+
+    return record -> deleteSet.contains(pos(record));
+  }
+
+  public CloseableIterable<T> keepRowsFromDeletes(CloseableIterable<T> records) {
+    return CloseableIterable.concat(Lists.newArrayList(keepRowsFromPosDeletes(records),
+        keepRowsFromEqualityDeletes(records)));
+  }
+
+  public CloseableIterable<T> keepRowsFromEqualityDeletes(CloseableIterable<T> records) {
     // Predicate to test whether a row has been deleted by equality deletions.
-    Predicate<T> deletedRows = applyEqDeletes().stream()
-        .reduce(Predicate::or)
-        .orElse(t -> false);
+    Predicate<T> predicate = buildEqDeletePredicate();
+    if (predicate == null) {
+      return CloseableIterable.empty();
+    }
 
     Filter<T> deletedRowsFilter = new Filter<T>() {
       @Override
       protected boolean shouldKeep(T item) {
-        return deletedRows.test(item);
+        return predicate.test(item);
       }
     };
     return deletedRowsFilter.filter(records);
   }
 
+  public CloseableIterable<T> keepRowsFromPosDeletes(CloseableIterable<T> records) {

Review comment:
       They are used in `DeleteRowReader` which is in spark module.






[GitHub] [iceberg] rdblue commented on a change in pull request #2372: Spark: add position delete row reader

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2372:
URL: https://github.com/apache/iceberg/pull/2372#discussion_r600813602



##########
File path: core/src/test/java/org/apache/iceberg/deletes/TestPositionFilter.java
##########
@@ -192,7 +192,8 @@ public void testCombinedPositionStreamRowFilter() {
     CloseableIterable<StructLike> actual = Deletes.streamingFilter(
         rows,
         row -> row.get(0, Long.class),
-        Deletes.deletePositions("file_a.avro", ImmutableList.of(positionDeletes1, positionDeletes2)));
+        Deletes.deletePositions("file_a.avro", ImmutableList.of(positionDeletes1, positionDeletes2)),
+        false);

Review comment:
       I don't think this file needs to change.






[GitHub] [iceberg] rdblue commented on a change in pull request #2372: Spark: add position delete row reader

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2372:
URL: https://github.com/apache/iceberg/pull/2372#discussion_r603700530



##########
File path: data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
##########
@@ -112,11 +112,11 @@ protected long pos(T record) {
     return applyEqDeletes(applyPosDeletes(records));
   }
 
-  private List<Predicate<T>> applyEqDeletes() {
-    List<Predicate<T>> isInDeleteSets = Lists.newArrayList();
+  private Predicate<T> buildEqDeletePredicate() {
     if (eqDeletes.isEmpty()) {
-      return isInDeleteSets;
+      return null;
     }
+    Predicate<T> isDeleted = t -> false;

Review comment:
       I think this should be initialized to `null` instead of a predicate. There is no need to run an extra predicate (with an extra method dispatch) for each row in a data file. That's a tight loop, so we should do more work here to avoid it. Instead of always using `isDeleted.or`, this should test whether `isDeleted` is `null` and either initialize `isDeleted` or call `isDeleted.or`.
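
A minimal sketch of the null-initialized pattern suggested above, written against plain `java.util` sets instead of Iceberg's `StructLikeSet`. The class and method names are hypothetical; only the null check and the `Predicate.or` chaining are the point.

```java
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical sketch: not the DeleteFilter implementation.
final class EqDeletePredicateSketch {

  // Returns null when there are no delete sets, so callers can skip filtering entirely.
  static <T> Predicate<T> buildIsDeleted(List<Set<T>> deleteSets) {
    Predicate<T> isDeleted = null;
    for (Set<T> deleteSet : deleteSets) {
      Predicate<T> inSet = deleteSet::contains;
      // The first set becomes the predicate itself; later sets are chained with or().
      // This avoids a leading "t -> false" that would be evaluated for every row.
      isDeleted = isDeleted == null ? inSet : isDeleted.or(inSet);
    }
    return isDeleted;
  }
}
```

With a single delete set, the returned predicate is just the set lookup, with no wrapper around it.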






[GitHub] [iceberg] rdblue commented on pull request #2372: Spark: add position delete row reader

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #2372:
URL: https://github.com/apache/iceberg/pull/2372#issuecomment-812242732


   @chenjunjiedada, the different types of actions make sense to me. What I'm asking is which one you are currently trying to build. I think it is 1, which makes sense and is what I assumed from looking at what you're doing. But it would be great to get an idea of how you plan to build that compaction.




[GitHub] [iceberg] rdblue commented on a change in pull request #2372: Spark: add position delete row reader

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2372:
URL: https://github.com/apache/iceberg/pull/2372#discussion_r603699094



##########
File path: core/src/main/java/org/apache/iceberg/deletes/Deletes.java
##########
@@ -158,7 +164,7 @@ protected boolean shouldKeep(T row) {
     }
   }
 
-  private static class PositionStreamDeleteFilter<T> extends CloseableGroup implements CloseableIterable<T> {
+  protected static class PositionStreamDeleteFilter<T> extends CloseableGroup implements CloseableIterable<T> {

Review comment:
       I think this can still be private because the subclass is also defined in this file.
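
To illustrate the Java access rule behind this comment: a private nested class, including its private constructor, is accessible anywhere inside the enclosing top-level class, so a sibling nested class can extend it. The sketch below uses hypothetical names that only mirror the shape of the classes under review.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: these classes are not the ones in Deletes.java.
final class NestedVisibilitySketch {

  // Private nested class with a private constructor.
  private static class PositionFilter {
    private final Set<Long> deletedPositions;

    private PositionFilter(Set<Long> deletedPositions) {
      this.deletedPositions = deletedPositions;
    }

    // Keeps rows that are NOT deleted.
    protected boolean shouldKeep(long pos) {
      return !deletedPositions.contains(pos);
    }

    List<Long> filter(List<Long> positions) {
      List<Long> kept = new ArrayList<>();
      for (long pos : positions) {
        if (shouldKeep(pos)) {
          kept.add(pos);
        }
      }
      return kept;
    }
  }

  // A sibling nested class can extend the private class and call its private constructor.
  private static class DeletedPositionFilter extends PositionFilter {
    private DeletedPositionFilter(Set<Long> deletedPositions) {
      super(deletedPositions);
    }

    // Inverts the parent: keeps only rows that ARE deleted.
    @Override
    protected boolean shouldKeep(long pos) {
      return !super.shouldKeep(pos);
    }
  }

  static List<Long> liveRows(List<Long> positions, Set<Long> deleted) {
    return new PositionFilter(deleted).filter(positions);
  }

  static List<Long> deletedRows(List<Long> positions, Set<Long> deleted) {
    return new DeletedPositionFilter(deleted).filter(positions);
  }
}
```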






[GitHub] [iceberg] rdblue commented on a change in pull request #2372: Spark: add position delete row reader

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2372:
URL: https://github.com/apache/iceberg/pull/2372#discussion_r603698874



##########
File path: core/src/main/java/org/apache/iceberg/deletes/Deletes.java
##########
@@ -191,7 +197,12 @@ private PositionStreamDeleteFilter(CloseableIterable<T> rows, Function<T, Long>
       return iter;
     }
 
-    private class PositionFilterIterator extends FilterIterator<T> {
+    protected FilterIterator<T> getPositionIterator(CloseableIterator<T> items,
+                                                    CloseableIterator<Long> newDeletePositions) {
+      return new PositionFilterIterator(items, newDeletePositions);
+    }
+
+    protected class PositionFilterIterator extends FilterIterator<T> {

Review comment:
       Does this need to be protected? It looks like it could be private and still used. Or, it could be protected and the `getPositionIterator` method could be removed. I think I prefer the second option because it would require no change to the `iterator` method in this class.






[GitHub] [iceberg] chenjunjiedada commented on pull request #2372: Spark: add position delete row reader

Posted by GitBox <gi...@apache.org>.
chenjunjiedada commented on pull request #2372:
URL: https://github.com/apache/iceberg/pull/2372#issuecomment-967749772


   @rdblue @jackye1995 @aokolnychyi @RussellSpitzer, the API changes are ready, and this is now needed for deletes conversion. Could you please take another look if you have time?




[GitHub] [iceberg] chenjunjiedada commented on pull request #2372: Spark: add position delete row reader

Posted by GitBox <gi...@apache.org>.
chenjunjiedada commented on pull request #2372:
URL: https://github.com/apache/iceberg/pull/2372#issuecomment-845939540


   @jackye1995, the use cases you mentioned make sense to me. Really good points! I agree that we should define a `RewriteDeletes` framework that lets users apply different strategies according to their table definitions and data status.




[GitHub] [iceberg] chenjunjiedada closed pull request #2372: Spark: add position delete row reader

Posted by GitBox <gi...@apache.org>.
chenjunjiedada closed pull request #2372:
URL: https://github.com/apache/iceberg/pull/2372


   




[GitHub] [iceberg] rdblue commented on a change in pull request #2372: Spark: add position delete row reader

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2372:
URL: https://github.com/apache/iceberg/pull/2372#discussion_r600813228



##########
File path: core/src/main/java/org/apache/iceberg/deletes/Deletes.java
##########
@@ -109,8 +109,9 @@ public static StructLikeSet toEqualitySet(CloseableIterable<StructLike> eqDelete
 
   public static <T> CloseableIterable<T> streamingFilter(CloseableIterable<T> rows,
                                                          Function<T, Long> rowToPosition,
-                                                         CloseableIterable<Long> posDeletes) {
-    return new PositionStreamDeleteFilter<>(rows, rowToPosition, posDeletes);
+                                                         CloseableIterable<Long> posDeletes,
+                                                         boolean keepDeleteRows) {

Review comment:
       I would rather not change this method. Instead, let's introduce a new method to select deleted rows instead.
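
One way to read this suggestion, sketched with in-memory lists and a position set instead of `CloseableIterable` and a streaming merge: the existing three-argument method keeps its signature, and a sibling method selects the deleted rows. `streamingDeletedRows` and `select` are hypothetical names, not methods in `Deletes`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

// Hypothetical, simplified sketch: not the Deletes.java implementation.
final class StreamingFilterSketch {

  // Existing entry point: signature unchanged, returns rows that are not deleted.
  static <T> List<T> streamingFilter(List<T> rows, Function<T, Long> rowToPosition, Set<Long> posDeletes) {
    return select(rows, rowToPosition, posDeletes, false);
  }

  // New entry point (hypothetical name): returns only rows that are deleted.
  static <T> List<T> streamingDeletedRows(List<T> rows, Function<T, Long> rowToPosition, Set<Long> posDeletes) {
    return select(rows, rowToPosition, posDeletes, true);
  }

  // Shared private helper; the boolean never leaks into the public signatures.
  private static <T> List<T> select(List<T> rows, Function<T, Long> rowToPosition,
                                    Set<Long> posDeletes, boolean keepDeleted) {
    List<T> selected = new ArrayList<>();
    for (T row : rows) {
      boolean deleted = posDeletes.contains(rowToPosition.apply(row));
      if (deleted == keepDeleted) {
        selected.add(row);
      }
    }
    return selected;
  }
}
```

Existing callers, such as the test in `TestPositionFilter`, would then compile without modification.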






[GitHub] [iceberg] chenjunjiedada commented on pull request #2372: Spark: add position delete row reader

Posted by GitBox <gi...@apache.org>.
chenjunjiedada commented on pull request #2372:
URL: https://github.com/apache/iceberg/pull/2372#issuecomment-824467832


   cc @aokolnychyi @RussellSpitzer as well. You guys may want to take a look at the delete file as well.




[GitHub] [iceberg] chenjunjiedada commented on pull request #2372: Spark: add position delete row reader

Posted by GitBox <gi...@apache.org>.
chenjunjiedada commented on pull request #2372:
URL: https://github.com/apache/iceberg/pull/2372#issuecomment-819158268


   @rdblue, the metadata column approach looks promising. It simplifies the overall delete filter logic a bit. I'm going to split this PR up. What do you think about this?

