Posted to commits@airflow.apache.org by sa...@apache.org on 2017/07/14 23:37:20 UTC

incubator-airflow git commit: [AIRFLOW-1059] Reset orphaned tasks in batch for scheduler

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 28aeed4aa -> e05d3b4df


[AIRFLOW-1059] Reset orphaned tasks in batch for scheduler

The current implementation resets state for tasks one DagRun at a
time. We should be able to do this in larger batches, which will
improve scheduler startup time.

Closes #2205 from saguziel/aguziel-reset-state
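
For context, the gist of the change: instead of looping over active dag
runs and querying each one's task instances, a single joined query now
collects every resettable task instance across all running,
scheduler-created dag runs. Below is a minimal, runnable sketch of that
batched query, using simplified stand-ins for Airflow's TaskInstance and
DagRun models (columns trimmed; the real query also excludes externally
triggered and backfill runs):

    from sqlalchemy import create_engine, Column, String, DateTime, and_
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy.orm import sessionmaker

    Base = declarative_base()

    class DagRun(Base):
        __tablename__ = 'dag_run'
        dag_id = Column(String(250), primary_key=True)
        execution_date = Column(DateTime, primary_key=True)
        state = Column(String(50))

    class TaskInstance(Base):
        __tablename__ = 'task_instance'
        dag_id = Column(String(250), primary_key=True)
        task_id = Column(String(250), primary_key=True)
        execution_date = Column(DateTime, primary_key=True)
        state = Column(String(50))

    engine = create_engine('sqlite://')
    Base.metadata.create_all(engine)
    session = sessionmaker(bind=engine)()

    resettable_states = ['scheduled', 'queued']
    # One round trip covering every dag run, instead of two queries
    # (one per resettable state) per dag run as before.
    resettable_tis = (
        session.query(TaskInstance)
        .join(DagRun, and_(TaskInstance.dag_id == DagRun.dag_id,
                           TaskInstance.execution_date == DagRun.execution_date))
        .filter(DagRun.state == 'running',
                TaskInstance.state.in_(resettable_states))
        .all())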


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/e05d3b4d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/e05d3b4d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/e05d3b4d

Branch: refs/heads/master
Commit: e05d3b4df0013f0cff804dfbd1db0197f320de48
Parents: 28aeed4
Author: Alex Guziel <al...@airbnb.com>
Authored: Fri Jul 14 16:37:13 2017 -0700
Committer: Alex Guziel <al...@airbnb.com>
Committed: Fri Jul 14 16:37:13 2017 -0700

----------------------------------------------------------------------
 airflow/jobs.py |  91 +++++++++++++++-------
 tests/jobs.py   | 214 +++++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 277 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e05d3b4d/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index f8ab1fa..e8431b7 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -35,7 +35,8 @@ import time
 from time import sleep
 
 import psutil
-from sqlalchemy import Column, Integer, String, DateTime, func, Index, or_, and_, not_
+from sqlalchemy import (
+    Column, Integer, String, DateTime, func, Index, or_, and_, not_)
 from sqlalchemy.exc import OperationalError
 from sqlalchemy.orm.session import make_transient
 from tabulate import tabulate
@@ -212,27 +213,73 @@ class BaseJob(Base, LoggingMixin):
         raise NotImplementedError("This method needs to be overridden")
 
     @provide_session
-    def reset_state_for_orphaned_tasks(self, dag_run, session=None):
+    def reset_state_for_orphaned_tasks(self, filter_by_dag_run=None, session=None):
         """
-        This function checks for a DagRun if there are any tasks
+        This function checks if there are any tasks in the given dag run (or all dag runs)
         that have a scheduled state but are not known by the
         executor. If it finds those it will reset the state to None
         so they will get picked up again.
+        The batch option (no dag run filter) exists for performance: resetting
+        one dag run at a time issues its queries in sequence.
+
+        :param filter_by_dag_run: the dag_run we want to process, None if all
+        :type filter_by_dag_run: models.DagRun
+        :return: the TIs reset (in expired SQLAlchemy state)
+        :rtype: List(TaskInstance)
         """
         queued_tis = self.executor.queued_tasks
-
         # also consider running as the state might not have changed in the db yet
-        running = self.executor.running
-        tis = list()
-        tis.extend(dag_run.get_task_instances(state=State.SCHEDULED, session=session))
-        tis.extend(dag_run.get_task_instances(state=State.QUEUED, session=session))
-
-        for ti in tis:
-            if ti.key not in queued_tis and ti.key not in running:
-                self.logger.debug("Rescheduling orphaned task {}".format(ti))
-                ti.state = State.NONE
+        running_tis = self.executor.running
+
+        resettable_states = [State.SCHEDULED, State.QUEUED]
+        TI = models.TaskInstance
+        DR = models.DagRun
+        if filter_by_dag_run is None:
+            resettable_tis = (
+                session
+                .query(TI)
+                .join(
+                    DR,
+                    and_(
+                        TI.dag_id == DR.dag_id,
+                        TI.execution_date == DR.execution_date))
+                .filter(
+                    DR.state == State.RUNNING,
+                    DR.external_trigger.is_(False),
+                    DR.run_id.notlike(BackfillJob.ID_PREFIX + '%'),
+                    TI.state.in_(resettable_states))).all()
+        else:
+            resettable_tis = filter_by_dag_run.get_task_instances(state=resettable_states,
+                                                                  session=session)
+        tis_to_reset = []
+        # Can't use an update here since it doesn't support joins
+        for ti in resettable_tis:
+            if ti.key not in queued_tis and ti.key not in running_tis:
+                tis_to_reset.append(ti)
+
+        filter_for_tis = ([and_(TI.dag_id == ti.dag_id,
+                                TI.task_id == ti.task_id,
+                                TI.execution_date == ti.execution_date)
+                           for ti in tis_to_reset])
+        if len(tis_to_reset) == 0:
+            return []
+        reset_tis = (
+            session
+            .query(TI)
+            .filter(or_(*filter_for_tis), TI.state.in_(resettable_states))
+            .with_for_update()
+            .all())
+        for ti in reset_tis:
+            ti.state = State.NONE
+            session.merge(ti)
+        task_instance_str = '\n\t'.join(
+            ["{}".format(x) for x in reset_tis])
         session.commit()
 
+        self.logger.info("Reset the following {} TaskInstances:\n\t{}"
+                         .format(len(reset_tis), task_instance_str))
+        return reset_tis
+
 
 class DagFileProcessor(AbstractDagFileProcessor):
     """Helps call SchedulerJob.process_file() in a separate process."""
@@ -1354,19 +1401,8 @@ class SchedulerJob(BaseJob):
         self.executor.start()
 
         session = settings.Session()
-        self.logger.info("Resetting state for orphaned tasks")
-        # grab orphaned tasks and make sure to reset their state
-        active_runs = DagRun.find(
-            state=State.RUNNING,
-            external_trigger=False,
-            session=session,
-            no_backfills=True,
-        )
-        for dr in active_runs:
-            self.logger.info("Resetting {} {}".format(dr.dag_id,
-                                                      dr.execution_date))
-            self.reset_state_for_orphaned_tasks(dr, session=session)
-
+        self.logger.info("Resetting orphaned tasks for active dag runs")
+        self.reset_state_for_orphaned_tasks(session=session)
         session.close()
 
         execute_start_time = datetime.now()
@@ -1828,8 +1864,7 @@ class BackfillJob(BaseJob):
             run.run_id = run_id
             run.verify_integrity(session=session)
 
-            # check if we have orphaned tasks
-            self.reset_state_for_orphaned_tasks(dag_run=run, session=session)
+            self.reset_state_for_orphaned_tasks(filter_by_dag_run=run, session=session)
 
             # for some reason if we dont refresh the reference to run is lost
             run.refresh_from_db()
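
A note on the reset step in the hunk above: a portable UPDATE cannot
join against dag_run (hence the "Can't use an update here" comment), so
the patch re-selects the candidate rows by their composite key (an OR
of per-row AND clauses), takes row locks with with_for_update(), and
re-checks the state before setting it to None; a task that moved on
between the two queries is left alone. A short sketch of that pattern,
reusing the simplified models from the sketch above (tis_to_reset stands
for the task instances that passed the executor check):

    from sqlalchemy import and_, or_

    # The patch early-returns when tis_to_reset is empty, so or_()
    # always receives at least one clause here.
    filter_for_tis = [and_(TaskInstance.dag_id == ti.dag_id,
                           TaskInstance.task_id == ti.task_id,
                           TaskInstance.execution_date == ti.execution_date)
                      for ti in tis_to_reset]
    reset_tis = (
        session.query(TaskInstance)
        .filter(or_(*filter_for_tis),
                TaskInstance.state.in_(resettable_states))
        .with_for_update()  # emits SELECT ... FOR UPDATE; a no-op on SQLite
        .all())
    for ti in reset_tis:
        ti.state = None
    session.commit()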

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e05d3b4d/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index 6e6150b..13bd9f5 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -2028,3 +2028,217 @@ class SchedulerJobTest(unittest.TestCase):
         for file_path in list_py_file_paths(TEST_DAGS_FOLDER):
             detected_files.append(file_path)
         self.assertEqual(sorted(detected_files), sorted(expected_files))
+
+    def test_reset_orphaned_tasks_nothing(self):
+        """Try with nothing."""
+        scheduler = SchedulerJob(**self.default_scheduler_args)
+        session = settings.Session()
+        self.assertEqual(0, len(scheduler.reset_state_for_orphaned_tasks(session=session)))
+
+    def test_reset_orphaned_tasks_external_triggered_dag(self):
+        dag_id = 'test_reset_orphaned_tasks_external_triggered_dag'
+        dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, schedule_interval='@daily')
+        task_id = dag_id + '_task'
+        task = DummyOperator(task_id=task_id, dag=dag)
+
+        scheduler = SchedulerJob(**self.default_scheduler_args)
+        session = settings.Session()
+
+        dr1 = scheduler.create_dag_run(dag, session=session)
+        ti = dr1.get_task_instances(session=session)[0]
+        dr1.state = State.RUNNING
+        ti.state = State.SCHEDULED
+        dr1.external_trigger = True
+        session.merge(ti)
+        session.merge(dr1)
+        session.commit()
+
+        self.assertEquals(0, len(scheduler.reset_state_for_orphaned_tasks(session=session)))
+
+    def test_reset_orphaned_tasks_backfill_dag(self):
+        dag_id = 'test_reset_orphaned_tasks_backfill_dag'
+        dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, schedule_interval='@daily')
+        task_id = dag_id + '_task'
+        task = DummyOperator(task_id=task_id, dag=dag)
+
+        scheduler = SchedulerJob(**self.default_scheduler_args)
+        session = settings.Session()
+
+        dr1 = scheduler.create_dag_run(dag, session=session)
+        ti = dr1.get_task_instances(session=session)[0]
+        ti.state = State.SCHEDULED
+        dr1.state = State.RUNNING
+        dr1.run_id = BackfillJob.ID_PREFIX + '_sdfsfdfsd'
+        session.merge(ti)
+        session.merge(dr1)
+        session.commit()
+
+        self.assertTrue(dr1.is_backfill)
+        self.assertEquals(0, len(scheduler.reset_state_for_orphaned_tasks(session=session)))
+
+    def test_reset_orphaned_tasks_specified_dagrun(self):
+        """Try to reset when we specify a dagrun and ensure nothing else is reset."""
+        dag_id = 'test_reset_orphaned_tasks_specified_dagrun'
+        dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, schedule_interval='@daily')
+        task_id = dag_id + '_task'
+        task = DummyOperator(task_id=task_id, dag=dag)
+
+        scheduler = SchedulerJob(**self.default_scheduler_args)
+        session = settings.Session()
+        # make two dagruns, only reset for one
+        dr1 = scheduler.create_dag_run(dag)
+        dr2 = scheduler.create_dag_run(dag)
+        dr1.state = State.SUCCESS
+        dr2.state = State.RUNNING
+        ti1 = dr1.get_task_instances(session=session)[0]
+        ti2 = dr2.get_task_instances(session=session)[0]
+        ti1.state = State.SCHEDULED
+        ti2.state = State.SCHEDULED
+
+        session.merge(ti1)
+        session.merge(ti2)
+        session.merge(dr1)
+        session.merge(dr2)
+        session.commit()
+
+        reset_tis = scheduler.reset_state_for_orphaned_tasks(filter_by_dag_run=dr2, session=session)
+        self.assertEquals(1, len(reset_tis))
+        ti1.refresh_from_db(session=session)
+        ti2.refresh_from_db(session=session)
+        self.assertEquals(State.SCHEDULED, ti1.state)
+        self.assertEquals(State.NONE, ti2.state)
+
+    def test_reset_orphaned_tasks_nonexistent_dagrun(self):
+        """Make sure a task in an orphaned state is not reset if it has no dagrun."""
+        dag_id = 'test_reset_orphaned_tasks_nonexistent_dagrun'
+        dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, schedule_interval='@daily')
+        task_id = dag_id + '_task'
+        task = DummyOperator(task_id=task_id, dag=dag)
+
+        scheduler = SchedulerJob(**self.default_scheduler_args)
+        session = settings.Session()
+
+        ti = models.TaskInstance(task=task, execution_date=DEFAULT_DATE)
+        session.add(ti)
+        session.commit()
+
+        ti.refresh_from_db()
+        ti.state = State.SCHEDULED
+        session.merge(ti)
+        session.commit()
+
+        self.assertEquals(0, len(scheduler.reset_state_for_orphaned_tasks(session=session)))
+
+    def test_reset_orphaned_tasks_no_orphans(self):
+        dag_id = 'test_reset_orphaned_tasks_no_orphans'
+        dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, schedule_interval='@daily')
+        task_id = dag_id + '_task'
+        task = DummyOperator(task_id=task_id, dag=dag)
+
+        scheduler = SchedulerJob(**self.default_scheduler_args)
+        session = settings.Session()
+
+        dr1 = scheduler.create_dag_run(dag)
+        dr1.state = State.RUNNING
+        tis = dr1.get_task_instances(session=session)
+        tis[0].state = State.RUNNING
+        session.merge(dr1)
+        session.merge(tis[0])
+        session.commit()
+
+        self.assertEquals(0, len(scheduler.reset_state_for_orphaned_tasks(session=session)))
+        tis[0].refresh_from_db()
+        self.assertEquals(State.RUNNING, tis[0].state)
+
+    def test_reset_orphaned_tasks_non_running_dagruns(self):
+        """Ensure orphaned tasks with non-running dagruns are not reset."""
+        dag_id = 'test_reset_orphaned_tasks_non_running_dagruns'
+        dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, schedule_interval='@daily')
+        task_id = dag_id + '_task'
+        task = DummyOperator(task_id=task_id, dag=dag)
+
+        scheduler = SchedulerJob(**self.default_scheduler_args)
+        session = settings.Session()
+
+        dr1 = scheduler.create_dag_run(dag)
+        dr1.state = State.SUCCESS
+        tis = dr1.get_task_instances(session=session)
+        self.assertEquals(1, len(tis))
+        tis[0].state = State.SCHEDULED
+        session.merge(dr1)
+        session.merge(tis[0])
+        session.commit()
+
+        self.assertEquals(0, len(scheduler.reset_state_for_orphaned_tasks(session=session)))
+
+    def test_reset_orphaned_tasks_with_orphans(self):
+        """Create dagruns and ensure only ones with correct states are reset."""
+        prefix = 'scheduler_job_test_test_reset_orphaned_tasks'
+        states = [State.QUEUED, State.SCHEDULED, State.NONE, State.RUNNING, State.SUCCESS]
+        states_to_reset = [State.QUEUED, State.SCHEDULED, State.NONE]
+
+        dag = DAG(dag_id=prefix,
+                  start_date=DEFAULT_DATE,
+                  schedule_interval="@daily")
+        tasks = []
+        for i in range(len(states)):
+            task_id = "{}_task_{}".format(prefix, i)
+            task = DummyOperator(task_id=task_id, dag=dag)
+            tasks.append(task)
+
+        scheduler = SchedulerJob(**self.default_scheduler_args)
+
+        session = settings.Session()
+
+        # create dagruns
+        dr1 = scheduler.create_dag_run(dag)
+        dr2 = scheduler.create_dag_run(dag)
+        dr1.state = State.RUNNING
+        dr2.state = State.SUCCESS
+        session.merge(dr1)
+        session.merge(dr2)
+        session.commit()
+
+        # create taskinstances and set states
+        dr1_tis = []
+        dr2_tis = []
+        for i, (task, state) in enumerate(zip(tasks, states)):
+            ti1 = TI(task, dr1.execution_date)
+            ti2 = TI(task, dr2.execution_date)
+            ti1.refresh_from_db()
+            ti2.refresh_from_db()
+            ti1.state = state
+            ti2.state = state
+            dr1_tis.append(ti1)
+            dr2_tis.append(ti2)
+            session.merge(ti1)
+            session.merge(ti2)
+            session.commit()
+
+        self.assertEqual(2, len(scheduler.reset_state_for_orphaned_tasks(session=session)))
+
+        for ti in dr1_tis + dr2_tis:
+            ti.refresh_from_db()
+
+        # running dagrun should be reset
+        for state, ti in zip(states, dr1_tis):
+            if state in states_to_reset:
+                self.assertIsNone(ti.state)
+            else:
+                self.assertEqual(state, ti.state)
+
+        # otherwise not
+        for state, ti in zip(states, dr2_tis):
+            self.assertEqual(state, ti.state)
+
+        for state, ti in zip(states, dr1_tis):
+            ti.state = state
+        session.commit()
+
+        scheduler.reset_state_for_orphaned_tasks(filter_by_dag_run=dr1, session=session)
+
+        # check same for dag_run version
+        for state, ti in zip(states, dr2_tis):
+            self.assertEqual(state, ti.state)
+
+        session.close()