You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/07/06 17:20:46 UTC
[40/50] [abbrv] incubator-beam git commit: [BEAM-142] - BigQueryIO:
don't unnecessarily initialize an ExecutorService to validate parameters
[BEAM-142] - BigQueryIO: don't unnecessarily initialize an ExecutorService to validate parameters
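By default, BigQueryIO eagerly initializes a static ExecutorService whose threads
are converted into daemon threads (via MoreExecutors.getExitingExecutorService).
However, App Engine does not allow threads to be modified into daemon threads.
This change uses GcsOptions.getExecutorService() to obtain the executor instead.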
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6924358e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6924358e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6924358e
Branch: refs/heads/runners-spark2
Commit: 6924358e701839f2b2deeb37ac4c106ae03ed731
Parents: 8853118
Author: Lucas Amorim <lu...@protonmail.com>
Authored: Tue Jun 28 14:28:17 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Jul 6 10:18:52 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/io/BigQueryIO.java | 10 +++----
.../beam/sdk/util/BigQueryTableInserter.java | 31 +++++++++++++-------
.../sdk/util/BigQueryTableInserterTest.java | 17 +++++++----
.../apache/beam/sdk/util/BigQueryUtilTest.java | 12 +++++---
.../util/RetryHttpRequestInitializerTest.java | 5 +++-
5 files changed, 48 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6924358e/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
index a9d85b8..790e3ff 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
@@ -1594,7 +1594,7 @@ public class BigQueryIO {
TableReference table) {
try {
Bigquery client = Transport.newBigQueryClient(options).build();
- BigQueryTableInserter inserter = new BigQueryTableInserter(client);
+ BigQueryTableInserter inserter = new BigQueryTableInserter(client, options);
if (!inserter.isEmpty(table)) {
throw new IllegalArgumentException(
"BigQuery table is not empty: " + BigQueryIO.toTableSpec(table));
@@ -2084,7 +2084,7 @@ public class BigQueryIO {
for (String tableSpec : tableRows.keySet()) {
TableReference tableReference = getOrCreateTable(options, tableSpec);
flushRows(client, tableReference, tableRows.get(tableSpec),
- uniqueIdsForTableRows.get(tableSpec));
+ uniqueIdsForTableRows.get(tableSpec), options);
}
tableRows.clear();
uniqueIdsForTableRows.clear();
@@ -2109,7 +2109,7 @@ public class BigQueryIO {
if (!createdTables.contains(tableSpec)) {
TableSchema tableSchema = JSON_FACTORY.fromString(jsonTableSchema, TableSchema.class);
Bigquery client = Transport.newBigQueryClient(options).build();
- BigQueryTableInserter inserter = new BigQueryTableInserter(client);
+ BigQueryTableInserter inserter = new BigQueryTableInserter(client, options);
inserter.getOrCreateTable(tableReference, WriteDisposition.WRITE_APPEND,
CreateDisposition.CREATE_IF_NEEDED, tableSchema);
createdTables.add(tableSpec);
@@ -2121,10 +2121,10 @@ public class BigQueryIO {
/** Writes the accumulated rows into BigQuery with streaming API. */
private void flushRows(Bigquery client, TableReference tableReference,
- List<TableRow> tableRows, List<String> uniqueIds) {
+ List<TableRow> tableRows, List<String> uniqueIds, BigQueryOptions options) {
if (!tableRows.isEmpty()) {
try {
- BigQueryTableInserter inserter = new BigQueryTableInserter(client);
+ BigQueryTableInserter inserter = new BigQueryTableInserter(client, options);
inserter.insertAll(tableReference, tableRows, uniqueIds, byteCountAggregator);
} catch (IOException e) {
throw new RuntimeException(e);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6924358e/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryTableInserter.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryTableInserter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryTableInserter.java
index f87a3c4..84004a7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryTableInserter.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryTableInserter.java
@@ -22,6 +22,8 @@ import static com.google.common.base.Preconditions.checkNotNull;
import org.apache.beam.sdk.io.BigQueryIO;
import org.apache.beam.sdk.io.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.BigQueryIO.Write.WriteDisposition;
+import org.apache.beam.sdk.options.GcsOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Aggregator;
import com.google.api.client.util.BackOff;
@@ -38,7 +40,6 @@ import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.hadoop.util.ApiErrorExtractor;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.MoreExecutors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,9 +51,7 @@ import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
@@ -83,43 +82,51 @@ public class BigQueryTableInserter {
private final TableReference defaultRef;
private final long maxRowsPerBatch;
- private static final ExecutorService executor = MoreExecutors.getExitingExecutorService(
- (ThreadPoolExecutor) Executors.newFixedThreadPool(100), 10, TimeUnit.SECONDS);
+ private ExecutorService executor;
/**
* Constructs a new row inserter.
*
* @param client a BigQuery client
+ * @param options a PipelineOptions object
*/
- public BigQueryTableInserter(Bigquery client) {
+ public BigQueryTableInserter(Bigquery client, PipelineOptions options) {
this.client = client;
this.defaultRef = null;
this.maxRowsPerBatch = MAX_ROWS_PER_BATCH;
+ this.executor = options.as(GcsOptions.class).getExecutorService();
}
/**
* Constructs a new row inserter.
*
* @param client a BigQuery client
+ * @param options a PipelineOptions object
* @param defaultRef identifies the table to insert into
- * @deprecated replaced by {@link #BigQueryTableInserter(Bigquery)}
+ * @deprecated replaced by {@link #BigQueryTableInserter(Bigquery, PipelineOptions)}
*/
@Deprecated
- public BigQueryTableInserter(Bigquery client, TableReference defaultRef) {
+ public BigQueryTableInserter(Bigquery client, PipelineOptions options,
+ TableReference defaultRef) {
this.client = client;
this.defaultRef = defaultRef;
this.maxRowsPerBatch = MAX_ROWS_PER_BATCH;
+ this.executor = options.as(GcsOptions.class).getExecutorService();
}
/**
* Constructs a new row inserter.
*
* @param client a BigQuery client
+ * @param options a PipelineOptions object
+ * @param maxRowsPerBatch maximum number of rows to insert per call to BigQuery
*/
- public BigQueryTableInserter(Bigquery client, int maxRowsPerBatch) {
+ public BigQueryTableInserter(Bigquery client, PipelineOptions options,
+ int maxRowsPerBatch) {
this.client = client;
this.defaultRef = null;
this.maxRowsPerBatch = maxRowsPerBatch;
+ this.executor = options.as(GcsOptions.class).getExecutorService();
}
/**
@@ -127,13 +134,15 @@ public class BigQueryTableInserter {
*
* @param client a BigQuery client
* @param defaultRef identifies the default table to insert into
- * @deprecated replaced by {@link #BigQueryTableInserter(Bigquery, int)}
+ * @deprecated replaced by {@link #BigQueryTableInserter(Bigquery, PipelineOptions, int)}
*/
@Deprecated
- public BigQueryTableInserter(Bigquery client, TableReference defaultRef, int maxRowsPerBatch) {
+ public BigQueryTableInserter(Bigquery client, PipelineOptions options,
+ TableReference defaultRef, int maxRowsPerBatch) {
this.client = client;
this.defaultRef = defaultRef;
this.maxRowsPerBatch = maxRowsPerBatch;
+ this.executor = options.as(GcsOptions.class).getExecutorService();
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6924358e/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryTableInserterTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryTableInserterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryTableInserterTest.java
index 7d9c8a8..344e916 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryTableInserterTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryTableInserterTest.java
@@ -28,6 +28,8 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.ExpectedLogs;
import com.google.api.client.googleapis.json.GoogleJsonError;
@@ -75,6 +77,7 @@ public class BigQueryTableInserterTest {
@Rule public ExpectedLogs expectedLogs = ExpectedLogs.none(BigQueryTableInserter.class);
@Mock private LowLevelHttpResponse response;
private Bigquery bigquery;
+ private PipelineOptions options;
@Before
public void setUp() {
@@ -97,6 +100,8 @@ public class BigQueryTableInserterTest {
new Bigquery.Builder(
transport, Transport.getJsonFactory(), new RetryHttpRequestInitializer())
.build();
+
+ options = PipelineOptionsFactory.create();
}
@After
@@ -139,7 +144,7 @@ public class BigQueryTableInserterTest {
when(response.getStatusCode()).thenReturn(200);
when(response.getContent()).thenReturn(toStream(testTable));
- BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery);
+ BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery, options);
Table ret =
inserter.tryCreateTable(
new Table(),
@@ -160,7 +165,7 @@ public class BigQueryTableInserterTest {
public void testCreateTableSucceedsAlreadyExists() throws IOException {
when(response.getStatusCode()).thenReturn(409); // 409 means already exists
- BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery);
+ BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery, options);
Table ret =
inserter.tryCreateTable(
new Table(),
@@ -191,7 +196,7 @@ public class BigQueryTableInserterTest {
.thenReturn(toStream(errorWithReasonAndStatus("rateLimitExceeded", 403)))
.thenReturn(toStream(testTable));
- BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery);
+ BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery, options);
Table ret =
inserter.tryCreateTable(
testTable,
@@ -227,7 +232,7 @@ public class BigQueryTableInserterTest {
thrown.expect(GoogleJsonResponseException.class);
thrown.expectMessage("actually forbidden");
- BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery);
+ BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery, options);
try {
inserter.tryCreateTable(
new Table(),
@@ -261,7 +266,7 @@ public class BigQueryTableInserterTest {
.thenReturn(toStream(errorWithReasonAndStatus("rateLimitExceeded", 403)))
.thenReturn(toStream(new TableDataInsertAllResponse()));
- BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery);
+ BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery, options);
inserter.insertAll(ref, rows);
verify(response, times(2)).getStatusCode();
@@ -291,7 +296,7 @@ public class BigQueryTableInserterTest {
thrown.expect(GoogleJsonResponseException.class);
thrown.expectMessage("actually forbidden");
- BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery);
+ BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery, options);
try {
inserter.insertAll(ref, rows);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6924358e/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryUtilTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryUtilTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryUtilTest.java
index 65fbeb7..c033a7d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryUtilTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryUtilTest.java
@@ -31,6 +31,8 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import org.apache.beam.sdk.io.BigQueryIO;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.Sum;
@@ -81,10 +83,12 @@ public class BigQueryUtilTest {
@Mock private Bigquery.Tables.Get mockTablesGet;
@Mock private Bigquery.Tabledata mockTabledata;
@Mock private Bigquery.Tabledata.List mockTabledataList;
+ private PipelineOptions options;
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
+ this.options = PipelineOptionsFactory.create();
}
@After
@@ -369,7 +373,7 @@ public class BigQueryUtilTest {
TableReference ref = BigQueryIO
.parseTableSpec("project:dataset.table");
- BigQueryTableInserter inserter = new BigQueryTableInserter(mockClient);
+ BigQueryTableInserter inserter = new BigQueryTableInserter(mockClient, options);
inserter.getOrCreateTable(ref, BigQueryIO.Write.WriteDisposition.WRITE_APPEND,
BigQueryIO.Write.CreateDisposition.CREATE_NEVER, null);
@@ -387,7 +391,7 @@ public class BigQueryUtilTest {
TableReference ref = BigQueryIO
.parseTableSpec("project:dataset.table");
- BigQueryTableInserter inserter = new BigQueryTableInserter(mockClient);
+ BigQueryTableInserter inserter = new BigQueryTableInserter(mockClient, options);
inserter.getOrCreateTable(ref, BigQueryIO.Write.WriteDisposition.WRITE_EMPTY,
BigQueryIO.Write.CreateDisposition.CREATE_NEVER, null);
@@ -408,7 +412,7 @@ public class BigQueryUtilTest {
TableReference ref = BigQueryIO
.parseTableSpec("project:dataset.table");
- BigQueryTableInserter inserter = new BigQueryTableInserter(mockClient);
+ BigQueryTableInserter inserter = new BigQueryTableInserter(mockClient, options);
try {
inserter.getOrCreateTable(ref, BigQueryIO.Write.WriteDisposition.WRITE_EMPTY,
@@ -432,7 +436,7 @@ public class BigQueryUtilTest {
TableReference ref = BigQueryIO
.parseTableSpec("project:dataset.table");
- BigQueryTableInserter inserter = new BigQueryTableInserter(mockClient, 5);
+ BigQueryTableInserter inserter = new BigQueryTableInserter(mockClient, options, 5);
List<TableRow> rows = new ArrayList<>();
List<String> ids = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6924358e/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RetryHttpRequestInitializerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RetryHttpRequestInitializerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RetryHttpRequestInitializerTest.java
index 7d212d4..83ffaa1 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RetryHttpRequestInitializerTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RetryHttpRequestInitializerTest.java
@@ -30,6 +30,8 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+
import com.google.api.client.auth.oauth2.Credential;
import com.google.api.client.http.HttpRequest;
import com.google.api.client.http.HttpResponse;
@@ -281,7 +283,8 @@ public class RetryHttpRequestInitializerTest {
// RetryHttpInitializer.
Bigquery b = new Bigquery.Builder(
transport, Transport.getJsonFactory(), new RetryHttpRequestInitializer()).build();
- BigQueryTableInserter inserter = new BigQueryTableInserter(b);
+
+ BigQueryTableInserter inserter = new BigQueryTableInserter(b, PipelineOptionsFactory.create());
TableReference t = new TableReference()
.setProjectId("project").setDatasetId("dataset").setTableId("table");