Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/06/22 22:58:26 UTC

[GitHub] [beam] pabloem opened a new pull request #15067: Fixing BigQueryIO request too big corner case for streaming inserts

pabloem opened a new pull request #15067:
URL: https://github.com/apache/beam/pull/15067


   **Please** add a meaningful description for your change here
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
    See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make the review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   `ValidatesRunner` compliance status (on master branch)
   --------------------------------------------------------
   
   <table>
     <thead>
       <tr>
         <th>Lang</th>
         <th>ULR</th>
         <th>Dataflow</th>
         <th>Flink</th>
         <th>Samza</th>
         <th>Spark</th>
         <th>Twister2</th>
       </tr>
     </thead>
     <tbody>
       <tr>
         <td>Go</td>
         <td>---</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon">
            </a>
          </td>
          <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>---</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>---</td>
       </tr>
       <tr>
         <td>Java</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_ULR/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_ULR/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon?subject=V1">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming/lastCompletedBuild/badge/icon?subject=V1+Streaming">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/badge/icon?subject=V1+Java+11">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_VR_Dataflow_V2/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_VR_Dataflow_V2/lastCompletedBuild/badge/icon?subject=V2">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_VR_Dataflow_V2_Streaming/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_VR_Dataflow_V2_Streaming/lastCompletedBuild/badge/icon?subject=V2+Streaming">
           </a><br>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon?subject=Java+8">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/badge/icon?subject=Java+11">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon?subject=Portable">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon?subject=Portable+Streaming">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Samza/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Samza/lastCompletedBuild/badge/icon?subject=Portable">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon?subject=Portable">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/badge/icon?subject=Structured+Streaming">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Twister2/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Twister2/lastCompletedBuild/badge/icon">
           </a>
         </td>
       </tr>
       <tr>
         <td>Python</td>
         <td>---</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/badge/icon?subject=V1">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/badge/icon?subject=V2">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/badge/icon?subject=ValCont">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Python_PVR_Flink_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Python_PVR_Flink_Cron/lastCompletedBuild/badge/icon?subject=Portable">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Flink/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Flink/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>---</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>---</td>
       </tr>
       <tr>
         <td>XLang</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Direct/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Direct/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Dataflow/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Dataflow/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>---</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>---</td>
       </tr>
     </tbody>
   </table>
   
   Examples testing status on various runners
   --------------------------------------------------------
   
   <table>
     <thead>
       <tr>
         <th>Lang</th>
         <th>ULR</th>
         <th>Dataflow</th>
         <th>Flink</th>
         <th>Samza</th>
         <th>Spark</th>
         <th>Twister2</th>
       </tr>
     </thead>
     <tbody>
       <tr>
         <td>Go</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
       </tr>
       <tr>
         <td>Java</td>
         <td>---</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Java_Examples_Dataflow_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Java_Examples_Dataflow_Cron/lastCompletedBuild/badge/icon?subject=V1">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Java_Examples_Dataflow_Java11_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Java_Examples_Dataflow_Java11_Cron/lastCompletedBuild/badge/icon?subject=V1+Java11">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_Examples_Dataflow_V2/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_Examples_Dataflow_V2/lastCompletedBuild/badge/icon?subject=V2">
           </a><br>
         </td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
       </tr>
       <tr>
         <td>Python</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
       </tr>
       <tr>
         <td>XLang</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
       </tr>
     </tbody>
   </table>
   
   Post-Commit SDK/Transform Integration Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   <table>
     <thead>
       <tr>
         <th>Go</th>
         <th>Java</th>
         <th>Python</th>
       </tr>
     </thead>
     <tbody>
       <tr>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/badge/icon?subject=3.6">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/badge/icon?subject=3.7">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Python38/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Python38/lastCompletedBuild/badge/icon?subject=3.8">
           </a>
         </td>
       </tr>
     </tbody>
   </table>
   
   Pre-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   <table>
     <thead>
       <tr>
         <th>---</th>
         <th>Java</th>
         <th>Python</th>
         <th>Go</th>
         <th>Website</th>
         <th>Whitespace</th>
         <th>Typescript</th>
       </tr>
     </thead>
     <tbody>
       <tr>
         <td>Non-portable</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/badge/icon">
           </a><br>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/badge/icon?subject=Tests">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/badge/icon?subject=Lint">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_PythonDocker_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_PythonDocker_Cron/badge/icon?subject=Docker">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_PythonDocs_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_PythonDocs_Cron/badge/icon?subject=Docs">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Whitespace_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Whitespace_Cron/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Typescript_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Typescript_Cron/lastCompletedBuild/badge/icon">
           </a>
         </td>
       </tr>
       <tr>
         <td>Portable</td>
         <td>---</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_GoPortable_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_GoPortable_Cron/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
       </tr>
     </tbody>
   </table>
   
   See [.test-infra/jenkins/README](https://github.com/apache/beam/blob/master/.test-infra/jenkins/README.md) for trigger phrase, status and link of all Jenkins jobs.
   
   
   GitHub Actions Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   [![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
   [![Python tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Java tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more information about GitHub Actions CI.
   





[GitHub] [beam] pabloem commented on a change in pull request #15067: Fixing BigQueryIO request too big corner case for streaming inserts

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #15067:
URL: https://github.com/apache/beam/pull/15067#discussion_r734200008



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
##########
@@ -865,104 +963,72 @@ public void deleteDataset(String projectId, String datasetId)
         // Store the longest throttled time across all parallel threads
         final AtomicLong maxThrottlingMsec = new AtomicLong();
 
-        for (int i = 0; i < rowsToPublish.size(); ++i) {
-          TableRow row = rowsToPublish.get(i).getValue();
-          TableDataInsertAllRequest.Rows out = new TableDataInsertAllRequest.Rows();
-          if (idsToPublish != null) {
-            out.setInsertId(idsToPublish.get(i));
-          }
-          out.setJson(row.getUnknownKeys());
-          rows.add(out);
-
+        int rowIndex = 0;
+        while (rowIndex < rowsToPublish.size()) {
+          TableRow row = rowsToPublish.get(rowIndex).getValue();
+          long nextRowSize = 0L;
           try {
-            dataSize += TableRowJsonCoder.of().getEncodedElementByteSize(row);
+            nextRowSize = TableRowJsonCoder.of().getEncodedElementByteSize(row);
           } catch (Exception ex) {
             throw new RuntimeException("Failed to convert the row to JSON", ex);
           }
 
-          if (dataSize >= maxRowBatchSize
-              || rows.size() >= maxRowsPerBatch
-              || i == rowsToPublish.size() - 1) {
-            TableDataInsertAllRequest content = new TableDataInsertAllRequest();
-            content.setRows(rows);
-            content.setSkipInvalidRows(skipInvalidRows);
-            content.setIgnoreUnknownValues(ignoreUnkownValues);
-
-            final Bigquery.Tabledata.InsertAll insert =
-                client
-                    .tabledata()
-                    .insertAll(ref.getProjectId(), ref.getDatasetId(), ref.getTableId(), content)
-                    .setPrettyPrint(false);
-
-            // Create final reference (which cannot change).
-            // So the lamba expression can refer to rowsInsertedForRequest to use on error.
-            futures.add(
-                executor.submit(
-                    () -> {
-                      // A backoff for rate limit exceeded errors.
-                      BackOff backoff1 =
-                          BackOffAdapter.toGcpBackOff(rateLimitBackoffFactory.backoff());
-                      long totalBackoffMillis = 0L;
-                      while (true) {
-                        ServiceCallMetric serviceCallMetric = BigQueryUtils.writeCallMetric(ref);
-                        try {
-                          List<TableDataInsertAllResponse.InsertErrors> response =
-                              insert.execute().getInsertErrors();
-                          if (response == null || response.isEmpty()) {
-                            serviceCallMetric.call("ok");
-                          } else {
-                            for (TableDataInsertAllResponse.InsertErrors insertErrors : response) {
-                              for (ErrorProto insertError : insertErrors.getErrors()) {
-                                serviceCallMetric.call(insertError.getReason());
-                              }
-                            }
-                          }
-                          return response;
-                        } catch (IOException e) {
-                          GoogleJsonError.ErrorInfo errorInfo = getErrorInfo(e);
-                          if (errorInfo == null) {
-                            serviceCallMetric.call(ServiceCallMetric.CANONICAL_STATUS_UNKNOWN);
-                            throw e;
-                          }
-                          serviceCallMetric.call(errorInfo.getReason());
-                          /**
-                           * TODO(BEAM-10584): Check for QUOTA_EXCEEDED error will be replaced by
-                           * ApiErrorExtractor.INSTANCE.quotaExceeded(e) after the next release of
-                           * GoogleCloudDataproc/hadoop-connectors
-                           */
-                          if (!ApiErrorExtractor.INSTANCE.rateLimited(e)
-                              && !errorInfo.getReason().equals(QUOTA_EXCEEDED)) {
-                            throw e;
-                          }
-                          LOG.info(
-                              String.format(
-                                  "BigQuery insertAll error, retrying: %s",
-                                  ApiErrorExtractor.INSTANCE.getErrorMessage(e)));
-                          try {
-                            long nextBackOffMillis = backoff1.nextBackOffMillis();
-                            if (nextBackOffMillis == BackOff.STOP) {
-                              throw e;
-                            }
-                            sleeper.sleep(nextBackOffMillis);
-                            totalBackoffMillis += nextBackOffMillis;
-                            final long totalBackoffMillisSoFar = totalBackoffMillis;
-                            maxThrottlingMsec.getAndUpdate(
-                                current -> Math.max(current, totalBackoffMillisSoFar));
-                          } catch (InterruptedException interrupted) {
-                            throw new IOException(
-                                "Interrupted while waiting before retrying insertAll");
-                          }
-                        }
-                      }
-                    }));
-            strideIndices.add(strideIndex);
-
-            retTotalDataSize += dataSize;
-
-            dataSize = 0L;
-            strideIndex = i + 1;
-            rows = new ArrayList<>();
+          // The following scenario must be *extremely* rare.
+          // If this row's encoding by itself is larger than the maxRowBatchSize, then it's
+          // impossible to insert into BigQuery, and so we send it out through the dead-letter
+          // queue.
+          if (nextRowSize >= maxRowBatchSize) {
+            errorContainer.add(
+                failedInserts,
+                new InsertErrors()
+                    .setErrors(ImmutableList.of(new ErrorProto().setReason("row too large"))),
+                ref,
+                rowsToPublish.get(rowIndex));
+            rowIndex++;
+            if (rows.size() == 0) {

Review comment:
       You're right. I've simplified the logic to match your pseudocode.
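
    A minimal sketch of the simplified branch under discussion, for readers of the archive. Identifiers follow the diff above, but this is illustrative rather than the literal merged code:

    ```
    // Simplified oversized-row branch: dead-letter the row and move on.
    // The while condition (rowIndex < rowsToPublish.size()) already bounds
    // the index, so no empty-buffer special case is needed.
    if (nextRowSize >= maxRowBatchSize) {
      errorContainer.add(
          failedInserts,
          new InsertErrors()
              .setErrors(ImmutableList.of(new ErrorProto().setReason("row too large"))),
          ref,
          rowsToPublish.get(rowIndex));
      rowIndex++;
      continue;
    }
    ```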







[GitHub] [beam] dhercher commented on a change in pull request #15067: Fixing BigQueryIO request too big corner case for streaming inserts

Posted by GitBox <gi...@apache.org>.
dhercher commented on a change in pull request #15067:
URL: https://github.com/apache/beam/pull/15067#discussion_r661773398



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
##########
@@ -870,24 +870,36 @@ public void deleteDataset(String projectId, String datasetId)
         // Store the longest throttled time across all parallel threads
         final AtomicLong maxThrottlingMsec = new AtomicLong();
 
-        for (int i = 0; i < rowsToPublish.size(); ++i) {
-          TableRow row = rowsToPublish.get(i).getValue();
-          TableDataInsertAllRequest.Rows out = new TableDataInsertAllRequest.Rows();
-          if (idsToPublish != null) {
-            out.setInsertId(idsToPublish.get(i));
-          }
-          out.setJson(row.getUnknownKeys());
-          rows.add(out);
-
+        int j = 0;
+        TableRow row = rowsToPublish.get(j).getValue();
+        while (row != null) {
+          long rowSize;
           try {
-            dataSize += TableRowJsonCoder.of().getEncodedElementByteSize(row);
+            rowSize = TableRowJsonCoder.of().getEncodedElementByteSize(row);
+            dataSize += rowSize;
           } catch (Exception ex) {
             throw new RuntimeException("Failed to convert the row to JSON", ex);
           }
 
-          if (dataSize >= maxRowBatchSize
-              || rows.size() >= maxRowsPerBatch
-              || i == rowsToPublish.size() - 1) {
+          // If this row fits within the buffer, then we include it in the buffer to be inserted
+          // afterwards.
+          if ((dataSize < maxRowBatchSize && rows.size() < maxRowsPerBatch)

Review comment:
       The logic is getting very complicated here. To simplify it, it might make sense to write it as:
   ```
   while (nextRow) {
       if (this event will go beyond the limits) {
           flush();
       }
       rowsToAdd.add(row)
   }
   
   flush();
   ```
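
    For readers following the thread, a self-contained model of that flush pattern in plain Java. The constants and the println stand-ins for the real insertAll call are illustrative assumptions; nothing here is the merged Beam code:

    ```
    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.List;

    public class FlushPatternSketch {
      static final long MAX_BATCH_BYTES = 10L * 1024 * 1024; // illustrative cap
      static final int MAX_ROWS_PER_BATCH = 500;             // illustrative cap

      public static void publishAll(List<String> rowsToPublish) {
        List<String> batch = new ArrayList<>();
        long batchBytes = 0L;

        for (String row : rowsToPublish) {
          long rowBytes = row.getBytes(StandardCharsets.UTF_8).length;

          // A row that alone exceeds the cap can never be inserted: dead-letter it.
          if (rowBytes >= MAX_BATCH_BYTES) {
            deadLetter(row);
            continue;
          }
          // Flush *before* adding a row that would push the batch over a limit.
          if (batchBytes + rowBytes >= MAX_BATCH_BYTES || batch.size() >= MAX_ROWS_PER_BATCH) {
            flush(batch);
            batch = new ArrayList<>();
            batchBytes = 0L;
          }
          batch.add(row);
          batchBytes += rowBytes;
        }
        flush(batch); // final flush of the remainder
      }

      static void flush(List<String> batch) {
        if (!batch.isEmpty()) {
          System.out.println("inserting " + batch.size() + " rows");
        }
      }

      static void deadLetter(String row) {
        System.out.println("dead-lettering oversized row of " + row.length() + " chars");
      }
    }
    ```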







[GitHub] [beam] pabloem commented on pull request #15067: Fixing BigQueryIO request too big corner case for streaming inserts

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #15067:
URL: https://github.com/apache/beam/pull/15067#issuecomment-950029970


   Run Java PostCommit
   
   





[GitHub] [beam] pabloem commented on pull request #15067: Fixing BigQueryIO request too big corner case for streaming inserts

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #15067:
URL: https://github.com/apache/beam/pull/15067#issuecomment-950030133


   Run Java PostCommit





[GitHub] [beam] pabloem commented on pull request #15067: Fixing BigQueryIO request too big corner case for streaming inserts

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #15067:
URL: https://github.com/apache/beam/pull/15067#issuecomment-947885853


   Run Java PostCommit





[GitHub] [beam] pabloem commented on a change in pull request #15067: Fixing BigQueryIO request too big corner case for streaming inserts

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #15067:
URL: https://github.com/apache/beam/pull/15067#discussion_r734088954



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
##########
@@ -865,104 +866,130 @@ public void deleteDataset(String projectId, String datasetId)
         // Store the longest throttled time across all parallel threads
         final AtomicLong maxThrottlingMsec = new AtomicLong();
 
-        for (int i = 0; i < rowsToPublish.size(); ++i) {
-          TableRow row = rowsToPublish.get(i).getValue();
-          TableDataInsertAllRequest.Rows out = new TableDataInsertAllRequest.Rows();
-          if (idsToPublish != null) {
-            out.setInsertId(idsToPublish.get(i));
-          }
-          out.setJson(row.getUnknownKeys());
-          rows.add(out);
-
+        int rowIndex = 0;
+        while (rowIndex < rowsToPublish.size()) {
+          TableRow row = rowsToPublish.get(rowIndex).getValue();
+          long nextRowSize = 0L;
           try {
-            dataSize += TableRowJsonCoder.of().getEncodedElementByteSize(row);
+            nextRowSize = TableRowJsonCoder.of().getEncodedElementByteSize(row);
           } catch (Exception ex) {
             throw new RuntimeException("Failed to convert the row to JSON", ex);
           }
 
-          if (dataSize >= maxRowBatchSize
-              || rows.size() >= maxRowsPerBatch
-              || i == rowsToPublish.size() - 1) {
-            TableDataInsertAllRequest content = new TableDataInsertAllRequest();
-            content.setRows(rows);
-            content.setSkipInvalidRows(skipInvalidRows);
-            content.setIgnoreUnknownValues(ignoreUnkownValues);
-
-            final Bigquery.Tabledata.InsertAll insert =
-                client
-                    .tabledata()
-                    .insertAll(ref.getProjectId(), ref.getDatasetId(), ref.getTableId(), content)
-                    .setPrettyPrint(false);
-
-            // Create final reference (which cannot change).
-            // So the lamba expression can refer to rowsInsertedForRequest to use on error.
-            futures.add(
-                executor.submit(
-                    () -> {
-                      // A backoff for rate limit exceeded errors.
-                      BackOff backoff1 =
-                          BackOffAdapter.toGcpBackOff(rateLimitBackoffFactory.backoff());
-                      long totalBackoffMillis = 0L;
-                      while (true) {
-                        ServiceCallMetric serviceCallMetric = BigQueryUtils.writeCallMetric(ref);
-                        try {
-                          List<TableDataInsertAllResponse.InsertErrors> response =
-                              insert.execute().getInsertErrors();
-                          if (response == null || response.isEmpty()) {
-                            serviceCallMetric.call("ok");
-                          } else {
-                            for (TableDataInsertAllResponse.InsertErrors insertErrors : response) {
-                              for (ErrorProto insertError : insertErrors.getErrors()) {
-                                serviceCallMetric.call(insertError.getReason());
-                              }
+          // The following scenario must be *extremely* rare.
+          // If this row's encoding by itself is larger than the maxRowBatchSize, then it's
+          // impossible to insert into BigQuery, and so we send it out through the dead-letter
+          // queue.
+          if (nextRowSize >= maxRowBatchSize) {
+            errorContainer.add(
+                failedInserts,
+                new InsertErrors()
+                    .setErrors(ImmutableList.of(new ErrorProto().setReason("row too large"))),
+                ref,
+                rowsToPublish.get(rowIndex));
+            rowIndex++;
+            continue;
+          }
+          // If the row fits into the insert buffer, then we add it to the buffer to be inserted
+          // later, and we move onto the next row.
+          if (nextRowSize + dataSize < maxRowBatchSize && rows.size() <= maxRowsPerBatch) {
+            TableDataInsertAllRequest.Rows out = new TableDataInsertAllRequest.Rows();
+            if (idsToPublish != null) {
+              out.setInsertId(idsToPublish.get(rowIndex));
+            }
+            out.setJson(row.getUnknownKeys());
+            rows.add(out);
+            rowIndex++;
+            dataSize += nextRowSize;
+            // If we have not reached the last row, then we continue the loop to insert the rows
+            // that follow.
+            // If we have reached the last row, then we do not continue to the next loop iteration,
+            // and instead we allow us to continue down to attempt to insert it.
+            if (rowIndex <= rowsToPublish.size() - 1) {
+              continue;
+            }
+          }
+          // If the row does not fit into the insert buffer, then we take the current buffer,
+          // issue the insert call, and we retry adding the same row to the troublesome buffer.
+          TableDataInsertAllRequest content = new TableDataInsertAllRequest();
+          content.setRows(rows);
+          content.setSkipInvalidRows(skipInvalidRows);
+          content.setIgnoreUnknownValues(ignoreUnkownValues);
+
+          final Bigquery.Tabledata.InsertAll insert =
+              client
+                  .tabledata()
+                  .insertAll(ref.getProjectId(), ref.getDatasetId(), ref.getTableId(), content)
+                  .setPrettyPrint(false);
+
+          // Create final reference (which cannot change).
+          // So the lamba expression can refer to rowsInsertedForRequest to use on error.
+          futures.add(
+              executor.submit(
+                  () -> {
+                    // A backoff for rate limit exceeded errors.
+                    BackOff backoff1 =
+                        BackOffAdapter.toGcpBackOff(rateLimitBackoffFactory.backoff());
+                    long totalBackoffMillis = 0L;
+                    while (true) {
+                      ServiceCallMetric serviceCallMetric = BigQueryUtils.writeCallMetric(ref);
+                      try {
+                        List<TableDataInsertAllResponse.InsertErrors> response =
+                            insert.execute().getInsertErrors();
+                        if (response == null || response.isEmpty()) {
+                          serviceCallMetric.call("ok");
+                        } else {
+                          for (TableDataInsertAllResponse.InsertErrors insertErrors : response) {
+                            for (ErrorProto insertError : insertErrors.getErrors()) {
+                              serviceCallMetric.call(insertError.getReason());
                             }
                           }
-                          return response;
-                        } catch (IOException e) {
-                          GoogleJsonError.ErrorInfo errorInfo = getErrorInfo(e);
-                          if (errorInfo == null) {
-                            serviceCallMetric.call(ServiceCallMetric.CANONICAL_STATUS_UNKNOWN);
-                            throw e;
-                          }
-                          serviceCallMetric.call(errorInfo.getReason());
-                          /**
-                           * TODO(BEAM-10584): Check for QUOTA_EXCEEDED error will be replaced by
-                           * ApiErrorExtractor.INSTANCE.quotaExceeded(e) after the next release of
-                           * GoogleCloudDataproc/hadoop-connectors
-                           */
-                          if (!ApiErrorExtractor.INSTANCE.rateLimited(e)
-                              && !errorInfo.getReason().equals(QUOTA_EXCEEDED)) {
+                        }
+                        return response;
+                      } catch (IOException e) {
+                        GoogleJsonError.ErrorInfo errorInfo = getErrorInfo(e);
+                        if (errorInfo == null) {
+                          serviceCallMetric.call(ServiceCallMetric.CANONICAL_STATUS_UNKNOWN);
+                          throw e;
+                        }
+                        serviceCallMetric.call(errorInfo.getReason());
+                        /**
+                         * TODO(BEAM-10584): Check for QUOTA_EXCEEDED error will be replaced by
+                         * ApiErrorExtractor.INSTANCE.quotaExceeded(e) after the next release of
+                         * GoogleCloudDataproc/hadoop-connectors
+                         */
+                        if (!ApiErrorExtractor.INSTANCE.rateLimited(e)
+                            && !errorInfo.getReason().equals(QUOTA_EXCEEDED)) {
+                          throw e;
+                        }
+                        LOG.info(
+                            String.format(
+                                "BigQuery insertAll error, retrying: %s",
+                                ApiErrorExtractor.INSTANCE.getErrorMessage(e)));
+                        try {
+                          long nextBackOffMillis = backoff1.nextBackOffMillis();
+                          if (nextBackOffMillis == BackOff.STOP) {
                             throw e;
                           }
-                          LOG.info(
-                              String.format(
-                                  "BigQuery insertAll error, retrying: %s",
-                                  ApiErrorExtractor.INSTANCE.getErrorMessage(e)));
-                          try {
-                            long nextBackOffMillis = backoff1.nextBackOffMillis();
-                            if (nextBackOffMillis == BackOff.STOP) {
-                              throw e;
-                            }
-                            sleeper.sleep(nextBackOffMillis);
-                            totalBackoffMillis += nextBackOffMillis;
-                            final long totalBackoffMillisSoFar = totalBackoffMillis;
-                            maxThrottlingMsec.getAndUpdate(
-                                current -> Math.max(current, totalBackoffMillisSoFar));
-                          } catch (InterruptedException interrupted) {
-                            throw new IOException(
-                                "Interrupted while waiting before retrying insertAll");
-                          }
+                          sleeper.sleep(nextBackOffMillis);
+                          totalBackoffMillis += nextBackOffMillis;
+                          final long totalBackoffMillisSoFar = totalBackoffMillis;
+                          maxThrottlingMsec.getAndUpdate(
+                              current -> Math.max(current, totalBackoffMillisSoFar));
+                        } catch (InterruptedException interrupted) {
+                          throw new IOException(
+                              "Interrupted while waiting before retrying insertAll");
                         }
                       }
-                    }));
-            strideIndices.add(strideIndex);
+                    }
+                  }));
+          strideIndices.add(strideIndex);
 
-            retTotalDataSize += dataSize;
+          retTotalDataSize += dataSize;
 
-            dataSize = 0L;
-            strideIndex = i + 1;
-            rows = new ArrayList<>();
-          }
+          dataSize = 0L;
+          strideIndex = rowIndex + 1;

Review comment:
       Ah, great catch. I've added a couple of unit tests and fixed the issue.
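
    For context: after the restructuring, `rowIndex` already points at the first unsent row when a batch is flushed, so the next stride should start at `rowIndex`, not `rowIndex + 1`. A hypothetical JUnit sketch of the kind of boundary test that catches this, written against a pure helper rather than the real BigQueryServicesImpl:

    ```
    import static org.junit.Assert.assertEquals;

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import org.junit.Test;

    public class StrideIndexTest {
      /** Start index of each batch when rows are split at most maxRows per batch. */
      static List<Integer> strideIndices(int totalRows, int maxRows) {
        List<Integer> strides = new ArrayList<>();
        for (int start = 0; start < totalRows; start += maxRows) {
          strides.add(start);
        }
        return strides;
      }

      @Test
      public void stridesStartAtFirstUnsentRow() {
        // 5 rows, 2 per batch: batches start at rows 0, 2, and 4. An off-by-one
        // (next stride = rowIndex + 1) would skip a row and yield 0, 3, ...
        assertEquals(Arrays.asList(0, 2, 4), strideIndices(5, 2));
      }
    }
    ```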







[GitHub] [beam] dhercher commented on a change in pull request #15067: Fixing BigQueryIO request too big corner case for streaming inserts

Posted by GitBox <gi...@apache.org>.
dhercher commented on a change in pull request #15067:
URL: https://github.com/apache/beam/pull/15067#discussion_r734112128



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
##########
@@ -865,104 +963,72 @@ public void deleteDataset(String projectId, String datasetId)
         // Store the longest throttled time across all parallel threads
         final AtomicLong maxThrottlingMsec = new AtomicLong();
 
-        for (int i = 0; i < rowsToPublish.size(); ++i) {
-          TableRow row = rowsToPublish.get(i).getValue();
-          TableDataInsertAllRequest.Rows out = new TableDataInsertAllRequest.Rows();
-          if (idsToPublish != null) {
-            out.setInsertId(idsToPublish.get(i));
-          }
-          out.setJson(row.getUnknownKeys());
-          rows.add(out);
-
+        int rowIndex = 0;
+        while (rowIndex < rowsToPublish.size()) {
+          TableRow row = rowsToPublish.get(rowIndex).getValue();
+          long nextRowSize = 0L;
           try {
-            dataSize += TableRowJsonCoder.of().getEncodedElementByteSize(row);
+            nextRowSize = TableRowJsonCoder.of().getEncodedElementByteSize(row);
           } catch (Exception ex) {
             throw new RuntimeException("Failed to convert the row to JSON", ex);
           }
 
-          if (dataSize >= maxRowBatchSize
-              || rows.size() >= maxRowsPerBatch
-              || i == rowsToPublish.size() - 1) {
-            TableDataInsertAllRequest content = new TableDataInsertAllRequest();
-            content.setRows(rows);
-            content.setSkipInvalidRows(skipInvalidRows);
-            content.setIgnoreUnknownValues(ignoreUnkownValues);
-
-            final Bigquery.Tabledata.InsertAll insert =
-                client
-                    .tabledata()
-                    .insertAll(ref.getProjectId(), ref.getDatasetId(), ref.getTableId(), content)
-                    .setPrettyPrint(false);
-
-            // Create final reference (which cannot change).
-            // So the lamba expression can refer to rowsInsertedForRequest to use on error.
-            futures.add(
-                executor.submit(
-                    () -> {
-                      // A backoff for rate limit exceeded errors.
-                      BackOff backoff1 =
-                          BackOffAdapter.toGcpBackOff(rateLimitBackoffFactory.backoff());
-                      long totalBackoffMillis = 0L;
-                      while (true) {
-                        ServiceCallMetric serviceCallMetric = BigQueryUtils.writeCallMetric(ref);
-                        try {
-                          List<TableDataInsertAllResponse.InsertErrors> response =
-                              insert.execute().getInsertErrors();
-                          if (response == null || response.isEmpty()) {
-                            serviceCallMetric.call("ok");
-                          } else {
-                            for (TableDataInsertAllResponse.InsertErrors insertErrors : response) {
-                              for (ErrorProto insertError : insertErrors.getErrors()) {
-                                serviceCallMetric.call(insertError.getReason());
-                              }
-                            }
-                          }
-                          return response;
-                        } catch (IOException e) {
-                          GoogleJsonError.ErrorInfo errorInfo = getErrorInfo(e);
-                          if (errorInfo == null) {
-                            serviceCallMetric.call(ServiceCallMetric.CANONICAL_STATUS_UNKNOWN);
-                            throw e;
-                          }
-                          serviceCallMetric.call(errorInfo.getReason());
-                          /**
-                           * TODO(BEAM-10584): Check for QUOTA_EXCEEDED error will be replaced by
-                           * ApiErrorExtractor.INSTANCE.quotaExceeded(e) after the next release of
-                           * GoogleCloudDataproc/hadoop-connectors
-                           */
-                          if (!ApiErrorExtractor.INSTANCE.rateLimited(e)
-                              && !errorInfo.getReason().equals(QUOTA_EXCEEDED)) {
-                            throw e;
-                          }
-                          LOG.info(
-                              String.format(
-                                  "BigQuery insertAll error, retrying: %s",
-                                  ApiErrorExtractor.INSTANCE.getErrorMessage(e)));
-                          try {
-                            long nextBackOffMillis = backoff1.nextBackOffMillis();
-                            if (nextBackOffMillis == BackOff.STOP) {
-                              throw e;
-                            }
-                            sleeper.sleep(nextBackOffMillis);
-                            totalBackoffMillis += nextBackOffMillis;
-                            final long totalBackoffMillisSoFar = totalBackoffMillis;
-                            maxThrottlingMsec.getAndUpdate(
-                                current -> Math.max(current, totalBackoffMillisSoFar));
-                          } catch (InterruptedException interrupted) {
-                            throw new IOException(
-                                "Interrupted while waiting before retrying insertAll");
-                          }
-                        }
-                      }
-                    }));
-            strideIndices.add(strideIndex);
-
-            retTotalDataSize += dataSize;
-
-            dataSize = 0L;
-            strideIndex = i + 1;
-            rows = new ArrayList<>();
+          // The following scenario must be *extremely* rare.
+          // If this row's encoding by itself is larger than the maxRowBatchSize, then it's
+          // impossible to insert into BigQuery, and so we send it out through the dead-letter
+          // queue.
+          if (nextRowSize >= maxRowBatchSize) {
+            errorContainer.add(
+                failedInserts,
+                new InsertErrors()
+                    .setErrors(ImmutableList.of(new ErrorProto().setReason("row too large"))),
+                ref,
+                rowsToPublish.get(rowIndex));
+            rowIndex++;
+            if (rows.size() == 0) {

Review comment:
       Need to check that `rowIndex < rowsToPublish.size()` before continuing. I'm also worried about enforcing this limit client-side: it saves an error API call, but it hard-codes a limit that BigQuery might change or be able to increase.
   
   Perhaps the flow could be:
   
   ```
   while (nextRow) {
       if (nextRow is going to break our limits) {
           publishBatch()
       }
   
       addRow()
   }
   publishBatch()
   ```
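
    A sketch of how the bounds concern resolves itself with this shape: if the loop condition re-checks the index, the dead-letter branch needs no separate guard (publishBatch, deadLetter, addRowToBatch, and encodedSize are hypothetical helpers, not the real code):

    ```
    int rowIndex = 0;
    while (rowIndex < rowsToPublish.size()) { // re-checked after every continue
      long nextRowSize = encodedSize(rowsToPublish.get(rowIndex));
      if (nextRowSize >= maxRowBatchSize) {
        deadLetter(rowsToPublish.get(rowIndex)); // oversized row can never be inserted
        rowIndex++;
        continue; // safe: the loop condition guards the next access
      }
      if (batchBytes + nextRowSize >= maxRowBatchSize || batch.size() >= maxRowsPerBatch) {
        publishBatch(); // flush and reset batch/batchBytes
      }
      addRowToBatch(rowsToPublish.get(rowIndex), nextRowSize);
      rowIndex++;
    }
    publishBatch(); // final flush of any buffered rows
    ```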
   







[GitHub] [beam] github-actions[bot] commented on pull request #15067: Fixing BigQueryIO request too big corner case for streaming inserts

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #15067:
URL: https://github.com/apache/beam/pull/15067#issuecomment-947200048









[GitHub] [beam] pabloem commented on pull request #15067: Fixing BigQueryIO request too big corner case for streaming inserts

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #15067:
URL: https://github.com/apache/beam/pull/15067#issuecomment-867024889


   R: @dhercher





[GitHub] [beam] pabloem commented on pull request #15067: Fixing BigQueryIO request too big corner case for streaming inserts

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #15067:
URL: https://github.com/apache/beam/pull/15067#issuecomment-947372582


   Run Java PreCommit





[GitHub] [beam] pabloem commented on pull request #15067: Fixing BigQueryIO request too big corner case for streaming inserts

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #15067:
URL: https://github.com/apache/beam/pull/15067#issuecomment-866564590


   Run Java PreCommit





[GitHub] [beam] pabloem commented on pull request #15067: Fixing BigQueryIO request too big corner case for streaming inserts

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #15067:
URL: https://github.com/apache/beam/pull/15067#issuecomment-947353650


   Run Java PostCommit





[GitHub] [beam] pabloem merged pull request #15067: Fixing BigQueryIO request too big corner case for streaming inserts

Posted by GitBox <gi...@apache.org>.
pabloem merged pull request #15067:
URL: https://github.com/apache/beam/pull/15067


   





[GitHub] [beam] dhercher commented on a change in pull request #15067: Fixing BigQueryIO request too big corner case for streaming inserts

Posted by GitBox <gi...@apache.org>.
dhercher commented on a change in pull request #15067:
URL: https://github.com/apache/beam/pull/15067#discussion_r733196365



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
##########
@@ -865,104 +866,130 @@ public void deleteDataset(String projectId, String datasetId)
         // Store the longest throttled time across all parallel threads
         final AtomicLong maxThrottlingMsec = new AtomicLong();
 
-        for (int i = 0; i < rowsToPublish.size(); ++i) {
-          TableRow row = rowsToPublish.get(i).getValue();
-          TableDataInsertAllRequest.Rows out = new TableDataInsertAllRequest.Rows();
-          if (idsToPublish != null) {
-            out.setInsertId(idsToPublish.get(i));
-          }
-          out.setJson(row.getUnknownKeys());
-          rows.add(out);
-
+        int rowIndex = 0;
+        while (rowIndex < rowsToPublish.size()) {
+          TableRow row = rowsToPublish.get(rowIndex).getValue();
+          long nextRowSize = 0L;
           try {
-            dataSize += TableRowJsonCoder.of().getEncodedElementByteSize(row);
+            nextRowSize = TableRowJsonCoder.of().getEncodedElementByteSize(row);
           } catch (Exception ex) {
             throw new RuntimeException("Failed to convert the row to JSON", ex);
           }
 
-          if (dataSize >= maxRowBatchSize
-              || rows.size() >= maxRowsPerBatch
-              || i == rowsToPublish.size() - 1) {
-            TableDataInsertAllRequest content = new TableDataInsertAllRequest();
-            content.setRows(rows);
-            content.setSkipInvalidRows(skipInvalidRows);
-            content.setIgnoreUnknownValues(ignoreUnkownValues);
-
-            final Bigquery.Tabledata.InsertAll insert =
-                client
-                    .tabledata()
-                    .insertAll(ref.getProjectId(), ref.getDatasetId(), ref.getTableId(), content)
-                    .setPrettyPrint(false);
-
-            // Create final reference (which cannot change).
-            // So the lamba expression can refer to rowsInsertedForRequest to use on error.
-            futures.add(
-                executor.submit(
-                    () -> {
-                      // A backoff for rate limit exceeded errors.
-                      BackOff backoff1 =
-                          BackOffAdapter.toGcpBackOff(rateLimitBackoffFactory.backoff());
-                      long totalBackoffMillis = 0L;
-                      while (true) {
-                        ServiceCallMetric serviceCallMetric = BigQueryUtils.writeCallMetric(ref);
-                        try {
-                          List<TableDataInsertAllResponse.InsertErrors> response =
-                              insert.execute().getInsertErrors();
-                          if (response == null || response.isEmpty()) {
-                            serviceCallMetric.call("ok");
-                          } else {
-                            for (TableDataInsertAllResponse.InsertErrors insertErrors : response) {
-                              for (ErrorProto insertError : insertErrors.getErrors()) {
-                                serviceCallMetric.call(insertError.getReason());
-                              }
+          // The following scenario must be *extremely* rare.
+          // If this row's encoding by itself is larger than the maxRowBatchSize, then it's
+          // impossible to insert into BigQuery, and so we send it out through the dead-letter
+          // queue.
+          if (nextRowSize >= maxRowBatchSize) {
+            errorContainer.add(
+                failedInserts,
+                new InsertErrors()
+                    .setErrors(ImmutableList.of(new ErrorProto().setReason("row too large"))),
+                ref,
+                rowsToPublish.get(rowIndex));
+            rowIndex++;
+            continue;
+          }
+          // If the row fits into the insert buffer, then we add it to the buffer
+          // to be inserted later, and we move on to the next row.
+          if (nextRowSize + dataSize < maxRowBatchSize && rows.size() <= maxRowsPerBatch) {
+            TableDataInsertAllRequest.Rows out = new TableDataInsertAllRequest.Rows();
+            if (idsToPublish != null) {
+              out.setInsertId(idsToPublish.get(rowIndex));
+            }
+            out.setJson(row.getUnknownKeys());
+            rows.add(out);
+            rowIndex++;
+            dataSize += nextRowSize;
+            // If we have not reached the last row, continue the loop so that the
+            // rows that follow can be added to this batch.
+            // If this was the last row, fall through so the buffered rows are
+            // inserted below rather than looping again.
+            if (rowIndex <= rowsToPublish.size() - 1) {
+              continue;
+            }
+          }
+          // If the row does not fit into the insert buffer, then we flush the
+          // current buffer, issue the insert call, and retry the same row on a
+          // fresh buffer.
+          TableDataInsertAllRequest content = new TableDataInsertAllRequest();
+          content.setRows(rows);
+          content.setSkipInvalidRows(skipInvalidRows);
+          content.setIgnoreUnknownValues(ignoreUnkownValues);
+
+          final Bigquery.Tabledata.InsertAll insert =
+              client
+                  .tabledata()
+                  .insertAll(ref.getProjectId(), ref.getDatasetId(), ref.getTableId(), content)
+                  .setPrettyPrint(false);
+
+          // Create a final reference (which cannot change) so that the lambda
+          // expression can refer to rowsInsertedForRequest on error.
+          futures.add(
+              executor.submit(
+                  () -> {
+                    // A backoff for rate limit exceeded errors.
+                    BackOff backoff1 =
+                        BackOffAdapter.toGcpBackOff(rateLimitBackoffFactory.backoff());
+                    long totalBackoffMillis = 0L;
+                    while (true) {
+                      ServiceCallMetric serviceCallMetric = BigQueryUtils.writeCallMetric(ref);
+                      try {
+                        List<TableDataInsertAllResponse.InsertErrors> response =
+                            insert.execute().getInsertErrors();
+                        if (response == null || response.isEmpty()) {
+                          serviceCallMetric.call("ok");
+                        } else {
+                          for (TableDataInsertAllResponse.InsertErrors insertErrors : response) {
+                            for (ErrorProto insertError : insertErrors.getErrors()) {
+                              serviceCallMetric.call(insertError.getReason());
                             }
                           }
-                          return response;
-                        } catch (IOException e) {
-                          GoogleJsonError.ErrorInfo errorInfo = getErrorInfo(e);
-                          if (errorInfo == null) {
-                            serviceCallMetric.call(ServiceCallMetric.CANONICAL_STATUS_UNKNOWN);
-                            throw e;
-                          }
-                          serviceCallMetric.call(errorInfo.getReason());
-                          /**
-                           * TODO(BEAM-10584): Check for QUOTA_EXCEEDED error will be replaced by
-                           * ApiErrorExtractor.INSTANCE.quotaExceeded(e) after the next release of
-                           * GoogleCloudDataproc/hadoop-connectors
-                           */
-                          if (!ApiErrorExtractor.INSTANCE.rateLimited(e)
-                              && !errorInfo.getReason().equals(QUOTA_EXCEEDED)) {
+                        }
+                        return response;
+                      } catch (IOException e) {
+                        GoogleJsonError.ErrorInfo errorInfo = getErrorInfo(e);
+                        if (errorInfo == null) {
+                          serviceCallMetric.call(ServiceCallMetric.CANONICAL_STATUS_UNKNOWN);
+                          throw e;
+                        }
+                        serviceCallMetric.call(errorInfo.getReason());
+                        /**
+                         * TODO(BEAM-10584): Check for QUOTA_EXCEEDED error will be replaced by
+                         * ApiErrorExtractor.INSTANCE.quotaExceeded(e) after the next release of
+                         * GoogleCloudDataproc/hadoop-connectors
+                         */
+                        if (!ApiErrorExtractor.INSTANCE.rateLimited(e)
+                            && !errorInfo.getReason().equals(QUOTA_EXCEEDED)) {
+                          throw e;
+                        }
+                        LOG.info(
+                            String.format(
+                                "BigQuery insertAll error, retrying: %s",
+                                ApiErrorExtractor.INSTANCE.getErrorMessage(e)));
+                        try {
+                          long nextBackOffMillis = backoff1.nextBackOffMillis();
+                          if (nextBackOffMillis == BackOff.STOP) {
                             throw e;
                           }
-                          LOG.info(
-                              String.format(
-                                  "BigQuery insertAll error, retrying: %s",
-                                  ApiErrorExtractor.INSTANCE.getErrorMessage(e)));
-                          try {
-                            long nextBackOffMillis = backoff1.nextBackOffMillis();
-                            if (nextBackOffMillis == BackOff.STOP) {
-                              throw e;
-                            }
-                            sleeper.sleep(nextBackOffMillis);
-                            totalBackoffMillis += nextBackOffMillis;
-                            final long totalBackoffMillisSoFar = totalBackoffMillis;
-                            maxThrottlingMsec.getAndUpdate(
-                                current -> Math.max(current, totalBackoffMillisSoFar));
-                          } catch (InterruptedException interrupted) {
-                            throw new IOException(
-                                "Interrupted while waiting before retrying insertAll");
-                          }
+                          sleeper.sleep(nextBackOffMillis);
+                          totalBackoffMillis += nextBackOffMillis;
+                          final long totalBackoffMillisSoFar = totalBackoffMillis;
+                          maxThrottlingMsec.getAndUpdate(
+                              current -> Math.max(current, totalBackoffMillisSoFar));
+                        } catch (InterruptedException interrupted) {
+                          throw new IOException(
+                              "Interrupted while waiting before retrying insertAll");
                         }
                       }
-                    }));
-            strideIndices.add(strideIndex);
+                    }
+                  }));
+          strideIndices.add(strideIndex);
 
-            retTotalDataSize += dataSize;
+          retTotalDataSize += dataSize;
 
-            dataSize = 0L;
-            strideIndex = i + 1;
-            rows = new ArrayList<>();
-          }
+          dataSize = 0L;
+          strideIndex = rowIndex + 1;

Review comment:
       Should strideIndex be set to rowIndex here, since rowIndex will already have been incremented if the row was accounted for?







[GitHub] [beam] pabloem commented on pull request #15067: Fixing BigQueryIO request too big corner case for streaming inserts

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #15067:
URL: https://github.com/apache/beam/pull/15067#issuecomment-947885769


   Failures are unrelated.





[GitHub] [beam] pabloem closed pull request #15067: Fixing BigQueryIO request too big corner case for streaming inserts

Posted by GitBox <gi...@apache.org>.
pabloem closed pull request #15067:
URL: https://github.com/apache/beam/pull/15067


   





[GitHub] [beam] pabloem commented on a change in pull request #15067: Fixing BigQueryIO request too big corner case for streaming inserts

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #15067:
URL: https://github.com/apache/beam/pull/15067#discussion_r732327879



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
##########
@@ -870,24 +870,36 @@ public void deleteDataset(String projectId, String datasetId)
         // Store the longest throttled time across all parallel threads
         final AtomicLong maxThrottlingMsec = new AtomicLong();
 
-        for (int i = 0; i < rowsToPublish.size(); ++i) {
-          TableRow row = rowsToPublish.get(i).getValue();
-          TableDataInsertAllRequest.Rows out = new TableDataInsertAllRequest.Rows();
-          if (idsToPublish != null) {
-            out.setInsertId(idsToPublish.get(i));
-          }
-          out.setJson(row.getUnknownKeys());
-          rows.add(out);
-
+        int j = 0;
+        TableRow row = rowsToPublish.get(j).getValue();
+        while (row != null) {
+          long rowSize;
           try {
-            dataSize += TableRowJsonCoder.of().getEncodedElementByteSize(row);
+            rowSize = TableRowJsonCoder.of().getEncodedElementByteSize(row);
+            dataSize += rowSize;
           } catch (Exception ex) {
             throw new RuntimeException("Failed to convert the row to JSON", ex);
           }
 
-          if (dataSize >= maxRowBatchSize
-              || rows.size() >= maxRowsPerBatch
-              || i == rowsToPublish.size() - 1) {
+          // If this row fits within the buffer, then we include it in the buffer to be inserted
+          // afterwards.
+          if ((dataSize < maxRowBatchSize && rows.size() < maxRowsPerBatch)

Review comment:
       LMK what you think about this implementation







[GitHub] [beam] pabloem commented on pull request #15067: Fixing BigQueryIO request too big corner case for streaming inserts

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #15067:
URL: https://github.com/apache/beam/pull/15067#issuecomment-949057068


   Run Java PostCommit





[GitHub] [beam] pabloem commented on pull request #15067: Fixing BigQueryIO request too big corner case for streaming inserts

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #15067:
URL: https://github.com/apache/beam/pull/15067#issuecomment-949225151


   Run Java PostCommit





[GitHub] [beam] pabloem commented on pull request #15067: Fixing BigQueryIO request too big corner case for streaming inserts

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #15067:
URL: https://github.com/apache/beam/pull/15067#issuecomment-866390993


   Run Java PostCommit
   

