Posted to dev@iceberg.apache.org by GitBox <gi...@apache.org> on 2018/12/11 20:21:42 UTC

[GitHub] mccheah commented on a change in pull request #45: Lazily submit tasks in ParallelIterable and add cancellation.

mccheah commented on a change in pull request #45: Lazily submit tasks in ParallelIterable and add cancellation.
URL: https://github.com/apache/incubator-iceberg/pull/45#discussion_r240776853
 
 

 ##########
 File path: core/src/main/java/com/netflix/iceberg/util/ParallelIterable.java
 ##########
 @@ -19,73 +19,124 @@
 
 package com.netflix.iceberg.util;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+import com.netflix.iceberg.io.CloseableGroup;
+import java.io.Closeable;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
-public class ParallelIterable<T> implements Iterable<T> {
+public class ParallelIterable<T> extends CloseableGroup implements Iterable<T> {
   private final Iterable<Iterable<T>> iterables;
-  private final ExecutorService trackingPool;
   private final ExecutorService workerPool;
 
   public ParallelIterable(Iterable<Iterable<T>> iterables,
-                          ExecutorService trackingPool,
                           ExecutorService workerPool) {
     this.iterables = iterables;
-    this.trackingPool = trackingPool;
     this.workerPool = workerPool;
   }
 
   @Override
   public Iterator<T> iterator() {
-    return new ParallelIterator<>(iterables, trackingPool, workerPool);
+    ParallelIterator<T> iter = new ParallelIterator<>(iterables, workerPool);
+    addCloseable(iter);
+    return iter;
   }
 
-  private static class ParallelIterator<T> implements Iterator<T> {
+  private static class ParallelIterator<T> implements Iterator<T>, Closeable {
 
 Review comment:
   I'm wondering if there's a more idiomatic way to do this, particularly one that requires neither of the following:
   1. Busy waiting. It's generally a red flag in concurrent programming when busy waiting is used instead of alternative primitives like locks, conditions, queues, monitors, etc.
   2. Manual tracking of tasks by index.
   
   I came up with the following alternative. Apologies that it has to be in pseudo-code form; due to the nature of the problem it's pretty hard to explain without the code. Let's see how this works out:
   
   ```
   class ParallelIterator<T> implements Iterator<T>, Closeable {
   
     private LinkedList<T> availableValues;
     private LinkedList<Future<List<T>>> runningTasks;
     private ExecutorService threadPool;
     private Iterator<Iterable<T>> pendingValues;
   
     // Constructor etc.
     
     boolean hasNext() {
       return !runningTasks.isEmpty() || !availableValues.isEmpty() || pendingValues.hasNext();
     }
   
     T next() {
       if (!availableValues.isEmpty()) {
         return availableValues.poll();
       }
       if (!runningTasks.isEmpty()) {
         availableValues.addAll(runningTasks.poll().get());
         return next(); // Or availableValues.poll() if we don't like recursion
       }
       if (pendingValues.hasNext()) {
         // Buffer / eagerly submit some set of tasks, i.e. lookahead.
         for (int i = 0; i < TASK_COUNT && pendingValues.hasNext(); i++) {
           Iterable<T> nextPendingValues = pendingValues.next();
           Future<List<T>> nextRunningTask = threadPool.submit(() -> ImmutableList.copyOf(nextPendingValues));
           runningTasks.add(nextRunningTask);
         }
         return next(); // Recursive call that will now find the newly submitted running tasks
       }
       throw new NoSuchElementException(); // No values remaining
     }
   }
   
   ```
   
   The general idea is to keep a running iterator over the backing iterable. When calling `next()`, submit tasks that are buffered into a worker queue of futures; each future represents computing the next group of values. Then on `next()`:
   - Get an available value from a completed task, if possible
   - Else check the work queue and see if a new batch of values is ready
   - Otherwise submit more work and wait
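   
   For reference, the steps above can be sketched as runnable Java. This is only a sketch under assumptions: the class name `BufferedParallelIterator`, the `submitBatch` and `await` helpers, and the lookahead of 4 tasks are made up for illustration and are not code from the pull request. It also moves the blocking into `hasNext()`, so that a task which produces no values cannot make `hasNext()` report true when nothing is left.
   
   ```java
   import java.io.Closeable;
   import java.util.ArrayDeque;
   import java.util.ArrayList;
   import java.util.Deque;
   import java.util.Iterator;
   import java.util.List;
   import java.util.NoSuchElementException;
   import java.util.concurrent.Callable;
   import java.util.concurrent.ExecutionException;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Future;
   
   // Hypothetical class: a sketch of the proposal, not the code in the pull request.
   class BufferedParallelIterator<T> implements Iterator<T>, Closeable {
     private static final int TASK_COUNT = 4; // assumed lookahead size
   
     // ArrayDeque rejects null elements, so this sketch assumes non-null values.
     private final Deque<T> availableValues = new ArrayDeque<>();
     private final Deque<Future<List<T>>> runningTasks = new ArrayDeque<>();
     private final Iterator<? extends Iterable<T>> pendingValues;
     private final ExecutorService threadPool;
   
     BufferedParallelIterator(Iterable<? extends Iterable<T>> iterables,
                              ExecutorService threadPool) {
       this.pendingValues = iterables.iterator();
       this.threadPool = threadPool;
     }
   
     @Override
     public boolean hasNext() {
       // Loop because a finished task may have produced no values at all.
       while (availableValues.isEmpty()) {
         if (!runningTasks.isEmpty()) {
           // Blocks in Future.get() instead of spinning.
           availableValues.addAll(await(runningTasks.poll()));
         } else if (pendingValues.hasNext()) {
           submitBatch();
         } else {
           return false;
         }
       }
       return true;
     }
   
     @Override
     public T next() {
       if (!hasNext()) {
         throw new NoSuchElementException();
       }
       return availableValues.poll();
     }
   
     // Submits up to TASK_COUNT tasks; each one copies a single iterable into a list.
     private void submitBatch() {
       for (int i = 0; i < TASK_COUNT && pendingValues.hasNext(); i++) {
         Iterable<T> source = pendingValues.next();
         Callable<List<T>> copyTask = () -> {
           List<T> copy = new ArrayList<>();
           source.forEach(copy::add);
           return copy;
         };
         runningTasks.add(threadPool.submit(copyTask));
       }
     }
   
     private List<T> await(Future<List<T>> task) {
       try {
         return task.get();
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         throw new RuntimeException("Interrupted while waiting for a task", e);
       } catch (ExecutionException e) {
         throw new RuntimeException("Task failed", e.getCause());
       }
     }
   
     @Override
     public void close() {
       // Cancel work that was submitted but never consumed.
       runningTasks.forEach(task -> task.cancel(true));
       runningTasks.clear();
       availableValues.clear();
     }
   }
   ```
   
   Values come back in submission order because futures are polled from the head of the queue, and the calling thread waits in `Future.get()` rather than in a polling loop.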
   
   What do you think about this approach? The advantages are:
   - No busy waiting
   - No need to maintain indices manually. Everything is done via collection primitives (`poll`, `iterator`, etc.)
   
   There are a few ways this framework can be adjusted. For example, on `next()`, if we determine that only some minimum number of running tasks remains, we can choose to eagerly submit work before the user actually requests those values. That way we pipeline the main thread's work on the values with the worker threads' work that produces them.
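   
   A minimal sketch of that adjustment, again under assumptions: the `LookaheadTasks` helper and its `minRunning` and `maxRunning` thresholds are hypothetical names, not part of the pull request. Before blocking on the oldest task, it refills the queue whenever the number of in-flight tasks has dropped to `minRunning` or below, so workers keep producing while the caller consumes the returned batch.
   
   ```java
   import java.util.ArrayDeque;
   import java.util.ArrayList;
   import java.util.Deque;
   import java.util.Iterator;
   import java.util.List;
   import java.util.concurrent.Callable;
   import java.util.concurrent.ExecutionException;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Future;
   
   // Hypothetical helper illustrating eager top-up of the running task queue.
   class LookaheadTasks<T> {
     private final Deque<Future<List<T>>> runningTasks = new ArrayDeque<>();
     private final Iterator<? extends Iterable<T>> pendingValues;
     private final ExecutorService threadPool;
     private final int minRunning; // refill when this many tasks or fewer are in flight
     private final int maxRunning; // refill up to this many tasks
   
     LookaheadTasks(Iterable<? extends Iterable<T>> iterables, ExecutorService threadPool,
                    int minRunning, int maxRunning) {
       this.pendingValues = iterables.iterator();
       this.threadPool = threadPool;
       this.minRunning = minRunning;
       this.maxRunning = maxRunning;
     }
   
     // Returns the next group of values in submission order, or null when all work is done.
     List<T> nextBatch() {
       topUp(); // submit ahead of demand before blocking on the oldest task
       Future<List<T>> head = runningTasks.poll();
       if (head == null) {
         return null;
       }
       try {
         return head.get();
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         throw new RuntimeException("Interrupted while waiting for a task", e);
       } catch (ExecutionException e) {
         throw new RuntimeException("Task failed", e.getCause());
       }
     }
   
     private void topUp() {
       if (runningTasks.size() > minRunning) {
         return; // enough work is already in flight
       }
       while (runningTasks.size() < maxRunning && pendingValues.hasNext()) {
         Iterable<T> source = pendingValues.next();
         Callable<List<T>> copyTask = () -> {
           List<T> copy = new ArrayList<>();
           source.forEach(copy::add);
           return copy;
         };
         runningTasks.add(threadPool.submit(copyTask));
       }
     }
   }
   ```
   
   Bounding the queue with `maxRunning` keeps the amount of buffered data limited, at the cost of occasionally leaving a worker idle when the caller consumes batches slowly.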

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Re: [GitHub] mccheah commented on a change in pull request #45: Lazily submit tasks in ParallelIterable and add cancellation.

Posted by Greg Roodt <gr...@gmail.com>.
How do I unsubscribe from the GitBox emails? I've tried to unsubscribe from the
Apache list by sending an email to dev-unsubscribe@iceberg.apache.org.

It's been more than 24h, so I'm not sure if I need to do something else.

Thanks
Greg


On Wed, 12 Dec 2018 at 07:21, GitBox <gi...@apache.org> wrote:

> mccheah commented on a change in pull request #45: Lazily submit tasks in
> ParallelIterable and add cancellation.
> URL:
> https://github.com/apache/incubator-iceberg/pull/45#discussion_r240776853