Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/07/28 01:05:45 UTC

[GitHub] [iceberg] amogh-jahagirdar opened a new pull request, #5373: API, Spark: Update remove orphan files procedure for performing batch deletion if it's supported and a batch deletion size greater than 1

amogh-jahagirdar opened a new pull request, #5373:
URL: https://github.com/apache/iceberg/pull/5373

   In this change, the DeleteOrphanFiles procedure has been updated to perform batch deletion when a deletion batch size is set and the underlying FileIO supports it. If a batch size greater than 1 is set and the underlying FileIO does not support batch deletion, no batch deletion will be performed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on pull request #5373: API, Spark: Update remove orphan files procedure to use bulk deletion if applicable

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on PR #5373:
URL: https://github.com/apache/iceberg/pull/5373#issuecomment-1203268307

   > I'm not sure about throwing an exception if a user specify delete function and if the file io supports bulk delete is the way to go, because then we're changing the behavior of the exposed deleteFunc API. I think if deleteFunc is set, the procedure continues to use that as a source of truth regardless of bulk delete support or not. If we throw an exception, that would mean user's code would need to get rewritten if it's using S3FileIO and running this procedure with custom delete. Let me know if i'm misunderstanding!
   
   > That being said, now we are changing the behavior if they do not specify a delete func and if it supports bulk delete. This change is less intrusive because it changes internally how the procedure runs and is not really exposed to a user. Let me know what you think.
   
   Maybe it was a bit of a misunderstanding; I was talking about the flag you are discussing with @dramaticlly (useBulkDelete). So I was thinking that if useBulkDelete is on && deleteFunc is set, then it's a misconfiguration.
   
   But are we doing the flag? Or are you suggesting that deleteFunc always takes precedence, i.e. if deleteFunc is set, always use the single-file deleteFunc, and otherwise, if the FileIO supports bulk operations, automatically use bulk delete?
   
   
   > Also now we are delegating task management to the file IO, which I think makes sense but there's another argument that each procedure should control this since failure handling or retries would depend on the desired behavior for the procedure. What are peoples thoughts here? @dramaticlly @aokolnychyi @RussellSpitzer @karuppayya
   
   What's the choice here? I suppose we would need an extra parameter on FileIOs that implement SupportsBulkOperations to control retries, which the various procedures could set. I think after #5379 it will be easy to implement, as we can just set that parameter on the Tasks?


[GitHub] [iceberg] dramaticlly commented on pull request #5373: API, Spark: Update remove orphan files procedure to use bulk deletion if applicable

Posted by GitBox <gi...@apache.org>.
dramaticlly commented on PR #5373:
URL: https://github.com/apache/iceberg/pull/5373#issuecomment-1201876458

   > In the updated logic for the procedure, we will always use the delete func if it's specified.
   > 
   > If a delete function is not specified and the file io does not support bulk operations we would use the default delete function. Otherwise the bulk delete is used by default.
   > 
   > What this means is bulk operations will always be used (if the file io supports it) without considering user input. I was discussing with @dramaticlly perhaps we want more control over this and it should be specified in the action (something like useBulkDelete())?
   > 
   
   Thanks @amogh-jahagirdar, I was thinking about this along the same lines for #5412. Ideally, we should use bulk operations whenever they are available (determined by inspecting the FileIO of the given table; S3FileIO is the only one that supports them as of now). On the other hand, I think we need to maintain backward compatibility: if an Iceberg user has already provided a custom `deleteFunc` to the SparkAction, everything should keep working as expected.
   
   In my change, I added a new public method called `bulkDeleteFunc` to allow overrides if needed. This provides a way to customize deletion even for tables on S3FileIO, and it can also be used to disable bulk deletion for troubleshooting and debugging.
   
   
   > Also now we are delegating task management to the file IO, which I think makes sense but there's another argument that each procedure should control this since failure handling or retries would depend on the desired behavior for the procedure. What are peoples thoughts here?
   
   Personally, I think delegating batching to the FileIO makes sense; adding another parameter like batchSize to the procedure call might cause confusion and is hard to get right. The downside I see is testability, and I'd welcome more input on how to test this across classes. The current test classes all use HadoopTables, and picking a Hive table whose FileIO implements SupportsBulkOperations seems to be overkill for unit tests.


[GitHub] [iceberg] amogh-jahagirdar commented on pull request #5373: API, Spark: Update remove orphan files procedure to use bulk deletion if applicable

Posted by GitBox <gi...@apache.org>.
amogh-jahagirdar commented on PR #5373:
URL: https://github.com/apache/iceberg/pull/5373#issuecomment-1201578469

   > LGTM, I was thinking about adding one for expire snapshots as well. But looking at this change, I noticed there's no test coverage included, can we add any unit tests for this ?
   
   Yeah for sure, I still need to add tests for this.


[GitHub] [iceberg] amogh-jahagirdar commented on pull request #5373: API, Spark: Update remove orphan files procedure to use bulk deletion if applicable

Posted by GitBox <gi...@apache.org>.
amogh-jahagirdar commented on PR #5373:
URL: https://github.com/apache/iceberg/pull/5373#issuecomment-1201969319

   > but we throw an exception if they want a customized delete function AND bulk.
   
   I'm not sure that throwing an exception when a user specifies a delete function and the FileIO supports bulk delete is the way to go, because then we're changing the behavior of the exposed deleteFunc API. I think if deleteFunc is set, the procedure should continue to use it as the source of truth regardless of bulk delete support. If we throw an exception, users' code would need to be rewritten if it uses S3FileIO and runs this procedure with a custom delete.
   
   We are changing the behavior when they do not specify a delete func and the FileIO supports bulk delete, but that change only affects how the procedure runs internally and is not really exposed to the user. Let me know what you think.


[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #5373: API, Spark: Update remove orphan files procedure to use bulk deletion if applicable

Posted by GitBox <gi...@apache.org>.
amogh-jahagirdar commented on code in PR #5373:
URL: https://github.com/apache/iceberg/pull/5373#discussion_r934791146


##########
spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java:
##########
@@ -182,12 +192,26 @@ private DeleteOrphanFiles.Result doExecute() {
     List<String> orphanFiles =
         actualFileDF.join(validFileDF, joinCond, "leftanti").as(Encoders.STRING()).collectAsList();
 
-    Tasks.foreach(orphanFiles)
-        .noRetry()
-        .executeWith(deleteExecutorService)
-        .suppressFailureWhenFinished()
-        .onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc))
-        .run(deleteFunc::accept);
+    if (batchDeletionSize > 1) {
+      Preconditions.checkArgument(
+          table.io() instanceof SupportsBulkOperations,
+          "FileIO %s does not support bulk deletion",
+          table.io().getClass().getName());
+      SupportsBulkOperations bulkFileIO = (SupportsBulkOperations) table.io();
+      List<List<String>> fileBatches = Lists.partition(orphanFiles, batchDeletionSize);
+      Tasks.foreach(fileBatches)
+          .noRetry()
+          .executeWith(deleteExecutorService)
+          .suppressFailureWhenFinished()
+          .run(bulkFileIO::deleteFiles);

Review Comment:
   Hey Szehon, I'm going to update this PR based on my change in https://github.com/apache/iceberg/pull/5379/files.
   
   My thinking is we should always just use deleteFunc if it's passed in (it should be the source of truth for deletion if it's set). If it's not set and the FileIO supports bulk operations, we can just call fileIO.deleteFiles(); otherwise we fall back to the existing mechanism.
   
   That way we preserve existing behavior for the procedure and have the additional optimization for FileIO types which support bulk delete. Also, the underlying FileIO can take care of how the batches are created (so it's optimal for the underlying storage, in the sense of maximizing deletion throughput, minimizing throttling, etc.).
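
The precedence described above can be sketched roughly as follows. This is a minimal illustration, not the PR's code: `SketchFileIO`, `SketchBulkFileIO`, `OrphanDeleteDispatch`, and the `Strategy` enum are hypothetical stand-ins for Iceberg's `FileIO`, `SupportsBulkOperations`, and the action internals, and the `Tasks`-based executor and failure handling from the diff are left out.

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-ins for Iceberg's FileIO and SupportsBulkOperations (assumption).
interface SketchFileIO {
  void deleteFile(String path);
}

interface SketchBulkFileIO extends SketchFileIO {
  void deleteFiles(Iterable<String> paths);
}

final class OrphanDeleteDispatch {
  // Reported back so callers can see which path was taken.
  enum Strategy { CUSTOM_FUNC, BULK, SINGLE_FILE }

  private OrphanDeleteDispatch() {}

  static Strategy deleteOrphans(
      SketchFileIO io, Consumer<String> customDeleteFunc, List<String> orphanFiles) {
    // 1. A user-supplied delete function always wins, even on a bulk-capable FileIO.
    if (customDeleteFunc != null) {
      orphanFiles.forEach(customDeleteFunc);
      return Strategy.CUSTOM_FUNC;
    }
    // 2. No custom function: hand the whole list to the FileIO if it supports bulk deletes.
    if (io instanceof SketchBulkFileIO) {
      ((SketchBulkFileIO) io).deleteFiles(orphanFiles);
      return Strategy.BULK;
    }
    // 3. Otherwise fall back to the existing one-file-at-a-time behavior.
    orphanFiles.forEach(io::deleteFile);
    return Strategy.SINGLE_FILE;
  }
}
```

Passing `null` for the custom function models "deleteFunc not set"; a real action would more likely compare against its default delete function.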



[GitHub] [iceberg] szehon-ho commented on pull request #5373: API, Spark: Update remove orphan files procedure to use bulk deletion if applicable

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on PR #5373:
URL: https://github.com/apache/iceberg/pull/5373#issuecomment-1201931976

   > If a delete function is not specified and the file io does not support bulk operations we would use the default delete function. Otherwise the bulk delete is used by default.
   
   I think that's a good simplification too. If the user wants customization, they can live with the non-bulk (slower) path, but we throw an exception if they want a customized delete function AND bulk.


[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #5373: API, Spark: Update remove orphan files procedure to use bulk deletion if applicable

Posted by GitBox <gi...@apache.org>.
amogh-jahagirdar commented on code in PR #5373:
URL: https://github.com/apache/iceberg/pull/5373#discussion_r934826871


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java:
##########
@@ -161,6 +163,12 @@ public DeleteOrphanFilesSparkAction olderThan(long newOlderThanTimestamp) {
     return this;
   }
 
+  @Override
+  public DeleteOrphanFilesSparkAction batchSize(int batchSize) {

Review Comment:
   Great catch. I'm removing this from the Spark procedure, but the check should be done at the FileIO layer, which I missed.



[GitHub] [iceberg] singhpk234 commented on a diff in pull request #5373: API, Spark: Update remove orphan files procedure to use bulk deletion if applicable

Posted by GitBox <gi...@apache.org>.
singhpk234 commented on code in PR #5373:
URL: https://github.com/apache/iceberg/pull/5373#discussion_r932882608


##########
spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java:
##########
@@ -182,12 +192,26 @@ private DeleteOrphanFiles.Result doExecute() {
     List<String> orphanFiles =
         actualFileDF.join(validFileDF, joinCond, "leftanti").as(Encoders.STRING()).collectAsList();
 
-    Tasks.foreach(orphanFiles)
-        .noRetry()
-        .executeWith(deleteExecutorService)
-        .suppressFailureWhenFinished()
-        .onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc))
-        .run(deleteFunc::accept);
+    if (batchDeletionSize > 1) {
+      Preconditions.checkArgument(
+          table.io() instanceof SupportsBulkOperations,
+          "FileIO %s does not support bulk deletion",
+          table.io().getClass().getName());
+      SupportsBulkOperations bulkFileIO = (SupportsBulkOperations) table.io();
+      List<List<String>> fileBatches = Lists.partition(orphanFiles, batchDeletionSize);
+      Tasks.foreach(fileBatches)
+          .noRetry()
+          .executeWith(deleteExecutorService)
+          .suppressFailureWhenFinished()
+          .run(bulkFileIO::deleteFiles);
+    } else {
+      Tasks.foreach(orphanFiles)
+          .noRetry()
+          .executeWith(deleteExecutorService)
+          .suppressFailureWhenFinished()
+          .onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc))
+          .run(deleteFunc::accept);
+    }

Review Comment:
   +1, a helper/wrapper around this if/else would be good to have, and would help in applying this change in more places!
   
   [Not in scope of this PR] Any thoughts on ResolvingFileIO, which is a wrapper around other FileIOs?



[GitHub] [iceberg] amogh-jahagirdar commented on pull request #5373: API, Spark: Update remove orphan files procedure to use bulk deletion if applicable

Posted by GitBox <gi...@apache.org>.
amogh-jahagirdar commented on PR #5373:
URL: https://github.com/apache/iceberg/pull/5373#issuecomment-1201713531

   I've updated this PR to be based off my change here: https://github.com/apache/iceberg/pull/5379/files
   
   That change should go in first and then this PR can just focus on the integration with the procedure.
   
   In the updated logic for the procedure, we will always use the delete func if it's specified. 
   
   If a delete function is not specified and the file io does not support bulk operations we would use the default delete function. Otherwise the bulk delete is used by default.
   
   What this means is bulk operations will always be used without considering user input. I was discussing with @dramaticlly perhaps we want more control over this and it should be specified in the action (something like useBulkDelete())?
   
   Also, now we are delegating task management to the FileIO, which I think makes sense, but there's another argument that each procedure should control this, since failure handling or retries would depend on the desired behavior for the procedure. What are people's thoughts here? @dramaticlly @aokolnychyi @RussellSpitzer @karuppayya
   
   


[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #5373: API, Spark: Update remove orphan files procedure to use bulk deletion if applicable

Posted by GitBox <gi...@apache.org>.
amogh-jahagirdar commented on code in PR #5373:
URL: https://github.com/apache/iceberg/pull/5373#discussion_r934827771


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java:
##########
@@ -161,6 +163,12 @@ public DeleteOrphanFilesSparkAction olderThan(long newOlderThanTimestamp) {
     return this;
   }
 
+  @Override
+  public DeleteOrphanFilesSparkAction batchSize(int batchSize) {

Review Comment:
   Actually, nvm I already have this check here for the S3FileIO case: https://github.com/apache/iceberg/blob/master/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java#L527
   



[GitHub] [iceberg] amogh-jahagirdar commented on pull request #5373: API, Spark: Update remove orphan files procedure to use batch deletion if applicable

Posted by GitBox <gi...@apache.org>.
amogh-jahagirdar commented on PR #5373:
URL: https://github.com/apache/iceberg/pull/5373#issuecomment-1197699684

   Still figuring out a good way to write tests for this, but in the interim @RussellSpitzer @aokolnychyi @jackye1995 would like to get your feedback!


[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #5373: API, Spark: Update remove orphan files procedure to use bulk deletion if applicable

Posted by GitBox <gi...@apache.org>.
amogh-jahagirdar commented on code in PR #5373:
URL: https://github.com/apache/iceberg/pull/5373#discussion_r932802685


##########
spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java:
##########
@@ -182,12 +192,26 @@ private DeleteOrphanFiles.Result doExecute() {
     List<String> orphanFiles =
         actualFileDF.join(validFileDF, joinCond, "leftanti").as(Encoders.STRING()).collectAsList();
 
-    Tasks.foreach(orphanFiles)
-        .noRetry()
-        .executeWith(deleteExecutorService)
-        .suppressFailureWhenFinished()
-        .onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc))
-        .run(deleteFunc::accept);
+    if (batchDeletionSize > 1) {
+      Preconditions.checkArgument(
+          table.io() instanceof SupportsBulkOperations,
+          "FileIO %s does not support bulk deletion",
+          table.io().getClass().getName());
+      SupportsBulkOperations bulkFileIO = (SupportsBulkOperations) table.io();
+      List<List<String>> fileBatches = Lists.partition(orphanFiles, batchDeletionSize);

Review Comment:
   I'm thinking we leave the batching logic to the fileIO. Since the pattern being followed for more advanced file io interaction is to have mixin interfaces, this lets us delegate more to fileIO with "special" capabilities. So I think it makes sense to remove batching from the action itself and just let the fileIO take care of it.
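
To make the "let the FileIO batch" idea concrete, here is a small sketch of a FileIO that opts into bulk deletes through a mixin interface and owns its batch size. `DemoFileIO`, `DemoSupportsBulk`, and `DemoBatchingFileIO` are hypothetical names for illustration only; the `requests` list just records what would be sent to storage.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-ins; Iceberg's real interfaces are richer (assumption).
interface DemoFileIO {
  void deleteFile(String path);
}

// Mixin: only FileIOs with an efficient bulk delete implement this.
interface DemoSupportsBulk {
  void deleteFiles(Iterable<String> paths);
}

final class DemoBatchingFileIO implements DemoFileIO, DemoSupportsBulk {
  private final int batchSize;
  // Each entry models one delete request sent to storage.
  private final List<List<String>> requests = new ArrayList<>();

  DemoBatchingFileIO(int batchSize) {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("Batch size must be positive: " + batchSize);
    }
    this.batchSize = batchSize;
  }

  @Override
  public void deleteFile(String path) {
    requests.add(Collections.singletonList(path));
  }

  @Override
  public void deleteFiles(Iterable<String> paths) {
    // The caller passes everything; the FileIO decides how to cut it into requests.
    List<String> current = new ArrayList<>();
    for (String path : paths) {
      current.add(path);
      if (current.size() == batchSize) {
        requests.add(current);
        current = new ArrayList<>();
      }
    }
    if (!current.isEmpty()) {
      requests.add(current);
    }
  }

  List<List<String>> requests() {
    return requests;
  }
}
```

With this shape the action no longer needs a `batchSize` option of its own: it passes the full orphan list and the storage-specific limit stays next to the storage client.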



[GitHub] [iceberg] jackye1995 commented on pull request #5373: API, Spark: Update remove orphan files procedure to use bulk deletion if applicable

Posted by "jackye1995 (via GitHub)" <gi...@apache.org>.
jackye1995 commented on PR #5373:
URL: https://github.com/apache/iceberg/pull/5373#issuecomment-1464294529

   Closing as a duplicate of #6682.


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #5373: API, Spark: Update remove orphan files procedure to use bulk deletion if applicable

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #5373:
URL: https://github.com/apache/iceberg/pull/5373#discussion_r934777491


##########
spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java:
##########
@@ -182,12 +192,26 @@ private DeleteOrphanFiles.Result doExecute() {
     List<String> orphanFiles =
         actualFileDF.join(validFileDF, joinCond, "leftanti").as(Encoders.STRING()).collectAsList();
 
-    Tasks.foreach(orphanFiles)
-        .noRetry()
-        .executeWith(deleteExecutorService)
-        .suppressFailureWhenFinished()
-        .onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc))
-        .run(deleteFunc::accept);
+    if (batchDeletionSize > 1) {
+      Preconditions.checkArgument(
+          table.io() instanceof SupportsBulkOperations,
+          "FileIO %s does not support bulk deletion",
+          table.io().getClass().getName());
+      SupportsBulkOperations bulkFileIO = (SupportsBulkOperations) table.io();
+      List<List<String>> fileBatches = Lists.partition(orphanFiles, batchDeletionSize);
+      Tasks.foreach(fileBatches)
+          .noRetry()
+          .executeWith(deleteExecutorService)
+          .suppressFailureWhenFinished()
+          .run(bulkFileIO::deleteFiles);

Review Comment:
   I was looking at this a bit with @dramaticlly earlier. My concern here is the existing 'deleteFunc' plugin mechanism, which I know some users are using for dry-run purposes, i.e. printing out the list of files to delete instead of actually deleting them. I'm thinking:
   
   - Do a precondition check: if deleteFunc != defaultDeleteFunc, don't allow bulkDelete.
   - Add a bulk deleteFunc?
   @aokolnychyi any thoughts?
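
For readers unfamiliar with the dry-run use of `deleteFunc` mentioned here, a minimal sketch of such a function follows. `DryRunDelete` is a hypothetical helper, not part of Iceberg; it assumes the action accepts a `Consumer<String>` of file paths (as the `deleteFunc::accept` call in the diff suggests) and may invoke it from several threads.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper: records the paths that would be deleted instead of deleting them.
final class DryRunDelete {
  // Synchronized because the action may call the delete function from an executor service.
  private final List<String> wouldDelete = Collections.synchronizedList(new ArrayList<>());

  // The function to hand to the action in place of a real delete.
  Consumer<String> deleteFunc() {
    return wouldDelete::add;
  }

  // Snapshot of the recorded paths, in the order they were reported.
  List<String> wouldDelete() {
    return new ArrayList<>(wouldDelete);
  }
}
```

A bulk path that bypassed this function would silently turn a dry run into a real delete, which is the compatibility concern raised in this comment.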



[GitHub] [iceberg] amogh-jahagirdar commented on pull request #5373: API, Spark: Update remove orphan files procedure to use bulk deletion if applicable

Posted by GitBox <gi...@apache.org>.
amogh-jahagirdar commented on PR #5373:
URL: https://github.com/apache/iceberg/pull/5373#issuecomment-1201576505

   @szehon-ho @dramaticlly @aokolnychyi also, for this PR I'll just focus on Spark 3.3, and we can copy the implementation changes to the other versions in separate PRs.


[GitHub] [iceberg] dramaticlly commented on a diff in pull request #5373: API, Spark: Update remove orphan files procedure to use bulk deletion if applicable

Posted by GitBox <gi...@apache.org>.
dramaticlly commented on code in PR #5373:
URL: https://github.com/apache/iceberg/pull/5373#discussion_r934801025


##########
spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java:
##########
@@ -182,12 +192,26 @@ private DeleteOrphanFiles.Result doExecute() {
     List<String> orphanFiles =
         actualFileDF.join(validFileDF, joinCond, "leftanti").as(Encoders.STRING()).collectAsList();
 
-    Tasks.foreach(orphanFiles)
-        .noRetry()
-        .executeWith(deleteExecutorService)
-        .suppressFailureWhenFinished()
-        .onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc))
-        .run(deleteFunc::accept);
+    if (batchDeletionSize > 1) {
+      Preconditions.checkArgument(
+          table.io() instanceof SupportsBulkOperations,
+          "FileIO %s does not support bulk deletion",
+          table.io().getClass().getName());
+      SupportsBulkOperations bulkFileIO = (SupportsBulkOperations) table.io();
+      List<List<String>> fileBatches = Lists.partition(orphanFiles, batchDeletionSize);

Review Comment:
   If I understand correctly, batchDeletionSize determines how long a list can be passed to the FileIO's batch deletion, so for the FileIO to do its own batching I guess this would have to be set larger than the FileIO's batch size?
   
   Say we set `batchDeletionSize = 50` in the remove-orphan call and [`S3FILEIO_DELETE_BATCH_SIZE_DEFAULT = 250`](https://github.com/apache/iceberg/blob/master/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java#L254): then S3FileIO can only batch 50 orphan files into a single call.
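
The interaction between the two batch sizes can be worked through with a small sketch. It assumes the action partitions first and the FileIO then re-partitions each list it receives; `BatchSizeInteraction` and its methods are hypothetical, and plain `subList` is used in place of Guava's `Lists.partition` so the example has no dependencies.

```java
import java.util.ArrayList;
import java.util.List;

final class BatchSizeInteraction {
  private BatchSizeInteraction() {}

  // Splits items into consecutive chunks of at most `size` elements.
  static <T> List<List<T>> partition(List<T> items, int size) {
    if (size <= 0) {
      throw new IllegalArgumentException("Batch size must be positive: " + size);
    }
    List<List<T>> batches = new ArrayList<>();
    for (int start = 0; start < items.size(); start += size) {
      batches.add(new ArrayList<>(items.subList(start, Math.min(start + size, items.size()))));
    }
    return batches;
  }

  // Sizes of the storage requests when the action batches first and the
  // FileIO then re-batches every list it is handed.
  static List<Integer> requestSizes(int fileCount, int actionBatchSize, int fileIoBatchSize) {
    List<Integer> files = new ArrayList<>();
    for (int i = 0; i < fileCount; i++) {
      files.add(i);
    }
    List<Integer> sizes = new ArrayList<>();
    for (List<Integer> actionBatch : partition(files, actionBatchSize)) {
      for (List<Integer> request : partition(actionBatch, fileIoBatchSize)) {
        sizes.add(request.size());
      }
    }
    return sizes;
  }
}
```

Under these assumptions, `requestSizes(120, 50, 250)` gives `[50, 50, 20]`: the FileIO never sees more than 50 files at once, so its limit of 250 goes unused. In the other direction, `requestSizes(600, 300, 250)` gives `[250, 50, 250, 50]`, so a mismatched action-level size can also produce undersized trailing requests.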



[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #5373: API, Spark: Update remove orphan files procedure to use bulk deletion if applicable

Posted by GitBox <gi...@apache.org>.
amogh-jahagirdar commented on code in PR #5373:
URL: https://github.com/apache/iceberg/pull/5373#discussion_r932574242


##########
spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java:
##########
@@ -182,12 +192,26 @@ private DeleteOrphanFiles.Result doExecute() {
     List<String> orphanFiles =
         actualFileDF.join(validFileDF, joinCond, "leftanti").as(Encoders.STRING()).collectAsList();
 
-    Tasks.foreach(orphanFiles)
-        .noRetry()
-        .executeWith(deleteExecutorService)
-        .suppressFailureWhenFinished()
-        .onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc))
-        .run(deleteFunc::accept);
+    if (batchDeletionSize > 1) {
+      Preconditions.checkArgument(
+          table.io() instanceof SupportsBulkOperations,
+          "FileIO %s does not support bulk deletion",
+          table.io().getClass().getName());
+      SupportsBulkOperations bulkFileIO = (SupportsBulkOperations) table.io();
+      List<List<String>> fileBatches = Lists.partition(orphanFiles, batchDeletionSize);
+      Tasks.foreach(fileBatches)
+          .noRetry()
+          .executeWith(deleteExecutorService)
+          .suppressFailureWhenFinished()
+          .run(bulkFileIO::deleteFiles);
+    } else {
+      Tasks.foreach(orphanFiles)
+          .noRetry()
+          .executeWith(deleteExecutorService)
+          .suppressFailureWhenFinished()
+          .onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc))
+          .run(deleteFunc::accept);
+    }

Review Comment:
   This can probably be abstracted into a single method that delegates to the right approach, but I didn't want to introduce more indirection or expose public methods unnecessarily until we know for sure that we want them.
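
   For illustration, a minimal sketch of what such a single delegating method might look like. It is not code from the PR: `BatchDeleteSketch`, `partition`, and `deleteAll` are hypothetical names, the two delete callbacks stand in for `bulkFileIO::deleteFiles` and `deleteFunc::accept`, and the `Tasks`/executor plumbing from the diff is left out. `partition` is a stdlib-only stand-in for the `Lists.partition` call used in the diff.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper, not part of Iceberg.
final class BatchDeleteSketch {

  // Splits files into consecutive batches of at most batchSize elements.
  static List<List<String>> partition(List<String> files, int batchSize) {
    if (batchSize < 1) {
      throw new IllegalArgumentException("Batch size must be at least 1: " + batchSize);
    }
    List<List<String>> batches = new ArrayList<>();
    for (int start = 0; start < files.size(); start += batchSize) {
      int end = Math.min(start + batchSize, files.size());
      batches.add(new ArrayList<>(files.subList(start, end)));
    }
    return batches;
  }

  // Delegates to bulk deletion when batchSize > 1, otherwise deletes file by file.
  static void deleteAll(
      List<String> files,
      int batchSize,
      Consumer<List<String>> bulkDelete, // assumed stand-in for bulkFileIO::deleteFiles
      Consumer<String> singleDelete) {   // assumed stand-in for deleteFunc::accept
    if (batchSize > 1) {
      partition(files, batchSize).forEach(bulkDelete);
    } else {
      files.forEach(singleDelete);
    }
  }
}
```

   With five orphan files and a batch size of 2, the bulk callback would be invoked three times (two full batches and one batch holding the last file).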





[GitHub] [iceberg] dramaticlly commented on a diff in pull request #5373: API, Spark: Update remove orphan files procedure to use bulk deletion if applicable

Posted by GitBox <gi...@apache.org>.
dramaticlly commented on code in PR #5373:
URL: https://github.com/apache/iceberg/pull/5373#discussion_r934823675


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java:
##########
@@ -161,6 +163,12 @@ public DeleteOrphanFilesSparkAction olderThan(long newOlderThanTimestamp) {
     return this;
   }
 
+  @Override
+  public DeleteOrphanFilesSparkAction batchSize(int batchSize) {

Review Comment:
   You probably want to use `Preconditions` to validate the batch size and throw `IllegalArgumentException` for invalid values such as 0 or a negative number, per the javadoc on the `api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java` interface.
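
   A rough sketch of that check, under a few assumptions: `OrphanFileCleanupSketch` is a hypothetical stand-in for the Spark action, the error message wording is made up, and a plain `if`/`throw` is used so the snippet compiles without Guava. In the action itself the same check would presumably be a one-line `Preconditions.checkArgument(...)` call.

```java
// Hypothetical stand-in for the Spark action; only the validation matters here.
final class OrphanFileCleanupSketch {
  private int batchSize = 1;

  // Rejects 0 and negative values before storing the batch size.
  OrphanFileCleanupSketch batchSize(int newBatchSize) {
    if (newBatchSize < 1) {
      throw new IllegalArgumentException(
          String.format("Invalid batch size: %d (must be at least 1)", newBatchSize));
    }
    this.batchSize = newBatchSize;
    return this;
  }

  int batchSize() {
    return batchSize;
  }
}
```

   Failing fast in the setter means an invalid value surfaces when the action is configured, not in the middle of `execute()`.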





[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #5373: API, Spark: Update remove orphan files procedure to use bulk deletion if applicable

Posted by GitBox <gi...@apache.org>.
amogh-jahagirdar commented on code in PR #5373:
URL: https://github.com/apache/iceberg/pull/5373#discussion_r934791146


##########
spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java:
##########
@@ -182,12 +192,26 @@ private DeleteOrphanFiles.Result doExecute() {
     List<String> orphanFiles =
         actualFileDF.join(validFileDF, joinCond, "leftanti").as(Encoders.STRING()).collectAsList();
 
-    Tasks.foreach(orphanFiles)
-        .noRetry()
-        .executeWith(deleteExecutorService)
-        .suppressFailureWhenFinished()
-        .onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc))
-        .run(deleteFunc::accept);
+    if (batchDeletionSize > 1) {
+      Preconditions.checkArgument(
+          table.io() instanceof SupportsBulkOperations,
+          "FileIO %s does not support bulk deletion",
+          table.io().getClass().getName());
+      SupportsBulkOperations bulkFileIO = (SupportsBulkOperations) table.io();
+      List<List<String>> fileBatches = Lists.partition(orphanFiles, batchDeletionSize);
+      Tasks.foreach(fileBatches)
+          .noRetry()
+          .executeWith(deleteExecutorService)
+          .suppressFailureWhenFinished()
+          .run(bulkFileIO::deleteFiles);

Review Comment:
   Hey Szehon, I'm going to update this PR based on my change in https://github.com/apache/iceberg/pull/5379/files.
   
   My thinking is that we should always use deleteFunc if it's passed in (it should be the source of truth for deletion when it's set). If it isn't set and the FileIO supports bulk operations, we can just call fileIO.deleteFiles(); otherwise we fall back to the existing mechanism.
   
   That way we preserve the procedure's existing behavior and add an optimization for FileIO types that support bulk delete. The underlying FileIO can also decide how batches are created (so batching suits the underlying storage: maximizing deletion throughput, minimizing throttling, etc.) and can handle parallelism as it sees fit. So at the procedure level we don't need to pass in a batch size.
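
   The precedence described above could be sketched roughly as follows. This is only an illustration, not the code from either PR: `DeleteStrategySketch` and its nested `FileIO`/`BulkFileIO` interfaces are hypothetical stand-ins for Iceberg's `FileIO` and `SupportsBulkOperations`, the returned string exists only to make the chosen path visible, and executors and failure handling are omitted.

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of the selection order; not Iceberg code.
final class DeleteStrategySketch {

  /** Assumed stand-in for Iceberg's FileIO. */
  interface FileIO {
    void deleteFile(String path);
  }

  /** Assumed stand-in for a FileIO that also supports bulk deletes. */
  interface BulkFileIO extends FileIO {
    void deleteFiles(Iterable<String> paths);
  }

  // Returns which path was taken ("custom", "bulk", or "single") for illustration only.
  static String deleteOrphans(List<String> orphanFiles, Consumer<String> deleteFunc, FileIO io) {
    if (deleteFunc != null) {
      // A user-supplied delete function always wins.
      orphanFiles.forEach(deleteFunc);
      return "custom";
    } else if (io instanceof BulkFileIO) {
      // The FileIO decides how to batch and parallelize the deletes.
      ((BulkFileIO) io).deleteFiles(orphanFiles);
      return "bulk";
    } else {
      // Existing behavior: delete one file at a time.
      orphanFiles.forEach(io::deleteFile);
      return "single";
    }
  }
}
```

   Because the custom function is checked first, callers that already pass a deleteFunc would see no behavior change even when their FileIO supports bulk deletes.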





[GitHub] [iceberg] jackye1995 closed pull request #5373: API, Spark: Update remove orphan files procedure to use bulk deletion if applicable

Posted by "jackye1995 (via GitHub)" <gi...@apache.org>.
jackye1995 closed pull request #5373: API, Spark: Update remove orphan files procedure to use bulk deletion if applicable
URL: https://github.com/apache/iceberg/pull/5373

