You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ma...@apache.org on 2018/03/22 06:54:23 UTC

incubator-airflow git commit: [AIRFLOW-1460] Allow restoration of REMOVED TI's

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 8754cb1c6 -> 8c42d03c4


[AIRFLOW-1460] Allow restoration of REMOVED TI's

When a task instance exists in the database but its corresponding task
no longer exists in the DAG, the scheduler marks the task instance as
REMOVED. Previously, once removed, a task instance stayed removed
forever, even if the task was later added back to the DAG.

This change allows REMOVED task instances to be restored. If a task
instance is in state REMOVED but the corresponding task is present in
the DAG, the task instance is restored by setting its state to NONE.

A new unit test simulates the removal and restoration of a task from a
DAG and verifies that the task instance is restored:
`./run_unit_tests.sh tests.models:DagRunTest`

JIRA:
https://issues.apache.org/jira/browse/AIRFLOW-1460

Closes #3137 from astahlman/airflow-1460-restore-tis


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/8c42d03c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/8c42d03c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/8c42d03c

Branch: refs/heads/master
Commit: 8c42d03c4e35a0046e46f0e2e6db588702ee7e8b
Parents: 8754cb1
Author: Andrew Stahlman <an...@gmail.com>
Authored: Wed Mar 21 23:54:05 2018 -0700
Committer: Maxime Beauchemin <ma...@gmail.com>
Committed: Wed Mar 21 23:54:05 2018 -0700

----------------------------------------------------------------------
 airflow/models.py | 21 ++++++++++++++++++---
 tests/models.py   | 24 ++++++++++++++++++++++++
 2 files changed, 42 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8c42d03c/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index c1b608a..aa10ad5 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -4905,16 +4905,31 @@ class DagRun(Base, LoggingMixin):
         dag = self.get_dag()
         tis = self.get_task_instances(session=session)
 
-        # check for removed tasks
+        # check for removed or restored tasks
         task_ids = []
         for ti in tis:
             task_ids.append(ti.task_id)
+            task = None
             try:
-                dag.get_task(ti.task_id)
+                task = dag.get_task(ti.task_id)
             except AirflowException:
-                if self.state is not State.RUNNING and not dag.partial:
+                if ti.state == State.REMOVED:
+                    pass  # ti has already been removed, just ignore it
+                elif self.state is not State.RUNNING and not dag.partial:
+                    self.log.warning("Failed to get task '{}' for dag '{}'. "
+                                     "Marking it as removed.".format(ti, dag))
+                    Stats.incr(
+                        "task_removed_from_dag.{}".format(dag.dag_id), 1, 1)
                     ti.state = State.REMOVED
 
+            is_task_in_dag = task is not None
+            should_restore_task = is_task_in_dag and ti.state == State.REMOVED
+            if should_restore_task:
+                self.log.info("Restoring task '{}' which was previously "
+                              "removed from DAG '{}'".format(ti, dag))
+                Stats.incr("task_restored_to_dag.{}".format(dag.dag_id), 1, 1)
+                ti.state = State.NONE
+
         # check for missing tasks
         for task in six.itervalues(dag.task_dict):
             if task.adhoc:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8c42d03c/tests/models.py
----------------------------------------------------------------------
diff --git a/tests/models.py b/tests/models.py
index 5d8184c..98913af 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -892,6 +892,30 @@ class DagRunTest(unittest.TestCase):
         self.assertTrue(dagrun.is_backfill)
         self.assertFalse(dagrun2.is_backfill)
 
+    def test_removed_task_instances_can_be_restored(self):
+        def with_all_tasks_removed(dag):
+            return DAG(dag_id=dag.dag_id, start_date=dag.start_date)
+
+        dag = DAG('test_task_restoration', start_date=DEFAULT_DATE)
+        dag.add_task(DummyOperator(task_id='flaky_task', owner='test'))
+
+        dagrun = self.create_dag_run(dag)
+        flaky_ti = dagrun.get_task_instances()[0]
+        self.assertEquals('flaky_task', flaky_ti.task_id)
+        self.assertEquals(State.NONE, flaky_ti.state)
+
+        dagrun.dag = with_all_tasks_removed(dag)
+
+        dagrun.verify_integrity()
+        flaky_ti.refresh_from_db()
+        self.assertEquals(State.REMOVED, flaky_ti.state)
+
+        dagrun.dag.add_task(DummyOperator(task_id='flaky_task', owner='test'))
+
+        dagrun.verify_integrity()
+        flaky_ti.refresh_from_db()
+        self.assertEquals(State.NONE, flaky_ti.state)
+
 class DagBagTest(unittest.TestCase):
 
     def test_get_existing_dag(self):