Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2023/01/14 12:47:05 UTC

[GitHub] [iceberg] RussellSpitzer opened a new pull request, #6588: Spark 3.3: Add Default Parallelism Level for All Spark Driver Based Deletes

RussellSpitzer opened a new pull request, #6588:
URL: https://github.com/apache/iceberg/pull/6588

   An issue we've run into frequently is that several Spark actions perform deletes on the driver with a default parallelism of 1. This is quite slow for S3 and painfully slow for very large tables. To fix this, we change the default behavior so that deletes are always multithreaded.
   
   The default for all Spark-related actions can then be changed with a SQL conf parameter, and each command can still override it with its own parallelism parameter.
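
A rough sketch of how that precedence might resolve, assuming a plain `Map` stands in for the Spark SQL conf. `DeleteParallelism` and `resolve` are hypothetical names for illustration only; the key and the default of 25 are the ones proposed for `SparkSQLProperties` in this PR.

```java
import java.util.Map;
import java.util.OptionalInt;

// Hypothetical helper, not part of Iceberg: picks the delete parallelism.
final class DeleteParallelism {
  // Key and default as proposed in this PR's SparkSQLProperties change.
  static final String CONF_KEY = "driver-delete-default-parallelism";
  static final int DEFAULT = 25;

  private DeleteParallelism() {}

  // A per-command value wins, then the session conf, then the built-in default.
  static int resolve(OptionalInt commandLevel, Map<String, String> sessionConf) {
    if (commandLevel.isPresent()) {
      return commandLevel.getAsInt();
    }
    String configured = sessionConf.get(CONF_KEY);
    return configured != null ? Integer.parseInt(configured.trim()) : DEFAULT;
  }
}
```

With an empty conf and no per-command value, `resolve` falls back to 25; a per-command value overrides whatever the session conf holds.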


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on pull request #6588: Spark 3.3: Add Default Parallelism Level for All Spark Driver Based Deletes

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on PR #6588:
URL: https://github.com/apache/iceberg/pull/6588#issuecomment-1387728844

   Let me take a look soon.




[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #6588: Spark 3.3: Add Default Parallelism Level for All Spark Driver Based Deletes

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on code in PR #6588:
URL: https://github.com/apache/iceberg/pull/6588#discussion_r1070261893


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java:
##########
@@ -231,24 +258,27 @@ protected DeleteSummary deleteFiles(
 
     DeleteSummary summary = new DeleteSummary();
 
-    Tasks.foreach(files)
-        .retry(DELETE_NUM_RETRIES)
-        .stopRetryOn(NotFoundException.class)
-        .suppressFailureWhenFinished()
-        .executeWith(executorService)
-        .onFailure(
-            (fileInfo, exc) -> {
-              String path = fileInfo.getPath();
-              String type = fileInfo.getType();
-              LOG.warn("Delete failed for {}: {}", type, path, exc);
-            })
-        .run(
-            fileInfo -> {
-              String path = fileInfo.getPath();
-              String type = fileInfo.getType();
-              deleteFunc.accept(path);
-              summary.deletedFile(path, type);
-            });
+
+    withDefaultDeleteService(executorService, (deleteService) -> {

Review Comment:
   This covers ExpireSnapshots and DropTable with Purge (via DeleteReachableFilesAction)
   





[GitHub] [iceberg] aokolnychyi commented on pull request #6588: Spark 3.3: Add Default Parallelism Level for All Spark Driver Based Deletes

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on PR #6588:
URL: https://github.com/apache/iceberg/pull/6588#issuecomment-1402302189

   cc #5373 #5375




[GitHub] [iceberg] RussellSpitzer commented on pull request #6588: Spark 3.3: Add Default Parallelism Level for All Spark Driver Based Deletes

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on PR #6588:
URL: https://github.com/apache/iceberg/pull/6588#issuecomment-1382730394

   @anuragmantri + @aokolnychyi + @rdblue - This is a fairly big change to the default behavior, but the single-threaded default has been biting a lot of our users lately and the change is relatively safe.






[GitHub] [iceberg] RussellSpitzer commented on pull request #6588: Spark 3.3: Add Default Parallelism Level for All Spark Driver Based Deletes

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on PR #6588:
URL: https://github.com/apache/iceberg/pull/6588#issuecomment-1396207315

   After talking with @aokolnychyi, we decided to take a slightly broader approach here. Rather than allowing each action to define its own method of deleting and to use custom executor services, we will fall back to each FileIO's bulk delete support. We will then add a basic parallel default delete to HDFS for bulk delete.
   
   After doing this, we will deprecate all the "parallel delete" methods from the actions and procedures, instead instructing users to use their IO-specific parallelism controls.
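
A minimal sketch of that direction, under stated assumptions: `SimpleFileIO`, `BulkCapableFileIO`, `ParallelDeleteIO`, and `DriverDeletes` are hypothetical stand-ins written for this illustration, not the real Iceberg interfaces. The action only asks the IO for a bulk delete, and an IO without a native bulk API (the HDFS case) implements it with a thread pool of its own.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

// Hypothetical stand-ins for a file IO abstraction (not the Iceberg API).
interface SimpleFileIO {
  void deleteFile(String path);
}

interface BulkCapableFileIO extends SimpleFileIO {
  void deleteFiles(List<String> paths);
}

// An IO without a native bulk API that implements bulk delete in parallel.
final class ParallelDeleteIO implements BulkCapableFileIO {
  private final Consumer<String> singleDelete;
  private final int parallelism;

  ParallelDeleteIO(Consumer<String> singleDelete, int parallelism) {
    this.singleDelete = singleDelete;
    this.parallelism = parallelism;
  }

  @Override
  public void deleteFile(String path) {
    singleDelete.accept(path);
  }

  @Override
  public void deleteFiles(List<String> paths) {
    ExecutorService pool = Executors.newFixedThreadPool(parallelism);
    try {
      List<Future<?>> pending = new ArrayList<>();
      for (String path : paths) {
        pending.add(pool.submit(() -> deleteFile(path)));
      }
      for (Future<?> result : pending) {
        result.get(); // wait for each delete and surface any failure
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while deleting files", e);
    } catch (ExecutionException e) {
      throw new IllegalStateException("Failed to delete a file", e.getCause());
    } finally {
      pool.shutdown();
    }
  }
}

// The action side: no executor of its own, it just defers to the IO.
final class DriverDeletes {
  private DriverDeletes() {}

  static void deleteAll(SimpleFileIO io, List<String> paths) {
    if (io instanceof BulkCapableFileIO) {
      ((BulkCapableFileIO) io).deleteFiles(paths);
    } else {
      paths.forEach(io::deleteFile); // sequential fallback
    }
  }
}
```

The design point is that parallelism becomes a property of the IO implementation rather than of each action, so an action never needs its own executor service.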




[GitHub] [iceberg] amogh-jahagirdar commented on pull request #6588: Spark 3.3: Add Default Parallelism Level for All Spark Driver Based Deletes

Posted by GitBox <gi...@apache.org>.
amogh-jahagirdar commented on PR #6588:
URL: https://github.com/apache/iceberg/pull/6588#issuecomment-1397883260

   Thanks for clarifying, @RussellSpitzer. I think it makes a ton of sense to leave the specifics of bulk vs. parallel to the FileIO abstraction. In this case, we leverage bulk delete wherever possible (S3) and do parallel deletions for file systems that don't support bulk operations, like HDFS, to improve deletion throughput.




[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #6588: Spark 3.3: Add Default Parallelism Level for All Spark Driver Based Deletes

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on code in PR #6588:
URL: https://github.com/apache/iceberg/pull/6588#discussion_r1070261827


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java:
##########
@@ -47,4 +47,8 @@ private SparkSQLProperties() {}
   public static final String PRESERVE_DATA_GROUPING =
       "spark.sql.iceberg.planning.preserve-data-grouping";
   public static final boolean PRESERVE_DATA_GROUPING_DEFAULT = false;
+
+  // Controls how many physical file deletes to execute in parallel when not otherwise specified
+  public static final String DELETE_PARALLELISM = "driver-delete-default-parallelism";
+  public static final String DELETE_PARALLELISM_DEFAULT = "25";

Review Comment:
   With S3's request throttling at around 4k requests per second, this gives us a lot of headroom.
   Assuming a 50 ms response time, each thread can issue about 20 requests per second:
   4000 max requests per second / 20 requests per thread per second =~ 200 max concurrent requests.
   
   Another option is to also incorporate the "bulk delete" APIs, but that would only help with S3-based filesystems.
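
The estimate above can be checked with a few lines of Java. This is only a back-of-the-envelope sketch; `ThrottleMath` is a hypothetical name, and the inputs are the approximate figures quoted in this comment rather than measured values.

```java
// Hypothetical helper for the back-of-the-envelope estimate in this comment.
final class ThrottleMath {
  private ThrottleMath() {}

  // How many threads can run before hitting the request-rate ceiling,
  // assuming each thread issues one request at a time.
  static long maxConcurrentRequests(long maxRequestsPerSecond, long responseTimeMillis) {
    long requestsPerThreadPerSecond = 1000 / responseTimeMillis;
    return maxRequestsPerSecond / requestsPerThreadPerSecond;
  }
}
```

With the numbers from this comment (4000 requests per second, 50 ms per request), the method returns 200, which is eight times the proposed default of 25.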
   





[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #6588: Spark 3.3: Add Default Parallelism Level for All Spark Driver Based Deletes

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on code in PR #6588:
URL: https://github.com/apache/iceberg/pull/6588#discussion_r1070261927


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java:
##########
@@ -246,12 +246,13 @@ private DeleteOrphanFiles.Result doExecute() {
     List<String> orphanFiles =
         findOrphanFiles(spark(), actualFileIdentDS, validFileIdentDS, prefixMismatchMode);
 
-    Tasks.foreach(orphanFiles)
-        .noRetry()
-        .executeWith(deleteExecutorService)
-        .suppressFailureWhenFinished()
-        .onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc))
-        .run(deleteFunc::accept);
+    withDefaultDeleteService(deleteExecutorService, deleteService ->

Review Comment:
   This change covers RemoveOrphanFiles





[GitHub] [iceberg] anuragmantri commented on a diff in pull request #6588: Spark 3.3: Add Default Parallelism Level for All Spark Driver Based Deletes

Posted by GitBox <gi...@apache.org>.
anuragmantri commented on code in PR #6588:
URL: https://github.com/apache/iceberg/pull/6588#discussion_r1070395384


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java:
##########
@@ -47,4 +47,8 @@ private SparkSQLProperties() {}
   public static final String PRESERVE_DATA_GROUPING =
       "spark.sql.iceberg.planning.preserve-data-grouping";
   public static final boolean PRESERVE_DATA_GROUPING_DEFAULT = false;
+
+  // Controls how many physical file deletes to execute in parallel when not otherwise specified
+  public static final String DELETE_PARALLELISM = "driver-delete-default-parallelism";

Review Comment:
   Should we change this to `spark.sql.iceberg.driver-delete-default-parallelism` to be consistent with the rest of the properties in this file? 





[GitHub] [iceberg] RussellSpitzer closed pull request #6588: Spark 3.3: Add Default Parallelism Level for All Spark Driver Based Deletes

Posted by "RussellSpitzer (via GitHub)" <gi...@apache.org>.
RussellSpitzer closed pull request #6588: Spark 3.3: Add Default Parallelism Level for All Spark Driver Based Deletes
URL: https://github.com/apache/iceberg/pull/6588

