You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by fo...@apache.org on 2018/01/12 14:55:20 UTC

incubator-airflow git commit: [AIRFLOW-1996] Update DataflowHook waitfordone for Streaming type job[]

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 147472b99 -> a2bb2d70a


[AIRFLOW-1996] Update DataflowHook waitfordone for Streaming type job[]

AIRFLOW-1996 Update DataflowHook waitfordone for
Streaming type job

fix flake8

Closes #2938 from ivanwirawan/AIRFLOW-1996


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/a2bb2d70
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/a2bb2d70
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/a2bb2d70

Branch: refs/heads/master
Commit: a2bb2d70afdaf6a016c60d62a1ddea6d4a442c61
Parents: 147472b
Author: Ivan Wirawan <iv...@gmail.com>
Authored: Fri Jan 12 15:55:14 2018 +0100
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Fri Jan 12 15:55:14 2018 +0100

----------------------------------------------------------------------
 airflow/contrib/hooks/gcp_dataflow_hook.py | 3 +++
 1 file changed, 3 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a2bb2d70/airflow/contrib/hooks/gcp_dataflow_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcp_dataflow_hook.py b/airflow/contrib/hooks/gcp_dataflow_hook.py
index f9970d9..d60b498 100644
--- a/airflow/contrib/hooks/gcp_dataflow_hook.py
+++ b/airflow/contrib/hooks/gcp_dataflow_hook.py
@@ -65,6 +65,9 @@ class _DataflowJob(LoggingMixin):
             if 'currentState' in self._job:
                 if 'JOB_STATE_DONE' == self._job['currentState']:
                     return True
+                elif 'JOB_STATE_RUNNING' == self._job['currentState'] and \
+                     'JOB_TYPE_STREAMING' == self._job['type']:
+                    return True
                 elif 'JOB_STATE_FAILED' == self._job['currentState']:
                     raise Exception("Google Cloud Dataflow job {} has failed.".format(
                         self._job['name']))