Posted to issues@beam.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2019/03/01 02:04:00 UTC

[jira] [Work logged] (BEAM-6443) decrease the number of threads for BigQuery streaming insertAll

     [ https://issues.apache.org/jira/browse/BEAM-6443?focusedWorklogId=206181&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-206181 ]

ASF GitHub Bot logged work on BEAM-6443:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 01/Mar/19 02:03
            Start Date: 01/Mar/19 02:03
    Worklog Time Spent: 10m 
      Work Description: ihji commented on pull request #7547: [BEAM-6443] decrease the number of thread for BigQuery streaming inse…
URL: https://github.com/apache/beam/pull/7547#discussion_r261457821
 
 

 ##########
 File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
 ##########
 @@ -1001,4 +1007,141 @@ public void close() {
       client.close();
     }
   }
+
+  private static class BoundedExecutorService implements ExecutorService {
+    private final ExecutorService executor;
+    private final Semaphore semaphore;
+    private final int parallelism;
+
+    BoundedExecutorService(ExecutorService executor, int parallelism) {
+      this.executor = executor;
+      this.parallelism = parallelism;
+      this.semaphore = new Semaphore(parallelism);
+    }
+
+    @Override
+    public void shutdown() {
+      executor.shutdown();
+    }
+
+    @Override
+    public List<Runnable> shutdownNow() {
+      List<Runnable> runnables = executor.shutdownNow();
+      // try to release permits as many as possible before returning semaphored runnables.
+      synchronized (this) {
+        if (semaphore.availablePermits() <= parallelism) {
+          semaphore.release(Integer.MAX_VALUE - parallelism);
 
 Review comment:
   I wanted to remove the restriction imposed by the semaphore by raising the number of available permits to its maximum value (Integer.MAX_VALUE). It looks like it's impossible to retrieve the original Callable here, so I'm returning the semaphore-wrapped callables with the permit limit effectively removed instead.
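
 The idea in this comment can be illustrated with a minimal, self-contained sketch. It is not the code from the pull request: the class name BoundedSubmitSketch and its methods are hypothetical, and it assumes each submitted task acquires one permit before running and releases it afterwards. Only java.util.concurrent classes are used.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;

// Hypothetical illustration only; not the BoundedExecutorService from the PR.
class BoundedSubmitSketch {
  private final ExecutorService executor;
  private final Semaphore semaphore;
  private final int parallelism;

  BoundedSubmitSketch(ExecutorService executor, int parallelism) {
    this.executor = executor;
    this.parallelism = parallelism;
    this.semaphore = new Semaphore(parallelism);
  }

  // Wraps the task so that it holds one permit for as long as it runs.
  <T> Future<T> submit(Callable<T> task) {
    return executor.submit(() -> {
      semaphore.acquire();
      try {
        return task.call();
      } finally {
        semaphore.release();
      }
    });
  }

  // Stops the delegate, then lifts the bound so that any wrapped task the
  // caller later runs by hand is not blocked waiting for a permit.
  synchronized List<Runnable> shutdownNow() {
    List<Runnable> pending = executor.shutdownNow();
    // Guard against a second call: releasing again would overflow the permit count.
    if (semaphore.availablePermits() <= parallelism) {
      semaphore.release(Integer.MAX_VALUE - parallelism);
    }
    return pending;
  }

  int availablePermits() {
    return semaphore.availablePermits();
  }
}
```

 The guard matters because Semaphore.release(int) throws an Error once the permit count would exceed Integer.MAX_VALUE, so the large release should happen at most once.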
 


Issue Time Tracking
-------------------

    Worklog Id:     (was: 206181)
    Time Spent: 3h 50m  (was: 3h 40m)

> decrease the number of threads for BigQuery streaming insertAll
> ---------------------------------------------------------------
>
>                 Key: BEAM-6443
>                 URL: https://issues.apache.org/jira/browse/BEAM-6443
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-gcp
>            Reporter: Heejong Lee
>            Assignee: Heejong Lee
>            Priority: Major
>              Labels: triaged
>          Time Spent: 3h 50m
>  Remaining Estimate: 0h
>
> When inserting a large number of very small elements into BigQuery via streaming insertAll, BigQueryIO causes many quota-exceeded errors. This implies that 1) BigQueryIO puts unnecessary overhead on the BigQuery API layer by sending requests too fast, and 2) the log file becomes very large because the same error message is repeated. Currently we use 50 shards for writing data into BigQuery, and in each bundle 20-30 futures are executed simultaneously with an unlimited thread pool. It would be worth investigating whether just a single thread pool is sufficient for running concurrent insertAll calls.
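
The effect of capping the pool size can be sketched as follows. This is a hedged illustration, not Beam code: FixedPoolSketch and maxConcurrency are hypothetical names, the sleep merely stands in for one insertAll request, and reading "single thread pool" as a small fixed-size pool is an assumption. The method reports the highest number of tasks observed running at once for a given pool size.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration only; no BigQuery or Beam APIs are called.
class FixedPoolSketch {
  // Runs taskCount placeholder tasks on a pool of poolSize threads and
  // returns the peak number of tasks that were running simultaneously.
  static int maxConcurrency(int poolSize, int taskCount) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(poolSize);
    AtomicInteger running = new AtomicInteger();
    AtomicInteger peak = new AtomicInteger();
    List<Future<?>> futures = new ArrayList<>();
    try {
      for (int i = 0; i < taskCount; i++) {
        futures.add(pool.submit(() -> {
          int now = running.incrementAndGet();
          peak.accumulateAndGet(now, Math::max);
          try {
            Thread.sleep(5); // placeholder for one insertAll request
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          } finally {
            running.decrementAndGet();
          }
        }));
      }
      // Wait for every task so the peak value is final.
      for (Future<?> f : futures) {
        f.get();
      }
    } finally {
      pool.shutdown();
    }
    return peak.get();
  }
}
```

With a pool size of 1 the tasks run strictly one after another, so the request rate is bounded by the latency of a single call; larger fixed sizes trade throughput against the risk of hitting quota limits.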


