You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/03/21 11:58:31 UTC

[GitHub] [airflow] feluelle commented on a change in pull request #7585: [AIRFLOW-6944] Allow AWS DataSync to "catch up" when Task is already …

feluelle commented on a change in pull request #7585: [AIRFLOW-6944] Allow AWS DataSync to "catch up" when Task is already …
URL: https://github.com/apache/airflow/pull/7585#discussion_r395986215
 
 

 ##########
 File path: airflow/providers/amazon/aws/operators/datasync.py
 ##########
 @@ -331,16 +372,52 @@ def _update_datasync_task(self):
         self.log.info("Updated TaskArn %s", self.task_arn)
         return self.task_arn
 
-    def _execute_datasync_task(self):
-        """Create and monitor an AWSDataSync TaskExecution for a Task."""
-        hook = self.get_hook()
+    def _wait_get_status_before_start(
+            self,
+            max_iterations=12 * 180):  # wait_interval_seconds*12*180=180 minutes by default
+        """
+        Wait until the Task can be started.
 
+        The Task can be started when its Status is not in TASK_STATUS_WAIT_BEFORE_START
+        Uses wait_interval_seconds (which is also used while waiting for TaskExecution)
+        So, max_iterations=12*180 gives 180 minutes wait by default.
+        """
+        hook = self.get_hook()
+        task_status = hook.get_task_description(self.task_arn)['Status']
+        iteration = 0
+        while task_status in self.TASK_STATUS_WAIT_BEFORE_START:
+            self.log.info(
+                'Task status is %s.'
+                ' Waiting for it to not be %s.'
+                ' Iteration %s/%s.',
+                task_status,
+                self.TASK_STATUS_WAIT_BEFORE_START,
+                iteration,
+                max_iterations)
+            time.sleep(self.wait_interval_seconds)
+            task_status = hook.get_task_description(self.task_arn)['Status']
+            iteration = iteration + 1
+            if iteration >= max_iterations:
+                break
+
+        return task_status
 
 Review comment:
   ```suggestion
           hook = self.get_hook()
           for iteration in range(max_iterations):
               task_status = hook.get_task_description(self.task_arn)['Status']
               self.log.info(
                   'Task status is %s.'
                   ' Waiting for it to not be %s.'
                   ' Iteration %s/%s.',
                   task_status,
                   self.TASK_STATUS_WAIT_BEFORE_START,
                   iteration,
                   max_iterations)
               if task_status not in self.TASK_STATUS_WAIT_BEFORE_START:
                   break
               time.sleep(self.wait_interval_seconds)
           return task_status
   ```
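   What do you think? A bounded `for` loop over `range(max_iterations)` would replace the hand-maintained `iteration` counter and the duplicated `hook.get_task_description(...)` call, so the status is fetched in exactly one place per iteration.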

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services