You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/07/06 17:20:46 UTC

[40/50] [abbrv] incubator-beam git commit: [BEAM-142] - BigQueryIO: don't unnecessarily initialize an ExecutorService to validate parameters

[BEAM-142] - BigQueryIO: don't unnecessarily initialize an ExecutorService to validate parameters

By default, BigQueryIO initializes an ExecutorService; however, AppEngine
doesn't allow threads to be modified to be daemon
threads. Use GcsOptions.getExecutorService() to obtain the
ExecutorService instead.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6924358e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6924358e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6924358e

Branch: refs/heads/runners-spark2
Commit: 6924358e701839f2b2deeb37ac4c106ae03ed731
Parents: 8853118
Author: Lucas Amorim <lu...@protonmail.com>
Authored: Tue Jun 28 14:28:17 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Jul 6 10:18:52 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/BigQueryIO.java | 10 +++----
 .../beam/sdk/util/BigQueryTableInserter.java    | 31 +++++++++++++-------
 .../sdk/util/BigQueryTableInserterTest.java     | 17 +++++++----
 .../apache/beam/sdk/util/BigQueryUtilTest.java  | 12 +++++---
 .../util/RetryHttpRequestInitializerTest.java   |  5 +++-
 5 files changed, 48 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6924358e/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
index a9d85b8..790e3ff 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
@@ -1594,7 +1594,7 @@ public class BigQueryIO {
           TableReference table) {
         try {
           Bigquery client = Transport.newBigQueryClient(options).build();
-          BigQueryTableInserter inserter = new BigQueryTableInserter(client);
+          BigQueryTableInserter inserter = new BigQueryTableInserter(client, options);
           if (!inserter.isEmpty(table)) {
             throw new IllegalArgumentException(
                 "BigQuery table is not empty: " + BigQueryIO.toTableSpec(table));
@@ -2084,7 +2084,7 @@ public class BigQueryIO {
       for (String tableSpec : tableRows.keySet()) {
         TableReference tableReference = getOrCreateTable(options, tableSpec);
         flushRows(client, tableReference, tableRows.get(tableSpec),
-            uniqueIdsForTableRows.get(tableSpec));
+            uniqueIdsForTableRows.get(tableSpec), options);
       }
       tableRows.clear();
       uniqueIdsForTableRows.clear();
@@ -2109,7 +2109,7 @@ public class BigQueryIO {
           if (!createdTables.contains(tableSpec)) {
             TableSchema tableSchema = JSON_FACTORY.fromString(jsonTableSchema, TableSchema.class);
             Bigquery client = Transport.newBigQueryClient(options).build();
-            BigQueryTableInserter inserter = new BigQueryTableInserter(client);
+            BigQueryTableInserter inserter = new BigQueryTableInserter(client, options);
             inserter.getOrCreateTable(tableReference, WriteDisposition.WRITE_APPEND,
                 CreateDisposition.CREATE_IF_NEEDED, tableSchema);
             createdTables.add(tableSpec);
@@ -2121,10 +2121,10 @@ public class BigQueryIO {
 
     /** Writes the accumulated rows into BigQuery with streaming API. */
     private void flushRows(Bigquery client, TableReference tableReference,
-        List<TableRow> tableRows, List<String> uniqueIds) {
+        List<TableRow> tableRows, List<String> uniqueIds, BigQueryOptions options) {
       if (!tableRows.isEmpty()) {
         try {
-          BigQueryTableInserter inserter = new BigQueryTableInserter(client);
+          BigQueryTableInserter inserter = new BigQueryTableInserter(client, options);
           inserter.insertAll(tableReference, tableRows, uniqueIds, byteCountAggregator);
         } catch (IOException e) {
           throw new RuntimeException(e);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6924358e/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryTableInserter.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryTableInserter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryTableInserter.java
index f87a3c4..84004a7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryTableInserter.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryTableInserter.java
@@ -22,6 +22,8 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import org.apache.beam.sdk.io.BigQueryIO;
 import org.apache.beam.sdk.io.BigQueryIO.Write.CreateDisposition;
 import org.apache.beam.sdk.io.BigQueryIO.Write.WriteDisposition;
+import org.apache.beam.sdk.options.GcsOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Aggregator;
 
 import com.google.api.client.util.BackOff;
@@ -38,7 +40,6 @@ import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.bigquery.model.TableSchema;
 import com.google.cloud.hadoop.util.ApiErrorExtractor;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.MoreExecutors;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,9 +51,7 @@ import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import javax.annotation.Nullable;
@@ -83,43 +82,51 @@ public class BigQueryTableInserter {
   private final TableReference defaultRef;
   private final long maxRowsPerBatch;
 
-  private static final ExecutorService executor = MoreExecutors.getExitingExecutorService(
-      (ThreadPoolExecutor) Executors.newFixedThreadPool(100), 10, TimeUnit.SECONDS);
+  private ExecutorService executor;
 
   /**
    * Constructs a new row inserter.
    *
    * @param client a BigQuery client
+   * @param options a PipelineOptions object
    */
-  public BigQueryTableInserter(Bigquery client) {
+  public BigQueryTableInserter(Bigquery client, PipelineOptions options) {
     this.client = client;
     this.defaultRef = null;
     this.maxRowsPerBatch = MAX_ROWS_PER_BATCH;
+    this.executor = options.as(GcsOptions.class).getExecutorService();
   }
 
   /**
    * Constructs a new row inserter.
    *
    * @param client a BigQuery client
+   * @param options a PipelineOptions object
    * @param defaultRef identifies the table to insert into
-   * @deprecated replaced by {@link #BigQueryTableInserter(Bigquery)}
+   * @deprecated replaced by {@link #BigQueryTableInserter(Bigquery, PipelineOptions)}
    */
   @Deprecated
-  public BigQueryTableInserter(Bigquery client, TableReference defaultRef) {
+  public BigQueryTableInserter(Bigquery client, PipelineOptions options,
+                               TableReference defaultRef) {
     this.client = client;
     this.defaultRef = defaultRef;
     this.maxRowsPerBatch = MAX_ROWS_PER_BATCH;
+    this.executor = options.as(GcsOptions.class).getExecutorService();
   }
 
   /**
    * Constructs a new row inserter.
    *
    * @param client a BigQuery client
+   * @param options a PipelineOptions object
+   * @param maxRowsPerBatch maximum number of rows to insert per call to BigQuery
    */
-  public BigQueryTableInserter(Bigquery client, int maxRowsPerBatch) {
+  public BigQueryTableInserter(Bigquery client, PipelineOptions options,
+                               int maxRowsPerBatch) {
     this.client = client;
     this.defaultRef = null;
     this.maxRowsPerBatch = maxRowsPerBatch;
+    this.executor = options.as(GcsOptions.class).getExecutorService();
   }
 
   /**
@@ -127,13 +134,15 @@ public class BigQueryTableInserter {
    *
    * @param client a BigQuery client
    * @param defaultRef identifies the default table to insert into
-   * @deprecated replaced by {@link #BigQueryTableInserter(Bigquery, int)}
+   * @deprecated replaced by {@link #BigQueryTableInserter(Bigquery, PipelineOptions, int)}
    */
   @Deprecated
-  public BigQueryTableInserter(Bigquery client, TableReference defaultRef, int maxRowsPerBatch) {
+  public BigQueryTableInserter(Bigquery client, PipelineOptions options,
+                               TableReference defaultRef, int maxRowsPerBatch) {
     this.client = client;
     this.defaultRef = defaultRef;
     this.maxRowsPerBatch = maxRowsPerBatch;
+    this.executor = options.as(GcsOptions.class).getExecutorService();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6924358e/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryTableInserterTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryTableInserterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryTableInserterTest.java
index 7d9c8a8..344e916 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryTableInserterTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryTableInserterTest.java
@@ -28,6 +28,8 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.ExpectedLogs;
 
 import com.google.api.client.googleapis.json.GoogleJsonError;
@@ -75,6 +77,7 @@ public class BigQueryTableInserterTest {
   @Rule public ExpectedLogs expectedLogs = ExpectedLogs.none(BigQueryTableInserter.class);
   @Mock private LowLevelHttpResponse response;
   private Bigquery bigquery;
+  private PipelineOptions options;
 
   @Before
   public void setUp() {
@@ -97,6 +100,8 @@ public class BigQueryTableInserterTest {
         new Bigquery.Builder(
                 transport, Transport.getJsonFactory(), new RetryHttpRequestInitializer())
             .build();
+
+    options = PipelineOptionsFactory.create();
   }
 
   @After
@@ -139,7 +144,7 @@ public class BigQueryTableInserterTest {
     when(response.getStatusCode()).thenReturn(200);
     when(response.getContent()).thenReturn(toStream(testTable));
 
-    BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery);
+    BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery, options);
     Table ret =
         inserter.tryCreateTable(
             new Table(),
@@ -160,7 +165,7 @@ public class BigQueryTableInserterTest {
   public void testCreateTableSucceedsAlreadyExists() throws IOException {
     when(response.getStatusCode()).thenReturn(409); // 409 means already exists
 
-    BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery);
+    BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery, options);
     Table ret =
         inserter.tryCreateTable(
             new Table(),
@@ -191,7 +196,7 @@ public class BigQueryTableInserterTest {
         .thenReturn(toStream(errorWithReasonAndStatus("rateLimitExceeded", 403)))
         .thenReturn(toStream(testTable));
 
-    BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery);
+    BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery, options);
     Table ret =
         inserter.tryCreateTable(
             testTable,
@@ -227,7 +232,7 @@ public class BigQueryTableInserterTest {
     thrown.expect(GoogleJsonResponseException.class);
     thrown.expectMessage("actually forbidden");
 
-    BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery);
+    BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery, options);
     try {
       inserter.tryCreateTable(
           new Table(),
@@ -261,7 +266,7 @@ public class BigQueryTableInserterTest {
         .thenReturn(toStream(errorWithReasonAndStatus("rateLimitExceeded", 403)))
         .thenReturn(toStream(new TableDataInsertAllResponse()));
 
-    BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery);
+    BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery, options);
 
     inserter.insertAll(ref, rows);
     verify(response, times(2)).getStatusCode();
@@ -291,7 +296,7 @@ public class BigQueryTableInserterTest {
     thrown.expect(GoogleJsonResponseException.class);
     thrown.expectMessage("actually forbidden");
 
-    BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery);
+    BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery, options);
 
     try {
       inserter.insertAll(ref, rows);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6924358e/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryUtilTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryUtilTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryUtilTest.java
index 65fbeb7..c033a7d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryUtilTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryUtilTest.java
@@ -31,6 +31,8 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
 import org.apache.beam.sdk.io.BigQueryIO;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.Sum;
@@ -81,10 +83,12 @@ public class BigQueryUtilTest {
   @Mock private Bigquery.Tables.Get mockTablesGet;
   @Mock private Bigquery.Tabledata mockTabledata;
   @Mock private Bigquery.Tabledata.List mockTabledataList;
+  private PipelineOptions options;
 
   @Before
   public void setUp() {
     MockitoAnnotations.initMocks(this);
+    this.options = PipelineOptionsFactory.create();
   }
 
   @After
@@ -369,7 +373,7 @@ public class BigQueryUtilTest {
     TableReference ref = BigQueryIO
         .parseTableSpec("project:dataset.table");
 
-    BigQueryTableInserter inserter = new BigQueryTableInserter(mockClient);
+    BigQueryTableInserter inserter = new BigQueryTableInserter(mockClient, options);
 
     inserter.getOrCreateTable(ref, BigQueryIO.Write.WriteDisposition.WRITE_APPEND,
         BigQueryIO.Write.CreateDisposition.CREATE_NEVER, null);
@@ -387,7 +391,7 @@ public class BigQueryUtilTest {
     TableReference ref = BigQueryIO
         .parseTableSpec("project:dataset.table");
 
-    BigQueryTableInserter inserter = new BigQueryTableInserter(mockClient);
+    BigQueryTableInserter inserter = new BigQueryTableInserter(mockClient, options);
 
     inserter.getOrCreateTable(ref, BigQueryIO.Write.WriteDisposition.WRITE_EMPTY,
         BigQueryIO.Write.CreateDisposition.CREATE_NEVER, null);
@@ -408,7 +412,7 @@ public class BigQueryUtilTest {
     TableReference ref = BigQueryIO
         .parseTableSpec("project:dataset.table");
 
-    BigQueryTableInserter inserter = new BigQueryTableInserter(mockClient);
+    BigQueryTableInserter inserter = new BigQueryTableInserter(mockClient, options);
 
     try {
       inserter.getOrCreateTable(ref, BigQueryIO.Write.WriteDisposition.WRITE_EMPTY,
@@ -432,7 +436,7 @@ public class BigQueryUtilTest {
 
     TableReference ref = BigQueryIO
         .parseTableSpec("project:dataset.table");
-    BigQueryTableInserter inserter = new BigQueryTableInserter(mockClient, 5);
+    BigQueryTableInserter inserter = new BigQueryTableInserter(mockClient, options, 5);
 
     List<TableRow> rows = new ArrayList<>();
     List<String> ids = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6924358e/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RetryHttpRequestInitializerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RetryHttpRequestInitializerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RetryHttpRequestInitializerTest.java
index 7d212d4..83ffaa1 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RetryHttpRequestInitializerTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RetryHttpRequestInitializerTest.java
@@ -30,6 +30,8 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+
 import com.google.api.client.auth.oauth2.Credential;
 import com.google.api.client.http.HttpRequest;
 import com.google.api.client.http.HttpResponse;
@@ -281,7 +283,8 @@ public class RetryHttpRequestInitializerTest {
     // RetryHttpInitializer.
     Bigquery b = new Bigquery.Builder(
         transport, Transport.getJsonFactory(), new RetryHttpRequestInitializer()).build();
-    BigQueryTableInserter inserter = new BigQueryTableInserter(b);
+
+    BigQueryTableInserter inserter = new BigQueryTableInserter(b, PipelineOptionsFactory.create());
     TableReference t = new TableReference()
         .setProjectId("project").setDatasetId("dataset").setTableId("table");