Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/03/07 15:02:22 UTC

[GitHub] [iceberg] chenjunjiedada commented on a change in pull request #2216: Spark: support replace equality deletes to position deletes

chenjunjiedada commented on a change in pull request #2216:
URL: https://github.com/apache/iceberg/pull/2216#discussion_r589042721



##########
File path: data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
##########
@@ -110,7 +110,44 @@ protected long pos(T record) {
     return applyEqDeletes(applyPosDeletes(records));
   }
 
-  private CloseableIterable<T> applyEqDeletes(CloseableIterable<T> records) {
+  public CloseableIterable<T> matchEqDeletes(CloseableIterable<T> records) {
+    if (eqDeletes.isEmpty()) {
+      return records;
+    }
+
+    Multimap<Set<Integer>, DeleteFile> filesByDeleteIds = Multimaps.newMultimap(Maps.newHashMap(), Lists::newArrayList);
+    for (DeleteFile delete : eqDeletes) {
+      filesByDeleteIds.put(Sets.newHashSet(delete.equalityFieldIds()), delete);
+    }
+
+    CloseableIterable<T> remainRecords = records;
+    CloseableIterable<T> matchedRecords = CloseableIterable.empty();
+    for (Map.Entry<Set<Integer>, Collection<DeleteFile>> entry : filesByDeleteIds.asMap().entrySet()) {
+      Set<Integer> ids = entry.getKey();
+      Iterable<DeleteFile> deletes = entry.getValue();
+
+      Schema deleteSchema = TypeUtil.select(requiredSchema, ids);
+
+      // a projection to select and reorder fields of the file schema to match the delete rows
+      StructProjection projectRow = StructProjection.create(requiredSchema, deleteSchema);
+
+      Iterable<CloseableIterable<Record>> deleteRecords = Iterables.transform(deletes,
+          delete -> openDeletes(delete, deleteSchema));
+      StructLikeSet deleteSet = Deletes.toEqualitySet(
+          // copy the delete records because they will be held in a set
+          CloseableIterable.transform(CloseableIterable.concat(deleteRecords), Record::copy),
+          deleteSchema.asStruct());
+
+      matchedRecords = CloseableIterable.concat(Lists.newArrayList(matchedRecords, Deletes.match(remainRecords,

Review comment:
       I don't think it iterates over the data set several times, since these are iterable chains and should be evaluated lazily. For a filter chain over a data set of `N` elements with filters (`F1, F2, F3, F4`) that filter out (`N1, N2, N3, N4`) items respectively, I think the data set is iterated once and the number of filter calls should be:
   - `F1` N times
   - `F2` (N-N1) times
   - `F3` (N-N1-N2) times
   - `F4` (N-N1-N2-N3) times
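   
   To make these counts concrete, here is a minimal standalone sketch. It uses `java.util.stream` as a stand-in for the lazy `CloseableIterable` filter chain (an assumption, this is not the Iceberg code), and `LazyFilterChainDemo`, `counting`, and `callCounts` are hypothetical names used only for illustration. It counts how often each filter is invoked during a single pass over the data.
   
   ```java
   import java.util.Arrays;
   import java.util.List;
   import java.util.function.Predicate;
   import java.util.stream.IntStream;
   import java.util.stream.Stream;
   
   // Hypothetical demo class, not part of Iceberg.
   class LazyFilterChainDemo {
     /** Wraps a predicate so that every invocation is recorded in counts[index]. */
     static Predicate<Integer> counting(Predicate<Integer> delegate, long[] counts, int index) {
       return value -> {
         counts[index]++;
         return delegate.test(value);
       };
     }
   
     /** Pushes the integers 0..n-1 through a lazy chain of filters and returns the calls per filter. */
     static long[] callCounts(int n, List<Predicate<Integer>> filters) {
       long[] counts = new long[filters.size()];
       Stream<Integer> stream = IntStream.range(0, n).boxed();
       for (int i = 0; i < filters.size(); i++) {
         // chaining is lazy: nothing is evaluated until the terminal operation below
         stream = stream.filter(counting(filters.get(i), counts, i));
       }
       stream.forEach(value -> { });  // one pass over the source drives the whole chain
       return counts;
     }
   
     public static void main(String[] args) {
       // F1 drops even numbers, F2 drops multiples of 3, F3 drops multiples of 5
       List<Predicate<Integer>> filters = Arrays.asList(v -> v % 2 != 0, v -> v % 3 != 0, v -> v % 5 != 0);
       // N = 30, N1 = 15, N2 = 5, so the calls are N, N-N1, N-N1-N2
       System.out.println(Arrays.toString(callCounts(30, filters)));  // prints [30, 15, 10]
     }
   }
   ```
   
   Each later filter only sees the elements that survived the earlier ones, which is where the decreasing call counts come from.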
   
   For a matching chain over the same data set of `N` elements with the same filters (`F1, F2, F3, F4`) that filter out (`N1, N2, N3, N4`) items respectively, I think the data set is also iterated once and the number of filter calls should be:
   - `F1` 2N times (filter and match)
   - `F2` 2(N-N1) times (filter and match)
   - `F3` 2(N-N1-N2) times (filter and match)
   - `F4` 2(N-N1-N2-N3) times (filter and match)
   
   @rdblue Could you please correct me if I am wrong?
   
   Here is an alternative implementation that collects all delete sets in a list and does the projection in the filter. It looks a bit more straightforward.
   
   ```java
     public static <T> CloseableIterable<T> match(CloseableIterable<T> rows,
                                                  BiFunction<T, StructProjection, StructLike> rowToDeleteKey,
                                                  List<Pair<StructProjection, StructLikeSet>> unprojectedDeleteSets) {
       if (unprojectedDeleteSets.isEmpty()) {
         return rows;
       }
   
       EqualitySetDeleteMatcher<T> equalityFilter = new EqualitySetDeleteMatcher<>(rowToDeleteKey, unprojectedDeleteSets);
       return equalityFilter.filter(rows);
     }
   
   
     private static class EqualitySetDeleteMatcher<T> extends Filter<T> {
       private final List<Pair<StructProjection, StructLikeSet>> deleteSets;
       private final BiFunction<T, StructProjection, StructLike> extractEqStruct;
   
       protected EqualitySetDeleteMatcher(BiFunction<T, StructProjection, StructLike> extractEq,
                                         List<Pair<StructProjection, StructLikeSet>> deleteSets) {
         this.extractEqStruct = extractEq;
         this.deleteSets = deleteSets;
       }
   
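       @Override
       protected boolean shouldKeep(T row) {
         // this is a matcher: keep the row if its projected key is present in any of the delete sets
         for (Pair<StructProjection, StructLikeSet> deleteSet : deleteSets) {
           if (deleteSet.second().contains(extractEqStruct.apply(row, deleteSet.first()))) {
             return true;
           }
         }
   
         // the row is not matched by any equality delete
         return false;
       }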
     }
   
   ```
   
   PS: For delete files with the same equality field IDs, we will collect the deletes into one set.
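   
   As a rough illustration of that grouping, here is a standalone sketch using only `java.util` collections. `GroupDeletesByFieldIds`, `DeleteFileStub`, and the opaque string keys are hypothetical stand-ins (assumptions for this example), not the Iceberg `DeleteFile` or `StructLikeSet` types.
   
   ```java
   import java.util.Arrays;
   import java.util.HashMap;
   import java.util.HashSet;
   import java.util.List;
   import java.util.Map;
   import java.util.Set;
   
   // Hypothetical demo class, not part of Iceberg.
   class GroupDeletesByFieldIds {
     /** Hypothetical stand-in for a delete file: its equality field IDs plus the delete keys it holds. */
     static final class DeleteFileStub {
       final List<Integer> equalityFieldIds;
       final List<String> deleteKeys;
   
       DeleteFileStub(List<Integer> equalityFieldIds, List<String> deleteKeys) {
         this.equalityFieldIds = equalityFieldIds;
         this.deleteKeys = deleteKeys;
       }
     }
   
     /** Merges the keys of all delete files that share the same set of equality field IDs. */
     static Map<Set<Integer>, Set<String>> group(List<DeleteFileStub> files) {
       Map<Set<Integer>, Set<String>> keysByFieldIds = new HashMap<>();
       for (DeleteFileStub file : files) {
         // using a Set as the map key puts [1, 2] and [2, 1] into the same group
         keysByFieldIds
             .computeIfAbsent(new HashSet<>(file.equalityFieldIds), ids -> new HashSet<>())
             .addAll(file.deleteKeys);
       }
       return keysByFieldIds;
     }
   
     public static void main(String[] args) {
       List<DeleteFileStub> files = Arrays.asList(
           new DeleteFileStub(Arrays.asList(1, 2), Arrays.asList("k1", "k2")),
           new DeleteFileStub(Arrays.asList(2, 1), Arrays.asList("k3")),
           new DeleteFileStub(Arrays.asList(3), Arrays.asList("k4")));
       System.out.println(group(files).size());  // prints 2
     }
   }
   ```
   
   With one set per distinct group of field IDs, the matcher only needs one projection and one lookup per group for each row.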
   



