Posted to commits@iceberg.apache.org by dw...@apache.org on 2019/01/16 16:29:56 UTC

[incubator-iceberg] branch master updated: Lazily submit tasks in ParallelIterable and add cancellation. (#45)

This is an automated email from the ASF dual-hosted git repository.

dweeks pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 0dc3592  Lazily submit tasks in ParallelIterable and add cancellation. (#45)
0dc3592 is described below

commit 0dc35923a7bdadc4946e7b6fb85dd680844c1362
Author: Ryan Blue <rd...@users.noreply.github.com>
AuthorDate: Wed Jan 16 08:29:52 2019 -0800

    Lazily submit tasks in ParallelIterable and add cancellation. (#45)
    
    * Lazily submit tasks in ParallelIterable and add cancellation.
    
    This removes the planner pool from ParallelIterable. That pool was used
    to submit all of the iterable's tasks in parallel, queuing a read task
    for every manifest in a snapshot. As a result, when a caller stopped
    reading early, all tasks would still run and add their results to the
    queue.
    
    Now, tasks are submitted from the thread consuming the iterator as it
    runs hasNext. If the caller stops consuming the iterator, then no new
    tasks are submitted. The iterator also keeps track of the submitted
    tasks and cancels them when it is closed.
    
    * Remove SystemProperties.PLANNER_THREAD_POOL_SIZE_PROP.
---
 .../com/netflix/iceberg/io/CloseableGroup.java     |   3 +
 .../java/com/netflix/iceberg/BaseTableScan.java    |   3 +-
 .../java/com/netflix/iceberg/SystemProperties.java |   6 --
 .../com/netflix/iceberg/util/ParallelIterable.java | 116 +++++++++++++++------
 .../java/com/netflix/iceberg/util/ThreadPools.java |  29 +-----
 .../netflix/iceberg/hadoop/TestHadoopCommits.java  |   2 +-
 6 files changed, 92 insertions(+), 67 deletions(-)
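
For illustration, the consumer-side pattern this change enables looks roughly
like the sketch below. It assumes planFiles() returns a CloseableIterable
backed by a ParallelIterable; process() and enoughTasks() are hypothetical
stand-ins for caller logic, not part of this commit.

    // Hypothetical consumer sketch: stopping early no longer leaves manifest
    // read tasks running, provided the iterable is closed afterwards.
    try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
      for (FileScanTask task : tasks) {
        process(task);
        if (enoughTasks()) {
          break;  // close() cancels queued and running read tasks
        }
      }
    }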

diff --git a/api/src/main/java/com/netflix/iceberg/io/CloseableGroup.java b/api/src/main/java/com/netflix/iceberg/io/CloseableGroup.java
index 1f62246..a3dc85c 100644
--- a/api/src/main/java/com/netflix/iceberg/io/CloseableGroup.java
+++ b/api/src/main/java/com/netflix/iceberg/io/CloseableGroup.java
@@ -47,6 +47,9 @@ public abstract class CloseableGroup implements Closeable {
 
     public ClosingIterable(Iterable<T> iterable, Iterable<Closeable> closeables) {
       this.iterable = iterable;
+      if (iterable instanceof Closeable) {
+        addCloseable((Closeable) iterable);
+      }
       for (Closeable closeable : closeables) {
         addCloseable(closeable);
       }
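
The hunk above exists because ParallelIterable now implements Closeable (via
CloseableGroup, below), so a ClosingIterable that wraps it must close it too.
A self-contained sketch of the same register-if-Closeable pattern, with
illustrative names rather than the Iceberg classes:

    import java.io.Closeable;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    // Illustrative sketch: when wrapping an iterable, also register the
    // iterable itself for cleanup if it happens to be Closeable.
    class CloseOnWrap<T> implements Closeable {
      private final List<Closeable> closeables = new ArrayList<>();
      private final Iterable<T> iterable;

      CloseOnWrap(Iterable<T> iterable) {
        this.iterable = iterable;
        if (iterable instanceof Closeable) {
          closeables.add((Closeable) iterable);
        }
      }

      @Override
      public void close() throws IOException {
        for (Closeable closeable : closeables) {
          closeable.close();
        }
      }
    }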
diff --git a/core/src/main/java/com/netflix/iceberg/BaseTableScan.java b/core/src/main/java/com/netflix/iceberg/BaseTableScan.java
index 8915461..bbfdc4f 100644
--- a/core/src/main/java/com/netflix/iceberg/BaseTableScan.java
+++ b/core/src/main/java/com/netflix/iceberg/BaseTableScan.java
@@ -50,7 +50,6 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
-import static com.netflix.iceberg.util.ThreadPools.getPlannerPool;
 import static com.netflix.iceberg.util.ThreadPools.getWorkerPool;
 
 /**
@@ -189,7 +188,7 @@ class BaseTableScan implements TableScan {
 
       if (PLAN_SCANS_WITH_WORKER_POOL && snapshot.manifests().size() > 1) {
         return CloseableIterable.combine(
-            new ParallelIterable<>(readers, getPlannerPool(), getWorkerPool()),
+            new ParallelIterable<>(readers, getWorkerPool()),
             toClose);
       } else {
         return CloseableIterable.combine(Iterables.concat(readers), toClose);
diff --git a/core/src/main/java/com/netflix/iceberg/SystemProperties.java b/core/src/main/java/com/netflix/iceberg/SystemProperties.java
index a1ed7c4..8f2652b 100644
--- a/core/src/main/java/com/netflix/iceberg/SystemProperties.java
+++ b/core/src/main/java/com/netflix/iceberg/SystemProperties.java
@@ -24,12 +24,6 @@ package com.netflix.iceberg;
  */
 public class SystemProperties {
   /**
-   * Sets the size of the planner pool. The planner pool limits the number of concurrent planning
-   * operations in the base table implementation.
-   */
-  public static final String PLANNER_THREAD_POOL_SIZE_PROP = "iceberg.planner.num-threads";
-
-  /**
    * Sets the size of the worker pool. The worker pool limits the number of tasks concurrently
    * processing manifests in the base table implementation across all concurrent planning or commit
    * operations.
diff --git a/core/src/main/java/com/netflix/iceberg/util/ParallelIterable.java b/core/src/main/java/com/netflix/iceberg/util/ParallelIterable.java
index b04221c..f2c7d3f 100644
--- a/core/src/main/java/com/netflix/iceberg/util/ParallelIterable.java
+++ b/core/src/main/java/com/netflix/iceberg/util/ParallelIterable.java
@@ -19,73 +19,124 @@
 
 package com.netflix.iceberg.util;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+import com.netflix.iceberg.io.CloseableGroup;
+import java.io.Closeable;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
-public class ParallelIterable<T> implements Iterable<T> {
+public class ParallelIterable<T> extends CloseableGroup implements Iterable<T> {
   private final Iterable<Iterable<T>> iterables;
-  private final ExecutorService trackingPool;
   private final ExecutorService workerPool;
 
   public ParallelIterable(Iterable<Iterable<T>> iterables,
-                          ExecutorService trackingPool,
                           ExecutorService workerPool) {
     this.iterables = iterables;
-    this.trackingPool = trackingPool;
     this.workerPool = workerPool;
   }
 
   @Override
   public Iterator<T> iterator() {
-    return new ParallelIterator<>(iterables, trackingPool, workerPool);
+    ParallelIterator<T> iter = new ParallelIterator<>(iterables, workerPool);
+    addCloseable(iter);
+    return iter;
   }
 
-  private static class ParallelIterator<T> implements Iterator<T> {
+  private static class ParallelIterator<T> implements Iterator<T>, Closeable {
+    private final Iterator<Runnable> tasks;
+    private final ExecutorService workerPool;
+    private final Future<?>[] taskFutures;
     private final ConcurrentLinkedQueue<T> queue = new ConcurrentLinkedQueue<>();
-    private final Future<?> taskFuture;
-
-    public ParallelIterator(Iterable<Iterable<T>> iterables,
-                            ExecutorService trackingPool,
-                            ExecutorService workerPool) {
-      this.taskFuture = trackingPool.submit(() -> {
-        Tasks.foreach(iterables)
-            .noRetry().stopOnFailure().throwFailureWhenFinished()
-            .executeWith(workerPool)
-            .run(iterable -> {
-              for (T item : iterable) {
-                queue.add(item);
-              }
-            });
-        return true;
-      });
+    private boolean closed = false;
+
+    private ParallelIterator(Iterable<Iterable<T>> iterables,
+                             ExecutorService workerPool) {
+      this.tasks = Iterables.transform(iterables, iterable ->
+          (Runnable) () -> {
+            for (T item : iterable) {
+              queue.add(item);
+            }
+          }).iterator();
+      this.workerPool = workerPool;
+      // submit 2 tasks per worker at a time
+      this.taskFutures = new Future[2 * ThreadPools.WORKER_THREAD_POOL_SIZE];
+    }
+
+    @Override
+    public void close() {
+      // cancel background tasks
+      for (int i = 0; i < taskFutures.length; i += 1) {
+        if (taskFutures[i] != null && !taskFutures[i].isDone()) {
+          taskFutures[i].cancel(true);
+        }
+      }
+      this.closed = true;
+    }
+
+    /**
+     * Checks on running tasks and submits new tasks if needed.
+     * <p>
+     * This should not be called after {@link #close()}.
+     *
+     * @return true if there are pending tasks, false otherwise
+     */
+    private boolean checkTasks() {
+      boolean hasRunningTask = false;
+
+      for (int i = 0; i < taskFutures.length; i += 1) {
+        if (taskFutures[i] == null || taskFutures[i].isDone()) {
+          taskFutures[i] = submitNextTask();
+        }
+
+        if (taskFutures[i] != null) {
+          hasRunningTask = true;
+        }
+      }
+
+      return tasks.hasNext() || hasRunningTask;
+    }
+
+    private Future<?> submitNextTask() {
+      if (tasks.hasNext()) {
+        return workerPool.submit(tasks.next());
+      }
+      return null;
     }
 
     @Override
     public synchronized boolean hasNext() {
+      Preconditions.checkState(!closed, "Already closed");
+
+      // if the consumer is processing records more slowly than the producers, then this check will
+      // prevent tasks from being submitted. while the producers are running, this will always
+      // return here before running checkTasks. when enough of the tasks are finished that the
+      // consumer catches up, then lots of new tasks will be submitted at once. this behavior is
+      // okay because it ensures that records are not stacking up waiting to be consumed and taking
+      // up memory.
+      //
+      // consumers that process results quickly will periodically exhaust the queue and submit new
+      // tasks when checkTasks runs. fast consumers should not be delayed.
+      if (!queue.isEmpty()) {
+        return true;
+      }
+
       // this cannot conclude that there are no more records until tasks have finished. while some
       // are running, return true when there is at least one item to return.
-      while (!taskFuture.isDone()) {
+      while (checkTasks()) {
         if (!queue.isEmpty()) {
           return true;
         }
 
         try {
-          taskFuture.get(10, TimeUnit.MILLISECONDS);
-          break;
+          Thread.sleep(10);
 
         } catch (InterruptedException e) {
           Thread.currentThread().interrupt();
           throw new RuntimeException(e);
-        } catch (ExecutionException e) {
-          ExceptionUtil.castAndThrow(e.getCause(), RuntimeException.class);
-        } catch (TimeoutException e) {
-          // continue looping to check the queue size and wait again
         }
       }
 
@@ -102,5 +153,4 @@ public class ParallelIterable<T> implements Iterable<T> {
       return queue.poll();
     }
   }
-
 }
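
The heart of the new iterator is the bounded slot array: at most two tasks per
worker thread are in flight, and empty slots are refilled only while the
consumer drives hasNext. A standalone sketch of that slot-refill-and-cancel
pattern, with illustrative names rather than the Iceberg classes:

    import java.util.Iterator;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Future;

    // Illustrative sketch: submit tasks lazily, keeping at most slots.length
    // futures in flight; cancel whatever is still pending on close.
    final class SlotPool {
      private final Iterator<Runnable> tasks;
      private final ExecutorService pool;
      private final Future<?>[] slots;

      SlotPool(Iterator<Runnable> tasks, ExecutorService pool, int size) {
        this.tasks = tasks;
        this.pool = pool;
        this.slots = new Future[size];
      }

      // Refill finished or empty slots; report whether work remains.
      boolean checkTasks() {
        boolean hasRunningTask = false;
        for (int i = 0; i < slots.length; i += 1) {
          if (slots[i] == null || slots[i].isDone()) {
            slots[i] = tasks.hasNext() ? pool.submit(tasks.next()) : null;
          }
          if (slots[i] != null) {
            hasRunningTask = true;
          }
        }
        return tasks.hasNext() || hasRunningTask;
      }

      // Interrupt in-flight tasks; mirrors ParallelIterator.close().
      void cancelAll() {
        for (Future<?> future : slots) {
          if (future != null && !future.isDone()) {
            future.cancel(true);
          }
        }
      }
    }

Because cancel(true) interrupts the running worker, close() can stop reads
that have already started, not just tasks still waiting in the executor queue.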
diff --git a/core/src/main/java/com/netflix/iceberg/util/ThreadPools.java b/core/src/main/java/com/netflix/iceberg/util/ThreadPools.java
index e2d74db..cc977d3 100644
--- a/core/src/main/java/com/netflix/iceberg/util/ThreadPools.java
+++ b/core/src/main/java/com/netflix/iceberg/util/ThreadPools.java
@@ -27,43 +27,22 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadPoolExecutor;
 
 public class ThreadPools {
-  public static final String PLANNER_THREAD_POOL_SIZE_PROP =
-      SystemProperties.PLANNER_THREAD_POOL_SIZE_PROP;
   public static final String WORKER_THREAD_POOL_SIZE_PROP =
       SystemProperties.WORKER_THREAD_POOL_SIZE_PROP;
 
-  private static ExecutorService PLANNER_POOL = MoreExecutors.getExitingExecutorService(
-      (ThreadPoolExecutor) Executors.newFixedThreadPool(
-          getPoolSize(PLANNER_THREAD_POOL_SIZE_PROP, 4),
-          new ThreadFactoryBuilder()
-              .setDaemon(true)
-              .setNameFormat("iceberg-planner-pool-%d")
-              .build()));
+  public static final int WORKER_THREAD_POOL_SIZE = getPoolSize(
+      WORKER_THREAD_POOL_SIZE_PROP,
+      Runtime.getRuntime().availableProcessors());
 
   private static ExecutorService WORKER_POOL = MoreExecutors.getExitingExecutorService(
       (ThreadPoolExecutor) Executors.newFixedThreadPool(
-          getPoolSize(WORKER_THREAD_POOL_SIZE_PROP, Runtime.getRuntime().availableProcessors()),
+          WORKER_THREAD_POOL_SIZE,
           new ThreadFactoryBuilder()
               .setDaemon(true)
               .setNameFormat("iceberg-worker-pool-%d")
               .build()));
 
   /**
-   * Return an {@link ExecutorService} that uses the "planner" thread-pool.
-   * <p>
-   * The size of the planner pool limits the number of concurrent planning operations in the base
-   * table implementation.
-   * <p>
-   * The size of this thread-pool is controlled by the Java system property
-   * {@code iceberg.planner.num-threads}.
-   *
-   * @return an {@link ExecutorService} that uses the planner pool
-   */
-  public static ExecutorService getPlannerPool() {
-    return PLANNER_POOL;
-  }
-
-  /**
    * Return an {@link ExecutorService} that uses the "worker" thread-pool.
    * <p>
    * The size of the worker pool limits the number of tasks concurrently reading manifests in the
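
Because the worker pool is built in a static initializer, its size has to be
configured before ThreadPools is first loaded. A hedged usage sketch; the
literal value 8 is arbitrary, and the property is referenced through its
constant rather than spelled out:

    // Set the pool size before ThreadPools class-loads, either with a JVM
    // flag (-D<property>=8) or early in main():
    System.setProperty(SystemProperties.WORKER_THREAD_POOL_SIZE_PROP, "8");
    ExecutorService workers = ThreadPools.getWorkerPool();
    // WORKER_THREAD_POOL_SIZE is now public, which is how ParallelIterator
    // sizes its future array to 2 tasks per worker.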
diff --git a/core/src/test/java/com/netflix/iceberg/hadoop/TestHadoopCommits.java b/core/src/test/java/com/netflix/iceberg/hadoop/TestHadoopCommits.java
index 0ee0210..8092320 100644
--- a/core/src/test/java/com/netflix/iceberg/hadoop/TestHadoopCommits.java
+++ b/core/src/test/java/com/netflix/iceberg/hadoop/TestHadoopCommits.java
@@ -55,7 +55,7 @@ public class TestHadoopCommits extends HadoopTableTestBase {
         metadataDir.exists() && metadataDir.isDirectory());
     Assert.assertTrue("Should create v1 metadata",
         version(1).exists() && version(1).isFile());
-    Assert.assertFalse("Should not create v2 or newer verions",
+    Assert.assertFalse("Should not create v2 or newer versions",
         version(2).exists());
     Assert.assertTrue("Should create version hint file",
         versionHintFile.exists());