Posted to commits@beam.apache.org by yh...@apache.org on 2022/11/22 17:23:10 UTC
[beam] branch master updated: Teach Azure Filesystem to authenticate using DefaultAzureCredential in the Python SDK (#24212)
This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 883a362c930 Teach Azure Filesystem to authenticate using DefaultAzureCredential in the Python SDK (#24212)
883a362c930 is described below
commit 883a362c930aca4298551697d7aaacbe7b6602f1
Author: creste <cr...@users.noreply.github.com>
AuthorDate: Tue Nov 22 12:23:04 2022 -0500
Teach Azure Filesystem to authenticate using DefaultAzureCredential in the Python SDK (#24212)
* Added support for using DefaultAzureCredential() with Blob Storage.
Tests not working yet due to TLS issue with Azurite.
* SubjectAltNames are now included in the SSL cert.
* DefaultAzureCredential is no longer used with AZURE_STORAGE_CONNECTION_STRING.
* Fixed integration tests.
* Can now run azure integration tests in parallel across all python versions.
* Change pipeline option names to be consistent with Java SDK.
* Fix missing Apache 2.0 licenses.
Deleted cert and key files because they can't have a license header.
Moved cert generation to azure_integration_test.sh.
* Fix path to azure integration tests in tox comment.
* Fix path to cert.pem.
* Fix linting issues.
* Fix path to cert.pem
* Fix linter error.
* Added azure integration tests to python 3.7 post-commit test suite.
* Fix unit tests.
* Updated README.md.
* Move changes entry to 2.44.0.
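A minimal sketch of the authentication order this change introduces, based on the blobstorageio.py and pipeline_options.py hunks below. The helper names `resolve_azure_auth` and `validate_azure_options` are hypothetical and are not part of Beam; the sketch only returns which path would be taken and does not create any Azure client.

```python
import os


# Hypothetical helper, not part of Beam: mirrors the branch order that
# BlobStorageIO.__init__ follows after this commit.
def resolve_azure_auth(
    azure_connection_string=None, blob_service_endpoint=None, environ=None):
  """Returns ('connection_string', value) or ('default_credential', endpoint)."""
  environ = os.environ if environ is None else environ
  # The --azure_connection_string option wins over the environment variable.
  connect_str = (
      azure_connection_string or
      environ.get('AZURE_STORAGE_CONNECTION_STRING'))
  if connect_str:
    return ('connection_string', connect_str)
  # No connection string anywhere: the commit falls back to
  # DefaultAzureCredential with --blob_service_endpoint as the account URL.
  return ('default_credential', blob_service_endpoint)


# Hypothetical helper: mirrors the check in AzureOptions.validate.
def validate_azure_options(
    azure_connection_string=None, blob_service_endpoint=None):
  errors = []
  if azure_connection_string and blob_service_endpoint:
    errors.append(
        '--azure_connection_string and '
        '--blob_service_endpoint are mutually exclusive.')
  return errors
```

In the sketch, a connection string from either source always takes precedence, so DefaultAzureCredential is only reached when neither the option nor AZURE_STORAGE_CONNECTION_STRING is set.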
---
CHANGES.md | 1 +
build.gradle.kts | 1 +
sdks/python/apache_beam/internal/azure/__init__.py | 18 +++++
sdks/python/apache_beam/internal/azure/auth.py | 83 ++++++++++++++++++++++
.../apache_beam/io/azure/blobstoragefilesystem.py | 28 +++++---
.../io/azure/blobstoragefilesystem_test.py | 24 +++----
sdks/python/apache_beam/io/azure/blobstorageio.py | 17 ++++-
.../io/azure/integration_test/Dockerfile | 46 ++++++++++++
.../integration_test/azure_integration_test.sh | 79 ++++++++++++++++++++
.../io/azure/integration_test/docker-compose.yml | 50 +++++++++++++
.../apache_beam/io/azure/integration_test/ssl.conf | 37 ++++++++++
.../python/apache_beam/options/pipeline_options.py | 29 ++++++++
sdks/python/setup.py | 1 +
sdks/python/test-suites/direct/common.gradle | 9 +++
sdks/python/tox.ini | 37 ++++++++++
15 files changed, 434 insertions(+), 26 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 414241d1c5b..2446bee91d1 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -61,6 +61,7 @@
* Support for Bigtable sink (Write and WriteBatch) added (Go) ([#23324](https://github.com/apache/beam/issues/23324)).
* S3 implementation of the Beam filesystem (Go) ([#23991](https://github.com/apache/beam/issues/23991)).
* Support for SingleStoreDB source and sink added (Java) ([#22617](https://github.com/apache/beam/issues/22617)).
+* Added support for DefaultAzureCredential authentication in Azure Filesystem (Python) ([#24210](https://github.com/apache/beam/issues/24210)).
## New Features / Improvements
diff --git a/build.gradle.kts b/build.gradle.kts
index 2556db7bc9a..fe0956a6ea9 100644
--- a/build.gradle.kts
+++ b/build.gradle.kts
@@ -340,6 +340,7 @@ tasks.register("python37PostCommit") {
dependsOn(":sdks:python:test-suites:direct:py37:postCommitIT")
dependsOn(":sdks:python:test-suites:direct:py37:directRunnerIT")
dependsOn(":sdks:python:test-suites:direct:py37:hdfsIntegrationTest")
+ dependsOn(":sdks:python:test-suites:direct:py37:azureIntegrationTest")
dependsOn(":sdks:python:test-suites:direct:py37:mongodbioIT")
dependsOn(":sdks:python:test-suites:portable:py37:postCommitPy37")
dependsOn(":sdks:python:test-suites:dataflow:py37:spannerioIT")
diff --git a/sdks/python/apache_beam/internal/azure/__init__.py b/sdks/python/apache_beam/internal/azure/__init__.py
new file mode 100644
index 00000000000..0bce5d68f72
--- /dev/null
+++ b/sdks/python/apache_beam/internal/azure/__init__.py
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""For internal use only; no backwards-compatibility guarantees."""
diff --git a/sdks/python/apache_beam/internal/azure/auth.py b/sdks/python/apache_beam/internal/azure/auth.py
new file mode 100644
index 00000000000..d505cc200f9
--- /dev/null
+++ b/sdks/python/apache_beam/internal/azure/auth.py
@@ -0,0 +1,83 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Azure credentials and authentication."""
+
+# pytype: skip-file
+
+import logging
+import threading
+
+from apache_beam.options.pipeline_options import AzureOptions
+
+try:
+ from azure.identity import DefaultAzureCredential
+ _AZURE_AUTH_AVAILABLE = True
+except ImportError:
+ _AZURE_AUTH_AVAILABLE = False
+
+_LOGGER = logging.getLogger(__name__)
+
+
+def get_service_credentials(pipeline_options):
+ """For internal use only; no backwards-compatibility guarantees.
+
+ Get credentials to access Azure services.
+ Args:
+ pipeline_options: Pipeline options, used in creating credentials
+ like managed identity credentials.
+
+ Returns:
+ A ``azure.identity.*Credential`` object or None if credentials
+ not found. Returned object is thread-safe.
+ """
+ return _Credentials.get_service_credentials(pipeline_options)
+
+
+class _Credentials(object):
+ _credentials_lock = threading.Lock()
+ _credentials_init = False
+ _credentials = None
+
+ @classmethod
+ def get_service_credentials(cls, pipeline_options):
+ with cls._credentials_lock:
+ if cls._credentials_init:
+ return cls._credentials
+ cls._credentials = cls._get_service_credentials(pipeline_options)
+ cls._credentials_init = True
+
+ return cls._credentials
+
+ @staticmethod
+ def _get_service_credentials(pipeline_options):
+ if not _AZURE_AUTH_AVAILABLE:
+ _LOGGER.warning(
+ 'Unable to find default credentials because the azure.identity '
+ 'library is not available. Install the azure.identity library to use '
+ 'Azure default credentials.')
+ return None
+
+ try:
+ credentials = DefaultAzureCredential(
+ managed_identity_client_id=pipeline_options.view_as(AzureOptions)\
+ .azure_managed_identity_client_id)
+ _LOGGER.debug('Connecting using Azure Default Credentials.')
+ return credentials
+ except Exception as e:
+ _LOGGER.warning('Unable to find Azure credentials to use: %s\n', e)
+ return None
diff --git a/sdks/python/apache_beam/io/azure/blobstoragefilesystem.py b/sdks/python/apache_beam/io/azure/blobstoragefilesystem.py
index 7b86d592b28..cc678f9ae0f 100644
--- a/sdks/python/apache_beam/io/azure/blobstoragefilesystem.py
+++ b/sdks/python/apache_beam/io/azure/blobstoragefilesystem.py
@@ -37,6 +37,10 @@ class BlobStorageFileSystem(FileSystem):
CHUNK_SIZE = blobstorageio.MAX_BATCH_OPERATION_SIZE
AZURE_FILE_SYSTEM_PREFIX = 'azfs://'
+ def __init__(self, pipeline_options):
+ super().__init__(pipeline_options)
+ self._pipeline_options = pipeline_options
+
@classmethod
def scheme(cls):
"""URI scheme for the FileSystem
@@ -118,12 +122,15 @@ class BlobStorageFileSystem(FileSystem):
``BeamIOError``: if listing fails, but not if no files were found.
"""
try:
- for path, (size, updated) in blobstorageio.BlobStorageIO() \
+ for path, (size, updated) in self._blobstorageIO() \
.list_prefix(dir_or_prefix, with_metadata=True).items():
yield FileMetadata(path, size, updated)
except Exception as e: # pylint: disable=broad-except
raise BeamIOError("List operation failed", {dir_or_prefix: e})
+ def _blobstorageIO(self):
+ return blobstorageio.BlobStorageIO(pipeline_options=self._pipeline_options)
+
def _path_open(
self,
path,
@@ -134,8 +141,7 @@ class BlobStorageFileSystem(FileSystem):
"""
compression_type = FileSystem._get_compression_type(path, compression_type)
mime_type = CompressionTypes.mime_type(compression_type, mime_type)
- raw_file = blobstorageio.BlobStorageIO().open(
- path, mode, mime_type=mime_type)
+ raw_file = self._blobstorageIO().open(path, mode, mime_type=mime_type)
if compression_type == CompressionTypes.UNCOMPRESSED:
return raw_file
return CompressedFile(raw_file, compression_type=compression_type)
@@ -190,7 +196,7 @@ class BlobStorageFileSystem(FileSystem):
message = 'Unable to copy unequal number of sources and destinations.'
raise BeamIOError(message)
src_dest_pairs = list(zip(source_file_names, destination_file_names))
- return blobstorageio.BlobStorageIO().copy_paths(src_dest_pairs)
+ return self._blobstorageIO().copy_paths(src_dest_pairs)
def rename(self, source_file_names, destination_file_names):
"""Rename the files at the source list to the destination list.
@@ -207,7 +213,7 @@ class BlobStorageFileSystem(FileSystem):
message = 'Unable to rename unequal number of sources and destinations.'
raise BeamIOError(message)
src_dest_pairs = list(zip(source_file_names, destination_file_names))
- results = blobstorageio.BlobStorageIO().rename_files(src_dest_pairs)
+ results = self._blobstorageIO().rename_files(src_dest_pairs)
# Retrieve exceptions.
exceptions = {(src, dest): error
for (src, dest, error) in results if error is not None}
@@ -223,7 +229,7 @@ class BlobStorageFileSystem(FileSystem):
Returns: boolean flag indicating if path exists
"""
try:
- return blobstorageio.BlobStorageIO().exists(path)
+ return self._blobstorageIO().exists(path)
except Exception as e: # pylint: disable=broad-except
raise BeamIOError("Exists operation failed", {path: e})
@@ -239,7 +245,7 @@ class BlobStorageFileSystem(FileSystem):
``BeamIOError``: if path doesn't exist.
"""
try:
- return blobstorageio.BlobStorageIO().size(path)
+ return self._blobstorageIO().size(path)
except Exception as e: # pylint: disable=broad-except
raise BeamIOError("Size operation failed", {path: e})
@@ -255,7 +261,7 @@ class BlobStorageFileSystem(FileSystem):
``BeamIOError``: if path doesn't exist.
"""
try:
- return blobstorageio.BlobStorageIO().last_updated(path)
+ return self._blobstorageIO().last_updated(path)
except Exception as e: # pylint: disable=broad-except
raise BeamIOError("Last updated operation failed", {path: e})
@@ -272,7 +278,7 @@ class BlobStorageFileSystem(FileSystem):
``BeamIOError``: if path isn't a file or doesn't exist.
"""
try:
- return blobstorageio.BlobStorageIO().checksum(path)
+ return self._blobstorageIO().checksum(path)
except Exception as e: # pylint: disable=broad-except
raise BeamIOError("Checksum operation failed", {path, e})
@@ -289,7 +295,7 @@ class BlobStorageFileSystem(FileSystem):
``BeamIOError``: if path isn't a file or doesn't exist.
"""
try:
- file_metadata = blobstorageio.BlobStorageIO()._status(path)
+ file_metadata = self._blobstorageIO()._status(path)
return FileMetadata(
path, file_metadata['size'], file_metadata['last_updated'])
except Exception as e: # pylint: disable=broad-except
@@ -305,7 +311,7 @@ class BlobStorageFileSystem(FileSystem):
Raises:
``BeamIOError``: if any of the delete operations fail
"""
- results = blobstorageio.BlobStorageIO().delete_paths(paths)
+ results = self._blobstorageIO().delete_paths(paths)
# Retrieve exceptions.
exceptions = {
path: error
diff --git a/sdks/python/apache_beam/io/azure/blobstoragefilesystem_test.py b/sdks/python/apache_beam/io/azure/blobstoragefilesystem_test.py
index 241cc72a15d..6c844329867 100644
--- a/sdks/python/apache_beam/io/azure/blobstoragefilesystem_test.py
+++ b/sdks/python/apache_beam/io/azure/blobstoragefilesystem_test.py
@@ -88,7 +88,7 @@ class BlobStorageFileSystemTest(unittest.TestCase):
# Prepare mocks.
blobstorageio_mock = mock.MagicMock()
blobstoragefilesystem.blobstorageio.BlobStorageIO = \
- lambda: blobstorageio_mock
+ lambda pipeline_options: blobstorageio_mock
blobstorageio_mock.exists.return_value = True
blobstorageio_mock._status.return_value = {
'size': 1, 'last_updated': 99999.0
@@ -107,7 +107,7 @@ class BlobStorageFileSystemTest(unittest.TestCase):
# Prepare mocks.
blobstorageio_mock = mock.MagicMock()
blobstoragefilesystem.blobstorageio.BlobStorageIO = \
- lambda: blobstorageio_mock
+ lambda pipeline_options: blobstorageio_mock
blobstorageio_mock.list_prefix.return_value = {
'azfs://storageaccount/container/file1': (1, 99999.0),
'azfs://storageaccount/container/file2': (2, 88888.0)
@@ -128,7 +128,7 @@ class BlobStorageFileSystemTest(unittest.TestCase):
blobstorageio_mock = mock.MagicMock()
limit = 1
blobstoragefilesystem.blobstorageio.BlobStorageIO = \
- lambda: blobstorageio_mock
+ lambda pipeline_options: blobstorageio_mock
blobstorageio_mock.list_prefix.return_value = {
'azfs://storageaccount/container/file1': (1, 99999.0)
}
@@ -146,7 +146,7 @@ class BlobStorageFileSystemTest(unittest.TestCase):
# Prepare mocks.
blobstorageio_mock = mock.MagicMock()
blobstoragefilesystem.blobstorageio.BlobStorageIO = \
- lambda: blobstorageio_mock
+ lambda pipeline_options: blobstorageio_mock
exception = IOError('Failed')
blobstorageio_mock.list_prefix.side_effect = exception
@@ -165,7 +165,7 @@ class BlobStorageFileSystemTest(unittest.TestCase):
# Prepare mocks.
blobstorageio_mock = mock.MagicMock()
blobstoragefilesystem.blobstorageio.BlobStorageIO = \
- lambda: blobstorageio_mock
+ lambda pipeline_options: blobstorageio_mock
blobstorageio_mock.list_prefix.side_effect = [
{
'azfs://storageaccount/container/file1': (1, 99999.0)
@@ -189,7 +189,7 @@ class BlobStorageFileSystemTest(unittest.TestCase):
# Prepare mocks.
blobstorageio_mock = mock.MagicMock()
blobstoragefilesystem.blobstorageio.BlobStorageIO = \
- lambda: blobstorageio_mock
+ lambda pipeline_options: blobstorageio_mock
# Issue file copy.
_ = self.fs.create(
'azfs://storageaccount/container/file1', 'application/octet-stream')
@@ -204,7 +204,7 @@ class BlobStorageFileSystemTest(unittest.TestCase):
# Prepare mocks.
blobstorageio_mock = mock.MagicMock()
blobstoragefilesystem.blobstorageio.BlobStorageIO = \
- lambda: blobstorageio_mock
+ lambda pipeline_options: blobstorageio_mock
# Issue file copy.
_ = self.fs.open(
'azfs://storageaccount/container/file1', 'application/octet-stream')
@@ -219,7 +219,7 @@ class BlobStorageFileSystemTest(unittest.TestCase):
# Prepare mocks.
blobstorageio_mock = mock.MagicMock()
blobstoragefilesystem.blobstorageio.BlobStorageIO = \
- lambda: blobstorageio_mock
+ lambda pipeline_options: blobstorageio_mock
sources = [
'azfs://storageaccount/container/from1',
'azfs://storageaccount/container/from2',
@@ -240,7 +240,7 @@ class BlobStorageFileSystemTest(unittest.TestCase):
# Prepare mocks.
blobstorageio_mock = mock.MagicMock()
blobstoragefilesystem.blobstorageio.BlobStorageIO = \
- lambda: blobstorageio_mock
+ lambda pipeline_options: blobstorageio_mock
sources = [
'azfs://storageaccount/container/from1',
'azfs://storageaccount/container/from2',
@@ -260,7 +260,7 @@ class BlobStorageFileSystemTest(unittest.TestCase):
# Prepare mocks.
blobstorageio_mock = mock.MagicMock()
blobstoragefilesystem.blobstorageio.BlobStorageIO = \
- lambda: blobstorageio_mock
+ lambda pipeline_options: blobstorageio_mock
blobstorageio_mock.size.return_value = 0
files = [
'azfs://storageaccount/container/from1',
@@ -276,7 +276,7 @@ class BlobStorageFileSystemTest(unittest.TestCase):
# Prepare mocks.
blobstorageio_mock = mock.MagicMock()
blobstoragefilesystem.blobstorageio.BlobStorageIO = \
- lambda: blobstorageio_mock
+ lambda pipeline_options: blobstorageio_mock
nonexistent_directory = 'azfs://storageaccount/nonexistent-container/tree/'
exception = blobstorageio.BlobStorageError('Not found', 404)
@@ -307,7 +307,7 @@ class BlobStorageFileSystemTest(unittest.TestCase):
# Prepare mocks.
blobstorageio_mock = mock.MagicMock()
blobstoragefilesystem.blobstorageio.BlobStorageIO = \
- lambda: blobstorageio_mock
+ lambda pipeline_options: blobstorageio_mock
sources = [
'azfs://storageaccount/container/original_blob1',
diff --git a/sdks/python/apache_beam/io/azure/blobstorageio.py b/sdks/python/apache_beam/io/azure/blobstorageio.py
index ae0a4944d52..c614ad64ab3 100644
--- a/sdks/python/apache_beam/io/azure/blobstorageio.py
+++ b/sdks/python/apache_beam/io/azure/blobstorageio.py
@@ -28,10 +28,12 @@ import re
import tempfile
import time
+from apache_beam.internal.azure import auth
from apache_beam.io.filesystemio import Downloader
from apache_beam.io.filesystemio import DownloaderStream
from apache_beam.io.filesystemio import Uploader
from apache_beam.io.filesystemio import UploaderStream
+from apache_beam.options.pipeline_options import AzureOptions
from apache_beam.utils import retry
_LOGGER = logging.getLogger(__name__)
@@ -105,10 +107,19 @@ class BlobStorageError(Exception):
class BlobStorageIO(object):
"""Azure Blob Storage I/O client."""
- def __init__(self, client=None):
- connect_str = os.getenv('AZURE_STORAGE_CONNECTION_STRING')
+ def __init__(self, client=None, pipeline_options=None):
if client is None:
- self.client = BlobServiceClient.from_connection_string(connect_str)
+ azure_options = pipeline_options.view_as(AzureOptions)
+ connect_str = azure_options.azure_connection_string or \
+ os.getenv('AZURE_STORAGE_CONNECTION_STRING')
+ if connect_str:
+ self.client = BlobServiceClient.from_connection_string(
+ conn_str=connect_str)
+ else:
+ credential = auth.get_service_credentials(pipeline_options)
+ self.client = BlobServiceClient(
+ account_url=azure_options.blob_service_endpoint,
+ credential=credential)
else:
self.client = client
if not AZURE_DEPS_INSTALLED:
diff --git a/sdks/python/apache_beam/io/azure/integration_test/Dockerfile b/sdks/python/apache_beam/io/azure/integration_test/Dockerfile
new file mode 100644
index 00000000000..7ae67fb8500
--- /dev/null
+++ b/sdks/python/apache_beam/io/azure/integration_test/Dockerfile
@@ -0,0 +1,46 @@
+###############################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+###############################################################################
+
+# This image contains a Python SDK build and dependencies.
+# By default it runs wordcount against a locally accessible Azurite service.
+# See azure_integration_test.sh for example usage.
+ARG BASE_IMAGE
+FROM $BASE_IMAGE
+
+# Install Azure CLI.
+RUN curl -sL https://aka.ms/InstallAzureCLIDeb | bash
+
+WORKDIR /app
+
+# Add Beam SDK sources.
+COPY sdks/python /app/sdks/python
+COPY model /app/model
+
+# This step should look like setupVirtualenv minus virtualenv creation.
+RUN pip install --no-cache-dir tox==3.11.1 -r sdks/python/build-requirements.txt
+
+# Add Azurite's self-signed cert to the global CA cert store.
+COPY cert.pem /usr/local/share/ca-certificates/azurite.crt
+RUN update-ca-certificates
+
+# Tell python to use the global CA cert store.
+# See https://stackoverflow.com/a/66494735
+ENV REQUESTS_CA_BUNDLE /etc/ssl/certs/ca-certificates.crt
+
+# Run wordcount, and write results to Azurite.
+CMD cd sdks/python && tox -e azure_integration_test
diff --git a/sdks/python/apache_beam/io/azure/integration_test/azure_integration_test.sh b/sdks/python/apache_beam/io/azure/integration_test/azure_integration_test.sh
new file mode 100755
index 00000000000..cfac5421093
--- /dev/null
+++ b/sdks/python/apache_beam/io/azure/integration_test/azure_integration_test.sh
@@ -0,0 +1,79 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Runs Python Azure integration tests.
+#
+# Requires docker, docker-compose to be installed.
+
+# Usage check.
+if [[ $# != 1 ]]; then
+ printf "Usage: \n$> ./apache_beam/io/azure/integration_test/azure_integration_test.sh <python_version>"
+ printf "\n\tpython_version: [required] Python version used for container build and run tests."
+ printf " Use 'python:3.8' for Python3.8."
+ exit 1
+fi
+
+set -e -u -x
+
+PY_PREFIX=$(echo "$1" | tr ':.' '_')
+
+# Setup context directory.
+TEST_DIR=$(dirname $0)
+ROOT_DIR=$(cd "${TEST_DIR}/../../../../../.." && pwd)
+CONTEXT_DIR=${ROOT_DIR}/build/azure_integration_${PY_PREFIX}
+rm -rf ${CONTEXT_DIR} || true
+
+mkdir -p ${CONTEXT_DIR}/sdks
+cp -f ${TEST_DIR}/* ${CONTEXT_DIR}/
+cp -r ${ROOT_DIR}/sdks/python ${CONTEXT_DIR}/sdks/
+cp -r ${ROOT_DIR}/model ${CONTEXT_DIR}/
+
+# Use a unique name to allow concurrent runs on the same machine.
+PROJECT_NAME=$(echo azure_IT-$PY_PREFIX-${BUILD_TAG:-non-jenkins})
+
+if [ -z "${BUILD_TAG:-}" ]; then
+ COLOR_OPT=""
+else
+ COLOR_OPT="--no-ansi"
+fi
+COMPOSE_OPT="-p ${PROJECT_NAME} ${COLOR_OPT}"
+
+cd ${CONTEXT_DIR}
+
+# Generate a self-signed certificate for Azurite.
+openssl req -x509 -nodes -days 3650 -newkey rsa:2048 -keyout key.pem \
+ -out cert.pem -config ssl.conf -extensions 'v3_req'
+
+# Clean up leftover unused networks from previous runs. BEAM-4051
+# This might mess with leftover containers that still reference pruned networks,
+# so --force-recreate is passed to 'docker up' below.
+# https://github.com/docker/compose/issues/5745#issuecomment-370031631
+docker network prune --force
+
+# BEAM-7405: Create and point to an empty config file to work around "gcloud"
+# appearing in ~/.docker/config.json but not being installed.
+export DOCKER_CONFIG=.
+echo '{}' > config.json
+
+function finally {
+ time docker-compose ${COMPOSE_OPT} down
+}
+trap finally EXIT
+
+time docker-compose ${COMPOSE_OPT} build --build-arg BASE_IMAGE=$1
+time docker-compose ${COMPOSE_OPT} up --exit-code-from test \
+ --abort-on-container-exit --force-recreate
diff --git a/sdks/python/apache_beam/io/azure/integration_test/docker-compose.yml b/sdks/python/apache_beam/io/azure/integration_test/docker-compose.yml
new file mode 100644
index 00000000000..963b4a94c56
--- /dev/null
+++ b/sdks/python/apache_beam/io/azure/integration_test/docker-compose.yml
@@ -0,0 +1,50 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+version: '3'
+
+services:
+ azurite:
+ image: mcr.microsoft.com/azure-storage/azurite:3.20.1
+ command:
+ - "azurite-blob"
+ - "--blobHost"
+ - "0.0.0.0"
+ - "--blobPort"
+ - "10000"
+ - "--oauth"
+ - "basic"
+ - "--cert"
+ - "/opt/azurite/certs/cert.pem"
+ - "--key"
+ - "/opt/azurite/certs/key.pem"
+ hostname: azurite
+ networks:
+ - azure_test_net
+ volumes:
+ - ./:/opt/azurite/certs
+
+ # Integration test.
+ test:
+ build: .
+ networks:
+ - azure_test_net
+ depends_on:
+ - "azurite"
+
+networks:
+ azure_test_net:
\ No newline at end of file
diff --git a/sdks/python/apache_beam/io/azure/integration_test/ssl.conf b/sdks/python/apache_beam/io/azure/integration_test/ssl.conf
new file mode 100644
index 00000000000..75128200ae8
--- /dev/null
+++ b/sdks/python/apache_beam/io/azure/integration_test/ssl.conf
@@ -0,0 +1,37 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+[ req ]
+default_bits = 2048
+distinguished_name = req_distinguished_name
+x509_extensions = v3_req
+prompt = no
+
+[ req_distinguished_name ]
+countryName = CO
+stateOrProvinceName = ST
+localityName = LO
+organizationName = OU
+commonName = CN
+
+[ v3_req ]
+keyUsage = keyEncipherment, dataEncipherment
+extendedKeyUsage = serverAuth
+subjectAltName = @alt_names
+
+[ alt_names ]
+DNS.1 = 127.0.0.1
+DNS.2 = azurite
diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index b51a3c93bb8..02d8b02c18e 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -42,6 +42,7 @@ __all__ = [
'TypeOptions',
'DirectOptions',
'GoogleCloudOptions',
+ 'AzureOptions',
'HadoopFileSystemOptions',
'WorkerOptions',
'DebugOptions',
@@ -861,6 +862,34 @@ class GoogleCloudOptions(PipelineOptions):
return errors
+class AzureOptions(PipelineOptions):
+ """Azure Blob Storage options."""
+ @classmethod
+ def _add_argparse_args(cls, parser):
+ parser.add_argument(
+ '--azure_connection_string',
+ default=None,
+ help='Connection string of the Azure Blob Storage Account.')
+ parser.add_argument(
+ '--blob_service_endpoint',
+ default=None,
+ help='URL of the Azure Blob Storage Account.')
+ parser.add_argument(
+ '--azure_managed_identity_client_id',
+ default=None,
+ help='Client ID of a user-assigned managed identity.')
+
+ def validate(self, validator):
+ errors = []
+ if self.azure_connection_string:
+ if self.blob_service_endpoint:
+ errors.append(
+ '--azure_connection_string and '
+ '--blob_service_endpoint are mutually exclusive.')
+
+ return errors
+
+
class HadoopFileSystemOptions(PipelineOptions):
"""``HadoopFileSystem`` connection options."""
@classmethod
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index ba0c2f38eec..2c30414d931 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -343,6 +343,7 @@ if __name__ == '__main__':
'azure': [
'azure-storage-blob >=12.3.2',
'azure-core >=1.7.0',
+ 'azure-identity >=1.12.0',
],
#(TODO): Some tests using Pandas implicitly calls inspect.stack()
# with python 3.10 leading to incorrect stacktrace.
diff --git a/sdks/python/test-suites/direct/common.gradle b/sdks/python/test-suites/direct/common.gradle
index cd23645db15..c8db6388305 100644
--- a/sdks/python/test-suites/direct/common.gradle
+++ b/sdks/python/test-suites/direct/common.gradle
@@ -188,6 +188,15 @@ tasks.register("hdfsIntegrationTest") {
}
}
+tasks.register("azureIntegrationTest") {
+ doLast {
+ exec {
+ executable 'sh'
+ args '-c', "${rootDir}/sdks/python/apache_beam/io/azure/integration_test/azure_integration_test.sh python:${pythonContainerVersion}"
+ }
+ }
+}
+
// Pytorch RunInference IT tests
task torchInferenceTest {
dependsOn 'installGcpTest'
diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini
index 33ec39c4189..76d26c271e2 100644
--- a/sdks/python/tox.ini
+++ b/sdks/python/tox.ini
@@ -183,6 +183,43 @@ commands =
commands_pre =
pip check
+[testenv:azure_integration_test]
+# Used by azure/integration_test/azure_integration_test.sh.
+# Do not run this directly, as it depends on nodes defined in
+# azure/integration_test/docker-compose.yml.
+deps =
+ -r build-requirements.txt
+extras =
+ azure
+whitelist_externals =
+ echo
+ sleep
+passenv = REQUESTS_CA_BUNDLE
+setenv =
+ CONNECTION_STRING=DefaultEndpointsProtocol=https;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=https://azurite:10000/devstoreaccount1;
+commands_pre =
+ pip check
+ wget storage.googleapis.com/dataflow-samples/shakespeare/kinglear.txt
+ # Create container for storing files.
+ az storage container create -n container --connection-string {env:CONNECTION_STRING}
+ # Upload test file.
+ az storage blob upload -f kinglear.txt -c container -n kinglear.txt --connection-string {env:CONNECTION_STRING}
+commands =
+ # Test --azure_connection_string
+ python -m apache_beam.examples.wordcount \
+ --input azfs://devstoreaccount1/container/* \
+ --output azfs://devstoreaccount1/container/py-wordcount-integration \
+ --azure_connection_string {env:CONNECTION_STRING}
+ # This doesn't work because there's no way to send a fake bearer token to
+ # Azurite when using DefaultAzureCredential.
+ # See https://github.com/Azure/Azurite/issues/389#issuecomment-615298432
+ # and https://github.com/Azure/Azurite/issues/1750#issue-1449778593
+ #python -m apache_beam.examples.wordcount \
+ # --input azfs://devstoreaccount1/container/* \
+ # --output azfs://devstoreaccount1/container/py-wordcount-integration \
+ # --blob_service_endpoint https://azurite:10000/devstoreaccount1/container-name \
+ # --azure_managed_identity_client_id "abc123"
+
[testenv:py3-yapf]
# keep the version of yapf in sync with the 'rev' in .pre-commit-config.yaml
deps =