You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by cr...@apache.org on 2016/12/14 21:03:22 UTC
incubator-airflow git commit: [AIRFLOW-667] Handle BigQuery 503 error
Repository: incubator-airflow
Updated Branches:
refs/heads/master 67ab416db -> cac133001
[AIRFLOW-667] Handle BigQuery 503 error
Closes #1938 from krmettu/master
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/cac13300
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/cac13300
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/cac13300
Branch: refs/heads/master
Commit: cac133001b517dd7d66a31288dd375eb63d8cd26
Parents: 67ab416
Author: Krishnaveni Mettu <kr...@move.com>
Authored: Wed Dec 14 13:02:48 2016 -0800
Committer: Chris Riccomini <ch...@wepay.com>
Committed: Wed Dec 14 13:02:56 2016 -0800
----------------------------------------------------------------------
airflow/contrib/hooks/bigquery_hook.py | 37 +++++++++++++++++++----------
1 file changed, 24 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/cac13300/airflow/contrib/hooks/bigquery_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/bigquery_hook.py b/airflow/contrib/hooks/bigquery_hook.py
index a0cb71d..900ec12 100644
--- a/airflow/contrib/hooks/bigquery_hook.py
+++ b/airflow/contrib/hooks/bigquery_hook.py
@@ -470,21 +470,32 @@ class BigQueryBaseCursor(object):
.insert(projectId=self.project_id, body=job_data) \
.execute()
job_id = query_reply['jobReference']['jobId']
- job = jobs.get(projectId=self.project_id, jobId=job_id).execute()
# Wait for query to finish.
- while not job['status']['state'] == 'DONE':
- logging.info('Waiting for job to complete: %s, %s', self.project_id, job_id)
- time.sleep(5)
- job = jobs.get(projectId=self.project_id, jobId=job_id).execute()
-
- # Check if job had errors.
- if 'errorResult' in job['status']:
- raise Exception(
- 'BigQuery job failed. Final error was: {}. The job was: {}'.format(
- job['status']['errorResult'], job
- )
- )
+ keep_polling_job = True
+ while (keep_polling_job):
+ try:
+ job = jobs.get(projectId=self.project_id, jobId=job_id).execute()
+ if (job['status']['state'] == 'DONE'):
+ keep_polling_job = False
+ # Check if job had errors.
+ if 'errorResult' in job['status']:
+ raise Exception(
+ 'BigQuery job failed. Final error was: {}. The job was: {}'.format(
+ job['status']['errorResult'], job
+ )
+ )
+ else:
+ logging.info('Waiting for job to complete : %s, %s', self.project_id, job_id)
+ time.sleep(5)
+
+ except HttpError, err:
+ if err.code in [500, 503]:
+ logging.info('%s: Retryable error, waiting for job to complete: %s', err.code, job_id)
+ time.sleep(5)
+ else:
+ raise Exception(
+ 'BigQuery job status check faild. Final error was: %s', err.code)
return job_id