You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2022/04/15 18:45:13 UTC

[beam] branch master updated: BigQueryServicesImpl: reduce number of threads spawned by BoundedExecutorService

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new c3cc8e1ef7d BigQueryServicesImpl: reduce number of threads spawned by BoundedExecutorService
     new 42b22ad191f Merge pull request #17333 from [BEAM-14283] BigQueryServicesImpl: reduce number of threads spawned by BoundedExecutorService
c3cc8e1ef7d is described below

commit c3cc8e1ef7d54eb801593bce2a97c71f5b4149e0
Author: Arun Pandian <pa...@google.com>
AuthorDate: Mon Apr 4 17:14:36 2022 -0700

    BigQueryServicesImpl: reduce number of threads spawned by BoundedExecutorService
    
    Moving the semaphore aquire to outside the runnable, the tasks now wait before
    getting submitted to the executor and reuse existing threads.
---
 .../sdk/io/gcp/bigquery/BigQueryServicesImpl.java  | 156 +++------------------
 1 file changed, 22 insertions(+), 134 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index 5259feb7b09..563f5f594ce 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -91,7 +91,6 @@ import io.grpc.Status.Code;
 import io.grpc.protobuf.ProtoUtils;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -101,13 +100,11 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.Collectors;
 import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
 import org.apache.beam.runners.core.metrics.ServiceCallMetric;
 import org.apache.beam.sdk.extensions.gcp.auth.NullCredentialInitializer;
@@ -131,6 +128,10 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditio
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Futures;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ListenableFuture;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ListeningExecutorService;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.Duration;
 import org.slf4j.Logger;
@@ -486,7 +487,7 @@ class BigQueryServicesImpl implements BigQueryServices {
     private final Counter throttlingMsecs =
         Metrics.counter(DatasetServiceImpl.class, "throttling-msecs");
 
-    private ExecutorService executor;
+    private BoundedExecutorService executor;
 
     @VisibleForTesting
     DatasetServiceImpl(
@@ -941,7 +942,7 @@ class BigQueryServicesImpl implements BigQueryServices {
       if (executor == null) {
         this.executor =
             new BoundedExecutorService(
-                options.as(GcsOptions.class).getExecutorService(),
+                MoreExecutors.listeningDecorator(options.as(GcsOptions.class).getExecutorService()),
                 options.as(BigQueryOptions.class).getInsertBundleParallelism());
       }
       if (insertIdList != null && rowList.size() != insertIdList.size()) {
@@ -1567,140 +1568,27 @@ class BigQueryServicesImpl implements BigQueryServices {
     }
   }
 
-  private static class BoundedExecutorService implements ExecutorService {
-    private final ExecutorService executor;
+  private static class BoundedExecutorService {
+    private final ListeningExecutorService taskExecutor;
+    private final ListeningExecutorService taskSubmitExecutor;
     private final Semaphore semaphore;
-    private final int parallelism;
 
-    BoundedExecutorService(ExecutorService executor, int parallelism) {
-      this.executor = executor;
-      this.parallelism = parallelism;
+    BoundedExecutorService(ListeningExecutorService taskExecutor, int parallelism) {
+      this.taskExecutor = taskExecutor;
+      this.taskSubmitExecutor =
+          MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
       this.semaphore = new Semaphore(parallelism);
     }
 
-    @Override
-    public void shutdown() {
-      executor.shutdown();
-    }
-
-    @Override
-    public List<Runnable> shutdownNow() {
-      List<Runnable> runnables = executor.shutdownNow();
-      // try to release permits as many as possible before returning semaphored runnables.
-      synchronized (this) {
-        if (semaphore.availablePermits() <= parallelism) {
-          semaphore.release(Integer.MAX_VALUE - parallelism);
-        }
-      }
-      return runnables;
-    }
-
-    @Override
-    public boolean isShutdown() {
-      return executor.isShutdown();
-    }
-
-    @Override
-    public boolean isTerminated() {
-      return executor.isTerminated();
-    }
-
-    @Override
-    public boolean awaitTermination(long l, TimeUnit timeUnit) throws InterruptedException {
-      return executor.awaitTermination(l, timeUnit);
-    }
-
-    @Override
     public <T> Future<T> submit(Callable<T> callable) {
-      return executor.submit(new SemaphoreCallable<>(callable));
-    }
-
-    @Override
-    public <T> Future<T> submit(Runnable runnable, T t) {
-      return executor.submit(new SemaphoreRunnable(runnable), t);
-    }
-
-    @Override
-    public Future<?> submit(Runnable runnable) {
-      return executor.submit(new SemaphoreRunnable(runnable));
-    }
-
-    @Override
-    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> collection)
-        throws InterruptedException {
-      return executor.invokeAll(
-          collection.stream().map(SemaphoreCallable::new).collect(Collectors.toList()));
-    }
-
-    @Override
-    public <T> List<Future<T>> invokeAll(
-        Collection<? extends Callable<T>> collection, long l, TimeUnit timeUnit)
-        throws InterruptedException {
-      return executor.invokeAll(
-          collection.stream().map(SemaphoreCallable::new).collect(Collectors.toList()),
-          l,
-          timeUnit);
-    }
-
-    @Override
-    public <T> T invokeAny(Collection<? extends Callable<T>> collection)
-        throws InterruptedException, ExecutionException {
-      return executor.invokeAny(
-          collection.stream().map(SemaphoreCallable::new).collect(Collectors.toList()));
-    }
-
-    @Override
-    public <T> T invokeAny(Collection<? extends Callable<T>> collection, long l, TimeUnit timeUnit)
-        throws InterruptedException, ExecutionException, TimeoutException {
-      return executor.invokeAny(
-          collection.stream().map(SemaphoreCallable::new).collect(Collectors.toList()),
-          l,
-          timeUnit);
-    }
-
-    @Override
-    public void execute(Runnable runnable) {
-      executor.execute(new SemaphoreRunnable(runnable));
-    }
-
-    private class SemaphoreRunnable implements Runnable {
-      private final Runnable runnable;
-
-      SemaphoreRunnable(Runnable runnable) {
-        this.runnable = runnable;
-      }
-
-      @Override
-      public void run() {
-        try {
-          semaphore.acquire();
-        } catch (InterruptedException e) {
-          throw new RuntimeException("semaphore acquisition interrupted. task canceled.");
-        }
-        try {
-          runnable.run();
-        } finally {
-          semaphore.release();
-        }
-      }
-    }
-
-    private class SemaphoreCallable<V> implements Callable<V> {
-      private final Callable<V> callable;
-
-      SemaphoreCallable(Callable<V> callable) {
-        this.callable = callable;
-      }
-
-      @Override
-      public V call() throws Exception {
-        semaphore.acquire();
-        try {
-          return callable.call();
-        } finally {
-          semaphore.release();
-        }
-      }
+      return Futures.submitAsync(
+          () -> {
+            semaphore.acquire();
+            ListenableFuture<T> listenableFuture = taskExecutor.submit(callable);
+            listenableFuture.addListener(semaphore::release, MoreExecutors.directExecutor());
+            return listenableFuture;
+          },
+          taskSubmitExecutor);
     }
   }
 }