Posted to commits@airflow.apache.org by "Lee-W (via GitHub)" <gi...@apache.org> on 2023/06/26 10:01:20 UTC

[GitHub] [airflow] Lee-W commented on a diff in pull request #31940: Add deferrable mode to S3KeysUnchangedSensor

Lee-W commented on code in PR #31940:
URL: https://github.com/apache/airflow/pull/31940#discussion_r1241918268


##########
airflow/providers/amazon/aws/sensors/s3.py:
##########
@@ -326,3 +329,36 @@ def is_keys_unchanged(self, current_objects: set[str]) -> bool:
 
     def poke(self, context: Context):
         return self.is_keys_unchanged(set(self.hook.list_keys(self.bucket_name, prefix=self.prefix)))
+
+    def execute(self, context: Context) -> None:
+        """Airflow runs this method on the worker and defers using the trigger."""

Review Comment:
   I'm not sure, but it looks to me like we only run this method in deferrable mode.
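   
   For context, a minimal sketch of the usual deferrable-sensor shape (the `deferrable` flag, the trigger kwargs, and `execute_complete` are assumptions based on the rest of this diff, not code copied from the PR):
   
    ```python
    from datetime import timedelta

    def execute(self, context: Context) -> None:
        if not self.deferrable:
            # Non-deferrable mode: run the normal poke loop on the worker.
            super().execute(context)
        else:
            # Deferrable mode: hand off to the triggerer and resume in
            # execute_complete once the trigger fires an event.
            self.defer(
                timeout=timedelta(seconds=self.timeout),
                trigger=S3KeysUnchangedTrigger(
                    bucket_name=self.bucket_name,
                    prefix=self.prefix,
                    inactivity_period=self.inactivity_period,
                    min_objects=self.min_objects,
                    allow_delete=self.allow_delete,
                    aws_conn_id=self.aws_conn_id,
                ),
                method_name="execute_complete",
            )
    ```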



##########
airflow/providers/amazon/aws/hooks/s3.py:
##########
@@ -625,6 +626,118 @@ def _is_in_period(input_date: datetime) -> bool:
 
         return [k["Key"] for k in keys if _is_in_period(k["LastModified"])]
 
+    async def is_keys_unchanged_async(
+        self,
+        client: AioBaseClient,
+        bucket_name: str,
+        prefix: str,
+        inactivity_period: float = 60 * 60,
+        min_objects: int = 1,
+        previous_objects: set[str] | None = None,
+        inactivity_seconds: int = 0,
+        allow_delete: bool = True,
+        last_activity_time: datetime | None = None,
+    ) -> dict[str, Any]:
+        """
+        Checks whether new objects have been uploaded and the inactivity_period
+        has passed and updates the state of the sensor accordingly.
+
+        :param client: aiobotocore client
+        :param bucket_name: the name of the bucket
+        :param prefix: a key prefix
+        :param inactivity_period:  the total seconds of inactivity to designate
+            keys unchanged. Note, this mechanism is not real time and
+            this operator may not return until a poke_interval after this period
+            has passed with no additional objects sensed.
+        :param min_objects: the minimum number of objects needed for keys unchanged
+            sensor to be considered valid.
+        :param previous_objects: the set of object ids found during the last poke.
+        :param inactivity_seconds: number of inactive seconds
+        :param last_activity_time: last activity datetime
+        :param allow_delete: Should this sensor consider objects being deleted
+            between pokes valid behavior. If true a warning message will be logged
+            when this happens. If false an error will be raised.
+        :return: dictionary with status and message
+        """
+        if previous_objects is None:
+            previous_objects = set()
+        list_keys = await self._list_keys_async(client=client, bucket_name=bucket_name, prefix=prefix)
+        current_objects = set(list_keys)
+        current_num_objects = len(current_objects)
+        if current_num_objects > len(previous_objects):
+            # When new objects arrived, reset the inactivity_seconds
+            # and update previous_objects for the next poke.
+            self.log.info(
+                "New objects found at %s, resetting last_activity_time.",
+                os.path.join(bucket_name, prefix),
+            )
+            self.log.debug("New objects: %s", current_objects - previous_objects)
+            last_activity_time = datetime.now()
+            inactivity_seconds = 0
+            previous_objects = current_objects
+            return {
+                "status": "pending",
+                "previous_objects": previous_objects,
+                "last_activity_time": last_activity_time,
+                "inactivity_seconds": inactivity_seconds,
+            }
+
+        if len(previous_objects) - len(current_objects):
+            # During the last poke interval objects were deleted.
+            if allow_delete:
+                deleted_objects = previous_objects - current_objects
+                previous_objects = current_objects
+                last_activity_time = datetime.now()
+                self.log.info(
+                    "Objects were deleted during the last poke interval. Updating the "
+                    "file counter and resetting last_activity_time:\n%s",
+                    deleted_objects,
+                )
+                return {
+                    "status": "pending",
+                    "previous_objects": previous_objects,
+                    "last_activity_time": last_activity_time,
+                    "inactivity_seconds": inactivity_seconds,
+                }
+
+            return {
+                "status": "error",
+                "message": f" {os.path.join(bucket_name, prefix)} between pokes.",

Review Comment:
   Maybe "Some files deleted in {os.path.join(bucket_name, prefix)} between pokes."?



##########
airflow/providers/amazon/aws/hooks/s3.py:
##########
@@ -625,6 +626,118 @@ def _is_in_period(input_date: datetime) -> bool:
 
         return [k["Key"] for k in keys if _is_in_period(k["LastModified"])]
 
+    async def is_keys_unchanged_async(
+        self,
+        client: AioBaseClient,
+        bucket_name: str,
+        prefix: str,
+        inactivity_period: float = 60 * 60,
+        min_objects: int = 1,
+        previous_objects: set[str] | None = None,
+        inactivity_seconds: int = 0,
+        allow_delete: bool = True,
+        last_activity_time: datetime | None = None,
+    ) -> dict[str, Any]:
+        """
+        Checks whether new objects have been uploaded and the inactivity_period
+        has passed and updates the state of the sensor accordingly.
+
+        :param client: aiobotocore client
+        :param bucket_name: the name of the bucket
+        :param prefix: a key prefix
+        :param inactivity_period:  the total seconds of inactivity to designate
+            keys unchanged. Note, this mechanism is not real time and
+            this operator may not return until a poke_interval after this period
+            has passed with no additional objects sensed.
+        :param min_objects: the minimum number of objects needed for keys unchanged
+            sensor to be considered valid.
+        :param previous_objects: the set of object ids found during the last poke.
+        :param inactivity_seconds: number of inactive seconds
+        :param last_activity_time: last activity datetime
+        :param allow_delete: Should this sensor consider objects being deleted
+            between pokes valid behavior. If true a warning message will be logged
+            when this happens. If false an error will be raised.
+        :return: dictionary with status and message
+        """
+        if previous_objects is None:

Review Comment:
   nitpick
   
   ```suggestion
           if not previous_objects:
   ```
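   
   For reference, `is None` only catches the default, while `not` also treats an explicitly passed empty set as missing; both branches assign `set()` here, so the behavior ends up the same either way:
   
    ```python
    for value in (None, set(), {"key-1"}):
        print(repr(value), value is None, not value)
    # None True True
    # set() False True
    # {'key-1'} False False
    ```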



##########
airflow/providers/amazon/aws/triggers/s3.py:
##########
@@ -97,3 +98,107 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
 
         except Exception as e:
             yield TriggerEvent({"status": "error", "message": str(e)})
+
+
+class S3KeysUnchangedTrigger(BaseTrigger):
+    """
+    S3KeysUnchangedTrigger is fired as a deferred class with params to run the task in the trigger worker.
+
+    :param bucket_name: Name of the S3 bucket. Only needed when ``bucket_key``
+        is not provided as a full s3:// url.
+    :param prefix: The prefix being waited on. Relative path from bucket root level.
+    :param inactivity_period: The total seconds of inactivity to designate
+        keys unchanged. Note, this mechanism is not real time and
+        this operator may not return until a poke_interval after this period
+        has passed with no additional objects sensed.
+    :param min_objects: The minimum number of objects needed for keys unchanged
+        sensor to be considered valid.
+    :param inactivity_seconds: reference to the seconds of inactivity
+    :param previous_objects: The set of object ids found during the last poke.
+    :param allow_delete: Should this sensor consider objects being deleted
+        between pokes valid behavior. If true a warning message will be logged
+        when this happens. If false an error will be raised.
+    :param aws_conn_id: reference to the s3 connection
+    :param last_activity_time: last modified or last active time
+    :param verify: Whether or not to verify SSL certificates for S3 connection.
+        By default SSL certificates are verified.
+    :param hook_params: optional params for the hook
+    """
+
+    def __init__(
+        self,
+        bucket_name: str,
+        prefix: str,
+        inactivity_period: float = 60 * 60,
+        min_objects: int = 1,
+        inactivity_seconds: int = 0,
+        previous_objects: set[str] | None = None,
+        allow_delete: bool = True,
+        aws_conn_id: str = "aws_default",
+        last_activity_time: datetime | None = None,
+        verify: bool | str | None = None,
+        **hook_params: Any,
+    ):
+        super().__init__()
+        self.bucket_name = bucket_name
+        self.prefix = prefix
+        if inactivity_period < 0:
+            raise ValueError("inactivity_period must be non-negative")
+        if previous_objects is None:

Review Comment:
   nitpick
   
   ```suggestion
           if not previous_objects:
   ```



##########
airflow/providers/amazon/aws/hooks/s3.py:
##########
@@ -625,6 +626,118 @@ def _is_in_period(input_date: datetime) -> bool:
 
         return [k["Key"] for k in keys if _is_in_period(k["LastModified"])]
 
+    async def is_keys_unchanged_async(
+        self,
+        client: AioBaseClient,
+        bucket_name: str,
+        prefix: str,
+        inactivity_period: float = 60 * 60,
+        min_objects: int = 1,
+        previous_objects: set[str] | None = None,
+        inactivity_seconds: int = 0,
+        allow_delete: bool = True,
+        last_activity_time: datetime | None = None,
+    ) -> dict[str, Any]:
+        """
+        Checks whether new objects have been uploaded and the inactivity_period
+        has passed and updates the state of the sensor accordingly.
+
+        :param client: aiobotocore client
+        :param bucket_name: the name of the bucket
+        :param prefix: a key prefix
+        :param inactivity_period:  the total seconds of inactivity to designate
+            keys unchanged. Note, this mechanism is not real time and
+            this operator may not return until a poke_interval after this period
+            has passed with no additional objects sensed.
+        :param min_objects: the minimum number of objects needed for keys unchanged
+            sensor to be considered valid.
+        :param previous_objects: the set of object ids found during the last poke.
+        :param inactivity_seconds: number of inactive seconds
+        :param last_activity_time: last activity datetime
+        :param allow_delete: Should this sensor consider objects being deleted
+            between pokes valid behavior. If true a warning message will be logged
+            when this happens. If false an error will be raised.
+        :return: dictionary with status and message
+        """
+        if previous_objects is None:
+            previous_objects = set()
+        list_keys = await self._list_keys_async(client=client, bucket_name=bucket_name, prefix=prefix)
+        current_objects = set(list_keys)
+        current_num_objects = len(current_objects)
+        if current_num_objects > len(previous_objects):
+            # When new objects arrived, reset the inactivity_seconds
+            # and update previous_objects for the next poke.
+            self.log.info(
+                "New objects found at %s, resetting last_activity_time.",
+                os.path.join(bucket_name, prefix),
+            )
+            self.log.debug("New objects: %s", current_objects - previous_objects)
+            last_activity_time = datetime.now()
+            inactivity_seconds = 0
+            previous_objects = current_objects
+            return {
+                "status": "pending",
+                "previous_objects": previous_objects,
+                "last_activity_time": last_activity_time,
+                "inactivity_seconds": inactivity_seconds,
+            }
+
+        if len(previous_objects) - len(current_objects):
+            # During the last poke interval objects were deleted.
+            if allow_delete:
+                deleted_objects = previous_objects - current_objects
+                previous_objects = current_objects
+                last_activity_time = datetime.now()
+                self.log.info(
+                    "Objects were deleted during the last poke interval. Updating the "
+                    "file counter and resetting last_activity_time:\n%s",
+                    deleted_objects,
+                )
+                return {
+                    "status": "pending",
+                    "previous_objects": previous_objects,
+                    "last_activity_time": last_activity_time,
+                    "inactivity_seconds": inactivity_seconds,
+                }
+
+            return {
+                "status": "error",
+                "message": f" {os.path.join(bucket_name, prefix)} between pokes.",

Review Comment:
   nitpick
   
   ```suggestion
                   "message": f"{os.path.join(bucket_name, prefix)} between pokes.",
   ```
   
   Also, is part of the description missing? The message seems to start mid-sentence.
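   
   With the wording suggested earlier, the full return could read something like this (the exact phrasing is an assumption, not taken from the PR):
   
    ```python
    return {
        "status": "error",
        "message": f"Some files deleted in {os.path.join(bucket_name, prefix)} between pokes.",
    }
    ```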



##########
airflow/providers/amazon/aws/hooks/s3.py:
##########
@@ -625,6 +626,118 @@ def _is_in_period(input_date: datetime) -> bool:
 
         return [k["Key"] for k in keys if _is_in_period(k["LastModified"])]
 
+    async def is_keys_unchanged_async(
+        self,
+        client: AioBaseClient,
+        bucket_name: str,
+        prefix: str,
+        inactivity_period: float = 60 * 60,
+        min_objects: int = 1,
+        previous_objects: set[str] | None = None,
+        inactivity_seconds: int = 0,
+        allow_delete: bool = True,
+        last_activity_time: datetime | None = None,
+    ) -> dict[str, Any]:
+        """
+        Checks whether new objects have been uploaded and the inactivity_period
+        has passed and updates the state of the sensor accordingly.
+
+        :param client: aiobotocore client
+        :param bucket_name: the name of the bucket
+        :param prefix: a key prefix
+        :param inactivity_period:  the total seconds of inactivity to designate
+            keys unchanged. Note, this mechanism is not real time and
+            this operator may not return until a poke_interval after this period
+            has passed with no additional objects sensed.
+        :param min_objects: the minimum number of objects needed for keys unchanged
+            sensor to be considered valid.
+        :param previous_objects: the set of object ids found during the last poke.
+        :param inactivity_seconds: number of inactive seconds
+        :param last_activity_time: last activity datetime
+        :param allow_delete: Should this sensor consider objects being deleted
+            between pokes valid behavior. If true a warning message will be logged
+            when this happens. If false an error will be raised.
+        :return: dictionary with status and message
+        """
+        if previous_objects is None:
+            previous_objects = set()
+        list_keys = await self._list_keys_async(client=client, bucket_name=bucket_name, prefix=prefix)
+        current_objects = set(list_keys)
+        current_num_objects = len(current_objects)
+        if current_num_objects > len(previous_objects):
+            # When new objects arrived, reset the inactivity_seconds
+            # and update previous_objects for the next poke.
+            self.log.info(
+                "New objects found at %s, resetting last_activity_time.",
+                os.path.join(bucket_name, prefix),
+            )
+            self.log.debug("New objects: %s", current_objects - previous_objects)
+            last_activity_time = datetime.now()
+            inactivity_seconds = 0
+            previous_objects = current_objects
+            return {
+                "status": "pending",
+                "previous_objects": previous_objects,
+                "last_activity_time": last_activity_time,
+                "inactivity_seconds": inactivity_seconds,
+            }
+
+        if len(previous_objects) - len(current_objects):
+            # During the last poke interval objects were deleted.
+            if allow_delete:
+                deleted_objects = previous_objects - current_objects
+                previous_objects = current_objects
+                last_activity_time = datetime.now()
+                self.log.info(
+                    "Objects were deleted during the last poke interval. Updating the "
+                    "file counter and resetting last_activity_time:\n%s",
+                    deleted_objects,
+                )
+                return {
+                    "status": "pending",
+                    "previous_objects": previous_objects,
+                    "last_activity_time": last_activity_time,
+                    "inactivity_seconds": inactivity_seconds,
+                }
+
+            return {
+                "status": "error",
+                "message": f" {os.path.join(bucket_name, prefix)} between pokes.",
+            }
+
+        if last_activity_time:
+            inactivity_seconds = int((datetime.now() - last_activity_time).total_seconds())
+        else:
+            # Handles the first poke where last inactivity time is None.
+            last_activity_time = datetime.now()
+            inactivity_seconds = 0
+
+        if inactivity_seconds >= inactivity_period:
+            path = os.path.join(bucket_name, prefix)
+
+            if current_num_objects >= min_objects:
+                success_message = (
+                    "SUCCESS: Sensor found %s objects at %s. "
+                    "Waited at least %s seconds, with no new objects uploaded."
+                )
+                self.log.info(success_message, current_num_objects, path, inactivity_period)
+                return {
+                    "status": "success",
+                    "message": success_message % (current_num_objects, path, inactivity_period),
+                }

Review Comment:
   As these variables aren't changed, it might make sense for us to use an f-string and render it all at once
   
   ```suggestion
                   success_message = (
                       f"SUCCESS: Sensor found {current_num_objects} objects at {path}. "
                       "Waited at least {inactivity_period} seconds, with no new objects uploaded."
                   )
                   self.log.info(success_message)
                   return {
                       "status": "success",
                       "message": success_message,
                   }
   ```
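   
   One trade-off to note: `%`-style arguments let `logging` defer interpolation until the record is actually emitted, while an f-string renders eagerly. Since the rendered message is reused in the returned payload here, eager rendering costs nothing extra (illustrative snippet, names taken from the hunk above):
   
    ```python
    # Lazy: interpolated only if the log record is actually emitted.
    self.log.info("Found %s objects at %s", current_num_objects, path)
    # Eager: rendered once, then reusable in the return value.
    message = f"Found {current_num_objects} objects at {path}"
    self.log.info(message)
    ```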



##########
airflow/providers/amazon/aws/triggers/s3.py:
##########
@@ -97,3 +98,107 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
 
         except Exception as e:
             yield TriggerEvent({"status": "error", "message": str(e)})
+
+
+class S3KeysUnchangedTrigger(BaseTrigger):
+    """
+    S3KeysUnchangedTrigger is fired as a deferred class with params to run the task in the trigger worker.
+
+    :param bucket_name: Name of the S3 bucket. Only needed when ``bucket_key``
+        is not provided as a full s3:// url.
+    :param prefix: The prefix being waited on. Relative path from bucket root level.
+    :param inactivity_period: The total seconds of inactivity to designate
+        keys unchanged. Note, this mechanism is not real time and
+        this operator may not return until a poke_interval after this period
+        has passed with no additional objects sensed.
+    :param min_objects: The minimum number of objects needed for keys unchanged
+        sensor to be considered valid.
+    :param inactivity_seconds: reference to the seconds of inactivity
+    :param previous_objects: The set of object ids found during the last poke.
+    :param allow_delete: Should this sensor consider objects being deleted
+        between pokes valid behavior. If true a warning message will be logged
+        when this happens. If false an error will be raised.
+    :param aws_conn_id: reference to the s3 connection
+    :param last_activity_time: last modified or last active time
+    :param verify: Whether or not to verify SSL certificates for S3 connection.
+        By default SSL certificates are verified.
+    :param hook_params: optional params for the hook
+    """
+
+    def __init__(
+        self,
+        bucket_name: str,
+        prefix: str,
+        inactivity_period: float = 60 * 60,
+        min_objects: int = 1,
+        inactivity_seconds: int = 0,
+        previous_objects: set[str] | None = None,
+        allow_delete: bool = True,
+        aws_conn_id: str = "aws_default",
+        last_activity_time: datetime | None = None,
+        verify: bool | str | None = None,
+        **hook_params: Any,
+    ):
+        super().__init__()
+        self.bucket_name = bucket_name
+        self.prefix = prefix
+        if inactivity_period < 0:
+            raise ValueError("inactivity_period must be non-negative")
+        if previous_objects is None:
+            previous_objects = set()
+        self.inactivity_period = inactivity_period
+        self.min_objects = min_objects
+        self.previous_objects = previous_objects
+        self.inactivity_seconds = inactivity_seconds
+        self.allow_delete = allow_delete
+        self.aws_conn_id = aws_conn_id
+        self.last_activity_time: datetime | None = last_activity_time

Review Comment:
   Why do we annotate the type again here?
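   
   If the goal was just typing: mypy already infers the attribute type from the parameter annotation, so the extra annotation is redundant. A small demo, not the PR code:
   
    ```python
    from __future__ import annotations

    from datetime import datetime

    class Demo:
        def __init__(self, last_activity_time: datetime | None = None):
            # mypy infers `datetime | None` from the parameter annotation,
            # so re-annotating the attribute adds nothing.
            self.last_activity_time = last_activity_time
    ```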


