Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/05/17 06:31:22 UTC

[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #4683: Read deleted rows with metadata column IS_DELETED

aokolnychyi commented on code in PR #4683:
URL: https://github.com/apache/iceberg/pull/4683#discussion_r874393713


##########
core/src/main/java/org/apache/iceberg/deletes/Deletes.java:
##########
@@ -63,14 +65,14 @@ public static <T> CloseableIterable<T> filter(CloseableIterable<T> rows, Functio
     return equalityFilter.filter(rows);
   }
 
-  public static <T> CloseableIterable<T> filter(CloseableIterable<T> rows, Function<T, Long> rowToPosition,
-                                                PositionDeleteIndex deleteSet) {
-    if (deleteSet.isEmpty()) {
-      return rows;
-    }
-
-    PositionSetDeleteFilter<T> filter = new PositionSetDeleteFilter<>(rowToPosition, deleteSet);
-    return filter.filter(rows);
+  public static <T> CloseableIterable<T> markDeleted(CloseableIterable<T> rows, Predicate<T> isDeleted,
+                                                     Consumer<T> deleteMarker) {
+    return CloseableIterable.transform(rows, row -> {
+      if (isDeleted.test(row)) {

Review Comment:
   I wonder whether we should worry about evaluating an extra `if` condition for every single row, and whether there is a way to rewrite it. Let's ignore this for now, as there are quite a few `if`s already.
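A minimal sketch of the two shapes under discussion. It uses plain `java.lang.Iterable` instead of Iceberg's `CloseableIterable` so that it compiles on its own. `Row`, `markDeletedBranching` and `markDeletedBranchFree` are hypothetical names. The branch-free variant assumes the marker can accept the predicate result directly (a `BiConsumer<T, Boolean>`), which is not the `Consumer<T>` signature used in the diff.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Predicate;

public class MarkDeletedSketch {

  // Hypothetical mutable row: a position plus an is-deleted flag.
  static class Row {
    final long pos;
    boolean deleted;

    Row(long pos) {
      this.pos = pos;
    }
  }

  // Shape of the diff: lazily test each row and call the marker only when the predicate matches.
  static <T> Iterable<T> markDeletedBranching(Iterable<T> rows, Predicate<T> isDeleted, Consumer<T> marker) {
    return () -> new Iterator<T>() {
      private final Iterator<T> it = rows.iterator();

      @Override
      public boolean hasNext() {
        return it.hasNext();
      }

      @Override
      public T next() {
        T row = it.next();
        if (isDeleted.test(row)) {
          marker.accept(row);
        }
        return row;
      }
    };
  }

  // Hypothetical alternative: store the predicate result directly, so this loop has no explicit `if`.
  // The trade-off is a write to every row, including rows that are not deleted.
  static <T> Iterable<T> markDeletedBranchFree(Iterable<T> rows, Predicate<T> isDeleted,
                                               BiConsumer<T, Boolean> setDeleted) {
    return () -> new Iterator<T>() {
      private final Iterator<T> it = rows.iterator();

      @Override
      public boolean hasNext() {
        return it.hasNext();
      }

      @Override
      public T next() {
        T row = it.next();
        setDeleted.accept(row, isDeleted.test(row));
        return row;
      }
    };
  }

  public static void main(String[] args) {
    List<Row> rows = new ArrayList<>();
    for (long pos = 0; pos < 4; pos++) {
      rows.add(new Row(pos));
    }
    Predicate<Row> isDeleted = r -> r.pos % 2 == 0;
    StringBuilder flags = new StringBuilder();
    for (Row r : markDeletedBranchFree(rows, isDeleted, (row, d) -> row.deleted = d)) {
      flags.append(r.deleted ? 'D' : '-');
    }
    System.out.println(flags); // prints D-D-
  }
}
```

The branch-free form removes the explicit `if` from the loop, but it writes the flag for every row. Whether that is actually cheaper than the branch would need a benchmark.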



##########
data/src/main/java/org/apache/iceberg/data/DeleteFilter.java:
##########
@@ -226,10 +226,15 @@ private CloseableIterable<T> applyPosDeletes(CloseableIterable<T> records) {
 
     // if there are fewer deletes than a reasonable number to keep in memory, use a set
     if (posDeletes.stream().mapToLong(DeleteFile::recordCount).sum() < setFilterThreshold) {
-      return Deletes.filter(records, this::pos, Deletes.toPositionIndex(filePath, deletes));
+      Predicate<T> isInDeleteSet = record -> Deletes.toPositionIndex(filePath, deletes).isDeleted(pos(record));

Review Comment:
   Won't we call `Deletes.toPositionIndex` for every row now? The lambda rebuilds the index each time it is evaluated.
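The lambda in the diff calls `Deletes.toPositionIndex(filePath, deletes)` inside its body, so the index is rebuilt every time the predicate is evaluated. This sketch shows the difference by counting index builds. A hypothetical `buildIndex` stands in for `Deletes.toPositionIndex`, and a `HashSet<Long>` stands in for `PositionDeleteIndex`.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

public class HoistIndexSketch {

  // Counts how many times the (hypothetical) index has been built.
  static int indexBuilds = 0;

  // Hypothetical stand-in for Deletes.toPositionIndex: builds a set of deleted positions.
  static Set<Long> buildIndex(List<Long> deletedPositions) {
    indexBuilds++;
    return new HashSet<>(deletedPositions);
  }

  // Shape of the diff: the index is built inside the lambda, i.e. once per row tested.
  static Predicate<Long> perRow(List<Long> deletes) {
    return pos -> buildIndex(deletes).contains(pos);
  }

  // Hoisted: build the index once when the predicate is created; each row is then only a lookup.
  static Predicate<Long> hoisted(List<Long> deletes) {
    Set<Long> index = buildIndex(deletes);
    return index::contains;
  }

  // Tests positions 0..rowCount-1 and reports how many index builds that caused.
  static int countBuilds(boolean hoist, List<Long> deletes, long rowCount) {
    indexBuilds = 0;
    Predicate<Long> isDeleted = hoist ? hoisted(deletes) : perRow(deletes);
    for (long pos = 0; pos < rowCount; pos++) {
      isDeleted.test(pos);
    }
    return indexBuilds;
  }

  public static void main(String[] args) {
    List<Long> deletes = List.of(3L, 7L);
    System.out.println(countBuilds(false, deletes, 10) + " vs " + countBuilds(true, deletes, 10)); // prints "10 vs 1"
  }
}
```

In the diff, the hoisted form would assign the result of `Deletes.toPositionIndex(filePath, deletes)` to a local variable before creating the `Predicate`. The lambda would then only call `isDeleted(pos(record))` on that local.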



##########
data/src/main/java/org/apache/iceberg/data/DeleteFilter.java:
##########
@@ -290,8 +295,6 @@ private static Schema fileProjection(Schema tableSchema, Schema requestedSchema,
       requiredIds.addAll(eqDelete.equalityFieldIds());
     }
 
-    requiredIds.add(MetadataColumns.IS_DELETED.fieldId());

Review Comment:
   Could you comment on why this was removed?



##########
spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java:
##########
@@ -104,7 +103,6 @@ public class TestSparkParquetReadMetadataColumns {
       }
       row.update(1, UTF8String.fromString("str" + i));
       row.update(2, i);
-      row.update(3, false);

Review Comment:
   Why do we need these changes?



##########
data/src/main/java/org/apache/iceberg/data/DeleteFilter.java:
##########
@@ -94,6 +95,12 @@ protected DeleteFilter(String filePath, List<DeleteFile> deletes, Schema tableSc
     this.eqDeletes = eqDeleteBuilder.build();
     this.requiredSchema = fileProjection(tableSchema, requestedSchema, posDeletes, eqDeletes);
     this.posAccessor = requiredSchema.accessorForField(MetadataColumns.ROW_POSITION.fieldId());
+    this.hasColumnIsDeleted = requestedSchema.findField(MetadataColumns.IS_DELETED.fieldId()) != null;
+    this.columnIsDeletedPosition = requestedSchema.columns().indexOf(MetadataColumns.IS_DELETED);

Review Comment:
   Should we use `requiredSchema` or `requestedSchema`?
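Whichever schema is used, the stored position is only correct if it matches the layout of the rows the filter actually receives. This sketch uses plain strings as hypothetical stand-ins for `NestedField` columns. An index taken from the requested column list agrees with the projected layout when extra columns are appended at the end, and differs when they are placed in front. The sketch makes no claim about how `fileProjection` actually orders columns.

```java
import java.util.ArrayList;
import java.util.List;

public class IsDeletedPositionSketch {

  // Hypothetical projection that keeps the requested columns first, in order,
  // and appends any additional columns the delete filter needs at the end.
  static List<String> appendExtra(List<String> requested, List<String> extra) {
    List<String> projected = new ArrayList<>(requested);
    for (String col : extra) {
      if (!projected.contains(col)) {
        projected.add(col);
      }
    }
    return projected;
  }

  // Hypothetical projection that places the additional columns in front instead.
  static List<String> prependExtra(List<String> requested, List<String> extra) {
    List<String> projected = new ArrayList<>();
    for (String col : extra) {
      if (!requested.contains(col)) {
        projected.add(col);
      }
    }
    projected.addAll(requested);
    return projected;
  }

  public static void main(String[] args) {
    List<String> requested = List.of("id", "data", "_deleted");
    List<String> extra = List.of("_pos");
    System.out.println(requested.indexOf("_deleted"));                        // 2
    System.out.println(appendExtra(requested, extra).indexOf("_deleted"));    // 2
    System.out.println(prependExtra(requested, extra).indexOf("_deleted"));   // 3
  }
}
```

Looking the position up in the same schema the reader projects with avoids depending on either ordering.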



##########
data/src/main/java/org/apache/iceberg/data/DeleteFilter.java:
##########
@@ -94,6 +95,12 @@ protected DeleteFilter(String filePath, List<DeleteFile> deletes, Schema tableSc
     this.eqDeletes = eqDeleteBuilder.build();
     this.requiredSchema = fileProjection(tableSchema, requestedSchema, posDeletes, eqDeletes);
     this.posAccessor = requiredSchema.accessorForField(MetadataColumns.ROW_POSITION.fieldId());
+    this.hasColumnIsDeleted = requestedSchema.findField(MetadataColumns.IS_DELETED.fieldId()) != null;
+    this.columnIsDeletedPosition = requestedSchema.columns().indexOf(MetadataColumns.IS_DELETED);

Review Comment:
   This assumes query engines can set a value at a particular position in a row. That seems reasonable, but let's also think about alternatives.
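One alternative keeps the positional detail inside each engine. Shared code decides which rows are deleted and calls a hook, and each engine's override decides how to record the flag on its own row type. The diff already declares a `protected void markRowDeleted(T item)` hook. The sketch below is a hypothetical illustration of that split, not the PR's implementation. `Marker`, `ArrayRowMarker`, `PojoRow` and `PojoRowMarker` are made-up types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class MarkRowDeletedSketch {

  // Hypothetical base class: shared code decides which rows are deleted,
  // and each engine decides how to record that on its own row type.
  abstract static class Marker<T> {
    protected abstract void markRowDeleted(T row);

    List<T> markAll(List<T> rows, Predicate<T> isDeleted) {
      List<T> out = new ArrayList<>();
      for (T row : rows) {
        if (isDeleted.test(row)) {
          markRowDeleted(row);
        }
        out.add(row);
      }
      return out;
    }
  }

  // Engine A (hypothetical): rows are Object[] and the flag lives at a fixed position.
  static class ArrayRowMarker extends Marker<Object[]> {
    private final int isDeletedPos;

    ArrayRowMarker(int isDeletedPos) {
      this.isDeletedPos = isDeletedPos;
    }

    @Override
    protected void markRowDeleted(Object[] row) {
      row[isDeletedPos] = true;
    }
  }

  // Engine B (hypothetical): rows are objects with a named field, so no position is needed.
  static class PojoRow {
    final long id;
    boolean deleted;

    PojoRow(long id) {
      this.id = id;
    }
  }

  static class PojoRowMarker extends Marker<PojoRow> {
    @Override
    protected void markRowDeleted(PojoRow row) {
      row.deleted = true;
    }
  }

  public static void main(String[] args) {
    List<Object[]> arrays = new ArrayList<>();
    arrays.add(new Object[] {1L, "a", false});
    arrays.add(new Object[] {2L, "b", false});
    new ArrayRowMarker(2).markAll(arrays, r -> (Long) r[0] == 2L);

    List<PojoRow> pojos = List.of(new PojoRow(1), new PojoRow(2));
    new PojoRowMarker().markAll(pojos, r -> r.id == 1);

    System.out.println(arrays.get(1)[2] + " " + pojos.get(0).deleted); // prints "true true"
  }
}
```

With this split, only engines whose rows really are positional need a column index.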



##########
data/src/main/java/org/apache/iceberg/data/DeleteFilter.java:
##########
@@ -169,30 +176,23 @@ public CloseableIterable<T> findEqualityDeleteRows(CloseableIterable<T> records)
         .reduce(Predicate::or)
         .orElse(t -> false);
 
-    Filter<T> deletedRowsFilter = new Filter<T>() {
-      @Override
-      protected boolean shouldKeep(T item) {
-        return deletedRows.test(item);
-      }
-    };
-    return deletedRowsFilter.filter(records);
+    return CloseableIterable.filter(records, deletedRows);
   }
 
   private CloseableIterable<T> applyEqDeletes(CloseableIterable<T> records) {
-    // Predicate to test whether a row should be visible to user after applying equality deletions.
-    Predicate<T> remainingRows = applyEqDeletes().stream()
-        .map(Predicate::negate)
-        .reduce(Predicate::and)
-        .orElse(t -> true);
-
-    Filter<T> remainingRowsFilter = new Filter<T>() {
-      @Override
-      protected boolean shouldKeep(T item) {
-        return remainingRows.test(item);
-      }
-    };
+    Predicate<T> isEqDeleted = applyEqDeletes().stream()
+        .reduce(Predicate::or)
+        .orElse(t -> false);
 
-    return remainingRowsFilter.filter(records);
+    if (hasColumnIsDeleted) {
+      return Deletes.markDeleted(records, isEqDeleted, this::markRowDeleted);
+    } else {
+      return CloseableIterable.filter(records, isEqDeleted.negate());
+    }
+  }
+
+  protected void markRowDeleted(T item) {
+    throw new UnsupportedOperationException("GenericDeleteFilter.markRowDeleted() is not supported");

Review Comment:
   What about `this.getClass().getName() + " does not implement markRowDeleted"`, so that the message names the actual class for every potential implementation, not just `GenericDeleteFilter`?
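This sketch shows the suggested message with hypothetical stand-in classes. `getClass()` returns the runtime class, so the same default method names whichever subclass did not override the hook.

```java
public class MarkRowDeletedMessageSketch {

  // Hypothetical base class with a default hook that engine-specific subclasses may override.
  abstract static class BaseDeleteFilter<T> {
    protected void markRowDeleted(T item) {
      // getClass() is the runtime subclass, so the message names the implementation
      // that is missing an override rather than a hard-coded class name.
      throw new UnsupportedOperationException(
          this.getClass().getName() + " does not implement markRowDeleted");
    }
  }

  // Hypothetical subclasses that do not override the hook.
  static class GenericDeleteFilter extends BaseDeleteFilter<Object> {
  }

  static class OtherEngineDeleteFilter extends BaseDeleteFilter<Object> {
  }

  // Returns the exception message produced by the default hook, or null if none was thrown.
  static String messageFor(BaseDeleteFilter<Object> filter) {
    try {
      filter.markRowDeleted(new Object());
      return null;
    } catch (UnsupportedOperationException e) {
      return e.getMessage();
    }
  }

  public static void main(String[] args) {
    // Prints a message naming MarkRowDeletedMessageSketch$OtherEngineDeleteFilter.
    System.out.println(messageFor(new OtherEngineDeleteFilter()));
  }
}
```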



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
For additional commands, e-mail: issues-help@iceberg.apache.org