Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/08/10 20:12:43 UTC

[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5491: Spark 3.2: Count delete files in DeleteReachableFiles

aokolnychyi commented on code in PR #5491:
URL: https://github.com/apache/iceberg/pull/5491#discussion_r942865212


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java:
##########
@@ -200,6 +210,113 @@ protected Dataset<Row> loadMetadataTable(Table table, MetadataTableType type) {
     return SparkTableUtil.loadMetadataTable(spark, table, type);
   }
 
+  /**
+   * Deletes files and keeps track of how many files were removed for each file type.
+   *
+   * @param executorService an executor service to use for parallel deletes
+   * @param deleteFunc a delete func
+   * @param files an iterator of Spark rows of the structure (path: String, type: String)
+   * @return stats on which files were deleted
+   */
+  protected DeleteSummary deleteFiles(
+      ExecutorService executorService, Consumer<String> deleteFunc, Iterator<Row> files) {
+
+    DeleteSummary summary = new DeleteSummary();
+
+    Tasks.foreach(files)
+        .retry(DELETE_NUM_RETRIES)
+        .stopRetryOn(NotFoundException.class)
+        .suppressFailureWhenFinished()
+        .executeWith(executorService)
+        .onFailure(
+            (fileInfo, exc) -> {
+              String path = fileInfo.getString(0);
+              String type = fileInfo.getString(1);
+              LOG.warn("Delete failed for {}: {}", type, path, exc);
+            })
+        .run(
+            fileInfo -> {
+              String path = fileInfo.getString(0);
+              String type = fileInfo.getString(1);
+              deleteFunc.accept(path);
+              summary.deletedFile(path, type);
+            });
+
+    return summary;
+  }
+
+  static class DeleteSummary {
+    private final AtomicLong dataFilesCount = new AtomicLong(0L);
+    private final AtomicLong positionDeleteFilesCount = new AtomicLong(0L);
+    private final AtomicLong equalityDeleteFilesCount = new AtomicLong(0L);
+    private final AtomicLong manifestsCount = new AtomicLong(0L);
+    private final AtomicLong manifestListsCount = new AtomicLong(0L);
+    private final AtomicLong otherFilesCount = new AtomicLong(0L);
+
+    public void deletedFile(String path, String type) {
+      if (FileContent.DATA.name().equalsIgnoreCase(type)) {
+        dataFilesCount.incrementAndGet();
+        LOG.trace("Deleted data file: {}", path);
+
+      } else if (FileContent.POSITION_DELETES.name().equalsIgnoreCase(type)) {
+        positionDeleteFilesCount.incrementAndGet();
+        LOG.trace("Deleted positional delete file: {}", path);
+
+      } else if (FileContent.EQUALITY_DELETES.name().equalsIgnoreCase(type)) {
+        equalityDeleteFilesCount.incrementAndGet();
+        LOG.trace("Deleted equality delete file: {}", path);
+
+      } else if (MANIFEST.equalsIgnoreCase(type)) {
+        manifestsCount.incrementAndGet();
+        LOG.debug("Deleted manifest: {}", path);

Review Comment:
   I think the different log levels were deliberate (trace for data and delete files, debug for manifests), since we may delete far more data files than manifests. That said, maybe we should not worry about logging every deleted file.
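
To make the trade-off concrete, here is a minimal sketch of the alternative: count every deletion, keep per-file messages at the quietest levels, and emit one summary line at the end. It is not the PR's code. `DeleteCountsSketch`, its accessors, and `summary()` are hypothetical names, `java.util.logging` stands in for the project's logger (`FINEST` playing the role of trace, `FINE` the role of debug), and the type strings `DATA` and `MANIFEST` are assumed for illustration.

```java
import java.util.Locale;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical stand-in for the DeleteSummary class in the diff; not the PR's code.
class DeleteCountsSketch {
  private static final Logger LOG = Logger.getLogger(DeleteCountsSketch.class.getName());

  // AtomicLong counters, so parallel delete tasks can update them safely.
  private final AtomicLong dataFiles = new AtomicLong();
  private final AtomicLong manifests = new AtomicLong();
  private final AtomicLong otherFiles = new AtomicLong();

  // Records one deleted file; the type strings matched here are assumptions.
  void deletedFile(String path, String type) {
    switch (type.toUpperCase(Locale.ROOT)) {
      case "DATA":
        dataFiles.incrementAndGet();
        // High-volume file type: log at the quietest level (trace equivalent).
        LOG.log(Level.FINEST, "Deleted data file: {0}", path);
        break;
      case "MANIFEST":
        manifests.incrementAndGet();
        // Lower-volume file type: one level louder (debug equivalent).
        LOG.log(Level.FINE, "Deleted manifest: {0}", path);
        break;
      default:
        otherFiles.incrementAndGet();
        LOG.log(Level.FINE, "Deleted other file: {0}", path);
    }
  }

  long dataFilesCount() {
    return dataFiles.get();
  }

  long manifestsCount() {
    return manifests.get();
  }

  long otherFilesCount() {
    return otherFiles.get();
  }

  // One aggregate line that can be logged at info level regardless of file volume.
  String summary() {
    return String.format(
        Locale.ROOT,
        "Deleted %d data files, %d manifests, %d other files",
        dataFiles.get(), manifests.get(), otherFiles.get());
  }

  public static void main(String[] args) {
    DeleteCountsSketch counts = new DeleteCountsSketch();
    counts.deletedFile("/tmp/a.parquet", "data");
    counts.deletedFile("/tmp/m.avro", "Manifest");
    LOG.info(counts.summary());
  }
}
```

With this shape, the per-file messages only matter when someone turns on fine-grained logging, and the counts still reach the log through the single summary line.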



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
For additional commands, e-mail: issues-help@iceberg.apache.org