You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/03/24 03:06:40 UTC

[GitHub] [iceberg] chenjunjiedada opened a new pull request #2364: Spark: Add an action to rewrite equality deletes

chenjunjiedada opened a new pull request #2364:
URL: https://github.com/apache/iceberg/pull/2364


   This is a sub-PR of #2216, it adds a spark action to replace the equality deletes to position deletes which I think is minor compaction. The logic is:
   
   1. Plan and group the tasks by partition. Current it doesn't consider the filter, we may consider filter, such as partition filter, later.
   2. Use the delete matcher to keep rows that match the equality delete set. The rows are projected with file and pos fields.
   3. Write the matched rows via position delete writer.
   4. Perform the rewrite files to replace equality deletes with position deletes.
   
   This adds an API in RewriteFiles to rewrite equality deletes to position deletes. It should keep the same semantic with the current API that rows must be the same as before as after. This could be used to combine position deletes to reduce some small files.
   
   This may need some changes when https://github.com/apache/iceberg/pull/2294 get merged.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #2364: Spark: Add an action to rewrite equality deletes

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2364:
URL: https://github.com/apache/iceberg/pull/2364#discussion_r678124432



##########
File path: core/src/main/java/org/apache/iceberg/util/TableScanUtil.java
##########
@@ -64,4 +77,21 @@ public static boolean hasDeletes(FileScanTask task) {
             splitFiles),
         BaseCombinedScanTask::new);
   }
+
+  public static Map<StructLikeWrapper, Collection<FileScanTask>> groupTasksByPartition(
+      PartitionSpec spec,

Review comment:
       I don't think it's correct to use the table latest partition spec to group the `FileScanTask`,  because different `FileScanTask`  many have different partition specs,  the correct way is to use the `FileScanTask#spec` to group the tasks.   We should remove the `spec` as an argument, otherwise it's introducing a bug...




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] jackye1995 commented on a change in pull request #2364: Spark: Add an action to rewrite equality deletes

Posted by GitBox <gi...@apache.org>.
jackye1995 commented on a change in pull request #2364:
URL: https://github.com/apache/iceberg/pull/2364#discussion_r677962211



##########
File path: spark/src/main/java/org/apache/iceberg/spark/actions/ConvertEqDeletesStrategy.java
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.actions.RewriteDeleteStrategy;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.source.EqualityDeleteRewriter;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.StructLikeWrapper;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ConvertEqDeletesStrategy implements RewriteDeleteStrategy {
+  private static final Logger LOG = LoggerFactory.getLogger(ConvertEqDeletesStrategy.class);
+
+  private final Table table;
+  private long deleteTargetSizeInBytes;
+  private int splitLookback;
+  private long splitOpenFileCost;
+
+  private CloseableIterable<FileScanTask> tasksWithEqDelete;
+  private Iterable<DeleteFile> deletesToReplace;
+  private final JavaSparkContext sparkContext;
+
+  /**
+   * Defines whether to split out the result position deletes by data file names.
+   *
+   * This should be used in EqualityDeleteRewriter.
+   */
+  public static final String SPLIT_POSITION_DELETE = "split-position-delete";
+
+  public ConvertEqDeletesStrategy(SparkSession spark, Table table) {
+    this.table = table;
+    this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
+    this.deleteTargetSizeInBytes = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.DELETE_TARGET_FILE_SIZE_BYTES,
+        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
+    this.splitLookback = PropertyUtil.propertyAsInt(
+        table.properties(),
+        TableProperties.SPLIT_LOOKBACK,
+        TableProperties.SPLIT_LOOKBACK_DEFAULT);
+    this.splitOpenFileCost = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.SPLIT_OPEN_FILE_COST,
+        TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT);
+  }
+
+  @Override
+  public String name() {
+    return "CONVERT-EQUALITY-DELETES";
+  }
+
+  @Override
+  public Table table() {
+    return table;
+  }
+
+  @Override
+  public Iterable<DeleteFile> selectDeletes() {
+    CloseableIterable<FileScanTask> fileScanTasks = null;
+    try {
+      fileScanTasks = table.newScan()
+          .ignoreResiduals()
+          .planFiles();
+    } finally {
+      try {
+        if (fileScanTasks != null) {
+          fileScanTasks.close();
+        }
+      } catch (IOException ioe) {
+        LOG.warn("Failed to close task iterable", ioe);
+      }
+    }
+
+    tasksWithEqDelete = CloseableIterable.filter(fileScanTasks, scan ->
+        scan.deletes().stream().anyMatch(delete -> delete.content().equals(FileContent.EQUALITY_DELETES))
+    );
+
+    Set<DeleteFile> eqDeletes = Sets.newHashSet();
+    tasksWithEqDelete.forEach(task -> {
+      eqDeletes.addAll(task.deletes().stream()
+          .filter(deleteFile -> deleteFile.content().equals(FileContent.EQUALITY_DELETES))
+          .collect(Collectors.toList()));
+    });
+
+    deletesToReplace = eqDeletes;
+
+    return deletesToReplace;
+  }
+
+  @Override
+  public Iterable<DeleteFile> rewriteDeletes() {
+    Map<StructLikeWrapper, Collection<FileScanTask>> groupedTasks =
+        TableScanUtil.groupTasksByPartition(table.spec(), tasksWithEqDelete.iterator());
+
+    // Split and combine tasks under each partition
+    List<Pair<StructLike, CombinedScanTask>> combinedScanTasks = groupedTasks.entrySet().stream()

Review comment:
       After reading this, I think we can make the `RewriteDeleteStrategy` interface closer to `RewriteStrategy` interface. What we have here is basically the equivalent of `planFileGroups` plus `rewriteFiles` in `RewriteStrategy`. So I would propose we have the following methods in `RewriteDeleteStrategy` to be more aligned:
   
   ```
   Iterable<DeleteFile> selectDeletesToRewrite(Iterable<FileScanTask> dataFiles);
   
   Iterable<List<FileScanTask>> planDeleteGroups(Iterable<DeleteFile> deleteFiles);
   
   Set<DeleteFile> rewriteDeletes(List<DeleteFile> deleteFilesToRewrite);
   ```
   
   And we can get the partition `StructLike` directly from the list of scan tasks instead of passing it through the task pair in `EqualityDeleteRewriter`. In this way, we can also enable partial progress for commits.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #2364: Spark: Add an action to rewrite equality deletes

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2364:
URL: https://github.com/apache/iceberg/pull/2364#discussion_r678144409



##########
File path: data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
##########
@@ -139,43 +173,122 @@ protected long pos(T record) {
           CloseableIterable.transform(CloseableIterable.concat(deleteRecords), Record::copy),
           deleteSchema.asStruct());
 
-      Predicate<T> isInDeleteSet = record -> deleteSet.contains(projectRow.wrap(asStructLike(record)));
-      isInDeleteSets.add(isInDeleteSet);
+      isDeleted = isDeleted == null ? record -> deleteSet.contains(projectRow.wrap(asStructLike(record))) :
+              isDeleted.or(record -> deleteSet.contains(projectRow.wrap(asStructLike(record))));
     }
 
-    return isInDeleteSets;
+    return isDeleted;
   }
 
-  public CloseableIterable<T> findEqualityDeleteRows(CloseableIterable<T> records) {
+  private Predicate<T> buildPosDeletePredicate() {
+    if (posDeletes.isEmpty()) {
+      return null;
+    }
+
+    List<CloseableIterable<Record>> deletes = Lists.transform(posDeletes, this::openPosDeletes);
+    Set<Long> deleteSet = Deletes.toPositionSet(dataFile.path(), CloseableIterable.concat(deletes));
+    if (deleteSet.isEmpty()) {
+      return null;
+    }
+
+    return record -> deleteSet.contains(pos(record));
+  }
+
+  public CloseableIterable<T> keepRowsFromDeletes(CloseableIterable<T> records) {
+    Predicate<T> isDeletedFromPosDeletes = buildPosDeletePredicate();
+    if (isDeletedFromPosDeletes == null) {
+      return keepRowsFromEqualityDeletes(records);
+    }
+
+    Predicate<T> isDeletedFromEqDeletes = buildEqDeletePredicate();
+    if (isDeletedFromEqDeletes == null) {
+      return keepRowsFromPosDeletes(records);
+    }
+
+    CloseableIterable<T> markedRecords;
+
+    if (posDeletes.stream().mapToLong(DeleteFile::recordCount).sum() < setFilterThreshold) {
+      markedRecords = CloseableIterable.transform(records, record -> {
+        if (isDeletedFromPosDeletes.test(record) || isDeletedFromEqDeletes.test(record)) {
+          deleteMarker().accept(record);
+        }
+        return record;
+      });
+
+    } else {
+      List<CloseableIterable<Record>> deletes = Lists.transform(posDeletes, this::openPosDeletes);
+      markedRecords = CloseableIterable.transform(Deletes.streamingDeletedRowMarker(records, this::pos,

Review comment:
       We will always load the pos-deletes into in-memory HashSet even if the row count of positional files exceed the given threshold,  because in this [buildPosDeletePredicate](https://github.com/apache/iceberg/pull/2364/files#diff-a6641d31cdfd66835b3447bef04be87786849126b07761e47b852837f67a988aR183), we've loaded all the file-offset into memory,  I think that's not the expected behavior..




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #2364: Spark: Add an action to rewrite equality deletes

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2364:
URL: https://github.com/apache/iceberg/pull/2364#discussion_r678134390



##########
File path: data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
##########
@@ -139,43 +173,122 @@ protected long pos(T record) {
           CloseableIterable.transform(CloseableIterable.concat(deleteRecords), Record::copy),
           deleteSchema.asStruct());
 
-      Predicate<T> isInDeleteSet = record -> deleteSet.contains(projectRow.wrap(asStructLike(record)));
-      isInDeleteSets.add(isInDeleteSet);
+      isDeleted = isDeleted == null ? record -> deleteSet.contains(projectRow.wrap(asStructLike(record))) :

Review comment:
       Initializing the `isDeleted` as a predicate like`t->false`  will simplify this if-else as: 
   
   ```java
   isDeleted = isDeleted.or(record -> deleteSet.contains(projectRow.wrap(asStructLike(record))));
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] chenjunjiedada commented on a change in pull request #2364: Spark: Add an action to rewrite equality deletes

Posted by GitBox <gi...@apache.org>.
chenjunjiedada commented on a change in pull request #2364:
URL: https://github.com/apache/iceberg/pull/2364#discussion_r602660768



##########
File path: spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDeletesSparkAction.java
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RewriteFiles;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.actions.BaseSparkAction;
+import org.apache.iceberg.actions.RewriteDeleteActionResult;
+import org.apache.iceberg.actions.RewriteDeletes;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.ListMultimap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.SparkUtil;
+import org.apache.iceberg.spark.source.EqualityDeleteRewriter;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.StructLikeWrapper;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.iceberg.util.Tasks;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BaseRewriteDeletesSparkAction extends BaseSparkAction<RewriteDeletes, RewriteDeletes.Result>
+    implements RewriteDeletes {
+  private static final Logger LOG = LoggerFactory.getLogger(BaseRewriteDeletesSparkAction.class);
+  private final Table table;
+  private final JavaSparkContext sparkContext;
+  private FileIO fileIO;
+  private final EncryptionManager encryptionManager;
+  private final boolean caseSensitive;
+  private final PartitionSpec spec;
+  private final long targetSizeInBytes;
+  private final int splitLookback;
+  private final long splitOpenFileCost;
+  private boolean rewriteEqualityDelete;
+
+  public BaseRewriteDeletesSparkAction(SparkSession spark, Table table) {
+    super(spark);
+    this.table = table;
+    this.sparkContext = new JavaSparkContext(spark.sparkContext());
+    this.fileIO = fileIO();
+    this.encryptionManager = table.encryption();
+    this.caseSensitive = false;
+    this.spec = table.spec();
+
+    long splitSize = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.SPLIT_SIZE,
+        TableProperties.SPLIT_SIZE_DEFAULT);
+    long targetFileSize = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
+        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
+    this.targetSizeInBytes = Math.min(splitSize, targetFileSize);
+
+    this.splitLookback = PropertyUtil.propertyAsInt(
+        table.properties(),
+        TableProperties.SPLIT_LOOKBACK,
+        TableProperties.SPLIT_LOOKBACK_DEFAULT);
+    this.splitOpenFileCost = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.SPLIT_OPEN_FILE_COST,
+        TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT);
+  }
+
+  protected FileIO fileIO() {
+    if (this.fileIO == null) {
+      this.fileIO = SparkUtil.serializableFileIO(table);
+    }
+    return this.fileIO;
+  }
+
+  @Override
+  public Result execute() {
+    CloseableIterable<FileScanTask> fileScanTasks = null;
+    try {
+      fileScanTasks = table.newScan()
+          .caseSensitive(caseSensitive)
+          .ignoreResiduals()
+          .planFiles();
+    } finally {
+      try {
+        if (fileScanTasks != null) {
+          fileScanTasks.close();
+        }
+      } catch (IOException ioe) {
+        LOG.warn("Failed to close task iterable", ioe);
+      }
+    }
+
+    if (!rewriteEqualityDelete) {
+      LOG.warn("Only supports rewrite equality deletes currently");
+      return new RewriteDeleteActionResult(Collections.emptyList(), Collections.emptyList());
+    }
+
+    CloseableIterable<FileScanTask> tasksWithEqDelete = CloseableIterable.filter(fileScanTasks, scan ->
+        scan.deletes().stream().anyMatch(delete -> delete.content().equals(FileContent.EQUALITY_DELETES))
+    );
+
+    Set<DeleteFile> eqDeletes = Sets.newHashSet();
+    tasksWithEqDelete.forEach(task -> {
+      eqDeletes.addAll(task.deletes().stream()
+          .filter(deleteFile -> deleteFile.content().equals(FileContent.EQUALITY_DELETES))
+          .collect(Collectors.toList()));
+    });
+
+    Map<StructLikeWrapper, Collection<FileScanTask>> groupedTasks = groupTasksByPartition(tasksWithEqDelete.iterator());
+
+    // Split and combine tasks under each partition
+    List<Pair<StructLike, CombinedScanTask>> combinedScanTasks = groupedTasks.entrySet().stream()
+        .map(entry -> {
+          CloseableIterable<FileScanTask> splitTasks = TableScanUtil.splitFiles(
+              CloseableIterable.withNoopClose(entry.getValue()), targetSizeInBytes);
+          return Pair.of(entry.getKey().get(),
+              TableScanUtil.planTasks(splitTasks, targetSizeInBytes, splitLookback, splitOpenFileCost));
+        })
+        .flatMap(pair -> StreamSupport.stream(CloseableIterable
+            .transform(pair.second(), task -> Pair.of(pair.first(), task)).spliterator(), false)
+        )
+        .collect(Collectors.toList());
+
+    if (!combinedScanTasks.isEmpty()) {
+      JavaRDD<Pair<StructLike, CombinedScanTask>> taskRDD = sparkContext.parallelize(combinedScanTasks,
+          combinedScanTasks.size());
+      Broadcast<FileIO> io = sparkContext.broadcast(fileIO());
+      Broadcast<EncryptionManager> encryption = sparkContext.broadcast(encryptionManager());
+
+      EqualityDeleteRewriter deleteRewriter = new EqualityDeleteRewriter(table, caseSensitive, io, encryption);
+      List<DeleteFile> posDeletes = deleteRewriter.toPosDeletes(taskRDD);
+
+      if (!eqDeletes.isEmpty()) {
+        rewriteEqualityDeletes(Lists.newArrayList(eqDeletes), posDeletes);
+        return new RewriteDeleteActionResult(Lists.newArrayList(eqDeletes), posDeletes);
+      }
+    }
+
+    return new RewriteDeleteActionResult(Collections.emptyList(), Collections.emptyList());
+  }
+
+  @Override
+  public RewriteDeletes rewriteEqualityDeletes() {
+    this.rewriteEqualityDelete = true;
+    return this;
+  }
+
+  @Override
+  protected RewriteDeletes self() {
+    return null;
+  }
+
+  protected EncryptionManager encryptionManager() {
+    return encryptionManager;
+  }
+
+  private Map<StructLikeWrapper, Collection<FileScanTask>> groupTasksByPartition(

Review comment:
       I think you mean BaseRewriteDataFilesSparkAction, right? 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] jackye1995 commented on a change in pull request #2364: Spark: Add an action to rewrite equality deletes

Posted by GitBox <gi...@apache.org>.
jackye1995 commented on a change in pull request #2364:
URL: https://github.com/apache/iceberg/pull/2364#discussion_r677951030



##########
File path: spark/src/main/java/org/apache/iceberg/spark/actions/ConvertEqDeletesStrategy.java
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.actions.RewriteDeleteStrategy;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.source.EqualityDeleteRewriter;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.StructLikeWrapper;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ConvertEqDeletesStrategy implements RewriteDeleteStrategy {
+  private static final Logger LOG = LoggerFactory.getLogger(ConvertEqDeletesStrategy.class);
+
+  private final Table table;
+  private long deleteTargetSizeInBytes;
+  private int splitLookback;
+  private long splitOpenFileCost;
+
+  private CloseableIterable<FileScanTask> tasksWithEqDelete;
+  private Iterable<DeleteFile> deletesToReplace;
+  private final JavaSparkContext sparkContext;
+
+  /**
+   * Defines whether to split out the result position deletes by data file names.
+   *
+   * This should be used in EqualityDeleteRewriter.
+   */
+  public static final String SPLIT_POSITION_DELETE = "split-position-delete";
+
+  public ConvertEqDeletesStrategy(SparkSession spark, Table table) {
+    this.table = table;
+    this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
+    this.deleteTargetSizeInBytes = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.DELETE_TARGET_FILE_SIZE_BYTES,
+        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
+    this.splitLookback = PropertyUtil.propertyAsInt(
+        table.properties(),
+        TableProperties.SPLIT_LOOKBACK,
+        TableProperties.SPLIT_LOOKBACK_DEFAULT);
+    this.splitOpenFileCost = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.SPLIT_OPEN_FILE_COST,
+        TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT);
+  }
+
+  @Override
+  public String name() {
+    return "CONVERT-EQUALITY-DELETES";
+  }
+
+  @Override
+  public Table table() {
+    return table;
+  }
+
+  @Override
+  public Iterable<DeleteFile> selectDeletes() {
+    CloseableIterable<FileScanTask> fileScanTasks = null;
+    try {
+      fileScanTasks = table.newScan()
+          .ignoreResiduals()
+          .planFiles();
+    } finally {
+      try {
+        if (fileScanTasks != null) {
+          fileScanTasks.close();
+        }
+      } catch (IOException ioe) {
+        LOG.warn("Failed to close task iterable", ioe);
+      }
+    }
+
+    tasksWithEqDelete = CloseableIterable.filter(fileScanTasks, scan ->
+        scan.deletes().stream().anyMatch(delete -> delete.content().equals(FileContent.EQUALITY_DELETES))
+    );
+
+    Set<DeleteFile> eqDeletes = Sets.newHashSet();

Review comment:
       And it seems a bit redundant that we are iterating through tasks 2 times at L119 and here, there should be a way to simplify the whole logic.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] chenjunjiedada commented on a change in pull request #2364: Spark: Add an action to rewrite equality deletes

Posted by GitBox <gi...@apache.org>.
chenjunjiedada commented on a change in pull request #2364:
URL: https://github.com/apache/iceberg/pull/2364#discussion_r679865372



##########
File path: data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
##########
@@ -139,43 +173,122 @@ protected long pos(T record) {
           CloseableIterable.transform(CloseableIterable.concat(deleteRecords), Record::copy),
           deleteSchema.asStruct());
 
-      Predicate<T> isInDeleteSet = record -> deleteSet.contains(projectRow.wrap(asStructLike(record)));
-      isInDeleteSets.add(isInDeleteSet);
+      isDeleted = isDeleted == null ? record -> deleteSet.contains(projectRow.wrap(asStructLike(record))) :
+              isDeleted.or(record -> deleteSet.contains(projectRow.wrap(asStructLike(record))));
     }
 
-    return isInDeleteSets;
+    return isDeleted;
   }
 
-  public CloseableIterable<T> findEqualityDeleteRows(CloseableIterable<T> records) {
+  private Predicate<T> buildPosDeletePredicate() {
+    if (posDeletes.isEmpty()) {
+      return null;
+    }
+
+    List<CloseableIterable<Record>> deletes = Lists.transform(posDeletes, this::openPosDeletes);
+    Set<Long> deleteSet = Deletes.toPositionSet(dataFile.path(), CloseableIterable.concat(deletes));
+    if (deleteSet.isEmpty()) {
+      return null;
+    }
+
+    return record -> deleteSet.contains(pos(record));
+  }
+
+  public CloseableIterable<T> keepRowsFromDeletes(CloseableIterable<T> records) {
+    Predicate<T> isDeletedFromPosDeletes = buildPosDeletePredicate();
+    if (isDeletedFromPosDeletes == null) {
+      return keepRowsFromEqualityDeletes(records);
+    }
+
+    Predicate<T> isDeletedFromEqDeletes = buildEqDeletePredicate();
+    if (isDeletedFromEqDeletes == null) {
+      return keepRowsFromPosDeletes(records);
+    }
+
+    CloseableIterable<T> markedRecords;
+
+    if (posDeletes.stream().mapToLong(DeleteFile::recordCount).sum() < setFilterThreshold) {
+      markedRecords = CloseableIterable.transform(records, record -> {
+        if (isDeletedFromPosDeletes.test(record) || isDeletedFromEqDeletes.test(record)) {
+          deleteMarker().accept(record);
+        }
+        return record;
+      });
+
+    } else {
+      List<CloseableIterable<Record>> deletes = Lists.transform(posDeletes, this::openPosDeletes);
+      markedRecords = CloseableIterable.transform(Deletes.streamingDeletedRowMarker(records, this::pos,
+          Deletes.deletePositions(dataFile.path(), deletes), deleteMarker()), record -> {
+          if (!isDeletedRow(record) && isDeletedFromEqDeletes.test(record)) {
+            deleteMarker().accept(record);
+          }
+          return record;
+        });
+    }
+    return deletedRowsSelector().filter(markedRecords);
+  }
+
+  private CloseableIterable<T> selectRowsFromDeletes(CloseableIterable<T> records, Predicate<T> isDeleted) {
+    CloseableIterable<T> markedRecords = CloseableIterable.transform(records, record -> {
+      if (isDeleted.test(record)) {
+        deleteMarker().accept(record);
+      }
+      return record;
+    });
+
+    return deletedRowsSelector().filter(markedRecords);
+  }
+
+  public CloseableIterable<T> keepRowsFromEqualityDeletes(CloseableIterable<T> records) {
     // Predicate to test whether a row has been deleted by equality deletions.
-    Predicate<T> deletedRows = applyEqDeletes().stream()
-        .reduce(Predicate::or)
-        .orElse(t -> false);
+    Predicate<T> isDeleted = buildEqDeletePredicate();
+    if (isDeleted == null) {
+      return CloseableIterable.empty();
+    }
 
-    Filter<T> deletedRowsFilter = new Filter<T>() {
-      @Override
-      protected boolean shouldKeep(T item) {
-        return deletedRows.test(item);
+    return selectRowsFromDeletes(records, isDeleted);
+  }
+
+  public CloseableIterable<T> keepRowsFromPosDeletes(CloseableIterable<T> records) {
+    // if there are fewer deletes than a reasonable number to keep in memory, use a set
+    if (posDeletes.stream().mapToLong(DeleteFile::recordCount).sum() < setFilterThreshold) {
+      // Predicate to test whether a row has been deleted by equality deletions.
+      Predicate<T> isDeleted = buildPosDeletePredicate();
+      if (isDeleted == null) {
+        return CloseableIterable.empty();
       }
-    };
-    return deletedRowsFilter.filter(records);
+      return selectRowsFromDeletes(records, isDeleted);
+    } else {
+      List<CloseableIterable<Record>> deletes = Lists.transform(posDeletes, this::openPosDeletes);
+      CloseableIterable<T> markedRecords = Deletes.streamingDeletedRowMarker(records, this::pos,
+              Deletes.deletePositions(dataFile.path(), deletes), deleteMarker());
+
+      return deletedRowsSelector().filter(markedRecords);
+    }
   }
 
   private CloseableIterable<T> applyEqDeletes(CloseableIterable<T> records) {
     // Predicate to test whether a row should be visible to user after applying equality deletions.
-    Predicate<T> remainingRows = applyEqDeletes().stream()
-        .map(Predicate::negate)
-        .reduce(Predicate::and)
-        .orElse(t -> true);
+    Predicate<T> isDeleted = buildEqDeletePredicate();

Review comment:
       Thanks @openinx for the detailed reviewing and the findings.  I addressed the related changes in the separated PR #2372.  #2372 is an independant one for the delete row reader. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #2364: Spark: Add an action to rewrite equality deletes

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2364:
URL: https://github.com/apache/iceberg/pull/2364#discussion_r678154275



##########
File path: data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
##########
@@ -96,6 +98,29 @@ public Schema requiredSchema() {
     return requiredSchema;
   }
 
+  protected int deleteMarkerIndex() {
+    if (deleteMarkerIndex != null) {
+      return deleteMarkerIndex;
+    }
+
+    int index = 0;
+    for (Types.NestedField field : requiredSchema().columns()) {
+      if (field.fieldId() != MetadataColumns.IS_DELETED.fieldId()) {
+        index = index + 1;
+      } else {
+        break;
+      }
+    }
+
+    deleteMarkerIndex = index;
+
+    return deleteMarkerIndex;
+  }
+
+  protected abstract Consumer<T> deleteMarker();

Review comment:
       How about introducing a new interface named `Setter` to set the `is_deleted` flag (which is similar to the org.apache.iceberg.Accessor)  so that we could have a good abstraction to hide the delete marker logic: 
   
   ```java
     interface Setter<T> extends Serializable {
       T set(T reuse);
     }
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on pull request #2364: Spark: Add an action to rewrite equality deletes

Posted by GitBox <gi...@apache.org>.
openinx commented on pull request #2364:
URL: https://github.com/apache/iceberg/pull/2364#issuecomment-809037022


   @chenjunjiedada  Thanks for updating this patch,  I've got the https://github.com/apache/iceberg/pull/2294 merged, that patch extends RewriteFiles API to rewrite both insert data files and delete files in iceberg.  I think we could rebase this patch based on the latest commit.  I will take a look at the patch again once you've rebased this. Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] chenjunjiedada closed pull request #2364: Spark: Add an action to rewrite equality deletes

Posted by GitBox <gi...@apache.org>.
chenjunjiedada closed pull request #2364:
URL: https://github.com/apache/iceberg/pull/2364


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] chenjunjiedada commented on a change in pull request #2364: Spark: Add an action to rewrite equality deletes

Posted by GitBox <gi...@apache.org>.
chenjunjiedada commented on a change in pull request #2364:
URL: https://github.com/apache/iceberg/pull/2364#discussion_r680624287



##########
File path: data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
##########
@@ -139,43 +173,122 @@ protected long pos(T record) {
           CloseableIterable.transform(CloseableIterable.concat(deleteRecords), Record::copy),
           deleteSchema.asStruct());
 
-      Predicate<T> isInDeleteSet = record -> deleteSet.contains(projectRow.wrap(asStructLike(record)));
-      isInDeleteSets.add(isInDeleteSet);
+      isDeleted = isDeleted == null ? record -> deleteSet.contains(projectRow.wrap(asStructLike(record))) :

Review comment:
       I found I wrote in this way first but change to the current way because of the comment from Ryan, and that sounds reasonable to me.   FYI https://github.com/apache/iceberg/pull/2372/commits/bfd0aebafa2065861a8a4952ab1ac3652dc3dc1d#r603700530.
   
   Just reverted back.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] jackye1995 commented on a change in pull request #2364: Spark: Add an action to rewrite equality deletes

Posted by GitBox <gi...@apache.org>.
jackye1995 commented on a change in pull request #2364:
URL: https://github.com/apache/iceberg/pull/2364#discussion_r677957715



##########
File path: core/src/main/java/org/apache/iceberg/util/TableScanUtil.java
##########
@@ -64,4 +77,21 @@ public static boolean hasDeletes(FileScanTask task) {
             splitFiles),
         BaseCombinedScanTask::new);
   }
+
+  public static Map<StructLikeWrapper, Collection<FileScanTask>> groupTasksByPartition(

Review comment:
       maybe I missed some other place, I only see it used in the strategy class, why is it not a private method?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] chenjunjiedada commented on a change in pull request #2364: Spark: Add an action to rewrite equality deletes

Posted by GitBox <gi...@apache.org>.
chenjunjiedada commented on a change in pull request #2364:
URL: https://github.com/apache/iceberg/pull/2364#discussion_r679859689



##########
File path: data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
##########
@@ -139,43 +173,122 @@ protected long pos(T record) {
           CloseableIterable.transform(CloseableIterable.concat(deleteRecords), Record::copy),
           deleteSchema.asStruct());
 
-      Predicate<T> isInDeleteSet = record -> deleteSet.contains(projectRow.wrap(asStructLike(record)));
-      isInDeleteSets.add(isInDeleteSet);
+      isDeleted = isDeleted == null ? record -> deleteSet.contains(projectRow.wrap(asStructLike(record))) :

Review comment:
       Done.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #2364: Spark: Add an action to rewrite equality deletes

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2364:
URL: https://github.com/apache/iceberg/pull/2364#discussion_r678232993



##########
File path: data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
##########
@@ -139,43 +173,122 @@ protected long pos(T record) {
           CloseableIterable.transform(CloseableIterable.concat(deleteRecords), Record::copy),
           deleteSchema.asStruct());
 
-      Predicate<T> isInDeleteSet = record -> deleteSet.contains(projectRow.wrap(asStructLike(record)));
-      isInDeleteSets.add(isInDeleteSet);
+      isDeleted = isDeleted == null ? record -> deleteSet.contains(projectRow.wrap(asStructLike(record))) :
+              isDeleted.or(record -> deleteSet.contains(projectRow.wrap(asStructLike(record))));
     }
 
-    return isInDeleteSets;
+    return isDeleted;
   }
 
-  public CloseableIterable<T> findEqualityDeleteRows(CloseableIterable<T> records) {
+  private Predicate<T> buildPosDeletePredicate() {
+    if (posDeletes.isEmpty()) {
+      return null;
+    }
+
+    List<CloseableIterable<Record>> deletes = Lists.transform(posDeletes, this::openPosDeletes);
+    Set<Long> deleteSet = Deletes.toPositionSet(dataFile.path(), CloseableIterable.concat(deletes));
+    if (deleteSet.isEmpty()) {
+      return null;
+    }
+
+    return record -> deleteSet.contains(pos(record));
+  }
+
+  public CloseableIterable<T> keepRowsFromDeletes(CloseableIterable<T> records) {
+    Predicate<T> isDeletedFromPosDeletes = buildPosDeletePredicate();
+    if (isDeletedFromPosDeletes == null) {
+      return keepRowsFromEqualityDeletes(records);
+    }
+
+    Predicate<T> isDeletedFromEqDeletes = buildEqDeletePredicate();
+    if (isDeletedFromEqDeletes == null) {
+      return keepRowsFromPosDeletes(records);
+    }
+
+    CloseableIterable<T> markedRecords;
+
+    if (posDeletes.stream().mapToLong(DeleteFile::recordCount).sum() < setFilterThreshold) {
+      markedRecords = CloseableIterable.transform(records, record -> {
+        if (isDeletedFromPosDeletes.test(record) || isDeletedFromEqDeletes.test(record)) {
+          deleteMarker().accept(record);
+        }
+        return record;
+      });
+
+    } else {
+      List<CloseableIterable<Record>> deletes = Lists.transform(posDeletes, this::openPosDeletes);
+      markedRecords = CloseableIterable.transform(Deletes.streamingDeletedRowMarker(records, this::pos,
+          Deletes.deletePositions(dataFile.path(), deletes), deleteMarker()), record -> {
+          if (!isDeletedRow(record) && isDeletedFromEqDeletes.test(record)) {
+            deleteMarker().accept(record);
+          }
+          return record;
+        });
+    }
+    return deletedRowsSelector().filter(markedRecords);
+  }
+
+  private CloseableIterable<T> selectRowsFromDeletes(CloseableIterable<T> records, Predicate<T> isDeleted) {
+    CloseableIterable<T> markedRecords = CloseableIterable.transform(records, record -> {
+      if (isDeleted.test(record)) {
+        deleteMarker().accept(record);
+      }
+      return record;
+    });
+
+    return deletedRowsSelector().filter(markedRecords);
+  }
+
+  public CloseableIterable<T> keepRowsFromEqualityDeletes(CloseableIterable<T> records) {
     // Predicate to test whether a row has been deleted by equality deletions.
-    Predicate<T> deletedRows = applyEqDeletes().stream()
-        .reduce(Predicate::or)
-        .orElse(t -> false);
+    Predicate<T> isDeleted = buildEqDeletePredicate();
+    if (isDeleted == null) {
+      return CloseableIterable.empty();
+    }
 
-    Filter<T> deletedRowsFilter = new Filter<T>() {
-      @Override
-      protected boolean shouldKeep(T item) {
-        return deletedRows.test(item);
+    return selectRowsFromDeletes(records, isDeleted);
+  }
+
+  public CloseableIterable<T> keepRowsFromPosDeletes(CloseableIterable<T> records) {
+    // if there are fewer deletes than a reasonable number to keep in memory, use a set
+    if (posDeletes.stream().mapToLong(DeleteFile::recordCount).sum() < setFilterThreshold) {
+      // Predicate to test whether a row has been deleted by equality deletions.
+      Predicate<T> isDeleted = buildPosDeletePredicate();
+      if (isDeleted == null) {
+        return CloseableIterable.empty();
       }
-    };
-    return deletedRowsFilter.filter(records);
+      return selectRowsFromDeletes(records, isDeleted);
+    } else {
+      List<CloseableIterable<Record>> deletes = Lists.transform(posDeletes, this::openPosDeletes);
+      CloseableIterable<T> markedRecords = Deletes.streamingDeletedRowMarker(records, this::pos,
+              Deletes.deletePositions(dataFile.path(), deletes), deleteMarker());
+
+      return deletedRowsSelector().filter(markedRecords);
+    }
   }
 
   private CloseableIterable<T> applyEqDeletes(CloseableIterable<T> records) {
     // Predicate to test whether a row should be visible to user after applying equality deletions.
-    Predicate<T> remainingRows = applyEqDeletes().stream()
-        .map(Predicate::negate)
-        .reduce(Predicate::and)
-        .orElse(t -> true);
+    Predicate<T> isDeleted = buildEqDeletePredicate();

Review comment:
       The related PR is : https://github.com/apache/iceberg/pull/2372




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] chenjunjiedada commented on a change in pull request #2364: Spark: Add an action to rewrite equality deletes

Posted by GitBox <gi...@apache.org>.
chenjunjiedada commented on a change in pull request #2364:
URL: https://github.com/apache/iceberg/pull/2364#discussion_r602659003



##########
File path: spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDeletesSparkAction.java
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RewriteFiles;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.actions.BaseSparkAction;
+import org.apache.iceberg.actions.RewriteDeleteActionResult;
+import org.apache.iceberg.actions.RewriteDeletes;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.ListMultimap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.SparkUtil;
+import org.apache.iceberg.spark.source.EqualityDeleteRewriter;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.StructLikeWrapper;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.iceberg.util.Tasks;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BaseRewriteDeletesSparkAction extends BaseSparkAction<RewriteDeletes, RewriteDeletes.Result>
+    implements RewriteDeletes {
+  private static final Logger LOG = LoggerFactory.getLogger(BaseRewriteDeletesSparkAction.class);
+  private final Table table;
+  private final JavaSparkContext sparkContext;
+  private FileIO fileIO;
+  private final EncryptionManager encryptionManager;
+  private final boolean caseSensitive;
+  private final PartitionSpec spec;
+  private final long targetSizeInBytes;
+  private final int splitLookback;
+  private final long splitOpenFileCost;
+  private boolean rewriteEqualityDelete;
+
+  public BaseRewriteDeletesSparkAction(SparkSession spark, Table table) {
+    super(spark);
+    this.table = table;
+    this.sparkContext = new JavaSparkContext(spark.sparkContext());
+    this.fileIO = fileIO();
+    this.encryptionManager = table.encryption();
+    this.caseSensitive = false;
+    this.spec = table.spec();
+
+    long splitSize = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.SPLIT_SIZE,
+        TableProperties.SPLIT_SIZE_DEFAULT);
+    long targetFileSize = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
+        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
+    this.targetSizeInBytes = Math.min(splitSize, targetFileSize);
+
+    this.splitLookback = PropertyUtil.propertyAsInt(
+        table.properties(),
+        TableProperties.SPLIT_LOOKBACK,
+        TableProperties.SPLIT_LOOKBACK_DEFAULT);
+    this.splitOpenFileCost = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.SPLIT_OPEN_FILE_COST,
+        TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT);
+  }
+
+  protected FileIO fileIO() {
+    if (this.fileIO == null) {
+      this.fileIO = SparkUtil.serializableFileIO(table);
+    }
+    return this.fileIO;
+  }
+
+  @Override
+  public Result execute() {
+    CloseableIterable<FileScanTask> fileScanTasks = null;
+    try {
+      fileScanTasks = table.newScan()
+          .caseSensitive(caseSensitive)
+          .ignoreResiduals()
+          .planFiles();
+    } finally {
+      try {
+        if (fileScanTasks != null) {
+          fileScanTasks.close();
+        }
+      } catch (IOException ioe) {
+        LOG.warn("Failed to close task iterable", ioe);
+      }
+    }
+
+    if (!rewriteEqualityDelete) {
+      LOG.warn("Only supports rewrite equality deletes currently");
+      return new RewriteDeleteActionResult(Collections.emptyList(), Collections.emptyList());
+    }
+
+    CloseableIterable<FileScanTask> tasksWithEqDelete = CloseableIterable.filter(fileScanTasks, scan ->

Review comment:
       This is closeable so I think we don't have to. There is an empty check after grouping tasks. Right?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] chenjunjiedada commented on pull request #2364: Spark: Add an action to rewrite equality deletes

Posted by GitBox <gi...@apache.org>.
chenjunjiedada commented on pull request #2364:
URL: https://github.com/apache/iceberg/pull/2364#issuecomment-805452828


   @rdblue @openinx @yyanyy , This is part of the equality delete rewrite. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] chenjunjiedada commented on a change in pull request #2364: Spark: Add an action to rewrite equality deletes

Posted by GitBox <gi...@apache.org>.
chenjunjiedada commented on a change in pull request #2364:
URL: https://github.com/apache/iceberg/pull/2364#discussion_r679863560



##########
File path: data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
##########
@@ -96,6 +98,29 @@ public Schema requiredSchema() {
     return requiredSchema;
   }
 
+  protected int deleteMarkerIndex() {
+    if (deleteMarkerIndex != null) {
+      return deleteMarkerIndex;
+    }
+
+    int index = 0;
+    for (Types.NestedField field : requiredSchema().columns()) {
+      if (field.fieldId() != MetadataColumns.IS_DELETED.fieldId()) {
+        index = index + 1;
+      } else {
+        break;
+      }
+    }
+
+    deleteMarkerIndex = index;
+
+    return deleteMarkerIndex;
+  }
+
+  protected abstract Consumer<T> deleteMarker();

Review comment:
       I can try to make this better abstraction in the following PR, this PR contains too many changes now.  I think we will have some following-up minor changes and optimizations. Does that sound ok to you?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] chenjunjiedada commented on pull request #2364: Spark: Add an action to rewrite equality deletes

Posted by GitBox <gi...@apache.org>.
chenjunjiedada commented on pull request #2364:
URL: https://github.com/apache/iceberg/pull/2364#issuecomment-888306217


   Thanks @openinx and @jackye1995 for the detailed reviews. Let me update related PRs and will ping you guys soon.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #2364: Spark: Add an action to rewrite equality deletes

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2364:
URL: https://github.com/apache/iceberg/pull/2364#discussion_r600348450



##########
File path: core/src/main/java/org/apache/iceberg/io/SortedPosDeleteWriter.java
##########
@@ -65,7 +65,7 @@
     this.recordsNumThreshold = recordsNumThreshold;
   }
 
-  SortedPosDeleteWriter(FileAppenderFactory<T> appenderFactory,
+  public SortedPosDeleteWriter(FileAppenderFactory<T> appenderFactory,

Review comment:
       Nit: reformat this constructor ?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] chenjunjiedada commented on a change in pull request #2364: Spark: Add an action to rewrite equality deletes

Posted by GitBox <gi...@apache.org>.
chenjunjiedada commented on a change in pull request #2364:
URL: https://github.com/apache/iceberg/pull/2364#discussion_r679859836



##########
File path: data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
##########
@@ -139,43 +173,122 @@ protected long pos(T record) {
           CloseableIterable.transform(CloseableIterable.concat(deleteRecords), Record::copy),
           deleteSchema.asStruct());
 
-      Predicate<T> isInDeleteSet = record -> deleteSet.contains(projectRow.wrap(asStructLike(record)));
-      isInDeleteSets.add(isInDeleteSet);
+      isDeleted = isDeleted == null ? record -> deleteSet.contains(projectRow.wrap(asStructLike(record))) :
+              isDeleted.or(record -> deleteSet.contains(projectRow.wrap(asStructLike(record))));
     }
 
-    return isInDeleteSets;
+    return isDeleted;
   }
 
-  public CloseableIterable<T> findEqualityDeleteRows(CloseableIterable<T> records) {
+  private Predicate<T> buildPosDeletePredicate() {
+    if (posDeletes.isEmpty()) {
+      return null;
+    }
+
+    List<CloseableIterable<Record>> deletes = Lists.transform(posDeletes, this::openPosDeletes);
+    Set<Long> deleteSet = Deletes.toPositionSet(dataFile.path(), CloseableIterable.concat(deletes));
+    if (deleteSet.isEmpty()) {
+      return null;
+    }
+
+    return record -> deleteSet.contains(pos(record));
+  }
+
+  public CloseableIterable<T> keepRowsFromDeletes(CloseableIterable<T> records) {
+    Predicate<T> isDeletedFromPosDeletes = buildPosDeletePredicate();
+    if (isDeletedFromPosDeletes == null) {
+      return keepRowsFromEqualityDeletes(records);
+    }
+
+    Predicate<T> isDeletedFromEqDeletes = buildEqDeletePredicate();
+    if (isDeletedFromEqDeletes == null) {
+      return keepRowsFromPosDeletes(records);
+    }
+
+    CloseableIterable<T> markedRecords;
+
+    if (posDeletes.stream().mapToLong(DeleteFile::recordCount).sum() < setFilterThreshold) {
+      markedRecords = CloseableIterable.transform(records, record -> {
+        if (isDeletedFromPosDeletes.test(record) || isDeletedFromEqDeletes.test(record)) {
+          deleteMarker().accept(record);
+        }
+        return record;
+      });
+
+    } else {
+      List<CloseableIterable<Record>> deletes = Lists.transform(posDeletes, this::openPosDeletes);
+      markedRecords = CloseableIterable.transform(Deletes.streamingDeletedRowMarker(records, this::pos,

Review comment:
       Updated to open one by one.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #2364: Spark: Add an action to rewrite equality deletes

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2364:
URL: https://github.com/apache/iceberg/pull/2364#discussion_r600295607



##########
File path: spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDeletesSparkAction.java
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RewriteFiles;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.actions.BaseSparkAction;
+import org.apache.iceberg.actions.RewriteDeleteActionResult;
+import org.apache.iceberg.actions.RewriteDeletes;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.ListMultimap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.SparkUtil;
+import org.apache.iceberg.spark.source.EqualityDeleteRewriter;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.StructLikeWrapper;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.iceberg.util.Tasks;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BaseRewriteDeletesSparkAction extends BaseSparkAction<RewriteDeletes, RewriteDeletes.Result>
+    implements RewriteDeletes {
+  private static final Logger LOG = LoggerFactory.getLogger(BaseRewriteDeletesSparkAction.class);
+  private final Table table;
+  private final JavaSparkContext sparkContext;
+  private FileIO fileIO;
+  private final EncryptionManager encryptionManager;
+  private final boolean caseSensitive;
+  private final PartitionSpec spec;
+  private final long targetSizeInBytes;
+  private final int splitLookback;
+  private final long splitOpenFileCost;
+  private boolean rewriteEqualityDelete;
+
+  public BaseRewriteDeletesSparkAction(SparkSession spark, Table table) {
+    super(spark);
+    this.table = table;
+    this.sparkContext = new JavaSparkContext(spark.sparkContext());
+    this.fileIO = fileIO();
+    this.encryptionManager = table.encryption();
+    this.caseSensitive = false;
+    this.spec = table.spec();
+
+    long splitSize = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.SPLIT_SIZE,
+        TableProperties.SPLIT_SIZE_DEFAULT);
+    long targetFileSize = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
+        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
+    this.targetSizeInBytes = Math.min(splitSize, targetFileSize);
+
+    this.splitLookback = PropertyUtil.propertyAsInt(
+        table.properties(),
+        TableProperties.SPLIT_LOOKBACK,
+        TableProperties.SPLIT_LOOKBACK_DEFAULT);
+    this.splitOpenFileCost = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.SPLIT_OPEN_FILE_COST,
+        TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT);
+  }
+
+  protected FileIO fileIO() {
+    if (this.fileIO == null) {
+      this.fileIO = SparkUtil.serializableFileIO(table);
+    }
+    return this.fileIO;
+  }
+
+  @Override
+  public Result execute() {
+    CloseableIterable<FileScanTask> fileScanTasks = null;
+    try {
+      fileScanTasks = table.newScan()
+          .caseSensitive(caseSensitive)
+          .ignoreResiduals()
+          .planFiles();
+    } finally {
+      try {
+        if (fileScanTasks != null) {
+          fileScanTasks.close();
+        }
+      } catch (IOException ioe) {
+        LOG.warn("Failed to close task iterable", ioe);
+      }
+    }
+
+    if (!rewriteEqualityDelete) {
+      LOG.warn("Only supports rewrite equality deletes currently");
+      return new RewriteDeleteActionResult(Collections.emptyList(), Collections.emptyList());
+    }
+
+    CloseableIterable<FileScanTask> tasksWithEqDelete = CloseableIterable.filter(fileScanTasks, scan ->

Review comment:
       Do we need to do the nullable check before filter this `fileScanTasks` ?

##########
File path: spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDeletesSparkAction.java
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RewriteFiles;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.actions.BaseSparkAction;
+import org.apache.iceberg.actions.RewriteDeleteActionResult;
+import org.apache.iceberg.actions.RewriteDeletes;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.ListMultimap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.SparkUtil;
+import org.apache.iceberg.spark.source.EqualityDeleteRewriter;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.StructLikeWrapper;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.iceberg.util.Tasks;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BaseRewriteDeletesSparkAction extends BaseSparkAction<RewriteDeletes, RewriteDeletes.Result>
+    implements RewriteDeletes {
+  private static final Logger LOG = LoggerFactory.getLogger(BaseRewriteDeletesSparkAction.class);
+  private final Table table;
+  private final JavaSparkContext sparkContext;
+  private FileIO fileIO;
+  private final EncryptionManager encryptionManager;
+  private final boolean caseSensitive;
+  private final PartitionSpec spec;
+  private final long targetSizeInBytes;
+  private final int splitLookback;
+  private final long splitOpenFileCost;
+  private boolean rewriteEqualityDelete;
+
+  public BaseRewriteDeletesSparkAction(SparkSession spark, Table table) {
+    super(spark);
+    this.table = table;
+    this.sparkContext = new JavaSparkContext(spark.sparkContext());
+    this.fileIO = fileIO();
+    this.encryptionManager = table.encryption();
+    this.caseSensitive = false;
+    this.spec = table.spec();
+
+    long splitSize = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.SPLIT_SIZE,
+        TableProperties.SPLIT_SIZE_DEFAULT);
+    long targetFileSize = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
+        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
+    this.targetSizeInBytes = Math.min(splitSize, targetFileSize);
+
+    this.splitLookback = PropertyUtil.propertyAsInt(
+        table.properties(),
+        TableProperties.SPLIT_LOOKBACK,
+        TableProperties.SPLIT_LOOKBACK_DEFAULT);
+    this.splitOpenFileCost = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.SPLIT_OPEN_FILE_COST,
+        TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT);
+  }
+
+  protected FileIO fileIO() {
+    if (this.fileIO == null) {
+      this.fileIO = SparkUtil.serializableFileIO(table);
+    }
+    return this.fileIO;
+  }
+
+  @Override
+  public Result execute() {
+    CloseableIterable<FileScanTask> fileScanTasks = null;
+    try {
+      fileScanTasks = table.newScan()
+          .caseSensitive(caseSensitive)
+          .ignoreResiduals()
+          .planFiles();
+    } finally {
+      try {
+        if (fileScanTasks != null) {
+          fileScanTasks.close();
+        }
+      } catch (IOException ioe) {
+        LOG.warn("Failed to close task iterable", ioe);
+      }
+    }
+
+    if (!rewriteEqualityDelete) {
+      LOG.warn("Only supports rewrite equality deletes currently");
+      return new RewriteDeleteActionResult(Collections.emptyList(), Collections.emptyList());
+    }
+
+    CloseableIterable<FileScanTask> tasksWithEqDelete = CloseableIterable.filter(fileScanTasks, scan ->
+        scan.deletes().stream().anyMatch(delete -> delete.content().equals(FileContent.EQUALITY_DELETES))
+    );
+
+    Set<DeleteFile> eqDeletes = Sets.newHashSet();
+    tasksWithEqDelete.forEach(task -> {
+      eqDeletes.addAll(task.deletes().stream()
+          .filter(deleteFile -> deleteFile.content().equals(FileContent.EQUALITY_DELETES))
+          .collect(Collectors.toList()));
+    });
+
+    Map<StructLikeWrapper, Collection<FileScanTask>> groupedTasks = groupTasksByPartition(tasksWithEqDelete.iterator());
+
+    // Split and combine tasks under each partition
+    List<Pair<StructLike, CombinedScanTask>> combinedScanTasks = groupedTasks.entrySet().stream()
+        .map(entry -> {
+          CloseableIterable<FileScanTask> splitTasks = TableScanUtil.splitFiles(
+              CloseableIterable.withNoopClose(entry.getValue()), targetSizeInBytes);
+          return Pair.of(entry.getKey().get(),
+              TableScanUtil.planTasks(splitTasks, targetSizeInBytes, splitLookback, splitOpenFileCost));
+        })
+        .flatMap(pair -> StreamSupport.stream(CloseableIterable
+            .transform(pair.second(), task -> Pair.of(pair.first(), task)).spliterator(), false)
+        )
+        .collect(Collectors.toList());
+
+    if (!combinedScanTasks.isEmpty()) {
+      JavaRDD<Pair<StructLike, CombinedScanTask>> taskRDD = sparkContext.parallelize(combinedScanTasks,
+          combinedScanTasks.size());
+      Broadcast<FileIO> io = sparkContext.broadcast(fileIO());
+      Broadcast<EncryptionManager> encryption = sparkContext.broadcast(encryptionManager());
+
+      EqualityDeleteRewriter deleteRewriter = new EqualityDeleteRewriter(table, caseSensitive, io, encryption);
+      List<DeleteFile> posDeletes = deleteRewriter.toPosDeletes(taskRDD);
+
+      if (!eqDeletes.isEmpty()) {
+        rewriteEqualityDeletes(Lists.newArrayList(eqDeletes), posDeletes);
+        return new RewriteDeleteActionResult(Lists.newArrayList(eqDeletes), posDeletes);
+      }
+    }
+
+    return new RewriteDeleteActionResult(Collections.emptyList(), Collections.emptyList());
+  }
+
+  @Override
+  public RewriteDeletes rewriteEqualityDeletes() {
+    this.rewriteEqualityDelete = true;
+    return this;
+  }
+
+  @Override
+  protected RewriteDeletes self() {
+    return null;

Review comment:
       null ?  here we should return `this` ?

##########
File path: spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDeletesSparkAction.java
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RewriteFiles;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.actions.BaseSparkAction;
+import org.apache.iceberg.actions.RewriteDeleteActionResult;
+import org.apache.iceberg.actions.RewriteDeletes;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.ListMultimap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.SparkUtil;
+import org.apache.iceberg.spark.source.EqualityDeleteRewriter;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.StructLikeWrapper;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.iceberg.util.Tasks;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BaseRewriteDeletesSparkAction extends BaseSparkAction<RewriteDeletes, RewriteDeletes.Result>
+    implements RewriteDeletes {
+  private static final Logger LOG = LoggerFactory.getLogger(BaseRewriteDeletesSparkAction.class);
+  private final Table table;
+  private final JavaSparkContext sparkContext;
+  private FileIO fileIO;
+  private final EncryptionManager encryptionManager;
+  private final boolean caseSensitive;
+  private final PartitionSpec spec;
+  private final long targetSizeInBytes;
+  private final int splitLookback;
+  private final long splitOpenFileCost;
+  private boolean rewriteEqualityDelete;
+
+  public BaseRewriteDeletesSparkAction(SparkSession spark, Table table) {
+    super(spark);
+    this.table = table;
+    this.sparkContext = new JavaSparkContext(spark.sparkContext());
+    this.fileIO = fileIO();
+    this.encryptionManager = table.encryption();
+    this.caseSensitive = false;
+    this.spec = table.spec();
+
+    long splitSize = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.SPLIT_SIZE,
+        TableProperties.SPLIT_SIZE_DEFAULT);
+    long targetFileSize = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
+        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
+    this.targetSizeInBytes = Math.min(splitSize, targetFileSize);
+
+    this.splitLookback = PropertyUtil.propertyAsInt(
+        table.properties(),
+        TableProperties.SPLIT_LOOKBACK,
+        TableProperties.SPLIT_LOOKBACK_DEFAULT);
+    this.splitOpenFileCost = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.SPLIT_OPEN_FILE_COST,
+        TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT);
+  }
+
+  protected FileIO fileIO() {
+    if (this.fileIO == null) {
+      this.fileIO = SparkUtil.serializableFileIO(table);
+    }
+    return this.fileIO;
+  }
+
+  @Override
+  public Result execute() {
+    CloseableIterable<FileScanTask> fileScanTasks = null;
+    try {
+      fileScanTasks = table.newScan()
+          .caseSensitive(caseSensitive)
+          .ignoreResiduals()
+          .planFiles();
+    } finally {
+      try {
+        if (fileScanTasks != null) {
+          fileScanTasks.close();
+        }
+      } catch (IOException ioe) {
+        LOG.warn("Failed to close task iterable", ioe);
+      }
+    }
+
+    if (!rewriteEqualityDelete) {
+      LOG.warn("Only supports rewrite equality deletes currently");
+      return new RewriteDeleteActionResult(Collections.emptyList(), Collections.emptyList());
+    }
+
+    CloseableIterable<FileScanTask> tasksWithEqDelete = CloseableIterable.filter(fileScanTasks, scan ->
+        scan.deletes().stream().anyMatch(delete -> delete.content().equals(FileContent.EQUALITY_DELETES))
+    );
+
+    Set<DeleteFile> eqDeletes = Sets.newHashSet();
+    tasksWithEqDelete.forEach(task -> {
+      eqDeletes.addAll(task.deletes().stream()
+          .filter(deleteFile -> deleteFile.content().equals(FileContent.EQUALITY_DELETES))
+          .collect(Collectors.toList()));
+    });
+
+    Map<StructLikeWrapper, Collection<FileScanTask>> groupedTasks = groupTasksByPartition(tasksWithEqDelete.iterator());
+
+    // Split and combine tasks under each partition
+    List<Pair<StructLike, CombinedScanTask>> combinedScanTasks = groupedTasks.entrySet().stream()
+        .map(entry -> {
+          CloseableIterable<FileScanTask> splitTasks = TableScanUtil.splitFiles(
+              CloseableIterable.withNoopClose(entry.getValue()), targetSizeInBytes);
+          return Pair.of(entry.getKey().get(),
+              TableScanUtil.planTasks(splitTasks, targetSizeInBytes, splitLookback, splitOpenFileCost));
+        })
+        .flatMap(pair -> StreamSupport.stream(CloseableIterable
+            .transform(pair.second(), task -> Pair.of(pair.first(), task)).spliterator(), false)
+        )
+        .collect(Collectors.toList());
+
+    if (!combinedScanTasks.isEmpty()) {
+      JavaRDD<Pair<StructLike, CombinedScanTask>> taskRDD = sparkContext.parallelize(combinedScanTasks,
+          combinedScanTasks.size());
+      Broadcast<FileIO> io = sparkContext.broadcast(fileIO());
+      Broadcast<EncryptionManager> encryption = sparkContext.broadcast(encryptionManager());
+
+      EqualityDeleteRewriter deleteRewriter = new EqualityDeleteRewriter(table, caseSensitive, io, encryption);
+      List<DeleteFile> posDeletes = deleteRewriter.toPosDeletes(taskRDD);
+
+      if (!eqDeletes.isEmpty()) {
+        rewriteEqualityDeletes(Lists.newArrayList(eqDeletes), posDeletes);
+        return new RewriteDeleteActionResult(Lists.newArrayList(eqDeletes), posDeletes);
+      }
+    }
+
+    return new RewriteDeleteActionResult(Collections.emptyList(), Collections.emptyList());
+  }
+
+  @Override
+  public RewriteDeletes rewriteEqualityDeletes() {
+    this.rewriteEqualityDelete = true;
+    return this;
+  }
+
+  @Override
+  protected RewriteDeletes self() {
+    return null;
+  }
+
+  protected EncryptionManager encryptionManager() {
+    return encryptionManager;
+  }
+
+  private Map<StructLikeWrapper, Collection<FileScanTask>> groupTasksByPartition(

Review comment:
       Nit:  I see there are another same `groupTasksByPartition` in BaseRewriteDeletesSparkAction,  maybe we could use the same method. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] chenjunjiedada commented on pull request #2364: Spark: Add an action to rewrite equality deletes

Posted by GitBox <gi...@apache.org>.
chenjunjiedada commented on pull request #2364:
URL: https://github.com/apache/iceberg/pull/2364#issuecomment-809468612


   Thanks for review, @openinx !


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #2364: Spark: Add an action to rewrite equality deletes

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2364:
URL: https://github.com/apache/iceberg/pull/2364#discussion_r600350461



##########
File path: spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRewriter.java
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.io.SortedPosDeleteWriter;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.Tasks;
+import org.apache.spark.TaskContext;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.StructType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING;
+
+public class EqualityDeleteRewriter implements Serializable {
+  private static final Logger LOG = LoggerFactory.getLogger(EqualityDeleteRewriter.class);
+  private final PartitionSpec spec;
+  private final Map<String, String> properties;
+  private final Schema schema;
+  private final FileFormat format;
+  private final Broadcast<FileIO> io;
+  private final Broadcast<EncryptionManager> encryptionManager;
+  private final LocationProvider locations;
+  private final String nameMapping;
+  private final boolean caseSensitive;
+
+  public EqualityDeleteRewriter(Table table, boolean caseSensitive,
+                                Broadcast<FileIO> io, Broadcast<EncryptionManager> encryptionManager) {
+    this.spec = table.spec();
+    this.schema = table.schema();
+    this.locations = table.locationProvider();
+    this.caseSensitive = caseSensitive;
+    this.io = io;
+    this.encryptionManager = encryptionManager;
+    this.properties = table.properties();
+    this.nameMapping = table.properties().get(DEFAULT_NAME_MAPPING);
+
+    String formatString = table.properties().getOrDefault(
+        TableProperties.DEFAULT_FILE_FORMAT, TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
+    this.format = FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH));

Review comment:
       Nit:  could we align the assignment order with the field definition order ? That helps a lot when checking all those assignment. thanks.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] jackye1995 commented on a change in pull request #2364: Spark: Add an action to rewrite equality deletes

Posted by GitBox <gi...@apache.org>.
jackye1995 commented on a change in pull request #2364:
URL: https://github.com/apache/iceberg/pull/2364#discussion_r677949653



##########
File path: spark/src/main/java/org/apache/iceberg/spark/actions/ConvertEqDeletesStrategy.java
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.actions.RewriteDeleteStrategy;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.source.EqualityDeleteRewriter;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.StructLikeWrapper;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ConvertEqDeletesStrategy implements RewriteDeleteStrategy {
+  private static final Logger LOG = LoggerFactory.getLogger(ConvertEqDeletesStrategy.class);
+
+  private final Table table;
+  private long deleteTargetSizeInBytes;
+  private int splitLookback;
+  private long splitOpenFileCost;
+
+  private CloseableIterable<FileScanTask> tasksWithEqDelete;
+  private Iterable<DeleteFile> deletesToReplace;
+  private final JavaSparkContext sparkContext;
+
+  /**
+   * Defines whether to split out the result position deletes by data file names.
+   *
+   * This should be used in EqualityDeleteRewriter.
+   */
+  public static final String SPLIT_POSITION_DELETE = "split-position-delete";
+
+  public ConvertEqDeletesStrategy(SparkSession spark, Table table) {
+    this.table = table;
+    this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
+    this.deleteTargetSizeInBytes = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.DELETE_TARGET_FILE_SIZE_BYTES,
+        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
+    this.splitLookback = PropertyUtil.propertyAsInt(
+        table.properties(),
+        TableProperties.SPLIT_LOOKBACK,
+        TableProperties.SPLIT_LOOKBACK_DEFAULT);
+    this.splitOpenFileCost = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.SPLIT_OPEN_FILE_COST,
+        TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT);
+  }
+
+  @Override
+  public String name() {
+    return "CONVERT-EQUALITY-DELETES";
+  }
+
+  @Override
+  public Table table() {
+    return table;
+  }
+
+  @Override
+  public Iterable<DeleteFile> selectDeletes() {
+    CloseableIterable<FileScanTask> fileScanTasks = null;

Review comment:
       nit: can we simplify this block with something like the following?
   
   ```java
   try (CloseableIterable<FileScanTask> fileScanTasks = table.newScan().ignoreResiduals().planFiles()) {
      ...
   } finally {
     ...
   }




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] jackye1995 commented on a change in pull request #2364: Spark: Add an action to rewrite equality deletes

Posted by GitBox <gi...@apache.org>.
jackye1995 commented on a change in pull request #2364:
URL: https://github.com/apache/iceberg/pull/2364#discussion_r677950620



##########
File path: spark/src/main/java/org/apache/iceberg/spark/actions/ConvertEqDeletesStrategy.java
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.actions.RewriteDeleteStrategy;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.source.EqualityDeleteRewriter;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.StructLikeWrapper;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ConvertEqDeletesStrategy implements RewriteDeleteStrategy {
+  private static final Logger LOG = LoggerFactory.getLogger(ConvertEqDeletesStrategy.class);
+
+  private final Table table;
+  private long deleteTargetSizeInBytes;
+  private int splitLookback;
+  private long splitOpenFileCost;
+
+  private CloseableIterable<FileScanTask> tasksWithEqDelete;
+  private Iterable<DeleteFile> deletesToReplace;
+  private final JavaSparkContext sparkContext;
+
+  /**
+   * Defines whether to split out the result position deletes by data file names.
+   *
+   * This should be used in EqualityDeleteRewriter.
+   */
+  public static final String SPLIT_POSITION_DELETE = "split-position-delete";
+
+  public ConvertEqDeletesStrategy(SparkSession spark, Table table) {
+    this.table = table;
+    this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
+    this.deleteTargetSizeInBytes = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.DELETE_TARGET_FILE_SIZE_BYTES,
+        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
+    this.splitLookback = PropertyUtil.propertyAsInt(
+        table.properties(),
+        TableProperties.SPLIT_LOOKBACK,
+        TableProperties.SPLIT_LOOKBACK_DEFAULT);
+    this.splitOpenFileCost = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.SPLIT_OPEN_FILE_COST,
+        TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT);
+  }
+
+  @Override
+  public String name() {
+    return "CONVERT-EQUALITY-DELETES";
+  }
+
+  @Override
+  public Table table() {
+    return table;
+  }
+
+  @Override
+  public Iterable<DeleteFile> selectDeletes() {
+    CloseableIterable<FileScanTask> fileScanTasks = null;
+    try {
+      fileScanTasks = table.newScan()
+          .ignoreResiduals()
+          .planFiles();
+    } finally {
+      try {
+        if (fileScanTasks != null) {
+          fileScanTasks.close();
+        }
+      } catch (IOException ioe) {
+        LOG.warn("Failed to close task iterable", ioe);
+      }
+    }
+
+    tasksWithEqDelete = CloseableIterable.filter(fileScanTasks, scan ->
+        scan.deletes().stream().anyMatch(delete -> delete.content().equals(FileContent.EQUALITY_DELETES))
+    );
+
+    Set<DeleteFile> eqDeletes = Sets.newHashSet();

Review comment:
       nit: I think we can do a `flatMap` from tasks to deletes, and then filter and use `forEach(eqDeletes.add)`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #2364: Spark: Add an action to rewrite equality deletes

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2364:
URL: https://github.com/apache/iceberg/pull/2364#discussion_r600258260



##########
File path: api/src/main/java/org/apache/iceberg/RewriteFiles.java
##########
@@ -42,4 +42,13 @@
    * @return this for method chaining
    */
   RewriteFiles rewriteFiles(Set<DataFile> filesToDelete, Set<DataFile> filesToAdd);
+
+  /**
+   * Add a rewrite that replaces one set of deletes with another that contains the same deleted rows.
+   *
+   * @param deletesToDelete files that will be replaced, cannot be null or empty.
+   * @param deletesToAdd files that will be added, cannot be null or empty.
+   * @return this for method chaining
+   */
+  RewriteFiles rewriteDeletes(Set<DeleteFile> deletesToDelete, Set<DeleteFile> deletesToAdd);

Review comment:
       Before we get this PR merged,  we must merge this PR first (https://github.com/apache/iceberg/pull/2294).  Because PR 2294 has extended the API `RewriteFiles` to accept both data files & delete files.  This PR is trying to convert equality-deletions to pos-deletions , which is actually a special case of the PR 2294.  After we merged PR 2294,  we could rebase this PR. 
   
   For getting https://github.com/apache/iceberg/pull/2294 merged,  I need a committer to review it. FYI @rdblue , @yyanyy .
   
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #2364: Spark: Add an action to rewrite equality deletes

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2364:
URL: https://github.com/apache/iceberg/pull/2364#discussion_r678122199



##########
File path: core/src/main/java/org/apache/iceberg/util/TableScanUtil.java
##########
@@ -64,4 +77,21 @@ public static boolean hasDeletes(FileScanTask task) {
             splitFiles),
         BaseCombinedScanTask::new);
   }
+
+  public static Map<StructLikeWrapper, Collection<FileScanTask>> groupTasksByPartition(

Review comment:
       Moving the common `groupTasksByPartition` between [BaseRewriteDataFilesAction](https://github.com/apache/iceberg/blob/a1bd63d56751999ecee89b871992d7bac395fd52/core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java#L256) and [ConvertEqDeletesStrategy](https://github.com/apache/iceberg/pull/2364/files#diff-b31571d37b1152b422d32af62978f2954359254ac11a7f4338e3116b1b262c4cR137) into TableScanUtil should be OK for me.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #2364: Spark: Add an action to rewrite equality deletes

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2364:
URL: https://github.com/apache/iceberg/pull/2364#discussion_r678179542



##########
File path: data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
##########
@@ -139,43 +173,122 @@ protected long pos(T record) {
           CloseableIterable.transform(CloseableIterable.concat(deleteRecords), Record::copy),
           deleteSchema.asStruct());
 
-      Predicate<T> isInDeleteSet = record -> deleteSet.contains(projectRow.wrap(asStructLike(record)));
-      isInDeleteSets.add(isInDeleteSet);
+      isDeleted = isDeleted == null ? record -> deleteSet.contains(projectRow.wrap(asStructLike(record))) :
+              isDeleted.or(record -> deleteSet.contains(projectRow.wrap(asStructLike(record))));
     }
 
-    return isInDeleteSets;
+    return isDeleted;
   }
 
-  public CloseableIterable<T> findEqualityDeleteRows(CloseableIterable<T> records) {
+  private Predicate<T> buildPosDeletePredicate() {
+    if (posDeletes.isEmpty()) {
+      return null;
+    }
+
+    List<CloseableIterable<Record>> deletes = Lists.transform(posDeletes, this::openPosDeletes);
+    Set<Long> deleteSet = Deletes.toPositionSet(dataFile.path(), CloseableIterable.concat(deletes));
+    if (deleteSet.isEmpty()) {
+      return null;
+    }
+
+    return record -> deleteSet.contains(pos(record));
+  }
+
+  public CloseableIterable<T> keepRowsFromDeletes(CloseableIterable<T> records) {
+    Predicate<T> isDeletedFromPosDeletes = buildPosDeletePredicate();
+    if (isDeletedFromPosDeletes == null) {
+      return keepRowsFromEqualityDeletes(records);
+    }
+
+    Predicate<T> isDeletedFromEqDeletes = buildEqDeletePredicate();
+    if (isDeletedFromEqDeletes == null) {
+      return keepRowsFromPosDeletes(records);
+    }
+
+    CloseableIterable<T> markedRecords;
+
+    if (posDeletes.stream().mapToLong(DeleteFile::recordCount).sum() < setFilterThreshold) {
+      markedRecords = CloseableIterable.transform(records, record -> {
+        if (isDeletedFromPosDeletes.test(record) || isDeletedFromEqDeletes.test(record)) {
+          deleteMarker().accept(record);
+        }
+        return record;
+      });
+
+    } else {
+      List<CloseableIterable<Record>> deletes = Lists.transform(posDeletes, this::openPosDeletes);
+      markedRecords = CloseableIterable.transform(Deletes.streamingDeletedRowMarker(records, this::pos,
+          Deletes.deletePositions(dataFile.path(), deletes), deleteMarker()), record -> {
+          if (!isDeletedRow(record) && isDeletedFromEqDeletes.test(record)) {
+            deleteMarker().accept(record);
+          }
+          return record;
+        });
+    }
+    return deletedRowsSelector().filter(markedRecords);
+  }
+
+  private CloseableIterable<T> selectRowsFromDeletes(CloseableIterable<T> records, Predicate<T> isDeleted) {
+    CloseableIterable<T> markedRecords = CloseableIterable.transform(records, record -> {
+      if (isDeleted.test(record)) {
+        deleteMarker().accept(record);
+      }
+      return record;
+    });
+
+    return deletedRowsSelector().filter(markedRecords);
+  }
+
+  public CloseableIterable<T> keepRowsFromEqualityDeletes(CloseableIterable<T> records) {
     // Predicate to test whether a row has been deleted by equality deletions.
-    Predicate<T> deletedRows = applyEqDeletes().stream()
-        .reduce(Predicate::or)
-        .orElse(t -> false);
+    Predicate<T> isDeleted = buildEqDeletePredicate();
+    if (isDeleted == null) {
+      return CloseableIterable.empty();
+    }
 
-    Filter<T> deletedRowsFilter = new Filter<T>() {
-      @Override
-      protected boolean shouldKeep(T item) {
-        return deletedRows.test(item);
+    return selectRowsFromDeletes(records, isDeleted);
+  }
+
+  public CloseableIterable<T> keepRowsFromPosDeletes(CloseableIterable<T> records) {
+    // if there are fewer deletes than a reasonable number to keep in memory, use a set
+    if (posDeletes.stream().mapToLong(DeleteFile::recordCount).sum() < setFilterThreshold) {
+      // Predicate to test whether a row has been deleted by equality deletions.
+      Predicate<T> isDeleted = buildPosDeletePredicate();
+      if (isDeleted == null) {
+        return CloseableIterable.empty();
       }
-    };
-    return deletedRowsFilter.filter(records);
+      return selectRowsFromDeletes(records, isDeleted);
+    } else {
+      List<CloseableIterable<Record>> deletes = Lists.transform(posDeletes, this::openPosDeletes);
+      CloseableIterable<T> markedRecords = Deletes.streamingDeletedRowMarker(records, this::pos,
+              Deletes.deletePositions(dataFile.path(), deletes), deleteMarker());
+
+      return deletedRowsSelector().filter(markedRecords);
+    }
   }
 
   private CloseableIterable<T> applyEqDeletes(CloseableIterable<T> records) {
     // Predicate to test whether a row should be visible to user after applying equality deletions.
-    Predicate<T> remainingRows = applyEqDeletes().stream()
-        .map(Predicate::negate)
-        .reduce(Predicate::and)
-        .orElse(t -> true);
+    Predicate<T> isDeleted = buildEqDeletePredicate();

Review comment:
       Looks like we are separating the RewriteDeletes path and normal read path into two branches: 
   For the RewriteDeletes path, we introduced three new methods: 
   * keepRowsFromDeletes
   * keepRowsFromEqualityDeletes
   * keepRowsFromPosDeletes
   
   For the normal read path, we introduced another three methods: 
   * applyEqDeletes
   * applyPosDeletes
   * filter
   
   I remember there's an issue that we discussed to introduce the `is_deleted` meta column because we want to unify all the rewrite paths and normal read path ?  ( I cannot find the specific PR now...)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] chenjunjiedada commented on a change in pull request #2364: Spark: Add an action to rewrite equality deletes

Posted by GitBox <gi...@apache.org>.
chenjunjiedada commented on a change in pull request #2364:
URL: https://github.com/apache/iceberg/pull/2364#discussion_r679356589



##########
File path: spark/src/main/java/org/apache/iceberg/spark/actions/ConvertEqDeletesStrategy.java
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.actions.RewriteDeleteStrategy;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.source.EqualityDeleteRewriter;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.StructLikeWrapper;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ConvertEqDeletesStrategy implements RewriteDeleteStrategy {
+  private static final Logger LOG = LoggerFactory.getLogger(ConvertEqDeletesStrategy.class);
+
+  private final Table table;
+  private long deleteTargetSizeInBytes;
+  private int splitLookback;
+  private long splitOpenFileCost;
+
+  private CloseableIterable<FileScanTask> tasksWithEqDelete;
+  private Iterable<DeleteFile> deletesToReplace;
+  private final JavaSparkContext sparkContext;
+
+  /**
+   * Defines whether to split out the result position deletes by data file names.
+   *
+   * This should be used in EqualityDeleteRewriter.
+   */
+  public static final String SPLIT_POSITION_DELETE = "split-position-delete";
+
+  public ConvertEqDeletesStrategy(SparkSession spark, Table table) {
+    this.table = table;
+    this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
+    this.deleteTargetSizeInBytes = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.DELETE_TARGET_FILE_SIZE_BYTES,
+        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
+    this.splitLookback = PropertyUtil.propertyAsInt(
+        table.properties(),
+        TableProperties.SPLIT_LOOKBACK,
+        TableProperties.SPLIT_LOOKBACK_DEFAULT);
+    this.splitOpenFileCost = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.SPLIT_OPEN_FILE_COST,
+        TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT);
+  }
+
+  @Override
+  public String name() {
+    return "CONVERT-EQUALITY-DELETES";
+  }
+
+  @Override
+  public Table table() {
+    return table;
+  }
+
+  @Override
+  public Iterable<DeleteFile> selectDeletes() {
+    CloseableIterable<FileScanTask> fileScanTasks = null;
+    try {
+      fileScanTasks = table.newScan()
+          .ignoreResiduals()
+          .planFiles();
+    } finally {
+      try {
+        if (fileScanTasks != null) {
+          fileScanTasks.close();
+        }
+      } catch (IOException ioe) {
+        LOG.warn("Failed to close task iterable", ioe);
+      }
+    }
+
+    tasksWithEqDelete = CloseableIterable.filter(fileScanTasks, scan ->
+        scan.deletes().stream().anyMatch(delete -> delete.content().equals(FileContent.EQUALITY_DELETES))
+    );
+
+    Set<DeleteFile> eqDeletes = Sets.newHashSet();
+    tasksWithEqDelete.forEach(task -> {
+      eqDeletes.addAll(task.deletes().stream()
+          .filter(deleteFile -> deleteFile.content().equals(FileContent.EQUALITY_DELETES))
+          .collect(Collectors.toList()));
+    });
+
+    deletesToReplace = eqDeletes;
+
+    return deletesToReplace;
+  }
+
+  @Override
+  public Iterable<DeleteFile> rewriteDeletes() {
+    Map<StructLikeWrapper, Collection<FileScanTask>> groupedTasks =
+        TableScanUtil.groupTasksByPartition(table.spec(), tasksWithEqDelete.iterator());
+
+    // Split and combine tasks under each partition
+    List<Pair<StructLike, CombinedScanTask>> combinedScanTasks = groupedTasks.entrySet().stream()

Review comment:
       I'm updating this PR according to the API changes, the changes of `selectDeletesToRewrite` and `rewriteDeletes` are OK to me. But `Iterable<List<FileScanTask>> planDeleteGroups(Iterable<DeleteFile> deleteFiles);` is a bit wired since it returns groups of `List<FileScanTask>`, while a `FileScanTask` could contains several deletes which don't exist in the `deleteFiles`. So I prefer to return `Iterable<list<DeleteFile>>`. It is worth noting that one data file could have several deletes,  so we could not directly using `FileScanTask` to transfer the deletes. This is slightly different from the date file rewrite.
   
   > And we can get the partition StructLike directly from the list of scan tasks instead of passing it through the task pair in EqualityDeleteRewriter. In this way, we can also enable partial progress for commits.
   
   The scan tasks in a group may belong to different partitions. So unless we group deletes by partition, it needs to know the partition values. 
   
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] jackye1995 commented on a change in pull request #2364: Spark: Add an action to rewrite equality deletes

Posted by GitBox <gi...@apache.org>.
jackye1995 commented on a change in pull request #2364:
URL: https://github.com/apache/iceberg/pull/2364#discussion_r677948429



##########
File path: spark/src/main/java/org/apache/iceberg/spark/actions/ConvertEqDeletesStrategy.java
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.actions.RewriteDeleteStrategy;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.source.EqualityDeleteRewriter;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.StructLikeWrapper;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ConvertEqDeletesStrategy implements RewriteDeleteStrategy {

Review comment:
       I think we should have an abstract `ConvertEqDeletesStrategy` and a `Spark3ConvertEqDeletesStrategy`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #2364: Spark: Add an action to rewrite equality deletes

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2364:
URL: https://github.com/apache/iceberg/pull/2364#discussion_r678124432



##########
File path: core/src/main/java/org/apache/iceberg/util/TableScanUtil.java
##########
@@ -64,4 +77,21 @@ public static boolean hasDeletes(FileScanTask task) {
             splitFiles),
         BaseCombinedScanTask::new);
   }
+
+  public static Map<StructLikeWrapper, Collection<FileScanTask>> groupTasksByPartition(
+      PartitionSpec spec,

Review comment:
       I don't think it's correct to use the table latest partition spec to group the `FileScanTask`,  because different `FileScanTask`  many have different partition specs,  the correct way is to use the `FileScanTask#spec` to group the tasks.   We should remove the `spec` as an argument, otherwise it introducing a bug...




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org