You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2022/05/02 21:19:31 UTC

[beam] branch master updated: Merge pull request #17487 from Adding user-agent to GCS client in Python

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 1a3f3fa0ffd Merge pull request #17487 from Adding user-agent to GCS client in Python
1a3f3fa0ffd is described below

commit 1a3f3fa0ffd6516eb5990f74343c2557c3e8ba0a
Author: Pablo Estrada <pa...@users.noreply.github.com>
AuthorDate: Mon May 2 17:19:24 2022 -0400

    Merge pull request #17487 from Adding user-agent to GCS client in Python
    
    * Adding user-agent to GCS client in Python
    
    * Adding BQ user-agent and tests
    
    * fix lint
    
    * add comments
---
 sdks/python/apache_beam/io/gcp/bigquery_tools.py   | 11 ++++++--
 .../apache_beam/io/gcp/bigquery_tools_test.py      | 29 ++++++++++++++++++++++
 sdks/python/apache_beam/io/gcp/gcsio.py            |  6 ++++-
 sdks/python/apache_beam/io/gcp/gcsio_test.py       | 16 ++++++++++++
 4 files changed, 59 insertions(+), 3 deletions(-)

diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
index 41a2118f8a1..676755d90f9 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
@@ -43,6 +43,7 @@ from typing import Union
 
 import fastavro
 
+import apache_beam
 from apache_beam import coders
 from apache_beam.internal.gcp import auth
 from apache_beam.internal.gcp.json_value import from_json_value
@@ -69,6 +70,7 @@ try:
   from apitools.base.py.transfer import Upload
   from apitools.base.py.exceptions import HttpError, HttpForbiddenError
   from google.api_core.exceptions import ClientError, GoogleAPICallError
+  from google.api_core.client_info import ClientInfo
   from google.cloud import bigquery as gcp_bigquery
 except ImportError:
   gcp_bigquery = None
@@ -327,8 +329,13 @@ class BigQueryWrapper(object):
     self.client = client or bigquery.BigqueryV2(
         http=get_new_http(),
         credentials=auth.get_service_credentials(),
-        response_encoding='utf8')
-    self.gcp_bq_client = client or gcp_bigquery.Client()
+        response_encoding='utf8',
+        additional_http_headers={
+            "user-agent": "apache-beam-%s" % apache_beam.__version__
+        })
+    self.gcp_bq_client = client or gcp_bigquery.Client(
+        client_info=ClientInfo(
+            user_agent="apache-beam-%s" % apache_beam.__version__))
     self._unique_row_id = 0
     # For testing scenarios where we pass in a client we do not want a
     # randomized prefix for row IDs.
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
index 38d3998d907..c2d56f56de7 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
@@ -220,6 +220,20 @@ class TestBigQueryWrapper(unittest.TestCase):
     wrapper._delete_dataset('', '')
     self.assertTrue(client.datasets.Delete.called)
 
+  @mock.patch('time.sleep', return_value=None)
+  @mock.patch('google.cloud._http.JSONConnection.http')
+  def test_user_agent_insert_all(self, http_mock, patched_sleep):
+    wrapper = beam.io.gcp.bigquery_tools.BigQueryWrapper()
+    try:
+      wrapper._insert_all_rows('p', 'd', 't', [{'name': 'any'}], None)
+    except:  # pylint: disable=bare-except
+      # Ignore errors. The errors come from the fact that we did not mock
+      # the response from the API, so the overall insert_all_rows call fails
+      # soon after the BQ API is called.
+      pass
+    call = http_mock.request.mock_calls[-2]
+    self.assertIn('apache-beam-', call[2]['headers']['User-Agent'])
+
   @mock.patch('time.sleep', return_value=None)
   def test_delete_table_retries_for_timeouts(self, patched_time_sleep):
     client = mock.Mock()
@@ -242,6 +256,21 @@ class TestBigQueryWrapper(unittest.TestCase):
       wrapper.create_temporary_dataset('project_id', 'location')
     self.assertTrue(client.datasets.Get.called)
 
+  @mock.patch('time.sleep', return_value=None)
+  def test_user_agent_passed(self, sleep_mock):
+    wrapper = beam.io.gcp.bigquery_tools.BigQueryWrapper()
+    request_mock = mock.Mock()
+    wrapper.client._http.request = request_mock
+    try:
+      wrapper.create_temporary_dataset('project_id', 'location')
+    except:  # pylint: disable=bare-except
+      # Ignore errors. The errors come from the fact that we did not mock
+      # the response from the API, so the overall create_dataset call fails
+      # soon after the BQ API is called.
+      pass
+    call = request_mock.mock_calls[-1]
+    self.assertIn('apache-beam-', call[2]['headers']['user-agent'])
+
   def test_get_or_create_dataset_created(self):
     client = mock.Mock()
     client.datasets.Get.side_effect = HttpError(
diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py
index 0caf4415247..86046e340d2 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio.py
@@ -33,6 +33,7 @@ import time
 import traceback
 from itertools import islice
 
+import apache_beam
 from apache_beam.internal.http_client import get_new_http
 from apache_beam.internal.metrics.metric import ServiceCallMetric
 from apache_beam.io.filesystemio import Downloader
@@ -155,7 +156,10 @@ class GcsIO(object):
           credentials=auth.get_service_credentials(),
           get_credentials=False,
           http=get_new_http(),
-          response_encoding='utf8')
+          response_encoding='utf8',
+          additional_http_headers={
+              "User-Agent": "apache-beam-%s" % apache_beam.__version__
+          })
     self.client = storage_client
     self._rewrite_cb = None
     self.bucket_to_project_number = {}
diff --git a/sdks/python/apache_beam/io/gcp/gcsio_test.py b/sdks/python/apache_beam/io/gcp/gcsio_test.py
index 2f867ffc528..73a7984f320 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio_test.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio_test.py
@@ -439,6 +439,22 @@ class TestGCSIO(unittest.TestCase):
     self.assertFalse(
         gcsio.parse_gcs_path(file_name) in self.client.objects.files)
 
+  @mock.patch(
+      'apache_beam.io.gcp.gcsio.auth.get_service_credentials',
+      wraps=lambda: None)
+  @mock.patch('apache_beam.io.gcp.gcsio.get_new_http')
+  def test_user_agent_passed(self, get_new_http_mock, get_service_creds_mock):
+    client = gcsio.GcsIO()
+    try:
+      client.get_bucket('mabucket')
+    except:  # pylint: disable=bare-except
+      # Ignore errors. The errors come from the fact that we did not mock
+      # the response from the API, so the overall get_bucket call fails
+      # soon after the GCS API is called.
+      pass
+    call = get_new_http_mock.return_value.request.mock_calls[-2]
+    self.assertIn('apache-beam-', call[2]['headers']['User-Agent'])
+
   @mock.patch('apache_beam.io.gcp.gcsio.BatchApiRequest')
   def test_delete_batch(self, *unused_args):
     gcsio.BatchApiRequest = FakeBatchApiRequest