Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2020/07/09 10:57:41 UTC

[GitHub] [iceberg] fbocse opened a new pull request #1187: Add support to parallelise the execution of tasks of metadata and data files delete operations

fbocse opened a new pull request #1187:
URL: https://github.com/apache/iceberg/pull/1187


   This PR aims to support https://github.com/apache/iceberg/issues/1178
   
   By default, the delete tasks for metadata and data files run single-threaded.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1187: Add support to parallelise the execution of tasks of metadata and data files delete operations

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1187:
URL: https://github.com/apache/iceberg/pull/1187#discussion_r453929734



##########
File path: api/src/main/java/org/apache/iceberg/ExpireSnapshots.java
##########
@@ -82,4 +83,18 @@
    * @return this for method chaining
    */
   ExpireSnapshots deleteWith(Consumer<String> deleteFunc);
+
+  /**
+   * Passes an alternative executor service that will be used for manifests and data files deletion.
+   * <p>
+   * Manifest files that are no longer used by valid snapshots will be deleted. Data files that were
+   * deleted by snapshots that are expired will be deleted.
+   * <p>
+   * If this method is not called, unnecessary manifests and data files will still be deleted using a single threaded
+   * executor service.
+   *
+   * @param executorService an executor service to parallelize tasks to delete manifests and data files
+   * @return this for method chaining
+   */
+  ExpireSnapshots deleteWith(ExecutorService executorService);

Review comment:
       Because there is already a `deleteWith` method that controls a substantially different option, I don't think it is a good idea to name this one `deleteWith` as well. It seems odd to pass a callback to one and an executor service to the other. What about naming this `executeWith` instead?
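
The naming point above can be illustrated with a minimal sketch. It assumes a hypothetical, heavily simplified builder (`ExpireSketch`, and a `commit` that takes the paths as an argument); it is not the Iceberg implementation. The callback option says what happens to each path, while the executor option says which threads run that callback.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

// Hypothetical, simplified stand-in for the builder under review; not the Iceberg API.
final class ExpireSketch {
  private Consumer<String> deleteFunc = path -> { };  // what to do with each path
  private ExecutorService executorService = null;     // where to do it; null = caller's thread

  // Callback option: controls how a single file is deleted.
  ExpireSketch deleteWith(Consumer<String> newDeleteFunc) {
    this.deleteFunc = newDeleteFunc;
    return this;
  }

  // Executor option: controls which threads run the delete callback.
  ExpireSketch executeWith(ExecutorService newExecutorService) {
    this.executorService = newExecutorService;
    return this;
  }

  // Hypothetical signature: a real commit would work out the paths itself.
  void commit(List<String> pathsToDelete) throws Exception {
    if (executorService == null) {
      pathsToDelete.forEach(deleteFunc);
      return;
    }
    List<Future<?>> pending = new ArrayList<>();
    for (String path : pathsToDelete) {
      pending.add(executorService.submit(() -> deleteFunc.accept(path)));
    }
    for (Future<?> future : pending) {
      future.get();  // wait for completion and surface any failure
    }
  }

  public static void main(String[] args) throws Exception {
    Set<String> deleted = ConcurrentHashMap.newKeySet();
    ExecutorService pool = Executors.newFixedThreadPool(2);
    try {
      new ExpireSketch()
          .executeWith(pool)
          .deleteWith(deleted::add)
          .commit(Arrays.asList("a.avro", "b.parquet"));
    } finally {
      pool.shutdown();
    }
    System.out.println(deleted);
  }
}
```

With distinct names, a call chain reads unambiguously even when both options are set, which is the situation in the test added by this PR.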






[GitHub] [iceberg] fbocse commented on a change in pull request #1187: Add support to parallelise the execution of tasks of metadata and data files delete operations

Posted by GitBox <gi...@apache.org>.
fbocse commented on a change in pull request #1187:
URL: https://github.com/apache/iceberg/pull/1187#discussion_r454271732



##########
File path: core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java
##########
@@ -404,6 +405,59 @@ public void dataFilesCleanup() throws IOException {
     Assert.assertTrue("FILE_B should be deleted", deletedFiles.contains(FILE_B.path().toString()));
   }
 
+  @Test
+  public void dataFilesCleanupWithParallelTasks() throws IOException {
+    table.newFastAppend()
+        .appendFile(FILE_A)
+        .commit();
+
+    table.newFastAppend()
+        .appendFile(FILE_B)
+        .commit();
+
+    table.newRewrite()
+        .rewriteFiles(ImmutableSet.of(FILE_B), ImmutableSet.of(FILE_D))
+        .commit();
+    long thirdSnapshotId = table.currentSnapshot().snapshotId();
+
+    table.newRewrite()
+        .rewriteFiles(ImmutableSet.of(FILE_A), ImmutableSet.of(FILE_C))
+        .commit();
+    long fourthSnapshotId = table.currentSnapshot().snapshotId();
+
+    long t4 = System.currentTimeMillis();
+    while (t4 <= table.currentSnapshot().timestampMillis()) {
+      t4 = System.currentTimeMillis();
+    }
+
+    List<ManifestFile> manifests = table.currentSnapshot().dataManifests();
+
+    ManifestFile newManifest = writeManifest(
+        "manifest-file-1.avro",
+        manifestEntry(Status.EXISTING, thirdSnapshotId, FILE_C),
+        manifestEntry(Status.EXISTING, fourthSnapshotId, FILE_D));
+
+    RewriteManifests rewriteManifests = table.rewriteManifests();
+    manifests.forEach(rewriteManifests::deleteManifest);
+    rewriteManifests.addManifest(newManifest);
+    rewriteManifests.commit();
+
+    Set<String> deletedFiles = Sets.newHashSet();
+
+    table.expireSnapshots()
+        .deleteWith(Executors.newFixedThreadPool(8, runnable -> {
+          Thread thread = new Thread(runnable);
+          thread.setDaemon(true);
+          return thread;
+        }))
+        .expireOlderThan(t4)
+        .deleteWith(deletedFiles::add)

Review comment:
       You're right, it doesn't verify that the provided executor service was used to submit the tasks.
   I looked through the test suites for an existing pattern for unit testing parallel task execution with an executor service, but didn't find one.
   I'll follow up on your proposal to make this test more relevant.






[GitHub] [iceberg] fbocse commented on a change in pull request #1187: Add support to parallelise the execution of tasks of metadata and data files delete operations

Posted by GitBox <gi...@apache.org>.
fbocse commented on a change in pull request #1187:
URL: https://github.com/apache/iceberg/pull/1187#discussion_r454257193



##########
File path: core/src/main/java/org/apache/iceberg/RemoveSnapshots.java
##########
@@ -60,12 +62,19 @@ public void accept(String file) {
     }
   };
 
+  private final ExecutorService defaultDeleteExecutorService = Executors.newSingleThreadExecutor(runnable -> {

Review comment:
       I'm using `MoreExecutors.newDirectExecutorService()` instead, to stay consistent with the `java.util.concurrent.ExecutorService` type that `org.apache.iceberg.util.Tasks.Builder#executeWith` expects.
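
The type distinction matters because Guava's `directExecutor()` returns a plain `Executor`, whereas the field here is declared as an `ExecutorService`. As a rough stdlib-only illustration of what a direct executor service does, here is a minimal sketch; `DirectExecutorServiceSketch` is a hypothetical name, and unlike a production implementation it does not reject tasks after shutdown.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical stdlib-only approximation of a direct executor service.
// Simplification: tasks are still accepted after shutdown().
final class DirectExecutorServiceSketch extends AbstractExecutorService {
  private volatile boolean shutdown = false;

  @Override
  public void execute(Runnable command) {
    command.run();  // run in the thread that called execute/submit
  }

  @Override
  public void shutdown() {
    shutdown = true;
  }

  @Override
  public List<Runnable> shutdownNow() {
    shutdown = true;
    return Collections.emptyList();  // nothing is ever queued
  }

  @Override
  public boolean isShutdown() {
    return shutdown;
  }

  @Override
  public boolean isTerminated() {
    return shutdown;  // no background work can be outstanding
  }

  @Override
  public boolean awaitTermination(long timeout, TimeUnit unit) {
    return shutdown;
  }

  public static void main(String[] args) throws Exception {
    ExecutorService service = new DirectExecutorServiceSketch();
    Callable<String> task = () -> Thread.currentThread().getName();
    Future<String> result = service.submit(task);
    // The task ran inline, so the future is already complete here.
    System.out.println(result.isDone() + " " + result.get());
  }
}
```

Because `submit` is inherited from `AbstractExecutorService` and delegates to `execute`, the returned future is already done when `submit` returns.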






[GitHub] [iceberg] rdblue commented on a change in pull request #1187: Add support to parallelise the execution of tasks of metadata and data files delete operations

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1187:
URL: https://github.com/apache/iceberg/pull/1187#discussion_r454496692



##########
File path: core/src/main/java/org/apache/iceberg/RemoveSnapshots.java
##########
@@ -60,12 +62,16 @@ public void accept(String file) {
     }
   };
 
+  // Creates an executor service that runs each task in the thread that invokes execute/submit.
+  private static final ExecutorService defaultDeleteExecutorService = MoreExecutors.newDirectExecutorService();

Review comment:
       And can you move this near the logger instead of here? That will keep all the static final fields together.






[GitHub] [iceberg] rdblue commented on a change in pull request #1187: Add support to parallelise the execution of tasks of metadata and data files delete operations

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1187:
URL: https://github.com/apache/iceberg/pull/1187#discussion_r454496153



##########
File path: core/src/main/java/org/apache/iceberg/RemoveSnapshots.java
##########
@@ -60,12 +62,16 @@ public void accept(String file) {
     }
   };
 
+  // Creates an executor service that runs each task in the thread that invokes execute/submit.
+  private static final ExecutorService defaultDeleteExecutorService = MoreExecutors.newDirectExecutorService();

Review comment:
       Nit: Static final names should be `ALL_CAPS`.






[GitHub] [iceberg] rdblue commented on a change in pull request #1187: Add support to parallelise the execution of tasks of metadata and data files delete operations

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1187:
URL: https://github.com/apache/iceberg/pull/1187#discussion_r453931466



##########
File path: core/src/main/java/org/apache/iceberg/RemoveSnapshots.java
##########
@@ -60,12 +62,19 @@ public void accept(String file) {
     }
   };
 
+  private final ExecutorService defaultDeleteExecutorService = Executors.newSingleThreadExecutor(runnable -> {

Review comment:
       I think the default should be the current behavior, which is to run immediately in the current thread. You can get that behavior by using [`MoreExecutors.directExecutor()`](https://guava.dev/releases/19.0/api/docs/com/google/common/util/concurrent/MoreExecutors.html#directExecutor()) as the default.
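
To make "run immediately in the current thread" concrete, here is a small stdlib-only sketch. It assumes nothing from Guava: `DIRECT` is a hypothetical constant that mimics a direct executor by calling `run()` on the submitted task, and `threadThatRan` is a helper invented for this illustration.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;

final class DirectExecutorSketch {
  // Hypothetical stand-in for a direct executor: execute(task) simply calls task.run().
  static final Executor DIRECT = Runnable::run;

  // Returns the thread that ran the task. Only meaningful for executors that
  // finish the task before execute() returns, as DIRECT does.
  static Thread threadThatRan(Executor executor) {
    AtomicReference<Thread> ranOn = new AtomicReference<>();
    executor.execute(() -> ranOn.set(Thread.currentThread()));
    return ranOn.get();
  }

  public static void main(String[] args) {
    boolean sameThread = threadThatRan(DIRECT) == Thread.currentThread();
    System.out.println("ran on calling thread: " + sameThread);
  }
}
```

An executor like this holds no threads and no state, so a single shared instance is safe, which is why the default can live in a static field.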






[GitHub] [iceberg] fbocse commented on a change in pull request #1187: Add support to parallelise the execution of tasks of metadata and data files delete operations

Posted by GitBox <gi...@apache.org>.
fbocse commented on a change in pull request #1187:
URL: https://github.com/apache/iceberg/pull/1187#discussion_r454271732



##########
File path: core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java
##########
@@ -404,6 +405,59 @@ public void dataFilesCleanup() throws IOException {
     Assert.assertTrue("FILE_B should be deleted", deletedFiles.contains(FILE_B.path().toString()));
   }
 
+  @Test
+  public void dataFilesCleanupWithParallelTasks() throws IOException {
+    table.newFastAppend()
+        .appendFile(FILE_A)
+        .commit();
+
+    table.newFastAppend()
+        .appendFile(FILE_B)
+        .commit();
+
+    table.newRewrite()
+        .rewriteFiles(ImmutableSet.of(FILE_B), ImmutableSet.of(FILE_D))
+        .commit();
+    long thirdSnapshotId = table.currentSnapshot().snapshotId();
+
+    table.newRewrite()
+        .rewriteFiles(ImmutableSet.of(FILE_A), ImmutableSet.of(FILE_C))
+        .commit();
+    long fourthSnapshotId = table.currentSnapshot().snapshotId();
+
+    long t4 = System.currentTimeMillis();
+    while (t4 <= table.currentSnapshot().timestampMillis()) {
+      t4 = System.currentTimeMillis();
+    }
+
+    List<ManifestFile> manifests = table.currentSnapshot().dataManifests();
+
+    ManifestFile newManifest = writeManifest(
+        "manifest-file-1.avro",
+        manifestEntry(Status.EXISTING, thirdSnapshotId, FILE_C),
+        manifestEntry(Status.EXISTING, fourthSnapshotId, FILE_D));
+
+    RewriteManifests rewriteManifests = table.rewriteManifests();
+    manifests.forEach(rewriteManifests::deleteManifest);
+    rewriteManifests.addManifest(newManifest);
+    rewriteManifests.commit();
+
+    Set<String> deletedFiles = Sets.newHashSet();
+
+    table.expireSnapshots()
+        .deleteWith(Executors.newFixedThreadPool(8, runnable -> {
+          Thread thread = new Thread(runnable);
+          thread.setDaemon(true);
+          return thread;
+        }))
+        .expireOlderThan(t4)
+        .deleteWith(deletedFiles::add)

Review comment:
       You're right, it doesn't verify that the provided executor service was used to submit the tasks.
   I looked through the test suites for an existing pattern for unit testing parallel task execution with an executor service, but didn't find one.
   I'll follow up on your proposal to make this code testable.






[GitHub] [iceberg] rdblue commented on a change in pull request #1187: Add support to parallelise the execution of tasks of metadata and data files delete operations

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1187:
URL: https://github.com/apache/iceberg/pull/1187#discussion_r453933833



##########
File path: core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java
##########
@@ -404,6 +405,59 @@ public void dataFilesCleanup() throws IOException {
     Assert.assertTrue("FILE_B should be deleted", deletedFiles.contains(FILE_B.path().toString()));
   }
 
+  @Test
+  public void dataFilesCleanupWithParallelTasks() throws IOException {
+    table.newFastAppend()
+        .appendFile(FILE_A)
+        .commit();
+
+    table.newFastAppend()
+        .appendFile(FILE_B)
+        .commit();
+
+    table.newRewrite()
+        .rewriteFiles(ImmutableSet.of(FILE_B), ImmutableSet.of(FILE_D))
+        .commit();
+    long thirdSnapshotId = table.currentSnapshot().snapshotId();
+
+    table.newRewrite()
+        .rewriteFiles(ImmutableSet.of(FILE_A), ImmutableSet.of(FILE_C))
+        .commit();
+    long fourthSnapshotId = table.currentSnapshot().snapshotId();
+
+    long t4 = System.currentTimeMillis();
+    while (t4 <= table.currentSnapshot().timestampMillis()) {
+      t4 = System.currentTimeMillis();
+    }
+
+    List<ManifestFile> manifests = table.currentSnapshot().dataManifests();
+
+    ManifestFile newManifest = writeManifest(
+        "manifest-file-1.avro",
+        manifestEntry(Status.EXISTING, thirdSnapshotId, FILE_C),
+        manifestEntry(Status.EXISTING, fourthSnapshotId, FILE_D));
+
+    RewriteManifests rewriteManifests = table.rewriteManifests();
+    manifests.forEach(rewriteManifests::deleteManifest);
+    rewriteManifests.addManifest(newManifest);
+    rewriteManifests.commit();
+
+    Set<String> deletedFiles = Sets.newHashSet();
+
+    table.expireSnapshots()
+        .deleteWith(Executors.newFixedThreadPool(8, runnable -> {
+          Thread thread = new Thread(runnable);
+          thread.setDaemon(true);
+          return thread;
+        }))
+        .expireOlderThan(t4)
+        .deleteWith(deletedFiles::add)

Review comment:
       This doesn't actually test that the thread pool is used, does it? What about adding the thread ID to a concurrent set in `deleteWith`? Then you could assert that the threads you created were actually used.
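
The suggestion above could be sketched, outside of Iceberg, roughly as follows. This is only an illustration using the standard library: `PoolUsageSketch`, `namesOfThreadsUsed`, and the thread name prefix are hypothetical, and thread names are recorded instead of thread IDs because names are easier to assert on.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class PoolUsageSketch {
  // Runs one no-op "delete" per path on a named pool and returns the names of
  // the threads that executed them.
  static Set<String> namesOfThreadsUsed(List<String> paths, int poolSize, String prefix)
      throws InterruptedException {
    AtomicInteger threadIndex = new AtomicInteger(0);
    ExecutorService pool = Executors.newFixedThreadPool(poolSize, runnable -> {
      Thread thread = new Thread(runnable);
      thread.setName(prefix + threadIndex.getAndIncrement());
      thread.setDaemon(true);  // do not keep the JVM alive for this pool
      return thread;
    });

    // Concurrent set: several pool threads may add to it at the same time.
    Set<String> threadNames = ConcurrentHashMap.newKeySet();
    for (String path : paths) {
      // Stand-in for the delete callback; only the executing thread is recorded.
      pool.execute(() -> threadNames.add(Thread.currentThread().getName()));
    }

    pool.shutdown();
    if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
      throw new IllegalStateException("delete tasks did not finish in time");
    }
    return threadNames;
  }

  public static void main(String[] args) throws InterruptedException {
    Set<String> names = namesOfThreadsUsed(Arrays.asList("a", "b", "c", "d"), 4, "sketch-delete-");
    System.out.println(names);
  }
}
```

A test can then assert that every recorded name carries the pool's prefix and that the calling thread's name is absent, which shows the work did not run inline.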






[GitHub] [iceberg] rdblue commented on a change in pull request #1187: Add support to parallelise the execution of tasks of metadata and data files delete operations

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1187:
URL: https://github.com/apache/iceberg/pull/1187#discussion_r453931824



##########
File path: core/src/main/java/org/apache/iceberg/RemoveSnapshots.java
##########
@@ -60,12 +62,19 @@ public void accept(String file) {
     }
   };
 
+  private final ExecutorService defaultDeleteExecutorService = Executors.newSingleThreadExecutor(runnable -> {

Review comment:
       If you use `directExecutor()` then this can be `static`.






[GitHub] [iceberg] rdblue commented on pull request #1187: Add support to parallelise the execution of tasks of metadata and data files delete operations

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #1187:
URL: https://github.com/apache/iceberg/pull/1187#issuecomment-658291544


   Overall looks good. +1
   
   I'd like to fix the minor issues before merging, though.




[GitHub] [iceberg] rdblue commented on a change in pull request #1187: Add support to parallelise the execution of tasks of metadata and data files delete operations

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1187:
URL: https://github.com/apache/iceberg/pull/1187#discussion_r454497482



##########
File path: core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java
##########
@@ -404,6 +407,69 @@ public void dataFilesCleanup() throws IOException {
     Assert.assertTrue("FILE_B should be deleted", deletedFiles.contains(FILE_B.path().toString()));
   }
 
+  @Test
+  public void dataFilesCleanupWithParallelTasks() throws IOException {
+    table.newFastAppend()
+        .appendFile(FILE_A)
+        .commit();
+
+    table.newFastAppend()
+        .appendFile(FILE_B)
+        .commit();
+
+    table.newRewrite()
+        .rewriteFiles(ImmutableSet.of(FILE_B), ImmutableSet.of(FILE_D))
+        .commit();
+    long thirdSnapshotId = table.currentSnapshot().snapshotId();
+
+    table.newRewrite()
+        .rewriteFiles(ImmutableSet.of(FILE_A), ImmutableSet.of(FILE_C))
+        .commit();
+    long fourthSnapshotId = table.currentSnapshot().snapshotId();
+
+    long t4 = System.currentTimeMillis();
+    while (t4 <= table.currentSnapshot().timestampMillis()) {
+      t4 = System.currentTimeMillis();
+    }
+
+    List<ManifestFile> manifests = table.currentSnapshot().dataManifests();
+
+    ManifestFile newManifest = writeManifest(
+        "manifest-file-1.avro",
+        manifestEntry(Status.EXISTING, thirdSnapshotId, FILE_C),
+        manifestEntry(Status.EXISTING, fourthSnapshotId, FILE_D));
+
+    RewriteManifests rewriteManifests = table.rewriteManifests();
+    manifests.forEach(rewriteManifests::deleteManifest);
+    rewriteManifests.addManifest(newManifest);
+    rewriteManifests.commit();
+
+    Set<String> deletedFiles = Sets.newHashSet();
+    Set<String> deleteThreads = ConcurrentHashMap.newKeySet();
+    AtomicInteger deleteThreadsIndex = new AtomicInteger(0);
+
+    table.expireSnapshots()
+        .executeWith(Executors.newFixedThreadPool(4, runnable -> {
+          Thread thread = new Thread(runnable);
+          thread.setName("remove-snapshot-" + deleteThreadsIndex.getAndIncrement());
+          thread.setDaemon(true); // daemon threads will be terminated abruptly when the JVM exits
+          return thread;
+        }))
+        .expireOlderThan(t4)
+        .deleteWith(s -> {
+          deleteThreads.add(Thread.currentThread().getName());
+          deletedFiles.add(s);
+        })
+        .commit();
+
+    // Verifies the

Review comment:
       Unfinished comment?






[GitHub] [iceberg] fbocse commented on a change in pull request #1187: Add support to parallelise the execution of tasks of metadata and data files delete operations

Posted by GitBox <gi...@apache.org>.
fbocse commented on a change in pull request #1187:
URL: https://github.com/apache/iceberg/pull/1187#discussion_r454848899



##########
File path: core/src/main/java/org/apache/iceberg/RemoveSnapshots.java
##########
@@ -60,12 +62,16 @@ public void accept(String file) {
     }
   };
 
+  // Creates an executor service that runs each task in the thread that invokes execute/submit.
+  private static final ExecutorService defaultDeleteExecutorService = MoreExecutors.newDirectExecutorService();

Review comment:
       Right, I didn't run the checks. I'll run the Gradle checks before submitting the changes.



