Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/10/20 00:14:00 UTC

[GitHub] [beam] pabloem commented on a change in pull request #15067: Fixing BigQueryIO request too big corner case for streaming inserts

pabloem commented on a change in pull request #15067:
URL: https://github.com/apache/beam/pull/15067#discussion_r732327879



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
##########
@@ -870,24 +870,36 @@ public void deleteDataset(String projectId, String datasetId)
         // Store the longest throttled time across all parallel threads
         final AtomicLong maxThrottlingMsec = new AtomicLong();
 
-        for (int i = 0; i < rowsToPublish.size(); ++i) {
-          TableRow row = rowsToPublish.get(i).getValue();
-          TableDataInsertAllRequest.Rows out = new TableDataInsertAllRequest.Rows();
-          if (idsToPublish != null) {
-            out.setInsertId(idsToPublish.get(i));
-          }
-          out.setJson(row.getUnknownKeys());
-          rows.add(out);
-
+        int j = 0;
+        TableRow row = rowsToPublish.get(j).getValue();
+        while (row != null) {
+          long rowSize;
           try {
-            dataSize += TableRowJsonCoder.of().getEncodedElementByteSize(row);
+            rowSize = TableRowJsonCoder.of().getEncodedElementByteSize(row);
+            dataSize += rowSize;
           } catch (Exception ex) {
             throw new RuntimeException("Failed to convert the row to JSON", ex);
           }
 
-          if (dataSize >= maxRowBatchSize
-              || rows.size() >= maxRowsPerBatch
-              || i == rowsToPublish.size() - 1) {
+          // If this row fits within the buffer, then we include it in the buffer to be inserted
+          // afterwards.
+          if ((dataSize < maxRowBatchSize && rows.size() < maxRowsPerBatch)

Review comment:
       Let me know what you think about this implementation.
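
To make the batching idea in the hunk above easier to discuss, here is a minimal standalone sketch of size-aware batching. It is not the code in this pull request: the diff is cut off before the flush logic, so the behaviour below is an assumption. `BatchSketch`, `batch`, and the use of UTF-8 byte length as the row size are all hypothetical stand-ins (the real code measures rows with `TableRowJsonCoder`). The sketch assumes a row that does not fit causes the current buffer to be flushed first, and that a single row larger than the byte limit is emitted in a batch of its own.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; names and policy are assumptions, not Beam's API.
final class BatchSketch {

  // Splits rows into batches so that each batch stays within maxBatchBytes and
  // maxRowsPerBatch. A row that would overflow the current batch starts a new one.
  static List<List<String>> batch(List<String> rows, long maxBatchBytes, int maxRowsPerBatch) {
    List<List<String>> batches = new ArrayList<>();
    List<String> current = new ArrayList<>();
    long currentBytes = 0;

    for (String row : rows) {
      // Stand-in for the encoded JSON size of a TableRow.
      long rowBytes = row.getBytes(StandardCharsets.UTF_8).length;

      boolean fits =
          currentBytes + rowBytes <= maxBatchBytes && current.size() < maxRowsPerBatch;

      // Flush before adding, so the row that did not fit is never part of the
      // batch it would have pushed over the limit.
      if (!fits && !current.isEmpty()) {
        batches.add(current);
        current = new ArrayList<>();
        currentBytes = 0;
      }

      // Assumption: an oversized row is still emitted, alone in its own batch.
      current.add(row);
      currentBytes += rowBytes;
    }

    // Flush whatever is left after the last row.
    if (!current.isEmpty()) {
      batches.add(current);
    }
    return batches;
  }

  public static void main(String[] args) {
    List<String> rows = Arrays.asList("aaaa", "bb", "cccccc", "d", "eeeeeeee");
    System.out.println(batch(rows, 6, 2));
  }
}
```

With a 6-byte limit and at most 2 rows per batch, the five sample rows end up in four batches, and the 8-byte row is alone in the last one. Checking the limit before adding the row, rather than after, is what keeps a batch from exceeding the limit by the size of its final row; whether an oversized row should be sent alone or rejected is a separate policy choice that this sketch does not settle.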



