You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2020/11/14 08:23:24 UTC
[airflow] branch master updated: Add success/failed sets to State
class (#12359)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new f324973 Add success/failed sets to State class (#12359)
f324973 is described below
commit f32497395a321b1a9aa1f011ab1d3871fe4362d0
Author: Ace Haidrey <ah...@pandora.com>
AuthorDate: Sat Nov 14 00:22:32 2020 -0800
Add success/failed sets to State class (#12359)
Co-authored-by: Ace Haidrey <ah...@pinterest.com>
---
airflow/models/dagrun.py | 8 ++------
airflow/utils/state.py | 10 ++++++++++
tests/models/test_dagrun.py | 18 ++++++++++++++++++
3 files changed, 30 insertions(+), 6 deletions(-)
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index c599631..27c2b84 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -428,9 +428,7 @@ class DagRun(Base, LoggingMixin):
leaf_tis = [ti for ti in tis if ti.task_id in leaf_task_ids]
# if all roots finished and at least one failed, the run failed
- if not unfinished_tasks and any(
- leaf_ti.state in {State.FAILED, State.UPSTREAM_FAILED} for leaf_ti in leaf_tis
- ):
+ if not unfinished_tasks and any(leaf_ti.state in State.failed_states for leaf_ti in leaf_tis):
self.log.error('Marking run %s failed', self)
self.set_state(State.FAILED)
if execute_callbacks:
@@ -445,9 +443,7 @@ class DagRun(Base, LoggingMixin):
)
# if all leafs succeeded and no unfinished tasks, the run succeeded
- elif not unfinished_tasks and all(
- leaf_ti.state in {State.SUCCESS, State.SKIPPED} for leaf_ti in leaf_tis
- ):
+ elif not unfinished_tasks and all(leaf_ti.state in State.success_states for leaf_ti in leaf_tis):
self.log.info('Marking run %s successful', self)
self.set_state(State.SUCCESS)
if execute_callbacks:
diff --git a/airflow/utils/state.py b/airflow/utils/state.py
index 3ead44f..681cbc5 100644
--- a/airflow/utils/state.py
+++ b/airflow/utils/state.py
@@ -134,6 +134,16 @@ class State:
a run or has not even started.
"""
+ failed_states = frozenset([FAILED, UPSTREAM_FAILED])
+ """
+ A list of states indicating that a task or dag is a failed state.
+ """
+
+ success_states = frozenset([SUCCESS, SKIPPED])
+ """
+ A list of states indicating that a task or dag is a success state.
+ """
+
class PokeState:
"""Static class with poke states constants used in smart operator."""
diff --git a/tests/models/test_dagrun.py b/tests/models/test_dagrun.py
index 5f9b8f8..3b012bf 100644
--- a/tests/models/test_dagrun.py
+++ b/tests/models/test_dagrun.py
@@ -770,3 +770,21 @@ class TestDagRun(unittest.TestCase):
stats_mock.assert_called()
sched_delay_stat_call = call(f'dagrun.{dag.dag_id}.first_task_scheduling_delay', true_delay)
self.assertIn(sched_delay_stat_call, stats_mock.mock_calls)
+
+ def test_states_sets(self):
+ """
+ Tests that adding State.failed_states and State.success_states work as expected.
+ """
+ dag = DAG(dag_id='test_dagrun_states', start_date=days_ago(1))
+ dag_task_success = DummyOperator(task_id='dummy', dag=dag)
+ dag_task_failed = DummyOperator(task_id='dummy2', dag=dag)
+
+ initial_task_states = {
+ dag_task_success.task_id: State.SUCCESS,
+ dag_task_failed.task_id: State.FAILED,
+ }
+ dag_run = self.create_dag_run(dag=dag, state=State.RUNNING, task_states=initial_task_states)
+ ti_success = dag_run.get_task_instance(dag_task_success.task_id)
+ ti_failed = dag_run.get_task_instance(dag_task_failed.task_id)
+ self.assertIn(ti_success.state, State.success_states)
+ self.assertIn(ti_failed.state, State.failed_states)