Posted to commits@beam.apache.org by pa...@apache.org on 2020/07/06 18:27:02 UTC
[beam] branch master updated: [BEAM-10317] Python - Update BigQueryIO to tag BigQuery Jobs with the Dataflow Job ID
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 48fa73e [BEAM-10317] Python - Update BigQueryIO to tag BigQuery Jobs with the Dataflow Job ID
new 1aa3eb5 Merge pull request #12084 from [BEAM-10317] Python - Update BigQueryIO to tag BigQuery Jobs with the Dataflow Job ID
48fa73e is described below
commit 48fa73e51b683148f102187c3a67278118cbb8ac
Author: Alex Amato <aj...@google.com>
AuthorDate: Wed Jun 24 17:51:30 2020 -0700
[BEAM-10317] Python - Update BigQueryIO to tag BigQuery Jobs with the Dataflow Job ID
---
sdks/python/apache_beam/internal/http_client.py | 5 +-
sdks/python/apache_beam/io/gcp/bigquery.py | 22 ++++--
.../apache_beam/io/gcp/bigquery_file_loads.py | 29 ++++++--
.../apache_beam/io/gcp/bigquery_io_metadata.py | 83 ++++++++++++++++++++++
.../io/gcp/bigquery_io_metadata_test.py | 75 +++++++++++++++++++
sdks/python/apache_beam/io/gcp/bigquery_tools.py | 26 +++++--
.../python/apache_beam/io/gcp/gce_metadata_util.py | 48 +++++++++++++
7 files changed, 272 insertions(+), 16 deletions(-)
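Taken together, these changes thread a "beam_job_id" label through the
BigQuery query, extract, load, and copy jobs that the Python SDK launches.
A minimal sketch of the resulting flow, assuming the code runs on a
Dataflow worker VM (the 'cost_center' label is a made-up user label):

    from apache_beam.io.gcp.bigquery_io_metadata import (
        create_bigquery_io_metadata)

    # On a Dataflow VM this reads the job ID from GCE metadata; elsewhere
    # it returns a BigQueryIOMetadata with beam_job_id=None.
    bq_io_metadata = create_bigquery_io_metadata()

    # Merge the beam_job_id label into any user-supplied labels before
    # submitting a BigQuery job.
    labels = bq_io_metadata.add_additional_bq_job_labels(
        {'cost_center': 'abc'})
    # On Dataflow: {'cost_center': 'abc',
    #               'beam_job_id': '2020-06-29_15_26_09-...'}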
diff --git a/sdks/python/apache_beam/internal/http_client.py b/sdks/python/apache_beam/internal/http_client.py
index 794085d..57d2feb 100644
--- a/sdks/python/apache_beam/internal/http_client.py
+++ b/sdks/python/apache_beam/internal/http_client.py
@@ -57,7 +57,7 @@ def proxy_info_from_environment_var(proxy_env_var):
return httplib2.proxy_info_from_url(proxy_url, method=proxy_protocol)
-def get_new_http():
+def get_new_http(timeout_secs=DEFAULT_HTTP_TIMEOUT_SECONDS):
"""Creates and returns a new httplib2.Http instance.
Returns:
@@ -69,5 +69,4 @@ def get_new_http():
proxy_info = proxy_info_from_environment_var(proxy_env_var)
break
# Use a non-infinite SSL timeout to avoid hangs during network flakiness.
- return httplib2.Http(
- proxy_info=proxy_info, timeout=DEFAULT_HTTP_TIMEOUT_SECONDS)
+ return httplib2.Http(proxy_info=proxy_info, timeout=timeout_secs)
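The new timeout_secs parameter keeps DEFAULT_HTTP_TIMEOUT_SECONDS as the
default, so existing callers are unchanged, while letting new callers pick a
shorter timeout. A small usage sketch:

    from apache_beam.internal.http_client import get_new_http

    http = get_new_http()  # default timeout, same behavior as before
    fast_http = get_new_http(timeout_secs=5)  # shorter, caller-chosen timeout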
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index ccaca4d..c6b6191 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -254,6 +254,7 @@ from apache_beam.io.avroio import _create_avro_source as create_avro_source
from apache_beam.io.filesystems import CompressionTypes
from apache_beam.io.filesystems import FileSystems
from apache_beam.io.gcp import bigquery_tools
+from apache_beam.io.gcp.bigquery_io_metadata import create_bigquery_io_metadata
from apache_beam.io.gcp.internal.clients import bigquery
from apache_beam.io.iobase import BoundedSource
from apache_beam.io.iobase import RangeTracker
@@ -637,6 +638,7 @@ class _CustomBigQuerySource(BoundedSource):
self.kms_key = kms_key
self.split_result = None
self.options = pipeline_options
+ self.bq_io_metadata = None # Populate in setup, as it may make an RPC
self.bigquery_job_labels = bigquery_job_labels or {}
self.use_json_exports = use_json_exports
@@ -649,9 +651,13 @@ class _CustomBigQuerySource(BoundedSource):
'use_legacy_sql': self.use_legacy_sql,
'bigquery_job_labels': json.dumps(self.bigquery_job_labels),
'export_file_format': export_format,
+ 'launchesBigQueryJobs': DisplayDataItem(
+ True, label="This Dataflow job launches BigQuery jobs."),
}
def estimate_size(self):
+ if not self.bq_io_metadata:
+ self.bq_io_metadata = create_bigquery_io_metadata()
bq = bigquery_tools.BigQueryWrapper()
if self.table_reference is not None:
table_ref = self.table_reference
@@ -676,7 +682,8 @@ class _CustomBigQuerySource(BoundedSource):
job_id=uuid.uuid4().hex,
dry_run=True,
kms_key=self.kms_key,
- job_labels=self.bigquery_job_labels)
+ job_labels=self.bq_io_metadata.add_additional_bq_job_labels(
+ self.bigquery_job_labels))
size = int(job.statistics.totalBytesProcessed)
return size
else:
@@ -747,6 +754,8 @@ class _CustomBigQuerySource(BoundedSource):
@check_accessible(['query'])
def _execute_query(self, bq):
+ if not self.bq_io_metadata:
+ self.bq_io_metadata = create_bigquery_io_metadata()
job = bq._start_query_job(
self._get_project(),
self.query.get(),
@@ -754,7 +763,8 @@ class _CustomBigQuerySource(BoundedSource):
self.flatten_results,
job_id=uuid.uuid4().hex,
kms_key=self.kms_key,
- job_labels=self.bigquery_job_labels)
+ job_labels=self.bq_io_metadata.add_additional_bq_job_labels(
+ self.bigquery_job_labels))
job_ref = job.jobReference
bq.wait_for_bq_job(job_ref, max_retries=0)
return bq._get_temp_table(self._get_project())
@@ -766,13 +776,17 @@ class _CustomBigQuerySource(BoundedSource):
bigquery.TableSchema instance, a list of FileMetadata instances
"""
job_id = uuid.uuid4().hex
+ if not self.bq_io_metadata:
+ self.bq_io_metadata = create_bigquery_io_metadata()
+ job_labels = self.bq_io_metadata.add_additional_bq_job_labels(
+ self.bigquery_job_labels)
if self.use_json_exports:
job_ref = bq.perform_extract_job([self.gcs_location],
job_id,
self.table_reference,
bigquery_tools.FileFormat.JSON,
project=self._get_project(),
- job_labels=self.bigquery_job_labels,
+ job_labels=job_labels,
include_header=False)
else:
job_ref = bq.perform_extract_job([self.gcs_location],
@@ -781,7 +795,7 @@ class _CustomBigQuerySource(BoundedSource):
bigquery_tools.FileFormat.AVRO,
project=self._get_project(),
include_header=False,
- job_labels=self.bigquery_job_labels,
+ job_labels=job_labels,
use_avro_logical_types=True)
bq.wait_for_bq_job(job_ref)
metadata_list = FileSystems.match([self.gcs_location])[0].metadata_list
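A pattern worth calling out in the hunks above: bq_io_metadata is set to
None in the constructor and created lazily on first use, because
create_bigquery_io_metadata() may issue an RPC to the metadata server and
source/DoFn instances are pickled between construction and execution on the
workers. The idiom in isolation (the class name is hypothetical, for
illustration only):

    from apache_beam.io.gcp.bigquery_io_metadata import (
        create_bigquery_io_metadata)

    class _LabeledBigQueryCaller(object):
      def __init__(self):
        # Do not fetch metadata here; this object may be pickled and
        # shipped to a worker before it ever runs.
        self.bq_io_metadata = None

      def _job_labels(self, user_labels=None):
        if not self.bq_io_metadata:  # first use, on the worker
          self.bq_io_metadata = create_bigquery_io_metadata()
        return self.bq_io_metadata.add_additional_bq_job_labels(user_labels)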
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
index d039598..7543dca 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
@@ -42,9 +42,11 @@ import apache_beam as beam
from apache_beam import pvalue
from apache_beam.io import filesystems as fs
from apache_beam.io.gcp import bigquery_tools
+from apache_beam.io.gcp.bigquery_io_metadata import create_bigquery_io_metadata
from apache_beam.options import value_provider as vp
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.transforms import trigger
+from apache_beam.transforms.display import DisplayDataItem
from apache_beam.transforms.window import GlobalWindows
_LOGGER = logging.getLogger(__name__)
@@ -327,10 +329,19 @@ class TriggerCopyJobs(beam.DoFn):
self.write_disposition = write_disposition
self.test_client = test_client
self._observed_tables = set()
+ self.bq_io_metadata = None
+
+ def display_data(self):
+ return {
+ 'launchesBigQueryJobs': DisplayDataItem(
+ True, label="This Dataflow job launches BigQuery jobs.")
+ }
def start_bundle(self):
self._observed_tables = set()
self.bq_wrapper = bigquery_tools.BigQueryWrapper(client=self.test_client)
+ if not self.bq_io_metadata:
+ self.bq_io_metadata = create_bigquery_io_metadata()
def process(self, element, job_name_prefix=None):
destination = element[0]
@@ -380,13 +391,16 @@ class TriggerCopyJobs(beam.DoFn):
wait_for_job = False
write_disposition = 'WRITE_APPEND'
+ if not self.bq_io_metadata:
+ self.bq_io_metadata = create_bigquery_io_metadata()
job_reference = self.bq_wrapper._insert_copy_job(
copy_to_reference.projectId,
copy_job_name,
copy_from_reference,
copy_to_reference,
create_disposition=self.create_disposition,
- write_disposition=write_disposition)
+ write_disposition=write_disposition,
+ job_labels=self.bq_io_metadata.add_additional_bq_job_labels())
if wait_for_job:
self.bq_wrapper.wait_for_bq_job(job_reference, sleep_duration_sec=10)
@@ -416,6 +430,7 @@ class TriggerLoadJobs(beam.DoFn):
self.temporary_tables = temporary_tables
self.additional_bq_parameters = additional_bq_parameters or {}
self.source_format = source_format
+ self.bq_io_metadata = None
if self.temporary_tables:
# If we are loading into temporary tables, we rely on the default create
# and write dispositions, which mean that a new table will be created.
@@ -428,14 +443,17 @@ class TriggerLoadJobs(beam.DoFn):
def display_data(self):
result = {
'create_disposition': str(self.create_disposition),
- 'write_disposition': str(self.write_disposition)
+ 'write_disposition': str(self.write_disposition),
}
result['schema'] = str(self.schema)
-
+ result['launchesBigQueryJobs'] = DisplayDataItem(
+ True, label="This Dataflow job launches BigQuery jobs.")
return result
def start_bundle(self):
self.bq_wrapper = bigquery_tools.BigQueryWrapper(client=self.test_client)
+ if not self.bq_io_metadata:
+ self.bq_io_metadata = create_bigquery_io_metadata()
def process(self, element, load_job_name_prefix, *schema_side_inputs):
# Each load job is assumed to have files respecting these constraints:
@@ -493,6 +511,8 @@ class TriggerLoadJobs(beam.DoFn):
table_reference,
schema,
additional_parameters)
+ if not self.bq_io_metadata:
+ self.bq_io_metadata = create_bigquery_io_metadata()
job_reference = self.bq_wrapper.perform_load_job(
table_reference,
files,
@@ -501,7 +521,8 @@ class TriggerLoadJobs(beam.DoFn):
write_disposition=self.write_disposition,
create_disposition=create_disposition,
additional_load_parameters=additional_parameters,
- source_format=self.source_format)
+ source_format=self.source_format,
+ job_labels=self.bq_io_metadata.add_additional_bq_job_labels())
yield (destination, job_reference)
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_io_metadata.py b/sdks/python/apache_beam/io/gcp/bigquery_io_metadata.py
new file mode 100644
index 0000000..c8c1fc2
--- /dev/null
+++ b/sdks/python/apache_beam/io/gcp/bigquery_io_metadata.py
@@ -0,0 +1,83 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Metadata for use in BigQueryIO, i.e. a job_id to use in BQ job labels."""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import re
+
+from apache_beam.io.gcp import gce_metadata_util
+
+_VALID_CLOUD_LABEL_PATTERN = re.compile(r'^[a-z0-9\_\-]{1,63}$')
+
+
+def _is_valid_cloud_label_value(label_value):
+ """Returns true if label_value is a valid cloud label string.
+
+ This function may return false for some valid label values, but it will
+ never return true for an invalid one: this validator uses a stricter
+ set of allowed characters and does not accept foreign-language characters.
+ Thus, this should not be used as a generic validator for all cloud labels.
+
+ See Also:
+ https://cloud.google.com/compute/docs/labeling-resources
+
+ Args:
+ label_value: The label value to validate.
+
+ Returns:
+ True if the label value is a valid cloud label value, False otherwise.
+ """
+ return bool(_VALID_CLOUD_LABEL_PATTERN.match(label_value))
+
+
+def create_bigquery_io_metadata():
+ """Creates a BigQueryIOMetadata.
+
+ This will request metadata properly based on which runner is being used.
+ """
+ dataflow_job_id = gce_metadata_util.fetch_dataflow_job_id()
+ # If a Dataflow job ID is returned by the GCE metadata service, then
+ # this program is running on a Dataflow GCE VM.
+ is_dataflow_runner = bool(dataflow_job_id)
+ kwargs = {}
+ if is_dataflow_runner:
+ # Only use this label if it passes validation, since we do not
+ # want a bad label to fail the BQ job.
+ if _is_valid_cloud_label_value(dataflow_job_id):
+ kwargs['beam_job_id'] = dataflow_job_id
+ return BigQueryIOMetadata(**kwargs)
+
+
+class BigQueryIOMetadata(object):
+ """Metadata class for BigQueryIO. i.e. to use as BQ job labels.
+
+ Do not construct directly; use the create_bigquery_io_metadata factory,
+ which will request metadata properly based on which runner is being used.
+ """
+ def __init__(self, beam_job_id=None):
+ self.beam_job_id = beam_job_id
+
+ def add_additional_bq_job_labels(self, job_labels=None):
+ job_labels = job_labels or {}
+ if self.beam_job_id and 'beam_job_id' not in job_labels:
+ job_labels['beam_job_id'] = self.beam_job_id
+ return job_labels
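Note two properties of add_additional_bq_job_labels: the passed-in dict is
mutated in place rather than copied, and an existing user-supplied
'beam_job_id' key wins over the auto-detected one, since the method only
adds the key when it is absent. For example (BigQueryIOMetadata is
constructed directly here only to illustrate the merge semantics):

    metadata = BigQueryIOMetadata(beam_job_id='2020-06-29_15_26_09-123')
    metadata.add_additional_bq_job_labels()
    # -> {'beam_job_id': '2020-06-29_15_26_09-123'}
    metadata.add_additional_bq_job_labels({'beam_job_id': 'user-override'})
    # -> {'beam_job_id': 'user-override'}  (existing key is kept)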
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_io_metadata_test.py b/sdks/python/apache_beam/io/gcp/bigquery_io_metadata_test.py
new file mode 100644
index 0000000..26ee669
--- /dev/null
+++ b/sdks/python/apache_beam/io/gcp/bigquery_io_metadata_test.py
@@ -0,0 +1,75 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Tests for bigquery_io_metadata."""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import logging
+import unittest
+
+from apache_beam.io.gcp import bigquery_io_metadata
+
+
+class BigqueryIoMetadataTest(unittest.TestCase):
+ def test_is_valid_cloud_label_value(self):
+ # A dataflow job ID.
+ # Lowercase letters, numbers, underscores and hyphens are allowed.
+ test_str = '2020-06-29_15_26_09-12838749047888422749'
+ self.assertTrue(bigquery_io_metadata._is_valid_cloud_label_value(test_str))
+
+ # At least one character.
+ test_str = '0'
+ self.assertTrue(bigquery_io_metadata._is_valid_cloud_label_value(test_str))
+
+ # Up to 63 characters.
+ test_str = '0123456789abcdefghij0123456789abcdefghij0123456789abcdefghij012'
+ self.assertTrue(bigquery_io_metadata._is_valid_cloud_label_value(test_str))
+
+ # Lowercase letters allowed
+ test_str = 'abcdefghijklmnopqrstuvwxyz'
+ for test_char in test_str:
+ self.assertTrue(
+ bigquery_io_metadata._is_valid_cloud_label_value(test_char))
+
+ # Empty strings not allowed.
+ test_str = ''
+ self.assertFalse(bigquery_io_metadata._is_valid_cloud_label_value(test_str))
+
+ # 64 or more characters not allowed.
+ test_str = (
+ '0123456789abcdefghij0123456789abcdefghij0123456789abcdefghij0123')
+ self.assertFalse(bigquery_io_metadata._is_valid_cloud_label_value(test_str))
+
+ # Uppercase letters not allowed
+ test_str = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ'
+ for test_char in test_str:
+ self.assertFalse(
+ bigquery_io_metadata._is_valid_cloud_label_value(test_char))
+
+ # Special characters other than hyphens and underscores are not allowed.
+ test_str = '!@#$%^&*()+=[{]};:\'\"\\|,<.>?/`~'
+ for test_char in test_str:
+ self.assertFalse(
+ bigquery_io_metadata._is_valid_cloud_label_value(test_char))
+
+
+if __name__ == '__main__':
+ logging.getLogger().setLevel(logging.INFO)
+ unittest.main()
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
index 12c5dc0..504f530 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
@@ -51,6 +51,7 @@ from apache_beam.internal.gcp.json_value import from_json_value
from apache_beam.internal.gcp.json_value import to_json_value
from apache_beam.internal.http_client import get_new_http
from apache_beam.io.gcp import bigquery_avro_tools
+from apache_beam.io.gcp.bigquery_io_metadata import create_bigquery_io_metadata
from apache_beam.io.gcp.internal.clients import bigquery
from apache_beam.options import value_provider
from apache_beam.options.pipeline_options import GoogleCloudOptions
@@ -697,7 +698,8 @@ class BigQueryWrapper(object):
write_disposition=None,
create_disposition=None,
additional_load_parameters=None,
- source_format=None):
+ source_format=None,
+ job_labels=None):
"""Starts a job to load data into BigQuery.
Returns:
@@ -712,7 +714,8 @@ class BigQueryWrapper(object):
create_disposition=create_disposition,
write_disposition=write_disposition,
additional_load_parameters=additional_load_parameters,
- source_format=source_format)
+ source_format=source_format,
+ job_labels=job_labels)
@retry.with_exponential_backoff(
num_retries=MAX_RETRIES,
@@ -865,14 +868,21 @@ class BigQueryWrapper(object):
return created_table
def run_query(
- self, project_id, query, use_legacy_sql, flatten_results, dry_run=False):
+ self,
+ project_id,
+ query,
+ use_legacy_sql,
+ flatten_results,
+ dry_run=False,
+ job_labels=None):
job = self._start_query_job(
project_id,
query,
use_legacy_sql,
flatten_results,
job_id=uuid.uuid4().hex,
- dry_run=dry_run)
+ dry_run=dry_run,
+ job_labels=job_labels)
job_id = job.jobReference.jobId
location = job.jobReference.location
@@ -1064,6 +1074,8 @@ class BigQueryReader(dataflow_io.NativeSourceReader):
self.use_legacy_sql = use_legacy_sql
self.flatten_results = flatten_results
self.kms_key = kms_key
+ self.bigquery_job_labels = {}
+ self.bq_io_metadata = None
if self.source.table_reference is not None:
# If table schema did not define a project we default to executing
@@ -1118,10 +1130,14 @@ class BigQueryReader(dataflow_io.NativeSourceReader):
self.client.clean_up_temporary_dataset(self.executing_project)
def __iter__(self):
+ if not self.bq_io_metadata:
+ self.bq_io_metadata = create_bigquery_io_metadata()
for rows, schema in self.client.run_query(
project_id=self.executing_project, query=self.query,
use_legacy_sql=self.use_legacy_sql,
- flatten_results=self.flatten_results):
+ flatten_results=self.flatten_results,
+ job_labels=self.bq_io_metadata.add_additional_bq_job_labels(
+ self.bigquery_job_labels)):
if self.schema is None:
self.schema = schema
for row in rows:
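With job_labels now threaded through run_query and perform_load_job, any
internal caller can attach labels to the jobs it submits. A hedged sketch
against the internal BigQueryWrapper API ('my-project' is a placeholder
project ID; this API is internal and subject to change):

    from apache_beam.io.gcp import bigquery_tools
    from apache_beam.io.gcp.bigquery_io_metadata import (
        create_bigquery_io_metadata)

    wrapper = bigquery_tools.BigQueryWrapper()
    labels = create_bigquery_io_metadata().add_additional_bq_job_labels()
    for rows, schema in wrapper.run_query(
        project_id='my-project',
        query='SELECT 1',
        use_legacy_sql=False,
        flatten_results=True,
        job_labels=labels):
      pass  # consume query results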
diff --git a/sdks/python/apache_beam/io/gcp/gce_metadata_util.py b/sdks/python/apache_beam/io/gcp/gce_metadata_util.py
new file mode 100644
index 0000000..40f1745
--- /dev/null
+++ b/sdks/python/apache_beam/io/gcp/gce_metadata_util.py
@@ -0,0 +1,48 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Fetches GCE metadata if the calling process is running on a GCE VM."""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import requests
+
+BASE_METADATA_URL = "http://metadata/computeMetadata/v1/"
+
+
+def _fetch_metadata(key):
+ try:
+ headers = {"Metadata-Flavor": "Google"}
+ uri = BASE_METADATA_URL + key
+ resp = requests.get(uri, headers=headers, timeout=5) # 5 seconds.
+ if resp.status_code == 200:
+ return resp.text
+ except requests.exceptions.RequestException:
+ # Silently fail; this may mean we are running on a non-Dataflow
+ # runner, in which case it is perfectly normal.
+ pass
+ return ""
+
+
+def _fetch_custom_gce_metadata(custom_metadata_key):
+ return _fetch_metadata("instance/attributes/" + custom_metadata_key)
+
+
+def fetch_dataflow_job_id():
+ return _fetch_custom_gce_metadata("job_id")
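fetch_dataflow_job_id works because the Dataflow service sets a custom
"job_id" attribute on its worker VMs; the helper reads
instance/attributes/job_id from the metadata server and falls back to an
empty string anywhere else. A quick check that is safe to run off-GCE,
since request failures also return "":

    from apache_beam.io.gcp import gce_metadata_util

    job_id = gce_metadata_util.fetch_dataflow_job_id()
    if job_id:
      print('Running on a Dataflow VM, job ID: %s' % job_id)
    else:
      print('Not on a Dataflow VM (or metadata unavailable).')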