Posted to commits@beam.apache.org by yi...@apache.org on 2022/05/13 20:28:54 UTC

[beam] branch release-2.39.0 updated: [BEAM-14014] Support impersonation credentials in dataflow runner (#17244) (#17668)

This is an automated email from the ASF dual-hosted git repository.

yichi pushed a commit to branch release-2.39.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.39.0 by this push:
     new 10aeab88c12 [BEAM-14014] Support impersonation credentials in dataflow runner (#17244) (#17668)
10aeab88c12 is described below

commit 10aeab88c1260904df5adf62ac7d958872e581d1
Author: Yichi Zhang <zy...@google.com>
AuthorDate: Fri May 13 13:28:47 2022 -0700

    [BEAM-14014] Support impersonation credentials in dataflow runner (#17244) (#17668)
    
    Co-authored-by: Ryan Thompson <ry...@gmail.com>
---
 .../apache_beam/examples/wordcount_it_test.py      | 39 +++++++++++++
 sdks/python/apache_beam/internal/gcp/auth.py       | 66 ++++++++++++++++------
 sdks/python/apache_beam/io/gcp/bigquery_tools.py   |  4 +-
 sdks/python/apache_beam/io/gcp/gcsfilesystem.py    | 31 ++++++----
 .../apache_beam/io/gcp/gcsfilesystem_test.py       | 26 ++++-----
 sdks/python/apache_beam/io/gcp/gcsio.py            |  9 +--
 sdks/python/apache_beam/io/gcp/gcsio_test.py       |  2 +-
 .../python/apache_beam/options/pipeline_options.py | 14 ++++-
 .../runners/dataflow/internal/apiclient.py         |  6 +-
 .../apache_beam/runners/interactive/utils.py       |  2 +-
 .../runners/portability/sdk_container_builder.py   |  2 +-
 11 files changed, 145 insertions(+), 56 deletions(-)
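
For orientation before the diff: with this change, a pipeline can be launched with the new --impersonate_service_account flag. A minimal sketch (the project, bucket, and account names below are placeholders, not values from this commit):

    from apache_beam.options.pipeline_options import GoogleCloudOptions
    from apache_beam.options.pipeline_options import PipelineOptions

    # Placeholder values; substitute your own project, bucket, and account.
    options = PipelineOptions([
        '--runner=DataflowRunner',
        '--project=my-project',
        '--temp_location=gs://my-bucket/temp',
        '--impersonate_service_account='
        'target-sa@my-project.iam.gserviceaccount.com',
    ])
    print(options.view_as(GoogleCloudOptions).impersonate_service_account)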

diff --git a/sdks/python/apache_beam/examples/wordcount_it_test.py b/sdks/python/apache_beam/examples/wordcount_it_test.py
index be8bbbfed8a..afbe70b9d23 100644
--- a/sdks/python/apache_beam/examples/wordcount_it_test.py
+++ b/sdks/python/apache_beam/examples/wordcount_it_test.py
@@ -28,6 +28,7 @@ import pytest
 from hamcrest.core.core.allof import all_of
 
 from apache_beam.examples import wordcount
+from apache_beam.internal.gcp import auth
 from apache_beam.testing.load_tests.load_test_metrics_utils import InfluxDBMetricsPublisherOptions
 from apache_beam.testing.load_tests.load_test_metrics_utils import MetricsReader
 from apache_beam.testing.pipeline_verifiers import FileChecksumMatcher
@@ -47,6 +48,44 @@ class WordCountIT(unittest.TestCase):
   def test_wordcount_it(self):
     self._run_wordcount_it(wordcount.run)
 
+  @pytest.mark.it_postcommit
+  @pytest.mark.sickbay_direct
+  @pytest.mark.sickbay_spark
+  @pytest.mark.sickbay_flink
+  def test_wordcount_impersonation_it(self):
+    """Tests impersonation on dataflow.
+
+    For testing impersonation, we use three ingredients:
+    - a principal to impersonate
+    - a dataflow service account that only that principal is
+      allowed to launch jobs as
+    - a temp root that only the above two accounts have access to
+
+    Jenkins and Dataflow workers both run as GCE default service account.
+    So we remove that account from all the above.
+    """
+    # Credentials need to be reset; otherwise credentials cached by a
+    # previous test would be reused and this test would fail.
+    auth._Credentials._credentials_init = False
+
+    ACCOUNT_TO_IMPERSONATE = (
+        'allows-impersonation@apache-'
+        'beam-testing.iam.gserviceaccount.com')
+    RUNNER_ACCOUNT = (
+        'impersonation-dataflow-worker@'
+        'apache-beam-testing.iam.gserviceaccount.com')
+    TEMP_DIR = 'gs://impersonation-test-bucket/temp-it'
+    STAGING_LOCATION = 'gs://impersonation-test-bucket/staging-it'
+    extra_options = {
+        'impersonate_service_account': ACCOUNT_TO_IMPERSONATE,
+        'service_account_email': RUNNER_ACCOUNT,
+        'temp_location': TEMP_DIR,
+        'staging_location': STAGING_LOCATION
+    }
+    self._run_wordcount_it(wordcount.run, **extra_options)
+    # Reset credentials for future tests.
+    auth._Credentials._credentials_init = False
+
   @pytest.mark.it_postcommit
   @pytest.mark.it_validatescontainer
   def test_wordcount_fnapi_it(self):
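
A note on the credentials reset in the test above: _Credentials caches the first credentials it builds, so a later call with different pipeline options returns the cached object. A minimal sketch of that behavior, assuming a local apache_beam install (the account name is a placeholder):

    from apache_beam.internal.gcp import auth
    from apache_beam.options.pipeline_options import PipelineOptions

    first = auth.get_service_credentials(None)  # builds and caches credentials
    second = auth.get_service_credentials(
        PipelineOptions([
            '--impersonate_service_account='
            'sa@my-project.iam.gserviceaccount.com'
        ]))
    assert first is second  # the cached object wins; new options are ignored
    auth._Credentials._credentials_init = False  # next call rebuilds
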
diff --git a/sdks/python/apache_beam/internal/gcp/auth.py b/sdks/python/apache_beam/internal/gcp/auth.py
index 439264a9794..27a3c40cd4b 100644
--- a/sdks/python/apache_beam/internal/gcp/auth.py
+++ b/sdks/python/apache_beam/internal/gcp/auth.py
@@ -23,8 +23,12 @@ import logging
 import socket
 import threading
 
+from apache_beam.options.pipeline_options import GoogleCloudOptions
+from apache_beam.options.pipeline_options import PipelineOptions
+
 # google.auth is only available when Beam is installed with the gcp extra.
 try:
+  from google.auth import impersonated_credentials
   import google.auth
   import google_auth_httplib2
   _GOOGLE_AUTH_AVAILABLE = True
@@ -40,6 +44,16 @@ executing_project = None
 
 _LOGGER = logging.getLogger(__name__)
 
+CLIENT_SCOPES = [
+    'https://www.googleapis.com/auth/bigquery',
+    'https://www.googleapis.com/auth/cloud-platform',
+    'https://www.googleapis.com/auth/devstorage.full_control',
+    'https://www.googleapis.com/auth/userinfo.email',
+    'https://www.googleapis.com/auth/datastore',
+    'https://www.googleapis.com/auth/spanner.admin',
+    'https://www.googleapis.com/auth/spanner.data'
+]
+
 
 def set_running_in_gce(worker_executing_project):
   """For internal use only; no backwards-compatibility guarantees.
@@ -59,16 +73,19 @@ def set_running_in_gce(worker_executing_project):
   executing_project = worker_executing_project
 
 
-def get_service_credentials():
+def get_service_credentials(pipeline_options):
   """For internal use only; no backwards-compatibility guarantees.
 
   Get credentials to access Google services.
+  Args:
+    pipeline_options: Pipeline options, used in creating credentials
+      like impersonated credentials.
 
   Returns:
     A ``google.auth.credentials.Credentials`` object or None if credentials
     not found. Returned object is thread-safe.
   """
-  return _Credentials.get_service_credentials()
+  return _Credentials.get_service_credentials(pipeline_options)
 
 
 if _GOOGLE_AUTH_AVAILABLE:
@@ -108,10 +125,7 @@ class _Credentials(object):
   _credentials = None
 
   @classmethod
-  def get_service_credentials(cls):
-    if cls._credentials_init:
-      return cls._credentials
-
+  def get_service_credentials(cls, pipeline_options):
     with cls._credentials_lock:
       if cls._credentials_init:
         return cls._credentials
@@ -124,13 +138,13 @@ class _Credentials(object):
       _LOGGER.info(
           "socket default timeout is %s seconds.", socket.getdefaulttimeout())
 
-      cls._credentials = cls._get_service_credentials()
+      cls._credentials = cls._get_service_credentials(pipeline_options)
       cls._credentials_init = True
 
     return cls._credentials
 
   @staticmethod
-  def _get_service_credentials():
+  def _get_service_credentials(pipeline_options):
     if not _GOOGLE_AUTH_AVAILABLE:
       _LOGGER.warning(
           'Unable to find default credentials because the google-auth library '
@@ -138,17 +152,10 @@ class _Credentials(object):
           'Google default credentials. Connecting anonymously.')
       return None
 
-    client_scopes = [
-        'https://www.googleapis.com/auth/bigquery',
-        'https://www.googleapis.com/auth/cloud-platform',
-        'https://www.googleapis.com/auth/devstorage.full_control',
-        'https://www.googleapis.com/auth/userinfo.email',
-        'https://www.googleapis.com/auth/datastore',
-        'https://www.googleapis.com/auth/spanner.admin',
-        'https://www.googleapis.com/auth/spanner.data'
-    ]
     try:
-      credentials, _ = google.auth.default(scopes=client_scopes)  # pylint: disable=c-extension-no-member
+      credentials, _ = google.auth.default(scopes=CLIENT_SCOPES)  # pylint: disable=c-extension-no-member
+      credentials = _Credentials._add_impersonation_credentials(
+          credentials, pipeline_options)
       credentials = _ApitoolsCredentialsAdapter(credentials)
       logging.debug(
           'Connecting using Google Application Default '
@@ -160,3 +167,26 @@ class _Credentials(object):
           'Connecting anonymously.',
           e)
       return None
+
+  @staticmethod
+  def _add_impersonation_credentials(credentials, pipeline_options):
+    if isinstance(pipeline_options, PipelineOptions):
+      gcs_options = pipeline_options.view_as(GoogleCloudOptions)
+      impersonate_service_account = gcs_options.impersonate_service_account
+    elif isinstance(pipeline_options, dict):
+      impersonate_service_account = pipeline_options.get(
+          'impersonate_service_account')
+    else:
+      return credentials
+    if impersonate_service_account:
+      _LOGGER.info('Impersonating: %s', impersonate_service_account)
+      impersonate_accounts = impersonate_service_account.split(',')
+      target_principal = impersonate_accounts[-1]
+      delegate_to = impersonate_accounts[0:-1]
+      credentials = impersonated_credentials.Credentials(
+          source_credentials=credentials,
+          target_principal=target_principal,
+          delegates=delegate_to,
+          target_scopes=CLIENT_SCOPES,
+      )
+    return credentials
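
The chain handling in _add_impersonation_credentials above splits the comma-separated value so that requests run as the last account, with earlier accounts acting as delegates in order. A standalone sketch with placeholder account names:

    chain = ('hop-a@my-project.iam.gserviceaccount.com,'
             'hop-b@my-project.iam.gserviceaccount.com,'
             'target@my-project.iam.gserviceaccount.com')
    accounts = chain.split(',')
    target_principal = accounts[-1]  # requests are made as this account
    delegates = accounts[0:-1]       # the hop-* accounts, in chain order
    print(target_principal, delegates)
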
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
index 89efa1ef623..bb3b6027340 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
@@ -314,7 +314,7 @@ class BigQueryWrapper(object):
 
   The wrapper is used to organize all the BigQuery integration points and
   offer a common place where retry logic for failures can be controlled.
-  In addition it offers various functions used both in sources and sinks
+  In addition, it offers various functions used both in sources and sinks
   (e.g., find and create tables, query a table, etc.).
   """
 
@@ -328,7 +328,7 @@ class BigQueryWrapper(object):
   def __init__(self, client=None, temp_dataset_id=None, temp_table_ref=None):
     self.client = client or bigquery.BigqueryV2(
         http=get_new_http(),
-        credentials=auth.get_service_credentials(),
+        credentials=auth.get_service_credentials(None),
         response_encoding='utf8',
         additional_http_headers={
             "user-agent": "apache-beam-%s" % apache_beam.__version__
diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
index 6e4b1b6c7f0..39ec3c29739 100644
--- a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
+++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
@@ -38,6 +38,10 @@ class GCSFileSystem(FileSystem):
   CHUNK_SIZE = gcsio.MAX_BATCH_OPERATION_SIZE  # Chuck size in batch operations
   GCS_PREFIX = 'gs://'
 
+  def __init__(self, pipeline_options):
+    super().__init__(pipeline_options)
+    self._pipeline_options = pipeline_options
+
   @classmethod
   def scheme(cls):
     """URI scheme for the FileSystem
@@ -120,12 +124,15 @@ class GCSFileSystem(FileSystem):
       ``BeamIOError``: if listing fails, but not if no files were found.
     """
     try:
-      for path, (size, updated) in gcsio.GcsIO().list_prefix(
+      for path, (size, updated) in self._gcsIO().list_prefix(
           dir_or_prefix, with_metadata=True).items():
         yield FileMetadata(path, size, updated)
     except Exception as e:  # pylint: disable=broad-except
       raise BeamIOError("List operation failed", {dir_or_prefix: e})
 
+  def _gcsIO(self):
+    return gcsio.GcsIO(pipeline_options=self._pipeline_options)
+
   def _path_open(
       self,
       path,
@@ -136,7 +143,7 @@ class GCSFileSystem(FileSystem):
     """
     compression_type = FileSystem._get_compression_type(path, compression_type)
     mime_type = CompressionTypes.mime_type(compression_type, mime_type)
-    raw_file = gcsio.GcsIO().open(path, mode, mime_type=mime_type)
+    raw_file = self._gcsIO().open(path, mode, mime_type=mime_type)
     if compression_type == CompressionTypes.UNCOMPRESSED:
       return raw_file
     return CompressedFile(raw_file, compression_type=compression_type)
@@ -199,9 +206,9 @@ class GCSFileSystem(FileSystem):
         raise ValueError('Destination %r must be GCS path.' % destination)
       # Use copy_tree if the path ends with / as it is a directory
       if source.endswith('/'):
-        gcsio.GcsIO().copytree(source, destination)
+        self._gcsIO().copytree(source, destination)
       else:
-        gcsio.GcsIO().copy(source, destination)
+        self._gcsIO().copy(source, destination)
 
     exceptions = {}
     for source, destination in zip(source_file_names, destination_file_names):
@@ -242,7 +249,7 @@ class GCSFileSystem(FileSystem):
     # Execute GCS renames if any and return exceptions.
     exceptions = {}
     for batch in gcs_batches:
-      copy_statuses = gcsio.GcsIO().copy_batch(batch)
+      copy_statuses = self._gcsIO().copy_batch(batch)
       copy_succeeded = []
       for src, dest, exception in copy_statuses:
         if exception:
@@ -250,7 +257,7 @@ class GCSFileSystem(FileSystem):
         else:
           copy_succeeded.append((src, dest))
       delete_batch = [src for src, dest in copy_succeeded]
-      delete_statuses = gcsio.GcsIO().delete_batch(delete_batch)
+      delete_statuses = self._gcsIO().delete_batch(delete_batch)
       for i, (src, exception) in enumerate(delete_statuses):
         dest = copy_succeeded[i][1]
         if exception:
@@ -267,7 +274,7 @@ class GCSFileSystem(FileSystem):
 
     Returns: boolean flag indicating if path exists
     """
-    return gcsio.GcsIO().exists(path)
+    return self._gcsIO().exists(path)
 
   def size(self, path):
     """Get size of path on the FileSystem.
@@ -280,7 +287,7 @@ class GCSFileSystem(FileSystem):
     Raises:
       ``BeamIOError``: if path doesn't exist.
     """
-    return gcsio.GcsIO().size(path)
+    return self._gcsIO().size(path)
 
   def last_updated(self, path):
     """Get UNIX Epoch time in seconds on the FileSystem.
@@ -293,7 +300,7 @@ class GCSFileSystem(FileSystem):
     Raises:
       ``BeamIOError``: if path doesn't exist.
     """
-    return gcsio.GcsIO().last_updated(path)
+    return self._gcsIO().last_updated(path)
 
   def checksum(self, path):
     """Fetch checksum metadata of a file on the
@@ -308,7 +315,7 @@ class GCSFileSystem(FileSystem):
       ``BeamIOError``: if path isn't a file or doesn't exist.
     """
     try:
-      return gcsio.GcsIO().checksum(path)
+      return self._gcsIO().checksum(path)
     except Exception as e:  # pylint: disable=broad-except
       raise BeamIOError("Checksum operation failed", {path: e})
 
@@ -325,7 +332,7 @@ class GCSFileSystem(FileSystem):
       ``BeamIOError``: if path isn't a file or doesn't exist.
     """
     try:
-      file_metadata = gcsio.GcsIO()._status(path)
+      file_metadata = self._gcsIO()._status(path)
       return FileMetadata(
           path, file_metadata['size'], file_metadata['last_updated'])
     except Exception as e:  # pylint: disable=broad-except
@@ -346,7 +353,7 @@ class GCSFileSystem(FileSystem):
       else:
         path_to_use = path
       match_result = self.match([path_to_use])[0]
-      statuses = gcsio.GcsIO().delete_batch(
+      statuses = self._gcsIO().delete_batch(
           [m.path for m in match_result.metadata_list])
       # pylint: disable=used-before-assignment
       failures = [e for (_, e) in statuses if e is not None]
diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
index b4d921ada23..49b0bdc9f6c 100644
--- a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
+++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
@@ -81,7 +81,7 @@ class GCSFileSystemTest(unittest.TestCase):
   def test_match_multiples(self, mock_gcsio):
     # Prepare mocks.
     gcsio_mock = mock.MagicMock()
-    gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
+    gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock
     gcsio_mock.list_prefix.return_value = {
         'gs://bucket/file1': (1, 99999.0), 'gs://bucket/file2': (2, 88888.0)
     }
@@ -99,7 +99,7 @@ class GCSFileSystemTest(unittest.TestCase):
     # Prepare mocks.
     gcsio_mock = mock.MagicMock()
     limit = 1
-    gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
+    gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock
     gcsio_mock.list_prefix.return_value = {'gs://bucket/file1': (1, 99999.0)}
     expected_results = set([FileMetadata('gs://bucket/file1', 1, 99999.0)])
     match_result = self.fs.match(['gs://bucket/'], [limit])[0]
@@ -112,7 +112,7 @@ class GCSFileSystemTest(unittest.TestCase):
   def test_match_multiples_error(self, mock_gcsio):
     # Prepare mocks.
     gcsio_mock = mock.MagicMock()
-    gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
+    gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock
     exception = IOError('Failed')
     gcsio_mock.list_prefix.side_effect = exception
 
@@ -128,7 +128,7 @@ class GCSFileSystemTest(unittest.TestCase):
   def test_match_multiple_patterns(self, mock_gcsio):
     # Prepare mocks.
     gcsio_mock = mock.MagicMock()
-    gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
+    gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock
     gcsio_mock.list_prefix.side_effect = [
         {
             'gs://bucket/file1': (1, 99999.0)
@@ -146,7 +146,7 @@ class GCSFileSystemTest(unittest.TestCase):
   def test_create(self, mock_gcsio):
     # Prepare mocks.
     gcsio_mock = mock.MagicMock()
-    gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
+    gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock
     # Issue file copy
     _ = self.fs.create('gs://bucket/from1', 'application/octet-stream')
 
@@ -157,7 +157,7 @@ class GCSFileSystemTest(unittest.TestCase):
   def test_open(self, mock_gcsio):
     # Prepare mocks.
     gcsio_mock = mock.MagicMock()
-    gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
+    gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock
     # Issue file copy
     _ = self.fs.open('gs://bucket/from1', 'application/octet-stream')
 
@@ -168,7 +168,7 @@ class GCSFileSystemTest(unittest.TestCase):
   def test_copy_file(self, mock_gcsio):
     # Prepare mocks.
     gcsio_mock = mock.MagicMock()
-    gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
+    gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock
     sources = ['gs://bucket/from1']
     destinations = ['gs://bucket/to1']
 
@@ -182,7 +182,7 @@ class GCSFileSystemTest(unittest.TestCase):
   def test_copy_file_error(self, mock_gcsio):
     # Prepare mocks.
     gcsio_mock = mock.MagicMock()
-    gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
+    gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock
     sources = ['gs://bucket/from1']
     destinations = ['gs://bucket/to1']
 
@@ -208,7 +208,7 @@ class GCSFileSystemTest(unittest.TestCase):
   def test_copy_tree(self, mock_gcsio):
     # Prepare mocks.
     gcsio_mock = mock.MagicMock()
-    gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
+    gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock
     sources = ['gs://bucket1/']
     destinations = ['gs://bucket2/']
 
@@ -222,7 +222,7 @@ class GCSFileSystemTest(unittest.TestCase):
   def test_rename(self, mock_gcsio):
     # Prepare mocks.
     gcsio_mock = mock.MagicMock()
-    gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
+    gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock
     sources = [
         'gs://bucket/from1',
         'gs://bucket/from2',
@@ -262,7 +262,7 @@ class GCSFileSystemTest(unittest.TestCase):
   def test_rename_error(self, mock_gcsio):
     # Prepare mocks.
     gcsio_mock = mock.MagicMock()
-    gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
+    gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock
     sources = [
         'gs://bucket/from1',
         'gs://bucket/from2',
@@ -308,7 +308,7 @@ class GCSFileSystemTest(unittest.TestCase):
   def test_delete(self, mock_gcsio):
     # Prepare mocks.
     gcsio_mock = mock.MagicMock()
-    gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
+    gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock
     gcsio_mock._status.return_value = {'size': 0, 'last_updated': 99999.0}
     files = [
         'gs://bucket/from1',
@@ -324,7 +324,7 @@ class GCSFileSystemTest(unittest.TestCase):
   def test_delete_error(self, mock_gcsio):
     # Prepare mocks.
     gcsio_mock = mock.MagicMock()
-    gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
+    gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock
     exception = IOError('Failed')
     gcsio_mock.delete_batch.side_effect = exception
     gcsio_mock._status.return_value = {'size': 0, 'last_updated': 99999.0}
diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py
index 87654bc8174..c89728387c2 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio.py
@@ -132,7 +132,7 @@ def get_or_create_default_gcs_bucket(options):
     return None
 
   bucket_name = default_gcs_bucket_name(project, region)
-  bucket = GcsIO().get_bucket(bucket_name)
+  bucket = GcsIO(pipeline_options=options).get_bucket(bucket_name)
   if bucket:
     return bucket
   else:
@@ -140,7 +140,8 @@ def get_or_create_default_gcs_bucket(options):
         'Creating default GCS bucket for project %s: gs://%s',
         project,
         bucket_name)
-    return GcsIO().create_bucket(bucket_name, project, location=region)
+    return GcsIO(pipeline_options=options).create_bucket(
+        bucket_name, project, location=region)
 
 
 class GcsIOError(IOError, retry.PermanentException):
@@ -150,10 +151,10 @@ class GcsIOError(IOError, retry.PermanentException):
 
 class GcsIO(object):
   """Google Cloud Storage I/O client."""
-  def __init__(self, storage_client=None):
+  def __init__(self, storage_client=None, pipeline_options=None):
     if storage_client is None:
       storage_client = storage.StorageV1(
-          credentials=auth.get_service_credentials(),
+          credentials=auth.get_service_credentials(pipeline_options),
           get_credentials=False,
           http=get_new_http(),
           response_encoding='utf8',
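
With the constructor change above, callers can thread pipeline options into GcsIO so its storage client is built with impersonated credentials when the flag is set. A sketch, assuming Beam is installed with the gcp extra (the account name is a placeholder):

    from apache_beam.io.gcp import gcsio
    from apache_beam.options.pipeline_options import PipelineOptions

    opts = PipelineOptions([
        '--impersonate_service_account='
        'sa@my-project.iam.gserviceaccount.com'
    ])
    # Credentials are resolved through auth.get_service_credentials(opts).
    client = gcsio.GcsIO(pipeline_options=opts)
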
diff --git a/sdks/python/apache_beam/io/gcp/gcsio_test.py b/sdks/python/apache_beam/io/gcp/gcsio_test.py
index a4aa2d4aa85..260090461c8 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio_test.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio_test.py
@@ -461,7 +461,7 @@ class TestGCSIO(unittest.TestCase):
 
   @mock.patch(
       'apache_beam.io.gcp.gcsio.auth.get_service_credentials',
-      wraps=lambda: None)
+      wraps=lambda pipeline_options: None)
   @mock.patch('apache_beam.io.gcp.gcsio.get_new_http')
   def test_user_agent_passed(self, get_new_http_mock, get_service_creds_mock):
     client = gcsio.GcsIO()
diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index e02f6d79930..5aa29c0fd96 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -748,9 +748,17 @@ class GoogleCloudOptions(PipelineOptions):
         '--enable_artifact_caching',
         default=False,
         action='store_true',
-        help=
-        'When true, artifacts will be cached across job submissions in the GCS '
-        'staging bucket')
+        help='When true, artifacts will be cached across job submissions in '
+        'the GCS staging bucket')
+    parser.add_argument(
+        '--impersonate_service_account',
+        default=None,
+        help='All API requests will be made as the given service account or '
+        'target service account in an impersonation delegation chain '
+        'instead of the currently selected account. You can specify '
+        'either a single service account as the impersonator, or a '
+        'comma-separated list of service accounts to create an '
+        'impersonation delegation chain.')
 
   def _create_default_gcs_bucket(self):
     try:
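
To illustrate the delegation-chain form of the new option: a comma-separated list parses to a single string that auth.py later splits. A sketch with placeholder accounts:

    from apache_beam.options.pipeline_options import GoogleCloudOptions
    from apache_beam.options.pipeline_options import PipelineOptions

    opts = PipelineOptions([
        '--impersonate_service_account='
        'first-hop@my-project.iam.gserviceaccount.com,'
        'target@my-project.iam.gserviceaccount.com'
    ])
    # The raw comma-separated string; splitting happens in auth.py.
    print(opts.view_as(GoogleCloudOptions).impersonate_service_account)
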
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index 21e3335c077..e0872956529 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -345,6 +345,10 @@ class Environment(object):
           for k, v in sdk_pipeline_options.items() if v is not None
       }
       options_dict["pipelineUrl"] = proto_pipeline_staged_url
+      # Don't pass impersonate_service_account through to the harness.
+      # Though impersonation is used to submit the job, the workers
+      # should not try to modify their own credentials.
+      options_dict.pop('impersonate_service_account', None)
       self.proto.sdkPipelineOptions.additionalProperties.append(
           dataflow.Environment.SdkPipelineOptionsValue.AdditionalProperty(
               key='options', value=to_json_value(options_dict)))
@@ -557,7 +561,7 @@ class DataflowApplicationClient(object):
     if self.google_cloud_options.no_auth:
       credentials = None
     else:
-      credentials = get_service_credentials()
+      credentials = get_service_credentials(options)
 
     http_client = get_new_http()
     self._client = dataflow.DataflowV1b3(
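
The first hunk above drops the flag before pipeline options are serialized for workers; in isolation the behavior is just a guarded dict removal (hypothetical dict values):

    options_dict = {
        'impersonate_service_account':
            'sa@my-project.iam.gserviceaccount.com',
        'temp_location': 'gs://my-bucket/temp',
    }
    # Workers must not attempt impersonation themselves.
    options_dict.pop('impersonate_service_account', None)
    print(options_dict)  # no impersonation key remains
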
diff --git a/sdks/python/apache_beam/runners/interactive/utils.py b/sdks/python/apache_beam/runners/interactive/utils.py
index 71305a5d976..ce5adc84f89 100644
--- a/sdks/python/apache_beam/runners/interactive/utils.py
+++ b/sdks/python/apache_beam/runners/interactive/utils.py
@@ -487,7 +487,7 @@ def assert_bucket_exists(bucket_name):
   try:
     from apitools.base.py.exceptions import HttpError
     storage_client = storage.StorageV1(
-        credentials=auth.get_service_credentials(),
+        credentials=auth.get_service_credentials(None),
         get_credentials=False,
         http=get_new_http(),
         response_encoding='utf8')
diff --git a/sdks/python/apache_beam/runners/portability/sdk_container_builder.py b/sdks/python/apache_beam/runners/portability/sdk_container_builder.py
index d06b005a4f0..f81e015ea59 100644
--- a/sdks/python/apache_beam/runners/portability/sdk_container_builder.py
+++ b/sdks/python/apache_beam/runners/portability/sdk_container_builder.py
@@ -209,7 +209,7 @@ class _SdkContainerImageCloudBuilder(SdkContainerImageBuilder):
     if self._google_cloud_options.no_auth:
       credentials = None
     else:
-      credentials = get_service_credentials()
+      credentials = get_service_credentials(options)
     self._storage_client = storage.StorageV1(
         url='https://www.googleapis.com/storage/v1',
         credentials=credentials,