Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/11/27 11:39:20 UTC

[GitHub] [airflow] AngryHelper opened a new pull request #19852: created SFTPBatchOperator which add batch function

AngryHelper opened a new pull request #19852:
URL: https://github.com/apache/airflow/pull/19852


   Added SFTPBatchOperator, which provides batch file transfer.
   Added some docs on how to use it.
   
   closes: AIRFLOW-5703
   related: AIRFLOW-5703


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] potiuk commented on pull request #19852: created SFTPBatchOperator which add batch function

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #19852:
URL: https://github.com/apache/airflow/pull/19852#issuecomment-997833700


   > Judging by the pysftp source code, the ability to transport a directory is implemented there, but not a separate list of files.
   
   That's also good for me.





[GitHub] [airflow] github-actions[bot] commented on pull request #19852: created SFTPBatchOperator which add batch function

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #19852:
URL: https://github.com/apache/airflow/pull/19852#issuecomment-1052842382


   This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.





[GitHub] [airflow] AngryHelper commented on a change in pull request #19852: created SFTPBatchOperator which add batch function

Posted by GitBox <gi...@apache.org>.
AngryHelper commented on a change in pull request #19852:
URL: https://github.com/apache/airflow/pull/19852#discussion_r762573083



##########
File path: airflow/providers/sftp/operators/sftp_batch.py
##########
@@ -0,0 +1,184 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains SFTP Batch operator."""
+import os
+import re
+from pathlib import Path
+from typing import Any, List, Union
+
+from paramiko.sftp_client import SFTPClient
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.sftp.operators.sftp import SFTPOperation, _check_conn, _make_intermediate_dirs
+
+
+class SFTPBatchOperator(BaseOperator):
+    """
+    SFTPOperator for transferring files from remote host to local or vice a versa.
+    This operator uses ssh_hook to open sftp transport channel that serve as basis
+    for file transfer.
+    :param ssh_hook: predefined ssh_hook to use for remote execution.
+        Either `ssh_hook` or `ssh_conn_id` needs to be provided.
+    :type ssh_hook: airflow.providers.ssh.hooks.ssh.SSHHook
+    :param ssh_conn_id: :ref:`ssh connection id<howto/connection:ssh>`
+        from airflow Connections. `ssh_conn_id` will be ignored if `ssh_hook`
+        is provided.
+    :type ssh_conn_id: str
+    :param remote_host: remote host to connect (templated)
+        Nullable. If provided, it will replace the `remote_host` which was
+        defined in `ssh_hook` or predefined in the connection of `ssh_conn_id`.
+    :type remote_host: str
+    :param local_path: local folder path to get or put. (templated)
+    :type local_path: str or list
+    :param remote_path: remote folder path to get or put. (templated)
+    :type remote_path: str or list
+    :param regexp_mask: regexp mask for file match in local_folder or remote_folder to get or put. (templated)
+    :type regexp_mask: str
+    :param operation: specify operation 'get' or 'put', defaults to put
+    :type operation: str
+    :param confirm: specify if the SFTP operation should be confirmed, defaults to True
+    :type confirm: bool
+    :param create_intermediate_dirs: create missing intermediate directories when
+    :type create_intermediate_dirs: bool
+        copying from remote to local and vice-versa. Default is False.
+        Example: The following task would copy ``file.txt`` to the remote host
+        at ``/tmp/tmp1/tmp2/`` while creating ``tmp``,``tmp1`` and ``tmp2`` if they
+        don't exist. If the parameter is not passed it would error as the directory
+        does not exist. ::
+            put_dir_txt_files = SFTPOperator(
+                task_id="put_dir_txt_files",
+                ssh_conn_id="ssh_default",
+                local_folder="/tmp/dir_for_remote_transfer/",
+                remote_folder="/tmp/dir_for_remote_transfer/txt",
+                regexp_mask=".*[.]txt",
+                operation=SFTPOperation.PUT,
+                create_intermediate_dirs=True
+            )
+
+    """
+
+    template_fields = (
+        'remote_host',
+        'local_path',
+        'remote_path',
+        'regexp_mask',
+    )
+
+    def __init__(
+        self,
+        *,
+        ssh_hook=None,
+        ssh_conn_id=None,
+        remote_host=None,
+        local_path: Union[str, list] = None,
+        remote_path: Union[str, list] = None,
+        regexp_mask=None,
+        operation=SFTPOperation.PUT,
+        confirm=True,
+        create_intermediate_dirs=False,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.ssh_hook = ssh_hook
+        self.ssh_conn_id = ssh_conn_id
+        self.remote_host = remote_host
+        self.local_path = local_path
+        self.remote_path = remote_path
+        self.regexp_mask = regexp_mask
+        self.operation = operation
+        self.confirm = confirm
+        self.create_intermediate_dirs = create_intermediate_dirs
+        if not (self.operation.lower() == SFTPOperation.GET or self.operation.lower() == SFTPOperation.PUT):
+            raise TypeError(
+                f"""Unsupported operation value {self.operation},
+                expected {SFTPOperation.GET} or {SFTPOperation.PUT}"""
+            )
+        if not (
+            (isinstance(self.local_path, str) and isinstance(self.remote_path, str))
+            or (isinstance(self.local_path, list) and isinstance(self.remote_path, str))
+            or (isinstance(self.remote_path, list) and isinstance(self.local_path, str))
+        ):
+            raise TypeError(
+                """Unsupported path argument value local_path and remote_path

Review comment:
       Okay, I think your idea is good, and I want to add it.
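
The hunk above is cut off in the middle of the path type check. As a rough sketch only (not the code from the PR), the same three allowed combinations of `local_path` and `remote_path` could be expressed as a small table of type pairs. The helper name `check_path_types` and the error wording are hypothetical.

```python
from typing import Any

# Allowed (local_path, remote_path) type pairs, taken from the isinstance
# checks visible in the hunk above: str/str, list/str and str/list.
_ALLOWED_PAIRS = ((str, str), (list, str), (str, list))


def check_path_types(local_path: Any, remote_path: Any) -> None:
    """Raise TypeError unless the two arguments form an allowed pair.

    Hypothetical helper for illustration; it is not part of the PR.
    """
    for local_type, remote_type in _ALLOWED_PAIRS:
        if isinstance(local_path, local_type) and isinstance(remote_path, remote_type):
            return
    raise TypeError(
        "Unsupported path argument types: "
        f"local_path={type(local_path).__name__}, "
        f"remote_path={type(remote_path).__name__}"
    )
```

Keeping the allowed pairs in one tuple means the error message and the check cannot drift apart as easily as in a long chained condition.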







[GitHub] [airflow] AngryHelper commented on pull request #19852: created SFTPBatchOperator which add batch function

Posted by GitBox <gi...@apache.org>.
AngryHelper commented on pull request #19852:
URL: https://github.com/apache/airflow/pull/19852#issuecomment-997398403


   > I cannot see my comment - I thought I submitted it but somehow I cannot see it in GitHub.
   > One very important comment - I believe that the validation and execution logic should all be moved to the Hook (as a new batch_transfer method?). The Operator should be a thin layer over the hook's capabilities, because you cannot easily combine several operators to run as a single task, whereas Hooks are precisely designed to handle that case. So if your logic is in the Hook, you could easily add @task decorated code where in one step you batch download files, and in the next step the files can be used (and for example sent elsewhere). When this logic is in the Operator, you cannot do it easily.
   So I propose to move everything you have in _check_conn and execute() to the hook and call the hook for those methods. This way it will be much more usable.
   > 
   ```
   @task
   def my_task():
       sftp_hook = SFTPHook()
       sftp_hook.batch_transfer(GET,....)
       another_hook = AnotherHook()
       another_hook.do_something_with_the_files(....)
   
   ```
   
   @potiuk SFTPHook and SFTPOperator depend on different SFTP clients:
   SFTPHook uses pysftp, while SFTPOperator uses paramiko.
   I don’t want to rewrite the operator, but I understand that ideally both should go through the same hook methods.
   Judging by the pysftp source code, the ability to transport a directory is implemented there, but not a separate list of files.
   ```
   @task
   def my_task():
       sftp_hook = SFTPHook()
       client = sftp_hook.get_conn()
       # pysftp signatures: get_d(remotedir, localdir) and put_d(localpath, remotepath)
       client.get_d(remotepath, localpath)
       client.put_d(localpath, remotepath)
   ```
   What do you think about it?
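
For the case of an explicit list of files, one possible approach is to loop over the list and call the client's per-file `get`. Both `pysftp.Connection.get` and paramiko's `SFTPClient.get` accept `(remotepath, localpath)` as their first two arguments, so a helper written against that shape would work with either. The sketch below is only an illustration under that assumption; `SupportsGet` and `batch_get` are hypothetical names, not part of Airflow, pysftp or paramiko.

```python
import os
import posixpath
from typing import Iterable, List, Protocol


class SupportsGet(Protocol):
    """Hypothetical minimal interface: any client with get(remotepath, localpath)."""

    def get(self, remotepath: str, localpath: str) -> None: ...


def batch_get(client: SupportsGet, remote_files: Iterable[str], local_folder: str) -> List[str]:
    """Download each remote file into local_folder and return the local paths."""
    downloaded = []
    for remote_file in remote_files:
        # Remote SFTP paths use POSIX separators, so take the basename with posixpath.
        local_file = os.path.join(local_folder, posixpath.basename(remote_file))
        client.get(remote_file, local_file)
        downloaded.append(local_file)
    return downloaded
```

Inside a `@task` function this might be called with the connection returned by the hook, for example `batch_get(sftp_hook.get_conn(), ["/tmp/file1.txt"], "/tmp/dir_for_local_transfer/")`; that call has not been run against a real server here.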





[GitHub] [airflow] AngryHelper commented on pull request #19852: created SFTPBatchOperator which add batch function

Posted by GitBox <gi...@apache.org>.
AngryHelper commented on pull request #19852:
URL: https://github.com/apache/airflow/pull/19852#issuecomment-981075133


   Hello @potiuk, please check this PR.
   I would be grateful.





[GitHub] [airflow] AngryHelper commented on a change in pull request #19852: created SFTPBatchOperator which add batch function

Posted by GitBox <gi...@apache.org>.
AngryHelper commented on a change in pull request #19852:
URL: https://github.com/apache/airflow/pull/19852#discussion_r762572530



##########
File path: docs/apache-airflow-providers-sftp/sftp.rst
##########
@@ -0,0 +1,111 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+
+SFTPOperator
+==========================
+Use the :class:`~airflow.providers.sftp.operators.sftp.py` to
+transfer data between servers under sftp.
+
+Using the Operator
+------------------
+To start working with an operator, you need to register an SFTP \ SSH connection in Airflow Connections.
+Use ssh_conn_id to specify the name of the connection.
+
+You can use the operator for the following tasks:
+
+1. Send one file to the server with the full path
+
+.. code-block:: python
+
+    put_file = SFTPOperator(
+        task_id="put_file",
+        ssh_conn_id="ssh_default",
+        local_filepath="/tmp/transfer_file/put_file_file1.txt",
+        remote_filepath="/tmp/transfer_file/remote/put_file_file1.txt",
+        operation=SFTPOperation.PUT,
+        create_intermediate_dirs=True,
+    )
+
+
+2. Send all files from local directory to remote server
+
+.. code-block:: python
+
+    put_dir_files = SFTPBatchOperator(
+        task_id="put_dir_files",
+        ssh_conn_id="ssh_default",
+        local_path="/tmp/dir_for_remote_transfer/",
+        remote_path="/tmp/dir_for_remote_transfer/remote/",
+        operation=SFTPOperation.PUT,
+        create_intermediate_dirs=True,
+    )
+
+
+3. Send all files from local directory to remote server
+
+.. code-block:: python
+
+    put_dir_files = SFTPBatchOperator(
+        task_id="put_dir_files",
+        ssh_conn_id="ssh_default",
+        local_path="/tmp/dir_for_remote_transfer/",
+        remote_path=[
+            "/tmp/dir_for_remote_transfer/remote/txt/file1.txt",

Review comment:
       Yes, I made a mistake in this example; I will fix it.







[GitHub] [airflow] AngryHelper commented on a change in pull request #19852: created SFTPBatchOperator which add batch function

Posted by GitBox <gi...@apache.org>.
AngryHelper commented on a change in pull request #19852:
URL: https://github.com/apache/airflow/pull/19852#discussion_r775567652



##########
File path: airflow/providers/sftp/operators/sftp_batch.py
##########
@@ -0,0 +1,294 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains SFTP Batch operator."""
+import os
+import re
+from pathlib import Path
+from typing import Any, List
+
+from paramiko.sftp_client import SFTPClient
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.sftp.operators.sftp import SFTPOperation, _check_conn, _make_intermediate_dirs
+
+
+class SFTPBatchOperator(BaseOperator):
+    """
+    SFTPOperator for transferring files from remote host to local or vice a versa.
+    This operator uses ssh_hook to open sftp transport channel that serve as basis
+    for file transfer.
+    :param ssh_hook: predefined ssh_hook to use for remote execution.
+        Either `ssh_hook` or `ssh_conn_id` needs to be provided.
+    :type ssh_hook: airflow.providers.ssh.hooks.ssh.SSHHook
+    :param ssh_conn_id: :ref:`ssh connection id<howto/connection:ssh>`
+        from airflow Connections. `ssh_conn_id` will be ignored if `ssh_hook`
+        is provided.
+    :type ssh_conn_id: str
+    :param remote_host: remote host to connect (templated)
+        Nullable. If provided, it will replace the `remote_host` which was
+        defined in `ssh_hook` or predefined in the connection of `ssh_conn_id`.
+    :type remote_host: str
+    :param local_files_path: local files path to get or put. (templated)
+    :type local_files_path: list
+    :param local_folder: local folder path to get or put. (templated)
+    :type local_folder: str
+    :param remote_folder: remote folder path to get or put. (templated)
+    :type remote_folder: str
+    :param remote_files_path: remote folder path to get or put. (templated)
+    :type remote_files_path: list
+    :param regexp_mask: regexp mask for file match in local_folder for PUT operational
+        or match filenames in remote_folder for GET operational. (templated)
+    :type regexp_mask: str
+    :param operation: specify operation 'get' or 'put', defaults to put
+    :type operation: str
+    :param confirm: specify if the SFTP operation should be confirmed, defaults to True
+    :type confirm: bool
+    :param create_intermediate_dirs: create missing intermediate directories when
+    :type create_intermediate_dirs: bool
+    :param force: if the file already exists, it will be overwritten
+    :type force: bool
+        copying from remote to local and vice-versa. Default is False.
+    Summary, support arguments:
+        Possible options for PUT:
+            1.optional(regexp_mask:str) + local_folder:str + remote_folder:str
+            2.local_files_path:list + remote_folder:str
+        Possible options for GET:
+            1.local_folder:str + remote_folder:str + optional(regexp_mask:str)
+            2.local_folder:str + remote_files_path:list
+    Example:
+    Move all txt files
+        from local `/tmp/dir_for_local_transfer/` to remote folder `/tmp/dir_for_remote_transfer/`
+            put_dir_txt_files = SFTPOperator(
+                task_id="put_dir_txt_files",
+                ssh_conn_id="ssh_default",
+                local_folder="/tmp/dir_for_local_transfer/",
+                remote_folder="/tmp/dir_for_remote_transfer/",
+                regexp_mask=".*[.]txt",
+                operation=SFTPOperation.PUT,
+                create_intermediate_dirs=True
+            )
+    Move `/tmp/file1.txt` file
+        from local to remote folder `/tmp/dir_for_remote_transfer/`
+            put_files = SFTPOperator(
+                task_id="put_dir_txt_files",
+                ssh_conn_id="ssh_default",
+                local_files_path=["/tmp/file1.txt",],
+                remote_folder="/tmp/dir_for_remote_transfer/",
+                operation=SFTPOperation.PUT,
+                create_intermediate_dirs=True
+            )
+    Move all files
+        from remote folder `/tmp/dir_for_remote_transfer/` to local folder `/tmp/dir_for_local_transfer/`
+            get_dir = SFTPOperator(
+                task_id="put_dir_txt_files",
+                ssh_conn_id="ssh_default",
+                local_folder="/tmp/dir_for_local_transfer/",
+                remote_folder="/tmp/dir_for_remote_transfer/",
+                operation=SFTPOperation.GET,
+                create_intermediate_dirs=True
+            )
+    Move `/tmp/file1.txt` file
+        from remote to local folder `/tmp/dir_for_local_transfer/`
+            get_files = SFTPOperator(
+                task_id="put_dir_txt_files",
+                ssh_conn_id="ssh_default",
+                local_folder="/tmp/dir_for_local_transfer/",
+                remote_files_path=["/tmp/file1.txt",],
+                operation=SFTPOperation.GET,
+                create_intermediate_dirs=True
+            )
+    """
+
+    template_fields = (
+        'remote_host',
+        'local_files_path',
+        'remote_files_path',
+        'local_folder',
+        'remote_folder',
+        'regexp_mask',
+    )
+
+    def __init__(
+        self,
+        *,
+        ssh_hook=None,
+        ssh_conn_id=None,
+        remote_host=None,
+        local_files_path: list = None,
+        remote_files_path: list = None,
+        local_folder: str = None,
+        remote_folder: str = None,
+        regexp_mask=None,
+        operation=SFTPOperation.PUT,
+        confirm=True,
+        create_intermediate_dirs=False,
+        force=False,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.ssh_hook = ssh_hook
+        self.ssh_conn_id = ssh_conn_id
+        self.remote_host = remote_host
+        self.local_files_path = local_files_path
+        self.remote_files_path = remote_files_path
+        self.local_folder = local_folder
+        self.remote_folder = remote_folder
+        self.regexp_mask = regexp_mask
+        self.operation = operation
+        self.confirm = confirm
+        self.create_intermediate_dirs = create_intermediate_dirs
+        self.force = force
+        if not (self.operation.lower() == SFTPOperation.GET or self.operation.lower() == SFTPOperation.PUT):
+            raise TypeError(
+                f"""Unsupported operation value {self.operation},
+                expected {SFTPOperation.GET} or {SFTPOperation.PUT}"""
+            )
+        if not (
+            (
+                self.operation == SFTPOperation.PUT
+                and (
+                    (
+                        isinstance(self.local_folder, str)
+                        and isinstance(self.remote_folder, str)
+                        and local_files_path is None
+                        and remote_files_path is None
+                    )
+                    or (
+                        isinstance(self.local_files_path, list)
+                        and isinstance(self.remote_folder, str)
+                        and self.local_folder is None
+                        and remote_files_path is None
+                        and self.regexp_mask is None
+                    )
+                )
+            )
+            or (
+                self.operation == SFTPOperation.GET
+                and (
+                    (
+                        isinstance(self.local_folder, str)
+                        and isinstance(self.remote_folder, str)
+                        and local_files_path is None
+                        and remote_files_path is None
+                    )
+                    or (
+                        isinstance(self.local_folder, str)
+                        and isinstance(self.remote_files_path, list)
+                        and self.local_files_path is None
+                        and remote_folder is None
+                        and self.regexp_mask is None
+                    )
+                )
+            )
+        ):
+            raise TypeError(
+                """
+                Unsupported argument pool,
+                Possible options for PUT:
+                    1.optional(regexp_mask:str) + local_folder:str + remote_folder:str
+                    2.local_files_path:list + remote_folder:str
+                Possible options for GET:
+                    1.local_folder:str + remote_folder:str + optional(regexp_mask:str)
+                    2.local_folder:str + remote_files_path:list
+                """
+            )
+
+    def execute(self, context: Any) -> str:
+        dump_file_name_for_log = None
+        try:
+            _check_conn(self)
+
+            with self.ssh_hook.get_conn() as ssh_client:
+                sftp_client = ssh_client.open_sftp()
+                if self.operation.lower() == SFTPOperation.PUT:
+                    if self.local_folder and self.remote_folder:
+                        files_list = self._search_files(os.listdir(self.local_folder))
+                        for file in files_list:
+                            local_file = os.path.basename(file)
+                            dump_file_name_for_log = file
+                            self._check_remote_file(f"{self.remote_folder}/{local_file}", sftp_client)
+                            self._transfer(sftp_client, self.local_folder, local_file, self.remote_folder)
+                    if self.local_files_path and self.remote_folder:
+                        for file in self.local_files_path:
+                            local_file = os.path.basename(file)
+                            dump_file_name_for_log = file
+                            self._check_remote_file(f"{self.remote_folder}/{local_file}", sftp_client)
+                            self._transfer(sftp_client, os.path.dirname(file), local_file, self.remote_folder)
+                elif self.operation.lower() == SFTPOperation.GET:
+                    if self.remote_folder and self.local_folder:
+                        files_list = self._search_files(sftp_client.listdir(self.remote_folder))
+                        for file in files_list:
+                            remote_file = os.path.basename(file)
+                            dump_file_name_for_log = file
+                            self._check_local_file(f"{self.local_folder}/{remote_file}")
+                            self._transfer(sftp_client, self.local_folder, remote_file, self.remote_folder)
+                    if self.remote_files_path and self.local_folder:
+                        for file in self.remote_files_path:
+                            remote_file = os.path.basename(file)
+                            dump_file_name_for_log = file
+                            self._check_local_file(f"{self.local_folder}/{remote_file}")
+                            self._transfer(sftp_client, self.local_folder, remote_file, os.path.dirname(file))
+
+        except Exception as e:
+            raise AirflowException(f"Error while transferring {dump_file_name_for_log}, error: {str(e)}")
+
+        return self.local_folder

Review comment:
       It's not important; we can drop this line.
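
The `execute` method in the hunk above passes directory listings through `self._search_files(...)`, whose body is not shown here. As a guess at what such a filter could look like (not the PR's actual implementation), the sketch below applies `regexp_mask` to bare file names. The function name `filter_by_mask` is hypothetical.

```python
import re
from typing import Iterable, List, Optional


def filter_by_mask(file_names: Iterable[str], regexp_mask: Optional[str] = None) -> List[str]:
    """Return the names that match regexp_mask in full; return all names if no mask is given."""
    if regexp_mask is None:
        return list(file_names)
    pattern = re.compile(regexp_mask)
    # fullmatch requires the whole name to match, so ".*[.]txt" accepts
    # "a.txt" but rejects "c.txt.bak".
    return [name for name in file_names if pattern.fullmatch(name)]
```

The choice of `fullmatch` over `re.match` is an assumption about the intended semantics: `re.match` only anchors at the start, so a mask such as `.*[.]txt` would also accept `c.txt.bak`.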







[GitHub] [airflow] AngryHelper commented on a change in pull request #19852: created SFTPBatchOperator which add batch function

Posted by GitBox <gi...@apache.org>.
AngryHelper commented on a change in pull request #19852:
URL: https://github.com/apache/airflow/pull/19852#discussion_r780684559



##########
File path: docs/apache-airflow-providers-sftp/sftp.rst
##########
@@ -0,0 +1,147 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+
+SFTPOperator
+==========================
+Use the :class:`~airflow.providers.sftp.operators.sftp.py` to
+transfer data between servers under sftp.
+
+Using the Operator
+------------------
+To start working with an operator, you need to register an SFTP \ SSH connection in Airflow Connections.
+Use ssh_conn_id to specify the name of the connection.

Review comment:
       Done, I separated the docs.







[GitHub] [airflow] AngryHelper commented on a change in pull request #19852: created SFTPBatchOperator which add batch function

Posted by GitBox <gi...@apache.org>.
AngryHelper commented on a change in pull request #19852:
URL: https://github.com/apache/airflow/pull/19852#discussion_r767297867



##########
File path: airflow/providers/sftp/operators/sftp_batch.py
##########
@@ -0,0 +1,184 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains SFTP Batch operator."""
+import os
+import re
+from pathlib import Path
+from typing import Any, List, Union
+
+from paramiko.sftp_client import SFTPClient
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.sftp.operators.sftp import SFTPOperation, _check_conn, _make_intermediate_dirs
+
+
+class SFTPBatchOperator(BaseOperator):
+    """
+    SFTPOperator for transferring files from remote host to local or vice a versa.
+    This operator uses ssh_hook to open sftp transport channel that serve as basis
+    for file transfer.
+    :param ssh_hook: predefined ssh_hook to use for remote execution.
+        Either `ssh_hook` or `ssh_conn_id` needs to be provided.
+    :type ssh_hook: airflow.providers.ssh.hooks.ssh.SSHHook
+    :param ssh_conn_id: :ref:`ssh connection id<howto/connection:ssh>`
+        from airflow Connections. `ssh_conn_id` will be ignored if `ssh_hook`
+        is provided.
+    :type ssh_conn_id: str
+    :param remote_host: remote host to connect (templated)
+        Nullable. If provided, it will replace the `remote_host` which was
+        defined in `ssh_hook` or predefined in the connection of `ssh_conn_id`.
+    :type remote_host: str
+    :param local_path: local folder path to get or put. (templated)
+    :type local_path: str or list
+    :param remote_path: remote folder path to get or put. (templated)
+    :type remote_path: str or list
+    :param regexp_mask: regexp mask for file match in local_folder or remote_folder to get or put. (templated)
+    :type regexp_mask: str
+    :param operation: specify operation 'get' or 'put', defaults to put
+    :type operation: str
+    :param confirm: specify if the SFTP operation should be confirmed, defaults to True
+    :type confirm: bool
+    :param create_intermediate_dirs: create missing intermediate directories when
+        copying from remote to local and vice versa. Default is False.
+        Example: The following task would copy the ``.txt`` files from the local
+        folder to the remote folder ``/tmp/dir_for_remote_transfer/txt``, creating
+        the missing remote directories if they don't exist. If the parameter is not
+        passed it would error as the directory does not exist. ::
+
+            put_dir_txt_files = SFTPBatchOperator(
+                task_id="put_dir_txt_files",
+                ssh_conn_id="ssh_default",
+                local_path="/tmp/dir_for_remote_transfer/",
+                remote_path="/tmp/dir_for_remote_transfer/txt",
+                regexp_mask=".*[.]txt",
+                operation=SFTPOperation.PUT,
+                create_intermediate_dirs=True
+            )
+    :type create_intermediate_dirs: bool
+
+    """
+
+    template_fields = (
+        'remote_host',
+        'local_path',
+        'remote_path',
+        'regexp_mask',
+    )
+
+    def __init__(
+        self,
+        *,
+        ssh_hook=None,
+        ssh_conn_id=None,
+        remote_host=None,
+        local_path: Union[str, list] = None,
+        remote_path: Union[str, list] = None,
+        regexp_mask=None,
+        operation=SFTPOperation.PUT,
+        confirm=True,
+        create_intermediate_dirs=False,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.ssh_hook = ssh_hook
+        self.ssh_conn_id = ssh_conn_id
+        self.remote_host = remote_host
+        self.local_path = local_path
+        self.remote_path = remote_path
+        self.regexp_mask = regexp_mask
+        self.operation = operation
+        self.confirm = confirm
+        self.create_intermediate_dirs = create_intermediate_dirs
+        if not (self.operation.lower() == SFTPOperation.GET or self.operation.lower() == SFTPOperation.PUT):
+            raise TypeError(
+                f"""Unsupported operation value {self.operation},
+                expected {SFTPOperation.GET} or {SFTPOperation.PUT}"""
+            )
+        if not (
+            (isinstance(self.local_path, str) and isinstance(self.remote_path, str))
+            or (isinstance(self.local_path, list) and isinstance(self.remote_path, str))
+            or (isinstance(self.remote_path, list) and isinstance(self.local_path, str))
+        ):
+            raise TypeError(
+                """Unsupported path argument value local_path and remote_path
+                Possible options: \n local_path is str and remote_path is str\n
+                local_path is list and remote_path is str\n
+                local_path is str and remote_path is list"""
+            )
+
+    def execute(self, context: Any) -> str:
+        file_msg = None
+        try:
+            _check_conn(self)
+
+            with self.ssh_hook.get_conn() as ssh_client:
+                sftp_client = ssh_client.open_sftp()
+                if self.operation.lower() == SFTPOperation.PUT:
+                    if isinstance(self.local_path, str):
+                        files_list = self._search_files(os.listdir(self.local_path))
+                        for file in files_list:
+                            local_file = os.path.basename(file)
+                            file_msg = file
+                            self._transfer(sftp_client, self.local_path, local_file, self.remote_path)
+                    if isinstance(self.local_path, list) and isinstance(self.remote_path, str):
+                        for file in self.local_path:
+                            local_file = os.path.basename(file)
+                            file_msg = file
+                            self._transfer(sftp_client, os.path.dirname(file), local_file, self.remote_path)
+                elif self.operation.lower() == SFTPOperation.GET:
+                    if isinstance(self.remote_path, str):
+                        files_list = self._search_files(sftp_client.listdir(self.remote_path))
+                        for file in files_list:
+                            remote_file = os.path.basename(file)
+                            file_msg = file
+                            self._transfer(sftp_client, self.local_path, remote_file, self.remote_path)
+                    if isinstance(self.remote_path, list) and isinstance(self.local_path, str):

Review comment:
       done ;)
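
For readers following the `regexp_mask` handling in the hunk above: the body of `_search_files` is not shown in this excerpt, so the following is only a minimal sketch of how such a filter could work. The helper name `filter_by_mask` and the choice of `re.fullmatch` (the whole file name must match the mask) are assumptions for illustration, not the PR's code.

```python
import re
from typing import Iterable, List, Optional


def filter_by_mask(file_names: Iterable[str], regexp_mask: Optional[str] = None) -> List[str]:
    """Keep the names that fully match ``regexp_mask``; keep every name when no mask is given."""
    if regexp_mask is None:
        return list(file_names)
    # Compile once so the same pattern is reused for every name in the listing.
    pattern = re.compile(regexp_mask)
    return [name for name in file_names if pattern.fullmatch(name)]


# Hypothetical directory listing, e.g. the result of os.listdir() or SFTPClient.listdir().
listing = ["a.txt", "b.csv", "c.txt.bak"]
txt_only = filter_by_mask(listing, ".*[.]txt")
```

With `re.fullmatch`, a mask such as `.*[.]txt` does not pick up `c.txt.bak`; with `re.match` it would, so the choice of matching function changes which files are transferred.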







[GitHub] [airflow] potiuk commented on pull request #19852: created SFTPBatchOperator which add batch function

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #19852:
URL: https://github.com/apache/airflow/pull/19852#issuecomment-1002323697


   Docs are failing too.





[GitHub] [airflow] AngryHelper commented on a change in pull request #19852: created SFTPBatchOperator which add batch function

Posted by GitBox <gi...@apache.org>.
AngryHelper commented on a change in pull request #19852:
URL: https://github.com/apache/airflow/pull/19852#discussion_r780684396



##########
File path: airflow/providers/sftp/operators/sftp_batch.py
##########
@@ -0,0 +1,294 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains SFTP Batch operator."""
+import os
+import re
+from pathlib import Path
+from typing import Any, List
+
+from paramiko.sftp_client import SFTPClient
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.sftp.operators.sftp import SFTPOperation, _check_conn, _make_intermediate_dirs

Review comment:
       done

##########
File path: airflow/providers/sftp/operators/sftp_batch.py
##########
@@ -0,0 +1,294 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains SFTP Batch operator."""
+import os
+import re
+from pathlib import Path
+from typing import Any, List
+
+from paramiko.sftp_client import SFTPClient
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.sftp.operators.sftp import SFTPOperation, _check_conn, _make_intermediate_dirs
+
+
+class SFTPBatchOperator(BaseOperator):
+    """
+    SFTPBatchOperator for transferring files from remote host to local or vice versa.

Review comment:
       done

##########
File path: airflow/providers/sftp/operators/sftp_batch.py
##########
@@ -0,0 +1,294 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains SFTP Batch operator."""
+import os
+import re
+from pathlib import Path
+from typing import Any, List
+
+from paramiko.sftp_client import SFTPClient
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.sftp.operators.sftp import SFTPOperation, _check_conn, _make_intermediate_dirs
+
+
+class SFTPBatchOperator(BaseOperator):
+    """
+    SFTPBatchOperator for transferring files from remote host to local or vice versa.
+    This operator uses ssh_hook to open an SFTP transport channel that serves as the basis
+    for file transfer.
+    :param ssh_hook: predefined ssh_hook to use for remote execution.
+        Either `ssh_hook` or `ssh_conn_id` needs to be provided.
+    :type ssh_hook: airflow.providers.ssh.hooks.ssh.SSHHook
+    :param ssh_conn_id: :ref:`ssh connection id<howto/connection:ssh>`
+        from airflow Connections. `ssh_conn_id` will be ignored if `ssh_hook`
+        is provided.
+    :type ssh_conn_id: str
+    :param remote_host: remote host to connect (templated)
+        Nullable. If provided, it will replace the `remote_host` which was
+        defined in `ssh_hook` or predefined in the connection of `ssh_conn_id`.
+    :type remote_host: str
+    :param local_files_path: local files path to get or put. (templated)
+    :type local_files_path: list
+    :param local_folder: local folder path to get or put. (templated)
+    :type local_folder: str
+    :param remote_folder: remote folder path to get or put. (templated)
+    :type remote_folder: str
+    :param remote_files_path: remote files path to get or put. (templated)
+    :type remote_files_path: list
+    :param regexp_mask: regexp mask for matching files in local_folder for the PUT operation
+        or filenames in remote_folder for the GET operation. (templated)
+    :type regexp_mask: str
+    :param operation: specify operation 'get' or 'put', defaults to put
+    :type operation: str
+    :param confirm: specify if the SFTP operation should be confirmed, defaults to True
+    :type confirm: bool
+    :param create_intermediate_dirs: create missing intermediate directories when
+        copying from remote to local and vice versa. Default is False.
+    :type create_intermediate_dirs: bool
+    :param force: if the file already exists, it will be overwritten. Default is False.
+    :type force: bool
+    Summary of supported argument combinations:
+        Possible options for PUT:
+            1.optional(regexp_mask:str) + local_folder:str + remote_folder:str
+            2.local_files_path:list + remote_folder:str
+        Possible options for GET:
+            1.local_folder:str + remote_folder:str + optional(regexp_mask:str)
+            2.local_folder:str + remote_files_path:list
+    Example:
+    Move all txt files
+        from local `/tmp/dir_for_local_transfer/` to remote folder `/tmp/dir_for_remote_transfer/`
+            put_dir_txt_files = SFTPBatchOperator(
+                task_id="put_dir_txt_files",
+                ssh_conn_id="ssh_default",
+                local_folder="/tmp/dir_for_local_transfer/",
+                remote_folder="/tmp/dir_for_remote_transfer/",
+                regexp_mask=".*[.]txt",
+                operation=SFTPOperation.PUT,
+                create_intermediate_dirs=True
+            )
+    Move `/tmp/file1.txt` file
+        from local to remote folder `/tmp/dir_for_remote_transfer/`
+            put_files = SFTPBatchOperator(
+                task_id="put_files",
+                ssh_conn_id="ssh_default",
+                local_files_path=["/tmp/file1.txt",],
+                remote_folder="/tmp/dir_for_remote_transfer/",
+                operation=SFTPOperation.PUT,
+                create_intermediate_dirs=True
+            )
+    Move all files
+        from remote folder `/tmp/dir_for_remote_transfer/` to local folder `/tmp/dir_for_local_transfer/`
+            get_dir = SFTPBatchOperator(
+                task_id="get_dir",
+                ssh_conn_id="ssh_default",
+                local_folder="/tmp/dir_for_local_transfer/",
+                remote_folder="/tmp/dir_for_remote_transfer/",
+                operation=SFTPOperation.GET,
+                create_intermediate_dirs=True
+            )
+    Move `/tmp/file1.txt` file
+        from remote to local folder `/tmp/dir_for_local_transfer/`
+            get_files = SFTPBatchOperator(
+                task_id="get_files",
+                ssh_conn_id="ssh_default",
+                local_folder="/tmp/dir_for_local_transfer/",
+                remote_files_path=["/tmp/file1.txt",],
+                operation=SFTPOperation.GET,
+                create_intermediate_dirs=True
+            )
+    """
+
+    template_fields = (
+        'remote_host',
+        'local_files_path',
+        'remote_files_path',
+        'local_folder',
+        'remote_folder',
+        'regexp_mask',
+    )
+
+    def __init__(
+        self,
+        *,
+        ssh_hook=None,
+        ssh_conn_id=None,
+        remote_host=None,
+        local_files_path: list = None,
+        remote_files_path: list = None,

Review comment:
       done

##########
File path: airflow/providers/sftp/operators/sftp_batch.py
##########
@@ -0,0 +1,294 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains SFTP Batch operator."""
+import os
+import re
+from pathlib import Path
+from typing import Any, List
+
+from paramiko.sftp_client import SFTPClient
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.sftp.operators.sftp import SFTPOperation, _check_conn, _make_intermediate_dirs
+
+
+class SFTPBatchOperator(BaseOperator):
+    """
+    SFTPBatchOperator for transferring files from remote host to local or vice versa.
+    This operator uses ssh_hook to open an SFTP transport channel that serves as the basis
+    for file transfer.
+    :param ssh_hook: predefined ssh_hook to use for remote execution.
+        Either `ssh_hook` or `ssh_conn_id` needs to be provided.
+    :type ssh_hook: airflow.providers.ssh.hooks.ssh.SSHHook
+    :param ssh_conn_id: :ref:`ssh connection id<howto/connection:ssh>`
+        from airflow Connections. `ssh_conn_id` will be ignored if `ssh_hook`
+        is provided.
+    :type ssh_conn_id: str
+    :param remote_host: remote host to connect (templated)
+        Nullable. If provided, it will replace the `remote_host` which was
+        defined in `ssh_hook` or predefined in the connection of `ssh_conn_id`.
+    :type remote_host: str
+    :param local_files_path: local files path to get or put. (templated)
+    :type local_files_path: list
+    :param local_folder: local folder path to get or put. (templated)
+    :type local_folder: str
+    :param remote_folder: remote folder path to get or put. (templated)
+    :type remote_folder: str
+    :param remote_files_path: remote files path to get or put. (templated)
+    :type remote_files_path: list
+    :param regexp_mask: regexp mask for matching files in local_folder for the PUT operation
+        or filenames in remote_folder for the GET operation. (templated)
+    :type regexp_mask: str
+    :param operation: specify operation 'get' or 'put', defaults to put
+    :type operation: str
+    :param confirm: specify if the SFTP operation should be confirmed, defaults to True
+    :type confirm: bool
+    :param create_intermediate_dirs: create missing intermediate directories when
+        copying from remote to local and vice versa. Default is False.
+    :type create_intermediate_dirs: bool
+    :param force: if the file already exists, it will be overwritten. Default is False.
+    :type force: bool
+    Summary of supported argument combinations:
+        Possible options for PUT:
+            1.optional(regexp_mask:str) + local_folder:str + remote_folder:str
+            2.local_files_path:list + remote_folder:str
+        Possible options for GET:
+            1.local_folder:str + remote_folder:str + optional(regexp_mask:str)
+            2.local_folder:str + remote_files_path:list
+    Example:
+    Move all txt files
+        from local `/tmp/dir_for_local_transfer/` to remote folder `/tmp/dir_for_remote_transfer/`
+            put_dir_txt_files = SFTPBatchOperator(
+                task_id="put_dir_txt_files",
+                ssh_conn_id="ssh_default",
+                local_folder="/tmp/dir_for_local_transfer/",
+                remote_folder="/tmp/dir_for_remote_transfer/",
+                regexp_mask=".*[.]txt",
+                operation=SFTPOperation.PUT,
+                create_intermediate_dirs=True
+            )
+    Move `/tmp/file1.txt` file
+        from local to remote folder `/tmp/dir_for_remote_transfer/`
+            put_files = SFTPBatchOperator(
+                task_id="put_files",
+                ssh_conn_id="ssh_default",
+                local_files_path=["/tmp/file1.txt",],
+                remote_folder="/tmp/dir_for_remote_transfer/",
+                operation=SFTPOperation.PUT,
+                create_intermediate_dirs=True
+            )
+    Move all files
+        from remote folder `/tmp/dir_for_remote_transfer/` to local folder `/tmp/dir_for_local_transfer/`
+            get_dir = SFTPBatchOperator(
+                task_id="get_dir",
+                ssh_conn_id="ssh_default",
+                local_folder="/tmp/dir_for_local_transfer/",
+                remote_folder="/tmp/dir_for_remote_transfer/",
+                operation=SFTPOperation.GET,
+                create_intermediate_dirs=True
+            )
+    Move `/tmp/file1.txt` file
+        from remote to local folder `/tmp/dir_for_local_transfer/`
+            get_files = SFTPBatchOperator(
+                task_id="get_files",
+                ssh_conn_id="ssh_default",
+                local_folder="/tmp/dir_for_local_transfer/",
+                remote_files_path=["/tmp/file1.txt",],
+                operation=SFTPOperation.GET,
+                create_intermediate_dirs=True
+            )
+    """
+
+    template_fields = (
+        'remote_host',
+        'local_files_path',
+        'remote_files_path',
+        'local_folder',
+        'remote_folder',
+        'regexp_mask',
+    )
+
+    def __init__(
+        self,
+        *,
+        ssh_hook=None,
+        ssh_conn_id=None,
+        remote_host=None,
+        local_files_path: list = None,
+        remote_files_path: list = None,
+        local_folder: str = None,
+        remote_folder: str = None,
+        regexp_mask=None,
+        operation=SFTPOperation.PUT,
+        confirm=True,
+        create_intermediate_dirs=False,
+        force=False,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.ssh_hook = ssh_hook
+        self.ssh_conn_id = ssh_conn_id
+        self.remote_host = remote_host
+        self.local_files_path = local_files_path
+        self.remote_files_path = remote_files_path
+        self.local_folder = local_folder
+        self.remote_folder = remote_folder
+        self.regexp_mask = regexp_mask
+        self.operation = operation
+        self.confirm = confirm
+        self.create_intermediate_dirs = create_intermediate_dirs
+        self.force = force
+        if not (self.operation.lower() == SFTPOperation.GET or self.operation.lower() == SFTPOperation.PUT):
+            raise TypeError(
+                f"""Unsupported operation value {self.operation},
+                expected {SFTPOperation.GET} or {SFTPOperation.PUT}"""
+            )
+        if not (
+            (
+                self.operation.lower() == SFTPOperation.PUT
+                and (
+                    (
+                        isinstance(self.local_folder, str)
+                        and isinstance(self.remote_folder, str)
+                        and local_files_path is None
+                        and remote_files_path is None
+                    )
+                    or (
+                        isinstance(self.local_files_path, list)
+                        and isinstance(self.remote_folder, str)
+                        and self.local_folder is None
+                        and remote_files_path is None
+                        and self.regexp_mask is None
+                    )
+                )
+            )
+            or (
+                self.operation.lower() == SFTPOperation.GET
+                and (
+                    (
+                        isinstance(self.local_folder, str)
+                        and isinstance(self.remote_folder, str)
+                        and local_files_path is None
+                        and remote_files_path is None
+                    )
+                    or (
+                        isinstance(self.local_folder, str)
+                        and isinstance(self.remote_files_path, list)
+                        and self.local_files_path is None
+                        and remote_folder is None
+                        and self.regexp_mask is None
+                    )
+                )
+            )
+        ):
+            raise TypeError(
+                """
+                Unsupported argument pool,
+                Possible options for PUT:
+                    1.optional(regexp_mask:str) + local_folder:str + remote_folder:str
+                    2.local_files_path:list + remote_folder:str
+                Possible options for GET:
+                    1.local_folder:str + remote_folder:str + optional(regexp_mask:str)
+                    2.local_folder:str + remote_files_path:list
+                """
+            )
+
+    def execute(self, context: Any) -> str:

Review comment:
       done
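
The nested boolean check in the hunk above encodes four supported argument combinations. As a rough sketch only, the same rules could be expressed as a small standalone helper that names each combination. The function `resolve_transfer_mode`, its return labels, and the use of `ValueError` (the PR raises `TypeError`) are assumptions made for illustration; the `"put"` and `"get"` constants mirror the values of `SFTPOperation.PUT` and `SFTPOperation.GET`.

```python
from typing import List, Optional

PUT = "put"  # same value as SFTPOperation.PUT
GET = "get"  # same value as SFTPOperation.GET


def resolve_transfer_mode(
    operation: str,
    local_folder: Optional[str] = None,
    remote_folder: Optional[str] = None,
    local_files_path: Optional[List[str]] = None,
    remote_files_path: Optional[List[str]] = None,
    regexp_mask: Optional[str] = None,
) -> str:
    """Return a label for the argument combination, or raise if it is unsupported."""
    op = operation.lower()  # normalise once so "PUT" and "put" behave the same
    if op not in (PUT, GET):
        raise ValueError(f"Unsupported operation value {operation}, expected {GET} or {PUT}")

    # Option 1 for both operations: folder to folder, regexp_mask optional.
    if (
        isinstance(local_folder, str)
        and isinstance(remote_folder, str)
        and local_files_path is None
        and remote_files_path is None
    ):
        return f"{op}_folder"

    # Option 2: an explicit list of files on the source side, no regexp_mask.
    if op == PUT:
        files_ok = (
            isinstance(local_files_path, list)
            and isinstance(remote_folder, str)
            and local_folder is None
            and remote_files_path is None
        )
    else:
        files_ok = (
            isinstance(local_folder, str)
            and isinstance(remote_files_path, list)
            and local_files_path is None
            and remote_folder is None
        )
    if files_ok and regexp_mask is None:
        return f"{op}_files"

    raise ValueError("Unsupported argument combination")
```

Lower-casing the operation once at the top avoids the case where `"PUT"` passes the first check but fails a later exact comparison.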

##########
File path: airflow/providers/sftp/operators/sftp_batch.py
##########
@@ -0,0 +1,294 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains SFTP Batch operator."""
+import os
+import re
+from pathlib import Path
+from typing import Any, List
+
+from paramiko.sftp_client import SFTPClient
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.sftp.operators.sftp import SFTPOperation, _check_conn, _make_intermediate_dirs
+
+
+class SFTPBatchOperator(BaseOperator):
+    """
+    SFTPBatchOperator for transferring files from remote host to local or vice versa.
+    This operator uses ssh_hook to open an SFTP transport channel that serves as the basis
+    for file transfer.
+    :param ssh_hook: predefined ssh_hook to use for remote execution.
+        Either `ssh_hook` or `ssh_conn_id` needs to be provided.
+    :type ssh_hook: airflow.providers.ssh.hooks.ssh.SSHHook
+    :param ssh_conn_id: :ref:`ssh connection id<howto/connection:ssh>`
+        from airflow Connections. `ssh_conn_id` will be ignored if `ssh_hook`
+        is provided.
+    :type ssh_conn_id: str
+    :param remote_host: remote host to connect (templated)
+        Nullable. If provided, it will replace the `remote_host` which was
+        defined in `ssh_hook` or predefined in the connection of `ssh_conn_id`.
+    :type remote_host: str
+    :param local_files_path: local files path to get or put. (templated)
+    :type local_files_path: list
+    :param local_folder: local folder path to get or put. (templated)
+    :type local_folder: str
+    :param remote_folder: remote folder path to get or put. (templated)
+    :type remote_folder: str
+    :param remote_files_path: remote files path to get or put. (templated)
+    :type remote_files_path: list
+    :param regexp_mask: regexp mask for matching files in local_folder for the PUT operation
+        or filenames in remote_folder for the GET operation. (templated)
+    :type regexp_mask: str
+    :param operation: specify operation 'get' or 'put', defaults to put
+    :type operation: str
+    :param confirm: specify if the SFTP operation should be confirmed, defaults to True
+    :type confirm: bool
+    :param create_intermediate_dirs: create missing intermediate directories when
+        copying from remote to local and vice versa. Default is False.
+    :type create_intermediate_dirs: bool
+    :param force: if the file already exists, it will be overwritten. Default is False.
+    :type force: bool
+    Summary of supported argument combinations:
+        Possible options for PUT:
+            1.optional(regexp_mask:str) + local_folder:str + remote_folder:str
+            2.local_files_path:list + remote_folder:str
+        Possible options for GET:
+            1.local_folder:str + remote_folder:str + optional(regexp_mask:str)
+            2.local_folder:str + remote_files_path:list
+    Example:
+    Move all txt files
+        from local `/tmp/dir_for_local_transfer/` to remote folder `/tmp/dir_for_remote_transfer/`
+            put_dir_txt_files = SFTPBatchOperator(
+                task_id="put_dir_txt_files",
+                ssh_conn_id="ssh_default",
+                local_folder="/tmp/dir_for_local_transfer/",
+                remote_folder="/tmp/dir_for_remote_transfer/",
+                regexp_mask=".*[.]txt",
+                operation=SFTPOperation.PUT,
+                create_intermediate_dirs=True
+            )
+    Move `/tmp/file1.txt` file
+        from local to remote folder `/tmp/dir_for_remote_transfer/`
+            put_files = SFTPBatchOperator(
+                task_id="put_files",
+                ssh_conn_id="ssh_default",
+                local_files_path=["/tmp/file1.txt",],
+                remote_folder="/tmp/dir_for_remote_transfer/",
+                operation=SFTPOperation.PUT,
+                create_intermediate_dirs=True
+            )
+    Move all files
+        from remote folder `/tmp/dir_for_remote_transfer/` to local folder `/tmp/dir_for_local_transfer/`
+            get_dir = SFTPBatchOperator(
+                task_id="get_dir",
+                ssh_conn_id="ssh_default",
+                local_folder="/tmp/dir_for_local_transfer/",
+                remote_folder="/tmp/dir_for_remote_transfer/",
+                operation=SFTPOperation.GET,
+                create_intermediate_dirs=True
+            )
+    Move `/tmp/file1.txt` file
+        from remote to local folder `/tmp/dir_for_local_transfer/`
+            get_files = SFTPBatchOperator(
+                task_id="get_files",
+                ssh_conn_id="ssh_default",
+                local_folder="/tmp/dir_for_local_transfer/",
+                remote_files_path=["/tmp/file1.txt",],
+                operation=SFTPOperation.GET,
+                create_intermediate_dirs=True
+            )
+    """
+
+    template_fields = (
+        'remote_host',
+        'local_files_path',
+        'remote_files_path',
+        'local_folder',
+        'remote_folder',
+        'regexp_mask',
+    )
+
+    def __init__(
+        self,
+        *,
+        ssh_hook=None,
+        ssh_conn_id=None,
+        remote_host=None,
+        local_files_path: list = None,
+        remote_files_path: list = None,
+        local_folder: str = None,
+        remote_folder: str = None,
+        regexp_mask=None,
+        operation=SFTPOperation.PUT,
+        confirm=True,
+        create_intermediate_dirs=False,
+        force=False,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.ssh_hook = ssh_hook
+        self.ssh_conn_id = ssh_conn_id
+        self.remote_host = remote_host
+        self.local_files_path = local_files_path
+        self.remote_files_path = remote_files_path
+        self.local_folder = local_folder
+        self.remote_folder = remote_folder
+        self.regexp_mask = regexp_mask
+        self.operation = operation
+        self.confirm = confirm
+        self.create_intermediate_dirs = create_intermediate_dirs
+        self.force = force
+        if not (self.operation.lower() == SFTPOperation.GET or self.operation.lower() == SFTPOperation.PUT):
+            raise TypeError(
+                f"""Unsupported operation value {self.operation},
+                expected {SFTPOperation.GET} or {SFTPOperation.PUT}"""
+            )
+        if not (
+            (
+                self.operation.lower() == SFTPOperation.PUT
+                and (
+                    (
+                        isinstance(self.local_folder, str)
+                        and isinstance(self.remote_folder, str)
+                        and local_files_path is None
+                        and remote_files_path is None
+                    )
+                    or (
+                        isinstance(self.local_files_path, list)
+                        and isinstance(self.remote_folder, str)
+                        and self.local_folder is None
+                        and remote_files_path is None
+                        and self.regexp_mask is None
+                    )
+                )
+            )
+            or (
+                self.operation.lower() == SFTPOperation.GET
+                and (
+                    (
+                        isinstance(self.local_folder, str)
+                        and isinstance(self.remote_folder, str)
+                        and local_files_path is None
+                        and remote_files_path is None
+                    )
+                    or (
+                        isinstance(self.local_folder, str)
+                        and isinstance(self.remote_files_path, list)
+                        and self.local_files_path is None
+                        and remote_folder is None
+                        and self.regexp_mask is None
+                    )
+                )
+            )
+        ):
+            raise TypeError(
+                """
+                Unsupported argument pool,
+                Possible options for PUT:
+                    1.optional(regexp_mask:str) + local_folder:str + remote_folder:str
+                    2.local_files_path:list + remote_folder:str
+                Possible options for GET:
+                    1.local_folder:str + remote_folder:str + optional(regexp_mask:str)
+                    2.local_folder:str + remote_files_path:list
+                """
+            )
+
+    def execute(self, context: Any) -> str:
+        dump_file_name_for_log = None
+        try:
+            _check_conn(self)
+
+            with self.ssh_hook.get_conn() as ssh_client:
+                sftp_client = ssh_client.open_sftp()
+                if self.operation.lower() == SFTPOperation.PUT:
+                    if self.local_folder and self.remote_folder:
+                        files_list = self._search_files(os.listdir(self.local_folder))
+                        for file in files_list:
+                            local_file = os.path.basename(file)
+                            dump_file_name_for_log = file
+                            self._check_remote_file(f"{self.remote_folder}/{local_file}", sftp_client)
+                            self._transfer(sftp_client, self.local_folder, local_file, self.remote_folder)
+                    if self.local_files_path and self.remote_folder:
+                        for file in self.local_files_path:
+                            local_file = os.path.basename(file)
+                            dump_file_name_for_log = file
+                            self._check_remote_file(f"{self.remote_folder}/{local_file}", sftp_client)
+                            self._transfer(sftp_client, os.path.dirname(file), local_file, self.remote_folder)
+                elif self.operation.lower() == SFTPOperation.GET:
+                    if self.remote_folder and self.local_folder:
+                        files_list = self._search_files(sftp_client.listdir(self.remote_folder))
+                        for file in files_list:
+                            remote_file = os.path.basename(file)
+                            dump_file_name_for_log = file
+                            self._check_local_file(f"{self.local_folder}/{remote_file}")
+                            self._transfer(sftp_client, self.local_folder, remote_file, self.remote_folder)
+                    if self.remote_files_path and self.local_folder:
+                        for file in self.remote_files_path:
+                            remote_file = os.path.basename(file)
+                            dump_file_name_for_log = file
+                            self._check_local_file(f"{self.local_folder}/{remote_file}")
+                            self._transfer(sftp_client, self.local_folder, remote_file, os.path.dirname(file))
+
+        except Exception as e:
+            raise AirflowException(f"Error while transferring {dump_file_name_for_log}, error: {str(e)}")
+
+        return self.local_folder

Review comment:
       done







[GitHub] [airflow] potiuk commented on a change in pull request #19852: created SFTPBatchOperator which add batch function

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #19852:
URL: https://github.com/apache/airflow/pull/19852#discussion_r762594771



##########
File path: airflow/providers/sftp/operators/sftp_batch.py
##########
@@ -0,0 +1,184 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains SFTP Batch operator."""
+import os
+import re
+from pathlib import Path
+from typing import Any, List, Union
+
+from paramiko.sftp_client import SFTPClient
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.sftp.operators.sftp import SFTPOperation, _check_conn, _make_intermediate_dirs
+
+
+class SFTPBatchOperator(BaseOperator):
+    """
+    SFTPOperator for transferring files from remote host to local or vice a versa.
+    This operator uses ssh_hook to open sftp transport channel that serve as basis
+    for file transfer.
+    :param ssh_hook: predefined ssh_hook to use for remote execution.
+        Either `ssh_hook` or `ssh_conn_id` needs to be provided.
+    :type ssh_hook: airflow.providers.ssh.hooks.ssh.SSHHook
+    :param ssh_conn_id: :ref:`ssh connection id<howto/connection:ssh>`
+        from airflow Connections. `ssh_conn_id` will be ignored if `ssh_hook`
+        is provided.
+    :type ssh_conn_id: str
+    :param remote_host: remote host to connect (templated)
+        Nullable. If provided, it will replace the `remote_host` which was
+        defined in `ssh_hook` or predefined in the connection of `ssh_conn_id`.
+    :type remote_host: str
+    :param local_path: local folder path to get or put. (templated)
+    :type local_path: str or list
+    :param remote_path: remote folder path to get or put. (templated)
+    :type remote_path: str or list
+    :param regexp_mask: regexp mask for file match in local_folder or remote_folder to get or put. (templated)
+    :type regexp_mask: str
+    :param operation: specify operation 'get' or 'put', defaults to put
+    :type operation: str
+    :param confirm: specify if the SFTP operation should be confirmed, defaults to True
+    :type confirm: bool
+    :param create_intermediate_dirs: create missing intermediate directories when
+    :type create_intermediate_dirs: bool
+        copying from remote to local and vice-versa. Default is False.
+        Example: The following task would copy ``file.txt`` to the remote host
+        at ``/tmp/tmp1/tmp2/`` while creating ``tmp``,``tmp1`` and ``tmp2`` if they
+        don't exist. If the parameter is not passed it would error as the directory
+        does not exist. ::
+            put_dir_txt_files = SFTPOperator(
+                task_id="put_dir_txt_files",
+                ssh_conn_id="ssh_default",
+                local_folder="/tmp/dir_for_remote_transfer/",
+                remote_folder="/tmp/dir_for_remote_transfer/txt",
+                regexp_mask=".*[.]txt",
+                operation=SFTPOperation.PUT,
+                create_intermediate_dirs=True
+            )
+
+    """
+
+    template_fields = (
+        'remote_host',
+        'local_path',
+        'remote_path',
+        'regexp_mask',
+    )
+
+    def __init__(
+        self,
+        *,
+        ssh_hook=None,
+        ssh_conn_id=None,
+        remote_host=None,
+        local_path: Union[str, list] = None,
+        remote_path: Union[str, list] = None,
+        regexp_mask=None,
+        operation=SFTPOperation.PUT,
+        confirm=True,
+        create_intermediate_dirs=False,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.ssh_hook = ssh_hook
+        self.ssh_conn_id = ssh_conn_id
+        self.remote_host = remote_host
+        self.local_path = local_path
+        self.remote_path = remote_path
+        self.regexp_mask = regexp_mask
+        self.operation = operation
+        self.confirm = confirm
+        self.create_intermediate_dirs = create_intermediate_dirs
+        if not (self.operation.lower() == SFTPOperation.GET or self.operation.lower() == SFTPOperation.PUT):
+            raise TypeError(
+                f"""Unsupported operation value {self.operation},
+                expected {SFTPOperation.GET} or {SFTPOperation.PUT}"""
+            )
+        if not (
+            (isinstance(self.local_path, str) and isinstance(self.remote_path, str))
+            or (isinstance(self.local_path, list) and isinstance(self.remote_path, str))
+            or (isinstance(self.remote_path, list) and isinstance(self.local_path, str))
+        ):
+            raise TypeError(
+                """Unsupported path argument value local_path and remote_path
+                Possible options: \n local_path is str and remote_path is str\n
+                local_path is list and remote_path is str\n
+                local_path is str and remote_path is list"""
+            )
+
+    def execute(self, context: Any) -> str:
+        file_msg = None
+        try:
+            _check_conn(self)
+
+            with self.ssh_hook.get_conn() as ssh_client:
+                sftp_client = ssh_client.open_sftp()
+                if self.operation.lower() == SFTPOperation.PUT:
+                    if isinstance(self.local_path, str):
+                        files_list = self._search_files(os.listdir(self.local_path))
+                        for file in files_list:
+                            local_file = os.path.basename(file)
+                            file_msg = file
+                            self._transfer(sftp_client, self.local_path, local_file, self.remote_path)
+                    if isinstance(self.local_path, list) and isinstance(self.remote_path, str):
+                        for file in self.local_path:
+                            local_file = os.path.basename(file)
+                            file_msg = file
+                            self._transfer(sftp_client, os.path.dirname(file), local_file, self.remote_path)
+                elif self.operation.lower() == SFTPOperation.GET:
+                    if isinstance(self.remote_path, str):
+                        files_list = self._search_files(sftp_client.listdir(self.remote_path))
+                        for file in files_list:
+                            remote_file = os.path.basename(file)
+                            file_msg = file
+                            self._transfer(sftp_client, self.local_path, remote_file, self.remote_path)
+                    if isinstance(self.remote_path, list) and isinstance(self.local_path, str):

Review comment:
       When you get a good error message, that's fine.
   
   But I am afraid that when you try to write many files to a target file that already exists, it will not error out: it will save all the files one by one to the same target file without producing an error. Am I right about that?
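
   The concern can be illustrated with a local-filesystem analogue. This is only a sketch under stated assumptions: `copy_many_to_one_target` is a hypothetical helper, and `shutil.copyfile` stands in for the SFTP put call, on the assumption that the remote side also replaces an existing target silently.

```python
import os
import shutil
import tempfile


def copy_many_to_one_target(sources, target):
    """Copy every source onto the same target path, one after another.

    Hypothetical helper: shutil.copyfile stands in for an SFTP put.
    """
    for source in sources:
        # No existence check: each copy silently replaces the previous one.
        shutil.copyfile(source, target)


def demo():
    """Write three files to one target and return what survives."""
    with tempfile.TemporaryDirectory() as tmp:
        sources = []
        for name in ("a.txt", "b.txt", "c.txt"):
            path = os.path.join(tmp, name)
            with open(path, "w") as handle:
                handle.write(name)
            sources.append(path)

        target = os.path.join(tmp, "target.txt")
        copy_many_to_one_target(sources, target)

        with open(target) as handle:
            return handle.read()
```

   In this analogue only the content of the last source remains in the target, and no error is raised along the way.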







[GitHub] [airflow] AngryHelper commented on a change in pull request #19852: created SFTPBatchOperator which add batch function

Posted by GitBox <gi...@apache.org>.
AngryHelper commented on a change in pull request #19852:
URL: https://github.com/apache/airflow/pull/19852#discussion_r762590392



##########
File path: airflow/providers/sftp/operators/sftp_batch.py
##########
@@ -0,0 +1,184 @@
+                if self.operation.lower() == SFTPOperation.PUT:
+                    if isinstance(self.local_path, str):
+                        files_list = self._search_files(os.listdir(self.local_path))
+                        for file in files_list:
+                            local_file = os.path.basename(file)
+                            file_msg = file
+                            self._transfer(sftp_client, self.local_path, local_file, self.remote_path)
+                    if isinstance(self.local_path, list) and isinstance(self.remote_path, str):

Review comment:
       If the directory or file does not exist and create_intermediate_dirs is false, there will be an error at the time of the file transfer anyway. Is it worth overloading the method with explicit checks for everything?
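
   A local-filesystem sketch of this point, with assumptions stated: `copy_into` is a hypothetical helper, `shutil.copyfile` stands in for the SFTP transfer, and `os.makedirs` stands in for whatever `_make_intermediate_dirs` does on the remote side. Without the flag, the copy itself raises when the target directory is missing, so no separate check is needed to get an error.

```python
import os
import shutil


def copy_into(source, target_dir, create_intermediate_dirs=False):
    """Copy source into target_dir, optionally creating the directory first.

    Hypothetical helper used only to illustrate the behaviour.
    """
    if create_intermediate_dirs:
        os.makedirs(target_dir, exist_ok=True)
    target = os.path.join(target_dir, os.path.basename(source))
    # Raises FileNotFoundError if target_dir does not exist.
    shutil.copyfile(source, target)
    return target
```

   Whether the error raised at transfer time is descriptive enough for users is a separate question that this sketch does not answer.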







[GitHub] [airflow] AngryHelper commented on pull request #19852: created SFTPBatchOperator which add batch function

Posted by GitBox <gi...@apache.org>.
AngryHelper commented on pull request #19852:
URL: https://github.com/apache/airflow/pull/19852#issuecomment-1001206421


   Okay, Mr. @potiuk, all tasks are resolved :D





[GitHub] [airflow] AngryHelper commented on a change in pull request #19852: created SFTPBatchOperator which add batch function

Posted by GitBox <gi...@apache.org>.
AngryHelper commented on a change in pull request #19852:
URL: https://github.com/apache/airflow/pull/19852#discussion_r767289383



##########
File path: airflow/providers/sftp/operators/sftp_batch.py
##########
@@ -0,0 +1,184 @@
+                elif self.operation.lower() == SFTPOperation.GET:
+                    if isinstance(self.remote_path, str):
+                        files_list = self._search_files(sftp_client.listdir(self.remote_path))
+                        for file in files_list:
+                            remote_file = os.path.basename(file)
+                            file_msg = file
+                            self._transfer(sftp_client, self.local_path, remote_file, self.remote_path)
+                    if isinstance(self.remote_path, list) and isinstance(self.local_path, str):

Review comment:
       I checked this point: if the file already exists, it will be overwritten. Since this can be viewed as a feature, I will add an additional parameter `force` so that the operator either raises an error or overwrites the file.
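
   A minimal sketch of how such a `force` guard could look. `check_target` and its `exists` argument are hypothetical names, not the PR's actual `_check_remote_file` or `_check_local_file` implementation; `exists` is injectable so that a remote existence check could be substituted for `os.path.exists`.

```python
import os


def check_target(path, force, exists=os.path.exists):
    """Raise if path already exists and force is False.

    Hypothetical helper: `exists` is any callable taking a path and
    returning a bool, so a remote check can replace os.path.exists.
    """
    if exists(path) and not force:
        raise FileExistsError(f"{path} already exists; pass force=True to overwrite")
```

   Calling the guard before each transfer would turn a silent overwrite into an explicit error unless the caller opts in with `force=True`.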







[GitHub] [airflow] AngryHelper commented on a change in pull request #19852: created SFTPBatchOperator which add batch function

Posted by GitBox <gi...@apache.org>.
AngryHelper commented on a change in pull request #19852:
URL: https://github.com/apache/airflow/pull/19852#discussion_r775568182



##########
File path: docs/apache-airflow-providers-sftp/sftp.rst
##########
@@ -0,0 +1,147 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+
+SFTPOperator
+==========================
+Use the :class:`~airflow.providers.sftp.operators.sftp.py` to
+transfer data between servers under sftp.
+
+Using the Operator
+------------------
+To start working with an operator, you need to register an SFTP \ SSH connection in Airflow Connections.
+Use ssh_conn_id to specify the name of the connection.

Review comment:
       This doc is for both operators.







[GitHub] [airflow] josh-fell commented on a change in pull request #19852: created SFTPBatchOperator which add batch function

Posted by GitBox <gi...@apache.org>.
josh-fell commented on a change in pull request #19852:
URL: https://github.com/apache/airflow/pull/19852#discussion_r775322203



##########
File path: airflow/providers/sftp/operators/sftp_batch.py
##########
@@ -0,0 +1,283 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains SFTP Batch operator."""
+import os
+import re
+from pathlib import Path
+from typing import Any, List
+
+from paramiko.sftp_client import SFTPClient
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.sftp.operators.sftp import SFTPOperation, _check_conn, _make_intermediate_dirs
+
+
+class SFTPBatchOperator(BaseOperator):
+    """
+    SFTPOperator for transferring files from remote host to local or vice a versa.
+    This operator uses ssh_hook to open sftp transport channel that serve as basis
+    for file transfer.
+    :param ssh_hook: predefined ssh_hook to use for remote execution.
+        Either `ssh_hook` or `ssh_conn_id` needs to be provided.
+    :type ssh_hook: airflow.providers.ssh.hooks.ssh.SSHHook
+    :param ssh_conn_id: :ref:`ssh connection id<howto/connection:ssh>`
+        from airflow Connections. `ssh_conn_id` will be ignored if `ssh_hook`
+        is provided.
+    :type ssh_conn_id: str
+    :param remote_host: remote host to connect (templated)
+        Nullable. If provided, it will replace the `remote_host` which was
+        defined in `ssh_hook` or predefined in the connection of `ssh_conn_id`.
+    :type remote_host: str
+    :param local_files_path: local files path to get or put. (templated)
+    :type local_files_path: list
+    :param local_folder: local folder path to get or put. (templated)
+    :type local_folder: str
+    :param remote_folder: remote folder path to get or put. (templated)
+    :type remote_folder: str
+    :param remote_files_path: remote folder path to get or put. (templated)
+    :type remote_files_path: list
+    :param regexp_mask: regexp mask for file match in local_folder for PUT operational
+        or match filenames in remote_folder for GET operational. (templated)
+    :type regexp_mask: str
+    :param operation: specify operation 'get' or 'put', defaults to put
+    :type operation: str
+    :param confirm: specify if the SFTP operation should be confirmed, defaults to True
+    :type confirm: bool
+    :param create_intermediate_dirs: create missing intermediate directories when
+    :type create_intermediate_dirs: bool
+    :param force: if the file already exists, it will be overwritten
+    :type force: bool
+        copying from remote to local and vice-versa. Default is False.
+        Example: The following task would copy ``file.txt`` to the remote host
+        at ``/tmp/tmp1/tmp2/`` while creating ``tmp``,``tmp1`` and ``tmp2`` if they
+        don't exist. If the parameter is not passed it would error as the directory
+        does not exist. ::
+            put_dir_txt_files = SFTPOperator(
+                task_id="put_dir_txt_files",
+                ssh_conn_id="ssh_default",
+                local_folder="/tmp/dir_for_remote_transfer/",
+                remote_folder="/tmp/dir_for_remote_transfer/txt",
+                regexp_mask=".*[.]txt",
+                operation=SFTPOperation.PUT,
+                create_intermediate_dirs=True
+            )
+            put_files = SFTPOperator(
+                task_id="put_dir_txt_files",
+                ssh_conn_id="ssh_default",
+                local_files_path=["/tmp/file1.txt",],
+                remote_folder="/tmp/dir_for_remote_transfer/txt",
+                operation=SFTPOperation.PUT,
+                create_intermediate_dirs=True
+            )
+            get_dir = SFTPOperator(
+                task_id="put_dir_txt_files",
+                ssh_conn_id="ssh_default",
+                local_folder="/tmp/dir_for_remote_transfer/",
+                remote_folder="/tmp/dir_for_remote_transfer/txt",
+                operation=SFTPOperation.GET,
+                create_intermediate_dirs=True
+            )
+            get_files = SFTPOperator(
+                task_id="put_dir_txt_files",
+                ssh_conn_id="ssh_default",
+                local_folder="/tmp/",
+                remote_files_path=["/tmp/file1.txt",],
+                operation=SFTPOperation.GET,
+                create_intermediate_dirs=True
+            )
+
+    """
+
+    template_fields = (
+        'remote_host',
+        'local_files_path',
+        'remote_files_path',
+        'local_folder',
+        'remote_folder',
+        'regexp_mask',
+    )
+
+    def __init__(
+        self,
+        *,
+        ssh_hook=None,
+        ssh_conn_id=None,
+        remote_host=None,
+        local_files_path: list = None,
+        remote_files_path: list = None,
+        local_folder: str = None,
+        remote_folder: str = None,
+        regexp_mask=None,
+        operation=SFTPOperation.PUT,
+        confirm=True,
+        create_intermediate_dirs=False,
+        force=False,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.ssh_hook = ssh_hook
+        self.ssh_conn_id = ssh_conn_id
+        self.remote_host = remote_host
+        self.local_files_path = local_files_path
+        self.remote_files_path = remote_files_path
+        self.local_folder = local_folder
+        self.remote_folder = remote_folder
+        self.regexp_mask = regexp_mask
+        self.operation = operation
+        self.confirm = confirm
+        self.create_intermediate_dirs = create_intermediate_dirs
+        self.force = force
+        if not (self.operation.lower() == SFTPOperation.GET or self.operation.lower() == SFTPOperation.PUT):
+            raise TypeError(
+                f"""Unsupported operation value {self.operation},
+                expected {SFTPOperation.GET} or {SFTPOperation.PUT}"""
+            )
+        if not (
+            (
+                self.operation == SFTPOperation.PUT
+                and (
+                    (
+                        isinstance(self.local_folder, str)
+                        and isinstance(self.remote_folder, str)
+                        and local_files_path is None
+                        and remote_files_path is None
+                    )
+                    or (
+                        isinstance(self.local_files_path, list)
+                        and isinstance(self.remote_folder, str)
+                        and self.local_folder is None
+                        and remote_files_path is None
+                        and self.regexp_mask is None
+                    )
+                )
+            )
+            or (
+                self.operation == SFTPOperation.GET
+                and (
+                    (
+                        isinstance(self.local_folder, str)
+                        and isinstance(self.remote_folder, str)
+                        and local_files_path is None
+                        and remote_files_path is None
+                    )
+                    or (
+                        isinstance(self.local_folder, str)
+                        and isinstance(self.remote_files_path, list)
+                        and self.local_files_path is None
+                        and remote_folder is None
+                        and self.regexp_mask is None
+                    )
+                )
+            )
+        ):
+            raise TypeError(

Review comment:
       I think we should be very careful about adding type validations to template fields (IMO these validations should move under the `execute()` scope). There might be unexpected behavior if users pass in `XComArgs` or an explicit Jinja expression like `{{ ti.xcom_pull(task_ids="some_task"...) }}`, neither of which is rendered entirely until `execute()`. The `XComArgs`/Jinja expression may indeed render to lists or strings, but within `__init__()` their types will be evaluated as `XComArg` or string, respectively.







[GitHub] [airflow] AngryHelper commented on pull request #19852: created SFTPBatchOperator which add batch function

Posted by GitBox <gi...@apache.org>.
AngryHelper commented on pull request #19852:
URL: https://github.com/apache/airflow/pull/19852#issuecomment-1008054531


   anything else? :-)





[GitHub] [airflow] AngryHelper commented on a change in pull request #19852: created SFTPBatchOperator which add batch function

Posted by GitBox <gi...@apache.org>.
AngryHelper commented on a change in pull request #19852:
URL: https://github.com/apache/airflow/pull/19852#discussion_r780656331



##########
File path: airflow/providers/sftp/operators/sftp_batch.py
##########
@@ -0,0 +1,294 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains SFTP Batch operator."""
+import os
+import re
+from pathlib import Path
+from typing import Any, List
+
+from paramiko.sftp_client import SFTPClient
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.sftp.operators.sftp import SFTPOperation, _check_conn, _make_intermediate_dirs

Review comment:
       I think I'll make a separate file for common functions.
   
   `Actually, maybe the move is to have both the SFTPOperator and SFTPBatchOperator both live in a single providers/sftp/operators/sftp.py file.`
   I do not like large files in which the classes do not depend on each other.
   







[GitHub] [airflow] potiuk commented on pull request #19852: created SFTPBatchOperator which add batch function

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #19852:
URL: https://github.com/apache/airflow/pull/19852#issuecomment-1011082315


   Fixing doc/static checks and rebasing?





[GitHub] [airflow] potiuk commented on a change in pull request #19852: created SFTPBatchOperator which add batch function

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #19852:
URL: https://github.com/apache/airflow/pull/19852#discussion_r767306832



##########
File path: airflow/providers/sftp/operators/sftp_batch.py
##########
@@ -0,0 +1,283 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains SFTP Batch operator."""
+import os
+import re
+from pathlib import Path
+from typing import Any, List
+
+from paramiko.sftp_client import SFTPClient
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.sftp.operators.sftp import SFTPOperation, _check_conn, _make_intermediate_dirs
+
+
+class SFTPBatchOperator(BaseOperator):
+    """
+    SFTPOperator for transferring files from remote host to local or vice a versa.
+    This operator uses ssh_hook to open sftp transport channel that serve as basis
+    for file transfer.
+    :param ssh_hook: predefined ssh_hook to use for remote execution.
+        Either `ssh_hook` or `ssh_conn_id` needs to be provided.
+    :type ssh_hook: airflow.providers.ssh.hooks.ssh.SSHHook
+    :param ssh_conn_id: :ref:`ssh connection id<howto/connection:ssh>`
+        from airflow Connections. `ssh_conn_id` will be ignored if `ssh_hook`
+        is provided.
+    :type ssh_conn_id: str
+    :param remote_host: remote host to connect (templated)
+        Nullable. If provided, it will replace the `remote_host` which was
+        defined in `ssh_hook` or predefined in the connection of `ssh_conn_id`.
+    :type remote_host: str
+    :param local_files_path: local files path to get or put. (templated)
+    :type local_files_path: list
+    :param local_folder: local folder path to get or put. (templated)
+    :type local_folder: str
+    :param remote_folder: remote folder path to get or put. (templated)
+    :type remote_folder: str
+    :param remote_files_path: remote folder path to get or put. (templated)
+    :type remote_files_path: list
+    :param regexp_mask: regexp mask for file match in local_folder for PUT operational
+        or match filenames in remote_folder for GET operational. (templated)
+    :type regexp_mask: str
+    :param operation: specify operation 'get' or 'put', defaults to put
+    :type operation: str
+    :param confirm: specify if the SFTP operation should be confirmed, defaults to True
+    :type confirm: bool
+    :param create_intermediate_dirs: create missing intermediate directories when
+    :type create_intermediate_dirs: bool
+    :param force: if the file already exists, it will be overwritten
+    :type force: bool
+        copying from remote to local and vice-versa. Default is False.
+        Example: The following task would copy ``file.txt`` to the remote host
+        at ``/tmp/tmp1/tmp2/`` while creating ``tmp``,``tmp1`` and ``tmp2`` if they
+        don't exist. If the parameter is not passed it would error as the directory
+        does not exist. ::
+            put_dir_txt_files = SFTPOperator(
+                task_id="put_dir_txt_files",
+                ssh_conn_id="ssh_default",
+                local_folder="/tmp/dir_for_remote_transfer/",
+                remote_folder="/tmp/dir_for_remote_transfer/txt",
+                regexp_mask=".*[.]txt",
+                operation=SFTPOperation.PUT,
+                create_intermediate_dirs=True
+            )
+            put_files = SFTPOperator(
+                task_id="put_dir_txt_files",
+                ssh_conn_id="ssh_default",
+                local_files_path=["/tmp/file1.txt",],
+                remote_folder="/tmp/dir_for_remote_transfer/txt",
+                operation=SFTPOperation.PUT,
+                create_intermediate_dirs=True
+            )
+            get_dir = SFTPOperator(
+                task_id="put_dir_txt_files",
+                ssh_conn_id="ssh_default",
+                local_folder="/tmp/dir_for_remote_transfer/",
+                remote_folder="/tmp/dir_for_remote_transfer/txt",
+                operation=SFTPOperation.GET,
+                create_intermediate_dirs=True
+            )
+            get_files = SFTPOperator(
+                task_id="put_dir_txt_files",
+                ssh_conn_id="ssh_default",
+                local_folder="/tmp/",
+                remote_files_path=["/tmp/file1.txt",],
+                operation=SFTPOperation.GET,
+                create_intermediate_dirs=True
+            )
+
+    """
+
+    template_fields = (
+        'remote_host',
+        'local_files_path',
+        'remote_files_path',
+        'local_folder',
+        'remote_folder',
+        'regexp_mask',
+    )
+
+    def __init__(
+        self,
+        *,
+        ssh_hook=None,
+        ssh_conn_id=None,
+        remote_host=None,
+        local_files_path: list = None,
+        remote_files_path: list = None,
+        local_folder: str = None,
+        remote_folder: str = None,
+        regexp_mask=None,
+        operation=SFTPOperation.PUT,
+        confirm=True,
+        create_intermediate_dirs=False,
+        force=False,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.ssh_hook = ssh_hook
+        self.ssh_conn_id = ssh_conn_id
+        self.remote_host = remote_host
+        self.local_files_path = local_files_path
+        self.remote_files_path = remote_files_path
+        self.local_folder = local_folder
+        self.remote_folder = remote_folder
+        self.regexp_mask = regexp_mask
+        self.operation = operation
+        self.confirm = confirm
+        self.create_intermediate_dirs = create_intermediate_dirs
+        self.force = force
+        if not (self.operation.lower() == SFTPOperation.GET or self.operation.lower() == SFTPOperation.PUT):
+            raise TypeError(
+                f"""Unsupported operation value {self.operation},
+                expected {SFTPOperation.GET} or {SFTPOperation.PUT}"""
+            )
+        if not (

Review comment:
       Nice. As clear as it can be for this kind of condition :)

##########
File path: airflow/providers/sftp/operators/sftp_batch.py
##########
@@ -0,0 +1,283 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains SFTP Batch operator."""
+import os
+import re
+from pathlib import Path
+from typing import Any, List
+
+from paramiko.sftp_client import SFTPClient
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.sftp.operators.sftp import SFTPOperation, _check_conn, _make_intermediate_dirs
+
+
+class SFTPBatchOperator(BaseOperator):
+    """
+    SFTPOperator for transferring files from remote host to local or vice a versa.
+    This operator uses ssh_hook to open sftp transport channel that serve as basis
+    for file transfer.
+    :param ssh_hook: predefined ssh_hook to use for remote execution.
+        Either `ssh_hook` or `ssh_conn_id` needs to be provided.
+    :type ssh_hook: airflow.providers.ssh.hooks.ssh.SSHHook
+    :param ssh_conn_id: :ref:`ssh connection id<howto/connection:ssh>`
+        from airflow Connections. `ssh_conn_id` will be ignored if `ssh_hook`
+        is provided.
+    :type ssh_conn_id: str
+    :param remote_host: remote host to connect (templated)
+        Nullable. If provided, it will replace the `remote_host` which was
+        defined in `ssh_hook` or predefined in the connection of `ssh_conn_id`.
+    :type remote_host: str
+    :param local_files_path: local files path to get or put. (templated)
+    :type local_files_path: list
+    :param local_folder: local folder path to get or put. (templated)
+    :type local_folder: str
+    :param remote_folder: remote folder path to get or put. (templated)
+    :type remote_folder: str
+    :param remote_files_path: remote folder path to get or put. (templated)
+    :type remote_files_path: list
+    :param regexp_mask: regexp mask for file match in local_folder for PUT operational
+        or match filenames in remote_folder for GET operational. (templated)
+    :type regexp_mask: str
+    :param operation: specify operation 'get' or 'put', defaults to put
+    :type operation: str
+    :param confirm: specify if the SFTP operation should be confirmed, defaults to True
+    :type confirm: bool
+    :param create_intermediate_dirs: create missing intermediate directories when
+    :type create_intermediate_dirs: bool
+    :param force: if the file already exists, it will be overwritten
+    :type force: bool
+        copying from remote to local and vice-versa. Default is False.
+        Example: The following task would copy ``file.txt`` to the remote host
+        at ``/tmp/tmp1/tmp2/`` while creating ``tmp``,``tmp1`` and ``tmp2`` if they
+        don't exist. If the parameter is not passed it would error as the directory
+        does not exist. ::
+            put_dir_txt_files = SFTPOperator(
+                task_id="put_dir_txt_files",
+                ssh_conn_id="ssh_default",
+                local_folder="/tmp/dir_for_remote_transfer/",
+                remote_folder="/tmp/dir_for_remote_transfer/txt",
+                regexp_mask=".*[.]txt",
+                operation=SFTPOperation.PUT,
+                create_intermediate_dirs=True
+            )
+            put_files = SFTPOperator(
+                task_id="put_dir_txt_files",
+                ssh_conn_id="ssh_default",
+                local_files_path=["/tmp/file1.txt",],
+                remote_folder="/tmp/dir_for_remote_transfer/txt",
+                operation=SFTPOperation.PUT,
+                create_intermediate_dirs=True
+            )
+            get_dir = SFTPOperator(
+                task_id="put_dir_txt_files",
+                ssh_conn_id="ssh_default",
+                local_folder="/tmp/dir_for_remote_transfer/",
+                remote_folder="/tmp/dir_for_remote_transfer/txt",
+                operation=SFTPOperation.GET,
+                create_intermediate_dirs=True
+            )
+            get_files = SFTPOperator(
+                task_id="put_dir_txt_files",
+                ssh_conn_id="ssh_default",
+                local_folder="/tmp/",
+                remote_files_path=["/tmp/file1.txt",],
+                operation=SFTPOperation.GET,
+                create_intermediate_dirs=True
+            )
+
+    """
+
+    template_fields = (
+        'remote_host',
+        'local_files_path',
+        'remote_files_path',
+        'local_folder',
+        'remote_folder',
+        'regexp_mask',
+    )
+
+    def __init__(
+        self,
+        *,
+        ssh_hook=None,
+        ssh_conn_id=None,
+        remote_host=None,
+        local_files_path: list = None,
+        remote_files_path: list = None,
+        local_folder: str = None,
+        remote_folder: str = None,
+        regexp_mask=None,
+        operation=SFTPOperation.PUT,
+        confirm=True,
+        create_intermediate_dirs=False,
+        force=False,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.ssh_hook = ssh_hook
+        self.ssh_conn_id = ssh_conn_id
+        self.remote_host = remote_host
+        self.local_files_path = local_files_path
+        self.remote_files_path = remote_files_path
+        self.local_folder = local_folder
+        self.remote_folder = remote_folder
+        self.regexp_mask = regexp_mask
+        self.operation = operation
+        self.confirm = confirm
+        self.create_intermediate_dirs = create_intermediate_dirs
+        self.force = force
+        if not (self.operation.lower() == SFTPOperation.GET or self.operation.lower() == SFTPOperation.PUT):
+            raise TypeError(
+                f"""Unsupported operation value {self.operation},
+                expected {SFTPOperation.GET} or {SFTPOperation.PUT}"""
+            )
+        if not (
+            (
+                self.operation == SFTPOperation.PUT
+                and (
+                    (
+                        isinstance(self.local_folder, str)
+                        and isinstance(self.remote_folder, str)
+                        and local_files_path is None
+                        and remote_files_path is None
+                    )
+                    or (
+                        isinstance(self.local_files_path, list)
+                        and isinstance(self.remote_folder, str)
+                        and self.local_folder is None
+                        and remote_files_path is None
+                        and self.regexp_mask is None
+                    )
+                )
+            )
+            or (
+                self.operation == SFTPOperation.GET
+                and (
+                    (
+                        isinstance(self.local_folder, str)
+                        and isinstance(self.remote_folder, str)
+                        and local_files_path is None
+                        and remote_files_path is None
+                    )
+                    or (
+                        isinstance(self.local_folder, str)
+                        and isinstance(self.remote_files_path, list)
+                        and self.local_files_path is None
+                        and remote_folder is None
+                        and self.regexp_mask is None
+                    )
+                )
+            )
+        ):
+            raise TypeError(

Review comment:
       NIT: I think this should also be added to the main docstring of the operator. It's cool that you check it and raise if it is used wrongly, but I think this description should also be added to the docstring.







[GitHub] [airflow] josh-fell commented on a change in pull request #19852: created SFTPBatchOperator which add batch function

Posted by GitBox <gi...@apache.org>.
josh-fell commented on a change in pull request #19852:
URL: https://github.com/apache/airflow/pull/19852#discussion_r775322203



##########
File path: airflow/providers/sftp/operators/sftp_batch.py
##########
@@ -0,0 +1,283 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains SFTP Batch operator."""
+import os
+import re
+from pathlib import Path
+from typing import Any, List
+
+from paramiko.sftp_client import SFTPClient
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.sftp.operators.sftp import SFTPOperation, _check_conn, _make_intermediate_dirs
+
+
+class SFTPBatchOperator(BaseOperator):
+    """
+    SFTPOperator for transferring files from remote host to local or vice a versa.
+    This operator uses ssh_hook to open sftp transport channel that serve as basis
+    for file transfer.
+    :param ssh_hook: predefined ssh_hook to use for remote execution.
+        Either `ssh_hook` or `ssh_conn_id` needs to be provided.
+    :type ssh_hook: airflow.providers.ssh.hooks.ssh.SSHHook
+    :param ssh_conn_id: :ref:`ssh connection id<howto/connection:ssh>`
+        from airflow Connections. `ssh_conn_id` will be ignored if `ssh_hook`
+        is provided.
+    :type ssh_conn_id: str
+    :param remote_host: remote host to connect (templated)
+        Nullable. If provided, it will replace the `remote_host` which was
+        defined in `ssh_hook` or predefined in the connection of `ssh_conn_id`.
+    :type remote_host: str
+    :param local_files_path: local files path to get or put. (templated)
+    :type local_files_path: list
+    :param local_folder: local folder path to get or put. (templated)
+    :type local_folder: str
+    :param remote_folder: remote folder path to get or put. (templated)
+    :type remote_folder: str
+    :param remote_files_path: remote folder path to get or put. (templated)
+    :type remote_files_path: list
+    :param regexp_mask: regexp mask for file match in local_folder for PUT operational
+        or match filenames in remote_folder for GET operational. (templated)
+    :type regexp_mask: str
+    :param operation: specify operation 'get' or 'put', defaults to put
+    :type operation: str
+    :param confirm: specify if the SFTP operation should be confirmed, defaults to True
+    :type confirm: bool
+    :param create_intermediate_dirs: create missing intermediate directories when
+    :type create_intermediate_dirs: bool
+    :param force: if the file already exists, it will be overwritten
+    :type force: bool
+        copying from remote to local and vice-versa. Default is False.
+        Example: The following task would copy ``file.txt`` to the remote host
+        at ``/tmp/tmp1/tmp2/`` while creating ``tmp``,``tmp1`` and ``tmp2`` if they
+        don't exist. If the parameter is not passed it would error as the directory
+        does not exist. ::
+            put_dir_txt_files = SFTPOperator(
+                task_id="put_dir_txt_files",
+                ssh_conn_id="ssh_default",
+                local_folder="/tmp/dir_for_remote_transfer/",
+                remote_folder="/tmp/dir_for_remote_transfer/txt",
+                regexp_mask=".*[.]txt",
+                operation=SFTPOperation.PUT,
+                create_intermediate_dirs=True
+            )
+            put_files = SFTPOperator(
+                task_id="put_dir_txt_files",
+                ssh_conn_id="ssh_default",
+                local_files_path=["/tmp/file1.txt",],
+                remote_folder="/tmp/dir_for_remote_transfer/txt",
+                operation=SFTPOperation.PUT,
+                create_intermediate_dirs=True
+            )
+            get_dir = SFTPOperator(
+                task_id="put_dir_txt_files",
+                ssh_conn_id="ssh_default",
+                local_folder="/tmp/dir_for_remote_transfer/",
+                remote_folder="/tmp/dir_for_remote_transfer/txt",
+                operation=SFTPOperation.GET,
+                create_intermediate_dirs=True
+            )
+            get_files = SFTPOperator(
+                task_id="put_dir_txt_files",
+                ssh_conn_id="ssh_default",
+                local_folder="/tmp/",
+                remote_files_path=["/tmp/file1.txt",],
+                operation=SFTPOperation.GET,
+                create_intermediate_dirs=True
+            )
+
+    """
+
+    template_fields = (
+        'remote_host',
+        'local_files_path',
+        'remote_files_path',
+        'local_folder',
+        'remote_folder',
+        'regexp_mask',
+    )
+
+    def __init__(
+        self,
+        *,
+        ssh_hook=None,
+        ssh_conn_id=None,
+        remote_host=None,
+        local_files_path: list = None,
+        remote_files_path: list = None,
+        local_folder: str = None,
+        remote_folder: str = None,
+        regexp_mask=None,
+        operation=SFTPOperation.PUT,
+        confirm=True,
+        create_intermediate_dirs=False,
+        force=False,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.ssh_hook = ssh_hook
+        self.ssh_conn_id = ssh_conn_id
+        self.remote_host = remote_host
+        self.local_files_path = local_files_path
+        self.remote_files_path = remote_files_path
+        self.local_folder = local_folder
+        self.remote_folder = remote_folder
+        self.regexp_mask = regexp_mask
+        self.operation = operation
+        self.confirm = confirm
+        self.create_intermediate_dirs = create_intermediate_dirs
+        self.force = force
+        if not (self.operation.lower() == SFTPOperation.GET or self.operation.lower() == SFTPOperation.PUT):
+            raise TypeError(
+                f"""Unsupported operation value {self.operation},
+                expected {SFTPOperation.GET} or {SFTPOperation.PUT}"""
+            )
+        if not (
+            (
+                self.operation == SFTPOperation.PUT
+                and (
+                    (
+                        isinstance(self.local_folder, str)
+                        and isinstance(self.remote_folder, str)
+                        and local_files_path is None
+                        and remote_files_path is None
+                    )
+                    or (
+                        isinstance(self.local_files_path, list)
+                        and isinstance(self.remote_folder, str)
+                        and self.local_folder is None
+                        and remote_files_path is None
+                        and self.regexp_mask is None
+                    )
+                )
+            )
+            or (
+                self.operation == SFTPOperation.GET
+                and (
+                    (
+                        isinstance(self.local_folder, str)
+                        and isinstance(self.remote_folder, str)
+                        and local_files_path is None
+                        and remote_files_path is None
+                    )
+                    or (
+                        isinstance(self.local_folder, str)
+                        and isinstance(self.remote_files_path, list)
+                        and self.local_files_path is None
+                        and remote_folder is None
+                        and self.regexp_mask is None
+                    )
+                )
+            )
+        ):
+            raise TypeError(

Review comment:
       I think we should be very careful about adding type validations to template fields (IMO these validations should move into `execute()`). There might be unexpected behavior if users pass in an `XComArg` or an explicit Jinja expression like `{{ ti.xcom_pull(task_ids="some_task"...) }}`, neither of which is rendered/evaluated until `execute()`. The `XComArg`/Jinja expression may well render to a list or string, but within `__init__()` its type will be seen as `XComArg` or `str`, respectively.
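
A minimal sketch of the point above, not the code from this PR: the argument checks are pulled into a plain function that an operator could call at the top of `execute()`, once templated fields hold their rendered values. The helper name `resolve_transfer_mode`, the returned mode strings, and the local `GET`/`PUT` constants (standing in for `SFTPOperation.GET`/`SFTPOperation.PUT`) are assumptions made for illustration. The accepted argument combinations follow the conditions in the quoted diff, except that the operation is lower-cased once before every comparison.

```python
from typing import List, Optional

GET, PUT = "get", "put"  # stand-ins for SFTPOperation.GET / SFTPOperation.PUT


def resolve_transfer_mode(
    operation: str,
    local_folder: Optional[str] = None,
    remote_folder: Optional[str] = None,
    local_files_path: Optional[List[str]] = None,
    remote_files_path: Optional[List[str]] = None,
    regexp_mask: Optional[str] = None,
) -> str:
    """Validate already-rendered arguments and name the transfer mode.

    Hypothetical helper: intended to run inside execute(), after Jinja
    templates and XComArgs have been resolved to real values.
    """
    op = operation.lower()
    if op not in (GET, PUT):
        raise ValueError(f"Unsupported operation value {operation}, expected {GET} or {PUT}")

    # Folder-to-folder works in both directions; regexp_mask is optional here.
    if (
        isinstance(local_folder, str)
        and isinstance(remote_folder, str)
        and local_files_path is None
        and remote_files_path is None
    ):
        return "folder_to_folder"

    # Explicit file lists are not combined with a regexp mask.
    if regexp_mask is None:
        if (
            op == PUT
            and isinstance(local_files_path, list)
            and isinstance(remote_folder, str)
            and local_folder is None
            and remote_files_path is None
        ):
            return "local_files_to_remote_folder"
        if (
            op == GET
            and isinstance(remote_files_path, list)
            and isinstance(local_folder, str)
            and remote_folder is None
            and local_files_path is None
        ):
            return "remote_files_to_local_folder"

    raise TypeError(f"Unsupported combination of path arguments for operation {op}")


# At __init__ time a templated argument is still a plain string, so an
# isinstance(..., list) check rejects it even if it would later resolve to a list.
unrendered = '{{ ti.xcom_pull(task_ids="some_task") }}'
try:
    resolve_transfer_mode(PUT, local_files_path=unrendered, remote_folder="/tmp/remote")
    rejected_unrendered = False
except TypeError:
    rejected_unrendered = True

# Once the value has been resolved to a list, the same check passes.
rendered_mode = resolve_transfer_mode(
    PUT, local_files_path=["/tmp/file1.txt"], remote_folder="/tmp/remote"
)
```

Keeping `__init__()` to plain attribute assignment and running a check like this in `execute()` means the validation sees the resolved values rather than an `XComArg` or an unrendered template string.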







[GitHub] [airflow] potiuk commented on pull request #19852: created SFTPBatchOperator which add batch function

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #19852:
URL: https://github.com/apache/airflow/pull/19852#issuecomment-1006962541


   Are you going to rebase, @AngryHelper?





[GitHub] [airflow] AngryHelper commented on a change in pull request #19852: created SFTPBatchOperator which add batch function

Posted by GitBox <gi...@apache.org>.
AngryHelper commented on a change in pull request #19852:
URL: https://github.com/apache/airflow/pull/19852#discussion_r775259880



##########
File path: airflow/providers/sftp/operators/sftp_batch.py
##########
@@ -0,0 +1,283 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains SFTP Batch operator."""
+import os
+import re
+from pathlib import Path
+from typing import Any, List
+
+from paramiko.sftp_client import SFTPClient
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.sftp.operators.sftp import SFTPOperation, _check_conn, _make_intermediate_dirs
+
+
+class SFTPBatchOperator(BaseOperator):
+    """
+    SFTPOperator for transferring files from remote host to local or vice a versa.
+    This operator uses ssh_hook to open sftp transport channel that serve as basis
+    for file transfer.
+    :param ssh_hook: predefined ssh_hook to use for remote execution.
+        Either `ssh_hook` or `ssh_conn_id` needs to be provided.
+    :type ssh_hook: airflow.providers.ssh.hooks.ssh.SSHHook
+    :param ssh_conn_id: :ref:`ssh connection id<howto/connection:ssh>`
+        from airflow Connections. `ssh_conn_id` will be ignored if `ssh_hook`
+        is provided.
+    :type ssh_conn_id: str
+    :param remote_host: remote host to connect (templated)
+        Nullable. If provided, it will replace the `remote_host` which was
+        defined in `ssh_hook` or predefined in the connection of `ssh_conn_id`.
+    :type remote_host: str
+    :param local_files_path: local files path to get or put. (templated)
+    :type local_files_path: list
+    :param local_folder: local folder path to get or put. (templated)
+    :type local_folder: str
+    :param remote_folder: remote folder path to get or put. (templated)
+    :type remote_folder: str
+    :param remote_files_path: remote folder path to get or put. (templated)
+    :type remote_files_path: list
+    :param regexp_mask: regexp mask for file match in local_folder for PUT operational
+        or match filenames in remote_folder for GET operational. (templated)
+    :type regexp_mask: str
+    :param operation: specify operation 'get' or 'put', defaults to put
+    :type operation: str
+    :param confirm: specify if the SFTP operation should be confirmed, defaults to True
+    :type confirm: bool
+    :param create_intermediate_dirs: create missing intermediate directories when
+    :type create_intermediate_dirs: bool
+    :param force: if the file already exists, it will be overwritten
+    :type force: bool
+        copying from remote to local and vice-versa. Default is False.
+        Example: The following task would copy ``file.txt`` to the remote host
+        at ``/tmp/tmp1/tmp2/`` while creating ``tmp``,``tmp1`` and ``tmp2`` if they
+        don't exist. If the parameter is not passed it would error as the directory
+        does not exist. ::
+            put_dir_txt_files = SFTPOperator(
+                task_id="put_dir_txt_files",
+                ssh_conn_id="ssh_default",
+                local_folder="/tmp/dir_for_remote_transfer/",
+                remote_folder="/tmp/dir_for_remote_transfer/txt",
+                regexp_mask=".*[.]txt",
+                operation=SFTPOperation.PUT,
+                create_intermediate_dirs=True
+            )
+            put_files = SFTPOperator(
+                task_id="put_dir_txt_files",
+                ssh_conn_id="ssh_default",
+                local_files_path=["/tmp/file1.txt",],
+                remote_folder="/tmp/dir_for_remote_transfer/txt",
+                operation=SFTPOperation.PUT,
+                create_intermediate_dirs=True
+            )
+            get_dir = SFTPOperator(
+                task_id="put_dir_txt_files",
+                ssh_conn_id="ssh_default",
+                local_folder="/tmp/dir_for_remote_transfer/",
+                remote_folder="/tmp/dir_for_remote_transfer/txt",
+                operation=SFTPOperation.GET,
+                create_intermediate_dirs=True
+            )
+            get_files = SFTPOperator(
+                task_id="put_dir_txt_files",
+                ssh_conn_id="ssh_default",
+                local_folder="/tmp/",
+                remote_files_path=["/tmp/file1.txt",],
+                operation=SFTPOperation.GET,
+                create_intermediate_dirs=True
+            )
+
+    """
+
+    template_fields = (
+        'remote_host',
+        'local_files_path',
+        'remote_files_path',
+        'local_folder',
+        'remote_folder',
+        'regexp_mask',
+    )
+
+    def __init__(
+        self,
+        *,
+        ssh_hook=None,
+        ssh_conn_id=None,
+        remote_host=None,
+        local_files_path: list = None,
+        remote_files_path: list = None,
+        local_folder: str = None,
+        remote_folder: str = None,
+        regexp_mask=None,
+        operation=SFTPOperation.PUT,
+        confirm=True,
+        create_intermediate_dirs=False,
+        force=False,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.ssh_hook = ssh_hook
+        self.ssh_conn_id = ssh_conn_id
+        self.remote_host = remote_host
+        self.local_files_path = local_files_path
+        self.remote_files_path = remote_files_path
+        self.local_folder = local_folder
+        self.remote_folder = remote_folder
+        self.regexp_mask = regexp_mask
+        self.operation = operation
+        self.confirm = confirm
+        self.create_intermediate_dirs = create_intermediate_dirs
+        self.force = force
+        if not (self.operation.lower() == SFTPOperation.GET or self.operation.lower() == SFTPOperation.PUT):
+            raise TypeError(
+                f"""Unsupported operation value {self.operation},
+                expected {SFTPOperation.GET} or {SFTPOperation.PUT}"""
+            )
+        if not (
+            (
+                self.operation == SFTPOperation.PUT
+                and (
+                    (
+                        isinstance(self.local_folder, str)
+                        and isinstance(self.remote_folder, str)
+                        and local_files_path is None
+                        and remote_files_path is None
+                    )
+                    or (
+                        isinstance(self.local_files_path, list)
+                        and isinstance(self.remote_folder, str)
+                        and self.local_folder is None
+                        and remote_files_path is None
+                        and self.regexp_mask is None
+                    )
+                )
+            )
+            or (
+                self.operation == SFTPOperation.GET
+                and (
+                    (
+                        isinstance(self.local_folder, str)
+                        and isinstance(self.remote_folder, str)
+                        and local_files_path is None
+                        and remote_files_path is None
+                    )
+                    or (
+                        isinstance(self.local_folder, str)
+                        and isinstance(self.remote_files_path, list)
+                        and self.local_files_path is None
+                        and remote_folder is None
+                        and self.regexp_mask is None
+                    )
+                )
+            )
+        ):
+            raise TypeError(

Review comment:
       done







[GitHub] [airflow] AngryHelper commented on a change in pull request #19852: created SFTPBatchOperator which add batch function

Posted by GitBox <gi...@apache.org>.
AngryHelper commented on a change in pull request #19852:
URL: https://github.com/apache/airflow/pull/19852#discussion_r767298102



##########
File path: airflow/providers/sftp/operators/sftp_batch.py
##########
@@ -0,0 +1,184 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains SFTP Batch operator."""
+import os
+import re
+from pathlib import Path
+from typing import Any, List, Union
+
+from paramiko.sftp_client import SFTPClient
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.sftp.operators.sftp import SFTPOperation, _check_conn, _make_intermediate_dirs
+
+
+class SFTPBatchOperator(BaseOperator):
+    """
+    SFTPOperator for transferring files from remote host to local or vice a versa.
+    This operator uses ssh_hook to open sftp transport channel that serve as basis
+    for file transfer.
+    :param ssh_hook: predefined ssh_hook to use for remote execution.
+        Either `ssh_hook` or `ssh_conn_id` needs to be provided.
+    :type ssh_hook: airflow.providers.ssh.hooks.ssh.SSHHook
+    :param ssh_conn_id: :ref:`ssh connection id<howto/connection:ssh>`
+        from airflow Connections. `ssh_conn_id` will be ignored if `ssh_hook`
+        is provided.
+    :type ssh_conn_id: str
+    :param remote_host: remote host to connect (templated)
+        Nullable. If provided, it will replace the `remote_host` which was
+        defined in `ssh_hook` or predefined in the connection of `ssh_conn_id`.
+    :type remote_host: str
+    :param local_path: local folder path to get or put. (templated)
+    :type local_path: str or list
+    :param remote_path: remote folder path to get or put. (templated)
+    :type remote_path: str or list
+    :param regexp_mask: regexp mask for file match in local_folder or remote_folder to get or put. (templated)
+    :type regexp_mask: str
+    :param operation: specify operation 'get' or 'put', defaults to put
+    :type operation: str
+    :param confirm: specify if the SFTP operation should be confirmed, defaults to True
+    :type confirm: bool
+    :param create_intermediate_dirs: create missing intermediate directories when
+    :type create_intermediate_dirs: bool
+        copying from remote to local and vice-versa. Default is False.
+        Example: The following task would copy ``file.txt`` to the remote host
+        at ``/tmp/tmp1/tmp2/`` while creating ``tmp``,``tmp1`` and ``tmp2`` if they
+        don't exist. If the parameter is not passed it would error as the directory
+        does not exist. ::
+            put_dir_txt_files = SFTPOperator(
+                task_id="put_dir_txt_files",
+                ssh_conn_id="ssh_default",
+                local_folder="/tmp/dir_for_remote_transfer/",
+                remote_folder="/tmp/dir_for_remote_transfer/txt",
+                regexp_mask=".*[.]txt",
+                operation=SFTPOperation.PUT,
+                create_intermediate_dirs=True
+            )
+
+    """
+
+    template_fields = (
+        'remote_host',
+        'local_path',
+        'remote_path',
+        'regexp_mask',
+    )
+
+    def __init__(
+        self,
+        *,
+        ssh_hook=None,
+        ssh_conn_id=None,
+        remote_host=None,
+        local_path: Union[str, list] = None,
+        remote_path: Union[str, list] = None,
+        regexp_mask=None,
+        operation=SFTPOperation.PUT,
+        confirm=True,
+        create_intermediate_dirs=False,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.ssh_hook = ssh_hook
+        self.ssh_conn_id = ssh_conn_id
+        self.remote_host = remote_host
+        self.local_path = local_path
+        self.remote_path = remote_path
+        self.regexp_mask = regexp_mask
+        self.operation = operation
+        self.confirm = confirm
+        self.create_intermediate_dirs = create_intermediate_dirs
+        if not (self.operation.lower() == SFTPOperation.GET or self.operation.lower() == SFTPOperation.PUT):
+            raise TypeError(
+                f"""Unsupported operation value {self.operation},
+                expected {SFTPOperation.GET} or {SFTPOperation.PUT}"""
+            )
+        if not (
+            (isinstance(self.local_path, str) and isinstance(self.remote_path, str))
+            or (isinstance(self.local_path, list) and isinstance(self.remote_path, str))
+            or (isinstance(self.remote_path, list) and isinstance(self.local_path, str))
+        ):
+            raise TypeError(
+                """Unsupported path argument value local_path and remote_path

Review comment:
       done

##########
File path: docs/apache-airflow-providers-sftp/sftp.rst
##########
@@ -0,0 +1,111 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+
+SFTPOperator
+==========================
+Use the :class:`~airflow.providers.sftp.operators.sftp.py` to
+transfer data between servers under sftp.
+
+Using the Operator
+------------------
+To start working with an operator, you need to register an SFTP \ SSH connection in Airflow Connections.
+Use ssh_conn_id to specify the name of the connection.
+
+You can use the operator for the following tasks:
+
+1. Send one file to the server with the full path
+
+.. code-block:: python
+
+    put_file = SFTPOperator(
+        task_id="put_file",
+        ssh_conn_id="ssh_default",
+        local_filepath="/tmp/transfer_file/put_file_file1.txt",
+        remote_filepath="/tmp/transfer_file/remote/put_file_file1.txt",
+        operation=SFTPOperation.PUT,
+        create_intermediate_dirs=True,
+    )
+
+
+2. Send all files from local directory to remote server
+
+.. code-block:: python
+
+    put_dir_files = SFTPBatchOperator(
+        task_id="put_dir_files",
+        ssh_conn_id="ssh_default",
+        local_path="/tmp/dir_for_remote_transfer/",
+        remote_path="/tmp/dir_for_remote_transfer/remote/",
+        operation=SFTPOperation.PUT,
+        create_intermediate_dirs=True,
+    )
+
+
+3. Send all files from local directory to remote server
+
+.. code-block:: python
+
+    put_dir_files = SFTPBatchOperator(
+        task_id="put_dir_files",
+        ssh_conn_id="ssh_default",
+        local_path="/tmp/dir_for_remote_transfer/",
+        remote_path=[
+            "/tmp/dir_for_remote_transfer/remote/txt/file1.txt",

Review comment:
       done







[GitHub] [airflow] AngryHelper commented on a change in pull request #19852: created SFTPBatchOperator which add batch function

Posted by GitBox <gi...@apache.org>.
AngryHelper commented on a change in pull request #19852:
URL: https://github.com/apache/airflow/pull/19852#discussion_r767298864



##########
File path: airflow/providers/sftp/operators/sftp_batch.py
##########
@@ -0,0 +1,184 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains SFTP Batch operator."""
+import os
+import re
+from pathlib import Path
+from typing import Any, List, Union
+
+from paramiko.sftp_client import SFTPClient
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.sftp.operators.sftp import SFTPOperation, _check_conn, _make_intermediate_dirs
+
+
+class SFTPBatchOperator(BaseOperator):
+    """
+    SFTPOperator for transferring files from remote host to local or vice a versa.
+    This operator uses ssh_hook to open sftp transport channel that serve as basis
+    for file transfer.
+    :param ssh_hook: predefined ssh_hook to use for remote execution.
+        Either `ssh_hook` or `ssh_conn_id` needs to be provided.
+    :type ssh_hook: airflow.providers.ssh.hooks.ssh.SSHHook
+    :param ssh_conn_id: :ref:`ssh connection id<howto/connection:ssh>`
+        from airflow Connections. `ssh_conn_id` will be ignored if `ssh_hook`
+        is provided.
+    :type ssh_conn_id: str
+    :param remote_host: remote host to connect (templated)
+        Nullable. If provided, it will replace the `remote_host` which was
+        defined in `ssh_hook` or predefined in the connection of `ssh_conn_id`.
+    :type remote_host: str
+    :param local_path: local folder path to get or put. (templated)
+    :type local_path: str or list
+    :param remote_path: remote folder path to get or put. (templated)
+    :type remote_path: str or list
+    :param regexp_mask: regexp mask for file match in local_folder or remote_folder to get or put. (templated)

Review comment:
       done







[GitHub] [airflow] AngryHelper commented on a change in pull request #19852: created SFTPBatchOperator which add batch function

Posted by GitBox <gi...@apache.org>.
AngryHelper commented on a change in pull request #19852:
URL: https://github.com/apache/airflow/pull/19852#discussion_r780684366



##########
File path: airflow/providers/sftp/operators/sftp_batch.py
##########
@@ -0,0 +1,283 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains SFTP Batch operator."""
+import os
+import re
+from pathlib import Path
+from typing import Any, List
+
+from paramiko.sftp_client import SFTPClient
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.sftp.operators.sftp import SFTPOperation, _check_conn, _make_intermediate_dirs
+
+
+class SFTPBatchOperator(BaseOperator):
+    """
+    SFTPOperator for transferring files from remote host to local or vice a versa.
+    This operator uses ssh_hook to open sftp transport channel that serve as basis
+    for file transfer.
+    :param ssh_hook: predefined ssh_hook to use for remote execution.
+        Either `ssh_hook` or `ssh_conn_id` needs to be provided.
+    :type ssh_hook: airflow.providers.ssh.hooks.ssh.SSHHook
+    :param ssh_conn_id: :ref:`ssh connection id<howto/connection:ssh>`
+        from airflow Connections. `ssh_conn_id` will be ignored if `ssh_hook`
+        is provided.
+    :type ssh_conn_id: str
+    :param remote_host: remote host to connect (templated)
+        Nullable. If provided, it will replace the `remote_host` which was
+        defined in `ssh_hook` or predefined in the connection of `ssh_conn_id`.
+    :type remote_host: str
+    :param local_files_path: local files path to get or put. (templated)
+    :type local_files_path: list
+    :param local_folder: local folder path to get or put. (templated)
+    :type local_folder: str
+    :param remote_folder: remote folder path to get or put. (templated)
+    :type remote_folder: str
+    :param remote_files_path: remote folder path to get or put. (templated)
+    :type remote_files_path: list
+    :param regexp_mask: regexp mask for file match in local_folder for PUT operational
+        or match filenames in remote_folder for GET operational. (templated)
+    :type regexp_mask: str
+    :param operation: specify operation 'get' or 'put', defaults to put
+    :type operation: str
+    :param confirm: specify if the SFTP operation should be confirmed, defaults to True
+    :type confirm: bool
+    :param create_intermediate_dirs: create missing intermediate directories when
+    :type create_intermediate_dirs: bool
+    :param force: if the file already exists, it will be overwritten
+    :type force: bool
+        copying from remote to local and vice-versa. Default is False.
+        Example: The following task would copy ``file.txt`` to the remote host
+        at ``/tmp/tmp1/tmp2/`` while creating ``tmp``,``tmp1`` and ``tmp2`` if they
+        don't exist. If the parameter is not passed it would error as the directory
+        does not exist. ::
+            put_dir_txt_files = SFTPOperator(
+                task_id="put_dir_txt_files",
+                ssh_conn_id="ssh_default",
+                local_folder="/tmp/dir_for_remote_transfer/",
+                remote_folder="/tmp/dir_for_remote_transfer/txt",
+                regexp_mask=".*[.]txt",
+                operation=SFTPOperation.PUT,
+                create_intermediate_dirs=True
+            )
+            put_files = SFTPOperator(
+                task_id="put_dir_txt_files",
+                ssh_conn_id="ssh_default",
+                local_files_path=["/tmp/file1.txt",],
+                remote_folder="/tmp/dir_for_remote_transfer/txt",
+                operation=SFTPOperation.PUT,
+                create_intermediate_dirs=True
+            )
+            get_dir = SFTPOperator(
+                task_id="put_dir_txt_files",
+                ssh_conn_id="ssh_default",
+                local_folder="/tmp/dir_for_remote_transfer/",
+                remote_folder="/tmp/dir_for_remote_transfer/txt",
+                operation=SFTPOperation.GET,
+                create_intermediate_dirs=True
+            )
+            get_files = SFTPOperator(
+                task_id="put_dir_txt_files",
+                ssh_conn_id="ssh_default",
+                local_folder="/tmp/",
+                remote_files_path=["/tmp/file1.txt",],
+                operation=SFTPOperation.GET,
+                create_intermediate_dirs=True
+            )
+
+    """
+
+    template_fields = (
+        'remote_host',
+        'local_files_path',
+        'remote_files_path',
+        'local_folder',
+        'remote_folder',
+        'regexp_mask',
+    )
+
+    def __init__(
+        self,
+        *,
+        ssh_hook=None,
+        ssh_conn_id=None,
+        remote_host=None,
+        local_files_path: list = None,
+        remote_files_path: list = None,
+        local_folder: str = None,
+        remote_folder: str = None,
+        regexp_mask=None,
+        operation=SFTPOperation.PUT,
+        confirm=True,
+        create_intermediate_dirs=False,
+        force=False,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.ssh_hook = ssh_hook
+        self.ssh_conn_id = ssh_conn_id
+        self.remote_host = remote_host
+        self.local_files_path = local_files_path
+        self.remote_files_path = remote_files_path
+        self.local_folder = local_folder
+        self.remote_folder = remote_folder
+        self.regexp_mask = regexp_mask
+        self.operation = operation
+        self.confirm = confirm
+        self.create_intermediate_dirs = create_intermediate_dirs
+        self.force = force
+        if not (self.operation.lower() == SFTPOperation.GET or self.operation.lower() == SFTPOperation.PUT):
+            raise TypeError(
+                f"""Unsupported operation value {self.operation},
+                expected {SFTPOperation.GET} or {SFTPOperation.PUT}"""
+            )
+        if not (
+            (
+                self.operation == SFTPOperation.PUT
+                and (
+                    (
+                        isinstance(self.local_folder, str)
+                        and isinstance(self.remote_folder, str)
+                        and local_files_path is None
+                        and remote_files_path is None
+                    )
+                    or (
+                        isinstance(self.local_files_path, list)
+                        and isinstance(self.remote_folder, str)
+                        and self.local_folder is None
+                        and remote_files_path is None
+                        and self.regexp_mask is None
+                    )
+                )
+            )
+            or (
+                self.operation == SFTPOperation.GET
+                and (
+                    (
+                        isinstance(self.local_folder, str)
+                        and isinstance(self.remote_folder, str)
+                        and local_files_path is None
+                        and remote_files_path is None
+                    )
+                    or (
+                        isinstance(self.local_folder, str)
+                        and isinstance(self.remote_files_path, list)
+                        and self.local_files_path is None
+                        and remote_folder is None
+                        and self.regexp_mask is None
+                    )
+                )
+            )
+        ):
+            raise TypeError(

Review comment:
       I also added one argument for this case.







[GitHub] [airflow] josh-fell commented on a change in pull request #19852: created SFTPBatchOperator which add batch function

Posted by GitBox <gi...@apache.org>.
josh-fell commented on a change in pull request #19852:
URL: https://github.com/apache/airflow/pull/19852#discussion_r775321471



##########
File path: airflow/providers/sftp/operators/sftp_batch.py
##########
@@ -0,0 +1,294 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains SFTP Batch operator."""
+import os
+import re
+from pathlib import Path
+from typing import Any, List
+
+from paramiko.sftp_client import SFTPClient
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.sftp.operators.sftp import SFTPOperation, _check_conn, _make_intermediate_dirs
+
+
+class SFTPBatchOperator(BaseOperator):
+    """
+    SFTPOperator for transferring files from remote host to local or vice a versa.
+    This operator uses ssh_hook to open sftp transport channel that serve as basis
+    for file transfer.
+    :param ssh_hook: predefined ssh_hook to use for remote execution.
+        Either `ssh_hook` or `ssh_conn_id` needs to be provided.
+    :type ssh_hook: airflow.providers.ssh.hooks.ssh.SSHHook
+    :param ssh_conn_id: :ref:`ssh connection id<howto/connection:ssh>`
+        from airflow Connections. `ssh_conn_id` will be ignored if `ssh_hook`
+        is provided.
+    :type ssh_conn_id: str
+    :param remote_host: remote host to connect (templated)
+        Nullable. If provided, it will replace the `remote_host` which was
+        defined in `ssh_hook` or predefined in the connection of `ssh_conn_id`.
+    :type remote_host: str
+    :param local_files_path: local files path to get or put. (templated)
+    :type local_files_path: list
+    :param local_folder: local folder path to get or put. (templated)
+    :type local_folder: str
+    :param remote_folder: remote folder path to get or put. (templated)
+    :type remote_folder: str
+    :param remote_files_path: remote folder path to get or put. (templated)
+    :type remote_files_path: list
+    :param regexp_mask: regexp mask for file match in local_folder for PUT operational
+        or match filenames in remote_folder for GET operational. (templated)
+    :type regexp_mask: str
+    :param operation: specify operation 'get' or 'put', defaults to put
+    :type operation: str
+    :param confirm: specify if the SFTP operation should be confirmed, defaults to True
+    :type confirm: bool
+    :param create_intermediate_dirs: create missing intermediate directories when
+    :type create_intermediate_dirs: bool
+    :param force: if the file already exists, it will be overwritten
+    :type force: bool
+        copying from remote to local and vice-versa. Default is False.
+    Summary, support arguments:
+        Possible options for PUT:
+            1.optional(regexp_mask:str) + local_folder:str + remote_folder:str
+            2.local_files_path:list + remote_folder:str
+        Possible options for GET:
+            1.local_folder:str + remote_folder:str + optional(regexp_mask:str)
+            2.local_folder:str + remote_files_path:list
+    Example:
+    Move all txt files
+        from local `/tmp/dir_for_local_transfer/` to remote folder `/tmp/dir_for_remote_transfer/`
+            put_dir_txt_files = SFTPOperator(
+                task_id="put_dir_txt_files",
+                ssh_conn_id="ssh_default",
+                local_folder="/tmp/dir_for_local_transfer/",
+                remote_folder="/tmp/dir_for_remote_transfer/",
+                regexp_mask=".*[.]txt",
+                operation=SFTPOperation.PUT,
+                create_intermediate_dirs=True
+            )
+    Move `/tmp/file1.txt` file
+        from local to remote folder `/tmp/dir_for_remote_transfer/`
+            put_files = SFTPOperator(
+                task_id="put_dir_txt_files",
+                ssh_conn_id="ssh_default",
+                local_files_path=["/tmp/file1.txt",],
+                remote_folder="/tmp/dir_for_remote_transfer/",
+                operation=SFTPOperation.PUT,
+                create_intermediate_dirs=True
+            )
+    Move all files
+        from remote folder `/tmp/dir_for_remote_transfer/` to local folder `/tmp/dir_for_local_transfer/`
+            get_dir = SFTPOperator(
+                task_id="put_dir_txt_files",
+                ssh_conn_id="ssh_default",
+                local_folder="/tmp/dir_for_local_transfer/",
+                remote_folder="/tmp/dir_for_remote_transfer/",
+                operation=SFTPOperation.GET,
+                create_intermediate_dirs=True
+            )
+    Move `/tmp/file1.txt` file
+        from remote to local folder `/tmp/dir_for_local_transfer/`
+            get_files = SFTPOperator(
+                task_id="put_dir_txt_files",
+                ssh_conn_id="ssh_default",
+                local_folder="/tmp/dir_for_local_transfer/",
+                remote_files_path=["/tmp/file1.txt",],
+                operation=SFTPOperation.GET,
+                create_intermediate_dirs=True
+            )
+    """
+
+    template_fields = (
+        'remote_host',
+        'local_files_path',
+        'remote_files_path',
+        'local_folder',
+        'remote_folder',
+        'regexp_mask',
+    )
+
+    def __init__(
+        self,
+        *,
+        ssh_hook=None,
+        ssh_conn_id=None,
+        remote_host=None,
+        local_files_path: list = None,
+        remote_files_path: list = None,

Review comment:
       ```suggestion
           local_files_path: Optional[List[str]] = None,
           remote_files_path: Optional[List[str]] = None,
   ```
   For consistency, it may be nice to add typing annotations to the other parameters as well.
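
A fully annotated signature might look like the sketch below. The types are taken from the `:type` lines in the docstring; the class name `BatchTransferArgs` is hypothetical, the `ssh_hook` parameter is left out so the sketch runs without Airflow installed, and the `"put"` default assumes that `SFTPOperation.PUT` is the string the docstring describes. Note that the quoted file imports only `Any` and `List` from `typing`, so `Optional` would need to be imported as well.

```python
from typing import List, Optional


class BatchTransferArgs:
    """Hypothetical stand-in that only illustrates the annotated signature."""

    def __init__(
        self,
        *,
        ssh_conn_id: Optional[str] = None,
        remote_host: Optional[str] = None,
        local_files_path: Optional[List[str]] = None,
        remote_files_path: Optional[List[str]] = None,
        local_folder: Optional[str] = None,
        remote_folder: Optional[str] = None,
        regexp_mask: Optional[str] = None,
        operation: str = "put",  # assumption: same value as SFTPOperation.PUT
        confirm: bool = True,
        create_intermediate_dirs: bool = False,
        force: bool = False,
    ) -> None:
        # Store the arguments unchanged, as the operator's __init__ does.
        self.ssh_conn_id = ssh_conn_id
        self.remote_host = remote_host
        self.local_files_path = local_files_path
        self.remote_files_path = remote_files_path
        self.local_folder = local_folder
        self.remote_folder = remote_folder
        self.regexp_mask = regexp_mask
        self.operation = operation
        self.confirm = confirm
        self.create_intermediate_dirs = create_intermediate_dirs
        self.force = force
```

With `Optional[...]` on every parameter that defaults to `None`, a type checker can flag a call that passes, for example, a single string where a list of paths is expected.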

##########
File path: airflow/providers/sftp/operators/sftp_batch.py
##########
@@ -0,0 +1,283 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains SFTP Batch operator."""
+import os
+import re
+from pathlib import Path
+from typing import Any, List
+
+from paramiko.sftp_client import SFTPClient
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.sftp.operators.sftp import SFTPOperation, _check_conn, _make_intermediate_dirs
+
+
+class SFTPBatchOperator(BaseOperator):
+    """
+    SFTPOperator for transferring files from remote host to local or vice a versa.
+    This operator uses ssh_hook to open sftp transport channel that serve as basis
+    for file transfer.
+    :param ssh_hook: predefined ssh_hook to use for remote execution.
+        Either `ssh_hook` or `ssh_conn_id` needs to be provided.
+    :type ssh_hook: airflow.providers.ssh.hooks.ssh.SSHHook
+    :param ssh_conn_id: :ref:`ssh connection id<howto/connection:ssh>`
+        from airflow Connections. `ssh_conn_id` will be ignored if `ssh_hook`
+        is provided.
+    :type ssh_conn_id: str
+    :param remote_host: remote host to connect (templated)
+        Nullable. If provided, it will replace the `remote_host` which was
+        defined in `ssh_hook` or predefined in the connection of `ssh_conn_id`.
+    :type remote_host: str
+    :param local_files_path: local files path to get or put. (templated)
+    :type local_files_path: list
+    :param local_folder: local folder path to get or put. (templated)
+    :type local_folder: str
+    :param remote_folder: remote folder path to get or put. (templated)
+    :type remote_folder: str
+    :param remote_files_path: remote folder path to get or put. (templated)
+    :type remote_files_path: list
+    :param regexp_mask: regexp mask for file match in local_folder for PUT operational
+        or match filenames in remote_folder for GET operational. (templated)
+    :type regexp_mask: str
+    :param operation: specify operation 'get' or 'put', defaults to put
+    :type operation: str
+    :param confirm: specify if the SFTP operation should be confirmed, defaults to True
+    :type confirm: bool
+    :param create_intermediate_dirs: create missing intermediate directories when
+    :type create_intermediate_dirs: bool
+    :param force: if the file already exists, it will be overwritten
+    :type force: bool
+        copying from remote to local and vice-versa. Default is False.
+        Example: The following task would copy ``file.txt`` to the remote host
+        at ``/tmp/tmp1/tmp2/`` while creating ``tmp``,``tmp1`` and ``tmp2`` if they
+        don't exist. If the parameter is not passed it would error as the directory
+        does not exist. ::
+            put_dir_txt_files = SFTPOperator(
+                task_id="put_dir_txt_files",
+                ssh_conn_id="ssh_default",
+                local_folder="/tmp/dir_for_remote_transfer/",
+                remote_folder="/tmp/dir_for_remote_transfer/txt",
+                regexp_mask=".*[.]txt",
+                operation=SFTPOperation.PUT,
+                create_intermediate_dirs=True
+            )
+            put_files = SFTPOperator(
+                task_id="put_dir_txt_files",
+                ssh_conn_id="ssh_default",
+                local_files_path=["/tmp/file1.txt",],
+                remote_folder="/tmp/dir_for_remote_transfer/txt",
+                operation=SFTPOperation.PUT,
+                create_intermediate_dirs=True
+            )
+            get_dir = SFTPOperator(
+                task_id="put_dir_txt_files",
+                ssh_conn_id="ssh_default",
+                local_folder="/tmp/dir_for_remote_transfer/",
+                remote_folder="/tmp/dir_for_remote_transfer/txt",
+                operation=SFTPOperation.GET,
+                create_intermediate_dirs=True
+            )
+            get_files = SFTPOperator(
+                task_id="put_dir_txt_files",
+                ssh_conn_id="ssh_default",
+                local_folder="/tmp/",
+                remote_files_path=["/tmp/file1.txt",],
+                operation=SFTPOperation.GET,
+                create_intermediate_dirs=True
+            )
+
+    """
+
+    template_fields = (
+        'remote_host',
+        'local_files_path',
+        'remote_files_path',
+        'local_folder',
+        'remote_folder',
+        'regexp_mask',
+    )
+
+    def __init__(
+        self,
+        *,
+        ssh_hook=None,
+        ssh_conn_id=None,
+        remote_host=None,
+        local_files_path: list = None,
+        remote_files_path: list = None,
+        local_folder: str = None,
+        remote_folder: str = None,
+        regexp_mask=None,
+        operation=SFTPOperation.PUT,
+        confirm=True,
+        create_intermediate_dirs=False,
+        force=False,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.ssh_hook = ssh_hook
+        self.ssh_conn_id = ssh_conn_id
+        self.remote_host = remote_host
+        self.local_files_path = local_files_path
+        self.remote_files_path = remote_files_path
+        self.local_folder = local_folder
+        self.remote_folder = remote_folder
+        self.regexp_mask = regexp_mask
+        self.operation = operation
+        self.confirm = confirm
+        self.create_intermediate_dirs = create_intermediate_dirs
+        self.force = force
+        if not (self.operation.lower() == SFTPOperation.GET or self.operation.lower() == SFTPOperation.PUT):
+            raise TypeError(
+                f"""Unsupported operation value {self.operation},
+                expected {SFTPOperation.GET} or {SFTPOperation.PUT}"""
+            )
+        if not (
+            (
+                self.operation == SFTPOperation.PUT
+                and (
+                    (
+                        isinstance(self.local_folder, str)
+                        and isinstance(self.remote_folder, str)
+                        and local_files_path is None
+                        and remote_files_path is None
+                    )
+                    or (
+                        isinstance(self.local_files_path, list)
+                        and isinstance(self.remote_folder, str)
+                        and self.local_folder is None
+                        and remote_files_path is None
+                        and self.regexp_mask is None
+                    )
+                )
+            )
+            or (
+                self.operation == SFTPOperation.GET
+                and (
+                    (
+                        isinstance(self.local_folder, str)
+                        and isinstance(self.remote_folder, str)
+                        and local_files_path is None
+                        and remote_files_path is None
+                    )
+                    or (
+                        isinstance(self.local_folder, str)
+                        and isinstance(self.remote_files_path, list)
+                        and self.local_files_path is None
+                        and remote_folder is None
+                        and self.regexp_mask is None
+                    )
+                )
+            )
+        ):
+            raise TypeError(

Review comment:
       I think we should be very careful about adding type validations to template fields (IMO these validations should move under the `execute()` scope). There might be unexpected behavior if users pass in `XComArgs` which are not rendered/evaluated until `execute()`. The `XComArgs` may indeed render to lists or strings but their types will be evaluated as `XComArg` within `__init__()`.
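
One way to read this suggestion is sketched below: `__init__` only stores the arguments, and the type checks run at the start of `execute()`, after templated fields have been rendered. This is a minimal illustration, not the PR's code. The class `DeferredValidationSketch` and the method `_validate_rendered_args` are hypothetical names, the `PUT`/`GET` constants are assumed to mirror `SFTPOperation`, and the `regexp_mask` checks are left out for brevity.

```python
from typing import Any, Dict

PUT, GET = "put", "get"  # assumption: mirrors SFTPOperation.PUT / SFTPOperation.GET


class DeferredValidationSketch:
    """Hypothetical stand-in for the operator; it performs no transfer."""

    def __init__(
        self,
        *,
        operation: str = PUT,
        local_folder: Any = None,
        remote_folder: Any = None,
        local_files_path: Any = None,
        remote_files_path: Any = None,
    ) -> None:
        # Only store values here: a templated field may still hold an
        # unrendered placeholder at this point, so no isinstance() checks.
        self.operation = operation
        self.local_folder = local_folder
        self.remote_folder = remote_folder
        self.local_files_path = local_files_path
        self.remote_files_path = remote_files_path

    def _validate_rendered_args(self) -> None:
        operation = self.operation.lower()
        if operation not in (PUT, GET):
            raise TypeError(
                f"Unsupported operation value {self.operation}, expected {GET} or {PUT}"
            )
        # Folder-to-folder mode is shared by PUT and GET.
        folder_mode = (
            isinstance(self.local_folder, str)
            and isinstance(self.remote_folder, str)
            and self.local_files_path is None
            and self.remote_files_path is None
        )
        if operation == PUT:
            files_mode = (
                isinstance(self.local_files_path, list)
                and isinstance(self.remote_folder, str)
                and self.local_folder is None
                and self.remote_files_path is None
            )
        else:
            files_mode = (
                isinstance(self.local_folder, str)
                and isinstance(self.remote_files_path, list)
                and self.local_files_path is None
                and self.remote_folder is None
            )
        if not (folder_mode or files_mode):
            raise TypeError("Unsupported argument combination")

    def execute(self, context: Dict[str, Any]) -> None:
        # Validate the rendered values before any connection is opened.
        self._validate_rendered_args()
```

With this layout, constructing the object with a placeholder in `local_files_path` does not raise; an invalid combination is reported only when `execute()` runs.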

##########
File path: airflow/providers/sftp/operators/sftp_batch.py
##########
@@ -0,0 +1,294 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains SFTP Batch operator."""
+import os
+import re
+from pathlib import Path
+from typing import Any, List
+
+from paramiko.sftp_client import SFTPClient
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.sftp.operators.sftp import SFTPOperation, _check_conn, _make_intermediate_dirs
+
+
+class SFTPBatchOperator(BaseOperator):
+    """
+    SFTPOperator for transferring files from remote host to local or vice a versa.
+    This operator uses ssh_hook to open sftp transport channel that serve as basis
+    for file transfer.
+    :param ssh_hook: predefined ssh_hook to use for remote execution.
+        Either `ssh_hook` or `ssh_conn_id` needs to be provided.
+    :type ssh_hook: airflow.providers.ssh.hooks.ssh.SSHHook
+    :param ssh_conn_id: :ref:`ssh connection id<howto/connection:ssh>`
+        from airflow Connections. `ssh_conn_id` will be ignored if `ssh_hook`
+        is provided.
+    :type ssh_conn_id: str
+    :param remote_host: remote host to connect (templated)
+        Nullable. If provided, it will replace the `remote_host` which was
+        defined in `ssh_hook` or predefined in the connection of `ssh_conn_id`.
+    :type remote_host: str
+    :param local_files_path: local files path to get or put. (templated)
+    :type local_files_path: list
+    :param local_folder: local folder path to get or put. (templated)
+    :type local_folder: str
+    :param remote_folder: remote folder path to get or put. (templated)
+    :type remote_folder: str
+    :param remote_files_path: remote folder path to get or put. (templated)
+    :type remote_files_path: list
+    :param regexp_mask: regexp mask for file match in local_folder for PUT operational
+        or match filenames in remote_folder for GET operational. (templated)
+    :type regexp_mask: str
+    :param operation: specify operation 'get' or 'put', defaults to put
+    :type operation: str
+    :param confirm: specify if the SFTP operation should be confirmed, defaults to True
+    :type confirm: bool
+    :param create_intermediate_dirs: create missing intermediate directories when
+    :type create_intermediate_dirs: bool
+    :param force: if the file already exists, it will be overwritten
+    :type force: bool
+        copying from remote to local and vice-versa. Default is False.
+    Summary, support arguments:
+        Possible options for PUT:
+            1.optional(regexp_mask:str) + local_folder:str + remote_folder:str
+            2.local_files_path:list + remote_folder:str
+        Possible options for GET:
+            1.local_folder:str + remote_folder:str + optional(regexp_mask:str)
+            2.local_folder:str + remote_files_path:list
+    Example:
+    Move all txt files
+        from local `/tmp/dir_for_local_transfer/` to remote folder `/tmp/dir_for_remote_transfer/`
+            put_dir_txt_files = SFTPOperator(
+                task_id="put_dir_txt_files",
+                ssh_conn_id="ssh_default",
+                local_folder="/tmp/dir_for_local_transfer/",
+                remote_folder="/tmp/dir_for_remote_transfer/",
+                regexp_mask=".*[.]txt",
+                operation=SFTPOperation.PUT,
+                create_intermediate_dirs=True
+            )
+    Move `/tmp/file1.txt` file
+        from local to remote folder `/tmp/dir_for_remote_transfer/`
+            put_files = SFTPOperator(
+                task_id="put_dir_txt_files",
+                ssh_conn_id="ssh_default",
+                local_files_path=["/tmp/file1.txt",],
+                remote_folder="/tmp/dir_for_remote_transfer/",
+                operation=SFTPOperation.PUT,
+                create_intermediate_dirs=True
+            )
+    Move all files
+        from remote folder `/tmp/dir_for_remote_transfer/` to local folder `/tmp/dir_for_local_transfer/`
+            get_dir = SFTPOperator(
+                task_id="put_dir_txt_files",
+                ssh_conn_id="ssh_default",
+                local_folder="/tmp/dir_for_local_transfer/",
+                remote_folder="/tmp/dir_for_remote_transfer/",
+                operation=SFTPOperation.GET,
+                create_intermediate_dirs=True
+            )
+    Move `/tmp/file1.txt` file
+        from remote to local folder `/tmp/dir_for_local_transfer/`
+            get_files = SFTPOperator(
+                task_id="put_dir_txt_files",
+                ssh_conn_id="ssh_default",
+                local_folder="/tmp/dir_for_local_transfer/",
+                remote_files_path=["/tmp/file1.txt",],
+                operation=SFTPOperation.GET,
+                create_intermediate_dirs=True
+            )
+    """
+
+    template_fields = (
+        'remote_host',
+        'local_files_path',
+        'remote_files_path',
+        'local_folder',
+        'remote_folder',
+        'regexp_mask',
+    )
+
+    def __init__(
+        self,
+        *,
+        ssh_hook=None,
+        ssh_conn_id=None,
+        remote_host=None,
+        local_files_path: list = None,
+        remote_files_path: list = None,
+        local_folder: str = None,
+        remote_folder: str = None,
+        regexp_mask=None,
+        operation=SFTPOperation.PUT,
+        confirm=True,
+        create_intermediate_dirs=False,
+        force=False,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.ssh_hook = ssh_hook
+        self.ssh_conn_id = ssh_conn_id
+        self.remote_host = remote_host
+        self.local_files_path = local_files_path
+        self.remote_files_path = remote_files_path
+        self.local_folder = local_folder
+        self.remote_folder = remote_folder
+        self.regexp_mask = regexp_mask
+        self.operation = operation
+        self.confirm = confirm
+        self.create_intermediate_dirs = create_intermediate_dirs
+        self.force = force
+        if not (self.operation.lower() == SFTPOperation.GET or self.operation.lower() == SFTPOperation.PUT):
+            raise TypeError(
+                f"""Unsupported operation value {self.operation},
+                expected {SFTPOperation.GET} or {SFTPOperation.PUT}"""
+            )
+        if not (
+            (
+                self.operation == SFTPOperation.PUT
+                and (
+                    (
+                        isinstance(self.local_folder, str)
+                        and isinstance(self.remote_folder, str)
+                        and local_files_path is None
+                        and remote_files_path is None
+                    )
+                    or (
+                        isinstance(self.local_files_path, list)
+                        and isinstance(self.remote_folder, str)
+                        and self.local_folder is None
+                        and remote_files_path is None
+                        and self.regexp_mask is None
+                    )
+                )
+            )
+            or (
+                self.operation == SFTPOperation.GET
+                and (
+                    (
+                        isinstance(self.local_folder, str)
+                        and isinstance(self.remote_folder, str)
+                        and local_files_path is None
+                        and remote_files_path is None
+                    )
+                    or (
+                        isinstance(self.local_folder, str)
+                        and isinstance(self.remote_files_path, list)
+                        and self.local_files_path is None
+                        and remote_folder is None
+                        and self.regexp_mask is None
+                    )
+                )
+            )
+        ):
+            raise TypeError(
+                """
+                Unsupported argument pool,
+                Possible options for PUT:
+                    1.optional(regexp_mask:str) + local_folder:str + remote_folder:str
+                    2.local_files_path:list + remote_folder:str
+                Possible options for GET:
+                    1.local_folder:str + remote_folder:str + optional(regexp_mask:str)
+                    2.local_folder:str + remote_files_path:list
+                """
+            )
+
+    def execute(self, context: Any) -> str:
+        dump_file_name_for_log = None
+        try:
+            _check_conn(self)
+
+            with self.ssh_hook.get_conn() as ssh_client:
+                sftp_client = ssh_client.open_sftp()
+                if self.operation.lower() == SFTPOperation.PUT:
+                    if self.local_folder and self.remote_folder:
+                        files_list = self._search_files(os.listdir(self.local_folder))
+                        for file in files_list:
+                            local_file = os.path.basename(file)
+                            dump_file_name_for_log = file
+                            self._check_remote_file(f"{self.remote_folder}/{local_file}", sftp_client)
+                            self._transfer(sftp_client, self.local_folder, local_file, self.remote_folder)
+                    if self.local_files_path and self.remote_folder:
+                        for file in self.local_files_path:
+                            local_file = os.path.basename(file)
+                            dump_file_name_for_log = file
+                            self._check_remote_file(f"{self.remote_folder}/{local_file}", sftp_client)
+                            self._transfer(sftp_client, os.path.dirname(file), local_file, self.remote_folder)
+                elif self.operation.lower() == SFTPOperation.GET:
+                    if self.remote_folder and self.local_folder:
+                        files_list = self._search_files(sftp_client.listdir(self.remote_folder))
+                        for file in files_list:
+                            remote_file = os.path.basename(file)
+                            dump_file_name_for_log = file
+                            self._check_local_file(f"{self.local_folder}/{remote_file}")
+                            self._transfer(sftp_client, self.local_folder, remote_file, self.remote_folder)
+                    if self.remote_files_path and self.local_folder:
+                        for file in self.remote_files_path:
+                            remote_file = os.path.basename(file)
+                            dump_file_name_for_log = file
+                            self._check_local_file(f"{self.local_folder}/{remote_file}")
+                            self._transfer(sftp_client, self.local_folder, remote_file, os.path.dirname(file))
+
+        except Exception as e:
+            raise AirflowException(f"Error while transferring {dump_file_name_for_log}, error: {str(e)}")
+
+        return self.local_folder

Review comment:
       Should this return `self.local_folder` and/or `self.remote_folder` depending on what is provided?
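
One possible answer, sketched under assumptions: return the folder that received the files, which is `remote_folder` for PUT and `local_folder` for GET. In the argument combinations listed in the docstring, that folder is always set. The helper name `transfer_destination` is hypothetical, and the `PUT`/`GET` constants are assumed to mirror `SFTPOperation`.

```python
from typing import Optional

PUT, GET = "put", "get"  # assumption: mirrors SFTPOperation.PUT / SFTPOperation.GET


def transfer_destination(
    operation: str,
    local_folder: Optional[str],
    remote_folder: Optional[str],
) -> Optional[str]:
    """Return the folder the files were written to."""
    if operation.lower() == PUT:
        # PUT writes to the remote side.
        return remote_folder
    # GET writes to the local side.
    return local_folder
```

Under this reading, `execute()` would end with `return transfer_destination(self.operation, self.local_folder, self.remote_folder)` instead of always returning `self.local_folder`, which is `None` when PUT is called with `local_files_path`.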

##########
File path: airflow/providers/sftp/operators/sftp_batch.py
##########
@@ -0,0 +1,294 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains SFTP Batch operator."""
+import os
+import re
+from pathlib import Path
+from typing import Any, List
+
+from paramiko.sftp_client import SFTPClient
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.sftp.operators.sftp import SFTPOperation, _check_conn, _make_intermediate_dirs
+
+
+class SFTPBatchOperator(BaseOperator):
+    """
+    SFTPOperator for transferring files from remote host to local or vice a versa.
+    This operator uses ssh_hook to open sftp transport channel that serve as basis
+    for file transfer.
+    :param ssh_hook: predefined ssh_hook to use for remote execution.
+        Either `ssh_hook` or `ssh_conn_id` needs to be provided.
+    :type ssh_hook: airflow.providers.ssh.hooks.ssh.SSHHook
+    :param ssh_conn_id: :ref:`ssh connection id<howto/connection:ssh>`
+        from airflow Connections. `ssh_conn_id` will be ignored if `ssh_hook`
+        is provided.
+    :type ssh_conn_id: str
+    :param remote_host: remote host to connect (templated)
+        Nullable. If provided, it will replace the `remote_host` which was
+        defined in `ssh_hook` or predefined in the connection of `ssh_conn_id`.
+    :type remote_host: str
+    :param local_files_path: local files path to get or put. (templated)
+    :type local_files_path: list
+    :param local_folder: local folder path to get or put. (templated)
+    :type local_folder: str
+    :param remote_folder: remote folder path to get or put. (templated)
+    :type remote_folder: str
+    :param remote_files_path: remote folder path to get or put. (templated)
+    :type remote_files_path: list
+    :param regexp_mask: regexp mask for file match in local_folder for PUT operational
+        or match filenames in remote_folder for GET operational. (templated)
+    :type regexp_mask: str
+    :param operation: specify operation 'get' or 'put', defaults to put
+    :type operation: str
+    :param confirm: specify if the SFTP operation should be confirmed, defaults to True
+    :type confirm: bool
+    :param create_intermediate_dirs: create missing intermediate directories when
+    :type create_intermediate_dirs: bool
+    :param force: if the file already exists, it will be overwritten
+    :type force: bool
+        copying from remote to local and vice-versa. Default is False.
+    Summary, support arguments:
+        Possible options for PUT:
+            1.optional(regexp_mask:str) + local_folder:str + remote_folder:str
+            2.local_files_path:list + remote_folder:str
+        Possible options for GET:
+            1.local_folder:str + remote_folder:str + optional(regexp_mask:str)
+            2.local_folder:str + remote_files_path:list
+    Example:
+    Move all txt files
+        from local `/tmp/dir_for_local_transfer/` to remote folder `/tmp/dir_for_remote_transfer/`
+            put_dir_txt_files = SFTPOperator(
+                task_id="put_dir_txt_files",
+                ssh_conn_id="ssh_default",
+                local_folder="/tmp/dir_for_local_transfer/",
+                remote_folder="/tmp/dir_for_remote_transfer/",
+                regexp_mask=".*[.]txt",
+                operation=SFTPOperation.PUT,
+                create_intermediate_dirs=True
+            )
+    Move `/tmp/file1.txt` file
+        from local to remote folder `/tmp/dir_for_remote_transfer/`
+            put_files = SFTPOperator(
+                task_id="put_dir_txt_files",
+                ssh_conn_id="ssh_default",
+                local_files_path=["/tmp/file1.txt",],
+                remote_folder="/tmp/dir_for_remote_transfer/",
+                operation=SFTPOperation.PUT,
+                create_intermediate_dirs=True
+            )
+    Move all files
+        from remote folder `/tmp/dir_for_remote_transfer/` to local folder `/tmp/dir_for_local_transfer/`
+            get_dir = SFTPOperator(
+                task_id="put_dir_txt_files",
+                ssh_conn_id="ssh_default",
+                local_folder="/tmp/dir_for_local_transfer/",
+                remote_folder="/tmp/dir_for_remote_transfer/",
+                operation=SFTPOperation.GET,
+                create_intermediate_dirs=True
+            )
+    Move `/tmp/file1.txt` file
+        from remote to local folder `/tmp/dir_for_local_transfer/`
+            get_files = SFTPOperator(
+                task_id="put_dir_txt_files",
+                ssh_conn_id="ssh_default",
+                local_folder="/tmp/dir_for_local_transfer/",
+                remote_files_path=["/tmp/file1.txt",],
+                operation=SFTPOperation.GET,
+                create_intermediate_dirs=True
+            )
+    """
+
+    template_fields = (
+        'remote_host',
+        'local_files_path',
+        'remote_files_path',
+        'local_folder',
+        'remote_folder',
+        'regexp_mask',
+    )
+
+    def __init__(
+        self,
+        *,
+        ssh_hook=None,
+        ssh_conn_id=None,
+        remote_host=None,
+        local_files_path: list = None,
+        remote_files_path: list = None,
+        local_folder: str = None,
+        remote_folder: str = None,
+        regexp_mask=None,
+        operation=SFTPOperation.PUT,
+        confirm=True,
+        create_intermediate_dirs=False,
+        force=False,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.ssh_hook = ssh_hook
+        self.ssh_conn_id = ssh_conn_id
+        self.remote_host = remote_host
+        self.local_files_path = local_files_path
+        self.remote_files_path = remote_files_path
+        self.local_folder = local_folder
+        self.remote_folder = remote_folder
+        self.regexp_mask = regexp_mask
+        self.operation = operation
+        self.confirm = confirm
+        self.create_intermediate_dirs = create_intermediate_dirs
+        self.force = force
+        if not (self.operation.lower() == SFTPOperation.GET or self.operation.lower() == SFTPOperation.PUT):
+            raise TypeError(
+                f"""Unsupported operation value {self.operation},
+                expected {SFTPOperation.GET} or {SFTPOperation.PUT}"""
+            )
+        if not (
+            (
+                self.operation == SFTPOperation.PUT
+                and (
+                    (
+                        isinstance(self.local_folder, str)
+                        and isinstance(self.remote_folder, str)
+                        and local_files_path is None
+                        and remote_files_path is None
+                    )
+                    or (
+                        isinstance(self.local_files_path, list)
+                        and isinstance(self.remote_folder, str)
+                        and self.local_folder is None
+                        and remote_files_path is None
+                        and self.regexp_mask is None
+                    )
+                )
+            )
+            or (
+                self.operation == SFTPOperation.GET
+                and (
+                    (
+                        isinstance(self.local_folder, str)
+                        and isinstance(self.remote_folder, str)
+                        and local_files_path is None
+                        and remote_files_path is None
+                    )
+                    or (
+                        isinstance(self.local_folder, str)
+                        and isinstance(self.remote_files_path, list)
+                        and self.local_files_path is None
+                        and remote_folder is None
+                        and self.regexp_mask is None
+                    )
+                )
+            )
+        ):
+            raise TypeError(
+                """
+                Unsupported argument pool,
+                Possible options for PUT:
+                    1.optional(regexp_mask:str) + local_folder:str + remote_folder:str
+                    2.local_files_path:list + remote_folder:str
+                Possible options for GET:
+                    1.local_folder:str + remote_folder:str + optional(regexp_mask:str)
+                    2.local_folder:str + remote_files_path:list
+                """
+            )
+
+    def execute(self, context: Any) -> str:

Review comment:
       Now that 2.2.3 has been released, the typing of `context` can be:
   
   ```python
   from typing import TYPE_CHECKING
   
   if TYPE_CHECKING:
       from airflow.utils.context import Context
   
   ...
   def execute(self, context: "Context") -> str:
   ...
   ```

##########
File path: airflow/providers/sftp/operators/sftp_batch.py
##########
@@ -0,0 +1,294 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains SFTP Batch operator."""
+import os
+import re
+from pathlib import Path
+from typing import Any, List
+
+from paramiko.sftp_client import SFTPClient
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.sftp.operators.sftp import SFTPOperation, _check_conn, _make_intermediate_dirs

Review comment:
       WDYT about moving `SFTPOperation`, `_check_conn` and `_make_intermediate_dirs` into a centralized location in the provider, since these are now used across multiple operators?
   
   Perhaps `_check_conn` and `_make_intermediate_dirs` can live in a `providers/sftp/utils/` directory? A pattern in multiple providers for classes like `SFTPOperation` is to store them in the corresponding hook.
   
   

##########
File path: airflow/providers/sftp/operators/sftp_batch.py
##########
@@ -0,0 +1,294 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains SFTP Batch operator."""
+import os
+import re
+from pathlib import Path
+from typing import Any, List
+
+from paramiko.sftp_client import SFTPClient
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.sftp.operators.sftp import SFTPOperation, _check_conn, _make_intermediate_dirs

Review comment:
       Actually, maybe the move is to have `SFTPOperator` and `SFTPBatchOperator` both live in a single `providers/sftp/operators/sftp.py` file.

##########
File path: docs/apache-airflow-providers-sftp/sftp.rst
##########
@@ -0,0 +1,147 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+
+SFTPOperator
+==========================
+Use the :class:`~airflow.providers.sftp.operators.sftp.py` to
+transfer data between servers under sftp.
+
+Using the Operator
+------------------
+To start working with an operator, you need to register an SFTP \ SSH connection in Airflow Connections.
+Use ssh_conn_id to specify the name of the connection.

Review comment:
       Is this doc meant to describe both `SFTPOperator` and `SFTPBatchOperator` or just one of the operators? This section reads like the doc is about `SFTPOperator` only, but there are code snippets for `SFTPBatchOperator` as well further on.

##########
File path: airflow/providers/sftp/operators/sftp_batch.py
##########
@@ -0,0 +1,294 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains SFTP Batch operator."""
+import os
+import re
+from pathlib import Path
+from typing import Any, List
+
+from paramiko.sftp_client import SFTPClient
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.sftp.operators.sftp import SFTPOperation, _check_conn, _make_intermediate_dirs
+
+
+class SFTPBatchOperator(BaseOperator):
+    """
+    SFTPOperator for transferring files from remote host to local or vice a versa.

Review comment:
       ```suggestion
       SFTPBatchOperator for transferring files from remote host to local or vice versa.
   ```
   Also, it might be nice for users to have more details about how this operator differs from `SFTPOperator` when reading the API documentation (which is generated from the docstring).







[GitHub] [airflow] github-actions[bot] closed pull request #19852: created SFTPBatchOperator which add batch function

Posted by GitBox <gi...@apache.org>.
github-actions[bot] closed pull request #19852:
URL: https://github.com/apache/airflow/pull/19852


   





[GitHub] [airflow] josh-fell commented on a change in pull request #19852: created SFTPBatchOperator which add batch function

Posted by GitBox <gi...@apache.org>.
josh-fell commented on a change in pull request #19852:
URL: https://github.com/apache/airflow/pull/19852#discussion_r783132963



##########
File path: airflow/providers/sftp/operators/sftp_batch.py
##########
@@ -0,0 +1,312 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains SFTP Batch operator."""
+import ast
+import os
+import re
+from pathlib import Path
+from typing import List, Optional
+
+from paramiko.sftp_client import SFTPClient
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.sftp.operators.sftp import SFTPOperation
+from airflow.providers.sftp.utils.common import check_conn, make_intermediate_dirs
+from airflow.utils.context import Context
+
+
+class SFTPBatchOperator(BaseOperator):
+    """
+    SFTPBatchOperator for transferring files from remote host to local or vice a versa.
+    This operator uses ssh_hook to open sftp transport channel that serve as basis
+    for file transfer.
+    :param ssh_hook: predefined ssh_hook to use for remote execution.
+        Either `ssh_hook` or `ssh_conn_id` needs to be provided.
+    :type ssh_hook: airflow.providers.ssh.hooks.ssh.SSHHook
+    :param ssh_conn_id: :ref:`ssh connection id<howto/connection:ssh>`
+        from airflow Connections. `ssh_conn_id` will be ignored if `ssh_hook`
+        is provided.
+    :type ssh_conn_id: str
+    :param remote_host: remote host to connect (templated)
+        Nullable. If provided, it will replace the `remote_host` which was
+        defined in `ssh_hook` or predefined in the connection of `ssh_conn_id`.
+    :type remote_host: str
+    :param local_files_path: local files path to get or put. (templated)
+    :type local_files_path: list
+    :param local_folder: local folder path to get or put. (templated)
+    :type local_folder: str
+    :param remote_folder: remote folder path to get or put. (templated)
+    :type remote_folder: str
+    :param remote_files_path: remote folder path to get or put. (templated)
+    :type remote_files_path: list
+    :param regexp_mask: regexp mask for file match in local_folder for PUT operational
+        or match filenames in remote_folder for GET operational. (templated)
+    :type regexp_mask: str
+    :param operation: specify operation 'get' or 'put', defaults to put
+    :type operation: str
+    :param confirm: specify if the SFTP operation should be confirmed, defaults to True
+    :type confirm: bool
+    :param create_intermediate_dirs: create missing intermediate directories when
+    :type create_intermediate_dirs: bool
+    :param force: if the file already exists, it will be overwritten
+    :type force: bool
+        copying from remote to local and vice-versa. Default is False.
+    :param list_as_str: if you uses xcom or string in remote_files_path or local_files_path
+        for example your xcom return "['/tmp/tmp1/batch/file1.txt']"
+        then turn use_xcom_args = True and your will be xcom transformed to list type
+    :type list_as_str: bool
+        copying from remote to local and vice-versa. Default is False.
+    Summary, support arguments:
+        Possible options for PUT:
+            1.optional(regexp_mask:str) + local_folder:str + remote_folder:str
+            2.local_files_path:list + remote_folder:str
+        Possible options for GET:
+            1.local_folder:str + remote_folder:str + optional(regexp_mask:str)
+            2.local_folder:str + remote_files_path:list
+    Example:
+    Move all txt files
+        from local `/tmp/dir_for_local_transfer/` to remote folder `/tmp/dir_for_remote_transfer/`
+            put_dir_txt_files = SFTPOperator(
+                task_id="put_dir_txt_files",
+                ssh_conn_id="ssh_default",
+                local_folder="/tmp/dir_for_local_transfer/",
+                remote_folder="/tmp/dir_for_remote_transfer/",
+                regexp_mask=".*[.]txt",
+                operation=SFTPOperation.PUT,
+                create_intermediate_dirs=True
+            )
+    Move `/tmp/file1.txt` file
+        from local to remote folder `/tmp/dir_for_remote_transfer/`
+            put_files = SFTPOperator(
+                task_id="put_dir_txt_files",
+                ssh_conn_id="ssh_default",
+                local_files_path=["/tmp/file1.txt",],
+                remote_folder="/tmp/dir_for_remote_transfer/",
+                operation=SFTPOperation.PUT,
+                create_intermediate_dirs=True
+            )
+    Move all files
+        from remote folder `/tmp/dir_for_remote_transfer/` to local folder `/tmp/dir_for_local_transfer/`
+            get_dir = SFTPOperator(
+                task_id="put_dir_txt_files",
+                ssh_conn_id="ssh_default",
+                local_folder="/tmp/dir_for_local_transfer/",
+                remote_folder="/tmp/dir_for_remote_transfer/",
+                operation=SFTPOperation.GET,
+                create_intermediate_dirs=True
+            )
+    Move `/tmp/file1.txt` file
+        from remote to local folder `/tmp/dir_for_local_transfer/`
+            get_files = SFTPOperator(
+                task_id="put_dir_txt_files",
+                ssh_conn_id="ssh_default",
+                local_folder="/tmp/dir_for_local_transfer/",
+                remote_files_path=["/tmp/file1.txt",],
+                operation=SFTPOperation.GET,
+                create_intermediate_dirs=True
+            )
+    """
+
+    template_fields = (
+        'remote_host',
+        'local_files_path',
+        'remote_files_path',
+        'local_folder',
+        'remote_folder',
+        'regexp_mask',
+    )
+
+    def __init__(
+        self,
+        *,
+        ssh_hook=None,
+        ssh_conn_id=None,
+        remote_host=None,
+        local_files_path: Optional[List[str]] = None,
+        remote_files_path: Optional[List[str]] = None,
+        local_folder: Optional[str] = None,
+        remote_folder: Optional[str] = None,
+        regexp_mask=None,
+        operation=SFTPOperation.PUT,
+        confirm=True,
+        create_intermediate_dirs=False,
+        force=False,
+        list_as_str=False,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.ssh_hook = ssh_hook
+        self.ssh_conn_id = ssh_conn_id
+        self.remote_host = remote_host
+        self.local_files_path = local_files_path
+        self.remote_files_path = remote_files_path
+        self.local_folder = local_folder
+        self.remote_folder = remote_folder
+        self.regexp_mask = regexp_mask
+        self.operation = operation

Review comment:
       ```suggestion
           self.operation = operation.lower()
   ```
   
   The validations currently mix `self.operation.lower()` and `self.operation`. When the value is not lowercase, some of the validations won't work as expected. If you lowercase the value once when assigning it, you don't need to repeat the call throughout.
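
   A minimal sketch of that suggestion, assuming a simplified stand-in class: `BatchTransferArgs` is hypothetical, and `SFTPOperation` is redefined locally (with the 'get' and 'put' values from the docstring) so the snippet runs without Airflow. The value is lowercased once on assignment, and later checks compare against it directly.

```python
class SFTPOperation:
    """Local stand-in for the provider's operation constants (assumed values)."""

    PUT = "put"
    GET = "get"


class BatchTransferArgs:
    """Hypothetical, simplified holder for the operator's `operation` argument."""

    def __init__(self, operation: str = SFTPOperation.PUT) -> None:
        # Normalize once so every later comparison sees the same value.
        self.operation = operation.lower()
        if self.operation not in (SFTPOperation.GET, SFTPOperation.PUT):
            raise TypeError(
                f"Unsupported operation value {operation}, "
                f"expected {SFTPOperation.GET} or {SFTPOperation.PUT}"
            )

    def is_put(self) -> bool:
        # No repeated .lower() call is needed here.
        return self.operation == SFTPOperation.PUT
```

   With this shape, `BatchTransferArgs("PUT")` and `BatchTransferArgs("put")` behave identically in every later check.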

##########
File path: airflow/providers/sftp/operators/sftp_batch.py
##########
@@ -0,0 +1,312 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains SFTP Batch operator."""
+import ast
+import os
+import re
+from pathlib import Path
+from typing import List, Optional
+
+from paramiko.sftp_client import SFTPClient
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.sftp.operators.sftp import SFTPOperation
+from airflow.providers.sftp.utils.common import check_conn, make_intermediate_dirs
+from airflow.utils.context import Context
+
+
+class SFTPBatchOperator(BaseOperator):
+    """
+    SFTPBatchOperator for transferring files from remote host to local or vice a versa.
+    This operator uses ssh_hook to open sftp transport channel that serve as basis
+    for file transfer.
+    :param ssh_hook: predefined ssh_hook to use for remote execution.
+        Either `ssh_hook` or `ssh_conn_id` needs to be provided.
+    :type ssh_hook: airflow.providers.ssh.hooks.ssh.SSHHook
+    :param ssh_conn_id: :ref:`ssh connection id<howto/connection:ssh>`
+        from airflow Connections. `ssh_conn_id` will be ignored if `ssh_hook`
+        is provided.
+    :type ssh_conn_id: str
+    :param remote_host: remote host to connect (templated)
+        Nullable. If provided, it will replace the `remote_host` which was
+        defined in `ssh_hook` or predefined in the connection of `ssh_conn_id`.
+    :type remote_host: str
+    :param local_files_path: local files path to get or put. (templated)
+    :type local_files_path: list
+    :param local_folder: local folder path to get or put. (templated)
+    :type local_folder: str
+    :param remote_folder: remote folder path to get or put. (templated)
+    :type remote_folder: str
+    :param remote_files_path: remote folder path to get or put. (templated)
+    :type remote_files_path: list
+    :param regexp_mask: regexp mask for file match in local_folder for PUT operational
+        or match filenames in remote_folder for GET operational. (templated)
+    :type regexp_mask: str
+    :param operation: specify operation 'get' or 'put', defaults to put
+    :type operation: str
+    :param confirm: specify if the SFTP operation should be confirmed, defaults to True
+    :type confirm: bool
+    :param create_intermediate_dirs: create missing intermediate directories when
+    :type create_intermediate_dirs: bool
+    :param force: if the file already exists, it will be overwritten
+    :type force: bool
+        copying from remote to local and vice-versa. Default is False.
+    :param list_as_str: if you uses xcom or string in remote_files_path or local_files_path
+        for example your xcom return "['/tmp/tmp1/batch/file1.txt']"
+        then turn use_xcom_args = True and your will be xcom transformed to list type
+    :type list_as_str: bool
+        copying from remote to local and vice-versa. Default is False.

Review comment:
       There exists `render_template_as_native_obj` ([reference](https://airflow.apache.org/docs/apache-airflow/stable/concepts/operators.html#rendering-fields-as-native-python-objects)) that can be enabled at the DAG level which will turn templated values into native Python types rather than strings. I don't think you need to handle this conversion in the operator.
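
   To illustrate the difference, here is a small sketch that uses Jinja directly rather than a full DAG. It assumes Airflow's native rendering behaves like Jinja's `NativeEnvironment`, which, as far as I know, is what `render_template_as_native_obj=True` builds on. The template string and the `files` variable are made up for the example.

```python
from jinja2 import Environment
from jinja2.nativetypes import NativeEnvironment

# Hypothetical value that an upstream task might have pushed to XCom.
files = ["/tmp/tmp1/batch/file1.txt"]

# Default rendering: the result is always a string.
as_text = Environment().from_string("{{ files }}").render(files=files)

# Native rendering: the result keeps its Python type.
as_native = NativeEnvironment().from_string("{{ files }}").render(files=files)

print(type(as_text).__name__, type(as_native).__name__)
```

   In a DAG this would correspond to passing `render_template_as_native_obj=True` to the `DAG` constructor, so a templated `remote_files_path` should arrive in the operator as a list without any extra parsing.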

##########
File path: docs/apache-airflow-providers-sftp/sftp.rst
##########
@@ -0,0 +1,147 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+
+SFTPOperator
+==========================
+Use the :class:`~airflow.providers.sftp.operators.sftp.py` to
+transfer data between servers under sftp.
+
+Using the Operator
+------------------
+To start working with an operator, you need to register an SFTP \ SSH connection in Airflow Connections.
+Use ssh_conn_id to specify the name of the connection.

Review comment:
       I don't necessarily think you need to split the documentation, especially since the operators are quite similar. This comment was really about making sure the documentation stated that it was about both operators rather than just `SFTPOperator`. It's very common to have one document for a grouping of operators, and IMO this is a great candidate.

##########
File path: airflow/providers/sftp/operators/sftp_batch.py
##########
@@ -0,0 +1,312 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains SFTP Batch operator."""
+import ast
+import os
+import re
+from pathlib import Path
+from typing import List, Optional
+
+from paramiko.sftp_client import SFTPClient
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.sftp.operators.sftp import SFTPOperation

Review comment:
       IMO the operation statuses should also come from a common location rather than an operator.

##########
File path: airflow/providers/sftp/operators/sftp_batch.py
##########
@@ -0,0 +1,312 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains SFTP Batch operator."""
+import ast
+import os
+import re
+from pathlib import Path
+from typing import List, Optional
+
+from paramiko.sftp_client import SFTPClient
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.sftp.operators.sftp import SFTPOperation
+from airflow.providers.sftp.utils.common import check_conn, make_intermediate_dirs
+from airflow.utils.context import Context
+
+
+class SFTPBatchOperator(BaseOperator):
+    """
+    SFTPBatchOperator for transferring files from remote host to local or vice a versa.
+    This operator uses ssh_hook to open sftp transport channel that serve as basis
+    for file transfer.
+    :param ssh_hook: predefined ssh_hook to use for remote execution.
+        Either `ssh_hook` or `ssh_conn_id` needs to be provided.
+    :type ssh_hook: airflow.providers.ssh.hooks.ssh.SSHHook
+    :param ssh_conn_id: :ref:`ssh connection id<howto/connection:ssh>`
+        from airflow Connections. `ssh_conn_id` will be ignored if `ssh_hook`
+        is provided.
+    :type ssh_conn_id: str
+    :param remote_host: remote host to connect (templated)
+        Nullable. If provided, it will replace the `remote_host` which was
+        defined in `ssh_hook` or predefined in the connection of `ssh_conn_id`.
+    :type remote_host: str
+    :param local_files_path: local files path to get or put. (templated)
+    :type local_files_path: list
+    :param local_folder: local folder path to get or put. (templated)
+    :type local_folder: str
+    :param remote_folder: remote folder path to get or put. (templated)
+    :type remote_folder: str
+    :param remote_files_path: remote folder path to get or put. (templated)
+    :type remote_files_path: list
+    :param regexp_mask: regexp mask for file match in local_folder for PUT operational
+        or match filenames in remote_folder for GET operational. (templated)
+    :type regexp_mask: str
+    :param operation: specify operation 'get' or 'put', defaults to put
+    :type operation: str
+    :param confirm: specify if the SFTP operation should be confirmed, defaults to True
+    :type confirm: bool
+    :param create_intermediate_dirs: create missing intermediate directories when
+    :type create_intermediate_dirs: bool
+    :param force: if the file already exists, it will be overwritten
+    :type force: bool
+        copying from remote to local and vice-versa. Default is False.
+    :param list_as_str: if you uses xcom or string in remote_files_path or local_files_path
+        for example your xcom return "['/tmp/tmp1/batch/file1.txt']"
+        then turn use_xcom_args = True and your will be xcom transformed to list type
+    :type list_as_str: bool
+        copying from remote to local and vice-versa. Default is False.
+    Summary, support arguments:
+        Possible options for PUT:
+            1.optional(regexp_mask:str) + local_folder:str + remote_folder:str
+            2.local_files_path:list + remote_folder:str
+        Possible options for GET:
+            1.local_folder:str + remote_folder:str + optional(regexp_mask:str)
+            2.local_folder:str + remote_files_path:list
+    Example:
+    Move all txt files
+        from local `/tmp/dir_for_local_transfer/` to remote folder `/tmp/dir_for_remote_transfer/`
+            put_dir_txt_files = SFTPOperator(
+                task_id="put_dir_txt_files",
+                ssh_conn_id="ssh_default",
+                local_folder="/tmp/dir_for_local_transfer/",
+                remote_folder="/tmp/dir_for_remote_transfer/",
+                regexp_mask=".*[.]txt",
+                operation=SFTPOperation.PUT,
+                create_intermediate_dirs=True
+            )
+    Move `/tmp/file1.txt` file
+        from local to remote folder `/tmp/dir_for_remote_transfer/`
+            put_files = SFTPOperator(
+                task_id="put_dir_txt_files",
+                ssh_conn_id="ssh_default",
+                local_files_path=["/tmp/file1.txt",],
+                remote_folder="/tmp/dir_for_remote_transfer/",
+                operation=SFTPOperation.PUT,
+                create_intermediate_dirs=True
+            )
+    Move all files
+        from remote folder `/tmp/dir_for_remote_transfer/` to local folder `/tmp/dir_for_local_transfer/`
+            get_dir = SFTPBatchOperator(
+                task_id="get_dir",
+                ssh_conn_id="ssh_default",
+                local_folder="/tmp/dir_for_local_transfer/",
+                remote_folder="/tmp/dir_for_remote_transfer/",
+                operation=SFTPOperation.GET,
+                create_intermediate_dirs=True
+            )
+    Move `/tmp/file1.txt` file
+        from remote to local folder `/tmp/dir_for_local_transfer/`
+            get_files = SFTPBatchOperator(
+                task_id="get_files",
+                ssh_conn_id="ssh_default",
+                local_folder="/tmp/dir_for_local_transfer/",
+                remote_files_path=["/tmp/file1.txt",],
+                operation=SFTPOperation.GET,
+                create_intermediate_dirs=True
+            )
+    """
+
+    template_fields = (
+        'remote_host',
+        'local_files_path',
+        'remote_files_path',
+        'local_folder',
+        'remote_folder',
+        'regexp_mask',
+    )
+
+    def __init__(
+        self,
+        *,
+        ssh_hook=None,
+        ssh_conn_id=None,
+        remote_host=None,
+        local_files_path: Optional[List[str]] = None,
+        remote_files_path: Optional[List[str]] = None,
+        local_folder: Optional[str] = None,
+        remote_folder: Optional[str] = None,
+        regexp_mask=None,
+        operation=SFTPOperation.PUT,
+        confirm=True,
+        create_intermediate_dirs=False,
+        force=False,
+        list_as_str=False,
+        **kwargs,

Review comment:
       It would still be nice to have typing for all parameters rather than a subset. Also applicable to some of the methods. 
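
A minimal sketch of what a fully annotated signature could look like. `TypedSignatureSketch` is a hypothetical stand-in rather than the PR's class, `ssh_hook` is typed as `Any` here only to keep the sketch free of Airflow imports, and `"put"` stands in for `SFTPOperation.PUT`:

```python
from typing import Any, List, Optional


class TypedSignatureSketch:
    """Hypothetical stand-in for the operator; only the annotations matter here."""

    def __init__(
        self,
        *,
        ssh_hook: Optional[Any] = None,  # an SSHHook in the real operator
        ssh_conn_id: Optional[str] = None,
        remote_host: Optional[str] = None,
        local_files_path: Optional[List[str]] = None,
        remote_files_path: Optional[List[str]] = None,
        local_folder: Optional[str] = None,
        remote_folder: Optional[str] = None,
        regexp_mask: Optional[str] = None,
        operation: str = "put",  # SFTPOperation.PUT in the real operator
        confirm: bool = True,
        create_intermediate_dirs: bool = False,
        force: bool = False,
        list_as_str: bool = False,
        **kwargs: Any,
    ) -> None:
        # Only a few attributes are stored; the sketch is about the typing.
        self.operation = operation
        self.force = force
        self.list_as_str = list_as_str
```

With every parameter annotated, a type checker can flag a call such as `regexp_mask=["a", "b"]` before the DAG is ever parsed.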

##########
File path: airflow/providers/sftp/operators/sftp_batch.py
##########
@@ -0,0 +1,312 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains SFTP Batch operator."""
+import ast
+import os
+import re
+from pathlib import Path
+from typing import List, Optional
+
+from paramiko.sftp_client import SFTPClient
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.sftp.operators.sftp import SFTPOperation
+from airflow.providers.sftp.utils.common import check_conn, make_intermediate_dirs
+from airflow.utils.context import Context

Review comment:
       This import needs to be under `TYPE_CHECKING`; otherwise this provider will have a minimum Airflow version dependency of 2.2.3. Check out an [example](https://github.com/apache/airflow/blob/82e466d8f9b196e1efba44cc15214b9d0bd3b2d1/airflow/providers/databricks/operators/databricks.py#L28-L29) in the Databricks operator file.
   
   Then the `execute()` typing will be:
   ```python
   def execute(self, context: "Context") -> None:
   ```
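
For illustration, a minimal sketch of the guarded import together with the quoted annotation. `ExecuteTypingSketch` is a hypothetical stand-in for the operator class; the guarded import is never executed at runtime, so the snippet runs even where `airflow.utils.context` is unavailable:

```python
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Evaluated only by static type checkers, never at runtime.
    from airflow.utils.context import Context


class ExecuteTypingSketch:
    """Hypothetical stand-in for the operator class."""

    def execute(self, context: "Context") -> None:
        # The quoted annotation is stored as a string and not resolved here.
        return None
```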
   

##########
File path: docs/apache-airflow-providers-sftp/sftp_batch.rst
##########
@@ -0,0 +1,165 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+
+SFTPBatchOperator
+==========================
+Use the :class:`~airflow.providers.sftp.operators.sftp_batch.SFTPBatchOperator` to
+transfer files between servers over SFTP.
+
+Using the Operator
+------------------
+To start working with the operator, you need to register an SFTP / SSH connection in Airflow Connections.
+Use ``ssh_conn_id`` to specify the name of the connection.
+
+You can use the operator for the following tasks:
+
+1. Send all files from local directory to remote server
+
+.. code-block:: python
+
+    put_dir_files = SFTPBatchOperator(
+        task_id="put_dir_files",
+        ssh_conn_id="ssh_default",
+        local_folder="/tmp/local_folder/",
+        remote_folder="/tmp/dir_for_remote_transfer/",
+        operation=SFTPOperation.PUT,
+        create_intermediate_dirs=True,
+    )
+
+
+2. Send specific files from local directory to remote server
+
+.. code-block:: python
+
+    put_dir_files = SFTPBatchOperator(
+        task_id="put_dir_files",
+        ssh_conn_id="ssh_default",
+        local_files_path=[
+            "/tmp/local_folder/file1.txt",
+        ],
+        remote_folder="/tmp/dir_for_remote_transfer/",
+        operation=SFTPOperation.PUT,
+        create_intermediate_dirs=True,
+    )
+
+
+3. Send all files from the local directory that match the specified pattern to the remote server
+
+.. code-block:: python
+
+    put_dir_txt_files = SFTPBatchOperator(
+        task_id="put_dir_txt_files",
+        ssh_conn_id="ssh_default",
+        local_folder="/tmp/local_folder/",
+        remote_folder="/tmp/dir_for_remote_transfer/",
+        regexp_mask=r".*\.txt",
+        operation=SFTPOperation.PUT,
+        create_intermediate_dirs=True,
+    )
+
+
+4. Get a specific list of files from the remote server to the local folder
+
+.. code-block:: python
+
+    get_files = SFTPBatchOperator(
+        task_id="get_files",
+        ssh_conn_id="ssh_default",
+        local_folder="/tmp/local_folder/",
+        remote_files_path=[
+            "/tmp/dir_for_remote_transfer/file1.txt",
+            "/tmp/dir_for_remote_transfer/file2.txt",
+        ],
+        operation=SFTPOperation.GET,
+        create_intermediate_dirs=True,
+    )
+
+
+5. Get all files from the remote server to the local folder
+
+.. code-block:: python
+
+    get_dir_files = SFTPBatchOperator(
+        task_id="get_dir_files",
+        ssh_conn_id="ssh_default",
+        local_folder="/tmp/local_folder/",
+        remote_folder="/tmp/dir_for_remote_transfer/",
+        operation=SFTPOperation.GET,
+        create_intermediate_dirs=True,
+    )
+
+
+6. Get all files from the remote server that match the specified pattern to the local folder, overwriting existing files
+
+.. code-block:: python
+
+    get_dir_txt_files = SFTPBatchOperator(
+        task_id="get_dir_txt_files",
+        ssh_conn_id="ssh_default",
+        local_folder="/tmp/local_folder/",
+        remote_folder="/tmp/dir_for_remote_transfer/",
+        regexp_mask=r".*\.txt",
+        operation=SFTPOperation.GET,
+        create_intermediate_dirs=True,
+        force=True,
+    )
+
+
+7. You can use XCom for the local_files_path or remote_files_path arguments.
+    When you use `xcom_pull` in an argument, the XCom value is rendered as a string,
+    so set the `list_as_str` argument for this case
+
+.. code-block:: python
+
+    def push_xcom(**context):
+        ti: TaskInstance = context["task_instance"]
+        # save as list
+        ti.xcom_push(
+            key="file_names", value=["/some_path/with_complex_logic/some_file.txt"]
+        )
+
+
+    PythonOperator(
+        task_id="push_xcom",
+        python_callable=push_xcom,
+        provide_context=True,

Review comment:
       This is a deprecated parameter.
   ```suggestion
   ```







[GitHub] [airflow] potiuk commented on a change in pull request #19852: created SFTPBatchOperator which add batch function

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #19852:
URL: https://github.com/apache/airflow/pull/19852#discussion_r762439210



##########
File path: airflow/providers/sftp/operators/sftp_batch.py
##########
@@ -0,0 +1,184 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains SFTP Batch operator."""
+import os
+import re
+from pathlib import Path
+from typing import Any, List, Union
+
+from paramiko.sftp_client import SFTPClient
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.sftp.operators.sftp import SFTPOperation, _check_conn, _make_intermediate_dirs
+
+
+class SFTPBatchOperator(BaseOperator):
+    """
+    SFTPBatchOperator for transferring files from remote host to local or vice versa.
+    This operator uses ssh_hook to open an sftp transport channel that serves as the basis
+    for file transfer.
+    :param ssh_hook: predefined ssh_hook to use for remote execution.
+        Either `ssh_hook` or `ssh_conn_id` needs to be provided.
+    :type ssh_hook: airflow.providers.ssh.hooks.ssh.SSHHook
+    :param ssh_conn_id: :ref:`ssh connection id<howto/connection:ssh>`
+        from airflow Connections. `ssh_conn_id` will be ignored if `ssh_hook`
+        is provided.
+    :type ssh_conn_id: str
+    :param remote_host: remote host to connect (templated)
+        Nullable. If provided, it will replace the `remote_host` which was
+        defined in `ssh_hook` or predefined in the connection of `ssh_conn_id`.
+    :type remote_host: str
+    :param local_path: local folder path to get or put. (templated)
+    :type local_path: str or list
+    :param remote_path: remote folder path to get or put. (templated)
+    :type remote_path: str or list
+    :param regexp_mask: regexp mask for file match in local_folder or remote_folder to get or put. (templated)
+    :type regexp_mask: str
+    :param operation: specify operation 'get' or 'put', defaults to put
+    :type operation: str
+    :param confirm: specify if the SFTP operation should be confirmed, defaults to True
+    :type confirm: bool
+    :param create_intermediate_dirs: create missing intermediate directories when
+    :type create_intermediate_dirs: bool
+        copying from remote to local and vice-versa. Default is False.
+        Example: The following task would copy ``file.txt`` to the remote host
+        at ``/tmp/tmp1/tmp2/`` while creating ``tmp``,``tmp1`` and ``tmp2`` if they
+        don't exist. If the parameter is not passed it would error as the directory
+        does not exist. ::
+            put_dir_txt_files = SFTPOperator(
+                task_id="put_dir_txt_files",
+                ssh_conn_id="ssh_default",
+                local_folder="/tmp/dir_for_remote_transfer/",
+                remote_folder="/tmp/dir_for_remote_transfer/txt",
+                regexp_mask=".*[.]txt",
+                operation=SFTPOperation.PUT,
+                create_intermediate_dirs=True
+            )
+
+    """
+
+    template_fields = (
+        'remote_host',
+        'local_path',
+        'remote_path',
+        'regexp_mask',
+    )
+
+    def __init__(
+        self,
+        *,
+        ssh_hook=None,
+        ssh_conn_id=None,
+        remote_host=None,
+        local_path: Union[str, list] = None,
+        remote_path: Union[str, list] = None,
+        regexp_mask=None,
+        operation=SFTPOperation.PUT,
+        confirm=True,
+        create_intermediate_dirs=False,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.ssh_hook = ssh_hook
+        self.ssh_conn_id = ssh_conn_id
+        self.remote_host = remote_host
+        self.local_path = local_path
+        self.remote_path = remote_path
+        self.regexp_mask = regexp_mask
+        self.operation = operation
+        self.confirm = confirm
+        self.create_intermediate_dirs = create_intermediate_dirs
+        if not (self.operation.lower() == SFTPOperation.GET or self.operation.lower() == SFTPOperation.PUT):
+            raise TypeError(
+                f"""Unsupported operation value {self.operation},
+                expected {SFTPOperation.GET} or {SFTPOperation.PUT}"""
+            )
+        if not (
+            (isinstance(self.local_path, str) and isinstance(self.remote_path, str))
+            or (isinstance(self.local_path, list) and isinstance(self.remote_path, str))
+            or (isinstance(self.remote_path, list) and isinstance(self.local_path, str))
+        ):
+            raise TypeError(
+                """Unsupported path argument value local_path and remote_path
+                Possible options: \n local_path is str and remote_path is str\n
+                local_path is list and remote_path is str\n
+                local_path is str and remote_path is list"""
+            )
+
+    def execute(self, context: Any) -> str:
+        file_msg = None
+        try:
+            _check_conn(self)
+
+            with self.ssh_hook.get_conn() as ssh_client:
+                sftp_client = ssh_client.open_sftp()
+                if self.operation.lower() == SFTPOperation.PUT:
+                    if isinstance(self.local_path, str):
+                        files_list = self._search_files(os.listdir(self.local_path))
+                        for file in files_list:
+                            local_file = os.path.basename(file)
+                            file_msg = file
+                            self._transfer(sftp_client, self.local_path, local_file, self.remote_path)
+                    if isinstance(self.local_path, list) and isinstance(self.remote_path, str):
+                        for file in self.local_path:
+                            local_file = os.path.basename(file)
+                            file_msg = file
+                            self._transfer(sftp_client, os.path.dirname(file), local_file, self.remote_path)
+                elif self.operation.lower() == SFTPOperation.GET:
+                    if isinstance(self.remote_path, str):
+                        files_list = self._search_files(sftp_client.listdir(self.remote_path))
+                        for file in files_list:
+                            remote_file = os.path.basename(file)
+                            file_msg = file
+                            self._transfer(sftp_client, self.local_path, remote_file, self.remote_path)
+                    if isinstance(self.remote_path, list) and isinstance(self.local_path, str):

Review comment:
       Here we should also check that, if the local path exists, it is a directory.
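
A minimal sketch of such a check, assuming it runs before any file is fetched. `ensure_local_dir` is a hypothetical helper name, and `ValueError` stands in for whatever exception the operator would raise (most likely `AirflowException`):

```python
import os
import tempfile


def ensure_local_dir(local_path: str) -> None:
    """Raise if local_path exists but is not a directory (hypothetical helper)."""
    if os.path.exists(local_path) and not os.path.isdir(local_path):
        raise ValueError(f"{local_path} exists and is not a directory")


with tempfile.TemporaryDirectory() as tmp_dir:
    ensure_local_dir(tmp_dir)  # an existing directory passes
    ensure_local_dir(os.path.join(tmp_dir, "missing"))  # a missing path passes too
    file_path = os.path.join(tmp_dir, "file1.txt")
    open(file_path, "w").close()
    try:
        ensure_local_dir(file_path)  # an existing regular file is rejected
        rejected = False
    except ValueError:
        rejected = True
```

A missing path is deliberately allowed, since `create_intermediate_dirs` may be expected to create it.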

##########
File path: airflow/providers/sftp/operators/sftp_batch.py
##########
@@ -0,0 +1,184 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains SFTP Batch operator."""
+import os
+import re
+from pathlib import Path
+from typing import Any, List, Union
+
+from paramiko.sftp_client import SFTPClient
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.sftp.operators.sftp import SFTPOperation, _check_conn, _make_intermediate_dirs
+
+
+class SFTPBatchOperator(BaseOperator):
+    """
+    SFTPBatchOperator for transferring files from remote host to local or vice versa.
+    This operator uses ssh_hook to open an sftp transport channel that serves as the basis
+    for file transfer.
+    :param ssh_hook: predefined ssh_hook to use for remote execution.
+        Either `ssh_hook` or `ssh_conn_id` needs to be provided.
+    :type ssh_hook: airflow.providers.ssh.hooks.ssh.SSHHook
+    :param ssh_conn_id: :ref:`ssh connection id<howto/connection:ssh>`
+        from airflow Connections. `ssh_conn_id` will be ignored if `ssh_hook`
+        is provided.
+    :type ssh_conn_id: str
+    :param remote_host: remote host to connect (templated)
+        Nullable. If provided, it will replace the `remote_host` which was
+        defined in `ssh_hook` or predefined in the connection of `ssh_conn_id`.
+    :type remote_host: str
+    :param local_path: local folder path to get or put. (templated)
+    :type local_path: str or list
+    :param remote_path: remote folder path to get or put. (templated)
+    :type remote_path: str or list
+    :param regexp_mask: regexp mask for file match in local_folder or remote_folder to get or put. (templated)

Review comment:
       It should be better explained when the regexp_mask applies and how:
   * it should be explained that it does not apply when the source is a "list" (it's not obvious)
   * it should be explained whether it applies to the full path of the files after "list_dir" is applied.
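
To make the question concrete, here is a sketch of one possible behaviour the docstring could describe. It assumes the mask is matched against the bare names returned by `os.listdir` or `SFTPClient.listdir` (both return names, not full paths) and that the whole name must match. The PR's `_search_files` is not shown here and may behave differently; `filter_names` is a hypothetical helper:

```python
import re
from typing import List, Optional


def filter_names(names: List[str], regexp_mask: Optional[str]) -> List[str]:
    """Keep the bare file names that fully match regexp_mask (all names if None)."""
    if regexp_mask is None:
        return list(names)
    pattern = re.compile(regexp_mask)
    return [name for name in names if pattern.fullmatch(name)]


# A directory listing contains names only, so the mask never sees the folder part.
listing = ["file1.txt", "file2.csv", "notes.txt.bak"]
txt_only = filter_names(listing, r".*\.txt")
```

Whether `re.match`, `re.search`, or `re.fullmatch` is used changes the result for names such as `notes.txt.bak`, which is why the docstring should state it.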

##########
File path: airflow/providers/sftp/operators/sftp_batch.py
##########
@@ -0,0 +1,184 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains SFTP Batch operator."""
+import os
+import re
+from pathlib import Path
+from typing import Any, List, Union
+
+from paramiko.sftp_client import SFTPClient
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.sftp.operators.sftp import SFTPOperation, _check_conn, _make_intermediate_dirs
+
+
+class SFTPBatchOperator(BaseOperator):
+    """
+    SFTPBatchOperator for transferring files from remote host to local or vice versa.
+    This operator uses ssh_hook to open an sftp transport channel that serves as the basis
+    for file transfer.
+    :param ssh_hook: predefined ssh_hook to use for remote execution.
+        Either `ssh_hook` or `ssh_conn_id` needs to be provided.
+    :type ssh_hook: airflow.providers.ssh.hooks.ssh.SSHHook
+    :param ssh_conn_id: :ref:`ssh connection id<howto/connection:ssh>`
+        from airflow Connections. `ssh_conn_id` will be ignored if `ssh_hook`
+        is provided.
+    :type ssh_conn_id: str
+    :param remote_host: remote host to connect (templated)
+        Nullable. If provided, it will replace the `remote_host` which was
+        defined in `ssh_hook` or predefined in the connection of `ssh_conn_id`.
+    :type remote_host: str
+    :param local_path: local folder path to get or put. (templated)

Review comment:
       This description is wrong. As I understand it, this can be either a folder path or a list of paths to individual files. I think this is quite the wrong approach. Those should be two different parameters to avoid confusion. Then you'd avoid checking for instance types.
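
A rough sketch of how two separate parameters remove the `isinstance` checks: the source is either a folder or a list of files, and exactly one of them must be given. `pick_source` and its argument names are hypothetical, and `ValueError` stands in for the operator's own exception type:

```python
from typing import List, Optional


def pick_source(folder: Optional[str], files: Optional[List[str]]) -> str:
    """Return which kind of source was given; exactly one must be set."""
    if (folder is None) == (files is None):
        raise ValueError("Provide exactly one of a source folder or a list of source files")
    return "folder" if folder is not None else "files"


try:
    pick_source(None, None)
    both_missing_rejected = False
except ValueError:
    both_missing_rejected = True
```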

##########
File path: airflow/providers/sftp/operators/sftp_batch.py
##########
@@ -0,0 +1,184 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains SFTP Batch operator."""
+import os
+import re
+from pathlib import Path
+from typing import Any, List, Union
+
+from paramiko.sftp_client import SFTPClient
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.sftp.operators.sftp import SFTPOperation, _check_conn, _make_intermediate_dirs
+
+
+class SFTPBatchOperator(BaseOperator):
+    """
+    SFTPBatchOperator for transferring files from remote host to local or vice versa.
+    This operator uses ssh_hook to open an sftp transport channel that serves as the basis
+    for file transfer.
+    :param ssh_hook: predefined ssh_hook to use for remote execution.
+        Either `ssh_hook` or `ssh_conn_id` needs to be provided.
+    :type ssh_hook: airflow.providers.ssh.hooks.ssh.SSHHook
+    :param ssh_conn_id: :ref:`ssh connection id<howto/connection:ssh>`
+        from airflow Connections. `ssh_conn_id` will be ignored if `ssh_hook`
+        is provided.
+    :type ssh_conn_id: str
+    :param remote_host: remote host to connect (templated)
+        Nullable. If provided, it will replace the `remote_host` which was
+        defined in `ssh_hook` or predefined in the connection of `ssh_conn_id`.
+    :type remote_host: str
+    :param local_path: local folder path to get or put. (templated)
+    :type local_path: str or list
+    :param remote_path: remote folder path to get or put. (templated)
+    :type remote_path: str or list
+    :param regexp_mask: regexp mask for file match in local_folder or remote_folder to get or put. (templated)
+    :type regexp_mask: str
+    :param operation: specify operation 'get' or 'put', defaults to put
+    :type operation: str
+    :param confirm: specify if the SFTP operation should be confirmed, defaults to True
+    :type confirm: bool
+    :param create_intermediate_dirs: create missing intermediate directories when
+    :type create_intermediate_dirs: bool
+        copying from remote to local and vice-versa. Default is False.
+        Example: The following task would copy ``file.txt`` to the remote host
+        at ``/tmp/tmp1/tmp2/`` while creating ``tmp``,``tmp1`` and ``tmp2`` if they
+        don't exist. If the parameter is not passed it would error as the directory
+        does not exist. ::
+            put_dir_txt_files = SFTPOperator(
+                task_id="put_dir_txt_files",
+                ssh_conn_id="ssh_default",
+                local_folder="/tmp/dir_for_remote_transfer/",
+                remote_folder="/tmp/dir_for_remote_transfer/txt",
+                regexp_mask=".*[.]txt",
+                operation=SFTPOperation.PUT,
+                create_intermediate_dirs=True
+            )
+
+    """
+
+    template_fields = (
+        'remote_host',
+        'local_path',
+        'remote_path',
+        'regexp_mask',
+    )
+
+    def __init__(
+        self,
+        *,
+        ssh_hook=None,
+        ssh_conn_id=None,
+        remote_host=None,
+        local_path: Union[str, list] = None,
+        remote_path: Union[str, list] = None,
+        regexp_mask=None,
+        operation=SFTPOperation.PUT,
+        confirm=True,
+        create_intermediate_dirs=False,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.ssh_hook = ssh_hook
+        self.ssh_conn_id = ssh_conn_id
+        self.remote_host = remote_host
+        self.local_path = local_path
+        self.remote_path = remote_path
+        self.regexp_mask = regexp_mask
+        self.operation = operation
+        self.confirm = confirm
+        self.create_intermediate_dirs = create_intermediate_dirs
+        if not (self.operation.lower() == SFTPOperation.GET or self.operation.lower() == SFTPOperation.PUT):
+            raise TypeError(
+                f"""Unsupported operation value {self.operation},
+                expected {SFTPOperation.GET} or {SFTPOperation.PUT}"""
+            )
+        if not (
+            (isinstance(self.local_path, str) and isinstance(self.remote_path, str))
+            or (isinstance(self.local_path, list) and isinstance(self.remote_path, str))
+            or (isinstance(self.remote_path, list) and isinstance(self.local_path, str))
+        ):
+            raise TypeError(
+                """Unsupported path argument value local_path and remote_path

Review comment:
       Should we also check the transfer direction here? I believe local(str), remote(list) is only valid for PUT and local(list), remote(str) is only valid for GET.
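
A sketch of such a direction check. Which combination belongs to which direction is taken here from the quoted `execute()`, where the list is always the source side (a list of local files for PUT, a list of remote files for GET); swap the conditions if the intended mapping differs. `direction_error` is a hypothetical helper, and the plain strings `"get"` and `"put"` stand in for the `SFTPOperation` constants:

```python
from typing import List, Optional, Union

PathArg = Union[str, List[str]]


def direction_error(operation: str, local_path: PathArg, remote_path: PathArg) -> Optional[str]:
    """Return an error message for an invalid combination, or None if it is valid."""
    op = operation.lower()
    if op not in ("get", "put"):
        return f"Unsupported operation value {operation}, expected get or put"
    if isinstance(local_path, list) and isinstance(remote_path, list):
        return "local_path and remote_path cannot both be lists"
    if isinstance(local_path, list) and op != "put":
        return "a list of local files is only a valid source for put"
    if isinstance(remote_path, list) and op != "get":
        return "a list of remote files is only a valid source for get"
    return None
```

Returning a message instead of raising keeps the sketch easy to test; in the operator the same conditions would raise from `__init__`.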

##########
File path: docs/apache-airflow-providers-sftp/sftp.rst
##########
@@ -0,0 +1,111 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+
+SFTPOperator
+==========================
+Use the :class:`~airflow.providers.sftp.operators.sftp.SFTPOperator` to
+transfer files between servers over SFTP.
+
+Using the Operator
+------------------
+To start working with the operator, you need to register an SFTP / SSH connection in Airflow Connections.
+Use ``ssh_conn_id`` to specify the name of the connection.
+
+You can use the operator for the following tasks:
+
+1. Send one file to the server with the full path
+
+.. code-block:: python
+
+    put_file = SFTPOperator(
+        task_id="put_file",
+        ssh_conn_id="ssh_default",
+        local_filepath="/tmp/transfer_file/put_file_file1.txt",
+        remote_filepath="/tmp/transfer_file/remote/put_file_file1.txt",
+        operation=SFTPOperation.PUT,
+        create_intermediate_dirs=True,
+    )
+
+
+2. Send all files from local directory to remote server
+
+.. code-block:: python
+
+    put_dir_files = SFTPBatchOperator(
+        task_id="put_dir_files",
+        ssh_conn_id="ssh_default",
+        local_path="/tmp/dir_for_remote_transfer/",
+        remote_path="/tmp/dir_for_remote_transfer/remote/",
+        operation=SFTPOperation.PUT,
+        create_intermediate_dirs=True,
+    )
+
+
+3. Send all files from local directory to remote server
+
+.. code-block:: python
+
+    put_dir_files = SFTPBatchOperator(
+        task_id="put_dir_files",
+        ssh_conn_id="ssh_default",
+        local_path="/tmp/dir_for_remote_transfer/",
+        remote_path=[
+            "/tmp/dir_for_remote_transfer/remote/txt/file1.txt",

Review comment:
       How would that work? I think this example makes no sense.

##########
File path: airflow/providers/sftp/operators/sftp_batch.py
##########
@@ -0,0 +1,184 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains SFTP Batch operator."""
+import os
+import re
+from pathlib import Path
+from typing import Any, List, Union
+
+from paramiko.sftp_client import SFTPClient
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.sftp.operators.sftp import SFTPOperation, _check_conn, _make_intermediate_dirs
+
+
+class SFTPBatchOperator(BaseOperator):
+    """
+    SFTPBatchOperator for transferring files from remote host to local or vice versa.
+    This operator uses ssh_hook to open an sftp transport channel that serves as the basis
+    for file transfer.
+    :param ssh_hook: predefined ssh_hook to use for remote execution.
+        Either `ssh_hook` or `ssh_conn_id` needs to be provided.
+    :type ssh_hook: airflow.providers.ssh.hooks.ssh.SSHHook
+    :param ssh_conn_id: :ref:`ssh connection id<howto/connection:ssh>`
+        from airflow Connections. `ssh_conn_id` will be ignored if `ssh_hook`
+        is provided.
+    :type ssh_conn_id: str
+    :param remote_host: remote host to connect (templated)
+        Nullable. If provided, it will replace the `remote_host` which was
+        defined in `ssh_hook` or predefined in the connection of `ssh_conn_id`.
+    :type remote_host: str
+    :param local_path: local folder path to get or put. (templated)
+    :type local_path: str or list
+    :param remote_path: remote folder path to get or put. (templated)
+    :type remote_path: str or list
+    :param regexp_mask: regexp mask for file match in local_folder or remote_folder to get or put. (templated)
+    :type regexp_mask: str
+    :param operation: specify operation 'get' or 'put', defaults to put
+    :type operation: str
+    :param confirm: specify if the SFTP operation should be confirmed, defaults to True
+    :type confirm: bool
+    :param create_intermediate_dirs: create missing intermediate directories when
+        copying from remote to local and vice-versa. Default is False.
+
+        Example: The following task would copy the ``.txt`` files from
+        ``/tmp/dir_for_remote_transfer/`` to ``/tmp/dir_for_remote_transfer/txt`` on the
+        remote host, creating the missing remote directories. If the parameter is not
+        passed, the task would error if the directory does not exist. ::
+
+            put_dir_txt_files = SFTPBatchOperator(
+                task_id="put_dir_txt_files",
+                ssh_conn_id="ssh_default",
+                local_path="/tmp/dir_for_remote_transfer/",
+                remote_path="/tmp/dir_for_remote_transfer/txt",
+                regexp_mask=".*[.]txt",
+                operation=SFTPOperation.PUT,
+                create_intermediate_dirs=True
+            )
+
+    :type create_intermediate_dirs: bool
+
+    """
+
+    template_fields = (
+        'remote_host',
+        'local_path',
+        'remote_path',
+        'regexp_mask',
+    )
+
+    def __init__(
+        self,
+        *,
+        ssh_hook=None,
+        ssh_conn_id=None,
+        remote_host=None,
+        local_path: Union[str, list] = None,
+        remote_path: Union[str, list] = None,
+        regexp_mask=None,
+        operation=SFTPOperation.PUT,
+        confirm=True,
+        create_intermediate_dirs=False,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.ssh_hook = ssh_hook
+        self.ssh_conn_id = ssh_conn_id
+        self.remote_host = remote_host
+        self.local_path = local_path
+        self.remote_path = remote_path
+        self.regexp_mask = regexp_mask
+        self.operation = operation
+        self.confirm = confirm
+        self.create_intermediate_dirs = create_intermediate_dirs
+        if self.operation.lower() not in (SFTPOperation.GET, SFTPOperation.PUT):
+            raise TypeError(
+                f"Unsupported operation value {self.operation}, "
+                f"expected {SFTPOperation.GET} or {SFTPOperation.PUT}"
+            )
+        if not (
+            (isinstance(self.local_path, str) and isinstance(self.remote_path, str))
+            or (isinstance(self.local_path, list) and isinstance(self.remote_path, str))
+            or (isinstance(self.remote_path, list) and isinstance(self.local_path, str))
+        ):
+            raise TypeError(
+                "Unsupported path argument value local_path and remote_path. "
+                "Possible options: local_path is str and remote_path is str; "
+                "local_path is list and remote_path is str; "
+                "local_path is str and remote_path is list"
+            )
+
+    def execute(self, context: Any) -> str:
+        file_msg = None
+        try:
+            _check_conn(self)
+
+            with self.ssh_hook.get_conn() as ssh_client:
+                sftp_client = ssh_client.open_sftp()
+                if self.operation.lower() == SFTPOperation.PUT:
+                    if isinstance(self.local_path, str):
+                        files_list = self._search_files(os.listdir(self.local_path))
+                        for file in files_list:
+                            local_file = os.path.basename(file)
+                            file_msg = file
+                            self._transfer(sftp_client, self.local_path, local_file, self.remote_path)
+                    if isinstance(self.local_path, list) and isinstance(self.remote_path, str):

Review comment:
       I guess we should error out explicitly if the remote file is a directory?
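
An explicit check of this kind could look roughly like the sketch below. It assumes only that the client object exposes paramiko's `SFTPClient.stat()`, which returns attributes carrying `st_mode`. The helper names `remote_path_is_dir` and `check_remote_file_target` are hypothetical and not part of this PR.

```python
import errno
import stat


def remote_path_is_dir(sftp_client, remote_path):
    """Return True if remote_path exists on the server and is a directory."""
    try:
        attrs = sftp_client.stat(remote_path)
    except OSError as err:
        # A missing path is not a directory; re-raise anything else
        # (for example permission errors) so it is not silently hidden.
        if err.errno == errno.ENOENT:
            return False
        raise
    # st_mode may be None when the server does not report it.
    return attrs.st_mode is not None and stat.S_ISDIR(attrs.st_mode)


def check_remote_file_target(sftp_client, remote_path):
    """Fail early when a path that should name a file is a directory."""
    if remote_path_is_dir(sftp_client, remote_path):
        raise ValueError(
            f"Remote path {remote_path!r} is a directory, expected a file path"
        )
```

Calling such a check before each transfer costs one extra round trip per file, which may or may not be acceptable for large batches.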

##########
File path: docs/apache-airflow-providers-sftp/sftp.rst
##########
@@ -0,0 +1,111 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+
+SFTPOperator
+==========================
+Use the :class:`~airflow.providers.sftp.operators.sftp.SFTPOperator` to
+transfer data between servers over SFTP.
+
+Using the Operator
+------------------
+To start working with the operator, you need to register an SFTP/SSH connection in Airflow Connections.
+Use ``ssh_conn_id`` to specify the name of the connection.
+
+You can use the operator for the following tasks:
+
+1. Send one file to the server with the full path
+
+.. code-block:: python
+
+    put_file = SFTPOperator(
+        task_id="put_file",
+        ssh_conn_id="ssh_default",
+        local_filepath="/tmp/transfer_file/put_file_file1.txt",
+        remote_filepath="/tmp/transfer_file/remote/put_file_file1.txt",
+        operation=SFTPOperation.PUT,
+        create_intermediate_dirs=True,
+    )
+
+
+2. Send all files from local directory to remote server
+
+.. code-block:: python
+
+    put_dir_files = SFTPBatchOperator(
+        task_id="put_dir_files",
+        ssh_conn_id="ssh_default",
+        local_path="/tmp/dir_for_remote_transfer/",
+        remote_path="/tmp/dir_for_remote_transfer/remote/",
+        operation=SFTPOperation.PUT,
+        create_intermediate_dirs=True,
+    )
+
+
+3. Send all files from local directory to remote server
+
+.. code-block:: python
+
+    put_dir_files = SFTPBatchOperator(
+        task_id="put_dir_files",
+        ssh_conn_id="ssh_default",
+        local_path="/tmp/dir_for_remote_transfer/",
+        remote_path=[
+            "/tmp/dir_for_remote_transfer/remote/txt/file1.txt",
+            "/tmp/dir_for_remote_transfer/remote/txt/file2.txt",
+        ],
+        operation=SFTPOperation.PUT,
+        create_intermediate_dirs=True,
+    )
+
+
+4. Send all files from the local directory that match the specified pattern to the remote server
+
+.. code-block:: python
+
+    put_dir_txt_files = SFTPBatchOperator(
+        task_id="put_dir_txt_files",
+        ssh_conn_id="ssh_default",
+        local_path="/tmp/dir_for_remote_transfer/",
+        remote_path="/tmp/dir_for_remote_transfer/remote/txt/",
+        regexp_mask=".*[.]txt",
+        operation=SFTPOperation.PUT,
+        create_intermediate_dirs=True,
+    )
+
+
+5. Send list of files from the local directory that match the specified pattern to the remote server
+
+.. code-block:: python
+
+    put_dir_txt_files = SFTPBatchOperator(
+        task_id="put_dir_txt_files",
+        ssh_conn_id="ssh_default",
+        local_path=[
+            "/tmp/dir_for_remote_transfer/file1.txt",
+            "/tmp/dir_for_remote_transfer/file2.txt",
+        ],
+        remote_path="/tmp/dir_for_remote_transfer/remote/txt/",
+        regexp_mask=".*[.]txt",

Review comment:
       ```suggestion
           regexp_mask=r".*\.txt",
   ```
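
For reference, the two spellings select the same file names: `[.]` and `\.` both match a literal dot, and the raw string avoids the invalid-escape warning that `"\."` would trigger in a normal string. The sketch below illustrates this with the standard `re` module. How the operator applies the mask (the `_search_files` body is not shown in the quoted diff) is an assumption here, so both `fullmatch` and `match` are shown.

```python
import re

names = ["report.txt", "notes.txt.bak", "readme_txt", "data.csv"]

char_class = re.compile(".*[.]txt")   # pattern as written in the docs
raw_escape = re.compile(r".*\.txt")   # pattern from the suggestion

# Both patterns require a literal dot, so they accept the same names.
same = [n for n in names if char_class.fullmatch(n)] == [
    n for n in names if raw_escape.fullmatch(n)
]

# fullmatch requires the whole name to match the mask.
full = [n for n in names if raw_escape.fullmatch(n)]

# match only anchors at the start, so "notes.txt.bak" also passes.
prefix = [n for n in names if raw_escape.match(n)]

print(same, full, prefix)
```

If the mask is meant to select by extension, anchoring it (`fullmatch`, or a trailing `$`) is the safer choice.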

##########
File path: docs/apache-airflow-providers-sftp/sftp.rst
##########
@@ -0,0 +1,111 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+
+SFTPOperator
+==========================
+Use the :class:`~airflow.providers.sftp.operators.sftp.SFTPOperator` to
+transfer data between servers over SFTP.
+
+Using the Operator
+------------------
+To start working with the operator, you need to register an SFTP/SSH connection in Airflow Connections.
+Use ``ssh_conn_id`` to specify the name of the connection.
+
+You can use the operator for the following tasks:
+
+1. Send one file to the server with the full path
+
+.. code-block:: python
+
+    put_file = SFTPOperator(
+        task_id="put_file",
+        ssh_conn_id="ssh_default",
+        local_filepath="/tmp/transfer_file/put_file_file1.txt",
+        remote_filepath="/tmp/transfer_file/remote/put_file_file1.txt",
+        operation=SFTPOperation.PUT,
+        create_intermediate_dirs=True,
+    )
+
+
+2. Send all files from local directory to remote server
+
+.. code-block:: python
+
+    put_dir_files = SFTPBatchOperator(
+        task_id="put_dir_files",
+        ssh_conn_id="ssh_default",
+        local_path="/tmp/dir_for_remote_transfer/",
+        remote_path="/tmp/dir_for_remote_transfer/remote/",
+        operation=SFTPOperation.PUT,
+        create_intermediate_dirs=True,
+    )
+
+
+3. Send all files from local directory to remote server
+
+.. code-block:: python
+
+    put_dir_files = SFTPBatchOperator(
+        task_id="put_dir_files",
+        ssh_conn_id="ssh_default",
+        local_path="/tmp/dir_for_remote_transfer/",
+        remote_path=[
+            "/tmp/dir_for_remote_transfer/remote/txt/file1.txt",
+            "/tmp/dir_for_remote_transfer/remote/txt/file2.txt",
+        ],
+        operation=SFTPOperation.PUT,
+        create_intermediate_dirs=True,
+    )
+
+
+4. Send all files from the local directory that match the specified pattern to the remote server
+
+.. code-block:: python
+
+    put_dir_txt_files = SFTPBatchOperator(
+        task_id="put_dir_txt_files",
+        ssh_conn_id="ssh_default",
+        local_path="/tmp/dir_for_remote_transfer/",
+        remote_path="/tmp/dir_for_remote_transfer/remote/txt/",
+        regexp_mask=".*[.]txt",

Review comment:
       ```suggestion
           regexp_mask=r".*\.txt",
   ```

##########
File path: docs/apache-airflow-providers-sftp/sftp.rst
##########
@@ -0,0 +1,111 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+
+SFTPOperator
+==========================
+Use the :class:`~airflow.providers.sftp.operators.sftp.SFTPOperator` to
+transfer data between servers over SFTP.
+
+Using the Operator
+------------------
+To start working with the operator, you need to register an SFTP/SSH connection in Airflow Connections.
+Use ``ssh_conn_id`` to specify the name of the connection.
+
+You can use the operator for the following tasks:
+
+1. Send one file to the server with the full path
+
+.. code-block:: python
+
+    put_file = SFTPOperator(
+        task_id="put_file",
+        ssh_conn_id="ssh_default",
+        local_filepath="/tmp/transfer_file/put_file_file1.txt",
+        remote_filepath="/tmp/transfer_file/remote/put_file_file1.txt",
+        operation=SFTPOperation.PUT,
+        create_intermediate_dirs=True,
+    )
+
+
+2. Send all files from local directory to remote server
+
+.. code-block:: python
+
+    put_dir_files = SFTPBatchOperator(
+        task_id="put_dir_files",
+        ssh_conn_id="ssh_default",
+        local_path="/tmp/dir_for_remote_transfer/",
+        remote_path="/tmp/dir_for_remote_transfer/remote/",
+        operation=SFTPOperation.PUT,
+        create_intermediate_dirs=True,
+    )
+
+
+3. Send all files from local directory to remote server
+
+.. code-block:: python
+
+    put_dir_files = SFTPBatchOperator(
+        task_id="put_dir_files",
+        ssh_conn_id="ssh_default",
+        local_path="/tmp/dir_for_remote_transfer/",
+        remote_path=[
+            "/tmp/dir_for_remote_transfer/remote/txt/file1.txt",
+            "/tmp/dir_for_remote_transfer/remote/txt/file2.txt",
+        ],
+        operation=SFTPOperation.PUT,
+        create_intermediate_dirs=True,
+    )
+
+
+4. Send all files from the local directory that match the specified pattern to the remote server
+
+.. code-block:: python
+
+    put_dir_txt_files = SFTPBatchOperator(
+        task_id="put_dir_txt_files",
+        ssh_conn_id="ssh_default",
+        local_path="/tmp/dir_for_remote_transfer/",
+        remote_path="/tmp/dir_for_remote_transfer/remote/txt/",
+        regexp_mask=".*[.]txt",
+        operation=SFTPOperation.PUT,
+        create_intermediate_dirs=True,
+    )
+
+
+5. Send list of files from the local directory that match the specified pattern to the remote server
+
+.. code-block:: python
+
+    put_dir_txt_files = SFTPBatchOperator(
+        task_id="put_dir_txt_files",
+        ssh_conn_id="ssh_default",
+        local_path=[
+            "/tmp/dir_for_remote_transfer/file1.txt",
+            "/tmp/dir_for_remote_transfer/file2.txt",
+        ],
+        remote_path="/tmp/dir_for_remote_transfer/remote/txt/",
+        regexp_mask=".*[.]txt",
+        operation=SFTPOperation.PUT,
+        create_intermediate_dirs=True,
+    )
+
+
+The operator also supports transferring files from a remote server to the local machine;
+to do this, change the parameter ``operation`` from ``SFTPOperation.PUT`` to ``SFTPOperation.GET``.
+Parameter ``create_intermediate_dirs`` needed for create missing intermediate directories when

Review comment:
       ```suggestion
   Parameter ``create_intermediate_dirs`` is needed to create missing intermediate directories when
   ```
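
For the GET direction, the local side of this behaviour can be sketched with `pathlib` alone. This is only an illustration of what "create missing intermediate directories" means; `prepare_local_target` is a hypothetical helper, not the PR's implementation.

```python
import tempfile
from pathlib import Path


def prepare_local_target(local_file: str, create_intermediate_dirs: bool) -> Path:
    """Make sure the parent directory of local_file is usable for a download."""
    target = Path(local_file)
    if create_intermediate_dirs:
        # Equivalent to `mkdir -p`: create every missing parent, ignore existing ones.
        target.parent.mkdir(parents=True, exist_ok=True)
    elif not target.parent.is_dir():
        raise FileNotFoundError(f"Local directory {target.parent} does not exist")
    return target


# Usage: a nested target below a fresh temporary directory.
with tempfile.TemporaryDirectory() as tmp:
    nested = prepare_local_target(str(Path(tmp) / "a" / "b" / "file1.txt"), True)
    print(nested.parent.is_dir())
```

With `create_intermediate_dirs=False` the same call raises instead of creating the directories.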







[GitHub] [airflow] AngryHelper commented on a change in pull request #19852: created SFTPBatchOperator which add batch function

Posted by GitBox <gi...@apache.org>.
AngryHelper commented on a change in pull request #19852:
URL: https://github.com/apache/airflow/pull/19852#discussion_r762590379



##########
File path: airflow/providers/sftp/operators/sftp_batch.py
##########
@@ -0,0 +1,184 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains SFTP Batch operator."""
+import os
+import re
+from pathlib import Path
+from typing import Any, List, Union
+
+from paramiko.sftp_client import SFTPClient
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.sftp.operators.sftp import SFTPOperation, _check_conn, _make_intermediate_dirs
+
+
+class SFTPBatchOperator(BaseOperator):
+    """
+    SFTPBatchOperator for transferring multiple files from a remote host to local or vice versa.
+    This operator uses ssh_hook to open an SFTP transport channel that serves as the basis
+    for file transfer.
+
+    :param ssh_hook: predefined ssh_hook to use for remote execution.
+        Either `ssh_hook` or `ssh_conn_id` needs to be provided.
+    :type ssh_hook: airflow.providers.ssh.hooks.ssh.SSHHook
+    :param ssh_conn_id: :ref:`ssh connection id<howto/connection:ssh>`
+        from airflow Connections. `ssh_conn_id` will be ignored if `ssh_hook`
+        is provided.
+    :type ssh_conn_id: str
+    :param remote_host: remote host to connect (templated)
+        Nullable. If provided, it will replace the `remote_host` which was
+        defined in `ssh_hook` or predefined in the connection of `ssh_conn_id`.
+    :type remote_host: str
+    :param local_path: local folder path to get or put. (templated)
+    :type local_path: str or list
+    :param remote_path: remote folder path to get or put. (templated)
+    :type remote_path: str or list
+    :param regexp_mask: regexp mask for file match in local_folder or remote_folder to get or put. (templated)
+    :type regexp_mask: str
+    :param operation: specify operation 'get' or 'put', defaults to put
+    :type operation: str
+    :param confirm: specify if the SFTP operation should be confirmed, defaults to True
+    :type confirm: bool
+    :param create_intermediate_dirs: create missing intermediate directories when
+        copying from remote to local and vice-versa. Default is False.
+
+        Example: The following task would copy the ``.txt`` files from
+        ``/tmp/dir_for_remote_transfer/`` to ``/tmp/dir_for_remote_transfer/txt`` on the
+        remote host, creating the missing remote directories. If the parameter is not
+        passed, the task would error if the directory does not exist. ::
+
+            put_dir_txt_files = SFTPBatchOperator(
+                task_id="put_dir_txt_files",
+                ssh_conn_id="ssh_default",
+                local_path="/tmp/dir_for_remote_transfer/",
+                remote_path="/tmp/dir_for_remote_transfer/txt",
+                regexp_mask=".*[.]txt",
+                operation=SFTPOperation.PUT,
+                create_intermediate_dirs=True
+            )
+
+    :type create_intermediate_dirs: bool
+
+    """
+
+    template_fields = (
+        'remote_host',
+        'local_path',
+        'remote_path',
+        'regexp_mask',
+    )
+
+    def __init__(
+        self,
+        *,
+        ssh_hook=None,
+        ssh_conn_id=None,
+        remote_host=None,
+        local_path: Union[str, list] = None,
+        remote_path: Union[str, list] = None,
+        regexp_mask=None,
+        operation=SFTPOperation.PUT,
+        confirm=True,
+        create_intermediate_dirs=False,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.ssh_hook = ssh_hook
+        self.ssh_conn_id = ssh_conn_id
+        self.remote_host = remote_host
+        self.local_path = local_path
+        self.remote_path = remote_path
+        self.regexp_mask = regexp_mask
+        self.operation = operation
+        self.confirm = confirm
+        self.create_intermediate_dirs = create_intermediate_dirs
+        if self.operation.lower() not in (SFTPOperation.GET, SFTPOperation.PUT):
+            raise TypeError(
+                f"Unsupported operation value {self.operation}, "
+                f"expected {SFTPOperation.GET} or {SFTPOperation.PUT}"
+            )
+        if not (
+            (isinstance(self.local_path, str) and isinstance(self.remote_path, str))
+            or (isinstance(self.local_path, list) and isinstance(self.remote_path, str))
+            or (isinstance(self.remote_path, list) and isinstance(self.local_path, str))
+        ):
+            raise TypeError(
+                "Unsupported path argument value local_path and remote_path. "
+                "Possible options: local_path is str and remote_path is str; "
+                "local_path is list and remote_path is str; "
+                "local_path is str and remote_path is list"
+            )
+
+    def execute(self, context: Any) -> str:
+        file_msg = None
+        try:
+            _check_conn(self)
+
+            with self.ssh_hook.get_conn() as ssh_client:
+                sftp_client = ssh_client.open_sftp()
+                if self.operation.lower() == SFTPOperation.PUT:
+                    if isinstance(self.local_path, str):
+                        files_list = self._search_files(os.listdir(self.local_path))
+                        for file in files_list:
+                            local_file = os.path.basename(file)
+                            file_msg = file
+                            self._transfer(sftp_client, self.local_path, local_file, self.remote_path)
+                    if isinstance(self.local_path, list) and isinstance(self.remote_path, str):
+                        for file in self.local_path:
+                            local_file = os.path.basename(file)
+                            file_msg = file
+                            self._transfer(sftp_client, os.path.dirname(file), local_file, self.remote_path)
+                elif self.operation.lower() == SFTPOperation.GET:
+                    if isinstance(self.remote_path, str):
+                        files_list = self._search_files(sftp_client.listdir(self.remote_path))
+                        for file in files_list:
+                            remote_file = os.path.basename(file)
+                            file_msg = file
+                            self._transfer(sftp_client, self.local_path, remote_file, self.remote_path)
+                    if isinstance(self.remote_path, list) and isinstance(self.local_path, str):

Review comment:
       If the directory does not exist and create_intermediate_dirs is false, the file transfer will fail with an error anyway. Is it worth overloading the method and adding explicit checks for everything?
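
To make the trade-off concrete, here is a minimal sketch of what creating the missing remote directories involves. It relies only on paramiko's `SFTPClient.stat()` and `SFTPClient.mkdir()`. `make_remote_dirs` is a hypothetical stand-in, not the `_make_intermediate_dirs` helper imported in the diff, whose body is not shown here.

```python
import errno
import posixpath


def make_remote_dirs(sftp_client, remote_directory: str) -> None:
    """Create remote_directory and any missing parents, like `mkdir -p`."""
    remote_directory = remote_directory.rstrip("/")
    if not remote_directory:
        # Empty string or the root directory: nothing to create.
        return
    try:
        sftp_client.stat(remote_directory)
        return  # The directory (or something with that name) already exists.
    except OSError as err:
        if err.errno != errno.ENOENT:
            raise
    # Create the parent chain first, then the directory itself.
    make_remote_dirs(sftp_client, posixpath.dirname(remote_directory))
    sftp_client.mkdir(remote_directory)
```

Without such a step, a missing directory surfaces as an error from the transfer call itself, which is the behaviour described in the comment above.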







[GitHub] [airflow] AngryHelper commented on a change in pull request #19852: created SFTPBatchOperator which add batch function

Posted by GitBox <gi...@apache.org>.
AngryHelper commented on a change in pull request #19852:
URL: https://github.com/apache/airflow/pull/19852#discussion_r767298877



##########
File path: airflow/providers/sftp/operators/sftp_batch.py
##########
@@ -0,0 +1,184 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains SFTP Batch operator."""
+import os
+import re
+from pathlib import Path
+from typing import Any, List, Union
+
+from paramiko.sftp_client import SFTPClient
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.sftp.operators.sftp import SFTPOperation, _check_conn, _make_intermediate_dirs
+
+
+class SFTPBatchOperator(BaseOperator):
+    """
+    SFTPBatchOperator for transferring multiple files from a remote host to local or vice versa.
+    This operator uses ssh_hook to open an SFTP transport channel that serves as the basis
+    for file transfer.
+
+    :param ssh_hook: predefined ssh_hook to use for remote execution.
+        Either `ssh_hook` or `ssh_conn_id` needs to be provided.
+    :type ssh_hook: airflow.providers.ssh.hooks.ssh.SSHHook
+    :param ssh_conn_id: :ref:`ssh connection id<howto/connection:ssh>`
+        from airflow Connections. `ssh_conn_id` will be ignored if `ssh_hook`
+        is provided.
+    :type ssh_conn_id: str
+    :param remote_host: remote host to connect (templated)
+        Nullable. If provided, it will replace the `remote_host` which was
+        defined in `ssh_hook` or predefined in the connection of `ssh_conn_id`.
+    :type remote_host: str
+    :param local_path: local folder path to get or put. (templated)

Review comment:
       done



