You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2020/12/24 13:08:07 UTC
[airflow] branch master updated: add AzureDatalakeStorageDeleteOperator (#13206)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new 5185d81 add AzureDatalakeStorageDeleteOperator (#13206)
5185d81 is described below
commit 5185d81ff99523fe363bd5024cef9660c94214ff
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Thu Dec 24 14:07:58 2020 +0100
add AzureDatalakeStorageDeleteOperator (#13206)
---
.../azure/example_dags/example_adls_delete.py | 47 ++++++++++++++++
.../microsoft/azure/hooks/azure_data_lake.py | 20 +++++++
.../microsoft/azure/operators/adls_delete.py | 63 ++++++++++++++++++++++
airflow/providers/microsoft/azure/provider.yaml | 3 ++
.../operators/adls.rst | 55 +++++++++++++++++++
.../microsoft/azure/hooks/test_azure_data_lake.py | 11 ++++
.../microsoft/azure/operators/test_adls_delete.py | 37 +++++++++++++
.../azure/operators/test_adls_delete_system.py | 48 +++++++++++++++++
tests/test_utils/azure_system_helpers.py | 32 +++++++++++
9 files changed, 316 insertions(+)
diff --git a/airflow/providers/microsoft/azure/example_dags/example_adls_delete.py b/airflow/providers/microsoft/azure/example_dags/example_adls_delete.py
new file mode 100644
index 0000000..82d9377
--- /dev/null
+++ b/airflow/providers/microsoft/azure/example_dags/example_adls_delete.py
@@ -0,0 +1,47 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+
+from airflow import models
+from airflow.providers.microsoft.azure.operators.adls_delete import AzureDataLakeStorageDeleteOperator
+from airflow.providers.microsoft.azure.transfers.local_to_adls import LocalToAzureDataLakeStorageOperator
+from airflow.utils.dates import days_ago
+
+LOCAL_FILE_PATH = os.environ.get("LOCAL_FILE_PATH", 'localfile.txt')
+REMOTE_FILE_PATH = os.environ.get("REMOTE_LOCAL_PATH", 'remote.txt')
+
+
+with models.DAG(
+ "example_adls_delete",
+ start_date=days_ago(1),
+ schedule_interval=None,
+ tags=['example'],
+) as dag:
+
+ upload_file = LocalToAzureDataLakeStorageOperator(
+ task_id='upload_task',
+ local_path=LOCAL_FILE_PATH,
+ remote_path=REMOTE_FILE_PATH,
+ )
+ # [START howto_operator_adls_delete]
+ remove_file = AzureDataLakeStorageDeleteOperator(
+ task_id="delete_task", path=REMOTE_FILE_PATH, recursive=True
+ )
+ # [END howto_operator_adls_delete]
+
+ upload_file >> remove_file
diff --git a/airflow/providers/microsoft/azure/hooks/azure_data_lake.py b/airflow/providers/microsoft/azure/hooks/azure_data_lake.py
index fdc519f..8a99f54 100644
--- a/airflow/providers/microsoft/azure/hooks/azure_data_lake.py
+++ b/airflow/providers/microsoft/azure/hooks/azure_data_lake.py
@@ -28,6 +28,7 @@ from typing import Optional
from azure.datalake.store import core, lib, multithread
+from airflow.exceptions import AirflowException
from airflow.hooks.base import BaseHook
@@ -189,3 +190,22 @@ class AzureDataLakeHook(BaseHook):
return self.get_conn().glob(path)
else:
return self.get_conn().walk(path)
+
+ def remove(self, path: str, recursive: bool = False, ignore_not_found: bool = True) -> None:
+ """
+ Remove files in Azure Data Lake Storage
+
+ :param path: A directory or file to remove in ADLS
+ :type path: str
+ :param recursive: Whether to loop into directories in the location and remove the files
+ :type recursive: bool
+ :param ignore_not_found: Whether to ignore (only log) a missing path instead of raising an error
+ :type ignore_not_found: bool
+ """
+ try:
+ self.get_conn().remove(path=path, recursive=recursive)
+ except FileNotFoundError:
+ if ignore_not_found:
+ self.log.info("File %s not found", path)
+ else:
+ raise AirflowException("File %s not found" % path)
diff --git a/airflow/providers/microsoft/azure/operators/adls_delete.py b/airflow/providers/microsoft/azure/operators/adls_delete.py
new file mode 100644
index 0000000..accd64f
--- /dev/null
+++ b/airflow/providers/microsoft/azure/operators/adls_delete.py
@@ -0,0 +1,63 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Any, Sequence
+
+from airflow.models import BaseOperator
+from airflow.providers.microsoft.azure.hooks.azure_data_lake import AzureDataLakeHook
+from airflow.utils.decorators import apply_defaults
+
+
+class AzureDataLakeStorageDeleteOperator(BaseOperator):
+ """
+ Delete files in the specified path.
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the guide:
+ :ref:`howto/operator:AzureDataLakeStorageDeleteOperator`
+
+ :param path: A directory or file to remove
+ :type path: str
+ :param recursive: Whether to loop into directories in the location and remove the files
+ :type recursive: bool
+ :param ignore_not_found: Whether to ignore (only log) a missing path instead of raising an error
+ :type ignore_not_found: bool
+ """
+
+ template_fields: Sequence[str] = ('path',)
+ ui_color = '#901dd2'
+
+ @apply_defaults
+ def __init__(
+ self,
+ *,
+ path: str,
+ recursive: bool = False,
+ ignore_not_found: bool = True,
+ azure_data_lake_conn_id: str = 'azure_data_lake_default',
+ **kwargs,
+ ) -> None:
+ super().__init__(**kwargs)
+ self.path = path
+ self.recursive = recursive
+ self.ignore_not_found = ignore_not_found
+ self.azure_data_lake_conn_id = azure_data_lake_conn_id
+
+ def execute(self, context: dict) -> Any:
+ hook = AzureDataLakeHook(azure_data_lake_conn_id=self.azure_data_lake_conn_id)
+
+ return hook.remove(path=self.path, recursive=self.recursive, ignore_not_found=self.ignore_not_found)
diff --git a/airflow/providers/microsoft/azure/provider.yaml b/airflow/providers/microsoft/azure/provider.yaml
index fa0d112..3b12144 100644
--- a/airflow/providers/microsoft/azure/provider.yaml
+++ b/airflow/providers/microsoft/azure/provider.yaml
@@ -44,6 +44,8 @@ integrations:
external-doc-url: https://azure.microsoft.com/en-us/services/data-explorer/
tags: [azure]
- integration-name: Microsoft Azure Data Lake Storage
+ how-to-guide:
+ - /docs/apache-airflow-providers-microsoft-azure/operators/adls.rst
external-doc-url: https://azure.microsoft.com/en-us/services/storage/data-lake-storage/
logo: /integration-logos/azure/Data Lake Storage.svg
tags: [azure]
@@ -62,6 +64,7 @@ operators:
- integration-name: Microsoft Azure Data Lake Storage
python-modules:
- airflow.providers.microsoft.azure.operators.adls_list
+ - airflow.providers.microsoft.azure.operators.adls_delete
- integration-name: Microsoft Azure Data Explorer
python-modules:
- airflow.providers.microsoft.azure.operators.adx
diff --git a/docs/apache-airflow-providers-microsoft-azure/operators/adls.rst b/docs/apache-airflow-providers-microsoft-azure/operators/adls.rst
new file mode 100644
index 0000000..7d20852
--- /dev/null
+++ b/docs/apache-airflow-providers-microsoft-azure/operators/adls.rst
@@ -0,0 +1,55 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+
+Azure DataLake Storage Operators
+=================================
+
+.. contents::
+ :depth: 1
+ :local:
+
+
+Prerequisite Tasks
+^^^^^^^^^^^^^^^^^^
+
+.. include:: /operators/_partials/prerequisite_tasks.rst
+
+.. _howto/operator:AzureDataLakeStorageDeleteOperator:
+
+AzureDataLakeStorageDeleteOperator
+----------------------------------
+Use the
+:class:`~airflow.providers.microsoft.azure.operators.adls_delete.AzureDataLakeStorageDeleteOperator` to remove
+file(s) from Azure Data Lake Storage.
+
+
+Below is an example of using this operator to delete a file from ADLS.
+
+.. exampleinclude:: /../../airflow/providers/microsoft/azure/example_dags/example_adls_delete.py
+ :language: python
+ :dedent: 0
+ :start-after: [START howto_operator_adls_delete]
+ :end-before: [END howto_operator_adls_delete]
+
+
+Reference
+---------
+
+For further information, look at:
+
+* `Azure Data Lake Storage Documentation <https://docs.microsoft.com/en-us/azure/data-lake-store/>`__
diff --git a/tests/providers/microsoft/azure/hooks/test_azure_data_lake.py b/tests/providers/microsoft/azure/hooks/test_azure_data_lake.py
index 8fa0fee..8c70749 100644
--- a/tests/providers/microsoft/azure/hooks/test_azure_data_lake.py
+++ b/tests/providers/microsoft/azure/hooks/test_azure_data_lake.py
@@ -132,3 +132,14 @@ class TestAzureDataLakeHook(unittest.TestCase):
hook = AzureDataLakeHook(azure_data_lake_conn_id='adl_test_key')
hook.list('file_path/some_folder/')
mock_fs.return_value.walk.assert_called_once_with('file_path/some_folder/')
+
+ @mock.patch(
+ 'airflow.providers.microsoft.azure.hooks.azure_data_lake.core.AzureDLFileSystem', autospec=True
+ )
+ @mock.patch('airflow.providers.microsoft.azure.hooks.azure_data_lake.lib', autospec=True)
+ def test_remove(self, mock_lib, mock_fs):
+ from airflow.providers.microsoft.azure.hooks.azure_data_lake import AzureDataLakeHook
+
+ hook = AzureDataLakeHook(azure_data_lake_conn_id='adl_test_key')
+ hook.remove('filepath', True)
+ mock_fs.return_value.remove.assert_called_once_with('filepath', recursive=True)
diff --git a/tests/providers/microsoft/azure/operators/test_adls_delete.py b/tests/providers/microsoft/azure/operators/test_adls_delete.py
new file mode 100644
index 0000000..2c5f2c1
--- /dev/null
+++ b/tests/providers/microsoft/azure/operators/test_adls_delete.py
@@ -0,0 +1,37 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+from unittest import mock
+
+from airflow.providers.microsoft.azure.operators.adls_delete import AzureDataLakeStorageDeleteOperator
+
+TASK_ID = 'test-adls-list-operator'
+TEST_PATH = 'test'
+
+
+class TestAzureDataLakeStorageDeleteOperator(unittest.TestCase):
+ @mock.patch('airflow.providers.microsoft.azure.operators.adls_delete.AzureDataLakeHook')
+ def test_execute(self, mock_hook):
+
+ operator = AzureDataLakeStorageDeleteOperator(task_id=TASK_ID, path=TEST_PATH)
+
+ operator.execute(None)
+ mock_hook.return_value.remove.assert_called_once_with(
+ path=TEST_PATH, recursive=False, ignore_not_found=True
+ )
diff --git a/tests/providers/microsoft/azure/operators/test_adls_delete_system.py b/tests/providers/microsoft/azure/operators/test_adls_delete_system.py
new file mode 100644
index 0000000..7fe437a
--- /dev/null
+++ b/tests/providers/microsoft/azure/operators/test_adls_delete_system.py
@@ -0,0 +1,48 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+
+import pytest
+
+from airflow.providers.microsoft.azure.example_dags.example_local_to_adls import LOCAL_FILE_PATH
+from tests.test_utils.azure_system_helpers import (
+ AZURE_DAG_FOLDER,
+ AzureSystemTest,
+ provide_azure_data_lake_default_connection,
+)
+
+CREDENTIALS_DIR = os.environ.get('CREDENTIALS_DIR', '/files/airflow-breeze-config/keys')
+DATA_LAKE_DEFAULT_KEY = 'azure_data_lake.json'
+CREDENTIALS_PATH = os.path.join(CREDENTIALS_DIR, DATA_LAKE_DEFAULT_KEY)
+
+
+@pytest.mark.backend('postgres', 'mysql')
+@pytest.mark.credential_file(DATA_LAKE_DEFAULT_KEY)
+class ADLSDeleteSystem(AzureSystemTest):
+ def setUp(self):
+ super().setUp()
+ with open(LOCAL_FILE_PATH, 'w+') as file:
+ file.writelines(['example test files'])
+
+ def tearDown(self):
+ os.remove(LOCAL_FILE_PATH)
+ super().tearDown()
+
+ @provide_azure_data_lake_default_connection(CREDENTIALS_PATH)
+ def test_run_example_adls_delete(self):
+ self.run_dag('example_adls_delete', AZURE_DAG_FOLDER)
diff --git a/tests/test_utils/azure_system_helpers.py b/tests/test_utils/azure_system_helpers.py
index c06fe32..3c5ada4 100644
--- a/tests/test_utils/azure_system_helpers.py
+++ b/tests/test_utils/azure_system_helpers.py
@@ -35,6 +35,9 @@ AZURE_DAG_FOLDER = os.path.join(
)
WASB_CONNECTION_ID = os.environ.get("WASB_CONNECTION_ID", "wasb_default")
+DATA_LAKE_CONNECTION_ID = os.environ.get("AZURE_DATA_LAKE_CONNECTION_ID", 'azure_data_lake_default')
+DATA_LAKE_CONNECTION_TYPE = os.environ.get("AZURE_DATA_LAKE_CONNECTION_TYPE", 'azure_data_lake')
+
@contextmanager
def provide_wasb_default_connection(key_file_path: str):
@@ -61,6 +64,35 @@ def provide_wasb_default_connection(key_file_path: str):
@contextmanager
+def provide_azure_data_lake_default_connection(key_file_path: str):
+ """
+ Context manager to provide a temporary value for azure_data_lake_default connection
+ :param key_file_path: Path to the .json file with azure_data_lake_default credentials.
+ :type key_file_path: str
+ """
+ required_fields = {'login', 'password', 'extra'}
+
+ if not key_file_path.endswith(".json"):
+ raise AirflowException("Use a JSON key file.")
+ with open(key_file_path) as credentials:
+ creds = json.load(credentials)
+ missing_keys = required_fields - creds.keys()
+ if missing_keys:
+ message = f"{missing_keys} fields are missing"
+ raise AirflowException(message)
+ conn = Connection(
+ conn_id=DATA_LAKE_CONNECTION_ID,
+ conn_type=DATA_LAKE_CONNECTION_TYPE,
+ host=creds.get("host", None),
+ login=creds.get("login", None),
+ password=creds.get("password", None),
+ extra=json.dumps(creds.get('extra', None)),
+ )
+ with patch_environ({f"AIRFLOW_CONN_{conn.conn_id.upper()}": conn.get_uri()}):
+ yield
+
+
+@contextmanager
def provide_azure_fileshare(share_name: str, wasb_conn_id: str, file_name: str, directory: str):
AzureSystemTest.prepare_share(
share_name=share_name,