You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by "o-nikolas (via GitHub)" <gi...@apache.org> on 2023/03/01 01:34:04 UTC

[GitHub] [airflow] o-nikolas commented on a diff in pull request #29761: Use waiters in ECS Operators instead of inner sensors

o-nikolas commented on code in PR #29761:
URL: https://github.com/apache/airflow/pull/29761#discussion_r1121005945


##########
airflow/providers/amazon/aws/operators/ecs.py:
##########
@@ -83,6 +83,10 @@ class EcsCreateClusterOperator(EcsBaseOperator):
         cluster, you create a cluster that's named default.
     :param create_cluster_kwargs: Extra arguments for Cluster Creation.
     :param wait_for_completion: If True, waits for creation of the cluster to complete. (default: True)
+    :param waiter_delay: The amount of time in seconds to wait between attempts,
+        if not set then default waiter value will use.
+    :param waiter_max_attempts: The maximum number of attempts to be made,
+        if not set then default waiter value will use.

Review Comment:
   ```suggestion
       :param waiter_delay: The amount of time in seconds to wait between attempts,
           if not set then the default waiter value will be used.
       :param waiter_max_attempts: The maximum number of attempts to be made,
           if not set then the default waiter value will be used.
   ```



##########
airflow/providers/amazon/aws/operators/ecs.py:
##########
@@ -93,31 +97,44 @@ def __init__(
         cluster_name: str,
         create_cluster_kwargs: dict | None = None,
         wait_for_completion: bool = True,
+        waiter_delay: int | None = None,
+        waiter_max_attempts: int | None = None,
         **kwargs,
     ) -> None:
         super().__init__(**kwargs)
         self.cluster_name = cluster_name
         self.create_cluster_kwargs = create_cluster_kwargs or {}
         self.wait_for_completion = wait_for_completion
+        self.waiter_delay = waiter_delay
+        self.waiter_max_attempts = waiter_max_attempts
 
     def execute(self, context: Context):
         self.log.info(
-            "Creating cluster %s using the following values: %s",
+            "Creating cluster %r using the following values: %s",
             self.cluster_name,
             self.create_cluster_kwargs,
         )
         result = self.client.create_cluster(clusterName=self.cluster_name, **self.create_cluster_kwargs)
-
-        if self.wait_for_completion:
-            while not EcsClusterStateSensor(
-                task_id="await_cluster",
-                cluster_name=self.cluster_name,
-            ).poke(context):
-                # The sensor has a built-in delay and will try again until
-                # the cluster is ready or has reached a failed state.
-                pass
-
-        return result["cluster"]
+        cluster_details = result["cluster"]
+        cluster_state = cluster_details.get("status")
+
+        if cluster_state == EcsClusterStates.ACTIVE:

Review Comment:
   I'm a bit confused by this: if it can be deleted immediately, then why are we checking for the active state? Shouldn't we be checking for the deleted state?



##########
airflow/providers/amazon/aws/operators/ecs.py:
##########
@@ -130,6 +147,10 @@ class EcsDeleteClusterOperator(EcsBaseOperator):
 
     :param cluster_name: The short name or full Amazon Resource Name (ARN) of the cluster to delete.
     :param wait_for_completion: If True, waits for creation of the cluster to complete. (default: True)
+    :param waiter_delay: The amount of time in seconds to wait between attempts,
+        if not set then default waiter value will use.
+    :param waiter_max_attempts: The maximum number of attempts to be made,
+        if not set then default waiter value will use.

Review Comment:
   ```suggestion
       :param waiter_delay: The amount of time in seconds to wait between attempts,
           if not set then the default waiter value will be used.
       :param waiter_max_attempts: The maximum number of attempts to be made,
           if not set then the default waiter value will be used.
   ```



##########
airflow/providers/amazon/aws/operators/ecs.py:
##########
@@ -174,30 +206,53 @@ class EcsDeregisterTaskDefinitionOperator(EcsBaseOperator):
     :param task_definition: The family and revision (family:revision) or full Amazon Resource Name (ARN)
         of the task definition to deregister. If you use a family name, you must specify a revision.
     :param wait_for_completion: If True, waits for creation of the cluster to complete. (default: True)
+    :param waiter_delay: The amount of time in seconds to wait between attempts,
+        if not set then default waiter value will use.
+    :param waiter_max_attempts: The maximum number of attempts to be made,
+        if not set then default waiter value will use.

Review Comment:
   ```suggestion
       :param waiter_delay: The amount of time in seconds to wait between attempts,
           if not set then the default waiter value will be used.
       :param waiter_max_attempts: The maximum number of attempts to be made,
           if not set then the default waiter value will be used.
   ```



##########
airflow/providers/amazon/aws/operators/ecs.py:
##########
@@ -213,6 +268,10 @@ class EcsRegisterTaskDefinitionOperator(EcsBaseOperator):
         the different containers that make up your task.
     :param register_task_kwargs: Extra arguments for Register Task Definition.
     :param wait_for_completion: If True, waits for creation of the cluster to complete. (default: True)
+    :param waiter_delay: The amount of time in seconds to wait between attempts,
+        if not set then default waiter value will use.
+    :param waiter_max_attempts: The maximum number of attempts to be made,
+        if not set then default waiter value will use.

Review Comment:
   ```suggestion
       :param waiter_delay: The amount of time in seconds to wait between attempts,
           if not set then the default waiter value will be used.
       :param waiter_max_attempts: The maximum number of attempts to be made,
           if not set then the default waiter value will be used.
   ```



##########
airflow/providers/amazon/aws/operators/ecs.py:
##########
@@ -174,30 +206,53 @@ class EcsDeregisterTaskDefinitionOperator(EcsBaseOperator):
     :param task_definition: The family and revision (family:revision) or full Amazon Resource Name (ARN)
         of the task definition to deregister. If you use a family name, you must specify a revision.
     :param wait_for_completion: If True, waits for creation of the cluster to complete. (default: True)
+    :param waiter_delay: The amount of time in seconds to wait between attempts,
+        if not set then default waiter value will use.
+    :param waiter_max_attempts: The maximum number of attempts to be made,
+        if not set then default waiter value will use.
     """
 
     template_fields: Sequence[str] = ("task_definition", "wait_for_completion")
 
-    def __init__(self, *, task_definition: str, wait_for_completion: bool = True, **kwargs):
+    def __init__(
+        self,
+        *,
+        task_definition: str,
+        wait_for_completion: bool = True,
+        waiter_delay: int | None = None,
+        waiter_max_attempts: int | None = None,
+        **kwargs,
+    ):
         super().__init__(**kwargs)
         self.task_definition = task_definition
         self.wait_for_completion = wait_for_completion
+        self.waiter_delay = waiter_delay
+        self.waiter_max_attempts = waiter_max_attempts
 
     def execute(self, context: Context):
         self.log.info("Deregistering task definition %s.", self.task_definition)
         result = self.client.deregister_task_definition(taskDefinition=self.task_definition)
-
-        if self.wait_for_completion:
-            while not EcsTaskDefinitionStateSensor(
-                task_id="await_deregister_task_definition",
-                task_definition=self.task_definition,
-                target_state=EcsTaskDefinitionStates.INACTIVE,
-            ).poke(context):
-                # The sensor has a built-in delay and will try again until the
-                # task definition is deregistered or reaches a failed state.
-                pass
-
-        return result["taskDefinition"]["taskDefinitionArn"]
+        task_definition_details = result["taskDefinition"]
+        task_definition_arn = task_definition_details["taskDefinitionArn"]
+        task_definition_state = task_definition_details.get("status")
+
+        if task_definition_state == EcsTaskDefinitionStates.INACTIVE:
+            # In some circumstances ECS Task Definition deleted immediately,
+            # and there is no reason wait for completion.

Review Comment:
   ```suggestion
               # In some circumstances the ECS Task Definition is deleted immediately,
               # so there is no reason to wait for completion.
   ```



##########
airflow/providers/amazon/aws/operators/ecs.py:
##########
@@ -93,31 +97,44 @@ def __init__(
         cluster_name: str,
         create_cluster_kwargs: dict | None = None,
         wait_for_completion: bool = True,
+        waiter_delay: int | None = None,
+        waiter_max_attempts: int | None = None,
         **kwargs,
     ) -> None:
         super().__init__(**kwargs)
         self.cluster_name = cluster_name
         self.create_cluster_kwargs = create_cluster_kwargs or {}
         self.wait_for_completion = wait_for_completion
+        self.waiter_delay = waiter_delay
+        self.waiter_max_attempts = waiter_max_attempts
 
     def execute(self, context: Context):
         self.log.info(
-            "Creating cluster %s using the following values: %s",
+            "Creating cluster %r using the following values: %s",
             self.cluster_name,
             self.create_cluster_kwargs,
         )
         result = self.client.create_cluster(clusterName=self.cluster_name, **self.create_cluster_kwargs)
-
-        if self.wait_for_completion:
-            while not EcsClusterStateSensor(
-                task_id="await_cluster",
-                cluster_name=self.cluster_name,
-            ).poke(context):
-                # The sensor has a built-in delay and will try again until
-                # the cluster is ready or has reached a failed state.
-                pass
-
-        return result["cluster"]
+        cluster_details = result["cluster"]
+        cluster_state = cluster_details.get("status")
+
+        if cluster_state == EcsClusterStates.ACTIVE:
+            # In some circumstances ECS Cluster deleted immediately,
+            # and there is no reason wait for completion.

Review Comment:
   ```suggestion
               # In some circumstances the ECS Cluster is created immediately,
               # so there is no reason to wait for completion.
   ```



##########
airflow/providers/amazon/aws/operators/ecs.py:
##########
@@ -249,18 +312,28 @@ def execute(self, context: Context):
             containerDefinitions=self.container_definitions,
             **self.register_task_kwargs,
         )
-        task_arn = response["taskDefinition"]["taskDefinitionArn"]
-
-        if self.wait_for_completion:
-            while not EcsTaskDefinitionStateSensor(
-                task_id="await_register_task_definition", task_definition=task_arn
-            ).poke(context):
-                # The sensor has a built-in delay and will try again until
-                # the task definition is registered or reaches a failed state.
-                pass
-
-        context["ti"].xcom_push(key="task_definition_arn", value=task_arn)
-        return task_arn
+        task_definition_details = response["taskDefinition"]
+        task_definition_arn = task_definition_details["taskDefinitionArn"]
+        task_definition_state = task_definition_details.get("status")
+
+        if task_definition_state == EcsTaskDefinitionStates.ACTIVE:
+            # In some circumstances ECS Task Definition created immediately,
+            # and there is no reason wait for completion.

Review Comment:
   ```suggestion
               # In some circumstances the ECS Task Definition is created immediately,
               # so there is no reason to wait for completion.
   ```



##########
airflow/providers/amazon/aws/operators/ecs.py:
##########
@@ -139,28 +160,39 @@ def __init__(
         *,
         cluster_name: str,
         wait_for_completion: bool = True,
+        waiter_delay: int | None = None,
+        waiter_max_attempts: int | None = None,
         **kwargs,
     ) -> None:
         super().__init__(**kwargs)
         self.cluster_name = cluster_name
         self.wait_for_completion = wait_for_completion
+        self.waiter_delay = waiter_delay
+        self.waiter_max_attempts = waiter_max_attempts
 
     def execute(self, context: Context):
-        self.log.info("Deleting cluster %s.", self.cluster_name)
+        self.log.info("Deleting cluster %r.", self.cluster_name)
         result = self.client.delete_cluster(cluster=self.cluster_name)
-
-        if self.wait_for_completion:
-            while not EcsClusterStateSensor(
-                task_id="await_cluster_delete",
-                cluster_name=self.cluster_name,
-                target_state=EcsClusterStates.INACTIVE,
-                failure_states={EcsClusterStates.FAILED},
-            ).poke(context):
-                # The sensor has a built-in delay and will try again until
-                # the cluster is deleted or reaches a failed state.
-                pass
-
-        return result["cluster"]
+        cluster_details = result["cluster"]
+        cluster_state = cluster_details.get("status")
+
+        if cluster_state == EcsClusterStates.INACTIVE:
+            # In some circumstances ECS Cluster deleted immediately,
+            # and there is no reason wait for completion.

Review Comment:
   ```suggestion
               # In some circumstances the ECS Cluster is deleted immediately,
               # so there is no reason to wait for completion.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org