Posted to commits@airflow.apache.org by bo...@apache.org on 2017/03/11 17:41:30 UTC

incubator-airflow git commit: [AIRFLOW-910] Use parallel task execution for backfills

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 201bd92dc -> d79ed7478


[AIRFLOW-910] Use parallel task execution for backfills

The refactor to use dag runs in backfills caused a regression in
task execution performance, as dag runs were executed sequentially.
In addition, backfills were non-deterministic due to the random
execution order of tasks, which caused root tasks to be added to the
not-ready list too soon.

This updates the backfill logic as follows:
* Parallelize execution of tasks
* Use a leaf-first execution model
* Replace executor-based state updates with task-based updates only

Closes #2107 from bolkedebruin/AIRFLOW-910


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/d79ed747
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/d79ed747
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/d79ed747

Branch: refs/heads/master
Commit: d79ed74783048b2816a4005ba887c2985e753ecf
Parents: 201bd92
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Sat Mar 11 09:40:38 2017 -0800
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Sat Mar 11 09:40:57 2017 -0800

----------------------------------------------------------------------
 airflow/jobs.py                    | 385 +++++++++++++++++---------------
 airflow/models.py                  |  50 +++++
 tests/executors/test_executor.py   |  25 ++-
 tests/jobs.py                      |  48 ++++
 tests/models.py                    |  66 ++++++
 tests/operators/subdag_operator.py |   4 +-
 6 files changed, 393 insertions(+), 185 deletions(-)
----------------------------------------------------------------------
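
For illustration, here is a minimal, self-contained sketch of the
leaf-first ordering the commit message describes. The Task class below
is a toy stand-in, not the Airflow API; it only mirrors the
resolve-upstreams-first loop that DAG.topological_sort() (added in
airflow/models.py below) implements.

    # toy task with explicit upstream links; illustration only
    class Task(object):
        def __init__(self, task_id, upstream=()):
            self.task_id = task_id
            self.upstream = list(upstream)

    def leaf_first_order(tasks):
        """Order tasks so every task comes after all of its upstream tasks."""
        unsorted, ordered = list(tasks), []
        while unsorted:
            acyclic = False
            for node in list(unsorted):
                # a node is ready once none of its upstreams are unresolved
                if not any(up in unsorted for up in node.upstream):
                    unsorted.remove(node)
                    ordered.append(node)
                    acyclic = True
            if not acyclic:
                raise ValueError("cycle detected")
        return ordered

    leave1, leave2 = Task("leave1"), Task("leave2")
    level1 = Task("upstream_level_1", upstream=[leave1, leave2])
    level2 = Task("upstream_level_2", upstream=[level1])

    print([t.task_id for t in leaf_first_order([level2, level1, leave1, leave2])])
    # ['leave1', 'leave2', 'upstream_level_1', 'upstream_level_2']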


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d79ed747/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index f385a6b..d99b697 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -211,6 +211,28 @@ class BaseJob(Base, LoggingMixin):
     def _execute(self):
         raise NotImplementedError("This method needs to be overridden")
 
+    @provide_session
+    def reset_state_for_orphaned_tasks(self, dag_run, session=None):
+        """
+        This function checks a DagRun for tasks that are in the SCHEDULED
+        or QUEUED state but are not known to the executor. If it finds
+        any, it resets their state to None so they will get picked up
+        again.
+        """
+        queued_tis = self.executor.queued_tasks
+
+        # also consider running as the state might not have changed in the db yet
+        running = self.executor.running
+        tis = list()
+        tis.extend(dag_run.get_task_instances(state=State.SCHEDULED, session=session))
+        tis.extend(dag_run.get_task_instances(state=State.QUEUED, session=session))
+
+        for ti in tis:
+            if ti.key not in queued_tis and ti.key not in running:
+                self.logger.debug("Rescheduling orphaned task {}".format(ti))
+                ti.state = State.NONE
+        session.commit()
+
 
 class DagFileProcessor(AbstractDagFileProcessor):
     """Helps call SchedulerJob.process_file() in a separate process."""
@@ -1236,28 +1258,6 @@ class SchedulerJob(BaseJob):
 
         self.logger.info(log_str)
 
-    @provide_session
-    def _reset_state_for_orphaned_tasks(self, dag_run, session=None):
-        """
-        This function checks a DagRun for tasks that are in the SCHEDULED
-        or QUEUED state but are not known to the executor. If it finds
-        any, it resets their state to None so they will get picked up
-        again.
-        """
-        queued_tis = self.executor.queued_tasks
-
-        # also consider running as the state might not have changed in the db yet
-        running = self.executor.running
-        tis = list()
-        tis.extend(dag_run.get_task_instances(state=State.SCHEDULED, session=session))
-        tis.extend(dag_run.get_task_instances(state=State.QUEUED, session=session))
-
-        for ti in tis:
-            if ti.key not in queued_tis and ti.key not in running:
-                self.logger.debug("Rescheduling orphaned task {}".format(ti))
-                ti.state = State.NONE
-        session.commit()
-
     def _execute(self):
         self.logger.info("Starting the scheduler")
         pessimistic_connection_handling()
@@ -1361,7 +1361,7 @@ class SchedulerJob(BaseJob):
         for dr in active_runs:
             self.logger.info("Resetting {} {}".format(dr.dag_id,
                                                       dr.execution_date))
-            self._reset_state_for_orphaned_tasks(dr, session=session)
+            self.reset_state_for_orphaned_tasks(dr, session=session)
 
         session.close()
 
@@ -1663,6 +1663,68 @@ class BackfillJob(BaseJob):
         self.pool = pool
         super(BackfillJob, self).__init__(*args, **kwargs)
 
+    def _update_counters(self, started, succeeded, skipped, failed, tasks_to_run):
+        """
+        Updates the counters per state of the task instances that were running.
+        :param started: dict of started task instances, keyed by task key
+        :param succeeded: set of keys of task instances that succeeded
+        :param skipped: set of keys of task instances that were skipped
+        :param failed: set of keys of task instances that failed
+        :param tasks_to_run: dict of task instances that still need to run
+        """
+        for key, ti in list(started.items()):
+            ti.refresh_from_db()
+            if ti.state == State.SUCCESS:
+                succeeded.add(key)
+                self.logger.debug("Task instance {} succeeded. "
+                                  "Don't rerun.".format(ti))
+                started.pop(key)
+                continue
+            elif ti.state == State.SKIPPED:
+                skipped.add(key)
+                self.logger.debug("Task instance {} skipped. "
+                                  "Don't rerun.".format(ti))
+                started.pop(key)
+                continue
+            elif ti.state == State.FAILED:
+                self.logger.error("Task instance {} failed".format(ti))
+                failed.add(key)
+                started.pop(key)
+                continue
+            # special case: if the task needs to run again put it back
+            elif ti.state == State.UP_FOR_RETRY:
+                self.logger.warning("Task instance {} is up for retry"
+                                    .format(ti))
+                started.pop(key)
+                tasks_to_run[key] = ti
+
+    def _manage_executor_state(self, started):
+        """
+        Checks if the executor agrees with the state of task instances
+        that are running.
+        :param started: dict of started task instances to verify, keyed by task key
+        """
+        executor = self.executor
+
+        for key, state in list(executor.get_event_buffer().items()):
+            if key not in started:
+                self.logger.warning("{} state {} not in started={}"
+                                    .format(key, state, started.values()))
+                continue
+
+            ti = started[key]
+            ti.refresh_from_db()
+
+            self.logger.debug("Executor state: {} task {}".format(state, ti))
+
+            if state == State.FAILED or state == State.SUCCESS:
+                if ti.state == State.RUNNING or ti.state == State.QUEUED:
+                    msg = ("Executor reports task instance {} finished ({}) "
+                           "although the task says it is {}. Was the task "
+                           "killed externally?".format(ti, state, ti.state))
+                    self.logger.error(msg)
+                    ti.handle_failure(msg)
+
     def _execute(self):
         """
         Runs a dag for a specified date range.
@@ -1700,13 +1762,12 @@ class BackfillJob(BaseJob):
 
         executor = self.executor
         executor.start()
-        executor_fails = Counter()
 
         # Build a list of all instances to run
         tasks_to_run = {}
         failed = set()
         succeeded = set()
-        started = set()
+        started = {}
         skipped = set()
         not_ready = set()
         deadlocked = set()
@@ -1744,33 +1805,40 @@ class BackfillJob(BaseJob):
             run.state = State.RUNNING
             run.verify_integrity(session=session)
 
+            # check if we have orphaned tasks
+            self.reset_state_for_orphaned_tasks(dag_run=run, session=session)
+
             # for some reason, if we don't refresh, the reference to run is lost
             run.refresh_from_db()
             make_transient(run)
             active_dag_runs.append(run)
 
+            for ti in run.get_task_instances():
+                # all tasks part of the backfill are scheduled to run
+                ti.set_state(State.SCHEDULED, session=session)
+                tasks_to_run[ti.key] = ti
+
             next_run_date = self.dag.following_schedule(next_run_date)
 
-        run_count = 0
-        for run in active_dag_runs:
-            logging.info("Checking run {}".format(run))
-            run_count = run_count + 1
-
-            def get_task_instances_for_dag_run(dag_run):
-                # this needs a fresh session sometimes tis get detached
-                # can be more finegrained (excluding success or skipped)
-                tasks = {}
-                for ti in dag_run.get_task_instances():
-                    tasks[ti.key] = ti
-                return tasks
-
-            # Triggering what is ready to get triggered
-            while not deadlocked:
-                tasks_to_run = get_task_instances_for_dag_run(run)
-                self.logger.debug("Clearing out not_ready list")
-                not_ready.clear()
+        finished_runs = 0
+        total_runs = len(active_dag_runs)
+
+        # Triggering what is ready to get triggered
+        while (len(tasks_to_run) > 0 or len(started) > 0) and not deadlocked:
+            self.logger.debug("*** Clearing out not_ready list ***")
+            not_ready.clear()
 
+            # we need to execute the tasks bottom to top
+            # or leaf to root, as otherwise tasks might be
+            # deemed deadlocked while they are actually
+            # waiting for their upstream to finish
+            for task in self.dag.topological_sort():
                 for key, ti in list(tasks_to_run.items()):
+                    if task.task_id != ti.task_id:
+                        continue
+
+                    ti.refresh_from_db()
+
                     task = self.dag.get_task(ti.task_id)
                     ti.task = task
 
@@ -1779,6 +1847,7 @@ class BackfillJob(BaseJob):
                         ti.execution_date == (start_date or ti.start_date))
                     self.logger.debug("Task instance to run {} state {}"
                                       .format(ti, ti.state))
+
                     # The task was already marked successful or skipped by a
                     # different Job. Don't rerun it.
                     if ti.state == State.SUCCESS:
@@ -1786,178 +1855,130 @@ class BackfillJob(BaseJob):
                         self.logger.debug("Task instance {} succeeded. "
                                           "Don't rerun.".format(ti))
                         tasks_to_run.pop(key)
+                        if key in started:
+                            started.pop(key)
                         continue
                     elif ti.state == State.SKIPPED:
                         skipped.add(key)
                         self.logger.debug("Task instance {} skipped. "
                                           "Don't rerun.".format(ti))
                         tasks_to_run.pop(key)
+                        if key in started:
+                            started.pop(key)
                         continue
                     elif ti.state == State.FAILED:
                         self.logger.error("Task instance {} failed".format(ti))
                         failed.add(key)
                         tasks_to_run.pop(key)
+                        if key in started:
+                            started.pop(key)
+                        continue
+                    elif ti.state == State.UPSTREAM_FAILED:
+                        self.logger.error("Task instance {} upstream failed".format(ti))
+                        failed.add(key)
+                        tasks_to_run.pop(key)
+                        if key in started:
+                            started.pop(key)
                         continue
-
                     backfill_context = DepContext(
                         deps=RUN_DEPS,
                         ignore_depends_on_past=ignore_depends_on_past,
                         ignore_task_deps=self.ignore_task_deps,
                         flag_upstream_failed=True)
+
                     # Is the task runnable? -- then run it
+                    # the dependency checker can change states of tis
                     if ti.are_dependencies_met(
                             dep_context=backfill_context,
                             session=session,
                             verbose=True):
-                        self.logger.debug('Sending {} to executor'.format(ti))
-                        if ti.state == State.NONE:
-                            ti.state = State.SCHEDULED
+                        ti.refresh_from_db(lock_for_update=True, session=session)
+                        if ti.state == State.SCHEDULED or ti.state == State.UP_FOR_RETRY:
+                            # Skip scheduled state, we are executing immediately
+                            ti.state = State.QUEUED
                             session.merge(ti)
+                            self.logger.debug('Sending {} to executor'.format(ti))
+                            executor.queue_task_instance(
+                                ti,
+                                mark_success=self.mark_success,
+                                pickle_id=pickle_id,
+                                ignore_task_deps=self.ignore_task_deps,
+                                ignore_depends_on_past=ignore_depends_on_past,
+                                pool=self.pool)
+                            started[key] = ti
+                            tasks_to_run.pop(key)
                         session.commit()
-                        executor.queue_task_instance(
-                            ti,
-                            mark_success=self.mark_success,
-                            pickle_id=pickle_id,
-                            ignore_task_deps=self.ignore_task_deps,
-                            ignore_depends_on_past=ignore_depends_on_past,
-                            pool=self.pool)
-                        started.add(key)
-
-                    # Mark the task as not ready to run
-                    elif ti.state in (State.NONE, State.UPSTREAM_FAILED):
-                        self.logger.debug('Adding {} to not_ready'.format(ti))
-                        not_ready.add(key)
-
-                    session.commit()
-
-                self.heartbeat()
-                executor.heartbeat()
-
-                # If the set of tasks that aren't ready ever equals the set of
-                # tasks to run, then the backfill is deadlocked
-                if not_ready and not_ready == set(tasks_to_run):
-                    self.logger.warning("Deadlock discovered for tasks_to_run={}"
-                                        .format(tasks_to_run.values()))
-                    deadlocked.update(tasks_to_run.values())
-                    tasks_to_run.clear()
-
-                # Reacting to events
-                for key, state in list(executor.get_event_buffer().items()):
-                    if key not in tasks_to_run:
-                        self.logger.warning("{} state {} not in tasks_to_run={}"
-                                            .format(key, state,
-                                                    tasks_to_run.values()))
                         continue
-                    ti = tasks_to_run[key]
-                    ti.refresh_from_db()
-                    logging.info("Executor state: {} task {}".format(state, ti))
-                    # executor reports failure
-                    if state == State.FAILED:
-
-                        # task reports running
-                        if ti.state == State.RUNNING:
-                            msg = (
-                                'Executor reports that task instance {} failed '
-                                'although the task says it is running.'.format(ti))
-                            self.logger.error(msg)
-                            ti.handle_failure(msg)
-                            tasks_to_run.pop(key)
 
-                        # task reports skipped
-                        elif ti.state == State.SKIPPED:
-                            self.logger.error("Skipping {} ".format(ti))
-                            skipped.add(key)
-                            tasks_to_run.pop(key)
+                    if ti.state == State.UPSTREAM_FAILED:
+                        self.logger.error("Task instance {} upstream failed".format(ti))
+                        failed.add(key)
+                        tasks_to_run.pop(key)
+                        if key in started:
+                            started.pop(key)
+                        continue
 
-                        # anything else is a failure
-                        else:
-                            self.logger.error("Task instance {} failed".format(ti))
-                            failed.add(key)
-                            tasks_to_run.pop(key)
+                    # any remaining task is not ready to run yet
+                    self.logger.debug('Adding {} to not_ready'.format(ti))
+                    not_ready.add(key)
 
-                    # executor reports success
-                    elif state == State.SUCCESS:
+            # execute the tasks in the queue
+            self.heartbeat()
+            executor.heartbeat()
 
-                        # task reports success
-                        if ti.state == State.SUCCESS:
-                            self.logger.info(
-                                'Task instance {} succeeded'.format(ti))
-                            succeeded.add(key)
-                            tasks_to_run.pop(key)
+            # If the set of tasks that aren't ready ever equals the set of
+            # tasks to run and there are no running tasks then the backfill
+            # is deadlocked
+            if not_ready and not_ready == set(tasks_to_run) and len(started) == 0:
+                self.logger.warning("Deadlock discovered for tasks_to_run={}"
+                                    .format(tasks_to_run.values()))
+                deadlocked.update(tasks_to_run.values())
+                tasks_to_run.clear()
 
-                        # task reports failure
-                        elif ti.state == State.FAILED:
-                            self.logger.error("Task instance {} failed".format(ti))
-                            failed.add(key)
-                            tasks_to_run.pop(key)
+            # check executor state
+            self._manage_executor_state(started)
 
-                        # task reports skipped
-                        elif ti.state == State.SKIPPED:
-                            self.logger.info("Task instance {} skipped".format(ti))
-                            skipped.add(key)
-                            tasks_to_run.pop(key)
-
-                        # this probably won't ever be triggered
-                        elif ti in not_ready:
-                            self.logger.info(
-                                "{} wasn't expected to run, but it did".format(ti))
-
-                        # executor reports success but task does not - this is weird
-                        elif ti.state not in (
-                                State.SCHEDULED,
-                                State.QUEUED,
-                                State.UP_FOR_RETRY):
-                            self.logger.error(
-                                "The airflow run command failed "
-                                "at reporting an error. This should not occur "
-                                "in normal circumstances. Task state is '{}',"
-                                "reported state is '{}'. TI is {}"
-                                "".format(ti.state, state, ti))
-
-                            # if the executor fails 3 or more times, stop trying to
-                            # run the task
-                            executor_fails[key] += 1
-                            if executor_fails[key] >= 3:
-                                msg = (
-                                    'The airflow run command failed to report an '
-                                    'error for task {} three or more times. The '
-                                    'task is being marked as failed. This is very '
-                                    'unusual and probably means that an error is '
-                                    'taking place before the task even '
-                                    'starts.'.format(key))
-                                self.logger.error(msg)
-                                ti.handle_failure(msg)
-                                tasks_to_run.pop(key)
-                msg = ' | '.join([
-                    "[backfill progress]",
-                    "dag run {6} of {7}",
-                    "tasks waiting: {0}",
-                    "succeeded: {1}",
-                    "kicked_off: {2}",
-                    "failed: {3}",
-                    "skipped: {4}",
-                    "deadlocked: {5}"
-                ]).format(
-                    len(tasks_to_run),
-                    len(succeeded),
-                    len(started),
-                    len(failed),
-                    len(skipped),
-                    len(deadlocked),
-                    run_count,
-                    len(active_dag_runs))
-                self.logger.info(msg)
-
-                self.logger.debug("Finished dag run loop iteration. "
-                                  "Remaining tasks {}"
-                                  .format(tasks_to_run.values()))
-                if len(tasks_to_run) == 0:
-                    break
+            # update the task counters
+            self._update_counters(started=started, succeeded=succeeded,
+                                  skipped=skipped, failed=failed,
+                                  tasks_to_run=tasks_to_run)
 
             # update dag run state
-            run.update_state(session=session)
-            if run.dag.is_paused:
-                models.DagStat.clean_dirty([run.dag_id], session=session)
+            _dag_runs = active_dag_runs[:]
+            for run in _dag_runs:
+                run.update_state(session=session)
+                if run.state in State.finished():
+                    finished_runs += 1
+                    active_dag_runs.remove(run)
+
+                if run.dag.is_paused:
+                    models.DagStat.clean_dirty([run.dag_id], session=session)
+
+            msg = ' | '.join([
+                "[backfill progress]",
+                "finished run {0} of {1}",
+                "tasks waiting: {2}",
+                "succeeded: {3}",
+                "kicked_off: {4}",
+                "failed: {5}",
+                "skipped: {6}",
+                "deadlocked: {7}",
+                "not ready: {8}"
+            ]).format(
+                finished_runs,
+                total_runs,
+                len(tasks_to_run),
+                len(succeeded),
+                len(started),
+                len(failed),
+                len(skipped),
+                len(deadlocked),
+                len(not_ready))
+            self.logger.info(msg)
+
+            self.logger.debug("Finished dag run loop iteration. "
+                              "Remaining tasks {}"
+                              .format(tasks_to_run.values()))
 
         executor.end()
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d79ed747/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index b9fda9f..1f1667e 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -3033,6 +3033,56 @@ class DAG(BaseDag, LoggingMixin):
     def roots(self):
         return [t for t in self.tasks if not t.downstream_list]
 
+    def topological_sort(self):
+        """
+        Sorts tasks in topological order, such that a task comes after any of its
+        upstream dependencies.
+
+        Heavily inspired by:
+        http://blog.jupo.org/2012/04/06/topological-sorting-acyclic-directed-graphs/
+        :returns: list of tasks in topological order
+        """
+
+        # copy the tasks so we leave the original list unmodified
+        graph_unsorted = self.tasks[:]
+
+        graph_sorted = []
+
+        # special case
+        if len(self.tasks) == 0:
+            return tuple(graph_sorted)
+
+        # Run until the unsorted graph is empty.
+        while graph_unsorted:
+            # Go through each of the nodes in the unsorted graph. If
+            # a node's upstream list doesn't contain any nodes that
+            # haven't been resolved yet, that is, that are still in
+            # the unsorted graph, remove the node from the unsorted
+            # graph and append it to the sorted graph. Note that by
+            # iterating over a copy of the unsorted graph (via
+            # list()), we can modify the unsorted graph as we move
+            # through it. We also keep a flag for checking that the
+            # graph is acyclic, which is true if any nodes are
+            # resolved during each pass through the graph. If not,
+            # we need to bail out as the graph therefore can't be
+            # sorted.
+            acyclic = False
+            for node in list(graph_unsorted):
+                for edge in node.upstream_list:
+                    if edge in graph_unsorted:
+                        break
+                # no unresolved upstream tasks remain
+                else:
+                    acyclic = True
+                    graph_unsorted.remove(node)
+                    graph_sorted.append(node)
+
+            if not acyclic:
+                raise AirflowException("A cyclic dependency occurred in dag: {}"
+                                       .format(self.dag_id))
+
+        return tuple(graph_sorted)
+
     @provide_session
     def set_dag_runs_state(
             self, state=State.RUNNING, session=None):
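
A short usage sketch for the new DAG.topological_sort(), modeled on
tests/models.py below; the imports assume the 1.x-era module layout:

    from datetime import datetime

    from airflow.models import DAG
    from airflow.operators.dummy_operator import DummyOperator

    dag = DAG('topo_example', start_date=datetime(2017, 3, 1))
    with dag:
        extract = DummyOperator(task_id='extract')
        transform = DummyOperator(task_id='transform')
        load = DummyOperator(task_id='load')
        extract.set_downstream(transform)
        transform.set_downstream(load)

    # every task appears after all of its upstream tasks
    print([t.task_id for t in dag.topological_sort()])
    # ['extract', 'transform', 'load']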

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d79ed747/tests/executors/test_executor.py
----------------------------------------------------------------------
diff --git a/tests/executors/test_executor.py b/tests/executors/test_executor.py
index 2015d9c..9ec6cd4 100644
--- a/tests/executors/test_executor.py
+++ b/tests/executors/test_executor.py
@@ -12,18 +12,41 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 from airflow.executors.base_executor import BaseExecutor
+from airflow.utils.state import State
+
+from airflow import settings
 
 
 class TestExecutor(BaseExecutor):
     """
     TestExecutor is used for unit testing purposes.
     """
+    def __init__(self, do_update=False, *args, **kwargs):
+        self.do_update = do_update
+        self._running = []
+        self.history = []
+
+        super(TestExecutor, self).__init__(*args, **kwargs)
+
     def execute_async(self, key, command, queue=None):
         self.logger.debug("{} running task instances".format(len(self.running)))
         self.logger.debug("{} in queue".format(len(self.queued_tasks)))
 
     def heartbeat(self):
-        pass
+        session = settings.Session()
+        if self.do_update:
+            self.history.append(list(self.queued_tasks.values()))
+            while len(self._running) > 0:
+                ti = self._running.pop()
+                ti.set_state(State.SUCCESS, session)
+            for key, val in list(self.queued_tasks.items()):
+                (command, priority, queue, ti) = val
+                ti.set_state(State.RUNNING, session)
+                self._running.append(ti)
+                self.queued_tasks.pop(key)
+
+        session.commit()
+        session.close()
 
     def terminate(self):
         pass
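
A sketch of how this executor is meant to be driven (see the new test
in tests/jobs.py below); it assumes an initialized Airflow test
database and the 1.x-era module layout:

    from datetime import datetime, timedelta

    from airflow.jobs import BackfillJob
    from airflow.models import DAG
    from airflow.operators.dummy_operator import DummyOperator
    from tests.executors.test_executor import TestExecutor

    DEFAULT_DATE = datetime(2017, 3, 1)

    dag = DAG('test_executor_demo', start_date=DEFAULT_DATE,
              schedule_interval='@daily')
    with dag:
        DummyOperator(task_id='only_task')

    executor = TestExecutor(do_update=True)
    job = BackfillJob(dag=dag, executor=executor,
                      start_date=DEFAULT_DATE,
                      end_date=DEFAULT_DATE + timedelta(days=1))
    job.run()

    # executor.history holds one batch per heartbeat: the task
    # instances queued in that loop iteration; empty batches are the
    # beats in which RUNNING flips to SUCCESS
    print([len(batch) for batch in executor.history])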

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d79ed747/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index d5fdc26..00d7829 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -162,6 +162,54 @@ class BackfillJobTest(unittest.TestCase):
                 ignore_first_depends_on_past=True)
             job.run()
 
+    def test_backfill_ordered_concurrent_execute(self):
+        dag = DAG(
+            dag_id='test_backfill_ordered_concurrent_execute',
+            start_date=DEFAULT_DATE,
+            schedule_interval="@daily")
+
+        with dag:
+            op1 = DummyOperator(task_id='leave1')
+            op2 = DummyOperator(task_id='leave2')
+            op3 = DummyOperator(task_id='upstream_level_1')
+            op4 = DummyOperator(task_id='upstream_level_2')
+            op5 = DummyOperator(task_id='upstream_level_3')
+            # order randomly
+            op2.set_downstream(op3)
+            op1.set_downstream(op3)
+            op4.set_downstream(op5)
+            op3.set_downstream(op4)
+
+        dag.clear()
+
+        executor = TestExecutor(do_update=True)
+        job = BackfillJob(dag=dag,
+                          executor=executor,
+                          start_date=DEFAULT_DATE,
+                          end_date=DEFAULT_DATE + datetime.timedelta(days=2),
+                          )
+        job.run()
+
+        # test executor history keeps a list
+        history = executor.history
+
+        # check that the order is right. Every other loop is a 'pause'
+        # (0 queued tasks) during which state changes from RUNNING to
+        # SUCCESS. 6,0,3,0,3,0,3,0 = 8 loops
+        self.assertEqual(8, len(history))
+
+        loop_count = 0
+
+        while len(history) > 0:
+            queued_tasks = history.pop(0)
+            if loop_count == 0:
+                # first loop should contain 6 tasks (3 days x 2 tasks)
+                self.assertEqual(6, len(queued_tasks))
+            if loop_count == 2 or loop_count == 4 or loop_count == 6:
+                # 3 days x 1 task
+                self.assertEqual(3, len(queued_tasks))
+            loop_count += 1
+
     def test_backfill_pooled_tasks(self):
         """
         Test that queued tasks are executed by BackfillJob

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d79ed747/tests/models.py
----------------------------------------------------------------------
diff --git a/tests/models.py b/tests/models.py
index 9da6f16..27c61f0 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -18,6 +18,7 @@ from __future__ import print_function
 from __future__ import unicode_literals
 
 import datetime
+import logging
 import os
 import unittest
 import time
@@ -118,6 +119,71 @@ class DagTest(unittest.TestCase):
         self.assertEqual(dag.dag_id, 'creating_dag_in_cm')
         self.assertEqual(dag.tasks[0].task_id, 'op6')
 
+    def test_dag_topological_sort(self):
+        dag = DAG(
+            'dag',
+            start_date=DEFAULT_DATE,
+            default_args={'owner': 'owner1'})
+
+        # B -> A
+        # D -> C -> A
+        # ordered: B, D, C, A or D, B, C, A or D, C, B, A
+        with dag:
+            op1 = DummyOperator(task_id='A')
+            op2 = DummyOperator(task_id='B')
+            op3 = DummyOperator(task_id='C')
+            op4 = DummyOperator(task_id='D')
+            op1.set_upstream([op2, op3])
+            op3.set_upstream(op4)
+
+        topological_list = dag.topological_sort()
+        logging.info(topological_list)
+
+        tasks = [op2, op3, op4]
+        self.assertTrue(topological_list[0] in tasks)
+        tasks.remove(topological_list[0])
+        self.assertTrue(topological_list[1] in tasks)
+        tasks.remove(topological_list[1])
+        self.assertTrue(topological_list[2] in tasks)
+        tasks.remove(topological_list[2])
+        self.assertTrue(topological_list[3] == op1)
+
+        dag = DAG(
+            'dag',
+            start_date=DEFAULT_DATE,
+            default_args={'owner': 'owner1'})
+
+        # D -> (A | B) -> C
+        # E -> C
+        # ordered: E | D, A | B, C
+        with dag:
+            op1 = DummyOperator(task_id='A')
+            op2 = DummyOperator(task_id='B')
+            op3 = DummyOperator(task_id='C')
+            op4 = DummyOperator(task_id='D')
+            op5 = DummyOperator(task_id='E')
+            op1.set_downstream(op3)
+            op2.set_downstream(op3)
+            op1.set_upstream(op4)
+            op2.set_upstream(op4)
+            op5.set_downstream(op3)
+
+        topological_list = dag.topological_sort()
+        logging.info(topological_list)
+
+        self.assertTrue(topological_list[0] == op5 or topological_list[0] == op4)
+        self.assertTrue(topological_list[1] == op4 or topological_list[1] == op5)
+        self.assertTrue(topological_list[2] == op1 or topological_list[2] == op2)
+        self.assertTrue(topological_list[3] == op1 or topological_list[3] == op2)
+        self.assertTrue(topological_list[4] == op3)
+
+        dag = DAG(
+            'dag',
+            start_date=DEFAULT_DATE,
+            default_args={'owner': 'owner1'})
+
+        self.assertEquals(tuple(), dag.topological_sort())
+
 
 class DagRunTest(unittest.TestCase):
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d79ed747/tests/operators/subdag_operator.py
----------------------------------------------------------------------
diff --git a/tests/operators/subdag_operator.py b/tests/operators/subdag_operator.py
index eeb41ec..5f049e7 100644
--- a/tests/operators/subdag_operator.py
+++ b/tests/operators/subdag_operator.py
@@ -102,8 +102,8 @@ class SubDagOperatorTests(unittest.TestCase):
         subdag = dagbag.get_dag('test_subdag_deadlock.subdag')
         subdag.clear()
 
-        # first make sure subdag is deadlocked
-        self.assertRaisesRegexp(AirflowException, 'deadlocked', subdag.run, start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+        # first make sure subdag has failed
+        self.assertRaises(AirflowException, subdag.run, start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
 
         # now make sure dag picks up the subdag error
         self.assertRaises(AirflowException, dag.run, start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)