Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/02/16 03:30:06 UTC

[GitHub] [airflow] wolvery opened a new pull request #14254: Features/sftp to wasb: Transfer to scan files from sftp source path and upload them to Azure Blob Storage

wolvery opened a new pull request #14254:
URL: https://github.com/apache/airflow/pull/14254


   Description
   
   A transfer operator that scans files under an SFTP source path and uploads them to Azure Blob Storage.
   
   Use case / motivation
   
   Scan an SFTP source path, selecting either the files that match a wildcard or a single file given by its complete path, and upload those files to Azure Blob Storage.
   PR Created
   Process to upload sftp files to blob storage #9683
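The selection rule described above (one wildcard, or the complete path of a single file) can be sketched in plain Python. This is a minimal sketch under stated assumptions, not the operator's actual code: `select_files` is a hypothetical helper, it only accepts the wildcard in the last path component, and it takes the remote file names as a list (in a real operator they might come from listing the remote directory, for example with `SFTPHook.list_directory`).

```python
import posixpath
from typing import Iterable, List, Tuple

WILDCARD = "*"


def select_files(
    sftp_source_path: str, remote_names: Iterable[str], blob_prefix: str = ""
) -> List[Tuple[str, str]]:
    """Return (sftp_file_path, blob_name) pairs for the files a source path selects.

    Hypothetical helper: remote_names are the file names found in the source
    directory, and the wildcard may only appear in the last path component.
    """
    if sftp_source_path.count(WILDCARD) > 1:
        raise ValueError("Only one wildcard is allowed in the SFTP source path")
    directory, pattern = posixpath.split(sftp_source_path)
    if WILDCARD in directory:
        raise ValueError("This sketch only supports a wildcard in the file name")
    if WILDCARD in pattern:
        prefix, suffix = pattern.split(WILDCARD, 1)
        matched = [
            name
            for name in remote_names
            # The length check stops "a*a" from matching the single name "a".
            if name.startswith(prefix)
            and name.endswith(suffix)
            and len(name) >= len(prefix) + len(suffix)
        ]
    else:
        # No wildcard: the path is the complete path of a single file.
        matched = [name for name in remote_names if name == pattern]
    # Blob name = prefix + file name, like BLOB_PREFIX + SAMPLE_FILE_NAME in the system test.
    return [(posixpath.join(directory, name), blob_prefix + name) for name in sorted(matched)]
```

Because the prefix is concatenated directly, a prefix such as "airflow/" (with a trailing slash) is needed if the blobs should appear under a virtual folder.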
   
   ---
   **^ Add meaningful description above**
   
   Read the **[Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines)** for more information.
   In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] github-actions[bot] commented on pull request #14254: Features/sftp to wasb: Transfer to scan files from sftp source path and upload them to Azure Blob Storage

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #14254:
URL: https://github.com/apache/airflow/pull/14254#issuecomment-783100066


   [The Workflow run](https://github.com/apache/airflow/actions/runs/588111467) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.





[GitHub] [airflow] ephraimbuddy commented on a change in pull request #14254: Features/sftp to wasb: Transfer to scan files from sftp source path and upload them to Azure Blob Storage

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #14254:
URL: https://github.com/apache/airflow/pull/14254#discussion_r579713000



##########
File path: tests/providers/microsoft/azure/transfers/test_sftp_to_wasb_system.py
##########
@@ -0,0 +1,50 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+import os
+
+import pytest
+
+from airflow.providers.microsoft.azure.example_dags.example_sftp_to_wasb import SFTP_SRC_PATH
+from tests.test_utils.azure_system_helpers import (
+    AZURE_DAG_FOLDER,
+    AzureSystemTest,
+    provide_wasb_default_connection,
+)
+
+CREDENTIALS_DIR = os.environ.get('CREDENTIALS_DIR', '/files/airflow-breeze-config/keys')
+WASB_DEFAULT_KEY = 'wasb_key.json'
+CREDENTIALS_PATH = os.path.join(CREDENTIALS_DIR, WASB_DEFAULT_KEY)
+FILENAME = "TEST.TXT"
+

Review comment:
       Since you are using the SFTP hook here, you also need a way to authenticate it, just as you did for WASB, by providing a key JSON file.
   I get connection errors when I run the system test with:
   ```
   pytest tests/providers/microsoft/azure/transfers/test_sftp_to_wasb_system.py --system azure -s
   ```
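One way to authenticate the SFTP hook in a system test, sketched under assumptions: Airflow can resolve a connection from an environment variable named `AIRFLOW_CONN_<CONN_ID>` that holds a connection URI, so a context manager can read a key file and export `AIRFLOW_CONN_SFTP_DEFAULT` for the duration of the test. The function name and the JSON layout (`host`, `port`, `login`, `password`) are hypothetical; this is not the `provide_sftp_default_connection` helper that a later revision of the test imports.

```python
import contextlib
import json
import os
from typing import Iterator
from urllib.parse import quote


@contextlib.contextmanager
def sftp_connection_from_key_file(key_path: str, conn_id: str = "sftp_default") -> Iterator[str]:
    """Temporarily expose an SFTP connection to Airflow via an AIRFLOW_CONN_* variable.

    Assumes a hypothetical key file such as
    {"host": "...", "port": 22, "login": "...", "password": "..."}.
    """
    with open(key_path) as key_file:
        key = json.load(key_file)
    # Percent-encode credentials so characters like "@" or ":" survive in the URI.
    uri = "sftp://{}:{}@{}:{}".format(
        quote(key["login"], safe=""),
        quote(key["password"], safe=""),
        key["host"],
        key.get("port", 22),
    )
    env_var = "AIRFLOW_CONN_" + conn_id.upper()
    previous = os.environ.get(env_var)
    os.environ[env_var] = uri
    try:
        yield uri
    finally:
        # Restore whatever was set before so tests do not leak connections.
        if previous is None:
            os.environ.pop(env_var, None)
        else:
            os.environ[env_var] = previous
```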

##########
File path: airflow/providers/microsoft/azure/example_dags/example_sftp_to_wasb.py
##########
@@ -0,0 +1,45 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import os
+
+from airflow import DAG
+from airflow.providers.microsoft.azure.transfers.sftp_to_wasb import SFTPToWasbOperator
+from airflow.utils.dates import days_ago
+
+AZURE_CONTAINER_NAME = os.environ.get("AZURE_CONTAINER_NAME", "airflow")
+BLOB_PREFIX = os.environ.get("AZURE_BLOB_PREFIX", "airflow")
+SFTP_SRC_PATH = os.environ.get("AZURE_SFTP_SRC_PATH", "test-sftp-azure")
+
+with DAG(
+    "example_sftp_to_wasb",
+    schedule_interval=None,
+    start_date=days_ago(1),  # Override to match your needs
+) as dag:
+
+    # [START how_to_sftp_to_wasb]
+    transfer_files_to_azure = SFTPToWasbOperator(
+        task_id="transfer_files_from_sftp_to_wasb",
+        # SFTP args
+        sftp_conn_id="sftp_default",
+        sftp_source_path=SFTP_SRC_PATH,
+        # AZURE args
+        wasb_conn_id="wasb_default",
+        container_name=AZURE_CONTAINER_NAME,
+        blob_prefix=BLOB_PREFIX,
+    )
+    # [END how_to_sftp_to_wasb]

Review comment:
       Since we are using this example as a system test, I think we can add more operators here to clean up afterwards. My suggestion:
   Use an SFTPOperator to transfer files from the local machine to a remote server, then transfer the files to Azure, and finally clean up the files on the remote server using the delete_file method of SFTPHook and the Azure Blob Storage delete operator.







[GitHub] [airflow] ashb commented on a change in pull request #14254: Features/sftp to wasb: Transfer to scan files from sftp source path and upload them to Azure Blob Storage

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #14254:
URL: https://github.com/apache/airflow/pull/14254#discussion_r589530274



##########
File path: airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py
##########
@@ -0,0 +1,191 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains SFTP to Azure Blob Storage operator."""
+import os
+from collections import namedtuple
+from tempfile import NamedTemporaryFile
+from typing import Any, Dict, List, Optional, Tuple
+
+from cached_property import cached_property

Review comment:
       ```suggestion
   try:
       from functools import cached_property
   except ImportError:
       from cached_property import cached_property
   ```
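To illustrate what the suggested import buys: `functools.cached_property` exists from Python 3.8, and the fallback keeps older interpreters working through the third-party `cached_property` package. The class below is a hypothetical stand-in showing the pattern a transfer operator might use, building a hook on first access and reusing it afterwards.

```python
try:
    from functools import cached_property  # Python 3.8+
except ImportError:  # older interpreters: third-party backport
    from cached_property import cached_property


class ExampleTransfer:
    """Hypothetical stand-in for an operator that creates its hook lazily."""

    def __init__(self, conn_id):
        self.conn_id = conn_id
        self.hook_builds = 0

    @cached_property
    def hook(self):
        # Runs on first access only; the result is then stored on the instance.
        self.hook_builds += 1
        return {"conn_id": self.conn_id}
```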

##########
File path: airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py
##########
@@ -0,0 +1,191 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains SFTP to Azure Blob Storage operator."""
+import os
+from collections import namedtuple
+from tempfile import NamedTemporaryFile
+from typing import Any, Dict, List, Optional, Tuple
+
+from cached_property import cached_property
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.microsoft.azure.hooks.wasb import WasbHook
+from airflow.providers.sftp.hooks.sftp import SFTPHook
+from airflow.utils.decorators import apply_defaults
+
+WILDCARD = "*"
+SftpFile = namedtuple('SftpFile', 'sftp_file_path, blob_name')
+
+
+class SFTPToWasbOperator(BaseOperator):
+    """
+    Transfer files to Azure Blob Storage from SFTP server.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:SFTPToWasbOperator`
+
+    :param sftp_source_path: The sftp remote path. This is the specified file path
+        for downloading the single file or multiple files from the SFTP server.
+        You can use only one wildcard within your path. The wildcard can appear
+        inside the path or at the end of the path.
+    :type sftp_source_path: str
+    :param container_name: Name of the container.
+    :type container_name: str
+    :param blob_prefix: Prefix to name a blob.
+    :type blob_prefix: str
+    :param sftp_conn_id: The sftp connection id. The name or identifier for
+        establishing a connection to the SFTP server.
+    :type sftp_conn_id: str
+    :param wasb_conn_id: Reference to the wasb connection.
+    :type wasb_conn_id: str
+    :param load_options: Optional keyword arguments that
+        `WasbHook.load_file()` takes.

Review comment:
       ```suggestion
           ``WasbHook.load_file()`` takes.
   ```







[GitHub] [airflow] ephraimbuddy commented on pull request #14254: Features/sftp to wasb: Transfer to scan files from sftp source path and upload them to Azure Blob Storage

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #14254:
URL: https://github.com/apache/airflow/pull/14254#issuecomment-791786086


   Can you rebase again?





[GitHub] [airflow] wolvery commented on a change in pull request #14254: Features/sftp to wasb: Transfer to scan files from sftp source path and upload them to Azure Blob Storage

Posted by GitBox <gi...@apache.org>.
wolvery commented on a change in pull request #14254:
URL: https://github.com/apache/airflow/pull/14254#discussion_r582805851



##########
File path: airflow/providers/microsoft/azure/example_dags/example_sftp_to_wasb.py
##########
@@ -0,0 +1,45 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import os
+
+from airflow import DAG
+from airflow.providers.microsoft.azure.transfers.sftp_to_wasb import SFTPToWasbOperator
+from airflow.utils.dates import days_ago
+
+AZURE_CONTAINER_NAME = os.environ.get("AZURE_CONTAINER_NAME", "airflow")
+BLOB_PREFIX = os.environ.get("AZURE_BLOB_PREFIX", "airflow")
+SFTP_SRC_PATH = os.environ.get("AZURE_SFTP_SRC_PATH", "test-sftp-azure")
+
+with DAG(
+    "example_sftp_to_wasb",
+    schedule_interval=None,
+    start_date=days_ago(1),  # Override to match your needs
+) as dag:
+
+    # [START how_to_sftp_to_wasb]
+    transfer_files_to_azure = SFTPToWasbOperator(
+        task_id="transfer_files_from_sftp_to_wasb",
+        # SFTP args
+        sftp_conn_id="sftp_default",
+        sftp_source_path=SFTP_SRC_PATH,
+        # AZURE args
+        wasb_conn_id="wasb_default",
+        container_name=AZURE_CONTAINER_NAME,
+        blob_prefix=BLOB_PREFIX,
+    )
+    # [END how_to_sftp_to_wasb]

Review comment:
       @ephraimbuddy I used an operator to move the data to SFTP and the hook to delete it after the DAG has finished.
   It worked well.







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #14254: Features/sftp to wasb: Transfer to scan files from sftp source path and upload them to Azure Blob Storage

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #14254:
URL: https://github.com/apache/airflow/pull/14254#discussion_r585844937



##########
File path: airflow/providers/microsoft/azure/example_dags/example_sftp_to_wasb.py
##########
@@ -0,0 +1,77 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import os
+
+from airflow import DAG
+from airflow.operators.python import PythonOperator
+from airflow.providers.microsoft.azure.operators.wasb_delete_blob import WasbDeleteBlobOperator
+from airflow.providers.microsoft.azure.transfers.sftp_to_wasb import SFTPToWasbOperator
+from airflow.providers.sftp.hooks.sftp import SFTPHook
+from airflow.providers.sftp.operators.sftp import SFTPOperator
+from airflow.utils.dates import days_ago
+
+AZURE_CONTAINER_NAME = os.environ.get("AZURE_CONTAINER_NAME", "airflow")
+BLOB_PREFIX = os.environ.get("AZURE_BLOB_PREFIX", "airflow")
+SFTP_SRC_PATH = os.environ.get("AZURE_WASB_SFTP_SRC_PATH", "/sftp")
+LOCAL_FILE_PATH = os.environ.get("FILE_TO_SFTPWASB_LOCAL_SRC_PATH", "/tmp")
+SAMPLE_FILE_NAME = os.environ.get("FILE_TO_SFTPWASB", "sftp_to_wasb_test.txt")
+FILE_COMPLETE_PATH = os.path.join(LOCAL_FILE_PATH, SAMPLE_FILE_NAME)
+SFTP_FILE_COMPLETE_PATH = os.path.join(SFTP_SRC_PATH, SAMPLE_FILE_NAME)

Review comment:
       ```suggestion
   FILE_COMPLETE_PATH = os.path.join(LOCAL_FILE_PATH, SAMPLE_FILENAME)
   SFTP_FILE_COMPLETE_PATH = os.path.join(SFTP_SRC_PATH, SAMPLE_FILENAME)
   ```

##########
File path: airflow/providers/microsoft/azure/example_dags/example_sftp_to_wasb.py
##########
@@ -0,0 +1,77 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import os
+
+from airflow import DAG
+from airflow.operators.python import PythonOperator
+from airflow.providers.microsoft.azure.operators.wasb_delete_blob import WasbDeleteBlobOperator
+from airflow.providers.microsoft.azure.transfers.sftp_to_wasb import SFTPToWasbOperator
+from airflow.providers.sftp.hooks.sftp import SFTPHook
+from airflow.providers.sftp.operators.sftp import SFTPOperator
+from airflow.utils.dates import days_ago
+
+AZURE_CONTAINER_NAME = os.environ.get("AZURE_CONTAINER_NAME", "airflow")
+BLOB_PREFIX = os.environ.get("AZURE_BLOB_PREFIX", "airflow")
+SFTP_SRC_PATH = os.environ.get("AZURE_WASB_SFTP_SRC_PATH", "/sftp")
+LOCAL_FILE_PATH = os.environ.get("FILE_TO_SFTPWASB_LOCAL_SRC_PATH", "/tmp")
+SAMPLE_FILE_NAME = os.environ.get("FILE_TO_SFTPWASB", "sftp_to_wasb_test.txt")

Review comment:
       ```suggestion
   SFTP_SRC_PATH = os.environ.get("SFTP_SRC_PATH", "/sftp")
   LOCAL_FILE_PATH = os.environ.get("LOCAL_SRC_PATH", "/tmp")
   SAMPLE_FILENAME = os.environ.get("SFTP_SAMPLE_FILENAME", "sftp_to_wasb_test.txt")
   ```
   I have run the system tests and they pass, but can you make these environment variable names simpler? Also, you may want to install the [pre-commit hooks](https://github.com/apache/airflow/blob/master/STATIC_CODE_CHECKS.rst#pre-commit-hooks) so the static checks run before you commit.







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #14254: Features/sftp to wasb: Transfer to scan files from sftp source path and upload them to Azure Blob Storage

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #14254:
URL: https://github.com/apache/airflow/pull/14254#discussion_r583233428



##########
File path: tests/providers/microsoft/azure/transfers/test_sftp_to_wasb_system.py
##########
@@ -0,0 +1,66 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+import os
+
+import pytest
+
+from airflow.providers.microsoft.azure.example_dags.example_sftp_to_wasb import (
+    AZURE_CONTAINER_NAME,
+    BLOB_PREFIX,
+    FILE_COMPLETE_PATH,
+    LOCAL_FILE_PATH,
+    SAMPLE_FILE_NAME,
+    SFTP_FILE_COMPLETE_PATH,
+)
+from airflow.providers.microsoft.azure.hooks.wasb import WasbHook
+from airflow.providers.sftp.hooks.sftp import SFTPHook
+from tests.test_utils.azure_system_helpers import (
+    AZURE_DAG_FOLDER,
+    AzureSystemTest,
+    provide_wasb_default_connection,
+)
+from tests.test_utils.sftp_system_helpers import provide_sftp_default_connection
+
+CREDENTIALS_DIR = os.environ.get('CREDENTIALS_DIR', '/files/airflow-breeze-config/keys')
+SFTP_DEFAULT_KEY = 'sftp_key.json'
+WASB_DEFAULT_KEY = 'wasb_key.json'
+CREDENTIALS_SFTP_PATH = os.path.join(CREDENTIALS_DIR, SFTP_DEFAULT_KEY)
+CREDENTIALS_WASB_PATH = os.path.join(CREDENTIALS_DIR, WASB_DEFAULT_KEY)
+
+
+@pytest.mark.backend('postgres', 'mysql')
+@pytest.mark.credential_file(WASB_DEFAULT_KEY)
+@pytest.mark.credential_file(SFTP_DEFAULT_KEY)
+class TestSFTPToWasbSystem(AzureSystemTest):
+    def setUp(self):
+        super().setUp()
+        self.create_dummy_file(SAMPLE_FILE_NAME, LOCAL_FILE_PATH)
+
+    def tearDown(self):
+        os.remove(FILE_COMPLETE_PATH)
+        super().tearDown()
+
+    @provide_wasb_default_connection(CREDENTIALS_WASB_PATH)
+    @provide_sftp_default_connection(CREDENTIALS_SFTP_PATH)
+    def test_run_example_file_to_wasb(self):
+        self.run_dag('example_sftp_to_wasb', AZURE_DAG_FOLDER)
+        WasbHook(wasb_conn_id="wasb_default").delete_file(
+            AZURE_CONTAINER_NAME, BLOB_PREFIX + SAMPLE_FILE_NAME
+        )
+        SFTPHook(ssh_conn_id="sftp_default").delete_file(SFTP_FILE_COMPLETE_PATH)

Review comment:
       ```suggestion
         
   ```
   Can you move this to the example DAG? We have a WASB delete operator. Ideally we would have an SFTP delete operator too, but for now you can use the PythonOperator to call the hook. https://github.com/apache/airflow/blob/2b5d4e3ff3c61ea6074caa300bbb8d16027408a6/airflow/providers/microsoft/azure/example_dags/example_fileshare.py#L37-L40
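A sketch of the cleanup callable the reviewer suggests running through a PythonOperator. `delete_sftp_file` and its `hook` parameter are hypothetical (the parameter only exists so the function can be exercised with a fake hook); by default it builds `SFTPHook(ssh_conn_id="sftp_default")` and calls `delete_file`, the same calls the system test makes.

```python
from typing import Any, Optional


def delete_sftp_file(path: str, hook: Optional[Any] = None) -> None:
    """Delete one file from the SFTP server; intended as a PythonOperator callable.

    The hook parameter is a hypothetical injection point for tests.
    """
    if hook is None:
        # Imported lazily so this module can be imported without Airflow installed.
        from airflow.providers.sftp.hooks.sftp import SFTPHook

        hook = SFTPHook(ssh_conn_id="sftp_default")
    hook.delete_file(path)
```

In the example DAG this callable could be passed as `python_callable=delete_sftp_file` with `op_kwargs={"path": SFTP_FILE_COMPLETE_PATH}`, downstream of the transfer task, next to a `WasbDeleteBlobOperator` that removes the uploaded blob.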







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #14254: Features/sftp to wasb: Transfer to scan files from sftp source path and upload them to Azure Blob Storage

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #14254:
URL: https://github.com/apache/airflow/pull/14254#discussion_r579713000



##########
File path: tests/providers/microsoft/azure/transfers/test_sftp_to_wasb_system.py
##########
@@ -0,0 +1,50 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+import os
+
+import pytest
+
+from airflow.providers.microsoft.azure.example_dags.example_sftp_to_wasb import SFTP_SRC_PATH
+from tests.test_utils.azure_system_helpers import (
+    AZURE_DAG_FOLDER,
+    AzureSystemTest,
+    provide_wasb_default_connection,
+)
+
+CREDENTIALS_DIR = os.environ.get('CREDENTIALS_DIR', '/files/airflow-breeze-config/keys')
+WASB_DEFAULT_KEY = 'wasb_key.json'
+CREDENTIALS_PATH = os.path.join(CREDENTIALS_DIR, WASB_DEFAULT_KEY)
+FILENAME = "TEST.TXT"
+

Review comment:
       Since you are using the SFTP hook here, you also need a way to authenticate it, just as you did for WASB, by providing a key JSON file.
   I get authentication errors when I run the system test with:
   ```
   pytest tests/providers/microsoft/azure/transfers/test_sftp_to_wasb_system.py --system azure -s
   ```







[GitHub] [airflow] github-actions[bot] commented on pull request #14254: Features/sftp to wasb: Transfer to scan files from sftp source path and upload them to Azure Blob Storage

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #14254:
URL: https://github.com/apache/airflow/pull/14254#issuecomment-789372184


   [The Workflow run](https://github.com/apache/airflow/actions/runs/616115124) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.





[GitHub] [airflow] wolvery commented on pull request #14254: Features/sftp to wasb: Transfer to scan files from sftp source path and upload them to Azure Blob Storage

Posted by GitBox <gi...@apache.org>.
wolvery commented on pull request #14254:
URL: https://github.com/apache/airflow/pull/14254#issuecomment-808986291


   Hi @ashb, could you please review this PR again?





[GitHub] [airflow] github-actions[bot] commented on pull request #14254: Features/sftp to wasb: Transfer to scan files from sftp source path and upload them to Azure Blob Storage

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #14254:
URL: https://github.com/apache/airflow/pull/14254#issuecomment-840905317


   This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.





[GitHub] [airflow] ephraimbuddy commented on a change in pull request #14254: Features/sftp to wasb: Transfer to scan files from sftp source path and upload them to Azure Blob Storage

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #14254:
URL: https://github.com/apache/airflow/pull/14254#discussion_r579711937



##########
File path: airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py
##########
@@ -0,0 +1,191 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains SFTP to Azure Blob Storage operator."""
+import os
+from collections import namedtuple
+from tempfile import NamedTemporaryFile
+from typing import Any, Dict, List, Optional, Tuple
+
+from cached_property import cached_property
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.microsoft.azure.hooks.wasb import WasbHook
+from airflow.providers.sftp.hooks.sftp import SFTPHook
+from airflow.utils.decorators import apply_defaults
+
+WILDCARD = "*"
+SftpFile = namedtuple('SftpFile', 'sftp_file_path, blob_name')
+
+
+class SFTPToWasbOperator(BaseOperator):
+    """
+    Transfer files to Azure Blob Storage from SFTP server.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:SFTPToWasbOperator`
+
+    :param sftp_source_path: The sftp remote path. This is the specified file path
+        for downloading the single file or multiple files from the SFTP server.
+        You can use only one wildcard within your path. The wildcard can appear
+        inside the path or at the end of the path.
+    :type sftp_source_path: str
+    :param container_name: Name of the container.
+    :type container_name: str
+    :param blob_prefix: Prefix to name a blob.
+    :type blob_prefix: str
+    :param sftp_conn_id: The sftp connection id. The name or identifier for
+        establishing a connection to the SFTP server.
+    :type sftp_conn_id: str
+    :param wasb_conn_id: Reference to the wasb connection.
+    :type wasb_conn_id: str
+    :param load_options: Optional keyword arguments that
+        `WasbHook.load_file()` takes.
+    :type load_options: dict
+    :param move_object: When move object is True, the object is moved instead
+        of copied to the new location. This is the equivalent of a mv command
+        as opposed to a cp command.
+    :type move_object: bool
+    """
+
+    template_fields = ("sftp_source_path", "container_name", "blob_name")

Review comment:
       ```suggestion
       template_fields = ("sftp_source_path", "container_name", "blob_prefix")
   ```
   Looks like it should be blob_prefix?
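The reviewer's point can be shown with a small pure-Python mimic. As far as we understand Airflow 2.0, `BaseOperator` renders each name listed in `template_fields` by reading it with `getattr` on the operator and writing the result back. A name that is not an instance attribute (here `blob_name`, since the operator only stores `blob_prefix`) therefore fails when the task's templates are rendered. The class and function below are hypothetical stand-ins, not Airflow's code, and `str.format` substitutes for Jinja.

```python
from typing import Any, Dict


class FakeSFTPToWasbOperator:
    """Hypothetical stand-in that stores only the attributes the real operator accepts."""

    template_fields = ("sftp_source_path", "container_name", "blob_prefix")

    def __init__(self, sftp_source_path: str, container_name: str, blob_prefix: str = "") -> None:
        self.sftp_source_path = sftp_source_path
        self.container_name = container_name
        self.blob_prefix = blob_prefix


def render_template_fields(operator: Any, context: Dict[str, str]) -> None:
    """Render each templated attribute in place; str.format stands in for Jinja."""
    for name in operator.template_fields:
        value = getattr(operator, name)  # raises AttributeError for a misnamed field
        setattr(operator, name, value.format(**context))


op = FakeSFTPToWasbOperator("in/{ds}/*.csv", "airflow", blob_prefix="raw/{ds}/")
render_template_fields(op, {"ds": "2021-02-16"})
print(op.sftp_source_path, op.blob_prefix)  # in/2021-02-16/*.csv raw/2021-02-16/


class MisnamedFields(FakeSFTPToWasbOperator):
    # The field list from the diff: "blob_name" is not an attribute of the operator.
    template_fields = ("sftp_source_path", "container_name", "blob_name")


try:
    render_template_fields(MisnamedFields("in/*.csv", "airflow"), {})
except AttributeError as err:
    print("rendering failed:", err)
```

With `blob_prefix` in the tuple, the prefix can also carry templated values such as the run date.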




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ephraimbuddy commented on pull request #14254: Features/sftp to wasb: Transfer to scan files from sftp source path and upload them to Azure Blob Storage

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #14254:
URL: https://github.com/apache/airflow/pull/14254#issuecomment-787529838


   LGTM, just a nit: can you use plain `assert` statements in tests (because of this PR: https://github.com/apache/airflow/pull/12951)?
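The request is to replace unittest-style calls such as `self.assertEqual(a, b)` with bare `assert` statements, which pytest reports with a detailed comparison on failure. Below is a minimal before/after sketch. `blob_name_for` is a hypothetical helper standing in for the operator's blob-naming logic (prefix joined with the SFTP file's base name); it is an assumption, not the operator's actual method.

```python
import posixpath


def blob_name_for(blob_prefix: str, sftp_file_path: str) -> str:
    """Hypothetical helper: blob name = prefix joined with the SFTP file's base name."""
    return posixpath.join(blob_prefix, posixpath.basename(sftp_file_path))


# unittest style, to be avoided:
#     self.assertEqual(blob_name_for("airflow", "/in/a.csv"), "airflow/a.csv")


# plain assert style, as requested in the review:
def test_blob_name_uses_prefix():
    assert blob_name_for("airflow", "/in/a.csv") == "airflow/a.csv"


def test_blob_name_without_prefix():
    assert blob_name_for("", "/in/a.csv") == "a.csv"


if __name__ == "__main__":
    test_blob_name_uses_prefix()
    test_blob_name_without_prefix()
```

Under pytest the two functions are collected automatically, so no `TestCase` subclass is needed.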





[GitHub] [airflow] github-actions[bot] closed pull request #14254: Features/sftp to wasb: Transfer to scan files from sftp source path and upload them to Azure Blob Storage

Posted by GitBox <gi...@apache.org>.
github-actions[bot] closed pull request #14254:
URL: https://github.com/apache/airflow/pull/14254


   





[GitHub] [airflow] wolvery commented on pull request #14254: Features/sftp to wasb: Transfer to scan files from sftp source path and upload them to Azure Blob Storage

Posted by GitBox <gi...@apache.org>.
wolvery commented on pull request #14254:
URL: https://github.com/apache/airflow/pull/14254#issuecomment-792028938


   > Can you rebase again?
   
   Sure





[GitHub] [airflow] github-actions[bot] commented on pull request #14254: Features/sftp to wasb: Transfer to scan files from sftp source path and upload them to Azure Blob Storage

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #14254:
URL: https://github.com/apache/airflow/pull/14254#issuecomment-792827466


   The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest master at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.





[GitHub] [airflow] wolvery commented on a change in pull request #14254: Features/sftp to wasb: Transfer to scan files from sftp source path and upload them to Azure Blob Storage

Posted by GitBox <gi...@apache.org>.
wolvery commented on a change in pull request #14254:
URL: https://github.com/apache/airflow/pull/14254#discussion_r583956892



##########
File path: tests/providers/microsoft/azure/transfers/test_sftp_to_wasb_system.py
##########
@@ -0,0 +1,57 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+import os
+
+import pytest
+
+from airflow.providers.microsoft.azure.example_dags.example_sftp_to_wasb import (
+    FILE_COMPLETE_PATH,
+    LOCAL_FILE_PATH,
+    SAMPLE_FILE_NAME,
+)
+from tests.test_utils.azure_system_helpers import (
+    AZURE_DAG_FOLDER,
+    AzureSystemTest,
+    provide_wasb_default_connection,
+)
+from tests.test_utils.sftp_system_helpers import provide_sftp_default_connection
+
+CREDENTIALS_DIR = os.environ.get('CREDENTIALS_DIR', '/files/airflow-breeze-config/keys')
+SFTP_DEFAULT_KEY = 'sftp_key.json'
+WASB_DEFAULT_KEY = 'wasb_key.json'
+CREDENTIALS_SFTP_PATH = os.path.join(CREDENTIALS_DIR, SFTP_DEFAULT_KEY)
+CREDENTIALS_WASB_PATH = os.path.join(CREDENTIALS_DIR, WASB_DEFAULT_KEY)
+
+
+@pytest.mark.backend('postgres', 'mysql')
+@pytest.mark.credential_file(WASB_DEFAULT_KEY)
+@pytest.mark.credential_file(SFTP_DEFAULT_KEY)
+class TestSFTPToWasbSystem(AzureSystemTest):
+    def setUp(self):
+        super().setUp()
+        self.create_dummy_file(SAMPLE_FILE_NAME, LOCAL_FILE_PATH)
+
+    def tearDown(self):
+        os.remove(FILE_COMPLETE_PATH)
+        super().tearDown()
+
+    @provide_wasb_default_connection(CREDENTIALS_WASB_PATH)
+    @provide_sftp_default_connection(CREDENTIALS_SFTP_PATH)
+    def test_run_example_file_to_wasb(self):
+        self.run_dag('example_sftp_to_wasb', AZURE_DAG_FOLDER)

Review comment:
       @ephraimbuddy Did you mean this section?







[GitHub] [airflow] ephraimbuddy commented on pull request #14254: Features/sftp to wasb: Transfer to scan files from sftp source path and upload them to Azure Blob Storage

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #14254:
URL: https://github.com/apache/airflow/pull/14254#issuecomment-780178388


   Nice @wolvery. One more suggestion: can you add a system test for this change? Here is an example system test: https://github.com/apache/airflow/blob/master/tests/providers/microsoft/azure/transfers/test_file_to_wasb_system.py





[GitHub] [airflow] wolvery commented on a change in pull request #14254: Features/sftp to wasb: Transfer to scan files from sftp source path and upload them to Azure Blob Storage

Posted by GitBox <gi...@apache.org>.
wolvery commented on a change in pull request #14254:
URL: https://github.com/apache/airflow/pull/14254#discussion_r579720898



##########
File path: airflow/providers/microsoft/azure/example_dags/example_sftp_to_wasb.py
##########
@@ -0,0 +1,45 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import os
+
+from airflow import DAG
+from airflow.providers.microsoft.azure.transfers.sftp_to_wasb import SFTPToWasbOperator
+from airflow.utils.dates import days_ago
+
+AZURE_CONTAINER_NAME = os.environ.get("AZURE_CONTAINER_NAME", "airflow")
+BLOB_PREFIX = os.environ.get("AZURE_BLOB_PREFIX", "airflow")
+SFTP_SRC_PATH = os.environ.get("AZURE_SFTP_SRC_PATH", "test-sftp-azure")
+
+with DAG(
+    "example_sftp_to_wasb",
+    schedule_interval=None,
+    start_date=days_ago(1),  # Override to match your needs
+) as dag:
+
+    # [START how_to_sftp_to_wasb]
+    transfer_files_to_azure = SFTPToWasbOperator(
+        task_id="transfer_files_from_sftp_to_wasb",
+        # SFTP args
+        sftp_conn_id="sftp_default",
+        sftp_source_path=SFTP_SRC_PATH,
+        # AZURE args
+        wasb_conn_id="wasb_default",
+        container_name=AZURE_CONTAINER_NAME,
+        blob_prefix=BLOB_PREFIX,
+    )
+    # [END how_to_sftp_to_wasb]

Review comment:
       Excellent idea! Thank you







[GitHub] [airflow] github-actions[bot] commented on pull request #14254: Features/sftp to wasb: Transfer to scan files from sftp source path and upload them to Azure Blob Storage

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #14254:
URL: https://github.com/apache/airflow/pull/14254#issuecomment-789372345


   [The Workflow run](https://github.com/apache/airflow/actions/runs/616116276) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.





[GitHub] [airflow] github-actions[bot] commented on pull request #14254: Features/sftp to wasb: Transfer to scan files from sftp source path and upload them to Azure Blob Storage

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #14254:
URL: https://github.com/apache/airflow/pull/14254#issuecomment-779570404


   [The Workflow run](https://github.com/apache/airflow/actions/runs/570491827) is cancelling this PR. Building image for the PR has been cancelled





[GitHub] [airflow] ephraimbuddy commented on pull request #14254: Features/sftp to wasb: Transfer to scan files from sftp source path and upload them to Azure Blob Storage

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #14254:
URL: https://github.com/apache/airflow/pull/14254#issuecomment-798689634


   @ashb, can you take another look?





[GitHub] [airflow] ephraimbuddy commented on a change in pull request #14254: Features/sftp to wasb: Transfer to scan files from sftp source path and upload them to Azure Blob Storage

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #14254:
URL: https://github.com/apache/airflow/pull/14254#discussion_r583233428



##########
File path: tests/providers/microsoft/azure/transfers/test_sftp_to_wasb_system.py
##########
@@ -0,0 +1,66 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+import os
+
+import pytest
+
+from airflow.providers.microsoft.azure.example_dags.example_sftp_to_wasb import (
+    AZURE_CONTAINER_NAME,
+    BLOB_PREFIX,
+    FILE_COMPLETE_PATH,
+    LOCAL_FILE_PATH,
+    SAMPLE_FILE_NAME,
+    SFTP_FILE_COMPLETE_PATH,
+)
+from airflow.providers.microsoft.azure.hooks.wasb import WasbHook
+from airflow.providers.sftp.hooks.sftp import SFTPHook
+from tests.test_utils.azure_system_helpers import (
+    AZURE_DAG_FOLDER,
+    AzureSystemTest,
+    provide_wasb_default_connection,
+)
+from tests.test_utils.sftp_system_helpers import provide_sftp_default_connection
+
+CREDENTIALS_DIR = os.environ.get('CREDENTIALS_DIR', '/files/airflow-breeze-config/keys')
+SFTP_DEFAULT_KEY = 'sftp_key.json'
+WASB_DEFAULT_KEY = 'wasb_key.json'
+CREDENTIALS_SFTP_PATH = os.path.join(CREDENTIALS_DIR, SFTP_DEFAULT_KEY)
+CREDENTIALS_WASB_PATH = os.path.join(CREDENTIALS_DIR, WASB_DEFAULT_KEY)
+
+
+@pytest.mark.backend('postgres', 'mysql')
+@pytest.mark.credential_file(WASB_DEFAULT_KEY)
+@pytest.mark.credential_file(SFTP_DEFAULT_KEY)
+class TestSFTPToWasbSystem(AzureSystemTest):
+    def setUp(self):
+        super().setUp()
+        self.create_dummy_file(SAMPLE_FILE_NAME, LOCAL_FILE_PATH)
+
+    def tearDown(self):
+        os.remove(FILE_COMPLETE_PATH)
+        super().tearDown()
+
+    @provide_wasb_default_connection(CREDENTIALS_WASB_PATH)
+    @provide_sftp_default_connection(CREDENTIALS_SFTP_PATH)
+    def test_run_example_file_to_wasb(self):
+        self.run_dag('example_sftp_to_wasb', AZURE_DAG_FOLDER)
+        WasbHook(wasb_conn_id="wasb_default").delete_file(
+            AZURE_CONTAINER_NAME, BLOB_PREFIX + SAMPLE_FILE_NAME
+        )
+        SFTPHook(ssh_conn_id="sftp_default").delete_file(SFTP_FILE_COMPLETE_PATH)

Review comment:
       ```suggestion
         
   ```
   Can you move this to the example DAG? We have a WASB delete operator. Ideally we should have an SFTP operator for this too, but for now you can use the PythonOperator to run it: https://github.com/apache/airflow/blob/2b5d4e3ff3c61ea6074caa300bbb8d16027408a6/airflow/providers/microsoft/azure/example_dags/example_fileshare.py#L37-L40
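A minimal sketch of the SFTP half of that cleanup, written as a plain callable that a `PythonOperator` (from `airflow.operators.python`) could run. It reuses the `SFTPHook(ssh_conn_id=...).delete_file(path)` call from the test above; the function name `delete_sftp_file`, its `hook` parameter, and the `FakeSFTPHook` test double are hypothetical, added so the sketch runs without Airflow or a live SFTP server. The WASB blob would be removed by the WASB delete operator the reviewer mentions.

```python
from typing import Any, List, Optional


def delete_sftp_file(path: str, sftp_conn_id: str = "sftp_default", hook: Optional[Any] = None) -> None:
    """Delete one remote file; intended as the python_callable of a PythonOperator."""
    if hook is None:
        # Imported lazily so the callable can be exercised without Airflow installed.
        from airflow.providers.sftp.hooks.sftp import SFTPHook

        hook = SFTPHook(ssh_conn_id=sftp_conn_id)
    hook.delete_file(path)


# Possible wiring in the example DAG (not executed here):
#     delete_sftp_file_task = PythonOperator(
#         task_id="delete_sftp_file",
#         python_callable=delete_sftp_file,
#         op_kwargs={"path": SFTP_FILE_COMPLETE_PATH},
#     )
#     transfer_files_to_azure >> delete_sftp_file_task


class FakeSFTPHook:
    """Hypothetical test double that records deletions instead of contacting a server."""

    def __init__(self) -> None:
        self.deleted: List[str] = []

    def delete_file(self, path: str) -> None:
        self.deleted.append(path)


fake = FakeSFTPHook()
delete_sftp_file("/upload/sample.txt", hook=fake)
print(fake.deleted)  # ['/upload/sample.txt']
```

Keeping the cleanup in the DAG means the system test only needs `self.run_dag(...)`, and anyone running the example gets the same cleanup.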







[GitHub] [airflow] wolvery commented on a change in pull request #14254: Features/sftp to wasb: Transfer to scan files from sftp source path and upload them to Azure Blob Storage

Posted by GitBox <gi...@apache.org>.
wolvery commented on a change in pull request #14254:
URL: https://github.com/apache/airflow/pull/14254#discussion_r583906693



##########
File path: tests/providers/microsoft/azure/transfers/test_sftp_to_wasb_system.py
##########
@@ -0,0 +1,66 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+import os
+
+import pytest
+
+from airflow.providers.microsoft.azure.example_dags.example_sftp_to_wasb import (
+    AZURE_CONTAINER_NAME,
+    BLOB_PREFIX,
+    FILE_COMPLETE_PATH,
+    LOCAL_FILE_PATH,
+    SAMPLE_FILE_NAME,
+    SFTP_FILE_COMPLETE_PATH,
+)
+from airflow.providers.microsoft.azure.hooks.wasb import WasbHook
+from airflow.providers.sftp.hooks.sftp import SFTPHook
+from tests.test_utils.azure_system_helpers import (
+    AZURE_DAG_FOLDER,
+    AzureSystemTest,
+    provide_wasb_default_connection,
+)
+from tests.test_utils.sftp_system_helpers import provide_sftp_default_connection
+
+CREDENTIALS_DIR = os.environ.get('CREDENTIALS_DIR', '/files/airflow-breeze-config/keys')
+SFTP_DEFAULT_KEY = 'sftp_key.json'
+WASB_DEFAULT_KEY = 'wasb_key.json'
+CREDENTIALS_SFTP_PATH = os.path.join(CREDENTIALS_DIR, SFTP_DEFAULT_KEY)
+CREDENTIALS_WASB_PATH = os.path.join(CREDENTIALS_DIR, WASB_DEFAULT_KEY)
+
+
+@pytest.mark.backend('postgres', 'mysql')
+@pytest.mark.credential_file(WASB_DEFAULT_KEY)
+@pytest.mark.credential_file(SFTP_DEFAULT_KEY)
+class TestSFTPToWasbSystem(AzureSystemTest):
+    def setUp(self):
+        super().setUp()
+        self.create_dummy_file(SAMPLE_FILE_NAME, LOCAL_FILE_PATH)
+
+    def tearDown(self):
+        os.remove(FILE_COMPLETE_PATH)
+        super().tearDown()
+
+    @provide_wasb_default_connection(CREDENTIALS_WASB_PATH)
+    @provide_sftp_default_connection(CREDENTIALS_SFTP_PATH)
+    def test_run_example_file_to_wasb(self):
+        self.run_dag('example_sftp_to_wasb', AZURE_DAG_FOLDER)
+        WasbHook(wasb_conn_id="wasb_default").delete_file(
+            AZURE_CONTAINER_NAME, BLOB_PREFIX + SAMPLE_FILE_NAME
+        )
+        SFTPHook(ssh_conn_id="sftp_default").delete_file(SFTP_FILE_COMPLETE_PATH)

Review comment:
       @ephraimbuddy I did; please check my last commit.







[GitHub] [airflow] github-actions[bot] commented on pull request #14254: Features/sftp to wasb: Transfer to scan files from sftp source path and upload them to Azure Blob Storage

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #14254:
URL: https://github.com/apache/airflow/pull/14254#issuecomment-789372443


   [The Workflow run](https://github.com/apache/airflow/actions/runs/616074971) is cancelling this PR. Building image for the PR has been cancelled





[GitHub] [airflow] wolvery commented on a change in pull request #14254: Features/sftp to wasb: Transfer to scan files from sftp source path and upload them to Azure Blob Storage

Posted by GitBox <gi...@apache.org>.
wolvery commented on a change in pull request #14254:
URL: https://github.com/apache/airflow/pull/14254#discussion_r579720853



##########
File path: airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py
##########
@@ -0,0 +1,191 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains SFTP to Azure Blob Storage operator."""
+import os
+from collections import namedtuple
+from tempfile import NamedTemporaryFile
+from typing import Any, Dict, List, Optional, Tuple
+
+from cached_property import cached_property
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.microsoft.azure.hooks.wasb import WasbHook
+from airflow.providers.sftp.hooks.sftp import SFTPHook
+from airflow.utils.decorators import apply_defaults
+
+WILDCARD = "*"
+SftpFile = namedtuple('SftpFile', 'sftp_file_path, blob_name')
+
+
+class SFTPToWasbOperator(BaseOperator):
+    """
+    Transfer files to Azure Blob Storage from SFTP server.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:SFTPToWasbOperator`
+
+    :param sftp_source_path: The sftp remote path. This is the specified file path
+        for downloading the single file or multiple files from the SFTP server.
+        You can use only one wildcard within your path. The wildcard can appear
+        inside the path or at the end of the path.
+    :type sftp_source_path: str
+    :param container_name: Name of the container.
+    :type container_name: str
+    :param blob_prefix: Prefix to name a blob.
+    :type blob_prefix: str
+    :param sftp_conn_id: The sftp connection id. The name or identifier for
+        establishing a connection to the SFTP server.
+    :type sftp_conn_id: str
+    :param wasb_conn_id: Reference to the wasb connection.
+    :type wasb_conn_id: str
+    :param load_options: Optional keyword arguments that
+        `WasbHook.load_file()` takes.
+    :type load_options: dict
+    :param move_object: When move object is True, the object is moved instead
+        of copied to the new location. This is the equivalent of a mv command
+        as opposed to a cp command.
+    :type move_object: bool
+    """
+
+    template_fields = ("sftp_source_path", "container_name", "blob_name")

Review comment:
       Thank you!
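The `sftp_source_path` docstring in this hunk allows at most one `*`, either inside the path or at its end. Below is a minimal sketch of how such a path could be split into a directory to list plus the text before and after the wildcard, then used to filter remote paths. It illustrates those documented rules only and is not the operator's implementation. `split_wildcard_path` and `matching_paths` are hypothetical names, `ValueError` stands in for whatever exception the operator raises, and the candidate paths are assumed to come from a recursive listing of that directory.

```python
import posixpath
from typing import Iterable, List, Optional, Tuple

WILDCARD = "*"


def split_wildcard_path(source_path: str) -> Tuple[str, Optional[str], Optional[str]]:
    """Return (directory to list, text before '*', text after '*').

    A path without a wildcard is returned unchanged, with (None, None).
    """
    count = source_path.count(WILDCARD)
    if count > 1:
        raise ValueError(f"Only one wildcard '*' is allowed, got {count}: {source_path!r}")
    if count == 0:
        return source_path, None, None
    prefix, suffix = source_path.split(WILDCARD, 1)
    return posixpath.dirname(prefix), prefix, suffix


def matching_paths(paths: Iterable[str], prefix: str, suffix: str) -> List[str]:
    """Keep full remote paths that fit the pattern prefix*suffix."""
    return [
        p
        for p in paths
        if p.startswith(prefix) and p.endswith(suffix) and len(p) >= len(prefix) + len(suffix)
    ]


directory, prefix, suffix = split_wildcard_path("main_dir/test_object*.json")
print(directory, prefix, suffix)  # main_dir main_dir/test_object .json
listing = ["main_dir/test_object1.json", "main_dir/test_object2.json", "main_dir/other.json"]
print(matching_paths(listing, prefix, suffix))  # ['main_dir/test_object1.json', 'main_dir/test_object2.json']
```

The length check stops a path from matching when the prefix and suffix would have to overlap.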







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #14254: Features/sftp to wasb: Transfer to scan files from sftp source path and upload them to Azure Blob Storage

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #14254:
URL: https://github.com/apache/airflow/pull/14254#discussion_r583957507



##########
File path: tests/providers/microsoft/azure/transfers/test_sftp_to_wasb_system.py
##########
@@ -0,0 +1,57 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+import os
+
+import pytest
+
+from airflow.providers.microsoft.azure.example_dags.example_sftp_to_wasb import (
+    FILE_COMPLETE_PATH,
+    LOCAL_FILE_PATH,
+    SAMPLE_FILE_NAME,
+)
+from tests.test_utils.azure_system_helpers import (
+    AZURE_DAG_FOLDER,
+    AzureSystemTest,
+    provide_wasb_default_connection,
+)
+from tests.test_utils.sftp_system_helpers import provide_sftp_default_connection
+
+CREDENTIALS_DIR = os.environ.get('CREDENTIALS_DIR', '/files/airflow-breeze-config/keys')
+SFTP_DEFAULT_KEY = 'sftp_key.json'
+WASB_DEFAULT_KEY = 'wasb_key.json'
+CREDENTIALS_SFTP_PATH = os.path.join(CREDENTIALS_DIR, SFTP_DEFAULT_KEY)
+CREDENTIALS_WASB_PATH = os.path.join(CREDENTIALS_DIR, WASB_DEFAULT_KEY)
+
+
+@pytest.mark.backend('postgres', 'mysql')
+@pytest.mark.credential_file(WASB_DEFAULT_KEY)
+@pytest.mark.credential_file(SFTP_DEFAULT_KEY)
+class TestSFTPToWasbSystem(AzureSystemTest):
+    def setUp(self):
+        super().setUp()
+        self.create_dummy_file(SAMPLE_FILE_NAME, LOCAL_FILE_PATH)
+
+    def tearDown(self):
+        os.remove(FILE_COMPLETE_PATH)
+        super().tearDown()
+
+    @provide_wasb_default_connection(CREDENTIALS_WASB_PATH)
+    @provide_sftp_default_connection(CREDENTIALS_SFTP_PATH)
+    def test_run_example_file_to_wasb(self):
+        self.run_dag('example_sftp_to_wasb', AZURE_DAG_FOLDER)

Review comment:
       No. You're right. You removed it. 







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #14254: Features/sftp to wasb: Transfer to scan files from sftp source path and upload them to Azure Blob Storage

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #14254:
URL: https://github.com/apache/airflow/pull/14254#discussion_r583895748



##########
File path: tests/providers/microsoft/azure/transfers/test_sftp_to_wasb_system.py
##########
@@ -0,0 +1,66 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+import os
+
+import pytest
+
+from airflow.providers.microsoft.azure.example_dags.example_sftp_to_wasb import (
+    AZURE_CONTAINER_NAME,
+    BLOB_PREFIX,
+    FILE_COMPLETE_PATH,
+    LOCAL_FILE_PATH,
+    SAMPLE_FILE_NAME,
+    SFTP_FILE_COMPLETE_PATH,
+)
+from airflow.providers.microsoft.azure.hooks.wasb import WasbHook
+from airflow.providers.sftp.hooks.sftp import SFTPHook
+from tests.test_utils.azure_system_helpers import (
+    AZURE_DAG_FOLDER,
+    AzureSystemTest,
+    provide_wasb_default_connection,
+)
+from tests.test_utils.sftp_system_helpers import provide_sftp_default_connection
+
+CREDENTIALS_DIR = os.environ.get('CREDENTIALS_DIR', '/files/airflow-breeze-config/keys')
+SFTP_DEFAULT_KEY = 'sftp_key.json'
+WASB_DEFAULT_KEY = 'wasb_key.json'
+CREDENTIALS_SFTP_PATH = os.path.join(CREDENTIALS_DIR, SFTP_DEFAULT_KEY)
+CREDENTIALS_WASB_PATH = os.path.join(CREDENTIALS_DIR, WASB_DEFAULT_KEY)
+
+
+@pytest.mark.backend('postgres', 'mysql')
+@pytest.mark.credential_file(WASB_DEFAULT_KEY)
+@pytest.mark.credential_file(SFTP_DEFAULT_KEY)
+class TestSFTPToWasbSystem(AzureSystemTest):
+    def setUp(self):
+        super().setUp()
+        self.create_dummy_file(SAMPLE_FILE_NAME, LOCAL_FILE_PATH)
+
+    def tearDown(self):
+        os.remove(FILE_COMPLETE_PATH)
+        super().tearDown()
+
+    @provide_wasb_default_connection(CREDENTIALS_WASB_PATH)
+    @provide_sftp_default_connection(CREDENTIALS_SFTP_PATH)
+    def test_run_example_file_to_wasb(self):
+        self.run_dag('example_sftp_to_wasb', AZURE_DAG_FOLDER)
+        WasbHook(wasb_conn_id="wasb_default").delete_file(
+            AZURE_CONTAINER_NAME, BLOB_PREFIX + SAMPLE_FILE_NAME
+        )
+        SFTPHook(ssh_conn_id="sftp_default").delete_file(SFTP_FILE_COMPLETE_PATH)

Review comment:
       Can you delete it as suggested above, now that you have implemented it in the example DAGs?







[GitHub] [airflow] wolvery commented on a change in pull request #14254: Features/sftp to wasb: Transfer to scan files from sftp source path and upload them to Azure Blob Storage

Posted by GitBox <gi...@apache.org>.
wolvery commented on a change in pull request #14254:
URL: https://github.com/apache/airflow/pull/14254#discussion_r576850110



##########
File path: airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py
##########
@@ -0,0 +1,187 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains SFTP to Azure Blob Storage operator."""
+import os
+from collections import namedtuple
+from tempfile import NamedTemporaryFile
+from typing import Any, Dict, List, Optional, Tuple
+
+from cached_property import cached_property
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.microsoft.azure.hooks.wasb import WasbHook
+from airflow.providers.sftp.hooks.sftp import SFTPHook
+from airflow.utils.decorators import apply_defaults
+
+WILDCARD = "*"
+SftpFile = namedtuple('SftpFile', 'sftp_file_path, blob_name')
+
+
+class SFTPToWasbOperator(BaseOperator):
+    """
+    Transfer files to Azure Blob Storage from SFTP server.
+
+    :param sftp_source_path: The sftp remote path. This is the specified file path
+        for downloading the single file or multiple files from the SFTP server.
+        You can use only one wildcard within your path. The wildcard can appear
+        inside the path or at the end of the path.
+    :type sftp_source_path: str
+    :param container_name: Name of the container.
+    :type container_name: str
+    :param blob_prefix: Prefix to name a blob.
+    :type blob_prefix: str
+    :param sftp_conn_id: The sftp connection id. The name or identifier for
+        establishing a connection to the SFTP server.
+    :type sftp_conn_id: str
+    :param wasb_conn_id: Reference to the wasb connection.
+    :type wasb_conn_id: str
+    :param load_options: Optional keyword arguments that
+        `WasbHook.load_file()` takes.
+    :type load_options: dict
+    :param move_object: When move object is True, the object is moved instead
+        of copied to the new location. This is the equivalent of a mv command
+        as opposed to a cp command.
+    :type move_object: bool
+    """
+
+    template_fields = ("sftp_source_path", "container_name", "blob_name")
+
+    @apply_defaults
+    def __init__(
+        self,

Review comment:
       It has been applied. Thank you.
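The `sftp_source_path` docstring above allows at most one `*`, placed either inside the path or at its end. The following is a minimal sketch of how such a path could be split and matched. It assumes a plain list of remote paths stands in for a real SFTP directory listing. `split_source_path` and `plan_uploads` are hypothetical helpers, not the operator's API. Blob names are built as `blob_prefix + basename`, mirroring `BLOB_PREFIX + SAMPLE_FILE_NAME` in the test quoted earlier.

```python
import os
from collections import namedtuple
from typing import List, Optional, Tuple

WILDCARD = "*"
SftpFile = namedtuple("SftpFile", "sftp_file_path, blob_name")


def split_source_path(source_path: str) -> Tuple[str, Optional[str], Optional[str]]:
    """Hypothetical helper: return (directory to list, text before '*', text after '*')."""
    total = source_path.count(WILDCARD)
    if total > 1:
        raise ValueError(f"Only one '{WILDCARD}' is allowed, found {total} in {source_path!r}")
    if total == 0:
        # No wildcard: the path names a single file.
        return source_path, None, None
    prefix, suffix = source_path.split(WILDCARD, 1)
    # List from the deepest directory that appears before the wildcard.
    return os.path.dirname(prefix), prefix, suffix


def plan_uploads(remote_files: List[str], source_path: str, blob_prefix: str = "") -> List[SftpFile]:
    """Hypothetical helper: pair each matching remote file with its target blob name."""
    _, prefix, suffix = split_source_path(source_path)
    if prefix is None:
        matches = [path for path in remote_files if path == source_path]
    else:
        # Simple prefix/suffix match; in this sketch '*' may also span '/'.
        matches = [
            path
            for path in remote_files
            if path.startswith(prefix)
            and path.endswith(suffix)
            and len(path) >= len(prefix) + len(suffix)
        ]
    return [SftpFile(path, blob_prefix + os.path.basename(path)) for path in matches]


files = ["/data/a.csv", "/data/b.txt", "/data/sub/c.csv"]
print(plan_uploads(files, "/data/*.csv", blob_prefix="landing/"))
```

Because only the basename is kept, two matching files with the same name in different subdirectories would map to the same blob in this sketch.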








[GitHub] [airflow] ephraimbuddy commented on a change in pull request #14254: Features/sftp to wasb: Transfer to scan files from sftp source path and upload them to Azure Blob Storage

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #14254:
URL: https://github.com/apache/airflow/pull/14254#discussion_r576621861



##########
File path: airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py
##########
@@ -0,0 +1,187 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains SFTP to Azure Blob Storage operator."""
+import os
+from collections import namedtuple
+from tempfile import NamedTemporaryFile
+from typing import Any, Dict, List, Optional, Tuple
+
+from cached_property import cached_property
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.microsoft.azure.hooks.wasb import WasbHook
+from airflow.providers.sftp.hooks.sftp import SFTPHook
+from airflow.utils.decorators import apply_defaults
+
+WILDCARD = "*"
+SftpFile = namedtuple('SftpFile', 'sftp_file_path, blob_name')
+
+
+class SFTPToWasbOperator(BaseOperator):
+    """
+    Transfer files to Azure Blob Storage from SFTP server.
+
+    :param sftp_source_path: The sftp remote path. This is the specified file path
+        for downloading the single file or multiple files from the SFTP server.
+        You can use only one wildcard within your path. The wildcard can appear
+        inside the path or at the end of the path.
+    :type sftp_source_path: str
+    :param container_name: Name of the container.
+    :type container_name: str
+    :param blob_prefix: Prefix to name a blob.
+    :type blob_prefix: str
+    :param sftp_conn_id: The sftp connection id. The name or identifier for
+        establishing a connection to the SFTP server.
+    :type sftp_conn_id: str
+    :param wasb_conn_id: Reference to the wasb connection.
+    :type wasb_conn_id: str
+    :param load_options: Optional keyword arguments that
+        `WasbHook.load_file()` takes.
+    :type load_options: dict
+    :param move_object: When move object is True, the object is moved instead
+        of copied to the new location. This is the equivalent of a mv command
+        as opposed to a cp command.
+    :type move_object: bool
+    """
+
+    template_fields = ("sftp_source_path", "container_name", "blob_name")
+
+    @apply_defaults
+    def __init__(
+        self,

Review comment:
       ```suggestion
           self,
           *,
   ```
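The bare `*` in this suggestion makes every parameter after it keyword-only. Here is a small stand-alone illustration. It uses a hypothetical `ExampleTransfer` class that mimics the shape of the signature and is not the operator itself.

```python
class ExampleTransfer:
    """Hypothetical stand-in mimicking the suggested signature, not SFTPToWasbOperator."""

    def __init__(self, *, sftp_source_path: str, container_name: str, blob_prefix: str = "") -> None:
        # Every parameter after the bare '*' must be passed by keyword.
        self.sftp_source_path = sftp_source_path
        self.container_name = container_name
        self.blob_prefix = blob_prefix


# Accepted: all arguments are named.
ok = ExampleTransfer(sftp_source_path="/data/*.csv", container_name="raw")

# Rejected: positional arguments raise TypeError.
try:
    ExampleTransfer("/data/*.csv", "raw")
except TypeError as err:
    print(f"TypeError: {err}")
```

With keyword-only parameters, adding or reordering arguments later cannot silently shift a positional value into the wrong parameter.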

##########
File path: airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py
##########
@@ -0,0 +1,187 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains SFTP to Azure Blob Storage operator."""
+import os
+from collections import namedtuple
+from tempfile import NamedTemporaryFile
+from typing import Any, Dict, List, Optional, Tuple
+
+from cached_property import cached_property
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.microsoft.azure.hooks.wasb import WasbHook
+from airflow.providers.sftp.hooks.sftp import SFTPHook
+from airflow.utils.decorators import apply_defaults
+
+WILDCARD = "*"
+SftpFile = namedtuple('SftpFile', 'sftp_file_path, blob_name')
+
+
+class SFTPToWasbOperator(BaseOperator):
+    """
+    Transfer files to Azure Blob Storage from SFTP server.
+
+    :param sftp_source_path: The sftp remote path. This is the specified file path
+        for downloading the single file or multiple files from the SFTP server.
+        You can use only one wildcard within your path. The wildcard can appear
+        inside the path or at the end of the path.
+    :type sftp_source_path: str
+    :param container_name: Name of the container.
+    :type container_name: str
+    :param blob_prefix: Prefix to name a blob.
+    :type blob_prefix: str
+    :param sftp_conn_id: The sftp connection id. The name or identifier for
+        establishing a connection to the SFTP server.
+    :type sftp_conn_id: str
+    :param wasb_conn_id: Reference to the wasb connection.
+    :type wasb_conn_id: str
+    :param load_options: Optional keyword arguments that
+        `WasbHook.load_file()` takes.
+    :type load_options: dict
+    :param move_object: When move object is True, the object is moved instead
+        of copied to the new location. This is the equivalent of a mv command
+        as opposed to a cp command.
+    :type move_object: bool
+    """
+
+    template_fields = ("sftp_source_path", "container_name", "blob_name")
+
+    @apply_defaults
+    def __init__(
+        self,
+        sftp_source_path: str,
+        container_name: str,
+        blob_prefix: str = "",
+        sftp_conn_id: str = "sftp_default",
+        wasb_conn_id: str = 'wasb_default',
+        load_options: Optional[dict] = None,
+        move_object: bool = False,
+        *args,

Review comment:
       ```suggestion
   ```
   Operator arguments are now keyword-only.

##########
File path: airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py
##########
@@ -0,0 +1,187 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains SFTP to Azure Blob Storage operator."""
+import os
+from collections import namedtuple
+from tempfile import NamedTemporaryFile
+from typing import Any, Dict, List, Optional, Tuple
+
+from cached_property import cached_property
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.microsoft.azure.hooks.wasb import WasbHook
+from airflow.providers.sftp.hooks.sftp import SFTPHook
+from airflow.utils.decorators import apply_defaults
+
+WILDCARD = "*"
+SftpFile = namedtuple('SftpFile', 'sftp_file_path, blob_name')
+
+
+class SFTPToWasbOperator(BaseOperator):
+    """
+    Transfer files to Azure Blob Storage from SFTP server.
+
+    :param sftp_source_path: The sftp remote path. This is the specified file path
+        for downloading the single file or multiple files from the SFTP server.
+        You can use only one wildcard within your path. The wildcard can appear
+        inside the path or at the end of the path.
+    :type sftp_source_path: str
+    :param container_name: Name of the container.
+    :type container_name: str
+    :param blob_prefix: Prefix to name a blob.
+    :type blob_prefix: str
+    :param sftp_conn_id: The sftp connection id. The name or identifier for
+        establishing a connection to the SFTP server.
+    :type sftp_conn_id: str
+    :param wasb_conn_id: Reference to the wasb connection.
+    :type wasb_conn_id: str
+    :param load_options: Optional keyword arguments that
+        `WasbHook.load_file()` takes.
+    :type load_options: dict
+    :param move_object: When move object is True, the object is moved instead
+        of copied to the new location. This is the equivalent of a mv command
+        as opposed to a cp command.
+    :type move_object: bool
+    """
+
+    template_fields = ("sftp_source_path", "container_name", "blob_name")
+
+    @apply_defaults
+    def __init__(
+        self,
+        sftp_source_path: str,
+        container_name: str,
+        blob_prefix: str = "",
+        sftp_conn_id: str = "sftp_default",
+        wasb_conn_id: str = 'wasb_default',
+        load_options: Optional[dict] = None,
+        move_object: bool = False,
+        *args,
+        **kwargs,
+    ) -> None:
+        super().__init__(*args, **kwargs)

Review comment:
       ```suggestion
           super().__init__(**kwargs)
   ```
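Once `*args` is removed from the signature, only keyword arguments remain, so forwarding `**kwargs` to the parent is sufficient. Below is a minimal sketch of that pattern. `FakeBaseOperator` is a hypothetical stand-in for Airflow's `BaseOperator` that only records `task_id` and any extra keywords. `ExampleSftpToWasb` is likewise illustrative.

```python
from typing import Any


class FakeBaseOperator:
    """Hypothetical stand-in for BaseOperator; records task_id and extra keywords."""

    def __init__(self, *, task_id: str, **kwargs: Any) -> None:
        self.task_id = task_id
        self.extra = kwargs


class ExampleSftpToWasb(FakeBaseOperator):
    """Hypothetical subclass showing keyword-only args forwarded via **kwargs."""

    def __init__(
        self,
        *,
        sftp_source_path: str,
        container_name: str,
        blob_prefix: str = "",
        **kwargs: Any,
    ) -> None:
        # No *args exist any more, so only keyword arguments are passed up.
        super().__init__(**kwargs)
        self.sftp_source_path = sftp_source_path
        self.container_name = container_name
        self.blob_prefix = blob_prefix


op = ExampleSftpToWasb(task_id="sftp_to_wasb", sftp_source_path="/data/*.csv", container_name="raw")
print(op.task_id, op.container_name)
```

Base-class arguments such as `task_id` travel through `**kwargs` untouched, while the subclass consumes only its own parameters.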

##########
File path: airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py
##########
@@ -0,0 +1,187 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains SFTP to Azure Blob Storage operator."""
+import os
+from collections import namedtuple
+from tempfile import NamedTemporaryFile
+from typing import Any, Dict, List, Optional, Tuple
+
+from cached_property import cached_property
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.microsoft.azure.hooks.wasb import WasbHook
+from airflow.providers.sftp.hooks.sftp import SFTPHook
+from airflow.utils.decorators import apply_defaults
+
+WILDCARD = "*"
+SftpFile = namedtuple('SftpFile', 'sftp_file_path, blob_name')
+
+
+class SFTPToWasbOperator(BaseOperator):
+    """
+    Transfer files to Azure Blob Storage from SFTP server.
+
+    :param sftp_source_path: The sftp remote path. This is the specified file path
+        for downloading the single file or multiple files from the SFTP server.
+        You can use only one wildcard within your path. The wildcard can appear
+        inside the path or at the end of the path.
+    :type sftp_source_path: str
+    :param container_name: Name of the container.
+    :type container_name: str
+    :param blob_prefix: Prefix to name a blob.
+    :type blob_prefix: str
+    :param sftp_conn_id: The sftp connection id. The name or identifier for
+        establishing a connection to the SFTP server.
+    :type sftp_conn_id: str
+    :param wasb_conn_id: Reference to the wasb connection.
+    :type wasb_conn_id: str
+    :param load_options: Optional keyword arguments that
+        `WasbHook.load_file()` takes.
+    :type load_options: dict
+    :param move_object: When move object is True, the object is moved instead
+        of copied to the new location. This is the equivalent of a mv command
+        as opposed to a cp command.
+    :type move_object: bool
+    """
+
+    template_fields = ("sftp_source_path", "container_name", "blob_name")
+
+    @apply_defaults
+    def __init__(
+        self,
+        sftp_source_path: str,
+        container_name: str,
+        blob_prefix: str = "",
+        sftp_conn_id: str = "sftp_default",
+        wasb_conn_id: str = 'wasb_default',
+        load_options: Optional[dict] = None,
+        move_object: bool = False,
+        *args,
+        **kwargs,
+    ) -> None:
+        super().__init__(*args, **kwargs)
+
+        self.sftp_source_path = sftp_source_path
+        self.blob_prefix = blob_prefix
+        self.sftp_conn_id = sftp_conn_id
+        self.wasb_conn_id = wasb_conn_id
+        self.container_name = container_name
+        self.wasb_conn_id = wasb_conn_id
+        self.load_options = load_options if load_options is not None else {}

Review comment:
       ```suggestion
           self.load_options = load_options or {}
   ```
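For `None`, both spellings produce an empty dict. The only difference is that `or` replaces every falsy value, so a caller-supplied empty dict is swapped for a fresh one. That is presumably harmless here, because the docstring says `load_options` only holds keyword arguments for `WasbHook.load_file()`. The two hypothetical helpers below make the comparison concrete.

```python
from typing import Optional


def with_is_none(load_options: Optional[dict] = None) -> dict:
    # Only None is replaced; any dict the caller passes is returned as-is.
    return load_options if load_options is not None else {}


def with_or(load_options: Optional[dict] = None) -> dict:
    # Any falsy value (None or an empty dict) becomes a new empty dict.
    return load_options or {}


caller_options: dict = {}
print(with_is_none(None) == with_or(None))              # both give an empty dict
print(with_is_none(caller_options) is caller_options)   # keeps the caller's object
print(with_or(caller_options) is caller_options)        # substitutes a new dict
```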







[GitHub] [airflow] wolvery edited a comment on pull request #14254: Features/sftp to wasb: Transfer to scan files from sftp source path and upload them to Azure Blob Storage

Posted by GitBox <gi...@apache.org>.
wolvery edited a comment on pull request #14254:
URL: https://github.com/apache/airflow/pull/14254#issuecomment-792373704


   > Can you rebase again?
   
   I have rebased, and these tests run OK in my local Breeze environment. Is there something I need to fix?





[GitHub] [airflow] wolvery commented on pull request #14254: Features/sftp to wasb: Transfer to scan files from sftp source path and upload them to Azure Blob Storage

Posted by GitBox <gi...@apache.org>.
wolvery commented on pull request #14254:
URL: https://github.com/apache/airflow/pull/14254#issuecomment-792373704


   > Can you rebase again?
   
   I have rebased, and these tests run OK in the Breeze environment. Is there something I need to fix?

