You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@beam.apache.org by pa...@apache.org on 2022/05/02 21:19:31 UTC
[beam] branch master updated: Merge pull request #17487 from Adding user-agent to GCS client in Python
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 1a3f3fa0ffd Merge pull request #17487 from Adding user-agent to GCS client in Python
1a3f3fa0ffd is described below
commit 1a3f3fa0ffd6516eb5990f74343c2557c3e8ba0a
Author: Pablo Estrada <pa...@users.noreply.github.com>
AuthorDate: Mon May 2 17:19:24 2022 -0400
Merge pull request #17487 from Adding user-agent to GCS client in Python
* Adding user-agent to GCS client in Python
* Adding BQ user-agent and tests
* fix lint
* add comments
---
sdks/python/apache_beam/io/gcp/bigquery_tools.py | 11 ++++++--
.../apache_beam/io/gcp/bigquery_tools_test.py | 29 ++++++++++++++++++++++
sdks/python/apache_beam/io/gcp/gcsio.py | 6 ++++-
sdks/python/apache_beam/io/gcp/gcsio_test.py | 16 ++++++++++++
4 files changed, 59 insertions(+), 3 deletions(-)
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
index 41a2118f8a1..676755d90f9 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
@@ -43,6 +43,7 @@ from typing import Union
import fastavro
+import apache_beam
from apache_beam import coders
from apache_beam.internal.gcp import auth
from apache_beam.internal.gcp.json_value import from_json_value
@@ -69,6 +70,7 @@ try:
from apitools.base.py.transfer import Upload
from apitools.base.py.exceptions import HttpError, HttpForbiddenError
from google.api_core.exceptions import ClientError, GoogleAPICallError
+ from google.api_core.client_info import ClientInfo
from google.cloud import bigquery as gcp_bigquery
except ImportError:
gcp_bigquery = None
@@ -327,8 +329,13 @@ class BigQueryWrapper(object):
self.client = client or bigquery.BigqueryV2(
http=get_new_http(),
credentials=auth.get_service_credentials(),
- response_encoding='utf8')
- self.gcp_bq_client = client or gcp_bigquery.Client()
+ response_encoding='utf8',
+ additional_http_headers={
+ "user-agent": "apache-beam-%s" % apache_beam.__version__
+ })
+ self.gcp_bq_client = client or gcp_bigquery.Client(
+ client_info=ClientInfo(
+ user_agent="apache-beam-%s" % apache_beam.__version__))
self._unique_row_id = 0
# For testing scenarios where we pass in a client we do not want a
# randomized prefix for row IDs.
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
index 38d3998d907..c2d56f56de7 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
@@ -220,6 +220,20 @@ class TestBigQueryWrapper(unittest.TestCase):
wrapper._delete_dataset('', '')
self.assertTrue(client.datasets.Delete.called)
+ @mock.patch('time.sleep', return_value=None)
+ @mock.patch('google.cloud._http.JSONConnection.http')
+ def test_user_agent_insert_all(self, http_mock, patched_sleep):
+ wrapper = beam.io.gcp.bigquery_tools.BigQueryWrapper()
+ try:
+ wrapper._insert_all_rows('p', 'd', 't', [{'name': 'any'}], None)
+ except: # pylint: disable=bare-except
+ # Ignore errors. The errors come from the fact that we did not mock
+ # the response from the API, so the overall insert_all_rows call fails
+ # soon after the BQ API is called.
+ pass
+ call = http_mock.request.mock_calls[-2]
+ self.assertIn('apache-beam-', call[2]['headers']['User-Agent'])
+
@mock.patch('time.sleep', return_value=None)
def test_delete_table_retries_for_timeouts(self, patched_time_sleep):
client = mock.Mock()
@@ -242,6 +256,21 @@ class TestBigQueryWrapper(unittest.TestCase):
wrapper.create_temporary_dataset('project_id', 'location')
self.assertTrue(client.datasets.Get.called)
+ @mock.patch('time.sleep', return_value=None)
+ def test_user_agent_passed(self, sleep_mock):
+ wrapper = beam.io.gcp.bigquery_tools.BigQueryWrapper()
+ request_mock = mock.Mock()
+ wrapper.client._http.request = request_mock
+ try:
+ wrapper.create_temporary_dataset('project_id', 'location')
+ except: # pylint: disable=bare-except
+ # Ignore errors. The errors come from the fact that we did not mock
+ # the response from the API, so the overall create_dataset call fails
+ # soon after the BQ API is called.
+ pass
+ call = request_mock.mock_calls[-1]
+ self.assertIn('apache-beam-', call[2]['headers']['user-agent'])
+
def test_get_or_create_dataset_created(self):
client = mock.Mock()
client.datasets.Get.side_effect = HttpError(
diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py
index 0caf4415247..86046e340d2 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio.py
@@ -33,6 +33,7 @@ import time
import traceback
from itertools import islice
+import apache_beam
from apache_beam.internal.http_client import get_new_http
from apache_beam.internal.metrics.metric import ServiceCallMetric
from apache_beam.io.filesystemio import Downloader
@@ -155,7 +156,10 @@ class GcsIO(object):
credentials=auth.get_service_credentials(),
get_credentials=False,
http=get_new_http(),
- response_encoding='utf8')
+ response_encoding='utf8',
+ additional_http_headers={
+ "User-Agent": "apache-beam-%s" % apache_beam.__version__
+ })
self.client = storage_client
self._rewrite_cb = None
self.bucket_to_project_number = {}
diff --git a/sdks/python/apache_beam/io/gcp/gcsio_test.py b/sdks/python/apache_beam/io/gcp/gcsio_test.py
index 2f867ffc528..73a7984f320 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio_test.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio_test.py
@@ -439,6 +439,22 @@ class TestGCSIO(unittest.TestCase):
self.assertFalse(
gcsio.parse_gcs_path(file_name) in self.client.objects.files)
+ @mock.patch(
+ 'apache_beam.io.gcp.gcsio.auth.get_service_credentials',
+ wraps=lambda: None)
+ @mock.patch('apache_beam.io.gcp.gcsio.get_new_http')
+ def test_user_agent_passed(self, get_new_http_mock, get_service_creds_mock):
+ client = gcsio.GcsIO()
+ try:
+ client.get_bucket('mabucket')
+ except: # pylint: disable=bare-except
+ # Ignore errors. The errors come from the fact that we did not mock
+ # the response from the API, so the overall get_bucket call fails
+ # soon after the GCS API is called.
+ pass
+ call = get_new_http_mock.return_value.request.mock_calls[-2]
+ self.assertIn('apache-beam-', call[2]['headers']['User-Agent'])
+
@mock.patch('apache_beam.io.gcp.gcsio.BatchApiRequest')
def test_delete_batch(self, *unused_args):
gcsio.BatchApiRequest = FakeBatchApiRequest