You are viewing a plain-text version of this content. The canonical link to it was a hyperlink ("here") that is not preserved in this plain-text copy.
Posted to commits@airflow.apache.org by cr...@apache.org on 2016/12/14 21:03:22 UTC

incubator-airflow git commit: [AIRFLOW-667] Handle BigQuery 503 error

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 67ab416db -> cac133001


[AIRFLOW-667] Handle BigQuery 503 error

Closes #1938 from krmettu/master


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/cac13300
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/cac13300
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/cac13300

Branch: refs/heads/master
Commit: cac133001b517dd7d66a31288dd375eb63d8cd26
Parents: 67ab416
Author: Krishnaveni Mettu <kr...@move.com>
Authored: Wed Dec 14 13:02:48 2016 -0800
Committer: Chris Riccomini <ch...@wepay.com>
Committed: Wed Dec 14 13:02:56 2016 -0800

----------------------------------------------------------------------
 airflow/contrib/hooks/bigquery_hook.py | 37 +++++++++++++++++++----------
 1 file changed, 24 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/cac13300/airflow/contrib/hooks/bigquery_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/bigquery_hook.py b/airflow/contrib/hooks/bigquery_hook.py
index a0cb71d..900ec12 100644
--- a/airflow/contrib/hooks/bigquery_hook.py
+++ b/airflow/contrib/hooks/bigquery_hook.py
@@ -470,21 +470,32 @@ class BigQueryBaseCursor(object):
             .insert(projectId=self.project_id, body=job_data) \
             .execute()
         job_id = query_reply['jobReference']['jobId']
-        job = jobs.get(projectId=self.project_id, jobId=job_id).execute()
 
         # Wait for query to finish.
-        while not job['status']['state'] == 'DONE':
-            logging.info('Waiting for job to complete: %s, %s', self.project_id, job_id)
-            time.sleep(5)
-            job = jobs.get(projectId=self.project_id, jobId=job_id).execute()
-
-        # Check if job had errors.
-        if 'errorResult' in job['status']:
-            raise Exception(
-                'BigQuery job failed. Final error was: {}. The job was: {}'.format(
-                    job['status']['errorResult'], job
-                )
-            )
+        keep_polling_job = True
+        while (keep_polling_job):
+            try:
+                job = jobs.get(projectId=self.project_id, jobId=job_id).execute()
+                if (job['status']['state'] == 'DONE'):
+                    keep_polling_job = False
+                    # Check if job had errors.
+                    if 'errorResult' in job['status']:
+                        raise Exception(
+                            'BigQuery job failed. Final error was: {}. The job was: {}'.format(
+                                job['status']['errorResult'], job
+                            )
+                        )
+                else:
+                    logging.info('Waiting for job to complete : %s, %s', self.project_id, job_id)
+                    time.sleep(5)
+
+            except HttpError, err:
+                if err.code in [500, 503]:
+                    logging.info('%s: Retryable error, waiting for job to complete: %s', err.code, job_id)
+                    time.sleep(5)
+                else:
+                    raise Exception(
+                'BigQuery job status check faild. Final error was: %s', err.code)
 
         return job_id