Posted to commits@beam.apache.org by pa...@apache.org on 2021/10/25 03:27:11 UTC

[beam] branch master updated: Fixing BigQueryIO request too big corner case for streaming inserts (#15067)

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 1f08d1f  Fixing BigQueryIO request too big corner case for streaming inserts (#15067)
1f08d1f is described below

commit 1f08d1f3ddc2e7bc7341be4b29bdafaec18de9cc
Author: Pablo <pa...@users.noreply.github.com>
AuthorDate: Sun Oct 24 20:25:09 2021 -0700

    Fixing BigQueryIO request too big corner case for streaming inserts (#15067)
    
    * Rewriting for readability
    
    * Address comments
    
    * Address comments
    
    * Addressing comments. Simplifying logic
---
 .../sdk/io/gcp/bigquery/BigQueryServicesImpl.java  | 248 +++++++++++++--------
 .../io/gcp/bigquery/BigQueryServicesImplTest.java  | 109 ++++++++-
 .../beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java |  45 ++--
 3 files changed, 280 insertions(+), 122 deletions(-)
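
The core of this change is the rewritten batching loop in DatasetServiceImpl.insertAll.
Previously a row was appended to the pending batch before the size limits were checked,
so a single row whose encoding exceeded maxRowBatchSize could still produce an insertAll
request too big for BigQuery to accept. The loop now measures each row first, flushes the
pending batch when the row would overflow it, and routes rows that could never fit into
any request to the failed-inserts (dead-letter) output. A minimal sketch of that policy,
with invented names (batchRows, encodedSize, flush, deadLetter) standing in for the Beam
internals shown in the diff below:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.Consumer;
    import java.util.function.ToLongFunction;

    final class BatchingSketch {
      // Illustrative stand-in for the rewritten insertAll batching loop.
      static <T> void batchRows(
          List<T> rows,
          ToLongFunction<T> encodedSize, // stands in for TableRowJsonCoder sizing
          long maxBatchBytes,            // stands in for maxRowBatchSize
          int maxRowsPerBatch,
          Consumer<List<T>> flush,       // stands in for submitting a batch to the executor
          Consumer<T> deadLetter) {      // stands in for errorContainer.add(...)
        List<T> batch = new ArrayList<>();
        long dataSize = 0L;
        int rowIndex = 0;
        while (rowIndex < rows.size()) {
          long rowSize = encodedSize.applyAsLong(rows.get(rowIndex));
          if (rowSize >= maxBatchBytes) {
            // A row that alone exceeds the byte limit can never be inserted,
            // so it goes to the dead-letter output instead of building an
            // oversized request.
            deadLetter.accept(rows.get(rowIndex));
            rowIndex++;
            continue;
          }
          if (rowSize + dataSize >= maxBatchBytes || batch.size() + 1 > maxRowsPerBatch) {
            // The row does not fit: flush the current buffer and retry the
            // same row against a fresh one.
            flush.accept(batch);
            batch = new ArrayList<>();
            dataSize = 0L;
          }
          batch.add(rows.get(rowIndex));
          dataSize += rowSize;
          rowIndex++;
        }
        if (!batch.isEmpty()) {
          flush.accept(batch); // trailing partial batch
        }
      }
    }

Note that a flush deliberately falls through to the add step, so the row that triggered
it becomes the first row of the next batch; only the row-too-large case skips ahead.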

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index a636b04..9f03dc4d 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -53,6 +53,7 @@ import com.google.api.services.bigquery.model.QueryResponse;
 import com.google.api.services.bigquery.model.Table;
 import com.google.api.services.bigquery.model.TableDataInsertAllRequest;
 import com.google.api.services.bigquery.model.TableDataInsertAllResponse;
+import com.google.api.services.bigquery.model.TableDataInsertAllResponse.InsertErrors;
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.auth.Credentials;
@@ -811,6 +812,103 @@ class BigQueryServicesImpl implements BigQueryServices {
           ALWAYS_RETRY);
     }
 
+    static class InsertBatchofRowsCallable implements Callable<List<InsertErrors>> {
+      private final TableReference ref;
+      private final Boolean skipInvalidRows;
+      private final Boolean ignoreUnkownValues;
+      private final Bigquery client;
+      private final FluentBackoff rateLimitBackoffFactory;
+      private final List<TableDataInsertAllRequest.Rows> rows;
+      private final AtomicLong maxThrottlingMsec;
+      private final Sleeper sleeper;
+
+      InsertBatchofRowsCallable(
+          TableReference ref,
+          Boolean skipInvalidRows,
+          Boolean ignoreUnknownValues,
+          Bigquery client,
+          FluentBackoff rateLimitBackoffFactory,
+          List<TableDataInsertAllRequest.Rows> rows,
+          AtomicLong maxThrottlingMsec,
+          Sleeper sleeper) {
+        this.ref = ref;
+        this.skipInvalidRows = skipInvalidRows;
+        this.ignoreUnkownValues = ignoreUnknownValues;
+        this.client = client;
+        this.rateLimitBackoffFactory = rateLimitBackoffFactory;
+        this.rows = rows;
+        this.maxThrottlingMsec = maxThrottlingMsec;
+        this.sleeper = sleeper;
+      }
+
+      @Override
+      public List<TableDataInsertAllResponse.InsertErrors> call() throws Exception {
+        TableDataInsertAllRequest content = new TableDataInsertAllRequest();
+        content.setRows(rows);
+        content.setSkipInvalidRows(skipInvalidRows);
+        content.setIgnoreUnknownValues(ignoreUnkownValues);
+
+        final Bigquery.Tabledata.InsertAll insert =
+            client
+                .tabledata()
+                .insertAll(ref.getProjectId(), ref.getDatasetId(), ref.getTableId(), content)
+                .setPrettyPrint(false);
+
+        // A backoff for rate limit exceeded errors.
+        BackOff backoff1 = BackOffAdapter.toGcpBackOff(rateLimitBackoffFactory.backoff());
+        long totalBackoffMillis = 0L;
+        while (true) {
+          ServiceCallMetric serviceCallMetric = BigQueryUtils.writeCallMetric(ref);
+          try {
+            List<TableDataInsertAllResponse.InsertErrors> response =
+                insert.execute().getInsertErrors();
+            if (response == null || response.isEmpty()) {
+              serviceCallMetric.call("ok");
+            } else {
+              for (TableDataInsertAllResponse.InsertErrors insertErrors : response) {
+                for (ErrorProto insertError : insertErrors.getErrors()) {
+                  serviceCallMetric.call(insertError.getReason());
+                }
+              }
+            }
+            return response;
+          } catch (IOException e) {
+            GoogleJsonError.ErrorInfo errorInfo = getErrorInfo(e);
+            if (errorInfo == null) {
+              serviceCallMetric.call(ServiceCallMetric.CANONICAL_STATUS_UNKNOWN);
+              throw e;
+            }
+            serviceCallMetric.call(errorInfo.getReason());
+            /**
+             * TODO(BEAM-10584): Check for QUOTA_EXCEEDED error will be replaced by
+             * ApiErrorExtractor.INSTANCE.quotaExceeded(e) after the next release of
+             * GoogleCloudDataproc/hadoop-connectors
+             */
+            if (!ApiErrorExtractor.INSTANCE.rateLimited(e)
+                && !errorInfo.getReason().equals(QUOTA_EXCEEDED)) {
+              throw e;
+            }
+            LOG.info(
+                String.format(
+                    "BigQuery insertAll error, retrying: %s",
+                    ApiErrorExtractor.INSTANCE.getErrorMessage(e)));
+            try {
+              long nextBackOffMillis = backoff1.nextBackOffMillis();
+              if (nextBackOffMillis == BackOff.STOP) {
+                throw e;
+              }
+              sleeper.sleep(nextBackOffMillis);
+              totalBackoffMillis += nextBackOffMillis;
+              final long totalBackoffMillisSoFar = totalBackoffMillis;
+              maxThrottlingMsec.getAndUpdate(current -> Math.max(current, totalBackoffMillisSoFar));
+            } catch (InterruptedException interrupted) {
+              throw new IOException("Interrupted while waiting before retrying insertAll");
+            }
+          }
+        }
+      }
+    }
+
     @VisibleForTesting
     <T> long insertAll(
         TableReference ref,
@@ -865,104 +963,78 @@ class BigQueryServicesImpl implements BigQueryServices {
         // Store the longest throttled time across all parallel threads
         final AtomicLong maxThrottlingMsec = new AtomicLong();
 
-        for (int i = 0; i < rowsToPublish.size(); ++i) {
-          TableRow row = rowsToPublish.get(i).getValue();
-          TableDataInsertAllRequest.Rows out = new TableDataInsertAllRequest.Rows();
-          if (idsToPublish != null) {
-            out.setInsertId(idsToPublish.get(i));
-          }
-          out.setJson(row.getUnknownKeys());
-          rows.add(out);
-
+        int rowIndex = 0;
+        while (rowIndex < rowsToPublish.size()) {
+          TableRow row = rowsToPublish.get(rowIndex).getValue();
+          long nextRowSize = 0L;
           try {
-            dataSize += TableRowJsonCoder.of().getEncodedElementByteSize(row);
+            nextRowSize = TableRowJsonCoder.of().getEncodedElementByteSize(row);
           } catch (Exception ex) {
             throw new RuntimeException("Failed to convert the row to JSON", ex);
           }
 
-          if (dataSize >= maxRowBatchSize
-              || rows.size() >= maxRowsPerBatch
-              || i == rowsToPublish.size() - 1) {
-            TableDataInsertAllRequest content = new TableDataInsertAllRequest();
-            content.setRows(rows);
-            content.setSkipInvalidRows(skipInvalidRows);
-            content.setIgnoreUnknownValues(ignoreUnkownValues);
-
-            final Bigquery.Tabledata.InsertAll insert =
-                client
-                    .tabledata()
-                    .insertAll(ref.getProjectId(), ref.getDatasetId(), ref.getTableId(), content)
-                    .setPrettyPrint(false);
-
-            // Create final reference (which cannot change).
-            // So the lamba expression can refer to rowsInsertedForRequest to use on error.
+          // The following scenario must be *extremely* rare.
+          // If this row's encoding by itself is larger than the maxRowBatchSize, then it's
+          // impossible to insert into BigQuery, and so we send it out through the dead-letter
+          // queue.
+          if (nextRowSize >= maxRowBatchSize) {
+            errorContainer.add(
+                failedInserts,
+                new InsertErrors()
+                    .setErrors(ImmutableList.of(new ErrorProto().setReason("row too large"))),
+                ref,
+                rowsToPublish.get(rowIndex));
+            rowIndex++;
+            continue;
+          }
+
+          if (nextRowSize + dataSize >= maxRowBatchSize || rows.size() + 1 > maxRowsPerBatch) {
+            // If the row does not fit into the insert buffer, then we take the current buffer,
+            // issue the insert call, and we retry adding the same row to the troublesome buffer.
+            // Add a future to insert the current batch into BQ.
             futures.add(
                 executor.submit(
-                    () -> {
-                      // A backoff for rate limit exceeded errors.
-                      BackOff backoff1 =
-                          BackOffAdapter.toGcpBackOff(rateLimitBackoffFactory.backoff());
-                      long totalBackoffMillis = 0L;
-                      while (true) {
-                        ServiceCallMetric serviceCallMetric = BigQueryUtils.writeCallMetric(ref);
-                        try {
-                          List<TableDataInsertAllResponse.InsertErrors> response =
-                              insert.execute().getInsertErrors();
-                          if (response == null || response.isEmpty()) {
-                            serviceCallMetric.call("ok");
-                          } else {
-                            for (TableDataInsertAllResponse.InsertErrors insertErrors : response) {
-                              for (ErrorProto insertError : insertErrors.getErrors()) {
-                                serviceCallMetric.call(insertError.getReason());
-                              }
-                            }
-                          }
-                          return response;
-                        } catch (IOException e) {
-                          GoogleJsonError.ErrorInfo errorInfo = getErrorInfo(e);
-                          if (errorInfo == null) {
-                            serviceCallMetric.call(ServiceCallMetric.CANONICAL_STATUS_UNKNOWN);
-                            throw e;
-                          }
-                          serviceCallMetric.call(errorInfo.getReason());
-                          /**
-                           * TODO(BEAM-10584): Check for QUOTA_EXCEEDED error will be replaced by
-                           * ApiErrorExtractor.INSTANCE.quotaExceeded(e) after the next release of
-                           * GoogleCloudDataproc/hadoop-connectors
-                           */
-                          if (!ApiErrorExtractor.INSTANCE.rateLimited(e)
-                              && !errorInfo.getReason().equals(QUOTA_EXCEEDED)) {
-                            throw e;
-                          }
-                          LOG.info(
-                              String.format(
-                                  "BigQuery insertAll error, retrying: %s",
-                                  ApiErrorExtractor.INSTANCE.getErrorMessage(e)));
-                          try {
-                            long nextBackOffMillis = backoff1.nextBackOffMillis();
-                            if (nextBackOffMillis == BackOff.STOP) {
-                              throw e;
-                            }
-                            sleeper.sleep(nextBackOffMillis);
-                            totalBackoffMillis += nextBackOffMillis;
-                            final long totalBackoffMillisSoFar = totalBackoffMillis;
-                            maxThrottlingMsec.getAndUpdate(
-                                current -> Math.max(current, totalBackoffMillisSoFar));
-                          } catch (InterruptedException interrupted) {
-                            throw new IOException(
-                                "Interrupted while waiting before retrying insertAll");
-                          }
-                        }
-                      }
-                    }));
+                    new InsertBatchofRowsCallable(
+                        ref,
+                        skipInvalidRows,
+                        ignoreUnkownValues,
+                        client,
+                        rateLimitBackoffFactory,
+                        rows,
+                        maxThrottlingMsec,
+                        sleeper)));
             strideIndices.add(strideIndex);
-
             retTotalDataSize += dataSize;
-
-            dataSize = 0L;
-            strideIndex = i + 1;
+            strideIndex = rowIndex;
             rows = new ArrayList<>();
+            dataSize = 0L;
+          }
+          // If the row fits into the insert buffer, then we add it to the buffer to be inserted
+          // later, and we move onto the next row.
+          TableDataInsertAllRequest.Rows out = new TableDataInsertAllRequest.Rows();
+          if (idsToPublish != null) {
+            out.setInsertId(idsToPublish.get(rowIndex));
           }
+          out.setJson(row.getUnknownKeys());
+          rows.add(out);
+          rowIndex++;
+          dataSize += nextRowSize;
+        }
+
+        if (rows.size() > 0) {
+          futures.add(
+              executor.submit(
+                  new InsertBatchofRowsCallable(
+                      ref,
+                      skipInvalidRows,
+                      ignoreUnkownValues,
+                      client,
+                      rateLimitBackoffFactory,
+                      rows,
+                      maxThrottlingMsec,
+                      sleeper)));
+          strideIndices.add(strideIndex);
+          retTotalDataSize += dataSize;
         }
 
         try {
@@ -1067,7 +1139,7 @@ class BigQueryServicesImpl implements BigQueryServices {
           successfulRows);
     }
 
-    protected GoogleJsonError.ErrorInfo getErrorInfo(IOException e) {
+    protected static GoogleJsonError.ErrorInfo getErrorInfo(IOException e) {
       if (!(e instanceof GoogleJsonResponseException)) {
         return null;
       }
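
For pipeline authors, the visible effect of the new dead-letter path is that an oversized
row now arrives on the write's failed-inserts output with reason "row too large" instead
of causing the whole insertAll request to fail. A hedged sketch of consuming that output;
the table name is hypothetical, and the coder and create-disposition settings are
assumptions for illustration, not part of this commit:

    import com.google.api.services.bigquery.model.TableRow;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryInsertError;
    import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
    import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
    import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.values.TypeDescriptors;

    public class DeadLetterSketch {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        WriteResult result =
            p.apply(Create.of(new TableRow().set("row", "a")).withCoder(TableRowJsonCoder.of()))
                .apply(
                    BigQueryIO.writeTableRows()
                        .to("my-project:my_dataset.my_table") // hypothetical table
                        .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
                        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                        .withExtendedErrorInfo()
                        .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()));

        // Rows that are rejected permanently, including the new "row too large"
        // case, arrive here instead of failing the bundle.
        result
            .getFailedInsertsWithErr()
            .apply(
                MapElements.into(TypeDescriptors.strings())
                    .via((BigQueryInsertError err) -> err.getRow() + " failed: " + err.getError()));

        p.run().waitUntilFinish();
      }
    }
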
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
index efcb5bd..f39aea5 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
@@ -851,6 +851,111 @@ public class BigQueryServicesImplTest {
     verifyWriteMetricWasSet("project", "dataset", "table", "ok", 1);
   }
 
+  /** Tests that {@link DatasetServiceImpl#insertAll} does not go over the limit of rows per request. */
+  @Test
+  public void testInsertWithinRowCountLimits() throws Exception {
+    TableReference ref =
+        new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
+    List<FailsafeValueInSingleWindow<TableRow, TableRow>> rows =
+        ImmutableList.of(
+            wrapValue(new TableRow().set("row", "a")),
+            wrapValue(new TableRow().set("row", "b")),
+            wrapValue(new TableRow().set("row", "c")));
+    List<String> insertIds = ImmutableList.of("a", "b", "c");
+
+    final TableDataInsertAllResponse allRowsSucceeded = new TableDataInsertAllResponse();
+
+    setupMockResponses(
+        response -> {
+          when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
+          when(response.getStatusCode()).thenReturn(200);
+          when(response.getContent()).thenReturn(toStream(allRowsSucceeded));
+        },
+        response -> {
+          when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
+          when(response.getStatusCode()).thenReturn(200);
+          when(response.getContent()).thenReturn(toStream(allRowsSucceeded));
+        },
+        response -> {
+          when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
+          when(response.getStatusCode()).thenReturn(200);
+          when(response.getContent()).thenReturn(toStream(allRowsSucceeded));
+        });
+
+    DatasetServiceImpl dataService =
+        new DatasetServiceImpl(
+            bigquery,
+            null,
+            PipelineOptionsFactory.fromArgs("--maxStreamingRowsToBatch=1").create());
+    dataService.insertAll(
+        ref,
+        rows,
+        insertIds,
+        BackOffAdapter.toGcpBackOff(TEST_BACKOFF.backoff()),
+        TEST_BACKOFF,
+        new MockSleeper(),
+        InsertRetryPolicy.alwaysRetry(),
+        null,
+        null,
+        false,
+        false,
+        false,
+        null);
+
+    verifyAllResponsesAreRead();
+
+    verifyWriteMetricWasSet("project", "dataset", "table", "ok", 3);
+  }
+
+  /** Tests that {@link DatasetServiceImpl#insertAll} does not go over the request byte size limit. */
+  @Test
+  public void testInsertWithinRequestByteSizeLimits() throws Exception {
+    TableReference ref =
+        new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
+    List<FailsafeValueInSingleWindow<TableRow, TableRow>> rows =
+        ImmutableList.of(
+            wrapValue(new TableRow().set("row", "a")),
+            wrapValue(new TableRow().set("row", "b")),
+            wrapValue(new TableRow().set("row", "cdefghijklmnopqrstuvwxyz")));
+    List<String> insertIds = ImmutableList.of("a", "b", "c");
+
+    final TableDataInsertAllResponse allRowsSucceeded = new TableDataInsertAllResponse();
+
+    setupMockResponses(
+        response -> {
+          when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
+          when(response.getStatusCode()).thenReturn(200);
+          when(response.getContent()).thenReturn(toStream(allRowsSucceeded));
+        },
+        response -> {
+          when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
+          when(response.getStatusCode()).thenReturn(200);
+          when(response.getContent()).thenReturn(toStream(allRowsSucceeded));
+        });
+
+    DatasetServiceImpl dataService =
+        new DatasetServiceImpl(
+            bigquery, null, PipelineOptionsFactory.fromArgs("--maxStreamingBatchSize=15").create());
+    dataService.insertAll(
+        ref,
+        rows,
+        insertIds,
+        BackOffAdapter.toGcpBackOff(TEST_BACKOFF.backoff()),
+        TEST_BACKOFF,
+        new MockSleeper(),
+        InsertRetryPolicy.alwaysRetry(),
+        new ArrayList<>(),
+        ErrorContainer.TABLE_ROW_ERROR_CONTAINER,
+        false,
+        false,
+        false,
+        null);
+
+    verifyAllResponsesAreRead();
+
+    verifyWriteMetricWasSet("project", "dataset", "table", "ok", 2);
+  }
+
   /** Tests that {@link DatasetServiceImpl#insertAll} fails gracefully when there are persistent issues. */
   @Test
   public void testInsertFailsGracefully() throws Exception {
@@ -1177,8 +1282,8 @@ public class BigQueryServicesImplTest {
     HttpResponseException.Builder builder = mock(HttpResponseException.Builder.class);
     IOException validException = new GoogleJsonResponseException(builder, error);
     IOException invalidException = new IOException();
-    assertEquals(info.getReason(), dataService.getErrorInfo(validException).getReason());
-    assertNull(dataService.getErrorInfo(invalidException));
+    assertEquals(info.getReason(), DatasetServiceImpl.getErrorInfo(validException).getReason());
+    assertNull(DatasetServiceImpl.getErrorInfo(invalidException));
   }
 
   @Test
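
The BigQueryUtilTest cleanup below replaces a doAnswer() workaround with plain
when()/thenReturn() stubbing; Mockito's varargs thenReturn overload returns the queued
values on successive calls and repeats the last one thereafter. A small self-contained
illustration of that pattern (the Fetcher interface is invented for the example):

    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.when;

    import java.util.Arrays;
    import java.util.List;

    public class VarargsStubbingSketch {
      interface Fetcher {
        String fetch();
      }

      public static void main(String[] args) {
        Fetcher fetcher = mock(Fetcher.class);
        List<String> responses = Arrays.asList("first", "second", "third");

        // First value, then the rest in order; the last value repeats on any
        // further calls.
        when(fetcher.fetch())
            .thenReturn(
                responses.get(0),
                responses.subList(1, responses.size()).toArray(new String[0]));

        System.out.println(fetcher.fetch()); // first
        System.out.println(fetcher.fetch()); // second
        System.out.println(fetcher.fetch()); // third
      }
    }
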
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java
index 1e64e22..3895c8c 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java
@@ -21,7 +21,6 @@ import static org.junit.Assert.assertEquals;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.atLeastOnce;
-import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -107,18 +106,13 @@ public class BigQueryUtilTest {
             anyString(), anyString(), anyString(), any(TableDataInsertAllRequest.class)))
         .thenReturn(mockInsertAll);
 
-    doAnswer(
-            invocation -> {
-              when(mockInsertAll.execute())
-                  .thenReturn(
-                      responses.get(0),
-                      responses
-                          .subList(1, responses.size())
-                          .toArray(new TableDataInsertAllResponse[responses.size() - 1]));
-              return mockInsertAll;
-            })
-        .when(mockInsertAll)
-        .setPrettyPrint(false);
+    when(mockInsertAll.setPrettyPrint(any())).thenReturn(mockInsertAll);
+    when(mockInsertAll.execute())
+        .thenReturn(
+            responses.get(0),
+            responses
+                .subList(1, responses.size())
+                .toArray(new TableDataInsertAllResponse[responses.size() - 1]));
   }
 
   private void verifyInsertAll(int expectedRetries) throws IOException {
@@ -209,24 +203,11 @@ public class BigQueryUtilTest {
       ids.add("");
     }
 
-    long totalBytes = 0;
-    try {
-      totalBytes =
-          datasetService.insertAll(
-              ref,
-              rows,
-              ids,
-              InsertRetryPolicy.alwaysRetry(),
-              null,
-              null,
-              false,
-              false,
-              false,
-              null);
-    } finally {
-      verifyInsertAll(5);
-      // Each of the 25 rows has 1 byte for length and 30 bytes: '{"f":[{"v":"foo"},{"v":1234}]}'
-      assertEquals("Incorrect byte count", 25L * 31L, totalBytes);
-    }
+    long totalBytes =
+        datasetService.insertAll(
+            ref, rows, ids, InsertRetryPolicy.alwaysRetry(), null, null, false, false, false, null);
+    verifyInsertAll(5);
+    // Each of the 25 rows has 1 byte for length and 30 bytes: '{"f":[{"v":"foo"},{"v":1234}]}'
+    assertEquals("Incorrect byte count", 25L * 31L, totalBytes);
   }
 }
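
A quick check of the byte count asserted at the end of that test, assuming
TableRowJsonCoder prefixes each encoded element with a varint length, which occupies a
single byte for payloads under 128 bytes:

    import java.nio.charset.StandardCharsets;

    public class ByteCountCheck {
      public static void main(String[] args) {
        String json = "{\"f\":[{\"v\":\"foo\"},{\"v\":1234}]}";
        int jsonBytes = json.getBytes(StandardCharsets.UTF_8).length; // 30
        long perRow = 1 + jsonBytes; // 1-byte varint length prefix + payload = 31
        System.out.println(25L * perRow); // 775, matching the 25L * 31L assertion
      }
    }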