You are viewing a plain text version of this content. The canonical link to the original page was not preserved in this plain-text copy.
Posted to commits@beam.apache.org by ib...@apache.org on 2020/05/13 20:21:33 UTC
[beam] branch master updated: Revert "Merge pull request #11673
from [BEAM-9967] Adding support for BQ labels on Query/Export jobs"
This is an automated email from the ASF dual-hosted git repository.
ibzib pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new f399e02 Revert "Merge pull request #11673 from [BEAM-9967] Adding support for BQ labels on Query/Export jobs"
new 485bd08 Merge pull request #11694 from pabloem/revertlabels
f399e02 is described below
commit f399e026c6d4324136c66c4658fdd9586b013e33
Author: Pablo Estrada <pa...@apache.org>
AuthorDate: Wed May 13 11:30:40 2020 -0700
Revert "Merge pull request #11673 from [BEAM-9967] Adding support for BQ labels on Query/Export jobs"
This reverts commit 820f0f5c12146195ed80617763a354ddb75f0bc1.
---
sdks/python/apache_beam/io/gcp/bigquery.py | 22 +++++------------
.../apache_beam/io/gcp/bigquery_read_it_test.py | 5 +---
sdks/python/apache_beam/io/gcp/bigquery_tools.py | 28 +++++++---------------
3 files changed, 15 insertions(+), 40 deletions(-)
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index 130b473..d56fe12 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -606,8 +606,7 @@ class _CustomBigQuerySource(BoundedSource):
coder=None,
use_standard_sql=False,
flatten_results=True,
- kms_key=None,
- bigquery_job_labels=None):
+ kms_key=None):
if table is not None and query is not None:
raise ValueError(
'Both a BigQuery table and a query were specified.'
@@ -635,7 +634,6 @@ class _CustomBigQuerySource(BoundedSource):
self.kms_key = kms_key
self.split_result = None
self.options = pipeline_options
- self.bigquery_job_labels = bigquery_job_labels or {}
def display_data(self):
return {
@@ -643,7 +641,6 @@ class _CustomBigQuerySource(BoundedSource):
'query': str(self.query),
'project': str(self.project),
'use_legacy_sql': self.use_legacy_sql,
- 'bigquery_job_labels': json.dumps(self.bigquery_job_labels),
}
def estimate_size(self):
@@ -670,8 +667,7 @@ class _CustomBigQuerySource(BoundedSource):
self.flatten_results,
job_id=uuid.uuid4().hex,
dry_run=True,
- kms_key=self.kms_key,
- job_labels=self.bigquery_job_labels)
+ kms_key=self.kms_key)
size = int(job.statistics.totalBytesProcessed)
return size
else:
@@ -740,8 +736,7 @@ class _CustomBigQuerySource(BoundedSource):
self.use_legacy_sql,
self.flatten_results,
job_id=uuid.uuid4().hex,
- kms_key=self.kms_key,
- job_labels=self.bigquery_job_labels)
+ kms_key=self.kms_key)
job_ref = job.jobReference
bq.wait_for_bq_job(job_ref, max_retries=0)
return bq._get_temp_table(self._get_project())
@@ -758,9 +753,8 @@ class _CustomBigQuerySource(BoundedSource):
self.table_reference,
bigquery_tools.FileFormat.JSON,
project=self._get_project(),
- include_header=False,
- job_labels=self.bigquery_job_labels)
- bq.wait_for_bq_job(job_ref)
+ include_header=False)
+ bq.wait_for_bq_job(job_ref, max_retries=0)
metadata_list = FileSystems.match([self.gcs_location])[0].metadata_list
if isinstance(self.table_reference, vp.ValueProvider):
@@ -1595,11 +1589,7 @@ class ReadFromBigQuery(PTransform):
bucket where the extracted table should be written as a string or
a :class:`~apache_beam.options.value_provider.ValueProvider`. If
:data:`None`, then the temp_location parameter is used.
- bigquery_job_labels (dict): A dictionary with string labels to be passed
- to BigQuery export and query jobs created by this transform. See:
- https://cloud.google.com/bigquery/docs/reference/rest/v2/\
- Job#JobConfiguration
- """
+ """
def __init__(self, gcs_location=None, validate=False, *args, **kwargs):
if gcs_location:
if not isinstance(gcs_location, (str, unicode, ValueProvider)):
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py
index 361633a..98d1241 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py
@@ -274,10 +274,7 @@ class ReadNewTypesTests(BigQueryReadIntegrationTests):
with beam.Pipeline(argv=self.args) as p:
result = (
p | 'read' >> beam.io.ReadFromBigQuery(
- query=self.query,
- use_standard_sql=True,
- project=self.project,
- bigquery_job_labels={'owner': 'apache_beam_tests'}))
+ query=self.query, use_standard_sql=True, project=self.project))
assert_that(result, equal_to(self.get_expected_data()))
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
index 3f50944..2b6c959 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
@@ -320,8 +320,7 @@ class BigQueryWrapper(object):
from_table_reference,
to_table_reference,
create_disposition=None,
- write_disposition=None,
- job_labels=None):
+ write_disposition=None):
reference = bigquery.JobReference()
reference.jobId = job_id
reference.projectId = project_id
@@ -334,9 +333,7 @@ class BigQueryWrapper(object):
sourceTable=from_table_reference,
createDisposition=create_disposition,
writeDisposition=write_disposition,
- ),
- labels=job_labels or {},
- ),
+ )),
jobReference=reference,
))
@@ -358,8 +355,7 @@ class BigQueryWrapper(object):
write_disposition=None,
create_disposition=None,
additional_load_parameters=None,
- source_format=None,
- job_labels=None):
+ source_format=None):
additional_load_parameters = additional_load_parameters or {}
job_schema = None if schema == 'SCHEMA_AUTODETECT' else schema
reference = bigquery.JobReference(jobId=job_id, projectId=project_id)
@@ -376,9 +372,7 @@ class BigQueryWrapper(object):
sourceFormat=source_format,
useAvroLogicalTypes=True,
autodetect=schema == 'SCHEMA_AUTODETECT',
- **additional_load_parameters),
- labels=job_labels or {},
- ),
+ **additional_load_parameters)),
jobReference=reference,
))
response = self.client.jobs.Insert(request)
@@ -395,8 +389,7 @@ class BigQueryWrapper(object):
flatten_results,
job_id,
dry_run=False,
- kms_key=None,
- job_labels=None):
+ kms_key=None):
reference = bigquery.JobReference(jobId=job_id, projectId=project_id)
request = bigquery.BigqueryJobsInsertRequest(
projectId=project_id,
@@ -411,9 +404,7 @@ class BigQueryWrapper(object):
if not dry_run else None,
flattenResults=flatten_results,
destinationEncryptionConfiguration=bigquery.
- EncryptionConfiguration(kmsKeyName=kms_key)),
- labels=job_labels or {},
- ),
+ EncryptionConfiguration(kmsKeyName=kms_key))),
jobReference=reference))
response = self.client.jobs.Insert(request)
@@ -705,8 +696,7 @@ class BigQueryWrapper(object):
destination_format,
project=None,
include_header=True,
- compression=ExportCompression.NONE,
- job_labels=None):
+ compression=ExportCompression.NONE):
"""Starts a job to export data from BigQuery.
Returns:
@@ -724,9 +714,7 @@ class BigQueryWrapper(object):
printHeader=include_header,
destinationFormat=destination_format,
compression=compression,
- ),
- labels=job_labels or {},
- ),
+ )),
jobReference=job_reference,
))
response = self.client.jobs.Insert(request)