You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2023/01/05 14:03:47 UTC
[airflow] branch main updated: Add hook for Azure Data Lake Storage Gen2 (#28262)
This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new ad7f8e09f8 Add hook for Azure Data Lake Storage Gen2 (#28262)
ad7f8e09f8 is described below
commit ad7f8e09f8e6e87df2665abdedb22b3e8a469b49
Author: Bharanidharan <94...@users.noreply.github.com>
AuthorDate: Thu Jan 5 19:33:27 2023 +0530
Add hook for Azure Data Lake Storage Gen2 (#28262)
Created hook for supporting ADLS gen2, which uses the WASB connection and connects to ADLS gen2 storage
Relates to #28223
---
.../providers/microsoft/azure/hooks/data_lake.py | 311 ++++++++++++++++++++-
airflow/providers/microsoft/azure/provider.yaml | 10 +
.../connections/adls_v2.rst | 68 +++++
generated/provider_dependencies.json | 1 +
.../microsoft/azure/hooks/test_azure_data_lake.py | 96 ++++++-
5 files changed, 477 insertions(+), 9 deletions(-)
diff --git a/airflow/providers/microsoft/azure/hooks/data_lake.py b/airflow/providers/microsoft/azure/hooks/data_lake.py
index 4a9bc98f92..ad5e9818a9 100644
--- a/airflow/providers/microsoft/azure/hooks/data_lake.py
+++ b/airflow/providers/microsoft/azure/hooks/data_lake.py
@@ -15,19 +15,21 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-"""
-This module contains integration with Azure Data Lake.
-
-AzureDataLakeHook communicates via a REST API compatible with WebHDFS. Make sure that a
-Airflow connection of type `azure_data_lake` exists. Authorization can be done by supplying a
-login (=Client ID), password (=Client Secret) and extra fields tenant (Tenant) and account_name (Account Name)
-(see connection `azure_data_lake_default` for an example).
-"""
from __future__ import annotations
from typing import Any
+from azure.core.exceptions import ResourceExistsError, ResourceNotFoundError
from azure.datalake.store import core, lib, multithread
+from azure.identity import ClientSecretCredential
+from azure.storage.filedatalake import (
+ DataLakeDirectoryClient,
+ DataLakeFileClient,
+ DataLakeServiceClient,
+ DirectoryProperties,
+ FileSystemClient,
+ FileSystemProperties,
+)
from airflow.exceptions import AirflowException
from airflow.hooks.base import BaseHook
@@ -36,6 +38,13 @@ from airflow.providers.microsoft.azure.utils import _ensure_prefixes, get_field
class AzureDataLakeHook(BaseHook):
"""
+ This module contains integration with Azure Data Lake.
+
+ AzureDataLakeHook communicates via a REST API compatible with WebHDFS. Make sure that a
+ Airflow connection of type `azure_data_lake` exists. Authorization can be done by supplying a
+ login (=Client ID), password (=Client Secret) and extra fields tenant (Tenant) and account_name
+ (Account Name)(see connection `azure_data_lake_default` for an example).
+
Interacts with Azure Data Lake.
Client ID and client secret should be in user and password parameters.
@@ -230,3 +239,289 @@ class AzureDataLakeHook(BaseHook):
self.log.info("File %s not found", path)
else:
raise AirflowException(f"File {path} not found")
+
+
class AzureDataLakeStorageV2Hook(BaseHook):
    """
    Interact with an ADLS Gen2 storage account.

    The hook helps to create and manage file systems, directories and files in storage accounts
    that have a hierarchical namespace. A ``DataLakeServiceClient`` is built from the details of
    the ``adls`` connection (see :meth:`get_conn`).

    Wasb is marked as legacy and ADLS Gen1 is being retired, so this hook provides a dedicated way
    to interact with ADLS Gen2 storage accounts.

    .. seealso::
        https://learn.microsoft.com/en-us/azure/storage/blobs/data-lake-storage-directory-file-acl-python

    :param adls_conn_id: Reference to the :ref:`adls connection <howto/connection:adls>`.
    :param public_read: Whether an anonymous public read access should be used. Default is False.
        NOTE(review): the flag is stored on the hook but :meth:`get_conn` does not consult it yet.
    """

    conn_name_attr = "adls_conn_id"
    default_conn_name = "adls_default"
    conn_type = "adls"
    hook_name = "Azure Data Lake Storage V2"

    @staticmethod
    def get_connection_form_widgets() -> dict[str, Any]:
        """Returns connection widgets to add to connection form"""
        # Imported locally so that loading this module does not pull in the web UI packages.
        from flask_appbuilder.fieldwidgets import BS3PasswordFieldWidget, BS3TextFieldWidget
        from flask_babel import lazy_gettext
        from wtforms import PasswordField, StringField

        return {
            "connection_string": PasswordField(
                lazy_gettext("ADLS gen2 Connection String (optional)"), widget=BS3PasswordFieldWidget()
            ),
            "tenant_id": StringField(
                lazy_gettext("Tenant Id (Active Directory Auth)"), widget=BS3TextFieldWidget()
            ),
        }

    @staticmethod
    def get_ui_field_behaviour() -> dict[str, Any]:
        """Returns custom field behaviour"""
        return {
            "hidden_fields": ["schema", "port"],
            "relabeling": {
                "login": "ADLS gen2 Storage Login (optional)",
                "password": "ADLS gen2 Storage Key (optional)",
                "host": "Account Name (Active Directory Auth)",
            },
            "placeholders": {
                "extra": "additional options for use with FileService and AzureFileVolume",
                "login": "account name",
                "password": "secret",
                # get_conn interpolates host into the account URL, so it must be a bare account name.
                "host": "account name",
                "connection_string": "connection string auth",
                "tenant_id": "tenant",
            },
        }

    def __init__(self, adls_conn_id: str, public_read: bool = False) -> None:
        super().__init__()
        self.conn_id = adls_conn_id
        self.public_read = public_read
        # The service client is created eagerly, so constructing the hook resolves the connection.
        self.service_client = self.get_conn()

    def get_conn(self) -> DataLakeServiceClient:  # type: ignore[override]
        """
        Return the DataLakeServiceClient object.

        The authentication method is chosen in this order:

        1. ``connection_string`` extra: the client is built with
           ``DataLakeServiceClient.from_connection_string``.
        2. ``tenant_id`` extra: Active Directory auth through ``ClientSecretCredential``, with
           the connection login as client ID and the password as client secret. The storage
           account name is read from the connection host.
        3. Otherwise the connection password is used as the credential and the storage account
           name is read from the connection login.

        All connection extras are forwarded as keyword arguments to the client constructor.

        :raises AirflowException: if Active Directory auth is selected but the host is empty.
        """
        conn = self.get_connection(self.conn_id)
        extra = conn.extra_dejson or {}

        connection_string = self._get_field(extra, "connection_string")
        if connection_string:
            # connection_string auth takes priority
            return DataLakeServiceClient.from_connection_string(connection_string, **extra)

        tenant = self._get_field(extra, "tenant_id")
        if tenant:
            # Active Directory auth: login/password hold the client ID and secret, so the account
            # name cannot come from login. It comes from host, which the form labels
            # "Account Name (Active Directory Auth)".
            if not conn.host:
                raise AirflowException(
                    f"Connection {self.conn_id!r} uses Active Directory auth (tenant_id is set) "
                    "but has no host; set host to the storage account name."
                )
            app_id = conn.login
            app_secret = conn.password
            token_credential = ClientSecretCredential(tenant, app_id, app_secret)
            return DataLakeServiceClient(
                account_url=f"https://{conn.host}.dfs.core.windows.net",
                credential=token_credential,
                **extra,
            )

        # Key-style auth: login is the account name, password is the credential.
        credential = conn.password
        return DataLakeServiceClient(
            account_url=f"https://{conn.login}.dfs.core.windows.net",
            credential=credential,
            **extra,
        )

    def _get_field(self, extra_dict, field_name):
        """
        Read ``field_name`` from the connection extras.

        Both the bare name and the legacy ``extra__adls__``-prefixed name are accepted. The bare
        name wins when both are present. Falsy values are normalised to ``None``.

        :param extra_dict: The connection's ``extra_dejson`` dictionary.
        :param field_name: Un-prefixed field name, e.g. ``"tenant_id"``.
        :raises ValueError: if ``field_name`` is already prefixed with ``extra__``.
        """
        prefix = "extra__adls__"
        if field_name.startswith("extra__"):
            raise ValueError(
                f"Got prefixed name {field_name}; please remove the '{prefix}' prefix "
                f"when using this method."
            )
        if field_name in extra_dict:
            return extra_dict[field_name] or None
        return extra_dict.get(f"{prefix}{field_name}") or None

    def create_file_system(self, file_system_name: str) -> None:
        """
        Create a new file system under the specified account.

        A container acts as a file system for your files. If a file system with the same name
        already exists, the ``ResourceExistsError`` is logged and swallowed. Any other error is
        logged and re-raised. Nothing is returned.

        :param file_system_name: Name of the file system to create.
        """
        try:
            file_system_client = self.service_client.create_file_system(file_system=file_system_name)
            self.log.info("Created file system: %s", file_system_client.file_system_name)
        except ResourceExistsError:
            self.log.info("Attempted to create file system %r but it already exists.", file_system_name)
        except Exception as e:
            self.log.info("Error while attempting to create file system %r: %s", file_system_name, e)
            raise

    def get_file_system(self, file_system: FileSystemProperties | str) -> FileSystemClient:
        """
        Get a client to interact with the specified file system.

        :param file_system: This can either be the name of the file system
            or an instance of FileSystemProperties.
        """
        # NOTE(review): get_file_system_client is presumably a local constructor that does not
        # contact the service, so the ResourceNotFoundError branch may never fire. Verify.
        try:
            file_system_client = self.service_client.get_file_system_client(file_system=file_system)
            return file_system_client
        except ResourceNotFoundError:
            self.log.info("file system %r doesn't exists.", file_system)
            raise
        except Exception as e:
            self.log.info("Error while attempting to get file system %r: %s", file_system, e)
            raise

    def create_directory(
        self, file_system_name: FileSystemProperties | str, directory_name: str, **kwargs: Any
    ) -> DataLakeDirectoryClient:
        """
        Create a directory under the specified file system.

        :param file_system_name: Name of the file system or instance of FileSystemProperties.
        :param directory_name: Name of the directory which needs to be created in the file system.
        :param kwargs: Extra keyword arguments forwarded to ``FileSystemClient.create_directory``.
        """
        # Forward as keyword arguments. Passed as one positional dict, kwargs would bind to the
        # SDK method's second positional parameter instead.
        return self.get_file_system(file_system_name).create_directory(directory_name, **kwargs)

    def get_directory_client(
        self,
        file_system_name: FileSystemProperties | str,
        directory_name: DirectoryProperties | str,
    ) -> DataLakeDirectoryClient:
        """
        Get the specific directory under the specified file system.

        :param file_system_name: Name of the file system or instance of FileSystemProperties.
        :param directory_name: Name of the directory or instance of DirectoryProperties which needs to be
            retrieved from the file system.
        """
        try:
            directory_client = self.get_file_system(file_system_name).get_directory_client(directory_name)
            return directory_client
        except ResourceNotFoundError:
            self.log.info(
                "Directory %s doesn't exists in the file system %s", directory_name, file_system_name
            )
            raise
        except Exception as e:
            self.log.info(e)
            raise

    def create_file(self, file_system_name: FileSystemProperties | str, file_name: str) -> DataLakeFileClient:
        """
        Create a file under the file system.

        :param file_system_name: Name of the file system or instance of FileSystemProperties.
        :param file_name: Name of the file which needs to be created in the file system.
        """
        return self.get_file_system(file_system_name).create_file(file_name)

    def upload_file(
        self,
        file_system_name: FileSystemProperties | str,
        file_name: str,
        file_path: str,
        overwrite: bool = False,
        **kwargs: Any,
    ) -> None:
        """
        Create a file with data in the file system.

        :param file_system_name: Name of the file system or instance of FileSystemProperties.
        :param file_name: Name of the file to be created with name.
        :param file_path: Path to the local file whose bytes are uploaded.
        :param overwrite: Boolean flag to overwrite an existing file or not.
        :param kwargs: Extra keyword arguments forwarded to ``DataLakeFileClient.upload_data``.
        """
        file_client = self.create_file(file_system_name, file_name)
        with open(file_path, "rb") as data:
            file_client.upload_data(data, overwrite=overwrite, **kwargs)

    def upload_file_to_directory(
        self,
        file_system_name: str,
        directory_name: str,
        file_name: str,
        file_path: str,
        overwrite: bool = False,
        **kwargs: Any,
    ) -> None:
        """
        Create a new file inside a directory and upload the contents of a local file to it.

        :param file_system_name: Name of the file system or instance of FileSystemProperties.
        :param directory_name: Name of the directory.
        :param file_name: Name of the file to be created with name.
        :param file_path: Path to the local file whose bytes are uploaded.
        :param overwrite: Boolean flag to overwrite an existing file or not.
        :param kwargs: Extra keyword arguments forwarded to both
            ``DataLakeDirectoryClient.create_file`` and ``DataLakeFileClient.upload_data``.
        """
        directory_client = self.get_directory_client(file_system_name, directory_name=directory_name)
        file_client = directory_client.create_file(file_name, **kwargs)
        with open(file_path, "rb") as data:
            file_client.upload_data(data, overwrite=overwrite, **kwargs)

    def list_files_directory(
        self, file_system_name: FileSystemProperties | str, directory_name: str
    ) -> list[str]:
        """
        Get the names of the files and directories under the specified path of a file system.

        :param file_system_name: Name of the file system or instance of FileSystemProperties.
        :param directory_name: Name of the directory.
        """
        paths = self.get_file_system(file_system=file_system_name).get_paths(directory_name)
        return [path.name for path in paths]

    def list_file_system(
        self, prefix: str | None = None, include_metadata: bool = False, **kwargs: Any
    ) -> list[str]:
        """
        Get the names of the file systems under the specified account.

        :param prefix:
            Filters the results to return only file systems whose names
            begin with the specified prefix.
        :param include_metadata: Specifies that file system metadata be returned in the response.
            The default value is `False`.
        :param kwargs: Extra keyword arguments forwarded to
            ``DataLakeServiceClient.list_file_systems``.
        """
        file_systems = self.service_client.list_file_systems(
            name_starts_with=prefix, include_metadata=include_metadata, **kwargs
        )
        return [fs.name for fs in file_systems]

    def delete_file_system(self, file_system_name: FileSystemProperties | str) -> None:
        """
        Delete the file system.

        A missing file system is logged and ignored. Any other error is logged and re-raised.

        :param file_system_name: Name of the file system or instance of FileSystemProperties.
        """
        try:
            self.service_client.delete_file_system(file_system_name)
            self.log.info("Deleted file system: %s", file_system_name)
        except ResourceNotFoundError:
            self.log.info("file system %r doesn't exists.", file_system_name)
        except Exception as e:
            self.log.info("Error while attempting to deleting file system %r: %s", file_system_name, e)
            raise

    def delete_directory(self, file_system_name: FileSystemProperties | str, directory_name: str) -> None:
        """
        Delete the specified directory in the file system.

        :param file_system_name: Name of the file system or instance of FileSystemProperties.
        :param directory_name: Name of the directory.
        """
        directory_client = self.get_directory_client(file_system_name, directory_name)
        directory_client.delete_directory()
diff --git a/airflow/providers/microsoft/azure/provider.yaml b/airflow/providers/microsoft/azure/provider.yaml
index 4962863668..e3af9984c6 100644
--- a/airflow/providers/microsoft/azure/provider.yaml
+++ b/airflow/providers/microsoft/azure/provider.yaml
@@ -67,6 +67,7 @@ dependencies:
- azure-servicebus>=7.6.1
- azure-synapse-spark
- adal>=1.2.7
+ - azure-storage-file-datalake>=12.9.1
integrations:
- integration-name: Microsoft Azure Batch
@@ -124,6 +125,10 @@ integrations:
how-to-guide:
- /docs/apache-airflow-providers-microsoft-azure/operators/azure_synapse.rst
tags: [azure]
+ - integration-name: Microsoft Azure Data Lake Storage Client Gen2
+ external-doc-url: https://azure.microsoft.com/en-us/products/storage/data-lake-storage/
+ logo: /integration-logos/azure/Data Lake Storage.svg
+ tags: [azure]
operators:
- integration-name: Microsoft Azure Data Lake Storage
@@ -198,6 +203,9 @@ hooks:
- integration-name: Microsoft Azure Service Bus
python-modules:
- airflow.providers.microsoft.azure.hooks.asb
+ - integration-name: Microsoft Azure Data Lake Storage Client Gen2
+ python-modules:
+ - airflow.providers.microsoft.azure.hooks.data_lake
- integration-name: Microsoft Azure Synapse
python-modules:
- airflow.providers.microsoft.azure.hooks.synapse
@@ -252,6 +260,8 @@ connection-types:
connection-type: azure_service_bus
- hook-class-name: airflow.providers.microsoft.azure.hooks.synapse.AzureSynapseHook
connection-type: azure_synapse
+ - hook-class-name: airflow.providers.microsoft.azure.hooks.data_lake.AzureDataLakeStorageV2Hook
+ connection-type: adls
secrets-backends:
- airflow.providers.microsoft.azure.secrets.key_vault.AzureKeyVaultBackend
diff --git a/docs/apache-airflow-providers-microsoft-azure/connections/adls_v2.rst b/docs/apache-airflow-providers-microsoft-azure/connections/adls_v2.rst
new file mode 100644
index 0000000000..9ec4015679
--- /dev/null
+++ b/docs/apache-airflow-providers-microsoft-azure/connections/adls_v2.rst
@@ -0,0 +1,68 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+.. _howto/connection:adls:
+
+Microsoft Azure Data Lake Storage Gen2 Connection
+==================================================
+
+The Microsoft Azure Data Lake Storage Gen2 connection type enables the ADLS gen2 Integrations.
+
+Authenticating to Azure Data Lake Storage Gen2
+----------------------------------------------
+
+Currently, there are three ways to connect to Azure Data Lake Storage Gen2 using Airflow.
+
+1. Use `token credentials
+ <https://docs.microsoft.com/en-us/azure/developer/python/azure-sdk-authenticate?tabs=cmd#authenticate-with-token-credentials>`_
+ i.e. add the service principal credentials (client ID in Login, client secret in Password) and ``tenant_id`` to the Airflow connection.
+2. Use a `Connection String
+ <https://learn.microsoft.com/en-us/azure/storage/common/storage-configure-connection-string>`_
+ i.e. add the connection string to ``connection_string`` in the Airflow connection.
+3. Use a shared account key, i.e. put the storage account name in Login and the account key in Password.
+
+Only one authorization method can be used at a time. If you need to manage multiple credentials or keys then you should
+configure multiple connections.
+
+Default Connection IDs
+----------------------
+
+The default connection ID for Azure Data Lake Storage Gen2 is ``adls_default``.
+
+Configuring the Connection
+--------------------------
+
+Login (optional)
+ For shared key or SAS token authentication, specify the storage account name. For Active Directory (token credential) authentication, specify the client (application) ID.
+
+Password (optional)
+ For shared key or SAS token authentication, specify the account key or SAS token. For
+ Active Directory (token credential) authentication, specify the client secret.
+
+Host (optional)
+ Specify the account url for anonymous public read, Active Directory, shared access key authentication.
+
+Extra (optional)
+ Specify the extra parameters (as json dictionary) that can be used in Azure connection.
+ The following parameters are all optional:
+
+ * ``tenant_id``: Specify the tenant to use. Needed for Active Directory (token) authentication.
+ * ``connection_string``: Connection string for use with connection string authentication.
+
+When specifying the connection in environment variable you should specify
+it using URI syntax.
+
+Note that all components of the URI should be URL-encoded.
diff --git a/generated/provider_dependencies.json b/generated/provider_dependencies.json
index f814543523..d7729e1a4b 100644
--- a/generated/provider_dependencies.json
+++ b/generated/provider_dependencies.json
@@ -452,6 +452,7 @@
"azure-servicebus>=7.6.1",
"azure-storage-blob>=12.14.0",
"azure-storage-common>=2.1.0",
+ "azure-storage-file-datalake>=12.9.1",
"azure-storage-file>=2.1.0",
"azure-synapse-spark"
],
diff --git a/tests/providers/microsoft/azure/hooks/test_azure_data_lake.py b/tests/providers/microsoft/azure/hooks/test_azure_data_lake.py
index f2b2d5a654..e8d378ab5c 100644
--- a/tests/providers/microsoft/azure/hooks/test_azure_data_lake.py
+++ b/tests/providers/microsoft/azure/hooks/test_azure_data_lake.py
@@ -21,7 +21,7 @@ import json
from unittest import mock
from airflow.models import Connection
-from airflow.providers.microsoft.azure.hooks.data_lake import AzureDataLakeHook
+from airflow.providers.microsoft.azure.hooks.data_lake import AzureDataLakeHook, AzureDataLakeStorageV2Hook
from airflow.utils import db
from tests.test_utils.providers import get_provider_min_airflow_version
@@ -151,3 +151,97 @@ class TestAzureDataLakeHook:
"You must now remove `_ensure_prefixes` from azure utils."
" The functionality is now taken care of by providers manager."
)
+
+
class TestAzureDataLakeStorageV2Hook:
    """Unit tests for AzureDataLakeStorageV2Hook with the service client mocked out."""

    @classmethod
    def setup_class(cls) -> None:
        cls.conn_id = "adls_conn_id"
        cls.file_system_name = "test_file_system"
        cls.directory_name = "test_directory"
        cls.file_name = "test_file_name"

    @mock.patch("airflow.providers.microsoft.azure.hooks.data_lake.AzureDataLakeStorageV2Hook.get_conn")
    def test_create_file_system(self, mock_conn):
        hook = AzureDataLakeStorageV2Hook(adls_conn_id=self.conn_id)
        hook.create_file_system(self.file_system_name)
        expected_calls = [mock.call().create_file_system(file_system=self.file_system_name)]
        mock_conn.assert_has_calls(expected_calls)

    @mock.patch("airflow.providers.microsoft.azure.hooks.data_lake.FileSystemClient")
    @mock.patch("airflow.providers.microsoft.azure.hooks.data_lake.AzureDataLakeStorageV2Hook.get_conn")
    def test_get_file_system(self, mock_conn, mock_file_system):
        mock_conn.return_value.get_file_system_client.return_value = mock_file_system
        hook = AzureDataLakeStorageV2Hook(adls_conn_id=self.conn_id)
        result = hook.get_file_system(self.file_system_name)
        assert result == mock_file_system

    @mock.patch("airflow.providers.microsoft.azure.hooks.data_lake.DataLakeDirectoryClient")
    @mock.patch(
        "airflow.providers.microsoft.azure.hooks.data_lake.AzureDataLakeStorageV2Hook.get_file_system"
    )
    @mock.patch("airflow.providers.microsoft.azure.hooks.data_lake.AzureDataLakeStorageV2Hook.get_conn")
    def test_create_directory(self, mock_conn, mock_get_file_system, mock_directory_client):
        mock_get_file_system.return_value.create_directory.return_value = mock_directory_client
        hook = AzureDataLakeStorageV2Hook(adls_conn_id=self.conn_id)
        result = hook.create_directory(self.file_system_name, self.directory_name)
        assert result == mock_directory_client

    @mock.patch("airflow.providers.microsoft.azure.hooks.data_lake.DataLakeDirectoryClient")
    @mock.patch(
        "airflow.providers.microsoft.azure.hooks.data_lake.AzureDataLakeStorageV2Hook.get_file_system"
    )
    @mock.patch("airflow.providers.microsoft.azure.hooks.data_lake.AzureDataLakeStorageV2Hook.get_conn")
    def test_get_directory(self, mock_conn, mock_get_file_system, mock_directory_client):
        mock_get_file_system.return_value.get_directory_client.return_value = mock_directory_client
        hook = AzureDataLakeStorageV2Hook(adls_conn_id=self.conn_id)
        result = hook.get_directory_client(self.file_system_name, self.directory_name)
        assert result == mock_directory_client

    @mock.patch("airflow.providers.microsoft.azure.hooks.data_lake.DataLakeFileClient")
    @mock.patch(
        "airflow.providers.microsoft.azure.hooks.data_lake.AzureDataLakeStorageV2Hook.get_file_system"
    )
    @mock.patch("airflow.providers.microsoft.azure.hooks.data_lake.AzureDataLakeStorageV2Hook.get_conn")
    def test_create_file(self, mock_conn, mock_get_file_system, mock_file_client):
        mock_get_file_system.return_value.create_file.return_value = mock_file_client
        hook = AzureDataLakeStorageV2Hook(adls_conn_id=self.conn_id)
        result = hook.create_file(self.file_system_name, self.file_name)
        assert result == mock_file_client

    @mock.patch("airflow.providers.microsoft.azure.hooks.data_lake.AzureDataLakeStorageV2Hook.get_conn")
    def test_delete_file_system(self, mock_conn):
        hook = AzureDataLakeStorageV2Hook(adls_conn_id=self.conn_id)
        hook.delete_file_system(self.file_system_name)
        expected_calls = [mock.call().delete_file_system(self.file_system_name)]
        mock_conn.assert_has_calls(expected_calls)

    @mock.patch("airflow.providers.microsoft.azure.hooks.data_lake.AzureDataLakeStorageV2Hook.get_conn")
    def test_delete_directory(self, mock_conn):
        hook = AzureDataLakeStorageV2Hook(adls_conn_id=self.conn_id)
        hook.delete_directory(self.file_system_name, self.directory_name)
        # The hook reaches the directory through the file-system client, not the service client.
        expected_calls = [
            mock.call()
            .get_file_system_client(self.file_system_name)
            .get_directory_client(self.directory_name)
            .delete_directory()
        ]
        mock_conn.assert_has_calls(expected_calls)

    @mock.patch("airflow.providers.microsoft.azure.hooks.data_lake.AzureDataLakeStorageV2Hook.get_conn")
    def test_list_file_system(self, mock_conn):
        # `name` cannot be passed to the Mock constructor (it names the mock), so set it afterwards.
        fs_a, fs_b = mock.MagicMock(), mock.MagicMock()
        fs_a.name, fs_b.name = "prefix_a", "prefix_b"
        mock_conn.return_value.list_file_systems.return_value = [fs_a, fs_b]
        hook = AzureDataLakeStorageV2Hook(adls_conn_id=self.conn_id)
        result = hook.list_file_system(prefix="prefix")
        mock_conn.return_value.list_file_systems.assert_called_once_with(
            name_starts_with="prefix", include_metadata=False
        )
        assert result == ["prefix_a", "prefix_b"]

    @mock.patch(
        "airflow.providers.microsoft.azure.hooks.data_lake.AzureDataLakeStorageV2Hook.get_file_system"
    )
    @mock.patch("airflow.providers.microsoft.azure.hooks.data_lake.AzureDataLakeStorageV2Hook.get_conn")
    def test_list_files_directory(self, mock_conn, mock_get_file_system):
        path_a, path_b = mock.MagicMock(), mock.MagicMock()
        path_a.name, path_b.name = "test_directory/a", "test_directory/b"
        mock_get_file_system.return_value.get_paths.return_value = [path_a, path_b]
        hook = AzureDataLakeStorageV2Hook(adls_conn_id=self.conn_id)
        result = hook.list_files_directory(self.file_system_name, self.directory_name)
        mock_get_file_system.return_value.get_paths.assert_called_once_with(self.directory_name)
        assert result == ["test_directory/a", "test_directory/b"]