You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/07/07 20:54:56 UTC

[airflow] branch main updated: Add blocksize arg for ftp hook (#24860)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 64412ee867 Add blocksize arg for ftp hook (#24860)
64412ee867 is described below

commit 64412ee867fe0918cc3b616b3fb0b72dcd88125c
Author: Kevin George <ke...@hotmail.com>
AuthorDate: Thu Jul 7 16:54:46 2022 -0400

    Add blocksize arg for ftp hook (#24860)
    
    Co-authored-by: Kevin George <kevingeorge232@gmail.com>
---
 airflow/providers/ftp/hooks/ftp.py    | 29 ++++++++++++++++++-----------
 tests/providers/ftp/hooks/test_ftp.py |  4 ++--
 2 files changed, 20 insertions(+), 13 deletions(-)

diff --git a/airflow/providers/ftp/hooks/ftp.py b/airflow/providers/ftp/hooks/ftp.py
index 5e6e5c12c2..08a58b2526 100644
--- a/airflow/providers/ftp/hooks/ftp.py
+++ b/airflow/providers/ftp/hooks/ftp.py
@@ -20,7 +20,7 @@
 import datetime
 import ftplib
 import os.path
-from typing import Any, List, Optional, Tuple
+from typing import Any, Callable, List, Optional, Tuple
 
 from airflow.hooks.base import BaseHook
 
@@ -115,7 +115,13 @@ class FTPHook(BaseHook):
         conn = self.get_conn()
         conn.rmd(path)
 
-    def retrieve_file(self, remote_full_path, local_full_path_or_buffer, callback=None):
+    def retrieve_file(
+        self,
+        remote_full_path: str,
+        local_full_path_or_buffer: Any,
+        callback: Optional[Callable] = None,
+        block_size: int = 8192,
+    ) -> None:
         """
         Transfers the remote file to a local location.
 
@@ -132,6 +138,8 @@ class FTPHook(BaseHook):
             that writing to a file or buffer will need to be handled inside the
             callback.
             [default: output_handle.write()]
+        :param block_size: file is transferred in chunks of default size 8192
+            or as set by user
 
         .. code-block:: python
 
@@ -164,31 +172,30 @@ class FTPHook(BaseHook):
 
         """
         conn = self.get_conn()
-
         is_path = isinstance(local_full_path_or_buffer, str)
 
         # without a callback, default to writing to a user-provided file or
         # file-like buffer
         if not callback:
             if is_path:
-
                 output_handle = open(local_full_path_or_buffer, 'wb')
             else:
                 output_handle = local_full_path_or_buffer
+
             callback = output_handle.write
-        else:
-            output_handle = None
 
         remote_path, remote_file_name = os.path.split(remote_full_path)
         conn.cwd(remote_path)
         self.log.info('Retrieving file from FTP: %s', remote_full_path)
-        conn.retrbinary(f'RETR {remote_file_name}', callback)
+        conn.retrbinary(f'RETR {remote_file_name}', callback, block_size)
         self.log.info('Finished retrieving file from FTP: %s', remote_full_path)
 
         if is_path and output_handle:
             output_handle.close()
 
-    def store_file(self, remote_full_path: str, local_full_path_or_buffer: Any) -> None:
+    def store_file(
+        self, remote_full_path: str, local_full_path_or_buffer: Any, block_size: int = 8192
+    ) -> None:
         """
         Transfers a local file to the remote location.
 
@@ -199,19 +206,19 @@ class FTPHook(BaseHook):
         :param remote_full_path: full path to the remote file
         :param local_full_path_or_buffer: full path to the local file or a
             file-like buffer
+        :param block_size: file is transferred in chunks of default size 8192
+            or as set by user
         """
         conn = self.get_conn()
-
         is_path = isinstance(local_full_path_or_buffer, str)
 
         if is_path:
-
             input_handle = open(local_full_path_or_buffer, 'rb')
         else:
             input_handle = local_full_path_or_buffer
         remote_path, remote_file_name = os.path.split(remote_full_path)
         conn.cwd(remote_path)
-        conn.storbinary(f'STOR {remote_file_name}', input_handle)
+        conn.storbinary(f'STOR {remote_file_name}', input_handle, block_size)
 
         if is_path:
             input_handle.close()
diff --git a/tests/providers/ftp/hooks/test_ftp.py b/tests/providers/ftp/hooks/test_ftp.py
index 943cc1da6d..635d45cd63 100644
--- a/tests/providers/ftp/hooks/test_ftp.py
+++ b/tests/providers/ftp/hooks/test_ftp.py
@@ -111,14 +111,14 @@ class TestFTPHook(unittest.TestCase):
         _buffer = io.StringIO('buffer')
         with fh.FTPHook() as ftp_hook:
             ftp_hook.retrieve_file(self.path, _buffer)
-        self.conn_mock.retrbinary.assert_called_once_with('RETR path', _buffer.write)
+        self.conn_mock.retrbinary.assert_called_once_with('RETR path', _buffer.write, 8192)
 
     def test_retrieve_file_with_callback(self):
         func = mock.Mock()
         _buffer = io.StringIO('buffer')
         with fh.FTPHook() as ftp_hook:
             ftp_hook.retrieve_file(self.path, _buffer, callback=func)
-        self.conn_mock.retrbinary.assert_called_once_with('RETR path', func)
+        self.conn_mock.retrbinary.assert_called_once_with('RETR path', func, 8192)
 
     def test_connection_success(self):
         with fh.FTPHook() as ftp_hook: