You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2020/05/14 19:18:02 UTC

[beam] branch master updated: [BEAM-9967] Adding support for BQ labels on Query/Export jobs. (Roll forward)

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ddb9c0  [BEAM-9967] Adding support for BQ labels on Query/Export jobs. (Roll forward)
     new f81f934  Merge pull request #11700 from [BEAM-9967] Adding support for BQ labels on Query/Export jobs. (Roll …
2ddb9c0 is described below

commit 2ddb9c024193aeeb3d94326fb2df30459c787a06
Author: Pablo Estrada <pa...@apache.org>
AuthorDate: Wed May 13 14:16:44 2020 -0700

    [BEAM-9967] Adding support for BQ labels on Query/Export jobs. (Roll forward)
---
 sdks/python/apache_beam/io/gcp/bigquery.py         | 22 ++++++++----
 .../apache_beam/io/gcp/bigquery_read_it_test.py    |  5 ++-
 sdks/python/apache_beam/io/gcp/bigquery_tools.py   | 42 +++++++++++++++++-----
 3 files changed, 54 insertions(+), 15 deletions(-)

diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index d56fe12..130b473 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -606,7 +606,8 @@ class _CustomBigQuerySource(BoundedSource):
       coder=None,
       use_standard_sql=False,
       flatten_results=True,
-      kms_key=None):
+      kms_key=None,
+      bigquery_job_labels=None):
     if table is not None and query is not None:
       raise ValueError(
           'Both a BigQuery table and a query were specified.'
@@ -634,6 +635,7 @@ class _CustomBigQuerySource(BoundedSource):
     self.kms_key = kms_key
     self.split_result = None
     self.options = pipeline_options
+    self.bigquery_job_labels = bigquery_job_labels or {}
 
   def display_data(self):
     return {
@@ -641,6 +643,7 @@ class _CustomBigQuerySource(BoundedSource):
         'query': str(self.query),
         'project': str(self.project),
         'use_legacy_sql': self.use_legacy_sql,
+        'bigquery_job_labels': json.dumps(self.bigquery_job_labels),
     }
 
   def estimate_size(self):
@@ -667,7 +670,8 @@ class _CustomBigQuerySource(BoundedSource):
           self.flatten_results,
           job_id=uuid.uuid4().hex,
           dry_run=True,
-          kms_key=self.kms_key)
+          kms_key=self.kms_key,
+          job_labels=self.bigquery_job_labels)
       size = int(job.statistics.totalBytesProcessed)
       return size
     else:
@@ -736,7 +740,8 @@ class _CustomBigQuerySource(BoundedSource):
         self.use_legacy_sql,
         self.flatten_results,
         job_id=uuid.uuid4().hex,
-        kms_key=self.kms_key)
+        kms_key=self.kms_key,
+        job_labels=self.bigquery_job_labels)
     job_ref = job.jobReference
     bq.wait_for_bq_job(job_ref, max_retries=0)
     return bq._get_temp_table(self._get_project())
@@ -753,8 +758,9 @@ class _CustomBigQuerySource(BoundedSource):
                                      self.table_reference,
                                      bigquery_tools.FileFormat.JSON,
                                      project=self._get_project(),
-                                     include_header=False)
-    bq.wait_for_bq_job(job_ref, max_retries=0)
+                                     include_header=False,
+                                     job_labels=self.bigquery_job_labels)
+    bq.wait_for_bq_job(job_ref)
     metadata_list = FileSystems.match([self.gcs_location])[0].metadata_list
 
     if isinstance(self.table_reference, vp.ValueProvider):
@@ -1589,7 +1595,11 @@ class ReadFromBigQuery(PTransform):
       bucket where the extracted table should be written as a string or
       a :class:`~apache_beam.options.value_provider.ValueProvider`. If
       :data:`None`, then the temp_location parameter is used.
-   """
+    bigquery_job_labels (dict): A dictionary with string labels to be passed
+      to BigQuery export and query jobs created by this transform. See:
+      https://cloud.google.com/bigquery/docs/reference/rest/v2/Job
+      (the ``labels`` field of ``JobConfiguration``).
+  """
   def __init__(self, gcs_location=None, validate=False, *args, **kwargs):
     if gcs_location:
       if not isinstance(gcs_location, (str, unicode, ValueProvider)):
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py
index 98d1241..50ebcdc 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py
@@ -274,7 +274,10 @@ class ReadNewTypesTests(BigQueryReadIntegrationTests):
     with beam.Pipeline(argv=self.args) as p:
       result = (
           p | 'read' >> beam.io.ReadFromBigQuery(
-              query=self.query, use_standard_sql=True, project=self.project))
+              query=self.query,
+              use_standard_sql=True,
+              project=self.project,
+              bigquery_job_labels={'launcher': 'apache_beam_tests'}))
       assert_that(result, equal_to(self.get_expected_data()))
 
 
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
index 2b6c959..b036dc0 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
@@ -218,6 +218,20 @@ def parse_table_reference(table, dataset=None, project=None):
 # BigQueryWrapper.
 
 
+def _build_job_labels(input_labels):
+  """Builds a JobConfiguration.LabelsValue message from a dict of labels."""
+  input_labels = input_labels or {}  # Treat None/empty as "no labels".
+  result = bigquery.JobConfiguration.LabelsValue()
+
+  for k, v in input_labels.items():  # One AdditionalProperty per label.
+    result.additionalProperties.append(
+        bigquery.JobConfiguration.LabelsValue.AdditionalProperty(
+            key=k,
+            value=v,
+        ))
+  return result
+
+
 class BigQueryWrapper(object):
   """BigQuery client wrapper with utilities for querying.
 
@@ -320,7 +334,8 @@ class BigQueryWrapper(object):
       from_table_reference,
       to_table_reference,
       create_disposition=None,
-      write_disposition=None):
+      write_disposition=None,
+      job_labels=None):
     reference = bigquery.JobReference()
     reference.jobId = job_id
     reference.projectId = project_id
@@ -333,7 +348,9 @@ class BigQueryWrapper(object):
                     sourceTable=from_table_reference,
                     createDisposition=create_disposition,
                     writeDisposition=write_disposition,
-                )),
+                ),
+                labels=_build_job_labels(job_labels),
+            ),
             jobReference=reference,
         ))
 
@@ -355,7 +372,8 @@ class BigQueryWrapper(object):
       write_disposition=None,
       create_disposition=None,
       additional_load_parameters=None,
-      source_format=None):
+      source_format=None,
+      job_labels=None):
     additional_load_parameters = additional_load_parameters or {}
     job_schema = None if schema == 'SCHEMA_AUTODETECT' else schema
     reference = bigquery.JobReference(jobId=job_id, projectId=project_id)
@@ -372,7 +390,9 @@ class BigQueryWrapper(object):
                     sourceFormat=source_format,
                     useAvroLogicalTypes=True,
                     autodetect=schema == 'SCHEMA_AUTODETECT',
-                    **additional_load_parameters)),
+                    **additional_load_parameters),
+                labels=_build_job_labels(job_labels),
+            ),
             jobReference=reference,
         ))
     response = self.client.jobs.Insert(request)
@@ -389,7 +409,8 @@ class BigQueryWrapper(object):
       flatten_results,
       job_id,
       dry_run=False,
-      kms_key=None):
+      kms_key=None,
+      job_labels=None):
     reference = bigquery.JobReference(jobId=job_id, projectId=project_id)
     request = bigquery.BigqueryJobsInsertRequest(
         projectId=project_id,
@@ -404,7 +425,9 @@ class BigQueryWrapper(object):
                     if not dry_run else None,
                     flattenResults=flatten_results,
                     destinationEncryptionConfiguration=bigquery.
-                    EncryptionConfiguration(kmsKeyName=kms_key))),
+                    EncryptionConfiguration(kmsKeyName=kms_key)),
+                labels=_build_job_labels(job_labels),
+            ),
             jobReference=reference))
 
     response = self.client.jobs.Insert(request)
@@ -696,7 +719,8 @@ class BigQueryWrapper(object):
       destination_format,
       project=None,
       include_header=True,
-      compression=ExportCompression.NONE):
+      compression=ExportCompression.NONE,
+      job_labels=None):
     """Starts a job to export data from BigQuery.
 
     Returns:
@@ -714,7 +738,9 @@ class BigQueryWrapper(object):
                     printHeader=include_header,
                     destinationFormat=destination_format,
                     compression=compression,
-                )),
+                ),
+                labels=_build_job_labels(job_labels),
+            ),
             jobReference=job_reference,
         ))
     response = self.client.jobs.Insert(request)