Posted to commits@beam.apache.org by yh...@apache.org on 2022/11/22 17:23:10 UTC

[beam] branch master updated: Teach Azure Filesystem to authenticate using DefaultAzureCredential in the Python SDK (#24212)

This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 883a362c930 Teach Azure Filesystem to authenticate using DefaultAzureCredential in the Python SDK (#24212)
883a362c930 is described below

commit 883a362c930aca4298551697d7aaacbe7b6602f1
Author: creste <cr...@users.noreply.github.com>
AuthorDate: Tue Nov 22 12:23:04 2022 -0500

    Teach Azure Filesystem to authenticate using DefaultAzureCredential in the Python SDK (#24212)
    
    * Added support for using DefaultAzureCredential() with Blob Storage.
    
    Tests are not working yet due to a TLS issue with Azurite.
    
    * SubjectAltNames are now included in the SSL cert.
    
    * DefaultAzureCredential is no longer used with AZURE_STORAGE_CONNECTION_STRING.
    
    * Fixed integration tests.
    
    * Azure integration tests can now run in parallel across all Python versions.
    
    * Changed pipeline option names to be consistent with the Java SDK.
    
    * Fix missing Apache 2.0 licenses.
    
    Deleted cert and key files because they can't have a license header.
    Moved cert generation to azure_integration_test.sh.
    
    * Fix path to Azure integration tests in the tox comment.
    
    * Fix path to cert.pem.
    
    * Fix linting issues.
    
    * Fix path to cert.pem
    
    * Fix linter error.
    
    * Added Azure integration tests to the Python 3.7 post-commit test suite.
    
    * Fix unit tests.
    
    * Updated README.md.
    
    * Move changes entry to 2.44.0.
---
 CHANGES.md                                         |  1 +
 build.gradle.kts                                   |  1 +
 sdks/python/apache_beam/internal/azure/__init__.py | 18 +++++
 sdks/python/apache_beam/internal/azure/auth.py     | 83 ++++++++++++++++++++++
 .../apache_beam/io/azure/blobstoragefilesystem.py  | 28 +++++---
 .../io/azure/blobstoragefilesystem_test.py         | 24 +++----
 sdks/python/apache_beam/io/azure/blobstorageio.py  | 17 ++++-
 .../io/azure/integration_test/Dockerfile           | 46 ++++++++++++
 .../integration_test/azure_integration_test.sh     | 79 ++++++++++++++++++++
 .../io/azure/integration_test/docker-compose.yml   | 50 +++++++++++++
 .../apache_beam/io/azure/integration_test/ssl.conf | 37 ++++++++++
 .../python/apache_beam/options/pipeline_options.py | 29 ++++++++
 sdks/python/setup.py                               |  1 +
 sdks/python/test-suites/direct/common.gradle       |  9 +++
 sdks/python/tox.ini                                | 37 ++++++++++
 15 files changed, 434 insertions(+), 26 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 414241d1c5b..2446bee91d1 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -61,6 +61,7 @@
 * Support for Bigtable sink (Write and WriteBatch) added (Go) ([#23324](https://github.com/apache/beam/issues/23324)).
 * S3 implementation of the Beam filesystem (Go) ([#23991](https://github.com/apache/beam/issues/23991)).
 * Support for SingleStoreDB source and sink added (Java) ([#22617](https://github.com/apache/beam/issues/22617)).
+* Added support for DefaultAzureCredential authentication in Azure Filesystem (Python) ([#24210](https://github.com/apache/beam/issues/24210)).
 
 ## New Features / Improvements
 
diff --git a/build.gradle.kts b/build.gradle.kts
index 2556db7bc9a..fe0956a6ea9 100644
--- a/build.gradle.kts
+++ b/build.gradle.kts
@@ -340,6 +340,7 @@ tasks.register("python37PostCommit") {
   dependsOn(":sdks:python:test-suites:direct:py37:postCommitIT")
   dependsOn(":sdks:python:test-suites:direct:py37:directRunnerIT")
   dependsOn(":sdks:python:test-suites:direct:py37:hdfsIntegrationTest")
+  dependsOn(":sdks:python:test-suites:direct:py37:azureIntegrationTest")
   dependsOn(":sdks:python:test-suites:direct:py37:mongodbioIT")
   dependsOn(":sdks:python:test-suites:portable:py37:postCommitPy37")
   dependsOn(":sdks:python:test-suites:dataflow:py37:spannerioIT")
diff --git a/sdks/python/apache_beam/internal/azure/__init__.py b/sdks/python/apache_beam/internal/azure/__init__.py
new file mode 100644
index 00000000000..0bce5d68f72
--- /dev/null
+++ b/sdks/python/apache_beam/internal/azure/__init__.py
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""For internal use only; no backwards-compatibility guarantees."""
diff --git a/sdks/python/apache_beam/internal/azure/auth.py b/sdks/python/apache_beam/internal/azure/auth.py
new file mode 100644
index 00000000000..d505cc200f9
--- /dev/null
+++ b/sdks/python/apache_beam/internal/azure/auth.py
@@ -0,0 +1,83 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Azure credentials and authentication."""
+
+# pytype: skip-file
+
+import logging
+import threading
+
+from apache_beam.options.pipeline_options import AzureOptions
+
+try:
+  from azure.identity import DefaultAzureCredential
+  _AZURE_AUTH_AVAILABLE = True
+except ImportError:
+  _AZURE_AUTH_AVAILABLE = False
+
+_LOGGER = logging.getLogger(__name__)
+
+
+def get_service_credentials(pipeline_options):
+  """For internal use only; no backwards-compatibility guarantees.
+
+  Get credentials to access Azure services.
+  Args:
+    pipeline_options: Pipeline options, used in creating credentials
+      like managed identity credentials.
+
+  Returns:
+    A ``azure.identity.*Credential`` object or None if credentials
+    not found. Returned object is thread-safe.
+  """
+  return _Credentials.get_service_credentials(pipeline_options)
+
+
+class _Credentials(object):
+  _credentials_lock = threading.Lock()
+  _credentials_init = False
+  _credentials = None
+
+  @classmethod
+  def get_service_credentials(cls, pipeline_options):
+    with cls._credentials_lock:
+      if cls._credentials_init:
+        return cls._credentials
+      cls._credentials = cls._get_service_credentials(pipeline_options)
+      cls._credentials_init = True
+
+    return cls._credentials
+
+  @staticmethod
+  def _get_service_credentials(pipeline_options):
+    if not _AZURE_AUTH_AVAILABLE:
+      _LOGGER.warning(
+          'Unable to find default credentials because the azure.identity '
+          'library is not available. Install the azure.identity library to use '
+          'Azure default credentials.')
+      return None
+
+    try:
+      credentials = DefaultAzureCredential(
+        managed_identity_client_id=pipeline_options.view_as(AzureOptions)\
+          .azure_managed_identity_client_id)
+      _LOGGER.debug('Connecting using Azure Default Credentials.')
+      return credentials
+    except Exception as e:
+      _LOGGER.warning('Unable to find Azure credentials to use: %s\n', e)
+      return None
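
The helper above memoizes a single DefaultAzureCredential per process behind a class-level lock, so every filesystem operation shares one thread-safe credential object. A minimal sketch of how it is meant to be called; the client ID below is a hypothetical placeholder:

    from apache_beam.internal.azure import auth
    from apache_beam.options.pipeline_options import PipelineOptions

    # The first call constructs DefaultAzureCredential under
    # _credentials_lock; subsequent calls return the cached instance.
    options = PipelineOptions([
        '--azure_managed_identity_client_id',
        '00000000-0000-0000-0000-000000000000',
    ])
    credential = auth.get_service_credentials(options)
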
diff --git a/sdks/python/apache_beam/io/azure/blobstoragefilesystem.py b/sdks/python/apache_beam/io/azure/blobstoragefilesystem.py
index 7b86d592b28..cc678f9ae0f 100644
--- a/sdks/python/apache_beam/io/azure/blobstoragefilesystem.py
+++ b/sdks/python/apache_beam/io/azure/blobstoragefilesystem.py
@@ -37,6 +37,10 @@ class BlobStorageFileSystem(FileSystem):
   CHUNK_SIZE = blobstorageio.MAX_BATCH_OPERATION_SIZE
   AZURE_FILE_SYSTEM_PREFIX = 'azfs://'
 
+  def __init__(self, pipeline_options):
+    super().__init__(pipeline_options)
+    self._pipeline_options = pipeline_options
+
   @classmethod
   def scheme(cls):
     """URI scheme for the FileSystem
@@ -118,12 +122,15 @@ class BlobStorageFileSystem(FileSystem):
       ``BeamIOError``: if listing fails, but not if no files were found.
     """
     try:
-      for path, (size, updated) in blobstorageio.BlobStorageIO() \
+      for path, (size, updated) in self._blobstorageIO() \
         .list_prefix(dir_or_prefix, with_metadata=True).items():
         yield FileMetadata(path, size, updated)
     except Exception as e:  # pylint: disable=broad-except
       raise BeamIOError("List operation failed", {dir_or_prefix: e})
 
+  def _blobstorageIO(self):
+    return blobstorageio.BlobStorageIO(pipeline_options=self._pipeline_options)
+
   def _path_open(
       self,
       path,
@@ -134,8 +141,7 @@ class BlobStorageFileSystem(FileSystem):
     """
     compression_type = FileSystem._get_compression_type(path, compression_type)
     mime_type = CompressionTypes.mime_type(compression_type, mime_type)
-    raw_file = blobstorageio.BlobStorageIO().open(
-        path, mode, mime_type=mime_type)
+    raw_file = self._blobstorageIO().open(path, mode, mime_type=mime_type)
     if compression_type == CompressionTypes.UNCOMPRESSED:
       return raw_file
     return CompressedFile(raw_file, compression_type=compression_type)
@@ -190,7 +196,7 @@ class BlobStorageFileSystem(FileSystem):
       message = 'Unable to copy unequal number of sources and destinations.'
       raise BeamIOError(message)
     src_dest_pairs = list(zip(source_file_names, destination_file_names))
-    return blobstorageio.BlobStorageIO().copy_paths(src_dest_pairs)
+    return self._blobstorageIO().copy_paths(src_dest_pairs)
 
   def rename(self, source_file_names, destination_file_names):
     """Rename the files at the source list to the destination list.
@@ -207,7 +213,7 @@ class BlobStorageFileSystem(FileSystem):
       message = 'Unable to rename unequal number of sources and destinations.'
       raise BeamIOError(message)
     src_dest_pairs = list(zip(source_file_names, destination_file_names))
-    results = blobstorageio.BlobStorageIO().rename_files(src_dest_pairs)
+    results = self._blobstorageIO().rename_files(src_dest_pairs)
     # Retrieve exceptions.
     exceptions = {(src, dest): error
                   for (src, dest, error) in results if error is not None}
@@ -223,7 +229,7 @@ class BlobStorageFileSystem(FileSystem):
     Returns: boolean flag indicating if path exists
     """
     try:
-      return blobstorageio.BlobStorageIO().exists(path)
+      return self._blobstorageIO().exists(path)
     except Exception as e:  # pylint: disable=broad-except
       raise BeamIOError("Exists operation failed", {path: e})
 
@@ -239,7 +245,7 @@ class BlobStorageFileSystem(FileSystem):
       ``BeamIOError``: if path doesn't exist.
     """
     try:
-      return blobstorageio.BlobStorageIO().size(path)
+      return self._blobstorageIO().size(path)
     except Exception as e:  # pylint: disable=broad-except
       raise BeamIOError("Size operation failed", {path: e})
 
@@ -255,7 +261,7 @@ class BlobStorageFileSystem(FileSystem):
       ``BeamIOError``: if path doesn't exist.
     """
     try:
-      return blobstorageio.BlobStorageIO().last_updated(path)
+      return self._blobstorageIO().last_updated(path)
     except Exception as e:  # pylint: disable=broad-except
       raise BeamIOError("Last updated operation failed", {path: e})
 
@@ -272,7 +278,7 @@ class BlobStorageFileSystem(FileSystem):
       ``BeamIOError``: if path isn't a file or doesn't exist.
     """
     try:
-      return blobstorageio.BlobStorageIO().checksum(path)
+      return self._blobstorageIO().checksum(path)
     except Exception as e:  # pylint: disable=broad-except
       raise BeamIOError("Checksum operation failed", {path, e})
 
@@ -289,7 +295,7 @@ class BlobStorageFileSystem(FileSystem):
       ``BeamIOError``: if path isn't a file or doesn't exist.
     """
     try:
-      file_metadata = blobstorageio.BlobStorageIO()._status(path)
+      file_metadata = self._blobstorageIO()._status(path)
       return FileMetadata(
           path, file_metadata['size'], file_metadata['last_updated'])
     except Exception as e:  # pylint: disable=broad-except
@@ -305,7 +311,7 @@ class BlobStorageFileSystem(FileSystem):
     Raises:
       ``BeamIOError``: if any of the delete operations fail
     """
-    results = blobstorageio.BlobStorageIO().delete_paths(paths)
+    results = self._blobstorageIO().delete_paths(paths)
     # Retrieve exceptions.
     exceptions = {
         path: error
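
With construction funneled through _blobstorageIO(), every filesystem call now builds its client from the pipeline's Azure options instead of relying solely on the AZURE_STORAGE_CONNECTION_STRING environment variable. A minimal sketch, assuming a hypothetical storage account and container:

    from apache_beam.io.azure.blobstoragefilesystem import BlobStorageFileSystem
    from apache_beam.options.pipeline_options import PipelineOptions

    # Options set here reach every BlobStorageIO the filesystem creates.
    options = PipelineOptions([
        '--blob_service_endpoint',
        'https://myaccount.blob.core.windows.net',
    ])
    fs = BlobStorageFileSystem(pipeline_options=options)
    fs.exists('azfs://myaccount/container/kinglear.txt')
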
diff --git a/sdks/python/apache_beam/io/azure/blobstoragefilesystem_test.py b/sdks/python/apache_beam/io/azure/blobstoragefilesystem_test.py
index 241cc72a15d..6c844329867 100644
--- a/sdks/python/apache_beam/io/azure/blobstoragefilesystem_test.py
+++ b/sdks/python/apache_beam/io/azure/blobstoragefilesystem_test.py
@@ -88,7 +88,7 @@ class BlobStorageFileSystemTest(unittest.TestCase):
     # Prepare mocks.
     blobstorageio_mock = mock.MagicMock()
     blobstoragefilesystem.blobstorageio.BlobStorageIO = \
-        lambda: blobstorageio_mock
+        lambda pipeline_options: blobstorageio_mock
     blobstorageio_mock.exists.return_value = True
     blobstorageio_mock._status.return_value = {
         'size': 1, 'last_updated': 99999.0
@@ -107,7 +107,7 @@ class BlobStorageFileSystemTest(unittest.TestCase):
     # Prepare mocks.
     blobstorageio_mock = mock.MagicMock()
     blobstoragefilesystem.blobstorageio.BlobStorageIO = \
-        lambda: blobstorageio_mock
+        lambda pipeline_options: blobstorageio_mock
     blobstorageio_mock.list_prefix.return_value = {
         'azfs://storageaccount/container/file1': (1, 99999.0),
         'azfs://storageaccount/container/file2': (2, 88888.0)
@@ -128,7 +128,7 @@ class BlobStorageFileSystemTest(unittest.TestCase):
     blobstorageio_mock = mock.MagicMock()
     limit = 1
     blobstoragefilesystem.blobstorageio.BlobStorageIO = \
-        lambda: blobstorageio_mock
+        lambda pipeline_options: blobstorageio_mock
     blobstorageio_mock.list_prefix.return_value = {
         'azfs://storageaccount/container/file1': (1, 99999.0)
     }
@@ -146,7 +146,7 @@ class BlobStorageFileSystemTest(unittest.TestCase):
     # Prepare mocks.
     blobstorageio_mock = mock.MagicMock()
     blobstoragefilesystem.blobstorageio.BlobStorageIO = \
-        lambda: blobstorageio_mock
+        lambda pipeline_options: blobstorageio_mock
     exception = IOError('Failed')
     blobstorageio_mock.list_prefix.side_effect = exception
 
@@ -165,7 +165,7 @@ class BlobStorageFileSystemTest(unittest.TestCase):
     # Prepare mocks.
     blobstorageio_mock = mock.MagicMock()
     blobstoragefilesystem.blobstorageio.BlobStorageIO = \
-        lambda: blobstorageio_mock
+        lambda pipeline_options: blobstorageio_mock
     blobstorageio_mock.list_prefix.side_effect = [
         {
             'azfs://storageaccount/container/file1': (1, 99999.0)
@@ -189,7 +189,7 @@ class BlobStorageFileSystemTest(unittest.TestCase):
     # Prepare mocks.
     blobstorageio_mock = mock.MagicMock()
     blobstoragefilesystem.blobstorageio.BlobStorageIO = \
-        lambda: blobstorageio_mock
+        lambda pipeline_options: blobstorageio_mock
     # Issue file copy.
     _ = self.fs.create(
         'azfs://storageaccount/container/file1', 'application/octet-stream')
@@ -204,7 +204,7 @@ class BlobStorageFileSystemTest(unittest.TestCase):
     # Prepare mocks.
     blobstorageio_mock = mock.MagicMock()
     blobstoragefilesystem.blobstorageio.BlobStorageIO = \
-        lambda: blobstorageio_mock
+        lambda pipeline_options: blobstorageio_mock
     # Issue file copy.
     _ = self.fs.open(
         'azfs://storageaccount/container/file1', 'application/octet-stream')
@@ -219,7 +219,7 @@ class BlobStorageFileSystemTest(unittest.TestCase):
     # Prepare mocks.
     blobstorageio_mock = mock.MagicMock()
     blobstoragefilesystem.blobstorageio.BlobStorageIO = \
-        lambda: blobstorageio_mock
+        lambda pipeline_options: blobstorageio_mock
     sources = [
         'azfs://storageaccount/container/from1',
         'azfs://storageaccount/container/from2',
@@ -240,7 +240,7 @@ class BlobStorageFileSystemTest(unittest.TestCase):
     # Prepare mocks.
     blobstorageio_mock = mock.MagicMock()
     blobstoragefilesystem.blobstorageio.BlobStorageIO = \
-        lambda: blobstorageio_mock
+        lambda pipeline_options: blobstorageio_mock
     sources = [
         'azfs://storageaccount/container/from1',
         'azfs://storageaccount/container/from2',
@@ -260,7 +260,7 @@ class BlobStorageFileSystemTest(unittest.TestCase):
     # Prepare mocks.
     blobstorageio_mock = mock.MagicMock()
     blobstoragefilesystem.blobstorageio.BlobStorageIO = \
-        lambda: blobstorageio_mock
+        lambda pipeline_options: blobstorageio_mock
     blobstorageio_mock.size.return_value = 0
     files = [
         'azfs://storageaccount/container/from1',
@@ -276,7 +276,7 @@ class BlobStorageFileSystemTest(unittest.TestCase):
     # Prepare mocks.
     blobstorageio_mock = mock.MagicMock()
     blobstoragefilesystem.blobstorageio.BlobStorageIO = \
-        lambda: blobstorageio_mock
+        lambda pipeline_options: blobstorageio_mock
     nonexistent_directory = 'azfs://storageaccount/nonexistent-container/tree/'
     exception = blobstorageio.BlobStorageError('Not found', 404)
 
@@ -307,7 +307,7 @@ class BlobStorageFileSystemTest(unittest.TestCase):
     # Prepare mocks.
     blobstorageio_mock = mock.MagicMock()
     blobstoragefilesystem.blobstorageio.BlobStorageIO = \
-        lambda: blobstorageio_mock
+        lambda pipeline_options: blobstorageio_mock
 
     sources = [
         'azfs://storageaccount/container/original_blob1',
diff --git a/sdks/python/apache_beam/io/azure/blobstorageio.py b/sdks/python/apache_beam/io/azure/blobstorageio.py
index ae0a4944d52..c614ad64ab3 100644
--- a/sdks/python/apache_beam/io/azure/blobstorageio.py
+++ b/sdks/python/apache_beam/io/azure/blobstorageio.py
@@ -28,10 +28,12 @@ import re
 import tempfile
 import time
 
+from apache_beam.internal.azure import auth
 from apache_beam.io.filesystemio import Downloader
 from apache_beam.io.filesystemio import DownloaderStream
 from apache_beam.io.filesystemio import Uploader
 from apache_beam.io.filesystemio import UploaderStream
+from apache_beam.options.pipeline_options import AzureOptions
 from apache_beam.utils import retry
 
 _LOGGER = logging.getLogger(__name__)
@@ -105,10 +107,19 @@ class BlobStorageError(Exception):
 
 class BlobStorageIO(object):
   """Azure Blob Storage I/O client."""
-  def __init__(self, client=None):
-    connect_str = os.getenv('AZURE_STORAGE_CONNECTION_STRING')
+  def __init__(self, client=None, pipeline_options=None):
     if client is None:
-      self.client = BlobServiceClient.from_connection_string(connect_str)
+      azure_options = pipeline_options.view_as(AzureOptions)
+      connect_str = azure_options.azure_connection_string or \
+                    os.getenv('AZURE_STORAGE_CONNECTION_STRING')
+      if connect_str:
+        self.client = BlobServiceClient.from_connection_string(
+            conn_str=connect_str)
+      else:
+        credential = auth.get_service_credentials(pipeline_options)
+        self.client = BlobServiceClient(
+            account_url=azure_options.blob_service_endpoint,
+            credential=credential)
     else:
       self.client = client
     if not AZURE_DEPS_INSTALLED:
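
The constructor now resolves a client in order: an explicitly passed client wins; otherwise a connection string (the --azure_connection_string option, falling back to the AZURE_STORAGE_CONNECTION_STRING environment variable); otherwise DefaultAzureCredential against --blob_service_endpoint. A sketch of the two new paths, with placeholder values:

    from apache_beam.io.azure import blobstorageio
    from apache_beam.options.pipeline_options import PipelineOptions

    # Connection-string path: the option takes precedence over the
    # environment variable. Substitute a real connection string.
    opts = PipelineOptions([
        '--azure_connection_string',
        'DefaultEndpointsProtocol=https;AccountName=devstoreaccount1;...',
    ])
    io_conn = blobstorageio.BlobStorageIO(pipeline_options=opts)

    # Credential path: with no connection string set anywhere,
    # DefaultAzureCredential authenticates against the endpoint.
    opts = PipelineOptions([
        '--blob_service_endpoint',
        'https://myaccount.blob.core.windows.net',
    ])
    io_cred = blobstorageio.BlobStorageIO(pipeline_options=opts)
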
diff --git a/sdks/python/apache_beam/io/azure/integration_test/Dockerfile b/sdks/python/apache_beam/io/azure/integration_test/Dockerfile
new file mode 100644
index 00000000000..7ae67fb8500
--- /dev/null
+++ b/sdks/python/apache_beam/io/azure/integration_test/Dockerfile
@@ -0,0 +1,46 @@
+###############################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+###############################################################################
+
+# This image contains a Python SDK build and dependencies.
+# By default it runs wordcount against a locally accessible Azurite service.
+# See azure_integration_test.sh for example usage.
+ARG BASE_IMAGE
+FROM $BASE_IMAGE
+
+# Install Azure CLI.
+RUN curl -sL https://aka.ms/InstallAzureCLIDeb | bash
+
+WORKDIR /app
+
+# Add Beam SDK sources.
+COPY sdks/python /app/sdks/python
+COPY model /app/model
+
+# This step should look like setupVirtualenv minus virtualenv creation.
+RUN pip install --no-cache-dir tox==3.11.1 -r sdks/python/build-requirements.txt
+
+# Add Azurite's self-signed cert to the global CA cert store.
+COPY cert.pem /usr/local/share/ca-certificates/azurite.crt
+RUN update-ca-certificates
+
+# Tell python to use the global CA cert store.
+# See https://stackoverflow.com/a/66494735
+ENV REQUESTS_CA_BUNDLE /etc/ssl/certs/ca-certificates.crt
+
+# Run wordcount, and write results to Azurite.
+CMD cd sdks/python && tox -e azure_integration_test
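
Setting REQUESTS_CA_BUNDLE matters because azure-core's default transport is built on requests, which consults that variable rather than the system store directly. A quick trust check from inside the container, assuming the docker-compose hostname azurite; the request returns 403 without credentials, but TLS verification itself succeeds:

    import os
    import requests

    print(os.environ['REQUESTS_CA_BUNDLE'])
    # Verifies the self-signed Azurite cert via the CA bundle above.
    requests.get('https://azurite:10000/devstoreaccount1?comp=list')
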
diff --git a/sdks/python/apache_beam/io/azure/integration_test/azure_integration_test.sh b/sdks/python/apache_beam/io/azure/integration_test/azure_integration_test.sh
new file mode 100755
index 00000000000..cfac5421093
--- /dev/null
+++ b/sdks/python/apache_beam/io/azure/integration_test/azure_integration_test.sh
@@ -0,0 +1,79 @@
+#!/bin/bash
+#
+#    Licensed to the Apache Software Foundation (ASF) under one or more
+#    contributor license agreements.  See the NOTICE file distributed with
+#    this work for additional information regarding copyright ownership.
+#    The ASF licenses this file to You under the Apache License, Version 2.0
+#    (the "License"); you may not use this file except in compliance with
+#    the License.  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS,
+#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#    See the License for the specific language governing permissions and
+#    limitations under the License.
+#
+# Runs Python Azure integration tests.
+#
+# Requires docker, docker-compose to be installed.
+
+# Usage check.
+if [[ $# != 1 ]]; then
+  printf "Usage: \n$> ./apache_beam/io/azure/integration_test/azure_integration_test.sh <python_version>"
+  printf "\n\tpython_version: [required] Python version used for container build and run tests."
+  printf " Use 'python:3.8' for Python3.8."
+  exit 1
+fi
+
+set -e -u -x
+
+PY_PREFIX=$(echo "$1" | tr ':.' '_')
+
+# Setup context directory.
+TEST_DIR=$(dirname $0)
+ROOT_DIR=$(cd "${TEST_DIR}/../../../../../.." && pwd)
+CONTEXT_DIR=${ROOT_DIR}/build/azure_integration_${PY_PREFIX}
+rm -rf ${CONTEXT_DIR} || true
+
+mkdir -p ${CONTEXT_DIR}/sdks
+cp -f ${TEST_DIR}/* ${CONTEXT_DIR}/
+cp -r ${ROOT_DIR}/sdks/python ${CONTEXT_DIR}/sdks/
+cp -r ${ROOT_DIR}/model ${CONTEXT_DIR}/
+
+# Use a unique name to allow concurrent runs on the same machine.
+PROJECT_NAME=$(echo azure_IT-$PY_PREFIX-${BUILD_TAG:-non-jenkins})
+
+if [ -z "${BUILD_TAG:-}" ]; then
+  COLOR_OPT=""
+else
+  COLOR_OPT="--no-ansi"
+fi
+COMPOSE_OPT="-p ${PROJECT_NAME} ${COLOR_OPT}"
+
+cd ${CONTEXT_DIR}
+
+# Generate a self-signed certificate for Azurite.
+openssl req -x509 -nodes -days 3650 -newkey rsa:2048 -keyout key.pem \
+  -out cert.pem -config ssl.conf -extensions 'v3_req'
+
+# Clean up leftover unused networks from previous runs. BEAM-4051
+# This might mess with leftover containers that still reference pruned networks,
+# so --force-recreate is passed to 'docker up' below.
+# https://github.com/docker/compose/issues/5745#issuecomment-370031631
+docker network prune --force
+
+# BEAM-7405: Create and point to an empty config file to work around "gcloud"
+# appearing in ~/.docker/config.json but not being installed.
+export DOCKER_CONFIG=.
+echo '{}' > config.json
+
+function finally {
+  time docker-compose ${COMPOSE_OPT} down
+}
+trap finally EXIT
+
+time docker-compose ${COMPOSE_OPT} build --build-arg BASE_IMAGE=$1
+time docker-compose ${COMPOSE_OPT} up --exit-code-from test \
+    --abort-on-container-exit --force-recreate
diff --git a/sdks/python/apache_beam/io/azure/integration_test/docker-compose.yml b/sdks/python/apache_beam/io/azure/integration_test/docker-compose.yml
new file mode 100644
index 00000000000..963b4a94c56
--- /dev/null
+++ b/sdks/python/apache_beam/io/azure/integration_test/docker-compose.yml
@@ -0,0 +1,50 @@
+#
+#    Licensed to the Apache Software Foundation (ASF) under one or more
+#    contributor license agreements.  See the NOTICE file distributed with
+#    this work for additional information regarding copyright ownership.
+#    The ASF licenses this file to You under the Apache License, Version 2.0
+#    (the "License"); you may not use this file except in compliance with
+#    the License.  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS,
+#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#    See the License for the specific language governing permissions and
+#    limitations under the License.
+#
+
+version: '3'
+
+services:
+  azurite:
+    image: mcr.microsoft.com/azure-storage/azurite:3.20.1
+    command:
+      - "azurite-blob"
+      - "--blobHost"
+      - "0.0.0.0"
+      - "--blobPort"
+      - "10000"
+      - "--oauth"
+      - "basic"
+      - "--cert"
+      - "/opt/azurite/certs/cert.pem"
+      - "--key"
+      - "/opt/azurite/certs/key.pem"
+    hostname: azurite
+    networks:
+      - azure_test_net
+    volumes:
+      - ./:/opt/azurite/certs
+
+  # Integration test.
+  test:
+    build: .
+    networks:
+      - azure_test_net
+    depends_on:
+      - "azurite"
+
+networks:
+  azure_test_net:
\ No newline at end of file
diff --git a/sdks/python/apache_beam/io/azure/integration_test/ssl.conf b/sdks/python/apache_beam/io/azure/integration_test/ssl.conf
new file mode 100644
index 00000000000..75128200ae8
--- /dev/null
+++ b/sdks/python/apache_beam/io/azure/integration_test/ssl.conf
@@ -0,0 +1,37 @@
+#
+#    Licensed to the Apache Software Foundation (ASF) under one or more
+#    contributor license agreements.  See the NOTICE file distributed with
+#    this work for additional information regarding copyright ownership.
+#    The ASF licenses this file to You under the Apache License, Version 2.0
+#    (the "License"); you may not use this file except in compliance with
+#    the License.  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS,
+#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#    See the License for the specific language governing permissions and
+#    limitations under the License.
+#
+[ req ]
+default_bits       = 2048
+distinguished_name = req_distinguished_name
+x509_extensions    = v3_req
+prompt             = no
+
+[ req_distinguished_name ]
+countryName                 = CO
+stateOrProvinceName         = ST
+localityName                = LO
+organizationName            = OU
+commonName                  = CN
+
+[ v3_req ]
+keyUsage = keyEncipherment, dataEncipherment
+extendedKeyUsage = serverAuth
+subjectAltName = @alt_names
+
+[ alt_names ]
+DNS.1   = 127.0.0.1
+DNS.2   = azurite
diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index b51a3c93bb8..02d8b02c18e 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -42,6 +42,7 @@ __all__ = [
     'TypeOptions',
     'DirectOptions',
     'GoogleCloudOptions',
+    'AzureOptions',
     'HadoopFileSystemOptions',
     'WorkerOptions',
     'DebugOptions',
@@ -861,6 +862,34 @@ class GoogleCloudOptions(PipelineOptions):
     return errors
 
 
+class AzureOptions(PipelineOptions):
+  """Azure Blob Storage options."""
+  @classmethod
+  def _add_argparse_args(cls, parser):
+    parser.add_argument(
+        '--azure_connection_string',
+        default=None,
+        help='Connection string of the Azure Blob Storage Account.')
+    parser.add_argument(
+        '--blob_service_endpoint',
+        default=None,
+        help='URL of the Azure Blob Storage Account.')
+    parser.add_argument(
+        '--azure_managed_identity_client_id',
+        default=None,
+        help='Client ID of a user-assigned managed identity.')
+
+  def validate(self, validator):
+    errors = []
+    if self.azure_connection_string:
+      if self.blob_service_endpoint:
+        errors.append(
+            '--azure_connection_string and '
+            '--blob_service_endpoint are mutually exclusive.')
+
+    return errors
+
+
 class HadoopFileSystemOptions(PipelineOptions):
   """``HadoopFileSystem`` connection options."""
   @classmethod
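
The option names were aligned with the Java SDK, and validate() rejects combining --azure_connection_string with --blob_service_endpoint, since a connection string already embeds the blob endpoint. A sketch of reading the new options in code, with placeholder values:

    from apache_beam.options.pipeline_options import AzureOptions
    from apache_beam.options.pipeline_options import PipelineOptions

    options = PipelineOptions([
        '--blob_service_endpoint',
        'https://myaccount.blob.core.windows.net',
        '--azure_managed_identity_client_id',
        '00000000-0000-0000-0000-000000000000',
    ])
    azure = options.view_as(AzureOptions)
    print(azure.blob_service_endpoint)
    print(azure.azure_managed_identity_client_id)
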
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index ba0c2f38eec..2c30414d931 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -343,6 +343,7 @@ if __name__ == '__main__':
           'azure': [
             'azure-storage-blob >=12.3.2',
             'azure-core >=1.7.0',
+            'azure-identity >=1.12.0',
           ],
         #(TODO): Some tests using Pandas implicitly calls inspect.stack()
         # with python 3.10 leading to incorrect stacktrace.
diff --git a/sdks/python/test-suites/direct/common.gradle b/sdks/python/test-suites/direct/common.gradle
index cd23645db15..c8db6388305 100644
--- a/sdks/python/test-suites/direct/common.gradle
+++ b/sdks/python/test-suites/direct/common.gradle
@@ -188,6 +188,15 @@ tasks.register("hdfsIntegrationTest") {
   }
 }
 
+tasks.register("azureIntegrationTest") {
+  doLast {
+    exec {
+      executable 'sh'
+      args '-c', "${rootDir}/sdks/python/apache_beam/io/azure/integration_test/azure_integration_test.sh python:${pythonContainerVersion}"
+    }
+  }
+}
+
 // Pytorch RunInference IT tests
 task torchInferenceTest {
   dependsOn 'installGcpTest'
diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini
index 33ec39c4189..76d26c271e2 100644
--- a/sdks/python/tox.ini
+++ b/sdks/python/tox.ini
@@ -183,6 +183,43 @@ commands =
 commands_pre =
   pip check
 
+[testenv:azure_integration_test]
+# Used by azure/integration_test/azure_integration_test.sh.
+# Do not run this directly, as it depends on nodes defined in
+# azure/integration_test/docker-compose.yml.
+deps =
+  -r build-requirements.txt
+extras =
+  azure
+whitelist_externals =
+  echo
+  sleep
+passenv = REQUESTS_CA_BUNDLE
+setenv =
+  CONNECTION_STRING=DefaultEndpointsProtocol=https;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=https://azurite:10000/devstoreaccount1;
+commands_pre =
+  pip check
+  wget storage.googleapis.com/dataflow-samples/shakespeare/kinglear.txt
+  # Create container for storing files.
+  az storage container create -n container --connection-string {env:CONNECTION_STRING}
+  # Upload test file.
+  az storage blob upload -f kinglear.txt -c container -n kinglear.txt --connection-string {env:CONNECTION_STRING}
+commands =
+  # Test --azure_connection_string
+  python -m apache_beam.examples.wordcount \
+      --input azfs://devstoreaccount1/container/* \
+      --output azfs://devstoreaccount1/container/py-wordcount-integration \
+      --azure_connection_string {env:CONNECTION_STRING}
+  # This doesn't work because there's no way to send a fake bearer token to
+  # Azurite when using DefaultAzureCredential.
+  # See https://github.com/Azure/Azurite/issues/389#issuecomment-615298432
+  # and https://github.com/Azure/Azurite/issues/1750#issue-1449778593
+  #python -m apache_beam.examples.wordcount \
+  #    --input azfs://devstoreaccount1/container/* \
+  #    --output azfs://devstoreaccount1/container/py-wordcount-integration \
+  #    --blob_service_endpoint https://azurite:10000/devstoreaccount1/container-name \
+  #    --azure_managed_identity_client_id "abc123"
+
 [testenv:py3-yapf]
 # keep the version of yapf in sync with the 'rev' in .pre-commit-config.yaml
 deps =