Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/08/31 10:27:21 UTC
[GitHub] [airflow] JavierLopezT opened a new pull request #17937: FTPToS3 transfer several files
JavierLopezT opened a new pull request #17937:
URL: https://github.com/apache/airflow/pull/17937
With these changes you can transfer several files with just one operator
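For context, a minimal usage sketch of what this PR enables (a hypothetical DAG fragment; the connection ids, bucket, paths, and filenames are illustrative, not taken from the PR):

```python
# Hypothetical example of the new multiple-file mode. Bucket, paths, and
# filenames are made-up; only the operator and its parameters come from the PR.
from airflow.providers.amazon.aws.transfers.ftp_to_s3 import FTPToS3Operator

# Single file: ftp_path and s3_key both include the filename.
single = FTPToS3Operator(
    task_id='ftp_to_s3_single',
    ftp_path='/remote/dir/report.csv',
    s3_bucket='my-bucket',
    s3_key='landing/report.csv',
)

# Multiple files: ftp_path is the directory, s3_key ends with '/',
# and ftp_filenames selects the files (an exact list or a prefix).
multiple = FTPToS3Operator(
    task_id='ftp_to_s3_multiple',
    ftp_path='/remote/dir/',
    s3_bucket='my-bucket',
    s3_key='landing/',
    ftp_filenames=['a.csv', 'b.csv'],
)
```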
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] mrbaguvix commented on a change in pull request #17937: FTPToS3 transfer several files
Posted by GitBox <gi...@apache.org>.
mrbaguvix commented on a change in pull request #17937:
URL: https://github.com/apache/airflow/pull/17937#discussion_r702523532
##########
File path: airflow/providers/amazon/aws/transfers/ftp_to_s3.py
##########
@@ -24,20 +25,29 @@
class FTPToS3Operator(BaseOperator):
"""
- This operator enables the transferring of files from FTP server to S3.
+ This operator enables the transfer of files from FTP server to S3. It can be used for
+ transfer one or multiple files.
- :param s3_bucket: The targeted s3 bucket in which upload the file to
+ :param ftp_path: The ftp remote path. For one file it is mandatory to include the file as well.
+ For multiple files, it is the route where the files will be found.
+ :type ftp_path: str
+ :param s3_bucket: The targeted s3 bucket in which upload the file(s) to.
:type s3_bucket: str
- :param s3_key: The targeted s3 key. This is the specified file path for
- uploading the file to S3.
+ :param s3_key: The targeted s3 key. For one file it must include the file path. For several,
+ it must end with "/".
:type s3_key: str
- :param ftp_path: The ftp remote path, including the file.
- :type ftp_path: str
+ :param ftp_filenames: Only used if you want to move multiple files. You can pass a list
+ with exact filenames present in the ftp path, or a prefix that all files must meet.
+ :type ftp_filenames: Union(str, list)
+ :param s3_filenames: Only used if you want to move multiple files and name them different than
Review comment:
```suggestion
:param s3_filenames: Only used if you want to move multiple files and name them different from
```
##########
File path: airflow/providers/amazon/aws/transfers/ftp_to_s3.py
##########
@@ -24,20 +25,29 @@
class FTPToS3Operator(BaseOperator):
"""
- This operator enables the transferring of files from FTP server to S3.
+ This operator enables the transfer of files from FTP server to S3. It can be used for
Review comment:
```suggestion
This operator enables the transfer of files from a FTP server to S3. It can be used to
```
##########
File path: airflow/providers/amazon/aws/transfers/ftp_to_s3.py
##########
@@ -24,20 +25,29 @@
class FTPToS3Operator(BaseOperator):
"""
- This operator enables the transferring of files from FTP server to S3.
+ This operator enables the transfer of files from FTP server to S3. It can be used for
+ transfer one or multiple files.
- :param s3_bucket: The targeted s3 bucket in which upload the file to
+ :param ftp_path: The ftp remote path. For one file it is mandatory to include the file as well.
+ For multiple files, it is the route where the files will be found.
+ :type ftp_path: str
+ :param s3_bucket: The targeted s3 bucket in which upload the file(s) to.
Review comment:
```suggestion
:param s3_bucket: The targeted s3 bucket in which to upload the file(s).
```
##########
File path: airflow/providers/amazon/aws/transfers/ftp_to_s3.py
##########
@@ -24,20 +25,29 @@
class FTPToS3Operator(BaseOperator):
"""
- This operator enables the transferring of files from FTP server to S3.
+ This operator enables the transfer of files from FTP server to S3. It can be used for
+ transfer one or multiple files.
- :param s3_bucket: The targeted s3 bucket in which upload the file to
+ :param ftp_path: The ftp remote path. For one file it is mandatory to include the file as well.
+ For multiple files, it is the route where the files will be found.
+ :type ftp_path: str
+ :param s3_bucket: The targeted s3 bucket in which upload the file(s) to.
:type s3_bucket: str
- :param s3_key: The targeted s3 key. This is the specified file path for
- uploading the file to S3.
+ :param s3_key: The targeted s3 key. For one file it must include the file path. For several,
+ it must end with "/".
:type s3_key: str
- :param ftp_path: The ftp remote path, including the file.
- :type ftp_path: str
+ :param ftp_filenames: Only used if you want to move multiple files. You can pass a list
+ with exact filenames present in the ftp path, or a prefix that all files must meet.
+ :type ftp_filenames: Union(str, list)
+ :param s3_filenames: Only used if you want to move multiple files and name them different than
+ the originals from the ftp. It can be a list of filenames or a files prefix (that will replace
Review comment:
```suggestion
the originals from the ftp. It can be a list of filenames or file prefix (that will replace
```
[GitHub] [airflow] JavierLopezT commented on a change in pull request #17937: Enable FTPToS3Operator to transfer several files
Posted by GitBox <gi...@apache.org>.
JavierLopezT commented on a change in pull request #17937:
URL: https://github.com/apache/airflow/pull/17937#discussion_r723149385
##########
File path: airflow/providers/amazon/aws/transfers/ftp_to_s3.py
##########
@@ -53,52 +64,89 @@ class FTPToS3Operator(BaseOperator):
:type acl_policy: str
"""
- template_fields = (
- 's3_bucket',
- 's3_key',
- 'ftp_path',
- )
+ template_fields = ('ftp_path', 's3_bucket', 's3_key', 'ftp_filenames', 's3_filenames')
def __init__(
self,
- s3_bucket,
- s3_key,
- ftp_path,
- ftp_conn_id='ftp_default',
- aws_conn_id='aws_default',
- replace=False,
- encrypt=False,
- gzip=False,
- acl_policy=None,
- *args,
+ *,
+ ftp_path: str,
+ s3_bucket: str,
+ s3_key: str,
+ ftp_filenames: Optional[Union[str, List[str]]] = None,
+ s3_filenames: Optional[Union[str, List[str]]] = None,
+ ftp_conn_id: str = 'ftp_default',
+ aws_conn_id: str = 'aws_default',
+ replace: bool = False,
+ encrypt: bool = False,
+ gzip: bool = False,
+ acl_policy: str = None,
**kwargs,
):
- super().__init__(*args, **kwargs)
+ super().__init__(**kwargs)
+ self.ftp_path = ftp_path
self.s3_bucket = s3_bucket
self.s3_key = s3_key
- self.ftp_path = ftp_path
+ self.ftp_filenames = ftp_filenames
+ self.s3_filenames = s3_filenames
self.aws_conn_id = aws_conn_id
self.ftp_conn_id = ftp_conn_id
self.replace = replace
self.encrypt = encrypt
self.gzip = gzip
self.acl_policy = acl_policy
- def execute(self, context):
- s3_hook = S3Hook(self.aws_conn_id)
- ftp_hook = FTPHook(ftp_conn_id=self.ftp_conn_id)
+ self.ftp_hook = FTPHook(ftp_conn_id=self.ftp_conn_id)
Review comment:
Sure, I'll modify it. Why is that, though?
[GitHub] [airflow] potiuk merged pull request #17937: Enable FTPToS3Operator to transfer several files
Posted by GitBox <gi...@apache.org>.
potiuk merged pull request #17937:
URL: https://github.com/apache/airflow/pull/17937
[GitHub] [airflow] ashb commented on a change in pull request #17937: Enable FTPToS3Operator to transfer several files
Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #17937:
URL: https://github.com/apache/airflow/pull/17937#discussion_r723132242
##########
File path: airflow/providers/amazon/aws/transfers/ftp_to_s3.py
##########
@@ -53,52 +64,89 @@ class FTPToS3Operator(BaseOperator):
:type acl_policy: str
"""
- template_fields = (
- 's3_bucket',
- 's3_key',
- 'ftp_path',
- )
+ template_fields = ('ftp_path', 's3_bucket', 's3_key', 'ftp_filenames', 's3_filenames')
def __init__(
self,
- s3_bucket,
- s3_key,
- ftp_path,
- ftp_conn_id='ftp_default',
- aws_conn_id='aws_default',
- replace=False,
- encrypt=False,
- gzip=False,
- acl_policy=None,
- *args,
+ *,
+ ftp_path: str,
+ s3_bucket: str,
+ s3_key: str,
+ ftp_filenames: Optional[Union[str, List[str]]] = None,
+ s3_filenames: Optional[Union[str, List[str]]] = None,
+ ftp_conn_id: str = 'ftp_default',
+ aws_conn_id: str = 'aws_default',
+ replace: bool = False,
+ encrypt: bool = False,
+ gzip: bool = False,
+ acl_policy: str = None,
**kwargs,
):
- super().__init__(*args, **kwargs)
+ super().__init__(**kwargs)
+ self.ftp_path = ftp_path
self.s3_bucket = s3_bucket
self.s3_key = s3_key
- self.ftp_path = ftp_path
+ self.ftp_filenames = ftp_filenames
+ self.s3_filenames = s3_filenames
self.aws_conn_id = aws_conn_id
self.ftp_conn_id = ftp_conn_id
self.replace = replace
self.encrypt = encrypt
self.gzip = gzip
self.acl_policy = acl_policy
- def execute(self, context):
- s3_hook = S3Hook(self.aws_conn_id)
- ftp_hook = FTPHook(ftp_conn_id=self.ftp_conn_id)
+ self.ftp_hook = FTPHook(ftp_conn_id=self.ftp_conn_id)
+
+ def __upload_to_s3_from_ftp(self, remote_filename, s3_file_key):
+ s3 = S3Hook(self.aws_conn_id)
with NamedTemporaryFile() as local_tmp_file:
- ftp_hook.retrieve_file(
- remote_full_path=self.ftp_path, local_full_path_or_buffer=local_tmp_file.name
+ self.ftp_hook.retrieve_file(
+ remote_full_path=remote_filename, local_full_path_or_buffer=local_tmp_file.name
)
- s3_hook.load_file(
+ s3.load_file(
filename=local_tmp_file.name,
- key=self.s3_key,
+ key=s3_file_key,
bucket_name=self.s3_bucket,
replace=self.replace,
encrypt=self.encrypt,
gzip=self.gzip,
acl_policy=self.acl_policy,
)
+ self.log.info(f'File upload to {s3_file_key}')
+
+ def execute(self, context):
+ if self.ftp_filenames:
+ if isinstance(self.ftp_filenames, str):
+ self.log.info(f'Getting files in {self.ftp_path}')
+
+ list_dir = self.ftp_hook.list_directory(
+ path=self.ftp_path,
+ )
+
+ if self.ftp_filenames == 'all':
Review comment:
```suggestion
if self.ftp_filenames == '*':
```
Using a string of `all` here feels odd/error-prone.
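The selection logic being discussed can be sketched as a small standalone helper (a hypothetical function, not the operator's actual code), using `'*'` as the catch-all value rather than `'all'`:

```python
# Sketch of the ftp_filenames selection logic: given the FTP directory
# listing, ftp_filenames may be '*' (everything), a prefix/substring
# string, or an explicit list of filenames. Hypothetical helper only.
from typing import List, Union


def select_files(list_dir: List[str], ftp_filenames: Union[str, List[str]]) -> List[str]:
    if isinstance(ftp_filenames, str):
        if ftp_filenames == '*':
            return list_dir  # transfer everything in the directory
        # otherwise treat the string as a filename prefix/substring filter
        return [f for f in list_dir if ftp_filenames in f]
    # explicit list: keep only the names the caller asked for
    return [f for f in list_dir if f in ftp_filenames]


files = select_files(['a.csv', 'b.csv', 'x.txt'], '*')
```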
[GitHub] [airflow] ashb commented on a change in pull request #17937: Enable FTPToS3Operator to transfer several files
Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #17937:
URL: https://github.com/apache/airflow/pull/17937#discussion_r723131359
##########
File path: airflow/providers/amazon/aws/transfers/ftp_to_s3.py
##########
@@ -53,52 +64,89 @@ class FTPToS3Operator(BaseOperator):
:type acl_policy: str
"""
- template_fields = (
- 's3_bucket',
- 's3_key',
- 'ftp_path',
- )
+ template_fields = ('ftp_path', 's3_bucket', 's3_key', 'ftp_filenames', 's3_filenames')
def __init__(
self,
- s3_bucket,
- s3_key,
- ftp_path,
- ftp_conn_id='ftp_default',
- aws_conn_id='aws_default',
- replace=False,
- encrypt=False,
- gzip=False,
- acl_policy=None,
- *args,
+ *,
+ ftp_path: str,
+ s3_bucket: str,
+ s3_key: str,
+ ftp_filenames: Optional[Union[str, List[str]]] = None,
+ s3_filenames: Optional[Union[str, List[str]]] = None,
+ ftp_conn_id: str = 'ftp_default',
+ aws_conn_id: str = 'aws_default',
+ replace: bool = False,
+ encrypt: bool = False,
+ gzip: bool = False,
+ acl_policy: str = None,
**kwargs,
):
- super().__init__(*args, **kwargs)
+ super().__init__(**kwargs)
+ self.ftp_path = ftp_path
self.s3_bucket = s3_bucket
self.s3_key = s3_key
- self.ftp_path = ftp_path
+ self.ftp_filenames = ftp_filenames
+ self.s3_filenames = s3_filenames
self.aws_conn_id = aws_conn_id
self.ftp_conn_id = ftp_conn_id
self.replace = replace
self.encrypt = encrypt
self.gzip = gzip
self.acl_policy = acl_policy
- def execute(self, context):
- s3_hook = S3Hook(self.aws_conn_id)
- ftp_hook = FTPHook(ftp_conn_id=self.ftp_conn_id)
+ self.ftp_hook = FTPHook(ftp_conn_id=self.ftp_conn_id)
Review comment:
Do not create hooks in the constructor of operators, please.
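The rationale: operator constructors run every time the scheduler parses the DAG file, so anything that might open a network connection should be deferred to task run time. A common pattern is a lazily computed property; here is a toy, self-contained sketch of that pattern (the class is a stand-in, not the real `BaseOperator`):

```python
# Toy illustration of deferring hook creation out of __init__.
# __init__ stores only configuration; the "hook" is built lazily the
# first time it is accessed (i.e. inside execute(), on the worker),
# and functools.cached_property reuses that single instance afterwards.
from functools import cached_property


class LazyHookOperator:
    """Stand-in for an Airflow operator; not the real BaseOperator."""

    def __init__(self, ftp_conn_id: str = 'ftp_default'):
        self.ftp_conn_id = ftp_conn_id  # cheap: just store config

    @cached_property
    def ftp_hook(self):
        # In the real operator this would be FTPHook(ftp_conn_id=...);
        # a dict stands in for the hook object here.
        return {'conn_id': self.ftp_conn_id}


op = LazyHookOperator()
# No hook exists yet; it is created only on first access.
hook = op.ftp_hook
```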