You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2021/10/25 03:27:11 UTC
[beam] branch master updated: Fixing BigQueryIO request too big corner case for streaming inserts (#15067)
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 1f08d1f Fixing BigQueryIO request too big corner case for streaming inserts (#15067)
1f08d1f is described below
commit 1f08d1f3ddc2e7bc7341be4b29bdafaec18de9cc
Author: Pablo <pa...@users.noreply.github.com>
AuthorDate: Sun Oct 24 20:25:09 2021 -0700
Fixing BigQueryIO request too big corner case for streaming inserts (#15067)
* Rewriting for readability
* Address comments
* Address comments
* Addressing comments. Simplifying logic
---
.../sdk/io/gcp/bigquery/BigQueryServicesImpl.java | 248 +++++++++++++--------
.../io/gcp/bigquery/BigQueryServicesImplTest.java | 109 ++++++++-
.../beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java | 45 ++--
3 files changed, 280 insertions(+), 122 deletions(-)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index a636b04..9f03dc4d 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -53,6 +53,7 @@ import com.google.api.services.bigquery.model.QueryResponse;
import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableDataInsertAllRequest;
import com.google.api.services.bigquery.model.TableDataInsertAllResponse;
+import com.google.api.services.bigquery.model.TableDataInsertAllResponse.InsertErrors;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.auth.Credentials;
@@ -811,6 +812,103 @@ class BigQueryServicesImpl implements BigQueryServices {
ALWAYS_RETRY);
}
+ static class InsertBatchofRowsCallable implements Callable<List<InsertErrors>> {
+ private final TableReference ref;
+ private final Boolean skipInvalidRows;
+ private final Boolean ignoreUnkownValues;
+ private final Bigquery client;
+ private final FluentBackoff rateLimitBackoffFactory;
+ private final List<TableDataInsertAllRequest.Rows> rows;
+ private final AtomicLong maxThrottlingMsec;
+ private final Sleeper sleeper;
+
+ InsertBatchofRowsCallable(
+ TableReference ref,
+ Boolean skipInvalidRows,
+ Boolean ignoreUnknownValues,
+ Bigquery client,
+ FluentBackoff rateLimitBackoffFactory,
+ List<TableDataInsertAllRequest.Rows> rows,
+ AtomicLong maxThrottlingMsec,
+ Sleeper sleeper) {
+ this.ref = ref;
+ this.skipInvalidRows = skipInvalidRows;
+ this.ignoreUnkownValues = ignoreUnknownValues;
+ this.client = client;
+ this.rateLimitBackoffFactory = rateLimitBackoffFactory;
+ this.rows = rows;
+ this.maxThrottlingMsec = maxThrottlingMsec;
+ this.sleeper = sleeper;
+ }
+
+ @Override
+ public List<TableDataInsertAllResponse.InsertErrors> call() throws Exception {
+ TableDataInsertAllRequest content = new TableDataInsertAllRequest();
+ content.setRows(rows);
+ content.setSkipInvalidRows(skipInvalidRows);
+ content.setIgnoreUnknownValues(ignoreUnkownValues);
+
+ final Bigquery.Tabledata.InsertAll insert =
+ client
+ .tabledata()
+ .insertAll(ref.getProjectId(), ref.getDatasetId(), ref.getTableId(), content)
+ .setPrettyPrint(false);
+
+ // A backoff for rate limit exceeded errors.
+ BackOff backoff1 = BackOffAdapter.toGcpBackOff(rateLimitBackoffFactory.backoff());
+ long totalBackoffMillis = 0L;
+ while (true) {
+ ServiceCallMetric serviceCallMetric = BigQueryUtils.writeCallMetric(ref);
+ try {
+ List<TableDataInsertAllResponse.InsertErrors> response =
+ insert.execute().getInsertErrors();
+ if (response == null || response.isEmpty()) {
+ serviceCallMetric.call("ok");
+ } else {
+ for (TableDataInsertAllResponse.InsertErrors insertErrors : response) {
+ for (ErrorProto insertError : insertErrors.getErrors()) {
+ serviceCallMetric.call(insertError.getReason());
+ }
+ }
+ }
+ return response;
+ } catch (IOException e) {
+ GoogleJsonError.ErrorInfo errorInfo = getErrorInfo(e);
+ if (errorInfo == null) {
+ serviceCallMetric.call(ServiceCallMetric.CANONICAL_STATUS_UNKNOWN);
+ throw e;
+ }
+ serviceCallMetric.call(errorInfo.getReason());
+ /**
+ * TODO(BEAM-10584): Check for QUOTA_EXCEEDED error will be replaced by
+ * ApiErrorExtractor.INSTANCE.quotaExceeded(e) after the next release of
+ * GoogleCloudDataproc/hadoop-connectors
+ */
+ if (!ApiErrorExtractor.INSTANCE.rateLimited(e)
+ && !errorInfo.getReason().equals(QUOTA_EXCEEDED)) {
+ throw e;
+ }
+ LOG.info(
+ String.format(
+ "BigQuery insertAll error, retrying: %s",
+ ApiErrorExtractor.INSTANCE.getErrorMessage(e)));
+ try {
+ long nextBackOffMillis = backoff1.nextBackOffMillis();
+ if (nextBackOffMillis == BackOff.STOP) {
+ throw e;
+ }
+ sleeper.sleep(nextBackOffMillis);
+ totalBackoffMillis += nextBackOffMillis;
+ final long totalBackoffMillisSoFar = totalBackoffMillis;
+ maxThrottlingMsec.getAndUpdate(current -> Math.max(current, totalBackoffMillisSoFar));
+ } catch (InterruptedException interrupted) {
+ throw new IOException("Interrupted while waiting before retrying insertAll");
+ }
+ }
+ }
+ }
+ }
+
@VisibleForTesting
<T> long insertAll(
TableReference ref,
@@ -865,104 +963,78 @@ class BigQueryServicesImpl implements BigQueryServices {
// Store the longest throttled time across all parallel threads
final AtomicLong maxThrottlingMsec = new AtomicLong();
- for (int i = 0; i < rowsToPublish.size(); ++i) {
- TableRow row = rowsToPublish.get(i).getValue();
- TableDataInsertAllRequest.Rows out = new TableDataInsertAllRequest.Rows();
- if (idsToPublish != null) {
- out.setInsertId(idsToPublish.get(i));
- }
- out.setJson(row.getUnknownKeys());
- rows.add(out);
-
+ int rowIndex = 0;
+ while (rowIndex < rowsToPublish.size()) {
+ TableRow row = rowsToPublish.get(rowIndex).getValue();
+ long nextRowSize = 0L;
try {
- dataSize += TableRowJsonCoder.of().getEncodedElementByteSize(row);
+ nextRowSize = TableRowJsonCoder.of().getEncodedElementByteSize(row);
} catch (Exception ex) {
throw new RuntimeException("Failed to convert the row to JSON", ex);
}
- if (dataSize >= maxRowBatchSize
- || rows.size() >= maxRowsPerBatch
- || i == rowsToPublish.size() - 1) {
- TableDataInsertAllRequest content = new TableDataInsertAllRequest();
- content.setRows(rows);
- content.setSkipInvalidRows(skipInvalidRows);
- content.setIgnoreUnknownValues(ignoreUnkownValues);
-
- final Bigquery.Tabledata.InsertAll insert =
- client
- .tabledata()
- .insertAll(ref.getProjectId(), ref.getDatasetId(), ref.getTableId(), content)
- .setPrettyPrint(false);
-
- // Create final reference (which cannot change).
- // So the lamba expression can refer to rowsInsertedForRequest to use on error.
+ // The following scenario must be *extremely* rare.
+ // If this row's encoding by itself is larger than the maxRowBatchSize, then it's
+ // impossible to insert into BigQuery, and so we send it out through the dead-letter
+ // queue.
+ if (nextRowSize >= maxRowBatchSize) {
+ errorContainer.add(
+ failedInserts,
+ new InsertErrors()
+ .setErrors(ImmutableList.of(new ErrorProto().setReason("row too large"))),
+ ref,
+ rowsToPublish.get(rowIndex));
+ rowIndex++;
+ continue;
+ }
+
+ if (nextRowSize + dataSize >= maxRowBatchSize || rows.size() + 1 > maxRowsPerBatch) {
+ // If the row does not fit into the current insert buffer, then we submit the current
+ // buffer as its own insert call, and start a fresh, empty buffer that the same row is
+ // added to below. Add a future to insert the current batch into BQ.
futures.add(
executor.submit(
- () -> {
- // A backoff for rate limit exceeded errors.
- BackOff backoff1 =
- BackOffAdapter.toGcpBackOff(rateLimitBackoffFactory.backoff());
- long totalBackoffMillis = 0L;
- while (true) {
- ServiceCallMetric serviceCallMetric = BigQueryUtils.writeCallMetric(ref);
- try {
- List<TableDataInsertAllResponse.InsertErrors> response =
- insert.execute().getInsertErrors();
- if (response == null || response.isEmpty()) {
- serviceCallMetric.call("ok");
- } else {
- for (TableDataInsertAllResponse.InsertErrors insertErrors : response) {
- for (ErrorProto insertError : insertErrors.getErrors()) {
- serviceCallMetric.call(insertError.getReason());
- }
- }
- }
- return response;
- } catch (IOException e) {
- GoogleJsonError.ErrorInfo errorInfo = getErrorInfo(e);
- if (errorInfo == null) {
- serviceCallMetric.call(ServiceCallMetric.CANONICAL_STATUS_UNKNOWN);
- throw e;
- }
- serviceCallMetric.call(errorInfo.getReason());
- /**
- * TODO(BEAM-10584): Check for QUOTA_EXCEEDED error will be replaced by
- * ApiErrorExtractor.INSTANCE.quotaExceeded(e) after the next release of
- * GoogleCloudDataproc/hadoop-connectors
- */
- if (!ApiErrorExtractor.INSTANCE.rateLimited(e)
- && !errorInfo.getReason().equals(QUOTA_EXCEEDED)) {
- throw e;
- }
- LOG.info(
- String.format(
- "BigQuery insertAll error, retrying: %s",
- ApiErrorExtractor.INSTANCE.getErrorMessage(e)));
- try {
- long nextBackOffMillis = backoff1.nextBackOffMillis();
- if (nextBackOffMillis == BackOff.STOP) {
- throw e;
- }
- sleeper.sleep(nextBackOffMillis);
- totalBackoffMillis += nextBackOffMillis;
- final long totalBackoffMillisSoFar = totalBackoffMillis;
- maxThrottlingMsec.getAndUpdate(
- current -> Math.max(current, totalBackoffMillisSoFar));
- } catch (InterruptedException interrupted) {
- throw new IOException(
- "Interrupted while waiting before retrying insertAll");
- }
- }
- }
- }));
+ new InsertBatchofRowsCallable(
+ ref,
+ skipInvalidRows,
+ ignoreUnkownValues,
+ client,
+ rateLimitBackoffFactory,
+ rows,
+ maxThrottlingMsec,
+ sleeper)));
strideIndices.add(strideIndex);
-
retTotalDataSize += dataSize;
-
- dataSize = 0L;
- strideIndex = i + 1;
+ strideIndex = rowIndex;
rows = new ArrayList<>();
+ dataSize = 0L;
+ }
+ // If the row fits into the insert buffer, then we add it to the buffer to be inserted
+ // later, and we move onto the next row.
+ TableDataInsertAllRequest.Rows out = new TableDataInsertAllRequest.Rows();
+ if (idsToPublish != null) {
+ out.setInsertId(idsToPublish.get(rowIndex));
}
+ out.setJson(row.getUnknownKeys());
+ rows.add(out);
+ rowIndex++;
+ dataSize += nextRowSize;
+ }
+
+ if (rows.size() > 0) {
+ futures.add(
+ executor.submit(
+ new InsertBatchofRowsCallable(
+ ref,
+ skipInvalidRows,
+ ignoreUnkownValues,
+ client,
+ rateLimitBackoffFactory,
+ rows,
+ maxThrottlingMsec,
+ sleeper)));
+ strideIndices.add(strideIndex);
+ retTotalDataSize += dataSize;
}
try {
@@ -1067,7 +1139,7 @@ class BigQueryServicesImpl implements BigQueryServices {
successfulRows);
}
- protected GoogleJsonError.ErrorInfo getErrorInfo(IOException e) {
+ protected static GoogleJsonError.ErrorInfo getErrorInfo(IOException e) {
if (!(e instanceof GoogleJsonResponseException)) {
return null;
}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
index efcb5bd..f39aea5 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
@@ -851,6 +851,111 @@ public class BigQueryServicesImplTest {
verifyWriteMetricWasSet("project", "dataset", "table", "ok", 1);
}
+ /** Tests that {@link DatasetServiceImpl#insertAll} does not go over limit of rows per request. */
+ @Test
+ public void testInsertWithinRowCountLimits() throws Exception {
+ TableReference ref =
+ new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
+ List<FailsafeValueInSingleWindow<TableRow, TableRow>> rows =
+ ImmutableList.of(
+ wrapValue(new TableRow().set("row", "a")),
+ wrapValue(new TableRow().set("row", "b")),
+ wrapValue(new TableRow().set("row", "c")));
+ List<String> insertIds = ImmutableList.of("a", "b", "c");
+
+ final TableDataInsertAllResponse allRowsSucceeded = new TableDataInsertAllResponse();
+
+ setupMockResponses(
+ response -> {
+ when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
+ when(response.getStatusCode()).thenReturn(200);
+ when(response.getContent()).thenReturn(toStream(allRowsSucceeded));
+ },
+ response -> {
+ when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
+ when(response.getStatusCode()).thenReturn(200);
+ when(response.getContent()).thenReturn(toStream(allRowsSucceeded));
+ },
+ response -> {
+ when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
+ when(response.getStatusCode()).thenReturn(200);
+ when(response.getContent()).thenReturn(toStream(allRowsSucceeded));
+ });
+
+ DatasetServiceImpl dataService =
+ new DatasetServiceImpl(
+ bigquery,
+ null,
+ PipelineOptionsFactory.fromArgs("--maxStreamingRowsToBatch=1").create());
+ dataService.insertAll(
+ ref,
+ rows,
+ insertIds,
+ BackOffAdapter.toGcpBackOff(TEST_BACKOFF.backoff()),
+ TEST_BACKOFF,
+ new MockSleeper(),
+ InsertRetryPolicy.alwaysRetry(),
+ null,
+ null,
+ false,
+ false,
+ false,
+ null);
+
+ verifyAllResponsesAreRead();
+
+ verifyWriteMetricWasSet("project", "dataset", "table", "ok", 3);
+ }
+
+ /** Tests that {@link DatasetServiceImpl#insertAll} does not go over limit of bytes per request. */
+ @Test
+ public void testInsertWithinRequestByteSizeLimits() throws Exception {
+ TableReference ref =
+ new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
+ List<FailsafeValueInSingleWindow<TableRow, TableRow>> rows =
+ ImmutableList.of(
+ wrapValue(new TableRow().set("row", "a")),
+ wrapValue(new TableRow().set("row", "b")),
+ wrapValue(new TableRow().set("row", "cdefghijklmnopqrstuvwxyz")));
+ List<String> insertIds = ImmutableList.of("a", "b", "c");
+
+ final TableDataInsertAllResponse allRowsSucceeded = new TableDataInsertAllResponse();
+
+ setupMockResponses(
+ response -> {
+ when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
+ when(response.getStatusCode()).thenReturn(200);
+ when(response.getContent()).thenReturn(toStream(allRowsSucceeded));
+ },
+ response -> {
+ when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
+ when(response.getStatusCode()).thenReturn(200);
+ when(response.getContent()).thenReturn(toStream(allRowsSucceeded));
+ });
+
+ DatasetServiceImpl dataService =
+ new DatasetServiceImpl(
+ bigquery, null, PipelineOptionsFactory.fromArgs("--maxStreamingBatchSize=15").create());
+ dataService.insertAll(
+ ref,
+ rows,
+ insertIds,
+ BackOffAdapter.toGcpBackOff(TEST_BACKOFF.backoff()),
+ TEST_BACKOFF,
+ new MockSleeper(),
+ InsertRetryPolicy.alwaysRetry(),
+ new ArrayList<>(),
+ ErrorContainer.TABLE_ROW_ERROR_CONTAINER,
+ false,
+ false,
+ false,
+ null);
+
+ verifyAllResponsesAreRead();
+
+ verifyWriteMetricWasSet("project", "dataset", "table", "ok", 2);
+ }
+
/** Tests that {@link DatasetServiceImpl#insertAll} fails gracefully when persistent issues. */
@Test
public void testInsertFailsGracefully() throws Exception {
@@ -1177,8 +1282,8 @@ public class BigQueryServicesImplTest {
HttpResponseException.Builder builder = mock(HttpResponseException.Builder.class);
IOException validException = new GoogleJsonResponseException(builder, error);
IOException invalidException = new IOException();
- assertEquals(info.getReason(), dataService.getErrorInfo(validException).getReason());
- assertNull(dataService.getErrorInfo(invalidException));
+ assertEquals(info.getReason(), DatasetServiceImpl.getErrorInfo(validException).getReason());
+ assertNull(DatasetServiceImpl.getErrorInfo(invalidException));
}
@Test
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java
index 1e64e22..3895c8c 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java
@@ -21,7 +21,6 @@ import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.atLeastOnce;
-import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -107,18 +106,13 @@ public class BigQueryUtilTest {
anyString(), anyString(), anyString(), any(TableDataInsertAllRequest.class)))
.thenReturn(mockInsertAll);
- doAnswer(
- invocation -> {
- when(mockInsertAll.execute())
- .thenReturn(
- responses.get(0),
- responses
- .subList(1, responses.size())
- .toArray(new TableDataInsertAllResponse[responses.size() - 1]));
- return mockInsertAll;
- })
- .when(mockInsertAll)
- .setPrettyPrint(false);
+ when(mockInsertAll.setPrettyPrint(any())).thenReturn(mockInsertAll);
+ when(mockInsertAll.execute())
+ .thenReturn(
+ responses.get(0),
+ responses
+ .subList(1, responses.size())
+ .toArray(new TableDataInsertAllResponse[responses.size() - 1]));
}
private void verifyInsertAll(int expectedRetries) throws IOException {
@@ -209,24 +203,11 @@ public class BigQueryUtilTest {
ids.add("");
}
- long totalBytes = 0;
- try {
- totalBytes =
- datasetService.insertAll(
- ref,
- rows,
- ids,
- InsertRetryPolicy.alwaysRetry(),
- null,
- null,
- false,
- false,
- false,
- null);
- } finally {
- verifyInsertAll(5);
- // Each of the 25 rows has 1 byte for length and 30 bytes: '{"f":[{"v":"foo"},{"v":1234}]}'
- assertEquals("Incorrect byte count", 25L * 31L, totalBytes);
- }
+ long totalBytes =
+ datasetService.insertAll(
+ ref, rows, ids, InsertRetryPolicy.alwaysRetry(), null, null, false, false, false, null);
+ verifyInsertAll(5);
+ // Each of the 25 rows has 1 byte for length and 30 bytes: '{"f":[{"v":"foo"},{"v":1234}]}'
+ assertEquals("Incorrect byte count", 25L * 31L, totalBytes);
}
}