Posted to issues@iceberg.apache.org by "amogh-jahagirdar (via GitHub)" <gi...@apache.org> on 2023/01/28 19:00:44 UTC

[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #6682: Bulk delete

amogh-jahagirdar commented on code in PR #6682:
URL: https://github.com/apache/iceberg/pull/6682#discussion_r1089790123


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java:
##########
@@ -85,6 +88,7 @@
   private static final Logger LOG = LoggerFactory.getLogger(BaseSparkAction.class);
   private static final AtomicInteger JOB_COUNTER = new AtomicInteger();
   private static final int DELETE_NUM_RETRIES = 3;
+  private static final int DELETE_GROUP_SIZE = 100000;

Review Comment:
   Is it feasible to make this value configurable through the options passed to the action?
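
   A minimal sketch of what that could look like, assuming the action exposes its options as a `Map<String, String>`. The option key `delete-group-size` and the class and method names are hypothetical, chosen only for illustration; the default mirrors the constant in the diff above.

```java
import java.util.Map;

public class DeleteGroupSizeOption {
  // Hypothetical option key; not an existing Iceberg option name.
  public static final String DELETE_GROUP_SIZE_OPTION = "delete-group-size";
  // Same value as the hard-coded DELETE_GROUP_SIZE in the diff.
  public static final int DELETE_GROUP_SIZE_DEFAULT = 100000;

  // Returns the configured group size, or the default when the option is absent.
  public static int deleteGroupSize(Map<String, String> options) {
    String value = options.get(DELETE_GROUP_SIZE_OPTION);
    if (value == null) {
      return DELETE_GROUP_SIZE_DEFAULT;
    }
    int size = Integer.parseInt(value.trim());
    if (size <= 0) {
      throw new IllegalArgumentException(
          DELETE_GROUP_SIZE_OPTION + " must be positive, got: " + size);
    }
    return size;
  }

  public static void main(String[] args) {
    System.out.println(deleteGroupSize(Map.of()));
    System.out.println(deleteGroupSize(Map.of(DELETE_GROUP_SIZE_OPTION, "5000")));
  }
}
```

   Keeping the constant as the fallback would preserve the current behavior for callers that do not set the option.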



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java:
##########
@@ -98,6 +101,12 @@ public InternalRow[] call(InternalRow args) {
     String location = args.isNullAt(2) ? null : args.getString(2);
     boolean dryRun = args.isNullAt(3) ? false : args.getBoolean(3);
     Integer maxConcurrentDeletes = args.isNullAt(4) ? null : args.getInt(4);
+    if (maxConcurrentDeletes != null) {
+      LOG.warn(
+          "{} is now deprecated, parallelism should now be configured in the FileIO bulk operations. Check the"
+              + "configured FileIO for more information",
+          PARAMETERS[4].name());
+    }

Review Comment:
   Style nit: add a blank line after the if block.



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java:
##########
@@ -265,7 +285,15 @@ private Set<Long> findExpiredSnapshotIds(
   }
 
   private ExpireSnapshots.Result deleteFiles(Iterator<FileInfo> files) {
-    DeleteSummary summary = deleteFiles(deleteExecutorService, deleteFunc, files);
+    DeleteSummary summary;
+    if (ops.io() instanceof SupportsBulkOperations) {
+      LOG.info("Triggering Bulk Delete Operations");
+      summary = deleteFiles(bulkDeleteFunc, files);
+    } else {
+      LOG.warn("Warning falling back to non-bulk deletes");

Review Comment:
   Nit on the warn log: I think we should make it clearer why this is a warning, so users don't think something has gone wrong:
   
   "Falling back to non-bulk deletes. Bulk deletes are recommended for better deletion throughput"

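   For illustration, a self-contained sketch of the branch with the reworded message. The `FileIO` and `SupportsBulkOperations` interfaces here are simplified local stand-ins, not the Iceberg types, and `deleteAll` with its string return value is a hypothetical helper used only to make the chosen path visible.

```java
import java.util.List;
import java.util.logging.Logger;

public class BulkDeleteFallbackSketch {
  private static final Logger LOG =
      Logger.getLogger(BulkDeleteFallbackSketch.class.getName());

  // Simplified stand-ins for the real interfaces (assumption, not the Iceberg API).
  public interface FileIO {
    void deleteFile(String path);
  }

  public interface SupportsBulkOperations extends FileIO {
    void deleteFiles(Iterable<String> paths);
  }

  // Deletes all paths, preferring the bulk path; returns which path was taken.
  public static String deleteAll(FileIO io, List<String> paths) {
    if (io instanceof SupportsBulkOperations) {
      LOG.info("Triggering bulk delete operations");
      ((SupportsBulkOperations) io).deleteFiles(paths);
      return "bulk";
    }
    // The message says what happened and why it matters, not just "Warning".
    LOG.warning(
        "Falling back to non-bulk deletes. "
            + "Bulk deletes are recommended for better deletion throughput");
    for (String path : paths) {
      io.deleteFile(path);
    }
    return "non-bulk";
  }
}
```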


##########
core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java:
##########
@@ -149,6 +164,45 @@ public void deletePrefix(String prefix) {
     }
   }
 
+  @Override
+  public void deleteFiles(Iterable<String> pathsToDelete) throws BulkDeletionFailureException {
+    AtomicInteger failureCount = new AtomicInteger(0);
+    Tasks.foreach(pathsToDelete)
+        .executeWith(executorService())
+        .retry(3)
+        .stopRetryOn(FileNotFoundException.class)

Review Comment:
   This may be out of scope for this PR, since the other delete operations also stop retrying only on FileNotFoundException, but there seem to be other cases where we should stop retrying, such as permission-denied errors (see https://github.com/apache/hadoop/blob/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java#L2848) or AccessControlExceptions in Hadoop cases.
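
   A rough sketch of the idea, written as a plain retry loop rather than with the `Tasks` utility so that it runs on its own. `java.nio.file.AccessDeniedException` is used here as a stand-in for Hadoop's AccessControlException, and the class, interface, and method names are hypothetical.

```java
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.AccessDeniedException;
import java.util.List;

public class NonRetryableDeleteSketch {
  // Hypothetical callback standing in for a single-file delete.
  public interface DeleteCall {
    void run(String path) throws IOException;
  }

  // Failures that another attempt cannot fix: missing file, permission denied.
  private static final List<Class<? extends Exception>> NON_RETRYABLE =
      List.of(FileNotFoundException.class, AccessDeniedException.class);

  public static boolean isNonRetryable(Exception e) {
    for (Class<? extends Exception> type : NON_RETRYABLE) {
      if (type.isInstance(e)) {
        return true;
      }
    }
    return false;
  }

  // Makes one attempt plus up to maxRetries retries. Returns the number of
  // attempts made, whether or not the delete succeeded; a real implementation
  // would also surface the failure to the caller.
  public static int deleteWithRetry(String path, DeleteCall call, int maxRetries) {
    int attempts = 0;
    while (true) {
      attempts++;
      try {
        call.run(path);
        return attempts;
      } catch (IOException e) {
        if (isNonRetryable(e) || attempts > maxRetries) {
          return attempts;
        }
      }
    }
  }
}
```

   With this shape, a permission failure costs one attempt instead of exhausting every retry, while transient I/O errors are still retried.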



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org