Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2023/01/11 21:33:15 UTC
[GitHub] [beam] ahmedabu98 opened a new pull request, #24979: Respect BigQuery insert byte size limits
ahmedabu98 opened a new pull request, #24979:
URL: https://github.com/apache/beam/pull/24979
Adds logic to handle the case where a batch of rows exceeds BigQuery's insert API limit: https://cloud.google.com/bigquery/quotas#streaming_inserts
Fixes #20711
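The batching behavior this PR adds can be sketched, independent of the Beam machinery, roughly as follows. This is a simplified illustration, not the PR's implementation: it estimates row size with a JSON encoding (the PR uses `get_deep_size`), and the helper name is hypothetical.

```python
import json


def batch_rows(rows, max_insert_payload_size=10_000_000):
    """Group rows into batches whose estimated size stays under the limit.

    Rows that are individually too large for any request are routed to a
    dead-letter list instead of being sent to BigQuery.
    """
    batches, dead_letter = [], []
    current, current_size = [], 0
    for row in rows:
        row_size = len(json.dumps(row).encode("utf-8"))
        if row_size >= max_insert_payload_size:
            dead_letter.append(row)  # too big to ever fit in one request
            continue
        if current and current_size + row_size >= max_insert_payload_size:
            batches.append(current)  # flush before exceeding the limit
            current, current_size = [], 0
        current.append(row)
        current_size += row_size
    if current:
        batches.append(current)
    return batches, dead_letter
```

With a small limit, two small rows share a batch while an oversized row is diverted to the dead-letter list.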
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscribe@beam.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [beam] Abacn commented on pull request #24979: Respect BigQuery insert byte size limits
Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on PR #24979:
URL: https://github.com/apache/beam/pull/24979#issuecomment-1622123104
will merge once tests pass
[GitHub] [beam] liferoad commented on a diff in pull request #24979: Respect BigQuery insert byte size limits
Posted by "liferoad (via GitHub)" <gi...@apache.org>.
liferoad commented on code in PR #24979:
URL: https://github.com/apache/beam/pull/24979#discussion_r1253403969
##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -1515,11 +1528,43 @@ def process(self, element, *schema_side_inputs):
     if not self.with_batched_input:
       row_and_insert_id = element[1]
+      row_byte_size = get_deep_size(row_and_insert_id)
+      # maintain buffer byte size for each destination
+      self._destination_buffer_byte_size.setdefault(destination, 0)
+      # send large rows that exceed BigQuery insert limits to DLQ
+      if row_byte_size >= self.max_insert_payload_size:
+        row_mb_size = row_byte_size / 1_000_000
+        max_mb_size = self.max_insert_payload_size / 1_000_000
+        error = (
Review Comment:
I overlooked this, but why does `_flush_batch` not check the row size?
[GitHub] [beam] ahmedabu98 commented on a diff in pull request #24979: Respect BigQuery insert byte size limits
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #24979:
URL: https://github.com/apache/beam/pull/24979#discussion_r1253219275
##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -1515,11 +1526,41 @@ def process(self, element, *schema_side_inputs):
     if not self.with_batched_input:
       row_and_insert_id = element[1]
+      row_byte_size = get_deep_size(row_and_insert_id)
+
+      # send large rows that exceed BigQuery insert limits to DLQ
+      if row_byte_size >= self.max_insert_payload_size:
+        row_mb_size = row_byte_size / 1_000_000
+        max_mb_size = self.max_insert_payload_size / 1_000_000
+        error = (
+            f"Received row with size {row_mb_size}MB that exceeds "
+            f"the maximum insert payload size set ({max_mb_size}MB).")
+        return [
+            pvalue.TaggedOutput(
+                BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS,
+                GlobalWindows.windowed_value(
+                    (destination, row_and_insert_id[0], error))),
+            pvalue.TaggedOutput(
+                BigQueryWriteFn.FAILED_ROWS,
+                GlobalWindows.windowed_value(
+                    (destination, row_and_insert_id[0])))
+        ]
+
+      buffer_byte_size_with_row = (
+          row_byte_size if not self._rows_buffer[destination] else
+          row_byte_size + get_deep_size(self._rows_buffer[destination]))
Review Comment:
Done, PTAL
[GitHub] [beam] ahmedabu98 commented on a diff in pull request #24979: Respect BigQuery insert byte size limits
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #24979:
URL: https://github.com/apache/beam/pull/24979#discussion_r1253325017
##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -1515,11 +1528,43 @@ def process(self, element, *schema_side_inputs):
     if not self.with_batched_input:
       row_and_insert_id = element[1]
+      row_byte_size = get_deep_size(row_and_insert_id)
+      # maintain buffer byte size for each destination
+      self._destination_buffer_byte_size.setdefault(destination, 0)
+      # send large rows that exceed BigQuery insert limits to DLQ
+      if row_byte_size >= self.max_insert_payload_size:
+        row_mb_size = row_byte_size / 1_000_000
+        max_mb_size = self.max_insert_payload_size / 1_000_000
+        error = (
Review Comment:
The process function looks at one element at a time, so here we are returning the large row as a failed row (to the output PCollections). Other elements coming after such a failed row will have their own `process` calls.
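The per-element routing described here can be sketched without the Beam runner. The output tag names match the PR, but the standalone function is a hypothetical stand-in for `BigQueryWriteFn.process`, returning `(tag, value)` tuples in place of `pvalue.TaggedOutput`:

```python
FAILED_ROWS = "FailedRows"
FAILED_ROWS_WITH_ERRORS = "FailedRowsWithErrors"


def route_row(destination, row, row_byte_size, max_insert_payload_size):
    """Return tagged outputs for an oversized row, or None to keep buffering.

    Mirrors the shape of the process method: each call sees exactly one
    element, so an oversized row only affects its own output.
    """
    if row_byte_size >= max_insert_payload_size:
        error = (
            f"Received row with size {row_byte_size / 1_000_000}MB that "
            f"exceeds the maximum insert payload size set "
            f"({max_insert_payload_size / 1_000_000}MB).")
        return [
            (FAILED_ROWS_WITH_ERRORS, (destination, row, error)),
            (FAILED_ROWS, (destination, row)),
        ]
    return None  # row fits; the caller appends it to the buffer
```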
[GitHub] [beam] AnandInguva commented on pull request #24979: Respect BigQuery insert byte size limits
Posted by GitBox <gi...@apache.org>.
AnandInguva commented on PR #24979:
URL: https://github.com/apache/beam/pull/24979#issuecomment-1385794577
@ahmedabu98 once this is ready, I can review it. let me know
[GitHub] [beam] github-actions[bot] commented on pull request #24979: Respect BigQuery insert byte size limits
Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #24979:
URL: https://github.com/apache/beam/pull/24979#issuecomment-1440254093
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control
[GitHub] [beam] ahmedabu98 commented on a diff in pull request #24979: Respect BigQuery insert byte size limits
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #24979:
URL: https://github.com/apache/beam/pull/24979#discussion_r1253479657
##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -1515,11 +1528,43 @@ def process(self, element, *schema_side_inputs):
     if not self.with_batched_input:
       row_and_insert_id = element[1]
+      row_byte_size = get_deep_size(row_and_insert_id)
+      # maintain buffer byte size for each destination
+      self._destination_buffer_byte_size.setdefault(destination, 0)
+      # send large rows that exceed BigQuery insert limits to DLQ
+      if row_byte_size >= self.max_insert_payload_size:
+        row_mb_size = row_byte_size / 1_000_000
+        max_mb_size = self.max_insert_payload_size / 1_000_000
+        error = (
Review Comment:
Opened #27363
[GitHub] [beam] ahmedabu98 commented on a diff in pull request #24979: Respect BigQuery insert byte size limits
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #24979:
URL: https://github.com/apache/beam/pull/24979#discussion_r1253295476
##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -1325,7 +1333,8 @@ def __init__(
       ignore_insert_ids=False,
       with_batched_input=False,
       ignore_unknown_columns=False,
-      max_retries=MAX_INSERT_RETRIES):
+      max_retries=MAX_INSERT_RETRIES,
+      max_insert_payload_size=MAX_INSERT_PAYLOAD_SIZE):
Review Comment:
In case users want to control the batch size of their inserts (e.g. to have smaller, more frequent batch inserts). I'll add a check that this doesn't exceed the true maximum.
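The check mentioned here might look like the following sketch. The 10MB ceiling reflects BigQuery's documented per-request limit for streaming inserts; the constant and function names are assumptions, not the PR's actual code.

```python
# Assumed hard ceiling: BigQuery caps each streaming-insert HTTP request
# at 10MB (see the streaming insert quotas page).
MAX_INSERT_PAYLOAD_SIZE = 10 * 1024 * 1024


def validate_max_insert_payload_size(requested):
    """Reject user-supplied limits that exceed BigQuery's hard maximum."""
    if requested > MAX_INSERT_PAYLOAD_SIZE:
        raise ValueError(
            f"max_insert_payload_size ({requested} bytes) exceeds the "
            f"BigQuery streaming insert request limit of "
            f"{MAX_INSERT_PAYLOAD_SIZE} bytes.")
    return requested
```

A smaller value is accepted unchanged, so users can opt into smaller, more frequent batches while staying under the service limit.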
[GitHub] [beam] bvolpato commented on a diff in pull request #24979: Respect BigQuery insert byte size limits
Posted by "bvolpato (via GitHub)" <gi...@apache.org>.
bvolpato commented on code in PR #24979:
URL: https://github.com/apache/beam/pull/24979#discussion_r1253183595
##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -1515,11 +1526,41 @@ def process(self, element, *schema_side_inputs):
     if not self.with_batched_input:
       row_and_insert_id = element[1]
+      row_byte_size = get_deep_size(row_and_insert_id)
+
+      # send large rows that exceed BigQuery insert limits to DLQ
+      if row_byte_size >= self.max_insert_payload_size:
+        row_mb_size = row_byte_size / 1_000_000
+        max_mb_size = self.max_insert_payload_size / 1_000_000
+        error = (
+            f"Received row with size {row_mb_size}MB that exceeds "
+            f"the maximum insert payload size set ({max_mb_size}MB).")
+        return [
+            pvalue.TaggedOutput(
+                BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS,
+                GlobalWindows.windowed_value(
+                    (destination, row_and_insert_id[0], error))),
+            pvalue.TaggedOutput(
+                BigQueryWriteFn.FAILED_ROWS,
+                GlobalWindows.windowed_value(
+                    (destination, row_and_insert_id[0])))
+        ]
+
+      buffer_byte_size_with_row = (
+          row_byte_size if not self._rows_buffer[destination] else
+          row_byte_size + get_deep_size(self._rows_buffer[destination]))
Review Comment:
In case it wasn't clear, I'm talking about keeping a supporting list like
self._rows_buffer_bytes[destination]
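The suggested bookkeeping, i.e. accumulating a running byte total per destination instead of re-walking the whole buffer with `get_deep_size`, might look like the following sketch. The class is hypothetical; the attribute names follow the review discussion.

```python
class DestinationBuffers:
    """Buffers rows per destination, tracking byte sizes incrementally."""

    def __init__(self, max_insert_payload_size):
        self.max_insert_payload_size = max_insert_payload_size
        self._rows_buffer = {}
        self._destination_buffer_byte_size = {}

    def add(self, destination, row, row_byte_size):
        """Add a row; return a flushed batch if the limit would be exceeded."""
        self._rows_buffer.setdefault(destination, [])
        self._destination_buffer_byte_size.setdefault(destination, 0)
        flushed = None
        if (self._rows_buffer[destination] and
                self._destination_buffer_byte_size[destination] +
                row_byte_size >= self.max_insert_payload_size):
            flushed = self._rows_buffer[destination]
            self._rows_buffer[destination] = []
            self._destination_buffer_byte_size[destination] = 0
        self._rows_buffer[destination].append(row)
        # O(1) running total instead of get_deep_size over the whole buffer
        self._destination_buffer_byte_size[destination] += row_byte_size
        return flushed
```

The incremental total makes each `add` O(1) in buffer size, whereas re-measuring the buffer on every element is O(n) per row, O(n²) per batch.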
[GitHub] [beam] liferoad commented on a diff in pull request #24979: Respect BigQuery insert byte size limits
Posted by "liferoad (via GitHub)" <gi...@apache.org>.
liferoad commented on code in PR #24979:
URL: https://github.com/apache/beam/pull/24979#discussion_r1253423839
##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -1515,11 +1528,43 @@ def process(self, element, *schema_side_inputs):
     if not self.with_batched_input:
       row_and_insert_id = element[1]
+      row_byte_size = get_deep_size(row_and_insert_id)
+      # maintain buffer byte size for each destination
+      self._destination_buffer_byte_size.setdefault(destination, 0)
+      # send large rows that exceed BigQuery insert limits to DLQ
+      if row_byte_size >= self.max_insert_payload_size:
+        row_mb_size = row_byte_size / 1_000_000
+        max_mb_size = self.max_insert_payload_size / 1_000_000
+        error = (
Review Comment:
Can we open one issue to track this?
[GitHub] [beam] pabloem commented on pull request #24979: Respect BigQuery insert byte size limits
Posted by "pabloem (via GitHub)" <gi...@apache.org>.
pabloem commented on PR #24979:
URL: https://github.com/apache/beam/pull/24979#issuecomment-1419631332
oops : ( I'm sorry @ahmedabu98 - what should we do about this?
[GitHub] [beam] bvolpato commented on a diff in pull request #24979: Respect BigQuery insert byte size limits
Posted by "bvolpato (via GitHub)" <gi...@apache.org>.
bvolpato commented on code in PR #24979:
URL: https://github.com/apache/beam/pull/24979#discussion_r1253182086
##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -1515,11 +1526,41 @@ def process(self, element, *schema_side_inputs):
     if not self.with_batched_input:
       row_and_insert_id = element[1]
+      row_byte_size = get_deep_size(row_and_insert_id)
+
+      # send large rows that exceed BigQuery insert limits to DLQ
+      if row_byte_size >= self.max_insert_payload_size:
+        row_mb_size = row_byte_size / 1_000_000
+        max_mb_size = self.max_insert_payload_size / 1_000_000
+        error = (
+            f"Received row with size {row_mb_size}MB that exceeds "
+            f"the maximum insert payload size set ({max_mb_size}MB).")
+        return [
+            pvalue.TaggedOutput(
+                BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS,
+                GlobalWindows.windowed_value(
+                    (destination, row_and_insert_id[0], error))),
+            pvalue.TaggedOutput(
+                BigQueryWriteFn.FAILED_ROWS,
+                GlobalWindows.windowed_value(
+                    (destination, row_and_insert_id[0])))
+        ]
+
+      buffer_byte_size_with_row = (
+          row_byte_size if not self._rows_buffer[destination] else
+          row_byte_size + get_deep_size(self._rows_buffer[destination]))
Review Comment:
Can't this be expensive?
We already call `get_deep_size` for every element before this point; perhaps we could accumulate it here instead of running `get_deep_size` on the entire array again?
[GitHub] [beam] ahmedabu98 commented on a diff in pull request #24979: Respect BigQuery insert byte size limits
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #24979:
URL: https://github.com/apache/beam/pull/24979#discussion_r1253409820
##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -1515,11 +1528,43 @@ def process(self, element, *schema_side_inputs):
     if not self.with_batched_input:
       row_and_insert_id = element[1]
+      row_byte_size = get_deep_size(row_and_insert_id)
+      # maintain buffer byte size for each destination
+      self._destination_buffer_byte_size.setdefault(destination, 0)
+      # send large rows that exceed BigQuery insert limits to DLQ
+      if row_byte_size >= self.max_insert_payload_size:
+        row_mb_size = row_byte_size / 1_000_000
+        max_mb_size = self.max_insert_payload_size / 1_000_000
Review Comment:
That would be true if we were using a double slash, which acts as a `math.floor()` after division. But we are only using a single slash here, which produces a float when necessary (e.g. 500 / 1_000_000 = 0.0005).
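As a quick illustration of the distinction being drawn between true division and floor division:

```python
row_byte_size = 500

# True division (/): always returns a float, keeping sub-MB precision.
assert row_byte_size / 1_000_000 == 0.0005

# Floor division (//): rounds toward negative infinity, losing the fraction.
assert row_byte_size // 1_000_000 == 0
```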
[GitHub] [beam] codecov[bot] commented on pull request #24979: Respect BigQuery insert byte size limits
Posted by GitBox <gi...@apache.org>.
codecov[bot] commented on PR #24979:
URL: https://github.com/apache/beam/pull/24979#issuecomment-1379520254
# [Codecov](https://codecov.io/gh/apache/beam/pull/24979?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
> Merging [#24979](https://codecov.io/gh/apache/beam/pull/24979?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (26d9662) into [master](https://codecov.io/gh/apache/beam/commit/3838528fde37262ecadf642791e3e0f3a57d7b25?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (3838528) will **decrease** coverage by `17.82%`.
> The diff coverage is `20.00%`.
```diff
@@ Coverage Diff @@
## master #24979 +/- ##
===========================================
- Coverage 73.13% 55.30% -17.83%
===========================================
Files 735 735
Lines 98146 98152 +6
===========================================
- Hits 71777 54281 -17496
- Misses 25006 42508 +17502
Partials 1363 1363
```
| Flag | Coverage Δ | |
|---|---|---|
| python | `56.85% <20.00%> (-25.83%)` | :arrow_down: |
Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
| [Impacted Files](https://codecov.io/gh/apache/beam/pull/24979?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
|---|---|---|
| [sdks/python/apache\_beam/io/gcp/bigquery.py](https://codecov.io/gh/apache/beam/pull/24979?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5LnB5) | `43.77% <20.00%> (-27.58%)` | :arrow_down: |
| [sdks/python/apache\_beam/io/utils.py](https://codecov.io/gh/apache/beam/pull/24979?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vdXRpbHMucHk=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
| [sdks/python/apache\_beam/utils/shared.py](https://codecov.io/gh/apache/beam/pull/24979?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdXRpbHMvc2hhcmVkLnB5) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
| [sdks/python/apache\_beam/tools/\_\_init\_\_.py](https://codecov.io/gh/apache/beam/pull/24979?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdG9vbHMvX19pbml0X18ucHk=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
| [sdks/python/apache\_beam/ml/inference/\_\_init\_\_.py](https://codecov.io/gh/apache/beam/pull/24979?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vbWwvaW5mZXJlbmNlL19faW5pdF9fLnB5) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
| [...s/python/apache\_beam/typehints/testing/\_\_init\_\_.py](https://codecov.io/gh/apache/beam/pull/24979?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHlwZWhpbnRzL3Rlc3RpbmcvX19pbml0X18ucHk=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
| [.../python/apache\_beam/testing/analyzers/constants.py](https://codecov.io/gh/apache/beam/pull/24979?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy9hbmFseXplcnMvY29uc3RhbnRzLnB5) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
| [...python/apache\_beam/typehints/testing/strategies.py](https://codecov.io/gh/apache/beam/pull/24979?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHlwZWhpbnRzL3Rlc3Rpbmcvc3RyYXRlZ2llcy5weQ==) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
| [...ache\_beam/coders/proto2\_coder\_test\_messages\_pb2.py](https://codecov.io/gh/apache/beam/pull/24979?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vY29kZXJzL3Byb3RvMl9jb2Rlcl90ZXN0X21lc3NhZ2VzX3BiMi5weQ==) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
| [...beam/transforms/fully\_qualified\_named\_transform.py](https://codecov.io/gh/apache/beam/pull/24979?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHJhbnNmb3Jtcy9mdWxseV9xdWFsaWZpZWRfbmFtZWRfdHJhbnNmb3JtLnB5) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
| ... and [271 more](https://codecov.io/gh/apache/beam/pull/24979?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
[GitHub] [beam] github-actions[bot] commented on pull request #24979: Respect BigQuery insert byte size limits
Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #24979:
URL: https://github.com/apache/beam/pull/24979#issuecomment-1435656486
Reminder, please take a look at this pr: @pabloem @johnjcasey
[GitHub] [beam] github-actions[bot] commented on pull request #24979: Respect BigQuery insert byte size limits
Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #24979:
URL: https://github.com/apache/beam/pull/24979#issuecomment-1439921064
Assigning a new set of reviewers because the PR has gone too long without review. If you would like to opt out of this review, comment `assign to next reviewer`:
R: @jrmccluskey for label python.
R: @chamikaramj for label io.
Available commands:
- `stop reviewer notifications` - opt out of the automated review tooling
- `remind me after tests pass` - tag the comment author after tests pass
- `waiting on author` - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)
[GitHub] [beam] liferoad commented on a diff in pull request #24979: Respect BigQuery insert byte size limits
Posted by "liferoad (via GitHub)" <gi...@apache.org>.
liferoad commented on code in PR #24979:
URL: https://github.com/apache/beam/pull/24979#discussion_r1253280098
##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -1325,7 +1333,8 @@ def __init__(
       ignore_insert_ids=False,
       with_batched_input=False,
       ignore_unknown_columns=False,
-      max_retries=MAX_INSERT_RETRIES):
+      max_retries=MAX_INSERT_RETRIES,
+      max_insert_payload_size=MAX_INSERT_PAYLOAD_SIZE):
Review Comment:
Why can `max_insert_payload_size` be changed, since https://cloud.google.com/bigquery/quotas#streaming_inserts is always there?
[GitHub] [beam] ahmedabu98 commented on a diff in pull request #24979: Respect BigQuery insert byte size limits
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #24979:
URL: https://github.com/apache/beam/pull/24979#discussion_r1253421565
##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -1515,11 +1528,43 @@ def process(self, element, *schema_side_inputs):
     if not self.with_batched_input:
       row_and_insert_id = element[1]
+      row_byte_size = get_deep_size(row_and_insert_id)
+      # maintain buffer byte size for each destination
+      self._destination_buffer_byte_size.setdefault(destination, 0)
+      # send large rows that exceed BigQuery insert limits to DLQ
+      if row_byte_size >= self.max_insert_payload_size:
+        row_mb_size = row_byte_size / 1_000_000
+        max_mb_size = self.max_insert_payload_size / 1_000_000
+        error = (
Review Comment:
Yes, that can trigger the same error if users batch their rows beforehand in a way that exceeds the limits. However, I think handling this can be a separate effort, as it is a different use case. This PR handles the more general use case of writing a PCollection of rows, which is what a few users were doing when they ran into this issue.
[GitHub] [beam] ahmedabu98 commented on a diff in pull request #24979: Respect BigQuery insert byte size limits
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #24979:
URL: https://github.com/apache/beam/pull/24979#discussion_r1253327565
##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -1325,7 +1333,8 @@ def __init__(
       ignore_insert_ids=False,
       with_batched_input=False,
       ignore_unknown_columns=False,
-      max_retries=MAX_INSERT_RETRIES):
+      max_retries=MAX_INSERT_RETRIES,
+      max_insert_payload_size=MAX_INSERT_PAYLOAD_SIZE):
Review Comment:
Done
[GitHub] [beam] Abacn commented on pull request #24979: Respect BigQuery insert byte size limits
Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on PR #24979:
URL: https://github.com/apache/beam/pull/24979#issuecomment-1622155218
PythonLint passed, though the status did not update in the GitHub UI. Merging for now.
[GitHub] [beam] liferoad commented on a diff in pull request #24979: Respect BigQuery insert byte size limits
Posted by "liferoad (via GitHub)" <gi...@apache.org>.
liferoad commented on code in PR #24979:
URL: https://github.com/apache/beam/pull/24979#discussion_r1253403969
##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -1515,11 +1528,43 @@ def process(self, element, *schema_side_inputs):
if not self.with_batched_input:
row_and_insert_id = element[1]
+ row_byte_size = get_deep_size(row_and_insert_id)
+ # maintain buffer byte size for each destination
+ self._destination_buffer_byte_size.setdefault(destination, 0)
+ # send large rows that exceed BigQuery insert limits to DLQ
+ if row_byte_size >= self.max_insert_payload_size:
+ row_mb_size = row_byte_size / 1_000_000
+ max_mb_size = self.max_insert_payload_size / 1_000_000
+ error = (
Review Comment:
I overlooked this. but why does `_flush_batch` not check the row size?
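The `get_deep_size` call in the diff above estimates the in-memory footprint of a row plus its insert id. The PR relies on a library for this; as an illustrative stand-in only (not the actual Beam code), a recursive estimate based on `sys.getsizeof` can be sketched as:

```python
import sys

def deep_size(obj, _seen=None):
    """Rough recursive byte-size estimate of an object graph.

    Illustrative stand-in for the get_deep_size call in the diff;
    follows dict/list/tuple/set containers and guards against cycles.
    """
    if _seen is None:
        _seen = set()
    if id(obj) in _seen:
        return 0
    _seen.add(id(obj))
    size = sys.getsizeof(obj)
    if isinstance(obj, dict):
        size += sum(
            deep_size(k, _seen) + deep_size(v, _seen) for k, v in obj.items())
    elif isinstance(obj, (list, tuple, set, frozenset)):
        size += sum(deep_size(item, _seen) for item in obj)
    return size
```

Note this counts Python object overhead, not the serialized request size, so it is a conservative proxy for the payload that actually reaches BigQuery.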
[GitHub] [beam] ahmedabu98 commented on pull request #24979: Respect BigQuery insert byte size limits
Posted by GitBox <gi...@apache.org>.
ahmedabu98 commented on PR #24979:
URL: https://github.com/apache/beam/pull/24979#issuecomment-1385935655
Run Python_Coverage PreCommit
[GitHub] [beam] github-actions[bot] commented on pull request #24979: Respect BigQuery insert byte size limits
Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #24979:
URL: https://github.com/apache/beam/pull/24979#issuecomment-1379573050
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment `assign set of reviewers`
[GitHub] [beam] github-actions[bot] commented on pull request #24979: Respect BigQuery insert byte size limits
Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #24979:
URL: https://github.com/apache/beam/pull/24979#issuecomment-1413643397
Assigning a new set of reviewers because the PR has gone too long without review. If you would like to opt out of this review, comment `assign to next reviewer`:
R: @pabloem for label python.
R: @johnjcasey for label io.
Available commands:
- `stop reviewer notifications` - opt out of the automated review tooling
- `remind me after tests pass` - tag the comment author after tests pass
- `waiting on author` - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)
[GitHub] [beam] liferoad commented on a diff in pull request #24979: Respect BigQuery insert byte size limits
Posted by "liferoad (via GitHub)" <gi...@apache.org>.
liferoad commented on code in PR #24979:
URL: https://github.com/apache/beam/pull/24979#discussion_r1253403468
##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -1515,11 +1528,43 @@ def process(self, element, *schema_side_inputs):
if not self.with_batched_input:
row_and_insert_id = element[1]
+ row_byte_size = get_deep_size(row_and_insert_id)
+ # maintain buffer byte size for each destination
+ self._destination_buffer_byte_size.setdefault(destination, 0)
+ # send large rows that exceed BigQuery insert limits to DLQ
+ if row_byte_size >= self.max_insert_payload_size:
+ row_mb_size = row_byte_size / 1_000_000
+ max_mb_size = self.max_insert_payload_size / 1_000_000
Review Comment:
This indicates `max_insert_payload_size` must be greater than 1_000_000. We should validate this when calling `__init__`.
[GitHub] [beam] liferoad commented on a diff in pull request #24979: Respect BigQuery insert byte size limits
Posted by "liferoad (via GitHub)" <gi...@apache.org>.
liferoad commented on code in PR #24979:
URL: https://github.com/apache/beam/pull/24979#discussion_r1253276340
##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -1414,6 +1424,7 @@ def __init__(
BigQueryWriteFn.STREAMING_API_LOGGING_FREQUENCY_SEC)
self.ignore_unknown_columns = ignore_unknown_columns
self._max_retries = max_retries
+ self.max_insert_payload_size = max_insert_payload_size
Review Comment:
self._max_insert_payload_size?
[GitHub] [beam] ahmedabu98 commented on pull request #24979: Respect BigQuery insert byte size limits
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #24979:
URL: https://github.com/apache/beam/pull/24979#issuecomment-1609594058
R: @AnandInguva
R: @Abacn
[GitHub] [beam] Abacn merged pull request #24979: Respect BigQuery insert byte size limits
Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn merged PR #24979:
URL: https://github.com/apache/beam/pull/24979
[GitHub] [beam] liferoad commented on a diff in pull request #24979: Respect BigQuery insert byte size limits
Posted by "liferoad (via GitHub)" <gi...@apache.org>.
liferoad commented on code in PR #24979:
URL: https://github.com/apache/beam/pull/24979#discussion_r1253291582
##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -1515,11 +1528,43 @@ def process(self, element, *schema_side_inputs):
if not self.with_batched_input:
row_and_insert_id = element[1]
+ row_byte_size = get_deep_size(row_and_insert_id)
+ # maintain buffer byte size for each destination
+ self._destination_buffer_byte_size.setdefault(destination, 0)
+ # send large rows that exceed BigQuery insert limits to DLQ
+ if row_byte_size >= self.max_insert_payload_size:
+ row_mb_size = row_byte_size / 1_000_000
+ max_mb_size = self.max_insert_payload_size / 1_000_000
+ error = (
Review Comment:
Should these checks be added in `_flush_batch`? We should not return when one row exceeds the limit, right? We should collect all the large rows as failed rows.
[GitHub] [beam] ahmedabu98 commented on a diff in pull request #24979: Respect BigQuery insert byte size limits
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #24979:
URL: https://github.com/apache/beam/pull/24979#discussion_r1253293350
##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -1414,6 +1424,7 @@ def __init__(
BigQueryWriteFn.STREAMING_API_LOGGING_FREQUENCY_SEC)
self.ignore_unknown_columns = ignore_unknown_columns
self._max_retries = max_retries
+ self.max_insert_payload_size = max_insert_payload_size
Review Comment:
Done
[GitHub] [beam] ahmedabu98 commented on pull request #24979: Respect BigQuery insert byte size limits
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #24979:
URL: https://github.com/apache/beam/pull/24979#issuecomment-1440252815
Focusing on other work right now, will tag reviewers when I come back to this
[GitHub] [beam] ahmedabu98 commented on pull request #24979: Respect BigQuery insert byte size limits
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #24979:
URL: https://github.com/apache/beam/pull/24979#issuecomment-1440252139
R: @ahmedabu98
[GitHub] [beam] ahmedabu98 commented on pull request #24979: Respect BigQuery insert byte size limits
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #24979:
URL: https://github.com/apache/beam/pull/24979#issuecomment-1613526799
R: @johnjcasey
[GitHub] [beam] ahmedabu98 commented on pull request #24979: Respect BigQuery insert byte size limits
Posted by GitBox <gi...@apache.org>.
ahmedabu98 commented on PR #24979:
URL: https://github.com/apache/beam/pull/24979#issuecomment-1387210511
retest this please
[GitHub] [beam] github-actions[bot] commented on pull request #24979: Respect BigQuery insert byte size limits
Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #24979:
URL: https://github.com/apache/beam/pull/24979#issuecomment-1380706833
Assigning reviewers. If you would like to opt out of this review, comment `assign to next reviewer`:
R: @AnandInguva for label python.
R: @Abacn for label io.
Available commands:
- `stop reviewer notifications` - opt out of the automated review tooling
- `remind me after tests pass` - tag the comment author after tests pass
- `waiting on author` - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)
The PR bot will only process comments in the main thread (not review comments).
[GitHub] [beam] ahmedabu98 commented on a diff in pull request #24979: Respect BigQuery insert byte size limits
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #24979:
URL: https://github.com/apache/beam/pull/24979#discussion_r1253410733
##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -1515,11 +1528,43 @@ def process(self, element, *schema_side_inputs):
if not self.with_batched_input:
row_and_insert_id = element[1]
+ row_byte_size = get_deep_size(row_and_insert_id)
+ # maintain buffer byte size for each destination
+ self._destination_buffer_byte_size.setdefault(destination, 0)
+ # send large rows that exceed BigQuery insert limits to DLQ
+ if row_byte_size >= self.max_insert_payload_size:
+ row_mb_size = row_byte_size / 1_000_000
+ max_mb_size = self.max_insert_payload_size / 1_000_000
+ error = (
Review Comment:
I don't think it needs to? We are already preparing batches with the right size before sending them to `_flush_batch`
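The batching strategy described here — accumulate rows per destination, divert any single oversized row to the failed-rows output, and flush before the buffer would exceed the payload cap — can be sketched as follows. This is a simplified model, not the actual `BigQueryWriteFn`; `row_size` stands in for the `get_deep_size` call in the diff:

```python
# ~9 MB default, safely under BigQuery's 10 MB streaming-insert request cap.
MAX_INSERT_PAYLOAD_SIZE = 9 << 20

def batch_rows(rows, row_size, max_payload_size=MAX_INSERT_PAYLOAD_SIZE):
    """Yield ('batch', [rows]) groups under the size cap and
    ('failed', row) items for rows that alone exceed it."""
    buffer, buffer_size = [], 0
    for row in rows:
        size = row_size(row)
        if size >= max_payload_size:
            # A single row over the limit can never be inserted: route to DLQ.
            yield ('failed', row)
            continue
        if buffer and buffer_size + size > max_payload_size:
            # Flush before this row would push the batch over the cap.
            yield ('batch', buffer)
            buffer, buffer_size = [], 0
        buffer.append(row)
        buffer_size += size
    if buffer:
        yield ('batch', buffer)
```

With batches bounded this way up front, `_flush_batch` never sees a payload over the limit, which is the point ahmedabu98 makes above.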
[GitHub] [beam] liferoad commented on a diff in pull request #24979: Respect BigQuery insert byte size limits
Posted by "liferoad (via GitHub)" <gi...@apache.org>.
liferoad commented on code in PR #24979:
URL: https://github.com/apache/beam/pull/24979#discussion_r1253412346
##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -1515,11 +1528,43 @@ def process(self, element, *schema_side_inputs):
if not self.with_batched_input:
row_and_insert_id = element[1]
+ row_byte_size = get_deep_size(row_and_insert_id)
+ # maintain buffer byte size for each destination
+ self._destination_buffer_byte_size.setdefault(destination, 0)
+ # send large rows that exceed BigQuery insert limits to DLQ
+ if row_byte_size >= self.max_insert_payload_size:
+ row_mb_size = row_byte_size / 1_000_000
+ max_mb_size = self.max_insert_payload_size / 1_000_000
+ error = (
Review Comment:
When `with_batched_input` is True, do we need to check this size limit?
[GitHub] [beam] liferoad commented on a diff in pull request #24979: Respect BigQuery insert byte size limits
Posted by "liferoad (via GitHub)" <gi...@apache.org>.
liferoad commented on code in PR #24979:
URL: https://github.com/apache/beam/pull/24979#discussion_r1253425065
##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -1515,11 +1528,43 @@ def process(self, element, *schema_side_inputs):
if not self.with_batched_input:
row_and_insert_id = element[1]
+ row_byte_size = get_deep_size(row_and_insert_id)
+ # maintain buffer byte size for each destination
+ self._destination_buffer_byte_size.setdefault(destination, 0)
+ # send large rows that exceed BigQuery insert limits to DLQ
+ if row_byte_size >= self.max_insert_payload_size:
+ row_mb_size = row_byte_size / 1_000_000
+ max_mb_size = self.max_insert_payload_size / 1_000_000
Review Comment:
I see. Forgot that difference.
[GitHub] [beam] liferoad commented on a diff in pull request #24979: Respect BigQuery insert byte size limits
Posted by "liferoad (via GitHub)" <gi...@apache.org>.
liferoad commented on code in PR #24979:
URL: https://github.com/apache/beam/pull/24979#discussion_r1253810001
##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -1515,11 +1528,43 @@ def process(self, element, *schema_side_inputs):
if not self.with_batched_input:
row_and_insert_id = element[1]
+ row_byte_size = get_deep_size(row_and_insert_id)
+ # maintain buffer byte size for each destination
+ self._destination_buffer_byte_size.setdefault(destination, 0)
+ # send large rows that exceed BigQuery insert limits to DLQ
+ if row_byte_size >= self.max_insert_payload_size:
+ row_mb_size = row_byte_size / 1_000_000
+ max_mb_size = self.max_insert_payload_size / 1_000_000
+ error = (
Review Comment:
To be honest, I feel we should do this check inside `_flush_batch`, which is more efficient than checking each row and returning one failed row at a time. If we did this check inside `_flush_batch`, #27363 would not be needed.