You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by tu...@apache.org on 2020/10/08 10:51:59 UTC
[airflow] branch v1-10-test updated: Handle no Dagrun in
DagrunIdDep (#8389) (#11343)
This is an automated email from the ASF dual-hosted git repository.
turbaszek pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v1-10-test by this push:
new a425105 Handle no Dagrun in DagrunIdDep (#8389) (#11343)
a425105 is described below
commit a42510575f2f727b1b5fe961055e1d115a2a85c1
Author: Tomek Urbaszek <tu...@gmail.com>
AuthorDate: Thu Oct 8 12:50:30 2020 +0200
Handle no Dagrun in DagrunIdDep (#8389) (#11343)
Co-authored-by: Ben Nadler <jb...@gmail.com>
---
airflow/ti_deps/deps/dagrun_id_dep.py | 4 ++--
tests/ti_deps/deps/test_dagrun_id_dep.py | 7 +++++++
2 files changed, 9 insertions(+), 2 deletions(-)
diff --git a/airflow/ti_deps/deps/dagrun_id_dep.py b/airflow/ti_deps/deps/dagrun_id_dep.py
index 641fe84..56bba33 100644
--- a/airflow/ti_deps/deps/dagrun_id_dep.py
+++ b/airflow/ti_deps/deps/dagrun_id_dep.py
@@ -48,9 +48,9 @@ class DagrunIdDep(BaseTIDep):
from airflow.jobs import BackfillJob # To avoid a circular dependency
dagrun = ti.get_dagrun(session)
- if not dagrun.run_id or not match(BackfillJob.ID_PREFIX + '.*', dagrun.run_id):
+ if not dagrun or not dagrun.run_id or not match(BackfillJob.ID_PREFIX + '.*', dagrun.run_id):
yield self._passing_status(
- reason="Task's DagRun run_id is either NULL "
+ reason="Task's DagRun doesn't exist or the run_id is either NULL "
"or doesn't start with {}".format(BackfillJob.ID_PREFIX))
else:
yield self._failing_status(
diff --git a/tests/ti_deps/deps/test_dagrun_id_dep.py b/tests/ti_deps/deps/test_dagrun_id_dep.py
index 56f15fb..23db761 100644
--- a/tests/ti_deps/deps/test_dagrun_id_dep.py
+++ b/tests/ti_deps/deps/test_dagrun_id_dep.py
@@ -49,3 +49,10 @@ class DagrunRunningDepTest(unittest.TestCase):
dagrun.run_id = None
ti = Mock(get_dagrun=Mock(return_value=dagrun))
self.assertTrue(DagrunIdDep().is_met(ti=ti))
+
+ def test_dagrun_is_none(self):
+ """
+ Task instances which don't yet have an associated dagrun.
+ """
+ ti = Mock(get_dagrun=Mock(return_value=None))
+ self.assertTrue(DagrunIdDep().is_met(ti=ti))