Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/07/06 23:59:00 UTC

[GitHub] [airflow] o-nikolas commented on a change in pull request #16685: reattach_prev_task ECSOperator

o-nikolas commented on a change in pull request #16685:
URL: https://github.com/apache/airflow/pull/16685#discussion_r664939787



##########
File path: airflow/providers/amazon/aws/operators/ecs.py
##########
@@ -264,6 +276,21 @@ def _start_task(self):
         self.log.info('ECS Task started: %s', response)
 
         self.arn = response['tasks'][0]['taskArn']
+        ecs_task_id = self.arn.split("/")[-1]
+        self.log.info(f"ECS task ID is: {ecs_task_id}")
+
+        if self.reattach:
+            # Save the task ARN in XCom to be able to reattach it if needed
+            self._xcom_set(context, key="ecs_task_arn", value=self.arn, task_id=f"{self.task_id}_task_arn")
+
+    def _xcom_set(self, context, key, value, task_id):

Review comment:
       This contains just a single line and is only used once; do we need the helper? Or did you use it mostly to make patching in your tests easier (which is completely valid IMHO)?
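One reason to keep a one-line helper such as `_xcom_set` is that a test can patch it and assert on the call without touching XCom at all. The sketch below shows this under stated assumptions. `HypotheticalEcsOperator` and `_after_start` are illustrative stand-ins, not the real `ECSOperator` API. The helper body is a placeholder because only the patching pattern matters here.

```python
from unittest import mock


class HypotheticalEcsOperator:
    """Hypothetical stand-in for ECSOperator; only the XCom call site is modelled."""

    def __init__(self, task_id, reattach=True):
        self.task_id = task_id
        self.reattach = reattach
        self.arn = None

    def _xcom_set(self, context, key, value, task_id):
        # Placeholder for the one-line helper under review; the real operator
        # would store the value in XCom here.
        raise NotImplementedError("would push to XCom in the real operator")

    def _after_start(self, context, arn):
        # Mirrors the reviewed call site: store the ARN only when reattach is on.
        self.arn = arn
        if self.reattach:
            self._xcom_set(context, key="ecs_task_arn", value=self.arn,
                           task_id=f"{self.task_id}_task_arn")


# Patching the helper on the class isolates the test from XCom entirely.
op = HypotheticalEcsOperator(task_id="run_ecs")
with mock.patch.object(HypotheticalEcsOperator, "_xcom_set") as xcom_set:
    op._after_start(context={}, arn="arn:aws:ecs:us-east-1:123456789012:task/my-cluster/abc123")

xcom_set.assert_called_once_with(
    {}, key="ecs_task_arn",
    value="arn:aws:ecs:us-east-1:123456789012:task/my-cluster/abc123",
    task_id="run_ecs_task_arn",
)
```

If the helper were inlined, the test would have to patch whatever XCom call replaces it. That is workable but couples the test more tightly to Airflow internals.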

##########
File path: airflow/providers/amazon/aws/operators/ecs.py
##########
@@ -264,6 +276,21 @@ def _start_task(self):
         self.log.info('ECS Task started: %s', response)
 
         self.arn = response['tasks'][0]['taskArn']
+        ecs_task_id = self.arn.split("/")[-1]
+        self.log.info(f"ECS task ID is: {ecs_task_id}")
+
+        if self.reattach:
+            # Save the task ARN in XCom to be able to reattach it if needed
+            self._xcom_set(context, key="ecs_task_arn", value=self.arn, task_id=f"{self.task_id}_task_arn")

Review comment:
       Minor nit: the magic string `"ecs_task_arn"` is used in a couple of places. Perhaps put it in a constant. You could also use a templated string constant for the `task_id`.
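A minimal sketch of the suggested constants follows. `ECS_TASK_ARN_XCOM_KEY`, `ECS_TASK_ARN_XCOM_TASK_ID_TEMPLATE` and `xcom_task_id_for` are hypothetical names, not taken from the PR. The idea is that the push site and the pull site read the key and the synthetic `task_id` from one place.

```python
# Hypothetical module-level constants; names are illustrative only.
ECS_TASK_ARN_XCOM_KEY = "ecs_task_arn"
ECS_TASK_ARN_XCOM_TASK_ID_TEMPLATE = "{task_id}_task_arn"


def xcom_task_id_for(task_id: str) -> str:
    """Build the synthetic task_id under which the ECS task ARN is stored."""
    return ECS_TASK_ARN_XCOM_TASK_ID_TEMPLATE.format(task_id=task_id)


# The push site from the diff would then become, for example:
#   self._xcom_set(context, key=ECS_TASK_ARN_XCOM_KEY, value=self.arn,
#                  task_id=xcom_task_id_for(self.task_id))
# and the pull site would use the same two values.
print(xcom_task_id_for("run_ecs"))  # run_ecs_task_arn
```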

##########
File path: airflow/providers/amazon/aws/operators/ecs.py
##########
@@ -274,15 +301,14 @@ def _try_reattach_task(self):
         )
         running_tasks = list_tasks_resp['taskArns']
 
-        running_tasks_count = len(running_tasks)
-        if running_tasks_count > 1:
-            self.arn = running_tasks[0]
-            self.log.warning('More than 1 ECS Task found. Reattaching to %s', self.arn)
-        elif running_tasks_count == 1:
-            self.arn = running_tasks[0]
-            self.log.info('Reattaching task: %s', self.arn)
+        # Check if the ECS task previously launched is already running
+        previous_task_arn = self.xcom_pull(task_ids=f"{self.task_id}_task_arn", key="ecs_task_arn")
+        self.log.info(f"Previously launched task = {previous_task_arn}")

Review comment:
       Nit: It probably adds noise to log this if we're not going to use it. I think the log line inside the if statement below is sufficient.
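The sketch below illustrates the suggestion with a hypothetical helper, `find_task_to_reattach`, which is not code from the PR. It assumes the reattach check is a membership test of the previously stored ARN against the running task ARNs. Nothing is logged unconditionally; the only log line is emitted when a reattach actually happens. It also uses the `%s`-style lazy arguments already used elsewhere in this operator.

```python
import logging

log = logging.getLogger(__name__)


def find_task_to_reattach(previous_task_arn, running_task_arns):
    """Return the previously launched task ARN if it is still running, else None.

    Hypothetical helper: assumes reattaching means "the stored ARN is among
    the currently running task ARNs".
    """
    if previous_task_arn and previous_task_arn in running_task_arns:
        # Log only when the value is actually used.
        log.info("Reattaching previously launched task: %s", previous_task_arn)
        return previous_task_arn
    return None
```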




-- 
This is an automated message from the Apache Git Service.