Posted to issues@iceberg.apache.org by "sririshindra (via GitHub)" <gi...@apache.org> on 2023/03/31 02:45:56 UTC

[GitHub] [iceberg] sririshindra commented on a diff in pull request #7240: Spark: Close the delete threads pool in some procedures like DeleteOrphan and ExpireSnapshots

sririshindra commented on code in PR #7240:
URL: https://github.com/apache/iceberg/pull/7240#discussion_r1153937203


##########
aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java:
##########
@@ -228,7 +228,9 @@ public void deleteFiles(Iterable<String> paths) throws BulkDeletionFailureExcept
           throw new RuntimeException("Interrupted when waiting for deletions to complete", e);
         }
       }
-
+      if (executorService() != null) {

Review Comment:
   Add a new line before and after this code block



##########
core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java:
##########
@@ -180,7 +180,9 @@ public void deleteFiles(Iterable<String> pathsToDelete) throws BulkDeletionFailu
               failureCount.incrementAndGet();
             })
         .run(this::deleteFile);
-
+    if (executorService() != null) {

Review Comment:
   ditto



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java:
##########
@@ -132,8 +135,8 @@ public InternalRow[] call(InternalRow args) {
                       + "IO's bulk delete.",
                   table.io().getClass().getName());
             } else {
-
-              action.executeDeleteWith(executorService(maxConcurrentDeletes, "expire-snapshots"));
+              executorService = executorService(maxConcurrentDeletes, "expire-snapshots");

Review Comment:
   It doesn't seem like good Java practice to name the 'executorService' variable exactly the same as the 'executorService' method in the BaseProcedure class. It would have been better if the method in BaseProcedure had been named something like 'getExecutorService', but that change would touch too many files that are not relevant to this PR. Maybe we should instead rename the variable to something like 'execService'.



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java:
##########
@@ -132,8 +135,8 @@ public InternalRow[] call(InternalRow args) {
                       + "IO's bulk delete.",
                   table.io().getClass().getName());
             } else {
-
-              action.executeDeleteWith(executorService(maxConcurrentDeletes, "expire-snapshots"));
+              executorService = executorService(maxConcurrentDeletes, "expire-snapshots");

Review Comment:
   Actually, I just noticed that the S3FileIO class also reuses the variable name and the method name. So maybe this point is moot and this is just following the style in the rest of the code base. We can probably leave it as is.
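   For context, a minimal hypothetical sketch (class and string values invented for illustration, not Iceberg code) of why this naming is legal but potentially confusing: Java allows a local variable and a method to share a name, and the compiler distinguishes `executorService` (the variable) from `executorService()` (the call), but a reader can easily mix them up.

```java
// Hypothetical illustration only: a local variable and a method may
// legally share a name in Java. 'executorService' resolves to the
// local variable; 'executorService()' resolves to the method.
class ShadowDemo {
  private String executorService() {
    return "from method";
  }

  String demo() {
    String executorService = "from variable";
    return executorService + " / " + executorService();
  }
}
```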



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java:
##########
@@ -166,8 +169,8 @@ public InternalRow[] call(InternalRow args) {
                       + "IO's bulk delete.",
                   table.io().getClass().getName());
             } else {
-
-              action.executeDeleteWith(executorService(maxConcurrentDeletes, "remove-orphans"));
+              executorService = executorService(maxConcurrentDeletes, "remove-orphans");

Review Comment:
   ditto



##########
aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java:
##########
@@ -228,7 +228,9 @@ public void deleteFiles(Iterable<String> paths) throws BulkDeletionFailureExcept
           throw new RuntimeException("Interrupted when waiting for deletions to complete", e);
         }
       }
-
+      if (executorService() != null) {

Review Comment:
   Actually, shutting down the executor service here is not appropriate, because a client might create a single S3FileIO object and call multiple methods on it. For instance, one can call the deleteFiles method multiple times on the same S3FileIO object. So, if we shut down the executor service here, the second deleteFiles call will fail.
   
   You could instead add `executorService = null` at the end of this snippet. That way, if anyone calls deleteFiles multiple times on the same S3FileIO object, a new ExecutorService will be created for each call. But that would nullify the need for the executorService method; I think the point of having that method is to reuse the executorService.
   
   It might actually be better to remove this snippet entirely and let the garbage collector take care of shutting down the executor service. But that would probably not solve the issue you are facing.
   
   You could maybe add a method like resetExecutorService and call it in your fork. Or you could make executorService a static method, so that the same thread pool is reused every time and you wouldn't see too many threads created at once. But that comes with its own problem: the awsProperties object needed to create the executorService depends on the S3FileIO instance, not on the class.
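   A minimal sketch of the "re-create lazily after shutdown" idea described above (hypothetical class, counter field, and pool size for illustration; this is not the actual S3FileIO code): the pool is rebuilt whenever it is null or already shut down, so shutting it down at the end of one bulk delete does not break a later deleteFiles call on the same object.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch, not the real S3FileIO: the delete pool is
// re-created lazily when it is null or already shut down, so a
// second deleteFiles() call on the same object still works.
class LazyDeletePool {
  private ExecutorService executorService;
  final AtomicInteger deleteCount = new AtomicInteger();

  private synchronized ExecutorService executorService() {
    if (executorService == null || executorService.isShutdown()) {
      executorService = Executors.newFixedThreadPool(4); // pool size is illustrative
    }
    return executorService;
  }

  void deleteFiles(Iterable<String> paths) {
    ExecutorService pool = executorService();
    for (String path : paths) {
      pool.submit(() -> {
        deleteCount.incrementAndGet(); // stand-in for the actual delete
      });
    }
    // Shut down after this batch; the next call re-creates the pool.
    pool.shutdown();
    try {
      pool.awaitTermination(1, TimeUnit.MINUTES);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}
```

   The trade-off the comment points out still applies: re-creating the pool per batch gives up the reuse that the executorService method was meant to provide.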
   
    



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
For additional commands, e-mail: issues-help@iceberg.apache.org