Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/09/09 22:17:41 UTC

[GitHub] [beam] johnjcasey commented on a diff in pull request #23012: (BQ Python) Perform job waits in finish_bundle to allow BQ streaming writes with large batch loads

johnjcasey commented on code in PR #23012:
URL: https://github.com/apache/beam/pull/23012#discussion_r967510361


##########
sdks/python/apache_beam/io/gcp/bigquery_file_loads.py:
##########
@@ -353,9 +356,10 @@ def __init__(
     self._step_name = step_name
     self._load_job_project_id = load_job_project_id
 
-  def setup(self):
-    self._bq_wrapper = bigquery_tools.BigQueryWrapper(client=self._test_client)
+  def start_bundle(self):
+    self.bq_wrapper = bigquery_tools.BigQueryWrapper(client=self._test_client)

Review Comment:
   Why the change from _bq_wrapper to bq_wrapper?
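
For context, a minimal sketch of the Beam Python DoFn lifecycle this rename
sits in. LifecycleDoFn and its attributes are hypothetical stand-ins for
illustration, not the PR's code:

    import apache_beam as beam

    class LifecycleDoFn(beam.DoFn):
      def setup(self):
        # Runs once per DoFn instance; suits long-lived resources. By
        # Python convention, a leading underscore (self._wrapper) marks
        # the attribute as private to the class.
        pass

      def start_bundle(self):
        # Runs once per bundle; the PR moves wrapper creation here so
        # each bundle gets a fresh client alongside its per-bundle
        # bookkeeping.
        self.wrapper = None  # placeholder for a BigQueryWrapper

      def process(self, element):
        yield element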



##########
sdks/python/apache_beam/io/gcp/bigquery_file_loads.py:
##########
@@ -430,20 +434,36 @@ def process(self, element, schema_mod_job_name_prefix):
         table_reference)
     # Trigger potential schema modification by loading zero rows into the
     # destination table with the temporary table schema.
-    schema_update_job_reference = self._bq_wrapper.perform_load_job(
-        destination=table_reference,
-        source_stream=io.BytesIO(),  # file with zero rows
-        job_id=job_name,
-        schema=temp_table_schema,
-        write_disposition='WRITE_APPEND',
-        create_disposition='CREATE_NEVER',
-        additional_load_parameters=additional_parameters,
-        job_labels=self._bq_io_metadata.add_additional_bq_job_labels(),
-        # JSON format is hardcoded because a zero-row load (unlike AVRO) and a
-        # nested schema (unlike CSV, which uses a default one) are permitted.
-        source_format="NEWLINE_DELIMITED_JSON",
-        load_job_project_id=self._load_job_project_id)
-    yield (destination, schema_update_job_reference)
+    schema_update_job_reference = self.bq_wrapper.perform_load_job(
+      destination=table_reference,
+      source_stream=io.BytesIO(),  # file with zero rows
+      job_id=job_name,
+      schema=temp_table_schema,
+      write_disposition='WRITE_APPEND',
+      create_disposition='CREATE_NEVER',
+      additional_load_parameters=additional_parameters,
+      job_labels=self._bq_io_metadata.add_additional_bq_job_labels(),
+      # JSON format is hardcoded because a zero-row load (unlike AVRO) and a
+      # nested schema (unlike CSV, which uses a default one) are permitted.
+      source_format="NEWLINE_DELIMITED_JSON",
+      load_job_project_id=self._load_job_project_id)
+    self.pending_jobs.append(
+        GlobalWindows.windowed_value(
+            (destination, schema_update_job_reference)))
+
+  def finish_bundle(self):
+    # Unlike the other steps, schema update is not always necessary.
+    # In that case, return a None value to avoid blocking in a streaming context.
+    # Otherwise, the streaming pipeline would get stuck waiting for the
+    # TriggerCopyJobs side-input.
+    if not self.pending_jobs:
+      return [GlobalWindows.windowed_value(None)]
+
+    for windowed_value in self.pending_jobs:
+      job_ref = windowed_value.value[1]

Review Comment:
   Do we want to loop over these waits? That seems somewhat out of pattern.
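
For reference, a minimal sketch of the accumulate-then-wait pattern the diff
follows. The _FakeWrapper and its start_load_job/wait_for_job methods are
hypothetical placeholders; the PR itself uses bigquery_tools.BigQueryWrapper
and real BigQuery job references:

    import apache_beam as beam
    from apache_beam.transforms.window import GlobalWindows

    class _FakeWrapper(object):
      """Hypothetical stand-in for a BigQuery client wrapper."""
      def start_load_job(self, element):
        # Pretend to start an async load job and return its reference.
        return 'job-%s' % element

      def wait_for_job(self, job_ref):
        # Would block until the given load job completes.
        pass

    class WaitInFinishBundle(beam.DoFn):
      def start_bundle(self):
        self.wrapper = _FakeWrapper()
        self.pending_jobs = []

      def process(self, element):
        # Start the job; do not block per element.
        job_ref = self.wrapper.start_load_job(element)
        self.pending_jobs.append(
            GlobalWindows.windowed_value((element, job_ref)))

      def finish_bundle(self):
        # With nothing pending, still emit a sentinel so a downstream
        # side-input does not block a streaming pipeline (mirrors the
        # diff's None value).
        if not self.pending_jobs:
          return [GlobalWindows.windowed_value(None)]
        # Wait once per bundle; this loop over every pending job is the
        # pattern the review comment above is questioning.
        for wv in self.pending_jobs:
          _, job_ref = wv.value
          self.wrapper.wait_for_job(job_ref)
        # finish_bundle must return an iterable of WindowedValue objects.
        return self.pending_jobs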



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org