You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/03/11 01:09:54 UTC

[GitHub] [beam] pabloem commented on a change in pull request #14113: [BEAM-11277] Respect schemaUpdateOptions during BigQuery load with temporary tables

pabloem commented on a change in pull request #14113:
URL: https://github.com/apache/beam/pull/14113#discussion_r591993536



##########
File path: sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
##########
@@ -313,6 +320,111 @@ def process(self, element, file_prefix, *schema_side_inputs):
       yield (destination, (file_path, file_size))
 
 
+class UpdateDestinationSchema(beam.DoFn):
+  """Update destination schema based on data that is about to be copied into it.
+
+  Unlike load and query jobs, BigQuery copy jobs do not support schema field
+  addition or relaxation on the destination table. This DoFn fills that gap by
+  updating the destination table schemas to be compatible with the data coming
+  from the source table so that schema field modification options are respected
+  regardless of whether data is loaded directly to the destination table or
+  loaded into temporary tables before being copied into the destination.
+
+  This transform takes as input a (destination, job_reference) pair where the
+  job_reference refers to a completed load job into a temporary table.
+
+  This transform emits (destination, job_reference) pairs where the
+  job_reference refers to a submitted load job for performing the schema
+  modification. Note that the input and output job references are not the same.
+
+  Experimental; no backwards compatibility guarantees.
+  """
+  def __init__(
+      self,
+      write_disposition=None,
+      test_client=None,
+      additional_bq_parameters=None,
+      step_name=None):
+    self._test_client = test_client
+    self._write_disposition = write_disposition
+    self._additional_bq_parameters = additional_bq_parameters or {}
+    self._step_name = step_name
+
+  def setup(self):
+    self._bq_wrapper = bigquery_tools.BigQueryWrapper(client=self._test_client)
+    self._bq_io_metadata = create_bigquery_io_metadata(self._step_name)
+
+  def process(self, element, schema_mod_job_name_prefix):
+    destination = element[0]
+    temp_table_load_job_reference = element[1]
+
+    if callable(self._additional_bq_parameters):
+      additional_parameters = self._additional_bq_parameters(destination)
+    elif isinstance(self._additional_bq_parameters, vp.ValueProvider):
+      additional_parameters = self._additional_bq_parameters.get()
+    else:
+      additional_parameters = self._additional_bq_parameters
+
+    # When writing to normal tables WRITE_TRUNCATE will overwrite the schema but
+    # when writing to a partition, care needs to be taken to update the schema
+    # even on WRITE_TRUNCATE.
+    if (self._write_disposition not in ('WRITE_TRUNCATE', 'WRITE_APPEND') or
+        not additional_parameters or
+        not additional_parameters.get("schemaUpdateOptions")):
+      # No need to modify schema of destination table
+      return
+
+    table_reference = bigquery_tools.parse_table_reference(destination)
+    if table_reference.projectId is None:
+      table_reference.projectId = vp.RuntimeValueProvider.get_value(
+          'project', str, '')
+
+    try:
+      # Check if destination table exists
+      _ = self._bq_wrapper.get_table(
+          project_id=table_reference.projectId,
+          dataset_id=table_reference.datasetId,
+          table_id=table_reference.tableId)
+    except HttpError as exn:
+      if exn.status_code == 404:
+        # Destination table does not exist, so no need to modify its schema
+        # ahead of the copy jobs.
+        return
+      else:
+        raise
+
+    temp_table_load_job = self._bq_wrapper.get_job(
+        project=temp_table_load_job_reference.projectId,
+        job_id=temp_table_load_job_reference.jobId,
+        location=temp_table_load_job_reference.location)
+    temp_table_schema = temp_table_load_job.configuration.load.schema
+

Review comment:
       Does it make sense to compare the schema of the destination table with the schema of the temp table job? We'd save one load, right?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org