You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2020/11/14 08:23:24 UTC

[airflow] branch master updated: Add success/failed sets to State class (#12359)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new f324973  Add success/failed sets to State class (#12359)
f324973 is described below

commit f32497395a321b1a9aa1f011ab1d3871fe4362d0
Author: Ace Haidrey <ah...@pandora.com>
AuthorDate: Sat Nov 14 00:22:32 2020 -0800

    Add success/failed sets to State class (#12359)
    
    Co-authored-by: Ace Haidrey <ah...@pinterest.com>
---
 airflow/models/dagrun.py    |  8 ++------
 airflow/utils/state.py      | 10 ++++++++++
 tests/models/test_dagrun.py | 18 ++++++++++++++++++
 3 files changed, 30 insertions(+), 6 deletions(-)

diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index c599631..27c2b84 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -428,9 +428,7 @@ class DagRun(Base, LoggingMixin):
         leaf_tis = [ti for ti in tis if ti.task_id in leaf_task_ids]
 
         # if all roots finished and at least one failed, the run failed
-        if not unfinished_tasks and any(
-            leaf_ti.state in {State.FAILED, State.UPSTREAM_FAILED} for leaf_ti in leaf_tis
-        ):
+        if not unfinished_tasks and any(leaf_ti.state in State.failed_states for leaf_ti in leaf_tis):
             self.log.error('Marking run %s failed', self)
             self.set_state(State.FAILED)
             if execute_callbacks:
@@ -445,9 +443,7 @@ class DagRun(Base, LoggingMixin):
                 )
 
         # if all leafs succeeded and no unfinished tasks, the run succeeded
-        elif not unfinished_tasks and all(
-            leaf_ti.state in {State.SUCCESS, State.SKIPPED} for leaf_ti in leaf_tis
-        ):
+        elif not unfinished_tasks and all(leaf_ti.state in State.success_states for leaf_ti in leaf_tis):
             self.log.info('Marking run %s successful', self)
             self.set_state(State.SUCCESS)
             if execute_callbacks:
diff --git a/airflow/utils/state.py b/airflow/utils/state.py
index 3ead44f..681cbc5 100644
--- a/airflow/utils/state.py
+++ b/airflow/utils/state.py
@@ -134,6 +134,16 @@ class State:
     a run or has not even started.
     """
 
+    failed_states = frozenset([FAILED, UPSTREAM_FAILED])
+    """
+    A list of states indicating that a task or dag is a failed state.
+    """
+
+    success_states = frozenset([SUCCESS, SKIPPED])
+    """
+    A list of states indicating that a task or dag is a success state.
+    """
+
 
 class PokeState:
     """Static class with poke states constants used in smart operator."""
diff --git a/tests/models/test_dagrun.py b/tests/models/test_dagrun.py
index 5f9b8f8..3b012bf 100644
--- a/tests/models/test_dagrun.py
+++ b/tests/models/test_dagrun.py
@@ -770,3 +770,21 @@ class TestDagRun(unittest.TestCase):
         stats_mock.assert_called()
         sched_delay_stat_call = call(f'dagrun.{dag.dag_id}.first_task_scheduling_delay', true_delay)
         self.assertIn(sched_delay_stat_call, stats_mock.mock_calls)
+
+    def test_states_sets(self):
+        """
+        Tests that adding State.failed_states and State.success_states work as expected.
+        """
+        dag = DAG(dag_id='test_dagrun_states', start_date=days_ago(1))
+        dag_task_success = DummyOperator(task_id='dummy', dag=dag)
+        dag_task_failed = DummyOperator(task_id='dummy2', dag=dag)
+
+        initial_task_states = {
+            dag_task_success.task_id: State.SUCCESS,
+            dag_task_failed.task_id: State.FAILED,
+        }
+        dag_run = self.create_dag_run(dag=dag, state=State.RUNNING, task_states=initial_task_states)
+        ti_success = dag_run.get_task_instance(dag_task_success.task_id)
+        ti_failed = dag_run.get_task_instance(dag_task_failed.task_id)
+        self.assertIn(ti_success.state, State.success_states)
+        self.assertIn(ti_failed.state, State.failed_states)