You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2019/12/11 12:52:26 UTC

[GitHub] [airflow] ashb commented on a change in pull request #6765: [AIRFLOW-5889] Fix polling for AWS Batch job status

ashb commented on a change in pull request #6765: [AIRFLOW-5889] Fix polling for AWS Batch job status
URL: https://github.com/apache/airflow/pull/6765#discussion_r356579840
 
 

 ##########
 File path: airflow/contrib/operators/awsbatch_operator.py
 ##########
 @@ -156,32 +179,68 @@ def _wait_for_task_ended(self):
             waiter.config.max_attempts = sys.maxsize  # timeout is managed by airflow
             waiter.wait(jobs=[self.jobId])
         except ValueError:
-            # If waiter not available use expo
+            self._poll_for_task_ended()
 
-            # Allow a batch job some time to spin up.  A random interval
-            # decreases the chances of exceeding an AWS API throttle
-            # limit when there are many concurrent tasks.
-            pause = randint(5, 30)
+    def _poll_for_task_ended(self):
+        """
+        Poll for task status using a exponential backoff
 
-            retries = 1
-            while retries <= self.max_retries:
-                self.log.info('AWS Batch job (%s) status check (%d of %d) in the next %.2f seconds',
-                              self.jobId, retries, self.max_retries, pause)
-                sleep(pause)
+            * docs.aws.amazon.com/general/latest/gr/api-retries.html
+        """
+        # Allow a batch job some time to spin up.  A random interval
+        # decreases the chances of exceeding an AWS API throttle
+        # limit when there are many concurrent tasks.
+        pause = randint(5, 30)
+
+        retries = 1
+        while retries <= self.max_retries:
+            self.log.info(
+                'AWS Batch job (%s) status check (%d of %d) in the next %.2f seconds',
+                self.jobId,
+                retries,
+                self.max_retries,
+                pause,
+            )
+            sleep(pause)
+
+            response = self._get_job_status()
+            status = response['jobs'][-1]['status']  # check last job status
+            self.log.info('AWS Batch job (%s) status: %s', self.jobId, status)
+
+            # jobStatus: 'SUBMITTED'|'PENDING'|'RUNNABLE'|'STARTING'|'RUNNING'|'SUCCEEDED'|'FAILED'
+            if status in ['SUCCEEDED', 'FAILED']:
+                break
+
+            retries += 1
+            pause = 1 + pow(retries * 0.3, 2)
+
+    def _get_job_status(self) -> Optional[dict]:
+        """
+        Get job description
 
+            * https://docs.aws.amazon.com/batch/latest/APIReference/API_DescribeJobs.html
+        """
+        tries = 0
+        while tries <= 10:
+            tries += 1
+            try:
                 response = self.client.describe_jobs(jobs=[self.jobId])
-                status = response['jobs'][-1]['status']
-                self.log.info('AWS Batch job (%s) status: %s', self.jobId, status)
-                if status in ['SUCCEEDED', 'FAILED']:
-                    break
-
-                retries += 1
-                pause = 1 + pow(retries * 0.3, 2)
+                if response and response.get('jobs'):
+                    return response
+            except botocore.exceptions.ClientError as err:
+                response = err.response
+                self.log.info('Failed to get job status: ', response)
+                if response:
+                    if response.get('Error', {}).get('Code') == 'TooManyRequestsException':
+                        self.log.info('Continue for TooManyRequestsException')
+                        sleep(randint(1, 10))  # avoid excess requests with a random pause
+                        continue
+
+        self.log.error('Failed to get job status: ', self.jobId)
 
 Review comment:
  I don't think we need to both log and raise here -- the exception will ultimately produce the same log message, so logging it first would just duplicate it.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services