Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/04/06 14:55:34 UTC

[GitHub] [airflow] vincbeck commented on a diff in pull request #22737: Deprecate S3PrefixSensor

vincbeck commented on code in PR #22737:
URL: https://github.com/apache/airflow/pull/22737#discussion_r844051248


##########
airflow/providers/amazon/aws/sensors/s3.py:
##########
@@ -78,27 +80,32 @@ def __init__(
     ):
         super().__init__(**kwargs)
         self.bucket_name = bucket_name
-        self.bucket_key = bucket_key
+        self.bucket_key = [bucket_key] if isinstance(bucket_key, str) else bucket_key
         self.wildcard_match = wildcard_match
         self.aws_conn_id = aws_conn_id
         self.verify = verify
         self.hook: Optional[S3Hook] = None
 
-    def _resolve_bucket_and_key(self):
+    def _resolve_bucket_and_key(self, key):
         """If key is URI, parse bucket"""
         if self.bucket_name is None:
-            self.bucket_name, self.bucket_key = S3Hook.parse_s3_url(self.bucket_key)
+            return S3Hook.parse_s3_url(key)
         else:
-            parsed_url = urlparse(self.bucket_key)
+            parsed_url = urlparse(key)
             if parsed_url.scheme != '' or parsed_url.netloc != '':
                 raise AirflowException('If bucket_name provided, bucket_key must be relative path, not URI.')
+            return self.bucket_name, key
 
-    def poke(self, context: 'Context'):
-        self._resolve_bucket_and_key()
-        self.log.info('Poking for key : s3://%s/%s', self.bucket_name, self.bucket_key)
+    def _key_exists(self, key):
+        bucket_name, key = self._resolve_bucket_and_key(key)
+        self.log.info('Poking for key : s3://%s/%s', bucket_name, key)
         if self.wildcard_match:
-            return self.get_hook().check_for_wildcard_key(self.bucket_key, self.bucket_name)
-        return self.get_hook().check_for_key(self.bucket_key, self.bucket_name)
+            return self.get_hook().check_for_wildcard_key(key, bucket_name)
+
+        return self.get_hook().check_for_key(key, bucket_name)
+
+    def poke(self, context: 'Context'):
+        return all(self._key_exists(key) for key in self.bucket_key)

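The resolution logic in the diff above can be sketched as a standalone function. This is a minimal, hypothetical mimic of `_resolve_bucket_and_key` using only `urllib.parse` (the real code delegates the URI case to `S3Hook.parse_s3_url`, which is assumed here to behave like this split):

```python
from urllib.parse import urlparse


def resolve_bucket_and_key(bucket_name, key):
    """Sketch of the diff's _resolve_bucket_and_key: if no bucket_name is
    configured, parse the bucket out of a full s3:// URI; otherwise require
    the key to be a relative path."""
    if bucket_name is None:
        # Stand-in for S3Hook.parse_s3_url("s3://bucket/key") -> (bucket, key)
        parsed = urlparse(key)
        return parsed.netloc, parsed.path.lstrip("/")
    parsed = urlparse(key)
    if parsed.scheme != "" or parsed.netloc != "":
        raise ValueError(
            "If bucket_name provided, bucket_key must be relative path, not URI."
        )
    return bucket_name, key
```

For example, `resolve_bucket_and_key(None, "s3://my-bucket/data/file.csv")` yields `("my-bucket", "data/file.csv")`, while passing a full URI together with an explicit `bucket_name` raises, matching the `AirflowException` branch in the diff.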
Review Comment:
   > What will happen if I check for 300 keys? If I get it right this will make 300 API calls per poke? If so I think it's not good.
   
   I agree, but I also need to be backward compatible. That is the current behavior in `S3PrefixSensor`, so I cannot really change this logic. Nor can I use the list APIs, because their scope is per bucket, and I can imagine a user providing 300 different buckets.
   
   > Also if I poke for bucket_key=['a','b'] and 'a' is present but 'b' is not why should I keep poke for a?
   
   That's a good point. I can look into a way to optimize that.
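   The optimization raised in the second quote could look something like the following. This is a hypothetical sketch, not the PR's implementation: keys already found are remembered across pokes, so each subsequent poke only re-checks the missing ones. The `check_for_key` callable here stands in for the real `S3Hook.check_for_key` API call.
   
   ```python
   class MultiKeyPoker:
       """Sketch: poke a set of keys, skipping keys already confirmed present."""

       def __init__(self, keys, check_for_key):
           self._pending = set(keys)          # keys not yet seen in S3
           self._check_for_key = check_for_key  # stand-in for the S3 lookup

       def poke(self):
           # Only issue API calls for keys still pending; drop the ones found.
           found = {k for k in self._pending if self._check_for_key(k)}
           self._pending -= found
           return not self._pending           # True once every key was seen
   ```
   
   With `bucket_key=['a', 'b']` and only `'a'` present, the first poke returns `False` but removes `'a'` from the pending set, so later pokes cost one API call instead of two.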



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org