You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@iceberg.apache.org by GitBox <gi...@apache.org> on 2018/12/12 05:26:51 UTC

[GitHub] rdsr commented on a change in pull request #45: Lazily submit tasks in ParallelIterable and add cancellation.

rdsr commented on a change in pull request #45: Lazily submit tasks in ParallelIterable and add cancellation.
URL: https://github.com/apache/incubator-iceberg/pull/45#discussion_r240885973
 
 

 ##########
 File path: core/src/main/java/com/netflix/iceberg/util/ParallelIterable.java
 ##########
 @@ -19,73 +19,124 @@
 
 package com.netflix.iceberg.util;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+import com.netflix.iceberg.io.CloseableGroup;
+import java.io.Closeable;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
-public class ParallelIterable<T> implements Iterable<T> {
+public class ParallelIterable<T> extends CloseableGroup implements Iterable<T> {
   private final Iterable<Iterable<T>> iterables;
-  private final ExecutorService trackingPool;
   private final ExecutorService workerPool;
 
   public ParallelIterable(Iterable<Iterable<T>> iterables,
-                          ExecutorService trackingPool,
                           ExecutorService workerPool) {
     this.iterables = iterables;
-    this.trackingPool = trackingPool;
     this.workerPool = workerPool;
   }
 
   @Override
   public Iterator<T> iterator() {
-    return new ParallelIterator<>(iterables, trackingPool, workerPool);
+    ParallelIterator<T> iter = new ParallelIterator<>(iterables, workerPool);
+    addCloseable(iter);
+    return iter;
   }
 
-  private static class ParallelIterator<T> implements Iterator<T> {
+  private static class ParallelIterator<T> implements Iterator<T>, Closeable {
+    private final Iterator<Runnable> tasks;
+    private final ExecutorService workerPool;
+    private final Future<?>[] taskFutures;
     private final ConcurrentLinkedQueue<T> queue = new ConcurrentLinkedQueue<>();
-    private final Future<?> taskFuture;
-
-    public ParallelIterator(Iterable<Iterable<T>> iterables,
-                            ExecutorService trackingPool,
-                            ExecutorService workerPool) {
-      this.taskFuture = trackingPool.submit(() -> {
-        Tasks.foreach(iterables)
-            .noRetry().stopOnFailure().throwFailureWhenFinished()
-            .executeWith(workerPool)
-            .run(iterable -> {
-              for (T item : iterable) {
-                queue.add(item);
-              }
-            });
-        return true;
-      });
+    private boolean closed = false;
+
+    private ParallelIterator(Iterable<Iterable<T>> iterables,
+                             ExecutorService workerPool) {
+      this.tasks = Iterables.transform(iterables, iterable ->
+          (Runnable) () -> {
+            for (T item : iterable) {
+              queue.add(item);
+            }
+          }).iterator();
+      this.workerPool = workerPool;
+      // submit 2 tasks per worker at a time
+      this.taskFutures = new Future[2 * ThreadPools.WORKER_THREAD_POOL_SIZE];
+    }
+
+    @Override
+    public void close() {
+      // cancel background tasks
+      for (int i = 0; i < taskFutures.length; i += 1) {
+        if (taskFutures[i] != null && !taskFutures[i].isDone()) {
+          taskFutures[i].cancel(true);
+        }
+      }
+      this.closed = true;
+    }
+
+    /**
+     * Checks on running tasks and submits new tasks if needed.
+     * <p>
+     * This should not be called after {@link #close()}.
+     *
+     * @return true if there are pending tasks, false otherwise
+     */
+    private boolean checkTasks() {
+      boolean hasRunningTask = false;
+
+      for (int i = 0; i < taskFutures.length; i += 1) {
+        if (taskFutures[i] == null || taskFutures[i].isDone()) {
+          taskFutures[i] = submitNextTask();
+        }
+
+        if (taskFutures[i] != null) {
+          hasRunningTask = true;
+        }
+      }
+
+      return tasks.hasNext() || hasRunningTask;
+    }
+
+    private Future<?> submitNextTask() {
+      if (tasks.hasNext()) {
+        return workerPool.submit(tasks.next());
+      }
+      return null;
     }
 
     @Override
     public synchronized boolean hasNext() {
+      Preconditions.checkState(!closed, "Already closed");
+
+      // if the consumer is processing records more slowly than the producers, then this check will
+      // prevent tasks from being submitted. while the producers are running, this will always
+      // return here before running checkTasks. when enough of the tasks are finished that the
+      // consumer catches up, then lots of new tasks will be submitted at once. this behavior is
+      // okay because it ensures that records are not stacking up waiting to be consumed and taking
+      // up memory.
+      //
+      // consumers that process results quickly will periodically exhaust the queue and submit new
+      // tasks when checkTasks runs. fast consumers should not be delayed.
+      if (!queue.isEmpty()) {
+        return true;
+      }
+
       // this cannot conclude that there are no more records until tasks have finished. while some
       // are running, return true when there is at least one item to return.
-      while (!taskFuture.isDone()) {
+      while (checkTasks()) {
 
 Review comment:
   Not sure I completely follow the logic here.
   
   When `checkTasks()` returns true, we know for sure that some datafiles will be added to the queue (since either we submitted new tasks or there are already tasks running). 
   
   I wonder if a slightly different, lazier approach would help here. Can we block on the queue until it is non-empty instead of looping on `checkTasks()`? That way, we do not keep scheduling more tasks before the queue receives some datafiles. This seems more in line with the general idea of the patch (lazily parsing manifests and computing datafiles), as it would submit only the necessary number of tasks.
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services