You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by cr...@apache.org on 2017/09/13 16:32:22 UTC

incubator-airflow git commit: [AIRFLOW-1608] Handle pending job state in GCP Dataflow hook

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 6ac2963fb -> 653562e68


[AIRFLOW-1608] Handle pending job state in GCP Dataflow hook

Closes #2607 from TJBIII/gcp_dataflow_hook


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/653562e6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/653562e6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/653562e6

Branch: refs/heads/master
Commit: 653562e68a638e1e7c9060df347375748388daa9
Parents: 6ac2963
Author: Thomas Buida <th...@gamewisp.com>
Authored: Wed Sep 13 09:32:06 2017 -0700
Committer: Chris Riccomini <cr...@apache.org>
Committed: Wed Sep 13 09:32:06 2017 -0700

----------------------------------------------------------------------
 airflow/contrib/hooks/gcp_dataflow_hook.py | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/653562e6/airflow/contrib/hooks/gcp_dataflow_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcp_dataflow_hook.py b/airflow/contrib/hooks/gcp_dataflow_hook.py
index 66dfb07..457fa37 100644
--- a/airflow/contrib/hooks/gcp_dataflow_hook.py
+++ b/airflow/contrib/hooks/gcp_dataflow_hook.py
@@ -71,6 +71,8 @@ class _DataflowJob(LoggingMixin):
                         self._job['name']))
                 elif 'JOB_STATE_RUNNING' == self._job['currentState']:
                     time.sleep(10)
+                elif 'JOB_STATE_PENDING' == self._job['currentState']:
+                    time.sleep(15)
                 else:
                     self.logger.debug(str(self._job))
                     raise Exception(