You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by sa...@apache.org on 2017/07/14 23:37:20 UTC
incubator-airflow git commit: [AIRFLOW-1059] Reset orphaned tasks in
batch for scheduler
Repository: incubator-airflow
Updated Branches:
refs/heads/master 28aeed4aa -> e05d3b4df
[AIRFLOW-1059] Reset orphaned tasks in batch for scheduler
The current implementation resets state for tasks
1 dagrun at a time. We
should be able to do this in larger batches, which
will improve
scheduler startup time.
Closes #2205 from saguziel/aguziel-reset-state
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/e05d3b4d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/e05d3b4d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/e05d3b4d
Branch: refs/heads/master
Commit: e05d3b4df0013f0cff804dfbd1db0197f320de48
Parents: 28aeed4
Author: Alex Guziel <al...@airbnb.com>
Authored: Fri Jul 14 16:37:13 2017 -0700
Committer: Alex Guziel <al...@airbnb.com>
Committed: Fri Jul 14 16:37:13 2017 -0700
----------------------------------------------------------------------
airflow/jobs.py | 91 +++++++++++++++-------
tests/jobs.py | 214 +++++++++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 277 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e05d3b4d/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index f8ab1fa..e8431b7 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -35,7 +35,8 @@ import time
from time import sleep
import psutil
-from sqlalchemy import Column, Integer, String, DateTime, func, Index, or_, and_, not_
+from sqlalchemy import (
+ Column, Integer, String, DateTime, func, Index, or_, and_, not_)
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm.session import make_transient
from tabulate import tabulate
@@ -212,27 +213,73 @@ class BaseJob(Base, LoggingMixin):
raise NotImplementedError("This method needs to be overridden")
@provide_session
- def reset_state_for_orphaned_tasks(self, dag_run, session=None):
+ def reset_state_for_orphaned_tasks(self, filter_by_dag_run=None, session=None):
"""
- This function checks for a DagRun if there are any tasks
+ This function checks if there are any tasks in the dagrun (or all)
that have a scheduled state but are not known by the
executor. If it finds those it will reset the state to None
so they will get picked up again.
+ The batch option is for performance reasons as the queries are made in
+ sequence.
+
+ :param filter_by_dag_run: the dag_run we want to process, None if all
+ :type filter_by_dag_run: models.DagRun
+ :return: the TIs reset (in expired SQLAlchemy state)
+ :rtype: List(TaskInstance)
"""
queued_tis = self.executor.queued_tasks
-
# also consider running as the state might not have changed in the db yet
- running = self.executor.running
- tis = list()
- tis.extend(dag_run.get_task_instances(state=State.SCHEDULED, session=session))
- tis.extend(dag_run.get_task_instances(state=State.QUEUED, session=session))
-
- for ti in tis:
- if ti.key not in queued_tis and ti.key not in running:
- self.logger.debug("Rescheduling orphaned task {}".format(ti))
- ti.state = State.NONE
+ running_tis = self.executor.running
+
+ resettable_states = [State.SCHEDULED, State.QUEUED]
+ TI = models.TaskInstance
+ DR = models.DagRun
+ if filter_by_dag_run is None:
+ resettable_tis = (
+ session
+ .query(TI)
+ .join(
+ DR,
+ and_(
+ TI.dag_id == DR.dag_id,
+ TI.execution_date == DR.execution_date))
+ .filter(
+ DR.state == State.RUNNING,
+ DR.external_trigger.is_(False),
+ DR.run_id.notlike(BackfillJob.ID_PREFIX + '%'),
+ TI.state.in_(resettable_states))).all()
+ else:
+ resettable_tis = filter_by_dag_run.get_task_instances(state=resettable_states,
+ session=session)
+ tis_to_reset = []
+ # Can't use an update here since it doesn't support joins
+ for ti in resettable_tis:
+ if ti.key not in queued_tis and ti.key not in running_tis:
+ tis_to_reset.append(ti)
+
+ filter_for_tis = ([and_(TI.dag_id == ti.dag_id,
+ TI.task_id == ti.task_id,
+ TI.execution_date == ti.execution_date)
+ for ti in tis_to_reset])
+ if len(tis_to_reset) == 0:
+ return []
+ reset_tis = (
+ session
+ .query(TI)
+ .filter(or_(*filter_for_tis), TI.state.in_(resettable_states))
+ .with_for_update()
+ .all())
+ for ti in reset_tis:
+ ti.state = State.NONE
+ session.merge(ti)
+ task_instance_str = '\n\t'.join(
+ ["{}".format(x) for x in reset_tis])
session.commit()
+ self.logger.info("Reset the following {} TaskInstances:\n\t{}"
+ .format(len(reset_tis), task_instance_str))
+ return reset_tis
+
class DagFileProcessor(AbstractDagFileProcessor):
"""Helps call SchedulerJob.process_file() in a separate process."""
@@ -1354,19 +1401,8 @@ class SchedulerJob(BaseJob):
self.executor.start()
session = settings.Session()
- self.logger.info("Resetting state for orphaned tasks")
- # grab orphaned tasks and make sure to reset their state
- active_runs = DagRun.find(
- state=State.RUNNING,
- external_trigger=False,
- session=session,
- no_backfills=True,
- )
- for dr in active_runs:
- self.logger.info("Resetting {} {}".format(dr.dag_id,
- dr.execution_date))
- self.reset_state_for_orphaned_tasks(dr, session=session)
-
+ self.logger.info("Resetting orphaned tasks for active dag runs")
+ self.reset_state_for_orphaned_tasks(session=session)
session.close()
execute_start_time = datetime.now()
@@ -1828,8 +1864,7 @@ class BackfillJob(BaseJob):
run.run_id = run_id
run.verify_integrity(session=session)
- # check if we have orphaned tasks
- self.reset_state_for_orphaned_tasks(dag_run=run, session=session)
+ self.reset_state_for_orphaned_tasks(filter_by_dag_run=run, session=session)
# for some reason if we dont refresh the reference to run is lost
run.refresh_from_db()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e05d3b4d/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index 6e6150b..13bd9f5 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -2028,3 +2028,217 @@ class SchedulerJobTest(unittest.TestCase):
for file_path in list_py_file_paths(TEST_DAGS_FOLDER):
detected_files.append(file_path)
self.assertEqual(sorted(detected_files), sorted(expected_files))
+
+ def test_reset_orphaned_tasks_nothing(self):
+ """Try with nothing. """
+ scheduler = SchedulerJob(**self.default_scheduler_args)
+ session = settings.Session()
+ self.assertEqual(0, len(scheduler.reset_state_for_orphaned_tasks(session=session)))
+
+ def test_reset_orphaned_tasks_external_triggered_dag(self):
+ dag_id = 'test_reset_orphaned_tasks_external_triggered_dag'
+ dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, schedule_interval='@daily')
+ task_id = dag_id + '_task'
+ task = DummyOperator(task_id=task_id, dag=dag)
+
+ scheduler = SchedulerJob(**self.default_scheduler_args)
+ session = settings.Session()
+
+ dr1 = scheduler.create_dag_run(dag, session=session)
+ ti = dr1.get_task_instances(session=session)[0]
+ dr1.state = State.RUNNING
+ ti.state = State.SCHEDULED
+ dr1.external_trigger = True
+ session.merge(ti)
+ session.merge(dr1)
+ session.commit()
+
+ self.assertEquals(0, len(scheduler.reset_state_for_orphaned_tasks(session=session)))
+
+ def test_reset_orphaned_tasks_backfill_dag(self):
+ dag_id = 'test_reset_orphaned_tasks_backfill_dag'
+ dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, schedule_interval='@daily')
+ task_id = dag_id + '_task'
+ task = DummyOperator(task_id=task_id, dag=dag)
+
+ scheduler = SchedulerJob(**self.default_scheduler_args)
+ session = settings.Session()
+
+ dr1 = scheduler.create_dag_run(dag, session=session)
+ ti = dr1.get_task_instances(session=session)[0]
+ ti.state = State.SCHEDULED
+ dr1.state = State.RUNNING
+ dr1.run_id = BackfillJob.ID_PREFIX + '_sdfsfdfsd'
+ session.merge(ti)
+ session.merge(dr1)
+ session.commit()
+
+ self.assertTrue(dr1.is_backfill)
+ self.assertEquals(0, len(scheduler.reset_state_for_orphaned_tasks(session=session)))
+
+ def test_reset_orphaned_tasks_specified_dagrun(self):
+ """Try to reset when we specify a dagrun and ensure nothing else is."""
+ dag_id = 'test_reset_orphaned_tasks_specified_dagrun'
+ dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, schedule_interval='@daily')
+ task_id = dag_id + '_task'
+ task = DummyOperator(task_id=task_id, dag=dag)
+
+ scheduler = SchedulerJob(**self.default_scheduler_args)
+ session = settings.Session()
+ # make two dagruns, only reset for one
+ dr1 = scheduler.create_dag_run(dag)
+ dr2 = scheduler.create_dag_run(dag)
+ dr1.state = State.SUCCESS
+ dr2.state = State.RUNNING
+ ti1 = dr1.get_task_instances(session=session)[0]
+ ti2 = dr2.get_task_instances(session=session)[0]
+ ti1.state = State.SCHEDULED
+ ti2.state = State.SCHEDULED
+
+ session.merge(ti1)
+ session.merge(ti2)
+ session.merge(dr1)
+ session.merge(dr2)
+ session.commit()
+
+ reset_tis = scheduler.reset_state_for_orphaned_tasks(filter_by_dag_run=dr2, session=session)
+ self.assertEquals(1, len(reset_tis))
+ ti1.refresh_from_db(session=session)
+ ti2.refresh_from_db(session=session)
+ self.assertEquals(State.SCHEDULED, ti1.state)
+ self.assertEquals(State.NONE, ti2.state)
+
+ def test_reset_orphaned_tasks_nonexistent_dagrun(self):
+ """Make sure a task in an orphaned state is not reset if it has no dagrun. """
+ dag_id = 'test_reset_orphaned_tasks_nonexistent_dagrun'
+ dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, schedule_interval='@daily')
+ task_id = dag_id + '_task'
+ task = DummyOperator(task_id=task_id, dag=dag)
+
+ scheduler = SchedulerJob(**self.default_scheduler_args)
+ session = settings.Session()
+
+ ti = models.TaskInstance(task=task, execution_date=DEFAULT_DATE)
+ session.add(ti)
+ session.commit()
+
+ ti.refresh_from_db()
+ ti.state = State.SCHEDULED
+ session.merge(ti)
+ session.commit()
+
+ self.assertEquals(0, len(scheduler.reset_state_for_orphaned_tasks(session=session)))
+
+ def test_reset_orphaned_tasks_no_orphans(self):
+ dag_id = 'test_reset_orphaned_tasks_no_orphans'
+ dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, schedule_interval='@daily')
+ task_id = dag_id + '_task'
+ task = DummyOperator(task_id=task_id, dag=dag)
+
+ scheduler = SchedulerJob(**self.default_scheduler_args)
+ session = settings.Session()
+
+ dr1 = scheduler.create_dag_run(dag)
+ dr1.state = State.RUNNING
+ tis = dr1.get_task_instances(session=session)
+ tis[0].state = State.RUNNING
+ session.merge(dr1)
+ session.merge(tis[0])
+ session.commit()
+
+ self.assertEquals(0, len(scheduler.reset_state_for_orphaned_tasks(session=session)))
+ tis[0].refresh_from_db()
+ self.assertEquals(State.RUNNING, tis[0].state)
+
+ def test_reset_orphaned_tasks_non_running_dagruns(self):
+ """Ensure orphaned tasks with non-running dagruns are not reset."""
+ dag_id = 'test_reset_orphaned_tasks_non_running_dagruns'
+ dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, schedule_interval='@daily')
+ task_id = dag_id + '_task'
+ task = DummyOperator(task_id=task_id, dag=dag)
+
+ scheduler = SchedulerJob(**self.default_scheduler_args)
+ session = settings.Session()
+
+ dr1 = scheduler.create_dag_run(dag)
+ dr1.state = State.SUCCESS
+ tis = dr1.get_task_instances(session=session)
+ self.assertEquals(1, len(tis))
+ tis[0].state = State.SCHEDULED
+ session.merge(dr1)
+ session.merge(tis[0])
+ session.commit()
+
+ self.assertEquals(0, len(scheduler.reset_state_for_orphaned_tasks(session=session)))
+
+ def test_reset_orphaned_tasks_with_orphans(self):
+ """Create dagruns and ensure only ones with correct states are reset."""
+ prefix = 'scheduler_job_test_test_reset_orphaned_tasks'
+ states = [State.QUEUED, State.SCHEDULED, State.NONE, State.RUNNING, State.SUCCESS]
+ states_to_reset = [State.QUEUED, State.SCHEDULED, State.NONE]
+
+ dag = DAG(dag_id=prefix,
+ start_date=DEFAULT_DATE,
+ schedule_interval="@daily")
+ tasks = []
+ for i in range(len(states)):
+ task_id = "{}_task_{}".format(prefix, i)
+ task = DummyOperator(task_id=task_id, dag=dag)
+ tasks.append(task)
+
+ scheduler = SchedulerJob(**self.default_scheduler_args)
+
+ session = settings.Session()
+
+ # create dagruns
+ dr1 = scheduler.create_dag_run(dag)
+ dr2 = scheduler.create_dag_run(dag)
+ dr1.state = State.RUNNING
+ dr2.state = State.SUCCESS
+ session.merge(dr1)
+ session.merge(dr2)
+ session.commit()
+
+ # create taskinstances and set states
+ dr1_tis = []
+ dr2_tis = []
+ for i, (task, state) in enumerate(zip(tasks, states)):
+ ti1 = TI(task, dr1.execution_date)
+ ti2 = TI(task, dr2.execution_date)
+ ti1.refresh_from_db()
+ ti2.refresh_from_db()
+ ti1.state = state
+ ti2.state = state
+ dr1_tis.append(ti1)
+ dr2_tis.append(ti2)
+ session.merge(ti1)
+ session.merge(ti2)
+ session.commit()
+
+ self.assertEqual(2, len(scheduler.reset_state_for_orphaned_tasks(session=session)))
+
+ for ti in dr1_tis + dr2_tis:
+ ti.refresh_from_db()
+
+ # running dagrun should be reset
+ for state, ti in zip(states, dr1_tis):
+ if state in states_to_reset:
+ self.assertIsNone(ti.state)
+ else:
+ self.assertEqual(state, ti.state)
+
+ # otherwise not
+ for state, ti in zip(states, dr2_tis):
+ self.assertEqual(state, ti.state)
+
+ for state, ti in zip(states, dr1_tis):
+ ti.state = state
+ session.commit()
+
+ scheduler.reset_state_for_orphaned_tasks(filter_by_dag_run=dr1, session=session)
+
+ # check same for dag_run version
+ for state, ti in zip(states, dr2_tis):
+ self.assertEqual(state, ti.state)
+
+ session.close()