You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ib...@apache.org on 2020/05/13 20:21:33 UTC

[beam] branch master updated: Revert "Merge pull request #11673 from [BEAM-9967] Adding support for BQ labels on Query/Export jobs"

This is an automated email from the ASF dual-hosted git repository.

ibzib pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new f399e02  Revert "Merge pull request #11673 from [BEAM-9967] Adding support for BQ labels on Query/Export jobs"
     new 485bd08  Merge pull request #11694 from pabloem/revertlabels
f399e02 is described below

commit f399e026c6d4324136c66c4658fdd9586b013e33
Author: Pablo Estrada <pa...@apache.org>
AuthorDate: Wed May 13 11:30:40 2020 -0700

    Revert "Merge pull request #11673 from [BEAM-9967] Adding support for BQ labels on Query/Export jobs"
    
    This reverts commit 820f0f5c12146195ed80617763a354ddb75f0bc1.
---
 sdks/python/apache_beam/io/gcp/bigquery.py         | 22 +++++------------
 .../apache_beam/io/gcp/bigquery_read_it_test.py    |  5 +---
 sdks/python/apache_beam/io/gcp/bigquery_tools.py   | 28 +++++++---------------
 3 files changed, 15 insertions(+), 40 deletions(-)

diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index 130b473..d56fe12 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -606,8 +606,7 @@ class _CustomBigQuerySource(BoundedSource):
       coder=None,
       use_standard_sql=False,
       flatten_results=True,
-      kms_key=None,
-      bigquery_job_labels=None):
+      kms_key=None):
     if table is not None and query is not None:
       raise ValueError(
           'Both a BigQuery table and a query were specified.'
@@ -635,7 +634,6 @@ class _CustomBigQuerySource(BoundedSource):
     self.kms_key = kms_key
     self.split_result = None
     self.options = pipeline_options
-    self.bigquery_job_labels = bigquery_job_labels or {}
 
   def display_data(self):
     return {
@@ -643,7 +641,6 @@ class _CustomBigQuerySource(BoundedSource):
         'query': str(self.query),
         'project': str(self.project),
         'use_legacy_sql': self.use_legacy_sql,
-        'bigquery_job_labels': json.dumps(self.bigquery_job_labels),
     }
 
   def estimate_size(self):
@@ -670,8 +667,7 @@ class _CustomBigQuerySource(BoundedSource):
           self.flatten_results,
           job_id=uuid.uuid4().hex,
           dry_run=True,
-          kms_key=self.kms_key,
-          job_labels=self.bigquery_job_labels)
+          kms_key=self.kms_key)
       size = int(job.statistics.totalBytesProcessed)
       return size
     else:
@@ -740,8 +736,7 @@ class _CustomBigQuerySource(BoundedSource):
         self.use_legacy_sql,
         self.flatten_results,
         job_id=uuid.uuid4().hex,
-        kms_key=self.kms_key,
-        job_labels=self.bigquery_job_labels)
+        kms_key=self.kms_key)
     job_ref = job.jobReference
     bq.wait_for_bq_job(job_ref, max_retries=0)
     return bq._get_temp_table(self._get_project())
@@ -758,9 +753,8 @@ class _CustomBigQuerySource(BoundedSource):
                                      self.table_reference,
                                      bigquery_tools.FileFormat.JSON,
                                      project=self._get_project(),
-                                     include_header=False,
-                                     job_labels=self.bigquery_job_labels)
-    bq.wait_for_bq_job(job_ref)
+                                     include_header=False)
+    bq.wait_for_bq_job(job_ref, max_retries=0)
     metadata_list = FileSystems.match([self.gcs_location])[0].metadata_list
 
     if isinstance(self.table_reference, vp.ValueProvider):
@@ -1595,11 +1589,7 @@ class ReadFromBigQuery(PTransform):
       bucket where the extracted table should be written as a string or
       a :class:`~apache_beam.options.value_provider.ValueProvider`. If
       :data:`None`, then the temp_location parameter is used.
-    bigquery_job_labels (dict): A dictionary with string labels to be passed
-      to BigQuery export and query jobs created by this transform. See:
-      https://cloud.google.com/bigquery/docs/reference/rest/v2/\
-              Job#JobConfiguration
-  """
+   """
   def __init__(self, gcs_location=None, validate=False, *args, **kwargs):
     if gcs_location:
       if not isinstance(gcs_location, (str, unicode, ValueProvider)):
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py
index 361633a..98d1241 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py
@@ -274,10 +274,7 @@ class ReadNewTypesTests(BigQueryReadIntegrationTests):
     with beam.Pipeline(argv=self.args) as p:
       result = (
           p | 'read' >> beam.io.ReadFromBigQuery(
-              query=self.query,
-              use_standard_sql=True,
-              project=self.project,
-              bigquery_job_labels={'owner': 'apache_beam_tests'}))
+              query=self.query, use_standard_sql=True, project=self.project))
       assert_that(result, equal_to(self.get_expected_data()))
 
 
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
index 3f50944..2b6c959 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
@@ -320,8 +320,7 @@ class BigQueryWrapper(object):
       from_table_reference,
       to_table_reference,
       create_disposition=None,
-      write_disposition=None,
-      job_labels=None):
+      write_disposition=None):
     reference = bigquery.JobReference()
     reference.jobId = job_id
     reference.projectId = project_id
@@ -334,9 +333,7 @@ class BigQueryWrapper(object):
                     sourceTable=from_table_reference,
                     createDisposition=create_disposition,
                     writeDisposition=write_disposition,
-                ),
-                labels=job_labels or {},
-            ),
+                )),
             jobReference=reference,
         ))
 
@@ -358,8 +355,7 @@ class BigQueryWrapper(object):
       write_disposition=None,
       create_disposition=None,
       additional_load_parameters=None,
-      source_format=None,
-      job_labels=None):
+      source_format=None):
     additional_load_parameters = additional_load_parameters or {}
     job_schema = None if schema == 'SCHEMA_AUTODETECT' else schema
     reference = bigquery.JobReference(jobId=job_id, projectId=project_id)
@@ -376,9 +372,7 @@ class BigQueryWrapper(object):
                     sourceFormat=source_format,
                     useAvroLogicalTypes=True,
                     autodetect=schema == 'SCHEMA_AUTODETECT',
-                    **additional_load_parameters),
-                labels=job_labels or {},
-            ),
+                    **additional_load_parameters)),
             jobReference=reference,
         ))
     response = self.client.jobs.Insert(request)
@@ -395,8 +389,7 @@ class BigQueryWrapper(object):
       flatten_results,
       job_id,
       dry_run=False,
-      kms_key=None,
-      job_labels=None):
+      kms_key=None):
     reference = bigquery.JobReference(jobId=job_id, projectId=project_id)
     request = bigquery.BigqueryJobsInsertRequest(
         projectId=project_id,
@@ -411,9 +404,7 @@ class BigQueryWrapper(object):
                     if not dry_run else None,
                     flattenResults=flatten_results,
                     destinationEncryptionConfiguration=bigquery.
-                    EncryptionConfiguration(kmsKeyName=kms_key)),
-                labels=job_labels or {},
-            ),
+                    EncryptionConfiguration(kmsKeyName=kms_key))),
             jobReference=reference))
 
     response = self.client.jobs.Insert(request)
@@ -705,8 +696,7 @@ class BigQueryWrapper(object):
       destination_format,
       project=None,
       include_header=True,
-      compression=ExportCompression.NONE,
-      job_labels=None):
+      compression=ExportCompression.NONE):
     """Starts a job to export data from BigQuery.
 
     Returns:
@@ -724,9 +714,7 @@ class BigQueryWrapper(object):
                     printHeader=include_header,
                     destinationFormat=destination_format,
                     compression=compression,
-                ),
-                labels=job_labels or {},
-            ),
+                )),
             jobReference=job_reference,
         ))
     response = self.client.jobs.Insert(request)