You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by el...@apache.org on 2021/12/22 09:48:08 UTC
[airflow] branch main updated: Azure: New sftp to wasb operator (#18877)
This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 341bf5a Azure: New sftp to wasb operator (#18877)
341bf5a is described below
commit 341bf5ab1f528a98fa2c7325113cfe425843cff1
Author: Guilherme Da Silva Gonçalves <gu...@gmail.com>
AuthorDate: Wed Dec 22 06:47:35 2021 -0300
Azure: New sftp to wasb operator (#18877)
* Azure: New sftp to wasb operator
Co-authored-by: Guilherme da Silva Goncalves <gu...@bancointer.com.br>
Co-authored-by: Josh Fell <48...@users.noreply.github.com>
---
CONTRIBUTING.rst | 2 +-
airflow/providers/dependencies.json | 3 +-
.../azure/example_dags/example_sftp_to_wasb.py | 72 ++++++
airflow/providers/microsoft/azure/provider.yaml | 4 +
.../microsoft/azure/transfers/sftp_to_wasb.py | 199 ++++++++++++++++
.../operators/sftp_to_wasb.rst | 61 +++++
.../microsoft/azure/transfers/test_sftp_to_wasb.py | 256 +++++++++++++++++++++
.../azure/transfers/test_sftp_to_wasb_system.py | 57 +++++
tests/test_utils/sftp_system_helpers.py | 51 ++++
9 files changed, 703 insertions(+), 2 deletions(-)
diff --git a/CONTRIBUTING.rst b/CONTRIBUTING.rst
index 3ebb0fc..bb844eb 100644
--- a/CONTRIBUTING.rst
+++ b/CONTRIBUTING.rst
@@ -683,7 +683,7 @@ dingding http
discord http
google amazon,apache.beam,apache.cassandra,cncf.kubernetes,facebook,microsoft.azure,microsoft.mssql,mysql,oracle,postgres,presto,salesforce,sftp,ssh,trino
hashicorp google
-microsoft.azure google,oracle
+microsoft.azure google,oracle,sftp
mysql amazon,presto,trino,vertica
postgres amazon
salesforce tableau
diff --git a/airflow/providers/dependencies.json b/airflow/providers/dependencies.json
index ec082c1..3c51146 100644
--- a/airflow/providers/dependencies.json
+++ b/airflow/providers/dependencies.json
@@ -59,7 +59,8 @@
],
"microsoft.azure": [
"google",
- "oracle"
+ "oracle",
+ "sftp"
],
"mysql": [
"amazon",
diff --git a/airflow/providers/microsoft/azure/example_dags/example_sftp_to_wasb.py b/airflow/providers/microsoft/azure/example_dags/example_sftp_to_wasb.py
new file mode 100644
index 0000000..d70ca31
--- /dev/null
+++ b/airflow/providers/microsoft/azure/example_dags/example_sftp_to_wasb.py
@@ -0,0 +1,72 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import os
+from datetime import datetime
+
+from airflow import DAG
+from airflow.decorators import task
+from airflow.providers.microsoft.azure.operators.wasb_delete_blob import WasbDeleteBlobOperator
+from airflow.providers.microsoft.azure.transfers.sftp_to_wasb import SFTPToWasbOperator
+from airflow.providers.sftp.hooks.sftp import SFTPHook
+from airflow.providers.sftp.operators.sftp import SFTPOperator
+
+AZURE_CONTAINER_NAME = os.environ.get("AZURE_CONTAINER_NAME", "airflow")
+BLOB_PREFIX = os.environ.get("AZURE_BLOB_PREFIX", "airflow")
+SFTP_SRC_PATH = os.environ.get("SFTP_SRC_PATH", "/sftp")
+LOCAL_FILE_PATH = os.environ.get("LOCAL_SRC_PATH", "/tmp")
+SAMPLE_FILENAME = os.environ.get("SFTP_SAMPLE_FILENAME", "sftp_to_wasb_test.txt")
+FILE_COMPLETE_PATH = os.path.join(LOCAL_FILE_PATH, SAMPLE_FILENAME)
+SFTP_FILE_COMPLETE_PATH = os.path.join(SFTP_SRC_PATH, SAMPLE_FILENAME)
+
+
+@task
+def delete_sftp_file():
+ """Delete a file at SFTP SERVER"""
+ SFTPHook().delete_file(SFTP_FILE_COMPLETE_PATH)
+
+
+with DAG(
+ "example_sftp_to_wasb",
+ schedule_interval=None,
+ catchup=False,
+ start_date=datetime(2021, 1, 1), # Override to match your needs
+) as dag:
+ transfer_files_to_sftp_step = SFTPOperator(
+ task_id="transfer_files_from_local_to_sftp",
+ local_filepath=FILE_COMPLETE_PATH,
+ remote_filepath=SFTP_FILE_COMPLETE_PATH,
+ )
+
+ # [START how_to_sftp_to_wasb]
+ transfer_files_to_azure = SFTPToWasbOperator(
+ task_id="transfer_files_from_sftp_to_wasb",
+ # SFTP args
+ sftp_source_path=SFTP_SRC_PATH,
+ # AZURE args
+ container_name=AZURE_CONTAINER_NAME,
+ blob_prefix=BLOB_PREFIX,
+ )
+ # [END how_to_sftp_to_wasb]
+
+ delete_blob_file_step = WasbDeleteBlobOperator(
+ task_id="delete_blob_files",
+ container_name=AZURE_CONTAINER_NAME,
+ blob_name=BLOB_PREFIX + SAMPLE_FILENAME,
+ )
+
+ transfer_files_to_sftp_step >> transfer_files_to_azure >> delete_blob_file_step >> delete_sftp_file()
diff --git a/airflow/providers/microsoft/azure/provider.yaml b/airflow/providers/microsoft/azure/provider.yaml
index 8c67bbf..97e9890 100644
--- a/airflow/providers/microsoft/azure/provider.yaml
+++ b/airflow/providers/microsoft/azure/provider.yaml
@@ -179,6 +179,10 @@ transfers:
target-integration-name: Google Cloud Storage (GCS)
how-to-guide: /docs/apache-airflow-providers-microsoft-azure/operators/azure_blob_to_gcs.rst
python-module: airflow.providers.microsoft.azure.transfers.azure_blob_to_gcs
+ - source-integration-name: SSH File Transfer Protocol (SFTP)
+ target-integration-name: Microsoft Azure Blob Storage
+ how-to-guide: /docs/apache-airflow-providers-microsoft-azure/operators/sftp_to_wasb.rst
+ python-module: airflow.providers.microsoft.azure.transfers.sftp_to_wasb
hook-class-names: # deprecated - to be removed after providers add dependency on Airflow 2.2.0+
- airflow.providers.microsoft.azure.hooks.base_azure.AzureBaseHook
diff --git a/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py b/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py
new file mode 100644
index 0000000..04e7b11
--- /dev/null
+++ b/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py
@@ -0,0 +1,199 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains SFTP to Azure Blob Storage operator."""
+import os
+import sys
+from collections import namedtuple
+from tempfile import NamedTemporaryFile
+from typing import Dict, List, Optional, Tuple
+
+if sys.version_info >= (3, 8):
+ from functools import cached_property
+else:
+ from cached_property import cached_property
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.microsoft.azure.hooks.wasb import WasbHook
+from airflow.providers.sftp.hooks.sftp import SFTPHook
+
+WILDCARD = "*"
+SftpFile = namedtuple('SftpFile', 'sftp_file_path, blob_name')
+
+
+class SFTPToWasbOperator(BaseOperator):
+ """
+ Transfer files to Azure Blob Storage from SFTP server.
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the guide:
+ :ref:`howto/operator:SFTPToWasbOperator`
+
+ :param sftp_source_path: The sftp remote path. This is the specified file path
+ for downloading the single file or multiple files from the SFTP server.
+ You can use only one wildcard within your path. The wildcard can appear
+ inside the path or at the end of the path.
+ :type sftp_source_path: str
+ :param container_name: Name of the container.
+ :type container_name: str
+ :param blob_prefix: Prefix to name a blob.
+ :type blob_prefix: str
+ :param sftp_conn_id: The sftp connection id. The name or identifier for
+ establishing a connection to the SFTP server.
+ :type sftp_conn_id: str
+ :param wasb_conn_id: Reference to the wasb connection.
+ :type wasb_conn_id: str
+ :param load_options: Optional keyword arguments that
+ ``WasbHook.load_file()`` takes.
+ :type load_options: dict
+ :param move_object: When move object is True, the object is moved instead
+ of copied to the new location. This is the equivalent of a mv command
+ as opposed to a cp command.
+ :param wasb_overwrite_object: Whether the blob to be uploaded
+ should overwrite the current data.
+ When wasb_overwrite_object is True, it will overwrite the existing data.
+ If set to False, the operation might fail with
+ ResourceExistsError in case a blob object already exists.
+ :type move_object: bool
+ """
+
+ template_fields = ("sftp_source_path", "container_name", "blob_prefix")
+
+ def __init__(
+ self,
+ *,
+ sftp_source_path: str,
+ container_name: str,
+ blob_prefix: str = "",
+ sftp_conn_id: str = "sftp_default",
+ wasb_conn_id: str = 'wasb_default',
+ load_options: Optional[Dict] = None,
+ move_object: bool = False,
+ wasb_overwrite_object: bool = False,
+ **kwargs,
+ ) -> None:
+ super().__init__(**kwargs)
+
+ self.sftp_source_path = sftp_source_path
+ self.blob_prefix = blob_prefix
+ self.sftp_conn_id = sftp_conn_id
+ self.wasb_conn_id = wasb_conn_id
+ self.container_name = container_name
+ self.wasb_conn_id = wasb_conn_id
+ self.load_options = load_options or {"overwrite": wasb_overwrite_object}
+ self.move_object = move_object
+
+ def dry_run(self) -> None:
+ super().dry_run()
+ sftp_files: List[SftpFile] = self.get_sftp_files_map()
+ for file in sftp_files:
+ self.log.info(
+ 'Process will upload file from (SFTP) %s to wasb://%s as %s',
+ file.sftp_file_path,
+ self.container_name,
+ file.blob_name,
+ )
+ if self.move_object:
+ self.log.info("Executing delete of %s", file)
+
+ def execute(self, context: Dict) -> None:
+ """Upload a file from SFTP to Azure Blob Storage."""
+ sftp_files: List[SftpFile] = self.get_sftp_files_map()
+ uploaded_files = self.copy_files_to_wasb(sftp_files)
+ if self.move_object:
+ self.delete_files(uploaded_files)
+
+ def get_sftp_files_map(self) -> List[SftpFile]:
+ """Get SFTP files from the source path, it may use a WILDCARD to this end."""
+ sftp_files = []
+
+ sftp_complete_path, prefix, delimiter = self.get_tree_behavior()
+
+ found_files, _, _ = self.sftp_hook.get_tree_map(
+ sftp_complete_path, prefix=prefix, delimiter=delimiter
+ )
+
+ self.log.info("Found %s files at sftp source path: %s", str(len(found_files)), self.sftp_source_path)
+
+ for file in found_files:
+ future_blob_name = self.get_full_path_blob(file)
+ sftp_files.append(SftpFile(file, future_blob_name))
+
+ return sftp_files
+
+ def get_tree_behavior(self) -> Tuple[str, Optional[str], Optional[str]]:
+ """Extracts from source path the tree behavior to interact with the remote folder"""
+ self.check_wildcards_limit()
+
+ if self.source_path_contains_wildcard:
+
+ prefix, delimiter = self.sftp_source_path.split(WILDCARD, 1)
+
+ sftp_complete_path = os.path.dirname(prefix)
+
+ return sftp_complete_path, prefix, delimiter
+
+ return self.sftp_source_path, None, None
+
+ def check_wildcards_limit(self) -> None:
+ """Check if there are multiple wildcards used in the SFTP source path."""
+ total_wildcards = self.sftp_source_path.count(WILDCARD)
+ if total_wildcards > 1:
+ raise AirflowException(
+ "Only one wildcard '*' is allowed in sftp_source_path parameter. "
+ f"Found {total_wildcards} in {self.sftp_source_path}."
+ )
+
+ @property
+ def source_path_contains_wildcard(self) -> bool:
+ """Checks if the SFTP source path contains a wildcard."""
+ return WILDCARD in self.sftp_source_path
+
+ @cached_property
+ def sftp_hook(self) -> SFTPHook:
+ """Property of sftp hook to be re-used."""
+ return SFTPHook(self.sftp_conn_id)
+
+ def get_full_path_blob(self, file: str) -> str:
+ """Get a blob name based on the previous name and a blob_prefix variable"""
+ return self.blob_prefix + os.path.basename(file)
+
+ def copy_files_to_wasb(self, sftp_files: List[SftpFile]) -> List[str]:
+ """Upload a list of files from sftp_files to Azure Blob Storage with a new Blob Name."""
+ uploaded_files = []
+ wasb_hook = WasbHook(wasb_conn_id=self.wasb_conn_id)
+ for file in sftp_files:
+ with NamedTemporaryFile("w") as tmp:
+ self.sftp_hook.retrieve_file(file.sftp_file_path, tmp.name)
+ self.log.info(
+ 'Uploading %s to wasb://%s as %s',
+ file.sftp_file_path,
+ self.container_name,
+ file.blob_name,
+ )
+ wasb_hook.load_file(tmp.name, self.container_name, file.blob_name, **self.load_options)
+
+ uploaded_files.append(file.sftp_file_path)
+
+ return uploaded_files
+
+ def delete_files(self, uploaded_files: List[str]) -> None:
+ """Delete files at SFTP which have been moved to Azure Blob Storage."""
+ for sftp_file_path in uploaded_files:
+ self.log.info("Executing delete of %s", sftp_file_path)
+ self.sftp_hook.delete_file(sftp_file_path)
diff --git a/docs/apache-airflow-providers-microsoft-azure/operators/sftp_to_wasb.rst b/docs/apache-airflow-providers-microsoft-azure/operators/sftp_to_wasb.rst
new file mode 100644
index 0000000..b628681
--- /dev/null
+++ b/docs/apache-airflow-providers-microsoft-azure/operators/sftp_to_wasb.rst
@@ -0,0 +1,61 @@
+
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+
+
+Azure Blob Storage Transfer Operator
+====================================
+The Blob service stores text and binary data as objects in the cloud.
+The Blob service offers the following three resources: the storage account, containers, and blobs.
+Within your storage account, containers provide a way to organize sets of blobs.
+For more information about the service visit `Azure Blob Storage API documentation <https://docs.microsoft.com/en-us/rest/api/storageservices/blob-service-rest-api>`_.
+
+Before you begin
+^^^^^^^^^^^^^^^^
+Before using Blob Storage within Airflow, you need to authenticate your account with a token, login, and password.
+Please follow Azure
+`instructions <https://docs.microsoft.com/en-us/azure/storage/common/storage-account-keys-manage?tabs=azure-portal>`_
+to do it.
+
+See the following example.
+Set values for these fields:
+
+.. code-block::
+
+ SFTP Conn Id: sftp_default
+ WASB Conn Id: wasb_default
+
+.. contents::
+ :depth: 1
+ :local:
+
+.. _howto/operator:SFTPToWasbOperator:
+
+Transfer Data from SFTP Source Path to Blob Storage
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+The operator transfers data from the SFTP source path to the specified container in Azure Blob Storage.
+
+To transfer files from an SFTP server to Azure Blob Storage, use:
+:class:`~airflow.providers.microsoft.azure.transfers.sftp_to_wasb.SFTPToWasbOperator`
+Example usage:
+
+.. exampleinclude:: /../../airflow/providers/microsoft/azure/example_dags/example_sftp_to_wasb.py
+ :language: python
+ :dedent: 4
+ :start-after: [START how_to_sftp_to_wasb]
+ :end-before: [END how_to_sftp_to_wasb]
diff --git a/tests/providers/microsoft/azure/transfers/test_sftp_to_wasb.py b/tests/providers/microsoft/azure/transfers/test_sftp_to_wasb.py
new file mode 100644
index 0000000..1c34835
--- /dev/null
+++ b/tests/providers/microsoft/azure/transfers/test_sftp_to_wasb.py
@@ -0,0 +1,256 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import unittest
+from unittest import mock
+
+from airflow import AirflowException
+from airflow.providers.microsoft.azure.transfers.sftp_to_wasb import SftpFile, SFTPToWasbOperator
+
+TASK_ID = "test-gcs-to-sftp-operator"
+WASB_CONN_ID = "wasb_default"
+SFTP_CONN_ID = "ssh_default"
+
+CONTAINER_NAME = "test-container"
+WILDCARD_PATH = "main_dir/*"
+WILDCARD_FILE_NAME = "main_dir/test_object*.json"
+SOURCE_PATH_NO_WILDCARD = "main_dir/"
+SOURCE_OBJECT_MULTIPLE_WILDCARDS = "main_dir/csv/*/test_*.csv"
+BLOB_PREFIX = "sponge-bob"
+EXPECTED_BLOB_NAME = "test_object3.json"
+EXPECTED_FILES = [SOURCE_PATH_NO_WILDCARD + EXPECTED_BLOB_NAME]
+
+
+class TestSFTPToWasbOperator(unittest.TestCase):
+ def test_init(self):
+ operator = SFTPToWasbOperator(
+ task_id=TASK_ID,
+ sftp_source_path=SOURCE_PATH_NO_WILDCARD,
+ sftp_conn_id=SFTP_CONN_ID,
+ container_name=CONTAINER_NAME,
+ blob_prefix=BLOB_PREFIX,
+ wasb_conn_id=WASB_CONN_ID,
+ move_object=False,
+ )
+ assert operator.sftp_source_path == SOURCE_PATH_NO_WILDCARD
+ assert operator.sftp_conn_id == SFTP_CONN_ID
+ assert operator.container_name == CONTAINER_NAME
+ assert operator.wasb_conn_id == WASB_CONN_ID
+ assert operator.blob_prefix == BLOB_PREFIX
+
+ @mock.patch('airflow.providers.microsoft.azure.transfers.sftp_to_wasb.WasbHook', autospec=True)
+ def test_execute_more_than_one_wildcard_exception(self, mock_hook):
+ operator = SFTPToWasbOperator(
+ task_id=TASK_ID,
+ sftp_source_path=SOURCE_OBJECT_MULTIPLE_WILDCARDS,
+ sftp_conn_id=SFTP_CONN_ID,
+ container_name=CONTAINER_NAME,
+ blob_prefix=BLOB_PREFIX,
+ wasb_conn_id=WASB_CONN_ID,
+ move_object=False,
+ )
+ with self.assertRaises(AirflowException) as cm:
+ operator.check_wildcards_limit()
+
+ err = cm.exception
+ assert "Only one wildcard '*' is allowed" in str(err)
+
+ def test_get_sftp_tree_behavior(self):
+ operator = SFTPToWasbOperator(
+ task_id=TASK_ID,
+ sftp_source_path=WILDCARD_PATH,
+ sftp_conn_id=SFTP_CONN_ID,
+ container_name=CONTAINER_NAME,
+ wasb_conn_id=WASB_CONN_ID,
+ move_object=False,
+ )
+ sftp_complete_path, prefix, delimiter = operator.get_tree_behavior()
+
+ assert sftp_complete_path == 'main_dir', "not matched at expected complete path"
+ assert prefix == 'main_dir/', "Prefix must be EQUAL TO wildcard"
+ assert delimiter == "", "Delimiter must be empty"
+
+ def test_get_sftp_tree_behavior_without_wildcard(self):
+ operator = SFTPToWasbOperator(
+ task_id=TASK_ID,
+ sftp_source_path=SOURCE_PATH_NO_WILDCARD,
+ sftp_conn_id=SFTP_CONN_ID,
+ container_name=CONTAINER_NAME,
+ wasb_conn_id=WASB_CONN_ID,
+ move_object=False,
+ )
+ sftp_complete_path, prefix, delimiter = operator.get_tree_behavior()
+
+ assert sftp_complete_path == 'main_dir/', "not matched at expected complete path"
+ assert prefix is None, "Prefix must be NONE when no wildcard"
+ assert delimiter is None, "Delimiter must be none"
+
+ def test_source_path_contains_wildcard(self):
+ operator = SFTPToWasbOperator(
+ task_id=TASK_ID,
+ sftp_source_path=WILDCARD_PATH,
+ sftp_conn_id=SFTP_CONN_ID,
+ container_name=CONTAINER_NAME,
+ wasb_conn_id=WASB_CONN_ID,
+ move_object=False,
+ )
+ output = operator.source_path_contains_wildcard
+ assert output is True, "This path contains a wildpath"
+
+ def test_source_path_not_contains_wildcard(self):
+ operator = SFTPToWasbOperator(
+ task_id=TASK_ID,
+ sftp_source_path=SOURCE_PATH_NO_WILDCARD,
+ sftp_conn_id=SFTP_CONN_ID,
+ container_name=CONTAINER_NAME,
+ wasb_conn_id=WASB_CONN_ID,
+ move_object=False,
+ )
+ output = operator.source_path_contains_wildcard
+ assert output is False, "This path does not contains a wildpath"
+
+ @mock.patch('airflow.providers.microsoft.azure.transfers.sftp_to_wasb.WasbHook')
+ @mock.patch('airflow.providers.microsoft.azure.transfers.sftp_to_wasb.SFTPHook')
+ def test_get_sftp_files_map_no_wildcard(self, sftp_hook, mock_hook):
+ sftp_hook.return_value.get_tree_map.return_value = [
+ EXPECTED_FILES,
+ [],
+ [],
+ ]
+ operator = SFTPToWasbOperator(
+ task_id=TASK_ID,
+ sftp_source_path=SOURCE_PATH_NO_WILDCARD,
+ sftp_conn_id=SFTP_CONN_ID,
+ container_name=CONTAINER_NAME,
+ wasb_conn_id=WASB_CONN_ID,
+ move_object=True,
+ )
+ files = operator.get_sftp_files_map()
+
+ assert len(files) == 1, "no matched at expected found files"
+ assert files[0].blob_name == EXPECTED_BLOB_NAME, "expected blob name not matched"
+
+ @mock.patch('airflow.providers.microsoft.azure.transfers.sftp_to_wasb.WasbHook')
+ @mock.patch('airflow.providers.microsoft.azure.transfers.sftp_to_wasb.SFTPHook')
+ def test_copy_files_to_wasb(self, sftp_hook, mock_hook):
+ operator = SFTPToWasbOperator(
+ task_id=TASK_ID,
+ sftp_source_path=SOURCE_PATH_NO_WILDCARD,
+ sftp_conn_id=SFTP_CONN_ID,
+ container_name=CONTAINER_NAME,
+ wasb_conn_id=WASB_CONN_ID,
+ move_object=True,
+ )
+
+ sftp_files = [SftpFile(EXPECTED_FILES[0], EXPECTED_BLOB_NAME)]
+ files = operator.copy_files_to_wasb(sftp_files)
+
+ operator.sftp_hook.retrieve_file.assert_has_calls([mock.call("main_dir/test_object3.json", mock.ANY)])
+
+ mock_hook.return_value.load_file.assert_called_once_with(
+ mock.ANY, CONTAINER_NAME, EXPECTED_BLOB_NAME, overwrite=False
+ )
+
+ assert len(files) == 1, "no matched at expected uploaded files"
+
+ @mock.patch('airflow.providers.microsoft.azure.transfers.sftp_to_wasb.SFTPHook')
+ def test_delete_files(self, sftp_hook):
+ sftp_mock = sftp_hook.return_value
+
+ operator = SFTPToWasbOperator(
+ task_id=TASK_ID,
+ sftp_source_path=SOURCE_PATH_NO_WILDCARD,
+ sftp_conn_id=SFTP_CONN_ID,
+ container_name=CONTAINER_NAME,
+ wasb_conn_id=WASB_CONN_ID,
+ move_object=True,
+ )
+
+ sftp_file_paths = EXPECTED_FILES
+ operator.delete_files(sftp_file_paths)
+
+ sftp_mock.delete_file.assert_has_calls([mock.call(EXPECTED_FILES[0])])
+
+ @mock.patch('airflow.providers.microsoft.azure.transfers.sftp_to_wasb.WasbHook')
+ @mock.patch('airflow.providers.microsoft.azure.transfers.sftp_to_wasb.SFTPHook')
+ def test_execute(self, sftp_hook, mock_hook):
+ operator = SFTPToWasbOperator(
+ task_id=TASK_ID,
+ sftp_source_path=WILDCARD_FILE_NAME,
+ sftp_conn_id=SFTP_CONN_ID,
+ container_name=CONTAINER_NAME,
+ wasb_conn_id=WASB_CONN_ID,
+ move_object=False,
+ )
+
+ sftp_hook.return_value.get_tree_map.return_value = [
+ ["main_dir/test_object.json"],
+ [],
+ [],
+ ]
+
+ operator.execute(None)
+
+ sftp_hook.return_value.get_tree_map.assert_called_with(
+ "main_dir", prefix="main_dir/test_object", delimiter=".json"
+ )
+
+ sftp_hook.return_value.retrieve_file.assert_has_calls(
+ [mock.call("main_dir/test_object.json", mock.ANY)]
+ )
+
+ mock_hook.return_value.load_file.assert_called_once_with(
+ mock.ANY, CONTAINER_NAME, "test_object.json", overwrite=False
+ )
+
+ sftp_hook.return_value.delete_file.assert_not_called()
+
+ @mock.patch('airflow.providers.microsoft.azure.transfers.sftp_to_wasb.WasbHook')
+ @mock.patch('airflow.providers.microsoft.azure.transfers.sftp_to_wasb.SFTPHook')
+ def test_execute_moved_files(self, sftp_hook, mock_hook):
+ operator = SFTPToWasbOperator(
+ task_id=TASK_ID,
+ sftp_source_path=WILDCARD_FILE_NAME,
+ sftp_conn_id=SFTP_CONN_ID,
+ container_name=CONTAINER_NAME,
+ wasb_conn_id=WASB_CONN_ID,
+ move_object=True,
+ blob_prefix=BLOB_PREFIX,
+ )
+
+ sftp_hook.return_value.get_tree_map.return_value = [
+ ["main_dir/test_object.json"],
+ [],
+ [],
+ ]
+
+ operator.execute(None)
+
+ sftp_hook.return_value.get_tree_map.assert_called_with(
+ "main_dir", prefix="main_dir/test_object", delimiter=".json"
+ )
+
+ sftp_hook.return_value.retrieve_file.assert_has_calls(
+ [mock.call("main_dir/test_object.json", mock.ANY)]
+ )
+
+ mock_hook.return_value.load_file.assert_called_once_with(
+ mock.ANY, CONTAINER_NAME, BLOB_PREFIX + "test_object.json", overwrite=False
+ )
+ assert sftp_hook.return_value.delete_file.called is True, "File must be moved"
diff --git a/tests/providers/microsoft/azure/transfers/test_sftp_to_wasb_system.py b/tests/providers/microsoft/azure/transfers/test_sftp_to_wasb_system.py
new file mode 100644
index 0000000..600f84f
--- /dev/null
+++ b/tests/providers/microsoft/azure/transfers/test_sftp_to_wasb_system.py
@@ -0,0 +1,57 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+import os
+
+import pytest
+
+from airflow.providers.microsoft.azure.example_dags.example_sftp_to_wasb import (
+ FILE_COMPLETE_PATH,
+ LOCAL_FILE_PATH,
+ SAMPLE_FILENAME,
+)
+from tests.test_utils.azure_system_helpers import (
+ AZURE_DAG_FOLDER,
+ AzureSystemTest,
+ provide_wasb_default_connection,
+)
+from tests.test_utils.sftp_system_helpers import provide_sftp_default_connection
+
+CREDENTIALS_DIR = os.environ.get('CREDENTIALS_DIR', '/files/airflow-breeze-config/keys')
+SFTP_DEFAULT_KEY = 'sftp_key.json'
+WASB_DEFAULT_KEY = 'wasb_key.json'
+CREDENTIALS_SFTP_PATH = os.path.join(CREDENTIALS_DIR, SFTP_DEFAULT_KEY)
+CREDENTIALS_WASB_PATH = os.path.join(CREDENTIALS_DIR, WASB_DEFAULT_KEY)
+
+
+@pytest.mark.backend('postgres', 'mysql')
+@pytest.mark.credential_file(WASB_DEFAULT_KEY)
+@pytest.mark.credential_file(SFTP_DEFAULT_KEY)
+class TestSFTPToWasbSystem(AzureSystemTest):
+ def setUp(self):
+ super().setUp()
+ self.create_dummy_file(SAMPLE_FILENAME, LOCAL_FILE_PATH)
+
+ def tearDown(self):
+ os.remove(FILE_COMPLETE_PATH)
+ super().tearDown()
+
+ @provide_wasb_default_connection(CREDENTIALS_WASB_PATH)
+ @provide_sftp_default_connection(CREDENTIALS_SFTP_PATH)
+ def test_run_example_file_to_wasb(self):
+ self.run_dag('example_sftp_to_wasb', AZURE_DAG_FOLDER)
diff --git a/tests/test_utils/sftp_system_helpers.py b/tests/test_utils/sftp_system_helpers.py
new file mode 100644
index 0000000..accf3c9
--- /dev/null
+++ b/tests/test_utils/sftp_system_helpers.py
@@ -0,0 +1,51 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import json
+import os
+from contextlib import contextmanager
+
+from airflow.exceptions import AirflowException
+from airflow.models import Connection
+from airflow.utils.process_utils import patch_environ
+
+SFTP_CONNECTION_ID = os.environ.get("SFTP_CONNECTION_ID", "sftp_default")
+
+
+@contextmanager
+def provide_sftp_default_connection(key_file_path: str):
+ """
+ Context manager to provide a temporary value for sftp_default connection
+
+ :param key_file_path: Path to file with sftp_default credentials .json file.
+ :type key_file_path: str
+ """
+ if not key_file_path.endswith(".json"):
+ raise AirflowException("Use a JSON key file.")
+ with open(key_file_path) as credentials:
+ creds = json.load(credentials)
+ conn = Connection(
+ conn_id=SFTP_CONNECTION_ID,
+ conn_type="ssh",
+ port=creds.get("port", None),
+ host=creds.get("host", None),
+ login=creds.get("login", None),
+ password=creds.get("password", None),
+ extra=json.dumps(creds.get('extra', None)),
+ )
+ with patch_environ({f"AIRFLOW_CONN_{conn.conn_id.upper()}": conn.get_uri()}):
+ yield