You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by "Taragolis (via GitHub)" <gi...@apache.org> on 2023/02/25 12:18:59 UTC

[GitHub] [airflow] Taragolis commented on a diff in pull request #29761: Use waiters in ECS Operators instead of inner sensors

Taragolis commented on code in PR #29761:
URL: https://github.com/apache/airflow/pull/29761#discussion_r1117917750


##########
airflow/providers/amazon/aws/operators/ecs.py:
##########
@@ -174,30 +206,53 @@ class EcsDeregisterTaskDefinitionOperator(EcsBaseOperator):
     :param task_definition: The family and revision (family:revision) or full Amazon Resource Name (ARN)
         of the task definition to deregister. If you use a family name, you must specify a revision.
     :param wait_for_completion: If True, waits for creation of the cluster to complete. (default: True)
+    :param waiter_delay: The amount of time in seconds to wait between attempts,
+        if not set then default waiter value will use.
+    :param waiter_max_attempts: The maximum number of attempts to be made,
+        if not set then default waiter value will use.
     """
 
     template_fields: Sequence[str] = ("task_definition", "wait_for_completion")
 
-    def __init__(self, *, task_definition: str, wait_for_completion: bool = True, **kwargs):
+    def __init__(
+        self,
+        *,
+        task_definition: str,
+        wait_for_completion: bool = True,
+        waiter_delay: int | None = None,
+        waiter_max_attempts: int | None = None,
+        **kwargs,
+    ):
         super().__init__(**kwargs)
         self.task_definition = task_definition
         self.wait_for_completion = wait_for_completion
+        self.waiter_delay = waiter_delay
+        self.waiter_max_attempts = waiter_max_attempts
 
     def execute(self, context: Context):
         self.log.info("Deregistering task definition %s.", self.task_definition)
         result = self.client.deregister_task_definition(taskDefinition=self.task_definition)
-
-        if self.wait_for_completion:
-            while not EcsTaskDefinitionStateSensor(
-                task_id="await_deregister_task_definition",
-                task_definition=self.task_definition,
-                target_state=EcsTaskDefinitionStates.INACTIVE,
-            ).poke(context):
-                # The sensor has a built-in delay and will try again until the
-                # task definition is deregistered or reaches a failed state.
-                pass
-
-        return result["taskDefinition"]["taskDefinitionArn"]
+        task_definition_details = result["taskDefinition"]
+        task_definition_arn = task_definition_details["taskDefinitionArn"]
+        task_definition_state = task_definition_details.get("status")
+
+        if task_definition_state == EcsTaskDefinitionStates.INACTIVE:
+            # In some circumstances ECS Task Definition deleted immediately,
+            # and there is no reason wait for completion.

Review Comment:
   To be honest this happen almost always 🤣 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org