Posted to commits@airflow.apache.org by "Taragolis (via GitHub)" <gi...@apache.org> on 2023/02/25 12:16:13 UTC

[GitHub] [airflow] Taragolis opened a new pull request, #29761: Use waiters in ECS Operators instead of inner sensors

Taragolis opened a new pull request, #29761:
URL: https://github.com/apache/airflow/pull/29761

   closes: #29556
   
   Use custom [botocore.Waiters](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/resources.html#waiters) instead of inner sensors in ECS Operators
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] Taragolis commented on a diff in pull request #29761: Use waiters in ECS Operators instead of inner sensors

Posted by "Taragolis (via GitHub)" <gi...@apache.org>.
Taragolis commented on code in PR #29761:
URL: https://github.com/apache/airflow/pull/29761#discussion_r1121809724


##########
airflow/providers/amazon/aws/operators/ecs.py:
##########
@@ -93,31 +97,44 @@ def __init__(
         cluster_name: str,
         create_cluster_kwargs: dict | None = None,
         wait_for_completion: bool = True,
+        waiter_delay: int | None = None,
+        waiter_max_attempts: int | None = None,
         **kwargs,
     ) -> None:
         super().__init__(**kwargs)
         self.cluster_name = cluster_name
         self.create_cluster_kwargs = create_cluster_kwargs or {}
         self.wait_for_completion = wait_for_completion
+        self.waiter_delay = waiter_delay
+        self.waiter_max_attempts = waiter_max_attempts
 
     def execute(self, context: Context):
         self.log.info(
-            "Creating cluster %s using the following values: %s",
+            "Creating cluster %r using the following values: %s",
             self.cluster_name,
             self.create_cluster_kwargs,
         )
         result = self.client.create_cluster(clusterName=self.cluster_name, **self.create_cluster_kwargs)
-
-        if self.wait_for_completion:
-            while not EcsClusterStateSensor(
-                task_id="await_cluster",
-                cluster_name=self.cluster_name,
-            ).poke(context):
-                # The sensor has a built-in delay and will try again until
-                # the cluster is ready or has reached a failed state.
-                pass
-
-        return result["cluster"]
+        cluster_details = result["cluster"]
+        cluster_state = cluster_details.get("status")
+
+        if cluster_state == EcsClusterStates.ACTIVE:

Review Comment:
   Done





[GitHub] [airflow] Taragolis commented on a diff in pull request #29761: Use waiters in ECS Operators instead of inner sensors

Posted by "Taragolis (via GitHub)" <gi...@apache.org>.
Taragolis commented on code in PR #29761:
URL: https://github.com/apache/airflow/pull/29761#discussion_r1119249364


##########
airflow/providers/amazon/aws/operators/ecs.py:
##########
@@ -92,32 +96,45 @@ def __init__(
         *,
         cluster_name: str,
         create_cluster_kwargs: dict | None = None,
-        wait_for_completion: bool = True,
+        wait_for_completion: bool = False,

Review Comment:
   Oops. I accidentally changed it with Ctrl+C, Ctrl+V 🤣
   I will revert it to the previous value.








[GitHub] [airflow] Taragolis commented on a diff in pull request #29761: Use waiters in ECS Operators instead of inner sensors

Posted by "Taragolis (via GitHub)" <gi...@apache.org>.
Taragolis commented on code in PR #29761:
URL: https://github.com/apache/airflow/pull/29761#discussion_r1119278430


##########
airflow/providers/amazon/aws/waiters/ecs.json:
##########
@@ -0,0 +1,81 @@
+{
+    "version": 2,
+    "waiters": {
+        "cluster_active": {
+            "operation": "DescribeClusters",
+            "delay": 15,
+            "maxAttempts": 60,
+            "acceptors": [
+                {
+                    "expected": "ACTIVE",
+                    "matcher": "pathAny",
+                    "state": "success",
+                    "argument": "clusters[].status"
+                },
+                {
+                    "expected": "FAILED",
+                    "matcher": "pathAny",
+                    "state": "failure",
+                    "argument": "clusters[].status"
+                },
+                {
+                    "expected": "INACTIVE",
+                    "matcher": "pathAny",
+                    "state": "failure",
+                    "argument": "clusters[].status"
+                },
+                {
+                  "expected": "MISSING",
+                  "matcher": "pathAny",
+                  "state": "failure",
+                  "argument": "failures[].reason"
+                }
+            ]
+        },
+        "cluster_inactive": {
+            "operation": "DescribeClusters",
+            "delay": 15,
+            "maxAttempts": 60,
+            "acceptors": [
+                {
+                    "expected": "INACTIVE",
+                    "matcher": "pathAny",
+                    "state": "success",
+                    "argument": "clusters[].status"
+                },
+                {
+                  "expected": "MISSING",
+                  "matcher": "pathAny",
+                  "state": "success",
+                  "argument": "failures[].reason"
+                }
+            ]
+        },
+        "task_definition_active": {

Review Comment:
   Added





[GitHub] [airflow] o-nikolas merged pull request #29761: Use waiters in ECS Operators instead of inner sensors

Posted by "o-nikolas (via GitHub)" <gi...@apache.org>.
o-nikolas merged PR #29761:
URL: https://github.com/apache/airflow/pull/29761




[GitHub] [airflow] Taragolis commented on a diff in pull request #29761: Use waiters in ECS Operators instead of inner sensors

Posted by "Taragolis (via GitHub)" <gi...@apache.org>.
Taragolis commented on code in PR #29761:
URL: https://github.com/apache/airflow/pull/29761#discussion_r1119252995


##########
airflow/providers/amazon/aws/waiters/ecs.json:
##########
@@ -0,0 +1,81 @@
+{
+    "version": 2,
+    "waiters": {
+        "cluster_active": {
+            "operation": "DescribeClusters",
+            "delay": 15,
+            "maxAttempts": 60,
+            "acceptors": [
+                {
+                    "expected": "ACTIVE",
+                    "matcher": "pathAny",
+                    "state": "success",
+                    "argument": "clusters[].status"
+                },
+                {
+                    "expected": "FAILED",
+                    "matcher": "pathAny",
+                    "state": "failure",
+                    "argument": "clusters[].status"
+                },
+                {
+                    "expected": "INACTIVE",
+                    "matcher": "pathAny",
+                    "state": "failure",
+                    "argument": "clusters[].status"
+                },
+                {
+                  "expected": "MISSING",
+                  "matcher": "pathAny",
+                  "state": "failure",
+                  "argument": "failures[].reason"

Review Comment:
   Hehe, I looked at the built-in `botocore` waiters for ECS: https://github.com/boto/botocore/blob/develop/botocore/data/ecs/2014-11-13/waiters-2.json
   
   After that I checked which states are available for the different API calls: https://docs.aws.amazon.com/AmazonECS/latest/developerguide/api_failures_messages.html
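To make the acceptor ordering in the `cluster_active` waiter easier to follow, here is a minimal sketch of how such a list could be evaluated against a `DescribeClusters`-style response. It is not botocore's implementation: `project` and `evaluate` are hypothetical helpers, and `project` is only a simplified stand-in for JMESPath that understands arguments of the form `key[].field`. It assumes the first matching acceptor decides the outcome and that no match means another attempt.

```python
from __future__ import annotations

# Acceptors copied from the "cluster_active" waiter in the diff above.
CLUSTER_ACTIVE_ACCEPTORS = [
    {"expected": "ACTIVE", "matcher": "pathAny", "state": "success", "argument": "clusters[].status"},
    {"expected": "FAILED", "matcher": "pathAny", "state": "failure", "argument": "clusters[].status"},
    {"expected": "INACTIVE", "matcher": "pathAny", "state": "failure", "argument": "clusters[].status"},
    {"expected": "MISSING", "matcher": "pathAny", "state": "failure", "argument": "failures[].reason"},
]


def project(response: dict, argument: str) -> list:
    """Hypothetical stand-in for JMESPath: handles only 'key[].field'."""
    list_key, field = argument.split("[].")
    return [item.get(field) for item in response.get(list_key, [])]


def evaluate(response: dict, acceptors: list[dict]) -> str:
    """Return the state of the first matching acceptor, or 'retry' if none match."""
    for acceptor in acceptors:
        # pathAny: at least one projected value equals the expected value.
        if acceptor["expected"] in project(response, acceptor["argument"]):
            return acceptor["state"]
    return "retry"


ready = evaluate({"clusters": [{"status": "ACTIVE"}], "failures": []}, CLUSTER_ACTIVE_ACCEPTORS)
pending = evaluate({"clusters": [{"status": "PROVISIONING"}], "failures": []}, CLUSTER_ACTIVE_ACCEPTORS)
missing = evaluate({"clusters": [], "failures": [{"reason": "MISSING"}]}, CLUSTER_ACTIVE_ACCEPTORS)
```

Under these assumptions a missing cluster is reported through `failures[].reason`, which is why the `MISSING` acceptor reads a different path than the status acceptors.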





[GitHub] [airflow] Taragolis commented on a diff in pull request #29761: Use waiters in ECS Operators instead of inner sensors

Posted by "Taragolis (via GitHub)" <gi...@apache.org>.
Taragolis commented on code in PR #29761:
URL: https://github.com/apache/airflow/pull/29761#discussion_r1117918321


##########
airflow/providers/amazon/aws/hooks/ecs.py:
##########
@@ -55,7 +55,7 @@ def should_retry_eni(exception: Exception):
     return False
 
 
-class EcsClusterStates(Enum):
+class EcsClusterStates(str, Enum):

Review Comment:
   This allows the comparison `EcsClusterStates.ACTIVE == "ACTIVE"`. Unfortunately, [`enum.StrEnum`](https://docs.python.org/3/library/enum.html#enum.StrEnum) is available only in Python 3.11 and later.
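A small standalone illustration of the difference, using only the standard library. `PlainState` and `StrState` are hypothetical classes made up for this sketch, not the provider's actual enums.

```python
from enum import Enum


class PlainState(Enum):
    """Plain Enum: members are not strings."""

    ACTIVE = "ACTIVE"


class StrState(str, Enum):
    """Enum with a str mixin: members are also str instances."""

    ACTIVE = "ACTIVE"


# A plain Enum member never compares equal to its raw value.
plain_matches = PlainState.ACTIVE == "ACTIVE"

# With the str mixin the comparison falls through to str.__eq__.
str_matches = StrState.ACTIVE == "ACTIVE"

# Both variants still expose the raw string through .value.
values_match = PlainState.ACTIVE.value == StrState.ACTIVE.value == "ACTIVE"
```

Here `plain_matches` is `False` and `str_matches` is `True`, which is what lets a status string from an API response be compared directly against an enum member.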





[GitHub] [airflow] Taragolis commented on pull request #29761: Use waiters in ECS Operators instead of inner sensors

Posted by "Taragolis (via GitHub)" <gi...@apache.org>.
Taragolis commented on PR #29761:
URL: https://github.com/apache/airflow/pull/29761#issuecomment-1451513993

   > [ECS System test](https://github.com/apache/airflow/blob/main/tests/system/providers/amazon/aws/example_ecs.py) is failing after this merge, the [`await_cluster`](https://github.com/apache/airflow/blob/main/tests/system/providers/amazon/aws/example_ecs.py#L100) task (which calls [`EcsClusterStateSensor`](https://github.com/apache/airflow/blob/181a8252597e314e5675e2b9655cb44da412eeb2/airflow/providers/amazon/aws/sensors/ecs.py#L65)) is timing out after logging `INFO airflow.task.operators:ecs.py:98 Cluster state: EcsClusterStates.ACTIVE, waiting for: EcsClusterStates.ACTIVE`. I'll try to take a look at it tomorrow
   
   That is a bit strange: as far as I can see, nothing after this log record makes external calls (to AWS); there is only a simple validation, which should finish in milliseconds.





[GitHub] [airflow] o-nikolas commented on a diff in pull request #29761: Use waiters in ECS Operators instead of inner sensors

Posted by "o-nikolas (via GitHub)" <gi...@apache.org>.
o-nikolas commented on code in PR #29761:
URL: https://github.com/apache/airflow/pull/29761#discussion_r1121005945


##########
airflow/providers/amazon/aws/operators/ecs.py:
##########
@@ -83,6 +83,10 @@ class EcsCreateClusterOperator(EcsBaseOperator):
         cluster, you create a cluster that's named default.
     :param create_cluster_kwargs: Extra arguments for Cluster Creation.
     :param wait_for_completion: If True, waits for creation of the cluster to complete. (default: True)
+    :param waiter_delay: The amount of time in seconds to wait between attempts,
+        if not set then default waiter value will use.
+    :param waiter_max_attempts: The maximum number of attempts to be made,
+        if not set then default waiter value will use.

Review Comment:
   ```suggestion
       :param waiter_delay: The amount of time in seconds to wait between attempts,
           if not set then the default waiter value will be used.
       :param waiter_max_attempts: The maximum number of attempts to be made,
           if not set then the default waiter value will be used.
   ```
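One way the "default waiter value will be used" behaviour could be achieved is to pass only the overrides that were explicitly set. The sketch below is an assumption, not code from this PR: `build_waiter_config` is a hypothetical helper, and the only real API it relies on is that boto3 waiters accept a `WaiterConfig` dict with `Delay` and `MaxAttempts` keys.

```python
from __future__ import annotations


def build_waiter_config(delay: int | None = None, max_attempts: int | None = None) -> dict:
    """Hypothetical helper: keep only the overrides that were explicitly set."""
    overrides = {"Delay": delay, "MaxAttempts": max_attempts}
    # Dropping None entries leaves the waiter's own defaults in effect.
    return {key: value for key, value in overrides.items() if value is not None}


no_overrides = build_waiter_config()
delay_only = build_waiter_config(delay=5)
both = build_waiter_config(delay=5, max_attempts=10)
```

The resulting dict could then be handed to a waiter call such as `waiter.wait(clusters=[...], WaiterConfig=both)`; with an empty dict the values from the waiter definition (for example `"delay": 15` and `"maxAttempts": 60` in `ecs.json`) would apply.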



##########
airflow/providers/amazon/aws/operators/ecs.py:
##########
@@ -93,31 +97,44 @@ def __init__(
         cluster_name: str,
         create_cluster_kwargs: dict | None = None,
         wait_for_completion: bool = True,
+        waiter_delay: int | None = None,
+        waiter_max_attempts: int | None = None,
         **kwargs,
     ) -> None:
         super().__init__(**kwargs)
         self.cluster_name = cluster_name
         self.create_cluster_kwargs = create_cluster_kwargs or {}
         self.wait_for_completion = wait_for_completion
+        self.waiter_delay = waiter_delay
+        self.waiter_max_attempts = waiter_max_attempts
 
     def execute(self, context: Context):
         self.log.info(
-            "Creating cluster %s using the following values: %s",
+            "Creating cluster %r using the following values: %s",
             self.cluster_name,
             self.create_cluster_kwargs,
         )
         result = self.client.create_cluster(clusterName=self.cluster_name, **self.create_cluster_kwargs)
-
-        if self.wait_for_completion:
-            while not EcsClusterStateSensor(
-                task_id="await_cluster",
-                cluster_name=self.cluster_name,
-            ).poke(context):
-                # The sensor has a built-in delay and will try again until
-                # the cluster is ready or has reached a failed state.
-                pass
-
-        return result["cluster"]
+        cluster_details = result["cluster"]
+        cluster_state = cluster_details.get("status")
+
+        if cluster_state == EcsClusterStates.ACTIVE:

Review Comment:
   I'm a bit confused by this, if it can be deleted immediately, then why are we checking for active state? Shouldn't we be checking for deleted state?



##########
airflow/providers/amazon/aws/operators/ecs.py:
##########
@@ -130,6 +147,10 @@ class EcsDeleteClusterOperator(EcsBaseOperator):
 
     :param cluster_name: The short name or full Amazon Resource Name (ARN) of the cluster to delete.
     :param wait_for_completion: If True, waits for creation of the cluster to complete. (default: True)
+    :param waiter_delay: The amount of time in seconds to wait between attempts,
+        if not set then default waiter value will use.
+    :param waiter_max_attempts: The maximum number of attempts to be made,
+        if not set then default waiter value will use.

Review Comment:
   ```suggestion
       :param waiter_delay: The amount of time in seconds to wait between attempts,
           if not set then the default waiter value will be used.
       :param waiter_max_attempts: The maximum number of attempts to be made,
           if not set then the default waiter value will be used.
   ```



##########
airflow/providers/amazon/aws/operators/ecs.py:
##########
@@ -174,30 +206,53 @@ class EcsDeregisterTaskDefinitionOperator(EcsBaseOperator):
     :param task_definition: The family and revision (family:revision) or full Amazon Resource Name (ARN)
         of the task definition to deregister. If you use a family name, you must specify a revision.
     :param wait_for_completion: If True, waits for creation of the cluster to complete. (default: True)
+    :param waiter_delay: The amount of time in seconds to wait between attempts,
+        if not set then default waiter value will use.
+    :param waiter_max_attempts: The maximum number of attempts to be made,
+        if not set then default waiter value will use.

Review Comment:
   ```suggestion
           if not set then the default waiter value will be used.
       :param waiter_max_attempts: The maximum number of attempts to be made,
           if not set then the default waiter value will be used.
   ```



##########
airflow/providers/amazon/aws/operators/ecs.py:
##########
@@ -213,6 +268,10 @@ class EcsRegisterTaskDefinitionOperator(EcsBaseOperator):
         the different containers that make up your task.
     :param register_task_kwargs: Extra arguments for Register Task Definition.
     :param wait_for_completion: If True, waits for creation of the cluster to complete. (default: True)
+    :param waiter_delay: The amount of time in seconds to wait between attempts,
+        if not set then default waiter value will use.
+    :param waiter_max_attempts: The maximum number of attempts to be made,
+        if not set then default waiter value will use.

Review Comment:
   ```suggestion
       :param waiter_delay: The amount of time in seconds to wait between attempts,
           if not set then the default waiter value will be used.
       :param waiter_max_attempts: The maximum number of attempts to be made,
           if not set then the default waiter value will be used.
   ```



##########
airflow/providers/amazon/aws/operators/ecs.py:
##########
@@ -174,30 +206,53 @@ class EcsDeregisterTaskDefinitionOperator(EcsBaseOperator):
     :param task_definition: The family and revision (family:revision) or full Amazon Resource Name (ARN)
         of the task definition to deregister. If you use a family name, you must specify a revision.
     :param wait_for_completion: If True, waits for creation of the cluster to complete. (default: True)
+    :param waiter_delay: The amount of time in seconds to wait between attempts,
+        if not set then default waiter value will use.
+    :param waiter_max_attempts: The maximum number of attempts to be made,
+        if not set then default waiter value will use.
     """
 
     template_fields: Sequence[str] = ("task_definition", "wait_for_completion")
 
-    def __init__(self, *, task_definition: str, wait_for_completion: bool = True, **kwargs):
+    def __init__(
+        self,
+        *,
+        task_definition: str,
+        wait_for_completion: bool = True,
+        waiter_delay: int | None = None,
+        waiter_max_attempts: int | None = None,
+        **kwargs,
+    ):
         super().__init__(**kwargs)
         self.task_definition = task_definition
         self.wait_for_completion = wait_for_completion
+        self.waiter_delay = waiter_delay
+        self.waiter_max_attempts = waiter_max_attempts
 
     def execute(self, context: Context):
         self.log.info("Deregistering task definition %s.", self.task_definition)
         result = self.client.deregister_task_definition(taskDefinition=self.task_definition)
-
-        if self.wait_for_completion:
-            while not EcsTaskDefinitionStateSensor(
-                task_id="await_deregister_task_definition",
-                task_definition=self.task_definition,
-                target_state=EcsTaskDefinitionStates.INACTIVE,
-            ).poke(context):
-                # The sensor has a built-in delay and will try again until the
-                # task definition is deregistered or reaches a failed state.
-                pass
-
-        return result["taskDefinition"]["taskDefinitionArn"]
+        task_definition_details = result["taskDefinition"]
+        task_definition_arn = task_definition_details["taskDefinitionArn"]
+        task_definition_state = task_definition_details.get("status")
+
+        if task_definition_state == EcsTaskDefinitionStates.INACTIVE:
+            # In some circumstances ECS Task Definition deleted immediately,
+            # and there is no reason wait for completion.

Review Comment:
   ```suggestion
               # In some circumstances the ECS Task Definition is deleted immediately,
               # so there is no reason to wait for completion.
   ```



##########
airflow/providers/amazon/aws/operators/ecs.py:
##########
@@ -93,31 +97,44 @@ def __init__(
         cluster_name: str,
         create_cluster_kwargs: dict | None = None,
         wait_for_completion: bool = True,
+        waiter_delay: int | None = None,
+        waiter_max_attempts: int | None = None,
         **kwargs,
     ) -> None:
         super().__init__(**kwargs)
         self.cluster_name = cluster_name
         self.create_cluster_kwargs = create_cluster_kwargs or {}
         self.wait_for_completion = wait_for_completion
+        self.waiter_delay = waiter_delay
+        self.waiter_max_attempts = waiter_max_attempts
 
     def execute(self, context: Context):
         self.log.info(
-            "Creating cluster %s using the following values: %s",
+            "Creating cluster %r using the following values: %s",
             self.cluster_name,
             self.create_cluster_kwargs,
         )
         result = self.client.create_cluster(clusterName=self.cluster_name, **self.create_cluster_kwargs)
-
-        if self.wait_for_completion:
-            while not EcsClusterStateSensor(
-                task_id="await_cluster",
-                cluster_name=self.cluster_name,
-            ).poke(context):
-                # The sensor has a built-in delay and will try again until
-                # the cluster is ready or has reached a failed state.
-                pass
-
-        return result["cluster"]
+        cluster_details = result["cluster"]
+        cluster_state = cluster_details.get("status")
+
+        if cluster_state == EcsClusterStates.ACTIVE:
+            # In some circumstances ECS Cluster deleted immediately,
+            # and there is no reason wait for completion.

Review Comment:
   ```suggestion
               # In some circumstances the ECS Cluster is deleted immediately,
               # and there is no reason to wait for completion.
   ```



##########
airflow/providers/amazon/aws/operators/ecs.py:
##########
@@ -249,18 +312,28 @@ def execute(self, context: Context):
             containerDefinitions=self.container_definitions,
             **self.register_task_kwargs,
         )
-        task_arn = response["taskDefinition"]["taskDefinitionArn"]
-
-        if self.wait_for_completion:
-            while not EcsTaskDefinitionStateSensor(
-                task_id="await_register_task_definition", task_definition=task_arn
-            ).poke(context):
-                # The sensor has a built-in delay and will try again until
-                # the task definition is registered or reaches a failed state.
-                pass
-
-        context["ti"].xcom_push(key="task_definition_arn", value=task_arn)
-        return task_arn
+        task_definition_details = response["taskDefinition"]
+        task_definition_arn = task_definition_details["taskDefinitionArn"]
+        task_definition_state = task_definition_details.get("status")
+
+        if task_definition_state == EcsTaskDefinitionStates.ACTIVE:
+            # In some circumstances ECS Task Definition created immediately,
+            # and there is no reason wait for completion.

Review Comment:
   ```suggestion
               # In some circumstances the ECS Task Definition is created immediately,
               # so there is no reason to wait for completion.
   ```



##########
airflow/providers/amazon/aws/operators/ecs.py:
##########
@@ -139,28 +160,39 @@ def __init__(
         *,
         cluster_name: str,
         wait_for_completion: bool = True,
+        waiter_delay: int | None = None,
+        waiter_max_attempts: int | None = None,
         **kwargs,
     ) -> None:
         super().__init__(**kwargs)
         self.cluster_name = cluster_name
         self.wait_for_completion = wait_for_completion
+        self.waiter_delay = waiter_delay
+        self.waiter_max_attempts = waiter_max_attempts
 
     def execute(self, context: Context):
-        self.log.info("Deleting cluster %s.", self.cluster_name)
+        self.log.info("Deleting cluster %r.", self.cluster_name)
         result = self.client.delete_cluster(cluster=self.cluster_name)
-
-        if self.wait_for_completion:
-            while not EcsClusterStateSensor(
-                task_id="await_cluster_delete",
-                cluster_name=self.cluster_name,
-                target_state=EcsClusterStates.INACTIVE,
-                failure_states={EcsClusterStates.FAILED},
-            ).poke(context):
-                # The sensor has a built-in delay and will try again until
-                # the cluster is deleted or reaches a failed state.
-                pass
-
-        return result["cluster"]
+        cluster_details = result["cluster"]
+        cluster_state = cluster_details.get("status")
+
+        if cluster_state == EcsClusterStates.INACTIVE:
+            # In some circumstances ECS Cluster deleted immediately,
+            # and there is no reason wait for completion.

Review Comment:
   ```suggestion
               # In some circumstances the ECS Cluster is deleted immediately,
               # so there is no reason to wait for completion.
   ```
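The control flow these comments describe (return right away when the API response already reports the target state, otherwise fall back to the waiter) can be sketched in isolation. Everything here is hypothetical: `RecordingWaiter` is a stub standing in for a real waiter, and `finish_delete` is an illustrative function, not the operator's actual `execute` method.

```python
from __future__ import annotations


class RecordingWaiter:
    """Hypothetical stub standing in for a real waiter object."""

    def __init__(self) -> None:
        self.calls = []

    def wait(self, **kwargs) -> None:
        # A real waiter would poll the API here; the stub only records the call.
        self.calls.append(kwargs)


def finish_delete(
    cluster_details: dict,
    waiter: RecordingWaiter,
    cluster_name: str,
    wait_for_completion: bool = True,
) -> dict:
    """Return right away if the cluster is already INACTIVE, otherwise wait."""
    if cluster_details.get("status") == "INACTIVE":
        # Target state already reached, so there is nothing to wait for.
        return cluster_details
    if wait_for_completion:
        waiter.wait(clusters=[cluster_name])
    return cluster_details


waiter = RecordingWaiter()
finish_delete({"status": "INACTIVE"}, waiter, "demo")
calls_after_immediate = list(waiter.calls)
finish_delete({"status": "ACTIVE"}, waiter, "demo")
calls_after_pending = list(waiter.calls)
```

The same shape applies to the create and register operators, with `ACTIVE` as the target state instead of `INACTIVE`.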





[GitHub] [airflow] Taragolis commented on a diff in pull request #29761: Use waiters in ECS Operators instead of inner sensors

Posted by "Taragolis (via GitHub)" <gi...@apache.org>.
Taragolis commented on code in PR #29761:
URL: https://github.com/apache/airflow/pull/29761#discussion_r1119259747


##########
airflow/providers/amazon/aws/waiters/ecs.json:
##########
@@ -0,0 +1,81 @@
+{
+    "version": 2,
+    "waiters": {
+        "cluster_active": {
+            "operation": "DescribeClusters",
+            "delay": 15,
+            "maxAttempts": 60,
+            "acceptors": [
+                {
+                    "expected": "ACTIVE",
+                    "matcher": "pathAny",
+                    "state": "success",
+                    "argument": "clusters[].status"
+                },
+                {
+                    "expected": "FAILED",
+                    "matcher": "pathAny",
+                    "state": "failure",
+                    "argument": "clusters[].status"
+                },
+                {
+                    "expected": "INACTIVE",
+                    "matcher": "pathAny",
+                    "state": "failure",
+                    "argument": "clusters[].status"
+                },
+                {
+                  "expected": "MISSING",
+                  "matcher": "pathAny",
+                  "state": "failure",
+                  "argument": "failures[].reason"
+                }
+            ]
+        },
+        "cluster_inactive": {
+            "operation": "DescribeClusters",
+            "delay": 15,
+            "maxAttempts": 60,
+            "acceptors": [
+                {
+                    "expected": "INACTIVE",
+                    "matcher": "pathAny",
+                    "state": "success",
+                    "argument": "clusters[].status"
+                },
+                {
+                  "expected": "MISSING",
+                  "matcher": "pathAny",
+                  "state": "success",
+                  "argument": "failures[].reason"
+                }
+            ]
+        },
+        "task_definition_active": {
+            "operation": "DescribeTaskDefinition",
+            "delay": 15,
+            "maxAttempts": 60,
+            "acceptors": [
+                {
+                    "expected": "ACTIVE",
+                    "matcher": "path",
+                    "state": "success",
+                    "argument": "taskDefinition.status"
+                }
+            ]
+        },
+        "task_definition_inactive": {

Review Comment:
   I'd rather call it an intermediate state when deleting a task definition.





[GitHub] [airflow] Taragolis commented on a diff in pull request #29761: Use waiters in ECS Operators instead of inner sensors

Posted by "Taragolis (via GitHub)" <gi...@apache.org>.
Taragolis commented on code in PR #29761:
URL: https://github.com/apache/airflow/pull/29761#discussion_r1119258618


##########
airflow/providers/amazon/aws/waiters/ecs.json:
##########
@@ -0,0 +1,81 @@
+{
+    "version": 2,
+    "waiters": {
+        "cluster_active": {
+            "operation": "DescribeClusters",
+            "delay": 15,
+            "maxAttempts": 60,
+            "acceptors": [
+                {
+                    "expected": "ACTIVE",
+                    "matcher": "pathAny",
+                    "state": "success",
+                    "argument": "clusters[].status"
+                },
+                {
+                    "expected": "FAILED",
+                    "matcher": "pathAny",
+                    "state": "failure",
+                    "argument": "clusters[].status"
+                },
+                {
+                    "expected": "INACTIVE",
+                    "matcher": "pathAny",
+                    "state": "failure",
+                    "argument": "clusters[].status"
+                },
+                {
+                  "expected": "MISSING",
+                  "matcher": "pathAny",
+                  "state": "failure",
+                  "argument": "failures[].reason"
+                }
+            ]
+        },
+        "cluster_inactive": {
+            "operation": "DescribeClusters",
+            "delay": 15,
+            "maxAttempts": 60,
+            "acceptors": [
+                {
+                    "expected": "INACTIVE",
+                    "matcher": "pathAny",
+                    "state": "success",
+                    "argument": "clusters[].status"
+                },
+                {
+                  "expected": "MISSING",
+                  "matcher": "pathAny",
+                  "state": "success",
+                  "argument": "failures[].reason"
+                }
+            ]
+        },
+        "task_definition_active": {

Review Comment:
   Yeah, 'DELETE_IN_PROGRESS' could be some kind of failure step. I can't imagine how the state could change to this while creating a task definition, but why not add it as a failure.





[GitHub] [airflow] ferruzzi commented on pull request #29761: Use waiters in ECS Operators instead of inner sensors

Posted by "ferruzzi (via GitHub)" <gi...@apache.org>.
ferruzzi commented on PR #29761:
URL: https://github.com/apache/airflow/pull/29761#issuecomment-1451039707

   The [ECS system test](https://github.com/apache/airflow/blob/main/tests/system/providers/amazon/aws/example_ecs.py) is failing after this merge: the [`await_cluster`](https://github.com/apache/airflow/blob/main/tests/system/providers/amazon/aws/example_ecs.py#L100) task (which calls [`EcsClusterStateSensor`](https://github.com/apache/airflow/blob/181a8252597e314e5675e2b9655cb44da412eeb2/airflow/providers/amazon/aws/sensors/ecs.py#L65)) is timing out after logging `INFO     airflow.task.operators:ecs.py:98 Cluster state: EcsClusterStates.ACTIVE, waiting for: EcsClusterStates.ACTIVE`.  I'll try to take a look at it tomorrow.




[GitHub] [airflow] ferruzzi commented on a diff in pull request #29761: Use waiters in ECS Operators instead of inner sensors

Posted by "ferruzzi (via GitHub)" <gi...@apache.org>.
ferruzzi commented on code in PR #29761:
URL: https://github.com/apache/airflow/pull/29761#discussion_r1119260415


##########
airflow/providers/amazon/aws/waiters/ecs.json:
##########
@@ -0,0 +1,81 @@
+{
+    "version": 2,
+    "waiters": {
+        "cluster_active": {
+            "operation": "DescribeClusters",
+            "delay": 15,
+            "maxAttempts": 60,
+            "acceptors": [
+                {
+                    "expected": "ACTIVE",
+                    "matcher": "pathAny",
+                    "state": "success",
+                    "argument": "clusters[].status"
+                },
+                {
+                    "expected": "FAILED",
+                    "matcher": "pathAny",
+                    "state": "failure",
+                    "argument": "clusters[].status"
+                },
+                {
+                    "expected": "INACTIVE",
+                    "matcher": "pathAny",
+                    "state": "failure",
+                    "argument": "clusters[].status"
+                },
+                {
+                  "expected": "MISSING",
+                  "matcher": "pathAny",
+                  "state": "failure",
+                  "argument": "failures[].reason"
+                }
+            ]
+        },
+        "cluster_inactive": {
+            "operation": "DescribeClusters",
+            "delay": 15,
+            "maxAttempts": 60,
+            "acceptors": [
+                {
+                    "expected": "INACTIVE",
+                    "matcher": "pathAny",
+                    "state": "success",
+                    "argument": "clusters[].status"
+                },
+                {
+                  "expected": "MISSING",
+                  "matcher": "pathAny",
+                  "state": "success",
+                  "argument": "failures[].reason"
+                }
+            ]
+        },
+        "task_definition_active": {

Review Comment:
   Pretty much my thought as well.   I don't know how it would get there, but if it did then something is clearly wrong, so there's no point waiting for the timeout.





[GitHub] [airflow] Taragolis commented on a diff in pull request #29761: Use waiters in ECS Operators instead of inner sensors

Posted by "Taragolis (via GitHub)" <gi...@apache.org>.
Taragolis commented on code in PR #29761:
URL: https://github.com/apache/airflow/pull/29761#discussion_r1121505755


##########
airflow/providers/amazon/aws/operators/ecs.py:
##########
@@ -93,31 +97,44 @@ def __init__(
         cluster_name: str,
         create_cluster_kwargs: dict | None = None,
         wait_for_completion: bool = True,
+        waiter_delay: int | None = None,
+        waiter_max_attempts: int | None = None,
         **kwargs,
     ) -> None:
         super().__init__(**kwargs)
         self.cluster_name = cluster_name
         self.create_cluster_kwargs = create_cluster_kwargs or {}
         self.wait_for_completion = wait_for_completion
+        self.waiter_delay = waiter_delay
+        self.waiter_max_attempts = waiter_max_attempts
 
     def execute(self, context: Context):
         self.log.info(
-            "Creating cluster %s using the following values: %s",
+            "Creating cluster %r using the following values: %s",
             self.cluster_name,
             self.create_cluster_kwargs,
         )
         result = self.client.create_cluster(clusterName=self.cluster_name, **self.create_cluster_kwargs)
-
-        if self.wait_for_completion:
-            while not EcsClusterStateSensor(
-                task_id="await_cluster",
-                cluster_name=self.cluster_name,
-            ).poke(context):
-                # The sensor has a built-in delay and will try again until
-                # the cluster is ready or has reached a failed state.
-                pass
-
-        return result["cluster"]
+        cluster_details = result["cluster"]
+        cluster_state = cluster_details.get("status")
+
+        if cluster_state == EcsClusterStates.ACTIVE:

Review Comment:
   Aaaaand this is another Ctrl+C, Ctrl+V issue. The comment is a "liar"; I will change it, because it should say "is **created** immediately". With the default provisioners, the ECS cluster is created during the [CreateCluster](https://docs.aws.amazon.com/AmazonECS/latest/APIReference/API_CreateCluster.html) API call. This is just a personal finding based on years of using ECS.
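
The shortcut described above can be sketched in a few lines. This is only an illustration under stated assumptions, not the PR's actual code: `create_cluster` and `FakeEcsClient` are hypothetical names, and `wait_until_active` stands in for whatever waiter call the operator makes. Only the `create_cluster(clusterName=...)` call and the `"cluster"` / `"status"` response keys come from the diff.

```python
from typing import Callable


def create_cluster(
    client,
    cluster_name: str,
    wait: bool,
    wait_until_active: Callable[[str], None],
) -> dict:
    """Create a cluster and wait only if the response does not already report ACTIVE."""
    details = client.create_cluster(clusterName=cluster_name)["cluster"]
    if details.get("status") == "ACTIVE":
        # The cluster is already usable, so polling would only add latency.
        return details
    if wait:
        wait_until_active(cluster_name)
    return details


class FakeEcsClient:
    """Hypothetical test double that mimics the shape of a CreateCluster response."""

    def __init__(self, status: str) -> None:
        self.status = status

    def create_cluster(self, clusterName: str) -> dict:
        return {"cluster": {"clusterName": clusterName, "status": self.status}}
```

With a client that reports `ACTIVE` straight away, the `wait_until_active` callable is never invoked; with any other status it is invoked once, provided `wait` is true.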





[GitHub] [airflow] Taragolis commented on pull request #29761: Use waiters in ECS Operators instead of inner sensors

Posted by "Taragolis (via GitHub)" <gi...@apache.org>.
Taragolis commented on PR #29761:
URL: https://github.com/apache/airflow/pull/29761#issuecomment-1451639773

   Yep, locally it also kept checking forever `¯\_(ツ)_/¯`
   
   I guess it is a combination of the Airflow serializer, templated fields, and multiple inheritance (which I added for the enums)
   
   ![image](https://user-images.githubusercontent.com/3998685/222401603-280a2298-b0c5-4266-b624-df1ae5c702d6.png)
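
The following is not a confirmed diagnosis of the failure. It only illustrates, with a hypothetical `ClusterState` enum, one way a `(str, Enum)` member can stop matching its own value if some layer converts it to text with `str()`: the text carries the class name, so both sides can print alike in a log line and still compare unequal.

```python
from enum import Enum


class ClusterState(str, Enum):
    """Hypothetical stand-in for an enum that mixes in str."""

    ACTIVE = "ACTIVE"


target = ClusterState.ACTIVE

# Assumption for illustration: something stringifies the member with str().
rendered = str(target)

# The rendered text includes the class name, so it no longer equals the raw status.
rendered_matches_status = rendered == "ACTIVE"

# The member itself (and its .value) still compares equal to the raw status.
member_matches_status = target == "ACTIVE"
value_matches_status = target.value == "ACTIVE"
```

Here `rendered` is `"ClusterState.ACTIVE"`, so `rendered_matches_status` is false while the other two comparisons are true.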
   




[GitHub] [airflow] ferruzzi commented on a diff in pull request #29761: Use waiters in ECS Operators instead of inner sensors

Posted by "ferruzzi (via GitHub)" <gi...@apache.org>.
ferruzzi commented on code in PR #29761:
URL: https://github.com/apache/airflow/pull/29761#discussion_r1119241035


##########
airflow/providers/amazon/aws/waiters/ecs.json:
##########
@@ -0,0 +1,81 @@
+{
+    "version": 2,
+    "waiters": {
+        "cluster_active": {
+            "operation": "DescribeClusters",
+            "delay": 15,
+            "maxAttempts": 60,
+            "acceptors": [
+                {
+                    "expected": "ACTIVE",
+                    "matcher": "pathAny",
+                    "state": "success",
+                    "argument": "clusters[].status"
+                },
+                {
+                    "expected": "FAILED",
+                    "matcher": "pathAny",
+                    "state": "failure",
+                    "argument": "clusters[].status"
+                },
+                {
+                    "expected": "INACTIVE",
+                    "matcher": "pathAny",
+                    "state": "failure",
+                    "argument": "clusters[].status"
+                },
+                {
+                  "expected": "MISSING",
+                  "matcher": "pathAny",
+                  "state": "failure",
+                  "argument": "failures[].reason"
+                }
+            ]
+        },
+        "cluster_inactive": {
+            "operation": "DescribeClusters",
+            "delay": 15,
+            "maxAttempts": 60,
+            "acceptors": [
+                {
+                    "expected": "INACTIVE",
+                    "matcher": "pathAny",
+                    "state": "success",
+                    "argument": "clusters[].status"
+                },
+                {
+                  "expected": "MISSING",
+                  "matcher": "pathAny",
+                  "state": "success",
+                  "argument": "failures[].reason"
+                }
+            ]
+        },
+        "task_definition_active": {
+            "operation": "DescribeTaskDefinition",
+            "delay": 15,
+            "maxAttempts": 60,
+            "acceptors": [
+                {
+                    "expected": "ACTIVE",
+                    "matcher": "path",
+                    "state": "success",
+                    "argument": "taskDefinition.status"
+                }
+            ]
+        },
+        "task_definition_inactive": {

Review Comment:
   Would `'DELETE_IN_PROGRESS'` be a success state here?



##########
airflow/providers/amazon/aws/operators/ecs.py:
##########
@@ -92,32 +96,45 @@ def __init__(
         *,
         cluster_name: str,
         create_cluster_kwargs: dict | None = None,
-        wait_for_completion: bool = True,
+        wait_for_completion: bool = False,

Review Comment:
   Does this require a deprecation warning since it's changing default behavior?



##########
airflow/providers/amazon/aws/operators/ecs.py:
##########
@@ -39,7 +39,7 @@
     EcsTaskDefinitionStates,
     should_retry_eni,
 )
-from airflow.providers.amazon.aws.sensors.ecs import EcsClusterStateSensor, EcsTaskDefinitionStateSensor
+from airflow.utils.helpers import prune_dict

Review Comment:
   Nice.  I have a bunch of EMR waiters I have been working on converting over, but put on pause to verify how they behave if `None` is passed in; this would solve that nicely.  I'll get the EMR ones up this week and tag you in them.  :+1: 
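
A minimal sketch of the idea, assuming the optional `waiter_delay` and `waiter_max_attempts` values are turned into a boto3-style `WaiterConfig` dict: `drop_none` is a stand-in written for this example (it is not Airflow's `prune_dict`), and `build_waiter_config` is a hypothetical helper name. `Delay` and `MaxAttempts` are the keys boto3 waiters accept in `WaiterConfig`.

```python
from __future__ import annotations


def drop_none(values: dict) -> dict:
    """Stand-in pruning helper: keep only the keys whose value is not None."""
    return {key: value for key, value in values.items() if value is not None}


def build_waiter_config(
    delay: int | None = None,
    max_attempts: int | None = None,
) -> dict:
    """Hypothetical helper: include only the overrides the caller actually set."""
    return drop_none({"Delay": delay, "MaxAttempts": max_attempts})
```

Because unset values are dropped rather than passed as `None`, the waiter falls back to the `delay` and `maxAttempts` defaults from its own definition.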



##########
airflow/providers/amazon/aws/hooks/ecs.py:
##########
@@ -55,7 +55,7 @@ def should_retry_eni(exception: Exception):
     return False
 
 
-class EcsClusterStates(Enum):
+class EcsClusterStates(str, Enum):

Review Comment:
   Yeah, I was hoping to use StrEnum there but... not yet. :(  I didn't know you could do it this way, nice.
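
For readers unfamiliar with the `(str, Enum)` pattern, here is a small sketch of what the mixin changes. `ClusterState` and `PlainState` are hypothetical classes for illustration, not the provider's enums.

```python
from enum import Enum


class ClusterState(str, Enum):
    """Members are also str instances, so they compare equal to plain strings."""

    ACTIVE = "ACTIVE"
    INACTIVE = "INACTIVE"


class PlainState(Enum):
    """Same value, but without the str mixin."""

    ACTIVE = "ACTIVE"


# An API response carries the status as a plain string.
response_status = "ACTIVE"

mixed_matches = response_status == ClusterState.ACTIVE  # str comparison applies
plain_matches = response_status == PlainState.ACTIVE  # a plain member is not a str
```

`mixed_matches` is true and `plain_matches` is false, which is why a status taken straight from an API response can be compared against the enum member without calling `.value`.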



##########
airflow/providers/amazon/aws/waiters/ecs.json:
##########
@@ -0,0 +1,81 @@
+{

Review Comment:
   Love seeing this put to use.   I think it's so much cleaner in the implementation.  :+1: 



##########
airflow/providers/amazon/aws/waiters/ecs.json:
##########
@@ -0,0 +1,81 @@
+{
+    "version": 2,
+    "waiters": {
+        "cluster_active": {
+            "operation": "DescribeClusters",
+            "delay": 15,
+            "maxAttempts": 60,
+            "acceptors": [
+                {
+                    "expected": "ACTIVE",
+                    "matcher": "pathAny",
+                    "state": "success",
+                    "argument": "clusters[].status"
+                },
+                {
+                    "expected": "FAILED",
+                    "matcher": "pathAny",
+                    "state": "failure",
+                    "argument": "clusters[].status"
+                },
+                {
+                    "expected": "INACTIVE",
+                    "matcher": "pathAny",
+                    "state": "failure",
+                    "argument": "clusters[].status"
+                },
+                {
+                  "expected": "MISSING",
+                  "matcher": "pathAny",
+                  "state": "failure",
+                  "argument": "failures[].reason"

Review Comment:
   Interesting catch.  :+1: 



##########
airflow/providers/amazon/aws/waiters/ecs.json:
##########
@@ -0,0 +1,81 @@
+{
+    "version": 2,
+    "waiters": {
+        "cluster_active": {
+            "operation": "DescribeClusters",
+            "delay": 15,
+            "maxAttempts": 60,
+            "acceptors": [
+                {
+                    "expected": "ACTIVE",
+                    "matcher": "pathAny",
+                    "state": "success",
+                    "argument": "clusters[].status"
+                },
+                {
+                    "expected": "FAILED",
+                    "matcher": "pathAny",
+                    "state": "failure",
+                    "argument": "clusters[].status"
+                },
+                {
+                    "expected": "INACTIVE",
+                    "matcher": "pathAny",
+                    "state": "failure",
+                    "argument": "clusters[].status"
+                },
+                {
+                  "expected": "MISSING",
+                  "matcher": "pathAny",
+                  "state": "failure",
+                  "argument": "failures[].reason"
+                }
+            ]
+        },
+        "cluster_inactive": {
+            "operation": "DescribeClusters",
+            "delay": 15,
+            "maxAttempts": 60,
+            "acceptors": [
+                {
+                    "expected": "INACTIVE",
+                    "matcher": "pathAny",
+                    "state": "success",
+                    "argument": "clusters[].status"
+                },
+                {
+                  "expected": "MISSING",
+                  "matcher": "pathAny",
+                  "state": "success",
+                  "argument": "failures[].reason"
+                }
+            ]
+        },
+        "task_definition_active": {

Review Comment:
   No failure states for this one?  I think if the status goes to `'DELETE_IN_PROGRESS'` it can fail early.  Not sure where `'INACTIVE'` is on the state flow on this call though.
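
To make the "fail early" suggestion concrete, here is a simplified sketch of how a waiter classifies one response. It assumes the first matching acceptor decides the outcome and that no match means another attempt. The `DELETE_IN_PROGRESS` failure acceptor is the suggestion under discussion, not necessarily what the PR ships, and `lookup` handles only simple dotted paths rather than full JMESPath.

```python
from __future__ import annotations

# Waiter definition written as a Python dict for illustration.
TASK_DEFINITION_ACTIVE = {
    "operation": "DescribeTaskDefinition",
    "delay": 15,
    "maxAttempts": 60,
    "acceptors": [
        {
            "expected": "ACTIVE",
            "matcher": "path",
            "state": "success",
            "argument": "taskDefinition.status",
        },
        {
            # Suggested addition: stop immediately instead of polling until timeout.
            "expected": "DELETE_IN_PROGRESS",
            "matcher": "path",
            "state": "failure",
            "argument": "taskDefinition.status",
        },
    ],
}


def lookup(response: dict, dotted_path: str):
    """Resolve a simple dotted path such as 'taskDefinition.status'."""
    current = response
    for key in dotted_path.split("."):
        if not isinstance(current, dict) or key not in current:
            return None
        current = current[key]
    return current


def classify(response: dict, waiter: dict) -> str:
    """Return the state of the first matching acceptor, or 'retry' if none match."""
    for acceptor in waiter["acceptors"]:
        if acceptor["matcher"] != "path":
            continue  # other matchers are out of scope for this sketch
        if lookup(response, acceptor["argument"]) == acceptor["expected"]:
            return acceptor["state"]
    return "retry"
```

Without the failure acceptor, a `DELETE_IN_PROGRESS` response would be classified as `retry` and the waiter would keep polling until `maxAttempts` is exhausted.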





[GitHub] [airflow] Taragolis commented on a diff in pull request #29761: Use waiters in ECS Operators instead of inner sensors

Posted by "Taragolis (via GitHub)" <gi...@apache.org>.
Taragolis commented on code in PR #29761:
URL: https://github.com/apache/airflow/pull/29761#discussion_r1117917750


##########
airflow/providers/amazon/aws/operators/ecs.py:
##########
@@ -174,30 +206,53 @@ class EcsDeregisterTaskDefinitionOperator(EcsBaseOperator):
     :param task_definition: The family and revision (family:revision) or full Amazon Resource Name (ARN)
         of the task definition to deregister. If you use a family name, you must specify a revision.
     :param wait_for_completion: If True, waits for creation of the cluster to complete. (default: True)
+    :param waiter_delay: The amount of time in seconds to wait between attempts,
+        if not set then default waiter value will use.
+    :param waiter_max_attempts: The maximum number of attempts to be made,
+        if not set then default waiter value will use.
     """
 
     template_fields: Sequence[str] = ("task_definition", "wait_for_completion")
 
-    def __init__(self, *, task_definition: str, wait_for_completion: bool = True, **kwargs):
+    def __init__(
+        self,
+        *,
+        task_definition: str,
+        wait_for_completion: bool = True,
+        waiter_delay: int | None = None,
+        waiter_max_attempts: int | None = None,
+        **kwargs,
+    ):
         super().__init__(**kwargs)
         self.task_definition = task_definition
         self.wait_for_completion = wait_for_completion
+        self.waiter_delay = waiter_delay
+        self.waiter_max_attempts = waiter_max_attempts
 
     def execute(self, context: Context):
         self.log.info("Deregistering task definition %s.", self.task_definition)
         result = self.client.deregister_task_definition(taskDefinition=self.task_definition)
-
-        if self.wait_for_completion:
-            while not EcsTaskDefinitionStateSensor(
-                task_id="await_deregister_task_definition",
-                task_definition=self.task_definition,
-                target_state=EcsTaskDefinitionStates.INACTIVE,
-            ).poke(context):
-                # The sensor has a built-in delay and will try again until the
-                # task definition is deregistered or reaches a failed state.
-                pass
-
-        return result["taskDefinition"]["taskDefinitionArn"]
+        task_definition_details = result["taskDefinition"]
+        task_definition_arn = task_definition_details["taskDefinitionArn"]
+        task_definition_state = task_definition_details.get("status")
+
+        if task_definition_state == EcsTaskDefinitionStates.INACTIVE:
+            # In some circumstances ECS Task Definition deleted immediately,
+            # and there is no reason wait for completion.

Review Comment:
   To be honest, this happens almost always 🤣 


