You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by yi...@apache.org on 2022/05/13 20:28:54 UTC
[beam] branch release-2.39.0 updated: [BEAM-14014] Support impersonation credentials in dataflow runner (#17244) (#17668)
This is an automated email from the ASF dual-hosted git repository.
yichi pushed a commit to branch release-2.39.0
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.39.0 by this push:
new 10aeab88c12 [BEAM-14014] Support impersonation credentials in dataflow runner (#17244) (#17668)
10aeab88c12 is described below
commit 10aeab88c1260904df5adf62ac7d958872e581d1
Author: Yichi Zhang <zy...@google.com>
AuthorDate: Fri May 13 13:28:47 2022 -0700
[BEAM-14014] Support impersonation credentials in dataflow runner (#17244) (#17668)
Co-authored-by: Ryan Thompson <ry...@gmail.com>
---
.../apache_beam/examples/wordcount_it_test.py | 39 +++++++++++++
sdks/python/apache_beam/internal/gcp/auth.py | 66 ++++++++++++++++------
sdks/python/apache_beam/io/gcp/bigquery_tools.py | 4 +-
sdks/python/apache_beam/io/gcp/gcsfilesystem.py | 31 ++++++----
.../apache_beam/io/gcp/gcsfilesystem_test.py | 26 ++++-----
sdks/python/apache_beam/io/gcp/gcsio.py | 9 +--
sdks/python/apache_beam/io/gcp/gcsio_test.py | 2 +-
.../python/apache_beam/options/pipeline_options.py | 14 ++++-
.../runners/dataflow/internal/apiclient.py | 6 +-
.../apache_beam/runners/interactive/utils.py | 2 +-
.../runners/portability/sdk_container_builder.py | 2 +-
11 files changed, 145 insertions(+), 56 deletions(-)
diff --git a/sdks/python/apache_beam/examples/wordcount_it_test.py b/sdks/python/apache_beam/examples/wordcount_it_test.py
index be8bbbfed8a..afbe70b9d23 100644
--- a/sdks/python/apache_beam/examples/wordcount_it_test.py
+++ b/sdks/python/apache_beam/examples/wordcount_it_test.py
@@ -28,6 +28,7 @@ import pytest
from hamcrest.core.core.allof import all_of
from apache_beam.examples import wordcount
+from apache_beam.internal.gcp import auth
from apache_beam.testing.load_tests.load_test_metrics_utils import InfluxDBMetricsPublisherOptions
from apache_beam.testing.load_tests.load_test_metrics_utils import MetricsReader
from apache_beam.testing.pipeline_verifiers import FileChecksumMatcher
@@ -47,6 +48,44 @@ class WordCountIT(unittest.TestCase):
def test_wordcount_it(self):
self._run_wordcount_it(wordcount.run)
+ @pytest.mark.it_postcommit
+ @pytest.mark.sickbay_direct
+ @pytest.mark.sickbay_spark
+ @pytest.mark.sickbay_flink
+ def test_wordcount_impersonation_it(self):
+ """Tests impersonation on dataflow.
+
+ For testing impersonation, we use three ingredients:
+ - a principal to impersonate
+ - a dataflow service account that only that principal is
+ allowed to launch jobs as
+ - a temp root that only the above two accounts have access to
+
+ Jenkins and Dataflow workers both run as the GCE default service account,
+ so we remove that account from all of the above.
+ """
+ # Credentials must be reset first; otherwise cached credentials from a
+ # previous test will be reused and this test will fail.
+ auth._Credentials._credentials_init = False
+
+ ACCOUNT_TO_IMPERSONATE = (
+ 'allows-impersonation@apache-'
+ 'beam-testing.iam.gserviceaccount.com')
+ RUNNER_ACCOUNT = (
+ 'impersonation-dataflow-worker@'
+ 'apache-beam-testing.iam.gserviceaccount.com')
+ TEMP_DIR = 'gs://impersonation-test-bucket/temp-it'
+ STAGING_LOCATION = 'gs://impersonation-test-bucket/staging-it'
+ extra_options = {
+ 'impersonate_service_account': ACCOUNT_TO_IMPERSONATE,
+ 'service_account_email': RUNNER_ACCOUNT,
+ 'temp_location': TEMP_DIR,
+ 'staging_location': STAGING_LOCATION
+ }
+ self._run_wordcount_it(wordcount.run, **extra_options)
+ # Reset credentials for future tests.
+ auth._Credentials._credentials_init = False
+
@pytest.mark.it_postcommit
@pytest.mark.it_validatescontainer
def test_wordcount_fnapi_it(self):
diff --git a/sdks/python/apache_beam/internal/gcp/auth.py b/sdks/python/apache_beam/internal/gcp/auth.py
index 439264a9794..27a3c40cd4b 100644
--- a/sdks/python/apache_beam/internal/gcp/auth.py
+++ b/sdks/python/apache_beam/internal/gcp/auth.py
@@ -23,8 +23,12 @@ import logging
import socket
import threading
+from apache_beam.options.pipeline_options import GoogleCloudOptions
+from apache_beam.options.pipeline_options import PipelineOptions
+
# google.auth is only available when Beam is installed with the gcp extra.
try:
+ from google.auth import impersonated_credentials
import google.auth
import google_auth_httplib2
_GOOGLE_AUTH_AVAILABLE = True
@@ -40,6 +44,16 @@ executing_project = None
_LOGGER = logging.getLogger(__name__)
+CLIENT_SCOPES = [
+ 'https://www.googleapis.com/auth/bigquery',
+ 'https://www.googleapis.com/auth/cloud-platform',
+ 'https://www.googleapis.com/auth/devstorage.full_control',
+ 'https://www.googleapis.com/auth/userinfo.email',
+ 'https://www.googleapis.com/auth/datastore',
+ 'https://www.googleapis.com/auth/spanner.admin',
+ 'https://www.googleapis.com/auth/spanner.data'
+]
+
def set_running_in_gce(worker_executing_project):
"""For internal use only; no backwards-compatibility guarantees.
@@ -59,16 +73,19 @@ def set_running_in_gce(worker_executing_project):
executing_project = worker_executing_project
-def get_service_credentials():
+def get_service_credentials(pipeline_options):
"""For internal use only; no backwards-compatibility guarantees.
Get credentials to access Google services.
+ Args:
+ pipeline_options: Pipeline options, used in creating credentials
+ like impersonated credentials.
Returns:
A ``google.auth.credentials.Credentials`` object or None if credentials
not found. Returned object is thread-safe.
"""
- return _Credentials.get_service_credentials()
+ return _Credentials.get_service_credentials(pipeline_options)
if _GOOGLE_AUTH_AVAILABLE:
@@ -108,10 +125,7 @@ class _Credentials(object):
_credentials = None
@classmethod
- def get_service_credentials(cls):
- if cls._credentials_init:
- return cls._credentials
-
+ def get_service_credentials(cls, pipeline_options):
with cls._credentials_lock:
if cls._credentials_init:
return cls._credentials
@@ -124,13 +138,13 @@ class _Credentials(object):
_LOGGER.info(
"socket default timeout is %s seconds.", socket.getdefaulttimeout())
- cls._credentials = cls._get_service_credentials()
+ cls._credentials = cls._get_service_credentials(pipeline_options)
cls._credentials_init = True
return cls._credentials
@staticmethod
- def _get_service_credentials():
+ def _get_service_credentials(pipeline_options):
if not _GOOGLE_AUTH_AVAILABLE:
_LOGGER.warning(
'Unable to find default credentials because the google-auth library '
@@ -138,17 +152,10 @@ class _Credentials(object):
'Google default credentials. Connecting anonymously.')
return None
- client_scopes = [
- 'https://www.googleapis.com/auth/bigquery',
- 'https://www.googleapis.com/auth/cloud-platform',
- 'https://www.googleapis.com/auth/devstorage.full_control',
- 'https://www.googleapis.com/auth/userinfo.email',
- 'https://www.googleapis.com/auth/datastore',
- 'https://www.googleapis.com/auth/spanner.admin',
- 'https://www.googleapis.com/auth/spanner.data'
- ]
try:
- credentials, _ = google.auth.default(scopes=client_scopes) # pylint: disable=c-extension-no-member
+ credentials, _ = google.auth.default(scopes=CLIENT_SCOPES) # pylint: disable=c-extension-no-member
+ credentials = _Credentials._add_impersonation_credentials(
+ credentials, pipeline_options)
credentials = _ApitoolsCredentialsAdapter(credentials)
logging.debug(
'Connecting using Google Application Default '
@@ -160,3 +167,26 @@ class _Credentials(object):
'Connecting anonymously.',
e)
return None
+
+ @staticmethod
+ def _add_impersonation_credentials(credentials, pipeline_options):
+ if isinstance(pipeline_options, PipelineOptions):
+ gcs_options = pipeline_options.view_as(GoogleCloudOptions)
+ impersonate_service_account = gcs_options.impersonate_service_account
+ elif isinstance(pipeline_options, dict):
+ impersonate_service_account = pipeline_options.get(
+ 'impersonate_service_account')
+ else:
+ return credentials
+ if impersonate_service_account:
+ _LOGGER.info('Impersonating: %s', impersonate_service_account)
+ impersonate_accounts = impersonate_service_account.split(',')
+ target_principal = impersonate_accounts[-1]
+ delegate_to = impersonate_accounts[0:-1]
+ credentials = impersonated_credentials.Credentials(
+ source_credentials=credentials,
+ target_principal=target_principal,
+ delegates=delegate_to,
+ target_scopes=CLIENT_SCOPES,
+ )
+ return credentials
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
index 89efa1ef623..bb3b6027340 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
@@ -314,7 +314,7 @@ class BigQueryWrapper(object):
The wrapper is used to organize all the BigQuery integration points and
offer a common place where retry logic for failures can be controlled.
- In addition it offers various functions used both in sources and sinks
+ In addition, it offers various functions used both in sources and sinks
(e.g., find and create tables, query a table, etc.).
"""
@@ -328,7 +328,7 @@ class BigQueryWrapper(object):
def __init__(self, client=None, temp_dataset_id=None, temp_table_ref=None):
self.client = client or bigquery.BigqueryV2(
http=get_new_http(),
- credentials=auth.get_service_credentials(),
+ credentials=auth.get_service_credentials(None),
response_encoding='utf8',
additional_http_headers={
"user-agent": "apache-beam-%s" % apache_beam.__version__
diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
index 6e4b1b6c7f0..39ec3c29739 100644
--- a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
+++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
@@ -38,6 +38,10 @@ class GCSFileSystem(FileSystem):
CHUNK_SIZE = gcsio.MAX_BATCH_OPERATION_SIZE # Chunk size in batch operations
GCS_PREFIX = 'gs://'
+ def __init__(self, pipeline_options):
+ super().__init__(pipeline_options)
+ self._pipeline_options = pipeline_options
+
@classmethod
def scheme(cls):
"""URI scheme for the FileSystem
@@ -120,12 +124,15 @@ class GCSFileSystem(FileSystem):
``BeamIOError``: if listing fails, but not if no files were found.
"""
try:
- for path, (size, updated) in gcsio.GcsIO().list_prefix(
+ for path, (size, updated) in self._gcsIO().list_prefix(
dir_or_prefix, with_metadata=True).items():
yield FileMetadata(path, size, updated)
except Exception as e: # pylint: disable=broad-except
raise BeamIOError("List operation failed", {dir_or_prefix: e})
+ def _gcsIO(self):
+ return gcsio.GcsIO(pipeline_options=self._pipeline_options)
+
def _path_open(
self,
path,
@@ -136,7 +143,7 @@ class GCSFileSystem(FileSystem):
"""
compression_type = FileSystem._get_compression_type(path, compression_type)
mime_type = CompressionTypes.mime_type(compression_type, mime_type)
- raw_file = gcsio.GcsIO().open(path, mode, mime_type=mime_type)
+ raw_file = self._gcsIO().open(path, mode, mime_type=mime_type)
if compression_type == CompressionTypes.UNCOMPRESSED:
return raw_file
return CompressedFile(raw_file, compression_type=compression_type)
@@ -199,9 +206,9 @@ class GCSFileSystem(FileSystem):
raise ValueError('Destination %r must be GCS path.' % destination)
# Use copy_tree if the path ends with / as it is a directory
if source.endswith('/'):
- gcsio.GcsIO().copytree(source, destination)
+ self._gcsIO().copytree(source, destination)
else:
- gcsio.GcsIO().copy(source, destination)
+ self._gcsIO().copy(source, destination)
exceptions = {}
for source, destination in zip(source_file_names, destination_file_names):
@@ -242,7 +249,7 @@ class GCSFileSystem(FileSystem):
# Execute GCS renames if any and return exceptions.
exceptions = {}
for batch in gcs_batches:
- copy_statuses = gcsio.GcsIO().copy_batch(batch)
+ copy_statuses = self._gcsIO().copy_batch(batch)
copy_succeeded = []
for src, dest, exception in copy_statuses:
if exception:
@@ -250,7 +257,7 @@ class GCSFileSystem(FileSystem):
else:
copy_succeeded.append((src, dest))
delete_batch = [src for src, dest in copy_succeeded]
- delete_statuses = gcsio.GcsIO().delete_batch(delete_batch)
+ delete_statuses = self._gcsIO().delete_batch(delete_batch)
for i, (src, exception) in enumerate(delete_statuses):
dest = copy_succeeded[i][1]
if exception:
@@ -267,7 +274,7 @@ class GCSFileSystem(FileSystem):
Returns: boolean flag indicating if path exists
"""
- return gcsio.GcsIO().exists(path)
+ return self._gcsIO().exists(path)
def size(self, path):
"""Get size of path on the FileSystem.
@@ -280,7 +287,7 @@ class GCSFileSystem(FileSystem):
Raises:
``BeamIOError``: if path doesn't exist.
"""
- return gcsio.GcsIO().size(path)
+ return self._gcsIO().size(path)
def last_updated(self, path):
"""Get UNIX Epoch time in seconds on the FileSystem.
@@ -293,7 +300,7 @@ class GCSFileSystem(FileSystem):
Raises:
``BeamIOError``: if path doesn't exist.
"""
- return gcsio.GcsIO().last_updated(path)
+ return self._gcsIO().last_updated(path)
def checksum(self, path):
"""Fetch checksum metadata of a file on the
@@ -308,7 +315,7 @@ class GCSFileSystem(FileSystem):
``BeamIOError``: if path isn't a file or doesn't exist.
"""
try:
- return gcsio.GcsIO().checksum(path)
+ return self._gcsIO().checksum(path)
except Exception as e: # pylint: disable=broad-except
raise BeamIOError("Checksum operation failed", {path: e})
@@ -325,7 +332,7 @@ class GCSFileSystem(FileSystem):
``BeamIOError``: if path isn't a file or doesn't exist.
"""
try:
- file_metadata = gcsio.GcsIO()._status(path)
+ file_metadata = self._gcsIO()._status(path)
return FileMetadata(
path, file_metadata['size'], file_metadata['last_updated'])
except Exception as e: # pylint: disable=broad-except
@@ -346,7 +353,7 @@ class GCSFileSystem(FileSystem):
else:
path_to_use = path
match_result = self.match([path_to_use])[0]
- statuses = gcsio.GcsIO().delete_batch(
+ statuses = self._gcsIO().delete_batch(
[m.path for m in match_result.metadata_list])
# pylint: disable=used-before-assignment
failures = [e for (_, e) in statuses if e is not None]
diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
index b4d921ada23..49b0bdc9f6c 100644
--- a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
+++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
@@ -81,7 +81,7 @@ class GCSFileSystemTest(unittest.TestCase):
def test_match_multiples(self, mock_gcsio):
# Prepare mocks.
gcsio_mock = mock.MagicMock()
- gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
+ gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock
gcsio_mock.list_prefix.return_value = {
'gs://bucket/file1': (1, 99999.0), 'gs://bucket/file2': (2, 88888.0)
}
@@ -99,7 +99,7 @@ class GCSFileSystemTest(unittest.TestCase):
# Prepare mocks.
gcsio_mock = mock.MagicMock()
limit = 1
- gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
+ gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock
gcsio_mock.list_prefix.return_value = {'gs://bucket/file1': (1, 99999.0)}
expected_results = set([FileMetadata('gs://bucket/file1', 1, 99999.0)])
match_result = self.fs.match(['gs://bucket/'], [limit])[0]
@@ -112,7 +112,7 @@ class GCSFileSystemTest(unittest.TestCase):
def test_match_multiples_error(self, mock_gcsio):
# Prepare mocks.
gcsio_mock = mock.MagicMock()
- gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
+ gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock
exception = IOError('Failed')
gcsio_mock.list_prefix.side_effect = exception
@@ -128,7 +128,7 @@ class GCSFileSystemTest(unittest.TestCase):
def test_match_multiple_patterns(self, mock_gcsio):
# Prepare mocks.
gcsio_mock = mock.MagicMock()
- gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
+ gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock
gcsio_mock.list_prefix.side_effect = [
{
'gs://bucket/file1': (1, 99999.0)
@@ -146,7 +146,7 @@ class GCSFileSystemTest(unittest.TestCase):
def test_create(self, mock_gcsio):
# Prepare mocks.
gcsio_mock = mock.MagicMock()
- gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
+ gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock
# Issue file copy
_ = self.fs.create('gs://bucket/from1', 'application/octet-stream')
@@ -157,7 +157,7 @@ class GCSFileSystemTest(unittest.TestCase):
def test_open(self, mock_gcsio):
# Prepare mocks.
gcsio_mock = mock.MagicMock()
- gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
+ gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock
# Issue file copy
_ = self.fs.open('gs://bucket/from1', 'application/octet-stream')
@@ -168,7 +168,7 @@ class GCSFileSystemTest(unittest.TestCase):
def test_copy_file(self, mock_gcsio):
# Prepare mocks.
gcsio_mock = mock.MagicMock()
- gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
+ gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock
sources = ['gs://bucket/from1']
destinations = ['gs://bucket/to1']
@@ -182,7 +182,7 @@ class GCSFileSystemTest(unittest.TestCase):
def test_copy_file_error(self, mock_gcsio):
# Prepare mocks.
gcsio_mock = mock.MagicMock()
- gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
+ gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock
sources = ['gs://bucket/from1']
destinations = ['gs://bucket/to1']
@@ -208,7 +208,7 @@ class GCSFileSystemTest(unittest.TestCase):
def test_copy_tree(self, mock_gcsio):
# Prepare mocks.
gcsio_mock = mock.MagicMock()
- gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
+ gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock
sources = ['gs://bucket1/']
destinations = ['gs://bucket2/']
@@ -222,7 +222,7 @@ class GCSFileSystemTest(unittest.TestCase):
def test_rename(self, mock_gcsio):
# Prepare mocks.
gcsio_mock = mock.MagicMock()
- gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
+ gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock
sources = [
'gs://bucket/from1',
'gs://bucket/from2',
@@ -262,7 +262,7 @@ class GCSFileSystemTest(unittest.TestCase):
def test_rename_error(self, mock_gcsio):
# Prepare mocks.
gcsio_mock = mock.MagicMock()
- gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
+ gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock
sources = [
'gs://bucket/from1',
'gs://bucket/from2',
@@ -308,7 +308,7 @@ class GCSFileSystemTest(unittest.TestCase):
def test_delete(self, mock_gcsio):
# Prepare mocks.
gcsio_mock = mock.MagicMock()
- gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
+ gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock
gcsio_mock._status.return_value = {'size': 0, 'last_updated': 99999.0}
files = [
'gs://bucket/from1',
@@ -324,7 +324,7 @@ class GCSFileSystemTest(unittest.TestCase):
def test_delete_error(self, mock_gcsio):
# Prepare mocks.
gcsio_mock = mock.MagicMock()
- gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
+ gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock
exception = IOError('Failed')
gcsio_mock.delete_batch.side_effect = exception
gcsio_mock._status.return_value = {'size': 0, 'last_updated': 99999.0}
diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py
index 87654bc8174..c89728387c2 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio.py
@@ -132,7 +132,7 @@ def get_or_create_default_gcs_bucket(options):
return None
bucket_name = default_gcs_bucket_name(project, region)
- bucket = GcsIO().get_bucket(bucket_name)
+ bucket = GcsIO(pipeline_options=options).get_bucket(bucket_name)
if bucket:
return bucket
else:
@@ -140,7 +140,8 @@ def get_or_create_default_gcs_bucket(options):
'Creating default GCS bucket for project %s: gs://%s',
project,
bucket_name)
- return GcsIO().create_bucket(bucket_name, project, location=region)
+ return GcsIO(pipeline_options=options).create_bucket(
+ bucket_name, project, location=region)
class GcsIOError(IOError, retry.PermanentException):
@@ -150,10 +151,10 @@ class GcsIOError(IOError, retry.PermanentException):
class GcsIO(object):
"""Google Cloud Storage I/O client."""
- def __init__(self, storage_client=None):
+ def __init__(self, storage_client=None, pipeline_options=None):
if storage_client is None:
storage_client = storage.StorageV1(
- credentials=auth.get_service_credentials(),
+ credentials=auth.get_service_credentials(pipeline_options),
get_credentials=False,
http=get_new_http(),
response_encoding='utf8',
diff --git a/sdks/python/apache_beam/io/gcp/gcsio_test.py b/sdks/python/apache_beam/io/gcp/gcsio_test.py
index a4aa2d4aa85..260090461c8 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio_test.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio_test.py
@@ -461,7 +461,7 @@ class TestGCSIO(unittest.TestCase):
@mock.patch(
'apache_beam.io.gcp.gcsio.auth.get_service_credentials',
- wraps=lambda: None)
+ wraps=lambda pipeline_options: None)
@mock.patch('apache_beam.io.gcp.gcsio.get_new_http')
def test_user_agent_passed(self, get_new_http_mock, get_service_creds_mock):
client = gcsio.GcsIO()
diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index e02f6d79930..5aa29c0fd96 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -748,9 +748,17 @@ class GoogleCloudOptions(PipelineOptions):
'--enable_artifact_caching',
default=False,
action='store_true',
- help=
- 'When true, artifacts will be cached across job submissions in the GCS '
- 'staging bucket')
+ help='When true, artifacts will be cached across job submissions in '
+ 'the GCS staging bucket')
+ parser.add_argument(
+ '--impersonate_service_account',
+ default=None,
+ help='All API requests will be made as the given service account or '
+ 'target service account in an impersonation delegation chain '
+ 'instead of the currently selected account. You can specify '
+ 'either a single service account as the impersonator, or a '
+ 'comma-separated list of service accounts to create an '
+ 'impersonation delegation chain.')
def _create_default_gcs_bucket(self):
try:
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index 21e3335c077..e0872956529 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -345,6 +345,10 @@ class Environment(object):
for k, v in sdk_pipeline_options.items() if v is not None
}
options_dict["pipelineUrl"] = proto_pipeline_staged_url
+ # Don't pass impersonate_service_account through to the harness.
+ # Impersonation is only meant for launching the job; the workers
+ # should not try to modify their own credentials.
+ options_dict.pop('impersonate_service_account', None)
self.proto.sdkPipelineOptions.additionalProperties.append(
dataflow.Environment.SdkPipelineOptionsValue.AdditionalProperty(
key='options', value=to_json_value(options_dict)))
@@ -557,7 +561,7 @@ class DataflowApplicationClient(object):
if self.google_cloud_options.no_auth:
credentials = None
else:
- credentials = get_service_credentials()
+ credentials = get_service_credentials(options)
http_client = get_new_http()
self._client = dataflow.DataflowV1b3(
diff --git a/sdks/python/apache_beam/runners/interactive/utils.py b/sdks/python/apache_beam/runners/interactive/utils.py
index 71305a5d976..ce5adc84f89 100644
--- a/sdks/python/apache_beam/runners/interactive/utils.py
+++ b/sdks/python/apache_beam/runners/interactive/utils.py
@@ -487,7 +487,7 @@ def assert_bucket_exists(bucket_name):
try:
from apitools.base.py.exceptions import HttpError
storage_client = storage.StorageV1(
- credentials=auth.get_service_credentials(),
+ credentials=auth.get_service_credentials(None),
get_credentials=False,
http=get_new_http(),
response_encoding='utf8')
diff --git a/sdks/python/apache_beam/runners/portability/sdk_container_builder.py b/sdks/python/apache_beam/runners/portability/sdk_container_builder.py
index d06b005a4f0..f81e015ea59 100644
--- a/sdks/python/apache_beam/runners/portability/sdk_container_builder.py
+++ b/sdks/python/apache_beam/runners/portability/sdk_container_builder.py
@@ -209,7 +209,7 @@ class _SdkContainerImageCloudBuilder(SdkContainerImageBuilder):
if self._google_cloud_options.no_auth:
credentials = None
else:
- credentials = get_service_credentials()
+ credentials = get_service_credentials(options)
self._storage_client = storage.StorageV1(
url='https://www.googleapis.com/storage/v1',
credentials=credentials,