You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/03/13 02:49:13 UTC
[09/28] incubator-airflow git commit: [AIRFLOW-897] Prevent dagruns from failing with unfinished tasks
[AIRFLOW-897] Prevent dagruns from failing with unfinished tasks
Closes #2099 from aoen/ddavydov/fix_premature_dagrun_failures
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/c29af466
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/c29af466
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/c29af466
Branch: refs/heads/v1-8-test
Commit: c29af4668a67b5d7f969140549558714fb7b32c9
Parents: ff0fa00
Author: Dan Davydov <da...@airbnb.com>
Authored: Fri Feb 24 14:29:11 2017 -0800
Committer: Bolke de Bruin <bo...@Bolkes-MacBook-Pro.local>
Committed: Sun Mar 12 08:17:40 2017 -0700
----------------------------------------------------------------------
airflow/models.py | 6 +++---
tests/dags/test_issue_1225.py | 13 +++++++++++++
tests/jobs.py | 24 ++++++++++++++++++++++++
3 files changed, 40 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c29af466/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 1829ff3..3fef407 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -3993,12 +3993,12 @@ class DagRun(Base):
# future: remove the check on adhoc tasks (=active_tasks)
if len(tis) == len(dag.active_tasks):
- # if any roots failed, the run failed
root_ids = [t.task_id for t in dag.roots]
roots = [t for t in tis if t.task_id in root_ids]
- if any(r.state in (State.FAILED, State.UPSTREAM_FAILED)
- for r in roots):
+ # if all roots finished and at least one failed, the run failed
+ if (not unfinished_tasks and
+ any(r.state in (State.FAILED, State.UPSTREAM_FAILED) for r in roots)):
logging.info('Marking run {} failed'.format(self))
self.state = State.FAILED
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c29af466/tests/dags/test_issue_1225.py
----------------------------------------------------------------------
diff --git a/tests/dags/test_issue_1225.py b/tests/dags/test_issue_1225.py
index 021561f..d01fd79 100644
--- a/tests/dags/test_issue_1225.py
+++ b/tests/dags/test_issue_1225.py
@@ -129,3 +129,16 @@ dag7_subdag1 = SubDagOperator(
subdag=subdag7)
subdag7_task1.set_downstream(subdag7_task2)
subdag7_task2.set_downstream(subdag7_task3)
+
+# DAG tests that a Dag run that doesn't complete but has a root failure is marked running
+dag8 = DAG(dag_id='test_dagrun_states_root_fail_unfinished', default_args=default_args)
+dag8_task1 = DummyOperator(
+ task_id='test_dagrun_unfinished', # The test will unset the task instance state after
+ # running this test
+ dag=dag8,
+)
+dag8_task2 = PythonOperator(
+ task_id='test_dagrun_fail',
+ dag=dag8,
+ python_callable=fail,
+)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c29af466/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index e520b44..1f7950e 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -358,6 +358,30 @@ class SchedulerJobTest(unittest.TestCase):
},
dagrun_state=State.FAILED)
+ def test_dagrun_root_fail_unfinished(self):
+ """
+ DagRuns with one unfinished and one failed root task -> RUNNING
+ """
+ # Run both the failed and successful tasks
+ scheduler = SchedulerJob(**self.default_scheduler_args)
+ dag_id = 'test_dagrun_states_root_fail_unfinished'
+ dag = self.dagbag.get_dag(dag_id)
+ dag.clear()
+ dr = scheduler.create_dag_run(dag)
+ try:
+ dag.run(start_date=dr.execution_date, end_date=dr.execution_date)
+ except AirflowException: # Expect an exception since there is a failed task
+ pass
+
+ # Mark the successful task as never having run since we want to see if the
+ # dagrun will be in a running state despite having an unfinished task.
+ session = settings.Session()
+ ti = dr.get_task_instance('test_dagrun_unfinished', session=session)
+ ti.state = State.NONE
+ session.commit()
+ dr_state = dr.update_state()
+ self.assertEqual(dr_state, State.RUNNING)
+
def test_dagrun_deadlock_ignore_depends_on_past_advance_ex_date(self):
"""
DagRun is marked a success if ignore_first_depends_on_past=True