Posted to commits@airflow.apache.org by "ASF subversion and git services (JIRA)" <ji...@apache.org> on 2016/10/28 13:51:59 UTC

[jira] [Commented] (AIRFLOW-585) Fix race condition in backfill execution loop

    [ https://issues.apache.org/jira/browse/AIRFLOW-585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15615468#comment-15615468 ] 

ASF subversion and git services commented on AIRFLOW-585:
---------------------------------------------------------

Commit 97934318bc7d76f262c57e04a36826d0c4547546 in incubator-airflow's branch refs/heads/master from [~vijaysbhat]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=9793431 ]

[AIRFLOW-585] Fix race condition in backfill execution loop

A subtle race condition in the backfill execution loop gives rise to
occasional deadlocks, causing Travis CI builds to randomly fail. The root
cause is unsynchronized access to individual task instance states for a DAG
run in the execution inner loop.

The fix involves atomically reading the state of all task instances for a
DAG run once at the beginning of every iteration of the inner loop.

Closes #1846 from vijaysbhat/travis-ci-debugging
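
As a rough illustration of "reading the state of all task instances for a DAG run once", the sketch below takes the snapshot with a single SELECT. It is not the code from this commit: the simplified task_instance table, the helper name snapshot_states and the sample rows are assumptions made for the example, and it uses the standard-library sqlite3 module rather than Airflow's ORM session.

```python
import sqlite3

# Hypothetical, heavily simplified stand-in for the metastore table.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task_instance ("
    "dag_id TEXT, task_id TEXT, execution_date TEXT, state TEXT)"
)
conn.executemany(
    "INSERT INTO task_instance VALUES (?, ?, ?, ?)",
    [
        ("example_bash_operator", "run_after_loop", "2016-01-02", None),
        ("example_bash_operator", "run_this_last", "2016-01-02", None),
        ("example_bash_operator", "runme_0", "2016-01-02", "success"),
        ("example_bash_operator", "runme_0", "2016-01-03", "running"),
    ],
)


def snapshot_states(conn, dag_id, execution_date):
    """Return {task_id: state} for one DAG run, read with a single query."""
    rows = conn.execute(
        "SELECT task_id, state FROM task_instance "
        "WHERE dag_id = ? AND execution_date = ?",
        (dag_id, execution_date),
    ).fetchall()
    return dict(rows)


# One snapshot per loop iteration; every task is then judged against it.
states = snapshot_states(conn, "example_bash_operator", "2016-01-02")
```

Because every state in the snapshot comes from the same statement, a worker that updates a row afterwards cannot make two tasks in the same iteration see different versions of the DAG run.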


> Fix race condition in backfill execution loop
> ---------------------------------------------
>
>                 Key: AIRFLOW-585
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-585
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: executor, travis
>         Environment: LocalExecutor, CeleryExecutor
>            Reporter: Vijay Bhat
>            Assignee: Vijay Bhat
>
> I found this bug while investigating random Travis build failures. The root cause is a subtle race condition in the backfill execution loop that creates deadlocks every now and then.
> *Analysis:*
> The common pattern I was seeing in the random build failures was a deadlock in an arbitrary backfill job. Example below:
> ========================================================
> ERROR: test_backfill_multi_dates (tests.BackfillJobTest)
> ----------------------------------------------------------------------
> Traceback (most recent call last):
>   File "/home/travis/build/apache/incubator-airflow/tests/jobs.py", line 100, in test_backfill_multi_dates
>     job.run()
>   File "/home/travis/build/apache/incubator-airflow/airflow/jobs.py", line 194, in run
>     self._execute()
>   File "/home/travis/build/apache/incubator-airflow/airflow/jobs.py", line 1894, in _execute
>     raise AirflowException(err)
> nose.proxy.AirflowException: ---------------------------------------------------
> BackfillJob is deadlocked. These tasks were unable to run:
> {<TaskInstance: example_bash_operator.run_after_loop 2016-01-02 00:00:00 [None]>, <TaskInstance: example_bash_operator.run_this_last 2016-01-02 00:00:00 [None]>}
> After digging into the backfill execution code and adding lots of logging, I found a race condition vulnerability in the main backfill execution loop (BackfillJob._execute) for a DAG run:
>             # Triggering what is ready to get triggered
>             while tasks_to_run and not deadlocked:
>                 not_ready.clear()
>                 for key, ti in list(tasks_to_run.items()):
>                     ti.refresh_from_db(session=session, lock_for_update=True)
>              ...
>              ...
>             # update dag run state
>             run.update_state(session=session)
>             if run.dag.is_paused:
>                 models.DagStat.clean_dirty([run.dag_id], session=session)
> The problem is that the state of all task instances for a DAG run is not read atomically in the loop, but can be refreshed piecemeal (by the ti.refresh_from_db call above) as the loop executes. In a multiprocessing scenario (like LocalExecutor), this leaves the door open to spuriously detecting a deadlock state and failing.
> Here's an example sequence of events that can cause this failure. Let's say we have a DAG with tasks A and B, with B dependent on A (A -> B). A has been picked up by a worker but has not completed, which means B is not ready to run. The backfill / local executor process is actively running.
> 1. Let tasks_to_run be read as [B, A] in BackfillJob._execute
> 2. In the while loop, B is inspected first, and it's correctly identified as not runnable (since A hasn't succeeded yet). B is added to not_ready. Now, not_ready = [B]
> 3. The backfill / local executor process gets interrupted and control is given to the worker process, which then runs task A and marks it as complete in the DB (in the TaskInstance run method).
> 4. Control is given back to the backfill / local executor process that goes on to inspect task A. It calls ti.refresh_from_db, and finds A is complete, so it pops it off the tasks_to_run list. Now, tasks_to_run = [B]
> 5. The following code segment in the loop incorrectly marks the DAG run as deadlocked and the backfill job is marked failed: 
>                 # If the set of tasks that aren't ready ever equals the set of
>                 # tasks to run, then the backfill is deadlocked
>                 if not_ready and not_ready == set(tasks_to_run):
>                     deadlocked.update(tasks_to_run.values())
>                     tasks_to_run.clear()
> *How to fix:*
> The main reason for the race condition is that we are not synchronizing access to the task instances in a DAG run. 
> There are 3+n actors in the backfill system:
> * Backfill loop
> * LocalExecutor object
> * Metastore DB
> * n LocalWorker processes
> The backfill loop and local executor run in the same process, so we don't have to worry about synchronization between them. But we need to synchronize access between the other actors. The channels of communication in this context are:
> * Backfill loop <-> LocalExecutor = event_buffer
> * Backfill loop <-> Metastore = SQLAlchemy ORM
> * Metastore <-> LocalWorker = SQLAlchemy ORM
> This means the backfill loop has two versions of the task instance state, one from the LocalExecutor event buffer (which gets updated when the worker completes a task) and another from the metastore (which the worker also writes to).
> If we consider the metastore to be the source of truth, we can synchronize access by reading the state of all task instances for the DAG run in a single query before the "for key, ti in list(tasks_to_run.items())" loop and removing individual task instance refreshes inside the loop.
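
The five-step event sequence in the description can be replayed deterministically. The sketch below is a simplified model, not the BackfillJob code: the dict standing in for the metastore, the function names run_iteration and worker_finishes_a, and the state strings are all assumptions made for illustration. It runs one pass of the inner loop twice, once reading states piecemeal and once from a snapshot taken before the pass.

```python
from typing import Callable, Dict, List, Optional, Set

State = Optional[str]  # None (not started), "running", or "success"


def worker_finishes_a(db: Dict[str, State]) -> None:
    """Simulates the worker process marking task A complete in the metastore."""
    db["A"] = "success"


def run_iteration(
    db: Dict[str, State],
    upstream: Dict[str, List[str]],
    order: List[str],
    atomic_snapshot: bool,
    interrupt: Callable[[Dict[str, State]], None],
) -> Set[str]:
    """Run one pass of a simplified inner loop; return tasks flagged as deadlocked."""
    tasks_to_run = set(order)
    not_ready: Set[str] = set()
    # Proposed fix: copy every state once, before inspecting any task.
    snapshot = dict(db) if atomic_snapshot else None

    for position, task in enumerate(order):
        if position == 1:
            interrupt(db)  # the worker gets scheduled between the two inspections
        # Piecemeal mode reads the live store, so later tasks may see newer state.
        view = snapshot if snapshot is not None else db
        if view[task] == "success":
            tasks_to_run.discard(task)
        elif view[task] is None and not all(
            view[u] == "success" for u in upstream[task]
        ):
            not_ready.add(task)

    # Same test as the original loop: not-ready set equals remaining set.
    if not_ready and not_ready == tasks_to_run:
        return set(tasks_to_run)
    return set()


upstream = {"A": [], "B": ["A"]}
piecemeal = run_iteration(
    {"A": "running", "B": None}, upstream, ["B", "A"], False, worker_finishes_a
)
atomic = run_iteration(
    {"A": "running", "B": None}, upstream, ["B", "A"], True, worker_finishes_a
)
```

In the piecemeal pass, B is judged against A's old state and A against its new one, so B is reported as deadlocked. In the snapshot pass both tasks are judged against the same state, A stays in tasks_to_run, and no deadlock is reported; B would become runnable on the next iteration.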


