Posted to commits@airflow.apache.org by ka...@apache.org on 2023/01/05 14:03:47 UTC

[airflow] branch main updated: Add hook for Azure Data Lake Storage Gen2 (#28262)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new ad7f8e09f8 Add hook for Azure Data Lake Storage Gen2 (#28262)
ad7f8e09f8 is described below

commit ad7f8e09f8e6e87df2665abdedb22b3e8a469b49
Author: Bharanidharan <94...@users.noreply.github.com>
AuthorDate: Thu Jan 5 19:33:27 2023 +0530

    Add hook for Azure Data Lake Storage Gen2 (#28262)
    
    Created hook for supporting ADLS gen2, which uses the WASB connection and connects to ADLS gen2 storage
    
    Relates to #28223
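
    A minimal sketch of how the new hook could be used, assuming an Airflow connection
    with conn id "adls_default" is configured; the file system, file and local path
    names below are placeholders:

        from airflow.providers.microsoft.azure.hooks.data_lake import AzureDataLakeStorageV2Hook

        hook = AzureDataLakeStorageV2Hook(adls_conn_id="adls_default")

        # Logs and moves on if the file system already exists.
        hook.create_file_system("my-file-system")

        # Create "report.csv" in the file system and upload the local file into it.
        hook.upload_file(
            file_system_name="my-file-system",
            file_name="report.csv",
            file_path="/tmp/report.csv",
            overwrite=True,
        )

        # List the paths under a directory of the file system.
        paths = hook.list_files_directory("my-file-system", "my-directory")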
---
 .../providers/microsoft/azure/hooks/data_lake.py   | 311 ++++++++++++++++++++-
 airflow/providers/microsoft/azure/provider.yaml    |  10 +
 .../connections/adls_v2.rst                        |  68 +++++
 generated/provider_dependencies.json               |   1 +
 .../microsoft/azure/hooks/test_azure_data_lake.py  |  96 ++++++-
 5 files changed, 477 insertions(+), 9 deletions(-)

diff --git a/airflow/providers/microsoft/azure/hooks/data_lake.py b/airflow/providers/microsoft/azure/hooks/data_lake.py
index 4a9bc98f92..ad5e9818a9 100644
--- a/airflow/providers/microsoft/azure/hooks/data_lake.py
+++ b/airflow/providers/microsoft/azure/hooks/data_lake.py
@@ -15,19 +15,21 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-"""
-This module contains integration with Azure Data Lake.
-
-AzureDataLakeHook communicates via a REST API compatible with WebHDFS. Make sure that a
-Airflow connection of type `azure_data_lake` exists. Authorization can be done by supplying a
-login (=Client ID), password (=Client Secret) and extra fields tenant (Tenant) and account_name (Account Name)
-(see connection `azure_data_lake_default` for an example).
-"""
 from __future__ import annotations
 
 from typing import Any
 
+from azure.core.exceptions import ResourceExistsError, ResourceNotFoundError
 from azure.datalake.store import core, lib, multithread
+from azure.identity import ClientSecretCredential
+from azure.storage.filedatalake import (
+    DataLakeDirectoryClient,
+    DataLakeFileClient,
+    DataLakeServiceClient,
+    DirectoryProperties,
+    FileSystemClient,
+    FileSystemProperties,
+)
 
 from airflow.exceptions import AirflowException
 from airflow.hooks.base import BaseHook
@@ -36,6 +38,13 @@ from airflow.providers.microsoft.azure.utils import _ensure_prefixes, get_field
 
 class AzureDataLakeHook(BaseHook):
     """
+    This module contains integration with Azure Data Lake.
+
+    AzureDataLakeHook communicates via a REST API compatible with WebHDFS. Make sure that an
+    Airflow connection of type `azure_data_lake` exists. Authorization can be done by supplying a
+    login (=Client ID), password (=Client Secret) and extra fields tenant (Tenant) and account_name
+    (Account Name) (see connection `azure_data_lake_default` for an example).
+
     Interacts with Azure Data Lake.
 
     Client ID and client secret should be in user and password parameters.
@@ -230,3 +239,289 @@ class AzureDataLakeHook(BaseHook):
                 self.log.info("File %s not found", path)
             else:
                 raise AirflowException(f"File {path} not found")
+
+
+class AzureDataLakeStorageV2Hook(BaseHook):
+    """
+    This hook interacts with an ADLS Gen2 storage account. It mainly helps to create and manage
+    directories and files in storage accounts that have a hierarchical namespace. It uses the
+    ``adls`` connection details to create a DataLakeServiceClient object.
+
+    Because WASB is marked as legacy and ADLS Gen1 is being retired, this hook provides
+    an ADLS Gen2 alternative for interacting with the storage account.
+
+    .. seealso::
+        https://learn.microsoft.com/en-us/azure/storage/blobs/data-lake-storage-directory-file-acl-python
+
+    :param adls_conn_id: Reference to the :ref:`adls connection <howto/connection:adls>`.
+    :param public_read: Whether anonymous public read access should be used. Default is `False`.
+    """
+
+    conn_name_attr = "adls_conn_id"
+    default_conn_name = "adls_default"
+    conn_type = "adls"
+    hook_name = "Azure Data Lake Storage V2"
+
+    @staticmethod
+    def get_connection_form_widgets() -> dict[str, Any]:
+        """Returns connection widgets to add to connection form"""
+        from flask_appbuilder.fieldwidgets import BS3PasswordFieldWidget, BS3TextFieldWidget
+        from flask_babel import lazy_gettext
+        from wtforms import PasswordField, StringField
+
+        return {
+            "connection_string": PasswordField(
+                lazy_gettext("ADLS gen2 Connection String (optional)"), widget=BS3PasswordFieldWidget()
+            ),
+            "tenant_id": StringField(
+                lazy_gettext("Tenant Id (Active Directory Auth)"), widget=BS3TextFieldWidget()
+            ),
+        }
+
+    @staticmethod
+    def get_ui_field_behaviour() -> dict[str, Any]:
+        """Returns custom field behaviour"""
+        return {
+            "hidden_fields": ["schema", "port"],
+            "relabeling": {
+                "login": "ADLS gen2 Storage Login (optional)",
+                "password": "ADLS gen2 Storage Key (optional)",
+                "host": "Account Name (Active Directory Auth)",
+            },
+            "placeholders": {
+                "extra": "additional options for use with FileService and AzureFileVolume",
+                "login": "account name",
+                "password": "secret",
+                "host": "account url",
+                "connection_string": "connection string auth",
+                "tenant_id": "tenant",
+            },
+        }
+
+    def __init__(self, adls_conn_id: str, public_read: bool = False) -> None:
+        super().__init__()
+        self.conn_id = adls_conn_id
+        self.public_read = public_read
+        self.service_client = self.get_conn()
+
+    def get_conn(self) -> DataLakeServiceClient:  # type: ignore[override]
+        """Return the DataLakeServiceClient object."""
+        conn = self.get_connection(self.conn_id)
+        extra = conn.extra_dejson or {}
+
+        connection_string = self._get_field(extra, "connection_string")
+        if connection_string:
+            # connection_string auth takes priority
+            return DataLakeServiceClient.from_connection_string(connection_string, **extra)
+
+        tenant = self._get_field(extra, "tenant_id")
+        if tenant:
+            # use Active Directory auth
+            app_id = conn.login
+            app_secret = conn.password
+            token_credential = ClientSecretCredential(tenant, app_id, app_secret)
+            return DataLakeServiceClient(
+                account_url=f"https://{conn.login}.dfs.core.windows.net", credential=token_credential, **extra
+            )
+        credential = conn.password
+        return DataLakeServiceClient(
+            account_url=f"https://{conn.login}.dfs.core.windows.net",
+            credential=credential,
+            **extra,
+        )
+
+    def _get_field(self, extra_dict, field_name):
+        prefix = "extra__adls__"
+        if field_name.startswith("extra__"):
+            raise ValueError(
+                f"Got prefixed name {field_name}; please remove the '{prefix}' prefix "
+                f"when using this method."
+            )
+        if field_name in extra_dict:
+            return extra_dict[field_name] or None
+        return extra_dict.get(f"{prefix}{field_name}") or None
+
+    def create_file_system(self, file_system_name: str) -> None:
+        """
+        Create a new file system under the specified account. A container acts as a
+        file system for your files.
+
+        If a file system with the same name already exists, the resulting
+        ResourceExistsError is caught and logged; any other error is logged and
+        re-raised.
+        """
+        try:
+            file_system_client = self.service_client.create_file_system(file_system=file_system_name)
+            self.log.info("Created file system: %s", file_system_client.file_system_name)
+        except ResourceExistsError:
+            self.log.info("Attempted to create file system %r but it already exists.", file_system_name)
+        except Exception as e:
+            self.log.info("Error while attempting to create file system %r: %s", file_system_name, e)
+            raise
+
+    def get_file_system(self, file_system: FileSystemProperties | str) -> FileSystemClient:
+        """
+        Get a client to interact with the specified file system.
+
+        :param file_system: This can either be the name of the file system
+            or an instance of FileSystemProperties.
+        """
+        try:
+            file_system_client = self.service_client.get_file_system_client(file_system=file_system)
+            return file_system_client
+        except ResourceNotFoundError:
+            self.log.info("file system %r doesn't exists.", file_system)
+            raise
+        except Exception as e:
+            self.log.info("Error while attempting to get file system %r: %s", file_system, e)
+            raise
+
+    def create_directory(
+        self, file_system_name: FileSystemProperties | str, directory_name: str, **kwargs
+    ) -> DataLakeDirectoryClient:
+        """
+        Create a directory under the specified file system.
+
+        :param file_system_name: Name of the file system or instance of FileSystemProperties.
+        :param directory_name: Name of the directory which needs to be created in the file system.
+        """
+        result = self.get_file_system(file_system_name).create_directory(directory_name, **kwargs)
+        return result
+
+    def get_directory_client(
+        self,
+        file_system_name: FileSystemProperties | str,
+        directory_name: DirectoryProperties | str,
+    ) -> DataLakeDirectoryClient:
+        """
+        Get the specific directory under the specified file system.
+
+        :param file_system_name: Name of the file system or instance of FileSystemProperties.
+        :param directory_name: Name of the directory or instance of DirectoryProperties which needs to be
+            retrieved from the file system.
+        """
+        try:
+            directory_client = self.get_file_system(file_system_name).get_directory_client(directory_name)
+            return directory_client
+        except ResourceNotFoundError:
+            self.log.info(
+                "Directory %s doesn't exists in the file system %s", directory_name, file_system_name
+            )
+            raise
+        except Exception as e:
+            self.log.info(e)
+            raise
+
+    def create_file(self, file_system_name: FileSystemProperties | str, file_name: str) -> DataLakeFileClient:
+        """
+        Create a file under the specified file system.
+
+        :param file_system_name: Name of the file system or instance of FileSystemProperties.
+        :param file_name: Name of the file which needs to be created in the file system.
+        """
+        file_client = self.get_file_system(file_system_name).create_file(file_name)
+        return file_client
+
+    def upload_file(
+        self,
+        file_system_name: FileSystemProperties | str,
+        file_name: str,
+        file_path: str,
+        overwrite: bool = False,
+        **kwargs: Any,
+    ) -> None:
+        """
+        Create a file in the file system and upload data to it.
+
+        :param file_system_name: Name of the file system or instance of FileSystemProperties.
+        :param file_name: Name of the file to be created.
+        :param file_path: Path to the file to load.
+        :param overwrite: Boolean flag to overwrite an existing file or not.
+        """
+        file_client = self.create_file(file_system_name, file_name)
+        with open(file_path, "rb") as data:
+            file_client.upload_data(data, overwrite=overwrite, **kwargs)
+
+    def upload_file_to_directory(
+        self,
+        file_system_name: str,
+        directory_name: str,
+        file_name: str,
+        file_path: str,
+        overwrite: bool = False,
+        **kwargs: Any,
+    ) -> None:
+        """
+        Create a new file in the specified directory and upload the contents of the
+        local file to it.
+
+        :param file_system_name: Name of the file system or instance of FileSystemProperties.
+        :param directory_name: Name of the directory.
+        :param file_name: Name of the file to be created.
+        :param file_path: Path to the file to load.
+        :param overwrite: Boolean flag to overwrite an existing file or not.
+        """
+        directory_client = self.get_directory_client(file_system_name, directory_name=directory_name)
+        file_client = directory_client.create_file(file_name, **kwargs)
+        with open(file_path, "rb") as data:
+            file_client.upload_data(data, overwrite=overwrite, **kwargs)
+
+    def list_files_directory(
+        self, file_system_name: FileSystemProperties | str, directory_name: str
+    ) -> list[str]:
+        """
+        Get the list of files or directories under the specified file system.
+
+        :param file_system_name: Name of the file system or instance of FileSystemProperties.
+        :param directory_name: Name of the directory.
+        """
+        paths = self.get_file_system(file_system=file_system_name).get_paths(directory_name)
+        directory_lists = []
+        for path in paths:
+            directory_lists.append(path.name)
+        return directory_lists
+
+    def list_file_system(
+        self, prefix: str | None = None, include_metadata: bool = False, **kwargs: Any
+    ) -> list[str]:
+        """
+        Get the list of file systems under the specified account.
+
+        :param prefix:
+            Filters the results to return only file systems whose names
+            begin with the specified prefix.
+        :param include_metadata: Specifies that file system metadata be returned in the response.
+            The default value is `False`.
+        """
+        file_system = self.service_client.list_file_systems(
+            name_starts_with=prefix, include_metadata=include_metadata
+        )
+        file_system_list = []
+        for fs in file_system:
+            file_system_list.append(fs.name)
+        return file_system_list
+
+    def delete_file_system(self, file_system_name: FileSystemProperties | str) -> None:
+        """
+        Delete the file system.
+
+        :param file_system_name: Name of the file system or instance of FileSystemProperties.
+        """
+        try:
+            self.service_client.delete_file_system(file_system_name)
+            self.log.info("Deleted file system: %s", file_system_name)
+        except ResourceNotFoundError:
+            self.log.info("file system %r doesn't exists.", file_system_name)
+        except Exception as e:
+            self.log.info("Error while attempting to deleting file system %r: %s", file_system_name, e)
+            raise
+
+    def delete_directory(self, file_system_name: FileSystemProperties | str, directory_name: str) -> None:
+        """
+        Delete the specified directory in the file system.
+
+        :param file_system_name: Name of the file system or instance of FileSystemProperties.
+        :param directory_name: Name of the directory.
+        """
+        directory_client = self.get_directory_client(file_system_name, directory_name)
+        directory_client.delete_directory()
diff --git a/airflow/providers/microsoft/azure/provider.yaml b/airflow/providers/microsoft/azure/provider.yaml
index 4962863668..e3af9984c6 100644
--- a/airflow/providers/microsoft/azure/provider.yaml
+++ b/airflow/providers/microsoft/azure/provider.yaml
@@ -67,6 +67,7 @@ dependencies:
   - azure-servicebus>=7.6.1
   - azure-synapse-spark
   - adal>=1.2.7
+  - azure-storage-file-datalake>=12.9.1
 
 integrations:
   - integration-name: Microsoft Azure Batch
@@ -124,6 +125,10 @@ integrations:
     how-to-guide:
       - /docs/apache-airflow-providers-microsoft-azure/operators/azure_synapse.rst
     tags: [azure]
+  - integration-name: Microsoft Azure Data Lake Storage Client Gen2
+    external-doc-url: https://azure.microsoft.com/en-us/products/storage/data-lake-storage/
+    logo: /integration-logos/azure/Data Lake Storage.svg
+    tags: [azure]
 
 operators:
   - integration-name: Microsoft Azure Data Lake Storage
@@ -198,6 +203,9 @@ hooks:
   - integration-name: Microsoft Azure Service Bus
     python-modules:
       - airflow.providers.microsoft.azure.hooks.asb
+  - integration-name: Microsoft Azure Data Lake Storage Client Gen2
+    python-modules:
+      - airflow.providers.microsoft.azure.hooks.data_lake
   - integration-name: Microsoft Azure Synapse
     python-modules:
       - airflow.providers.microsoft.azure.hooks.synapse
@@ -252,6 +260,8 @@ connection-types:
     connection-type: azure_service_bus
   - hook-class-name: airflow.providers.microsoft.azure.hooks.synapse.AzureSynapseHook
     connection-type: azure_synapse
+  - hook-class-name: airflow.providers.microsoft.azure.hooks.data_lake.AzureDataLakeStorageV2Hook
+    connection-type: adls
 
 secrets-backends:
   - airflow.providers.microsoft.azure.secrets.key_vault.AzureKeyVaultBackend
diff --git a/docs/apache-airflow-providers-microsoft-azure/connections/adls_v2.rst b/docs/apache-airflow-providers-microsoft-azure/connections/adls_v2.rst
new file mode 100644
index 0000000000..9ec4015679
--- /dev/null
+++ b/docs/apache-airflow-providers-microsoft-azure/connections/adls_v2.rst
@@ -0,0 +1,68 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+.. _howto/connection:adls:
+
+Microsoft Azure Data Lake Storage Gen2 Connection
+==================================================
+
+The Microsoft Azure Data Lake Storage Gen2 connection type enables the ADLS Gen2 integrations.
+
+Authenticating to Azure Data Lake Storage Gen2
+----------------------------------------------
+
+Currently, there are two ways to connect to Azure Data Lake Storage Gen2 using Airflow.
+
+1. Use `token credentials
+   <https://docs.microsoft.com/en-us/azure/developer/python/azure-sdk-authenticate?tabs=cmd#authenticate-with-token-credentials>`_
+   i.e. add specific credentials (client_id, secret, tenant) and subscription id to the Airflow connection.
+2. Use a `Connection String
+   <https://docs.microsoft.com/en-us/azure/data-explorer/kusto/api/connection-strings/storage>`_
+   i.e. add connection string to ``connection_string`` in the Airflow connection.
+
+Only one authorization method can be used at a time. If you need to manage multiple credentials or keys then you should
+configure multiple connections.
+
+Default Connection IDs
+----------------------
+
+All hooks and operators related to Microsoft Azure Data Lake Storage Gen2 use ``adls_default`` by default.
+
+Configuring the Connection
+--------------------------
+
+Login (optional)
+    Specify the login used for the Azure Data Lake Storage Gen2 account. For use with Shared Key Credential and SAS Token authentication.
+
+Password (optional)
+    Specify the password used for the Azure Data Lake Storage Gen2 account. For use with
+    Active Directory (token credential) and shared key authentication.
+
+Host (optional)
+    Specify the account URL for anonymous public read, Active Directory, or shared access key authentication.
+
+Extra (optional)
+    Specify the extra parameters (as a JSON dictionary) that can be used in the Azure connection.
+    The following parameters are all optional:
+
+    * ``tenant_id``: Specify the tenant to use. Needed for Active Directory (token) authentication.
+    * ``connection_string``: Connection string for use with connection string authentication.
+
+When specifying the connection as an environment variable, you should specify it
+using URI syntax.
+
+Note that all components of the URI should be URL-encoded.
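+
+For example, a connection using shared key authentication could be supplied through an
+environment variable, as sketched below (the account name and storage key are placeholders)::
+
+    export AIRFLOW_CONN_ADLS_DEFAULT='adls://my-account-name:my-storage-key@'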
diff --git a/generated/provider_dependencies.json b/generated/provider_dependencies.json
index f814543523..d7729e1a4b 100644
--- a/generated/provider_dependencies.json
+++ b/generated/provider_dependencies.json
@@ -452,6 +452,7 @@
       "azure-servicebus>=7.6.1",
       "azure-storage-blob>=12.14.0",
       "azure-storage-common>=2.1.0",
+      "azure-storage-file-datalake>=12.9.1",
       "azure-storage-file>=2.1.0",
       "azure-synapse-spark"
     ],
diff --git a/tests/providers/microsoft/azure/hooks/test_azure_data_lake.py b/tests/providers/microsoft/azure/hooks/test_azure_data_lake.py
index f2b2d5a654..e8d378ab5c 100644
--- a/tests/providers/microsoft/azure/hooks/test_azure_data_lake.py
+++ b/tests/providers/microsoft/azure/hooks/test_azure_data_lake.py
@@ -21,7 +21,7 @@ import json
 from unittest import mock
 
 from airflow.models import Connection
-from airflow.providers.microsoft.azure.hooks.data_lake import AzureDataLakeHook
+from airflow.providers.microsoft.azure.hooks.data_lake import AzureDataLakeHook, AzureDataLakeStorageV2Hook
 from airflow.utils import db
 from tests.test_utils.providers import get_provider_min_airflow_version
 
@@ -151,3 +151,97 @@ class TestAzureDataLakeHook:
                 "You must now remove `_ensure_prefixes` from azure utils."
                 " The functionality is now taken care of by providers manager."
             )
+
+
+class TestAzureDataLakeStorageV2Hook:
+    def setup_class(self) -> None:
+        self.conn_id: str = "adls_conn_id"
+        self.file_system_name = "test_file_system"
+        self.directory_name = "test_directory"
+        self.file_name = "test_file_name"
+
+    @mock.patch("airflow.providers.microsoft.azure.hooks.data_lake.AzureDataLakeStorageV2Hook.get_conn")
+    def test_create_file_system(self, mock_conn):
+        hook = AzureDataLakeStorageV2Hook(adls_conn_id=self.conn_id)
+        hook.create_file_system("test_file_system")
+        expected_calls = [mock.call().create_file_system(file_system=self.file_system_name)]
+        mock_conn.assert_has_calls(expected_calls)
+
+    @mock.patch("airflow.providers.microsoft.azure.hooks.data_lake.FileSystemClient")
+    @mock.patch("airflow.providers.microsoft.azure.hooks.data_lake.AzureDataLakeStorageV2Hook.get_conn")
+    def test_get_file_system(self, mock_conn, mock_file_system):
+        mock_conn.return_value.get_file_system_client.return_value = mock_file_system
+        hook = AzureDataLakeStorageV2Hook(adls_conn_id=self.conn_id)
+        result = hook.get_file_system(self.file_system_name)
+        assert result == mock_file_system
+
+    @mock.patch("airflow.providers.microsoft.azure.hooks.data_lake.DataLakeDirectoryClient")
+    @mock.patch(
+        "airflow.providers.microsoft.azure.hooks.data_lake.AzureDataLakeStorageV2Hook.get_file_system"
+    )
+    @mock.patch("airflow.providers.microsoft.azure.hooks.data_lake.AzureDataLakeStorageV2Hook.get_conn")
+    def test_create_directory(self, mock_conn, mock_get_file_system, mock_directory_client):
+        mock_get_file_system.return_value.create_directory.return_value = mock_directory_client
+        hook = AzureDataLakeStorageV2Hook(adls_conn_id=self.conn_id)
+        result = hook.create_directory(self.file_system_name, self.directory_name)
+        assert result == mock_directory_client
+
+    @mock.patch("airflow.providers.microsoft.azure.hooks.data_lake.DataLakeDirectoryClient")
+    @mock.patch(
+        "airflow.providers.microsoft.azure.hooks.data_lake.AzureDataLakeStorageV2Hook.get_file_system"
+    )
+    @mock.patch("airflow.providers.microsoft.azure.hooks.data_lake.AzureDataLakeStorageV2Hook.get_conn")
+    def test_get_directory(self, mock_conn, mock_get_file_system, mock_directory_client):
+        mock_get_file_system.return_value.get_directory_client.return_value = mock_directory_client
+        hook = AzureDataLakeStorageV2Hook(adls_conn_id=self.conn_id)
+        result = hook.get_directory_client(self.file_system_name, self.directory_name)
+        assert result == mock_directory_client
+
+    @mock.patch("airflow.providers.microsoft.azure.hooks.data_lake.DataLakeFileClient")
+    @mock.patch(
+        "airflow.providers.microsoft.azure.hooks.data_lake.AzureDataLakeStorageV2Hook.get_file_system"
+    )
+    @mock.patch("airflow.providers.microsoft.azure.hooks.data_lake.AzureDataLakeStorageV2Hook.get_conn")
+    def test_create_file(self, mock_conn, mock_get_file_system, mock_file_client):
+        mock_get_file_system.return_value.create_file.return_value = mock_file_client
+        hook = AzureDataLakeStorageV2Hook(adls_conn_id=self.conn_id)
+        result = hook.create_file(self.file_system_name, self.file_name)
+        assert result == mock_file_client
+
+    @mock.patch("airflow.providers.microsoft.azure.hooks.data_lake.AzureDataLakeStorageV2Hook.get_conn")
+    def test_delete_file_system(self, mock_conn):
+        hook = AzureDataLakeStorageV2Hook(adls_conn_id=self.conn_id)
+        hook.delete_file_system(self.file_system_name)
+        expected_calls = [mock.call().delete_file_system(self.file_system_name)]
+        mock_conn.assert_has_calls(expected_calls)
+
+    @mock.patch("airflow.providers.microsoft.azure.hooks.data_lake.DataLakeDirectoryClient")
+    @mock.patch("airflow.providers.microsoft.azure.hooks.data_lake.AzureDataLakeStorageV2Hook.get_conn")
+    def test_delete_directory(self, mock_conn, mock_directory_client):
+        mock_conn.return_value.get_directory_client.return_value = mock_directory_client
+        hook = AzureDataLakeStorageV2Hook(adls_conn_id=self.conn_id)
+        hook.delete_directory(self.file_system_name, self.directory_name)
+        expected_calls = [
+            mock.call()
+            .get_file_system_client(self.file_system_name)
+            .get_directory_client(self.directory_name)
+            .delete_directory()
+        ]
+        mock_conn.assert_has_calls(expected_calls)
+
+    @mock.patch("airflow.providers.microsoft.azure.hooks.data_lake.AzureDataLakeStorageV2Hook.get_conn")
+    def test_list_file_system(self, mock_conn):
+        hook = AzureDataLakeStorageV2Hook(adls_conn_id=self.conn_id)
+        hook.list_file_system(prefix="prefix")
+        mock_conn.return_value.list_file_systems.assert_called_once_with(
+            name_starts_with="prefix", include_metadata=False
+        )
+
+    @mock.patch(
+        "airflow.providers.microsoft.azure.hooks.data_lake.AzureDataLakeStorageV2Hook.get_file_system"
+    )
+    @mock.patch("airflow.providers.microsoft.azure.hooks.data_lake.AzureDataLakeStorageV2Hook.get_conn")
+    def test_list_files_directory(self, mock_conn, mock_get_file_system):
+        hook = AzureDataLakeStorageV2Hook(adls_conn_id=self.conn_id)
+        hook.list_files_directory(self.file_system_name, self.directory_name)
+        mock_get_file_system.return_value.get_paths.assert_called_once_with(self.directory_name)