Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/08/31 10:27:21 UTC

[GitHub] [airflow] JavierLopezT opened a new pull request #17937: FTPToS3 transfer several files

JavierLopezT opened a new pull request #17937:
URL: https://github.com/apache/airflow/pull/17937


   With these changes, you can transfer several files with just one operator.
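
   For illustration, a minimal usage sketch (the parameter names follow the
   diffs quoted below; the connection IDs, bucket, and paths are placeholders):

   ```python
   from airflow.providers.amazon.aws.transfers.ftp_to_s3 import FTPToS3Operator

   # Move every file under /data/ whose name starts with 'report_' to
   # s3://example-bucket/incoming/, keeping the original filenames.
   transfer_reports = FTPToS3Operator(
       task_id='ftp_reports_to_s3',
       ftp_path='/data/',        # a directory when moving multiple files
       s3_bucket='example-bucket',
       s3_key='incoming/',       # must end with "/" when moving multiple files
       ftp_filenames='report_',  # a filename prefix, or an explicit list of names
       ftp_conn_id='ftp_default',
       aws_conn_id='aws_default',
   )
   ```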
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] mrbaguvix commented on a change in pull request #17937: FTPToS3 transfer several files

Posted by GitBox <gi...@apache.org>.
mrbaguvix commented on a change in pull request #17937:
URL: https://github.com/apache/airflow/pull/17937#discussion_r702523532



##########
File path: airflow/providers/amazon/aws/transfers/ftp_to_s3.py
##########
@@ -24,20 +25,29 @@
 
 class FTPToS3Operator(BaseOperator):
     """
-    This operator enables the transferring of files from FTP server to S3.
+    This operator enables the transfer of files from FTP server to S3. It can be used for
+    transfer one or multiple files.
 
-    :param s3_bucket: The targeted s3 bucket in which upload the file to
+    :param ftp_path: The ftp remote path. For one file it is mandatory to include the file as well.
+        For multiple files, it is the route where the files will be found.
+    :type ftp_path: str
+    :param s3_bucket: The targeted s3 bucket in which upload the file(s) to.
     :type s3_bucket: str
-    :param s3_key: The targeted s3 key. This is the specified file path for
-        uploading the file to S3.
+    :param s3_key: The targeted s3 key. For one file it must include the file path. For several,
+        it must end with "/".
     :type s3_key: str
-    :param ftp_path: The ftp remote path, including the file.
-    :type ftp_path: str
+    :param ftp_filenames: Only used if you want to move multiple files. You can pass a list
+        with exact filenames present in the ftp path, or a prefix that all files must meet.
+    :type ftp_filenames: Union(str, list)
+    :param s3_filenames: Only used if you want to move multiple files and name them different than

Review comment:
       ```suggestion
       :param s3_filenames: Only used if you want to move multiple files and name them different from
       ```
   

##########
File path: airflow/providers/amazon/aws/transfers/ftp_to_s3.py
##########
@@ -24,20 +25,29 @@
 
 class FTPToS3Operator(BaseOperator):
     """
-    This operator enables the transferring of files from FTP server to S3.
+    This operator enables the transfer of files from FTP server to S3. It can be used for

Review comment:
       ```suggestion
       This operator enables the transfer of files from a FTP server to S3. It can be used to
       ```
   

##########
File path: airflow/providers/amazon/aws/transfers/ftp_to_s3.py
##########
@@ -24,20 +25,29 @@
 
 class FTPToS3Operator(BaseOperator):
     """
-    This operator enables the transferring of files from FTP server to S3.
+    This operator enables the transfer of files from FTP server to S3. It can be used for
+    transfer one or multiple files.
 
-    :param s3_bucket: The targeted s3 bucket in which upload the file to
+    :param ftp_path: The ftp remote path. For one file it is mandatory to include the file as well.
+        For multiple files, it is the route where the files will be found.
+    :type ftp_path: str
+    :param s3_bucket: The targeted s3 bucket in which upload the file(s) to.

Review comment:
       ```suggestion
       :param s3_bucket: The targeted s3 bucket in which to upload the file(s).
       ```
   

##########
File path: airflow/providers/amazon/aws/transfers/ftp_to_s3.py
##########
@@ -24,20 +25,29 @@
 
 class FTPToS3Operator(BaseOperator):
     """
-    This operator enables the transferring of files from FTP server to S3.
+    This operator enables the transfer of files from FTP server to S3. It can be used for
+    transfer one or multiple files.
 
-    :param s3_bucket: The targeted s3 bucket in which upload the file to
+    :param ftp_path: The ftp remote path. For one file it is mandatory to include the file as well.
+        For multiple files, it is the route where the files will be found.
+    :type ftp_path: str
+    :param s3_bucket: The targeted s3 bucket in which upload the file(s) to.
     :type s3_bucket: str
-    :param s3_key: The targeted s3 key. This is the specified file path for
-        uploading the file to S3.
+    :param s3_key: The targeted s3 key. For one file it must include the file path. For several,
+        it must end with "/".
     :type s3_key: str
-    :param ftp_path: The ftp remote path, including the file.
-    :type ftp_path: str
+    :param ftp_filenames: Only used if you want to move multiple files. You can pass a list
+        with exact filenames present in the ftp path, or a prefix that all files must meet.
+    :type ftp_filenames: Union(str, list)
+    :param s3_filenames: Only used if you want to move multiple files and name them different than
+        the originals from the ftp. It can be a list of filenames or a files prefix (that will replace

Review comment:
       ```suggestion
           the originals from the ftp. It can be a list of filenames or file prefix (that will replace
       ```
   




[GitHub] [airflow] JavierLopezT commented on a change in pull request #17937: Enable FTPToS3Operator to transfer several files

Posted by GitBox <gi...@apache.org>.
JavierLopezT commented on a change in pull request #17937:
URL: https://github.com/apache/airflow/pull/17937#discussion_r723149385



##########
File path: airflow/providers/amazon/aws/transfers/ftp_to_s3.py
##########
@@ -53,52 +64,89 @@ class FTPToS3Operator(BaseOperator):
     :type acl_policy: str
     """
 
-    template_fields = (
-        's3_bucket',
-        's3_key',
-        'ftp_path',
-    )
+    template_fields = ('ftp_path', 's3_bucket', 's3_key', 'ftp_filenames', 's3_filenames')
 
     def __init__(
         self,
-        s3_bucket,
-        s3_key,
-        ftp_path,
-        ftp_conn_id='ftp_default',
-        aws_conn_id='aws_default',
-        replace=False,
-        encrypt=False,
-        gzip=False,
-        acl_policy=None,
-        *args,
+        *,
+        ftp_path: str,
+        s3_bucket: str,
+        s3_key: str,
+        ftp_filenames: Optional[Union[str, List[str]]] = None,
+        s3_filenames: Optional[Union[str, List[str]]] = None,
+        ftp_conn_id: str = 'ftp_default',
+        aws_conn_id: str = 'aws_default',
+        replace: bool = False,
+        encrypt: bool = False,
+        gzip: bool = False,
+        acl_policy: str = None,
         **kwargs,
     ):
-        super().__init__(*args, **kwargs)
+        super().__init__(**kwargs)
+        self.ftp_path = ftp_path
         self.s3_bucket = s3_bucket
         self.s3_key = s3_key
-        self.ftp_path = ftp_path
+        self.ftp_filenames = ftp_filenames
+        self.s3_filenames = s3_filenames
         self.aws_conn_id = aws_conn_id
         self.ftp_conn_id = ftp_conn_id
         self.replace = replace
         self.encrypt = encrypt
         self.gzip = gzip
         self.acl_policy = acl_policy
 
-    def execute(self, context):
-        s3_hook = S3Hook(self.aws_conn_id)
-        ftp_hook = FTPHook(ftp_conn_id=self.ftp_conn_id)
+        self.ftp_hook = FTPHook(ftp_conn_id=self.ftp_conn_id)

Review comment:
       Sure, I'll modify it. Why is that, though? 




[GitHub] [airflow] potiuk merged pull request #17937: Enable FTPToS3Operator to transfer several files

Posted by GitBox <gi...@apache.org>.
potiuk merged pull request #17937:
URL: https://github.com/apache/airflow/pull/17937


   


[GitHub] [airflow] ashb commented on a change in pull request #17937: Enable FTPToS3Operator to transfer several files

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #17937:
URL: https://github.com/apache/airflow/pull/17937#discussion_r723132242



##########
File path: airflow/providers/amazon/aws/transfers/ftp_to_s3.py
##########
@@ -53,52 +64,89 @@ class FTPToS3Operator(BaseOperator):
     :type acl_policy: str
     """
 
-    template_fields = (
-        's3_bucket',
-        's3_key',
-        'ftp_path',
-    )
+    template_fields = ('ftp_path', 's3_bucket', 's3_key', 'ftp_filenames', 's3_filenames')
 
     def __init__(
         self,
-        s3_bucket,
-        s3_key,
-        ftp_path,
-        ftp_conn_id='ftp_default',
-        aws_conn_id='aws_default',
-        replace=False,
-        encrypt=False,
-        gzip=False,
-        acl_policy=None,
-        *args,
+        *,
+        ftp_path: str,
+        s3_bucket: str,
+        s3_key: str,
+        ftp_filenames: Optional[Union[str, List[str]]] = None,
+        s3_filenames: Optional[Union[str, List[str]]] = None,
+        ftp_conn_id: str = 'ftp_default',
+        aws_conn_id: str = 'aws_default',
+        replace: bool = False,
+        encrypt: bool = False,
+        gzip: bool = False,
+        acl_policy: str = None,
         **kwargs,
     ):
-        super().__init__(*args, **kwargs)
+        super().__init__(**kwargs)
+        self.ftp_path = ftp_path
         self.s3_bucket = s3_bucket
         self.s3_key = s3_key
-        self.ftp_path = ftp_path
+        self.ftp_filenames = ftp_filenames
+        self.s3_filenames = s3_filenames
         self.aws_conn_id = aws_conn_id
         self.ftp_conn_id = ftp_conn_id
         self.replace = replace
         self.encrypt = encrypt
         self.gzip = gzip
         self.acl_policy = acl_policy
 
-    def execute(self, context):
-        s3_hook = S3Hook(self.aws_conn_id)
-        ftp_hook = FTPHook(ftp_conn_id=self.ftp_conn_id)
+        self.ftp_hook = FTPHook(ftp_conn_id=self.ftp_conn_id)
+
+    def __upload_to_s3_from_ftp(self, remote_filename, s3_file_key):
+        s3 = S3Hook(self.aws_conn_id)
 
         with NamedTemporaryFile() as local_tmp_file:
-            ftp_hook.retrieve_file(
-                remote_full_path=self.ftp_path, local_full_path_or_buffer=local_tmp_file.name
+            self.ftp_hook.retrieve_file(
+                remote_full_path=remote_filename, local_full_path_or_buffer=local_tmp_file.name
             )
 
-            s3_hook.load_file(
+            s3.load_file(
                 filename=local_tmp_file.name,
-                key=self.s3_key,
+                key=s3_file_key,
                 bucket_name=self.s3_bucket,
                 replace=self.replace,
                 encrypt=self.encrypt,
                 gzip=self.gzip,
                 acl_policy=self.acl_policy,
             )
+            self.log.info(f'File upload to {s3_file_key}')
+
+    def execute(self, context):
+        if self.ftp_filenames:
+            if isinstance(self.ftp_filenames, str):
+                self.log.info(f'Getting files in {self.ftp_path}')
+
+                list_dir = self.ftp_hook.list_directory(
+                    path=self.ftp_path,
+                )
+
+                if self.ftp_filenames == 'all':

Review comment:
       ```suggestion
                   if self.ftp_filenames == '*':
       ```
   
   Using a string of `all` here feels odd/error prone.
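
   For context, a hedged sketch of the selection logic under discussion (the
   prefix branch is inferred from the docstring quoted earlier in this thread,
   not from this hunk):

   ```python
   # Inside execute(), after listing the remote directory:
   list_dir = self.ftp_hook.list_directory(path=self.ftp_path)

   if self.ftp_filenames == '*':  # the suggested wildcard spelling
       files = list_dir
   else:
       # Otherwise the string is treated as a filename prefix.
       files = [f for f in list_dir if f.startswith(self.ftp_filenames)]
   ```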




[GitHub] [airflow] ashb commented on a change in pull request #17937: Enable FTPToS3Operator to transfer several files

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #17937:
URL: https://github.com/apache/airflow/pull/17937#discussion_r723131359



##########
File path: airflow/providers/amazon/aws/transfers/ftp_to_s3.py
##########
@@ -53,52 +64,89 @@ class FTPToS3Operator(BaseOperator):
     :type acl_policy: str
     """
 
-    template_fields = (
-        's3_bucket',
-        's3_key',
-        'ftp_path',
-    )
+    template_fields = ('ftp_path', 's3_bucket', 's3_key', 'ftp_filenames', 's3_filenames')
 
     def __init__(
         self,
-        s3_bucket,
-        s3_key,
-        ftp_path,
-        ftp_conn_id='ftp_default',
-        aws_conn_id='aws_default',
-        replace=False,
-        encrypt=False,
-        gzip=False,
-        acl_policy=None,
-        *args,
+        *,
+        ftp_path: str,
+        s3_bucket: str,
+        s3_key: str,
+        ftp_filenames: Optional[Union[str, List[str]]] = None,
+        s3_filenames: Optional[Union[str, List[str]]] = None,
+        ftp_conn_id: str = 'ftp_default',
+        aws_conn_id: str = 'aws_default',
+        replace: bool = False,
+        encrypt: bool = False,
+        gzip: bool = False,
+        acl_policy: str = None,
         **kwargs,
     ):
-        super().__init__(*args, **kwargs)
+        super().__init__(**kwargs)
+        self.ftp_path = ftp_path
         self.s3_bucket = s3_bucket
         self.s3_key = s3_key
-        self.ftp_path = ftp_path
+        self.ftp_filenames = ftp_filenames
+        self.s3_filenames = s3_filenames
         self.aws_conn_id = aws_conn_id
         self.ftp_conn_id = ftp_conn_id
         self.replace = replace
         self.encrypt = encrypt
         self.gzip = gzip
         self.acl_policy = acl_policy
 
-    def execute(self, context):
-        s3_hook = S3Hook(self.aws_conn_id)
-        ftp_hook = FTPHook(ftp_conn_id=self.ftp_conn_id)
+        self.ftp_hook = FTPHook(ftp_conn_id=self.ftp_conn_id)

Review comment:
       Do not create hooks in the constructor of operators please.
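
   For reference, a minimal sketch of the usual alternative: operators are
   instantiated every time the scheduler parses the DAG file, so constructors
   should stay cheap and avoid touching connections. The `cached_property`
   form below is one common pattern (an assumption here, not the code that
   was merged); building the hook inside `execute()` works just as well.

   ```python
   from functools import cached_property

   class FTPToS3Operator(BaseOperator):
       # ... __init__ stores self.ftp_conn_id but builds no hook ...

       @cached_property
       def ftp_hook(self) -> FTPHook:
           # Built on first access at task run time, not at DAG parse time.
           return FTPHook(ftp_conn_id=self.ftp_conn_id)
   ```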



