Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/04/04 20:32:55 UTC

[GitHub] [airflow] vincbeck opened a new pull request, #22737: Deprecate S3PrefixSensor

vincbeck opened a new pull request, #22737:
URL: https://github.com/apache/airflow/pull/22737

   Deprecate `S3PrefixSensor` and use `S3KeySensor` instead.
   
   ## Why deprecate `S3PrefixSensor`?
   `S3PrefixSensor` and `S3KeySensor` do pretty much the same thing, so to avoid duplication we should deprecate one of them.
   Also, `S3PrefixSensor` does not behave the way its documentation describes, nor the way its name suggests. `S3PrefixSensor` does not wait for a given prefix in S3 to exist; it waits for a given folder in S3 to exist (given the delimiter `/`). Here are some examples I ran while testing.
   - `prefix="test"`. `true` when directory `test` exists
   - `prefix="test"`. `false` when directory `test` does not exist and file `test` exists
   - `prefix="test"`. `false` when directory `test` does not exist and file `test2` exists
   - `prefix="tes"`. `false` when directory `test` exists
   
   This misalignment between the expected behavior and the actual one is confusing for users. [Example of thread](https://stackoverflow.com/questions/71511535/s3prefixsensor-in-mwaa?noredirect=1#comment126395890_71511535) where a user does not understand why `S3PrefixSensor` behaves this way.
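   The observed behavior in the examples above can be reproduced with a simplified model (this is an illustrative sketch of the delimiter-based matching, not the actual `S3Hook` code; the key layouts are made up):

   ```python
   def prefix_matches(keys, prefix, delimiter='/'):
       # Simplified model: the old sensor effectively treated the prefix as a
       # "directory" name and matched only when some key lives under prefix/.
       folder = prefix.rstrip(delimiter) + delimiter
       return any(key.startswith(folder) for key in keys)

   # prefix="test": true when directory "test" exists
   assert prefix_matches(['test/file.csv'], 'test')
   # prefix="test": false when only a plain file "test" exists
   assert not prefix_matches(['test'], 'test')
   # prefix="test": false when only a file "test2" exists
   assert not prefix_matches(['test2'], 'test')
   # prefix="tes": false even though directory "test" exists
   assert not prefix_matches(['test/file.csv'], 'tes')
   ```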
   
   ## Why update `S3KeySensor`?
   In order to be backward compatible, we want users to switch to `S3KeySensor`. `S3PrefixSensor` accepts a list of files as input, so `S3KeySensor` should too.
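   The list handling can follow the pattern from the PR's constructor, where a single string is normalized into a one-element list (a sketch of that one line of logic only):

   ```python
   from typing import List, Union

   def normalize_bucket_key(bucket_key: Union[str, List[str]]) -> List[str]:
       # Mirror of the PR's constructor logic: wrap a single key into a list
       # so the rest of the sensor can always iterate over keys.
       return [bucket_key] if isinstance(bucket_key, str) else bucket_key

   assert normalize_bucket_key('data/file.csv') == ['data/file.csv']
   assert normalize_bucket_key(['a.csv', 'b.csv']) == ['a.csv', 'b.csv']
   ```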
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] vincbeck commented on pull request #22737: Deprecate S3PrefixSensor

vincbeck commented on PR #22737:
URL: https://github.com/apache/airflow/pull/22737#issuecomment-1095517182

   I only left the tests related to deprecation warnings and the default function in `TestS3KeySizeSensor`. I removed the other tests because those use cases are already covered in `TestS3KeySensor`.
   I also moved `default_check_fn` into `S3KeySizeSensor`.




[GitHub] [airflow] vincbeck commented on a diff in pull request #22737: Deprecate S3PrefixSensor

vincbeck commented on code in PR #22737:
URL: https://github.com/apache/airflow/pull/22737#discussion_r844056252


##########
airflow/providers/amazon/aws/sensors/s3.py:
##########
@@ -166,13 +173,13 @@ def poke(self, context: 'Context'):
 
     def get_files(self, s3_hook: S3Hook, delimiter: Optional[str] = '/') -> List:
         """Gets a list of files in the bucket"""
-        prefix = self.bucket_key
+        prefix = self.bucket_key[0]

Review Comment:
   Sure. `S3KeySizeSensor` inherits from `S3KeySensor`, so `self.bucket_key` is now a list of keys which contains only one key (`S3KeySizeSensor` checks only one key).





[GitHub] [airflow] ashb commented on a diff in pull request #22737: Deprecate S3PrefixSensor

ashb commented on code in PR #22737:
URL: https://github.com/apache/airflow/pull/22737#discussion_r845007680


##########
airflow/providers/amazon/aws/sensors/s3.py:
##########
@@ -332,66 +339,32 @@ def poke(self, context: 'Context'):
         return self.is_keys_unchanged(set(self.hook.list_keys(self.bucket_name, prefix=self.prefix)))
 
 
-class S3PrefixSensor(BaseSensorOperator):
+class S3PrefixSensor(S3KeySensor):
     """
-    Waits for a prefix or all prefixes to exist. A prefix is the first part of a key,
-    thus enabling checking of constructs similar to glob ``airfl*`` or
-    SQL LIKE ``'airfl%'``. There is the possibility to precise a delimiter to
-    indicate the hierarchy or keys, meaning that the match will stop at that
-    delimiter. Current code accepts sane delimiters, i.e. characters that
-    are NOT special characters in the Python regex engine.
-
-    :param bucket_name: Name of the S3 bucket
-    :param prefix: The prefix being waited on. Relative path from bucket root level.
-    :param delimiter: The delimiter intended to show hierarchy.
-        Defaults to '/'.
-    :param aws_conn_id: a reference to the s3 connection
-    :param verify: Whether or not to verify SSL certificates for S3 connection.
-        By default SSL certificates are verified.
-        You can provide the following values:
-
-        - ``False``: do not validate SSL certificates. SSL will still be used
-                 (unless use_ssl is False), but SSL certificates will not be
-                 verified.
-        - ``path/to/cert/bundle.pem``: A filename of the CA cert bundle to uses.
-                 You can specify this argument if you want to use a different
-                 CA cert bundle than the one used by botocore.
+    This class is deprecated.
+    Please use `airflow.providers.amazon.aws.sensors.s3.S3KeySensor`.

Review Comment:
   ```suggestion
       Please use :class:`~airflow.providers.amazon.aws.sensors.s3.S3KeySensor`.
   ```





[GitHub] [airflow] eladkal commented on a diff in pull request #22737: Deprecate S3PrefixSensor

eladkal commented on code in PR #22737:
URL: https://github.com/apache/airflow/pull/22737#discussion_r843496142


##########
airflow/providers/amazon/aws/sensors/s3.py:
##########
@@ -40,15 +41,16 @@
 
 class S3KeySensor(BaseSensorOperator):
     """
-    Waits for a key (a file-like instance on S3) to be present in a S3 bucket.
+    Waits for one or multiple keys (a file-like instance on S3) to be present in a S3 bucket.

Review Comment:
   can you also make this edit in `S3KeySizeSensor` ?



##########
airflow/providers/amazon/aws/sensors/s3.py:
##########
@@ -78,27 +80,32 @@ def __init__(
     ):
         super().__init__(**kwargs)
         self.bucket_name = bucket_name
-        self.bucket_key = bucket_key
+        self.bucket_key = [bucket_key] if isinstance(bucket_key, str) else bucket_key
         self.wildcard_match = wildcard_match
         self.aws_conn_id = aws_conn_id
         self.verify = verify
         self.hook: Optional[S3Hook] = None
 
-    def _resolve_bucket_and_key(self):
+    def _resolve_bucket_and_key(self, key):
         """If key is URI, parse bucket"""
         if self.bucket_name is None:
-            self.bucket_name, self.bucket_key = S3Hook.parse_s3_url(self.bucket_key)
+            return S3Hook.parse_s3_url(key)
         else:
-            parsed_url = urlparse(self.bucket_key)
+            parsed_url = urlparse(key)
             if parsed_url.scheme != '' or parsed_url.netloc != '':
                 raise AirflowException('If bucket_name provided, bucket_key must be relative path, not URI.')
+            return self.bucket_name, key
 
-    def poke(self, context: 'Context'):
-        self._resolve_bucket_and_key()
-        self.log.info('Poking for key : s3://%s/%s', self.bucket_name, self.bucket_key)
+    def _key_exists(self, key):
+        bucket_name, key = self._resolve_bucket_and_key(key)
+        self.log.info('Poking for key : s3://%s/%s', bucket_name, key)
         if self.wildcard_match:
-            return self.get_hook().check_for_wildcard_key(self.bucket_key, self.bucket_name)
-        return self.get_hook().check_for_key(self.bucket_key, self.bucket_name)
+            return self.get_hook().check_for_wildcard_key(key, bucket_name)
+
+        return self.get_hook().check_for_key(key, bucket_name)
+
+    def poke(self, context: 'Context'):
+        return all(self._key_exists(key) for key in self.bucket_key)

Review Comment:
   What will happen if I check for 300 keys?
   If I get it right, this will make 300 API calls per poke? If so, I think it's not good.
   Also, if I poke for `bucket_key=['a','b']` and 'a' is present but 'b' is not, why should I keep poking for 'a'?



##########
airflow/providers/amazon/aws/sensors/s3.py:
##########
@@ -166,13 +173,13 @@ def poke(self, context: 'Context'):
 
     def get_files(self, s3_hook: S3Hook, delimiter: Optional[str] = '/') -> List:
         """Gets a list of files in the bucket"""
-        prefix = self.bucket_key
+        prefix = self.bucket_key[0]

Review Comment:
   can you explain this change?



##########
tests/providers/amazon/aws/sensors/test_s3_prefix.py:
##########
@@ -1,63 +0,0 @@
-#

Review Comment:
   I'm not sure we can remove this test file just yet.
   This is because `S3PrefixSensor`, while deprecated, still has logic that converts the user-given parameter to what `S3KeySensor` expects, so we want to make sure all of the tests in this file keep passing and that we are doing this in a backward-compatible way.
   
   I would leave this test file as is + add a new test to this file that makes sure a deprecation warning is raised when using `S3PrefixSensor`.
   This test file can be removed when we remove `S3PrefixSensor` in the next major release of the provider.





[GitHub] [airflow] vincbeck commented on a diff in pull request #22737: Deprecate S3PrefixSensor

vincbeck commented on code in PR #22737:
URL: https://github.com/apache/airflow/pull/22737#discussion_r844082410


##########
airflow/providers/amazon/aws/sensors/s3.py:
##########
@@ -78,27 +80,32 @@ def __init__(
     ):
         super().__init__(**kwargs)
         self.bucket_name = bucket_name
-        self.bucket_key = bucket_key
+        self.bucket_key = [bucket_key] if isinstance(bucket_key, str) else bucket_key
         self.wildcard_match = wildcard_match
         self.aws_conn_id = aws_conn_id
         self.verify = verify
         self.hook: Optional[S3Hook] = None
 
-    def _resolve_bucket_and_key(self):
+    def _resolve_bucket_and_key(self, key):
         """If key is URI, parse bucket"""
         if self.bucket_name is None:
-            self.bucket_name, self.bucket_key = S3Hook.parse_s3_url(self.bucket_key)
+            return S3Hook.parse_s3_url(key)
         else:
-            parsed_url = urlparse(self.bucket_key)
+            parsed_url = urlparse(key)
             if parsed_url.scheme != '' or parsed_url.netloc != '':
                 raise AirflowException('If bucket_name provided, bucket_key must be relative path, not URI.')
+            return self.bucket_name, key
 
-    def poke(self, context: 'Context'):
-        self._resolve_bucket_and_key()
-        self.log.info('Poking for key : s3://%s/%s', self.bucket_name, self.bucket_key)
+    def _key_exists(self, key):
+        bucket_name, key = self._resolve_bucket_and_key(key)
+        self.log.info('Poking for key : s3://%s/%s', bucket_name, key)
         if self.wildcard_match:
-            return self.get_hook().check_for_wildcard_key(self.bucket_key, self.bucket_name)
-        return self.get_hook().check_for_key(self.bucket_key, self.bucket_name)
+            return self.get_hook().check_for_wildcard_key(key, bucket_name)
+
+        return self.get_hook().check_for_key(key, bucket_name)
+
+    def poke(self, context: 'Context'):
+        return all(self._key_exists(key) for key in self.bucket_key)

Review Comment:
   > Also if I poke for bucket_key=['a','b'] and 'a' is present but 'b' is not why should I keep poke for a?
   
   Bouncing back on this question. Having file 'a' present at time N does not mean file 'a' is still present at time N+1, so I don't think we can really optimize here. Example: we are waiting for files 'a' and 'b' in the same bucket.
   - File 'a' gets created
   - File 'a' gets removed
   - File 'b' gets created
   
   In this situation, the sensor should never stop waiting. If we go down your optimization path, the sensor will act as though the two files are present.
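   This argument can be made concrete with a toy model of the new `poke()` (a sketch: `bucket_contents` stands in for what S3 reports at a given poke, whereas the real sensor makes an S3 API call per key):

   ```python
   def poke(bucket_contents, wanted_keys):
       # Succeed only when *all* wanted keys are present in the same poke;
       # every key is re-checked every time, so a key that disappears
       # keeps the sensor waiting.
       return all(key in bucket_contents for key in wanted_keys)

   # Timeline from the comment: 'a' appears, 'a' is removed, 'b' appears.
   assert not poke({'a'}, ['a', 'b'])   # only 'a' present
   assert not poke(set(), ['a', 'b'])   # 'a' removed
   assert not poke({'b'}, ['a', 'b'])   # only 'b' present: still waiting
   # The sensor succeeds only when both files exist at the same time:
   assert poke({'a', 'b'}, ['a', 'b'])
   ```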





[GitHub] [airflow] vincbeck commented on a diff in pull request #22737: Deprecate S3PrefixSensor

vincbeck commented on code in PR #22737:
URL: https://github.com/apache/airflow/pull/22737#discussion_r847437905


##########
airflow/providers/amazon/aws/sensors/s3.py:
##########
@@ -109,39 +152,18 @@ def get_hook(self) -> S3Hook:
         return self.hook
 
 
-class S3KeySizeSensor(S3KeySensor):
-    """
-    Waits for a key (a file-like instance on S3) to be present and be more than
-    some size in a S3 bucket.
-    S3 being a key/value it does not support folders. The path is just a key
-    a resource.
+def default_check_fn(data: List) -> bool:

Review Comment:
   > Why is this function not in the S3KeySizeSensor class?
   
   Yeah, after thinking about it, it makes more sense. Let me put it back.
   
   > Can you please explain this change and how is it backward compatible?
   
   `S3KeySensor` waits for a file to exist in an S3 bucket. `S3KeySizeSensor` waits for a file to exist in an S3 bucket and adds a custom check on the size (which actually should not be limited to the size, hence the name is already wrong). In other words, `S3KeySizeSensor` does exactly what `S3KeySensor` does plus an additional custom check. Let's just add that custom check as an optional parameter.
   
   > I see you created the howto_sensor_s3_key_function_definition example but reading the doc you just referenced it I think now that S3KeySensor has several different capabilities we should list them in the doc and explain about them. It wasn't so clear to me when reading it
   
   I actually thought that's exactly what I did :) I listed multiple use cases in the doc:
   - To check one file:
   - To check multiple files:
   - To check with an additional check
   
   Please tell me what is not clear or how I can improve it
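   The "existence check plus optional custom check" idea described above could look roughly like this (a sketch only: the function names, the threshold, and the data shape of dicts with a `Size` field in the style of S3 listings are assumptions, not the provider's actual signature):

   ```python
   from typing import Callable, List, Optional

   def poke_with_check(objects: List[dict],
                       check_fn: Optional[Callable[[List[dict]], bool]] = None) -> bool:
       # Base behavior: the keys must exist (here: the listing is non-empty).
       if not objects:
           return False
       # Optional extra check, e.g. a size check in the spirit of S3KeySizeSensor.
       return check_fn(objects) if check_fn else True

   def min_size_check(data: List[dict], min_size: int = 1024) -> bool:
       # Every matched object must be at least min_size bytes.
       return all(obj.get('Size', 0) >= min_size for obj in data)

   objects = [{'Size': 2048}, {'Size': 512}]
   assert poke_with_check(objects)                      # existence only: passes
   assert not poke_with_check(objects, min_size_check)  # 512 < 1024: fails
   ```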





[GitHub] [airflow] vincbeck commented on a diff in pull request #22737: Deprecate S3PrefixSensor

vincbeck commented on code in PR #22737:
URL: https://github.com/apache/airflow/pull/22737#discussion_r844057508


##########
tests/providers/amazon/aws/sensors/test_s3_prefix.py:
##########
@@ -1,63 +0,0 @@
-#

Review Comment:
   Good point. I was really not sure whether I should remove this test file. Let me put it back and add that test as you suggested.





[GitHub] [airflow] eladkal commented on a diff in pull request #22737: Deprecate S3PrefixSensor

eladkal commented on code in PR #22737:
URL: https://github.com/apache/airflow/pull/22737#discussion_r844190309


##########
airflow/providers/amazon/aws/sensors/s3.py:
##########
@@ -78,27 +80,32 @@ def __init__(
     ):
         super().__init__(**kwargs)
         self.bucket_name = bucket_name
-        self.bucket_key = bucket_key
+        self.bucket_key = [bucket_key] if isinstance(bucket_key, str) else bucket_key
         self.wildcard_match = wildcard_match
         self.aws_conn_id = aws_conn_id
         self.verify = verify
         self.hook: Optional[S3Hook] = None
 
-    def _resolve_bucket_and_key(self):
+    def _resolve_bucket_and_key(self, key):
         """If key is URI, parse bucket"""
         if self.bucket_name is None:
-            self.bucket_name, self.bucket_key = S3Hook.parse_s3_url(self.bucket_key)
+            return S3Hook.parse_s3_url(key)
         else:
-            parsed_url = urlparse(self.bucket_key)
+            parsed_url = urlparse(key)
             if parsed_url.scheme != '' or parsed_url.netloc != '':
                 raise AirflowException('If bucket_name provided, bucket_key must be relative path, not URI.')
+            return self.bucket_name, key
 
-    def poke(self, context: 'Context'):
-        self._resolve_bucket_and_key()
-        self.log.info('Poking for key : s3://%s/%s', self.bucket_name, self.bucket_key)
+    def _key_exists(self, key):
+        bucket_name, key = self._resolve_bucket_and_key(key)
+        self.log.info('Poking for key : s3://%s/%s', bucket_name, key)
         if self.wildcard_match:
-            return self.get_hook().check_for_wildcard_key(self.bucket_key, self.bucket_name)
-        return self.get_hook().check_for_key(self.bucket_key, self.bucket_name)
+            return self.get_hook().check_for_wildcard_key(key, bucket_name)
+
+        return self.get_hook().check_for_key(key, bucket_name)
+
+    def poke(self, context: 'Context'):
+        return all(self._key_exists(key) for key in self.bucket_key)

Review Comment:
   Thanks! Makes sense.
   Should we mention 1-2 sentences about this in the sensor docs?
   Just to note that the sensor was not designed for a high volume of keys (it will work, but with multiple API calls).





[GitHub] [airflow] vincbeck commented on a diff in pull request #22737: Deprecate S3PrefixSensor

vincbeck commented on code in PR #22737:
URL: https://github.com/apache/airflow/pull/22737#discussion_r844188881


##########
airflow/providers/amazon/aws/sensors/s3.py:
##########
@@ -40,15 +41,16 @@
 
 class S3KeySensor(BaseSensorOperator):
     """
-    Waits for a key (a file-like instance on S3) to be present in a S3 bucket.
+    Waits for one or multiple keys (a file-like instance on S3) to be present in a S3 bucket.

Review Comment:
   Oh you're right. Let me do that change





[GitHub] [airflow] vincbeck commented on a diff in pull request #22737: Deprecate S3PrefixSensor

vincbeck commented on code in PR #22737:
URL: https://github.com/apache/airflow/pull/22737#discussion_r844051248


##########
airflow/providers/amazon/aws/sensors/s3.py:
##########
@@ -78,27 +80,32 @@ def __init__(
     ):
         super().__init__(**kwargs)
         self.bucket_name = bucket_name
-        self.bucket_key = bucket_key
+        self.bucket_key = [bucket_key] if isinstance(bucket_key, str) else bucket_key
         self.wildcard_match = wildcard_match
         self.aws_conn_id = aws_conn_id
         self.verify = verify
         self.hook: Optional[S3Hook] = None
 
-    def _resolve_bucket_and_key(self):
+    def _resolve_bucket_and_key(self, key):
         """If key is URI, parse bucket"""
         if self.bucket_name is None:
-            self.bucket_name, self.bucket_key = S3Hook.parse_s3_url(self.bucket_key)
+            return S3Hook.parse_s3_url(key)
         else:
-            parsed_url = urlparse(self.bucket_key)
+            parsed_url = urlparse(key)
             if parsed_url.scheme != '' or parsed_url.netloc != '':
                 raise AirflowException('If bucket_name provided, bucket_key must be relative path, not URI.')
+            return self.bucket_name, key
 
-    def poke(self, context: 'Context'):
-        self._resolve_bucket_and_key()
-        self.log.info('Poking for key : s3://%s/%s', self.bucket_name, self.bucket_key)
+    def _key_exists(self, key):
+        bucket_name, key = self._resolve_bucket_and_key(key)
+        self.log.info('Poking for key : s3://%s/%s', bucket_name, key)
         if self.wildcard_match:
-            return self.get_hook().check_for_wildcard_key(self.bucket_key, self.bucket_name)
-        return self.get_hook().check_for_key(self.bucket_key, self.bucket_name)
+            return self.get_hook().check_for_wildcard_key(key, bucket_name)
+
+        return self.get_hook().check_for_key(key, bucket_name)
+
+    def poke(self, context: 'Context'):
+        return all(self._key_exists(key) for key in self.bucket_key)

Review Comment:
   > What will happen if I check for 300 keys? If I get it right this will make 300 API calls per poke? If so I think it's not good.
   
   I agree, but I also need to be backward compatible. That's the current behavior in `S3PrefixSensor`, so I cannot really change this logic. I cannot use the list APIs either, because the scope of those APIs is per bucket, and I can imagine users providing 300 different buckets.
   
   > Also if I poke for bucket_key=['a','b'] and 'a' is present but 'b' is not why should I keep poke for a?
   
   That's a good point. I can look into a way to optimize that.





[GitHub] [airflow] vincbeck commented on a diff in pull request #22737: Deprecate S3PrefixSensor

vincbeck commented on code in PR #22737:
URL: https://github.com/apache/airflow/pull/22737#discussion_r844194924


##########
airflow/providers/amazon/aws/sensors/s3.py:
##########
@@ -78,27 +80,32 @@ def __init__(
     ):
         super().__init__(**kwargs)
         self.bucket_name = bucket_name
-        self.bucket_key = bucket_key
+        self.bucket_key = [bucket_key] if isinstance(bucket_key, str) else bucket_key
         self.wildcard_match = wildcard_match
         self.aws_conn_id = aws_conn_id
         self.verify = verify
         self.hook: Optional[S3Hook] = None
 
-    def _resolve_bucket_and_key(self):
+    def _resolve_bucket_and_key(self, key):
         """If key is URI, parse bucket"""
         if self.bucket_name is None:
-            self.bucket_name, self.bucket_key = S3Hook.parse_s3_url(self.bucket_key)
+            return S3Hook.parse_s3_url(key)
         else:
-            parsed_url = urlparse(self.bucket_key)
+            parsed_url = urlparse(key)
             if parsed_url.scheme != '' or parsed_url.netloc != '':
                 raise AirflowException('If bucket_name provided, bucket_key must be relative path, not URI.')
+            return self.bucket_name, key
 
-    def poke(self, context: 'Context'):
-        self._resolve_bucket_and_key()
-        self.log.info('Poking for key : s3://%s/%s', self.bucket_name, self.bucket_key)
+    def _key_exists(self, key):
+        bucket_name, key = self._resolve_bucket_and_key(key)
+        self.log.info('Poking for key : s3://%s/%s', bucket_name, key)
         if self.wildcard_match:
-            return self.get_hook().check_for_wildcard_key(self.bucket_key, self.bucket_name)
-        return self.get_hook().check_for_key(self.bucket_key, self.bucket_name)
+            return self.get_hook().check_for_wildcard_key(key, bucket_name)
+
+        return self.get_hook().check_for_key(key, bucket_name)
+
+    def poke(self, context: 'Context'):
+        return all(self._key_exists(key) for key in self.bucket_key)

Review Comment:
   Totally! That's a good point. Let me add that.





[GitHub] [airflow] vincbeck commented on a diff in pull request #22737: Deprecate S3PrefixSensor

vincbeck commented on code in PR #22737:
URL: https://github.com/apache/airflow/pull/22737#discussion_r846412408


##########
airflow/providers/amazon/aws/sensors/s3.py:
##########
@@ -40,15 +41,16 @@
 
 class S3KeySensor(BaseSensorOperator):
     """
-    Waits for a key (a file-like instance on S3) to be present in a S3 bucket.
+    Waits for one or multiple keys (a file-like instance on S3) to be present in a S3 bucket.

Review Comment:
   Done



##########
tests/providers/amazon/aws/sensors/test_s3_prefix.py:
##########
@@ -1,63 +0,0 @@
-#

Review Comment:
   Done





[GitHub] [airflow] eladkal commented on a diff in pull request #22737: Deprecate S3PrefixSensor

eladkal commented on code in PR #22737:
URL: https://github.com/apache/airflow/pull/22737#discussion_r844145464


##########
airflow/providers/amazon/aws/sensors/s3.py:
##########
@@ -78,27 +80,32 @@ def __init__(
     ):
         super().__init__(**kwargs)
         self.bucket_name = bucket_name
-        self.bucket_key = bucket_key
+        self.bucket_key = [bucket_key] if isinstance(bucket_key, str) else bucket_key
         self.wildcard_match = wildcard_match
         self.aws_conn_id = aws_conn_id
         self.verify = verify
         self.hook: Optional[S3Hook] = None
 
-    def _resolve_bucket_and_key(self):
+    def _resolve_bucket_and_key(self, key):
         """If key is URI, parse bucket"""
         if self.bucket_name is None:
-            self.bucket_name, self.bucket_key = S3Hook.parse_s3_url(self.bucket_key)
+            return S3Hook.parse_s3_url(key)
         else:
-            parsed_url = urlparse(self.bucket_key)
+            parsed_url = urlparse(key)
             if parsed_url.scheme != '' or parsed_url.netloc != '':
                 raise AirflowException('If bucket_name provided, bucket_key must be relative path, not URI.')
+            return self.bucket_name, key
 
-    def poke(self, context: 'Context'):
-        self._resolve_bucket_and_key()
-        self.log.info('Poking for key : s3://%s/%s', self.bucket_name, self.bucket_key)
+    def _key_exists(self, key):
+        bucket_name, key = self._resolve_bucket_and_key(key)
+        self.log.info('Poking for key : s3://%s/%s', bucket_name, key)
         if self.wildcard_match:
-            return self.get_hook().check_for_wildcard_key(self.bucket_key, self.bucket_name)
-        return self.get_hook().check_for_key(self.bucket_key, self.bucket_name)
+            return self.get_hook().check_for_wildcard_key(key, bucket_name)
+
+        return self.get_hook().check_for_key(key, bucket_name)
+
+    def poke(self, context: 'Context'):
+        return all(self._key_exists(key) for key in self.bucket_key)

Review Comment:
   @vincbeck valid point.
   As for the API calls... we preserve backward compatibility in terms of not breaking user code.
   If there is a way to optimize (convert 300 API calls into 1 without the user changing anything) then we can do it.
   I'm not sure it's doable; this is what I wanted to find out.





[GitHub] [airflow] eladkal commented on a diff in pull request #22737: Deprecate S3PrefixSensor

eladkal commented on code in PR #22737:
URL: https://github.com/apache/airflow/pull/22737#discussion_r846658209



##########
airflow/providers/amazon/aws/sensors/s3.py:
##########
@@ -109,39 +152,18 @@ def get_hook(self) -> S3Hook:
         return self.hook
 
 
-class S3KeySizeSensor(S3KeySensor):
-    """
-    Waits for a key (a file-like instance on S3) to be present and be more than
-    some size in a S3 bucket.
-    S3 being a key/value it does not support folders. The path is just a key
-    a resource.
+def default_check_fn(data: List) -> bool:

Review Comment:
   Why is this function not in the `S3KeySizeSensor` class?
   Can you please explain this change and how it is backward compatible?
   I see you created the `howto_sensor_s3_key_function_definition` example, but reading the doc where you referenced it, I think that now that `S3KeySensor` has several different capabilities we should list them in the doc and explain them. It wasn't so clear to me when reading it.





[GitHub] [airflow] eladkal commented on a diff in pull request #22737: Deprecate S3PrefixSensor

Posted by GitBox <gi...@apache.org>.
eladkal commented on code in PR #22737:
URL: https://github.com/apache/airflow/pull/22737#discussion_r846658209


##########
airflow/providers/amazon/aws/sensors/s3.py:
##########
@@ -109,39 +152,18 @@ def get_hook(self) -> S3Hook:
         return self.hook
 
 
-class S3KeySizeSensor(S3KeySensor):
-    """
-    Waits for a key (a file-like instance on S3) to be present and be more than
-    some size in a S3 bucket.
-    S3 being a key/value it does not support folders. The path is just a key
-    a resource.
+def default_check_fn(data: List) -> bool:

Review Comment:
   Why is this function not in the `S3KeySizeSensor` class?
   Can you please explain this change and how it is backward compatible?
   I think we should add an explanation to the docs about how to use `S3KeySensor` to sense file sizes.





[GitHub] [airflow] vincbeck commented on pull request #22737: Deprecate S3PrefixSensor

Posted by GitBox <gi...@apache.org>.
vincbeck commented on PR #22737:
URL: https://github.com/apache/airflow/pull/22737#issuecomment-1093263722

   While addressing feedback on this PR, I realized that `S3KeySizeSensor` is also a subset of `S3KeySensor`, so I decided to deprecate it as well and add an optional parameter to `S3KeySensor`.
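For context, the size check that `S3KeySizeSensor` performed can be expressed as a plain callable handed to `S3KeySensor`. Below is a minimal sketch of such a check function; the name `check_min_size` and the shape of the `data` entries (dicts with a `Size` key, mirroring the object metadata the S3 hook returns) are assumptions based on this PR's diff, not a definitive API reference.

```python
from typing import Any, Dict, List


def check_min_size(data: List[Dict[str, Any]], min_bytes: int = 1) -> bool:
    """Return True only when every matched S3 object meets the size threshold.

    Each entry in ``data`` is assumed to look like the object metadata
    returned by the S3 hook, e.g. {"Key": "path/to/file", "Size": 2048}.
    An empty result list fails the check.
    """
    return bool(data) and all(obj.get("Size", 0) >= min_bytes for obj in data)
```

A sensor configured with such a callable would then replicate the old size-sensing behavior, roughly `S3KeySensor(..., check_fn=check_min_size)` (the `check_fn` parameter name is an assumption here).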




[GitHub] [airflow] vincbeck commented on a diff in pull request #22737: Deprecate S3PrefixSensor

Posted by GitBox <gi...@apache.org>.
vincbeck commented on code in PR #22737:
URL: https://github.com/apache/airflow/pull/22737#discussion_r846412698


##########
airflow/providers/amazon/aws/sensors/s3.py:
##########
@@ -166,13 +173,13 @@ def poke(self, context: 'Context'):
 
     def get_files(self, s3_hook: S3Hook, delimiter: Optional[str] = '/') -> List:
         """Gets a list of files in the bucket"""
-        prefix = self.bucket_key
+        prefix = self.bucket_key[0]

Review Comment:
   I actually decided to deprecate `S3KeySizeSensor`



##########
airflow/providers/amazon/aws/sensors/s3.py:
##########
@@ -78,27 +80,32 @@ def __init__(
     ):
         super().__init__(**kwargs)
         self.bucket_name = bucket_name
-        self.bucket_key = bucket_key
+        self.bucket_key = [bucket_key] if isinstance(bucket_key, str) else bucket_key
         self.wildcard_match = wildcard_match
         self.aws_conn_id = aws_conn_id
         self.verify = verify
         self.hook: Optional[S3Hook] = None
 
-    def _resolve_bucket_and_key(self):
+    def _resolve_bucket_and_key(self, key):
         """If key is URI, parse bucket"""
         if self.bucket_name is None:
-            self.bucket_name, self.bucket_key = S3Hook.parse_s3_url(self.bucket_key)
+            return S3Hook.parse_s3_url(key)
         else:
-            parsed_url = urlparse(self.bucket_key)
+            parsed_url = urlparse(key)
             if parsed_url.scheme != '' or parsed_url.netloc != '':
                 raise AirflowException('If bucket_name provided, bucket_key must be relative path, not URI.')
+            return self.bucket_name, key
 
-    def poke(self, context: 'Context'):
-        self._resolve_bucket_and_key()
-        self.log.info('Poking for key : s3://%s/%s', self.bucket_name, self.bucket_key)
+    def _key_exists(self, key):
+        bucket_name, key = self._resolve_bucket_and_key(key)
+        self.log.info('Poking for key : s3://%s/%s', bucket_name, key)
         if self.wildcard_match:
-            return self.get_hook().check_for_wildcard_key(self.bucket_key, self.bucket_name)
-        return self.get_hook().check_for_key(self.bucket_key, self.bucket_name)
+            return self.get_hook().check_for_wildcard_key(key, bucket_name)
+
+        return self.get_hook().check_for_key(key, bucket_name)
+
+    def poke(self, context: 'Context'):
+        return all(self._key_exists(key) for key in self.bucket_key)

Review Comment:
   Done





[GitHub] [airflow] eladkal commented on a diff in pull request #22737: Deprecate S3PrefixSensor

Posted by GitBox <gi...@apache.org>.
eladkal commented on code in PR #22737:
URL: https://github.com/apache/airflow/pull/22737#discussion_r844190309


##########
airflow/providers/amazon/aws/sensors/s3.py:
##########
@@ -78,27 +80,32 @@ def __init__(
     ):
         super().__init__(**kwargs)
         self.bucket_name = bucket_name
-        self.bucket_key = bucket_key
+        self.bucket_key = [bucket_key] if isinstance(bucket_key, str) else bucket_key
         self.wildcard_match = wildcard_match
         self.aws_conn_id = aws_conn_id
         self.verify = verify
         self.hook: Optional[S3Hook] = None
 
-    def _resolve_bucket_and_key(self):
+    def _resolve_bucket_and_key(self, key):
         """If key is URI, parse bucket"""
         if self.bucket_name is None:
-            self.bucket_name, self.bucket_key = S3Hook.parse_s3_url(self.bucket_key)
+            return S3Hook.parse_s3_url(key)
         else:
-            parsed_url = urlparse(self.bucket_key)
+            parsed_url = urlparse(key)
             if parsed_url.scheme != '' or parsed_url.netloc != '':
                 raise AirflowException('If bucket_name provided, bucket_key must be relative path, not URI.')
+            return self.bucket_name, key
 
-    def poke(self, context: 'Context'):
-        self._resolve_bucket_and_key()
-        self.log.info('Poking for key : s3://%s/%s', self.bucket_name, self.bucket_key)
+    def _key_exists(self, key):
+        bucket_name, key = self._resolve_bucket_and_key(key)
+        self.log.info('Poking for key : s3://%s/%s', bucket_name, key)
         if self.wildcard_match:
-            return self.get_hook().check_for_wildcard_key(self.bucket_key, self.bucket_name)
-        return self.get_hook().check_for_key(self.bucket_key, self.bucket_name)
+            return self.get_hook().check_for_wildcard_key(key, bucket_name)
+
+        return self.get_hook().check_for_key(key, bucket_name)
+
+    def poke(self, context: 'Context'):
+        return all(self._key_exists(key) for key in self.bucket_key)

Review Comment:
   Thanks! Makes sense.
   Should we add 1-2 sentences about this to the sensor docs?
   Just to note that the sensor was not designed for a high volume of keys.





[GitHub] [airflow] vincbeck commented on a diff in pull request #22737: Deprecate S3PrefixSensor

Posted by GitBox <gi...@apache.org>.
vincbeck commented on code in PR #22737:
URL: https://github.com/apache/airflow/pull/22737#discussion_r844058687


##########
airflow/providers/amazon/aws/sensors/s3.py:
##########
@@ -40,15 +41,16 @@
 
 class S3KeySensor(BaseSensorOperator):
     """
-    Waits for a key (a file-like instance on S3) to be present in a S3 bucket.
+    Waits for one or multiple keys (a file-like instance on S3) to be present in a S3 bucket.

Review Comment:
   `S3KeySizeSensor` waits for only one key. Are you asking to update `S3KeySizeSensor` so it can wait for multiple files?





[GitHub] [airflow] eladkal merged pull request #22737: Deprecate `S3PrefixSensor` and `S3KeySizeSensor` in favor of `S3KeySensor`

Posted by GitBox <gi...@apache.org>.
eladkal merged PR #22737:
URL: https://github.com/apache/airflow/pull/22737




[GitHub] [airflow] eladkal commented on a diff in pull request #22737: Deprecate S3PrefixSensor

Posted by GitBox <gi...@apache.org>.
eladkal commented on code in PR #22737:
URL: https://github.com/apache/airflow/pull/22737#discussion_r844149741


##########
airflow/providers/amazon/aws/sensors/s3.py:
##########
@@ -40,15 +41,16 @@
 
 class S3KeySensor(BaseSensorOperator):
     """
-    Waits for a key (a file-like instance on S3) to be present in a S3 bucket.
+    Waits for one or multiple keys (a file-like instance on S3) to be present in a S3 bucket.

Review Comment:
   `S3KeySizeSensor` inherits from `S3KeySensor`, so now that you are changing `bucket_key` from `str` to `Union[str, List[str]]`, users will be able to pass a list of keys to `S3KeySizeSensor`.
   
   If this is not right for the use case of `S3KeySizeSensor`, then it requires some additional changes.
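The backward-compatibility concern above hinges on the `bucket_key` normalization visible in the diff: a bare `str` is wrapped in a single-element list, so existing single-key callers keep working while list callers get the new behavior. As a standalone sketch of that one constructor line:

```python
from typing import List, Union


def normalize_bucket_key(bucket_key: Union[str, List[str]]) -> List[str]:
    # Mirrors the constructor line in the diff: wrap a bare string in a
    # single-element list so the rest of the sensor always iterates a list.
    return [bucket_key] if isinstance(bucket_key, str) else bucket_key
```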
   





[GitHub] [airflow] vincbeck commented on a diff in pull request #22737: Deprecate S3PrefixSensor

Posted by GitBox <gi...@apache.org>.
vincbeck commented on code in PR #22737:
URL: https://github.com/apache/airflow/pull/22737#discussion_r844186593


##########
airflow/providers/amazon/aws/sensors/s3.py:
##########
@@ -78,27 +80,32 @@ def __init__(
     ):
         super().__init__(**kwargs)
         self.bucket_name = bucket_name
-        self.bucket_key = bucket_key
+        self.bucket_key = [bucket_key] if isinstance(bucket_key, str) else bucket_key
         self.wildcard_match = wildcard_match
         self.aws_conn_id = aws_conn_id
         self.verify = verify
         self.hook: Optional[S3Hook] = None
 
-    def _resolve_bucket_and_key(self):
+    def _resolve_bucket_and_key(self, key):
         """If key is URI, parse bucket"""
         if self.bucket_name is None:
-            self.bucket_name, self.bucket_key = S3Hook.parse_s3_url(self.bucket_key)
+            return S3Hook.parse_s3_url(key)
         else:
-            parsed_url = urlparse(self.bucket_key)
+            parsed_url = urlparse(key)
             if parsed_url.scheme != '' or parsed_url.netloc != '':
                 raise AirflowException('If bucket_name provided, bucket_key must be relative path, not URI.')
+            return self.bucket_name, key
 
-    def poke(self, context: 'Context'):
-        self._resolve_bucket_and_key()
-        self.log.info('Poking for key : s3://%s/%s', self.bucket_name, self.bucket_key)
+    def _key_exists(self, key):
+        bucket_name, key = self._resolve_bucket_and_key(key)
+        self.log.info('Poking for key : s3://%s/%s', bucket_name, key)
         if self.wildcard_match:
-            return self.get_hook().check_for_wildcard_key(self.bucket_key, self.bucket_name)
-        return self.get_hook().check_for_key(self.bucket_key, self.bucket_name)
+            return self.get_hook().check_for_wildcard_key(key, bucket_name)
+
+        return self.get_hook().check_for_key(key, bucket_name)
+
+    def poke(self, context: 'Context'):
+        return all(self._key_exists(key) for key in self.bucket_key)

Review Comment:
   The only way to optimize it would be to use `ListObjects` instead, but there are some caveats.
   1. `ListObjects` works on a single bucket, so if you want to check 300 different keys, each in a different bucket, it would make the problem worse.
   2. It would make the code more complex: with `ListObjects` you need to handle pagination and verify that each expected file actually appears in the response.
   3. A quick look at how current users use `S3PrefixSensor` (https://github.com/search?q=S3PrefixSensor&type=code) shows that most usages are single-file.
   
   Because of these points, in my opinion, it's not worth using `ListObjects` instead of `HeadObject`.







[GitHub] [airflow] github-actions[bot] commented on pull request #22737: Deprecate `S3PrefixSensor` and `S3KeySizeSensor` in favor of `S3KeySensor`

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #22737:
URL: https://github.com/apache/airflow/pull/22737#issuecomment-1096071088

   The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.




[GitHub] [airflow] vincbeck commented on pull request #22737: Deprecate S3PrefixSensor

Posted by GitBox <gi...@apache.org>.
vincbeck commented on PR #22737:
URL: https://github.com/apache/airflow/pull/22737#issuecomment-1095518033

   I also edited the PR description to add a section to explain the purpose of `default_check_fn` since it might not be very obvious

