You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@beam.apache.org by re...@apache.org on 2019/03/19 04:19:08 UTC

[beam] branch master updated: [BEAM-6443] decrease the number of thread for BigQuery streaming insertAll

This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new a6dcce7  [BEAM-6443] decrease the number of thread for BigQuery streaming insertAll
     new 425110a  Merge pull request #7547: [BEAM-6443] decrease the number of thread for BigQuery streaming inse…
a6dcce7 is described below

commit a6dcce7b5e3eebc02aa584c4aa6a89d19837f10e
Author: Heejong Lee <he...@gmail.com>
AuthorDate: Wed Jan 16 18:32:12 2019 -0800

    [BEAM-6443] decrease the number of thread for BigQuery streaming insertAll
    
    limit the number of threads used for insertAll by changing existing
    unlimited thread pool to bounded wrapper.
---
 .../beam/sdk/io/gcp/bigquery/BigQueryOptions.java  |   8 ++
 .../sdk/io/gcp/bigquery/BigQueryServicesImpl.java  | 147 ++++++++++++++++++++-
 2 files changed, 154 insertions(+), 1 deletion(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
index d6a80a7..48af213 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
@@ -45,4 +45,12 @@ public interface BigQueryOptions
   Integer getHTTPWriteTimeout();
 
   void setHTTPWriteTimeout(Integer timeout);
+
+  @Description(
+      "If specified, the given number of maximum concurrent threads will be used to insert "
+          + "rows from one bundle to BigQuery service with streaming insert API.")
+  @Default.Integer(3)
+  Integer getInsertBundleParallelism();
+
+  void setInsertBundleParallelism(Integer parallelism);
 }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index e1b1a7f..dbb33aa 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -58,11 +58,16 @@ import com.google.cloud.hadoop.util.ApiErrorExtractor;
 import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.extensions.gcp.auth.NullCredentialInitializer;
@@ -705,7 +710,10 @@ class BigQueryServicesImpl implements BigQueryServices {
         throws IOException, InterruptedException {
       checkNotNull(ref, "ref");
       if (executor == null) {
-        this.executor = options.as(GcsOptions.class).getExecutorService();
+        this.executor =
+            new BoundedExecutorService(
+                options.as(GcsOptions.class).getExecutorService(),
+                options.as(BigQueryOptions.class).getInsertBundleParallelism());
       }
       if (insertIdList != null && rowList.size() != insertIdList.size()) {
         throw new AssertionError(
@@ -1001,4 +1009,141 @@ class BigQueryServicesImpl implements BigQueryServices {
       client.close();
     }
   }
+
+  private static class BoundedExecutorService implements ExecutorService {
+    private final ExecutorService executor;
+    private final Semaphore semaphore;
+    private final int parallelism;
+
+    BoundedExecutorService(ExecutorService executor, int parallelism) {
+      this.executor = executor;
+      this.parallelism = parallelism;
+      this.semaphore = new Semaphore(parallelism);
+    }
+
+    @Override
+    public void shutdown() {
+      executor.shutdown();
+    }
+
+    @Override
+    public List<Runnable> shutdownNow() {
+      List<Runnable> runnables = executor.shutdownNow();
+      // try to release permits as many as possible before returning semaphored runnables.
+      synchronized (this) {
+        if (semaphore.availablePermits() <= parallelism) {
+          semaphore.release(Integer.MAX_VALUE - parallelism);
+        }
+      }
+      return runnables;
+    }
+
+    @Override
+    public boolean isShutdown() {
+      return executor.isShutdown();
+    }
+
+    @Override
+    public boolean isTerminated() {
+      return executor.isTerminated();
+    }
+
+    @Override
+    public boolean awaitTermination(long l, TimeUnit timeUnit) throws InterruptedException {
+      return executor.awaitTermination(l, timeUnit);
+    }
+
+    @Override
+    public <T> Future<T> submit(Callable<T> callable) {
+      return executor.submit(new SemaphoreCallable<>(callable));
+    }
+
+    @Override
+    public <T> Future<T> submit(Runnable runnable, T t) {
+      return executor.submit(new SemaphoreRunnable(runnable), t);
+    }
+
+    @Override
+    public Future<?> submit(Runnable runnable) {
+      return executor.submit(new SemaphoreRunnable(runnable));
+    }
+
+    @Override
+    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> collection)
+        throws InterruptedException {
+      return executor.invokeAll(
+          collection.stream().map(SemaphoreCallable::new).collect(Collectors.toList()));
+    }
+
+    @Override
+    public <T> List<Future<T>> invokeAll(
+        Collection<? extends Callable<T>> collection, long l, TimeUnit timeUnit)
+        throws InterruptedException {
+      return executor.invokeAll(
+          collection.stream().map(SemaphoreCallable::new).collect(Collectors.toList()),
+          l,
+          timeUnit);
+    }
+
+    @Override
+    public <T> T invokeAny(Collection<? extends Callable<T>> collection)
+        throws InterruptedException, ExecutionException {
+      return executor.invokeAny(
+          collection.stream().map(SemaphoreCallable::new).collect(Collectors.toList()));
+    }
+
+    @Override
+    public <T> T invokeAny(Collection<? extends Callable<T>> collection, long l, TimeUnit timeUnit)
+        throws InterruptedException, ExecutionException, TimeoutException {
+      return executor.invokeAny(
+          collection.stream().map(SemaphoreCallable::new).collect(Collectors.toList()),
+          l,
+          timeUnit);
+    }
+
+    @Override
+    public void execute(Runnable runnable) {
+      executor.execute(new SemaphoreRunnable(runnable));
+    }
+
+    private class SemaphoreRunnable implements Runnable {
+      private final Runnable runnable;
+
+      SemaphoreRunnable(Runnable runnable) {
+        this.runnable = runnable;
+      }
+
+      @Override
+      public void run() {
+        try {
+          semaphore.acquire();
+        } catch (InterruptedException e) {
+          throw new RuntimeException("semaphore acquisition interrupted. task canceled.");
+        }
+        try {
+          runnable.run();
+        } finally {
+          semaphore.release();
+        }
+      }
+    }
+
+    private class SemaphoreCallable<V> implements Callable<V> {
+      private final Callable<V> callable;
+
+      SemaphoreCallable(Callable<V> callable) {
+        this.callable = callable;
+      }
+
+      @Override
+      public V call() throws Exception {
+        semaphore.acquire();
+        try {
+          return callable.call();
+        } finally {
+          semaphore.release();
+        }
+      }
+    }
+  }
 }