Posted to dev@airflow.apache.org by Bolke de Bruin <bo...@xs4all.nl> on 2016/05/23 11:55:10 UTC

Refactoring process_dag

Hi,

As part of the scheduler roadmap, I have prepared a new PR (https://github.com/apache/incubator-airflow/pull/1514) that is ready for review.

Goals:

* Improve the readability of the DagRun code and make it follow generic assumptions (e.g. getters should not change state)
* Improve robustness and lower the risk of race conditions in the scheduler
* Reduce the number of database calls and limit the number of connections in the scheduler
* Identify possible speed optimizations

What has changed:
* Two new TaskInstance states have been introduced: "REMOVED" and "SCHEDULED". REMOVED is set on task instances that no longer exist in the DAG, which happens when a DAG is changed (i.e. a new version). The REMOVED state exists for lineage purposes. SCHEDULED is used when a task that did not have a state before is sent to the executor; it is used by both the scheduler and backfills. This state almost removes the race condition that exists when running multiple schedulers: because UP_FOR_RETRY is managed by the TaskInstance (I think that is the wrong place), the race still exists for that state.
* get_active_runs was a getter that also wrote to the database. This patch splits get_active_runs into two functions that are now part of DagRun: 1) update_state updates the state of the dag run based on its task instances; 2) verify_integrity checks the dag run against the DAG and updates it if the DAG contains new or missing tasks.
* DagRun.update_state no longer calls the database twice for the same information. In some cases, when a DAG has many tasks that need to be evaluated, this halves the time spent here. It still needs to be faster: for DAGs with many tasks, the aggregation query in TaskInstance.are_dependencies_met is very expensive and should be refactored.
* process_dag has been updated to use these functions and has been cleaned up, making it much more readable. Tasks are now properly locked by the database. I have experimented with multiprocessing here (on dag runs and task instances) but left it out for now; I think fixing the above will help more.
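To illustrate why an intermediate SCHEDULED state narrows the race between schedulers, here is a minimal sketch, assuming a scheduler claims a task instance with a conditional UPDATE that only matches while the state is still unset. The table layout and the try_schedule helper are hypothetical, not Airflow's actual schema or API; the point is only that a second scheduler making the same claim updates zero rows and can back off instead of sending the task to the executor twice.

```python
import sqlite3

# Hypothetical, simplified task_instance table; not Airflow's real schema.
SCHEMA = """
CREATE TABLE task_instance (
    dag_id TEXT,
    task_id TEXT,
    execution_date TEXT,
    state TEXT,
    PRIMARY KEY (dag_id, task_id, execution_date)
)
"""


def try_schedule(conn, dag_id, task_id, execution_date):
    """Move a task instance from 'no state' to 'scheduled' in one statement.

    The WHERE clause only matches while state IS NULL, so when writes to the
    row are serialized by the database, only one competing UPDATE changes it.
    """
    cur = conn.execute(
        "UPDATE task_instance SET state = 'scheduled' "
        "WHERE dag_id = ? AND task_id = ? AND execution_date = ? "
        "AND state IS NULL",
        (dag_id, task_id, execution_date),
    )
    conn.commit()
    # rowcount is 1 for the caller that won the claim, 0 for everyone else.
    return cur.rowcount == 1


conn = sqlite3.connect(":memory:")
conn.execute(SCHEMA)
conn.execute(
    "INSERT INTO task_instance VALUES (?, ?, ?, NULL)",
    ("example_dag", "extract", "2016-05-23T00:00:00"),
)
conn.commit()

# Two "schedulers" try to claim the same task instance.
first = try_schedule(conn, "example_dag", "extract", "2016-05-23T00:00:00")
second = try_schedule(conn, "example_dag", "extract", "2016-05-23T00:00:00")
print(first, second)  # True False
```

A task instance in UP_FOR_RETRY already has a state, so a guard keyed on "state IS NULL" would not cover it; that is consistent with the remaining race the email describes for that state.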
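The split of get_active_runs into update_state and verify_integrity can be sketched in plain Python, assuming a DagRun that holds one snapshot of its task instance states. The class, the state strings and the success/failure rules below are hypothetical simplifications, not the code in the PR: verify_integrity reconciles the run with the current DAG (marking vanished tasks REMOVED and adding new ones without a state), and update_state derives the run state from a single pass over that snapshot instead of looking the same data up twice. Neither method is a getter, so merely reading a run has no side effects.

```python
from collections import Counter
from dataclasses import dataclass, field
from typing import Dict, Optional, Set

# Hypothetical state names, loosely mirroring those discussed in the thread.
RUNNING, SUCCESS, FAILED = "running", "success", "failed"
UPSTREAM_FAILED, SKIPPED, REMOVED = "upstream_failed", "skipped", "removed"


@dataclass
class DagRun:
    # task_id -> state; None means the task instance has no state yet.
    task_states: Dict[str, Optional[str]] = field(default_factory=dict)
    state: str = RUNNING

    def verify_integrity(self, dag_task_ids: Set[str]) -> None:
        """Reconcile this run's task instances with the current DAG."""
        for task_id in self.task_states:
            if task_id not in dag_task_ids:
                # Keep the entry for lineage instead of deleting it.
                self.task_states[task_id] = REMOVED
        for task_id in dag_task_ids - set(self.task_states):
            # Task added in a newer version of the DAG.
            self.task_states[task_id] = None

    def update_state(self) -> str:
        """Derive the run state from one pass over the task states."""
        counts = Counter(s for s in self.task_states.values() if s != REMOVED)
        total = sum(counts.values())
        if counts[FAILED] or counts[UPSTREAM_FAILED]:
            self.state = FAILED
        elif total and counts[SUCCESS] + counts[SKIPPED] == total:
            self.state = SUCCESS
        else:
            self.state = RUNNING
        return self.state


# Usage: "old_task" was dropped from the DAG and "load" was added.
run = DagRun({"extract": SUCCESS, "old_task": SUCCESS})
run.verify_integrity({"extract", "load"})
print(run.task_states)     # {'extract': 'success', 'old_task': 'removed', 'load': None}
print(run.update_state())  # running
run.task_states["load"] = SUCCESS
print(run.update_state())  # success
```

Keeping the reconciliation and the state derivation as separate, explicitly named mutators is what lets a caller such as process_dag decide when writes happen.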

More info in the PR itself.

Please let me know what you think!

- Bolke

Re: Refactoring process_dag

Posted by Chris Riccomini <cr...@apache.org>.
Hey Bolke,

Thanks for writing this up.

> due to the fact UP_FOR_RETRY is being managed by the TaskInstance (I
> think that is the wrong place) is still exists for that state.

Curious--is it on the roadmap to fix this as well? Also, can you describe
the impact of the race condition?

Cheers,
Chris
