You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/07/22 23:52:55 UTC
[1/2] incubator-beam git commit: [BEAM-480] Move insertAll() from
BigQueryTableInserter to BigQueryServices
Repository: incubator-beam
Updated Branches:
refs/heads/master 12b60ffa5 -> 122cd0466
[BEAM-480] Move insertAll() from BigQueryTableInserter to BigQueryServices
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bf3af5de
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bf3af5de
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bf3af5de
Branch: refs/heads/master
Commit: bf3af5de9e6f8126ffd2ccd3f7a68a84e55e90ff
Parents: 12b60ff
Author: Pei He <pe...@google.com>
Authored: Fri Jul 22 15:03:38 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Jul 22 16:52:49 2016 -0700
----------------------------------------------------------------------
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 36 ++--
.../sdk/io/gcp/bigquery/BigQueryServices.java | 13 +-
.../io/gcp/bigquery/BigQueryServicesImpl.java | 192 +++++++++++++++++++
.../io/gcp/bigquery/BigQueryTableInserter.java | 192 -------------------
.../gcp/bigquery/BigQueryServicesImplTest.java | 70 +++++++
.../gcp/bigquery/BigQueryTableInserterTest.java | 64 -------
.../sdk/io/gcp/bigquery/BigQueryUtilTest.java | 40 +---
7 files changed, 300 insertions(+), 307 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bf3af5de/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 130d444..76f7079 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -1668,11 +1668,13 @@ public class BigQueryIO {
@Override
public PDone apply(PCollection<TableRow> input) {
BigQueryOptions options = input.getPipeline().getOptions().as(BigQueryOptions.class);
+ BigQueryServices bqServices = getBigQueryServices();
// In a streaming job, or when a tablespec function is defined, we use StreamWithDeDup
// and BigQuery's streaming import API.
if (options.isStreaming() || tableRefFunction != null) {
- return input.apply(new StreamWithDeDup(getTable(), tableRefFunction, getSchema()));
+ return input.apply(
+ new StreamWithDeDup(getTable(), tableRefFunction, getSchema(), bqServices));
}
TableReference table = fromJsonString(jsonTableRef, TableReference.class);
@@ -1693,7 +1695,6 @@ public class BigQueryIO {
e);
}
- BigQueryServices bqServices = getBigQueryServices();
return input.apply("Write", org.apache.beam.sdk.io.Write.to(
new BigQuerySink(
jobIdToken,
@@ -2018,6 +2019,8 @@ public class BigQueryIO {
/** TableSchema in JSON. Use String to make the class Serializable. */
private final String jsonTableSchema;
+ private final BigQueryServices bqServices;
+
/** JsonTableRows to accumulate BigQuery rows in order to batch writes. */
private transient Map<String, List<TableRow>> tableRows;
@@ -2034,8 +2037,9 @@ public class BigQueryIO {
createAggregator("ByteCount", new Sum.SumLongFn());
/** Constructor. */
- StreamingWriteFn(TableSchema schema) {
- jsonTableSchema = toJsonString(schema);
+ StreamingWriteFn(TableSchema schema, BigQueryServices bqServices) {
+ this.jsonTableSchema = toJsonString(schema);
+ this.bqServices = checkNotNull(bqServices, "bqServices");
}
/** Prepares a target BigQuery table. */
@@ -2060,11 +2064,10 @@ public class BigQueryIO {
@Override
public void finishBundle(Context context) throws Exception {
BigQueryOptions options = context.getPipelineOptions().as(BigQueryOptions.class);
- Bigquery client = Transport.newBigQueryClient(options).build();
for (Map.Entry<String, List<TableRow>> entry : tableRows.entrySet()) {
TableReference tableReference = getOrCreateTable(options, entry.getKey());
- flushRows(client, tableReference, entry.getValue(),
+ flushRows(tableReference, entry.getValue(),
uniqueIdsForTableRows.get(entry.getKey()), options);
}
tableRows.clear();
@@ -2100,13 +2103,17 @@ public class BigQueryIO {
return tableReference;
}
- /** Writes the accumulated rows into BigQuery with streaming API. */
- private void flushRows(Bigquery client, TableReference tableReference,
- List<TableRow> tableRows, List<String> uniqueIds, BigQueryOptions options) {
+ /**
+ * Writes the accumulated rows into BigQuery with streaming API.
+ */
+ private void flushRows(TableReference tableReference,
+ List<TableRow> tableRows, List<String> uniqueIds, BigQueryOptions options)
+ throws InterruptedException {
if (!tableRows.isEmpty()) {
try {
- BigQueryTableInserter inserter = new BigQueryTableInserter(client, options);
- inserter.insertAll(tableReference, tableRows, uniqueIds, byteCountAggregator);
+ long totalBytes = bqServices.getDatasetService(options).insertAll(
+ tableReference, tableRows, uniqueIds);
+ byteCountAggregator.addValue(totalBytes);
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -2320,14 +2327,17 @@ public class BigQueryIO {
private final transient TableReference tableReference;
private final SerializableFunction<BoundedWindow, TableReference> tableRefFunction;
private final transient TableSchema tableSchema;
+ private final BigQueryServices bqServices;
/** Constructor. */
StreamWithDeDup(TableReference tableReference,
SerializableFunction<BoundedWindow, TableReference> tableRefFunction,
- TableSchema tableSchema) {
+ TableSchema tableSchema,
+ BigQueryServices bqServices) {
this.tableReference = tableReference;
this.tableRefFunction = tableRefFunction;
this.tableSchema = tableSchema;
+ this.bqServices = checkNotNull(bqServices, "bqServices");
}
@Override
@@ -2358,7 +2368,7 @@ public class BigQueryIO {
tagged
.setCoder(KvCoder.of(ShardedKeyCoder.of(StringUtf8Coder.of()), TableRowInfoCoder.of()))
.apply(Reshuffle.<ShardedKey<String>, TableRowInfo>of())
- .apply(ParDo.of(new StreamingWriteFn(tableSchema)));
+ .apply(ParDo.of(new StreamingWriteFn(tableSchema, bqServices)));
// Note that the implementation to return PDone here breaks the
// implicit assumption about the job execution order. If a user
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bf3af5de/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
index 3e77362..87887ec 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
@@ -32,13 +32,12 @@ import com.google.api.services.bigquery.model.TableRow;
import java.io.IOException;
import java.io.Serializable;
+import java.util.List;
import java.util.NoSuchElementException;
import javax.annotation.Nullable;
-/**
- * An interface for real, mock, or fake implementations of Cloud BigQuery services.
- */
+/** An interface for real, mock, or fake implementations of Cloud BigQuery services. */
interface BigQueryServices extends Serializable {
/**
@@ -140,6 +139,14 @@ interface BigQueryServices extends Serializable {
*/
void deleteDataset(String projectId, String datasetId)
throws IOException, InterruptedException;
+
+ /**
+ * Inserts {@link TableRow TableRows} with the specified insertIds if not null.
+ *
+ * <p>Returns the total byte count of {@link TableRow TableRows}.
+ */
+ long insertAll(TableReference ref, List<TableRow> rowList, @Nullable List<String> insertIdList)
+ throws IOException, InterruptedException;
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bf3af5de/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index 414baae..ef17e0f 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -17,8 +17,13 @@
*/
package org.apache.beam.sdk.io.gcp.bigquery;
+import static com.google.common.base.Preconditions.checkNotNull;
+
import org.apache.beam.sdk.options.BigQueryOptions;
+import org.apache.beam.sdk.options.GcsOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
+import org.apache.beam.sdk.util.IntervalBoundedExponentialBackOff;
import org.apache.beam.sdk.util.Transport;
import com.google.api.client.googleapis.json.GoogleJsonResponseException;
@@ -38,6 +43,8 @@ import com.google.api.services.bigquery.model.JobReference;
import com.google.api.services.bigquery.model.JobStatistics;
import com.google.api.services.bigquery.model.JobStatus;
import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableDataInsertAllRequest;
+import com.google.api.services.bigquery.model.TableDataInsertAllResponse;
import com.google.api.services.bigquery.model.TableDataList;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
@@ -48,7 +55,14 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
import java.util.NoSuchElementException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
@@ -254,12 +268,53 @@ class BigQueryServicesImpl implements BigQueryServices {
@VisibleForTesting
static class DatasetServiceImpl implements DatasetService {
+ // Approximate amount of table data to upload per InsertAll request.
+ private static final long UPLOAD_BATCH_SIZE_BYTES = 64 * 1024;
+
+ // The maximum number of rows to upload per InsertAll request.
+ private static final long MAX_ROWS_PER_BATCH = 500;
+
+ // The maximum number of times to retry inserting rows into BigQuery.
+ private static final int MAX_INSERT_ATTEMPTS = 5;
+
+ // The initial backoff after a failure inserting rows into BigQuery.
+ private static final long INITIAL_INSERT_BACKOFF_INTERVAL_MS = 200L;
+
+ // Backoff time bounds for rate limit exceeded errors.
+ private static final long INITIAL_RATE_LIMIT_EXCEEDED_BACKOFF_MS = TimeUnit.SECONDS.toMillis(1);
+ private static final long MAX_RATE_LIMIT_EXCEEDED_BACKOFF_MS = TimeUnit.MINUTES.toMillis(2);
+
private final ApiErrorExtractor errorExtractor;
private final Bigquery client;
+ private final PipelineOptions options;
+ private final long maxRowsPerBatch;
+
+ private ExecutorService executor;
+
+ @VisibleForTesting
+ DatasetServiceImpl(Bigquery client, PipelineOptions options) {
+ this.errorExtractor = new ApiErrorExtractor();
+ this.client = client;
+ this.options = options;
+ this.maxRowsPerBatch = MAX_ROWS_PER_BATCH;
+ this.executor = null;
+ }
+
+ @VisibleForTesting
+ DatasetServiceImpl(Bigquery client, PipelineOptions options, long maxRowsPerBatch) {
+ this.errorExtractor = new ApiErrorExtractor();
+ this.client = client;
+ this.options = options;
+ this.maxRowsPerBatch = maxRowsPerBatch;
+ this.executor = null;
+ }
private DatasetServiceImpl(BigQueryOptions bqOptions) {
this.errorExtractor = new ApiErrorExtractor();
this.client = Transport.newBigQueryClient(bqOptions).build();
+ this.options = bqOptions;
+ this.maxRowsPerBatch = MAX_ROWS_PER_BATCH;
+ this.executor = null;
}
/**
@@ -418,6 +473,143 @@ class BigQueryServicesImpl implements BigQueryServices {
Sleeper.DEFAULT,
backoff);
}
+
+ @Override
+ public long insertAll(
+ TableReference ref, List<TableRow> rowList, @Nullable List<String> insertIdList)
+ throws IOException, InterruptedException {
+ checkNotNull(ref, "ref");
+ if (executor == null) {
+ this.executor = options.as(GcsOptions.class).getExecutorService();
+ }
+ if (insertIdList != null && rowList.size() != insertIdList.size()) {
+ throw new AssertionError("If insertIdList is not null it needs to have at least "
+ + "as many elements as rowList");
+ }
+
+ AttemptBoundedExponentialBackOff backoff = new AttemptBoundedExponentialBackOff(
+ MAX_INSERT_ATTEMPTS,
+ INITIAL_INSERT_BACKOFF_INTERVAL_MS);
+
+ long retTotalDataSize = 0;
+ List<TableDataInsertAllResponse.InsertErrors> allErrors = new ArrayList<>();
+ // These lists contain the rows to publish. Initially they contain the entire list.
+ // If there are failures, they will contain only the failed rows to be retried.
+ List<TableRow> rowsToPublish = rowList;
+ List<String> idsToPublish = insertIdList;
+ while (true) {
+ List<TableRow> retryRows = new ArrayList<>();
+ List<String> retryIds = (idsToPublish != null) ? new ArrayList<String>() : null;
+
+ int strideIndex = 0;
+ // Upload in batches.
+ List<TableDataInsertAllRequest.Rows> rows = new LinkedList<>();
+ int dataSize = 0;
+
+ List<Future<List<TableDataInsertAllResponse.InsertErrors>>> futures = new ArrayList<>();
+ List<Integer> strideIndices = new ArrayList<>();
+
+ for (int i = 0; i < rowsToPublish.size(); ++i) {
+ TableRow row = rowsToPublish.get(i);
+ TableDataInsertAllRequest.Rows out = new TableDataInsertAllRequest.Rows();
+ if (idsToPublish != null) {
+ out.setInsertId(idsToPublish.get(i));
+ }
+ out.setJson(row.getUnknownKeys());
+ rows.add(out);
+
+ dataSize += row.toString().length();
+ if (dataSize >= UPLOAD_BATCH_SIZE_BYTES || rows.size() >= maxRowsPerBatch
+ || i == rowsToPublish.size() - 1) {
+ TableDataInsertAllRequest content = new TableDataInsertAllRequest();
+ content.setRows(rows);
+
+ final Bigquery.Tabledata.InsertAll insert = client.tabledata()
+ .insertAll(ref.getProjectId(), ref.getDatasetId(), ref.getTableId(),
+ content);
+
+ futures.add(
+ executor.submit(new Callable<List<TableDataInsertAllResponse.InsertErrors>>() {
+ @Override
+ public List<TableDataInsertAllResponse.InsertErrors> call() throws IOException {
+ BackOff backoff = new IntervalBoundedExponentialBackOff(
+ MAX_RATE_LIMIT_EXCEEDED_BACKOFF_MS, INITIAL_RATE_LIMIT_EXCEEDED_BACKOFF_MS);
+ while (true) {
+ try {
+ return insert.execute().getInsertErrors();
+ } catch (IOException e) {
+ if (new ApiErrorExtractor().rateLimited(e)) {
+ LOG.info("BigQuery insertAll exceeded rate limit, retrying");
+ try {
+ Thread.sleep(backoff.nextBackOffMillis());
+ } catch (InterruptedException interrupted) {
+ throw new IOException(
+ "Interrupted while waiting before retrying insertAll");
+ }
+ } else {
+ throw e;
+ }
+ }
+ }
+ }
+ }));
+ strideIndices.add(strideIndex);
+
+ retTotalDataSize += dataSize;
+
+ dataSize = 0;
+ strideIndex = i + 1;
+ rows = new LinkedList<>();
+ }
+ }
+
+ try {
+ for (int i = 0; i < futures.size(); i++) {
+ List<TableDataInsertAllResponse.InsertErrors> errors = futures.get(i).get();
+ if (errors != null) {
+ for (TableDataInsertAllResponse.InsertErrors error : errors) {
+ allErrors.add(error);
+ if (error.getIndex() == null) {
+ throw new IOException("Insert failed: " + allErrors);
+ }
+
+ int errorIndex = error.getIndex().intValue() + strideIndices.get(i);
+ retryRows.add(rowsToPublish.get(errorIndex));
+ if (retryIds != null) {
+ retryIds.add(idsToPublish.get(errorIndex));
+ }
+ }
+ }
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException("Interrupted while inserting " + rowsToPublish);
+ } catch (ExecutionException e) {
+ throw new RuntimeException(e.getCause());
+ }
+
+ if (!allErrors.isEmpty() && !backoff.atMaxAttempts()) {
+ try {
+ Thread.sleep(backoff.nextBackOffMillis());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException(
+ "Interrupted while waiting before retrying insert of " + retryRows);
+ }
+ LOG.info("Retrying failed inserts to BigQuery");
+ rowsToPublish = retryRows;
+ idsToPublish = retryIds;
+ allErrors.clear();
+ } else {
+ break;
+ }
+ }
+ if (!allErrors.isEmpty()) {
+ throw new IOException("Insert failed: " + allErrors);
+ } else {
+ return retTotalDataSize;
+ }
+ }
}
private static class BigQueryJsonReaderImpl implements BigQueryJsonReader {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bf3af5de/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableInserter.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableInserter.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableInserter.java
index 00a4fa3..bf038f5 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableInserter.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableInserter.java
@@ -17,15 +17,9 @@
*/
package org.apache.beam.sdk.io.gcp.bigquery;
-import static com.google.common.base.Preconditions.checkNotNull;
-
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
-import org.apache.beam.sdk.options.GcsOptions;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
-import org.apache.beam.sdk.util.IntervalBoundedExponentialBackOff;
import com.google.api.client.util.BackOff;
import com.google.api.client.util.BackOffUtils;
@@ -33,11 +27,8 @@ import com.google.api.client.util.ExponentialBackOff;
import com.google.api.client.util.Sleeper;
import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.model.Table;
-import com.google.api.services.bigquery.model.TableDataInsertAllRequest;
-import com.google.api.services.bigquery.model.TableDataInsertAllResponse;
import com.google.api.services.bigquery.model.TableDataList;
import com.google.api.services.bigquery.model.TableReference;
-import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.hadoop.util.ApiErrorExtractor;
import com.google.common.annotations.VisibleForTesting;
@@ -46,13 +37,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
@@ -63,26 +47,7 @@ import javax.annotation.Nullable;
class BigQueryTableInserter {
private static final Logger LOG = LoggerFactory.getLogger(BigQueryTableInserter.class);
- // Approximate amount of table data to upload per InsertAll request.
- private static final long UPLOAD_BATCH_SIZE_BYTES = 64 * 1024;
-
- // The maximum number of rows to upload per InsertAll request.
- private static final long MAX_ROWS_PER_BATCH = 500;
-
- // The maximum number of times to retry inserting rows into BigQuery.
- private static final int MAX_INSERT_ATTEMPTS = 5;
-
- // The initial backoff after a failure inserting rows into BigQuery.
- private static final long INITIAL_INSERT_BACKOFF_INTERVAL_MS = 200L;
-
- // Backoff time bounds for rate limit exceeded errors.
- private static final long INITIAL_RATE_LIMIT_EXCEEDED_BACKOFF_MS = TimeUnit.SECONDS.toMillis(1);
- private static final long MAX_RATE_LIMIT_EXCEEDED_BACKOFF_MS = TimeUnit.MINUTES.toMillis(2);
-
private final Bigquery client;
- private final long maxRowsPerBatch;
-
- private ExecutorService executor;
/**
* Constructs a new row inserter.
@@ -92,163 +57,6 @@ class BigQueryTableInserter {
*/
BigQueryTableInserter(Bigquery client, PipelineOptions options) {
this.client = client;
- this.maxRowsPerBatch = MAX_ROWS_PER_BATCH;
- this.executor = options.as(GcsOptions.class).getExecutorService();
- }
-
- /**
- * Constructs a new row inserter.
- *
- * @param client a BigQuery client
- * @param options a PipelineOptions object
- * @param maxRowsPerBatch maximum number of rows to insert per call to BigQuery
- */
- BigQueryTableInserter(Bigquery client, PipelineOptions options,
- int maxRowsPerBatch) {
- this.client = client;
- this.maxRowsPerBatch = maxRowsPerBatch;
- this.executor = options.as(GcsOptions.class).getExecutorService();
- }
-
- /**
- * Insert all rows from the given list.
- */
- void insertAll(TableReference ref, List<TableRow> rowList) throws IOException {
- insertAll(ref, rowList, null, null);
- }
-
- /**
- * Insert all rows from the given list using specified insertIds if not null. Track count of
- * bytes written with the Aggregator.
- */
- void insertAll(TableReference ref, List<TableRow> rowList,
- @Nullable List<String> insertIdList, @Nullable Aggregator<Long, Long> byteCountAggregator)
- throws IOException {
- checkNotNull(ref, "ref");
- if (insertIdList != null && rowList.size() != insertIdList.size()) {
- throw new AssertionError("If insertIdList is not null it needs to have at least "
- + "as many elements as rowList");
- }
-
- AttemptBoundedExponentialBackOff backoff = new AttemptBoundedExponentialBackOff(
- MAX_INSERT_ATTEMPTS,
- INITIAL_INSERT_BACKOFF_INTERVAL_MS);
-
- List<TableDataInsertAllResponse.InsertErrors> allErrors = new ArrayList<>();
- // These lists contain the rows to publish. Initially the contain the entire list. If there are
- // failures, they will contain only the failed rows to be retried.
- List<TableRow> rowsToPublish = rowList;
- List<String> idsToPublish = insertIdList;
- while (true) {
- List<TableRow> retryRows = new ArrayList<>();
- List<String> retryIds = (idsToPublish != null) ? new ArrayList<String>() : null;
-
- int strideIndex = 0;
- // Upload in batches.
- List<TableDataInsertAllRequest.Rows> rows = new LinkedList<>();
- int dataSize = 0;
-
- List<Future<List<TableDataInsertAllResponse.InsertErrors>>> futures = new ArrayList<>();
- List<Integer> strideIndices = new ArrayList<>();
-
- for (int i = 0; i < rowsToPublish.size(); ++i) {
- TableRow row = rowsToPublish.get(i);
- TableDataInsertAllRequest.Rows out = new TableDataInsertAllRequest.Rows();
- if (idsToPublish != null) {
- out.setInsertId(idsToPublish.get(i));
- }
- out.setJson(row.getUnknownKeys());
- rows.add(out);
-
- dataSize += row.toString().length();
- if (dataSize >= UPLOAD_BATCH_SIZE_BYTES || rows.size() >= maxRowsPerBatch
- || i == rowsToPublish.size() - 1) {
- TableDataInsertAllRequest content = new TableDataInsertAllRequest();
- content.setRows(rows);
-
- final Bigquery.Tabledata.InsertAll insert = client.tabledata()
- .insertAll(ref.getProjectId(), ref.getDatasetId(), ref.getTableId(),
- content);
-
- futures.add(
- executor.submit(new Callable<List<TableDataInsertAllResponse.InsertErrors>>() {
- @Override
- public List<TableDataInsertAllResponse.InsertErrors> call() throws IOException {
- BackOff backoff = new IntervalBoundedExponentialBackOff(
- MAX_RATE_LIMIT_EXCEEDED_BACKOFF_MS, INITIAL_RATE_LIMIT_EXCEEDED_BACKOFF_MS);
- while (true) {
- try {
- return insert.execute().getInsertErrors();
- } catch (IOException e) {
- if (new ApiErrorExtractor().rateLimited(e)) {
- LOG.info("BigQuery insertAll exceeded rate limit, retrying");
- try {
- Thread.sleep(backoff.nextBackOffMillis());
- } catch (InterruptedException interrupted) {
- throw new IOException(
- "Interrupted while waiting before retrying insertAll");
- }
- } else {
- throw e;
- }
- }
- }
- }
- }));
- strideIndices.add(strideIndex);
-
- if (byteCountAggregator != null) {
- byteCountAggregator.addValue((long) dataSize);
- }
- dataSize = 0;
- strideIndex = i + 1;
- rows = new LinkedList<>();
- }
- }
-
- try {
- for (int i = 0; i < futures.size(); i++) {
- List<TableDataInsertAllResponse.InsertErrors> errors = futures.get(i).get();
- if (errors != null) {
- for (TableDataInsertAllResponse.InsertErrors error : errors) {
- allErrors.add(error);
- if (error.getIndex() == null) {
- throw new IOException("Insert failed: " + allErrors);
- }
-
- int errorIndex = error.getIndex().intValue() + strideIndices.get(i);
- retryRows.add(rowsToPublish.get(errorIndex));
- if (retryIds != null) {
- retryIds.add(idsToPublish.get(errorIndex));
- }
- }
- }
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new IOException("Interrupted while inserting " + rowsToPublish);
- } catch (ExecutionException e) {
- throw new RuntimeException(e.getCause());
- }
-
- if (!allErrors.isEmpty() && !backoff.atMaxAttempts()) {
- try {
- Thread.sleep(backoff.nextBackOffMillis());
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new IOException("Interrupted while waiting before retrying insert of " + retryRows);
- }
- LOG.info("Retrying failed inserts to BigQuery");
- rowsToPublish = retryRows;
- idsToPublish = retryIds;
- allErrors.clear();
- } else {
- break;
- }
- }
- if (!allErrors.isEmpty()) {
- throw new IOException("Insert failed: " + allErrors);
- }
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bf3af5de/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
index 2cdf511..686685b 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
@@ -18,11 +18,14 @@
package org.apache.beam.sdk.io.gcp.bigquery;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.DatasetServiceImpl;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.JobServiceImpl;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.ExpectedLogs;
import org.apache.beam.sdk.testing.FastNanoClockAndSleeper;
import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
@@ -32,6 +35,7 @@ import org.apache.beam.sdk.util.Transport;
import com.google.api.client.googleapis.json.GoogleJsonError;
import com.google.api.client.googleapis.json.GoogleJsonError.ErrorInfo;
import com.google.api.client.googleapis.json.GoogleJsonErrorContainer;
+import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import com.google.api.client.http.LowLevelHttpResponse;
import com.google.api.client.json.GenericJson;
import com.google.api.client.json.Json;
@@ -46,6 +50,9 @@ import com.google.api.services.bigquery.model.Job;
import com.google.api.services.bigquery.model.JobReference;
import com.google.api.services.bigquery.model.JobStatus;
import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableDataInsertAllResponse;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.hadoop.util.ApiErrorExtractor;
import com.google.common.collect.ImmutableList;
@@ -61,6 +68,8 @@ import org.mockito.MockitoAnnotations;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
/**
* Tests for {@link BigQueryServicesImpl}.
@@ -267,6 +276,67 @@ public class BigQueryServicesImplTest {
verify(response, times(1)).getContentType();
}
+ /**
+ * Tests that {@link DatasetServiceImpl#insertAll} retries quota rate limited attempts.
+ */
+ @Test
+ public void testInsertRetry() throws Exception {
+ TableReference ref =
+ new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
+ List<TableRow> rows = new ArrayList<>();
+ rows.add(new TableRow());
+
+ // First response is 403 rate limited, second response has valid payload.
+ when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
+ when(response.getStatusCode()).thenReturn(403).thenReturn(200);
+ when(response.getContent())
+ .thenReturn(toStream(errorWithReasonAndStatus("rateLimitExceeded", 403)))
+ .thenReturn(toStream(new TableDataInsertAllResponse()));
+
+ DatasetServiceImpl dataService =
+ new DatasetServiceImpl(bigquery, PipelineOptionsFactory.create());
+ dataService.insertAll(ref, rows, null);
+ verify(response, times(2)).getStatusCode();
+ verify(response, times(2)).getContent();
+ verify(response, times(2)).getContentType();
+ expectedLogs.verifyInfo("BigQuery insertAll exceeded rate limit, retrying");
+ }
+
+ /**
+ * Tests that {@link DatasetServiceImpl#insertAll} does not retry non-rate-limited attempts.
+ */
+ @Test
+ public void testInsertDoesNotRetry() throws Throwable {
+ TableReference ref =
+ new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
+ List<TableRow> rows = new ArrayList<>();
+ rows.add(new TableRow());
+
+ // First response is 403 not-rate-limited, second response has valid payload but should not
+ // be invoked.
+ when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
+ when(response.getStatusCode()).thenReturn(403).thenReturn(200);
+ when(response.getContent())
+ .thenReturn(toStream(errorWithReasonAndStatus("actually forbidden", 403)))
+ .thenReturn(toStream(new TableDataInsertAllResponse()));
+
+ thrown.expect(GoogleJsonResponseException.class);
+ thrown.expectMessage("actually forbidden");
+
+ DatasetServiceImpl dataService =
+ new DatasetServiceImpl(bigquery, PipelineOptionsFactory.create());
+
+ try {
+ dataService.insertAll(ref, rows, null);
+ fail();
+ } catch (RuntimeException e) {
+ verify(response, times(1)).getStatusCode();
+ verify(response, times(1)).getContent();
+ verify(response, times(1)).getContentType();
+ throw e.getCause();
+ }
+ }
+
/** A helper to wrap a {@link GenericJson} object in a content stream. */
private static InputStream toStream(GenericJson content) throws IOException {
return new ByteArrayInputStream(JacksonFactory.getDefaultInstance().toByteArray(content));
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bf3af5de/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableInserterTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableInserterTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableInserterTest.java
index c29da91..dac3911 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableInserterTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableInserterTest.java
@@ -48,9 +48,7 @@ import com.google.api.client.util.BackOff;
import com.google.api.client.util.Sleeper;
import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.model.Table;
-import com.google.api.services.bigquery.model.TableDataInsertAllResponse;
import com.google.api.services.bigquery.model.TableReference;
-import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.hadoop.util.RetryBoundedBackOff;
import com.google.common.collect.ImmutableList;
@@ -67,8 +65,6 @@ import org.mockito.MockitoAnnotations;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.List;
/**
* Tests of {@link BigQueryTableInserter}.
@@ -250,64 +246,4 @@ public class BigQueryTableInserterTest {
throw e;
}
}
-
- /**
- * Tests that {@link BigQueryTableInserter#insertAll} retries quota rate limited attempts.
- */
- @Test
- public void testInsertRetry() throws IOException {
- TableReference ref =
- new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
- List<TableRow> rows = new ArrayList<>();
- rows.add(new TableRow());
-
- // First response is 403 rate limited, second response has valid payload.
- when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
- when(response.getStatusCode()).thenReturn(403).thenReturn(200);
- when(response.getContent())
- .thenReturn(toStream(errorWithReasonAndStatus("rateLimitExceeded", 403)))
- .thenReturn(toStream(new TableDataInsertAllResponse()));
-
- BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery, options);
-
- inserter.insertAll(ref, rows);
- verify(response, times(2)).getStatusCode();
- verify(response, times(2)).getContent();
- verify(response, times(2)).getContentType();
- expectedLogs.verifyInfo("BigQuery insertAll exceeded rate limit, retrying");
- }
-
- /**
- * Tests that {@link BigQueryTableInserter#insertAll} does not retry non-rate-limited attempts.
- */
- @Test
- public void testInsertDoesNotRetry() throws Throwable {
- TableReference ref =
- new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
- List<TableRow> rows = new ArrayList<>();
- rows.add(new TableRow());
-
- // First response is 403 not-rate-limited, second response has valid payload but should not
- // be invoked.
- when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
- when(response.getStatusCode()).thenReturn(403).thenReturn(200);
- when(response.getContent())
- .thenReturn(toStream(errorWithReasonAndStatus("actually forbidden", 403)))
- .thenReturn(toStream(new TableDataInsertAllResponse()));
-
- thrown.expect(GoogleJsonResponseException.class);
- thrown.expectMessage("actually forbidden");
-
- BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery, options);
-
- try {
- inserter.insertAll(ref, rows);
- fail();
- } catch (RuntimeException e) {
- verify(response, times(1)).getStatusCode();
- verify(response, times(1)).getContent();
- verify(response, times(1)).getContentType();
- throw e.getCause();
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bf3af5de/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java
index e0c353b..89284df 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java
@@ -30,11 +30,9 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.DatasetServiceImpl;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.Sum;
import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.model.Table;
@@ -435,7 +433,7 @@ public class BigQueryUtilTest {
TableReference ref = BigQueryIO
.parseTableSpec("project:dataset.table");
- BigQueryTableInserter inserter = new BigQueryTableInserter(mockClient, options, 5);
+ DatasetServiceImpl datasetService = new DatasetServiceImpl(mockClient, options, 5);
List<TableRow> rows = new ArrayList<>();
List<String> ids = new ArrayList<>();
@@ -444,41 +442,13 @@ public class BigQueryUtilTest {
ids.add(new String());
}
- InMemoryLongSumAggregator byteCountAggregator = new InMemoryLongSumAggregator("ByteCount");
+ long totalBytes = 0;
try {
- inserter.insertAll(ref, rows, ids, byteCountAggregator);
+ totalBytes = datasetService.insertAll(ref, rows, ids);
} finally {
verifyInsertAll(5);
// Each of the 25 rows is 23 bytes: "{f=[{v=foo}, {v=1234}]}"
- assertEquals("Incorrect byte count", 25L * 23L, byteCountAggregator.getSum());
- }
- }
-
- private static class InMemoryLongSumAggregator implements Aggregator<Long, Long> {
- private final String name;
- private long sum = 0;
-
- public InMemoryLongSumAggregator(String name) {
- this.name = name;
- }
-
- @Override
- public void addValue(Long value) {
- sum += value;
- }
-
- @Override
- public String getName() {
- return name;
- }
-
- @Override
- public CombineFn<Long, ?, Long> getCombineFn() {
- return new Sum.SumLongFn();
- }
-
- public long getSum() {
- return sum;
+ assertEquals("Incorrect byte count", 25L * 23L, totalBytes);
}
}
}
[2/2] incubator-beam git commit: Closes #717
Posted by dh...@apache.org.
Closes #717
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/122cd046
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/122cd046
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/122cd046
Branch: refs/heads/master
Commit: 122cd04663529f4fc44530ed9e144e0dfd68360f
Parents: 12b60ff bf3af5d
Author: Dan Halperin <dh...@google.com>
Authored: Fri Jul 22 16:52:50 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Jul 22 16:52:50 2016 -0700
----------------------------------------------------------------------
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 36 ++--
.../sdk/io/gcp/bigquery/BigQueryServices.java | 13 +-
.../io/gcp/bigquery/BigQueryServicesImpl.java | 192 +++++++++++++++++++
.../io/gcp/bigquery/BigQueryTableInserter.java | 192 -------------------
.../gcp/bigquery/BigQueryServicesImplTest.java | 70 +++++++
.../gcp/bigquery/BigQueryTableInserterTest.java | 64 -------
.../sdk/io/gcp/bigquery/BigQueryUtilTest.java | 40 +---
7 files changed, 300 insertions(+), 307 deletions(-)
----------------------------------------------------------------------