Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/06/17 15:25:53 UTC

[GitHub] [airflow] kanga333 opened a new pull request #9350: Add deletion by prefix to S3DeleteObjectsOperator

kanga333 opened a new pull request #9350:
URL: https://github.com/apache/airflow/pull/9350


   Adds an option to delete objects by specifying a prefix.
   
   This option is useful for deleting, in one operation, all objects created under a specific path, such as EMR output or Athena partitions.
   
   I referred to the [GCSDeleteObjectsOperator implementation](https://github.com/apache/airflow/blob/54667d1eaa626358702d07051f9cb4b1754a1481/airflow/providers/google/cloud/operators/gcs.py#L229).
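A minimal sketch of the idea, assuming a hypothetical in-memory dict `bucket` standing in for an S3 bucket and hypothetical helpers `list_keys_by_prefix` and `delete_by_prefix`. It only illustrates "list everything under a prefix, then delete it"; the operator itself delegates listing and deletion to `S3Hook`.

```python
from typing import Dict, List


def list_keys_by_prefix(bucket: Dict[str, bytes], prefix: str) -> List[str]:
    """Return every key that starts with `prefix` (hypothetical stand-in for an S3 listing)."""
    return sorted(key for key in bucket if key.startswith(prefix))


def delete_by_prefix(bucket: Dict[str, bytes], prefix: str) -> List[str]:
    """Delete all objects under `prefix` at once and return the deleted keys."""
    keys = list_keys_by_prefix(bucket, prefix)
    for key in keys:
        del bucket[key]
    return keys


# Example: drop one Athena-style partition in a single call.
bucket = {
    "table/dt=2020-06-16/part-0000.parquet": b"",
    "table/dt=2020-06-17/part-0000.parquet": b"",
    "table/dt=2020-06-17/part-0001.parquet": b"",
}
deleted = delete_by_prefix(bucket, "table/dt=2020-06-17/")
print(deleted)
print(sorted(bucket))
```

S3 prefixes are plain string prefixes rather than directories, so the trailing `/` matters: `table/dt=2020-06-1` would match both the `dt=2020-06-16` and `dt=2020-06-17` partitions.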
   
   ---
   Make sure to mark the boxes below before creating PR: [x]
   
   - [x] Description above provides context of the change
   - [x] Unit tests coverage for changes (not needed for documentation changes)
   - [x] Target Github ISSUE in description if exists
   - [x] Commits follow "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)"
   - [x] Relevant documentation is updated including usage instructions.
   - [x] I will engage committers as explained in [Contribution Workflow Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
   
   ---
   In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   Read the [Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines) for more information.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] feluelle commented on a change in pull request #9350: Add deletion by prefix to S3DeleteObjectsOperator

Posted by GitBox <gi...@apache.org>.
feluelle commented on a change in pull request #9350:
URL: https://github.com/apache/airflow/pull/9350#discussion_r442329144



##########
File path: airflow/providers/amazon/aws/operators/s3_delete_objects.py
##########
@@ -56,22 +59,34 @@ class S3DeleteObjectsOperator(BaseOperator):
     :type verify: bool or str
     """
 
-    template_fields = ('keys', 'bucket')
+    template_fields = ('keys', 'bucket', 'prefix')
 
     @apply_defaults
     def __init__(
             self,
             bucket,
-            keys,
+            keys=None,
+            prefix=None,
             aws_conn_id='aws_default',
             verify=None,
             *args, **kwargs):
+
+        if not keys and not prefix:
+            raise ValueError("Either keys or prefix should be set. Both are None")
+
         super().__init__(*args, **kwargs)
         self.bucket = bucket
         self.keys = keys
+        self.prefix = prefix
         self.aws_conn_id = aws_conn_id
         self.verify = verify
 
     def execute(self, context):
         s3_hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
-        s3_hook.delete_objects(bucket=self.bucket, keys=self.keys)
+
+        if self.keys:
+            keys = self.keys
+        else:
+            keys = s3_hook.list_keys(bucket_name=self.bucket, prefix=self.prefix)

Review comment:
       > If the argument is keys=[] and prefix=None, then the following code will execute list_keys with prefix=None and raise an exception.
   
   This case can't occur, because you added the validation in the `__init__` which makes sure that at least one of them is specified.
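A minimal sketch of the first version of the `__init__` check, pulled out into a hypothetical standalone `validate` function plus a hypothetical `is_rejected` helper. Because `[]` and `None` are both falsy, `keys=[]` with `prefix=None` is rejected at construction time, before `execute` could call `list_keys`.

```python
def validate(keys=None, prefix=None):
    """Mirror of the PR's first check: reject the call when neither keys nor prefix is truthy."""
    if not keys and not prefix:
        raise ValueError("Either keys or prefix should be set. Both are None")


def is_rejected(**kwargs) -> bool:
    """Return True if validate() raises ValueError for these arguments."""
    try:
        validate(**kwargs)
    except ValueError:
        return True
    return False


print(is_rejected(keys=[], prefix=None))           # True: [] is falsy, like None
print(is_rejected(keys=["a.txt"]))                 # False
print(is_rejected(prefix="logs/"))                 # False
print(is_rejected(keys=["a.txt"], prefix="logs/")) # False: this version still allows both
```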







[GitHub] [airflow] kanga333 commented on a change in pull request #9350: Add deletion by prefix to S3DeleteObjectsOperator

Posted by GitBox <gi...@apache.org>.
kanga333 commented on a change in pull request #9350:
URL: https://github.com/apache/airflow/pull/9350#discussion_r442242104



##########
File path: airflow/providers/amazon/aws/operators/s3_delete_objects.py
##########
@@ -56,22 +59,34 @@ class S3DeleteObjectsOperator(BaseOperator):
     :type verify: bool or str
     """
 
-    template_fields = ('keys', 'bucket')
+    template_fields = ('keys', 'bucket', 'prefix')
 
     @apply_defaults
     def __init__(
             self,
             bucket,
-            keys,
+            keys=None,
+            prefix=None,
             aws_conn_id='aws_default',
             verify=None,
             *args, **kwargs):
+
+        if not keys and not prefix:
+            raise ValueError("Either keys or prefix should be set. Both are None")

Review comment:
       I now understand the need to also handle the case where both values are set. Thanks. 👍
   Fixed it at ebd65d2.







[GitHub] [airflow] kanga333 commented on a change in pull request #9350: Add deletion by prefix to S3DeleteObjectsOperator

Posted by GitBox <gi...@apache.org>.
kanga333 commented on a change in pull request #9350:
URL: https://github.com/apache/airflow/pull/9350#discussion_r442568404



##########
File path: airflow/providers/amazon/aws/operators/s3_delete_objects.py
##########
@@ -56,22 +59,31 @@ class S3DeleteObjectsOperator(BaseOperator):
     :type verify: bool or str
     """
 
-    template_fields = ('keys', 'bucket')
+    template_fields = ('keys', 'bucket', 'prefix')
 
     @apply_defaults
     def __init__(
             self,
             bucket,
-            keys,
+            keys=None,
+            prefix=None,
             aws_conn_id='aws_default',
             verify=None,
             *args, **kwargs):
+
+        if not bool(keys) ^ bool(prefix):
+            raise ValueError("Either keys or prefix should be set.")
+
         super().__init__(*args, **kwargs)
         self.bucket = bucket
         self.keys = keys
+        self.prefix = prefix
         self.aws_conn_id = aws_conn_id
         self.verify = verify
 
     def execute(self, context):
         s3_hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
-        s3_hook.delete_objects(bucket=self.bucket, keys=self.keys)
+
+        keys = self.keys or s3_hook.list_keys(bucket_name=self.bucket, prefix=self.prefix)
+        if keys:

Review comment:
       `s3_hook.list_keys` may return None.
   If `keys` is None, the operator does nothing.
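A minimal sketch of that guard, assuming a hypothetical `FakeHook` whose `list_keys` returns `None` when nothing matches (as the comment above says `s3_hook.list_keys` may) and which records `delete_objects` calls. `execute` here is a free function for illustration, not the operator method.

```python
from typing import List, Optional


class FakeHook:
    """Hypothetical stand-in for S3Hook that records deletions instead of calling AWS."""

    def __init__(self, listing: Optional[List[str]]):
        self.listing = listing
        self.deleted: List[List[str]] = []

    def list_keys(self, bucket_name: str, prefix: Optional[str]) -> Optional[List[str]]:
        return self.listing

    def delete_objects(self, bucket: str, keys: List[str]) -> None:
        self.deleted.append(keys)


def execute(hook: FakeHook, bucket: str, keys=None, prefix=None) -> None:
    # Explicit keys win; otherwise list everything under the prefix.
    keys = keys or hook.list_keys(bucket_name=bucket, prefix=prefix)
    # The listing may be None when nothing matched, so only delete when there is something to delete.
    if keys:
        hook.delete_objects(bucket=bucket, keys=keys)


empty = FakeHook(listing=None)
execute(empty, "my-bucket", prefix="no/such/prefix/")
print(empty.deleted)  # []

full = FakeHook(listing=["logs/a", "logs/b"])
execute(full, "my-bucket", prefix="logs/")
print(full.deleted)   # [['logs/a', 'logs/b']]
```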







[GitHub] [airflow] feluelle commented on a change in pull request #9350: Add deletion by prefix to S3DeleteObjectsOperator

Posted by GitBox <gi...@apache.org>.
feluelle commented on a change in pull request #9350:
URL: https://github.com/apache/airflow/pull/9350#discussion_r442251729



##########
File path: airflow/providers/amazon/aws/operators/s3_delete_objects.py
##########
@@ -56,22 +59,34 @@ class S3DeleteObjectsOperator(BaseOperator):
     :type verify: bool or str
     """
 
-    template_fields = ('keys', 'bucket')
+    template_fields = ('keys', 'bucket', 'prefix')
 
     @apply_defaults
     def __init__(
             self,
             bucket,
-            keys,
+            keys=None,
+            prefix=None,
             aws_conn_id='aws_default',
             verify=None,
             *args, **kwargs):
+
+        if not keys and not prefix:
+            raise ValueError("Either keys or prefix should be set. Both are None")
+
         super().__init__(*args, **kwargs)
         self.bucket = bucket
         self.keys = keys
+        self.prefix = prefix
         self.aws_conn_id = aws_conn_id
         self.verify = verify
 
     def execute(self, context):
         s3_hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
-        s3_hook.delete_objects(bucket=self.bucket, keys=self.keys)
+
+        if self.keys:
+            keys = self.keys
+        else:
+            keys = s3_hook.list_keys(bucket_name=self.bucket, prefix=self.prefix)

Review comment:
       ```
   >>> a = None
   >>> b = "test"
   >>> a or b
   'test'
   >>> a = "not None"
   >>> a or b
   'not None'
   ```
   That's what you want, isn't it? You don't need to check for `is not None`.







[GitHub] [airflow] ad-m commented on a change in pull request #9350: Add deletion by prefix to S3DeleteObjectsOperator

Posted by GitBox <gi...@apache.org>.
ad-m commented on a change in pull request #9350:
URL: https://github.com/apache/airflow/pull/9350#discussion_r441724554



##########
File path: airflow/providers/amazon/aws/operators/s3_delete_objects.py
##########
@@ -56,22 +59,34 @@ class S3DeleteObjectsOperator(BaseOperator):
     :type verify: bool or str
     """
 
-    template_fields = ('keys', 'bucket')
+    template_fields = ('keys', 'bucket', 'prefix')
 
     @apply_defaults
     def __init__(
             self,
             bucket,
-            keys,
+            keys=None,
+            prefix=None,
             aws_conn_id='aws_default',
             verify=None,
             *args, **kwargs):
+
+        if not keys and not prefix:
+            raise ValueError("Either keys or prefix should be set. Both are None")
+
         super().__init__(*args, **kwargs)
         self.bucket = bucket
         self.keys = keys
+        self.prefix = prefix
         self.aws_conn_id = aws_conn_id
         self.verify = verify
 
     def execute(self, context):
         s3_hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
-        s3_hook.delete_objects(bucket=self.bucket, keys=self.keys)
+
+        if self.keys:
+            keys = self.keys
+        else:
+            keys = s3_hook.list_keys(bucket_name=self.bucket, prefix=self.prefix)
+

Review comment:
       Should we throw an exception here if mutually exclusive arguments are given?
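One way to make the arguments mutually exclusive is to require exactly one of them to be truthy and to check that once, up front. A minimal sketch with hypothetical helpers `check_exclusive` and `accepted`, using the same `not bool(keys) ^ bool(prefix)` expression the PR later adopts in `__init__`. Note that `not` binds more loosely than `^`, so the condition reads as `not (bool(keys) ^ bool(prefix))`.

```python
def check_exclusive(keys=None, prefix=None) -> None:
    """Raise unless exactly one of keys/prefix is set (truthy)."""
    # bool(a) ^ bool(b) is True only when exactly one side is truthy.
    if not bool(keys) ^ bool(prefix):
        raise ValueError("Either keys or prefix should be set.")


def accepted(**kwargs) -> bool:
    """Return True if check_exclusive() accepts these arguments."""
    try:
        check_exclusive(**kwargs)
    except ValueError:
        return False
    return True


print(accepted(keys=["a"]))                  # True
print(accepted(prefix="logs/"))              # True
print(accepted())                            # False: neither is set
print(accepted(keys=["a"], prefix="logs/"))  # False: both are set
```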







[GitHub] [airflow] boring-cyborg[bot] commented on pull request #9350: Add deletion by prefix to S3DeleteObjectsOperator

Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on pull request #9350:
URL: https://github.com/apache/airflow/pull/9350#issuecomment-645444306


   Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contribution Guide (https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst).
   Here are some useful points:
   - Pay attention to the quality of your code (flake8, pylint and type annotations). Our [pre-commits](https://github.com/apache/airflow/blob/master/STATIC_CODE_CHECKS.rst#prerequisites-for-pre-commit-hooks) will help you with that.
   - In case of a new feature add useful documentation (in docstrings or in `docs/` directory). Adding a new operator? Check this short [guide](https://github.com/apache/airflow/blob/master/docs/howto/custom-operator.rst). Consider adding an example DAG that shows how users should use it.
   - Consider using [Breeze environment](https://github.com/apache/airflow/blob/master/BREEZE.rst) for testing locally; it's a heavy Docker setup, but it ships with a working Airflow and a lot of integrations.
   - Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
   - Be sure to read the [Airflow Coding style](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#coding-style-and-best-practices).
   Apache Airflow is a community-driven project and together we are making it better 🚀.
   In case of doubts contact the developers at:
   Mailing List: dev@airflow.apache.org
   Slack: https://apache-airflow-slack.herokuapp.com/
   





[GitHub] [airflow] feluelle merged pull request #9350: Add deletion by prefix to S3DeleteObjectsOperator

Posted by GitBox <gi...@apache.org>.
feluelle merged pull request #9350:
URL: https://github.com/apache/airflow/pull/9350


   





[GitHub] [airflow] feluelle commented on a change in pull request #9350: Add deletion by prefix to S3DeleteObjectsOperator

Posted by GitBox <gi...@apache.org>.
feluelle commented on a change in pull request #9350:
URL: https://github.com/apache/airflow/pull/9350#discussion_r441753804



##########
File path: airflow/providers/amazon/aws/operators/s3_delete_objects.py
##########
@@ -56,22 +59,34 @@ class S3DeleteObjectsOperator(BaseOperator):
     :type verify: bool or str
     """
 
-    template_fields = ('keys', 'bucket')
+    template_fields = ('keys', 'bucket', 'prefix')
 
     @apply_defaults
     def __init__(
             self,
             bucket,
-            keys,
+            keys=None,
+            prefix=None,
             aws_conn_id='aws_default',
             verify=None,
             *args, **kwargs):
+
+        if not keys and not prefix:
+            raise ValueError("Either keys or prefix should be set. Both are None")
+
         super().__init__(*args, **kwargs)
         self.bucket = bucket
         self.keys = keys
+        self.prefix = prefix
         self.aws_conn_id = aws_conn_id
         self.verify = verify
 
     def execute(self, context):
         s3_hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
-        s3_hook.delete_objects(bucket=self.bucket, keys=self.keys)
+
+        if self.keys:
+            keys = self.keys
+        else:
+            keys = s3_hook.list_keys(bucket_name=self.bucket, prefix=self.prefix)

Review comment:
       ```suggestion
           keys = self.keys or s3_hook.list_keys(bucket_name=self.bucket, prefix=self.prefix)
   ```

##########
File path: airflow/providers/amazon/aws/operators/s3_delete_objects.py
##########
@@ -56,22 +59,34 @@ class S3DeleteObjectsOperator(BaseOperator):
     :type verify: bool or str
     """
 
-    template_fields = ('keys', 'bucket')
+    template_fields = ('keys', 'bucket', 'prefix')
 
     @apply_defaults
     def __init__(
             self,
             bucket,
-            keys,
+            keys=None,
+            prefix=None,
             aws_conn_id='aws_default',
             verify=None,
             *args, **kwargs):
+
+        if not keys and not prefix:
+            raise ValueError("Either keys or prefix should be set. Both are None")

Review comment:
       ```suggestion
           if bool(keys) != bool(prefix):
               raise ValueError("Either keys or prefix should be set.")
   ```







[GitHub] [airflow] kanga333 commented on pull request #9350: Add deletion by prefix to S3DeleteObjectsOperator

Posted by GitBox <gi...@apache.org>.
kanga333 commented on pull request #9350:
URL: https://github.com/apache/airflow/pull/9350#issuecomment-658747746


   @feluelle Thanks for your review and support.
   Is there any action required for this PR to be merged?





[GitHub] [airflow] feluelle commented on a change in pull request #9350: Add deletion by prefix to S3DeleteObjectsOperator

Posted by GitBox <gi...@apache.org>.
feluelle commented on a change in pull request #9350:
URL: https://github.com/apache/airflow/pull/9350#discussion_r442663134



##########
File path: airflow/providers/amazon/aws/operators/s3_delete_objects.py
##########
@@ -56,22 +59,31 @@ class S3DeleteObjectsOperator(BaseOperator):
     :type verify: bool or str
     """
 
-    template_fields = ('keys', 'bucket')
+    template_fields = ('keys', 'bucket', 'prefix')
 
     @apply_defaults
     def __init__(
             self,
             bucket,
-            keys,
+            keys=None,
+            prefix=None,
             aws_conn_id='aws_default',
             verify=None,
             *args, **kwargs):
+
+        if not bool(keys) ^ bool(prefix):

Review comment:
       Oh yeah, correct. 👍 We want to raise an error when both are None or both are set at the same time.

##########
File path: airflow/providers/amazon/aws/operators/s3_delete_objects.py
##########
@@ -56,22 +59,31 @@ class S3DeleteObjectsOperator(BaseOperator):
     :type verify: bool or str
     """
 
-    template_fields = ('keys', 'bucket')
+    template_fields = ('keys', 'bucket', 'prefix')
 
     @apply_defaults
     def __init__(
             self,
             bucket,
-            keys,
+            keys=None,
+            prefix=None,
             aws_conn_id='aws_default',
             verify=None,
             *args, **kwargs):
+
+        if not bool(keys) ^ bool(prefix):
+            raise ValueError("Either keys or prefix should be set.")
+
         super().__init__(*args, **kwargs)
         self.bucket = bucket
         self.keys = keys
+        self.prefix = prefix
         self.aws_conn_id = aws_conn_id
         self.verify = verify
 
     def execute(self, context):
         s3_hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
-        s3_hook.delete_objects(bucket=self.bucket, keys=self.keys)
+
+        keys = self.keys or s3_hook.list_keys(bucket_name=self.bucket, prefix=self.prefix)
+        if keys:

Review comment:
       Good catch! 👍







[GitHub] [airflow] kanga333 commented on a change in pull request #9350: Add deletion by prefix to S3DeleteObjectsOperator

Posted by GitBox <gi...@apache.org>.
kanga333 commented on a change in pull request #9350:
URL: https://github.com/apache/airflow/pull/9350#discussion_r442242104



##########
File path: airflow/providers/amazon/aws/operators/s3_delete_objects.py
##########
@@ -56,22 +59,34 @@ class S3DeleteObjectsOperator(BaseOperator):
     :type verify: bool or str
     """
 
-    template_fields = ('keys', 'bucket')
+    template_fields = ('keys', 'bucket', 'prefix')
 
     @apply_defaults
     def __init__(
             self,
             bucket,
-            keys,
+            keys=None,
+            prefix=None,
             aws_conn_id='aws_default',
             verify=None,
             *args, **kwargs):
+
+        if not keys and not prefix:
+            raise ValueError("Either keys or prefix should be set. Both are None")

Review comment:
       I now understand the need to also handle the case where both values are set. Thanks. 👍
   Fixed it at 96806fa.







[GitHub] [airflow] feluelle commented on a change in pull request #9350: Add deletion by prefix to S3DeleteObjectsOperator

Posted by GitBox <gi...@apache.org>.
feluelle commented on a change in pull request #9350:
URL: https://github.com/apache/airflow/pull/9350#discussion_r441762244



##########
File path: airflow/providers/amazon/aws/operators/s3_delete_objects.py
##########
@@ -56,22 +59,34 @@ class S3DeleteObjectsOperator(BaseOperator):
     :type verify: bool or str
     """
 
-    template_fields = ('keys', 'bucket')
+    template_fields = ('keys', 'bucket', 'prefix')
 
     @apply_defaults
     def __init__(
             self,
             bucket,
-            keys,
+            keys=None,
+            prefix=None,
             aws_conn_id='aws_default',
             verify=None,
             *args, **kwargs):
+
+        if not keys and not prefix:
+            raise ValueError("Either keys or prefix should be set. Both are None")

Review comment:
       ```suggestion
           if bool(keys) ^ bool(prefix):
               raise ValueError("Either keys or prefix should be set.")
   ```







[GitHub] [airflow] kanga333 commented on a change in pull request #9350: Add deletion by prefix to S3DeleteObjectsOperator

Posted by GitBox <gi...@apache.org>.
kanga333 commented on a change in pull request #9350:
URL: https://github.com/apache/airflow/pull/9350#discussion_r441908403



##########
File path: airflow/providers/amazon/aws/operators/s3_delete_objects.py
##########
@@ -56,22 +59,34 @@ class S3DeleteObjectsOperator(BaseOperator):
     :type verify: bool or str
     """
 
-    template_fields = ('keys', 'bucket')
+    template_fields = ('keys', 'bucket', 'prefix')
 
     @apply_defaults
     def __init__(
             self,
             bucket,
-            keys,
+            keys=None,
+            prefix=None,
             aws_conn_id='aws_default',
             verify=None,
             *args, **kwargs):
+
+        if not keys and not prefix:
+            raise ValueError("Either keys or prefix should be set. Both are None")
+
         super().__init__(*args, **kwargs)
         self.bucket = bucket
         self.keys = keys
+        self.prefix = prefix
         self.aws_conn_id = aws_conn_id
         self.verify = verify
 
     def execute(self, context):
         s3_hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
-        s3_hook.delete_objects(bucket=self.bucket, keys=self.keys)
+
+        if self.keys:
+            keys = self.keys
+        else:
+            keys = s3_hook.list_keys(bucket_name=self.bucket, prefix=self.prefix)

Review comment:
       I'm going to compare against None here as well, as in https://github.com/apache/airflow/pull/9350/files#r441908023.
   
   ```python
   if self.keys is not None:
       keys = self.keys
   else:
       ...
   ```
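A fuller version of that snippet, assuming a hypothetical `resolve_keys` function and a `list_keys` callable standing in for `s3_hook.list_keys`. Comparing against `None` keeps an explicit `keys=[]` as an empty (no-op) deletion instead of falling through to a listing.

```python
from typing import Callable, List, Optional


def resolve_keys(
    keys: Optional[List[str]],
    prefix: Optional[str],
    list_keys: Callable[[Optional[str]], Optional[List[str]]],
) -> Optional[List[str]]:
    if keys is not None:
        # An explicit list, even an empty one, is used as-is.
        return keys
    # Only fall back to listing when keys was not provided at all.
    return list_keys(prefix)


calls = []


def fake_list_keys(prefix):
    """Hypothetical listing call that records which prefix it was asked for."""
    calls.append(prefix)
    return ["logs/a"]


print(resolve_keys([], None, fake_list_keys))       # []
print(resolve_keys(None, "logs/", fake_list_keys))  # ['logs/a']
print(calls)                                        # ['logs/']
```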







[GitHub] [airflow] boring-cyborg[bot] commented on pull request #9350: Add deletion by prefix to S3DeleteObjectsOperator

Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on pull request #9350:
URL: https://github.com/apache/airflow/pull/9350#issuecomment-658751096


   Awesome work, congrats on your first merged pull request!
   





[GitHub] [airflow] kanga333 commented on a change in pull request #9350: Add deletion by prefix to S3DeleteObjectsOperator

Posted by GitBox <gi...@apache.org>.
kanga333 commented on a change in pull request #9350:
URL: https://github.com/apache/airflow/pull/9350#discussion_r442284808



##########
File path: airflow/providers/amazon/aws/operators/s3_delete_objects.py
##########
@@ -56,22 +59,34 @@ class S3DeleteObjectsOperator(BaseOperator):
     :type verify: bool or str
     """
 
-    template_fields = ('keys', 'bucket')
+    template_fields = ('keys', 'bucket', 'prefix')
 
     @apply_defaults
     def __init__(
             self,
             bucket,
-            keys,
+            keys=None,
+            prefix=None,
             aws_conn_id='aws_default',
             verify=None,
             *args, **kwargs):
+
+        if not keys and not prefix:
+            raise ValueError("Either keys or prefix should be set. Both are None")
+
         super().__init__(*args, **kwargs)
         self.bucket = bucket
         self.keys = keys
+        self.prefix = prefix
         self.aws_conn_id = aws_conn_id
         self.verify = verify
 
     def execute(self, context):
         s3_hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
-        s3_hook.delete_objects(bucket=self.bucket, keys=self.keys)
+
+        if self.keys:
+            keys = self.keys
+        else:
+            keys = s3_hook.list_keys(bucket_name=self.bucket, prefix=self.prefix)

Review comment:
       In the current implementation, passing an empty list as `keys` to S3DeleteObjectsOperator does not raise an exception. We had better not change this behavior. 
   
   ```python
   keys = []
   # Nothing is done and no exception is raised.
   S3DeleteObjectsOperator(task_id=task_id, bucket=bucket, keys=keys)
   ```
   
   If the argument is `keys=[]` and `prefix=None`, then the following code will execute `list_keys` with `prefix=None` and raise an exception.
   
   ```python
   keys = self.keys or s3_hook.list_keys(bucket_name=self.bucket, prefix=self.prefix)
   ```
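A small reproduction of that concern, assuming a hypothetical `strict_list_keys` that, like the listing call described above, raises when given `prefix=None`, and a hypothetical `keys_with_or` wrapping the one-line `or` version. Because `[]` is falsy, `or` evaluates its right-hand side and the listing runs with no prefix.

```python
def strict_list_keys(prefix):
    """Hypothetical listing call that refuses a missing prefix."""
    if prefix is None:
        raise ValueError("prefix is required")
    return ["logs/a"]


def keys_with_or(keys, prefix):
    # `or` tests truthiness: [] is falsy, so the listing call runs.
    return keys or strict_list_keys(prefix)


outcome = None
try:
    keys_with_or([], None)
except ValueError as err:
    outcome = str(err)

print(outcome)  # prefix is required
```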
   







[GitHub] [airflow] kanga333 commented on a change in pull request #9350: Add deletion by prefix to S3DeleteObjectsOperator

Posted by GitBox <gi...@apache.org>.
kanga333 commented on a change in pull request #9350:
URL: https://github.com/apache/airflow/pull/9350#discussion_r441907977



##########
File path: airflow/providers/amazon/aws/operators/s3_delete_objects.py
##########
@@ -56,22 +59,34 @@ class S3DeleteObjectsOperator(BaseOperator):
     :type verify: bool or str
     """
 
-    template_fields = ('keys', 'bucket')
+    template_fields = ('keys', 'bucket', 'prefix')
 
     @apply_defaults
     def __init__(
             self,
             bucket,
-            keys,
+            keys=None,
+            prefix=None,
             aws_conn_id='aws_default',
             verify=None,
             *args, **kwargs):
+
+        if not keys and not prefix:
+            raise ValueError("Either keys or prefix should be set. Both are None")
+
         super().__init__(*args, **kwargs)
         self.bucket = bucket
         self.keys = keys
+        self.prefix = prefix
         self.aws_conn_id = aws_conn_id
         self.verify = verify
 
     def execute(self, context):
         s3_hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
-        s3_hook.delete_objects(bucket=self.bucket, keys=self.keys)
+
+        if self.keys:
+            keys = self.keys
+        else:
+            keys = s3_hook.list_keys(bucket_name=self.bucket, prefix=self.prefix)
+

Review comment:
       Thanks for your review.
   I thought the runtime check was unnecessary because the arguments are already validated in `__init__`. Is it necessary?







[GitHub] [airflow] kanga333 commented on a change in pull request #9350: Add deletion by prefix to S3DeleteObjectsOperator

Posted by GitBox <gi...@apache.org>.
kanga333 commented on a change in pull request #9350:
URL: https://github.com/apache/airflow/pull/9350#discussion_r442243095



##########
File path: airflow/providers/amazon/aws/operators/s3_delete_objects.py
##########
@@ -56,22 +59,34 @@ class S3DeleteObjectsOperator(BaseOperator):
     :type verify: bool or str
     """
 
-    template_fields = ('keys', 'bucket')
+    template_fields = ('keys', 'bucket', 'prefix')
 
     @apply_defaults
     def __init__(
             self,
             bucket,
-            keys,
+            keys=None,
+            prefix=None,
             aws_conn_id='aws_default',
             verify=None,
             *args, **kwargs):
+
+        if not keys and not prefix:
+            raise ValueError("Either keys or prefix should be set. Both are None")
+
         super().__init__(*args, **kwargs)
         self.bucket = bucket
         self.keys = keys
+        self.prefix = prefix
         self.aws_conn_id = aws_conn_id
         self.verify = verify
 
     def execute(self, context):
         s3_hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
-        s3_hook.delete_objects(bucket=self.bucket, keys=self.keys)
+
+        if self.keys:
+            keys = self.keys
+        else:
+            keys = s3_hook.list_keys(bucket_name=self.bucket, prefix=self.prefix)

Review comment:
       Combining `is not None` with the `or` operator did not work as intended, because `a is not None` evaluates to a bool:
   
   ```python
   >>> a = None
   >>> b = "test"
   >>> a is not None or b
   'test'
   >>> a = "not None"
   >>> a is not None or b
   True
   ```
   
   So I rewrote it as an `if` statement.
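A minimal sketch of why this happens, using hypothetical helper names where `a` stands in for `self.keys` and `b` for the listed keys: `is not` binds more tightly than `or`, so `a is not None or b` parses as `(a is not None) or b` and yields the boolean `True` whenever `a` is set. A conditional expression, or the equivalent `if`/`else`, returns the value itself:

```python
def pick_with_or_and_is_not_none(a, b):
    # Parsed as (a is not None) or b: returns True, not a, when a is set.
    return a is not None or b


def pick_with_conditional(a, b):
    # Conditional expression: returns a itself whenever a is not None,
    # so an empty list [] is kept rather than replaced by b.
    return a if a is not None else b


def pick_with_if(a, b):
    # Explicit if/else form, equivalent to pick_with_conditional.
    if a is not None:
        return a
    return b


print(pick_with_or_and_is_not_none(None, "test"))        # test
print(pick_with_or_and_is_not_none("not None", "test"))  # True
print(pick_with_conditional("not None", "test"))         # not None
print(pick_with_conditional([], "test"))                 # []
```

Note that the `is not None` forms differ from the truthiness test `if self.keys:` used in the diff: they keep an explicitly passed empty list instead of falling back to `b`.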







[GitHub] [airflow] kanga333 commented on a change in pull request #9350: Add deletion by prefix to S3DeleteObjectsOperator

Posted by GitBox <gi...@apache.org>.
kanga333 commented on a change in pull request #9350:
URL: https://github.com/apache/airflow/pull/9350#discussion_r442563626



##########
File path: airflow/providers/amazon/aws/operators/s3_delete_objects.py
##########
@@ -56,22 +59,34 @@ class S3DeleteObjectsOperator(BaseOperator):
     :type verify: bool or str
     """
 
-    template_fields = ('keys', 'bucket')
+    template_fields = ('keys', 'bucket', 'prefix')
 
     @apply_defaults
     def __init__(
             self,
             bucket,
-            keys,
+            keys=None,
+            prefix=None,
             aws_conn_id='aws_default',
             verify=None,
             *args, **kwargs):
+
+        if not keys and not prefix:
+            raise ValueError("Either keys or prefix should be set. Both are None")
+
         super().__init__(*args, **kwargs)
         self.bucket = bucket
         self.keys = keys
+        self.prefix = prefix
         self.aws_conn_id = aws_conn_id
         self.verify = verify
 
     def execute(self, context):
         s3_hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
-        s3_hook.delete_objects(bucket=self.bucket, keys=self.keys)
+
+        if self.keys:
+            keys = self.keys
+        else:
+            keys = s3_hook.list_keys(bucket_name=self.bucket, prefix=self.prefix)

Review comment:
       I dropped the special handling for an empty list and incorporated the suggested changes. 👍 59f2e9c







[GitHub] [airflow] feluelle commented on a change in pull request #9350: Add deletion by prefix to S3DeleteObjectsOperator

Posted by GitBox <gi...@apache.org>.
feluelle commented on a change in pull request #9350:
URL: https://github.com/apache/airflow/pull/9350#discussion_r441998178



##########
File path: airflow/providers/amazon/aws/operators/s3_delete_objects.py
##########
@@ -56,22 +59,34 @@ class S3DeleteObjectsOperator(BaseOperator):
     :type verify: bool or str
     """
 
-    template_fields = ('keys', 'bucket')
+    template_fields = ('keys', 'bucket', 'prefix')
 
     @apply_defaults
     def __init__(
             self,
             bucket,
-            keys,
+            keys=None,
+            prefix=None,
             aws_conn_id='aws_default',
             verify=None,
             *args, **kwargs):
+
+        if not keys and not prefix:
+            raise ValueError("Either keys or prefix should be set. Both are None")
+
         super().__init__(*args, **kwargs)
         self.bucket = bucket
         self.keys = keys
+        self.prefix = prefix
         self.aws_conn_id = aws_conn_id
         self.verify = verify
 
     def execute(self, context):
         s3_hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
-        s3_hook.delete_objects(bucket=self.bucket, keys=self.keys)
+
+        if self.keys:
+            keys = self.keys
+        else:
+            keys = s3_hook.list_keys(bucket_name=self.bucket, prefix=self.prefix)

Review comment:
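       The `or` operator already does this: it returns its first truthy operand, so the `if`/`else` can be replaced with a single `or` expression. 👍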
   
   See: https://realpython.com/python-or-operator/#using-or-with-common-objects
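A sketch of the one-line `or` form, assuming a hypothetical `FakeS3Hook` stand-in that mimics only the `list_keys(bucket_name=..., prefix=...)` call used in the diff (the bucket name is also made up). Because `or` short-circuits, the listing is only evaluated when `keys` is falsy; an empty list is falsy too, so it also falls through to the prefix listing:

```python
class FakeS3Hook:
    """Hypothetical stand-in for S3Hook; stores keys in a plain list."""

    def __init__(self, stored_keys):
        self.stored_keys = stored_keys
        self.list_calls = 0

    def list_keys(self, bucket_name, prefix):
        # Count calls so we can see whether `or` short-circuited.
        self.list_calls += 1
        return [k for k in self.stored_keys if k.startswith(prefix)]


def resolve_keys(hook, bucket, keys, prefix):
    # `or` returns `keys` if it is truthy; otherwise it evaluates and
    # returns the listing. An empty list is falsy, so [] also lists.
    return keys or hook.list_keys(bucket_name=bucket, prefix=prefix)


hook = FakeS3Hook(["logs/a.gz", "logs/b.gz", "data/c.csv"])
print(resolve_keys(hook, "my-bucket", ["data/c.csv"], None))  # ['data/c.csv']
print(hook.list_calls)                                        # 0
print(resolve_keys(hook, "my-bucket", None, "logs/"))         # ['logs/a.gz', 'logs/b.gz']
print(hook.list_calls)                                        # 1
```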

##########
File path: airflow/providers/amazon/aws/operators/s3_delete_objects.py
##########
@@ -56,22 +59,34 @@ class S3DeleteObjectsOperator(BaseOperator):
     :type verify: bool or str
     """
 
-    template_fields = ('keys', 'bucket')
+    template_fields = ('keys', 'bucket', 'prefix')
 
     @apply_defaults
     def __init__(
             self,
             bucket,
-            keys,
+            keys=None,
+            prefix=None,
             aws_conn_id='aws_default',
             verify=None,
             *args, **kwargs):
+
+        if not keys and not prefix:
+            raise ValueError("Either keys or prefix should be set. Both are None")

Review comment:
       > bool(keys) ^ bool(prefix)
   
   ...is basically an `XOR`: it is `True` only when exactly one of the two values is set. Raising when it is `False` therefore rejects both the case where neither value is set and the case where both are set.
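A sketch of the suggested check as a hypothetical standalone helper, `validate_keys_or_prefix`, standing in for the operator's `__init__`. The parentheses make the precedence explicit (`not` already binds more loosely than `^`). Since the check uses truthiness, an empty list for `keys` counts as unset:

```python
def validate_keys_or_prefix(keys=None, prefix=None):
    """Hypothetical helper mirroring the suggested __init__ check."""
    # bool(keys) ^ bool(prefix) is True only when exactly one is truthy,
    # so negating it rejects "neither set" as well as "both set".
    if not (bool(keys) ^ bool(prefix)):
        raise ValueError("Either keys or prefix should be set, but not both.")


cases = [
    (["a.csv"], None),     # only keys
    (None, "logs/"),       # only prefix
    (None, None),          # neither
    (["a.csv"], "logs/"),  # both
    ([], None),            # empty list is falsy, so treated as unset
]
for keys, prefix in cases:
    try:
        validate_keys_or_prefix(keys, prefix)
        outcome = "accepted"
    except ValueError:
        outcome = "rejected"
    print(f"keys={keys!r}, prefix={prefix!r}: {outcome}")
```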

##########
File path: airflow/providers/amazon/aws/operators/s3_delete_objects.py
##########
@@ -56,22 +59,34 @@ class S3DeleteObjectsOperator(BaseOperator):
     :type verify: bool or str
     """
 
-    template_fields = ('keys', 'bucket')
+    template_fields = ('keys', 'bucket', 'prefix')
 
     @apply_defaults
     def __init__(
             self,
             bucket,
-            keys,
+            keys=None,
+            prefix=None,
             aws_conn_id='aws_default',
             verify=None,
             *args, **kwargs):
+
+        if not keys and not prefix:
+            raise ValueError("Either keys or prefix should be set. Both are None")
+
         super().__init__(*args, **kwargs)
         self.bucket = bucket
         self.keys = keys
+        self.prefix = prefix
         self.aws_conn_id = aws_conn_id
         self.verify = verify
 
     def execute(self, context):
         s3_hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
-        s3_hook.delete_objects(bucket=self.bucket, keys=self.keys)
+
+        if self.keys:
+            keys = self.keys
+        else:
+            keys = s3_hook.list_keys(bucket_name=self.bucket, prefix=self.prefix)
+

Review comment:
       If we do it in `__init__`, we don't need to do it here.







[GitHub] [airflow] kanga333 commented on a change in pull request #9350: Add deletion by prefix to S3DeleteObjectsOperator

Posted by GitBox <gi...@apache.org>.
kanga333 commented on a change in pull request #9350:
URL: https://github.com/apache/airflow/pull/9350#discussion_r441908023



##########
File path: airflow/providers/amazon/aws/operators/s3_delete_objects.py
##########
@@ -56,22 +59,34 @@ class S3DeleteObjectsOperator(BaseOperator):
     :type verify: bool or str
     """
 
-    template_fields = ('keys', 'bucket')
+    template_fields = ('keys', 'bucket', 'prefix')
 
     @apply_defaults
     def __init__(
             self,
             bucket,
-            keys,
+            keys=None,
+            prefix=None,
             aws_conn_id='aws_default',
             verify=None,
             *args, **kwargs):
+
+        if not keys and not prefix:
+            raise ValueError("Either keys or prefix should be set. Both are None")

Review comment:
       Thanks for your review.
   
   I overlooked the possibility of an empty list for `keys`. When an empty list is passed as `keys`, S3DeleteObjectsOperator does nothing and raises no exception. To avoid changing this behavior, I'm going to compare the condition against `None` instead:
   
   ```python
   if keys is None and prefix is None:
   ```
   
   I'm also going to add a simple test to confirm this behavior.
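A sketch of such a test, assuming a hypothetical `DeleteObjectsSketch` class that reproduces only the keys/prefix handling with the `is None` check; unlike the real operator, it receives the hook as an argument to `execute` so that a `unittest.mock.MagicMock` can be injected. It is an illustration of the plan described above, not the test that was committed:

```python
import unittest
from unittest import mock


class DeleteObjectsSketch:
    """Hypothetical stand-in for the operator's keys/prefix handling."""

    def __init__(self, bucket, keys=None, prefix=None):
        # Compare with None so that an explicit empty list is accepted.
        if keys is None and prefix is None:
            raise ValueError("Either keys or prefix should be set. Both are None")
        self.bucket = bucket
        self.keys = keys
        self.prefix = prefix

    def execute(self, hook):
        # Only list by prefix when keys were not given at all.
        if self.keys is not None:
            keys = self.keys
        else:
            keys = hook.list_keys(bucket_name=self.bucket, prefix=self.prefix)
        hook.delete_objects(bucket=self.bucket, keys=keys)


class TestDeleteObjectsSketch(unittest.TestCase):
    def test_both_none_raises(self):
        with self.assertRaises(ValueError):
            DeleteObjectsSketch(bucket="my-bucket")

    def test_empty_keys_is_accepted(self):
        hook = mock.MagicMock()
        DeleteObjectsSketch(bucket="my-bucket", keys=[]).execute(hook)
        # The prefix listing is skipped and the empty list is passed through.
        hook.list_keys.assert_not_called()
        hook.delete_objects.assert_called_once_with(bucket="my-bucket", keys=[])

    def test_prefix_lists_keys(self):
        hook = mock.MagicMock()
        hook.list_keys.return_value = ["logs/a.gz"]
        DeleteObjectsSketch(bucket="my-bucket", prefix="logs/").execute(hook)
        hook.list_keys.assert_called_once_with(bucket_name="my-bucket", prefix="logs/")
        hook.delete_objects.assert_called_once_with(bucket="my-bucket", keys=["logs/a.gz"])
```

A test runner such as `pytest` or `python -m unittest` will collect the `TestCase` above.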







[GitHub] [airflow] feluelle commented on pull request #9350: Add deletion by prefix to S3DeleteObjectsOperator

Posted by GitBox <gi...@apache.org>.
feluelle commented on pull request #9350:
URL: https://github.com/apache/airflow/pull/9350#issuecomment-658750469


   Thanks @kanga333! Merging this now. 👍

