Posted to commits@beam.apache.org by pa...@apache.org on 2020/05/14 19:18:02 UTC
[beam] branch master updated: [BEAM-9967] Adding support for BQ labels on Query/Export jobs. (Roll forward)
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 2ddb9c0 [BEAM-9967] Adding support for BQ labels on Query/Export jobs. (Roll forward)
new f81f934 Merge pull request #11700 from [BEAM-9967] Adding support for BQ labels on Query/Export jobs. (Roll …
2ddb9c0 is described below
commit 2ddb9c024193aeeb3d94326fb2df30459c787a06
Author: Pablo Estrada <pa...@apache.org>
AuthorDate: Wed May 13 14:16:44 2020 -0700
[BEAM-9967] Adding support for BQ labels on Query/Export jobs. (Roll forward)
---
sdks/python/apache_beam/io/gcp/bigquery.py | 22 ++++++++----
.../apache_beam/io/gcp/bigquery_read_it_test.py | 5 ++-
sdks/python/apache_beam/io/gcp/bigquery_tools.py | 42 +++++++++++++++++-----
3 files changed, 54 insertions(+), 15 deletions(-)
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index d56fe12..130b473 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -606,7 +606,8 @@ class _CustomBigQuerySource(BoundedSource):
coder=None,
use_standard_sql=False,
flatten_results=True,
- kms_key=None):
+ kms_key=None,
+ bigquery_job_labels=None):
if table is not None and query is not None:
raise ValueError(
'Both a BigQuery table and a query were specified.'
@@ -634,6 +635,7 @@ class _CustomBigQuerySource(BoundedSource):
self.kms_key = kms_key
self.split_result = None
self.options = pipeline_options
+ self.bigquery_job_labels = bigquery_job_labels or {}
def display_data(self):
return {
@@ -641,6 +643,7 @@ class _CustomBigQuerySource(BoundedSource):
'query': str(self.query),
'project': str(self.project),
'use_legacy_sql': self.use_legacy_sql,
+ 'bigquery_job_labels': json.dumps(self.bigquery_job_labels),
}
def estimate_size(self):
@@ -667,7 +670,8 @@ class _CustomBigQuerySource(BoundedSource):
self.flatten_results,
job_id=uuid.uuid4().hex,
dry_run=True,
- kms_key=self.kms_key)
+ kms_key=self.kms_key,
+ job_labels=self.bigquery_job_labels)
size = int(job.statistics.totalBytesProcessed)
return size
else:
@@ -736,7 +740,8 @@ class _CustomBigQuerySource(BoundedSource):
self.use_legacy_sql,
self.flatten_results,
job_id=uuid.uuid4().hex,
- kms_key=self.kms_key)
+ kms_key=self.kms_key,
+ job_labels=self.bigquery_job_labels)
job_ref = job.jobReference
bq.wait_for_bq_job(job_ref, max_retries=0)
return bq._get_temp_table(self._get_project())
@@ -753,8 +758,9 @@ class _CustomBigQuerySource(BoundedSource):
self.table_reference,
bigquery_tools.FileFormat.JSON,
project=self._get_project(),
- include_header=False)
- bq.wait_for_bq_job(job_ref, max_retries=0)
+ include_header=False,
+ job_labels=self.bigquery_job_labels)
+ bq.wait_for_bq_job(job_ref)
metadata_list = FileSystems.match([self.gcs_location])[0].metadata_list
if isinstance(self.table_reference, vp.ValueProvider):
@@ -1589,7 +1595,11 @@ class ReadFromBigQuery(PTransform):
bucket where the extracted table should be written as a string or
a :class:`~apache_beam.options.value_provider.ValueProvider`. If
:data:`None`, then the temp_location parameter is used.
- """
+ bigquery_job_labels (dict): A dictionary with string labels to be passed
+ to BigQuery export and query jobs created by this transform. See:
+ https://cloud.google.com/bigquery/docs/reference/rest/v2/\
+ Job#JobConfiguration
+ """
def __init__(self, gcs_location=None, validate=False, *args, **kwargs):
if gcs_location:
if not isinstance(gcs_location, (str, unicode, ValueProvider)):
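For reference, a minimal usage sketch of the new user-facing parameter. The project, dataset, and table names below are hypothetical, and the usual temp_location/gcs_location setup for export-based reads is assumed; the label dict is forwarded to the query and extract jobs the source creates:

    import apache_beam as beam

    with beam.Pipeline() as p:
        rows = (
            p
            | 'Read' >> beam.io.ReadFromBigQuery(
                query='SELECT name FROM `my-project.my_dataset.my_table`',
                use_standard_sql=True,
                project='my-project',
                # Attached to the BigQuery query/export jobs issued by this read.
                bigquery_job_labels={'launcher': 'apache_beam_tests'}))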
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py
index 98d1241..50ebcdc 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py
@@ -274,7 +274,10 @@ class ReadNewTypesTests(BigQueryReadIntegrationTests):
with beam.Pipeline(argv=self.args) as p:
result = (
p | 'read' >> beam.io.ReadFromBigQuery(
- query=self.query, use_standard_sql=True, project=self.project))
+ query=self.query,
+ use_standard_sql=True,
+ project=self.project,
+ bigquery_job_labels={'launcher': 'apache_beam_tests'}))
assert_that(result, equal_to(self.get_expected_data()))
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
index 2b6c959..b036dc0 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
@@ -218,6 +218,20 @@ def parse_table_reference(table, dataset=None, project=None):
# BigQueryWrapper.
+def _build_job_labels(input_labels):
+ """Builds job label protobuf structure."""
+ input_labels = input_labels or {}
+ result = bigquery.JobConfiguration.LabelsValue()
+
+ for k, v in input_labels.items():
+ result.additionalProperties.append(
+ bigquery.JobConfiguration.LabelsValue.AdditionalProperty(
+ key=k,
+ value=v,
+ ))
+ return result
+
+
class BigQueryWrapper(object):
"""BigQuery client wrapper with utilities for querying.
@@ -320,7 +334,8 @@ class BigQueryWrapper(object):
from_table_reference,
to_table_reference,
create_disposition=None,
- write_disposition=None):
+ write_disposition=None,
+ job_labels=None):
reference = bigquery.JobReference()
reference.jobId = job_id
reference.projectId = project_id
@@ -333,7 +348,9 @@ class BigQueryWrapper(object):
sourceTable=from_table_reference,
createDisposition=create_disposition,
writeDisposition=write_disposition,
- )),
+ ),
+ labels=_build_job_labels(job_labels),
+ ),
jobReference=reference,
))
@@ -355,7 +372,8 @@ class BigQueryWrapper(object):
write_disposition=None,
create_disposition=None,
additional_load_parameters=None,
- source_format=None):
+ source_format=None,
+ job_labels=None):
additional_load_parameters = additional_load_parameters or {}
job_schema = None if schema == 'SCHEMA_AUTODETECT' else schema
reference = bigquery.JobReference(jobId=job_id, projectId=project_id)
@@ -372,7 +390,9 @@ class BigQueryWrapper(object):
sourceFormat=source_format,
useAvroLogicalTypes=True,
autodetect=schema == 'SCHEMA_AUTODETECT',
- **additional_load_parameters)),
+ **additional_load_parameters),
+ labels=_build_job_labels(job_labels),
+ ),
jobReference=reference,
))
response = self.client.jobs.Insert(request)
@@ -389,7 +409,8 @@ class BigQueryWrapper(object):
flatten_results,
job_id,
dry_run=False,
- kms_key=None):
+ kms_key=None,
+ job_labels=None):
reference = bigquery.JobReference(jobId=job_id, projectId=project_id)
request = bigquery.BigqueryJobsInsertRequest(
projectId=project_id,
@@ -404,7 +425,9 @@ class BigQueryWrapper(object):
if not dry_run else None,
flattenResults=flatten_results,
destinationEncryptionConfiguration=bigquery.
- EncryptionConfiguration(kmsKeyName=kms_key))),
+ EncryptionConfiguration(kmsKeyName=kms_key)),
+ labels=_build_job_labels(job_labels),
+ ),
jobReference=reference))
response = self.client.jobs.Insert(request)
@@ -696,7 +719,8 @@ class BigQueryWrapper(object):
destination_format,
project=None,
include_header=True,
- compression=ExportCompression.NONE):
+ compression=ExportCompression.NONE,
+ job_labels=None):
"""Starts a job to export data from BigQuery.
Returns:
@@ -714,7 +738,9 @@ class BigQueryWrapper(object):
printHeader=include_header,
destinationFormat=destination_format,
compression=compression,
- )),
+ ),
+ labels=_build_job_labels(job_labels),
+ ),
jobReference=job_reference,
))
response = self.client.jobs.Insert(request)
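As a quick illustration of the new helper in bigquery_tools.py (a sketch only; _build_job_labels is a module-private function, shown here purely to describe the message it builds):

    from apache_beam.io.gcp.bigquery_tools import _build_job_labels

    labels = _build_job_labels({'launcher': 'apache_beam_tests'})
    # labels is a bigquery.JobConfiguration.LabelsValue; each dict entry
    # becomes an AdditionalProperty(key=..., value=...) in
    # labels.additionalProperties. The copy, load, query, and extract job
    # methods above now attach this message to their JobConfiguration, so
    # the labels appear on the resulting BigQuery jobs.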