You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2022/04/15 18:45:13 UTC
[beam] branch master updated: BigQueryServicesImpl: reduce number of threads spawned by BoundedExecutorService
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new c3cc8e1ef7d BigQueryServicesImpl: reduce number of threads spawned by BoundedExecutorService
new 42b22ad191f Merge pull request #17333 from [BEAM-14283] BigQueryServicesImpl: reduce number of threads spawned by BoundedExecutorService
c3cc8e1ef7d is described below
commit c3cc8e1ef7d54eb801593bce2a97c71f5b4149e0
Author: Arun Pandian <pa...@google.com>
AuthorDate: Mon Apr 4 17:14:36 2022 -0700
BigQueryServicesImpl: reduce number of threads spawned by BoundedExecutorService
By moving the semaphore acquire outside the runnable, tasks now wait before
being submitted to the executor and reuse existing threads.
---
.../sdk/io/gcp/bigquery/BigQueryServicesImpl.java | 156 +++------------------
1 file changed, 22 insertions(+), 134 deletions(-)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index 5259feb7b09..563f5f594ce 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -91,7 +91,6 @@ import io.grpc.Status.Code;
import io.grpc.protobuf.ProtoUtils;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -101,13 +100,11 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.Collectors;
import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
import org.apache.beam.runners.core.metrics.ServiceCallMetric;
import org.apache.beam.sdk.extensions.gcp.auth.NullCredentialInitializer;
@@ -131,6 +128,10 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditio
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Futures;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ListenableFuture;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ListeningExecutorService;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
import org.slf4j.Logger;
@@ -486,7 +487,7 @@ class BigQueryServicesImpl implements BigQueryServices {
private final Counter throttlingMsecs =
Metrics.counter(DatasetServiceImpl.class, "throttling-msecs");
- private ExecutorService executor;
+ private BoundedExecutorService executor;
@VisibleForTesting
DatasetServiceImpl(
@@ -941,7 +942,7 @@ class BigQueryServicesImpl implements BigQueryServices {
if (executor == null) {
this.executor =
new BoundedExecutorService(
- options.as(GcsOptions.class).getExecutorService(),
+ MoreExecutors.listeningDecorator(options.as(GcsOptions.class).getExecutorService()),
options.as(BigQueryOptions.class).getInsertBundleParallelism());
}
if (insertIdList != null && rowList.size() != insertIdList.size()) {
@@ -1567,140 +1568,27 @@ class BigQueryServicesImpl implements BigQueryServices {
}
}
- private static class BoundedExecutorService implements ExecutorService {
- private final ExecutorService executor;
+ private static class BoundedExecutorService {
+ private final ListeningExecutorService taskExecutor;
+ private final ListeningExecutorService taskSubmitExecutor;
private final Semaphore semaphore;
- private final int parallelism;
- BoundedExecutorService(ExecutorService executor, int parallelism) {
- this.executor = executor;
- this.parallelism = parallelism;
+ BoundedExecutorService(ListeningExecutorService taskExecutor, int parallelism) {
+ this.taskExecutor = taskExecutor;
+ this.taskSubmitExecutor =
+ MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
this.semaphore = new Semaphore(parallelism);
}
- @Override
- public void shutdown() {
- executor.shutdown();
- }
-
- @Override
- public List<Runnable> shutdownNow() {
- List<Runnable> runnables = executor.shutdownNow();
- // try to release permits as many as possible before returning semaphored runnables.
- synchronized (this) {
- if (semaphore.availablePermits() <= parallelism) {
- semaphore.release(Integer.MAX_VALUE - parallelism);
- }
- }
- return runnables;
- }
-
- @Override
- public boolean isShutdown() {
- return executor.isShutdown();
- }
-
- @Override
- public boolean isTerminated() {
- return executor.isTerminated();
- }
-
- @Override
- public boolean awaitTermination(long l, TimeUnit timeUnit) throws InterruptedException {
- return executor.awaitTermination(l, timeUnit);
- }
-
- @Override
public <T> Future<T> submit(Callable<T> callable) {
- return executor.submit(new SemaphoreCallable<>(callable));
- }
-
- @Override
- public <T> Future<T> submit(Runnable runnable, T t) {
- return executor.submit(new SemaphoreRunnable(runnable), t);
- }
-
- @Override
- public Future<?> submit(Runnable runnable) {
- return executor.submit(new SemaphoreRunnable(runnable));
- }
-
- @Override
- public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> collection)
- throws InterruptedException {
- return executor.invokeAll(
- collection.stream().map(SemaphoreCallable::new).collect(Collectors.toList()));
- }
-
- @Override
- public <T> List<Future<T>> invokeAll(
- Collection<? extends Callable<T>> collection, long l, TimeUnit timeUnit)
- throws InterruptedException {
- return executor.invokeAll(
- collection.stream().map(SemaphoreCallable::new).collect(Collectors.toList()),
- l,
- timeUnit);
- }
-
- @Override
- public <T> T invokeAny(Collection<? extends Callable<T>> collection)
- throws InterruptedException, ExecutionException {
- return executor.invokeAny(
- collection.stream().map(SemaphoreCallable::new).collect(Collectors.toList()));
- }
-
- @Override
- public <T> T invokeAny(Collection<? extends Callable<T>> collection, long l, TimeUnit timeUnit)
- throws InterruptedException, ExecutionException, TimeoutException {
- return executor.invokeAny(
- collection.stream().map(SemaphoreCallable::new).collect(Collectors.toList()),
- l,
- timeUnit);
- }
-
- @Override
- public void execute(Runnable runnable) {
- executor.execute(new SemaphoreRunnable(runnable));
- }
-
- private class SemaphoreRunnable implements Runnable {
- private final Runnable runnable;
-
- SemaphoreRunnable(Runnable runnable) {
- this.runnable = runnable;
- }
-
- @Override
- public void run() {
- try {
- semaphore.acquire();
- } catch (InterruptedException e) {
- throw new RuntimeException("semaphore acquisition interrupted. task canceled.");
- }
- try {
- runnable.run();
- } finally {
- semaphore.release();
- }
- }
- }
-
- private class SemaphoreCallable<V> implements Callable<V> {
- private final Callable<V> callable;
-
- SemaphoreCallable(Callable<V> callable) {
- this.callable = callable;
- }
-
- @Override
- public V call() throws Exception {
- semaphore.acquire();
- try {
- return callable.call();
- } finally {
- semaphore.release();
- }
- }
+ return Futures.submitAsync(
+ () -> {
+ semaphore.acquire();
+ ListenableFuture<T> listenableFuture = taskExecutor.submit(callable);
+ listenableFuture.addListener(semaphore::release, MoreExecutors.directExecutor());
+ return listenableFuture;
+ },
+ taskSubmitExecutor);
}
}
}