Posted to commits@airflow.apache.org by da...@apache.org on 2017/07/28 22:50:31 UTC

incubator-airflow git commit: [AIRFLOW-1349] Refactor BackfillJob _execute

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 322ec9609 -> 547f8184b


[AIRFLOW-1349] Refactor BackfillJob _execute

BackfillJob._execute is doing multiple things, which makes it pretty hard
to follow and maintain.

The changes included are just a re-org of the code; no logic has been
changed.

Refactor includes:
- Break BackfillJob._execute into functions
- Add a status object (_DagRunTaskStatus) to track the BackfillJob's
  internal status while executing the job (see the sketch below).
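
As a rough illustration of the second point (not taken from the commit
itself, and using hypothetical names): the helpers share one mutable status
object instead of threading several counter collections through every call.

    # Hypothetical, simplified sketch of the status-object pattern.
    class _Status(object):
        def __init__(self):
            self.to_run = {}        # key -> task instance waiting to run
            self.started = {}       # key -> task instance handed to the executor
            self.succeeded = set()  # keys of task instances that finished OK

    class _FakeTI(object):
        # stand-in for a task instance; only carries a state
        def __init__(self, state):
            self.state = state

    def _update_counters(status):
        # each helper reads and mutates the shared status object in place
        for key, ti in list(status.started.items()):
            if ti.state == 'success':  # stand-in for State.SUCCESS
                status.succeeded.add(key)
                status.started.pop(key)

    status = _Status()
    status.started[('dag', 'task_a')] = _FakeTI('success')
    _update_counters(status)
    assert ('dag', 'task_a') in status.succeeded and not status.started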

Closes #2463 from edgarRd/erod-backfill-refactor


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/547f8184
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/547f8184
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/547f8184

Branch: refs/heads/master
Commit: 547f8184be1e0b3f902faf354fa39579d2f08af2
Parents: 322ec96
Author: Edgar Rodriguez <ed...@airbnb.com>
Authored: Fri Jul 28 15:49:50 2017 -0700
Committer: Dan Davydov <da...@airbnb.com>
Committed: Fri Jul 28 15:50:24 2017 -0700

----------------------------------------------------------------------
 airflow/jobs.py   | 515 ++++++++++++++++++++++++++++++++-----------------
 airflow/models.py |  31 +++
 tests/jobs.py     | 189 +++++++++++++-----
 3 files changed, 501 insertions(+), 234 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/547f8184/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index e2f8c94..668973e 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -1817,6 +1817,62 @@ class BackfillJob(BaseJob):
         'polymorphic_identity': 'BackfillJob'
     }
 
+    class _DagRunTaskStatus(object):
+        """
+        Internal status of the backfill job. This class is intended to be
+        instantiated only within a BackfillJob instance and tracks the execution
+        of tasks, e.g. started, skipped, succeeded, failed, etc. Information about
+        the dag runs related to the backfill job, e.g. finished runs, is also
+        tracked in this structure. Any other status information related to the
+        execution of dag runs / tasks can be included in this structure, since
+        that makes it easier to pass around.
+        """
+        # TODO(edgarRd): AIRFLOW-1444: Add consistency check on counts
+        def __init__(self,
+                     to_run=None,
+                     started=None,
+                     skipped=None,
+                     succeeded=None,
+                     failed=None,
+                     not_ready=None,
+                     deadlocked=None,
+                     active_runs=None,
+                     finished_runs=0,
+                     total_runs=0,
+                     ):
+            """
+            :param to_run: Tasks to run in the backfill
+            :type to_run: dict
+            :param started: Maps started task instance key to task instance object
+            :type started: dict
+            :param skipped: Tasks that have been skipped
+            :type skipped: set
+            :param succeeded: Tasks that have succeeded so far
+            :type succeeded: set
+            :param failed: Tasks that have failed
+            :type failed: set
+            :param not_ready: Tasks not ready for execution
+            :type not_ready: set
+            :param deadlocked: Deadlocked tasks
+            :type deadlocked: set
+            :param active_runs: Active dag runs at a certain point in time
+            :type active_runs: list
+            :param finished_runs: Number of finished runs so far
+            :type finished_runs: int
+            :param total_runs: Total number of dag runs eligible to run
+            :type total_runs: int
+            """
+            self.to_run = to_run or dict()
+            self.started = started or dict()
+            self.skipped = skipped or set()
+            self.succeeded = succeeded or set()
+            self.failed = failed or set()
+            self.not_ready = not_ready or set()
+            self.deadlocked = deadlocked or set()
+            self.active_runs = active_runs or list()
+            self.finished_runs = finished_runs
+            self.total_runs = total_runs
+
     def __init__(
             self,
             dag,
@@ -1841,41 +1897,38 @@ class BackfillJob(BaseJob):
         self.pool = pool
         super(BackfillJob, self).__init__(*args, **kwargs)
 
-    def _update_counters(self, started, succeeded, skipped, failed, tasks_to_run):
+    def _update_counters(self, ti_status):
         """
         Updates the counters per state of the tasks that were running. Tasks can
         be re-added to the set of tasks to run if required.
-        :param started:
-        :param succeeded:
-        :param skipped:
-        :param failed:
-        :param tasks_to_run:
+        :param ti_status: the internal status of the backfill job tasks
+        :type ti_status: BackfillJob._DagRunTaskStatus
         """
-        for key, ti in list(started.items()):
+        for key, ti in list(ti_status.started.items()):
             ti.refresh_from_db()
             if ti.state == State.SUCCESS:
-                succeeded.add(key)
+                ti_status.succeeded.add(key)
                 self.logger.debug("Task instance {} succeeded. "
                                   "Don't rerun.".format(ti))
-                started.pop(key)
+                ti_status.started.pop(key)
                 continue
             elif ti.state == State.SKIPPED:
-                skipped.add(key)
+                ti_status.skipped.add(key)
                 self.logger.debug("Task instance {} skipped. "
                                   "Don't rerun.".format(ti))
-                started.pop(key)
+                ti_status.started.pop(key)
                 continue
             elif ti.state == State.FAILED:
                 self.logger.error("Task instance {} failed".format(ti))
-                failed.add(key)
-                started.pop(key)
+                ti_status.failed.add(key)
+                ti_status.started.pop(key)
                 continue
             # special case: if the task needs to run again put it back
             elif ti.state == State.UP_FOR_RETRY:
                 self.logger.warning("Task instance {} is up for retry"
                                     .format(ti))
-                started.pop(key)
-                tasks_to_run[key] = ti
+                ti_status.started.pop(key)
+                ti_status.to_run[key] = ti
             # special case: The state of the task can be set to NONE by the task itself
             # when it reaches concurrency limits. It could also happen when the state
             # is changed externally, e.g. by clearing tasks from the ui. We need to cover
@@ -1888,8 +1941,8 @@ class BackfillJob(BaseJob):
                 session = settings.Session()
                 ti.set_state(State.SCHEDULED, session=session)
                 session.close()
-                started.pop(key)
-                tasks_to_run[key] = ti
+                ti_status.started.pop(key)
+                ti_status.to_run[key] = ti
 
     def _manage_executor_state(self, started):
         """
@@ -1918,119 +1971,141 @@ class BackfillJob(BaseJob):
                     self.logger.error(msg)
                     ti.handle_failure(msg)
 
-    def _execute(self):
+    @provide_session
+    def _get_dag_run(self, run_date, session=None):
         """
-        Runs a dag for a specified date range.
+        Returns a dag run for the given run date: an existing dag run is matched
+        if available; otherwise a new dag run is created.
+        :param run_date: the execution date for the dag run
+        :type run_date: datetime
+        :param session: the database session object
+        :type session: Session
+        :return: the dag run for the run date
         """
-        session = settings.Session()
-        DagRun = models.DagRun
-
-        # consider max_active_runs but ignore when running subdags
-        # "parent.child" as a dag_id is by convention a subdag
-        if self.dag.schedule_interval and not self.dag.is_subdag:
-            active_runs = DagRun.find(
-                dag_id=self.dag.dag_id,
+        run_id = BackfillJob.ID_FORMAT_PREFIX.format(run_date.isoformat())
+
+        # check if we are scheduling on top of an already existing dag_run
+        # we could find a "scheduled" run instead of a "backfill"
+        run = DagRun.find(dag_id=self.dag.dag_id,
+                          execution_date=run_date,
+                          session=session)
+
+        if run is None or len(run) == 0:
+            run = self.dag.create_dagrun(
+                run_id=run_id,
+                execution_date=run_date,
+                start_date=datetime.now(),
                 state=State.RUNNING,
                 external_trigger=False,
                 session=session
             )
+        else:
+            run = run[0]
 
-            # return if already reached maximum active runs
-            if len(active_runs) >= self.dag.max_active_runs:
-                self.logger.info("Dag {} has reached maximum amount of {} dag runs"
-                                 .format(self.dag.dag_id, self.dag.max_active_runs))
-                return
-
-        start_date = self.bf_start_date
-        end_date = self.bf_end_date
-
-        # picklin'
-        pickle_id = None
-        if not self.donot_pickle and self.executor.__class__ not in (
-                executors.LocalExecutor, executors.SequentialExecutor):
-            pickle = models.DagPickle(self.dag)
-            session.add(pickle)
-            session.commit()
-            pickle_id = pickle.id
+        # set required transient field
+        run.dag = self.dag
 
-        executor = self.executor
-        executor.start()
+        # explicitly mark as backfill and running
+        run.state = State.RUNNING
+        run.run_id = run_id
+        run.verify_integrity(session=session)
+        return run
 
-        # Build a list of all instances to run
+    @provide_session
+    def _task_instances_for_dag_run(self, dag_run, session=None):
+        """
+        Returns a map of task instance key to task instance object for the tasks to
+        run in the given dag run.
+        :param dag_run: the dag run to get the tasks from
+        :type dag_run: models.DagRun
+        :param session: the database session object
+        :type session: Session
+        """
         tasks_to_run = {}
-        failed = set()
-        succeeded = set()
-        started = {}
-        skipped = set()
-        not_ready = set()
-        deadlocked = set()
-
-        # create dag runs
-        dr_start_date = start_date or min([t.start_date for t in self.dag.tasks])
-        end_date = end_date or datetime.now()
-        # next run date for a subdag isn't relevant (schedule_interval for subdags
-        # is ignored) so we use the dag run's start date in the case of a subdag
-        next_run_date = (self.dag.normalize_schedule(dr_start_date)
-                         if not self.dag.is_subdag else dr_start_date)
-
-        active_dag_runs = []
-        while next_run_date and next_run_date <= end_date:
-            run_id = BackfillJob.ID_FORMAT_PREFIX.format(next_run_date.isoformat())
-
-            # check if we are scheduling on top of a already existing dag_run
-            # we could find a "scheduled" run instead of a "backfill"
-            run = DagRun.find(dag_id=self.dag.dag_id,
-                              execution_date=next_run_date,
-                              session=session)
-            if not run:
-                run = self.dag.create_dagrun(
-                    run_id=run_id,
-                    execution_date=next_run_date,
-                    start_date=datetime.now(),
-                    state=State.RUNNING,
-                    external_trigger=False,
-                    session=session,
-                )
-            else:
-                run = run[0]
 
-            # set required transient field
-            run.dag = self.dag
-
-            # explictely mark running as we can fill gaps
-            run.state = State.RUNNING
-            run.run_id = run_id
-            run.verify_integrity(session=session)
+        if dag_run is None:
+            return tasks_to_run
 
-            self.reset_state_for_orphaned_tasks(filter_by_dag_run=run, session=session)
+        # check if we have orphaned tasks
+        self.reset_state_for_orphaned_tasks(filter_by_dag_run=dag_run, session=session)
 
-            # for some reason if we dont refresh the reference to run is lost
-            run.refresh_from_db()
-            make_transient(run)
-            active_dag_runs.append(run)
+        # for some reason, if we don't refresh, the reference to the run is lost
+        dag_run.refresh_from_db()
+        make_transient(dag_run)
 
-            for ti in run.get_task_instances():
-                # all tasks part of the backfill are scheduled to run
-                if ti.state == State.NONE:
-                    ti.set_state(State.SCHEDULED, session=session)
-                tasks_to_run[ti.key] = ti
+        # TODO(edgarRd): AIRFLOW-1464 change to batch query to improve perf
+        for ti in dag_run.get_task_instances():
+            # all tasks part of the backfill are scheduled to run
+            if ti.state == State.NONE:
+                ti.set_state(State.SCHEDULED, session=session)
+            tasks_to_run[ti.key] = ti
+
+        return tasks_to_run
+
+    def _log_progress(self, ti_status):
+        msg = ' | '.join([
+            "[backfill progress]",
+            "finished run {0} of {1}",
+            "tasks waiting: {2}",
+            "succeeded: {3}",
+            "kicked_off: {4}",
+            "failed: {5}",
+            "skipped: {6}",
+            "deadlocked: {7}",
+            "not ready: {8}"
+        ]).format(
+            ti_status.finished_runs,
+            ti_status.total_runs,
+            len(ti_status.to_run),
+            len(ti_status.succeeded),
+            len(ti_status.started),
+            len(ti_status.failed),
+            len(ti_status.skipped),
+            len(ti_status.deadlocked),
+            len(ti_status.not_ready))
+        self.logger.info(msg)
+
+        self.logger.debug("Finished dag run loop iteration. "
+                          "Remaining tasks {}"
+                          .format(ti_status.to_run.values()))
 
-            next_run_date = self.dag.following_schedule(next_run_date)
+    @provide_session
+    def _process_backfill_task_instances(self,
+                                         ti_status,
+                                         executor,
+                                         pickle_id,
+                                         start_date=None, session=None):
+        """
+        Process a set of task instances from a set of dag runs. Special handling is done
+        to account for different task instance states that could be present when running
+        them in a backfill process.
+        :param ti_status: the internal status of the job
+        :type ti_status: BackfillJob._DagRunTaskStatus
+        :param executor: the executor to run the task instances
+        :type executor: BaseExecutor
+        :param pickle_id: the pickle_id if dag is pickled, None otherwise
+        :type pickle_id: int
+        :param start_date: the start date of the backfill job
+        :type start_date: datetime
+        :param session: the current session object
+        :type session: Session
+        :return: the list of execution_dates for the finished dag runs
+        :rtype: list
+        """
 
-        finished_runs = 0
-        total_runs = len(active_dag_runs)
+        executed_run_dates = []
 
-        # Triggering what is ready to get triggered
-        while (len(tasks_to_run) > 0 or len(started) > 0) and not deadlocked:
+        while ((len(ti_status.to_run) > 0 or len(ti_status.started) > 0) and
+                len(ti_status.deadlocked) == 0):
             self.logger.debug("*** Clearing out not_ready list ***")
-            not_ready.clear()
+            ti_status.not_ready.clear()
 
             # we need to execute the tasks bottom to top
             # or leaf to root, as otherwise tasks might be
             # determined deadlocked while they are actually
             # waiting for their upstream to finish
             for task in self.dag.topological_sort():
-                for key, ti in list(tasks_to_run.items()):
+                for key, ti in list(ti_status.to_run.items()):
                     if task.task_id != ti.task_id:
                         continue
 
@@ -2055,35 +2130,36 @@ class BackfillJob(BaseJob):
                     # The task was already marked successful or skipped by a
                     # different Job. Don't rerun it.
                     if ti.state == State.SUCCESS:
-                        succeeded.add(key)
+                        ti_status.succeeded.add(key)
                         self.logger.debug("Task instance {} succeeded. "
                                           "Don't rerun.".format(ti))
-                        tasks_to_run.pop(key)
-                        if key in started:
-                            started.pop(key)
+                        ti_status.to_run.pop(key)
+                        if key in ti_status.started:
+                            ti_status.started.pop(key)
                         continue
                     elif ti.state == State.SKIPPED:
-                        skipped.add(key)
+                        ti_status.skipped.add(key)
                         self.logger.debug("Task instance {} skipped. "
                                           "Don't rerun.".format(ti))
-                        tasks_to_run.pop(key)
-                        if key in started:
-                            started.pop(key)
+                        ti_status.to_run.pop(key)
+                        if key in ti_status.started:
+                            ti_status.started.pop(key)
                         continue
                     elif ti.state == State.FAILED:
                         self.logger.error("Task instance {} failed".format(ti))
-                        failed.add(key)
-                        tasks_to_run.pop(key)
-                        if key in started:
-                            started.pop(key)
+                        ti_status.failed.add(key)
+                        ti_status.to_run.pop(key)
+                        if key in ti_status.started:
+                            ti_status.started.pop(key)
                         continue
                     elif ti.state == State.UPSTREAM_FAILED:
                         self.logger.error("Task instance {} upstream failed".format(ti))
-                        failed.add(key)
-                        tasks_to_run.pop(key)
-                        if key in started:
-                            started.pop(key)
+                        ti_status.failed.add(key)
+                        ti_status.to_run.pop(key)
+                        if key in ti_status.started:
+                            ti_status.started.pop(key)
                         continue
+
                     backfill_context = DepContext(
                         deps=RUN_DEPS,
                         ignore_depends_on_past=ignore_depends_on_past,
@@ -2113,31 +2189,31 @@ class BackfillJob(BaseJob):
                                     ignore_task_deps=self.ignore_task_deps,
                                     ignore_depends_on_past=ignore_depends_on_past,
                                     pool=self.pool)
-                                started[key] = ti
-                                tasks_to_run.pop(key)
+                                ti_status.started[key] = ti
+                                ti_status.to_run.pop(key)
                         session.commit()
                         continue
 
                     if ti.state == State.UPSTREAM_FAILED:
                         self.logger.error("Task instance {} upstream failed".format(ti))
-                        failed.add(key)
-                        tasks_to_run.pop(key)
-                        if key in started:
-                            started.pop(key)
+                        ti_status.failed.add(key)
+                        ti_status.to_run.pop(key)
+                        if key in ti_status.started:
+                            ti_status.started.pop(key)
                         continue
 
                     # special case
                     if ti.state == State.UP_FOR_RETRY:
                         self.logger.debug("Task instance {} retry period not expired yet"
                                           .format(ti))
-                        if key in started:
-                            started.pop(key)
-                        tasks_to_run[key] = ti
+                        if key in ti_status.started:
+                            ti_status.started.pop(key)
+                        ti_status.to_run[key] = ti
                         continue
 
                     # all remaining tasks
                     self.logger.debug('Adding {} to not_ready'.format(ti))
-                    not_ready.add(key)
+                    ti_status.not_ready.add(key)
 
             # execute the tasks in the queue
             self.heartbeat()
@@ -2146,68 +2222,45 @@ class BackfillJob(BaseJob):
             # If the set of tasks that aren't ready ever equals the set of
             # tasks to run and there are no running tasks then the backfill
             # is deadlocked
-            if not_ready and not_ready == set(tasks_to_run) and len(started) == 0:
-                self.logger.warning("Deadlock discovered for tasks_to_run={}"
-                                    .format(tasks_to_run.values()))
-                deadlocked.update(tasks_to_run.values())
-                tasks_to_run.clear()
+            if (ti_status.not_ready and
+                    ti_status.not_ready == set(ti_status.to_run) and
+                    len(ti_status.started) == 0):
+                self.logger.warning("Deadlock discovered for ti_status.to_run={}"
+                                    .format(ti_status.to_run.values()))
+                ti_status.deadlocked.update(ti_status.to_run.values())
+                ti_status.to_run.clear()
 
             # check executor state
-            self._manage_executor_state(started)
+            self._manage_executor_state(ti_status.started)
 
             # update the task counters
-            self._update_counters(started=started, succeeded=succeeded,
-                                  skipped=skipped, failed=failed,
-                                  tasks_to_run=tasks_to_run)
+            self._update_counters(ti_status=ti_status)
 
             # update dag run state
-            _dag_runs = active_dag_runs[:]
+            _dag_runs = ti_status.active_runs[:]
             for run in _dag_runs:
                 run.update_state(session=session)
                 if run.state in State.finished():
-                    finished_runs += 1
-                    active_dag_runs.remove(run)
+                    ti_status.finished_runs += 1
+                    ti_status.active_runs.remove(run)
+                    executed_run_dates.append(run.execution_date)
 
                 if run.dag.is_paused:
                     models.DagStat.update([run.dag_id], session=session)
 
-            msg = ' | '.join([
-                "[backfill progress]",
-                "finished run {0} of {1}",
-                "tasks waiting: {2}",
-                "succeeded: {3}",
-                "kicked_off: {4}",
-                "failed: {5}",
-                "skipped: {6}",
-                "deadlocked: {7}",
-                "not ready: {8}"
-            ]).format(
-                finished_runs,
-                total_runs,
-                len(tasks_to_run),
-                len(succeeded),
-                len(started),
-                len(failed),
-                len(skipped),
-                len(deadlocked),
-                len(not_ready))
-            self.logger.info(msg)
-
-            self.logger.debug("Finished dag run loop iteration. "
-                              "Remaining tasks {}"
-                              .format(tasks_to_run.values()))
+            self._log_progress(ti_status)
 
-        executor.end()
-
-        session.commit()
-        session.close()
+        # return the list of execution dates of the finished dag runs
+        return executed_run_dates
 
+    @provide_session
+    def _collect_errors(self, ti_status, session=None):
         err = ''
-        if failed:
+        if ti_status.failed:
             err += (
                 "---------------------------------------------------\n"
-                "Some task instances failed:\n{}\n".format(failed))
-        if deadlocked:
+                "Some task instances failed:\n{}\n".format(ti_status.failed))
+        if ti_status.deadlocked:
             err += (
                 '---------------------------------------------------\n'
                 'BackfillJob is deadlocked.')
@@ -2220,7 +2273,7 @@ class BackfillJob(BaseJob):
                     dep_context=DepContext(ignore_depends_on_past=True),
                     session=session,
                     verbose=True)
-                for t in deadlocked)
+                for t in ti_status.deadlocked)
             if deadlocked_depends_on_past:
                 err += (
                     'Some of the deadlocked tasks were unable to run because '
@@ -2228,11 +2281,109 @@ class BackfillJob(BaseJob):
                     'backfill with the option '
                     '"ignore_first_depends_on_past=True" or passing "-I" at '
                     'the command line.')
-            err += ' These tasks have succeeded:\n{}\n'.format(succeeded)
-            err += ' These tasks have started:\n{}\n'.format(started)
-            err += ' These tasks have failed:\n{}\n'.format(failed)
-            err += ' These tasks are skipped:\n{}\n'.format(skipped)
-            err += ' These tasks are deadlocked:\n{}\n'.format(deadlocked)
+            err += ' These tasks have succeeded:\n{}\n'.format(ti_status.succeeded)
+            err += ' These tasks have started:\n{}\n'.format(ti_status.started)
+            err += ' These tasks have failed:\n{}\n'.format(ti_status.failed)
+            err += ' These tasks are skipped:\n{}\n'.format(ti_status.skipped)
+            err += ' These tasks are deadlocked:\n{}\n'.format(ti_status.deadlocked)
+
+        return err
+
+    @provide_session
+    def _execute_for_run_dates(self, run_dates, ti_status, executor, pickle_id,
+                               start_date, session=None):
+        """
+        Computes the dag runs and their respective task instances for
+        the given run dates and executes the task instances.
+        Returns a list of execution dates of the dag runs that were executed.
+        :param run_dates: Execution dates for dag runs
+        :type run_dates: list
+        :param ti_status: internal BackfillJob status structure to track task instance progress
+        :type ti_status: BackfillJob._DagRunTaskStatus
+        :param executor: the executor to use; it must be started beforehand
+        :type executor: BaseExecutor
+        :param pickle_id: numeric id of the pickled dag, None if not pickled
+        :type pickle_id: int
+        :param start_date: backfill start date
+        :type start_date: datetime
+        :param session: the current session object
+        :type session: Session
+        :return: list of execution dates of the dag runs that were executed.
+        :rtype: list
+        """
+        for next_run_date in run_dates:
+            dag_run = self._get_dag_run(next_run_date, session=session)
+            tis_map = self._task_instances_for_dag_run(dag_run,
+                                                       session=session)
+            ti_status.active_runs.append(dag_run)
+            ti_status.to_run.update(tis_map or {})
+
+        ti_status.total_runs = len(ti_status.active_runs)
+
+        return self._process_backfill_task_instances(ti_status=ti_status,
+                                                     executor=executor,
+                                                     pickle_id=pickle_id,
+                                                     start_date=start_date,
+                                                     session=session)
+
+    def _execute(self):
+        """
+        Initializes all components required to run a dag for a specified date range
+        and calls the helper method to execute the tasks.
+        """
+        session = settings.Session()
+        ti_status = BackfillJob._DagRunTaskStatus()
+
+        # consider max_active_runs but ignore when running subdags
+        # "parent.child" as a dag_id is by convention a subdag
+        if self.dag.schedule_interval and not self.dag.is_subdag:
+            all_active_runs = DagRun.find(
+                dag_id=self.dag.dag_id,
+                state=State.RUNNING,
+                external_trigger=False,
+                session=session
+            )
+
+            # return if already reached maximum active runs
+            if len(all_active_runs) >= self.dag.max_active_runs:
+                self.logger.info("Dag {} has reached the maximum of {} dag runs"
+                                 .format(self.dag.dag_id, self.dag.max_active_runs))
+                return
+
+        start_date = self.bf_start_date
+
+        # Get intervals between the start/end dates, which will turn into dag runs
+        run_dates = self.dag.get_run_dates(start_date=start_date,
+                                           end_date=self.bf_end_date)
+        if len(run_dates) == 0:
+            self.logger.info("No run dates were found for the given dates and dag "
+                             "interval.")
+            return
+
+        # picklin'
+        pickle_id = None
+        if not self.donot_pickle and self.executor.__class__ not in (
+                executors.LocalExecutor, executors.SequentialExecutor):
+            pickle = models.DagPickle(self.dag)
+            session.add(pickle)
+            session.commit()
+            pickle_id = pickle.id
+
+        executor = self.executor
+        executor.start()
+
+        self._execute_for_run_dates(run_dates=run_dates,
+                                    ti_status=ti_status,
+                                    executor=executor,
+                                    pickle_id=pickle_id,
+                                    start_date=start_date,
+                                    session=session)
+
+        executor.end()
+        session.commit()
+        session.close()
+
+        err = self._collect_errors(ti_status=ti_status, session=session)
         if err:
             raise AirflowException(err)
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/547f8184/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index d1f8e59..cc54f36 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -2883,6 +2883,37 @@ class DAG(BaseDag, LoggingMixin):
         elif isinstance(self._schedule_interval, timedelta):
             return dttm - self._schedule_interval
 
+    def get_run_dates(self, start_date, end_date=None):
+        """
+        Returns a list of dates within the given interval, following this dag's
+        schedule interval. The returned dates can be used as execution dates.
+        :param start_date: the start date of the interval
+        :type start_date: datetime
+        :param end_date: the end date of the interval, defaults to datetime.now()
+        :type end_date: datetime
+        :return: a list of dates within the interval following the dag's schedule
+        :rtype: list
+        """
+        run_dates = []
+
+        using_start_date = start_date
+        using_end_date = end_date
+
+        # dates for dag runs
+        using_start_date = using_start_date or min([t.start_date for t in self.tasks])
+        using_end_date = using_end_date or datetime.now()
+
+        # next run date for a subdag isn't relevant (schedule_interval for subdags
+        # is ignored) so we use the dag run's start date in the case of a subdag
+        next_run_date = (self.normalize_schedule(using_start_date)
+                         if not self.is_subdag else using_start_date)
+
+        while next_run_date and next_run_date <= using_end_date:
+            run_dates.append(next_run_date)
+            next_run_date = self.following_schedule(next_run_date)
+
+        return run_dates
+
     def normalize_schedule(self, dttm):
         """
         Returns dttm + interval unless dttm is first interval then it returns dttm
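
A usage sketch for the new get_run_dates helper, mirroring the test added
below (the dag id and dates are illustrative; this assumes the 1.8-era
models API shown in this diff):

    from datetime import datetime
    from airflow.models import DAG

    dag = DAG(dag_id='example_backfill',
              start_date=datetime(2017, 7, 1),
              schedule_interval='@hourly')

    # with an @hourly schedule, both interval endpoints are included,
    # so this yields four execution dates: 00:00, 01:00, 02:00, 03:00
    run_dates = dag.get_run_dates(start_date=datetime(2017, 7, 1, 0, 0),
                                  end_date=datetime(2017, 7, 1, 3, 0))
    assert len(run_dates) == 4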

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/547f8184/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index c9ab742..fa27b46 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -71,6 +71,7 @@ TEMP_DAG_FILENAME = "temp_dag.py"
 TEST_DAGS_FOLDER = os.path.join(
     os.path.dirname(os.path.realpath(__file__)), 'dags')
 
+
 class BackfillJobTest(unittest.TestCase):
 
     def setUp(self):
@@ -302,6 +303,72 @@ class BackfillJobTest(unittest.TestCase):
         self.assertEqual(ti.state, State.SUCCESS)
         dag.clear()
 
+    def _get_dag_test_max_active_limits(self, dag_id, max_active_runs=1):
+        dag = DAG(
+            dag_id=dag_id,
+            start_date=DEFAULT_DATE,
+            schedule_interval="@hourly",
+            max_active_runs=max_active_runs
+        )
+
+        with dag:
+            op1 = DummyOperator(task_id='leave1')
+            op2 = DummyOperator(task_id='leave2')
+            op3 = DummyOperator(task_id='upstream_level_1')
+            op4 = DummyOperator(task_id='upstream_level_2')
+
+            op1 >> op2 >> op3
+            op4 >> op3
+
+        dag.clear()
+        return dag
+
+    def test_backfill_max_limit_check_within_limit(self):
+        dag = self._get_dag_test_max_active_limits(
+            'test_backfill_max_limit_check_within_limit',
+            max_active_runs=16)
+
+        start_date = DEFAULT_DATE - datetime.timedelta(hours=3)
+        end_date = DEFAULT_DATE
+
+        executor = TestExecutor(do_update=True)
+        job = BackfillJob(dag=dag,
+                          start_date=start_date,
+                          end_date=end_date,
+                          executor=executor,
+                          donot_pickle=True)
+        job.run()
+
+        # all dag runs should have run and succeeded; max_active_runs was not exceeded
+        dagruns = DagRun.find(dag_id=dag.dag_id)
+        self.assertEqual(4, len(dagruns))
+        self.assertTrue(all([run.state == State.SUCCESS for run in dagruns]))
+
+    def test_backfill_max_limit_check(self):
+        dag = self._get_dag_test_max_active_limits('test_backfill_max_limit_check')
+
+        start_date = DEFAULT_DATE - datetime.timedelta(hours=3)
+        end_date = DEFAULT_DATE
+
+        # Existing dagrun that is not within the backfill range
+        dr = dag.create_dagrun(run_id="test_dagrun",
+                               state=State.RUNNING,
+                               execution_date=DEFAULT_DATE + datetime.timedelta(hours=1),
+                               start_date=DEFAULT_DATE)
+
+        executor = TestExecutor(do_update=True)
+        job = BackfillJob(dag=dag,
+                          start_date=start_date,
+                          end_date=end_date,
+                          executor=executor,
+                          donot_pickle=True)
+        job.run()
+
+        # the backfill dag runs could not start since max_active_runs was already reached
+        dagruns = DagRun.find(dag_id=dag.dag_id)
+        self.assertEqual(1, len(dagruns))
+        self.assertEqual(dagruns[0].run_id, dr.run_id)
+
     def test_sub_set_subdag(self):
         dag = DAG(
             'test_sub_set_subdag',
@@ -467,70 +534,84 @@ class BackfillJobTest(unittest.TestCase):
         ti = TI(task1, dr.execution_date)
         ti.refresh_from_db()
 
-        started = {}
-        tasks_to_run = {}
-        failed = set()
-        succeeded = set()
-        started = {}
-        skipped = set()
+        ti_status = BackfillJob._DagRunTaskStatus()
 
         # test for success
         ti.set_state(State.SUCCESS, session)
-        started[ti.key] = ti
-        job._update_counters(started=started, succeeded=succeeded,
-                                     skipped=skipped, failed=failed,
-                                     tasks_to_run=tasks_to_run)
-        self.assertTrue(len(started) == 0)
-        self.assertTrue(len(succeeded) == 1)
-        self.assertTrue(len(skipped) == 0)
-        self.assertTrue(len(failed) == 0)
-        self.assertTrue(len(tasks_to_run) == 0)
-
-        succeeded.clear()
+        ti_status.started[ti.key] = ti
+        job._update_counters(ti_status=ti_status)
+        self.assertTrue(len(ti_status.started) == 0)
+        self.assertTrue(len(ti_status.succeeded) == 1)
+        self.assertTrue(len(ti_status.skipped) == 0)
+        self.assertTrue(len(ti_status.failed) == 0)
+        self.assertTrue(len(ti_status.to_run) == 0)
+
+        ti_status.succeeded.clear()
 
         # test for skipped
         ti.set_state(State.SKIPPED, session)
-        started[ti.key] = ti
-        job._update_counters(started=started, succeeded=succeeded,
-                                     skipped=skipped, failed=failed,
-                                     tasks_to_run=tasks_to_run)
-        self.assertTrue(len(started) == 0)
-        self.assertTrue(len(succeeded) == 0)
-        self.assertTrue(len(skipped) == 1)
-        self.assertTrue(len(failed) == 0)
-        self.assertTrue(len(tasks_to_run) == 0)
-
-        skipped.clear()
+        ti_status.started[ti.key] = ti
+        job._update_counters(ti_status=ti_status)
+        self.assertTrue(len(ti_status.started) == 0)
+        self.assertTrue(len(ti_status.succeeded) == 0)
+        self.assertTrue(len(ti_status.skipped) == 1)
+        self.assertTrue(len(ti_status.failed) == 0)
+        self.assertTrue(len(ti_status.to_run) == 0)
+
+        ti_status.skipped.clear()
 
         # test for failed
         ti.set_state(State.FAILED, session)
-        started[ti.key] = ti
-        job._update_counters(started=started, succeeded=succeeded,
-                                     skipped=skipped, failed=failed,
-                                     tasks_to_run=tasks_to_run)
-        self.assertTrue(len(started) == 0)
-        self.assertTrue(len(succeeded) == 0)
-        self.assertTrue(len(skipped) == 0)
-        self.assertTrue(len(failed) == 1)
-        self.assertTrue(len(tasks_to_run) == 0)
-
-        failed.clear()
+        ti_status.started[ti.key] = ti
+        job._update_counters(ti_status=ti_status)
+        self.assertTrue(len(ti_status.started) == 0)
+        self.assertTrue(len(ti_status.succeeded) == 0)
+        self.assertTrue(len(ti_status.skipped) == 0)
+        self.assertTrue(len(ti_status.failed) == 1)
+        self.assertTrue(len(ti_status.to_run) == 0)
+
+        ti_status.failed.clear()
 
         # test for reschedule: a task whose state was externally reset to
         # NONE should be re-added to the tasks to run
         ti.set_state(State.NONE, session)
-        started[ti.key] = ti
-        job._update_counters(started=started, succeeded=succeeded,
-                                     skipped=skipped, failed=failed,
-                                     tasks_to_run=tasks_to_run)
-        self.assertTrue(len(started) == 0)
-        self.assertTrue(len(succeeded) == 0)
-        self.assertTrue(len(skipped) == 0)
-        self.assertTrue(len(failed) == 0)
-        self.assertTrue(len(tasks_to_run) == 1)
+        ti_status.started[ti.key] = ti
+        job._update_counters(ti_status=ti_status)
+        self.assertTrue(len(ti_status.started) == 0)
+        self.assertTrue(len(ti_status.succeeded) == 0)
+        self.assertTrue(len(ti_status.skipped) == 0)
+        self.assertTrue(len(ti_status.failed) == 0)
+        self.assertTrue(len(ti_status.to_run) == 1)
 
         session.close()
 
+    def test_dag_get_run_dates(self):
+
+        def get_test_dag_for_backfill(schedule_interval=None):
+            dag = DAG(
+                dag_id='test_get_dates',
+                start_date=DEFAULT_DATE,
+                schedule_interval=schedule_interval)
+            DummyOperator(
+                task_id='dummy',
+                dag=dag,
+                owner='airflow')
+            return dag
+
+        test_dag = get_test_dag_for_backfill()
+        self.assertEqual([DEFAULT_DATE], test_dag.get_run_dates(
+            start_date=DEFAULT_DATE,
+            end_date=DEFAULT_DATE))
+
+        test_dag = get_test_dag_for_backfill(schedule_interval="@hourly")
+        self.assertEqual([DEFAULT_DATE - datetime.timedelta(hours=3),
+                          DEFAULT_DATE - datetime.timedelta(hours=2),
+                          DEFAULT_DATE - datetime.timedelta(hours=1),
+                          DEFAULT_DATE],
+                         test_dag.get_run_dates(
+                             start_date=DEFAULT_DATE - datetime.timedelta(hours=3),
+                             end_date=DEFAULT_DATE,))
+
 
 class LocalTaskJobTest(unittest.TestCase):
     def setUp(self):
@@ -558,7 +639,9 @@ class LocalTaskJobTest(unittest.TestCase):
         ti.hostname = "blablabla"
         session.commit()
 
-        job1 = LocalTaskJob(task_instance=ti, ignore_ti_state=True, executor=SequentialExecutor())
+        job1 = LocalTaskJob(task_instance=ti,
+                            ignore_ti_state=True,
+                            executor=SequentialExecutor())
         self.assertRaises(AirflowException, job1.heartbeat_callback)
 
         is_descendant.return_value = True
@@ -597,7 +680,9 @@ class LocalTaskJobTest(unittest.TestCase):
         session.commit()
 
         ti_run = TI(task=task, execution_date=DEFAULT_DATE)
-        job1 = LocalTaskJob(task_instance=ti_run, ignore_ti_state=True, executor=SequentialExecutor())
+        job1 = LocalTaskJob(task_instance=ti_run,
+                            ignore_ti_state=True,
+                            executor=SequentialExecutor())
         self.assertRaises(AirflowException, job1.run)
 
         ti = dr.get_task_instance(task_id=task.task_id, session=session)
@@ -755,7 +840,7 @@ class SchedulerJobTest(unittest.TestCase):
         res_keys = map(lambda x: x.key, res)
         self.assertIn(ti_no_dagrun.key, res_keys)
         self.assertIn(ti_with_dagrun.key, res_keys)
-        
+
     def test_find_executable_task_instances_pool(self):
         dag_id = 'SchedulerJobTest.test_find_executable_task_instances_pool'
         task_id_1 = 'dummy'
@@ -2323,7 +2408,7 @@ class SchedulerJobTest(unittest.TestCase):
         for file_path in list_py_file_paths(TEST_DAGS_FOLDER):
             detected_files.append(file_path)
         self.assertEqual(sorted(detected_files), sorted(expected_files))
-        
+
     def test_reset_orphaned_tasks_nothing(self):
         """Try with nothing. """
         scheduler = SchedulerJob(**self.default_scheduler_args)