You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2016/10/28 13:51:00 UTC

incubator-airflow git commit: [AIRFLOW-585] Fix race condition in backfill execution loop

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 4a496fa92 -> 97934318b


[AIRFLOW-585] Fix race condition in backfill execution loop

A subtle race condition in the backfill execution
loop gives
rise to occasional deadlocks, causing Travis CI
builds to
randomly fail. The root cause is unsynchronized
access to
individual task instance states for a DAG run in
the execution
inner loop.

The fix involves atomically reading the state of
all task
instances for a DAG run once at the beginning of
every
iteration of the inner loop.

Closes #1846 from vijaysbhat/travis-ci-debugging


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/97934318
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/97934318
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/97934318

Branch: refs/heads/master
Commit: 97934318bc7d76f262c57e04a36826d0c4547546
Parents: 4a496fa
Author: Vijay Bhat <vi...@example.com>
Authored: Fri Oct 28 15:50:25 2016 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Fri Oct 28 15:50:38 2016 +0200

----------------------------------------------------------------------
 airflow/jobs.py                          | 60 ++++++++++++++++++++-------
 airflow/models.py                        | 14 ++++++-
 airflow/ti_deps/deps/trigger_rule_dep.py | 42 ++++++++++++-------
 3 files changed, 85 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/97934318/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 77bb1c4..972f597 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -1679,34 +1679,48 @@ class BackfillJob(BaseJob):
         for run in active_dag_runs:
             logging.info("Checking run {}".format(run))
             run_count = run_count + 1
-            # this needs a fresh session sometimes tis get detached
-            # can be more finegrained (excluding success or skipped)
-            for ti in run.get_task_instances():
-                tasks_to_run[ti.key] = ti
+
+            def get_task_instances_for_dag_run(dag_run):
+                # this needs a fresh session sometimes tis get detached
+                # can be more finegrained (excluding success or skipped)
+                tasks = {}
+                for ti in dag_run.get_task_instances():
+                    tasks[ti.key] = ti
+                return tasks
 
             # Triggering what is ready to get triggered
-            while tasks_to_run and not deadlocked:
+            while not deadlocked:
+                tasks_to_run = get_task_instances_for_dag_run(run)
+                self.logger.debug("Clearing out not_ready list")
                 not_ready.clear()
 
                 for key, ti in list(tasks_to_run.items()):
-                    ti.refresh_from_db(session=session, lock_for_update=True)
                     task = self.dag.get_task(ti.task_id)
                     ti.task = task
 
                     ignore_depends_on_past = (
                         self.ignore_first_depends_on_past and
                         ti.execution_date == (start_date or ti.start_date))
+                    self.logger.debug("Task instance to run {} state {}"
+                                      .format(ti, ti.state))
                     # The task was already marked successful or skipped by a
                     # different Job. Don't rerun it.
                     if ti.state == State.SUCCESS:
                         succeeded.add(key)
+                        self.logger.debug("Task instance {} succeeded. "
+                                          "Don't rerun.".format(ti))
                         tasks_to_run.pop(key)
-                        session.commit()
                         continue
                     elif ti.state == State.SKIPPED:
                         skipped.add(key)
+                        self.logger.debug("Task instance {} skipped. "
+                                          "Don't rerun.".format(ti))
+                        tasks_to_run.pop(key)
+                        continue
+                    elif ti.state == State.FAILED:
+                        self.logger.error("Task instance {} failed".format(ti))
+                        failed.add(key)
                         tasks_to_run.pop(key)
-                        session.commit()
                         continue
 
                     backfill_context = DepContext(
@@ -1735,6 +1749,7 @@ class BackfillJob(BaseJob):
 
                     # Mark the task as not ready to run
                     elif ti.state in (State.NONE, State.UPSTREAM_FAILED):
+                        self.logger.debug('Adding {} to not_ready'.format(ti))
                         not_ready.add(key)
 
                     session.commit()
@@ -1745,12 +1760,17 @@ class BackfillJob(BaseJob):
                 # If the set of tasks that aren't ready ever equals the set of
                 # tasks to run, then the backfill is deadlocked
                 if not_ready and not_ready == set(tasks_to_run):
+                    self.logger.warn("Deadlock discovered for tasks_to_run={}"
+                                     .format(tasks_to_run.values()))
                     deadlocked.update(tasks_to_run.values())
                     tasks_to_run.clear()
 
                 # Reacting to events
                 for key, state in list(executor.get_event_buffer().items()):
                     if key not in tasks_to_run:
+                        self.logger.warn("{} state {} not in tasks_to_run={}"
+                                         .format(key, state,
+                                                 tasks_to_run.values()))
                         continue
                     ti = tasks_to_run[key]
                     ti.refresh_from_db()
@@ -1762,20 +1782,20 @@ class BackfillJob(BaseJob):
                         if ti.state == State.RUNNING:
                             msg = (
                                 'Executor reports that task instance {} failed '
-                                'although the task says it is running.'.format(key))
+                                'although the task says it is running.'.format(ti))
                             self.logger.error(msg)
                             ti.handle_failure(msg)
                             tasks_to_run.pop(key)
 
                         # task reports skipped
                         elif ti.state == State.SKIPPED:
-                            self.logger.error("Skipping {} ".format(key))
+                            self.logger.error("Skipping {} ".format(ti))
                             skipped.add(key)
                             tasks_to_run.pop(key)
 
                         # anything else is a failure
                         else:
-                            self.logger.error("Task instance {} failed".format(key))
+                            self.logger.error("Task instance {} failed".format(ti))
                             failed.add(key)
                             tasks_to_run.pop(key)
 
@@ -1785,19 +1805,19 @@ class BackfillJob(BaseJob):
                         # task reports success
                         if ti.state == State.SUCCESS:
                             self.logger.info(
-                                'Task instance {} succeeded'.format(key))
+                                'Task instance {} succeeded'.format(ti))
                             succeeded.add(key)
                             tasks_to_run.pop(key)
 
                         # task reports failure
                         elif ti.state == State.FAILED:
-                            self.logger.error("Task instance {} failed".format(key))
+                            self.logger.error("Task instance {} failed".format(ti))
                             failed.add(key)
                             tasks_to_run.pop(key)
 
                         # task reports skipped
                         elif ti.state == State.SKIPPED:
-                            self.logger.info("Task instance {} skipped".format(key))
+                            self.logger.info("Task instance {} skipped".format(ti))
                             skipped.add(key)
                             tasks_to_run.pop(key)
 
@@ -1853,6 +1873,12 @@ class BackfillJob(BaseJob):
                     len(active_dag_runs))
                 self.logger.info(msg)
 
+                self.logger.debug("Finished dag run loop iteration. "
+                                  "Remaining tasks {}"
+                                  .format(tasks_to_run.values()))
+                if len(tasks_to_run) == 0:
+                    break
+
             # update dag run state
             run.update_state(session=session)
             if run.dag.is_paused:
@@ -1889,7 +1915,11 @@ class BackfillJob(BaseJob):
                     'backfill with the option '
                     '"ignore_first_depends_on_past=True" or passing "-I" at '
                     'the command line.')
-            err += ' These tasks were unable to run:\n{}\n'.format(deadlocked)
+            err += ' These tasks have succeeded:\n{}\n'.format(succeeded)
+            err += ' These tasks have started:\n{}\n'.format(started)
+            err += ' These tasks have failed:\n{}\n'.format(failed)
+            err += ' These tasks are skipped:\n{}\n'.format(skipped)
+            err += ' These tasks are deadlocked:\n{}\n'.format(deadlocked)
         if err:
             raise AirflowException(err)
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/97934318/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 44d9647..e0a2cda 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -1195,6 +1195,7 @@ class TaskInstance(Base):
                 dep_context=queue_dep_context,
                 session=session,
                 verbose=True):
+            session.commit()
             return
 
         self.clear_xcom_data()
@@ -1230,9 +1231,18 @@ class TaskInstance(Base):
                 logging.info(hr + msg + hr)
 
                 self.queued_dttm = datetime.now()
+                msg = "Queuing into pool {}".format(self.pool)
+                logging.info(msg)
                 session.merge(self)
-                session.commit()
-                logging.info("Queuing into pool {}".format(self.pool))
+            session.commit()
+            return
+
+        # Another worker might have started running this task instance while
+        # the current worker process was blocked on refresh_from_db
+        if self.state == State.RUNNING:
+            msg = "Task Instance already running {}".format(self)
+            logging.warn(msg)
+            session.commit()
             return
 
         # print status message

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/97934318/airflow/ti_deps/deps/trigger_rule_dep.py
----------------------------------------------------------------------
diff --git a/airflow/ti_deps/deps/trigger_rule_dep.py b/airflow/ti_deps/deps/trigger_rule_dep.py
index 9a2e78d..281ed51 100644
--- a/airflow/ti_deps/deps/trigger_rule_dep.py
+++ b/airflow/ti_deps/deps/trigger_rule_dep.py
@@ -123,7 +123,10 @@ class TriggerRuleDep(BaseTIDep):
         upstream = len(task.upstream_task_ids)
         tr = task.trigger_rule
         upstream_done = done >= upstream
-
+        upstream_tasks_state = {
+            "successes": successes, "skipped": skipped, "failed": failed,
+            "upstream_failed": upstream_failed, "done": done
+        }
         # TODO(aoen): Ideally each individual trigger rules would be it's own class, but
         # this isn't very feasible at the moment since the database queries need to be
         # bundled together for efficiency.
@@ -147,33 +150,44 @@ class TriggerRuleDep(BaseTIDep):
         if tr == TR.ONE_SUCCESS:
             if successes <= 0:
                 yield self._failing_status(
-                    reason="Task's trigger rule '{0}' requires one upstream task "
-                           "success, but none were found.".format(tr))
+                    reason="Task's trigger rule '{0}' requires one upstream "
+                    "task success, but none were found. "
+                    "upstream_tasks_state={1}, upstream_task_ids={2}"
+                    .format(tr, upstream_tasks_state, task.upstream_task_ids))
         elif tr == TR.ONE_FAILED:
             if not failed and not upstream_failed:
                 yield self._failing_status(
-                    reason="Task's trigger rule '{0}' requires one upstream task failure "
-                           ", but none were found.".format(tr))
+                    reason="Task's trigger rule '{0}' requires one upstream "
+                    "task failure, but none were found. "
+                    "upstream_tasks_state={1}, upstream_task_ids={2}"
+                    .format(tr, upstream_tasks_state, task.upstream_task_ids))
         elif tr == TR.ALL_SUCCESS:
             num_failures = upstream - successes
             if num_failures > 0:
                 yield self._failing_status(
-                    reason="Task's trigger rule '{0}' requires all upstream tasks to "
-                           "have succeeded, but found {1} non-success(es)."
-                           .format(tr, num_failures))
+                    reason="Task's trigger rule '{0}' requires all upstream "
+                    "tasks to have succeeded, but found {1} non-success(es). "
+                    "upstream_tasks_state={2}, upstream_task_ids={3}"
+                    .format(tr, num_failures, upstream_tasks_state,
+                            task.upstream_task_ids))
         elif tr == TR.ALL_FAILED:
             num_successes = upstream - failed - upstream_failed
             if num_successes > 0:
                 yield self._failing_status(
-                    reason="Task's trigger rule '{0}' requires all upstream tasks to "
-                           "have failed, but found {1} non-faliure(s)."
-                           .format(tr, num_successes))
+                    reason="Task's trigger rule '{0}' requires all upstream "
+                    "tasks to have failed, but found {1} non-failure(s). "
+                    "upstream_tasks_state={2}, upstream_task_ids={3}"
+                    .format(tr, num_successes, upstream_tasks_state,
+                            task.upstream_task_ids))
         elif tr == TR.ALL_DONE:
             if not upstream_done:
                 yield self._failing_status(
-                    reason="Task's trigger rule '{0}' requires all upstream tasks to "
-                           "have completed, but found '{1}' task(s) that weren't done."
-                           .format(tr, upstream - done))
+                    reason="Task's trigger rule '{0}' requires all upstream "
+                    "tasks to have completed, but found {1} task(s) that "
+                    "weren't done. upstream_tasks_state={2}, "
+                    "upstream_task_ids={3}"
+                    .format(tr, upstream-done, upstream_tasks_state,
+                            task.upstream_task_ids))
         else:
             yield self._failing_status(
                 reason="No strategy to evaluate trigger rule '{0}'.".format(tr))