You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2020/12/24 13:08:07 UTC

[airflow] branch master updated: add AzureDatalakeStorageDeleteOperator (#13206)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 5185d81  add AzureDatalakeStorageDeleteOperator (#13206)
5185d81 is described below

commit 5185d81ff99523fe363bd5024cef9660c94214ff
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Thu Dec 24 14:07:58 2020 +0100

    add AzureDatalakeStorageDeleteOperator (#13206)
---
 .../azure/example_dags/example_adls_delete.py      | 47 ++++++++++++++++
 .../microsoft/azure/hooks/azure_data_lake.py       | 20 +++++++
 .../microsoft/azure/operators/adls_delete.py       | 63 ++++++++++++++++++++++
 airflow/providers/microsoft/azure/provider.yaml    |  3 ++
 .../operators/adls.rst                             | 55 +++++++++++++++++++
 .../microsoft/azure/hooks/test_azure_data_lake.py  | 11 ++++
 .../microsoft/azure/operators/test_adls_delete.py  | 37 +++++++++++++
 .../azure/operators/test_adls_delete_system.py     | 48 +++++++++++++++++
 tests/test_utils/azure_system_helpers.py           | 32 +++++++++++
 9 files changed, 316 insertions(+)

diff --git a/airflow/providers/microsoft/azure/example_dags/example_adls_delete.py b/airflow/providers/microsoft/azure/example_dags/example_adls_delete.py
new file mode 100644
index 0000000..82d9377
--- /dev/null
+++ b/airflow/providers/microsoft/azure/example_dags/example_adls_delete.py
@@ -0,0 +1,47 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+
+from airflow import models
+from airflow.providers.microsoft.azure.operators.adls_delete import AzureDataLakeStorageDeleteOperator
+from airflow.providers.microsoft.azure.transfers.local_to_adls import LocalToAzureDataLakeStorageOperator
+from airflow.utils.dates import days_ago
+
+LOCAL_FILE_PATH = os.environ.get("LOCAL_FILE_PATH", 'localfile.txt')
+REMOTE_FILE_PATH = os.environ.get("REMOTE_LOCAL_PATH", 'remote.txt')
+
+
+with models.DAG(
+    "example_adls_delete",
+    start_date=days_ago(1),
+    schedule_interval=None,
+    tags=['example'],
+) as dag:
+
+    upload_file = LocalToAzureDataLakeStorageOperator(
+        task_id='upload_task',
+        local_path=LOCAL_FILE_PATH,
+        remote_path=REMOTE_FILE_PATH,
+    )
+    # [START howto_operator_adls_delete]
+    remove_file = AzureDataLakeStorageDeleteOperator(
+        task_id="delete_task", path=REMOTE_FILE_PATH, recursive=True
+    )
+    # [END howto_operator_adls_delete]
+
+    upload_file >> remove_file
diff --git a/airflow/providers/microsoft/azure/hooks/azure_data_lake.py b/airflow/providers/microsoft/azure/hooks/azure_data_lake.py
index fdc519f..8a99f54 100644
--- a/airflow/providers/microsoft/azure/hooks/azure_data_lake.py
+++ b/airflow/providers/microsoft/azure/hooks/azure_data_lake.py
@@ -28,6 +28,7 @@ from typing import Optional
 
 from azure.datalake.store import core, lib, multithread
 
+from airflow.exceptions import AirflowException
 from airflow.hooks.base import BaseHook
 
 
@@ -189,3 +190,22 @@ class AzureDataLakeHook(BaseHook):
             return self.get_conn().glob(path)
         else:
             return self.get_conn().walk(path)
+
+    def remove(self, path: str, recursive: bool = False, ignore_not_found: bool = True) -> None:
+        """
+        Remove files in Azure Data Lake Storage
+
+        :param path: A directory or file to remove in ADLS
+        :type path: str
+        :param recursive: Whether to loop into directories in the location and remove the files
+        :type recursive: bool
+        :param ignore_not_found: Whether to raise error if file to delete is not found
+        :type ignore_not_found: bool
+        """
+        try:
+            self.get_conn().remove(path=path, recursive=recursive)
+        except FileNotFoundError:
+            if ignore_not_found:
+                self.log.info("File %s not found", path)
+            else:
+                raise AirflowException("File %s not found" % path)
diff --git a/airflow/providers/microsoft/azure/operators/adls_delete.py b/airflow/providers/microsoft/azure/operators/adls_delete.py
new file mode 100644
index 0000000..accd64f
--- /dev/null
+++ b/airflow/providers/microsoft/azure/operators/adls_delete.py
@@ -0,0 +1,63 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Any, Sequence
+
+from airflow.models import BaseOperator
+from airflow.providers.microsoft.azure.hooks.azure_data_lake import AzureDataLakeHook
+from airflow.utils.decorators import apply_defaults
+
+
+class AzureDataLakeStorageDeleteOperator(BaseOperator):
+    """
+    Delete files in the specified path.
+
+        .. seealso::
+            For more information on how to use this operator, take a look at the guide:
+            :ref:`howto/operator:AzureDataLakeStorageDeleteOperator`
+
+    :param path: A directory or file to remove
+    :type path: str
+    :param recursive: Whether to loop into directories in the location and remove the files
+    :type recursive: bool
+    :param ignore_not_found: Whether to raise error if file to delete is not found
+    :type ignore_not_found: bool
+    """
+
+    template_fields: Sequence[str] = ('path',)
+    ui_color = '#901dd2'
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        path: str,
+        recursive: bool = False,
+        ignore_not_found: bool = True,
+        azure_data_lake_conn_id: str = 'azure_data_lake_default',
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.path = path
+        self.recursive = recursive
+        self.ignore_not_found = ignore_not_found
+        self.azure_data_lake_conn_id = azure_data_lake_conn_id
+
+    def execute(self, context: dict) -> Any:
+        hook = AzureDataLakeHook(azure_data_lake_conn_id=self.azure_data_lake_conn_id)
+
+        return hook.remove(path=self.path, recursive=self.recursive, ignore_not_found=self.ignore_not_found)
diff --git a/airflow/providers/microsoft/azure/provider.yaml b/airflow/providers/microsoft/azure/provider.yaml
index fa0d112..3b12144 100644
--- a/airflow/providers/microsoft/azure/provider.yaml
+++ b/airflow/providers/microsoft/azure/provider.yaml
@@ -44,6 +44,8 @@ integrations:
     external-doc-url: https://azure.microsoft.com/en-us/services/data-explorer/
     tags: [azure]
   - integration-name: Microsoft Azure Data Lake Storage
+    how-to-guide:
+      - /docs/apache-airflow-providers-microsoft-azure/operators/adls.rst
     external-doc-url: https://azure.microsoft.com/en-us/services/storage/data-lake-storage/
     logo: /integration-logos/azure/Data Lake Storage.svg
     tags: [azure]
@@ -62,6 +64,7 @@ operators:
   - integration-name: Microsoft Azure Data Lake Storage
     python-modules:
       - airflow.providers.microsoft.azure.operators.adls_list
+      - airflow.providers.microsoft.azure.operators.adls_delete
   - integration-name: Microsoft Azure Data Explorer
     python-modules:
       - airflow.providers.microsoft.azure.operators.adx
diff --git a/docs/apache-airflow-providers-microsoft-azure/operators/adls.rst b/docs/apache-airflow-providers-microsoft-azure/operators/adls.rst
new file mode 100644
index 0000000..7d20852
--- /dev/null
+++ b/docs/apache-airflow-providers-microsoft-azure/operators/adls.rst
@@ -0,0 +1,55 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+Azure DataLake Storage Operators
+=================================
+
+.. contents::
+  :depth: 1
+  :local:
+
+
+Prerequisite Tasks
+------------------
+
+.. include:: /operators/_partials/prerequisite_tasks.rst
+
+.. _howto/operator:AzureDataLakeStorageDeleteOperator:
+
+AzureDataLakeStorageDeleteOperator
+----------------------------------
+Use the
+:class:`~airflow.providers.microsoft.azure.operators.adls_delete.AzureDataLakeStorageDeleteOperator` to remove
+file(s) from Azure DataLake Storage.
+
+
+Below is an example of using this operator to delete a file from ADLS.
+
+.. exampleinclude:: /../../airflow/providers/microsoft/azure/example_dags/example_adls_delete.py
+    :language: python
+    :dedent: 0
+    :start-after: [START howto_operator_adls_delete]
+    :end-before: [END howto_operator_adls_delete]
+
+
+Reference
+---------
+
+For further information, look at:
+
+* `Azure Data Lake Storage Documentation <https://docs.microsoft.com/en-us/azure/data-lake-store/>`__
diff --git a/tests/providers/microsoft/azure/hooks/test_azure_data_lake.py b/tests/providers/microsoft/azure/hooks/test_azure_data_lake.py
index 8fa0fee..8c70749 100644
--- a/tests/providers/microsoft/azure/hooks/test_azure_data_lake.py
+++ b/tests/providers/microsoft/azure/hooks/test_azure_data_lake.py
@@ -132,3 +132,14 @@ class TestAzureDataLakeHook(unittest.TestCase):
         hook = AzureDataLakeHook(azure_data_lake_conn_id='adl_test_key')
         hook.list('file_path/some_folder/')
         mock_fs.return_value.walk.assert_called_once_with('file_path/some_folder/')
+
+    @mock.patch(
+        'airflow.providers.microsoft.azure.hooks.azure_data_lake.core.AzureDLFileSystem', autospec=True
+    )
+    @mock.patch('airflow.providers.microsoft.azure.hooks.azure_data_lake.lib', autospec=True)
+    def test_remove(self, mock_lib, mock_fs):
+        from airflow.providers.microsoft.azure.hooks.azure_data_lake import AzureDataLakeHook
+
+        hook = AzureDataLakeHook(azure_data_lake_conn_id='adl_test_key')
+        hook.remove('filepath', True)
+        mock_fs.return_value.remove.assert_called_once_with('filepath', recursive=True)
diff --git a/tests/providers/microsoft/azure/operators/test_adls_delete.py b/tests/providers/microsoft/azure/operators/test_adls_delete.py
new file mode 100644
index 0000000..2c5f2c1
--- /dev/null
+++ b/tests/providers/microsoft/azure/operators/test_adls_delete.py
@@ -0,0 +1,37 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+from unittest import mock
+
+from airflow.providers.microsoft.azure.operators.adls_delete import AzureDataLakeStorageDeleteOperator
+
+TASK_ID = 'test-adls-list-operator'
+TEST_PATH = 'test'
+
+
+class TestAzureDataLakeStorageDeleteOperator(unittest.TestCase):
+    @mock.patch('airflow.providers.microsoft.azure.operators.adls_delete.AzureDataLakeHook')
+    def test_execute(self, mock_hook):
+
+        operator = AzureDataLakeStorageDeleteOperator(task_id=TASK_ID, path=TEST_PATH)
+
+        operator.execute(None)
+        mock_hook.return_value.remove.assert_called_once_with(
+            path=TEST_PATH, recursive=False, ignore_not_found=True
+        )
diff --git a/tests/providers/microsoft/azure/operators/test_adls_delete_system.py b/tests/providers/microsoft/azure/operators/test_adls_delete_system.py
new file mode 100644
index 0000000..7fe437a
--- /dev/null
+++ b/tests/providers/microsoft/azure/operators/test_adls_delete_system.py
@@ -0,0 +1,48 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+
+import pytest
+
+from airflow.providers.microsoft.azure.example_dags.example_local_to_adls import LOCAL_FILE_PATH
+from tests.test_utils.azure_system_helpers import (
+    AZURE_DAG_FOLDER,
+    AzureSystemTest,
+    provide_azure_data_lake_default_connection,
+)
+
+CREDENTIALS_DIR = os.environ.get('CREDENTIALS_DIR', '/files/airflow-breeze-config/keys')
+DATA_LAKE_DEFAULT_KEY = 'azure_data_lake.json'
+CREDENTIALS_PATH = os.path.join(CREDENTIALS_DIR, DATA_LAKE_DEFAULT_KEY)
+
+
+@pytest.mark.backend('postgres', 'mysql')
+@pytest.mark.credential_file(DATA_LAKE_DEFAULT_KEY)
+class ADLSDeleteSystem(AzureSystemTest):
+    def setUp(self):
+        super().setUp()
+        with open(LOCAL_FILE_PATH, 'w+') as file:
+            file.writelines(['example test files'])
+
+    def tearDown(self):
+        os.remove(LOCAL_FILE_PATH)
+        super().tearDown()
+
+    @provide_azure_data_lake_default_connection(CREDENTIALS_PATH)
+    def test_run_example_adls_delete(self):
+        self.run_dag('example_adls_delete', AZURE_DAG_FOLDER)
diff --git a/tests/test_utils/azure_system_helpers.py b/tests/test_utils/azure_system_helpers.py
index c06fe32..3c5ada4 100644
--- a/tests/test_utils/azure_system_helpers.py
+++ b/tests/test_utils/azure_system_helpers.py
@@ -35,6 +35,9 @@ AZURE_DAG_FOLDER = os.path.join(
 )
 WASB_CONNECTION_ID = os.environ.get("WASB_CONNECTION_ID", "wasb_default")
 
+DATA_LAKE_CONNECTION_ID = os.environ.get("AZURE_DATA_LAKE_CONNECTION_ID", 'azure_data_lake_default')
+DATA_LAKE_CONNECTION_TYPE = os.environ.get("AZURE_DATA_LAKE_CONNECTION_TYPE", 'azure_data_lake')
+
 
 @contextmanager
 def provide_wasb_default_connection(key_file_path: str):
@@ -61,6 +64,35 @@ def provide_wasb_default_connection(key_file_path: str):
 
 
 @contextmanager
+def provide_azure_data_lake_default_connection(key_file_path: str):
+    """
+    Context manager to provide a temporary value for azure_data_lake_default connection
+    :param key_file_path: Path to file with azure_data_lake_default credentials .json file.
+    :type key_file_path: str
+    """
+    required_fields = {'login', 'password', 'extra'}
+
+    if not key_file_path.endswith(".json"):
+        raise AirflowException("Use a JSON key file.")
+    with open(key_file_path) as credentials:
+        creds = json.load(credentials)
+    missing_keys = required_fields - creds.keys()
+    if missing_keys:
+        message = f"{missing_keys} fields are missing"
+        raise AirflowException(message)
+    conn = Connection(
+        conn_id=DATA_LAKE_CONNECTION_ID,
+        conn_type=DATA_LAKE_CONNECTION_TYPE,
+        host=creds.get("host", None),
+        login=creds.get("login", None),
+        password=creds.get("password", None),
+        extra=json.dumps(creds.get('extra', None)),
+    )
+    with patch_environ({f"AIRFLOW_CONN_{conn.conn_id.upper()}": conn.get_uri()}):
+        yield
+
+
+@contextmanager
 def provide_azure_fileshare(share_name: str, wasb_conn_id: str, file_name: str, directory: str):
     AzureSystemTest.prepare_share(
         share_name=share_name,