You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by re...@apache.org on 2019/03/19 04:19:08 UTC
[beam] branch master updated: [BEAM-6443] decrease the number of
thread for BigQuery streaming insertAll
This is an automated email from the ASF dual-hosted git repository.
reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new a6dcce7 [BEAM-6443] decrease the number of thread for BigQuery streaming insertAll
new 425110a Merge pull request #7547: [BEAM-6443] decrease the number of thread for BigQuery streaming inse…
a6dcce7 is described below
commit a6dcce7b5e3eebc02aa584c4aa6a89d19837f10e
Author: Heejong Lee <he...@gmail.com>
AuthorDate: Wed Jan 16 18:32:12 2019 -0800
[BEAM-6443] decrease the number of thread for BigQuery streaming insertAll
limit the number of threads used for insertAll by changing existing
unlimited thread pool to bounded wrapper.
---
.../beam/sdk/io/gcp/bigquery/BigQueryOptions.java | 8 ++
.../sdk/io/gcp/bigquery/BigQueryServicesImpl.java | 147 ++++++++++++++++++++-
2 files changed, 154 insertions(+), 1 deletion(-)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
index d6a80a7..48af213 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
@@ -45,4 +45,12 @@ public interface BigQueryOptions
Integer getHTTPWriteTimeout();
void setHTTPWriteTimeout(Integer timeout);
+
+ @Description(
+ "If specified, the given number of maximum concurrent threads will be used to insert "
+ + "rows from one bundle to BigQuery service with streaming insert API.")
+ @Default.Integer(3)
+ Integer getInsertBundleParallelism();
+
+ void setInsertBundleParallelism(Integer parallelism);
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index e1b1a7f..dbb33aa 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -58,11 +58,16 @@ import com.google.cloud.hadoop.util.ApiErrorExtractor;
import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
+import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.extensions.gcp.auth.NullCredentialInitializer;
@@ -705,7 +710,10 @@ class BigQueryServicesImpl implements BigQueryServices {
throws IOException, InterruptedException {
checkNotNull(ref, "ref");
if (executor == null) {
- this.executor = options.as(GcsOptions.class).getExecutorService();
+ this.executor =
+ new BoundedExecutorService(
+ options.as(GcsOptions.class).getExecutorService(),
+ options.as(BigQueryOptions.class).getInsertBundleParallelism());
}
if (insertIdList != null && rowList.size() != insertIdList.size()) {
throw new AssertionError(
@@ -1001,4 +1009,141 @@ class BigQueryServicesImpl implements BigQueryServices {
client.close();
}
}
+
+ private static class BoundedExecutorService implements ExecutorService {
+ private final ExecutorService executor;
+ private final Semaphore semaphore;
+ private final int parallelism;
+
+ BoundedExecutorService(ExecutorService executor, int parallelism) {
+ this.executor = executor;
+ this.parallelism = parallelism;
+ this.semaphore = new Semaphore(parallelism);
+ }
+
+ @Override
+ public void shutdown() {
+ executor.shutdown();
+ }
+
+ @Override
+ public List<Runnable> shutdownNow() {
+ List<Runnable> runnables = executor.shutdownNow();
+ // try to release permits as many as possible before returning semaphored runnables.
+ synchronized (this) {
+ if (semaphore.availablePermits() <= parallelism) {
+ semaphore.release(Integer.MAX_VALUE - parallelism);
+ }
+ }
+ return runnables;
+ }
+
+ @Override
+ public boolean isShutdown() {
+ return executor.isShutdown();
+ }
+
+ @Override
+ public boolean isTerminated() {
+ return executor.isTerminated();
+ }
+
+ @Override
+ public boolean awaitTermination(long l, TimeUnit timeUnit) throws InterruptedException {
+ return executor.awaitTermination(l, timeUnit);
+ }
+
+ @Override
+ public <T> Future<T> submit(Callable<T> callable) {
+ return executor.submit(new SemaphoreCallable<>(callable));
+ }
+
+ @Override
+ public <T> Future<T> submit(Runnable runnable, T t) {
+ return executor.submit(new SemaphoreRunnable(runnable), t);
+ }
+
+ @Override
+ public Future<?> submit(Runnable runnable) {
+ return executor.submit(new SemaphoreRunnable(runnable));
+ }
+
+ @Override
+ public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> collection)
+ throws InterruptedException {
+ return executor.invokeAll(
+ collection.stream().map(SemaphoreCallable::new).collect(Collectors.toList()));
+ }
+
+ @Override
+ public <T> List<Future<T>> invokeAll(
+ Collection<? extends Callable<T>> collection, long l, TimeUnit timeUnit)
+ throws InterruptedException {
+ return executor.invokeAll(
+ collection.stream().map(SemaphoreCallable::new).collect(Collectors.toList()),
+ l,
+ timeUnit);
+ }
+
+ @Override
+ public <T> T invokeAny(Collection<? extends Callable<T>> collection)
+ throws InterruptedException, ExecutionException {
+ return executor.invokeAny(
+ collection.stream().map(SemaphoreCallable::new).collect(Collectors.toList()));
+ }
+
+ @Override
+ public <T> T invokeAny(Collection<? extends Callable<T>> collection, long l, TimeUnit timeUnit)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ return executor.invokeAny(
+ collection.stream().map(SemaphoreCallable::new).collect(Collectors.toList()),
+ l,
+ timeUnit);
+ }
+
+ @Override
+ public void execute(Runnable runnable) {
+ executor.execute(new SemaphoreRunnable(runnable));
+ }
+
+ private class SemaphoreRunnable implements Runnable {
+ private final Runnable runnable;
+
+ SemaphoreRunnable(Runnable runnable) {
+ this.runnable = runnable;
+ }
+
+ @Override
+ public void run() {
+ try {
+ semaphore.acquire();
+ } catch (InterruptedException e) {
+ throw new RuntimeException("semaphore acquisition interrupted. task canceled.");
+ }
+ try {
+ runnable.run();
+ } finally {
+ semaphore.release();
+ }
+ }
+ }
+
+ private class SemaphoreCallable<V> implements Callable<V> {
+ private final Callable<V> callable;
+
+ SemaphoreCallable(Callable<V> callable) {
+ this.callable = callable;
+ }
+
+ @Override
+ public V call() throws Exception {
+ semaphore.acquire();
+ try {
+ return callable.call();
+ } finally {
+ semaphore.release();
+ }
+ }
+ }
+ }
}