You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/07/07 20:54:56 UTC
[airflow] branch main updated: Add blocksize arg for ftp hook (#24860)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 64412ee867 Add blocksize arg for ftp hook (#24860)
64412ee867 is described below
commit 64412ee867fe0918cc3b616b3fb0b72dcd88125c
Author: Kevin George <ke...@hotmail.com>
AuthorDate: Thu Jul 7 16:54:46 2022 -0400
Add blocksize arg for ftp hook (#24860)
Co-authored-by: Kevin George <kevingeorge232@gmail.com>
---
airflow/providers/ftp/hooks/ftp.py | 29 ++++++++++++++++++-----------
tests/providers/ftp/hooks/test_ftp.py | 4 ++--
2 files changed, 20 insertions(+), 13 deletions(-)
diff --git a/airflow/providers/ftp/hooks/ftp.py b/airflow/providers/ftp/hooks/ftp.py
index 5e6e5c12c2..08a58b2526 100644
--- a/airflow/providers/ftp/hooks/ftp.py
+++ b/airflow/providers/ftp/hooks/ftp.py
@@ -20,7 +20,7 @@
import datetime
import ftplib
import os.path
-from typing import Any, List, Optional, Tuple
+from typing import Any, Callable, List, Optional, Tuple
from airflow.hooks.base import BaseHook
@@ -115,7 +115,13 @@ class FTPHook(BaseHook):
conn = self.get_conn()
conn.rmd(path)
- def retrieve_file(self, remote_full_path, local_full_path_or_buffer, callback=None):
+ def retrieve_file(
+ self,
+ remote_full_path: str,
+ local_full_path_or_buffer: Any,
+ callback: Optional[Callable] = None,
+ block_size: int = 8192,
+ ) -> None:
"""
Transfers the remote file to a local location.
@@ -132,6 +138,8 @@ class FTPHook(BaseHook):
that writing to a file or buffer will need to be handled inside the
callback.
[default: output_handle.write()]
+ :param block_size: maximum size, in bytes, of each chunk read from the
+ server during the transfer (defaults to 8192)
.. code-block:: python
@@ -164,31 +172,30 @@ class FTPHook(BaseHook):
"""
conn = self.get_conn()
-
is_path = isinstance(local_full_path_or_buffer, str)
# without a callback, default to writing to a user-provided file or
# file-like buffer
if not callback:
if is_path:
-
output_handle = open(local_full_path_or_buffer, 'wb')
else:
output_handle = local_full_path_or_buffer
+
callback = output_handle.write
- else:
- output_handle = None
remote_path, remote_file_name = os.path.split(remote_full_path)
conn.cwd(remote_path)
self.log.info('Retrieving file from FTP: %s', remote_full_path)
- conn.retrbinary(f'RETR {remote_file_name}', callback)
+ conn.retrbinary(f'RETR {remote_file_name}', callback, block_size)
self.log.info('Finished retrieving file from FTP: %s', remote_full_path)
if is_path and output_handle:
output_handle.close()
- def store_file(self, remote_full_path: str, local_full_path_or_buffer: Any) -> None:
+ def store_file(
+ self, remote_full_path: str, local_full_path_or_buffer: Any, block_size: int = 8192
+ ) -> None:
"""
Transfers a local file to the remote location.
@@ -199,19 +206,19 @@ class FTPHook(BaseHook):
:param remote_full_path: full path to the remote file
:param local_full_path_or_buffer: full path to the local file or a
file-like buffer
+ :param block_size: size, in bytes, of each chunk read from the local
+ file or buffer and sent to the server (defaults to 8192)
"""
conn = self.get_conn()
-
is_path = isinstance(local_full_path_or_buffer, str)
if is_path:
-
input_handle = open(local_full_path_or_buffer, 'rb')
else:
input_handle = local_full_path_or_buffer
remote_path, remote_file_name = os.path.split(remote_full_path)
conn.cwd(remote_path)
- conn.storbinary(f'STOR {remote_file_name}', input_handle)
+ conn.storbinary(f'STOR {remote_file_name}', input_handle, block_size)
if is_path:
input_handle.close()
diff --git a/tests/providers/ftp/hooks/test_ftp.py b/tests/providers/ftp/hooks/test_ftp.py
index 943cc1da6d..635d45cd63 100644
--- a/tests/providers/ftp/hooks/test_ftp.py
+++ b/tests/providers/ftp/hooks/test_ftp.py
@@ -111,14 +111,14 @@ class TestFTPHook(unittest.TestCase):
_buffer = io.StringIO('buffer')
with fh.FTPHook() as ftp_hook:
ftp_hook.retrieve_file(self.path, _buffer)
- self.conn_mock.retrbinary.assert_called_once_with('RETR path', _buffer.write)
+ self.conn_mock.retrbinary.assert_called_once_with('RETR path', _buffer.write, 8192)
def test_retrieve_file_with_callback(self):
func = mock.Mock()
_buffer = io.StringIO('buffer')
with fh.FTPHook() as ftp_hook:
ftp_hook.retrieve_file(self.path, _buffer, callback=func)
- self.conn_mock.retrbinary.assert_called_once_with('RETR path', func)
+ self.conn_mock.retrbinary.assert_called_once_with('RETR path', func, 8192)
def test_connection_success(self):
with fh.FTPHook() as ftp_hook: