Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2023/01/11 21:33:15 UTC

[GitHub] [beam] ahmedabu98 opened a new pull request, #24979: Respect BigQuery insert byte size limits

ahmedabu98 opened a new pull request, #24979:
URL: https://github.com/apache/beam/pull/24979

   Adds logic to handle the case where a batch of rows exceeds BigQuery's streaming insert API limit: https://cloud.google.com/bigquery/quotas#streaming_inserts
   
   Fixes #20711


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] Abacn commented on pull request #24979: Respect BigQuery insert byte size limits

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on PR #24979:
URL: https://github.com/apache/beam/pull/24979#issuecomment-1622123104

   Will merge once tests pass.




[GitHub] [beam] liferoad commented on a diff in pull request #24979: Respect BigQuery insert byte size limits

Posted by "liferoad (via GitHub)" <gi...@apache.org>.
liferoad commented on code in PR #24979:
URL: https://github.com/apache/beam/pull/24979#discussion_r1253403969


##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -1515,11 +1528,43 @@ def process(self, element, *schema_side_inputs):
 
     if not self.with_batched_input:
       row_and_insert_id = element[1]
+      row_byte_size = get_deep_size(row_and_insert_id)
+      # maintain buffer byte size for each destination
+      self._destination_buffer_byte_size.setdefault(destination, 0)
+      # send large rows that exceed BigQuery insert limits to DLQ
+      if row_byte_size >= self.max_insert_payload_size:
+        row_mb_size = row_byte_size / 1_000_000
+        max_mb_size = self.max_insert_payload_size / 1_000_000
+        error = (

Review Comment:
   I overlooked this, but why does `_flush_batch` not check the row size?





[GitHub] [beam] ahmedabu98 commented on a diff in pull request #24979: Respect BigQuery insert byte size limits

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #24979:
URL: https://github.com/apache/beam/pull/24979#discussion_r1253219275


##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -1515,11 +1526,41 @@ def process(self, element, *schema_side_inputs):
 
     if not self.with_batched_input:
       row_and_insert_id = element[1]
+      row_byte_size = get_deep_size(row_and_insert_id)
+
+      # send large rows that exceed BigQuery insert limits to DLQ
+      if row_byte_size >= self.max_insert_payload_size:
+        row_mb_size = row_byte_size / 1_000_000
+        max_mb_size = self.max_insert_payload_size / 1_000_000
+        error = (
+            f"Received row with size {row_mb_size}MB that exceeds "
+            f"the maximum insert payload size set ({max_mb_size}MB).")
+        return [
+            pvalue.TaggedOutput(
+                BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS,
+                GlobalWindows.windowed_value(
+                    (destination, row_and_insert_id[0], error))),
+            pvalue.TaggedOutput(
+                BigQueryWriteFn.FAILED_ROWS,
+                GlobalWindows.windowed_value(
+                    (destination, row_and_insert_id[0])))
+        ]
+
+      buffer_byte_size_with_row = (
+          row_byte_size if not self._rows_buffer[destination] else
+          row_byte_size + get_deep_size(self._rows_buffer[destination]))

Review Comment:
   Done, PTAL





[GitHub] [beam] ahmedabu98 commented on a diff in pull request #24979: Respect BigQuery insert byte size limits

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #24979:
URL: https://github.com/apache/beam/pull/24979#discussion_r1253325017


##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -1515,11 +1528,43 @@ def process(self, element, *schema_side_inputs):
 
     if not self.with_batched_input:
       row_and_insert_id = element[1]
+      row_byte_size = get_deep_size(row_and_insert_id)
+      # maintain buffer byte size for each destination
+      self._destination_buffer_byte_size.setdefault(destination, 0)
+      # send large rows that exceed BigQuery insert limits to DLQ
+      if row_byte_size >= self.max_insert_payload_size:
+        row_mb_size = row_byte_size / 1_000_000
+        max_mb_size = self.max_insert_payload_size / 1_000_000
+        error = (

Review Comment:
   The `process` function looks at one element at a time, so here we return the large row as a failed row (to the output PCollections). Elements that arrive after such a failed row get their own `process` calls.
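
The per-element handling described above can be sketched in plain Python, without Beam. This is a minimal illustration under stated assumptions, not the code in the PR: `estimate_row_bytes`, the `"failed"`/`"buffered"` tags, and the limit value are all hypothetical stand-ins for `get_deep_size`, the tagged outputs, and `max_insert_payload_size`.

```python
import json

# Hypothetical limit for illustration only; not the value used in the PR.
MAX_INSERT_PAYLOAD_SIZE = 1_000


def estimate_row_bytes(row):
    """Rough stand-in for a deep-size helper: length of the JSON encoding."""
    return len(json.dumps(row).encode("utf-8"))


def process(destination, row, max_payload_size=MAX_INSERT_PAYLOAD_SIZE):
    """Handle exactly one element and return a (tag, payload) pair.

    An oversized row is returned immediately on the "failed" tag with an
    error message. Later rows are unaffected because each one goes through
    its own call to this function.
    """
    row_byte_size = estimate_row_bytes(row)
    if row_byte_size >= max_payload_size:
        row_mb_size = row_byte_size / 1_000_000
        max_mb_size = max_payload_size / 1_000_000
        error = (
            f"Received row with size {row_mb_size}MB that exceeds "
            f"the maximum insert payload size set ({max_mb_size}MB).")
        return ("failed", (destination, row, error))
    return ("buffered", (destination, row))
```

Calling `process` on a small row yields the `"buffered"` tag, while a row whose encoding reaches the limit yields `"failed"` together with the error string.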





[GitHub] [beam] AnandInguva commented on pull request #24979: Respect BigQuery insert byte size limits

Posted by GitBox <gi...@apache.org>.
AnandInguva commented on PR #24979:
URL: https://github.com/apache/beam/pull/24979#issuecomment-1385794577

   @ahmedabu98 Once this is ready, I can review it. Let me know.




[GitHub] [beam] github-actions[bot] commented on pull request #24979: Respect BigQuery insert byte size limits

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #24979:
URL: https://github.com/apache/beam/pull/24979#issuecomment-1440254093

   Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control




[GitHub] [beam] ahmedabu98 commented on a diff in pull request #24979: Respect BigQuery insert byte size limits

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #24979:
URL: https://github.com/apache/beam/pull/24979#discussion_r1253479657


##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -1515,11 +1528,43 @@ def process(self, element, *schema_side_inputs):
 
     if not self.with_batched_input:
       row_and_insert_id = element[1]
+      row_byte_size = get_deep_size(row_and_insert_id)
+      # maintain buffer byte size for each destination
+      self._destination_buffer_byte_size.setdefault(destination, 0)
+      # send large rows that exceed BigQuery insert limits to DLQ
+      if row_byte_size >= self.max_insert_payload_size:
+        row_mb_size = row_byte_size / 1_000_000
+        max_mb_size = self.max_insert_payload_size / 1_000_000
+        error = (

Review Comment:
   Opened #27363





[GitHub] [beam] ahmedabu98 commented on a diff in pull request #24979: Respect BigQuery insert byte size limits

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #24979:
URL: https://github.com/apache/beam/pull/24979#discussion_r1253295476


##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -1325,7 +1333,8 @@ def __init__(
       ignore_insert_ids=False,
       with_batched_input=False,
       ignore_unknown_columns=False,
-      max_retries=MAX_INSERT_RETRIES):
+      max_retries=MAX_INSERT_RETRIES,
+      max_insert_payload_size=MAX_INSERT_PAYLOAD_SIZE):

Review Comment:
   This is for users who want to control the batch size of their inserts (e.g. to have smaller, more frequent batch inserts). I'll add a check that this doesn't exceed the true maximum.
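
One way such a check could look is sketched below. It is an assumption, not the PR's implementation: the helper name `resolve_max_insert_payload_size` is hypothetical, the ceiling constant is a placeholder (the real limit is documented on the BigQuery quotas page), and raising an error rather than clamping is just one possible design.

```python
# Placeholder ceiling for illustration; check the BigQuery quotas page for
# the actual streaming insert request size limit.
HARD_MAX_INSERT_PAYLOAD_SIZE = 10_000_000


def resolve_max_insert_payload_size(
        requested=None, hard_max=HARD_MAX_INSERT_PAYLOAD_SIZE):
    """Return the payload size to use, rejecting values above the ceiling."""
    if requested is None:
        # No user override: fall back to the service ceiling.
        return hard_max
    if requested <= 0:
        raise ValueError("max_insert_payload_size must be positive.")
    if requested > hard_max:
        raise ValueError(
            f"max_insert_payload_size ({requested}) exceeds the maximum "
            f"allowed payload size ({hard_max}).")
    return requested
```

Users can then lower the value to get smaller, more frequent inserts, but cannot raise it past the limit the service enforces.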





[GitHub] [beam] bvolpato commented on a diff in pull request #24979: Respect BigQuery insert byte size limits

Posted by "bvolpato (via GitHub)" <gi...@apache.org>.
bvolpato commented on code in PR #24979:
URL: https://github.com/apache/beam/pull/24979#discussion_r1253183595


##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -1515,11 +1526,41 @@ def process(self, element, *schema_side_inputs):
 
     if not self.with_batched_input:
       row_and_insert_id = element[1]
+      row_byte_size = get_deep_size(row_and_insert_id)
+
+      # send large rows that exceed BigQuery insert limits to DLQ
+      if row_byte_size >= self.max_insert_payload_size:
+        row_mb_size = row_byte_size / 1_000_000
+        max_mb_size = self.max_insert_payload_size / 1_000_000
+        error = (
+            f"Received row with size {row_mb_size}MB that exceeds "
+            f"the maximum insert payload size set ({max_mb_size}MB).")
+        return [
+            pvalue.TaggedOutput(
+                BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS,
+                GlobalWindows.windowed_value(
+                    (destination, row_and_insert_id[0], error))),
+            pvalue.TaggedOutput(
+                BigQueryWriteFn.FAILED_ROWS,
+                GlobalWindows.windowed_value(
+                    (destination, row_and_insert_id[0])))
+        ]
+
+      buffer_byte_size_with_row = (
+          row_byte_size if not self._rows_buffer[destination] else
+          row_byte_size + get_deep_size(self._rows_buffer[destination]))

Review Comment:
   In case it wasn't clear, I'm talking about keeping a supporting list like
   
   ```
   self._rows_buffer_bytes[destination]
   ```





[GitHub] [beam] liferoad commented on a diff in pull request #24979: Respect BigQuery insert byte size limits

Posted by "liferoad (via GitHub)" <gi...@apache.org>.
liferoad commented on code in PR #24979:
URL: https://github.com/apache/beam/pull/24979#discussion_r1253423839


##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -1515,11 +1528,43 @@ def process(self, element, *schema_side_inputs):
 
     if not self.with_batched_input:
       row_and_insert_id = element[1]
+      row_byte_size = get_deep_size(row_and_insert_id)
+      # maintain buffer byte size for each destination
+      self._destination_buffer_byte_size.setdefault(destination, 0)
+      # send large rows that exceed BigQuery insert limits to DLQ
+      if row_byte_size >= self.max_insert_payload_size:
+        row_mb_size = row_byte_size / 1_000_000
+        max_mb_size = self.max_insert_payload_size / 1_000_000
+        error = (

Review Comment:
   Can we open an issue to track this?





[GitHub] [beam] pabloem commented on pull request #24979: Respect BigQuery insert byte size limits

Posted by "pabloem (via GitHub)" <gi...@apache.org>.
pabloem commented on PR #24979:
URL: https://github.com/apache/beam/pull/24979#issuecomment-1419631332

   oops : ( I'm sorry @ahmedabu98 - what should we do about this?




[GitHub] [beam] bvolpato commented on a diff in pull request #24979: Respect BigQuery insert byte size limits

Posted by "bvolpato (via GitHub)" <gi...@apache.org>.
bvolpato commented on code in PR #24979:
URL: https://github.com/apache/beam/pull/24979#discussion_r1253182086


##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -1515,11 +1526,41 @@ def process(self, element, *schema_side_inputs):
 
     if not self.with_batched_input:
       row_and_insert_id = element[1]
+      row_byte_size = get_deep_size(row_and_insert_id)
+
+      # send large rows that exceed BigQuery insert limits to DLQ
+      if row_byte_size >= self.max_insert_payload_size:
+        row_mb_size = row_byte_size / 1_000_000
+        max_mb_size = self.max_insert_payload_size / 1_000_000
+        error = (
+            f"Received row with size {row_mb_size}MB that exceeds "
+            f"the maximum insert payload size set ({max_mb_size}MB).")
+        return [
+            pvalue.TaggedOutput(
+                BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS,
+                GlobalWindows.windowed_value(
+                    (destination, row_and_insert_id[0], error))),
+            pvalue.TaggedOutput(
+                BigQueryWriteFn.FAILED_ROWS,
+                GlobalWindows.windowed_value(
+                    (destination, row_and_insert_id[0])))
+        ]
+
+      buffer_byte_size_with_row = (
+          row_byte_size if not self._rows_buffer[destination] else
+          row_byte_size + get_deep_size(self._rows_buffer[destination]))

Review Comment:
   Can't this be _potentially_ really expensive?
   
   We already had `get_deep_size` for every element before, perhaps we could also accumulate it here instead of doing `get_deep_size` on the entire array again?
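
The accumulation suggested here can be sketched as a small buffer that keeps a running byte total per destination, so each row is measured only once. This is a minimal sketch under assumptions: the class `SizeTrackingBuffer` and its methods are hypothetical names, and the caller is assumed to pass in a row size it has already computed.

```python
from collections import defaultdict


class SizeTrackingBuffer:
    """Buffers rows per destination and tracks their total byte size."""

    def __init__(self, max_bytes):
        self.max_bytes = max_bytes
        self._rows = defaultdict(list)
        self._bytes = defaultdict(int)

    def add(self, destination, row, row_bytes):
        """Buffer a row; return the previous batch if it had to be flushed.

        The running total is updated in O(1) instead of re-measuring the
        whole buffer on every call.
        """
        flushed = None
        would_overflow = (
            self._bytes[destination] + row_bytes > self.max_bytes)
        if self._rows[destination] and would_overflow:
            flushed = self.flush(destination)
        self._rows[destination].append(row)
        self._bytes[destination] += row_bytes
        return flushed

    def flush(self, destination):
        """Remove and return the buffered rows, resetting the byte count."""
        self._bytes.pop(destination, None)
        return self._rows.pop(destination, [])
```

With a 100-byte limit, adding two 60-byte rows to the same destination returns the first row as a flushed batch on the second call and leaves the second row buffered.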





[GitHub] [beam] ahmedabu98 commented on a diff in pull request #24979: Respect BigQuery insert byte size limits

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #24979:
URL: https://github.com/apache/beam/pull/24979#discussion_r1253409820


##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -1515,11 +1528,43 @@ def process(self, element, *schema_side_inputs):
 
     if not self.with_batched_input:
       row_and_insert_id = element[1]
+      row_byte_size = get_deep_size(row_and_insert_id)
+      # maintain buffer byte size for each destination
+      self._destination_buffer_byte_size.setdefault(destination, 0)
+      # send large rows that exceed BigQuery insert limits to DLQ
+      if row_byte_size >= self.max_insert_payload_size:
+        row_mb_size = row_byte_size / 1_000_000
+        max_mb_size = self.max_insert_payload_size / 1_000_000

Review Comment:
   This would be true if we were using a double slash (`//`), which acts like `math.floor()` after division. But here we use a single slash, which results in a float when necessary (e.g. 500 / 1_000_000 = 0.0005).
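
The difference between the two operators is easy to confirm in isolation. The snippet below is only an illustration of Python's division semantics; the variable names are chosen for this example.

```python
row_byte_size = 500

# True division always produces a float, so small sizes are preserved.
true_div = row_byte_size / 1_000_000

# Floor division rounds down to an integer, so small sizes collapse to 0.
floor_div = row_byte_size // 1_000_000

print(type(true_div).__name__, true_div)
print(type(floor_div).__name__, floor_div)
```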
   





[GitHub] [beam] codecov[bot] commented on pull request #24979: Respect BigQuery insert byte size limits

Posted by GitBox <gi...@apache.org>.
codecov[bot] commented on PR #24979:
URL: https://github.com/apache/beam/pull/24979#issuecomment-1379520254

   # [Codecov](https://codecov.io/gh/apache/beam/pull/24979?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#24979](https://codecov.io/gh/apache/beam/pull/24979?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (26d9662) into [master](https://codecov.io/gh/apache/beam/commit/3838528fde37262ecadf642791e3e0f3a57d7b25?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (3838528) will **decrease** coverage by `17.82%`.
   > The diff coverage is `20.00%`.
   
   ```diff
   @@             Coverage Diff             @@
   ##           master   #24979       +/-   ##
   ===========================================
   - Coverage   73.13%   55.30%   -17.83%     
   ===========================================
     Files         735      735               
     Lines       98146    98152        +6     
   ===========================================
   - Hits        71777    54281    -17496     
   - Misses      25006    42508    +17502     
     Partials     1363     1363               
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | python | `56.85% <20.00%> (-25.83%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/24979?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [sdks/python/apache\_beam/io/gcp/bigquery.py](https://codecov.io/gh/apache/beam/pull/24979?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5LnB5) | `43.77% <20.00%> (-27.58%)` | :arrow_down: |
   | [sdks/python/apache\_beam/io/utils.py](https://codecov.io/gh/apache/beam/pull/24979?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vdXRpbHMucHk=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [sdks/python/apache\_beam/utils/shared.py](https://codecov.io/gh/apache/beam/pull/24979?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdXRpbHMvc2hhcmVkLnB5) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [sdks/python/apache\_beam/tools/\_\_init\_\_.py](https://codecov.io/gh/apache/beam/pull/24979?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdG9vbHMvX19pbml0X18ucHk=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [sdks/python/apache\_beam/ml/inference/\_\_init\_\_.py](https://codecov.io/gh/apache/beam/pull/24979?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vbWwvaW5mZXJlbmNlL19faW5pdF9fLnB5) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...s/python/apache\_beam/typehints/testing/\_\_init\_\_.py](https://codecov.io/gh/apache/beam/pull/24979?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHlwZWhpbnRzL3Rlc3RpbmcvX19pbml0X18ucHk=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [.../python/apache\_beam/testing/analyzers/constants.py](https://codecov.io/gh/apache/beam/pull/24979?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy9hbmFseXplcnMvY29uc3RhbnRzLnB5) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...python/apache\_beam/typehints/testing/strategies.py](https://codecov.io/gh/apache/beam/pull/24979?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHlwZWhpbnRzL3Rlc3Rpbmcvc3RyYXRlZ2llcy5weQ==) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...ache\_beam/coders/proto2\_coder\_test\_messages\_pb2.py](https://codecov.io/gh/apache/beam/pull/24979?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vY29kZXJzL3Byb3RvMl9jb2Rlcl90ZXN0X21lc3NhZ2VzX3BiMi5weQ==) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...beam/transforms/fully\_qualified\_named\_transform.py](https://codecov.io/gh/apache/beam/pull/24979?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHJhbnNmb3Jtcy9mdWxseV9xdWFsaWZpZWRfbmFtZWRfdHJhbnNmb3JtLnB5) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | ... and [271 more](https://codecov.io/gh/apache/beam/pull/24979?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   




[GitHub] [beam] github-actions[bot] commented on pull request #24979: Respect BigQuery insert byte size limits

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #24979:
URL: https://github.com/apache/beam/pull/24979#issuecomment-1435656486

   Reminder, please take a look at this pr: @pabloem @johnjcasey 




[GitHub] [beam] github-actions[bot] commented on pull request #24979: Respect BigQuery insert byte size limits

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #24979:
URL: https://github.com/apache/beam/pull/24979#issuecomment-1439921064

   Assigning new set of reviewers because Pr has gone too long without review. If you would like to opt out of this review, comment `assign to next reviewer`:
   
   R: @jrmccluskey for label python.
   R: @chamikaramj for label io.
   
   Available commands:
   - `stop reviewer notifications` - opt out of the automated review tooling
   - `remind me after tests pass` - tag the comment author after tests pass
   - `waiting on author` - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)




[GitHub] [beam] liferoad commented on a diff in pull request #24979: Respect BigQuery insert byte size limits

Posted by "liferoad (via GitHub)" <gi...@apache.org>.
liferoad commented on code in PR #24979:
URL: https://github.com/apache/beam/pull/24979#discussion_r1253280098


##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -1325,7 +1333,8 @@ def __init__(
       ignore_insert_ids=False,
       with_batched_input=False,
       ignore_unknown_columns=False,
-      max_retries=MAX_INSERT_RETRIES):
+      max_retries=MAX_INSERT_RETRIES,
+      max_insert_payload_size=MAX_INSERT_PAYLOAD_SIZE):

Review Comment:
   Why can `max_insert_payload_size` be changed, given that the limit at https://cloud.google.com/bigquery/quotas#streaming_inserts always applies?





[GitHub] [beam] ahmedabu98 commented on a diff in pull request #24979: Respect BigQuery insert byte size limits

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #24979:
URL: https://github.com/apache/beam/pull/24979#discussion_r1253421565


##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -1515,11 +1528,43 @@ def process(self, element, *schema_side_inputs):
 
     if not self.with_batched_input:
       row_and_insert_id = element[1]
+      row_byte_size = get_deep_size(row_and_insert_id)
+      # maintain buffer byte size for each destination
+      self._destination_buffer_byte_size.setdefault(destination, 0)
+      # send large rows that exceed BigQuery insert limits to DLQ
+      if row_byte_size >= self.max_insert_payload_size:
+        row_mb_size = row_byte_size / 1_000_000
+        max_mb_size = self.max_insert_payload_size / 1_000_000
+        error = (

Review Comment:
   Yes, that can trigger the same error if users batch their rows beforehand in a way that exceeds the limits. However, I think handling this can be a separate effort, as it is a different use case. This PR handles the more general use case of writing a PCollection of rows, which is what a few users were doing when they ran into this issue.





[GitHub] [beam] ahmedabu98 commented on a diff in pull request #24979: Respect BigQuery insert byte size limits

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #24979:
URL: https://github.com/apache/beam/pull/24979#discussion_r1253327565


##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -1325,7 +1333,8 @@ def __init__(
       ignore_insert_ids=False,
       with_batched_input=False,
       ignore_unknown_columns=False,
-      max_retries=MAX_INSERT_RETRIES):
+      max_retries=MAX_INSERT_RETRIES,
+      max_insert_payload_size=MAX_INSERT_PAYLOAD_SIZE):

Review Comment:
   Done





[GitHub] [beam] Abacn commented on pull request #24979: Respect BigQuery insert byte size limits

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on PR #24979:
URL: https://github.com/apache/beam/pull/24979#issuecomment-1622155218

   PythonLint passed, though the status did not update in the GitHub UI. Merging for now.




[GitHub] [beam] liferoad commented on a diff in pull request #24979: Respect BigQuery insert byte size limits

Posted by "liferoad (via GitHub)" <gi...@apache.org>.
liferoad commented on code in PR #24979:
URL: https://github.com/apache/beam/pull/24979#discussion_r1253403969


##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -1515,11 +1528,43 @@ def process(self, element, *schema_side_inputs):
 
     if not self.with_batched_input:
       row_and_insert_id = element[1]
+      row_byte_size = get_deep_size(row_and_insert_id)
+      # maintain buffer byte size for each destination
+      self._destination_buffer_byte_size.setdefault(destination, 0)
+      # send large rows that exceed BigQuery insert limits to DLQ
+      if row_byte_size >= self.max_insert_payload_size:
+        row_mb_size = row_byte_size / 1_000_000
+        max_mb_size = self.max_insert_payload_size / 1_000_000
+        error = (

Review Comment:
   I overlooked this. But why does `_flush_batch` not check the row size?





[GitHub] [beam] ahmedabu98 commented on pull request #24979: Respect BigQuery insert byte size limits

Posted by GitBox <gi...@apache.org>.
ahmedabu98 commented on PR #24979:
URL: https://github.com/apache/beam/pull/24979#issuecomment-1385935655

   Run Python_Coverage PreCommit




[GitHub] [beam] github-actions[bot] commented on pull request #24979: Respect BigQuery insert byte size limits

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #24979:
URL: https://github.com/apache/beam/pull/24979#issuecomment-1379573050

   Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment `assign set of reviewers`




[GitHub] [beam] github-actions[bot] commented on pull request #24979: Respect BigQuery insert byte size limits

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #24979:
URL: https://github.com/apache/beam/pull/24979#issuecomment-1413643397

   Assigning new set of reviewers because PR has gone too long without review. If you would like to opt out of this review, comment `assign to next reviewer`:
   
   R: @pabloem for label python.
   R: @johnjcasey for label io.
   
   Available commands:
   - `stop reviewer notifications` - opt out of the automated review tooling
   - `remind me after tests pass` - tag the comment author after tests pass
   - `waiting on author` - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)




[GitHub] [beam] liferoad commented on a diff in pull request #24979: Respect BigQuery insert byte size limits

Posted by "liferoad (via GitHub)" <gi...@apache.org>.
liferoad commented on code in PR #24979:
URL: https://github.com/apache/beam/pull/24979#discussion_r1253403468


##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -1515,11 +1528,43 @@ def process(self, element, *schema_side_inputs):
 
     if not self.with_batched_input:
       row_and_insert_id = element[1]
+      row_byte_size = get_deep_size(row_and_insert_id)
+      # maintain buffer byte size for each destination
+      self._destination_buffer_byte_size.setdefault(destination, 0)
+      # send large rows that exceed BigQuery insert limits to DLQ
+      if row_byte_size >= self.max_insert_payload_size:
+        row_mb_size = row_byte_size / 1_000_000
+        max_mb_size = self.max_insert_payload_size / 1_000_000

Review Comment:
   This indicates `max_insert_payload_size` must be greater than 1_000_000. We should validate this in `__init__`.
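
   A minimal sketch of such a validation, assuming the lower bound suggested above. `PayloadSizeConfig` is a hypothetical class used only for illustration; it is not the actual `BigQueryWriteFn`, and the error message wording is made up.

```python
class PayloadSizeConfig:
  """Hypothetical holder for the limit; not the actual BigQueryWriteFn."""

  # Lower bound taken from the review comment; treat it as an assumption.
  MIN_PAYLOAD_SIZE = 1_000_000

  def __init__(self, max_insert_payload_size):
    # Fail fast at construction time rather than when the first row arrives.
    if max_insert_payload_size <= self.MIN_PAYLOAD_SIZE:
      raise ValueError(
          "max_insert_payload_size must be greater than %d bytes, got %d" %
          (self.MIN_PAYLOAD_SIZE, max_insert_payload_size))
    self.max_insert_payload_size = max_insert_payload_size
```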





[GitHub] [beam] liferoad commented on a diff in pull request #24979: Respect BigQuery insert byte size limits

Posted by "liferoad (via GitHub)" <gi...@apache.org>.
liferoad commented on code in PR #24979:
URL: https://github.com/apache/beam/pull/24979#discussion_r1253276340


##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -1414,6 +1424,7 @@ def __init__(
         BigQueryWriteFn.STREAMING_API_LOGGING_FREQUENCY_SEC)
     self.ignore_unknown_columns = ignore_unknown_columns
     self._max_retries = max_retries
+    self.max_insert_payload_size = max_insert_payload_size

Review Comment:
   self._max_insert_payload_size?





[GitHub] [beam] ahmedabu98 commented on pull request #24979: Respect BigQuery insert byte size limits

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #24979:
URL: https://github.com/apache/beam/pull/24979#issuecomment-1609594058

   R: @AnandInguva 
   R: @Abacn 




[GitHub] [beam] Abacn merged pull request #24979: Respect BigQuery insert byte size limits

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn merged PR #24979:
URL: https://github.com/apache/beam/pull/24979




[GitHub] [beam] liferoad commented on a diff in pull request #24979: Respect BigQuery insert byte size limits

Posted by "liferoad (via GitHub)" <gi...@apache.org>.
liferoad commented on code in PR #24979:
URL: https://github.com/apache/beam/pull/24979#discussion_r1253291582


##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -1515,11 +1528,43 @@ def process(self, element, *schema_side_inputs):
 
     if not self.with_batched_input:
       row_and_insert_id = element[1]
+      row_byte_size = get_deep_size(row_and_insert_id)
+      # maintain buffer byte size for each destination
+      self._destination_buffer_byte_size.setdefault(destination, 0)
+      # send large rows that exceed BigQuery insert limits to DLQ
+      if row_byte_size >= self.max_insert_payload_size:
+        row_mb_size = row_byte_size / 1_000_000
+        max_mb_size = self.max_insert_payload_size / 1_000_000
+        error = (

Review Comment:
   Should these be added in `_flush_batch`? We should not return when one row exceeds the limit, right? We should put all the large rows into the failed rows.
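
   To illustrate the idea of collecting every oversized row instead of returning at the first one, here is a rough sketch. The function and its error message are hypothetical, and `estimate_row_size` stands in for `get_deep_size` by measuring the JSON-encoded length.

```python
import json


def estimate_row_size(row):
  # Hypothetical stand-in for get_deep_size: JSON-encoded byte length.
  return len(json.dumps(row).encode("utf-8"))


def split_oversized_rows(rows, max_payload_size):
  """Partitions rows into (insertable_rows, failed_rows_with_errors)."""
  insertable, failed = [], []
  for row in rows:
    size = estimate_row_size(row)
    if size >= max_payload_size:
      # Keep going: every oversized row is recorded, not just the first.
      error = "Row of %d bytes reaches the %d byte limit" % (
          size, max_payload_size)
      failed.append((row, error))
    else:
      insertable.append(row)
  return insertable, failed
```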





[GitHub] [beam] ahmedabu98 commented on a diff in pull request #24979: Respect BigQuery insert byte size limits

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #24979:
URL: https://github.com/apache/beam/pull/24979#discussion_r1253293350


##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -1414,6 +1424,7 @@ def __init__(
         BigQueryWriteFn.STREAMING_API_LOGGING_FREQUENCY_SEC)
     self.ignore_unknown_columns = ignore_unknown_columns
     self._max_retries = max_retries
+    self.max_insert_payload_size = max_insert_payload_size

Review Comment:
   Done





[GitHub] [beam] ahmedabu98 commented on pull request #24979: Respect BigQuery insert byte size limits

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #24979:
URL: https://github.com/apache/beam/pull/24979#issuecomment-1440252815

   Focusing on other work right now; will tag reviewers when I come back to this.




[GitHub] [beam] ahmedabu98 commented on pull request #24979: Respect BigQuery insert byte size limits

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #24979:
URL: https://github.com/apache/beam/pull/24979#issuecomment-1440252139

   R: @ahmedabu98




[GitHub] [beam] ahmedabu98 commented on pull request #24979: Respect BigQuery insert byte size limits

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #24979:
URL: https://github.com/apache/beam/pull/24979#issuecomment-1613526799

   R: @johnjcasey 




[GitHub] [beam] ahmedabu98 commented on pull request #24979: Respect BigQuery insert byte size limits

Posted by GitBox <gi...@apache.org>.
ahmedabu98 commented on PR #24979:
URL: https://github.com/apache/beam/pull/24979#issuecomment-1387210511

   retest this please




[GitHub] [beam] github-actions[bot] commented on pull request #24979: Respect BigQuery insert byte size limits

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #24979:
URL: https://github.com/apache/beam/pull/24979#issuecomment-1380706833

   Assigning reviewers. If you would like to opt out of this review, comment `assign to next reviewer`:
   
   R: @AnandInguva for label python.
   R: @Abacn for label io.
   
   Available commands:
   - `stop reviewer notifications` - opt out of the automated review tooling
   - `remind me after tests pass` - tag the comment author after tests pass
   - `waiting on author` - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)
   
   The PR bot will only process comments in the main thread (not review comments).




[GitHub] [beam] ahmedabu98 commented on a diff in pull request #24979: Respect BigQuery insert byte size limits

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #24979:
URL: https://github.com/apache/beam/pull/24979#discussion_r1253410733


##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -1515,11 +1528,43 @@ def process(self, element, *schema_side_inputs):
 
     if not self.with_batched_input:
       row_and_insert_id = element[1]
+      row_byte_size = get_deep_size(row_and_insert_id)
+      # maintain buffer byte size for each destination
+      self._destination_buffer_byte_size.setdefault(destination, 0)
+      # send large rows that exceed BigQuery insert limits to DLQ
+      if row_byte_size >= self.max_insert_payload_size:
+        row_mb_size = row_byte_size / 1_000_000
+        max_mb_size = self.max_insert_payload_size / 1_000_000
+        error = (

Review Comment:
   I don't think it needs to? We are already preparing batches with the right size before sending them to `_flush_batch`.
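
   Roughly, the idea of preparing right-sized batches before the flush can be sketched as below. This is a simplified illustration, not the PR's code: `batch_by_byte_size` and `size_fn` are hypothetical names, and it assumes rows that individually reach the limit were already filtered out.

```python
def batch_by_byte_size(rows, max_payload_size, size_fn):
  """Yields lists of rows whose combined size stays under max_payload_size.

  Assumes every single row is already smaller than max_payload_size.
  """
  batch, batch_size = [], 0
  for row in rows:
    row_size = size_fn(row)
    # Emit the current batch first if adding this row would reach the limit.
    if batch and batch_size + row_size >= max_payload_size:
      yield batch
      batch, batch_size = [], 0
    batch.append(row)
    batch_size += row_size
  if batch:
    yield batch
```

   With batches built this way, the flush step only ever sees payloads under the limit, which is why it would not need its own check.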





[GitHub] [beam] liferoad commented on a diff in pull request #24979: Respect BigQuery insert byte size limits

Posted by "liferoad (via GitHub)" <gi...@apache.org>.
liferoad commented on code in PR #24979:
URL: https://github.com/apache/beam/pull/24979#discussion_r1253412346


##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -1515,11 +1528,43 @@ def process(self, element, *schema_side_inputs):
 
     if not self.with_batched_input:
       row_and_insert_id = element[1]
+      row_byte_size = get_deep_size(row_and_insert_id)
+      # maintain buffer byte size for each destination
+      self._destination_buffer_byte_size.setdefault(destination, 0)
+      # send large rows that exceed BigQuery insert limits to DLQ
+      if row_byte_size >= self.max_insert_payload_size:
+        row_mb_size = row_byte_size / 1_000_000
+        max_mb_size = self.max_insert_payload_size / 1_000_000
+        error = (

Review Comment:
   When `with_batched_input` is True, do we need to check this size limit?





[GitHub] [beam] liferoad commented on a diff in pull request #24979: Respect BigQuery insert byte size limits

Posted by "liferoad (via GitHub)" <gi...@apache.org>.
liferoad commented on code in PR #24979:
URL: https://github.com/apache/beam/pull/24979#discussion_r1253425065


##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -1515,11 +1528,43 @@ def process(self, element, *schema_side_inputs):
 
     if not self.with_batched_input:
       row_and_insert_id = element[1]
+      row_byte_size = get_deep_size(row_and_insert_id)
+      # maintain buffer byte size for each destination
+      self._destination_buffer_byte_size.setdefault(destination, 0)
+      # send large rows that exceed BigQuery insert limits to DLQ
+      if row_byte_size >= self.max_insert_payload_size:
+        row_mb_size = row_byte_size / 1_000_000
+        max_mb_size = self.max_insert_payload_size / 1_000_000

Review Comment:
   I see. Forgot that difference.





[GitHub] [beam] liferoad commented on a diff in pull request #24979: Respect BigQuery insert byte size limits

Posted by "liferoad (via GitHub)" <gi...@apache.org>.
liferoad commented on code in PR #24979:
URL: https://github.com/apache/beam/pull/24979#discussion_r1253810001


##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -1515,11 +1528,43 @@ def process(self, element, *schema_side_inputs):
 
     if not self.with_batched_input:
       row_and_insert_id = element[1]
+      row_byte_size = get_deep_size(row_and_insert_id)
+      # maintain buffer byte size for each destination
+      self._destination_buffer_byte_size.setdefault(destination, 0)
+      # send large rows that exceed BigQuery insert limits to DLQ
+      if row_byte_size >= self.max_insert_payload_size:
+        row_mb_size = row_byte_size / 1_000_000
+        max_mb_size = self.max_insert_payload_size / 1_000_000
+        error = (

Review Comment:
   To be honest, I feel we should do this check inside `_flush_batch`, which is more efficient than checking each row and returning failed rows one at a time. If we did this check inside `_flush_batch`, #27363 would not be needed.




