Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2020/08/17 22:33:35 UTC

[GitHub] [iceberg] rdblue opened a new pull request #1352: Support row-level deletes with IcebergGenerics

rdblue opened a new pull request #1352:
URL: https://github.com/apache/iceberg/pull/1352


   This adds support for applying row-level deletes when reading with IcebergGenerics.
   
   This also refactors the classes used by the `IcebergGenerics` reader so that generics can be used more easily in other applications. It adds a `GenericReader` class that opens tasks (either file tasks or combined tasks) and automatically builds a row-based `CloseableIterator<Record>` that removes deleted rows and applies task residual filters.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1352: Support row-level deletes with IcebergGenerics

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1352:
URL: https://github.com/apache/iceberg/pull/1352#discussion_r475141330



##########
File path: core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
##########
@@ -78,6 +82,10 @@
     this.sortedDeletesByPartition = sortedDeletesByPartition;
   }
 
+  public boolean isEmpty() {
+    return (globalDeletes == null || globalDeletes.length == 0) && sortedDeletesByPartition.isEmpty();

Review comment:
       The reason why I didn't is that this constructor is not public. It will only be called by tests and the builder, so I think we can be more relaxed about validation. We know that this is never null because the builder never calls it that way.






[GitHub] [iceberg] rdblue commented on a change in pull request #1352: Support row-level deletes with IcebergGenerics

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1352:
URL: https://github.com/apache/iceberg/pull/1352#discussion_r475749255



##########
File path: data/src/main/java/org/apache/iceberg/data/GenericReader.java
##########
@@ -114,14 +117,40 @@
     return records;
   }
 
-  private Schema fileProjection(boolean hasPosDeletes) {
-    if (hasPosDeletes) {
-      List<Types.NestedField> columns = Lists.newArrayList(projection.columns());
+  private Schema fileProjection(List<DeleteFile> posDeletes, List<DeleteFile> eqDeletes) {
+    Set<Integer> requiredIds = Sets.newLinkedHashSet();
+    if (!posDeletes.isEmpty()) {
+      requiredIds.add(MetadataColumns.ROW_POSITION.fieldId());
+    }
+
+    for (DeleteFile eqDelete : eqDeletes) {
+      requiredIds.addAll(eqDelete.equalityFieldIds());
+    }
+
+    Set<Integer> missingIds = Sets.newLinkedHashSet(Sets.difference(requiredIds, TypeUtil.getProjectedIds(projection)));
+
+    if (missingIds.isEmpty()) {
+      return projection;
+    }
+
+    // TODO: support adding nested columns. this will currently fail when finding nested columns to add
+    List<Types.NestedField> columns = Lists.newArrayList(projection.columns());
+    for (int fieldId : missingIds) {
+      if (fieldId == MetadataColumns.ROW_POSITION.fieldId()) {
+        continue; // add _pos at the end
+      }
+
+      Types.NestedField field = tableSchema.asStruct().field(fieldId);
+      Preconditions.checkArgument(field != null, "Cannot find required field for ID %s", fieldId);

Review comment:
       Is it a necessary constraint? As long as old data files have the deleted columns, we can still apply the deletes. Maybe we just need to change how we build this projection schema. We could base it on the data file's schema instead of the table schema. If the column is in the data file schema, it would work fine.
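
   A minimal sketch of the idea in this comment, using plain JDK collections rather than Iceberg's `Schema` type: resolve each field ID required by the deletes against the data file's schema, so that a column dropped from the table can still be used as long as the file contains it. `FieldLookupSketch`, `resolve`, and the ID-to-name maps are hypothetical names for illustration only and are not part of this PR.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in: a schema is modeled as a map from field ID to column name.
final class FieldLookupSketch {
  private FieldLookupSketch() {
  }

  // Resolve the columns required by delete files against the data file's schema.
  // Fails only when the data file itself does not contain a required column.
  static Map<Integer, String> resolve(List<Integer> requiredIds, Map<Integer, String> fileSchema) {
    Map<Integer, String> resolved = new LinkedHashMap<>();
    for (int fieldId : requiredIds) {
      String name = fileSchema.get(fieldId);
      if (name == null) {
        throw new IllegalArgumentException("Cannot find required field for ID " + fieldId);
      }
      resolved.put(fieldId, name);
    }
    return resolved;
  }
}
```

   With this shape, the lookup no longer depends on whether the current table schema still has the column.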






[GitHub] [iceberg] rdblue commented on pull request #1352: Support row-level deletes with IcebergGenerics

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #1352:
URL: https://github.com/apache/iceberg/pull/1352#issuecomment-679362503


   Thanks for the reviews, everyone! I'll merge this.




[GitHub] [iceberg] rdblue commented on a change in pull request #1352: Support row-level deletes with IcebergGenerics

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1352:
URL: https://github.com/apache/iceberg/pull/1352#discussion_r475153850



##########
File path: data/src/main/java/org/apache/iceberg/data/GenericReader.java
##########
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.data;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import org.apache.iceberg.Accessor;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.orc.GenericOrcReader;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.deletes.Deletes;
+import org.apache.iceberg.expressions.Evaluator;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimap;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.PartitionUtil;
+import org.apache.iceberg.util.ProjectStructLike;
+import org.apache.iceberg.util.StructLikeSet;
+
+class GenericReader implements Serializable {
+  private static final Schema POS_DELETE_SCHEMA = new Schema(
+      MetadataColumns.DELETE_FILE_PATH,
+      MetadataColumns.DELETE_FILE_POS);
+
+  private final FileIO io;
+  private final Schema projection;
+  private final boolean caseSensitive;
+  private final boolean reuseContainers;
+
+  GenericReader(TableScan scan, boolean reuseContainers) {
+    this.io = scan.table().io();
+    this.projection = scan.schema();
+    this.caseSensitive = scan.isCaseSensitive();
+    this.reuseContainers = reuseContainers;
+  }
+
+  CloseableIterator<Record> open(CloseableIterable<CombinedScanTask> tasks) {
+    Iterable<FileScanTask> fileTasks = Iterables.concat(Iterables.transform(tasks, CombinedScanTask::files));
+    return CloseableIterable.concat(Iterables.transform(fileTasks, this::open)).iterator();
+  }
+
+  public CloseableIterable<Record> open(CombinedScanTask task) {
+    return new CombinedTaskIterable(task);
+  }
+
+  public CloseableIterable<Record> open(FileScanTask task) {
+    List<DeleteFile> posDeletes = Lists.newArrayList();
+    List<DeleteFile> eqDeletes = Lists.newArrayList();
+    for (DeleteFile delete : task.deletes()) {
+      switch (delete.content()) {
+        case POSITION_DELETES:
+          posDeletes.add(delete);
+          break;
+        case EQUALITY_DELETES:
+          eqDeletes.add(delete);
+          break;
+        default:
+          throw new UnsupportedOperationException("Unknown delete file content: " + delete.content());
+      }
+    }
+
+    Schema fileProjection = fileProjection(!posDeletes.isEmpty());
+
+    CloseableIterable<Record> records = openFile(task, fileProjection);
+    records = applyPosDeletes(records, fileProjection, task.file().path(), posDeletes, task.file());
+    records = applyEqDeletes(records, fileProjection, eqDeletes, task.file());
+    records = applyResidual(records, fileProjection, task.residual());
+
+    return records;
+  }
+
+  private Schema fileProjection(boolean hasPosDeletes) {

Review comment:
       I updated this to create the correct projection when columns are top-level.
   
   Creating the right projection when columns are nested is going to require a visitor, so I think it should be done in a different commit and have tests. I've added a TODO note to fix this. Until it is done, if a column is not top level this will throw an exception.
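
   The top-level behavior described here can be sketched with plain JDK collections. This is an illustration under stated assumptions, not the code in the PR: `ProjectionSketch`, `extend`, and the placeholder `ROW_POSITION_ID` value are hypothetical, and a schema is reduced to a list of field IDs. Missing IDs are appended to the requested projection, the row position column goes last, and an ID that is not a top-level column causes an exception.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

final class ProjectionSketch {
  // Placeholder ID for the row position column; the real ID comes from
  // MetadataColumns.ROW_POSITION.fieldId() and is not reproduced here.
  static final int ROW_POSITION_ID = -1;

  private ProjectionSketch() {
  }

  // Returns the projected IDs plus any required IDs that are missing, with _pos added last.
  static List<Integer> extend(List<Integer> projectedIds, Set<Integer> requiredIds, Set<Integer> topLevelIds) {
    Set<Integer> missingIds = new LinkedHashSet<>(requiredIds);
    missingIds.removeAll(projectedIds);
    if (missingIds.isEmpty()) {
      return projectedIds;
    }

    List<Integer> columns = new ArrayList<>(projectedIds);
    for (int fieldId : missingIds) {
      if (fieldId == ROW_POSITION_ID) {
        continue; // added at the end, below
      }

      if (!topLevelIds.contains(fieldId)) {
        // nested columns are not handled in this sketch
        throw new IllegalArgumentException("Cannot find required field for ID " + fieldId);
      }

      columns.add(fieldId);
    }

    if (missingIds.contains(ROW_POSITION_ID)) {
      columns.add(ROW_POSITION_ID);
    }

    return columns;
  }
}
```

   Because the extra columns always land after the requested ones, a caller can recover the original projection by ignoring the tail of each row.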






[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #1352: Support row-level deletes with IcebergGenerics

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #1352:
URL: https://github.com/apache/iceberg/pull/1352#discussion_r472248819



##########
File path: core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
##########
@@ -96,21 +104,157 @@ private StructLikeWrapper newWrapper(int specId) {
     Pair<Integer, StructLikeWrapper> partition = partition(file.specId(), file.partition());
     Pair<long[], DeleteFile[]> partitionDeletes = sortedDeletesByPartition.get(partition);
 
+    Stream<DeleteFile> matchingDeletes;
     if (partitionDeletes == null) {
-      return limitBySequenceNumber(sequenceNumber, globalSeqs, globalDeletes);
+      matchingDeletes = limitBySequenceNumber(sequenceNumber, globalSeqs, globalDeletes);
     } else if (globalDeletes == null) {
-      return limitBySequenceNumber(sequenceNumber, partitionDeletes.first(), partitionDeletes.second());
+      matchingDeletes = limitBySequenceNumber(sequenceNumber, partitionDeletes.first(), partitionDeletes.second());
     } else {
-      return Stream.concat(
-          Stream.of(limitBySequenceNumber(sequenceNumber, globalSeqs, globalDeletes)),
-          Stream.of(limitBySequenceNumber(sequenceNumber, partitionDeletes.first(), partitionDeletes.second()))
-      ).toArray(DeleteFile[]::new);
+      matchingDeletes = Stream.concat(
+          limitBySequenceNumber(sequenceNumber, globalSeqs, globalDeletes),
+          limitBySequenceNumber(sequenceNumber, partitionDeletes.first(), partitionDeletes.second()));
+    }
+
+    return matchingDeletes
+        .filter(deleteFile -> canContainDeletesForFile(file, deleteFile, specsById.get(file.specId()).schema()))
+        .toArray(DeleteFile[]::new);
+  }
+
+  private static boolean canContainDeletesForFile(DataFile dataFile, DeleteFile deleteFile, Schema schema) {
+    switch (deleteFile.content()) {
+      case POSITION_DELETES:
+        return canContainPosDeletesForFile(dataFile, deleteFile);
+
+      case EQUALITY_DELETES:
+        return canContainEqDeletesForFile(dataFile, deleteFile, schema);
+    }
+
+    return true;
+  }
+
+  private static boolean canContainPosDeletesForFile(DataFile dataFile, DeleteFile deleteFile) {
+    // check that the delete file can contain the data file's file_path
+    Map<Integer, ByteBuffer> lowers = deleteFile.lowerBounds();
+    Map<Integer, ByteBuffer> uppers = deleteFile.upperBounds();
+    if (lowers == null || uppers == null) {
+      return true;
+    }
+
+    Type pathType = MetadataColumns.DELETE_FILE_PATH.type();
+    int pathId = MetadataColumns.DELETE_FILE_PATH.fieldId();
+    ByteBuffer lower = lowers.get(pathId);
+    if (lower != null &&
+        Comparators.charSequences().compare(dataFile.path(), Conversions.fromByteBuffer(pathType, lower)) < 0) {

Review comment:
       Is this different than just lexicographically comparing the bytes of the path with the byte buffer? Just wondering if converting the path to bytes may be cheaper than converting the buffer to a string. Not a big deal though, and it probably has no real perf impact at this location.
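
   For reference, a byte-level version of the bounds check could look like the sketch below. It assumes the bounds hold UTF-8 encoded strings and uses only the JDK; `PathBoundsSketch`, `compareUnsigned`, and `canContainPath` are hypothetical names, not Iceberg APIs. One caveat: unsigned byte order over UTF-8 matches Unicode code point order, which can differ from a char-by-char comparison of UTF-16 strings when supplementary characters are involved, so the two approaches are not guaranteed to agree on every input.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class PathBoundsSketch {
  private PathBoundsSketch() {
  }

  // Unsigned lexicographic comparison of a byte array with a buffer's remaining bytes.
  // Uses absolute reads, so the buffer's position is left unchanged.
  static int compareUnsigned(byte[] left, ByteBuffer right) {
    int rightStart = right.position();
    int rightLength = right.remaining();
    int length = Math.min(left.length, rightLength);
    for (int i = 0; i < length; i++) {
      int cmp = Integer.compare(left[i] & 0xFF, right.get(rightStart + i) & 0xFF);
      if (cmp != 0) {
        return cmp;
      }
    }
    return Integer.compare(left.length, rightLength);
  }

  // Returns false only when the path is provably outside [lower, upper]; null bounds are ignored.
  static boolean canContainPath(String path, ByteBuffer lower, ByteBuffer upper) {
    byte[] pathBytes = path.getBytes(StandardCharsets.UTF_8);
    if (lower != null && compareUnsigned(pathBytes, lower) < 0) {
      return false;
    }

    if (upper != null && compareUnsigned(pathBytes, upper) > 0) {
      return false;
    }

    return true;
  }
}
```

   The path is encoded once per check, and neither bound is decoded into a string.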






[GitHub] [iceberg] rdblue commented on a change in pull request #1352: Support row-level deletes with IcebergGenerics

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1352:
URL: https://github.com/apache/iceberg/pull/1352#discussion_r472355340



##########
File path: api/src/main/java/org/apache/iceberg/data/Record.java
##########
@@ -35,4 +36,26 @@
   Record copy();
 
   Record copy(Map<String, Object> overwriteValues);
+
+  default Record copy(String field, Object value) {
+    Map<String, Object> overwriteValues = Maps.newHashMapWithExpectedSize(1);
+    overwriteValues.put(field, value);
+    return copy(overwriteValues);
+  }
+
+  default Record copy(String field1, Object value1, String field2, Object value2) {
+    Map<String, Object> overwriteValues = Maps.newHashMapWithExpectedSize(1);

Review comment:
       Yeah, I'll update this.






[GitHub] [iceberg] shardulm94 commented on pull request #1352: Support row-level deletes with IcebergGenerics

Posted by GitBox <gi...@apache.org>.
shardulm94 commented on pull request #1352:
URL: https://github.com/apache/iceberg/pull/1352#issuecomment-678895043


   LGTM! There still seem to be some checkstyle failures. Can you have another look? 




[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #1352: Support row-level deletes with IcebergGenerics

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #1352:
URL: https://github.com/apache/iceberg/pull/1352#discussion_r472257739



##########
File path: core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
##########
@@ -96,21 +104,157 @@ private StructLikeWrapper newWrapper(int specId) {
     Pair<Integer, StructLikeWrapper> partition = partition(file.specId(), file.partition());
     Pair<long[], DeleteFile[]> partitionDeletes = sortedDeletesByPartition.get(partition);
 
+    Stream<DeleteFile> matchingDeletes;
     if (partitionDeletes == null) {
-      return limitBySequenceNumber(sequenceNumber, globalSeqs, globalDeletes);
+      matchingDeletes = limitBySequenceNumber(sequenceNumber, globalSeqs, globalDeletes);
     } else if (globalDeletes == null) {
-      return limitBySequenceNumber(sequenceNumber, partitionDeletes.first(), partitionDeletes.second());
+      matchingDeletes = limitBySequenceNumber(sequenceNumber, partitionDeletes.first(), partitionDeletes.second());
     } else {
-      return Stream.concat(
-          Stream.of(limitBySequenceNumber(sequenceNumber, globalSeqs, globalDeletes)),
-          Stream.of(limitBySequenceNumber(sequenceNumber, partitionDeletes.first(), partitionDeletes.second()))
-      ).toArray(DeleteFile[]::new);
+      matchingDeletes = Stream.concat(
+          limitBySequenceNumber(sequenceNumber, globalSeqs, globalDeletes),
+          limitBySequenceNumber(sequenceNumber, partitionDeletes.first(), partitionDeletes.second()));
+    }
+
+    return matchingDeletes
+        .filter(deleteFile -> canContainDeletesForFile(file, deleteFile, specsById.get(file.specId()).schema()))
+        .toArray(DeleteFile[]::new);
+  }
+
+  private static boolean canContainDeletesForFile(DataFile dataFile, DeleteFile deleteFile, Schema schema) {
+    switch (deleteFile.content()) {
+      case POSITION_DELETES:
+        return canContainPosDeletesForFile(dataFile, deleteFile);
+
+      case EQUALITY_DELETES:
+        return canContainEqDeletesForFile(dataFile, deleteFile, schema);
+    }
+
+    return true;
+  }
+
+  private static boolean canContainPosDeletesForFile(DataFile dataFile, DeleteFile deleteFile) {
+    // check that the delete file can contain the data file's file_path
+    Map<Integer, ByteBuffer> lowers = deleteFile.lowerBounds();
+    Map<Integer, ByteBuffer> uppers = deleteFile.upperBounds();
+    if (lowers == null || uppers == null) {
+      return true;
+    }
+
+    Type pathType = MetadataColumns.DELETE_FILE_PATH.type();
+    int pathId = MetadataColumns.DELETE_FILE_PATH.fieldId();
+    ByteBuffer lower = lowers.get(pathId);
+    if (lower != null &&
+        Comparators.charSequences().compare(dataFile.path(), Conversions.fromByteBuffer(pathType, lower)) < 0) {
+      return false;
     }
+
+    ByteBuffer upper = uppers.get(pathId);
+    if (upper != null &&
+        Comparators.charSequences().compare(dataFile.path(), Conversions.fromByteBuffer(pathType, upper)) > 0) {
+      return false;
+    }
+
+    return true;
+  }
+
+  @SuppressWarnings("checkstyle:CyclomaticComplexity")
+  private static boolean canContainEqDeletesForFile(DataFile dataFile, DeleteFile deleteFile, Schema schema) {

Review comment:
       This is a rather complicated function, and it would be a great help if we had a few comments about the kinds of checks we are doing.






[GitHub] [iceberg] rdblue commented on a change in pull request #1352: Support row-level deletes with IcebergGenerics

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1352:
URL: https://github.com/apache/iceberg/pull/1352#discussion_r475154152



##########
File path: data/src/main/java/org/apache/iceberg/data/GenericReader.java
##########
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.data;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import org.apache.iceberg.Accessor;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.orc.GenericOrcReader;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.deletes.Deletes;
+import org.apache.iceberg.expressions.Evaluator;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimap;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.PartitionUtil;
+import org.apache.iceberg.util.ProjectStructLike;
+import org.apache.iceberg.util.StructLikeSet;
+
+class GenericReader implements Serializable {
+  private static final Schema POS_DELETE_SCHEMA = new Schema(
+      MetadataColumns.DELETE_FILE_PATH,
+      MetadataColumns.DELETE_FILE_POS);
+
+  private final FileIO io;
+  private final Schema projection;
+  private final boolean caseSensitive;
+  private final boolean reuseContainers;
+
+  GenericReader(TableScan scan, boolean reuseContainers) {
+    this.io = scan.table().io();
+    this.projection = scan.schema();
+    this.caseSensitive = scan.isCaseSensitive();
+    this.reuseContainers = reuseContainers;
+  }
+
+  CloseableIterator<Record> open(CloseableIterable<CombinedScanTask> tasks) {
+    Iterable<FileScanTask> fileTasks = Iterables.concat(Iterables.transform(tasks, CombinedScanTask::files));
+    return CloseableIterable.concat(Iterables.transform(fileTasks, this::open)).iterator();
+  }
+
+  public CloseableIterable<Record> open(CombinedScanTask task) {
+    return new CombinedTaskIterable(task);
+  }
+
+  public CloseableIterable<Record> open(FileScanTask task) {
+    List<DeleteFile> posDeletes = Lists.newArrayList();
+    List<DeleteFile> eqDeletes = Lists.newArrayList();
+    for (DeleteFile delete : task.deletes()) {
+      switch (delete.content()) {
+        case POSITION_DELETES:
+          posDeletes.add(delete);
+          break;
+        case EQUALITY_DELETES:
+          eqDeletes.add(delete);
+          break;
+        default:
+          throw new UnsupportedOperationException("Unknown delete file content: " + delete.content());
+      }
+    }
+
+    Schema fileProjection = fileProjection(!posDeletes.isEmpty());
+
+    CloseableIterable<Record> records = openFile(task, fileProjection);
+    records = applyPosDeletes(records, fileProjection, task.file().path(), posDeletes, task.file());
+    records = applyEqDeletes(records, fileProjection, eqDeletes, task.file());
+    records = applyResidual(records, fileProjection, task.residual());
+
+    return records;

Review comment:
       I think this is okay for now, but we should do the truncation for Spark. All we should need to do is truncate the reported size, since the read schema we produce will put additional columns at the end.
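
   A rough sketch of what truncating the reported size could mean, assuming rows are backed by a positional array and the extra delete-related columns sit at the end of the read schema. `TruncatedRowSketch` is a hypothetical class for illustration; it is not an Iceberg or Spark type, and how Spark rows would actually be wrapped is not decided in this thread.

```java
// Hypothetical wrapper: exposes only the first reportedSize values of a wider row.
final class TruncatedRowSketch {
  private final Object[] values;
  private final int reportedSize;

  TruncatedRowSketch(Object[] values, int reportedSize) {
    if (reportedSize < 0 || reportedSize > values.length) {
      throw new IllegalArgumentException("Invalid reported size: " + reportedSize);
    }
    this.values = values;
    this.reportedSize = reportedSize;
  }

  // Number of columns visible to the caller; trailing columns are hidden, not copied away.
  int size() {
    return reportedSize;
  }

  Object get(int pos) {
    if (pos < 0 || pos >= reportedSize) {
      throw new IndexOutOfBoundsException("Position out of range: " + pos);
    }
    return values[pos];
  }
}
```

   Nothing is copied: the wrapper only narrows what callers can see, which works because the added columns are appended after the requested ones.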






[GitHub] [iceberg] rdblue merged pull request #1352: Support row-level deletes with IcebergGenerics

Posted by GitBox <gi...@apache.org>.
rdblue merged pull request #1352:
URL: https://github.com/apache/iceberg/pull/1352


   




[GitHub] [iceberg] chenjunjiedada commented on a change in pull request #1352: Support row-level deletes with IcebergGenerics

Posted by GitBox <gi...@apache.org>.
chenjunjiedada commented on a change in pull request #1352:
URL: https://github.com/apache/iceberg/pull/1352#discussion_r475156761



##########
File path: data/src/main/java/org/apache/iceberg/data/GenericReader.java
##########
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.data;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import org.apache.iceberg.Accessor;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.orc.GenericOrcReader;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.deletes.Deletes;
+import org.apache.iceberg.expressions.Evaluator;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimap;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.PartitionUtil;
+import org.apache.iceberg.util.ProjectStructLike;
+import org.apache.iceberg.util.StructLikeSet;
+
+class GenericReader implements Serializable {
+  private static final Schema POS_DELETE_SCHEMA = new Schema(
+      MetadataColumns.DELETE_FILE_PATH,
+      MetadataColumns.DELETE_FILE_POS);
+
+  private final FileIO io;
+  private final Schema projection;
+  private final boolean caseSensitive;
+  private final boolean reuseContainers;
+
+  GenericReader(TableScan scan, boolean reuseContainers) {
+    this.io = scan.table().io();
+    this.projection = scan.schema();
+    this.caseSensitive = scan.isCaseSensitive();
+    this.reuseContainers = reuseContainers;
+  }
+
+  CloseableIterator<Record> open(CloseableIterable<CombinedScanTask> tasks) {
+    Iterable<FileScanTask> fileTasks = Iterables.concat(Iterables.transform(tasks, CombinedScanTask::files));
+    return CloseableIterable.concat(Iterables.transform(fileTasks, this::open)).iterator();
+  }
+
+  public CloseableIterable<Record> open(CombinedScanTask task) {
+    return new CombinedTaskIterable(task);
+  }
+
+  public CloseableIterable<Record> open(FileScanTask task) {
+    List<DeleteFile> posDeletes = Lists.newArrayList();
+    List<DeleteFile> eqDeletes = Lists.newArrayList();
+    for (DeleteFile delete : task.deletes()) {
+      switch (delete.content()) {
+        case POSITION_DELETES:
+          posDeletes.add(delete);
+          break;
+        case EQUALITY_DELETES:
+          eqDeletes.add(delete);
+          break;
+        default:
+          throw new UnsupportedOperationException("Unknown delete file content: " + delete.content());
+      }
+    }
+
+    Schema fileProjection = fileProjection(!posDeletes.isEmpty());
+
+    CloseableIterable<Record> records = openFile(task, fileProjection);
+    records = applyPosDeletes(records, fileProjection, task.file().path(), posDeletes, task.file());
+    records = applyEqDeletes(records, fileProjection, eqDeletes, task.file());
+    records = applyResidual(records, fileProjection, task.residual());
+
+    return records;
+  }
+
+  private Schema fileProjection(boolean hasPosDeletes) {
+    if (hasPosDeletes) {
+      List<Types.NestedField> columns = Lists.newArrayList(projection.columns());
+      columns.add(MetadataColumns.ROW_POSITION);
+      return new Schema(columns);
+    }
+
+    return projection;
+  }
+
+  private CloseableIterable<Record> applyResidual(CloseableIterable<Record> records, Schema recordSchema,
+                                                  Expression residual) {
+    if (residual != null && residual != Expressions.alwaysTrue()) {
+      InternalRecordWrapper wrapper = new InternalRecordWrapper(recordSchema.asStruct());
+      Evaluator filter = new Evaluator(recordSchema.asStruct(), residual, caseSensitive);
+      return CloseableIterable.filter(records, record -> filter.eval(wrapper.wrap(record)));
+    }
+
+    return records;
+  }
+
+  private CloseableIterable<Record> applyEqDeletes(CloseableIterable<Record> records, Schema recordSchema,
+                                                   List<DeleteFile> eqDeletes, DataFile dataFile) {
+    if (eqDeletes.isEmpty()) {
+      return records;
+    }
+
+    Multimap<Set<Integer>, DeleteFile> filesByDeleteIds = Multimaps.newMultimap(Maps.newHashMap(), Lists::newArrayList);
+    for (DeleteFile delete : eqDeletes) {
+      filesByDeleteIds.put(Sets.newHashSet(delete.equalityFieldIds()), delete);
+    }
+
+    CloseableIterable<Record> filteredRecords = records;
+    for (Map.Entry<Set<Integer>, Collection<DeleteFile>> entry : filesByDeleteIds.asMap().entrySet()) {
+      Set<Integer> ids = entry.getKey();
+      Iterable<DeleteFile> deletes = entry.getValue();
+
+      Schema deleteSchema = TypeUtil.select(recordSchema, ids);
+      int[] orderedIds = deleteSchema.columns().stream().mapToInt(Types.NestedField::fieldId).toArray();

Review comment:
       One minor thing: the variable name `orderedIds` is a little confusing to me. Do you mean that the IDs follow the order in which the fields are defined in the schema?






[GitHub] [iceberg] rdblue commented on a change in pull request #1352: Support row-level deletes with IcebergGenerics

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1352:
URL: https://github.com/apache/iceberg/pull/1352#discussion_r473195684



##########
File path: data/src/main/java/org/apache/iceberg/data/GenericReader.java
##########
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.data;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import org.apache.iceberg.Accessor;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.orc.GenericOrcReader;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.deletes.Deletes;
+import org.apache.iceberg.expressions.Evaluator;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimap;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.PartitionUtil;
+import org.apache.iceberg.util.ProjectStructLike;
+import org.apache.iceberg.util.StructLikeSet;
+
+class GenericReader implements Serializable {
+  private static final Schema POS_DELETE_SCHEMA = new Schema(
+      MetadataColumns.DELETE_FILE_PATH,
+      MetadataColumns.DELETE_FILE_POS);
+
+  private final FileIO io;
+  private final Schema projection;
+  private final boolean caseSensitive;
+  private final boolean reuseContainers;
+
+  GenericReader(TableScan scan, boolean reuseContainers) {
+    this.io = scan.table().io();
+    this.projection = scan.schema();
+    this.caseSensitive = scan.isCaseSensitive();
+    this.reuseContainers = reuseContainers;
+  }
+
+  CloseableIterator<Record> open(CloseableIterable<CombinedScanTask> tasks) {
+    Iterable<FileScanTask> fileTasks = Iterables.concat(Iterables.transform(tasks, CombinedScanTask::files));
+    return CloseableIterable.concat(Iterables.transform(fileTasks, this::open)).iterator();
+  }
+
+  public CloseableIterable<Record> open(CombinedScanTask task) {
+    return new CombinedTaskIterable(task);
+  }
+
+  public CloseableIterable<Record> open(FileScanTask task) {
+    List<DeleteFile> posDeletes = Lists.newArrayList();
+    List<DeleteFile> eqDeletes = Lists.newArrayList();
+    for (DeleteFile delete : task.deletes()) {
+      switch (delete.content()) {
+        case POSITION_DELETES:
+          posDeletes.add(delete);
+          break;
+        case EQUALITY_DELETES:
+          eqDeletes.add(delete);
+          break;
+        default:
+          throw new UnsupportedOperationException("Unknown delete file content: " + delete.content());
+      }
+    }
+
+    Schema fileProjection = fileProjection(!posDeletes.isEmpty());
+
+    CloseableIterable<Record> records = openFile(task, fileProjection);
+    records = applyPosDeletes(records, fileProjection, task.file().path(), posDeletes, task.file());
+    records = applyEqDeletes(records, fileProjection, eqDeletes, task.file());
+    records = applyResidual(records, fileProjection, task.residual());
+
+    return records;

Review comment:
       I would like to avoid adding a projection, but I agree that it is a little strange to return a row that reports more columns than were requested. I think this will come down to the object model. For example, it is perfectly fine to return a row in Spark that has an additional field, and that is much more efficient than copying the data into a different row.
   
   For generics, I think it is okay to return a row with an extra value, but we might want to make it possible to truncate `GenericRecord` to make the extra fields unavailable.
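
   A minimal sketch of what such truncation could look like, assuming a row is just an `Object[]`. The names `TruncatedRecordSketch` and `TruncatedRow` are hypothetical and are not part of Iceberg or of this PR; the point is only that a wrapper can hide trailing fields without copying the underlying values.

```java
// Hypothetical sketch: these names are illustrative and are not part of Iceberg.
class TruncatedRecordSketch {
  /** Read-only view that exposes only the first visibleFields positions of a row, without copying. */
  static final class TruncatedRow {
    private final Object[] row;
    private final int visibleFields;

    TruncatedRow(Object[] row, int visibleFields) {
      if (visibleFields < 0 || visibleFields > row.length) {
        throw new IllegalArgumentException("Invalid visible field count: " + visibleFields);
      }
      this.row = row;
      this.visibleFields = visibleFields;
    }

    int size() {
      return visibleFields;
    }

    Object get(int pos) {
      // positions past the visible range behave as if they did not exist
      if (pos < 0 || pos >= visibleFields) {
        throw new IndexOutOfBoundsException("No visible field at position " + pos);
      }
      return row[pos];
    }
  }

  public static void main(String[] args) {
    // made-up row: two projected columns plus an extra position value read only to apply deletes
    Object[] fileRow = {1L, "a", 17L};
    TruncatedRow visible = new TruncatedRow(fileRow, 2);
    System.out.println(visible.size() + " visible fields, second value = " + visible.get(1));
  }
}
```

   Because the view holds a reference to the original array, it stays valid when the reader reuses containers, at the cost of one bounds check per access.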






[GitHub] [iceberg] rdblue commented on a change in pull request #1352: Support row-level deletes with IcebergGenerics

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1352:
URL: https://github.com/apache/iceberg/pull/1352#discussion_r475153967



##########
File path: data/src/main/java/org/apache/iceberg/data/GenericReader.java
##########
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.data;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import org.apache.iceberg.Accessor;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.orc.GenericOrcReader;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.deletes.Deletes;
+import org.apache.iceberg.expressions.Evaluator;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimap;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.PartitionUtil;
+import org.apache.iceberg.util.ProjectStructLike;
+import org.apache.iceberg.util.StructLikeSet;
+
+class GenericReader implements Serializable {
+  private static final Schema POS_DELETE_SCHEMA = new Schema(
+      MetadataColumns.DELETE_FILE_PATH,
+      MetadataColumns.DELETE_FILE_POS);
+
+  private final FileIO io;
+  private final Schema projection;
+  private final boolean caseSensitive;
+  private final boolean reuseContainers;
+
+  GenericReader(TableScan scan, boolean reuseContainers) {
+    this.io = scan.table().io();
+    this.projection = scan.schema();
+    this.caseSensitive = scan.isCaseSensitive();
+    this.reuseContainers = reuseContainers;
+  }
+
+  CloseableIterator<Record> open(CloseableIterable<CombinedScanTask> tasks) {
+    Iterable<FileScanTask> fileTasks = Iterables.concat(Iterables.transform(tasks, CombinedScanTask::files));
+    return CloseableIterable.concat(Iterables.transform(fileTasks, this::open)).iterator();
+  }
+
+  public CloseableIterable<Record> open(CombinedScanTask task) {
+    return new CombinedTaskIterable(task);
+  }
+
+  public CloseableIterable<Record> open(FileScanTask task) {
+    List<DeleteFile> posDeletes = Lists.newArrayList();
+    List<DeleteFile> eqDeletes = Lists.newArrayList();
+    for (DeleteFile delete : task.deletes()) {
+      switch (delete.content()) {
+        case POSITION_DELETES:
+          posDeletes.add(delete);
+          break;
+        case EQUALITY_DELETES:
+          eqDeletes.add(delete);
+          break;
+        default:
+          throw new UnsupportedOperationException("Unknown delete file content: " + delete.content());
+      }
+    }
+
+    Schema fileProjection = fileProjection(!posDeletes.isEmpty());
+
+    CloseableIterable<Record> records = openFile(task, fileProjection);
+    records = applyPosDeletes(records, fileProjection, task.file().path(), posDeletes, task.file());
+    records = applyEqDeletes(records, fileProjection, eqDeletes, task.file());
+    records = applyResidual(records, fileProjection, task.residual());
+
+    return records;
+  }
+
+  private Schema fileProjection(boolean hasPosDeletes) {
+    if (hasPosDeletes) {
+      List<Types.NestedField> columns = Lists.newArrayList(projection.columns());
+      columns.add(MetadataColumns.ROW_POSITION);
+      return new Schema(columns);
+    }
+
+    return projection;
+  }
+
+  private CloseableIterable<Record> applyResidual(CloseableIterable<Record> records, Schema recordSchema,
+                                                  Expression residual) {
+    if (residual != null && residual != Expressions.alwaysTrue()) {
+      InternalRecordWrapper wrapper = new InternalRecordWrapper(recordSchema.asStruct());
+      Evaluator filter = new Evaluator(recordSchema.asStruct(), residual, caseSensitive);
+      return CloseableIterable.filter(records, record -> filter.eval(wrapper.wrap(record)));
+    }
+
+    return records;
+  }
+
+  private CloseableIterable<Record> applyEqDeletes(CloseableIterable<Record> records, Schema recordSchema,
+                                                   List<DeleteFile> eqDeletes, DataFile dataFile) {
+    if (eqDeletes.isEmpty()) {
+      return records;
+    }
+
+    Multimap<Set<Integer>, DeleteFile> filesByDeleteIds = Multimaps.newMultimap(Maps.newHashMap(), Lists::newArrayList);
+    for (DeleteFile delete : eqDeletes) {
+      filesByDeleteIds.put(Sets.newHashSet(delete.equalityFieldIds()), delete);
+    }
+
+    CloseableIterable<Record> filteredRecords = records;
+    for (Map.Entry<Set<Integer>, Collection<DeleteFile>> entry : filesByDeleteIds.asMap().entrySet()) {
+      Set<Integer> ids = entry.getKey();
+      Iterable<DeleteFile> deletes = entry.getValue();
+
+      Schema deleteSchema = TypeUtil.select(recordSchema, ids);
+      int[] orderedIds = deleteSchema.columns().stream().mapToInt(Types.NestedField::fieldId).toArray();

Review comment:
       I replaced `ProjectStructLike` with `StructProjection`, which handles nested fields. Like the previous version, it works as a wrapper to avoid copying data.
   
   As a consequence, it cannot handle projections within map or list fields. But that's okay because equality predicates can't be applied to map or list fields anyway. It will throw an exception if it is used to project types that are not supported, so it should be safe.






[GitHub] [iceberg] rdblue commented on a change in pull request #1352: Support row-level deletes with IcebergGenerics

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1352:
URL: https://github.com/apache/iceberg/pull/1352#discussion_r473202172



##########
File path: data/src/main/java/org/apache/iceberg/data/GenericReader.java
##########
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.data;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import org.apache.iceberg.Accessor;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.orc.GenericOrcReader;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.deletes.Deletes;
+import org.apache.iceberg.expressions.Evaluator;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimap;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.PartitionUtil;
+import org.apache.iceberg.util.ProjectStructLike;
+import org.apache.iceberg.util.StructLikeSet;
+
+class GenericReader implements Serializable {
+  private static final Schema POS_DELETE_SCHEMA = new Schema(
+      MetadataColumns.DELETE_FILE_PATH,
+      MetadataColumns.DELETE_FILE_POS);
+
+  private final FileIO io;
+  private final Schema projection;
+  private final boolean caseSensitive;
+  private final boolean reuseContainers;
+
+  GenericReader(TableScan scan, boolean reuseContainers) {
+    this.io = scan.table().io();
+    this.projection = scan.schema();
+    this.caseSensitive = scan.isCaseSensitive();
+    this.reuseContainers = reuseContainers;
+  }
+
+  CloseableIterator<Record> open(CloseableIterable<CombinedScanTask> tasks) {
+    Iterable<FileScanTask> fileTasks = Iterables.concat(Iterables.transform(tasks, CombinedScanTask::files));
+    return CloseableIterable.concat(Iterables.transform(fileTasks, this::open)).iterator();
+  }
+
+  public CloseableIterable<Record> open(CombinedScanTask task) {
+    return new CombinedTaskIterable(task);
+  }
+
+  public CloseableIterable<Record> open(FileScanTask task) {
+    List<DeleteFile> posDeletes = Lists.newArrayList();
+    List<DeleteFile> eqDeletes = Lists.newArrayList();
+    for (DeleteFile delete : task.deletes()) {
+      switch (delete.content()) {
+        case POSITION_DELETES:
+          posDeletes.add(delete);
+          break;
+        case EQUALITY_DELETES:
+          eqDeletes.add(delete);
+          break;
+        default:
+          throw new UnsupportedOperationException("Unknown delete file content: " + delete.content());
+      }
+    }
+
+    Schema fileProjection = fileProjection(!posDeletes.isEmpty());
+
+    CloseableIterable<Record> records = openFile(task, fileProjection);
+    records = applyPosDeletes(records, fileProjection, task.file().path(), posDeletes, task.file());
+    records = applyEqDeletes(records, fileProjection, eqDeletes, task.file());
+    records = applyResidual(records, fileProjection, task.residual());
+
+    return records;
+  }
+
+  private Schema fileProjection(boolean hasPosDeletes) {
+    if (hasPosDeletes) {
+      List<Types.NestedField> columns = Lists.newArrayList(projection.columns());
+      columns.add(MetadataColumns.ROW_POSITION);
+      return new Schema(columns);
+    }
+
+    return projection;
+  }
+
+  private CloseableIterable<Record> applyResidual(CloseableIterable<Record> records, Schema recordSchema,
+                                                  Expression residual) {
+    if (residual != null && residual != Expressions.alwaysTrue()) {
+      InternalRecordWrapper wrapper = new InternalRecordWrapper(recordSchema.asStruct());
+      Evaluator filter = new Evaluator(recordSchema.asStruct(), residual, caseSensitive);
+      return CloseableIterable.filter(records, record -> filter.eval(wrapper.wrap(record)));
+    }
+
+    return records;
+  }
+
+  private CloseableIterable<Record> applyEqDeletes(CloseableIterable<Record> records, Schema recordSchema,
+                                                   List<DeleteFile> eqDeletes, DataFile dataFile) {
+    if (eqDeletes.isEmpty()) {
+      return records;
+    }
+
+    Multimap<Set<Integer>, DeleteFile> filesByDeleteIds = Multimaps.newMultimap(Maps.newHashMap(), Lists::newArrayList);
+    for (DeleteFile delete : eqDeletes) {
+      filesByDeleteIds.put(Sets.newHashSet(delete.equalityFieldIds()), delete);
+    }
+
+    CloseableIterable<Record> filteredRecords = records;
+    for (Map.Entry<Set<Integer>, Collection<DeleteFile>> entry : filesByDeleteIds.asMap().entrySet()) {
+      Set<Integer> ids = entry.getKey();
+      Iterable<DeleteFile> deletes = entry.getValue();
+
+      Schema deleteSchema = TypeUtil.select(recordSchema, ids);
+      int[] orderedIds = deleteSchema.columns().stream().mapToInt(Types.NestedField::fieldId).toArray();

Review comment:
       I intended to support nested fields, but I think there is still a problem here.
   
   For example, take the schema `1: id bigint, 2: location struct<3: city, 4: postcode>` and the delete `postcode='HG1 2AD'`. The delete ID set is `{4}`, but the delete schema from the previous line is `2: location struct<4: postcode>`. The projection created from `orderedIds` should use ID 2, because that is the top-level field that needs to be pulled from a full record.
   
   The problem is that the nested struct also needs a projection, which I missed. I think we probably need to build the projection using a visitor and correctly wrap nested structs.
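
   The nested case can be illustrated with a small sketch. It assumes rows are `Object[]` and nested structs are nested `Object[]`; `NestedProjectionSketch` and its `Projection` class are hypothetical stand-ins and are not the classes used in this PR. Each level of the projection maps projected positions to source positions, and a struct-typed field gets its own child projection.

```java
// Hypothetical stand-ins: rows are Object[] and nested structs are nested Object[].
// None of these names come from Iceberg.
class NestedProjectionSketch {
  /** Read-only view over a row that exposes selected positions and re-projects nested structs. */
  static final class Projection {
    private final int[] positions;      // source position for each projected field
    private final Projection[] nested;  // non-null where the projected field is itself a struct
    private Object[] row;

    Projection(int[] positions, Projection[] nested) {
      this.positions = positions;
      this.nested = nested;
    }

    Projection wrap(Object[] newRow) {
      this.row = newRow;
      return this;
    }

    int size() {
      return positions.length;
    }

    Object get(int pos) {
      Object value = row[positions[pos]];
      if (nested[pos] != null && value != null) {
        // wrap the nested struct instead of copying it
        return nested[pos].wrap((Object[]) value);
      }
      return value;
    }
  }

  // For the schema 1: id, 2: location struct<3: city, 4: postcode>, selecting ID 4 means:
  // pull position 1 (location) from the row, then position 1 (postcode) from the struct.
  static Projection postcodeOnly() {
    Projection location = new Projection(new int[] {1}, new Projection[] {null});
    return new Projection(new int[] {1}, new Projection[] {location});
  }

  public static void main(String[] args) {
    // made-up record values for illustration
    Object[] record = {34L, new Object[] {"Harrogate", "HG1 2AD"}};
    Projection projected = postcodeOnly().wrap(record);
    Projection location = (Projection) projected.get(0);
    System.out.println(location.get(0));
  }
}
```

   In this sketch the positions are written by hand. Deriving them from a schema and a set of field IDs is the part that a visitor would have to do.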






[GitHub] [iceberg] shardulm94 commented on a change in pull request #1352: Support row-level deletes with IcebergGenerics

Posted by GitBox <gi...@apache.org>.
shardulm94 commented on a change in pull request #1352:
URL: https://github.com/apache/iceberg/pull/1352#discussion_r475325724



##########
File path: data/src/main/java/org/apache/iceberg/data/GenericReader.java
##########
@@ -114,14 +117,40 @@
     return records;
   }
 
-  private Schema fileProjection(boolean hasPosDeletes) {
-    if (hasPosDeletes) {
-      List<Types.NestedField> columns = Lists.newArrayList(projection.columns());
+  private Schema fileProjection(List<DeleteFile> posDeletes, List<DeleteFile> eqDeletes) {
+    Set<Integer> requiredIds = Sets.newLinkedHashSet();
+    if (!posDeletes.isEmpty()) {
+      requiredIds.add(MetadataColumns.ROW_POSITION.fieldId());
+    }
+
+    for (DeleteFile eqDelete : eqDeletes) {
+      requiredIds.addAll(eqDelete.equalityFieldIds());
+    }
+
+    Set<Integer> missingIds = Sets.newLinkedHashSet(Sets.difference(requiredIds, TypeUtil.getProjectedIds(projection)));
+
+    if (missingIds.isEmpty()) {
+      return projection;
+    }
+
+    // TODO: support adding nested columns. this will currently fail when finding nested columns to add
+    List<Types.NestedField> columns = Lists.newArrayList(projection.columns());
+    for (int fieldId : missingIds) {
+      if (fieldId == MetadataColumns.ROW_POSITION.fieldId()) {
+        continue; // add _pos at the end
+      }
+
+      Types.NestedField field = tableSchema.asStruct().field(fieldId);
+      Preconditions.checkArgument(field != null, "Cannot find required field for ID %s", fieldId);

Review comment:
       Interesting constraint here: a column cannot be deleted while there are live equality delete files that depend on it. Maybe we should check this when columns are deleted?
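
   A minimal sketch of the kind of check suggested here, assuming the equality field IDs of all live equality delete files are already available as plain lists. `DropColumnCheckSketch`, `blockedIds`, and `validateDrop` are hypothetical names; nothing in this sketch is an existing Iceberg API.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch: plain collections stand in for table metadata; no Iceberg API is used.
class DropColumnCheckSketch {
  /** Returns the IDs among idsToDrop that at least one live equality delete file still uses. */
  static Set<Integer> blockedIds(Collection<Integer> idsToDrop, List<List<Integer>> liveEqualityFieldIds) {
    Set<Integer> blocked = new TreeSet<>();
    for (List<Integer> equalityIds : liveEqualityFieldIds) {
      for (Integer id : equalityIds) {
        if (idsToDrop.contains(id)) {
          blocked.add(id);
        }
      }
    }
    return blocked;
  }

  /** Fails if dropping the given columns would leave a live equality delete without one of its fields. */
  static void validateDrop(Collection<Integer> idsToDrop, List<List<Integer>> liveEqualityFieldIds) {
    Set<Integer> blocked = blockedIds(idsToDrop, liveEqualityFieldIds);
    if (!blocked.isEmpty()) {
      throw new IllegalStateException("Cannot drop columns used by live equality deletes: " + blocked);
    }
  }

  public static void main(String[] args) {
    // made-up metadata: two live equality delete files, keyed on {1} and {1, 4}
    List<List<Integer>> live = Arrays.asList(Arrays.asList(1), Arrays.asList(1, 4));
    System.out.println(blockedIds(Arrays.asList(3, 4), live));
  }
}
```

   Whether such a check belongs in schema updates or whether the reader should instead tolerate missing columns is a design question this sketch does not settle.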






[GitHub] [iceberg] shardulm94 commented on a change in pull request #1352: Support row-level deletes with IcebergGenerics

Posted by GitBox <gi...@apache.org>.
shardulm94 commented on a change in pull request #1352:
URL: https://github.com/apache/iceberg/pull/1352#discussion_r472865157



##########
File path: core/src/main/java/org/apache/iceberg/util/StructLikeWrapper.java
##########
@@ -73,11 +73,6 @@ public boolean equals(Object other) {
       return false;
     }
 
-    int len = struct.size();
-    if (len != that.struct.size()) {
-      return false;
-    }
-

Review comment:
       Was this removed to ignore the extra columns coming from the file projection?

##########
File path: data/src/main/java/org/apache/iceberg/data/GenericReader.java
##########
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.data;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import org.apache.iceberg.Accessor;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.orc.GenericOrcReader;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.deletes.Deletes;
+import org.apache.iceberg.expressions.Evaluator;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimap;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.PartitionUtil;
+import org.apache.iceberg.util.ProjectStructLike;
+import org.apache.iceberg.util.StructLikeSet;
+
+class GenericReader implements Serializable {
+  private static final Schema POS_DELETE_SCHEMA = new Schema(
+      MetadataColumns.DELETE_FILE_PATH,
+      MetadataColumns.DELETE_FILE_POS);
+
+  private final FileIO io;
+  private final Schema projection;
+  private final boolean caseSensitive;
+  private final boolean reuseContainers;
+
+  GenericReader(TableScan scan, boolean reuseContainers) {
+    this.io = scan.table().io();
+    this.projection = scan.schema();
+    this.caseSensitive = scan.isCaseSensitive();
+    this.reuseContainers = reuseContainers;
+  }
+
+  CloseableIterator<Record> open(CloseableIterable<CombinedScanTask> tasks) {
+    Iterable<FileScanTask> fileTasks = Iterables.concat(Iterables.transform(tasks, CombinedScanTask::files));
+    return CloseableIterable.concat(Iterables.transform(fileTasks, this::open)).iterator();
+  }
+
+  public CloseableIterable<Record> open(CombinedScanTask task) {
+    return new CombinedTaskIterable(task);
+  }
+
+  public CloseableIterable<Record> open(FileScanTask task) {
+    List<DeleteFile> posDeletes = Lists.newArrayList();
+    List<DeleteFile> eqDeletes = Lists.newArrayList();
+    for (DeleteFile delete : task.deletes()) {
+      switch (delete.content()) {
+        case POSITION_DELETES:
+          posDeletes.add(delete);
+          break;
+        case EQUALITY_DELETES:
+          eqDeletes.add(delete);
+          break;
+        default:
+          throw new UnsupportedOperationException("Unknown delete file content: " + delete.content());
+      }
+    }
+
+    Schema fileProjection = fileProjection(!posDeletes.isEmpty());
+
+    CloseableIterable<Record> records = openFile(task, fileProjection);
+    records = applyPosDeletes(records, fileProjection, task.file().path(), posDeletes, task.file());
+    records = applyEqDeletes(records, fileProjection, eqDeletes, task.file());
+    records = applyResidual(records, fileProjection, task.residual());
+
+    return records;
+  }
+
+  private Schema fileProjection(boolean hasPosDeletes) {
+    if (hasPosDeletes) {
+      List<Types.NestedField> columns = Lists.newArrayList(projection.columns());
+      columns.add(MetadataColumns.ROW_POSITION);
+      return new Schema(columns);
+    }
+
+    return projection;
+  }
+
+  private CloseableIterable<Record> applyResidual(CloseableIterable<Record> records, Schema recordSchema,
+                                                  Expression residual) {
+    if (residual != null && residual != Expressions.alwaysTrue()) {
+      InternalRecordWrapper wrapper = new InternalRecordWrapper(recordSchema.asStruct());
+      Evaluator filter = new Evaluator(recordSchema.asStruct(), residual, caseSensitive);
+      return CloseableIterable.filter(records, record -> filter.eval(wrapper.wrap(record)));
+    }
+
+    return records;
+  }
+
+  private CloseableIterable<Record> applyEqDeletes(CloseableIterable<Record> records, Schema recordSchema,
+                                                   List<DeleteFile> eqDeletes, DataFile dataFile) {
+    if (eqDeletes.isEmpty()) {
+      return records;
+    }
+
+    Multimap<Set<Integer>, DeleteFile> filesByDeleteIds = Multimaps.newMultimap(Maps.newHashMap(), Lists::newArrayList);
+    for (DeleteFile delete : eqDeletes) {
+      filesByDeleteIds.put(Sets.newHashSet(delete.equalityFieldIds()), delete);
+    }
+
+    CloseableIterable<Record> filteredRecords = records;
+    for (Map.Entry<Set<Integer>, Collection<DeleteFile>> entry : filesByDeleteIds.asMap().entrySet()) {
+      Set<Integer> ids = entry.getKey();
+      Iterable<DeleteFile> deletes = entry.getValue();
+
+      Schema deleteSchema = TypeUtil.select(recordSchema, ids);
+      int[] orderedIds = deleteSchema.columns().stream().mapToInt(Types.NestedField::fieldId).toArray();

Review comment:
       Does this mean equality deletes currently support only top-level fields?

##########
File path: data/src/main/java/org/apache/iceberg/data/GenericReader.java
##########
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.data;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import org.apache.iceberg.Accessor;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.orc.GenericOrcReader;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.deletes.Deletes;
+import org.apache.iceberg.expressions.Evaluator;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimap;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.PartitionUtil;
+import org.apache.iceberg.util.ProjectStructLike;
+import org.apache.iceberg.util.StructLikeSet;
+
+class GenericReader implements Serializable {
+  private static final Schema POS_DELETE_SCHEMA = new Schema(
+      MetadataColumns.DELETE_FILE_PATH,
+      MetadataColumns.DELETE_FILE_POS);
+
+  private final FileIO io;
+  private final Schema projection;
+  private final boolean caseSensitive;
+  private final boolean reuseContainers;
+
+  GenericReader(TableScan scan, boolean reuseContainers) {
+    this.io = scan.table().io();
+    this.projection = scan.schema();
+    this.caseSensitive = scan.isCaseSensitive();
+    this.reuseContainers = reuseContainers;
+  }
+
+  CloseableIterator<Record> open(CloseableIterable<CombinedScanTask> tasks) {
+    Iterable<FileScanTask> fileTasks = Iterables.concat(Iterables.transform(tasks, CombinedScanTask::files));
+    return CloseableIterable.concat(Iterables.transform(fileTasks, this::open)).iterator();
+  }
+
+  public CloseableIterable<Record> open(CombinedScanTask task) {
+    return new CombinedTaskIterable(task);
+  }
+
+  public CloseableIterable<Record> open(FileScanTask task) {
+    List<DeleteFile> posDeletes = Lists.newArrayList();
+    List<DeleteFile> eqDeletes = Lists.newArrayList();
+    for (DeleteFile delete : task.deletes()) {
+      switch (delete.content()) {
+        case POSITION_DELETES:
+          posDeletes.add(delete);
+          break;
+        case EQUALITY_DELETES:
+          eqDeletes.add(delete);
+          break;
+        default:
+          throw new UnsupportedOperationException("Unknown delete file content: " + delete.content());
+      }
+    }
+
+    Schema fileProjection = fileProjection(!posDeletes.isEmpty());
+
+    CloseableIterable<Record> records = openFile(task, fileProjection);
+    records = applyPosDeletes(records, fileProjection, task.file().path(), posDeletes, task.file());
+    records = applyEqDeletes(records, fileProjection, eqDeletes, task.file());
+    records = applyResidual(records, fileProjection, task.residual());
+
+    return records;

Review comment:
       The schema of the Records returned here will also include metadata columns and/or columns needed for equality deletes, on top of the schema requested by the user. Do we want to apply a projection on top so the result matches the requested schema?
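
A minimal sketch of the projection this question asks about, under simplifying assumptions: rows are plain `Object[]`, columns are identified by name, and `RequestedProjection`, `positions`, and `project` are hypothetical names used only for illustration. None of this is Iceberg API.

```java
import java.util.Arrays;
import java.util.List;

public class RequestedProjection {
  // Hypothetical helper: for each requested column, find its position in the wider read schema.
  public static int[] positions(List<String> readColumns, List<String> requestedColumns) {
    int[] positions = new int[requestedColumns.size()];
    for (int i = 0; i < positions.length; i++) {
      int pos = readColumns.indexOf(requestedColumns.get(i));
      if (pos < 0) {
        throw new IllegalArgumentException("Requested column was not read: " + requestedColumns.get(i));
      }
      positions[i] = pos;
    }
    return positions;
  }

  // Copy only the requested positions, dropping columns that were read just to apply deletes.
  public static Object[] project(Object[] row, int[] positions) {
    Object[] projected = new Object[positions.length];
    for (int i = 0; i < positions.length; i++) {
      projected[i] = row[positions[i]];
    }
    return projected;
  }

  public static void main(String[] args) {
    // "dt" and "_pos" stand in for columns added for equality and position deletes
    List<String> read = Arrays.asList("id", "data", "dt", "_pos");
    List<String> requested = Arrays.asList("id", "data");
    Object[] row = {1L, "a", "2020-08-17", 0L};
    System.out.println(Arrays.toString(project(row, positions(read, requested))));
  }
}
```

Computing the positions once per file and reusing them for every row keeps the per-record cost to a small array copy.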






[GitHub] [iceberg] openinx commented on a change in pull request #1352: Support row-level deletes with IcebergGenerics

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1352:
URL: https://github.com/apache/iceberg/pull/1352#discussion_r475385355



##########
File path: data/src/main/java/org/apache/iceberg/data/GenericReader.java
##########
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.data;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import org.apache.iceberg.Accessor;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.orc.GenericOrcReader;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.deletes.Deletes;
+import org.apache.iceberg.expressions.Evaluator;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimap;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.PartitionUtil;
+import org.apache.iceberg.util.StructLikeSet;
+import org.apache.iceberg.util.StructProjection;
+import org.apache.parquet.Preconditions;
+
+class GenericReader implements Serializable {
+  private static final Schema POS_DELETE_SCHEMA = new Schema(
+      MetadataColumns.DELETE_FILE_PATH,
+      MetadataColumns.DELETE_FILE_POS);
+
+  private final FileIO io;
+  private final Schema tableSchema;
+  private final Schema projection;
+  private final boolean caseSensitive;
+  private final boolean reuseContainers;
+
+  GenericReader(TableScan scan, boolean reuseContainers) {
+    this.io = scan.table().io();
+    this.tableSchema = scan.table().schema();
+    this.projection = scan.schema();
+    this.caseSensitive = scan.isCaseSensitive();
+    this.reuseContainers = reuseContainers;
+  }
+
+  CloseableIterator<Record> open(CloseableIterable<CombinedScanTask> tasks) {
+    Iterable<FileScanTask> fileTasks = Iterables.concat(Iterables.transform(tasks, CombinedScanTask::files));
+    return CloseableIterable.concat(Iterables.transform(fileTasks, this::open)).iterator();
+  }
+
+  public CloseableIterable<Record> open(CombinedScanTask task) {
+    return new CombinedTaskIterable(task);
+  }
+
+  public CloseableIterable<Record> open(FileScanTask task) {
+    List<DeleteFile> posDeletes = Lists.newArrayList();
+    List<DeleteFile> eqDeletes = Lists.newArrayList();
+    for (DeleteFile delete : task.deletes()) {
+      switch (delete.content()) {
+        case POSITION_DELETES:
+          posDeletes.add(delete);
+          break;
+        case EQUALITY_DELETES:
+          eqDeletes.add(delete);
+          break;
+        default:
+          throw new UnsupportedOperationException("Unknown delete file content: " + delete.content());
+      }
+    }
+
+    Schema fileProjection = fileProjection(posDeletes, eqDeletes);
+
+    CloseableIterable<Record> records = openFile(task, fileProjection);
+    records = applyPosDeletes(records, fileProjection, task.file().path(), posDeletes, task.file());
+    records = applyEqDeletes(records, fileProjection, eqDeletes, task.file());
+    records = applyResidual(records, fileProjection, task.residual());
+
+    return records;
+  }
+
+  private Schema fileProjection(List<DeleteFile> posDeletes, List<DeleteFile> eqDeletes) {
+    Set<Integer> requiredIds = Sets.newLinkedHashSet();
+    if (!posDeletes.isEmpty()) {
+      requiredIds.add(MetadataColumns.ROW_POSITION.fieldId());
+    }
+
+    for (DeleteFile eqDelete : eqDeletes) {
+      requiredIds.addAll(eqDelete.equalityFieldIds());
+    }
+
+    Set<Integer> missingIds = Sets.newLinkedHashSet(Sets.difference(requiredIds, TypeUtil.getProjectedIds(projection)));
+
+    if (missingIds.isEmpty()) {
+      return projection;
+    }
+
+    // TODO: support adding nested columns. this will currently fail when finding nested columns to add
+    List<Types.NestedField> columns = Lists.newArrayList(projection.columns());
+    for (int fieldId : missingIds) {
+      if (fieldId == MetadataColumns.ROW_POSITION.fieldId()) {
+        continue; // add _pos at the end
+      }
+
+      Types.NestedField field = tableSchema.asStruct().field(fieldId);
+      Preconditions.checkArgument(field != null, "Cannot find required field for ID %s", fieldId);
+
+      columns.add(field);
+    }
+
+    if (requiredIds.contains(MetadataColumns.ROW_POSITION.fieldId())) {
+      columns.add(MetadataColumns.ROW_POSITION);
+    }
+
+    return new Schema(columns);
+  }
+
+  private CloseableIterable<Record> applyResidual(CloseableIterable<Record> records, Schema recordSchema,
+                                                  Expression residual) {
+    if (residual != null && residual != Expressions.alwaysTrue()) {
+      InternalRecordWrapper wrapper = new InternalRecordWrapper(recordSchema.asStruct());
+      Evaluator filter = new Evaluator(recordSchema.asStruct(), residual, caseSensitive);
+      return CloseableIterable.filter(records, record -> filter.eval(wrapper.wrap(record)));
+    }
+
+    return records;
+  }
+
+  private CloseableIterable<Record> applyEqDeletes(CloseableIterable<Record> records, Schema recordSchema,
+                                                   List<DeleteFile> eqDeletes, DataFile dataFile) {
+    if (eqDeletes.isEmpty()) {
+      return records;
+    }
+
+    Multimap<Set<Integer>, DeleteFile> filesByDeleteIds = Multimaps.newMultimap(Maps.newHashMap(), Lists::newArrayList);
+    for (DeleteFile delete : eqDeletes) {
+      filesByDeleteIds.put(Sets.newHashSet(delete.equalityFieldIds()), delete);
+    }
+
+    CloseableIterable<Record> filteredRecords = records;
+    for (Map.Entry<Set<Integer>, Collection<DeleteFile>> entry : filesByDeleteIds.asMap().entrySet()) {
+      Set<Integer> ids = entry.getKey();
+      Iterable<DeleteFile> deletes = entry.getValue();
+
+      Schema deleteSchema = TypeUtil.select(recordSchema, ids);
+
+      // a wrapper to translate from generic objects to internal representations
+      InternalRecordWrapper asStructLike = new InternalRecordWrapper(recordSchema.asStruct());
+
+      // a projection to select and reorder fields of the file schema to match the delete rows
+      StructProjection projectRow = StructProjection.create(recordSchema, deleteSchema);
+
+      Iterable<CloseableIterable<Record>> deleteRecords = Iterables.transform(deletes,
+          delete -> openDeletes(delete, dataFile, deleteSchema));
+      StructLikeSet deleteSet = Deletes.toEqualitySet(
+          // copy the delete records because they will be held in a set
+          CloseableIterable.transform(CloseableIterable.concat(deleteRecords), Record::copy),
+          deleteSchema.asStruct());
+
+      filteredRecords = Deletes.filter(filteredRecords,
+          record -> projectRow.wrap(asStructLike.wrap(record)), deleteSet);
+    }
+
+    return filteredRecords;
+  }
+
+  private CloseableIterable<Record> applyPosDeletes(CloseableIterable<Record> records, Schema recordSchema,
+                                                    CharSequence file, List<DeleteFile> posDeletes, DataFile dataFile) {
+    if (posDeletes.isEmpty()) {
+      return records;
+    }
+
+    Accessor<StructLike> posAccessor = recordSchema.accessorForField(MetadataColumns.ROW_POSITION.fieldId());
+    Function<Record, Long> posGetter = record -> (Long) posAccessor.get(record);
+    List<CloseableIterable<StructLike>> deletes = Lists.transform(posDeletes,
+        delete -> openPosDeletes(delete, dataFile));
+
+    // if there are fewer deletes than a reasonable number to keep in memory, use a set
+    if (posDeletes.stream().mapToLong(DeleteFile::recordCount).sum() < 100_000L) {
+      return Deletes.filter(records, posGetter, Deletes.toPositionSet(file, CloseableIterable.concat(deletes)));
+    }
+
+    return Deletes.streamingFilter(records, posGetter, Deletes.deletePositions(file, deletes));
+  }
+
+  private CloseableIterable<StructLike> openPosDeletes(DeleteFile file, DataFile dataFile) {
+    return openDeletes(file, dataFile, POS_DELETE_SCHEMA);
+  }
+
+  private <T> CloseableIterable<T> openDeletes(DeleteFile deleteFile, DataFile dataFile, Schema deleteSchema) {

Review comment:
       I see that the callers need both `CloseableIterable<StructLike>` and `CloseableIterable<Record>`. How about bounding the type parameter as `T extends StructLike` here?
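
A rough sketch of what the suggested bound buys, assuming simplified stand-ins: the `StructLike` and `Record` types below are hypothetical, reduced versions defined only for this example, and `openDeletes` here takes an in-memory list rather than opening a delete file.

```java
import java.util.ArrayList;
import java.util.List;

public class BoundedOpenDeletes {
  // Hypothetical stand-in for org.apache.iceberg.StructLike, reduced to two methods.
  public interface StructLike {
    int size();

    Object get(int pos);
  }

  // Hypothetical stand-in for a generic Record, which is also a StructLike.
  public static class Record implements StructLike {
    private final Object[] values;

    public Record(Object... values) {
      this.values = values;
    }

    @Override
    public int size() {
      return values.length;
    }

    @Override
    public Object get(int pos) {
      return values[pos];
    }
  }

  // With the bound, callers may ask for List<StructLike> or List<Record>,
  // while a call such as BoundedOpenDeletes.<String>openDeletes(rows) no longer compiles.
  @SuppressWarnings("unchecked")
  public static <T extends StructLike> List<T> openDeletes(List<Record> rows) {
    List<T> result = new ArrayList<>();
    for (Record row : rows) {
      result.add((T) row); // unchecked cast: the reader decides the concrete row class
    }
    return result;
  }
}
```

The cast is still unchecked, so the bound does not make the method fully type-safe; it only narrows the set of types a caller can request.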






[GitHub] [iceberg] shardulm94 commented on a change in pull request #1352: Support row-level deletes with IcebergGenerics

Posted by GitBox <gi...@apache.org>.
shardulm94 commented on a change in pull request #1352:
URL: https://github.com/apache/iceberg/pull/1352#discussion_r474989978



##########
File path: data/src/main/java/org/apache/iceberg/data/GenericReader.java
##########
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.data;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import org.apache.iceberg.Accessor;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.orc.GenericOrcReader;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.deletes.Deletes;
+import org.apache.iceberg.expressions.Evaluator;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimap;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.PartitionUtil;
+import org.apache.iceberg.util.ProjectStructLike;
+import org.apache.iceberg.util.StructLikeSet;
+
+class GenericReader implements Serializable {
+  private static final Schema POS_DELETE_SCHEMA = new Schema(
+      MetadataColumns.DELETE_FILE_PATH,
+      MetadataColumns.DELETE_FILE_POS);
+
+  private final FileIO io;
+  private final Schema projection;
+  private final boolean caseSensitive;
+  private final boolean reuseContainers;
+
+  GenericReader(TableScan scan, boolean reuseContainers) {
+    this.io = scan.table().io();
+    this.projection = scan.schema();
+    this.caseSensitive = scan.isCaseSensitive();
+    this.reuseContainers = reuseContainers;
+  }
+
+  CloseableIterator<Record> open(CloseableIterable<CombinedScanTask> tasks) {
+    Iterable<FileScanTask> fileTasks = Iterables.concat(Iterables.transform(tasks, CombinedScanTask::files));
+    return CloseableIterable.concat(Iterables.transform(fileTasks, this::open)).iterator();
+  }
+
+  public CloseableIterable<Record> open(CombinedScanTask task) {
+    return new CombinedTaskIterable(task);
+  }
+
+  public CloseableIterable<Record> open(FileScanTask task) {
+    List<DeleteFile> posDeletes = Lists.newArrayList();
+    List<DeleteFile> eqDeletes = Lists.newArrayList();
+    for (DeleteFile delete : task.deletes()) {
+      switch (delete.content()) {
+        case POSITION_DELETES:
+          posDeletes.add(delete);
+          break;
+        case EQUALITY_DELETES:
+          eqDeletes.add(delete);
+          break;
+        default:
+          throw new UnsupportedOperationException("Unknown delete file content: " + delete.content());
+      }
+    }
+
+    Schema fileProjection = fileProjection(!posDeletes.isEmpty());
+
+    CloseableIterable<Record> records = openFile(task, fileProjection);
+    records = applyPosDeletes(records, fileProjection, task.file().path(), posDeletes, task.file());
+    records = applyEqDeletes(records, fileProjection, eqDeletes, task.file());
+    records = applyResidual(records, fileProjection, task.residual());
+
+    return records;

Review comment:
       Actually, the above is probably relevant for Generics too. The extra columns returned can vary between records within the same scan, since each file can have different equality field IDs.
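
To illustrate the point with a small sketch: the extra columns are the field IDs that a file's equality deletes require minus the IDs already in the user's projection, so they are computed per file. The class name, method name, and field IDs below are made up for the example, and plain Java collections stand in for Iceberg schemas.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class ExtraColumnsPerFile {
  // Field IDs that a file's equality deletes need but the user's projection does not include.
  public static Set<Integer> missingIds(Set<Integer> projectedIds, List<List<Integer>> equalityFieldIdsPerDelete) {
    Set<Integer> required = new LinkedHashSet<>();
    for (List<Integer> ids : equalityFieldIdsPerDelete) {
      required.addAll(ids);
    }
    required.removeAll(projectedIds);
    return required;
  }

  public static void main(String[] args) {
    Set<Integer> projected = new LinkedHashSet<>(Arrays.asList(1, 2));
    // file A has one equality delete on field 3; file B has one on fields 2 and 4 (made-up IDs)
    List<List<Integer>> fileA = Arrays.asList(Arrays.asList(3));
    List<List<Integer>> fileB = Arrays.asList(Arrays.asList(2, 4));
    System.out.println("file A extra columns: " + missingIds(projected, fileA));
    System.out.println("file B extra columns: " + missingIds(projected, fileB));
  }
}
```

With the same projection, the two files end up with different extra field IDs, which is why records from one scan can carry different trailing columns unless they are projected back to the requested schema.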






[GitHub] [iceberg] rdblue commented on a change in pull request #1352: Support row-level deletes with IcebergGenerics

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1352:
URL: https://github.com/apache/iceberg/pull/1352#discussion_r473191907



##########
File path: core/src/main/java/org/apache/iceberg/util/StructLikeWrapper.java
##########
@@ -73,11 +73,6 @@ public boolean equals(Object other) {
       return false;
     }
 
-    int len = struct.size();
-    if (len != that.struct.size()) {
-      return false;
-    }
-

Review comment:
       Yes, it was. Since the wrapper's equality is based on the type, I think it doesn't make sense to check the size here.






[GitHub] [iceberg] chenjunjiedada commented on a change in pull request #1352: Support row-level deletes with IcebergGenerics

Posted by GitBox <gi...@apache.org>.
chenjunjiedada commented on a change in pull request #1352:
URL: https://github.com/apache/iceberg/pull/1352#discussion_r474541698



##########
File path: data/src/main/java/org/apache/iceberg/data/GenericReader.java
##########
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.data;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import org.apache.iceberg.Accessor;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.orc.GenericOrcReader;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.deletes.Deletes;
+import org.apache.iceberg.expressions.Evaluator;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimap;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.PartitionUtil;
+import org.apache.iceberg.util.ProjectStructLike;
+import org.apache.iceberg.util.StructLikeSet;
+
+class GenericReader implements Serializable {
+  private static final Schema POS_DELETE_SCHEMA = new Schema(
+      MetadataColumns.DELETE_FILE_PATH,
+      MetadataColumns.DELETE_FILE_POS);
+
+  private final FileIO io;
+  private final Schema projection;
+  private final boolean caseSensitive;
+  private final boolean reuseContainers;
+
+  GenericReader(TableScan scan, boolean reuseContainers) {
+    this.io = scan.table().io();
+    this.projection = scan.schema();
+    this.caseSensitive = scan.isCaseSensitive();
+    this.reuseContainers = reuseContainers;
+  }
+
+  CloseableIterator<Record> open(CloseableIterable<CombinedScanTask> tasks) {
+    Iterable<FileScanTask> fileTasks = Iterables.concat(Iterables.transform(tasks, CombinedScanTask::files));
+    return CloseableIterable.concat(Iterables.transform(fileTasks, this::open)).iterator();
+  }
+
+  public CloseableIterable<Record> open(CombinedScanTask task) {
+    return new CombinedTaskIterable(task);
+  }
+
+  public CloseableIterable<Record> open(FileScanTask task) {
+    List<DeleteFile> posDeletes = Lists.newArrayList();
+    List<DeleteFile> eqDeletes = Lists.newArrayList();
+    for (DeleteFile delete : task.deletes()) {
+      switch (delete.content()) {
+        case POSITION_DELETES:
+          posDeletes.add(delete);
+          break;
+        case EQUALITY_DELETES:
+          eqDeletes.add(delete);
+          break;
+        default:
+          throw new UnsupportedOperationException("Unknown delete file content: " + delete.content());
+      }
+    }
+
+    Schema fileProjection = fileProjection(!posDeletes.isEmpty());
+
+    CloseableIterable<Record> records = openFile(task, fileProjection);
+    records = applyPosDeletes(records, fileProjection, task.file().path(), posDeletes, task.file());
+    records = applyEqDeletes(records, fileProjection, eqDeletes, task.file());
+    records = applyResidual(records, fileProjection, task.residual());
+
+    return records;
+  }
+
+  private Schema fileProjection(boolean hasPosDeletes) {
+    if (hasPosDeletes) {
+      List<Types.NestedField> columns = Lists.newArrayList(projection.columns());
+      columns.add(MetadataColumns.ROW_POSITION);
+      return new Schema(columns);
+    }
+
+    return projection;
+  }
+
+  private CloseableIterable<Record> applyResidual(CloseableIterable<Record> records, Schema recordSchema,
+                                                  Expression residual) {
+    if (residual != null && residual != Expressions.alwaysTrue()) {
+      InternalRecordWrapper wrapper = new InternalRecordWrapper(recordSchema.asStruct());
+      Evaluator filter = new Evaluator(recordSchema.asStruct(), residual, caseSensitive);
+      return CloseableIterable.filter(records, record -> filter.eval(wrapper.wrap(record)));
+    }
+
+    return records;
+  }
+
+  private CloseableIterable<Record> applyEqDeletes(CloseableIterable<Record> records, Schema recordSchema,
+                                                   List<DeleteFile> eqDeletes, DataFile dataFile) {
+    if (eqDeletes.isEmpty()) {
+      return records;
+    }
+
+    Multimap<Set<Integer>, DeleteFile> filesByDeleteIds = Multimaps.newMultimap(Maps.newHashMap(), Lists::newArrayList);
+    for (DeleteFile delete : eqDeletes) {
+      filesByDeleteIds.put(Sets.newHashSet(delete.equalityFieldIds()), delete);
+    }
+
+    CloseableIterable<Record> filteredRecords = records;
+    for (Map.Entry<Set<Integer>, Collection<DeleteFile>> entry : filesByDeleteIds.asMap().entrySet()) {
+      Set<Integer> ids = entry.getKey();
+      Iterable<DeleteFile> deletes = entry.getValue();
+
+      Schema deleteSchema = TypeUtil.select(recordSchema, ids);
+      int[] orderedIds = deleteSchema.columns().stream().mapToInt(Types.NestedField::fieldId).toArray();
+
+      // a wrapper to translate from generic objects to internal representations
+      InternalRecordWrapper asStructLike = new InternalRecordWrapper(recordSchema.asStruct());
+
+      // a projection to select and reorder fields of the file schema to match the delete rows
+      ProjectStructLike projectRow = ProjectStructLike.of(recordSchema, orderedIds);
+
+      Iterable<CloseableIterable<Record>> deleteRecords = Iterables.transform(deletes,
+          delete -> openDeletes(delete, dataFile, deleteSchema));
+      StructLikeSet deleteSet = Deletes.toEqualitySet(
+          // copy the delete records because they will be held in a set
+          CloseableIterable.transform(CloseableIterable.concat(deleteRecords), Record::copy),
+          deleteSchema.asStruct());
+
+      filteredRecords = Deletes.filter(filteredRecords,
+          record -> projectRow.wrap(asStructLike.wrap(record)), deleteSet);
+    }
+
+    return filteredRecords;
+  }
+
+  private CloseableIterable<Record> applyPosDeletes(CloseableIterable<Record> records, Schema recordSchema,
+                                                    CharSequence file, List<DeleteFile> posDeletes, DataFile dataFile) {
+    if (posDeletes.isEmpty()) {
+      return records;
+    }
+
+    Accessor<StructLike> posAccessor = recordSchema.accessorForField(MetadataColumns.ROW_POSITION.fieldId());
+    Function<Record, Long> posGetter = record -> (Long) posAccessor.get(record);
+    List<CloseableIterable<StructLike>> deletes = Lists.transform(posDeletes,
+        delete -> openPosDeletes(delete, dataFile));
+
+    // if there are fewer deletes than a reasonable number to keep in memory, use a set
+    if (posDeletes.stream().mapToLong(DeleteFile::recordCount).sum() < 100_000L) {

Review comment:
       Can we make this configurable? For example, let users choose how much memory they want to use for this.
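
       As a rough sketch of what I mean, assuming plain JDK types: `PosDeleteStrategy` and `setThreshold` below are hypothetical names, not part of this PR, and the default simply mirrors the hard-coded `100_000L` above. The per-file counts stand in for `DeleteFile::recordCount`.

```java
import java.util.List;

// Hypothetical helper, not part of this PR: the set-vs-streaming decision is
// driven by a caller-supplied threshold instead of a hard-coded constant.
public class PosDeleteStrategy {
  // Mirrors the constant currently hard-coded in applyPosDeletes.
  public static final long DEFAULT_SET_THRESHOLD = 100_000L;

  private final long setThreshold;

  public PosDeleteStrategy(long setThreshold) {
    if (setThreshold < 0) {
      throw new IllegalArgumentException("Threshold must be non-negative: " + setThreshold);
    }
    this.setThreshold = setThreshold;
  }

  // Returns true when the total number of deleted positions is below the
  // threshold, i.e. small enough to load into an in-memory set.
  public boolean useInMemorySet(List<Long> deleteRecordCounts) {
    long total = deleteRecordCounts.stream().mapToLong(Long::longValue).sum();
    return total < setThreshold;
  }
}
```

       Where the threshold would come from (a table property, a scan option, or a constructor argument) is left open here.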






[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #1352: Support row-level deletes with IcebergGenerics

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #1352:
URL: https://github.com/apache/iceberg/pull/1352#discussion_r472217910



##########
File path: api/src/main/java/org/apache/iceberg/data/Record.java
##########
@@ -35,4 +36,26 @@
   Record copy();
 
   Record copy(Map<String, Object> overwriteValues);
+
+  default Record copy(String field, Object value) {
+    Map<String, Object> overwriteValues = Maps.newHashMapWithExpectedSize(1);
+    overwriteValues.put(field, value);
+    return copy(overwriteValues);
+  }
+
+  default Record copy(String field1, Object value1, String field2, Object value2) {
+    Map<String, Object> overwriteValues = Maps.newHashMapWithExpectedSize(1);

Review comment:
       Shouldn't this be 2?

##########
File path: api/src/main/java/org/apache/iceberg/data/Record.java
##########
@@ -35,4 +36,26 @@
   Record copy();
 
   Record copy(Map<String, Object> overwriteValues);
+
+  default Record copy(String field, Object value) {
+    Map<String, Object> overwriteValues = Maps.newHashMapWithExpectedSize(1);
+    overwriteValues.put(field, value);
+    return copy(overwriteValues);
+  }
+
+  default Record copy(String field1, Object value1, String field2, Object value2) {
+    Map<String, Object> overwriteValues = Maps.newHashMapWithExpectedSize(1);
+    overwriteValues.put(field1, value1);
+    overwriteValues.put(field2, value2);
+    return copy(overwriteValues);
+  }
+
+  default Record copy(String field1, Object value1, String field2, Object value2, String field3, Object value3) {
+    Map<String, Object> overwriteValues = Maps.newHashMapWithExpectedSize(1);

Review comment:
       And this 3?
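
       To illustrate: the expected size should match the number of entries each overload puts, so in the PR itself the fix would presumably just be `Maps.newHashMapWithExpectedSize(2)` and `Maps.newHashMapWithExpectedSize(3)`. The sketch below uses only `java.util.HashMap` to stay self-contained; `OverwriteMaps` and `withExpectedSize` are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the copy overloads; it only builds the overwrite map.
public class OverwriteMaps {
  // Picks an initial capacity so that expectedSize entries fit without a
  // resize at HashMap's default load factor of 0.75.
  static <K, V> Map<K, V> withExpectedSize(int expectedSize) {
    return new HashMap<>((int) Math.ceil(expectedSize / 0.75d));
  }

  // Two overwrites, so the map is sized for two entries.
  static Map<String, Object> of(String field1, Object value1, String field2, Object value2) {
    Map<String, Object> overwriteValues = withExpectedSize(2);
    overwriteValues.put(field1, value1);
    overwriteValues.put(field2, value2);
    return overwriteValues;
  }

  // Three overwrites, so the map is sized for three entries.
  static Map<String, Object> of(String field1, Object value1, String field2, Object value2,
                                String field3, Object value3) {
    Map<String, Object> overwriteValues = withExpectedSize(3);
    overwriteValues.put(field1, value1);
    overwriteValues.put(field2, value2);
    overwriteValues.put(field3, value3);
    return overwriteValues;
  }
}
```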






[GitHub] [iceberg] chenjunjiedada commented on a change in pull request #1352: Support row-level deletes with IcebergGenerics

Posted by GitBox <gi...@apache.org>.
chenjunjiedada commented on a change in pull request #1352:
URL: https://github.com/apache/iceberg/pull/1352#discussion_r475156761



##########
File path: data/src/main/java/org/apache/iceberg/data/GenericReader.java
##########
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.data;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import org.apache.iceberg.Accessor;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.orc.GenericOrcReader;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.deletes.Deletes;
+import org.apache.iceberg.expressions.Evaluator;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimap;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.PartitionUtil;
+import org.apache.iceberg.util.ProjectStructLike;
+import org.apache.iceberg.util.StructLikeSet;
+
+class GenericReader implements Serializable {
+  private static final Schema POS_DELETE_SCHEMA = new Schema(
+      MetadataColumns.DELETE_FILE_PATH,
+      MetadataColumns.DELETE_FILE_POS);
+
+  private final FileIO io;
+  private final Schema projection;
+  private final boolean caseSensitive;
+  private final boolean reuseContainers;
+
+  GenericReader(TableScan scan, boolean reuseContainers) {
+    this.io = scan.table().io();
+    this.projection = scan.schema();
+    this.caseSensitive = scan.isCaseSensitive();
+    this.reuseContainers = reuseContainers;
+  }
+
+  CloseableIterator<Record> open(CloseableIterable<CombinedScanTask> tasks) {
+    Iterable<FileScanTask> fileTasks = Iterables.concat(Iterables.transform(tasks, CombinedScanTask::files));
+    return CloseableIterable.concat(Iterables.transform(fileTasks, this::open)).iterator();
+  }
+
+  public CloseableIterable<Record> open(CombinedScanTask task) {
+    return new CombinedTaskIterable(task);
+  }
+
+  public CloseableIterable<Record> open(FileScanTask task) {
+    List<DeleteFile> posDeletes = Lists.newArrayList();
+    List<DeleteFile> eqDeletes = Lists.newArrayList();
+    for (DeleteFile delete : task.deletes()) {
+      switch (delete.content()) {
+        case POSITION_DELETES:
+          posDeletes.add(delete);
+          break;
+        case EQUALITY_DELETES:
+          eqDeletes.add(delete);
+          break;
+        default:
+          throw new UnsupportedOperationException("Unknown delete file content: " + delete.content());
+      }
+    }
+
+    Schema fileProjection = fileProjection(!posDeletes.isEmpty());
+
+    CloseableIterable<Record> records = openFile(task, fileProjection);
+    records = applyPosDeletes(records, fileProjection, task.file().path(), posDeletes, task.file());
+    records = applyEqDeletes(records, fileProjection, eqDeletes, task.file());
+    records = applyResidual(records, fileProjection, task.residual());
+
+    return records;
+  }
+
+  private Schema fileProjection(boolean hasPosDeletes) {
+    if (hasPosDeletes) {
+      List<Types.NestedField> columns = Lists.newArrayList(projection.columns());
+      columns.add(MetadataColumns.ROW_POSITION);
+      return new Schema(columns);
+    }
+
+    return projection;
+  }
+
+  private CloseableIterable<Record> applyResidual(CloseableIterable<Record> records, Schema recordSchema,
+                                                  Expression residual) {
+    if (residual != null && residual != Expressions.alwaysTrue()) {
+      InternalRecordWrapper wrapper = new InternalRecordWrapper(recordSchema.asStruct());
+      Evaluator filter = new Evaluator(recordSchema.asStruct(), residual, caseSensitive);
+      return CloseableIterable.filter(records, record -> filter.eval(wrapper.wrap(record)));
+    }
+
+    return records;
+  }
+
+  private CloseableIterable<Record> applyEqDeletes(CloseableIterable<Record> records, Schema recordSchema,
+                                                   List<DeleteFile> eqDeletes, DataFile dataFile) {
+    if (eqDeletes.isEmpty()) {
+      return records;
+    }
+
+    Multimap<Set<Integer>, DeleteFile> filesByDeleteIds = Multimaps.newMultimap(Maps.newHashMap(), Lists::newArrayList);
+    for (DeleteFile delete : eqDeletes) {
+      filesByDeleteIds.put(Sets.newHashSet(delete.equalityFieldIds()), delete);
+    }
+
+    CloseableIterable<Record> filteredRecords = records;
+    for (Map.Entry<Set<Integer>, Collection<DeleteFile>> entry : filesByDeleteIds.asMap().entrySet()) {
+      Set<Integer> ids = entry.getKey();
+      Iterable<DeleteFile> deletes = entry.getValue();
+
+      Schema deleteSchema = TypeUtil.select(recordSchema, ids);
+      int[] orderedIds = deleteSchema.columns().stream().mapToInt(Types.NestedField::fieldId).toArray();

Review comment:
       One minor thing: the variable name `orderedIds` is a little confusing to me. If it means the IDs are aligned with the order in which the fields are defined in the schema, would it be better to call it `alignedIds`?
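
       For context, this is how I read the two lines above, as a sketch with plain integers. It assumes `TypeUtil.select` keeps the selected top-level columns in the order the record schema declares them; `SchemaOrderIds` and `idsInSchemaOrder` are hypothetical names.

```java
import java.util.List;
import java.util.Set;

// Hypothetical illustration: the selected IDs come back in schema order,
// regardless of the iteration order of the (unordered) ID set.
public class SchemaOrderIds {
  static int[] idsInSchemaOrder(List<Integer> schemaFieldIds, Set<Integer> selectedIds) {
    return schemaFieldIds.stream()
        .filter(selectedIds::contains)   // keep only the equality field IDs
        .mapToInt(Integer::intValue)     // unbox to match the int[] in the PR
        .toArray();
  }
}
```

       With schema IDs 1, 2, 3, 4 and a delete set of {4, 2}, the result follows the schema and is [2, 4].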






[GitHub] [iceberg] shardulm94 commented on a change in pull request #1352: Support row-level deletes with IcebergGenerics

Posted by GitBox <gi...@apache.org>.
shardulm94 commented on a change in pull request #1352:
URL: https://github.com/apache/iceberg/pull/1352#discussion_r475325724



##########
File path: data/src/main/java/org/apache/iceberg/data/GenericReader.java
##########
@@ -114,14 +117,40 @@
     return records;
   }
 
-  private Schema fileProjection(boolean hasPosDeletes) {
-    if (hasPosDeletes) {
-      List<Types.NestedField> columns = Lists.newArrayList(projection.columns());
+  private Schema fileProjection(List<DeleteFile> posDeletes, List<DeleteFile> eqDeletes) {
+    Set<Integer> requiredIds = Sets.newLinkedHashSet();
+    if (!posDeletes.isEmpty()) {
+      requiredIds.add(MetadataColumns.ROW_POSITION.fieldId());
+    }
+
+    for (DeleteFile eqDelete : eqDeletes) {
+      requiredIds.addAll(eqDelete.equalityFieldIds());
+    }
+
+    Set<Integer> missingIds = Sets.newLinkedHashSet(Sets.difference(requiredIds, TypeUtil.getProjectedIds(projection)));
+
+    if (missingIds.isEmpty()) {
+      return projection;
+    }
+
+    // TODO: support adding nested columns. this will currently fail when finding nested columns to add
+    List<Types.NestedField> columns = Lists.newArrayList(projection.columns());
+    for (int fieldId : missingIds) {
+      if (fieldId == MetadataColumns.ROW_POSITION.fieldId()) {
+        continue; // add _pos at the end
+      }
+
+      Types.NestedField field = tableSchema.asStruct().field(fieldId);
+      Preconditions.checkArgument(field != null, "Cannot find required field for ID %s", fieldId);

Review comment:
       Interesting constraint here that a column cannot be deleted if there are live equality delete files depending on the column.






[GitHub] [iceberg] shardulm94 commented on a change in pull request #1352: Support row-level deletes with IcebergGenerics

Posted by GitBox <gi...@apache.org>.
shardulm94 commented on a change in pull request #1352:
URL: https://github.com/apache/iceberg/pull/1352#discussion_r472850750



##########
File path: data/src/main/java/org/apache/iceberg/data/GenericReader.java
##########
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.data;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import org.apache.iceberg.Accessor;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.orc.GenericOrcReader;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.deletes.Deletes;
+import org.apache.iceberg.expressions.Evaluator;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimap;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.PartitionUtil;
+import org.apache.iceberg.util.ProjectStructLike;
+import org.apache.iceberg.util.StructLikeSet;
+
+class GenericReader implements Serializable {
+  private static final Schema POS_DELETE_SCHEMA = new Schema(
+      MetadataColumns.DELETE_FILE_PATH,
+      MetadataColumns.DELETE_FILE_POS);
+
+  private final FileIO io;
+  private final Schema projection;
+  private final boolean caseSensitive;
+  private final boolean reuseContainers;
+
+  GenericReader(TableScan scan, boolean reuseContainers) {
+    this.io = scan.table().io();
+    this.projection = scan.schema();
+    this.caseSensitive = scan.isCaseSensitive();
+    this.reuseContainers = reuseContainers;
+  }
+
+  CloseableIterator<Record> open(CloseableIterable<CombinedScanTask> tasks) {
+    Iterable<FileScanTask> fileTasks = Iterables.concat(Iterables.transform(tasks, CombinedScanTask::files));
+    return CloseableIterable.concat(Iterables.transform(fileTasks, this::open)).iterator();
+  }
+
+  public CloseableIterable<Record> open(CombinedScanTask task) {
+    return new CombinedTaskIterable(task);
+  }
+
+  public CloseableIterable<Record> open(FileScanTask task) {
+    List<DeleteFile> posDeletes = Lists.newArrayList();
+    List<DeleteFile> eqDeletes = Lists.newArrayList();
+    for (DeleteFile delete : task.deletes()) {
+      switch (delete.content()) {
+        case POSITION_DELETES:
+          posDeletes.add(delete);
+          break;
+        case EQUALITY_DELETES:
+          eqDeletes.add(delete);
+          break;
+        default:
+          throw new UnsupportedOperationException("Unknown delete file content: " + delete.content());
+      }
+    }
+
+    Schema fileProjection = fileProjection(!posDeletes.isEmpty());
+
+    CloseableIterable<Record> records = openFile(task, fileProjection);
+    records = applyPosDeletes(records, fileProjection, task.file().path(), posDeletes, task.file());
+    records = applyEqDeletes(records, fileProjection, eqDeletes, task.file());
+    records = applyResidual(records, fileProjection, task.residual());
+
+    return records;
+  }
+
+  private Schema fileProjection(boolean hasPosDeletes) {

Review comment:
       For equality deletes, wouldn't we need the file projection to also include equality field ids if not already present?
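
       A minimal sketch of the check I have in mind, using plain JDK collections in place of `Schema` and `DeleteFile`. `RequiredProjection` and `missingIds` are hypothetical names; the inner lists stand in for each delete file's `equalityFieldIds()`.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration: find the equality field IDs that the requested
// projection does not already contain, so they can be added to the file read.
public class RequiredProjection {
  static Set<Integer> missingIds(Set<Integer> projectedIds, List<List<Integer>> equalityFieldIds) {
    // LinkedHashSet keeps first-seen order and drops IDs shared by several delete files.
    Set<Integer> missing = new LinkedHashSet<>();
    for (List<Integer> ids : equalityFieldIds) {
      for (Integer id : ids) {
        if (!projectedIds.contains(id)) {
          missing.add(id);
        }
      }
    }
    return missing;
  }
}
```

       If the result is empty the original projection can be used as-is; otherwise the missing columns would need to be read for the delete filter and projected away again before records are returned.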






[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #1352: Support row-level deletes with IcebergGenerics

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #1352:
URL: https://github.com/apache/iceberg/pull/1352#discussion_r472238753



##########
File path: core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
##########
@@ -78,6 +82,10 @@
     this.sortedDeletesByPartition = sortedDeletesByPartition;
   }
 
+  public boolean isEmpty() {
+    return (globalDeletes == null || globalDeletes.length == 0) && sortedDeletesByPartition.isEmpty();

Review comment:
       And possibly for specsByID? I'm not sure what the project policy on null guarding is.






[GitHub] [iceberg] rdblue commented on a change in pull request #1352: Support row-level deletes with IcebergGenerics

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1352:
URL: https://github.com/apache/iceberg/pull/1352#discussion_r474809974



##########
File path: data/src/main/java/org/apache/iceberg/data/GenericReader.java
##########
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.data;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import org.apache.iceberg.Accessor;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.orc.GenericOrcReader;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.deletes.Deletes;
+import org.apache.iceberg.expressions.Evaluator;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimap;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.PartitionUtil;
+import org.apache.iceberg.util.ProjectStructLike;
+import org.apache.iceberg.util.StructLikeSet;
+
+class GenericReader implements Serializable {
+  private static final Schema POS_DELETE_SCHEMA = new Schema(
+      MetadataColumns.DELETE_FILE_PATH,
+      MetadataColumns.DELETE_FILE_POS);
+
+  private final FileIO io;
+  private final Schema projection;
+  private final boolean caseSensitive;
+  private final boolean reuseContainers;
+
+  GenericReader(TableScan scan, boolean reuseContainers) {
+    this.io = scan.table().io();
+    this.projection = scan.schema();
+    this.caseSensitive = scan.isCaseSensitive();
+    this.reuseContainers = reuseContainers;
+  }
+
+  CloseableIterator<Record> open(CloseableIterable<CombinedScanTask> tasks) {
+    Iterable<FileScanTask> fileTasks = Iterables.concat(Iterables.transform(tasks, CombinedScanTask::files));
+    return CloseableIterable.concat(Iterables.transform(fileTasks, this::open)).iterator();
+  }
+
+  public CloseableIterable<Record> open(CombinedScanTask task) {
+    return new CombinedTaskIterable(task);
+  }
+
+  public CloseableIterable<Record> open(FileScanTask task) {
+    List<DeleteFile> posDeletes = Lists.newArrayList();
+    List<DeleteFile> eqDeletes = Lists.newArrayList();
+    for (DeleteFile delete : task.deletes()) {
+      switch (delete.content()) {
+        case POSITION_DELETES:
+          posDeletes.add(delete);
+          break;
+        case EQUALITY_DELETES:
+          eqDeletes.add(delete);
+          break;
+        default:
+          throw new UnsupportedOperationException("Unknown delete file content: " + delete.content());
+      }
+    }
+
+    Schema fileProjection = fileProjection(!posDeletes.isEmpty());
+
+    CloseableIterable<Record> records = openFile(task, fileProjection);
+    records = applyPosDeletes(records, fileProjection, task.file().path(), posDeletes, task.file());
+    records = applyEqDeletes(records, fileProjection, eqDeletes, task.file());
+    records = applyResidual(records, fileProjection, task.residual());
+
+    return records;
+  }
+
+  private Schema fileProjection(boolean hasPosDeletes) {
+    if (hasPosDeletes) {
+      List<Types.NestedField> columns = Lists.newArrayList(projection.columns());
+      columns.add(MetadataColumns.ROW_POSITION);
+      return new Schema(columns);
+    }
+
+    return projection;
+  }
+
+  private CloseableIterable<Record> applyResidual(CloseableIterable<Record> records, Schema recordSchema,
+                                                  Expression residual) {
+    if (residual != null && residual != Expressions.alwaysTrue()) {
+      InternalRecordWrapper wrapper = new InternalRecordWrapper(recordSchema.asStruct());
+      Evaluator filter = new Evaluator(recordSchema.asStruct(), residual, caseSensitive);
+      return CloseableIterable.filter(records, record -> filter.eval(wrapper.wrap(record)));
+    }
+
+    return records;
+  }
+
+  private CloseableIterable<Record> applyEqDeletes(CloseableIterable<Record> records, Schema recordSchema,
+                                                   List<DeleteFile> eqDeletes, DataFile dataFile) {
+    if (eqDeletes.isEmpty()) {
+      return records;
+    }
+
+    Multimap<Set<Integer>, DeleteFile> filesByDeleteIds = Multimaps.newMultimap(Maps.newHashMap(), Lists::newArrayList);
+    for (DeleteFile delete : eqDeletes) {
+      filesByDeleteIds.put(Sets.newHashSet(delete.equalityFieldIds()), delete);
+    }
+
+    CloseableIterable<Record> filteredRecords = records;
+    for (Map.Entry<Set<Integer>, Collection<DeleteFile>> entry : filesByDeleteIds.asMap().entrySet()) {
+      Set<Integer> ids = entry.getKey();
+      Iterable<DeleteFile> deletes = entry.getValue();
+
+      Schema deleteSchema = TypeUtil.select(recordSchema, ids);
+      int[] orderedIds = deleteSchema.columns().stream().mapToInt(Types.NestedField::fieldId).toArray();
+
+      // a wrapper to translate from generic objects to internal representations
+      InternalRecordWrapper asStructLike = new InternalRecordWrapper(recordSchema.asStruct());
+
+      // a projection to select and reorder fields of the file schema to match the delete rows
+      ProjectStructLike projectRow = ProjectStructLike.of(recordSchema, orderedIds);
+
+      Iterable<CloseableIterable<Record>> deleteRecords = Iterables.transform(deletes,
+          delete -> openDeletes(delete, dataFile, deleteSchema));
+      StructLikeSet deleteSet = Deletes.toEqualitySet(
+          // copy the delete records because they will be held in a set
+          CloseableIterable.transform(CloseableIterable.concat(deleteRecords), Record::copy),
+          deleteSchema.asStruct());
+
+      filteredRecords = Deletes.filter(filteredRecords,
+          record -> projectRow.wrap(asStructLike.wrap(record)), deleteSet);
+    }
+
+    return filteredRecords;
+  }
+
+  private CloseableIterable<Record> applyPosDeletes(CloseableIterable<Record> records, Schema recordSchema,
+                                                    CharSequence file, List<DeleteFile> posDeletes, DataFile dataFile) {
+    if (posDeletes.isEmpty()) {
+      return records;
+    }
+
+    Accessor<StructLike> posAccessor = recordSchema.accessorForField(MetadataColumns.ROW_POSITION.fieldId());
+    Function<Record, Long> posGetter = record -> (Long) posAccessor.get(record);
+    List<CloseableIterable<StructLike>> deletes = Lists.transform(posDeletes,
+        delete -> openPosDeletes(delete, dataFile));
+
+    // if there are fewer deletes than a reasonable number to keep in memory, use a set
+    if (posDeletes.stream().mapToLong(DeleteFile::recordCount).sum() < 100_000L) {

Review comment:
       Yeah, we will add more features and config as we go. This is mainly just to demonstrate right now, so we can start updating the read paths in parallel.
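The quoted hunk stops at the size check, so the code that follows it is not visible here. As a minimal sketch only, the idea of "use a set when the deletes fit in memory" could look like the following. `PosDeleteSketch`, the configurable threshold parameter, and the list-index stand-in for row positions are all hypothetical and not part of the PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class PosDeleteSketch {
  // Hypothetical default mirroring the hard-coded 100_000L in the quoted diff.
  static final long DEFAULT_SET_THRESHOLD = 100_000L;

  // True when the summed record counts of the delete files stay below the threshold.
  static boolean useInMemorySet(long[] deleteRecordCounts, long threshold) {
    long total = 0L;
    for (long count : deleteRecordCounts) {
      total += count;
    }
    return total < threshold;
  }

  // Keeps rows whose position (here: the list index) is not in the deleted set.
  static <T> List<T> filterByPosition(List<T> rows, Set<Long> deletedPositions) {
    List<T> kept = new ArrayList<>();
    for (int pos = 0; pos < rows.size(); pos++) {
      if (!deletedPositions.contains((long) pos)) {
        kept.add(rows.get(pos));
      }
    }
    return kept;
  }

  public static void main(String[] args) {
    Set<Long> deleted = new HashSet<>(Arrays.asList(1L, 3L));
    if (useInMemorySet(new long[] {2L}, DEFAULT_SET_THRESHOLD)) {
      System.out.println(filterByPosition(Arrays.asList("a", "b", "c", "d"), deleted));
    }
  }
}
```

Passing the threshold as a parameter is one way the "more config" mentioned above might eventually surface; the PR itself keeps it as a constant.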






[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #1352: Support row-level deletes with IcebergGenerics

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #1352:
URL: https://github.com/apache/iceberg/pull/1352#discussion_r472237333



##########
File path: core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
##########
@@ -78,6 +82,10 @@
     this.sortedDeletesByPartition = sortedDeletesByPartition;
   }
 
+  public boolean isEmpty() {
+    return (globalDeletes == null || globalDeletes.length == 0) && sortedDeletesByPartition.isEmpty();

Review comment:
       Should we have a Precondition that sortedDeletesByPartition is not null in the constructor?
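For illustration, such a constructor check might look like the sketch below. It uses `java.util.Objects.requireNonNull` as a stand-in so the example is self-contained; the class name `DeleteIndexSketch`, the simplified field types, and the message text are hypothetical and do not reflect the real `DeleteFileIndex`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

final class DeleteIndexSketch {
  // May be null when there are no global deletes, as in the quoted isEmpty().
  private final String[] globalDeletes;
  // Validated in the constructor, so isEmpty() can call it without a null check.
  private final Map<String, String[]> deletesByPartition;

  DeleteIndexSketch(String[] globalDeletes, Map<String, String[]> deletesByPartition) {
    this.globalDeletes = globalDeletes;
    this.deletesByPartition =
        Objects.requireNonNull(deletesByPartition, "Invalid partition deletes: null");
  }

  boolean isEmpty() {
    return (globalDeletes == null || globalDeletes.length == 0) && deletesByPartition.isEmpty();
  }

  public static void main(String[] args) {
    System.out.println(new DeleteIndexSketch(null, new HashMap<>()).isEmpty());
  }
}
```

The trade-off is a fail-fast error at construction time versus relying on the builder to always pass a non-null map.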






[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #1352: Support row-level deletes with IcebergGenerics

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #1352:
URL: https://github.com/apache/iceberg/pull/1352#discussion_r472220189



##########
File path: api/src/main/java/org/apache/iceberg/data/Record.java
##########
@@ -35,4 +36,26 @@
   Record copy();
 
   Record copy(Map<String, Object> overwriteValues);
+
+  default Record copy(String field, Object value) {
+    Map<String, Object> overwriteValues = Maps.newHashMapWithExpectedSize(1);
+    overwriteValues.put(field, value);
+    return copy(overwriteValues);
+  }
+
+  default Record copy(String field1, Object value1, String field2, Object value2) {
+    Map<String, Object> overwriteValues = Maps.newHashMapWithExpectedSize(1);

Review comment:
       I looked into the code for this. Interestingly, expected sizes of 1 and 2 follow a relatively fast path, while 3 and above require some power-of-2 estimation.
   
   https://github.com/google/guava/blob/064fac35d71231aba35062d1965983ecd36b6873/guava/src/com/google/common/collect/Maps.java#L324-L328
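The sizing logic being described can be paraphrased roughly as below. This is a sketch written from memory of Guava's internal capacity helper, not a copy of the linked lines, so the exact constants and branches at that commit are an assumption; `CapacitySketch` is a hypothetical name.

```java
final class CapacitySketch {
  // Assumed upper limit; stands in for Guava's largest power-of-two int constant.
  private static final int MAX_POWER_OF_TWO = 1 << 30;

  // Returns the initial capacity to pass to HashMap for a given expected size.
  static int capacity(int expectedSize) {
    if (expectedSize < 0) {
      throw new IllegalArgumentException("expectedSize cannot be negative: " + expectedSize);
    }
    if (expectedSize < 3) {
      // Fast path for tiny maps: no floating-point math.
      return expectedSize + 1;
    }
    if (expectedSize < MAX_POWER_OF_TWO) {
      // Leave headroom for the default 0.75 load factor; HashMap then rounds
      // the requested capacity up to a power of two internally.
      return (int) ((float) expectedSize / 0.75F + 1.0F);
    }
    return Integer.MAX_VALUE;
  }

  public static void main(String[] args) {
    for (int size = 1; size <= 3; size++) {
      System.out.println(size + " -> " + capacity(size));
    }
  }
}
```

In the quoted diff, the two-field `copy` overload also passes an expected size of 1. The resulting map would still behave correctly, since `HashMap` grows on demand, but an expected size of 2 would match the number of entries actually inserted.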






[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #1352: Support row-level deletes with IcebergGenerics

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #1352:
URL: https://github.com/apache/iceberg/pull/1352#discussion_r472362167



##########
File path: core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
##########
@@ -96,21 +104,157 @@ private StructLikeWrapper newWrapper(int specId) {
     Pair<Integer, StructLikeWrapper> partition = partition(file.specId(), file.partition());
     Pair<long[], DeleteFile[]> partitionDeletes = sortedDeletesByPartition.get(partition);
 
+    Stream<DeleteFile> matchingDeletes;
     if (partitionDeletes == null) {
-      return limitBySequenceNumber(sequenceNumber, globalSeqs, globalDeletes);
+      matchingDeletes = limitBySequenceNumber(sequenceNumber, globalSeqs, globalDeletes);
     } else if (globalDeletes == null) {
-      return limitBySequenceNumber(sequenceNumber, partitionDeletes.first(), partitionDeletes.second());
+      matchingDeletes = limitBySequenceNumber(sequenceNumber, partitionDeletes.first(), partitionDeletes.second());
     } else {
-      return Stream.concat(
-          Stream.of(limitBySequenceNumber(sequenceNumber, globalSeqs, globalDeletes)),
-          Stream.of(limitBySequenceNumber(sequenceNumber, partitionDeletes.first(), partitionDeletes.second()))
-      ).toArray(DeleteFile[]::new);
+      matchingDeletes = Stream.concat(
+          limitBySequenceNumber(sequenceNumber, globalSeqs, globalDeletes),
+          limitBySequenceNumber(sequenceNumber, partitionDeletes.first(), partitionDeletes.second()));
+    }
+
+    return matchingDeletes
+        .filter(deleteFile -> canContainDeletesForFile(file, deleteFile, specsById.get(file.specId()).schema()))
+        .toArray(DeleteFile[]::new);
+  }
+
+  private static boolean canContainDeletesForFile(DataFile dataFile, DeleteFile deleteFile, Schema schema) {
+    switch (deleteFile.content()) {
+      case POSITION_DELETES:
+        return canContainPosDeletesForFile(dataFile, deleteFile);
+
+      case EQUALITY_DELETES:
+        return canContainEqDeletesForFile(dataFile, deleteFile, schema);
+    }
+
+    return true;
+  }
+
+  private static boolean canContainPosDeletesForFile(DataFile dataFile, DeleteFile deleteFile) {
+    // check that the delete file can contain the data file's file_path
+    Map<Integer, ByteBuffer> lowers = deleteFile.lowerBounds();
+    Map<Integer, ByteBuffer> uppers = deleteFile.upperBounds();
+    if (lowers == null || uppers == null) {
+      return true;
+    }
+
+    Type pathType = MetadataColumns.DELETE_FILE_PATH.type();
+    int pathId = MetadataColumns.DELETE_FILE_PATH.fieldId();
+    ByteBuffer lower = lowers.get(pathId);
+    if (lower != null &&
+        Comparators.charSequences().compare(dataFile.path(), Conversions.fromByteBuffer(pathType, lower)) < 0) {

Review comment:
       👍 






[GitHub] [iceberg] shardulm94 commented on a change in pull request #1352: Support row-level deletes with IcebergGenerics

Posted by GitBox <gi...@apache.org>.
shardulm94 commented on a change in pull request #1352:
URL: https://github.com/apache/iceberg/pull/1352#discussion_r474988805



##########
File path: data/src/main/java/org/apache/iceberg/data/GenericReader.java
##########
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.data;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import org.apache.iceberg.Accessor;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.orc.GenericOrcReader;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.deletes.Deletes;
+import org.apache.iceberg.expressions.Evaluator;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimap;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.PartitionUtil;
+import org.apache.iceberg.util.ProjectStructLike;
+import org.apache.iceberg.util.StructLikeSet;
+
+class GenericReader implements Serializable {
+  private static final Schema POS_DELETE_SCHEMA = new Schema(
+      MetadataColumns.DELETE_FILE_PATH,
+      MetadataColumns.DELETE_FILE_POS);
+
+  private final FileIO io;
+  private final Schema projection;
+  private final boolean caseSensitive;
+  private final boolean reuseContainers;
+
+  GenericReader(TableScan scan, boolean reuseContainers) {
+    this.io = scan.table().io();
+    this.projection = scan.schema();
+    this.caseSensitive = scan.isCaseSensitive();
+    this.reuseContainers = reuseContainers;
+  }
+
+  CloseableIterator<Record> open(CloseableIterable<CombinedScanTask> tasks) {
+    Iterable<FileScanTask> fileTasks = Iterables.concat(Iterables.transform(tasks, CombinedScanTask::files));
+    return CloseableIterable.concat(Iterables.transform(fileTasks, this::open)).iterator();
+  }
+
+  public CloseableIterable<Record> open(CombinedScanTask task) {
+    return new CombinedTaskIterable(task);
+  }
+
+  public CloseableIterable<Record> open(FileScanTask task) {
+    List<DeleteFile> posDeletes = Lists.newArrayList();
+    List<DeleteFile> eqDeletes = Lists.newArrayList();
+    for (DeleteFile delete : task.deletes()) {
+      switch (delete.content()) {
+        case POSITION_DELETES:
+          posDeletes.add(delete);
+          break;
+        case EQUALITY_DELETES:
+          eqDeletes.add(delete);
+          break;
+        default:
+          throw new UnsupportedOperationException("Unknown delete file content: " + delete.content());
+      }
+    }
+
+    Schema fileProjection = fileProjection(!posDeletes.isEmpty());
+
+    CloseableIterable<Record> records = openFile(task, fileProjection);
+    records = applyPosDeletes(records, fileProjection, task.file().path(), posDeletes, task.file());
+    records = applyEqDeletes(records, fileProjection, eqDeletes, task.file());
+    records = applyResidual(records, fileProjection, task.residual());
+
+    return records;

Review comment:
       Yeah, I think this should be fine for now. We can add a projection for Generics later.
   For Spark, I guess we can only return additional data if we declare the additional fields in the schema. Is that correct? If so, we will have to go through all of the file scan tasks to determine what the schema should be. Today we are able to derive the schema solely from the input information for the table scan. It could also be suboptimal, since we may need to project the extra columns for every file scan even if only a single file scan requires them.
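The concern can be made concrete with a small sketch, assuming a scan-wide schema has to be the union of what every task needs. `ScanSchemaSketch` and `TaskSketch` are hypothetical stand-ins; real tasks would expose delete files and their equality field ids rather than a plain set of integers.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

final class ScanSchemaSketch {
  // Hypothetical stand-in for a file scan task: only the extra field ids its deletes need.
  static final class TaskSketch {
    final Set<Integer> requiredDeleteFieldIds;

    TaskSketch(Integer... ids) {
      this.requiredDeleteFieldIds = new LinkedHashSet<>(Arrays.asList(ids));
    }
  }

  // Walks every task and unions its extra field ids into the requested projection.
  static Set<Integer> scanWideFieldIds(Set<Integer> requestedIds, List<TaskSketch> tasks) {
    Set<Integer> ids = new LinkedHashSet<>(requestedIds);
    for (TaskSketch task : tasks) {
      ids.addAll(task.requiredDeleteFieldIds);
    }
    return ids;
  }

  public static void main(String[] args) {
    Set<Integer> requested = new LinkedHashSet<>(Arrays.asList(1, 2));
    List<TaskSketch> tasks = Arrays.asList(new TaskSketch(), new TaskSketch(3));
    System.out.println(scanWideFieldIds(requested, tasks));
  }
}
```

In this sketch a single task that needs field 3 forces that column into the projection for every task, which is the potential inefficiency raised above.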






[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #1352: Support row-level deletes with IcebergGenerics

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #1352:
URL: https://github.com/apache/iceberg/pull/1352#discussion_r472261585



##########
File path: core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
##########
@@ -96,21 +104,157 @@ private StructLikeWrapper newWrapper(int specId) {
     Pair<Integer, StructLikeWrapper> partition = partition(file.specId(), file.partition());
     Pair<long[], DeleteFile[]> partitionDeletes = sortedDeletesByPartition.get(partition);
 
+    Stream<DeleteFile> matchingDeletes;
     if (partitionDeletes == null) {
-      return limitBySequenceNumber(sequenceNumber, globalSeqs, globalDeletes);
+      matchingDeletes = limitBySequenceNumber(sequenceNumber, globalSeqs, globalDeletes);
     } else if (globalDeletes == null) {
-      return limitBySequenceNumber(sequenceNumber, partitionDeletes.first(), partitionDeletes.second());
+      matchingDeletes = limitBySequenceNumber(sequenceNumber, partitionDeletes.first(), partitionDeletes.second());
     } else {
-      return Stream.concat(
-          Stream.of(limitBySequenceNumber(sequenceNumber, globalSeqs, globalDeletes)),
-          Stream.of(limitBySequenceNumber(sequenceNumber, partitionDeletes.first(), partitionDeletes.second()))
-      ).toArray(DeleteFile[]::new);
+      matchingDeletes = Stream.concat(
+          limitBySequenceNumber(sequenceNumber, globalSeqs, globalDeletes),
+          limitBySequenceNumber(sequenceNumber, partitionDeletes.first(), partitionDeletes.second()));
+    }
+
+    return matchingDeletes
+        .filter(deleteFile -> canContainDeletesForFile(file, deleteFile, specsById.get(file.specId()).schema()))
+        .toArray(DeleteFile[]::new);
+  }
+
+  private static boolean canContainDeletesForFile(DataFile dataFile, DeleteFile deleteFile, Schema schema) {
+    switch (deleteFile.content()) {
+      case POSITION_DELETES:
+        return canContainPosDeletesForFile(dataFile, deleteFile);
+
+      case EQUALITY_DELETES:
+        return canContainEqDeletesForFile(dataFile, deleteFile, schema);
+    }
+
+    return true;
+  }
+
+  private static boolean canContainPosDeletesForFile(DataFile dataFile, DeleteFile deleteFile) {
+    // check that the delete file can contain the data file's file_path
+    Map<Integer, ByteBuffer> lowers = deleteFile.lowerBounds();
+    Map<Integer, ByteBuffer> uppers = deleteFile.upperBounds();
+    if (lowers == null || uppers == null) {
+      return true;
+    }
+
+    Type pathType = MetadataColumns.DELETE_FILE_PATH.type();
+    int pathId = MetadataColumns.DELETE_FILE_PATH.fieldId();
+    ByteBuffer lower = lowers.get(pathId);
+    if (lower != null &&
+        Comparators.charSequences().compare(dataFile.path(), Conversions.fromByteBuffer(pathType, lower)) < 0) {
+      return false;
     }
+
+    ByteBuffer upper = uppers.get(pathId);
+    if (upper != null &&
+        Comparators.charSequences().compare(dataFile.path(), Conversions.fromByteBuffer(pathType, upper)) > 0) {
+      return false;
+    }
+
+    return true;
+  }
+
+  @SuppressWarnings("checkstyle:CyclomaticComplexity")
+  private static boolean canContainEqDeletesForFile(DataFile dataFile, DeleteFile deleteFile, Schema schema) {
+    if (dataFile.lowerBounds() == null || dataFile.upperBounds() == null ||
+        deleteFile.lowerBounds() == null || deleteFile.upperBounds() == null) {
+      return true;
+    }
+
+    Map<Integer, ByteBuffer> dataLowers = dataFile.lowerBounds();
+    Map<Integer, ByteBuffer> dataUppers = dataFile.upperBounds();
+    Map<Integer, ByteBuffer> deleteLowers = deleteFile.lowerBounds();
+    Map<Integer, ByteBuffer> deleteUppers = deleteFile.upperBounds();
+
+    Map<Integer, Long> dataNullCounts = dataFile.nullValueCounts();
+    Map<Integer, Long> dataValueCounts = dataFile.valueCounts();
+    Map<Integer, Long> deleteNullCounts = deleteFile.nullValueCounts();
+    Map<Integer, Long> deleteValueCounts = deleteFile.valueCounts();
+
+    for (int id : deleteFile.equalityFieldIds()) {
+      Types.NestedField field = schema.findField(id);
+      if (!field.type().isPrimitiveType()) {
+        return true;
+      }
+
+      if (allNull(dataNullCounts, dataValueCounts, field) && allNonNull(deleteNullCounts, field)) {
+        return false;
+      }
+
+      if (allNull(deleteNullCounts, deleteValueCounts, field) && allNonNull(dataNullCounts, field)) {
+        return false;
+      }
+
+      ByteBuffer dataLower = dataLowers.get(id);
+      ByteBuffer dataUpper = dataUppers.get(id);
+      ByteBuffer deleteLower = deleteLowers.get(id);
+      ByteBuffer deleteUpper = deleteUppers.get(id);
+      if (dataLower == null || dataUpper == null || deleteLower == null || deleteUpper == null) {
+        return true;
+      }
+
+      if (!rangesOverlap(field.type().asPrimitiveType(), dataLower, dataUpper, deleteLower, deleteUpper)) {
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  private static <T> boolean rangesOverlap(Type.PrimitiveType type,
+                                           ByteBuffer dataLowerBuf, ByteBuffer dataUpperBuf,
+                                           ByteBuffer deleteLowerBuf, ByteBuffer deleteUpperBuf) {
+    Comparator<T> comparator = Comparators.forType(type);
+    T dataLower = Conversions.fromByteBuffer(type, dataLowerBuf);
+    T dataUpper = Conversions.fromByteBuffer(type, dataUpperBuf);
+    T deleteLower = Conversions.fromByteBuffer(type, deleteLowerBuf);
+    T deleteUpper = Conversions.fromByteBuffer(type, deleteUpperBuf);
+
+    return comparator.compare(deleteLower, dataUpper) <= 0 && comparator.compare(dataLower, deleteUpper) <= 0;
+  }
+
+  private static boolean allNonNull(Map<Integer, Long> nullValueCounts, Types.NestedField field) {
+    if (field.isRequired()) {
+      return true;
+    }
+
+    if (nullValueCounts == null) {
+      return false;
+    }
+
+    Long nullValueCount = nullValueCounts.get(field.fieldId());
+    if (nullValueCount == null) {
+      return false;
+    }
+
+    return nullValueCount <= 0;

Review comment:
       Ah, I see we do a 0 check here as well.
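A standalone sketch of the null-count checks follows, using plain maps and an explicit `required` flag instead of `Types.NestedField`. `allNonNull` follows the quoted method; the body of `allNull` is not visible in this hunk, so its shape here (null count equals value count) is an assumption, as is the class name `NullCountSketch`.

```java
import java.util.HashMap;
import java.util.Map;

final class NullCountSketch {
  // Follows the quoted allNonNull: required fields hold no nulls; missing stats prove nothing.
  static boolean allNonNull(Map<Integer, Long> nullValueCounts, int fieldId, boolean required) {
    if (required) {
      return true;
    }
    if (nullValueCounts == null) {
      return false;
    }
    Long nullValueCount = nullValueCounts.get(fieldId);
    if (nullValueCount == null) {
      return false;
    }
    return nullValueCount <= 0;
  }

  // Assumed shape: every value is null when the null count equals the value count.
  static boolean allNull(Map<Integer, Long> nullValueCounts, Map<Integer, Long> valueCounts,
                         int fieldId, boolean required) {
    if (required || nullValueCounts == null || valueCounts == null) {
      return false;
    }
    Long nullValueCount = nullValueCounts.get(fieldId);
    Long valueCount = valueCounts.get(fieldId);
    if (nullValueCount == null || valueCount == null) {
      return false;
    }
    return nullValueCount.equals(valueCount);
  }

  public static void main(String[] args) {
    Map<Integer, Long> nulls = new HashMap<>();
    nulls.put(1, 0L);
    System.out.println(allNonNull(nulls, 1, false));
  }
}
```

Both helpers return the conservative answer when statistics are missing, so a delete file is only skipped when the metrics actually prove it cannot match.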






[GitHub] [iceberg] rdblue commented on a change in pull request #1352: Support row-level deletes with IcebergGenerics

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1352:
URL: https://github.com/apache/iceberg/pull/1352#discussion_r473195849



##########
File path: data/src/main/java/org/apache/iceberg/data/GenericReader.java
##########
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.data;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import org.apache.iceberg.Accessor;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.orc.GenericOrcReader;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.deletes.Deletes;
+import org.apache.iceberg.expressions.Evaluator;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimap;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.PartitionUtil;
+import org.apache.iceberg.util.ProjectStructLike;
+import org.apache.iceberg.util.StructLikeSet;
+
+class GenericReader implements Serializable {
+  private static final Schema POS_DELETE_SCHEMA = new Schema(
+      MetadataColumns.DELETE_FILE_PATH,
+      MetadataColumns.DELETE_FILE_POS);
+
+  private final FileIO io;
+  private final Schema projection;
+  private final boolean caseSensitive;
+  private final boolean reuseContainers;
+
+  GenericReader(TableScan scan, boolean reuseContainers) {
+    this.io = scan.table().io();
+    this.projection = scan.schema();
+    this.caseSensitive = scan.isCaseSensitive();
+    this.reuseContainers = reuseContainers;
+  }
+
+  CloseableIterator<Record> open(CloseableIterable<CombinedScanTask> tasks) {
+    Iterable<FileScanTask> fileTasks = Iterables.concat(Iterables.transform(tasks, CombinedScanTask::files));
+    return CloseableIterable.concat(Iterables.transform(fileTasks, this::open)).iterator();
+  }
+
+  public CloseableIterable<Record> open(CombinedScanTask task) {
+    return new CombinedTaskIterable(task);
+  }
+
+  public CloseableIterable<Record> open(FileScanTask task) {
+    List<DeleteFile> posDeletes = Lists.newArrayList();
+    List<DeleteFile> eqDeletes = Lists.newArrayList();
+    for (DeleteFile delete : task.deletes()) {
+      switch (delete.content()) {
+        case POSITION_DELETES:
+          posDeletes.add(delete);
+          break;
+        case EQUALITY_DELETES:
+          eqDeletes.add(delete);
+          break;
+        default:
+          throw new UnsupportedOperationException("Unknown delete file content: " + delete.content());
+      }
+    }
+
+    Schema fileProjection = fileProjection(!posDeletes.isEmpty());
+
+    CloseableIterable<Record> records = openFile(task, fileProjection);
+    records = applyPosDeletes(records, fileProjection, task.file().path(), posDeletes, task.file());
+    records = applyEqDeletes(records, fileProjection, eqDeletes, task.file());
+    records = applyResidual(records, fileProjection, task.residual());
+
+    return records;
+  }
+
+  private Schema fileProjection(boolean hasPosDeletes) {

Review comment:
       Yes, very good point. We should probably have two methods, one for position deletes and one for equality.
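One possible reading of the two-method split, sketched with column names instead of `Schema` objects: one helper adds what position deletes need and another adds what equality deletes need. `FileProjectionSketch`, both method names, and the `_pos` column name are hypothetical, and the PR may structure this differently.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class FileProjectionSketch {
  // Hypothetical name for the row position metadata column.
  static final String ROW_POSITION = "_pos";

  // Adds the row position column when position deletes must be applied.
  static List<String> withPosDeleteColumns(List<String> columns, boolean hasPosDeletes) {
    List<String> result = new ArrayList<>(columns);
    if (hasPosDeletes && !result.contains(ROW_POSITION)) {
      result.add(ROW_POSITION);
    }
    return result;
  }

  // Adds any equality columns that the requested projection does not already contain.
  static List<String> withEqDeleteColumns(List<String> columns, List<String> equalityColumns) {
    List<String> result = new ArrayList<>(columns);
    for (String column : equalityColumns) {
      if (!result.contains(column)) {
        result.add(column);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    List<String> requested = Arrays.asList("id", "data");
    List<String> projected =
        withEqDeleteColumns(withPosDeleteColumns(requested, true), Arrays.asList("id", "ts"));
    System.out.println(projected);
  }
}
```

Keeping the two concerns in separate helpers lets a file with only equality deletes skip the position column, and the reverse.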






[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #1352: Support row-level deletes with IcebergGenerics

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #1352:
URL: https://github.com/apache/iceberg/pull/1352#discussion_r472259511



##########
File path: core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
##########
@@ -96,21 +104,157 @@ private StructLikeWrapper newWrapper(int specId) {
     Pair<Integer, StructLikeWrapper> partition = partition(file.specId(), file.partition());
     Pair<long[], DeleteFile[]> partitionDeletes = sortedDeletesByPartition.get(partition);
 
+    Stream<DeleteFile> matchingDeletes;
     if (partitionDeletes == null) {
-      return limitBySequenceNumber(sequenceNumber, globalSeqs, globalDeletes);
+      matchingDeletes = limitBySequenceNumber(sequenceNumber, globalSeqs, globalDeletes);
     } else if (globalDeletes == null) {
-      return limitBySequenceNumber(sequenceNumber, partitionDeletes.first(), partitionDeletes.second());
+      matchingDeletes = limitBySequenceNumber(sequenceNumber, partitionDeletes.first(), partitionDeletes.second());
     } else {
-      return Stream.concat(
-          Stream.of(limitBySequenceNumber(sequenceNumber, globalSeqs, globalDeletes)),
-          Stream.of(limitBySequenceNumber(sequenceNumber, partitionDeletes.first(), partitionDeletes.second()))
-      ).toArray(DeleteFile[]::new);
+      matchingDeletes = Stream.concat(
+          limitBySequenceNumber(sequenceNumber, globalSeqs, globalDeletes),
+          limitBySequenceNumber(sequenceNumber, partitionDeletes.first(), partitionDeletes.second()));
+    }
+
+    return matchingDeletes
+        .filter(deleteFile -> canContainDeletesForFile(file, deleteFile, specsById.get(file.specId()).schema()))
+        .toArray(DeleteFile[]::new);
+  }
+
+  private static boolean canContainDeletesForFile(DataFile dataFile, DeleteFile deleteFile, Schema schema) {
+    switch (deleteFile.content()) {
+      case POSITION_DELETES:
+        return canContainPosDeletesForFile(dataFile, deleteFile);
+
+      case EQUALITY_DELETES:
+        return canContainEqDeletesForFile(dataFile, deleteFile, schema);
+    }
+
+    return true;
+  }
+
+  private static boolean canContainPosDeletesForFile(DataFile dataFile, DeleteFile deleteFile) {
+    // check that the delete file can contain the data file's file_path
+    Map<Integer, ByteBuffer> lowers = deleteFile.lowerBounds();
+    Map<Integer, ByteBuffer> uppers = deleteFile.upperBounds();
+    if (lowers == null || uppers == null) {
+      return true;
+    }
+
+    Type pathType = MetadataColumns.DELETE_FILE_PATH.type();
+    int pathId = MetadataColumns.DELETE_FILE_PATH.fieldId();
+    ByteBuffer lower = lowers.get(pathId);
+    if (lower != null &&
+        Comparators.charSequences().compare(dataFile.path(), Conversions.fromByteBuffer(pathType, lower)) < 0) {
+      return false;
     }
+
+    ByteBuffer upper = uppers.get(pathId);
+    if (upper != null &&
+        Comparators.charSequences().compare(dataFile.path(), Conversions.fromByteBuffer(pathType, upper)) > 0) {
+      return false;
+    }
+
+    return true;
+  }
+
+  @SuppressWarnings("checkstyle:CyclomaticComplexity")
+  private static boolean canContainEqDeletesForFile(DataFile dataFile, DeleteFile deleteFile, Schema schema) {
+    if (dataFile.lowerBounds() == null || dataFile.upperBounds() == null ||
+        deleteFile.lowerBounds() == null || deleteFile.upperBounds() == null) {
+      return true;
+    }
+
+    Map<Integer, ByteBuffer> dataLowers = dataFile.lowerBounds();
+    Map<Integer, ByteBuffer> dataUppers = dataFile.upperBounds();
+    Map<Integer, ByteBuffer> deleteLowers = deleteFile.lowerBounds();
+    Map<Integer, ByteBuffer> deleteUppers = deleteFile.upperBounds();
+
+    Map<Integer, Long> dataNullCounts = dataFile.nullValueCounts();
+    Map<Integer, Long> dataValueCounts = dataFile.valueCounts();
+    Map<Integer, Long> deleteNullCounts = deleteFile.nullValueCounts();
+    Map<Integer, Long> deleteValueCounts = deleteFile.valueCounts();
+
+    for (int id : deleteFile.equalityFieldIds()) {
+      Types.NestedField field = schema.findField(id);
+      if (!field.type().isPrimitiveType()) {
+        return true;
+      }
+
+      if (allNull(dataNullCounts, dataValueCounts, field) && allNonNull(deleteNullCounts, field)) {
+        return false;
+      }
+
+      if (allNull(deleteNullCounts, deleteValueCounts, field) && allNonNull(dataNullCounts, field)) {
+        return false;
+      }
+
+      ByteBuffer dataLower = dataLowers.get(id);
+      ByteBuffer dataUpper = dataUppers.get(id);
+      ByteBuffer deleteLower = deleteLowers.get(id);
+      ByteBuffer deleteUpper = deleteUppers.get(id);
+      if (dataLower == null || dataUpper == null || deleteLower == null || deleteUpper == null) {
+        return true;
+      }
+
+      if (!rangesOverlap(field.type().asPrimitiveType(), dataLower, dataUpper, deleteLower, deleteUpper)) {
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  private static <T> boolean rangesOverlap(Type.PrimitiveType type,
+                                           ByteBuffer dataLowerBuf, ByteBuffer dataUpperBuf,
+                                           ByteBuffer deleteLowerBuf, ByteBuffer deleteUpperBuf) {
+    Comparator<T> comparator = Comparators.forType(type);
+    T dataLower = Conversions.fromByteBuffer(type, dataLowerBuf);
+    T dataUpper = Conversions.fromByteBuffer(type, dataUpperBuf);
+    T deleteLower = Conversions.fromByteBuffer(type, deleteLowerBuf);
+    T deleteUpper = Conversions.fromByteBuffer(type, deleteUpperBuf);
+
+    return comparator.compare(deleteLower, dataUpper) <= 0 && comparator.compare(dataLower, deleteUpper) <= 0;
+  }
+
+  private static boolean allNonNull(Map<Integer, Long> nullValueCounts, Types.NestedField field) {
+    if (field.isRequired()) {
+      return true;
+    }
+
+    if (nullValueCounts == null) {
+      return false;
+    }
+
+    Long nullValueCount = nullValueCounts.get(field.fieldId());
+    if (nullValueCount == null) {

Review comment:
       Just checking, but this map will never contain a 0, right? I want to make sure we aren't ignoring the case where `nullValueCounts.get(X) == 0`.
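
       The distinction being asked about can be sketched as follows. This is a minimal, self-contained illustration, not the code from the PR: the class name `NullCountCheck` and the simplified signature (a field ID and a `required` flag instead of `Types.NestedField`) are assumptions made here. It assumes that a missing map entry means "no stats recorded" while an explicit 0 means "stats prove there are no nulls".

```java
import java.util.Map;

/** Hypothetical sketch: a missing null count is "unknown", an explicit 0 is "no nulls". */
final class NullCountCheck {
  private NullCountCheck() {
  }

  /** Returns true only when the available stats prove the column holds no nulls. */
  static boolean allNonNull(Map<Integer, Long> nullValueCounts, int fieldId, boolean required) {
    if (required) {
      return true; // a required field cannot contain nulls
    }

    if (nullValueCounts == null) {
      return false; // no stats at all, so nothing can be proven
    }

    Long nullValueCount = nullValueCounts.get(fieldId);
    if (nullValueCount == null) {
      return false; // entry missing: unknown, which is not the same as zero
    }

    return nullValueCount == 0L; // an explicit 0 entry is honored, not ignored
  }
}
```

       With this shape, an entry of 0 for a field returns true, while an absent entry returns false, so the two cases are not conflated.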






[GitHub] [iceberg] rdblue commented on pull request #1352: Support row-level deletes with IcebergGenerics

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #1352:
URL: https://github.com/apache/iceberg/pull/1352#issuecomment-678714919


   @shardulm94, could you take another look? I think I have this in a good state to commit.
   
   There are a few things to fix in follow-ups:
   * Add support for equality deletes by nested columns. This will require building a projection with additional fields in nested structs, not just top-level.
   * Add a wrapper to truncate generic rows
   * Add table configuration to control the number of position deletes held in memory before using a merge strategy
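
   The last item refers to choosing between holding position deletes in an in-memory set and merging a sorted delete stream against the rows. A rough sketch of that choice follows, with the threshold passed in as a parameter instead of a hard-coded constant. The class `PosDeleteFilterSketch`, its method names, and the `setThreshold` parameter are hypothetical and use plain Java collections; this is not the `Deletes` API from the PR, and no table property name is implied.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch: pick a set-based or merge-based position delete filter. */
final class PosDeleteFilterSketch {
  private PosDeleteFilterSketch() {
  }

  /** Loads all deleted positions into a hash set; memory grows with the delete count. */
  static <T> List<T> filterWithSet(List<T> rows, List<Long> deletedPositions) {
    Set<Long> deleted = new HashSet<>(deletedPositions);
    List<T> kept = new ArrayList<>();
    for (int pos = 0; pos < rows.size(); pos++) {
      if (!deleted.contains((long) pos)) {
        kept.add(rows.get(pos));
      }
    }
    return kept;
  }

  /** Walks rows and ascending delete positions in step; only one position is held at a time. */
  static <T> List<T> filterWithMerge(List<T> rows, Iterator<Long> sortedDeletes) {
    List<T> kept = new ArrayList<>();
    long nextDelete = sortedDeletes.hasNext() ? sortedDeletes.next() : Long.MAX_VALUE;
    for (int pos = 0; pos < rows.size(); pos++) {
      // advance past delete positions that are behind the current row (also skips duplicates)
      while (nextDelete < pos) {
        nextDelete = sortedDeletes.hasNext() ? sortedDeletes.next() : Long.MAX_VALUE;
      }
      if (nextDelete != pos) {
        kept.add(rows.get(pos));
      }
    }
    return kept;
  }

  /** Uses the set when the delete count is below setThreshold, otherwise the merge. */
  static <T> List<T> filter(List<T> rows, List<Long> sortedDeletes, long setThreshold) {
    if (sortedDeletes.size() < setThreshold) {
      return filterWithSet(rows, sortedDeletes);
    }
    return filterWithMerge(rows, sortedDeletes.iterator());
  }
}
```

   Both paths should produce the same rows; the threshold only trades memory for the requirement that delete positions arrive sorted.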




[GitHub] [iceberg] rdblue commented on a change in pull request #1352: Support row-level deletes with IcebergGenerics

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1352:
URL: https://github.com/apache/iceberg/pull/1352#discussion_r472356314



##########
File path: core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
##########
@@ -96,21 +104,157 @@ private StructLikeWrapper newWrapper(int specId) {
     Pair<Integer, StructLikeWrapper> partition = partition(file.specId(), file.partition());
     Pair<long[], DeleteFile[]> partitionDeletes = sortedDeletesByPartition.get(partition);
 
+    Stream<DeleteFile> matchingDeletes;
     if (partitionDeletes == null) {
-      return limitBySequenceNumber(sequenceNumber, globalSeqs, globalDeletes);
+      matchingDeletes = limitBySequenceNumber(sequenceNumber, globalSeqs, globalDeletes);
     } else if (globalDeletes == null) {
-      return limitBySequenceNumber(sequenceNumber, partitionDeletes.first(), partitionDeletes.second());
+      matchingDeletes = limitBySequenceNumber(sequenceNumber, partitionDeletes.first(), partitionDeletes.second());
     } else {
-      return Stream.concat(
-          Stream.of(limitBySequenceNumber(sequenceNumber, globalSeqs, globalDeletes)),
-          Stream.of(limitBySequenceNumber(sequenceNumber, partitionDeletes.first(), partitionDeletes.second()))
-      ).toArray(DeleteFile[]::new);
+      matchingDeletes = Stream.concat(
+          limitBySequenceNumber(sequenceNumber, globalSeqs, globalDeletes),
+          limitBySequenceNumber(sequenceNumber, partitionDeletes.first(), partitionDeletes.second()));
+    }
+
+    return matchingDeletes
+        .filter(deleteFile -> canContainDeletesForFile(file, deleteFile, specsById.get(file.specId()).schema()))
+        .toArray(DeleteFile[]::new);
+  }
+
+  private static boolean canContainDeletesForFile(DataFile dataFile, DeleteFile deleteFile, Schema schema) {
+    switch (deleteFile.content()) {
+      case POSITION_DELETES:
+        return canContainPosDeletesForFile(dataFile, deleteFile);
+
+      case EQUALITY_DELETES:
+        return canContainEqDeletesForFile(dataFile, deleteFile, schema);
+    }
+
+    return true;
+  }
+
+  private static boolean canContainPosDeletesForFile(DataFile dataFile, DeleteFile deleteFile) {
+    // check that the delete file can contain the data file's file_path
+    Map<Integer, ByteBuffer> lowers = deleteFile.lowerBounds();
+    Map<Integer, ByteBuffer> uppers = deleteFile.upperBounds();
+    if (lowers == null || uppers == null) {
+      return true;
+    }
+
+    Type pathType = MetadataColumns.DELETE_FILE_PATH.type();
+    int pathId = MetadataColumns.DELETE_FILE_PATH.fieldId();
+    ByteBuffer lower = lowers.get(pathId);
+    if (lower != null &&
+        Comparators.charSequences().compare(dataFile.path(), Conversions.fromByteBuffer(pathType, lower)) < 0) {

Review comment:
       We don't actually convert to `String`. I think what gets returned by `Conversions` is actually a `CharBuffer`, which implements `CharSequence`. That avoids the copy, which is (I think) what you're suggesting.
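
       To illustrate the point with standard JDK classes only: a UTF-8 `ByteBuffer` can be decoded into a `CharBuffer` and compared as a `CharSequence` without ever constructing a `String`. The sketch below is an assumption-laden stand-in, not Iceberg's `Conversions` or `Comparators` code; the class `PathBoundCheck` and its methods are hypothetical, and the comparison is a plain code-unit ordering.

```java
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical sketch: compare a file path against UTF-8 bounds via CharSequence. */
final class PathBoundCheck {
  private PathBoundCheck() {
  }

  /** Decodes UTF-8 bytes into a CharBuffer; duplicate() leaves the caller's buffer position untouched. */
  static CharBuffer decode(ByteBuffer utf8) {
    return StandardCharsets.UTF_8.decode(utf8.duplicate());
  }

  /** Lexicographic comparison by UTF-16 code unit over any CharSequence implementations. */
  static int compare(CharSequence left, CharSequence right) {
    int len = Math.min(left.length(), right.length());
    for (int i = 0; i < len; i++) {
      int cmp = Character.compare(left.charAt(i), right.charAt(i));
      if (cmp != 0) {
        return cmp;
      }
    }
    return Integer.compare(left.length(), right.length());
  }

  /** Returns false only when the path is provably outside [lower, upper]; a null bound is unknown. */
  static boolean mayContain(CharSequence path, ByteBuffer lower, ByteBuffer upper) {
    if (lower != null && compare(path, decode(lower)) < 0) {
      return false;
    }
    if (upper != null && compare(path, decode(upper)) > 0) {
      return false;
    }
    return true;
  }
}
```

       Decoding still allocates the character buffer itself; what is avoided in this sketch is the extra `String` object and its copy of the characters.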






[GitHub] [iceberg] shardulm94 commented on a change in pull request #1352: Support row-level deletes with IcebergGenerics

Posted by GitBox <gi...@apache.org>.
shardulm94 commented on a change in pull request #1352:
URL: https://github.com/apache/iceberg/pull/1352#discussion_r475803745



##########
File path: data/src/main/java/org/apache/iceberg/data/GenericReader.java
##########
@@ -114,14 +117,40 @@
     return records;
   }
 
-  private Schema fileProjection(boolean hasPosDeletes) {
-    if (hasPosDeletes) {
-      List<Types.NestedField> columns = Lists.newArrayList(projection.columns());
+  private Schema fileProjection(List<DeleteFile> posDeletes, List<DeleteFile> eqDeletes) {
+    Set<Integer> requiredIds = Sets.newLinkedHashSet();
+    if (!posDeletes.isEmpty()) {
+      requiredIds.add(MetadataColumns.ROW_POSITION.fieldId());
+    }
+
+    for (DeleteFile eqDelete : eqDeletes) {
+      requiredIds.addAll(eqDelete.equalityFieldIds());
+    }
+
+    Set<Integer> missingIds = Sets.newLinkedHashSet(Sets.difference(requiredIds, TypeUtil.getProjectedIds(projection)));
+
+    if (missingIds.isEmpty()) {
+      return projection;
+    }
+
+    // TODO: support adding nested columns. this will currently fail when finding nested columns to add
+    List<Types.NestedField> columns = Lists.newArrayList(projection.columns());
+    for (int fieldId : missingIds) {
+      if (fieldId == MetadataColumns.ROW_POSITION.fieldId()) {
+        continue; // add _pos at the end
+      }
+
+      Types.NestedField field = tableSchema.asStruct().field(fieldId);
+      Preconditions.checkArgument(field != null, "Cannot find required field for ID %s", fieldId);

Review comment:
       Okay yeah, that is correct. We should just be building the projection differently.






[GitHub] [iceberg] rdblue commented on a change in pull request #1352: Support row-level deletes with IcebergGenerics

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1352:
URL: https://github.com/apache/iceberg/pull/1352#discussion_r475754066



##########
File path: data/src/main/java/org/apache/iceberg/data/GenericReader.java
##########
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.data;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import org.apache.iceberg.Accessor;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.orc.GenericOrcReader;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.deletes.Deletes;
+import org.apache.iceberg.expressions.Evaluator;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimap;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.PartitionUtil;
+import org.apache.iceberg.util.StructLikeSet;
+import org.apache.iceberg.util.StructProjection;
+import org.apache.parquet.Preconditions;
+
+class GenericReader implements Serializable {
+  private static final Schema POS_DELETE_SCHEMA = new Schema(
+      MetadataColumns.DELETE_FILE_PATH,
+      MetadataColumns.DELETE_FILE_POS);
+
+  private final FileIO io;
+  private final Schema tableSchema;
+  private final Schema projection;
+  private final boolean caseSensitive;
+  private final boolean reuseContainers;
+
+  GenericReader(TableScan scan, boolean reuseContainers) {
+    this.io = scan.table().io();
+    this.tableSchema = scan.table().schema();
+    this.projection = scan.schema();
+    this.caseSensitive = scan.isCaseSensitive();
+    this.reuseContainers = reuseContainers;
+  }
+
+  CloseableIterator<Record> open(CloseableIterable<CombinedScanTask> tasks) {
+    Iterable<FileScanTask> fileTasks = Iterables.concat(Iterables.transform(tasks, CombinedScanTask::files));
+    return CloseableIterable.concat(Iterables.transform(fileTasks, this::open)).iterator();
+  }
+
+  public CloseableIterable<Record> open(CombinedScanTask task) {
+    return new CombinedTaskIterable(task);
+  }
+
+  public CloseableIterable<Record> open(FileScanTask task) {
+    List<DeleteFile> posDeletes = Lists.newArrayList();
+    List<DeleteFile> eqDeletes = Lists.newArrayList();
+    for (DeleteFile delete : task.deletes()) {
+      switch (delete.content()) {
+        case POSITION_DELETES:
+          posDeletes.add(delete);
+          break;
+        case EQUALITY_DELETES:
+          eqDeletes.add(delete);
+          break;
+        default:
+          throw new UnsupportedOperationException("Unknown delete file content: " + delete.content());
+      }
+    }
+
+    Schema fileProjection = fileProjection(posDeletes, eqDeletes);
+
+    CloseableIterable<Record> records = openFile(task, fileProjection);
+    records = applyPosDeletes(records, fileProjection, task.file().path(), posDeletes, task.file());
+    records = applyEqDeletes(records, fileProjection, eqDeletes, task.file());
+    records = applyResidual(records, fileProjection, task.residual());
+
+    return records;
+  }
+
+  private Schema fileProjection(List<DeleteFile> posDeletes, List<DeleteFile> eqDeletes) {
+    Set<Integer> requiredIds = Sets.newLinkedHashSet();
+    if (!posDeletes.isEmpty()) {
+      requiredIds.add(MetadataColumns.ROW_POSITION.fieldId());
+    }
+
+    for (DeleteFile eqDelete : eqDeletes) {
+      requiredIds.addAll(eqDelete.equalityFieldIds());
+    }
+
+    Set<Integer> missingIds = Sets.newLinkedHashSet(Sets.difference(requiredIds, TypeUtil.getProjectedIds(projection)));
+
+    if (missingIds.isEmpty()) {
+      return projection;
+    }
+
+    // TODO: support adding nested columns. this will currently fail when finding nested columns to add
+    List<Types.NestedField> columns = Lists.newArrayList(projection.columns());
+    for (int fieldId : missingIds) {
+      if (fieldId == MetadataColumns.ROW_POSITION.fieldId()) {
+        continue; // add _pos at the end
+      }
+
+      Types.NestedField field = tableSchema.asStruct().field(fieldId);
+      Preconditions.checkArgument(field != null, "Cannot find required field for ID %s", fieldId);
+
+      columns.add(field);
+    }
+
+    if (requiredIds.contains(MetadataColumns.ROW_POSITION.fieldId())) {
+      columns.add(MetadataColumns.ROW_POSITION);
+    }
+
+    return new Schema(columns);
+  }
+
+  private CloseableIterable<Record> applyResidual(CloseableIterable<Record> records, Schema recordSchema,
+                                                  Expression residual) {
+    if (residual != null && residual != Expressions.alwaysTrue()) {
+      InternalRecordWrapper wrapper = new InternalRecordWrapper(recordSchema.asStruct());
+      Evaluator filter = new Evaluator(recordSchema.asStruct(), residual, caseSensitive);
+      return CloseableIterable.filter(records, record -> filter.eval(wrapper.wrap(record)));
+    }
+
+    return records;
+  }
+
+  private CloseableIterable<Record> applyEqDeletes(CloseableIterable<Record> records, Schema recordSchema,
+                                                   List<DeleteFile> eqDeletes, DataFile dataFile) {
+    if (eqDeletes.isEmpty()) {
+      return records;
+    }
+
+    Multimap<Set<Integer>, DeleteFile> filesByDeleteIds = Multimaps.newMultimap(Maps.newHashMap(), Lists::newArrayList);
+    for (DeleteFile delete : eqDeletes) {
+      filesByDeleteIds.put(Sets.newHashSet(delete.equalityFieldIds()), delete);
+    }
+
+    CloseableIterable<Record> filteredRecords = records;
+    for (Map.Entry<Set<Integer>, Collection<DeleteFile>> entry : filesByDeleteIds.asMap().entrySet()) {
+      Set<Integer> ids = entry.getKey();
+      Iterable<DeleteFile> deletes = entry.getValue();
+
+      Schema deleteSchema = TypeUtil.select(recordSchema, ids);
+
+      // a wrapper to translate from generic objects to internal representations
+      InternalRecordWrapper asStructLike = new InternalRecordWrapper(recordSchema.asStruct());
+
+      // a projection to select and reorder fields of the file schema to match the delete rows
+      StructProjection projectRow = StructProjection.create(recordSchema, deleteSchema);
+
+      Iterable<CloseableIterable<Record>> deleteRecords = Iterables.transform(deletes,
+          delete -> openDeletes(delete, dataFile, deleteSchema));
+      StructLikeSet deleteSet = Deletes.toEqualitySet(
+          // copy the delete records because they will be held in a set
+          CloseableIterable.transform(CloseableIterable.concat(deleteRecords), Record::copy),
+          deleteSchema.asStruct());
+
+      filteredRecords = Deletes.filter(filteredRecords,
+          record -> projectRow.wrap(asStructLike.wrap(record)), deleteSet);
+    }
+
+    return filteredRecords;
+  }
+
+  private CloseableIterable<Record> applyPosDeletes(CloseableIterable<Record> records, Schema recordSchema,
+                                                    CharSequence file, List<DeleteFile> posDeletes, DataFile dataFile) {
+    if (posDeletes.isEmpty()) {
+      return records;
+    }
+
+    Accessor<StructLike> posAccessor = recordSchema.accessorForField(MetadataColumns.ROW_POSITION.fieldId());
+    Function<Record, Long> posGetter = record -> (Long) posAccessor.get(record);
+    List<CloseableIterable<StructLike>> deletes = Lists.transform(posDeletes,
+        delete -> openPosDeletes(delete, dataFile));
+
+    // if there are fewer deletes than a reasonable number to keep in memory, use a set
+    if (posDeletes.stream().mapToLong(DeleteFile::recordCount).sum() < 100_000L) {
+      return Deletes.filter(records, posGetter, Deletes.toPositionSet(file, CloseableIterable.concat(deletes)));
+    }
+
+    return Deletes.streamingFilter(records, posGetter, Deletes.deletePositions(file, deletes));
+  }
+
+  private CloseableIterable<StructLike> openPosDeletes(DeleteFile file, DataFile dataFile) {
+    return openDeletes(file, dataFile, POS_DELETE_SCHEMA);
+  }
+
+  private <T> CloseableIterable<T> openDeletes(DeleteFile deleteFile, DataFile dataFile, Schema deleteSchema) {

Review comment:
       This should actually be `Record`. I'll fix it. Thanks for pointing this out.



