You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by fo...@apache.org on 2018/01/12 14:55:20 UTC
incubator-airflow git commit: [AIRFLOW-1996] Update DataflowHook
waitfordone for Streaming type job[]
Repository: incubator-airflow
Updated Branches:
refs/heads/master 147472b99 -> a2bb2d70a
[AIRFLOW-1996] Update DataflowHook waitfordone for Streaming type job[]
AIRFLOW-1996 Update DataflowHook waitfordone for
Streaming type job
fix flake8
Closes #2938 from ivanwirawan/AIRFLOW-1996
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/a2bb2d70
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/a2bb2d70
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/a2bb2d70
Branch: refs/heads/master
Commit: a2bb2d70afdaf6a016c60d62a1ddea6d4a442c61
Parents: 147472b
Author: Ivan Wirawan <iv...@gmail.com>
Authored: Fri Jan 12 15:55:14 2018 +0100
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Fri Jan 12 15:55:14 2018 +0100
----------------------------------------------------------------------
airflow/contrib/hooks/gcp_dataflow_hook.py | 3 +++
1 file changed, 3 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a2bb2d70/airflow/contrib/hooks/gcp_dataflow_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcp_dataflow_hook.py b/airflow/contrib/hooks/gcp_dataflow_hook.py
index f9970d9..d60b498 100644
--- a/airflow/contrib/hooks/gcp_dataflow_hook.py
+++ b/airflow/contrib/hooks/gcp_dataflow_hook.py
@@ -65,6 +65,9 @@ class _DataflowJob(LoggingMixin):
if 'currentState' in self._job:
if 'JOB_STATE_DONE' == self._job['currentState']:
return True
+ elif 'JOB_STATE_RUNNING' == self._job['currentState'] and \
+ 'JOB_TYPE_STREAMING' == self._job['type']:
+ return True
elif 'JOB_STATE_FAILED' == self._job['currentState']:
raise Exception("Google Cloud Dataflow job {} has failed.".format(
self._job['name']))