Posted to commits@airflow.apache.org by el...@apache.org on 2021/12/22 09:48:08 UTC

[airflow] branch main updated: Azure: New sftp to wasb operator (#18877)

This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 341bf5a  Azure: New sftp to wasb operator (#18877)
341bf5a is described below

commit 341bf5ab1f528a98fa2c7325113cfe425843cff1
Author: Guilherme Da Silva Gonçalves <gu...@gmail.com>
AuthorDate: Wed Dec 22 06:47:35 2021 -0300

    Azure: New sftp to wasb operator (#18877)
    
    * Azure: New sftp to wasb operator
    
    Co-authored-by: Guilherme da Silva Goncalves <gu...@bancointer.com.br>
    Co-authored-by: Josh Fell <48...@users.noreply.github.com>
---
 CONTRIBUTING.rst                                   |   2 +-
 airflow/providers/dependencies.json                |   3 +-
 .../azure/example_dags/example_sftp_to_wasb.py     |  72 ++++++
 airflow/providers/microsoft/azure/provider.yaml    |   4 +
 .../microsoft/azure/transfers/sftp_to_wasb.py      | 199 ++++++++++++++++
 .../operators/sftp_to_wasb.rst                     |  61 +++++
 .../microsoft/azure/transfers/test_sftp_to_wasb.py | 256 +++++++++++++++++++++
 .../azure/transfers/test_sftp_to_wasb_system.py    |  57 +++++
 tests/test_utils/sftp_system_helpers.py            |  51 ++++
 9 files changed, 703 insertions(+), 2 deletions(-)
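
For context, a minimal usage sketch of the new operator follows (not part of the commit; "sftp_default" and "wasb_default" are the operator's default connection ids, while the container name, prefix, and path below are illustrative):

    from airflow.providers.microsoft.azure.transfers.sftp_to_wasb import SFTPToWasbOperator

    # Minimal sketch, assuming the default SFTP and WASB connections are configured.
    transfer = SFTPToWasbOperator(
        task_id="sftp_to_wasb",
        sftp_source_path="/upload/*.csv",  # at most one '*' wildcard is allowed
        container_name="my-container",     # illustrative container name
        blob_prefix="landed/",             # prepended to each file's base name
        move_object=True,                  # delete each SFTP file after upload
    )

Instantiated inside a DAG, the task downloads each matching file to a temporary file and uploads it via WasbHook.load_file.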

diff --git a/CONTRIBUTING.rst b/CONTRIBUTING.rst
index 3ebb0fc..bb844eb 100644
--- a/CONTRIBUTING.rst
+++ b/CONTRIBUTING.rst
@@ -683,7 +683,7 @@ dingding                   http
 discord                    http
 google                     amazon,apache.beam,apache.cassandra,cncf.kubernetes,facebook,microsoft.azure,microsoft.mssql,mysql,oracle,postgres,presto,salesforce,sftp,ssh,trino
 hashicorp                  google
-microsoft.azure            google,oracle
+microsoft.azure            google,oracle,sftp
 mysql                      amazon,presto,trino,vertica
 postgres                   amazon
 salesforce                 tableau
diff --git a/airflow/providers/dependencies.json b/airflow/providers/dependencies.json
index ec082c1..3c51146 100644
--- a/airflow/providers/dependencies.json
+++ b/airflow/providers/dependencies.json
@@ -59,7 +59,8 @@
   ],
   "microsoft.azure": [
     "google",
-    "oracle"
+    "oracle",
+    "sftp"
   ],
   "mysql": [
     "amazon",
diff --git a/airflow/providers/microsoft/azure/example_dags/example_sftp_to_wasb.py b/airflow/providers/microsoft/azure/example_dags/example_sftp_to_wasb.py
new file mode 100644
index 0000000..d70ca31
--- /dev/null
+++ b/airflow/providers/microsoft/azure/example_dags/example_sftp_to_wasb.py
@@ -0,0 +1,72 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import os
+from datetime import datetime
+
+from airflow import DAG
+from airflow.decorators import task
+from airflow.providers.microsoft.azure.operators.wasb_delete_blob import WasbDeleteBlobOperator
+from airflow.providers.microsoft.azure.transfers.sftp_to_wasb import SFTPToWasbOperator
+from airflow.providers.sftp.hooks.sftp import SFTPHook
+from airflow.providers.sftp.operators.sftp import SFTPOperator
+
+AZURE_CONTAINER_NAME = os.environ.get("AZURE_CONTAINER_NAME", "airflow")
+BLOB_PREFIX = os.environ.get("AZURE_BLOB_PREFIX", "airflow")
+SFTP_SRC_PATH = os.environ.get("SFTP_SRC_PATH", "/sftp")
+LOCAL_FILE_PATH = os.environ.get("LOCAL_SRC_PATH", "/tmp")
+SAMPLE_FILENAME = os.environ.get("SFTP_SAMPLE_FILENAME", "sftp_to_wasb_test.txt")
+FILE_COMPLETE_PATH = os.path.join(LOCAL_FILE_PATH, SAMPLE_FILENAME)
+SFTP_FILE_COMPLETE_PATH = os.path.join(SFTP_SRC_PATH, SAMPLE_FILENAME)
+
+
+@task
+def delete_sftp_file():
+    """Delete a file at SFTP SERVER"""
+    SFTPHook().delete_file(SFTP_FILE_COMPLETE_PATH)
+
+
+with DAG(
+    "example_sftp_to_wasb",
+    schedule_interval=None,
+    catchup=False,
+    start_date=datetime(2021, 1, 1),  # Override to match your needs
+) as dag:
+    transfer_files_to_sftp_step = SFTPOperator(
+        task_id="transfer_files_from_local_to_sftp",
+        local_filepath=FILE_COMPLETE_PATH,
+        remote_filepath=SFTP_FILE_COMPLETE_PATH,
+    )
+
+    # [START how_to_sftp_to_wasb]
+    transfer_files_to_azure = SFTPToWasbOperator(
+        task_id="transfer_files_from_sftp_to_wasb",
+        # SFTP args
+        sftp_source_path=SFTP_SRC_PATH,
+        # AZURE args
+        container_name=AZURE_CONTAINER_NAME,
+        blob_prefix=BLOB_PREFIX,
+    )
+    # [END how_to_sftp_to_wasb]
+
+    delete_blob_file_step = WasbDeleteBlobOperator(
+        task_id="delete_blob_files",
+        container_name=AZURE_CONTAINER_NAME,
+        blob_name=BLOB_PREFIX + SAMPLE_FILENAME,
+    )
+
+    transfer_files_to_sftp_step >> transfer_files_to_azure >> delete_blob_file_step >> delete_sftp_file()
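
One detail worth noting in the example DAG above: the blob name passed to WasbDeleteBlobOperator is built as BLOB_PREFIX + SAMPLE_FILENAME with no separator, which matches how the operator itself derives blob names. A standalone sketch (values are illustrative):

    import os

    # Mirrors SFTPToWasbOperator.get_full_path_blob: prefix + base name,
    # with no separator inserted between the two parts.
    blob_prefix = "airflow"
    sftp_file_path = "/sftp/sftp_to_wasb_test.txt"
    blob_name = blob_prefix + os.path.basename(sftp_file_path)
    print(blob_name)  # -> airflowsftp_to_wasb_test.txt

If a '/'-separated layout is wanted, the prefix itself must end with '/'.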
diff --git a/airflow/providers/microsoft/azure/provider.yaml b/airflow/providers/microsoft/azure/provider.yaml
index 8c67bbf..97e9890 100644
--- a/airflow/providers/microsoft/azure/provider.yaml
+++ b/airflow/providers/microsoft/azure/provider.yaml
@@ -179,6 +179,10 @@ transfers:
     target-integration-name: Google Cloud Storage (GCS)
     how-to-guide: /docs/apache-airflow-providers-microsoft-azure/operators/azure_blob_to_gcs.rst
     python-module: airflow.providers.microsoft.azure.transfers.azure_blob_to_gcs
+  - source-integration-name: SSH File Transfer Protocol (SFTP)
+    target-integration-name: Microsoft Azure Blob Storage
+    how-to-guide: /docs/apache-airflow-providers-microsoft-azure/operators/sftp_to_wasb.rst
+    python-module: airflow.providers.microsoft.azure.transfers.sftp_to_wasb
 
 hook-class-names:  # deprecated - to be removed after providers add dependency on Airflow 2.2.0+
   - airflow.providers.microsoft.azure.hooks.base_azure.AzureBaseHook
diff --git a/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py b/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py
new file mode 100644
index 0000000..04e7b11
--- /dev/null
+++ b/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py
@@ -0,0 +1,199 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains SFTP to Azure Blob Storage operator."""
+import os
+import sys
+from collections import namedtuple
+from tempfile import NamedTemporaryFile
+from typing import Dict, List, Optional, Tuple
+
+if sys.version_info >= (3, 8):
+    from functools import cached_property
+else:
+    from cached_property import cached_property
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.microsoft.azure.hooks.wasb import WasbHook
+from airflow.providers.sftp.hooks.sftp import SFTPHook
+
+WILDCARD = "*"
+SftpFile = namedtuple('SftpFile', 'sftp_file_path, blob_name')
+
+
+class SFTPToWasbOperator(BaseOperator):
+    """
+    Transfer files to Azure Blob Storage from SFTP server.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:SFTPToWasbOperator`
+
+    :param sftp_source_path: The SFTP remote path: the file path for downloading
+        a single file or multiple files from the SFTP server. At most one
+        wildcard ('*') may be used, anywhere within the path.
+    :type sftp_source_path: str
+    :param container_name: Name of the container.
+    :type container_name: str
+    :param blob_prefix: Prefix to name a blob.
+    :type blob_prefix: str
+    :param sftp_conn_id: The SFTP connection id, used to establish
+        a connection to the SFTP server.
+    :type sftp_conn_id: str
+    :param wasb_conn_id: Reference to the wasb connection.
+    :type wasb_conn_id: str
+    :param load_options: Optional keyword arguments that
+        ``WasbHook.load_file()`` takes.
+    :type load_options: dict
+    :param move_object: When move_object is True, the object is moved instead
+        of copied to the new location. This is the equivalent of a mv command
+        as opposed to a cp command.
+    :type move_object: bool
+    :param wasb_overwrite_object: Whether the uploaded blob should overwrite
+        existing data. When True, existing data is overwritten; when False,
+        the operation may fail with a ResourceExistsError if the blob
+        already exists.
+    :type wasb_overwrite_object: bool
+    """
+
+    template_fields = ("sftp_source_path", "container_name", "blob_prefix")
+
+    def __init__(
+        self,
+        *,
+        sftp_source_path: str,
+        container_name: str,
+        blob_prefix: str = "",
+        sftp_conn_id: str = "sftp_default",
+        wasb_conn_id: str = 'wasb_default',
+        load_options: Optional[Dict] = None,
+        move_object: bool = False,
+        wasb_overwrite_object: bool = False,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+
+        self.sftp_source_path = sftp_source_path
+        self.blob_prefix = blob_prefix
+        self.sftp_conn_id = sftp_conn_id
+        self.wasb_conn_id = wasb_conn_id
+        self.container_name = container_name
+        self.load_options = load_options or {"overwrite": wasb_overwrite_object}
+        self.move_object = move_object
+
+    def dry_run(self) -> None:
+        super().dry_run()
+        sftp_files: List[SftpFile] = self.get_sftp_files_map()
+        for file in sftp_files:
+            self.log.info(
+                'Process will upload file from (SFTP) %s to wasb://%s as %s',
+                file.sftp_file_path,
+                self.container_name,
+                file.blob_name,
+            )
+            if self.move_object:
+                self.log.info("Executing delete of %s", file)
+
+    def execute(self, context: Dict) -> None:
+        """Upload a file from SFTP to Azure Blob Storage."""
+        sftp_files: List[SftpFile] = self.get_sftp_files_map()
+        uploaded_files = self.copy_files_to_wasb(sftp_files)
+        if self.move_object:
+            self.delete_files(uploaded_files)
+
+    def get_sftp_files_map(self) -> List[SftpFile]:
+        """Get SFTP files from the source path, it may use a WILDCARD to this end."""
+        sftp_files = []
+
+        sftp_complete_path, prefix, delimiter = self.get_tree_behavior()
+
+        found_files, _, _ = self.sftp_hook.get_tree_map(
+            sftp_complete_path, prefix=prefix, delimiter=delimiter
+        )
+
+        self.log.info("Found %s files at sftp source path: %s", str(len(found_files)), self.sftp_source_path)
+
+        for file in found_files:
+            future_blob_name = self.get_full_path_blob(file)
+            sftp_files.append(SftpFile(file, future_blob_name))
+
+        return sftp_files
+
+    def get_tree_behavior(self) -> Tuple[str, Optional[str], Optional[str]]:
+        """Extracts from source path the tree behavior to interact with the remote folder"""
+        self.check_wildcards_limit()
+
+        if self.source_path_contains_wildcard:
+            prefix, delimiter = self.sftp_source_path.split(WILDCARD, 1)
+            sftp_complete_path = os.path.dirname(prefix)
+            return sftp_complete_path, prefix, delimiter
+
+        return self.sftp_source_path, None, None
+
+    def check_wildcards_limit(self) -> None:
+        """Check if there are multiple wildcards used in the SFTP source path."""
+        total_wildcards = self.sftp_source_path.count(WILDCARD)
+        if total_wildcards > 1:
+            raise AirflowException(
+                "Only one wildcard '*' is allowed in sftp_source_path parameter. "
+                f"Found {total_wildcards} in {self.sftp_source_path}."
+            )
+
+    @property
+    def source_path_contains_wildcard(self) -> bool:
+        """Checks if the SFTP source path contains a wildcard."""
+        return WILDCARD in self.sftp_source_path
+
+    @cached_property
+    def sftp_hook(self) -> SFTPHook:
+        """Property of sftp hook to be re-used."""
+        return SFTPHook(self.sftp_conn_id)
+
+    def get_full_path_blob(self, file: str) -> str:
+        """Get a blob name based on the previous name and a blob_prefix variable"""
+        return self.blob_prefix + os.path.basename(file)
+
+    def copy_files_to_wasb(self, sftp_files: List[SftpFile]) -> List[str]:
+        """Upload a list of files from sftp_files to Azure Blob Storage with a new Blob Name."""
+        uploaded_files = []
+        wasb_hook = WasbHook(wasb_conn_id=self.wasb_conn_id)
+        for file in sftp_files:
+            with NamedTemporaryFile("w") as tmp:
+                self.sftp_hook.retrieve_file(file.sftp_file_path, tmp.name)
+                self.log.info(
+                    'Uploading %s to wasb://%s as %s',
+                    file.sftp_file_path,
+                    self.container_name,
+                    file.blob_name,
+                )
+                wasb_hook.load_file(tmp.name, self.container_name, file.blob_name, **self.load_options)
+
+                uploaded_files.append(file.sftp_file_path)
+
+        return uploaded_files
+
+    def delete_files(self, uploaded_files: List[str]) -> None:
+        """Delete files at SFTP which have been moved to Azure Blob Storage."""
+        for sftp_file_path in uploaded_files:
+            self.log.info("Executing delete of %s", sftp_file_path)
+            self.sftp_hook.delete_file(sftp_file_path)
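
To make the wildcard handling above concrete: get_tree_behavior splits the source path at the single allowed '*' into a prefix and a delimiter, and the directory part of the prefix becomes the root passed to SFTPHook.get_tree_map. A standalone sketch of that splitting logic (the helper name is ours, not the operator's):

    import os

    WILDCARD = "*"

    def split_source_path(path: str):
        """Return (tree_root, prefix, delimiter), mirroring get_tree_behavior."""
        if WILDCARD in path:
            prefix, delimiter = path.split(WILDCARD, 1)
            return os.path.dirname(prefix), prefix, delimiter
        return path, None, None

    print(split_source_path("main_dir/test_object*.json"))
    # -> ('main_dir', 'main_dir/test_object', '.json')
    print(split_source_path("main_dir/"))
    # -> ('main_dir/', None, None)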
diff --git a/docs/apache-airflow-providers-microsoft-azure/operators/sftp_to_wasb.rst b/docs/apache-airflow-providers-microsoft-azure/operators/sftp_to_wasb.rst
new file mode 100644
index 0000000..b628681
--- /dev/null
+++ b/docs/apache-airflow-providers-microsoft-azure/operators/sftp_to_wasb.rst
@@ -0,0 +1,61 @@
+
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+
+Azure Blob Storage Transfer Operator
+====================================
+The Blob service stores text and binary data as objects in the cloud.
+The Blob service offers the following three resources: the storage account, containers, and blobs.
+Within your storage account, containers provide a way to organize sets of blobs.
+For more information about the service, visit the `Azure Blob Storage API documentation <https://docs.microsoft.com/en-us/rest/api/storageservices/blob-service-rest-api>`_.
+
+Before you begin
+^^^^^^^^^^^^^^^^
+Before using Blob Storage within Airflow you need to authenticate your account with a token, login, and password.
+Please follow the Azure
+`instructions <https://docs.microsoft.com/en-us/azure/storage/common/storage-account-keys-manage?tabs=azure-portal>`_
+to do so.
+
+See the following example, and set values for these fields:
+
+.. code-block::
+
+  SFTP Conn Id: sftp_default
+  WASB Conn Id: wasb_default
+
+.. contents::
+  :depth: 1
+  :local:
+
+.. _howto/operator:SFTPToWasbOperator:
+
+Transfer Data from SFTP Source Path to Blob Storage
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+The operator transfers data from an SFTP source path to a specified container in Azure Blob Storage.
+
+To transfer files from an SFTP source path to a container in Azure Blob Storage, use
+:class:`~airflow.providers.microsoft.azure.transfers.sftp_to_wasb.SFTPToWasbOperator`.
+Example usage:
+
+.. exampleinclude:: /../../airflow/providers/microsoft/azure/example_dags/example_sftp_to_wasb.py
+    :language: python
+    :dedent: 4
+    :start-after: [START how_to_sftp_to_wasb]
+    :end-before: [END how_to_sftp_to_wasb]
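
The operator also overrides dry_run() (see sftp_to_wasb.py above), so a dry run logs each planned upload, and each planned delete when move_object=True, without transferring anything. A hedged sketch of calling it directly (the "sftp_default" connection must still resolve, since the file listing is real; values are illustrative):

    from airflow.providers.microsoft.azure.transfers.sftp_to_wasb import SFTPToWasbOperator

    op = SFTPToWasbOperator(
        task_id="preview_transfer",
        sftp_source_path="/sftp/*",   # illustrative source path
        container_name="airflow",     # illustrative container
    )
    op.dry_run()  # logs "Process will upload file from (SFTP) ... to wasb://..."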
diff --git a/tests/providers/microsoft/azure/transfers/test_sftp_to_wasb.py b/tests/providers/microsoft/azure/transfers/test_sftp_to_wasb.py
new file mode 100644
index 0000000..1c34835
--- /dev/null
+++ b/tests/providers/microsoft/azure/transfers/test_sftp_to_wasb.py
@@ -0,0 +1,256 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import unittest
+from unittest import mock
+
+from airflow import AirflowException
+from airflow.providers.microsoft.azure.transfers.sftp_to_wasb import SftpFile, SFTPToWasbOperator
+
+TASK_ID = "test-gcs-to-sftp-operator"
+WASB_CONN_ID = "wasb_default"
+SFTP_CONN_ID = "ssh_default"
+
+CONTAINER_NAME = "test-container"
+WILDCARD_PATH = "main_dir/*"
+WILDCARD_FILE_NAME = "main_dir/test_object*.json"
+SOURCE_PATH_NO_WILDCARD = "main_dir/"
+SOURCE_OBJECT_MULTIPLE_WILDCARDS = "main_dir/csv/*/test_*.csv"
+BLOB_PREFIX = "sponge-bob"
+EXPECTED_BLOB_NAME = "test_object3.json"
+EXPECTED_FILES = [SOURCE_PATH_NO_WILDCARD + EXPECTED_BLOB_NAME]
+
+
+class TestSFTPToWasbOperator(unittest.TestCase):
+    def test_init(self):
+        operator = SFTPToWasbOperator(
+            task_id=TASK_ID,
+            sftp_source_path=SOURCE_PATH_NO_WILDCARD,
+            sftp_conn_id=SFTP_CONN_ID,
+            container_name=CONTAINER_NAME,
+            blob_prefix=BLOB_PREFIX,
+            wasb_conn_id=WASB_CONN_ID,
+            move_object=False,
+        )
+        assert operator.sftp_source_path == SOURCE_PATH_NO_WILDCARD
+        assert operator.sftp_conn_id == SFTP_CONN_ID
+        assert operator.container_name == CONTAINER_NAME
+        assert operator.wasb_conn_id == WASB_CONN_ID
+        assert operator.blob_prefix == BLOB_PREFIX
+
+    @mock.patch('airflow.providers.microsoft.azure.transfers.sftp_to_wasb.WasbHook', autospec=True)
+    def test_execute_more_than_one_wildcard_exception(self, mock_hook):
+        operator = SFTPToWasbOperator(
+            task_id=TASK_ID,
+            sftp_source_path=SOURCE_OBJECT_MULTIPLE_WILDCARDS,
+            sftp_conn_id=SFTP_CONN_ID,
+            container_name=CONTAINER_NAME,
+            blob_prefix=BLOB_PREFIX,
+            wasb_conn_id=WASB_CONN_ID,
+            move_object=False,
+        )
+        with self.assertRaises(AirflowException) as cm:
+            operator.check_wildcards_limit()
+
+        err = cm.exception
+        assert "Only one wildcard '*' is allowed" in str(err)
+
+    def test_get_sftp_tree_behavior(self):
+        operator = SFTPToWasbOperator(
+            task_id=TASK_ID,
+            sftp_source_path=WILDCARD_PATH,
+            sftp_conn_id=SFTP_CONN_ID,
+            container_name=CONTAINER_NAME,
+            wasb_conn_id=WASB_CONN_ID,
+            move_object=False,
+        )
+        sftp_complete_path, prefix, delimiter = operator.get_tree_behavior()
+
+        assert sftp_complete_path == 'main_dir', "Complete path does not match expected"
+        assert prefix == 'main_dir/', "Prefix must equal the path before the wildcard"
+        assert delimiter == "", "Delimiter must be empty"
+
+    def test_get_sftp_tree_behavior_without_wildcard(self):
+        operator = SFTPToWasbOperator(
+            task_id=TASK_ID,
+            sftp_source_path=SOURCE_PATH_NO_WILDCARD,
+            sftp_conn_id=SFTP_CONN_ID,
+            container_name=CONTAINER_NAME,
+            wasb_conn_id=WASB_CONN_ID,
+            move_object=False,
+        )
+        sftp_complete_path, prefix, delimiter = operator.get_tree_behavior()
+
+        assert sftp_complete_path == 'main_dir/', "Complete path does not match expected"
+        assert prefix is None, "Prefix must be None when there is no wildcard"
+        assert delimiter is None, "Delimiter must be None when there is no wildcard"
+
+    def test_source_path_contains_wildcard(self):
+        operator = SFTPToWasbOperator(
+            task_id=TASK_ID,
+            sftp_source_path=WILDCARD_PATH,
+            sftp_conn_id=SFTP_CONN_ID,
+            container_name=CONTAINER_NAME,
+            wasb_conn_id=WASB_CONN_ID,
+            move_object=False,
+        )
+        output = operator.source_path_contains_wildcard
+        assert output is True, "This path contains a wildcard"
+
+    def test_source_path_not_contains_wildcard(self):
+        operator = SFTPToWasbOperator(
+            task_id=TASK_ID,
+            sftp_source_path=SOURCE_PATH_NO_WILDCARD,
+            sftp_conn_id=SFTP_CONN_ID,
+            container_name=CONTAINER_NAME,
+            wasb_conn_id=WASB_CONN_ID,
+            move_object=False,
+        )
+        output = operator.source_path_contains_wildcard
+        assert output is False, "This path does not contain a wildcard"
+
+    @mock.patch('airflow.providers.microsoft.azure.transfers.sftp_to_wasb.WasbHook')
+    @mock.patch('airflow.providers.microsoft.azure.transfers.sftp_to_wasb.SFTPHook')
+    def test_get_sftp_files_map_no_wildcard(self, sftp_hook, mock_hook):
+        sftp_hook.return_value.get_tree_map.return_value = [
+            EXPECTED_FILES,
+            [],
+            [],
+        ]
+        operator = SFTPToWasbOperator(
+            task_id=TASK_ID,
+            sftp_source_path=SOURCE_PATH_NO_WILDCARD,
+            sftp_conn_id=SFTP_CONN_ID,
+            container_name=CONTAINER_NAME,
+            wasb_conn_id=WASB_CONN_ID,
+            move_object=True,
+        )
+        files = operator.get_sftp_files_map()
+
+        assert len(files) == 1, "Number of found files does not match expected"
+        assert files[0].blob_name == EXPECTED_BLOB_NAME, "Expected blob name not matched"
+
+    @mock.patch('airflow.providers.microsoft.azure.transfers.sftp_to_wasb.WasbHook')
+    @mock.patch('airflow.providers.microsoft.azure.transfers.sftp_to_wasb.SFTPHook')
+    def test_copy_files_to_wasb(self, sftp_hook, mock_hook):
+        operator = SFTPToWasbOperator(
+            task_id=TASK_ID,
+            sftp_source_path=SOURCE_PATH_NO_WILDCARD,
+            sftp_conn_id=SFTP_CONN_ID,
+            container_name=CONTAINER_NAME,
+            wasb_conn_id=WASB_CONN_ID,
+            move_object=True,
+        )
+
+        sftp_files = [SftpFile(EXPECTED_FILES[0], EXPECTED_BLOB_NAME)]
+        files = operator.copy_files_to_wasb(sftp_files)
+
+        operator.sftp_hook.retrieve_file.assert_has_calls([mock.call("main_dir/test_object3.json", mock.ANY)])
+
+        mock_hook.return_value.load_file.assert_called_once_with(
+            mock.ANY, CONTAINER_NAME, EXPECTED_BLOB_NAME, overwrite=False
+        )
+
+        assert len(files) == 1, "Number of uploaded files does not match expected"
+
+    @mock.patch('airflow.providers.microsoft.azure.transfers.sftp_to_wasb.SFTPHook')
+    def test_delete_files(self, sftp_hook):
+        sftp_mock = sftp_hook.return_value
+
+        operator = SFTPToWasbOperator(
+            task_id=TASK_ID,
+            sftp_source_path=SOURCE_PATH_NO_WILDCARD,
+            sftp_conn_id=SFTP_CONN_ID,
+            container_name=CONTAINER_NAME,
+            wasb_conn_id=WASB_CONN_ID,
+            move_object=True,
+        )
+
+        sftp_file_paths = EXPECTED_FILES
+        operator.delete_files(sftp_file_paths)
+
+        sftp_mock.delete_file.assert_has_calls([mock.call(EXPECTED_FILES[0])])
+
+    @mock.patch('airflow.providers.microsoft.azure.transfers.sftp_to_wasb.WasbHook')
+    @mock.patch('airflow.providers.microsoft.azure.transfers.sftp_to_wasb.SFTPHook')
+    def test_execute(self, sftp_hook, mock_hook):
+        operator = SFTPToWasbOperator(
+            task_id=TASK_ID,
+            sftp_source_path=WILDCARD_FILE_NAME,
+            sftp_conn_id=SFTP_CONN_ID,
+            container_name=CONTAINER_NAME,
+            wasb_conn_id=WASB_CONN_ID,
+            move_object=False,
+        )
+
+        sftp_hook.return_value.get_tree_map.return_value = [
+            ["main_dir/test_object.json"],
+            [],
+            [],
+        ]
+
+        operator.execute(None)
+
+        sftp_hook.return_value.get_tree_map.assert_called_with(
+            "main_dir", prefix="main_dir/test_object", delimiter=".json"
+        )
+
+        sftp_hook.return_value.retrieve_file.assert_has_calls(
+            [mock.call("main_dir/test_object.json", mock.ANY)]
+        )
+
+        mock_hook.return_value.load_file.assert_called_once_with(
+            mock.ANY, CONTAINER_NAME, "test_object.json", overwrite=False
+        )
+
+        sftp_hook.return_value.delete_file.assert_not_called()
+
+    @mock.patch('airflow.providers.microsoft.azure.transfers.sftp_to_wasb.WasbHook')
+    @mock.patch('airflow.providers.microsoft.azure.transfers.sftp_to_wasb.SFTPHook')
+    def test_execute_moved_files(self, sftp_hook, mock_hook):
+        operator = SFTPToWasbOperator(
+            task_id=TASK_ID,
+            sftp_source_path=WILDCARD_FILE_NAME,
+            sftp_conn_id=SFTP_CONN_ID,
+            container_name=CONTAINER_NAME,
+            wasb_conn_id=WASB_CONN_ID,
+            move_object=True,
+            blob_prefix=BLOB_PREFIX,
+        )
+
+        sftp_hook.return_value.get_tree_map.return_value = [
+            ["main_dir/test_object.json"],
+            [],
+            [],
+        ]
+
+        operator.execute(None)
+
+        sftp_hook.return_value.get_tree_map.assert_called_with(
+            "main_dir", prefix="main_dir/test_object", delimiter=".json"
+        )
+
+        sftp_hook.return_value.retrieve_file.assert_has_calls(
+            [mock.call("main_dir/test_object.json", mock.ANY)]
+        )
+
+        mock_hook.return_value.load_file.assert_called_once_with(
+            mock.ANY, CONTAINER_NAME, BLOB_PREFIX + "test_object.json", overwrite=False
+        )
+        assert sftp_hook.return_value.delete_file.called is True, "File must be moved"
diff --git a/tests/providers/microsoft/azure/transfers/test_sftp_to_wasb_system.py b/tests/providers/microsoft/azure/transfers/test_sftp_to_wasb_system.py
new file mode 100644
index 0000000..600f84f
--- /dev/null
+++ b/tests/providers/microsoft/azure/transfers/test_sftp_to_wasb_system.py
@@ -0,0 +1,57 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+import os
+
+import pytest
+
+from airflow.providers.microsoft.azure.example_dags.example_sftp_to_wasb import (
+    FILE_COMPLETE_PATH,
+    LOCAL_FILE_PATH,
+    SAMPLE_FILENAME,
+)
+from tests.test_utils.azure_system_helpers import (
+    AZURE_DAG_FOLDER,
+    AzureSystemTest,
+    provide_wasb_default_connection,
+)
+from tests.test_utils.sftp_system_helpers import provide_sftp_default_connection
+
+CREDENTIALS_DIR = os.environ.get('CREDENTIALS_DIR', '/files/airflow-breeze-config/keys')
+SFTP_DEFAULT_KEY = 'sftp_key.json'
+WASB_DEFAULT_KEY = 'wasb_key.json'
+CREDENTIALS_SFTP_PATH = os.path.join(CREDENTIALS_DIR, SFTP_DEFAULT_KEY)
+CREDENTIALS_WASB_PATH = os.path.join(CREDENTIALS_DIR, WASB_DEFAULT_KEY)
+
+
+@pytest.mark.backend('postgres', 'mysql')
+@pytest.mark.credential_file(WASB_DEFAULT_KEY)
+@pytest.mark.credential_file(SFTP_DEFAULT_KEY)
+class TestSFTPToWasbSystem(AzureSystemTest):
+    def setUp(self):
+        super().setUp()
+        self.create_dummy_file(SAMPLE_FILENAME, LOCAL_FILE_PATH)
+
+    def tearDown(self):
+        os.remove(FILE_COMPLETE_PATH)
+        super().tearDown()
+
+    @provide_wasb_default_connection(CREDENTIALS_WASB_PATH)
+    @provide_sftp_default_connection(CREDENTIALS_SFTP_PATH)
+    def test_run_example_file_to_wasb(self):
+        self.run_dag('example_sftp_to_wasb', AZURE_DAG_FOLDER)
diff --git a/tests/test_utils/sftp_system_helpers.py b/tests/test_utils/sftp_system_helpers.py
new file mode 100644
index 0000000..accf3c9
--- /dev/null
+++ b/tests/test_utils/sftp_system_helpers.py
@@ -0,0 +1,51 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import json
+import os
+from contextlib import contextmanager
+
+from airflow.exceptions import AirflowException
+from airflow.models import Connection
+from airflow.utils.process_utils import patch_environ
+
+SFTP_CONNECTION_ID = os.environ.get("SFTP_CONNECTION_ID", "sftp_default")
+
+
+@contextmanager
+def provide_sftp_default_connection(key_file_path: str):
+    """
+    Context manager to provide a temporary value for the sftp_default connection.
+
+    :param key_file_path: Path to the JSON file with sftp_default credentials.
+    :type key_file_path: str
+    """
+    if not key_file_path.endswith(".json"):
+        raise AirflowException("Use a JSON key file.")
+    with open(key_file_path) as credentials:
+        creds = json.load(credentials)
+    conn = Connection(
+        conn_id=SFTP_CONNECTION_ID,
+        conn_type="ssh",
+        port=creds.get("port"),
+        host=creds.get("host"),
+        login=creds.get("login"),
+        password=creds.get("password"),
+        extra=json.dumps(creds.get("extra")),
+    )
+    with patch_environ({f"AIRFLOW_CONN_{conn.conn_id.upper()}": conn.get_uri()}):
+        yield
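
For reference, a hedged usage sketch of the helper above (the key-file path and JSON contents are illustrative):

    from tests.test_utils.sftp_system_helpers import provide_sftp_default_connection

    # sftp_key.json might look like:
    # {"host": "sftp.example.com", "port": 22, "login": "user", "password": "pass"}
    with provide_sftp_default_connection("/files/airflow-breeze-config/keys/sftp_key.json"):
        # AIRFLOW_CONN_SFTP_DEFAULT is patched into the environment here,
        # so hooks resolving the "sftp_default" connection pick it up.
        ...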