Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2020/08/10 07:03:33 UTC

[GitHub] [iceberg] zhangdove opened a new pull request #1313: Move RemoveOrphanFilesAction run in Driver to Excutor

zhangdove opened a new pull request #1313:
URL: https://github.com/apache/iceberg/pull/1313


   My use case:
   1. table.expireSnapshots().cleanExpiredFiles(false).commit() [https://github.com/apache/iceberg/pull/1244]
   2. actions.removeOrphanFiles().olderThan(t1).execute()
   
   The first step takes about two seconds. However, in the second step of deleting the files, the deletion becomes slower and slower as the number of files increases, which is not what I want. If I am not mistaken, the file deletion is executed by a single thread on the Spark Driver. Can we move the file deletion from the Driver side to Spark's Executors so that orphan files are deleted in parallel?
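   
   A minimal sketch of this workflow, assuming an already-loaded Iceberg `table` and the Spark `Actions` entry point; the cutoff `t1` below is illustrative:

```java
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.iceberg.actions.Actions;

// illustrative cutoff: only consider files older than three days
long t1 = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(3);

// Step 1: expire old snapshots but skip the file cleanup (see PR #1244)
table.expireSnapshots()
    .cleanExpiredFiles(false)
    .commit();

// Step 2: delete files that are no longer referenced by table metadata
List<String> deleted = Actions.forTable(table)
    .removeOrphanFiles()
    .olderThan(t1)
    .execute();
```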
   




[GitHub] [iceberg] zhangdove closed pull request #1313: Move RemoveOrphanFilesAction run in Driver to Excutor

Posted by GitBox <gi...@apache.org>.
zhangdove closed pull request #1313:
URL: https://github.com/apache/iceberg/pull/1313


   




[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #1313: Move RemoveOrphanFilesAction run in Driver to Excutor

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #1313:
URL: https://github.com/apache/iceberg/pull/1313#discussion_r467931551



##########
File path: spark/src/main/java/org/apache/iceberg/actions/RemoveOrphanFilesAction.java
##########
@@ -161,15 +172,44 @@ public RemoveOrphanFilesAction deleteWith(Consumer<String> newDeleteFunc) {
         .as(Encoders.STRING())
         .collectAsList();
 
-    Tasks.foreach(orphanFiles)
-        .noRetry()
-        .suppressFailureWhenFinished()
-        .onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc))
-        .run(deleteFunc::accept);
+    paralleExecutor(sparkContext, orphanFiles, excutorTaskNum, deleteFunc);
 
     return orphanFiles;
   }
 
+  private void paralleExecutor(JavaSparkContext javaSc, List<String> orphanFiles,
+                              int numSlices, Consumer deleteConsumer) {
+    if (numSlices == 0) {
+      Tasks.foreach(orphanFiles)
+          .noRetry()
+          .suppressFailureWhenFinished()

Review comment:
       We could just make this parallel here as well, as in the RemoveSnapshots code, where the user can pass in a different execution pool.
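
   One way to read this suggestion, as a sketch rather than the PR's code: keep the driver-side `Tasks` loop but let the caller supply an `ExecutorService`, as `RemoveSnapshots` does. The `deleteExecutorService` field and the builder method that would set it are assumptions here.

```java
// Hypothetical field, set by a builder method such as executeDeleteWith(ExecutorService)
private ExecutorService deleteExecutorService;

// ... later, in execute():
Tasks.foreach(orphanFiles)
    .noRetry()
    .executeWith(deleteExecutorService)  // if null, Tasks runs on the calling thread
    .suppressFailureWhenFinished()
    .onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc))
    .run(deleteFunc::accept);
```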






[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #1313: Move RemoveOrphanFilesAction run in Driver to Excutor

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #1313:
URL: https://github.com/apache/iceberg/pull/1313#discussion_r467931062



##########
File path: spark/src/main/java/org/apache/iceberg/actions/RemoveOrphanFilesAction.java
##########
@@ -161,15 +172,44 @@ public RemoveOrphanFilesAction deleteWith(Consumer<String> newDeleteFunc) {
         .as(Encoders.STRING())
         .collectAsList();
 
-    Tasks.foreach(orphanFiles)
-        .noRetry()
-        .suppressFailureWhenFinished()
-        .onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc))
-        .run(deleteFunc::accept);
+    paralleExecutor(sparkContext, orphanFiles, excutorTaskNum, deleteFunc);
 
     return orphanFiles;
   }
 
+  private void paralleExecutor(JavaSparkContext javaSc, List<String> orphanFiles,
+                              int numSlices, Consumer deleteConsumer) {
+    if (numSlices == 0) {

Review comment:
       I'm not a big fan of parameters which trigger different behaviors on certain values. Perhaps there should be a boolean which enables the different execution mode?
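
   For illustration only, the kind of API shape this suggests; `deleteInParallel` below is a hypothetical builder method, while `executorParallelNum` is the one added in this PR:

```java
List<String> result = actions.removeOrphanFiles()
    .olderThan(t1)
    .deleteInParallel(true)      // hypothetical: explicit opt-in to executor-side deletes
    .executorParallelNum(10)     // number of slices used when the parallel path is enabled
    .execute();
```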






[GitHub] [iceberg] aokolnychyi edited a comment on pull request #1313: Move RemoveOrphanFilesAction run in Driver to Excutor

Posted by GitBox <gi...@apache.org>.
aokolnychyi edited a comment on pull request #1313:
URL: https://github.com/apache/iceberg/pull/1313#issuecomment-671514522


   I would like to make sure I understand the use case correctly. Is the main purpose to use `RemoveOrphanFilesAction` to clean up expired files? If so, I think that's a misuse of the API. Instead, one should use the `ExpireSnapshotsAction` that Russell is working on in PR #1264.
   
   The most expensive part of `RemoveOrphanFilesAction` is listing. The bigger the table, the more pressure you put on the storage. The number of orphan files is usually small, so the delete should be quick. This action should be called maybe once a month or so. `ExpireSnapshotsAction`, on the other hand, is meant to be called regularly. It will have an executor service to parallelize deletes. `ExpireSnapshotsAction` is supposed to delete a lot of files.
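   
   For context, a rough sketch of the action-based expiry referred to here; `ExpireSnapshotsAction` was still under review in PR #1264 at the time, so the method names below are indicative rather than final:

```java
// Indicative usage of the Spark ExpireSnapshotsAction from PR #1264: it expires
// snapshots and deletes the files they no longer reference, and is intended to
// support parallelized deletes.
Actions.forTable(table)
    .expireSnapshots()
    .expireOlderThan(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(7))
    .execute();
```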




[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #1313: Move RemoveOrphanFilesAction run in Driver to Excutor

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #1313:
URL: https://github.com/apache/iceberg/pull/1313#discussion_r467941219



##########
File path: spark/src/test/java/org/apache/iceberg/actions/TestRemoveOrphanFilesAction.java
##########
@@ -569,4 +569,39 @@ public void testRemoveOrphanFilesWithHadoopCatalog() throws InterruptedException
         .collectAsList();
     Assert.assertEquals("Rows must match", records, actualRecords);
   }
+
+  @Test
+  public void testRemoveOrphanFilesWithExecutorParallel() {
+    Table table = TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), Maps.newHashMap(), tableLocation);
+
+    List<ThreeColumnRecord> records = Lists.newArrayList(
+        new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA")
+    );
+
+    Dataset<Row> df = spark.createDataFrame(records, ThreeColumnRecord.class).coalesce(1);
+
+    df.select("c1", "c2", "c3")
+        .write()
+        .format("iceberg")
+        .mode("append")
+        .save(tableLocation);
+
+    for (int i = 0; i < 20; i++) {
+      df.write().mode("append").parquet(tableLocation + "/data");
+    }
+
+    Actions actions = Actions.forTable(table);
+
+    List<String> result = actions.removeOrphanFiles()
+        .olderThan(System.currentTimeMillis())
+        .executorParallelNum(10)
+        .execute();
+    Assert.assertEquals("Should delete only 20 files", 20, result.size());
+
+    // gets the number of tasks completed by the last stage
+    int completedTasks = spark.sparkContext().statusTracker()
+        .getStageInfo(23).get().numCompletedTasks();

Review comment:
       If the test here is just to determine the number of partitions in the RDD, it's probably safer to expose that as a package-protected member of the class and call `.partitions`. The magic stage number here is going to be brittle.
   
   One fun little game you could also play here, to count the tasks that were run, would be to implement another `DeleteConsumer` that just uses an accumulator and adds 1 in each `foreachPartition`.
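
   A rough sketch of that counting trick; the class name and wiring below are illustrative, not from the PR. The accumulator would come from `spark.sparkContext().longAccumulator()` and be checked after `execute()` returns.

```java
import java.io.Serializable;
import java.util.function.Consumer;
import org.apache.spark.util.LongAccumulator;

// A serializable consumer that only counts, so a test can assert how many
// files were handled on executors without relying on stage ids.
class CountingDeleteConsumer implements Consumer<String>, Serializable {
  private final LongAccumulator deleteCount;

  CountingDeleteConsumer(LongAccumulator deleteCount) {
    this.deleteCount = deleteCount;
  }

  @Override
  public void accept(String file) {
    deleteCount.add(1L);  // count only; a real consumer would also delete the file
  }
}
```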






[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #1313: Move RemoveOrphanFilesAction run in Driver to Excutor

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #1313:
URL: https://github.com/apache/iceberg/pull/1313#discussion_r467936640



##########
File path: spark/src/main/java/org/apache/iceberg/actions/RemoveOrphanFilesAction.java
##########
@@ -106,6 +106,7 @@ public void accept(String file) {
     this.table = table;
     this.ops = ((HasTableOperations) table).operations();
     this.location = table.location();
+    this.deleteFunc = new DeleteConsumer(table);

Review comment:
       We set up a bit of a trap here. A serializable function is required when using the parallel path, but we don't type-check that this field holds a serializable consumer. So a user can end up passing a non-serializable function here and getting a runtime error. I'm not sure if we want to lock this down to only serializable delete functions, but I think we should at least document this and warn users.
   
   Another early check could be to attempt to cast the lambda to `Serializable` before performing the parallelization and provide an explanation of the failure.
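
   A hedged sketch of that early check; the condition mirrors the PR's non-zero `excutorTaskNum` test for taking the parallel path:

```java
// Fail fast with a clear message before shipping the delete function to executors.
if (excutorTaskNum > 0 && !(deleteFunc instanceof Serializable)) {
  throw new IllegalArgumentException(
      "The function passed to deleteWith() must be Serializable when deletes run on Spark executors");
}
```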






[GitHub] [iceberg] aokolnychyi commented on pull request #1313: Move RemoveOrphanFilesAction run in Driver to Excutor

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on pull request #1313:
URL: https://github.com/apache/iceberg/pull/1313#issuecomment-671514522


   I would like to make sure I understand the use case correctly. Is the main purpose to use `RemoveOrphanFilesAction` to clean up expired files? If so, I think that's a misuse of the API. Instead, one should use the `ExpireSnapshotsAction` that Russell is working on in PR #1264.
   
   The most expensive part of `RemoveOrphanFilesAction` is listing. The bigger the table, the more pressure you put on the storage. The number of orphan files is usually small, so the delete should be quick. This action should be called maybe once a month or so. `ExpireSnapshotsAction`, on the other hand, is meant to be called regularly. It will have an executor service to parallelize deletes.




[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #1313: Move RemoveOrphanFilesAction run in Driver to Excutor

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #1313:
URL: https://github.com/apache/iceberg/pull/1313#discussion_r467938888



##########
File path: spark/src/main/java/org/apache/iceberg/actions/RemoveOrphanFilesAction.java
##########
@@ -161,15 +172,44 @@ public RemoveOrphanFilesAction deleteWith(Consumer<String> newDeleteFunc) {
         .as(Encoders.STRING())
         .collectAsList();
 
-    Tasks.foreach(orphanFiles)
-        .noRetry()
-        .suppressFailureWhenFinished()
-        .onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc))
-        .run(deleteFunc::accept);
+    paralleExecutor(sparkContext, orphanFiles, excutorTaskNum, deleteFunc);
 
     return orphanFiles;
   }
 
+  private void paralleExecutor(JavaSparkContext javaSc, List<String> orphanFiles,

Review comment:
       I'm sure others have different opinions, but I would take the branch out of this function. Currently it reads as though on line 175 all deletes are parallelized, and you need to look into this function to see that that isn't actually the case. So either rename it to something like "maybeExecuteParallel" or move the if branch out.
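
   A small sketch of the "move the branch out" option; `deleteInParallel` and `deleteOnDriver` are illustrative names for the two paths, not methods from the PR:

```java
// At the call site in execute(), so the reader sees immediately which path runs:
if (excutorTaskNum > 0) {
  deleteInParallel(sparkContext, orphanFiles, excutorTaskNum, deleteFunc);
} else {
  deleteOnDriver(orphanFiles, deleteFunc);
}
```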
   






[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #1313: Move RemoveOrphanFilesAction run in Driver to Excutor

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #1313:
URL: https://github.com/apache/iceberg/pull/1313#discussion_r467932715



##########
File path: spark/src/main/java/org/apache/iceberg/actions/RemoveOrphanFilesAction.java
##########
@@ -161,15 +172,44 @@ public RemoveOrphanFilesAction deleteWith(Consumer<String> newDeleteFunc) {
         .as(Encoders.STRING())
         .collectAsList();
 
-    Tasks.foreach(orphanFiles)
-        .noRetry()
-        .suppressFailureWhenFinished()
-        .onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc))
-        .run(deleteFunc::accept);
+    paralleExecutor(sparkContext, orphanFiles, excutorTaskNum, deleteFunc);
 
     return orphanFiles;
   }
 
+  private void paralleExecutor(JavaSparkContext javaSc, List<String> orphanFiles,
+                              int numSlices, Consumer deleteConsumer) {
+    if (numSlices == 0) {
+      Tasks.foreach(orphanFiles)
+          .noRetry()
+          .suppressFailureWhenFinished()
+          .onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc))
+          .run(deleteConsumer::accept);
+    } else {
+      javaSc.parallelize(orphanFiles, numSlices).foreachPartition(row -> {
+        List<String> orphanFileList = IteratorUtils.toList(row);
+        Tasks.foreach(orphanFileList)
+            .noRetry()
+            .suppressFailureWhenFinished()
+            .onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc))
+            .run(deleteConsumer::accept);
+      });
+    }
+  }
+
+  public static class DeleteConsumer implements Consumer<String>, Serializable {

Review comment:
       I wonder if it makes sense to add a Serializable Consumer class as a utility. Then have this DeleteConsumer extend that.
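
   A sketch of that utility; the interface name and placement below are assumptions rather than something claimed to exist in the codebase:

```java
import java.io.Serializable;
import java.util.function.Consumer;

// Hypothetical shared utility: a Consumer that is also Serializable.
// DeleteConsumer (and user-supplied delete functions meant for executors)
// could implement this single interface instead of both separately.
public interface SerializableConsumer<T> extends Consumer<T>, Serializable {
}
```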






[GitHub] [iceberg] zhangdove commented on a change in pull request #1313: Move RemoveOrphanFilesAction run in Driver to Excutor

Posted by GitBox <gi...@apache.org>.
zhangdove commented on a change in pull request #1313:
URL: https://github.com/apache/iceberg/pull/1313#discussion_r467720671



##########
File path: spark/src/test/java/org/apache/iceberg/actions/TestRemoveOrphanFilesAction.java
##########
@@ -569,4 +569,39 @@ public void testRemoveOrphanFilesWithHadoopCatalog() throws InterruptedException
         .collectAsList();
     Assert.assertEquals("Rows must match", records, actualRecords);
   }
+
+  @Test
+  public void testRemoveOrphanFilesWithExecutorParallel() {
+    Table table = TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), Maps.newHashMap(), tableLocation);
+
+    List<ThreeColumnRecord> records = Lists.newArrayList(
+        new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA")
+    );
+
+    Dataset<Row> df = spark.createDataFrame(records, ThreeColumnRecord.class).coalesce(1);
+
+    df.select("c1", "c2", "c3")
+        .write()
+        .format("iceberg")
+        .mode("append")
+        .save(tableLocation);
+
+    for (int i = 0; i < 20; i++) {
+      df.write().mode("append").parquet(tableLocation + "/data");
+    }
+
+    Actions actions = Actions.forTable(table);
+
+    List<String> result = actions.removeOrphanFiles()
+        .olderThan(System.currentTimeMillis())
+        .executorParallelNum(10)
+        .execute();
+    Assert.assertEquals("Should delete only 20 files", 20, result.size());
+
+    // gets the number of tasks completed by the last stage
+    int completedTasks = spark.sparkContext().statusTracker()
+        .getStageInfo(23).get().numCompletedTasks();

Review comment:
       Is there a better way to get the last stage ID?
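
   One way to avoid hard-coding the stage id, sketched here rather than taken from the PR: register a `SparkListener` just before running the action and count finished tasks. Note that it counts every task that completes after registration, so it only works if nothing else runs concurrently.

```java
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerTaskEnd;

AtomicInteger completedTasks = new AtomicInteger(0);
spark.sparkContext().addSparkListener(new SparkListener() {
  @Override
  public void onTaskEnd(SparkListenerTaskEnd taskEnd) {
    completedTasks.incrementAndGet();  // every task that finishes after registration
  }
});

// ... run removeOrphanFiles().executorParallelNum(10).execute() here,
// then assert on completedTasks.get()
```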






[GitHub] [iceberg] zhangdove edited a comment on pull request #1313: Move RemoveOrphanFilesAction run in Driver to Excutor

Posted by GitBox <gi...@apache.org>.
zhangdove edited a comment on pull request #1313:
URL: https://github.com/apache/iceberg/pull/1313#issuecomment-671698264


   @RussellSpitzer  Thanks for your review; it has benefited me a lot.
   
   @aokolnychyi As you said, maybe I am not using it in the intended way.
   It wasn't until I saw PR [#1244](https://github.com/apache/iceberg/pull/1244) that I thought of moving the file deletion into this method, which is why I later made some attempts to move the file deletion from the Driver to the Executors. Thank you for your suggestions.
   
   I'm going to close this PR and keep watching and using the master branch code.




[GitHub] [iceberg] zhangdove edited a comment on pull request #1313: Move RemoveOrphanFilesAction run in Driver to Excutor

Posted by GitBox <gi...@apache.org>.
zhangdove edited a comment on pull request #1313:
URL: https://github.com/apache/iceberg/pull/1313#issuecomment-671698264


   @RussellSpitzer  Thanks for your review; it has benefited me a lot.
   
   @aokolnychyi As you said, maybe I am not using it in the intended way.
   It wasn't until I saw PR [#1244](https://github.com/apache/iceberg/pull/1244) that I thought of moving the file deletion into this method (`removeOrphanFiles()`), which is why I later made some attempts to move the file deletion from the Driver to the Executors. Thank you for your suggestions.
   
   I'm going to close this PR and keep watching and using the master branch code.




[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #1313: Move RemoveOrphanFilesAction run in Driver to Excutor

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #1313:
URL: https://github.com/apache/iceberg/pull/1313#discussion_r467933277



##########
File path: spark/src/test/java/org/apache/iceberg/actions/TestRemoveOrphanFilesAction.java
##########
@@ -569,4 +569,39 @@ public void testRemoveOrphanFilesWithHadoopCatalog() throws InterruptedException
         .collectAsList();
     Assert.assertEquals("Rows must match", records, actualRecords);
   }
+
+  @Test
+  public void testRemoveOrphanFilesWithExecutorParallel() {
+    Table table = TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), Maps.newHashMap(), tableLocation);
+
+    List<ThreeColumnRecord> records = Lists.newArrayList(
+        new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA")
+    );
+
+    Dataset<Row> df = spark.createDataFrame(records, ThreeColumnRecord.class).coalesce(1);
+
+    df.select("c1", "c2", "c3")
+        .write()
+        .format("iceberg")
+        .mode("append")
+        .save(tableLocation);
+
+    for (int i = 0; i < 20; i++) {
+      df.write().mode("append").parquet(tableLocation + "/data");
+    }
+
+    Actions actions = Actions.forTable(table);
+
+    List<String> result = actions.removeOrphanFiles()
+        .olderThan(System.currentTimeMillis())
+        .executorParallelNum(10)
+        .execute();
+    Assert.assertEquals("Should delete only 20 files", 20, result.size());
+
+    // gets the number of tasks completed by the last stage
+    int completedTasks = spark.sparkContext().statusTracker()
+        .getStageInfo(23).get().numCompletedTasks();
+
+    Assert.assertEquals("Should executor only 10 tasks", 10, completedTasks);

Review comment:
       The assertion message isn't clear here; maybe "Executor should complete only 10 tasks"?






[GitHub] [iceberg] zhangdove commented on pull request #1313: Move RemoveOrphanFilesAction run in Driver to Excutor

Posted by GitBox <gi...@apache.org>.
zhangdove commented on pull request #1313:
URL: https://github.com/apache/iceberg/pull/1313#issuecomment-671698264


   @RussellSpitzer  Thanks for your review; it has benefited me a lot.
   
   @aokolnychyi As you said, maybe I am not using it in the intended way.
   It wasn't until I saw this PR that I thought of moving the file deletion into this method, which is why I later made some attempts to move the file deletion from the Driver to the Executors. Thank you for your suggestions.
   
   I'm going to close this PR and keep watching and using the master branch code.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org