You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/02/05 04:03:27 UTC

[GitHub] [iceberg] chenjunjiedada opened a new pull request #2216: Spark: support replace equality deletes to position deletes

chenjunjiedada opened a new pull request #2216:
URL: https://github.com/apache/iceberg/pull/2216


   This adds a spark action to replace the equality deletes to position deletes which I think is minor compaction. The logic is:
   
   1. Plan and group the tasks by partition. Current it doesn't consider the filter, we may consider filter, such as partition filter, later.
   2. Use the delete matcher to keep rows that match the equality delete set. The rows are projected with file and pos fields.
   3. Write the matched rows via position delete writer.
   4. Perform the rewrite files to replace equality deletes with position deletes.
   
   This adds an API in `RewriteFiles` to rewrite equality deletes to position deletes. It should keep the same semantic with current API that rows must be the same as before as after. This could be used to combine position deletes to reduce some small files.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #2216: Spark: support replace equality deletes to position deletes

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2216:
URL: https://github.com/apache/iceberg/pull/2216#discussion_r596528821



##########
File path: data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
##########
@@ -110,7 +113,42 @@ protected long pos(T record) {
     return applyEqDeletes(applyPosDeletes(records));
   }
 
-  private CloseableIterable<T> applyEqDeletes(CloseableIterable<T> records) {
+  public CloseableIterable<T> matchEqDeletes(CloseableIterable<T> records) {
+    if (eqDeletes.isEmpty()) {
+      return records;
+    }
+
+    Multimap<Set<Integer>, DeleteFile> filesByDeleteIds = Multimaps.newMultimap(Maps.newHashMap(), Lists::newArrayList);
+    for (DeleteFile delete : eqDeletes) {
+      filesByDeleteIds.put(Sets.newHashSet(delete.equalityFieldIds()), delete);
+    }
+
+    List<Predicate<T>> deleteSetFilters = Lists.newArrayList();
+    for (Map.Entry<Set<Integer>, Collection<DeleteFile>> entry : filesByDeleteIds.asMap().entrySet()) {
+      Set<Integer> ids = entry.getKey();
+      Iterable<DeleteFile> deletes = entry.getValue();
+
+      Schema deleteSchema = TypeUtil.select(requiredSchema, ids);
+
+      // a projection to select and reorder fields of the file schema to match the delete rows
+      StructProjection projectRow = StructProjection.create(requiredSchema, deleteSchema);
+
+      Iterable<CloseableIterable<Record>> deleteRecords = Iterables.transform(deletes,
+          delete -> openDeletes(delete, deleteSchema));
+      StructLikeSet deleteSet = Deletes.toEqualitySet(
+          // copy the delete records because they will be held in a set
+          CloseableIterable.transform(CloseableIterable.concat(deleteRecords), Record::copy),
+          deleteSchema.asStruct());
+
+      Predicate<T> predicate = record -> deleteSet.contains(projectRow.wrap(asStructLike(record)));
+      deleteSetFilters.add(predicate);
+    }
+
+    Filter<T> findDeleteRows = new ChainOrFilter<>(deleteSetFilters);

Review comment:
       We've removed the `ChainOrFilter` in the committed PR https://github.com/apache/iceberg/pull/2320. The reviewed patch should have fixed your concern.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] yyanyy commented on a change in pull request #2216: Spark: support replace equality deletes to position deletes

Posted by GitBox <gi...@apache.org>.
yyanyy commented on a change in pull request #2216:
URL: https://github.com/apache/iceberg/pull/2216#discussion_r596403960



##########
File path: data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
##########
@@ -110,7 +113,42 @@ protected long pos(T record) {
     return applyEqDeletes(applyPosDeletes(records));
   }
 
-  private CloseableIterable<T> applyEqDeletes(CloseableIterable<T> records) {
+  public CloseableIterable<T> matchEqDeletes(CloseableIterable<T> records) {

Review comment:
       Looks like this method is mostly the same as `applyEqDeletes` except for the predicate evaluation, do we want to abstract the common logic out? 

##########
File path: data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
##########
@@ -110,7 +113,42 @@ protected long pos(T record) {
     return applyEqDeletes(applyPosDeletes(records));
   }
 
-  private CloseableIterable<T> applyEqDeletes(CloseableIterable<T> records) {
+  public CloseableIterable<T> matchEqDeletes(CloseableIterable<T> records) {
+    if (eqDeletes.isEmpty()) {
+      return records;
+    }
+
+    Multimap<Set<Integer>, DeleteFile> filesByDeleteIds = Multimaps.newMultimap(Maps.newHashMap(), Lists::newArrayList);
+    for (DeleteFile delete : eqDeletes) {
+      filesByDeleteIds.put(Sets.newHashSet(delete.equalityFieldIds()), delete);
+    }
+
+    List<Predicate<T>> deleteSetFilters = Lists.newArrayList();
+    for (Map.Entry<Set<Integer>, Collection<DeleteFile>> entry : filesByDeleteIds.asMap().entrySet()) {
+      Set<Integer> ids = entry.getKey();
+      Iterable<DeleteFile> deletes = entry.getValue();
+
+      Schema deleteSchema = TypeUtil.select(requiredSchema, ids);
+
+      // a projection to select and reorder fields of the file schema to match the delete rows
+      StructProjection projectRow = StructProjection.create(requiredSchema, deleteSchema);
+
+      Iterable<CloseableIterable<Record>> deleteRecords = Iterables.transform(deletes,
+          delete -> openDeletes(delete, deleteSchema));
+      StructLikeSet deleteSet = Deletes.toEqualitySet(
+          // copy the delete records because they will be held in a set
+          CloseableIterable.transform(CloseableIterable.concat(deleteRecords), Record::copy),
+          deleteSchema.asStruct());
+
+      Predicate<T> predicate = record -> deleteSet.contains(projectRow.wrap(asStructLike(record)));
+      deleteSetFilters.add(predicate);
+    }
+
+    Filter<T> findDeleteRows = new ChainOrFilter<>(deleteSetFilters);

Review comment:
       Do we need an extra class for this? This seems to be achievable via something like 
   ```
   return CloseableIterable.filter(records, record -> 
       deleteSetFilters.stream().anyMatch(filter -> filter.test(record)));
   ````

##########
File path: spark/src/main/java/org/apache/iceberg/actions/ReplaceDeleteAction.java
##########
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RewriteFiles;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.ListMultimap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.SparkUtil;
+import org.apache.iceberg.spark.source.DeleteRewriter;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.StructLikeWrapper;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.iceberg.util.Tasks;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ReplaceDeleteAction extends
+    BaseSnapshotUpdateAction<ReplaceDeleteAction, DeleteRewriteActionResult> {
+  private static final Logger LOG = LoggerFactory.getLogger(ReplaceDeleteAction.class);
+  private final Table table;
+  private final JavaSparkContext sparkContext;
+  private FileIO fileIO;
+  private final EncryptionManager encryptionManager;
+  private final boolean caseSensitive;
+  private final PartitionSpec spec;
+  private final long targetSizeInBytes;
+  private final int splitLookback;
+  private final long splitOpenFileCost;
+
+  public ReplaceDeleteAction(SparkSession spark, Table table) {
+    this.table = table;
+    this.sparkContext = new JavaSparkContext(spark.sparkContext());
+    this.fileIO = fileIO();
+    this.encryptionManager = table.encryption();
+    this.caseSensitive = false;
+    this.spec = table.spec();
+
+    long splitSize = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.SPLIT_SIZE,
+        TableProperties.SPLIT_SIZE_DEFAULT);
+    long targetFileSize = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
+        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
+    this.targetSizeInBytes = Math.min(splitSize, targetFileSize);
+
+    this.splitLookback = PropertyUtil.propertyAsInt(
+        table.properties(),
+        TableProperties.SPLIT_LOOKBACK,
+        TableProperties.SPLIT_LOOKBACK_DEFAULT);
+    this.splitOpenFileCost = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.SPLIT_OPEN_FILE_COST,
+        TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT);
+  }
+
+  protected FileIO fileIO() {
+    if (this.fileIO == null) {
+      this.fileIO = SparkUtil.serializableFileIO(table());
+    }
+    return this.fileIO;
+  }
+
+  @Override
+  protected Table table() {
+    return table;
+  }
+
+  @Override
+  public DeleteRewriteActionResult execute() {
+    CloseableIterable<FileScanTask> fileScanTasks = null;
+    try {
+      fileScanTasks = table.newScan()
+          .caseSensitive(caseSensitive)
+          .ignoreResiduals()
+          .planFiles();
+    } finally {
+      try {
+        if (fileScanTasks != null) {
+          fileScanTasks.close();
+        }
+      } catch (IOException ioe) {
+        LOG.warn("Failed to close task iterable", ioe);
+      }
+    }
+
+    CloseableIterable<FileScanTask> tasksWithEqDelete = CloseableIterable.filter(fileScanTasks, scan ->
+        scan.deletes().stream().anyMatch(delete -> delete.content().equals(FileContent.EQUALITY_DELETES))
+    );
+
+    Set<DeleteFile> eqDeletes = Sets.newHashSet();
+    tasksWithEqDelete.forEach(task -> {
+      eqDeletes.addAll(task.deletes().stream()
+          .filter(deleteFile -> deleteFile.content().equals(FileContent.EQUALITY_DELETES))
+          .collect(Collectors.toList()));
+    });
+
+    Map<StructLikeWrapper, Collection<FileScanTask>> groupedTasks = groupTasksByPartition(tasksWithEqDelete.iterator());
+
+    // Split and combine tasks under each partition
+    // TODO: can we split task?
+    List<Pair<StructLike, CombinedScanTask>> combinedScanTasks = groupedTasks.entrySet().stream()
+        .map(entry -> {
+          CloseableIterable<FileScanTask> splitTasks = TableScanUtil.splitFiles(
+              CloseableIterable.withNoopClose(entry.getValue()), targetSizeInBytes);
+          return Pair.of(entry.getKey().get(),
+              TableScanUtil.planTasks(splitTasks, targetSizeInBytes, splitLookback, splitOpenFileCost));
+        })
+        .flatMap(pair -> StreamSupport.stream(CloseableIterable
+            .transform(pair.second(), task -> Pair.of(pair.first(), task)).spliterator(), false)
+        )
+        .collect(Collectors.toList());
+
+    if (!combinedScanTasks.isEmpty()) {
+      JavaRDD<Pair<StructLike, CombinedScanTask>> taskRDD = sparkContext.parallelize(combinedScanTasks,
+          combinedScanTasks.size());
+      Broadcast<FileIO> io = sparkContext.broadcast(fileIO());
+      Broadcast<EncryptionManager> encryption = sparkContext.broadcast(encryptionManager());
+
+      DeleteRewriter deleteRewriter = new DeleteRewriter(table, caseSensitive, io, encryption);
+      List<DeleteFile> posDeletes = deleteRewriter.toPosDeletes(taskRDD);
+
+      if (!eqDeletes.isEmpty() && !posDeletes.isEmpty()) {

Review comment:
       I think the comment "This kind of rewrite is valid actually because it replace all the useless equality files to empty position delete files" also applies here that we don't need to check for empty `posDeletes`? 

##########
File path: spark/src/main/java/org/apache/iceberg/spark/source/DeleteRowReader.java
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.data.DeleteFilter;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.util.PartitionUtil;
+import org.apache.spark.rdd.InputFileBlockHolder;
+import org.apache.spark.sql.catalyst.InternalRow;
+
+public class DeleteRowReader extends RowDataReader {
+  private final Schema tableSchema;
+  private final Schema expectedSchema;
+
+  public DeleteRowReader(CombinedScanTask task, Schema schema, Schema expectedSchema, String nameMapping,
+                         FileIO io, EncryptionManager encryptionManager, boolean caseSensitive) {
+    super(task, schema, schema, nameMapping, io, encryptionManager,
+        caseSensitive);
+    this.tableSchema = schema;
+    this.expectedSchema = expectedSchema;
+  }
+
+  @Override
+  CloseableIterator<InternalRow> open(FileScanTask task) {
+    SparkDeleteMatcher matches = new SparkDeleteMatcher(task, tableSchema, expectedSchema);
+
+    // schema or rows returned by readers
+    Schema requiredSchema = matches.requiredSchema();
+    Map<Integer, ?> idToConstant = PartitionUtil.constantsMap(task, RowDataReader::convertConstant);
+    DataFile file = task.file();
+
+    // update the current file for Spark's filename() function
+    InputFileBlockHolder.set(file.path().toString(), task.start(), task.length());
+
+    return matches.matchEqDeletes(open(task, requiredSchema, idToConstant)).iterator();

Review comment:
       Looks like mostly only this line differs between this class and `RowDataReader` that I think we can abstract a lot of the code out

##########
File path: spark/src/main/java/org/apache/iceberg/actions/ReplaceDeleteAction.java
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RewriteFiles;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.ListMultimap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.SparkUtil;
+import org.apache.iceberg.spark.source.DeleteRewriter;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.StructLikeWrapper;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.iceberg.util.Tasks;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ReplaceDeleteAction extends
+    BaseSnapshotUpdateAction<ReplaceDeleteAction, DeleteRewriteActionResult> {
+  private static final Logger LOG = LoggerFactory.getLogger(ReplaceDeleteAction.class);
+  private final Table table;
+  private final JavaSparkContext sparkContext;
+  private FileIO fileIO;
+  private final EncryptionManager encryptionManager;
+  private final boolean caseSensitive;
+  private final PartitionSpec spec;
+  private final long targetSizeInBytes;
+  private final int splitLookback;
+  private final long splitOpenFileCost;
+
+  public ReplaceDeleteAction(SparkSession spark, Table table) {
+    this.table = table;
+    this.sparkContext = new JavaSparkContext(spark.sparkContext());
+    this.fileIO = fileIO();
+    this.encryptionManager = table.encryption();
+    this.caseSensitive = false;
+    this.spec = table.spec();
+
+    long splitSize = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.SPLIT_SIZE,
+        TableProperties.SPLIT_SIZE_DEFAULT);
+    long targetFileSize = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
+        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
+    this.targetSizeInBytes = Math.min(splitSize, targetFileSize);
+
+    this.splitLookback = PropertyUtil.propertyAsInt(
+        table.properties(),
+        TableProperties.SPLIT_LOOKBACK,
+        TableProperties.SPLIT_LOOKBACK_DEFAULT);
+    this.splitOpenFileCost = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.SPLIT_OPEN_FILE_COST,
+        TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT);
+  }
+
+  protected FileIO fileIO() {
+    if (this.fileIO == null) {
+      this.fileIO = SparkUtil.serializableFileIO(table());
+    }
+    return this.fileIO;
+  }
+
+  @Override
+  protected Table table() {
+    return table;
+  }
+
+  @Override
+  public DeleteRewriteActionResult execute() {
+    CloseableIterable<FileScanTask> fileScanTasks = null;
+    try {
+      fileScanTasks = table.newScan()
+          .caseSensitive(caseSensitive)
+          .ignoreResiduals()
+          .planFiles();
+    } finally {
+      try {
+        if (fileScanTasks != null) {
+          fileScanTasks.close();
+        }
+      } catch (IOException ioe) {
+        LOG.warn("Failed to close task iterable", ioe);
+      }
+    }
+
+    CloseableIterable<FileScanTask> tasksWithEqDelete = CloseableIterable.filter(fileScanTasks, scan ->
+        scan.deletes().stream().anyMatch(delete -> delete.content().equals(FileContent.EQUALITY_DELETES))
+    );
+
+    List<DeleteFile> eqDeletes = Lists.newArrayList();
+    tasksWithEqDelete.forEach(task -> {
+      eqDeletes.addAll(task.deletes().stream()
+          .filter(deleteFile -> deleteFile.content().equals(FileContent.EQUALITY_DELETES))
+          .collect(Collectors.toList()));
+    });
+
+    Map<StructLikeWrapper, Collection<FileScanTask>> groupedTasks = groupTasksByPartition(tasksWithEqDelete.iterator());
+
+    // Split and combine tasks under each partition
+    // TODO: can we split task?
+    List<Pair<StructLike, CombinedScanTask>> combinedScanTasks = groupedTasks.entrySet().stream()
+        .map(entry -> {
+          CloseableIterable<FileScanTask> splitTasks = TableScanUtil.splitFiles(
+              CloseableIterable.withNoopClose(entry.getValue()), targetSizeInBytes);
+          return Pair.of(entry.getKey().get(),
+              TableScanUtil.planTasks(splitTasks, targetSizeInBytes, splitLookback, splitOpenFileCost));
+        })
+        .flatMap(pair -> StreamSupport.stream(CloseableIterable
+            .transform(pair.second(), task -> Pair.of(pair.first(), task)).spliterator(), false)
+        )
+        .collect(Collectors.toList());
+
+    if (!combinedScanTasks.isEmpty()) {
+      JavaRDD<Pair<StructLike, CombinedScanTask>> taskRDD = sparkContext.parallelize(combinedScanTasks,
+          combinedScanTasks.size());
+      Broadcast<FileIO> io = sparkContext.broadcast(fileIO());
+      Broadcast<EncryptionManager> encryption = sparkContext.broadcast(encryptionManager());
+
+      DeleteRewriter deleteRewriter = new DeleteRewriter(table, caseSensitive, io, encryption);
+      List<DeleteFile> posDeletes = deleteRewriter.toPosDeletes(taskRDD);

Review comment:
       +1




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #2216: Spark: support replace equality deletes to position deletes

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2216:
URL: https://github.com/apache/iceberg/pull/2216#discussion_r589924035



##########
File path: core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java
##########
@@ -57,4 +57,23 @@ public RewriteFiles rewriteFiles(Set<DataFile> filesToDelete, Set<DataFile> file
 
     return this;
   }
+
+  @Override
+  public RewriteFiles rewriteDeletes(Set<DeleteFile> deletesToDelete, Set<DeleteFile> deletesToAdd) {
+    Preconditions.checkArgument(deletesToDelete != null && !deletesToDelete.isEmpty(),
+        "Files to delete cannot be null or empty");
+    Preconditions.checkArgument(deletesToAdd != null && !deletesToAdd.isEmpty(),
+        "Files to add can not be null or empty");
+
+    for (DeleteFile toDelete : deletesToDelete) {
+      delete(toDelete);

Review comment:
       As we will implement the RewriteFiles with delete files, there're more delete/data files in `rewriteDeletes` to check. so I'm OK with the current assertion in `ReplaceDeleteAction`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #2216: Spark: support replace equality deletes to position deletes

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2216:
URL: https://github.com/apache/iceberg/pull/2216#discussion_r589140285



##########
File path: data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
##########
@@ -110,7 +110,44 @@ protected long pos(T record) {
     return applyEqDeletes(applyPosDeletes(records));
   }
 
-  private CloseableIterable<T> applyEqDeletes(CloseableIterable<T> records) {
+  public CloseableIterable<T> matchEqDeletes(CloseableIterable<T> records) {
+    if (eqDeletes.isEmpty()) {
+      return records;
+    }
+
+    Multimap<Set<Integer>, DeleteFile> filesByDeleteIds = Multimaps.newMultimap(Maps.newHashMap(), Lists::newArrayList);
+    for (DeleteFile delete : eqDeletes) {
+      filesByDeleteIds.put(Sets.newHashSet(delete.equalityFieldIds()), delete);
+    }
+
+    CloseableIterable<T> remainRecords = records;
+    CloseableIterable<T> matchedRecords = CloseableIterable.empty();
+    for (Map.Entry<Set<Integer>, Collection<DeleteFile>> entry : filesByDeleteIds.asMap().entrySet()) {
+      Set<Integer> ids = entry.getKey();
+      Iterable<DeleteFile> deletes = entry.getValue();
+
+      Schema deleteSchema = TypeUtil.select(requiredSchema, ids);
+
+      // a projection to select and reorder fields of the file schema to match the delete rows
+      StructProjection projectRow = StructProjection.create(requiredSchema, deleteSchema);
+
+      Iterable<CloseableIterable<Record>> deleteRecords = Iterables.transform(deletes,
+          delete -> openDeletes(delete, deleteSchema));
+      StructLikeSet deleteSet = Deletes.toEqualitySet(
+          // copy the delete records because they will be held in a set
+          CloseableIterable.transform(CloseableIterable.concat(deleteRecords), Record::copy),
+          deleteSchema.asStruct());
+
+      matchedRecords = CloseableIterable.concat(Lists.newArrayList(matchedRecords, Deletes.match(remainRecords,

Review comment:
       > I think it would not iterate the data set several times since these are iterable chains and should be computed lazily.
   
   That's incorrect,  to analysis the complexity, we only need to consider the key sentence: 
   
   ```java
   Deletes.match(remainRecords, record -> projectRow.wrap(asStructLike(record)), deleteSet)
   ```
   
   The final returned `matchedRecords` is composed by several  above `Iterable` (s).  When iterate this `Iterable`,  we will scan all the elements in `remainRecords`,   finally we will scan the original data set multiple times.   That's why I said the complexity is too high.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] chenjunjiedada commented on a change in pull request #2216: Spark: support replace equality deletes to position deletes

Posted by GitBox <gi...@apache.org>.
chenjunjiedada commented on a change in pull request #2216:
URL: https://github.com/apache/iceberg/pull/2216#discussion_r590053925



##########
File path: core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java
##########
@@ -57,4 +57,22 @@ public RewriteFiles rewriteFiles(Set<DataFile> filesToDelete, Set<DataFile> file
 
     return this;
   }
+
+  @Override
+  public RewriteFiles rewriteDeletes(Set<DeleteFile> deletesToDelete, Set<DeleteFile> deletesToAdd) {
+    Preconditions.checkArgument(deletesToDelete != null && !deletesToDelete.isEmpty(),
+        "Files to delete cannot be null or empty");
+    Preconditions.checkArgument(deletesToAdd != null && !deletesToAdd.isEmpty(),

Review comment:
       I understand your concern. The check is used to discard the invalid rewrite, we don't want to continue the rewrite if there is no position delete produced. Don't we?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #2216: Spark: support replace equality deletes to position deletes

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2216:
URL: https://github.com/apache/iceberg/pull/2216#discussion_r589150323



##########
File path: data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
##########
@@ -110,7 +110,44 @@ protected long pos(T record) {
     return applyEqDeletes(applyPosDeletes(records));
   }
 
-  private CloseableIterable<T> applyEqDeletes(CloseableIterable<T> records) {
+  public CloseableIterable<T> matchEqDeletes(CloseableIterable<T> records) {

Review comment:
       I'd like to introduce a `ChainFilter` to simply the whole filter logics: 
   
   ```patch
   From 09c85b7625555f55061501e4ee0a474295792bba Mon Sep 17 00:00:00 2001
   From: huzheng <op...@gmail.com>
   Date: Mon, 8 Mar 2021 11:30:50 +0800
   Subject: [PATCH] filter
   
   ---
    .../org/apache/iceberg/deletes/Deletes.java   | 36 ----------------
    .../apache/iceberg/util/ChainOrFilter.java    | 42 +++++++++++++++++++
    .../org/apache/iceberg/data/DeleteFilter.java | 12 ++++--
    3 files changed, 50 insertions(+), 40 deletions(-)
    create mode 100644 core/src/main/java/org/apache/iceberg/util/ChainOrFilter.java
   
   diff --git a/core/src/main/java/org/apache/iceberg/deletes/Deletes.java b/core/src/main/java/org/apache/iceberg/deletes/Deletes.java
   index c2f21fe19..62154f7d6 100644
   --- a/core/src/main/java/org/apache/iceberg/deletes/Deletes.java
   +++ b/core/src/main/java/org/apache/iceberg/deletes/Deletes.java
   @@ -24,7 +24,6 @@ import java.io.UncheckedIOException;
    import java.util.Comparator;
    import java.util.List;
    import java.util.Set;
   -import java.util.function.BiFunction;
    import java.util.function.Function;
    import org.apache.iceberg.Accessor;
    import org.apache.iceberg.MetadataColumns;
   @@ -41,10 +40,8 @@ import org.apache.iceberg.relocated.com.google.common.collect.Sets;
    import org.apache.iceberg.types.Comparators;
    import org.apache.iceberg.types.Types;
    import org.apache.iceberg.util.Filter;
   -import org.apache.iceberg.util.Pair;
    import org.apache.iceberg.util.SortedMerge;
    import org.apache.iceberg.util.StructLikeSet;
   -import org.apache.iceberg.util.StructProjection;
    
    public class Deletes {
      private static final Schema POSITION_DELETE_SCHEMA = new Schema(
   @@ -80,17 +77,6 @@ public class Deletes {
        return filter.filter(rows);
      }
    
   -  public static <T> CloseableIterable<T> match(CloseableIterable<T> rows,
   -                                               BiFunction<T, StructProjection, StructLike> rowToDeleteKey,
   -                                               List<Pair<StructProjection, StructLikeSet>> unprojectedDeleteSets) {
   -    if (unprojectedDeleteSets.isEmpty()) {
   -      return rows;
   -    }
   -
   -    EqualitySetDeleteMatcher<T> equalityFilter = new EqualitySetDeleteMatcher<>(rowToDeleteKey, unprojectedDeleteSets);
   -    return equalityFilter.filter(rows);
   -  }
   -
      public static StructLikeSet toEqualitySet(CloseableIterable<StructLike> eqDeletes, Types.StructType eqType) {
        try (CloseableIterable<StructLike> deletes = eqDeletes) {
          StructLikeSet deleteSet = StructLikeSet.create(eqType);
   @@ -157,28 +143,6 @@ public class Deletes {
        }
      }
    
   -  private static class EqualitySetDeleteMatcher<T> extends Filter<T> {
   -    private final List<Pair<StructProjection, StructLikeSet>> deleteSets;
   -    private final BiFunction<T, StructProjection, StructLike> extractEqStruct;
   -
   -    protected EqualitySetDeleteMatcher(BiFunction<T, StructProjection, StructLike> extractEq,
   -                                      List<Pair<StructProjection, StructLikeSet>> deleteSets) {
   -      this.extractEqStruct = extractEq;
   -      this.deleteSets = deleteSets;
   -    }
   -
   -    @Override
   -    protected boolean shouldKeep(T row) {
   -      for (Pair<StructProjection, StructLikeSet> deleteSet : deleteSets) {
   -        if (deleteSet.second().contains(extractEqStruct.apply(row, deleteSet.first()))) {
   -          return true;
   -        }
   -      }
   -
   -      return false;
   -    }
   -  }
   -
      private static class PositionSetDeleteFilter<T> extends Filter<T> {
        private final Function<T, Long> rowToPosition;
        private final Set<Long> deleteSet;
   diff --git a/core/src/main/java/org/apache/iceberg/util/ChainOrFilter.java b/core/src/main/java/org/apache/iceberg/util/ChainOrFilter.java
   new file mode 100644
   index 000000000..2fbc54179
   --- /dev/null
   +++ b/core/src/main/java/org/apache/iceberg/util/ChainOrFilter.java
   @@ -0,0 +1,42 @@
   +/*
   + * Licensed to the Apache Software Foundation (ASF) under one
   + * or more contributor license agreements.  See the NOTICE file
   + * distributed with this work for additional information
   + * regarding copyright ownership.  The ASF licenses this file
   + * to you under the Apache License, Version 2.0 (the
   + * "License"); you may not use this file except in compliance
   + * with the License.  You may obtain a copy of the License at
   + *
   + *   http://www.apache.org/licenses/LICENSE-2.0
   + *
   + * Unless required by applicable law or agreed to in writing,
   + * software distributed under the License is distributed on an
   + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
   + * KIND, either express or implied.  See the License for the
   + * specific language governing permissions and limitations
   + * under the License.
   + */
   +
   +package org.apache.iceberg.util;
   +
   +import java.util.List;
   +import java.util.function.Predicate;
   +
   +public class ChainOrFilter<T> extends Filter<T> {
   +
   +  private final List<Predicate<T>> filters;
   +
   +  public ChainOrFilter(List<Predicate<T>> filters) {
   +    this.filters = filters;
   +  }
   +
   +  @Override
   +  protected boolean shouldKeep(T item) {
   +    for (Predicate<T> filter : filters) {
   +      if (filter.test(item)) {
   +        return true;
   +      }
   +    }
   +    return false;
   +  }
   +}
   diff --git a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
   index 884641355..d7fdde3b8 100644
   --- a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
   +++ b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
   @@ -23,6 +23,7 @@ import java.util.Collection;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;
   +import java.util.function.Predicate;
    import org.apache.iceberg.Accessor;
    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.DeleteFile;
   @@ -48,7 +49,8 @@ import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
    import org.apache.iceberg.relocated.com.google.common.collect.Sets;
    import org.apache.iceberg.types.TypeUtil;
    import org.apache.iceberg.types.Types;
   -import org.apache.iceberg.util.Pair;
   +import org.apache.iceberg.util.ChainOrFilter;
   +import org.apache.iceberg.util.Filter;
    import org.apache.iceberg.util.StructLikeSet;
    import org.apache.iceberg.util.StructProjection;
    import org.apache.parquet.Preconditions;
   @@ -121,7 +123,7 @@ public abstract class DeleteFilter<T> {
          filesByDeleteIds.put(Sets.newHashSet(delete.equalityFieldIds()), delete);
        }
    
   -    List<Pair<StructProjection, StructLikeSet>> unprojectedDeleteSets = Lists.newArrayList();
   +    List<Predicate<T>> deleteSetFilters = Lists.newArrayList();
        for (Map.Entry<Set<Integer>, Collection<DeleteFile>> entry : filesByDeleteIds.asMap().entrySet()) {
          Set<Integer> ids = entry.getKey();
          Iterable<DeleteFile> deletes = entry.getValue();
   @@ -138,10 +140,12 @@ public abstract class DeleteFilter<T> {
              CloseableIterable.transform(CloseableIterable.concat(deleteRecords), Record::copy),
              deleteSchema.asStruct());
    
   -      unprojectedDeleteSets.add(Pair.of(projectRow, deleteSet));
   +      Predicate<T> predicate = record -> deleteSet.contains(projectRow.wrap(asStructLike(record)));
   +      deleteSetFilters.add(predicate);
        }
    
   -    return Deletes.match(records, (record, projection) -> projection.wrap(asStructLike(record)), unprojectedDeleteSets);
   +    Filter<T> findDeleteRows = new ChainOrFilter<>(deleteSetFilters);
   +    return findDeleteRows.filter(records);
      }
    
      protected CloseableIterable<T> applyEqDeletes(CloseableIterable<T> records) {
   -- 
   2.20.1 (Apple Git-117)
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] chenjunjiedada commented on pull request #2216: Spark: support replace equality deletes to position deletes

Posted by GitBox <gi...@apache.org>.
chenjunjiedada commented on pull request #2216:
URL: https://github.com/apache/iceberg/pull/2216#issuecomment-776349503


   Hi @rdblue @aokolnychyi @openinx, This is a draft for replacing deletes. Could you please help to take a look and check whether this is the right direction? I'd like to add another API in action to compact multiple position deletes to one.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #2216: Spark: support replace equality deletes to position deletes

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2216:
URL: https://github.com/apache/iceberg/pull/2216#discussion_r590937638



##########
File path: core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java
##########
@@ -57,4 +57,22 @@ public RewriteFiles rewriteFiles(Set<DataFile> filesToDelete, Set<DataFile> file
 
     return this;
   }
+
+  @Override
+  public RewriteFiles rewriteDeletes(Set<DeleteFile> deletesToDelete, Set<DeleteFile> deletesToAdd) {
+    Preconditions.checkArgument(deletesToDelete != null && !deletesToDelete.isEmpty(),
+        "Files to delete cannot be null or empty");
+    Preconditions.checkArgument(deletesToAdd != null && !deletesToAdd.isEmpty(),

Review comment:
       This kind of rewrite is valid actually because it replace all the useless equality files to empty position delete files. After the rewrite action,   the normal read path don't have to filter the useless equality deletes again, that will be a great performance improvement.  So we have to submit the RewriteFiles transaction here. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #2216: Spark: support replace equality deletes to position deletes

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2216:
URL: https://github.com/apache/iceberg/pull/2216#discussion_r590041519



##########
File path: core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java
##########
@@ -57,4 +57,22 @@ public RewriteFiles rewriteFiles(Set<DataFile> filesToDelete, Set<DataFile> file
 
     return this;
   }
+
+  @Override
+  public RewriteFiles rewriteDeletes(Set<DeleteFile> deletesToDelete, Set<DeleteFile> deletesToAdd) {
+    Preconditions.checkArgument(deletesToDelete != null && !deletesToDelete.isEmpty(),
+        "Files to delete cannot be null or empty");
+    Preconditions.checkArgument(deletesToAdd != null && !deletesToAdd.isEmpty(),

Review comment:
       This check is incorrect, because if all the equality deletes are not hit the data files, then there will be no position delete to produce.. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #2216: Spark: support replace equality deletes to position deletes

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2216:
URL: https://github.com/apache/iceberg/pull/2216#discussion_r590041865



##########
File path: core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java
##########
@@ -57,4 +57,22 @@ public RewriteFiles rewriteFiles(Set<DataFile> filesToDelete, Set<DataFile> file
 
     return this;
   }
+
+  @Override
+  public RewriteFiles rewriteDeletes(Set<DeleteFile> deletesToDelete, Set<DeleteFile> deletesToAdd) {
+    Preconditions.checkArgument(deletesToDelete != null && !deletesToDelete.isEmpty(),
+        "Files to delete cannot be null or empty");
+    Preconditions.checkArgument(deletesToAdd != null && !deletesToAdd.isEmpty(),

Review comment:
       I will suggest to add an unit test for this.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #2216: Spark: support replace equality deletes to position deletes

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2216:
URL: https://github.com/apache/iceberg/pull/2216#discussion_r588083878



##########
File path: spark/src/main/java/org/apache/iceberg/actions/ReplaceDeleteAction.java
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RewriteFiles;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.ListMultimap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.SparkUtil;
+import org.apache.iceberg.spark.source.DeleteRewriter;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.StructLikeWrapper;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.iceberg.util.Tasks;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ReplaceDeleteAction extends
+    BaseSnapshotUpdateAction<ReplaceDeleteAction, DeleteRewriteActionResult> {
+  private static final Logger LOG = LoggerFactory.getLogger(ReplaceDeleteAction.class);
+  private final Table table;
+  private final JavaSparkContext sparkContext;
+  private FileIO fileIO;
+  private final EncryptionManager encryptionManager;
+  private final boolean caseSensitive;
+  private final PartitionSpec spec;
+  private final long targetSizeInBytes;
+  private final int splitLookback;
+  private final long splitOpenFileCost;
+
+  public ReplaceDeleteAction(SparkSession spark, Table table) {
+    this.table = table;
+    this.sparkContext = new JavaSparkContext(spark.sparkContext());
+    this.fileIO = fileIO();
+    this.encryptionManager = table.encryption();
+    this.caseSensitive = false;
+    this.spec = table.spec();
+
+    long splitSize = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.SPLIT_SIZE,
+        TableProperties.SPLIT_SIZE_DEFAULT);
+    long targetFileSize = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
+        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
+    this.targetSizeInBytes = Math.min(splitSize, targetFileSize);
+
+    this.splitLookback = PropertyUtil.propertyAsInt(
+        table.properties(),
+        TableProperties.SPLIT_LOOKBACK,
+        TableProperties.SPLIT_LOOKBACK_DEFAULT);
+    this.splitOpenFileCost = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.SPLIT_OPEN_FILE_COST,
+        TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT);
+  }
+
+  protected FileIO fileIO() {
+    if (this.fileIO == null) {
+      this.fileIO = SparkUtil.serializableFileIO(table());
+    }
+    return this.fileIO;
+  }
+
+  @Override
+  protected Table table() {
+    return table;
+  }
+
+  @Override
+  public DeleteRewriteActionResult execute() {
+    CloseableIterable<FileScanTask> fileScanTasks = null;
+    try {
+      fileScanTasks = table.newScan()
+          .caseSensitive(caseSensitive)
+          .ignoreResiduals()
+          .planFiles();
+    } finally {
+      try {
+        if (fileScanTasks != null) {
+          fileScanTasks.close();
+        }
+      } catch (IOException ioe) {
+        LOG.warn("Failed to close task iterable", ioe);
+      }
+    }
+
+    CloseableIterable<FileScanTask> tasksWithEqDelete = CloseableIterable.filter(fileScanTasks, scan ->
+        scan.deletes().stream().anyMatch(delete -> delete.content().equals(FileContent.EQUALITY_DELETES))
+    );
+
+    List<DeleteFile> eqDeletes = Lists.newArrayList();
+    tasksWithEqDelete.forEach(task -> {
+      eqDeletes.addAll(task.deletes().stream()
+          .filter(deleteFile -> deleteFile.content().equals(FileContent.EQUALITY_DELETES))
+          .collect(Collectors.toList()));
+    });
+
+    Map<StructLikeWrapper, Collection<FileScanTask>> groupedTasks = groupTasksByPartition(tasksWithEqDelete.iterator());
+
+    // Split and combine tasks under each partition
+    // TODO: can we split task?
+    List<Pair<StructLike, CombinedScanTask>> combinedScanTasks = groupedTasks.entrySet().stream()
+        .map(entry -> {
+          CloseableIterable<FileScanTask> splitTasks = TableScanUtil.splitFiles(
+              CloseableIterable.withNoopClose(entry.getValue()), targetSizeInBytes);
+          return Pair.of(entry.getKey().get(),
+              TableScanUtil.planTasks(splitTasks, targetSizeInBytes, splitLookback, splitOpenFileCost));
+        })
+        .flatMap(pair -> StreamSupport.stream(CloseableIterable
+            .transform(pair.second(), task -> Pair.of(pair.first(), task)).spliterator(), false)
+        )
+        .collect(Collectors.toList());
+
+    if (!combinedScanTasks.isEmpty()) {
+      JavaRDD<Pair<StructLike, CombinedScanTask>> taskRDD = sparkContext.parallelize(combinedScanTasks,
+          combinedScanTasks.size());
+      Broadcast<FileIO> io = sparkContext.broadcast(fileIO());
+      Broadcast<EncryptionManager> encryption = sparkContext.broadcast(encryptionManager());
+
+      DeleteRewriter deleteRewriter = new DeleteRewriter(table, caseSensitive, io, encryption);
+      List<DeleteFile> posDeletes = deleteRewriter.toPosDeletes(taskRDD);

Review comment:
       I'd like to move the RDD chaining out of the [DeleteRewriter](https://github.com/apache/iceberg/pull/2216/files#diff-8735e213fd3c1ca6eef5e7ac82ff5e8cb76addecede2e5cc1912a1dac9ac618fR87-R93) class , so that we could reuse that class for other compute engine's ReplaceDeleteAction.
   
   ```java
         List<DeleteFile> posDeletes = taskRDD.map(deleteRewriter::toPosDeletes)
             .collect()
             .stream()
             .flatMap(Collection::stream)
             .collect(Collectors.toList());
   ```

##########
File path: spark/src/main/java/org/apache/iceberg/actions/ReplaceDeleteAction.java
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RewriteFiles;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.ListMultimap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.SparkUtil;
+import org.apache.iceberg.spark.source.DeleteRewriter;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.StructLikeWrapper;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.iceberg.util.Tasks;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ReplaceDeleteAction extends
+    BaseSnapshotUpdateAction<ReplaceDeleteAction, DeleteRewriteActionResult> {
+  private static final Logger LOG = LoggerFactory.getLogger(ReplaceDeleteAction.class);
+  private final Table table;
+  private final JavaSparkContext sparkContext;
+  private FileIO fileIO;
+  private final EncryptionManager encryptionManager;
+  private final boolean caseSensitive;
+  private final PartitionSpec spec;
+  private final long targetSizeInBytes;
+  private final int splitLookback;
+  private final long splitOpenFileCost;
+
+  public ReplaceDeleteAction(SparkSession spark, Table table) {
+    this.table = table;
+    this.sparkContext = new JavaSparkContext(spark.sparkContext());
+    this.fileIO = fileIO();
+    this.encryptionManager = table.encryption();
+    this.caseSensitive = false;
+    this.spec = table.spec();
+
+    long splitSize = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.SPLIT_SIZE,
+        TableProperties.SPLIT_SIZE_DEFAULT);
+    long targetFileSize = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
+        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
+    this.targetSizeInBytes = Math.min(splitSize, targetFileSize);
+
+    this.splitLookback = PropertyUtil.propertyAsInt(
+        table.properties(),
+        TableProperties.SPLIT_LOOKBACK,
+        TableProperties.SPLIT_LOOKBACK_DEFAULT);
+    this.splitOpenFileCost = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.SPLIT_OPEN_FILE_COST,
+        TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT);
+  }
+
+  protected FileIO fileIO() {
+    if (this.fileIO == null) {
+      this.fileIO = SparkUtil.serializableFileIO(table());
+    }
+    return this.fileIO;
+  }
+
+  @Override
+  protected Table table() {
+    return table;
+  }
+
+  @Override
+  public DeleteRewriteActionResult execute() {
+    CloseableIterable<FileScanTask> fileScanTasks = null;
+    try {
+      fileScanTasks = table.newScan()
+          .caseSensitive(caseSensitive)
+          .ignoreResiduals()
+          .planFiles();
+    } finally {
+      try {
+        if (fileScanTasks != null) {
+          fileScanTasks.close();
+        }
+      } catch (IOException ioe) {
+        LOG.warn("Failed to close task iterable", ioe);
+      }
+    }
+
+    CloseableIterable<FileScanTask> tasksWithEqDelete = CloseableIterable.filter(fileScanTasks, scan ->
+        scan.deletes().stream().anyMatch(delete -> delete.content().equals(FileContent.EQUALITY_DELETES))
+    );
+
+    List<DeleteFile> eqDeletes = Lists.newArrayList();
+    tasksWithEqDelete.forEach(task -> {
+      eqDeletes.addAll(task.deletes().stream()
+          .filter(deleteFile -> deleteFile.content().equals(FileContent.EQUALITY_DELETES))
+          .collect(Collectors.toList()));
+    });
+
+    Map<StructLikeWrapper, Collection<FileScanTask>> groupedTasks = groupTasksByPartition(tasksWithEqDelete.iterator());
+
+    // Split and combine tasks under each partition
+    // TODO: can we split task?
+    List<Pair<StructLike, CombinedScanTask>> combinedScanTasks = groupedTasks.entrySet().stream()
+        .map(entry -> {
+          CloseableIterable<FileScanTask> splitTasks = TableScanUtil.splitFiles(
+              CloseableIterable.withNoopClose(entry.getValue()), targetSizeInBytes);
+          return Pair.of(entry.getKey().get(),
+              TableScanUtil.planTasks(splitTasks, targetSizeInBytes, splitLookback, splitOpenFileCost));
+        })
+        .flatMap(pair -> StreamSupport.stream(CloseableIterable
+            .transform(pair.second(), task -> Pair.of(pair.first(), task)).spliterator(), false)
+        )
+        .collect(Collectors.toList());
+
+    if (!combinedScanTasks.isEmpty()) {
+      JavaRDD<Pair<StructLike, CombinedScanTask>> taskRDD = sparkContext.parallelize(combinedScanTasks,
+          combinedScanTasks.size());
+      Broadcast<FileIO> io = sparkContext.broadcast(fileIO());
+      Broadcast<EncryptionManager> encryption = sparkContext.broadcast(encryptionManager());
+
+      DeleteRewriter deleteRewriter = new DeleteRewriter(table, caseSensitive, io, encryption);
+      List<DeleteFile> posDeletes = deleteRewriter.toPosDeletes(taskRDD);

Review comment:
       OK, the `DeleteRewriter`  is still using few other spark's class such as SparkAppenderFactory.  We may need to abstract that part logics,  so that we could reuse the rewrite logics between different engines.

##########
File path: spark/src/main/java/org/apache/iceberg/actions/ReplaceDeleteAction.java
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RewriteFiles;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.ListMultimap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.SparkUtil;
+import org.apache.iceberg.spark.source.DeleteRewriter;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.StructLikeWrapper;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.iceberg.util.Tasks;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ReplaceDeleteAction extends
+    BaseSnapshotUpdateAction<ReplaceDeleteAction, DeleteRewriteActionResult> {
+  private static final Logger LOG = LoggerFactory.getLogger(ReplaceDeleteAction.class);
+  private final Table table;
+  private final JavaSparkContext sparkContext;
+  private FileIO fileIO;
+  private final EncryptionManager encryptionManager;
+  private final boolean caseSensitive;
+  private final PartitionSpec spec;
+  private final long targetSizeInBytes;
+  private final int splitLookback;
+  private final long splitOpenFileCost;
+
+  public ReplaceDeleteAction(SparkSession spark, Table table) {
+    this.table = table;
+    this.sparkContext = new JavaSparkContext(spark.sparkContext());
+    this.fileIO = fileIO();
+    this.encryptionManager = table.encryption();
+    this.caseSensitive = false;
+    this.spec = table.spec();
+
+    long splitSize = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.SPLIT_SIZE,
+        TableProperties.SPLIT_SIZE_DEFAULT);
+    long targetFileSize = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
+        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
+    this.targetSizeInBytes = Math.min(splitSize, targetFileSize);
+
+    this.splitLookback = PropertyUtil.propertyAsInt(
+        table.properties(),
+        TableProperties.SPLIT_LOOKBACK,
+        TableProperties.SPLIT_LOOKBACK_DEFAULT);
+    this.splitOpenFileCost = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.SPLIT_OPEN_FILE_COST,
+        TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT);
+  }
+
+  protected FileIO fileIO() {
+    if (this.fileIO == null) {
+      this.fileIO = SparkUtil.serializableFileIO(table());
+    }
+    return this.fileIO;
+  }
+
+  @Override
+  protected Table table() {
+    return table;
+  }
+
+  @Override
+  public DeleteRewriteActionResult execute() {
+    CloseableIterable<FileScanTask> fileScanTasks = null;
+    try {
+      fileScanTasks = table.newScan()
+          .caseSensitive(caseSensitive)
+          .ignoreResiduals()
+          .planFiles();
+    } finally {
+      try {
+        if (fileScanTasks != null) {
+          fileScanTasks.close();
+        }
+      } catch (IOException ioe) {
+        LOG.warn("Failed to close task iterable", ioe);
+      }
+    }
+
+    CloseableIterable<FileScanTask> tasksWithEqDelete = CloseableIterable.filter(fileScanTasks, scan ->
+        scan.deletes().stream().anyMatch(delete -> delete.content().equals(FileContent.EQUALITY_DELETES))
+    );
+
+    List<DeleteFile> eqDeletes = Lists.newArrayList();

Review comment:
       Nit:  I think the `eqDeletes` should better be defined as `HashSet` because different  `FileScanTask` will share the same equality delete files (Though we've use the HashSet to deduplicate the same equality delete files in `RewriteFiles` , I still think it's better to do this before calling that API). 

##########
File path: data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
##########
@@ -110,7 +110,44 @@ protected long pos(T record) {
     return applyEqDeletes(applyPosDeletes(records));
   }
 
-  private CloseableIterable<T> applyEqDeletes(CloseableIterable<T> records) {
+  public CloseableIterable<T> matchEqDeletes(CloseableIterable<T> records) {
+    if (eqDeletes.isEmpty()) {
+      return records;
+    }
+
+    Multimap<Set<Integer>, DeleteFile> filesByDeleteIds = Multimaps.newMultimap(Maps.newHashMap(), Lists::newArrayList);
+    for (DeleteFile delete : eqDeletes) {
+      filesByDeleteIds.put(Sets.newHashSet(delete.equalityFieldIds()), delete);
+    }
+
+    CloseableIterable<T> remainRecords = records;
+    CloseableIterable<T> matchedRecords = CloseableIterable.empty();
+    for (Map.Entry<Set<Integer>, Collection<DeleteFile>> entry : filesByDeleteIds.asMap().entrySet()) {
+      Set<Integer> ids = entry.getKey();
+      Iterable<DeleteFile> deletes = entry.getValue();
+
+      Schema deleteSchema = TypeUtil.select(requiredSchema, ids);
+
+      // a projection to select and reorder fields of the file schema to match the delete rows
+      StructProjection projectRow = StructProjection.create(requiredSchema, deleteSchema);
+
+      Iterable<CloseableIterable<Record>> deleteRecords = Iterables.transform(deletes,
+          delete -> openDeletes(delete, deleteSchema));
+      StructLikeSet deleteSet = Deletes.toEqualitySet(
+          // copy the delete records because they will be held in a set
+          CloseableIterable.transform(CloseableIterable.concat(deleteRecords), Record::copy),
+          deleteSchema.asStruct());
+
+      matchedRecords = CloseableIterable.concat(Lists.newArrayList(matchedRecords, Deletes.match(remainRecords,

Review comment:
       Here I'm concerning it may not worth to take such a high complexity.   Let's define the whole data set as `S`,  for the first equality field ids `<1,2>`,  the deleteSet is `S1`,  the second equality field ids `<1,2>`, the deleteSet is `S2`,  the third equality delete field ids `<2,5>`, the deleteSet is `S3`.
   
   Finally the concat `matchedRecords` will be  
   
   `  Intersection(S, S1)  UNION  Intersection(( S - S1 ), S2) UNION  Intersection((S - S1- S2), S3)` 
   
   ( Here `S - S1` means it will return all elements which is in set `S` but not in set `S1` ) 
   
   though the current code will return the correct converted positional deletions , but it will iterate the big data set `S` three times ?   This overhead will be very large...




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] chenjunjiedada commented on a change in pull request #2216: Spark: support replace equality deletes to position deletes

Posted by GitBox <gi...@apache.org>.
chenjunjiedada commented on a change in pull request #2216:
URL: https://github.com/apache/iceberg/pull/2216#discussion_r589044452



##########
File path: data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
##########
@@ -110,7 +110,44 @@ protected long pos(T record) {
     return applyEqDeletes(applyPosDeletes(records));
   }
 
-  private CloseableIterable<T> applyEqDeletes(CloseableIterable<T> records) {
+  public CloseableIterable<T> matchEqDeletes(CloseableIterable<T> records) {
+    if (eqDeletes.isEmpty()) {
+      return records;
+    }
+
+    Multimap<Set<Integer>, DeleteFile> filesByDeleteIds = Multimaps.newMultimap(Maps.newHashMap(), Lists::newArrayList);
+    for (DeleteFile delete : eqDeletes) {
+      filesByDeleteIds.put(Sets.newHashSet(delete.equalityFieldIds()), delete);
+    }
+
+    CloseableIterable<T> remainRecords = records;
+    CloseableIterable<T> matchedRecords = CloseableIterable.empty();
+    for (Map.Entry<Set<Integer>, Collection<DeleteFile>> entry : filesByDeleteIds.asMap().entrySet()) {
+      Set<Integer> ids = entry.getKey();
+      Iterable<DeleteFile> deletes = entry.getValue();
+
+      Schema deleteSchema = TypeUtil.select(requiredSchema, ids);
+
+      // a projection to select and reorder fields of the file schema to match the delete rows
+      StructProjection projectRow = StructProjection.create(requiredSchema, deleteSchema);
+
+      Iterable<CloseableIterable<Record>> deleteRecords = Iterables.transform(deletes,
+          delete -> openDeletes(delete, deleteSchema));
+      StructLikeSet deleteSet = Deletes.toEqualitySet(
+          // copy the delete records because they will be held in a set
+          CloseableIterable.transform(CloseableIterable.concat(deleteRecords), Record::copy),
+          deleteSchema.asStruct());
+
+      matchedRecords = CloseableIterable.concat(Lists.newArrayList(matchedRecords, Deletes.match(remainRecords,

Review comment:
       Let me post it first.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #2216: Spark: support replace equality deletes to position deletes

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2216:
URL: https://github.com/apache/iceberg/pull/2216#discussion_r588078058



##########
File path: spark/src/main/java/org/apache/iceberg/actions/ReplaceDeleteAction.java
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RewriteFiles;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.ListMultimap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.SparkUtil;
+import org.apache.iceberg.spark.source.DeleteRewriter;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.StructLikeWrapper;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.iceberg.util.Tasks;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ReplaceDeleteAction extends
+    BaseSnapshotUpdateAction<ReplaceDeleteAction, DeleteRewriteActionResult> {
+  private static final Logger LOG = LoggerFactory.getLogger(ReplaceDeleteAction.class);
+  private final Table table;
+  private final JavaSparkContext sparkContext;
+  private FileIO fileIO;
+  private final EncryptionManager encryptionManager;
+  private final boolean caseSensitive;
+  private final PartitionSpec spec;
+  private final long targetSizeInBytes;
+  private final int splitLookback;
+  private final long splitOpenFileCost;
+
+  public ReplaceDeleteAction(SparkSession spark, Table table) {
+    this.table = table;
+    this.sparkContext = new JavaSparkContext(spark.sparkContext());
+    this.fileIO = fileIO();
+    this.encryptionManager = table.encryption();
+    this.caseSensitive = false;
+    this.spec = table.spec();
+
+    long splitSize = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.SPLIT_SIZE,
+        TableProperties.SPLIT_SIZE_DEFAULT);
+    long targetFileSize = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
+        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
+    this.targetSizeInBytes = Math.min(splitSize, targetFileSize);
+
+    this.splitLookback = PropertyUtil.propertyAsInt(
+        table.properties(),
+        TableProperties.SPLIT_LOOKBACK,
+        TableProperties.SPLIT_LOOKBACK_DEFAULT);
+    this.splitOpenFileCost = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.SPLIT_OPEN_FILE_COST,
+        TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT);
+  }
+
+  protected FileIO fileIO() {
+    if (this.fileIO == null) {
+      this.fileIO = SparkUtil.serializableFileIO(table());
+    }
+    return this.fileIO;
+  }
+
+  @Override
+  protected Table table() {
+    return table;
+  }
+
+  @Override
+  public DeleteRewriteActionResult execute() {
+    CloseableIterable<FileScanTask> fileScanTasks = null;
+    try {
+      fileScanTasks = table.newScan()
+          .caseSensitive(caseSensitive)
+          .ignoreResiduals()
+          .planFiles();
+    } finally {
+      try {
+        if (fileScanTasks != null) {
+          fileScanTasks.close();
+        }
+      } catch (IOException ioe) {
+        LOG.warn("Failed to close task iterable", ioe);
+      }
+    }
+
+    CloseableIterable<FileScanTask> tasksWithEqDelete = CloseableIterable.filter(fileScanTasks, scan ->
+        scan.deletes().stream().anyMatch(delete -> delete.content().equals(FileContent.EQUALITY_DELETES))
+    );
+
+    List<DeleteFile> eqDeletes = Lists.newArrayList();
+    tasksWithEqDelete.forEach(task -> {
+      eqDeletes.addAll(task.deletes().stream()
+          .filter(deleteFile -> deleteFile.content().equals(FileContent.EQUALITY_DELETES))
+          .collect(Collectors.toList()));
+    });
+
+    Map<StructLikeWrapper, Collection<FileScanTask>> groupedTasks = groupTasksByPartition(tasksWithEqDelete.iterator());
+
+    // Split and combine tasks under each partition
+    // TODO: can we split task?

Review comment:
       Filed an issue for this: https://github.com/apache/iceberg/issues/2298




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] chenjunjiedada commented on a change in pull request #2216: Spark: support replace equality deletes to position deletes

Posted by GitBox <gi...@apache.org>.
chenjunjiedada commented on a change in pull request #2216:
URL: https://github.com/apache/iceberg/pull/2216#discussion_r589209321



##########
File path: core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java
##########
@@ -57,4 +57,23 @@ public RewriteFiles rewriteFiles(Set<DataFile> filesToDelete, Set<DataFile> file
 
     return this;
   }
+
+  @Override
+  public RewriteFiles rewriteDeletes(Set<DeleteFile> deletesToDelete, Set<DeleteFile> deletesToAdd) {
+    Preconditions.checkArgument(deletesToDelete != null && !deletesToDelete.isEmpty(),
+        "Files to delete cannot be null or empty");
+    Preconditions.checkArgument(deletesToAdd != null && !deletesToAdd.isEmpty(),
+        "Files to add can not be null or empty");
+
+    for (DeleteFile toDelete : deletesToDelete) {
+      delete(toDelete);

Review comment:
       Make sense to me, but I'd like to keep logic here and assert in `ReplaceDeleteAction` since we may have an action to merge position delete files. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] coolderli commented on pull request #2216: Spark: support replace equality deletes to position deletes

Posted by GitBox <gi...@apache.org>.
coolderli commented on pull request #2216:
URL: https://github.com/apache/iceberg/pull/2216#issuecomment-917963353


   @chenjunjiedada any update?Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] chenjunjiedada commented on a change in pull request #2216: Spark: support replace equality deletes to position deletes

Posted by GitBox <gi...@apache.org>.
chenjunjiedada commented on a change in pull request #2216:
URL: https://github.com/apache/iceberg/pull/2216#discussion_r589042721



##########
File path: data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
##########
@@ -110,7 +110,44 @@ protected long pos(T record) {
     return applyEqDeletes(applyPosDeletes(records));
   }
 
-  private CloseableIterable<T> applyEqDeletes(CloseableIterable<T> records) {
+  public CloseableIterable<T> matchEqDeletes(CloseableIterable<T> records) {
+    if (eqDeletes.isEmpty()) {
+      return records;
+    }
+
+    Multimap<Set<Integer>, DeleteFile> filesByDeleteIds = Multimaps.newMultimap(Maps.newHashMap(), Lists::newArrayList);
+    for (DeleteFile delete : eqDeletes) {
+      filesByDeleteIds.put(Sets.newHashSet(delete.equalityFieldIds()), delete);
+    }
+
+    CloseableIterable<T> remainRecords = records;
+    CloseableIterable<T> matchedRecords = CloseableIterable.empty();
+    for (Map.Entry<Set<Integer>, Collection<DeleteFile>> entry : filesByDeleteIds.asMap().entrySet()) {
+      Set<Integer> ids = entry.getKey();
+      Iterable<DeleteFile> deletes = entry.getValue();
+
+      Schema deleteSchema = TypeUtil.select(requiredSchema, ids);
+
+      // a projection to select and reorder fields of the file schema to match the delete rows
+      StructProjection projectRow = StructProjection.create(requiredSchema, deleteSchema);
+
+      Iterable<CloseableIterable<Record>> deleteRecords = Iterables.transform(deletes,
+          delete -> openDeletes(delete, deleteSchema));
+      StructLikeSet deleteSet = Deletes.toEqualitySet(
+          // copy the delete records because they will be held in a set
+          CloseableIterable.transform(CloseableIterable.concat(deleteRecords), Record::copy),
+          deleteSchema.asStruct());
+
+      matchedRecords = CloseableIterable.concat(Lists.newArrayList(matchedRecords, Deletes.match(remainRecords,

Review comment:
       I think it would not iterate the data set several times since these are iterable chains and should be computed lazily.
   
   For a filter chain of a data set of `N` elements with filters (`F1, F2, F3, F4`), suppose it will filter out (`N1, N2, N3, N4`) items, I think it iterates data set one time and the number of filter calls should be:
   -  `F1` N times
   - `F2` (N-N1) times
   - `F3` (N-N1-N2) times
   - `F4` (N-N1-N2-N3) times
   
   For a matching chain of a data set of `N` elements with filters (`F1, F2, F3, F4`), suppose it matches out (`N1, N2, N3, N4`) items, I think it iterates data set one time and the number of filter calls should be:
   -  `F1` 2N times (filter and match)
   - `F2` 2(N-N1) times (filter and match)
   - `F3` 2(N-N1-N2) times (filter and match)
   - `F4` 2(N-N1-N2-N3) times (filter and match)
   
   @rdblue Could you please help to correct me if I am wrong?
   
   Here is an alternative implementation that collects all delete sets in a list and does the projection in the filter. It doesn't depend on temporary iterables and looks a bit straightforward. I could change to this one if you like it.
   
   ```java
     public static <T> CloseableIterable<T> match(CloseableIterable<T> rows,
                                                  BiFunction<T, StructProjection, StructLike> rowToDeleteKey,
                                                  List<Pair<StructProjection, StructLikeSet>> unprojectedDeleteSets) {
       if (unprojectedDeleteSets.isEmpty()) {
         return rows;
       }
   
       EqualitySetDeleteMatcher<T> equalityFilter = new EqualitySetDeleteMatcher<>(rowToDeleteKey, unprojectedDeleteSets);
       return equalityFilter.filter(rows);
     }
   
   
     private static class EqualitySetDeleteMatcher<T> extends Filter<T> {
       private final List<Pair<StructProjection, StructLikeSet>> deleteSets;
       private final BiFunction<T, StructProjection, StructLike> extractEqStruct;
   
       protected EqualitySetDeleteMatcher(BiFunction<T, StructProjection, StructLike> extractEq,
                                         List<Pair<StructProjection, StructLikeSet>> deleteSets) {
         this.extractEqStruct = extractEq;
         this.deleteSets = deleteSets;
       }
   
       @Override
       protected boolean shouldKeep(T row) {
         for (Pair<StructProjection, StructLikeSet> deleteSet : deleteSets) {
           if (deleteSet.second().contains(extractEqStruct.apply(row, deleteSet.first()))) {
             return true;
           }
         }
   
         return false;
       }
     }
   
   ```
   
   PS: For the delete files with the same equality field IDs we will collect the deletes in one set. 
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] chenjunjiedada commented on pull request #2216: Spark: support replace equality deletes to position deletes

Posted by GitBox <gi...@apache.org>.
chenjunjiedada commented on pull request #2216:
URL: https://github.com/apache/iceberg/pull/2216#issuecomment-803247112


   @yyanyy Thanks a lot for your review! I Will update ASAP.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #2216: Spark: support replace equality deletes to position deletes

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2216:
URL: https://github.com/apache/iceberg/pull/2216#discussion_r590939537



##########
File path: core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java
##########
@@ -57,4 +57,22 @@ public RewriteFiles rewriteFiles(Set<DataFile> filesToDelete, Set<DataFile> file
 
     return this;
   }
+
+  @Override
+  public RewriteFiles rewriteDeletes(Set<DeleteFile> deletesToDelete, Set<DeleteFile> deletesToAdd) {
+    Preconditions.checkArgument(deletesToDelete != null && !deletesToDelete.isEmpty(),
+        "Files to delete cannot be null or empty");
+    Preconditions.checkArgument(deletesToAdd != null && !deletesToAdd.isEmpty(),

Review comment:
       You could see the validation in the extended RewriteFiles API here ( https://github.com/apache/iceberg/pull/2294/files#diff-b92a78b7fb207d4979d503a442189d9d096e4d19519a4b83eed9e1e779843810R68)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #2216: Spark: support replace equality deletes to position deletes

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2216:
URL: https://github.com/apache/iceberg/pull/2216#discussion_r596529245



##########
File path: spark/src/main/java/org/apache/iceberg/actions/ReplaceDeleteAction.java
##########
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RewriteFiles;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.ListMultimap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.SparkUtil;
+import org.apache.iceberg.spark.source.DeleteRewriter;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.StructLikeWrapper;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.iceberg.util.Tasks;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ReplaceDeleteAction extends
+    BaseSnapshotUpdateAction<ReplaceDeleteAction, DeleteRewriteActionResult> {
+  private static final Logger LOG = LoggerFactory.getLogger(ReplaceDeleteAction.class);
+  private final Table table;
+  private final JavaSparkContext sparkContext;
+  private FileIO fileIO;
+  private final EncryptionManager encryptionManager;
+  private final boolean caseSensitive;
+  private final PartitionSpec spec;
+  private final long targetSizeInBytes;
+  private final int splitLookback;
+  private final long splitOpenFileCost;
+
+  public ReplaceDeleteAction(SparkSession spark, Table table) {
+    this.table = table;
+    this.sparkContext = new JavaSparkContext(spark.sparkContext());
+    this.fileIO = fileIO();
+    this.encryptionManager = table.encryption();
+    this.caseSensitive = false;
+    this.spec = table.spec();
+
+    long splitSize = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.SPLIT_SIZE,
+        TableProperties.SPLIT_SIZE_DEFAULT);
+    long targetFileSize = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
+        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
+    this.targetSizeInBytes = Math.min(splitSize, targetFileSize);
+
+    this.splitLookback = PropertyUtil.propertyAsInt(
+        table.properties(),
+        TableProperties.SPLIT_LOOKBACK,
+        TableProperties.SPLIT_LOOKBACK_DEFAULT);
+    this.splitOpenFileCost = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.SPLIT_OPEN_FILE_COST,
+        TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT);
+  }
+
+  protected FileIO fileIO() {
+    if (this.fileIO == null) {
+      this.fileIO = SparkUtil.serializableFileIO(table());
+    }
+    return this.fileIO;
+  }
+
+  @Override
+  protected Table table() {
+    return table;
+  }
+
+  @Override
+  public DeleteRewriteActionResult execute() {
+    CloseableIterable<FileScanTask> fileScanTasks = null;
+    try {
+      fileScanTasks = table.newScan()
+          .caseSensitive(caseSensitive)
+          .ignoreResiduals()
+          .planFiles();
+    } finally {
+      try {
+        if (fileScanTasks != null) {
+          fileScanTasks.close();
+        }
+      } catch (IOException ioe) {
+        LOG.warn("Failed to close task iterable", ioe);
+      }
+    }
+
+    CloseableIterable<FileScanTask> tasksWithEqDelete = CloseableIterable.filter(fileScanTasks, scan ->
+        scan.deletes().stream().anyMatch(delete -> delete.content().equals(FileContent.EQUALITY_DELETES))
+    );
+
+    Set<DeleteFile> eqDeletes = Sets.newHashSet();
+    tasksWithEqDelete.forEach(task -> {
+      eqDeletes.addAll(task.deletes().stream()
+          .filter(deleteFile -> deleteFile.content().equals(FileContent.EQUALITY_DELETES))
+          .collect(Collectors.toList()));
+    });
+
+    Map<StructLikeWrapper, Collection<FileScanTask>> groupedTasks = groupTasksByPartition(tasksWithEqDelete.iterator());
+
+    // Split and combine tasks under each partition
+    // TODO: can we split task?
+    List<Pair<StructLike, CombinedScanTask>> combinedScanTasks = groupedTasks.entrySet().stream()
+        .map(entry -> {
+          CloseableIterable<FileScanTask> splitTasks = TableScanUtil.splitFiles(
+              CloseableIterable.withNoopClose(entry.getValue()), targetSizeInBytes);
+          return Pair.of(entry.getKey().get(),
+              TableScanUtil.planTasks(splitTasks, targetSizeInBytes, splitLookback, splitOpenFileCost));
+        })
+        .flatMap(pair -> StreamSupport.stream(CloseableIterable
+            .transform(pair.second(), task -> Pair.of(pair.first(), task)).spliterator(), false)
+        )
+        .collect(Collectors.toList());
+
+    if (!combinedScanTasks.isEmpty()) {
+      JavaRDD<Pair<StructLike, CombinedScanTask>> taskRDD = sparkContext.parallelize(combinedScanTasks,
+          combinedScanTasks.size());
+      Broadcast<FileIO> io = sparkContext.broadcast(fileIO());
+      Broadcast<EncryptionManager> encryption = sparkContext.broadcast(encryptionManager());
+
+      DeleteRewriter deleteRewriter = new DeleteRewriter(table, caseSensitive, io, encryption);
+      List<DeleteFile> posDeletes = deleteRewriter.toPosDeletes(taskRDD);
+
+      if (!eqDeletes.isEmpty() && !posDeletes.isEmpty()) {

Review comment:
       Yeah, you're correct !

##########
File path: spark/src/main/java/org/apache/iceberg/spark/source/DeleteRowReader.java
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.data.DeleteFilter;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.util.PartitionUtil;
+import org.apache.spark.rdd.InputFileBlockHolder;
+import org.apache.spark.sql.catalyst.InternalRow;
+
+public class DeleteRowReader extends RowDataReader {
+  private final Schema tableSchema;
+  private final Schema expectedSchema;
+
+  public DeleteRowReader(CombinedScanTask task, Schema schema, Schema expectedSchema, String nameMapping,
+                         FileIO io, EncryptionManager encryptionManager, boolean caseSensitive) {
+    super(task, schema, schema, nameMapping, io, encryptionManager,
+        caseSensitive);
+    this.tableSchema = schema;
+    this.expectedSchema = expectedSchema;
+  }
+
+  @Override
+  CloseableIterator<InternalRow> open(FileScanTask task) {
+    SparkDeleteMatcher matches = new SparkDeleteMatcher(task, tableSchema, expectedSchema);
+
+    // schema or rows returned by readers
+    Schema requiredSchema = matches.requiredSchema();
+    Map<Integer, ?> idToConstant = PartitionUtil.constantsMap(task, RowDataReader::convertConstant);
+    DataFile file = task.file();
+
+    // update the current file for Spark's filename() function
+    InputFileBlockHolder.set(file.path().toString(), task.start(), task.length());
+
+    return matches.matchEqDeletes(open(task, requiredSchema, idToConstant)).iterator();

Review comment:
       The newly introduced `EqualityDeleteReader` should have fixed your comment: https://github.com/apache/iceberg/pull/2320/files#diff-6dc9ab9ec3abcb1972bc39e5c0f0fa95b00a822c0b8996b3c94d2dc702381fe4R34




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] chenjunjiedada commented on a change in pull request #2216: Spark: support replace equality deletes to position deletes

Posted by GitBox <gi...@apache.org>.
chenjunjiedada commented on a change in pull request #2216:
URL: https://github.com/apache/iceberg/pull/2216#discussion_r592142964



##########
File path: core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java
##########
@@ -57,4 +57,22 @@ public RewriteFiles rewriteFiles(Set<DataFile> filesToDelete, Set<DataFile> file
 
     return this;
   }
+
+  @Override
+  public RewriteFiles rewriteDeletes(Set<DeleteFile> deletesToDelete, Set<DeleteFile> deletesToAdd) {
+    Preconditions.checkArgument(deletesToDelete != null && !deletesToDelete.isEmpty(),
+        "Files to delete cannot be null or empty");
+    Preconditions.checkArgument(deletesToAdd != null && !deletesToAdd.isEmpty(),

Review comment:
       Make sense to me! I will update then.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] chenjunjiedada closed pull request #2216: Spark: support replace equality deletes to position deletes

Posted by GitBox <gi...@apache.org>.
chenjunjiedada closed pull request #2216:
URL: https://github.com/apache/iceberg/pull/2216


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] chenjunjiedada commented on a change in pull request #2216: Spark: support replace equality deletes to position deletes

Posted by GitBox <gi...@apache.org>.
chenjunjiedada commented on a change in pull request #2216:
URL: https://github.com/apache/iceberg/pull/2216#discussion_r589208012



##########
File path: api/src/main/java/org/apache/iceberg/RewriteFiles.java
##########
@@ -42,4 +42,13 @@
    * @return this for method chaining
    */
   RewriteFiles rewriteFiles(Set<DataFile> filesToDelete, Set<DataFile> filesToAdd);
+
+  /**
+   * Add a rewrite that replaces one set of deletes with another that contains the same deleted rows.
+   *
+   * @param deletesToDelete files that will be replaced, cannot be null or empty.
+   * @param deletesToAdd files that will be added, cannot be null or empty.
+   * @return this for method chaining
+   */
+  RewriteFiles rewriteDeletes(Set<DeleteFile> deletesToDelete, Set<DeleteFile> deletesToAdd);

Review comment:
       That makes sense to me.  I think we could parallelize the API refactoring and the implementation.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #2216: Spark: support replace equality deletes to position deletes

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2216:
URL: https://github.com/apache/iceberg/pull/2216#discussion_r589970470



##########
File path: spark/src/main/java/org/apache/iceberg/actions/ReplaceDeleteAction.java
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RewriteFiles;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.ListMultimap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.SparkUtil;
+import org.apache.iceberg.spark.source.DeleteRewriter;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.StructLikeWrapper;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.iceberg.util.Tasks;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ReplaceDeleteAction extends
+    BaseSnapshotUpdateAction<ReplaceDeleteAction, DeleteRewriteActionResult> {
+  private static final Logger LOG = LoggerFactory.getLogger(ReplaceDeleteAction.class);
+  private final Table table;
+  private final JavaSparkContext sparkContext;
+  private FileIO fileIO;
+  private final EncryptionManager encryptionManager;
+  private final boolean caseSensitive;
+  private final PartitionSpec spec;
+  private final long targetSizeInBytes;
+  private final int splitLookback;
+  private final long splitOpenFileCost;
+
+  public ReplaceDeleteAction(SparkSession spark, Table table) {
+    this.table = table;
+    this.sparkContext = new JavaSparkContext(spark.sparkContext());
+    this.fileIO = fileIO();
+    this.encryptionManager = table.encryption();
+    this.caseSensitive = false;
+    this.spec = table.spec();
+
+    long splitSize = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.SPLIT_SIZE,
+        TableProperties.SPLIT_SIZE_DEFAULT);
+    long targetFileSize = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
+        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
+    this.targetSizeInBytes = Math.min(splitSize, targetFileSize);
+
+    this.splitLookback = PropertyUtil.propertyAsInt(
+        table.properties(),
+        TableProperties.SPLIT_LOOKBACK,
+        TableProperties.SPLIT_LOOKBACK_DEFAULT);
+    this.splitOpenFileCost = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.SPLIT_OPEN_FILE_COST,
+        TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT);
+  }
+
+  protected FileIO fileIO() {
+    if (this.fileIO == null) {
+      this.fileIO = SparkUtil.serializableFileIO(table());
+    }
+    return this.fileIO;
+  }
+
+  @Override
+  protected Table table() {
+    return table;
+  }
+
+  @Override
+  public DeleteRewriteActionResult execute() {
+    CloseableIterable<FileScanTask> fileScanTasks = null;
+    try {
+      fileScanTasks = table.newScan()
+          .caseSensitive(caseSensitive)
+          .ignoreResiduals()
+          .planFiles();
+    } finally {
+      try {
+        if (fileScanTasks != null) {
+          fileScanTasks.close();
+        }
+      } catch (IOException ioe) {
+        LOG.warn("Failed to close task iterable", ioe);
+      }
+    }
+
+    CloseableIterable<FileScanTask> tasksWithEqDelete = CloseableIterable.filter(fileScanTasks, scan ->
+        scan.deletes().stream().anyMatch(delete -> delete.content().equals(FileContent.EQUALITY_DELETES))
+    );
+
+    Set<DeleteFile> eqDeletes = Sets.newHashSet();
+    tasksWithEqDelete.forEach(task -> {
+      eqDeletes.addAll(task.deletes().stream()
+          .filter(deleteFile -> deleteFile.content().equals(FileContent.EQUALITY_DELETES))
+          .collect(Collectors.toList()));
+    });
+
+    Map<StructLikeWrapper, Collection<FileScanTask>> groupedTasks = groupTasksByPartition(tasksWithEqDelete.iterator());
+
+    // Split and combine tasks under each partition
+    // TODO: can we split task?
+    List<Pair<StructLike, CombinedScanTask>> combinedScanTasks = groupedTasks.entrySet().stream()
+        .map(entry -> {
+          CloseableIterable<FileScanTask> splitTasks = TableScanUtil.splitFiles(
+              CloseableIterable.withNoopClose(entry.getValue()), targetSizeInBytes);
+          return Pair.of(entry.getKey().get(),
+              TableScanUtil.planTasks(splitTasks, targetSizeInBytes, splitLookback, splitOpenFileCost));
+        })
+        .flatMap(pair -> StreamSupport.stream(CloseableIterable
+            .transform(pair.second(), task -> Pair.of(pair.first(), task)).spliterator(), false)
+        )
+        .collect(Collectors.toList());
+
+    if (!combinedScanTasks.isEmpty()) {
+      JavaRDD<Pair<StructLike, CombinedScanTask>> taskRDD = sparkContext.parallelize(combinedScanTasks,
+          combinedScanTasks.size());
+      Broadcast<FileIO> io = sparkContext.broadcast(fileIO());
+      Broadcast<EncryptionManager> encryption = sparkContext.broadcast(encryptionManager());
+
+      DeleteRewriter deleteRewriter = new DeleteRewriter(table, caseSensitive, io, encryption);
+      List<DeleteFile> posDeletes = deleteRewriter.toPosDeletes(taskRDD);
+
+      if (!eqDeletes.isEmpty() && !posDeletes.isEmpty()) {
+        rewriteDeletes(Lists.newArrayList(eqDeletes), posDeletes);
+        return new DeleteRewriteActionResult(Lists.newArrayList(eqDeletes), posDeletes);
+      }
+    }
+
+    return new DeleteRewriteActionResult(Collections.emptyList(), Collections.emptyList());
+  }
+
+  protected EncryptionManager encryptionManager() {
+    return encryptionManager;
+  }
+
+  private Map<StructLikeWrapper, Collection<FileScanTask>> groupTasksByPartition(
+      CloseableIterator<FileScanTask> tasksIter) {
+    ListMultimap<StructLikeWrapper, FileScanTask> tasksGroupedByPartition = Multimaps.newListMultimap(
+        Maps.newHashMap(), Lists::newArrayList);
+    try (CloseableIterator<FileScanTask> iterator = tasksIter) {
+      iterator.forEachRemaining(task -> {
+        StructLikeWrapper structLike = StructLikeWrapper.forType(spec.partitionType()).set(task.file().partition());
+        if (TableScanUtil.hasDeletes(task)) {

Review comment:
       Here the `task` must have at least one delete files because  the `Collection<FileScanTask>` has been got by filtering the EQUALITY_DELETES delete files.

##########
File path: spark/src/main/java/org/apache/iceberg/spark/source/DeleteRewriter.java
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.io.SortedPosDeleteWriter;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.Tasks;
+import org.apache.spark.TaskContext;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.StructType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING;
+
+public class DeleteRewriter implements Serializable {
+  private static final Logger LOG = LoggerFactory.getLogger(DeleteRewriter.class);
+  private final PartitionSpec spec;
+  private final Map<String, String> properties;
+  private final Schema schema;
+  private final FileFormat format;
+  private final Broadcast<FileIO> io;
+  private final Broadcast<EncryptionManager> encryptionManager;
+  private final LocationProvider locations;
+  private final String nameMapping;
+  private final boolean caseSensitive;
+
+  public DeleteRewriter(Table table, boolean caseSensitive,
+                        Broadcast<FileIO> io, Broadcast<EncryptionManager> encryptionManager) {
+    this.spec = table.spec();
+    this.schema = table.schema();
+    this.locations = table.locationProvider();
+    this.caseSensitive = caseSensitive;
+    this.io = io;
+    this.encryptionManager = encryptionManager;
+    this.properties = table.properties();
+    this.nameMapping = table.properties().get(DEFAULT_NAME_MAPPING);
+
+    String formatString = table.properties().getOrDefault(
+            TableProperties.DEFAULT_FILE_FORMAT, TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
+    this.format = FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH));
+  }
+
+  public List<DeleteFile> toPosDeletes(JavaRDD<Pair<StructLike, CombinedScanTask>> taskRDD) {
+    JavaRDD<List<DeleteFile>> dataFilesRDD = taskRDD.map(this::toPosDeletes);
+
+    return dataFilesRDD.collect().stream()
+        .flatMap(Collection::stream)
+        .collect(Collectors.toList());
+  }
+
+  public List<DeleteFile> toPosDeletes(Pair<StructLike, CombinedScanTask> task) throws Exception {
+    TaskContext context = TaskContext.get();
+    int partitionId = context.partitionId();
+    long taskId = context.taskAttemptId();
+
+    Schema metaSchema = new Schema(MetadataColumns.FILE_PATH, MetadataColumns.ROW_POSITION);
+    Schema expectedSchema = TypeUtil.join(metaSchema, schema);
+
+    DeleteRowReader deleteRowReader = new DeleteRowReader(task.second(), schema, expectedSchema, nameMapping,
+        io.value(), encryptionManager.value(), caseSensitive);
+
+    StructType structType = SparkSchemaUtil.convert(schema);
+    SparkAppenderFactory appenderFactory = new SparkAppenderFactory(properties, schema, structType, spec);
+
+    OutputFileFactory fileFactory = new OutputFileFactory(
+        spec, format, locations, io.value(), encryptionManager.value(), partitionId, taskId);
+
+    PartitionKey key = new PartitionKey(spec, schema);
+    key.partition(task.first());
+    SortedPosDeleteWriter<InternalRow> posDeleteWriter =
+        new SortedPosDeleteWriter<>(appenderFactory, fileFactory, format, key);
+
+    try {
+      while (deleteRowReader.next()) {
+        InternalRow row = deleteRowReader.get();
+        posDeleteWriter.delete(row.getString(0), row.getLong(1));
+      }
+
+      deleteRowReader.close();
+      deleteRowReader = null;
+
+      posDeleteWriter.close();

Review comment:
       Don't have to close the `posDeleteWriter` here because the following `complete()` will close it inside automatically.

##########
File path: spark/src/main/java/org/apache/iceberg/spark/source/DeleteRewriter.java
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.io.SortedPosDeleteWriter;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.Tasks;
+import org.apache.spark.TaskContext;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.StructType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING;
+
+public class DeleteRewriter implements Serializable {
+  private static final Logger LOG = LoggerFactory.getLogger(DeleteRewriter.class);
+  private final PartitionSpec spec;
+  private final Map<String, String> properties;
+  private final Schema schema;
+  private final FileFormat format;
+  private final Broadcast<FileIO> io;
+  private final Broadcast<EncryptionManager> encryptionManager;
+  private final LocationProvider locations;
+  private final String nameMapping;
+  private final boolean caseSensitive;
+
+  public DeleteRewriter(Table table, boolean caseSensitive,
+                        Broadcast<FileIO> io, Broadcast<EncryptionManager> encryptionManager) {
+    this.spec = table.spec();
+    this.schema = table.schema();
+    this.locations = table.locationProvider();
+    this.caseSensitive = caseSensitive;
+    this.io = io;
+    this.encryptionManager = encryptionManager;
+    this.properties = table.properties();
+    this.nameMapping = table.properties().get(DEFAULT_NAME_MAPPING);
+
+    String formatString = table.properties().getOrDefault(
+            TableProperties.DEFAULT_FILE_FORMAT, TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
+    this.format = FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH));
+  }
+
+  public List<DeleteFile> toPosDeletes(JavaRDD<Pair<StructLike, CombinedScanTask>> taskRDD) {
+    JavaRDD<List<DeleteFile>> dataFilesRDD = taskRDD.map(this::toPosDeletes);
+
+    return dataFilesRDD.collect().stream()
+        .flatMap(Collection::stream)
+        .collect(Collectors.toList());
+  }
+
+  public List<DeleteFile> toPosDeletes(Pair<StructLike, CombinedScanTask> task) throws Exception {
+    TaskContext context = TaskContext.get();
+    int partitionId = context.partitionId();
+    long taskId = context.taskAttemptId();
+
+    Schema metaSchema = new Schema(MetadataColumns.FILE_PATH, MetadataColumns.ROW_POSITION);
+    Schema expectedSchema = TypeUtil.join(metaSchema, schema);
+
+    DeleteRowReader deleteRowReader = new DeleteRowReader(task.second(), schema, expectedSchema, nameMapping,
+        io.value(), encryptionManager.value(), caseSensitive);
+
+    StructType structType = SparkSchemaUtil.convert(schema);
+    SparkAppenderFactory appenderFactory = new SparkAppenderFactory(properties, schema, structType, spec);
+
+    OutputFileFactory fileFactory = new OutputFileFactory(
+        spec, format, locations, io.value(), encryptionManager.value(), partitionId, taskId);
+
+    PartitionKey key = new PartitionKey(spec, schema);
+    key.partition(task.first());

Review comment:
       Could we just pass the `PartitionKey` when `groupTasksByPartition` in `ReplaceDeleteAction` ? then we don't have to partition it again here , actually it's really partition value for the current task.first().




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #2216: Spark: support replace equality deletes to position deletes

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2216:
URL: https://github.com/apache/iceberg/pull/2216#discussion_r588067080



##########
File path: spark/src/main/java/org/apache/iceberg/actions/ReplaceDeleteAction.java
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RewriteFiles;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.ListMultimap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.SparkUtil;
+import org.apache.iceberg.spark.source.DeleteRewriter;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.StructLikeWrapper;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.iceberg.util.Tasks;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ReplaceDeleteAction extends
+    BaseSnapshotUpdateAction<ReplaceDeleteAction, DeleteRewriteActionResult> {
+  private static final Logger LOG = LoggerFactory.getLogger(ReplaceDeleteAction.class);
+  private final Table table;
+  private final JavaSparkContext sparkContext;
+  private FileIO fileIO;
+  private final EncryptionManager encryptionManager;
+  private final boolean caseSensitive;
+  private final PartitionSpec spec;
+  private final long targetSizeInBytes;
+  private final int splitLookback;
+  private final long splitOpenFileCost;
+
+  public ReplaceDeleteAction(SparkSession spark, Table table) {
+    this.table = table;
+    this.sparkContext = new JavaSparkContext(spark.sparkContext());
+    this.fileIO = fileIO();
+    this.encryptionManager = table.encryption();
+    this.caseSensitive = false;
+    this.spec = table.spec();
+
+    long splitSize = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.SPLIT_SIZE,
+        TableProperties.SPLIT_SIZE_DEFAULT);
+    long targetFileSize = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
+        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
+    this.targetSizeInBytes = Math.min(splitSize, targetFileSize);
+
+    this.splitLookback = PropertyUtil.propertyAsInt(
+        table.properties(),
+        TableProperties.SPLIT_LOOKBACK,
+        TableProperties.SPLIT_LOOKBACK_DEFAULT);
+    this.splitOpenFileCost = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.SPLIT_OPEN_FILE_COST,
+        TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT);
+  }
+
+  protected FileIO fileIO() {
+    if (this.fileIO == null) {
+      this.fileIO = SparkUtil.serializableFileIO(table());
+    }
+    return this.fileIO;
+  }
+
+  @Override
+  protected Table table() {
+    return table;
+  }
+
+  @Override
+  public DeleteRewriteActionResult execute() {
+    CloseableIterable<FileScanTask> fileScanTasks = null;
+    try {
+      fileScanTasks = table.newScan()
+          .caseSensitive(caseSensitive)
+          .ignoreResiduals()
+          .planFiles();
+    } finally {
+      try {
+        if (fileScanTasks != null) {
+          fileScanTasks.close();
+        }
+      } catch (IOException ioe) {
+        LOG.warn("Failed to close task iterable", ioe);
+      }
+    }
+
+    CloseableIterable<FileScanTask> tasksWithEqDelete = CloseableIterable.filter(fileScanTasks, scan ->
+        scan.deletes().stream().anyMatch(delete -> delete.content().equals(FileContent.EQUALITY_DELETES))
+    );
+
+    List<DeleteFile> eqDeletes = Lists.newArrayList();
+    tasksWithEqDelete.forEach(task -> {
+      eqDeletes.addAll(task.deletes().stream()
+          .filter(deleteFile -> deleteFile.content().equals(FileContent.EQUALITY_DELETES))
+          .collect(Collectors.toList()));
+    });
+
+    Map<StructLikeWrapper, Collection<FileScanTask>> groupedTasks = groupTasksByPartition(tasksWithEqDelete.iterator());
+
+    // Split and combine tasks under each partition
+    // TODO: can we split task?

Review comment:
       Yes,  we can split the task based on `DataFile`(s) here.  But that introduce another issue here,  the current balance policy ( for splitting tasks ) only consider the `DataFile` ,  the idea way should be considering both insert file size and delete file size.  I think there should be another separate issues to address this.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #2216: Spark: support replace equality deletes to position deletes

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2216:
URL: https://github.com/apache/iceberg/pull/2216#discussion_r589150323



##########
File path: data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
##########
@@ -110,7 +110,44 @@ protected long pos(T record) {
     return applyEqDeletes(applyPosDeletes(records));
   }
 
-  private CloseableIterable<T> applyEqDeletes(CloseableIterable<T> records) {
+  public CloseableIterable<T> matchEqDeletes(CloseableIterable<T> records) {

Review comment:
       I'd like to introduce a `ChainFilter` to simply the whole filter logics: 
   
   ```patch
   From ae394375024e11cd90e2cde280bbbf9ea9999c46 Mon Sep 17 00:00:00 2001
   From: huzheng <op...@gmail.com>
   Date: Mon, 8 Mar 2021 11:30:50 +0800
   Subject: [PATCH] filter
   
   ---
    .../org/apache/iceberg/deletes/Deletes.java   | 36 ----------------
    .../org/apache/iceberg/util/ChainFilter.java  | 42 +++++++++++++++++++
    .../org/apache/iceberg/data/DeleteFilter.java | 12 ++++--
    3 files changed, 50 insertions(+), 40 deletions(-)
    create mode 100644 core/src/main/java/org/apache/iceberg/util/ChainFilter.java
   
   diff --git a/core/src/main/java/org/apache/iceberg/deletes/Deletes.java b/core/src/main/java/org/apache/iceberg/deletes/Deletes.java
   index c2f21fe19..62154f7d6 100644
   --- a/core/src/main/java/org/apache/iceberg/deletes/Deletes.java
   +++ b/core/src/main/java/org/apache/iceberg/deletes/Deletes.java
   @@ -24,7 +24,6 @@ import java.io.UncheckedIOException;
    import java.util.Comparator;
    import java.util.List;
    import java.util.Set;
   -import java.util.function.BiFunction;
    import java.util.function.Function;
    import org.apache.iceberg.Accessor;
    import org.apache.iceberg.MetadataColumns;
   @@ -41,10 +40,8 @@ import org.apache.iceberg.relocated.com.google.common.collect.Sets;
    import org.apache.iceberg.types.Comparators;
    import org.apache.iceberg.types.Types;
    import org.apache.iceberg.util.Filter;
   -import org.apache.iceberg.util.Pair;
    import org.apache.iceberg.util.SortedMerge;
    import org.apache.iceberg.util.StructLikeSet;
   -import org.apache.iceberg.util.StructProjection;
    
    public class Deletes {
      private static final Schema POSITION_DELETE_SCHEMA = new Schema(
   @@ -80,17 +77,6 @@ public class Deletes {
        return filter.filter(rows);
      }
    
   -  public static <T> CloseableIterable<T> match(CloseableIterable<T> rows,
   -                                               BiFunction<T, StructProjection, StructLike> rowToDeleteKey,
   -                                               List<Pair<StructProjection, StructLikeSet>> unprojectedDeleteSets) {
   -    if (unprojectedDeleteSets.isEmpty()) {
   -      return rows;
   -    }
   -
   -    EqualitySetDeleteMatcher<T> equalityFilter = new EqualitySetDeleteMatcher<>(rowToDeleteKey, unprojectedDeleteSets);
   -    return equalityFilter.filter(rows);
   -  }
   -
      public static StructLikeSet toEqualitySet(CloseableIterable<StructLike> eqDeletes, Types.StructType eqType) {
        try (CloseableIterable<StructLike> deletes = eqDeletes) {
          StructLikeSet deleteSet = StructLikeSet.create(eqType);
   @@ -157,28 +143,6 @@ public class Deletes {
        }
      }
    
   -  private static class EqualitySetDeleteMatcher<T> extends Filter<T> {
   -    private final List<Pair<StructProjection, StructLikeSet>> deleteSets;
   -    private final BiFunction<T, StructProjection, StructLike> extractEqStruct;
   -
   -    protected EqualitySetDeleteMatcher(BiFunction<T, StructProjection, StructLike> extractEq,
   -                                      List<Pair<StructProjection, StructLikeSet>> deleteSets) {
   -      this.extractEqStruct = extractEq;
   -      this.deleteSets = deleteSets;
   -    }
   -
   -    @Override
   -    protected boolean shouldKeep(T row) {
   -      for (Pair<StructProjection, StructLikeSet> deleteSet : deleteSets) {
   -        if (deleteSet.second().contains(extractEqStruct.apply(row, deleteSet.first()))) {
   -          return true;
   -        }
   -      }
   -
   -      return false;
   -    }
   -  }
   -
      private static class PositionSetDeleteFilter<T> extends Filter<T> {
        private final Function<T, Long> rowToPosition;
        private final Set<Long> deleteSet;
   diff --git a/core/src/main/java/org/apache/iceberg/util/ChainFilter.java b/core/src/main/java/org/apache/iceberg/util/ChainFilter.java
   new file mode 100644
   index 000000000..354b0dc38
   --- /dev/null
   +++ b/core/src/main/java/org/apache/iceberg/util/ChainFilter.java
   @@ -0,0 +1,42 @@
   +/*
   + * Licensed to the Apache Software Foundation (ASF) under one
   + * or more contributor license agreements.  See the NOTICE file
   + * distributed with this work for additional information
   + * regarding copyright ownership.  The ASF licenses this file
   + * to you under the Apache License, Version 2.0 (the
   + * "License"); you may not use this file except in compliance
   + * with the License.  You may obtain a copy of the License at
   + *
   + *   http://www.apache.org/licenses/LICENSE-2.0
   + *
   + * Unless required by applicable law or agreed to in writing,
   + * software distributed under the License is distributed on an
   + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
   + * KIND, either express or implied.  See the License for the
   + * specific language governing permissions and limitations
   + * under the License.
   + */
   +
   +package org.apache.iceberg.util;
   +
   +import java.util.List;
   +import java.util.function.Predicate;
   +
   +public class ChainFilter<T> extends Filter<T> {
   +
   +  private final List<Predicate<T>> filters;
   +
   +  public ChainFilter(List<Predicate<T>> filters) {
   +    this.filters = filters;
   +  }
   +
   +  @Override
   +  protected boolean shouldKeep(T item) {
   +    for (Predicate<T> filter : filters) {
   +      if (!filter.test(item)) {
   +        return false;
   +      }
   +    }
   +    return false;
   +  }
   +}
   diff --git a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
   index 884641355..6d4986ebf 100644
   --- a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
   +++ b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
   @@ -23,6 +23,7 @@ import java.util.Collection;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;
   +import java.util.function.Predicate;
    import org.apache.iceberg.Accessor;
    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.DeleteFile;
   @@ -48,7 +49,8 @@ import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
    import org.apache.iceberg.relocated.com.google.common.collect.Sets;
    import org.apache.iceberg.types.TypeUtil;
    import org.apache.iceberg.types.Types;
   -import org.apache.iceberg.util.Pair;
   +import org.apache.iceberg.util.ChainFilter;
   +import org.apache.iceberg.util.Filter;
    import org.apache.iceberg.util.StructLikeSet;
    import org.apache.iceberg.util.StructProjection;
    import org.apache.parquet.Preconditions;
   @@ -121,7 +123,7 @@ public abstract class DeleteFilter<T> {
          filesByDeleteIds.put(Sets.newHashSet(delete.equalityFieldIds()), delete);
        }
    
   -    List<Pair<StructProjection, StructLikeSet>> unprojectedDeleteSets = Lists.newArrayList();
   +    List<Predicate<T>> deleteSetFilters = Lists.newArrayList();
        for (Map.Entry<Set<Integer>, Collection<DeleteFile>> entry : filesByDeleteIds.asMap().entrySet()) {
          Set<Integer> ids = entry.getKey();
          Iterable<DeleteFile> deletes = entry.getValue();
   @@ -138,10 +140,12 @@ public abstract class DeleteFilter<T> {
              CloseableIterable.transform(CloseableIterable.concat(deleteRecords), Record::copy),
              deleteSchema.asStruct());
    
   -      unprojectedDeleteSets.add(Pair.of(projectRow, deleteSet));
   +      Predicate<T> predicate = record -> deleteSet.contains(projectRow.wrap(asStructLike(record)));
   +      deleteSetFilters.add(predicate);
        }
    
   -    return Deletes.match(records, (record, projection) -> projection.wrap(asStructLike(record)), unprojectedDeleteSets);
   +    Filter<T> findDeleteRows = new ChainFilter<>(deleteSetFilters);
   +    return findDeleteRows.filter(records);
      }
    
      protected CloseableIterable<T> applyEqDeletes(CloseableIterable<T> records) {
   -- 
   2.20.1 (Apple Git-117)
   ```

##########
File path: core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java
##########
@@ -57,4 +57,23 @@ public RewriteFiles rewriteFiles(Set<DataFile> filesToDelete, Set<DataFile> file
 
     return this;
   }
+
+  @Override
+  public RewriteFiles rewriteDeletes(Set<DeleteFile> deletesToDelete, Set<DeleteFile> deletesToAdd) {
+    Preconditions.checkArgument(deletesToDelete != null && !deletesToDelete.isEmpty(),
+        "Files to delete cannot be null or empty");
+    Preconditions.checkArgument(deletesToAdd != null && !deletesToAdd.isEmpty(),
+        "Files to add can not be null or empty");
+
+    for (DeleteFile toDelete : deletesToDelete) {
+      delete(toDelete);

Review comment:
       Nit:  for the replacing equality delete with position deletes,   we'd better to assert the delete files to be equality delete files ?  the delete file to add MUST be position delete files ?

##########
File path: core/src/main/java/org/apache/iceberg/deletes/Deletes.java
##########
@@ -77,6 +80,17 @@ private Deletes() {
     return filter.filter(rows);
   }
 
+  public static <T> CloseableIterable<T> match(CloseableIterable<T> rows,

Review comment:
       `match` is not a good candidate for me to express the meaning of finding the existing row data that hits the equality delete sets. I may need a better name for this.

##########
File path: api/src/main/java/org/apache/iceberg/RewriteFiles.java
##########
@@ -42,4 +42,13 @@
    * @return this for method chaining
    */
   RewriteFiles rewriteFiles(Set<DataFile> filesToDelete, Set<DataFile> filesToAdd);
+
+  /**
+   * Add a rewrite that replaces one set of deletes with another that contains the same deleted rows.
+   *
+   * @param deletesToDelete files that will be replaced, cannot be null or empty.
+   * @param deletesToAdd files that will be added, cannot be null or empty.
+   * @return this for method chaining
+   */
+  RewriteFiles rewriteDeletes(Set<DeleteFile> deletesToDelete, Set<DeleteFile> deletesToAdd);

Review comment:
       Before we start the replacing equality deletes with position deletes,  I think we need to refactor the RewriteFiles API   to adjust more cases: 
   1.   Rewrite data files and remove all the delete rows.  The files to delete will be a set of data files and a set of delete files, and the files to add will be a set of data files.
   2. Replace equality deletes with position deletes,  the files to delete will be a set of equality delete files (we will need to ensure that all delete files are equality delete files ? ) ,  the files to add will be a set of position delete files.
   3.  Merging small delete files into a bigger delete files.  The files to delete will be  a set of equality/position delete files,  the files to add will be a set of equality/position delete files.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #2216: Spark: support replace equality deletes to position deletes

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2216:
URL: https://github.com/apache/iceberg/pull/2216#discussion_r589976030



##########
File path: spark/src/main/java/org/apache/iceberg/spark/source/DeleteRewriter.java
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.io.SortedPosDeleteWriter;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.Tasks;
+import org.apache.spark.TaskContext;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.StructType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING;
+
+public class DeleteRewriter implements Serializable {
+  private static final Logger LOG = LoggerFactory.getLogger(DeleteRewriter.class);
+  private final PartitionSpec spec;
+  private final Map<String, String> properties;
+  private final Schema schema;
+  private final FileFormat format;
+  private final Broadcast<FileIO> io;
+  private final Broadcast<EncryptionManager> encryptionManager;
+  private final LocationProvider locations;
+  private final String nameMapping;
+  private final boolean caseSensitive;
+
+  public DeleteRewriter(Table table, boolean caseSensitive,
+                        Broadcast<FileIO> io, Broadcast<EncryptionManager> encryptionManager) {
+    this.spec = table.spec();
+    this.schema = table.schema();
+    this.locations = table.locationProvider();
+    this.caseSensitive = caseSensitive;
+    this.io = io;
+    this.encryptionManager = encryptionManager;
+    this.properties = table.properties();
+    this.nameMapping = table.properties().get(DEFAULT_NAME_MAPPING);
+
+    String formatString = table.properties().getOrDefault(
+            TableProperties.DEFAULT_FILE_FORMAT, TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
+    this.format = FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH));
+  }
+
+  public List<DeleteFile> toPosDeletes(JavaRDD<Pair<StructLike, CombinedScanTask>> taskRDD) {
+    JavaRDD<List<DeleteFile>> dataFilesRDD = taskRDD.map(this::toPosDeletes);
+
+    return dataFilesRDD.collect().stream()
+        .flatMap(Collection::stream)
+        .collect(Collectors.toList());
+  }
+
+  public List<DeleteFile> toPosDeletes(Pair<StructLike, CombinedScanTask> task) throws Exception {
+    TaskContext context = TaskContext.get();
+    int partitionId = context.partitionId();
+    long taskId = context.taskAttemptId();
+
+    Schema metaSchema = new Schema(MetadataColumns.FILE_PATH, MetadataColumns.ROW_POSITION);
+    Schema expectedSchema = TypeUtil.join(metaSchema, schema);
+
+    DeleteRowReader deleteRowReader = new DeleteRowReader(task.second(), schema, expectedSchema, nameMapping,
+        io.value(), encryptionManager.value(), caseSensitive);
+
+    StructType structType = SparkSchemaUtil.convert(schema);
+    SparkAppenderFactory appenderFactory = new SparkAppenderFactory(properties, schema, structType, spec);
+
+    OutputFileFactory fileFactory = new OutputFileFactory(
+        spec, format, locations, io.value(), encryptionManager.value(), partitionId, taskId);
+
+    PartitionKey key = new PartitionKey(spec, schema);
+    key.partition(task.first());

Review comment:
       Could we just pass the `PartitionKey` when `groupTasksByPartition` in `ReplaceDeleteAction` ? then we don't have to partition it again here , actually it's already partition value for the current task.first().




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on pull request #2216: Spark: support replace equality deletes to position deletes

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #2216:
URL: https://github.com/apache/iceberg/pull/2216#issuecomment-776377189


   I'll try to make time this week. Thanks for working on this, @chenjunjiedada!


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] chenjunjiedada commented on a change in pull request #2216: Spark: support replace equality deletes to position deletes

Posted by GitBox <gi...@apache.org>.
chenjunjiedada commented on a change in pull request #2216:
URL: https://github.com/apache/iceberg/pull/2216#discussion_r573362635



##########
File path: data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
##########
@@ -110,7 +110,44 @@ protected long pos(T record) {
     return applyEqDeletes(applyPosDeletes(records));
   }
 
-  private CloseableIterable<T> applyEqDeletes(CloseableIterable<T> records) {
+  public CloseableIterable<T> matchEqDeletes(CloseableIterable<T> records) {

Review comment:
       I'm not sure how this can be integrated to the delete filter, Maybe `filterNot` or an API that returns both filtered and retain iterator. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] chenjunjiedada commented on pull request #2216: Spark: support replace equality deletes to position deletes

Posted by GitBox <gi...@apache.org>.
chenjunjiedada commented on pull request #2216:
URL: https://github.com/apache/iceberg/pull/2216#issuecomment-791312675


   Thanks @openinx for reviewing! I will update ASAP.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] chenjunjiedada commented on a change in pull request #2216: Spark: support replace equality deletes to position deletes

Posted by GitBox <gi...@apache.org>.
chenjunjiedada commented on a change in pull request #2216:
URL: https://github.com/apache/iceberg/pull/2216#discussion_r589042721



##########
File path: data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
##########
@@ -110,7 +110,44 @@ protected long pos(T record) {
     return applyEqDeletes(applyPosDeletes(records));
   }
 
-  private CloseableIterable<T> applyEqDeletes(CloseableIterable<T> records) {
+  public CloseableIterable<T> matchEqDeletes(CloseableIterable<T> records) {
+    if (eqDeletes.isEmpty()) {
+      return records;
+    }
+
+    Multimap<Set<Integer>, DeleteFile> filesByDeleteIds = Multimaps.newMultimap(Maps.newHashMap(), Lists::newArrayList);
+    for (DeleteFile delete : eqDeletes) {
+      filesByDeleteIds.put(Sets.newHashSet(delete.equalityFieldIds()), delete);
+    }
+
+    CloseableIterable<T> remainRecords = records;
+    CloseableIterable<T> matchedRecords = CloseableIterable.empty();
+    for (Map.Entry<Set<Integer>, Collection<DeleteFile>> entry : filesByDeleteIds.asMap().entrySet()) {
+      Set<Integer> ids = entry.getKey();
+      Iterable<DeleteFile> deletes = entry.getValue();
+
+      Schema deleteSchema = TypeUtil.select(requiredSchema, ids);
+
+      // a projection to select and reorder fields of the file schema to match the delete rows
+      StructProjection projectRow = StructProjection.create(requiredSchema, deleteSchema);
+
+      Iterable<CloseableIterable<Record>> deleteRecords = Iterables.transform(deletes,
+          delete -> openDeletes(delete, deleteSchema));
+      StructLikeSet deleteSet = Deletes.toEqualitySet(
+          // copy the delete records because they will be held in a set
+          CloseableIterable.transform(CloseableIterable.concat(deleteRecords), Record::copy),
+          deleteSchema.asStruct());
+
+      matchedRecords = CloseableIterable.concat(Lists.newArrayList(matchedRecords, Deletes.match(remainRecords,

Review comment:
       I think it would not iterate the data set several times since these are iterable chains and should be computed lazily.  For a filter chain of a data set of `N` elements with filters (`F1, F2, F3, F4`), suppose it will filter out (`N1, N2, N3, N4`) items, I think it iterates data set one time and the number of filter calls should be:
   -  `F1` N times
   - `F2` (N-N1) times
   - `F3` (N-N1-N2) times
   - `F4` (N-N1-N2-N3) times
   
   For a matching chain of a data set of `N` elements with filters (`F1, F2, F3, F4`), suppose it will filter out (`N1, N2, N3, N4`) items, I think it iterates data set one time and the number of filter calls should be:
   -  `F1` 2N times (filter and match)
   - `F2` 2(N-N1) times (filter and match)
   - `F3` 2(N-N1-N2) times (filter and match)
   - `F4` 2(N-N1-N2-N3) times (filter and match)
   
   @rdblue Could you please help to correct me if I am wrong?
   
   Here is an alternative implementation that collects all delete sets in a list and does the projection in the filter. It looks a bit straightforward. 
   
   ```java
     public static <T> CloseableIterable<T> match(CloseableIterable<T> rows,
                                                  BiFunction<T, StructProjection, StructLike> rowToDeleteKey,
                                                  List<Pair<StructProjection, StructLikeSet>> unprojectedDeleteSets) {
       if (unprojectedDeleteSets.isEmpty()) {
         return rows;
       }
   
       EqualitySetDeleteMatcher<T> equalityFilter = new EqualitySetDeleteMatcher<>(rowToDeleteKey, unprojectedDeleteSets);
       return equalityFilter.filter(rows);
     }
   
   
     private static class EqualitySetDeleteMatcher<T> extends Filter<T> {
       private final List<Pair<StructProjection, StructLikeSet>> deleteSets;
       private final BiFunction<T, StructProjection, StructLike> extractEqStruct;
   
       protected EqualitySetDeleteMatcher(BiFunction<T, StructProjection, StructLike> extractEq,
                                         List<Pair<StructProjection, StructLikeSet>> deleteSets) {
         this.extractEqStruct = extractEq;
         this.deleteSets = deleteSets;
       }
   
       @Override
       protected boolean shouldKeep(T row) {
         for (Pair<StructProjection, StructLikeSet> deleteSet : deleteSets) {
           if (deleteSet.second().contains(extractEqStruct.apply(row, deleteSet.first()))) {
             return true;
           }
         }
   
         return false;
       }
     }
   
   ```
   
   PS: For the delete files with the same equality field IDs we will collect the deletes in one set. 
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] zhangjun0x01 commented on a change in pull request #2216: Spark: support replace equality deletes to position deletes

Posted by GitBox <gi...@apache.org>.
zhangjun0x01 commented on a change in pull request #2216:
URL: https://github.com/apache/iceberg/pull/2216#discussion_r582750042



##########
File path: core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java
##########
@@ -57,4 +57,23 @@ public RewriteFiles rewriteFiles(Set<DataFile> filesToDelete, Set<DataFile> file
 
     return this;
   }
+
+  @Override
+  public RewriteFiles rewriteDeletes(Set<DeleteFile> deletesToDelete, Set<DeleteFile> deletesToAdd) {
+    Preconditions.checkArgument(deletesToDelete != null && !deletesToDelete.isEmpty(),
+        "Files to delete cannot be null or empty");
+    Preconditions.checkArgument(deletesToAdd != null && !deletesToAdd.isEmpty(),
+        "Files to add can not be null or empty");
+
+    for (DeleteFile toDelete : deletesToDelete) {
+      delete(toDelete);
+    }
+
+    for (DeleteFile toAdd : deletesToAdd) {
+      add(toAdd);
+    }
+
+    return this;
+

Review comment:
       extra empty line ?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #2216: Spark: support replace equality deletes to position deletes

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2216:
URL: https://github.com/apache/iceberg/pull/2216#discussion_r596523793



##########
File path: data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
##########
@@ -110,7 +113,42 @@ protected long pos(T record) {
     return applyEqDeletes(applyPosDeletes(records));
   }
 
-  private CloseableIterable<T> applyEqDeletes(CloseableIterable<T> records) {
+  public CloseableIterable<T> matchEqDeletes(CloseableIterable<T> records) {

Review comment:
       Yes, in this separate PR, we've abstracted them into a single method.  https://github.com/apache/iceberg/pull/2320/files#diff-a6641d31cdfd66835b3447bef04be87786849126b07761e47b852837f67a988aR151




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] chenjunjiedada commented on a change in pull request #2216: Spark: support replace equality deletes to position deletes

Posted by GitBox <gi...@apache.org>.
chenjunjiedada commented on a change in pull request #2216:
URL: https://github.com/apache/iceberg/pull/2216#discussion_r590294774



##########
File path: spark/src/main/java/org/apache/iceberg/spark/source/DeleteRewriter.java
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.io.SortedPosDeleteWriter;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.Tasks;
+import org.apache.spark.TaskContext;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.StructType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING;
+
+public class DeleteRewriter implements Serializable {
+  private static final Logger LOG = LoggerFactory.getLogger(DeleteRewriter.class);
+  private final PartitionSpec spec;
+  private final Map<String, String> properties;
+  private final Schema schema;
+  private final FileFormat format;
+  private final Broadcast<FileIO> io;
+  private final Broadcast<EncryptionManager> encryptionManager;
+  private final LocationProvider locations;
+  private final String nameMapping;
+  private final boolean caseSensitive;
+
+  public DeleteRewriter(Table table, boolean caseSensitive,
+                        Broadcast<FileIO> io, Broadcast<EncryptionManager> encryptionManager) {
+    this.spec = table.spec();
+    this.schema = table.schema();
+    this.locations = table.locationProvider();
+    this.caseSensitive = caseSensitive;
+    this.io = io;
+    this.encryptionManager = encryptionManager;
+    this.properties = table.properties();
+    this.nameMapping = table.properties().get(DEFAULT_NAME_MAPPING);
+
+    String formatString = table.properties().getOrDefault(
+            TableProperties.DEFAULT_FILE_FORMAT, TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
+    this.format = FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH));
+  }
+
+  public List<DeleteFile> toPosDeletes(JavaRDD<Pair<StructLike, CombinedScanTask>> taskRDD) {
+    JavaRDD<List<DeleteFile>> dataFilesRDD = taskRDD.map(this::toPosDeletes);
+
+    return dataFilesRDD.collect().stream()
+        .flatMap(Collection::stream)
+        .collect(Collectors.toList());
+  }
+
+  public List<DeleteFile> toPosDeletes(Pair<StructLike, CombinedScanTask> task) throws Exception {
+    TaskContext context = TaskContext.get();
+    int partitionId = context.partitionId();
+    long taskId = context.taskAttemptId();
+
+    Schema metaSchema = new Schema(MetadataColumns.FILE_PATH, MetadataColumns.ROW_POSITION);
+    Schema expectedSchema = TypeUtil.join(metaSchema, schema);
+
+    DeleteRowReader deleteRowReader = new DeleteRowReader(task.second(), schema, expectedSchema, nameMapping,
+        io.value(), encryptionManager.value(), caseSensitive);
+
+    StructType structType = SparkSchemaUtil.convert(schema);
+    SparkAppenderFactory appenderFactory = new SparkAppenderFactory(properties, schema, structType, spec);
+
+    OutputFileFactory fileFactory = new OutputFileFactory(
+        spec, format, locations, io.value(), encryptionManager.value(), partitionId, taskId);
+
+    PartitionKey key = new PartitionKey(spec, schema);
+    key.partition(task.first());

Review comment:
       You are right! The original logic here has a problem, it uses the partition value for `PartitionKey` which should expect a data row. I updated the writer constructor to accept `StructLike` instead of `PartitionKey` to fix this. Let me update unit tests as well to cover this.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org