Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/09/02 15:50:07 UTC

[GitHub] [beam] ahmedabu98 opened a new pull request, #23012: (BQ Python) Perform job waits in finish_bundle to allow BQ streaming writes with large batch loads

ahmedabu98 opened a new pull request, #23012:
URL: https://github.com/apache/beam/pull/23012

   Streaming writes with FILE_LOADS currently do not work with large loads. After the first load, the pipeline lags because `WaitForBQJobs` is triggered only once, by `beam.Create([None])`. These changes move the wait into the `finish_bundle` of its respective step.
   
   Note: Streaming with small loads is done in one step and thus was not getting stuck.
   
   This PR also adds tests that more accurately determine whether streaming with batch loads is working.
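
For readers skimming the thread, the pattern described above can be sketched roughly as follows. This is a plain-Python stand-in, not the code from the PR: it does not import Beam, and `FakeJobClient`, `TriggerAndWaitFn`, and `run_bundle` are hypothetical names that only mimic the `start_bundle` / `process` / `finish_bundle` lifecycle. The assumption is that jobs are submitted per element, collected in `pending_jobs`, and waited on once at the end of each bundle, with a single `None` emitted when nothing was submitted so downstream steps are not left waiting.

```python
class FakeJobClient:
    """Hypothetical stand-in for a BigQuery client wrapper (not a Beam API)."""

    def __init__(self):
        self.submitted = []
        self.waited = []

    def submit_load_job(self, destination):
        # Pretend to start a load job and hand back a job reference.
        job_ref = f"job-{len(self.submitted)}"
        self.submitted.append((destination, job_ref))
        return job_ref

    def wait_for_job(self, job_ref):
        # A real client would poll until the job finishes; here we just record it.
        self.waited.append(job_ref)


class TriggerAndWaitFn:
    """Mimics the DoFn lifecycle: submit in process, wait in finish_bundle."""

    def __init__(self, client):
        self.client = client

    def start_bundle(self):
        # Reset per-bundle state so jobs from an earlier bundle are not re-waited.
        self.pending_jobs = []

    def process(self, destination):
        job_ref = self.client.submit_load_job(destination)
        self.pending_jobs.append((destination, job_ref))

    def finish_bundle(self):
        # Nothing was submitted: emit a single None so downstream is not blocked.
        if not self.pending_jobs:
            return [None]
        for _, job_ref in self.pending_jobs:
            self.client.wait_for_job(job_ref)
        return list(self.pending_jobs)


def run_bundle(fn, elements):
    """Drive one bundle through the lifecycle, as a runner would."""
    fn.start_bundle()
    for element in elements:
        fn.process(element)
    return fn.finish_bundle()
```

Because the wait happens in every bundle rather than in a step triggered once, each new batch of elements gets its own wait, which is the behavior the description above is after.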


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] ahmedabu98 commented on a diff in pull request #23012: (BQ Python) Perform job waits in finish_bundle to allow BQ streaming writes with large batch loads

Posted by GitBox <gi...@apache.org>.
ahmedabu98 commented on code in PR #23012:
URL: https://github.com/apache/beam/pull/23012#discussion_r971097516


##########
sdks/python/apache_beam/io/gcp/bigquery_file_loads.py:
##########
@@ -430,20 +434,36 @@ def process(self, element, schema_mod_job_name_prefix):
         table_reference)
     # Trigger potential schema modification by loading zero rows into the
     # destination table with the temporary table schema.
-    schema_update_job_reference = self._bq_wrapper.perform_load_job(
-        destination=table_reference,
-        source_stream=io.BytesIO(),  # file with zero rows
-        job_id=job_name,
-        schema=temp_table_schema,
-        write_disposition='WRITE_APPEND',
-        create_disposition='CREATE_NEVER',
-        additional_load_parameters=additional_parameters,
-        job_labels=self._bq_io_metadata.add_additional_bq_job_labels(),
-        # JSON format is hardcoded because zero rows load(unlike AVRO) and
-        # a nested schema(unlike CSV, which a default one) is permitted.
-        source_format="NEWLINE_DELIMITED_JSON",
-        load_job_project_id=self._load_job_project_id)
-    yield (destination, schema_update_job_reference)
+    schema_update_job_reference = self.bq_wrapper.perform_load_job(
+      destination=table_reference,
+      source_stream=io.BytesIO(),  # file with zero rows
+      job_id=job_name,
+      schema=temp_table_schema,
+      write_disposition='WRITE_APPEND',
+      create_disposition='CREATE_NEVER',
+      additional_load_parameters=additional_parameters,
+      job_labels=self._bq_io_metadata.add_additional_bq_job_labels(),
+      # JSON format is hardcoded because zero rows load(unlike AVRO) and
+      # a nested schema(unlike CSV, which a default one) is permitted.
+      source_format="NEWLINE_DELIMITED_JSON",
+      load_job_project_id=self._load_job_project_id)
+    self.pending_jobs.append(
+        GlobalWindows.windowed_value(
+            (destination, schema_update_job_reference)))
+
+  def finish_bundle(self):
+    # Unlike the other steps, schema update is not always necessary.
+    # In that case, return a None value to avoid blocking in streaming context.
+    # Otherwise, the streaming pipeline would get stuck waiting for the
+    # TriggerCopyJobs side-input.
+    if not self.pending_jobs:
+      return [GlobalWindows.windowed_value(None)]
+
+    for windowed_value in self.pending_jobs:
+      job_ref = windowed_value.value[1]

Review Comment:
   Waiting for jobs in a for loop is how the [current implementation](https://github.com/apache/beam/blob/2d4f61c93250fb41112a2535394e7328bc1fdf95/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py#L757) does it as well. Looks like this is because there's no [PendingJobManager](https://github.com/apache/beam/blob/935eeadd45a6100c68243fc5b266b20d26164173/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java#L91) equivalent for the Python connector. 





[GitHub] [beam] ahmedabu98 commented on pull request #23012: (BQ Python) Perform job waits in finish_bundle to allow BQ streaming writes with large batch loads

Posted by GitBox <gi...@apache.org>.
ahmedabu98 commented on PR #23012:
URL: https://github.com/apache/beam/pull/23012#issuecomment-1243706730

   Run Python PreCommit




[GitHub] [beam] ahmedabu98 commented on a diff in pull request #23012: (BQ Python) Perform job waits in finish_bundle to allow BQ streaming writes with large batch loads

Posted by GitBox <gi...@apache.org>.
ahmedabu98 commented on code in PR #23012:
URL: https://github.com/apache/beam/pull/23012#discussion_r971107164


##########
sdks/python/apache_beam/io/gcp/bigquery_file_loads.py:
##########
@@ -430,20 +434,36 @@ def process(self, element, schema_mod_job_name_prefix):
         table_reference)
     # Trigger potential schema modification by loading zero rows into the
     # destination table with the temporary table schema.
-    schema_update_job_reference = self._bq_wrapper.perform_load_job(
-        destination=table_reference,
-        source_stream=io.BytesIO(),  # file with zero rows
-        job_id=job_name,
-        schema=temp_table_schema,
-        write_disposition='WRITE_APPEND',
-        create_disposition='CREATE_NEVER',
-        additional_load_parameters=additional_parameters,
-        job_labels=self._bq_io_metadata.add_additional_bq_job_labels(),
-        # JSON format is hardcoded because zero rows load(unlike AVRO) and
-        # a nested schema(unlike CSV, which a default one) is permitted.
-        source_format="NEWLINE_DELIMITED_JSON",
-        load_job_project_id=self._load_job_project_id)
-    yield (destination, schema_update_job_reference)
+    schema_update_job_reference = self.bq_wrapper.perform_load_job(
+      destination=table_reference,
+      source_stream=io.BytesIO(),  # file with zero rows
+      job_id=job_name,
+      schema=temp_table_schema,
+      write_disposition='WRITE_APPEND',
+      create_disposition='CREATE_NEVER',
+      additional_load_parameters=additional_parameters,
+      job_labels=self._bq_io_metadata.add_additional_bq_job_labels(),
+      # JSON format is hardcoded because zero rows load(unlike AVRO) and
+      # a nested schema(unlike CSV, which a default one) is permitted.
+      source_format="NEWLINE_DELIMITED_JSON",
+      load_job_project_id=self._load_job_project_id)
+    self.pending_jobs.append(
+        GlobalWindows.windowed_value(
+            (destination, schema_update_job_reference)))
+
+  def finish_bundle(self):
+    # Unlike the other steps, schema update is not always necessary.
+    # In that case, return a None value to avoid blocking in streaming context.
+    # Otherwise, the streaming pipeline would get stuck waiting for the
+    # TriggerCopyJobs side-input.
+    if not self.pending_jobs:
+      return [GlobalWindows.windowed_value(None)]
+
+    for windowed_value in self.pending_jobs:
+      job_ref = windowed_value.value[1]

Review Comment:
   If we decide a job manager for this connector would be helpful, perhaps it could be added in a separate PR?





[GitHub] [beam] ahmedabu98 commented on a diff in pull request #23012: (BQ Python) Perform job waits in finish_bundle to allow BQ streaming writes with large batch loads

Posted by GitBox <gi...@apache.org>.
ahmedabu98 commented on code in PR #23012:
URL: https://github.com/apache/beam/pull/23012#discussion_r967562341


##########
sdks/python/apache_beam/io/gcp/bigquery_file_loads.py:
##########
@@ -353,9 +356,10 @@ def __init__(
     self._step_name = step_name
     self._load_job_project_id = load_job_project_id
 
-  def setup(self):
-    self._bq_wrapper = bigquery_tools.BigQueryWrapper(client=self._test_client)
+  def start_bundle(self):
+    self.bq_wrapper = bigquery_tools.BigQueryWrapper(client=self._test_client)

Review Comment:
   Everywhere else in the file it's `bq_wrapper`, so I wanted to keep it consistent.





[GitHub] [beam] johnjcasey commented on a diff in pull request #23012: (BQ Python) Perform job waits in finish_bundle to allow BQ streaming writes with large batch loads

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on code in PR #23012:
URL: https://github.com/apache/beam/pull/23012#discussion_r971161911


##########
sdks/python/apache_beam/io/gcp/bigquery_file_loads.py:
##########
@@ -430,20 +434,36 @@ def process(self, element, schema_mod_job_name_prefix):
         table_reference)
     # Trigger potential schema modification by loading zero rows into the
     # destination table with the temporary table schema.
-    schema_update_job_reference = self._bq_wrapper.perform_load_job(
-        destination=table_reference,
-        source_stream=io.BytesIO(),  # file with zero rows
-        job_id=job_name,
-        schema=temp_table_schema,
-        write_disposition='WRITE_APPEND',
-        create_disposition='CREATE_NEVER',
-        additional_load_parameters=additional_parameters,
-        job_labels=self._bq_io_metadata.add_additional_bq_job_labels(),
-        # JSON format is hardcoded because zero rows load(unlike AVRO) and
-        # a nested schema(unlike CSV, which a default one) is permitted.
-        source_format="NEWLINE_DELIMITED_JSON",
-        load_job_project_id=self._load_job_project_id)
-    yield (destination, schema_update_job_reference)
+    schema_update_job_reference = self.bq_wrapper.perform_load_job(
+      destination=table_reference,
+      source_stream=io.BytesIO(),  # file with zero rows
+      job_id=job_name,
+      schema=temp_table_schema,
+      write_disposition='WRITE_APPEND',
+      create_disposition='CREATE_NEVER',
+      additional_load_parameters=additional_parameters,
+      job_labels=self._bq_io_metadata.add_additional_bq_job_labels(),
+      # JSON format is hardcoded because zero rows load(unlike AVRO) and
+      # a nested schema(unlike CSV, which a default one) is permitted.
+      source_format="NEWLINE_DELIMITED_JSON",
+      load_job_project_id=self._load_job_project_id)
+    self.pending_jobs.append(
+        GlobalWindows.windowed_value(
+            (destination, schema_update_job_reference)))
+
+  def finish_bundle(self):
+    # Unlike the other steps, schema update is not always necessary.
+    # In that case, return a None value to avoid blocking in streaming context.
+    # Otherwise, the streaming pipeline would get stuck waiting for the
+    # TriggerCopyJobs side-input.
+    if not self.pending_jobs:
+      return [GlobalWindows.windowed_value(None)]
+
+    for windowed_value in self.pending_jobs:
+      job_ref = windowed_value.value[1]

Review Comment:
   Sounds good. Can you write that up as an improvement?
   
   This looks good to me otherwise.





[GitHub] [beam] ahmedabu98 commented on a diff in pull request #23012: (BQ Python) Perform job waits in finish_bundle to allow BQ streaming writes with large batch loads

Posted by GitBox <gi...@apache.org>.
ahmedabu98 commented on code in PR #23012:
URL: https://github.com/apache/beam/pull/23012#discussion_r967563357


##########
sdks/python/apache_beam/io/gcp/bigquery_file_loads.py:
##########
@@ -430,20 +434,36 @@ def process(self, element, schema_mod_job_name_prefix):
         table_reference)
     # Trigger potential schema modification by loading zero rows into the
     # destination table with the temporary table schema.
-    schema_update_job_reference = self._bq_wrapper.perform_load_job(
-        destination=table_reference,
-        source_stream=io.BytesIO(),  # file with zero rows
-        job_id=job_name,
-        schema=temp_table_schema,
-        write_disposition='WRITE_APPEND',
-        create_disposition='CREATE_NEVER',
-        additional_load_parameters=additional_parameters,
-        job_labels=self._bq_io_metadata.add_additional_bq_job_labels(),
-        # JSON format is hardcoded because zero rows load(unlike AVRO) and
-        # a nested schema(unlike CSV, which a default one) is permitted.
-        source_format="NEWLINE_DELIMITED_JSON",
-        load_job_project_id=self._load_job_project_id)
-    yield (destination, schema_update_job_reference)
+    schema_update_job_reference = self.bq_wrapper.perform_load_job(
+      destination=table_reference,
+      source_stream=io.BytesIO(),  # file with zero rows
+      job_id=job_name,
+      schema=temp_table_schema,
+      write_disposition='WRITE_APPEND',
+      create_disposition='CREATE_NEVER',
+      additional_load_parameters=additional_parameters,
+      job_labels=self._bq_io_metadata.add_additional_bq_job_labels(),
+      # JSON format is hardcoded because zero rows load(unlike AVRO) and
+      # a nested schema(unlike CSV, which a default one) is permitted.
+      source_format="NEWLINE_DELIMITED_JSON",
+      load_job_project_id=self._load_job_project_id)
+    self.pending_jobs.append(
+        GlobalWindows.windowed_value(
+            (destination, schema_update_job_reference)))
+
+  def finish_bundle(self):
+    # Unlike the other steps, schema update is not always necessary.
+    # In that case, return a None value to avoid blocking in streaming context.
+    # Otherwise, the streaming pipeline would get stuck waiting for the
+    # TriggerCopyJobs side-input.
+    if not self.pending_jobs:
+      return [GlobalWindows.windowed_value(None)]
+
+    for windowed_value in self.pending_jobs:
+      job_ref = windowed_value.value[1]

Review Comment:
   Hmm, I'm not sure I follow. What stands out as weird to you?
   
   FYI, the Java implementation is somewhat like this. Jobs are submitted and accumulated while they are pending. Then they are [looped over and waited for in finishBundle](https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java#L344).





[GitHub] [beam] johnjcasey commented on a diff in pull request #23012: (BQ Python) Perform job waits in finish_bundle to allow BQ streaming writes with large batch loads

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on code in PR #23012:
URL: https://github.com/apache/beam/pull/23012#discussion_r967510361


##########
sdks/python/apache_beam/io/gcp/bigquery_file_loads.py:
##########
@@ -353,9 +356,10 @@ def __init__(
     self._step_name = step_name
     self._load_job_project_id = load_job_project_id
 
-  def setup(self):
-    self._bq_wrapper = bigquery_tools.BigQueryWrapper(client=self._test_client)
+  def start_bundle(self):
+    self.bq_wrapper = bigquery_tools.BigQueryWrapper(client=self._test_client)

Review Comment:
   Why the change from `_bq_wrapper` to `bq_wrapper`?



##########
sdks/python/apache_beam/io/gcp/bigquery_file_loads.py:
##########
@@ -430,20 +434,36 @@ def process(self, element, schema_mod_job_name_prefix):
         table_reference)
     # Trigger potential schema modification by loading zero rows into the
     # destination table with the temporary table schema.
-    schema_update_job_reference = self._bq_wrapper.perform_load_job(
-        destination=table_reference,
-        source_stream=io.BytesIO(),  # file with zero rows
-        job_id=job_name,
-        schema=temp_table_schema,
-        write_disposition='WRITE_APPEND',
-        create_disposition='CREATE_NEVER',
-        additional_load_parameters=additional_parameters,
-        job_labels=self._bq_io_metadata.add_additional_bq_job_labels(),
-        # JSON format is hardcoded because zero rows load(unlike AVRO) and
-        # a nested schema(unlike CSV, which a default one) is permitted.
-        source_format="NEWLINE_DELIMITED_JSON",
-        load_job_project_id=self._load_job_project_id)
-    yield (destination, schema_update_job_reference)
+    schema_update_job_reference = self.bq_wrapper.perform_load_job(
+      destination=table_reference,
+      source_stream=io.BytesIO(),  # file with zero rows
+      job_id=job_name,
+      schema=temp_table_schema,
+      write_disposition='WRITE_APPEND',
+      create_disposition='CREATE_NEVER',
+      additional_load_parameters=additional_parameters,
+      job_labels=self._bq_io_metadata.add_additional_bq_job_labels(),
+      # JSON format is hardcoded because zero rows load(unlike AVRO) and
+      # a nested schema(unlike CSV, which a default one) is permitted.
+      source_format="NEWLINE_DELIMITED_JSON",
+      load_job_project_id=self._load_job_project_id)
+    self.pending_jobs.append(
+        GlobalWindows.windowed_value(
+            (destination, schema_update_job_reference)))
+
+  def finish_bundle(self):
+    # Unlike the other steps, schema update is not always necessary.
+    # In that case, return a None value to avoid blocking in streaming context.
+    # Otherwise, the streaming pipeline would get stuck waiting for the
+    # TriggerCopyJobs side-input.
+    if not self.pending_jobs:
+      return [GlobalWindows.windowed_value(None)]
+
+    for windowed_value in self.pending_jobs:
+      job_ref = windowed_value.value[1]

Review Comment:
   Do we want to loop over these waits? That seems somewhat out of pattern.





[GitHub] [beam] github-actions[bot] commented on pull request #23012: (BQ Python) Perform job waits in finish_bundle to allow BQ streaming writes with large batch loads

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #23012:
URL: https://github.com/apache/beam/pull/23012#issuecomment-1235751510

   Assigning reviewers. If you would like to opt out of this review, comment `assign to next reviewer`:
   
   R: @pabloem for label python.
   R: @johnjcasey for label io.
   
   Available commands:
   - `stop reviewer notifications` - opt out of the automated review tooling
   - `remind me after tests pass` - tag the comment author after tests pass
   - `waiting on author` - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)
   
   The PR bot will only process comments in the main thread (not review comments).




[GitHub] [beam] codecov[bot] commented on pull request #23012: (BQ Python) Perform job waits in finish_bundle to allow BQ streaming writes with large batch loads

Posted by GitBox <gi...@apache.org>.
codecov[bot] commented on PR #23012:
URL: https://github.com/apache/beam/pull/23012#issuecomment-1242411669

   # [Codecov](https://codecov.io/gh/apache/beam/pull/23012?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#23012](https://codecov.io/gh/apache/beam/pull/23012?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (50c50e9) into [master](https://codecov.io/gh/apache/beam/commit/b2a6f46fb21709cc1927ce1950a38a922dce0a35?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (b2a6f46) will **decrease** coverage by `0.01%`.
   > The diff coverage is `76.47%`.
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #23012      +/-   ##
   ==========================================
   - Coverage   73.59%   73.58%   -0.02%     
   ==========================================
     Files         716      716              
     Lines       95282    95319      +37     
   ==========================================
   + Hits        70127    70144      +17     
   - Misses      23859    23879      +20     
     Partials     1296     1296              
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | python | `83.40% <76.47%> (-0.03%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/23012?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...s/python/apache\_beam/io/gcp/bigquery\_file\_loads.py](https://codecov.io/gh/apache/beam/pull/23012/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5X2ZpbGVfbG9hZHMucHk=) | `87.09% <76.47%> (-0.62%)` | :arrow_down: |
   | [sdks/python/apache\_beam/internal/pickler.py](https://codecov.io/gh/apache/beam/pull/23012/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW50ZXJuYWwvcGlja2xlci5weQ==) | `92.00% <0.00%> (-3.46%)` | :arrow_down: |
   | [sdks/python/apache\_beam/internal/metrics/metric.py](https://codecov.io/gh/apache/beam/pull/23012/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW50ZXJuYWwvbWV0cmljcy9tZXRyaWMucHk=) | `93.00% <0.00%> (-1.00%)` | :arrow_down: |
   | [...hon/apache\_beam/runners/direct/test\_stream\_impl.py](https://codecov.io/gh/apache/beam/pull/23012/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9kaXJlY3QvdGVzdF9zdHJlYW1faW1wbC5weQ==) | `93.28% <0.00%> (-0.75%)` | :arrow_down: |
   | [...eam/runners/portability/fn\_api\_runner/execution.py](https://codecov.io/gh/apache/beam/pull/23012/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9wb3J0YWJpbGl0eS9mbl9hcGlfcnVubmVyL2V4ZWN1dGlvbi5weQ==) | `92.44% <0.00%> (-0.65%)` | :arrow_down: |
   | [...on/apache\_beam/runners/dataflow/dataflow\_runner.py](https://codecov.io/gh/apache/beam/pull/23012/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9kYXRhZmxvdy9kYXRhZmxvd19ydW5uZXIucHk=) | `82.87% <0.00%> (-0.14%)` | :arrow_down: |
   | [sdks/python/apache\_beam/runners/common.py](https://codecov.io/gh/apache/beam/pull/23012/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9jb21tb24ucHk=) | `88.59% <0.00%> (-0.13%)` | :arrow_down: |
   | [...thon/apache\_beam/ml/inference/pytorch\_inference.py](https://codecov.io/gh/apache/beam/pull/23012/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vbWwvaW5mZXJlbmNlL3B5dG9yY2hfaW5mZXJlbmNlLnB5) | `0.00% <0.00%> (ø)` | |
   | [...beam/runners/portability/expansion\_service\_main.py](https://codecov.io/gh/apache/beam/pull/23012/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9wb3J0YWJpbGl0eS9leHBhbnNpb25fc2VydmljZV9tYWluLnB5) | `0.00% <0.00%> (ø)` | |
   | [...am/examples/inference/pytorch\_language\_modeling.py](https://codecov.io/gh/apache/beam/pull/23012/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZXhhbXBsZXMvaW5mZXJlbmNlL3B5dG9yY2hfbGFuZ3VhZ2VfbW9kZWxpbmcucHk=) | `0.00% <0.00%> (ø)` | |
   | ... and [9 more](https://codecov.io/gh/apache/beam/pull/23012/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   




[GitHub] [beam] ahmedabu98 commented on pull request #23012: (BQ Python) Perform job waits in finish_bundle to allow BQ streaming writes with large batch loads

Posted by GitBox <gi...@apache.org>.
ahmedabu98 commented on PR #23012:
URL: https://github.com/apache/beam/pull/23012#issuecomment-1245709878

   Run Python 3.8 PostCommit




[GitHub] [beam] ahmedabu98 commented on pull request #23012: (BQ Python) Perform job waits in finish_bundle to allow BQ streaming writes with large batch loads

Posted by GitBox <gi...@apache.org>.
ahmedabu98 commented on PR #23012:
URL: https://github.com/apache/beam/pull/23012#issuecomment-1243843771

   Run Python PreCommit




[GitHub] [beam] ahmedabu98 commented on pull request #23012: (BQ Python) Perform job waits in finish_bundle to allow BQ streaming writes with large batch loads

Posted by GitBox <gi...@apache.org>.
ahmedabu98 commented on PR #23012:
URL: https://github.com/apache/beam/pull/23012#issuecomment-1243184761

   Run Python 3.8 PostCommit




[GitHub] [beam] ahmedabu98 commented on pull request #23012: (BQ Python) Perform job waits in finish_bundle to allow BQ streaming writes with large batch loads

Posted by GitBox <gi...@apache.org>.
ahmedabu98 commented on PR #23012:
URL: https://github.com/apache/beam/pull/23012#issuecomment-1242591274

   Run Python 3.8 PostCommit




[GitHub] [beam] johnjcasey commented on a diff in pull request #23012: (BQ Python) Perform job waits in finish_bundle to allow BQ streaming writes with large batch loads

Posted by GitBox <gi...@apache.org>.
johnjcasey commented on code in PR #23012:
URL: https://github.com/apache/beam/pull/23012#discussion_r971071562


##########
sdks/python/apache_beam/io/gcp/bigquery_file_loads.py:
##########
@@ -430,20 +434,36 @@ def process(self, element, schema_mod_job_name_prefix):
         table_reference)
     # Trigger potential schema modification by loading zero rows into the
     # destination table with the temporary table schema.
-    schema_update_job_reference = self._bq_wrapper.perform_load_job(
-        destination=table_reference,
-        source_stream=io.BytesIO(),  # file with zero rows
-        job_id=job_name,
-        schema=temp_table_schema,
-        write_disposition='WRITE_APPEND',
-        create_disposition='CREATE_NEVER',
-        additional_load_parameters=additional_parameters,
-        job_labels=self._bq_io_metadata.add_additional_bq_job_labels(),
-        # JSON format is hardcoded because zero rows load(unlike AVRO) and
-        # a nested schema(unlike CSV, which a default one) is permitted.
-        source_format="NEWLINE_DELIMITED_JSON",
-        load_job_project_id=self._load_job_project_id)
-    yield (destination, schema_update_job_reference)
+    schema_update_job_reference = self.bq_wrapper.perform_load_job(
+      destination=table_reference,
+      source_stream=io.BytesIO(),  # file with zero rows
+      job_id=job_name,
+      schema=temp_table_schema,
+      write_disposition='WRITE_APPEND',
+      create_disposition='CREATE_NEVER',
+      additional_load_parameters=additional_parameters,
+      job_labels=self._bq_io_metadata.add_additional_bq_job_labels(),
+      # JSON format is hardcoded because zero rows load(unlike AVRO) and
+      # a nested schema(unlike CSV, which a default one) is permitted.
+      source_format="NEWLINE_DELIMITED_JSON",
+      load_job_project_id=self._load_job_project_id)
+    self.pending_jobs.append(
+        GlobalWindows.windowed_value(
+            (destination, schema_update_job_reference)))
+
+  def finish_bundle(self):
+    # Unlike the other steps, schema update is not always necessary.
+    # In that case, return a None value to avoid blocking in streaming context.
+    # Otherwise, the streaming pipeline would get stuck waiting for the
+    # TriggerCopyJobs side-input.
+    if not self.pending_jobs:
+      return [GlobalWindows.windowed_value(None)]
+
+    for windowed_value in self.pending_jobs:
+      job_ref = windowed_value.value[1]

Review Comment:
   In that one, we add all of the waits to a manager and then wait on the manager. Here, we wait on each job directly. Ideally, we would follow that pattern here as well.
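
   For illustration only, the "register the waits with a manager, then wait on the manager" pattern could be sketched in Python as below. `PendingJobTracker`, `add_pending_job`, `wait_for_done`, and the `get_state` callback are hypothetical names made up for this sketch; they are not part of the Beam Python SDK, and the job states are assumed to be plain strings.

```python
import time
from typing import Callable, Dict, List


class PendingJobTracker:
    """Hypothetical helper: collect job ids first, then wait on all of them."""

    def __init__(self, get_state: Callable[[str], str],
                 poll_interval: float = 0.0):
        # Assumed callback: takes a job id, returns a state string.
        self._get_state = get_state
        self._poll_interval = poll_interval
        self._pending: List[str] = []

    def add_pending_job(self, job_id: str) -> None:
        self._pending.append(job_id)

    def wait_for_done(self, max_rounds: int = 100) -> Dict[str, str]:
        """Poll every unfinished job once per round until all are DONE."""
        states: Dict[str, str] = {}
        remaining = list(self._pending)
        for _ in range(max_rounds):
            still_running = []
            for job_id in remaining:
                states[job_id] = self._get_state(job_id)
                if states[job_id] != "DONE":
                    still_running.append(job_id)
            remaining = still_running
            if not remaining:
                self._pending.clear()
                return states
            time.sleep(self._poll_interval)
        raise TimeoutError(f"jobs still running: {remaining}")
```

   Because each round visits every unfinished job, a slow job does not prevent the state of the others from being observed.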





[GitHub] [beam] johnjcasey merged pull request #23012: (BQ Python) Perform job waits in finish_bundle to allow BQ streaming writes with large batch loads

Posted by GitBox <gi...@apache.org>.
johnjcasey merged PR #23012:
URL: https://github.com/apache/beam/pull/23012




[GitHub] [beam] ahmedabu98 commented on pull request #23012: (BQ Python) Perform job waits in finish_bundle to allow BQ streaming writes with large batch loads

Posted by GitBox <gi...@apache.org>.
ahmedabu98 commented on PR #23012:
URL: https://github.com/apache/beam/pull/23012#issuecomment-1242393914

   Run Python 3.8 PostCommit




[GitHub] [beam] ahmedabu98 commented on pull request #23012: (BQ Python) Perform job waits in finish_bundle to allow BQ streaming writes with large batch loads

Posted by GitBox <gi...@apache.org>.
ahmedabu98 commented on PR #23012:
URL: https://github.com/apache/beam/pull/23012#issuecomment-1235660443

   Waiting for #23011




[GitHub] [beam] ahmedabu98 commented on pull request #23012: (BQ Python) Perform job waits in finish_bundle to allow BQ streaming writes with large batch loads

Posted by GitBox <gi...@apache.org>.
ahmedabu98 commented on PR #23012:
URL: https://github.com/apache/beam/pull/23012#issuecomment-1245945225

   Run Python 3.8 PostCommit




[GitHub] [beam] ahmedabu98 commented on a diff in pull request #23012: (BQ Python) Perform job waits in finish_bundle to allow BQ streaming writes with large batch loads

Posted by GitBox <gi...@apache.org>.
ahmedabu98 commented on code in PR #23012:
URL: https://github.com/apache/beam/pull/23012#discussion_r971106105


##########
sdks/python/apache_beam/io/gcp/bigquery_file_loads.py:
##########
@@ -430,20 +434,36 @@ def process(self, element, schema_mod_job_name_prefix):
         table_reference)
     # Trigger potential schema modification by loading zero rows into the
     # destination table with the temporary table schema.
-    schema_update_job_reference = self._bq_wrapper.perform_load_job(
-        destination=table_reference,
-        source_stream=io.BytesIO(),  # file with zero rows
-        job_id=job_name,
-        schema=temp_table_schema,
-        write_disposition='WRITE_APPEND',
-        create_disposition='CREATE_NEVER',
-        additional_load_parameters=additional_parameters,
-        job_labels=self._bq_io_metadata.add_additional_bq_job_labels(),
-        # JSON format is hardcoded because zero rows load(unlike AVRO) and
-        # a nested schema(unlike CSV, which a default one) is permitted.
-        source_format="NEWLINE_DELIMITED_JSON",
-        load_job_project_id=self._load_job_project_id)
-    yield (destination, schema_update_job_reference)
+    schema_update_job_reference = self.bq_wrapper.perform_load_job(
+      destination=table_reference,
+      source_stream=io.BytesIO(),  # file with zero rows
+      job_id=job_name,
+      schema=temp_table_schema,
+      write_disposition='WRITE_APPEND',
+      create_disposition='CREATE_NEVER',
+      additional_load_parameters=additional_parameters,
+      job_labels=self._bq_io_metadata.add_additional_bq_job_labels(),
+      # JSON format is hardcoded because zero rows load(unlike AVRO) and
+      # a nested schema(unlike CSV, which a default one) is permitted.
+      source_format="NEWLINE_DELIMITED_JSON",
+      load_job_project_id=self._load_job_project_id)
+    self.pending_jobs.append(
+        GlobalWindows.windowed_value(
+            (destination, schema_update_job_reference)))
+
+  def finish_bundle(self):
+    # Unlike the other steps, schema update is not always necessary.
+    # In that case, return a None value to avoid blocking in streaming context.
+    # Otherwise, the streaming pipeline would get stuck waiting for the
+    # TriggerCopyJobs side-input.
+    if not self.pending_jobs:
+      return [GlobalWindows.windowed_value(None)]
+
+    for windowed_value in self.pending_jobs:
+      job_ref = windowed_value.value[1]

Review Comment:
   The benefit of the PendingJobManager's [wait implementation](https://github.com/apache/beam/blob/935eeadd45a6100c68243fc5b266b20d26164173/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java#L127) is that it waits for the jobs in parallel and checks on each of them. This way, one slow job does not block us from checking (and logging) the state of the other jobs.
   
   This is lost in the Python connector's wait implementation because we check the state of the jobs sequentially. Ultimately, however, it doesn't change the big-picture behavior because file loads are atomic, i.e., if one job fails, all the jobs in the bundle fail. We don't release jobs to the next step until we have finished waiting on all of them.
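
   As a rough sketch of the sequential, all-or-nothing behavior described above (not the connector's actual code): `wait_then_release` and the blocking `wait_for_job` callback are hypothetical names, and the "DONE"/"FAILED" state strings are assumptions made for the example.

```python
from typing import Callable, List, Tuple


def wait_then_release(
    pending: List[Tuple[str, str]],
    wait_for_job: Callable[[str], str],
) -> List[Tuple[str, str]]:
    """Block on each job in order; release the batch only if all succeed.

    `pending` holds (destination, job_id) pairs. `wait_for_job` is an assumed
    callback that blocks until the job ends and returns its final state.
    """
    for destination, job_id in pending:
        # Sequential: a slow job here delays checking every job after it.
        state = wait_for_job(job_id)
        if state != "DONE":
            # One failure fails the whole bundle, so nothing is released.
            raise RuntimeError(
                f"job {job_id} for {destination} ended as {state}")
    # Reached only when every job has finished successfully.
    return list(pending)
```

   The order in which jobs are checked affects only how early a failure is noticed or logged. The set of jobs handed to the next step is the same either way.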


