You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by jl...@apache.org on 2016/08/12 12:49:17 UTC

incubator-airflow git commit: [AIRFLOW-69] Use dag runs in backfill jobs

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 785dd1901 -> c39e4f6aa


[AIRFLOW-69] Use dag runs in backfill jobs

Backfill jobs create taskinstances without any associated
DagRuns. This creates consistency errors. This patch addresses
this issue and also makes the scheduler backfill aware.

The scheduler makes sure to schedule new dag runs after the
last dag run including backfills. It will not pick up
any tasks that are part of a backfill as those are considered
to be managed by the backfill process. This can be (and should
be) changed when backfill are running in a scheduled fashion.

It doesn't deal with the remaining issue that backfills can be
scheduled on top of existing dag runs and that due to this
TaskInstances can point to multiple DagRuns.

Closes #1667 from bolkedebruin/backfill_dagrun


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/c39e4f6a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/c39e4f6a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/c39e4f6a

Branch: refs/heads/master
Commit: c39e4f6aa45d0f7581410e9c0f42ab56cbf30785
Parents: 785dd19
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Fri Aug 12 08:48:36 2016 -0400
Committer: jlowin <jl...@users.noreply.github.com>
Committed: Fri Aug 12 08:48:36 2016 -0400

----------------------------------------------------------------------
 airflow/jobs.py                    | 376 +++++++++++++++++++-------------
 airflow/models.py                  |  37 ++++
 tests/jobs.py                      |  69 +++++-
 tests/operators/subdag_operator.py |   1 -
 4 files changed, 327 insertions(+), 156 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c39e4f6a/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 9580435..944d847 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -729,6 +729,12 @@ class SchedulerJob(BaseJob):
             else:
                 next_run_date = dag.following_schedule(last_scheduled_run)
 
+            # make sure backfills are also considered
+            last_run = dag.get_last_dagrun(session=session)
+            if last_run and next_run_date:
+                while next_run_date <= last_run.execution_date:
+                    next_run_date = dag.following_schedule(next_run_date)
+
             # don't ever schedule prior to the dag's start_date
             if dag.start_date:
                 next_run_date = (dag.start_date if not next_run_date
@@ -789,6 +795,10 @@ class SchedulerJob(BaseJob):
                                    .format(run.execution_date))
                 continue
 
+            # skip backfill dagruns for now as long as they are not really scheduled
+            if run.is_backfill:
+                continue
+
             # todo: run.dag is transient but needs to be set
             run.dag = dag
             # todo: preferably the integrity check happens at dag collection time
@@ -968,6 +978,11 @@ class SchedulerJob(BaseJob):
                                      .format(task_instance, task_instance.dag_id))
                     continue
 
+                # todo: remove this logic when backfills will be part of the scheduler
+                dag_run = task_instance.get_dagrun()
+                if dag_run and dag_run.is_backfill:
+                    continue
+
                 # Check to make sure that the task concurrency of the DAG hasn't been
                 # reached.
                 dag_id = task_instance.dag_id
@@ -1571,6 +1586,23 @@ class BackfillJob(BaseJob):
         Runs a dag for a specified date range.
         """
         session = settings.Session()
+        DagRun = models.DagRun
+
+        # consider max_active_runs but ignore when running subdags
+        # "parent.child" as a dag_id is by convention a subdag
+        if self.dag.schedule_interval and not "." in self.dag.dag_id:
+            active_runs = DagRun.find(
+                dag_id=self.dag.dag_id,
+                state=State.RUNNING,
+                external_trigger=False,
+                session=session
+            )
+
+            # return if already reached maximum active runs
+            if len(active_runs) >= self.dag.max_active_runs:
+                self.logger.info("Dag {} has reached maximum amount of {} dag runs"
+                                 .format(self.dag.dag_id, self.dag.max_active_runs))
+                return
 
         start_date = self.bf_start_date
         end_date = self.bf_end_date
@@ -1597,177 +1629,225 @@ class BackfillJob(BaseJob):
         not_ready = set()
         deadlocked = set()
 
-        for task in self.dag.tasks:
-            if (not self.include_adhoc) and task.adhoc:
-                continue
+        # create dag runs
+        dr_start_date = start_date or min([t.start_date for t in self.dag.tasks])
+        next_run_date = self.dag.normalize_schedule(dr_start_date)
+        end_date = end_date or datetime.now()
 
-            start_date = start_date or task.start_date
-            end_date = end_date or task.end_date or datetime.now()
-            for dttm in self.dag.date_range(start_date, end_date=end_date):
-                ti = models.TaskInstance(task, dttm)
-                tasks_to_run[ti.key] = ti
-                session.merge(ti)
-        session.commit()
+        active_dag_runs = []
+        while next_run_date and next_run_date <= end_date:
+            run_id = 'backfill_' + next_run_date.isoformat()
+
+            # check if we are scheduling on top of a already existing dag_run
+            # we could find a "scheduled" run instead of a "backfill"
+            run = DagRun.find(dag_id=self.dag.dag_id,
+                              execution_date=next_run_date,
+                              session=session)
+            if not run:
+                run = self.dag.create_dagrun(
+                    run_id=run_id,
+                    execution_date=next_run_date,
+                    start_date=datetime.now(),
+                    state=State.RUNNING,
+                    external_trigger=False,
+                    session=session,
+                )
+            else:
+                run = run[0]
 
-        # Triggering what is ready to get triggered
-        while tasks_to_run and not deadlocked:
-            not_ready.clear()
-            for key, ti in list(tasks_to_run.items()):
-                ti.refresh_from_db(session=session, lock_for_update=True)
-                ignore_depends_on_past = (
-                    self.ignore_first_depends_on_past and
-                    ti.execution_date == (start_date or ti.start_date))
-                # The task was already marked successful or skipped by a
-                # different Job. Don't rerun it.
-                if ti.state == State.SUCCESS:
-                    succeeded.add(key)
-                    tasks_to_run.pop(key)
-                    session.commit()
-                    continue
-                elif ti.state == State.SKIPPED:
-                    skipped.add(key)
-                    tasks_to_run.pop(key)
-                    session.commit()
-                    continue
+            # set required transient field
+            run.dag = self.dag
 
-                # Is the task runnable? -- then run it
-                if ti.is_queueable(
-                        include_queued=True,
-                        ignore_depends_on_past=ignore_depends_on_past,
-                        flag_upstream_failed=True):
-                    self.logger.debug('Sending {} to executor'.format(ti))
-                    if ti.state == State.NONE:
-                        ti.state = State.SCHEDULED
-                        session.merge(ti)
-                    session.commit()
-                    executor.queue_task_instance(
-                        ti,
-                        mark_success=self.mark_success,
-                        pickle_id=pickle_id,
-                        ignore_dependencies=self.ignore_dependencies,
-                        ignore_depends_on_past=ignore_depends_on_past,
-                        pool=self.pool)
-                    started.add(key)
-
-                # Mark the task as not ready to run
-                elif ti.state in (State.NONE, State.UPSTREAM_FAILED):
-                    not_ready.add(key)
+            # explictely mark running as we can fill gaps
+            run.state = State.RUNNING
+            run.verify_integrity(session=session)
 
-                session.commit()
+            # for some reason if we dont refresh the reference to run is lost
+            run.refresh_from_db()
+            make_transient(run)
+            active_dag_runs.append(run)
 
-            self.heartbeat()
-            executor.heartbeat()
-
-            # If the set of tasks that aren't ready ever equals the set of
-            # tasks to run, then the backfill is deadlocked
-            if not_ready and not_ready == set(tasks_to_run):
-                deadlocked.update(tasks_to_run.values())
-                tasks_to_run.clear()
-
-            # Reacting to events
-            for key, state in list(executor.get_event_buffer().items()):
-                dag_id, task_id, execution_date = key
-                if key not in tasks_to_run:
-                    continue
-                ti = tasks_to_run[key]
-                ti.refresh_from_db()
-
-                # executor reports failure
-                if state == State.FAILED:
-
-                    # task reports running
-                    if ti.state == State.RUNNING:
-                        msg = (
-                            'Executor reports that task instance {} failed '
-                            'although the task says it is running.'.format(key))
-                        self.logger.error(msg)
-                        ti.handle_failure(msg)
-                        tasks_to_run.pop(key)
+            next_run_date = self.dag.following_schedule(next_run_date)
 
-                    # task reports skipped
-                    elif ti.state == State.SKIPPED:
-                        self.logger.error("Skipping {} ".format(key))
-                        skipped.add(key)
-                        tasks_to_run.pop(key)
+        run_count = 0
+        for run in active_dag_runs:
+            logging.info("Checking run {}".format(run))
+            run_count = run_count + 1
+            # this needs a fresh session sometimes tis get detached
+            # can be more finegrained (excluding success or skipped)
+            for ti in run.get_task_instances():
+                tasks_to_run[ti.key] = ti
 
-                    # anything else is a failure
-                    else:
-                        self.logger.error("Task instance {} failed".format(key))
-                        failed.add(key)
-                        tasks_to_run.pop(key)
+            # Triggering what is ready to get triggered
+            while tasks_to_run and not deadlocked:
+                not_ready.clear()
 
-                # executor reports success
-                elif state == State.SUCCESS:
+                for key, ti in list(tasks_to_run.items()):
+                    ti.refresh_from_db(session=session, lock_for_update=True)
+                    task = self.dag.get_task(ti.task_id)
+                    ti.task = task
 
-                    # task reports success
+                    ignore_depends_on_past = (
+                        self.ignore_first_depends_on_past and
+                        ti.execution_date == (start_date or ti.start_date))
+                    # The task was already marked successful or skipped by a
+                    # different Job. Don't rerun it.
                     if ti.state == State.SUCCESS:
-                        self.logger.info(
-                            'Task instance {} succeeded'.format(key))
                         succeeded.add(key)
                         tasks_to_run.pop(key)
-
-                    # task reports failure
-                    elif ti.state == State.FAILED:
-                        self.logger.error("Task instance {} failed".format(key))
-                        failed.add(key)
-                        tasks_to_run.pop(key)
-
-                    # task reports skipped
+                        session.commit()
+                        continue
                     elif ti.state == State.SKIPPED:
-                        self.logger.info("Task instance {} skipped".format(key))
                         skipped.add(key)
                         tasks_to_run.pop(key)
+                        session.commit()
+                        continue
+
+                    # Is the task runnable? -- then run it
+                    if ti.is_queueable(
+                            include_queued=True,
+                            ignore_depends_on_past=ignore_depends_on_past,
+                            flag_upstream_failed=True):
+                        self.logger.debug('Sending {} to executor'.format(ti))
+                        if ti.state == State.NONE:
+                            ti.state = State.SCHEDULED
+                            session.merge(ti)
+                        session.commit()
+                        executor.queue_task_instance(
+                            ti,
+                            mark_success=self.mark_success,
+                            pickle_id=pickle_id,
+                            ignore_dependencies=self.ignore_dependencies,
+                            ignore_depends_on_past=ignore_depends_on_past,
+                            pool=self.pool)
+                        started.add(key)
+
+                    # Mark the task as not ready to run
+                    elif ti.state in (State.NONE, State.UPSTREAM_FAILED):
+                        not_ready.add(key)
 
-                    # this probably won't ever be triggered
-                    elif ti in not_ready:
-                        self.logger.info(
-                            "{} wasn't expected to run, but it did".format(ti))
-
-                    # executor reports success but task does not - this is weird
-                    elif ti.state not in (
-                            State.SCHEDULED,
-                            State.QUEUED,
-                            State.UP_FOR_RETRY):
-                        self.logger.error(
-                            "The airflow run command failed "
-                            "at reporting an error. This should not occur "
-                            "in normal circumstances. Task state is '{}',"
-                            "reported state is '{}'. TI is {}"
-                            "".format(ti.state, state, ti))
-
-                        # if the executor fails 3 or more times, stop trying to
-                        # run the task
-                        executor_fails[key] += 1
-                        if executor_fails[key] >= 3:
+                    session.commit()
+
+                self.heartbeat()
+                executor.heartbeat()
+
+                # If the set of tasks that aren't ready ever equals the set of
+                # tasks to run, then the backfill is deadlocked
+                if not_ready and not_ready == set(tasks_to_run):
+                    deadlocked.update(tasks_to_run.values())
+                    tasks_to_run.clear()
+
+                # Reacting to events
+                for key, state in list(executor.get_event_buffer().items()):
+                    if key not in tasks_to_run:
+                        continue
+                    ti = tasks_to_run[key]
+                    ti.refresh_from_db()
+                    logging.info("Executor state: {} task {}".format(state, ti))
+                    # executor reports failure
+                    if state == State.FAILED:
+
+                        # task reports running
+                        if ti.state == State.RUNNING:
                             msg = (
-                                'The airflow run command failed to report an '
-                                'error for task {} three or more times. The '
-                                'task is being marked as failed. This is very '
-                                'unusual and probably means that an error is '
-                                'taking place before the task even '
-                                'starts.'.format(key))
+                                'Executor reports that task instance {} failed '
+                                'although the task says it is running.'.format(key))
                             self.logger.error(msg)
                             ti.handle_failure(msg)
                             tasks_to_run.pop(key)
 
-            msg = ' | '.join([
-                "[backfill progress]",
-                "waiting: {0}",
-                "succeeded: {1}",
-                "kicked_off: {2}",
-                "failed: {3}",
-                "skipped: {4}",
-                "deadlocked: {5}"
-            ]).format(
-                len(tasks_to_run),
-                len(succeeded),
-                len(started),
-                len(failed),
-                len(skipped),
-                len(deadlocked))
-            self.logger.info(msg)
+                        # task reports skipped
+                        elif ti.state == State.SKIPPED:
+                            self.logger.error("Skipping {} ".format(key))
+                            skipped.add(key)
+                            tasks_to_run.pop(key)
+
+                        # anything else is a failure
+                        else:
+                            self.logger.error("Task instance {} failed".format(key))
+                            failed.add(key)
+                            tasks_to_run.pop(key)
+
+                    # executor reports success
+                    elif state == State.SUCCESS:
+
+                        # task reports success
+                        if ti.state == State.SUCCESS:
+                            self.logger.info(
+                                'Task instance {} succeeded'.format(key))
+                            succeeded.add(key)
+                            tasks_to_run.pop(key)
+
+                        # task reports failure
+                        elif ti.state == State.FAILED:
+                            self.logger.error("Task instance {} failed".format(key))
+                            failed.add(key)
+                            tasks_to_run.pop(key)
+
+                        # task reports skipped
+                        elif ti.state == State.SKIPPED:
+                            self.logger.info("Task instance {} skipped".format(key))
+                            skipped.add(key)
+                            tasks_to_run.pop(key)
+
+                        # this probably won't ever be triggered
+                        elif ti in not_ready:
+                            self.logger.info(
+                                "{} wasn't expected to run, but it did".format(ti))
+
+                        # executor reports success but task does not - this is weird
+                        elif ti.state not in (
+                                State.SCHEDULED,
+                                State.QUEUED,
+                                State.UP_FOR_RETRY):
+                            self.logger.error(
+                                "The airflow run command failed "
+                                "at reporting an error. This should not occur "
+                                "in normal circumstances. Task state is '{}',"
+                                "reported state is '{}'. TI is {}"
+                                "".format(ti.state, state, ti))
+
+                            # if the executor fails 3 or more times, stop trying to
+                            # run the task
+                            executor_fails[key] += 1
+                            if executor_fails[key] >= 3:
+                                msg = (
+                                    'The airflow run command failed to report an '
+                                    'error for task {} three or more times. The '
+                                    'task is being marked as failed. This is very '
+                                    'unusual and probably means that an error is '
+                                    'taking place before the task even '
+                                    'starts.'.format(key))
+                                self.logger.error(msg)
+                                ti.handle_failure(msg)
+                                tasks_to_run.pop(key)
+
+                msg = ' | '.join([
+                    "[backfill progress]",
+                    "dag run {6} of {7}",
+                    "tasks waiting: {0}",
+                    "succeeded: {1}",
+                    "kicked_off: {2}",
+                    "failed: {3}",
+                    "skipped: {4}",
+                    "deadlocked: {5}"
+                ]).format(
+                    len(tasks_to_run),
+                    len(succeeded),
+                    len(started),
+                    len(failed),
+                    len(skipped),
+                    len(deadlocked),
+                    run_count,
+                    len(active_dag_runs))
+                self.logger.info(msg)
+
+            # update dag run state
+            run.update_state(session=session)
 
         executor.end()
+
+        session.commit()
         session.close()
 
         err = ''

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c39e4f6a/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index f4d7afd..013e0c5 100644
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -1243,6 +1243,21 @@ class TaskInstance(Base):
         return open_slots <= 0
 
     @provide_session
+    def get_dagrun(self, session):
+        """
+        Returns the DagRun for this TaskInstance
+        :param session:
+        :return: DagRun
+        """
+        dr = session.query(DagRun).filter(
+            DagRun.dag_id==self.dag_id,
+            DagRun.execution_date==self.execution_date,
+            DagRun.start_date==self.start_date
+        ).first()
+
+        return dr
+
+    @provide_session
     def run(
             self,
             verbose=True,
@@ -2757,6 +2772,21 @@ class DAG(BaseDag, LoggingMixin):
 
         return dttm
 
+    @provide_session
+    def get_last_dagrun(self, session=None):
+        """
+        Returns the last dag run for this dag, None if there was none.
+        Last dag run can be any type of run eg. scheduled or backfilled.
+        Overriden DagRuns are ignored
+        """
+        DR = DagRun
+        last = session.query(DR).filter(
+            DR.dag_id == self.dag_id,
+            DR.external_trigger == False
+        ).order_by(DR.execution_date.desc()).first()
+
+        return last
+
     @property
     def dag_id(self):
         return self._dag_id
@@ -3882,6 +3912,13 @@ class DagRun(Base):
         )
         return qry.first()
 
+    @property
+    def is_backfill(self):
+        if "backfill" in self.run_id:
+            return True
+
+        return False
+
 
 class Pool(Base):
     __tablename__ = "slot_pool"

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c39e4f6a/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index 351268a..7a8a57e 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -57,6 +57,64 @@ class BackfillJobTest(unittest.TestCase):
         self.parser = cli.CLIFactory.get_parser()
         self.dagbag = DagBag(include_examples=True)
 
+    @unittest.skipIf('sqlite' in configuration.get('core', 'sql_alchemy_conn'),
+                     "concurrent access not supported in sqlite")
+    def test_trigger_controller_dag(self):
+        dag = self.dagbag.get_dag('example_trigger_controller_dag')
+        target_dag = self.dagbag.get_dag('example_trigger_target_dag')
+        dag.clear()
+        target_dag.clear()
+
+        scheduler = SchedulerJob()
+        queue = mock.Mock()
+        scheduler._process_task_instances(target_dag, queue=queue)
+        self.assertFalse(queue.append.called)
+
+        job = BackfillJob(
+            dag=dag,
+            start_date=DEFAULT_DATE,
+            end_date=DEFAULT_DATE,
+            ignore_first_depends_on_past=True
+        )
+        job.run()
+
+        scheduler = SchedulerJob()
+        queue = mock.Mock()
+        scheduler._process_task_instances(target_dag, queue=queue)
+
+        self.assertTrue(queue.append.called)
+        target_dag.clear()
+        dag.clear()
+
+    @unittest.skipIf('sqlite' in configuration.get('core', 'sql_alchemy_conn'),
+                     "concurrent access not supported in sqlite")
+    def test_backfill_multi_dates(self):
+        dag = self.dagbag.get_dag('example_bash_operator')
+        dag.clear()
+
+        job = BackfillJob(
+            dag=dag,
+            start_date=DEFAULT_DATE,
+            end_date=DEFAULT_DATE+datetime.timedelta(days=1),
+            ignore_first_depends_on_past=True
+        )
+        job.run()
+
+        session = settings.Session()
+        drs = session.query(DagRun).filter(
+            DagRun.dag_id=='example_bash_operator'
+        ).order_by(DagRun.execution_date).all()
+
+        self.assertTrue(drs[0].execution_date == DEFAULT_DATE)
+        self.assertTrue(drs[0].state == State.SUCCESS)
+        self.assertTrue(drs[1].execution_date == DEFAULT_DATE+datetime.timedelta(days=1))
+        self.assertTrue(drs[1].state == State.SUCCESS)
+
+        dag.clear()
+        session.close()
+
+    @unittest.skipIf('sqlite' in configuration.get('core', 'sql_alchemy_conn'),
+                     "concurrent access not supported in sqlite")
     def test_backfill_examples(self):
         """
         Test backfilling example dags
@@ -66,6 +124,9 @@ class BackfillJobTest(unittest.TestCase):
         skip_dags = [
             'example_http_operator',
             'example_twitter_dag',
+            'example_trigger_target_dag',
+            'example_trigger_controller_dag',  # tested above
+            'test_utils',  # sleeps forever
         ]
 
         logger = logging.getLogger('BackfillJobTest.test_backfill_examples')
@@ -121,7 +182,7 @@ class BackfillJobTest(unittest.TestCase):
 
     def test_backfill_depends_on_past(self):
         """
-        Test that backfill resects ignore_depends_on_past
+        Test that backfill respects ignore_depends_on_past
         """
         dag = self.dagbag.get_dag('test_depends_on_past')
         dag.clear()
@@ -226,12 +287,6 @@ class SchedulerJobTest(unittest.TestCase):
         dr = dr[0]
         dr.dag = dag
 
-        # dagrun is running
-        self.assertEqual(dr.state, State.RUNNING)
-
-        dr.update_state()
-
-        # dagrun failed
         self.assertEqual(dr.state, dagrun_state)
 
     def test_dagrun_fail(self):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c39e4f6a/tests/operators/subdag_operator.py
----------------------------------------------------------------------
diff --git a/tests/operators/subdag_operator.py b/tests/operators/subdag_operator.py
index 0a7be23..6a25ac3 100644
--- a/tests/operators/subdag_operator.py
+++ b/tests/operators/subdag_operator.py
@@ -95,5 +95,4 @@ class SubDagOperatorTests(unittest.TestCase):
         self.assertRaisesRegexp(AirflowException, 'deadlocked', subdag.run, start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
 
         # now make sure dag picks up the subdag error
-        subdag.clear()
         self.assertRaises(AirflowException, dag.run, start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)