Posted to commits@airflow.apache.org by "Jeremiah Lowin (JIRA)" <ji...@apache.org> on 2016/04/28 17:23:12 UTC

[jira] [Comment Edited] (AIRFLOW-14) DagRun Refactor (Scheduler 2.0)

    [ https://issues.apache.org/jira/browse/AIRFLOW-14?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15262324#comment-15262324 ] 

Jeremiah Lowin edited comment on AIRFLOW-14 at 4/28/16 3:22 PM:
----------------------------------------------------------------

DagRuns have the primary key (dag_id, execution_date), so there is only one canonical version of each. However, a DagRunJob (DRJ) needs to refresh the DagRun from the db and check for a lock immediately before running it. The mechanism is very similar to TaskInstance: you can create as many TI objects as you want, but they all point at the one canonical version and can be refreshed at any time to reflect the "true" state.
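
A rough sketch of that refresh-then-check pattern, just to make it concrete. Everything here is hypothetical: the in-memory DB dict stands in for the real table, and DagRunHandle, refresh_from_db(), try_lock() and the locked_by field are illustration names, not the actual Airflow API.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, Optional, Tuple

# Hypothetical in-memory stand-in for the database table. The composite key
# (dag_id, execution_date) means there is exactly one row per DagRun.
Key = Tuple[str, datetime]
DB: Dict[Key, dict] = {}


@dataclass
class DagRunHandle:
    """A local, possibly stale view of the one canonical DagRun row."""
    dag_id: str
    execution_date: datetime
    state: str = "running"
    locked_by: Optional[str] = None

    def refresh_from_db(self) -> None:
        # Overwrite the local copy with whatever the canonical row says now.
        row = DB[(self.dag_id, self.execution_date)]
        self.state = row["state"]
        self.locked_by = row["locked_by"]


def try_lock(handle: DagRunHandle, job_id: str) -> bool:
    """Refresh immediately before running, then claim the row if it is free."""
    handle.refresh_from_db()
    if handle.locked_by not in (None, job_id):
        return False  # another job holds the lock; leave the DagRun alone
    DB[(handle.dag_id, handle.execution_date)]["locked_by"] = job_id
    handle.locked_by = job_id
    return True


when = datetime(2016, 4, 28)
DB[("example_dag", when)] = {"state": "running", "locked_by": None}

# Two independent handles that both point at the same canonical row.
a = DagRunHandle("example_dag", when)
b = DagRunHandle("example_dag", when)
first = try_lock(a, "job-1")   # job-1 claims the run
second = try_lock(b, "job-2")  # job-2 refreshes, sees the lock, and backs off
```

With a real database the check and the claim would have to happen atomically (for example inside one transaction); the dict version skips that.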

DRJs pick up DagRuns in two ways:
1. explicitly via {{DagRunJob.submit_dags()}}. This is used, for example, by BackfillJob: it generates a bunch of DagRuns, calls {{submit_dags()}} to submit them to itself, and then enters its loop. Scheduler also calls this after scheduling a DagRun, though that is actually redundant because of the second way (below).
2. automatically via {{DagRunJob.collect_dags()}}. This is called inside each DRJ loop; SchedulerJob uses it to look for any active DagRuns and add them to its set of DagRuns to execute.
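
A minimal sketch of the two pickup paths, assuming DagRuns can be represented as plain strings. Only the names submit_dags() and collect_dags() come from the text above; the class bodies, loop_once(), and the active_runs set (a stand-in for a db query) are made up for illustration.

```python
from typing import Iterable, List, Set


class DagRunJob:
    """Hypothetical base job that tracks the DagRuns it should execute."""

    def __init__(self) -> None:
        self.dagruns: Set[str] = set()

    def submit_dags(self, dagruns: Iterable[str]) -> None:
        # Way 1: explicit submission.
        self.dagruns.update(dagruns)

    def collect_dags(self) -> None:
        # Way 2: hook called inside each loop; the base job collects nothing.
        pass

    def loop_once(self) -> None:
        self.collect_dags()


class BackfillJob(DagRunJob):
    def __init__(self, dates: List[str]) -> None:
        super().__init__()
        # Generate the DagRuns up front and submit them to itself.
        self.submit_dags(f"example_dag@{d}" for d in dates)


class SchedulerJob(DagRunJob):
    def __init__(self, active_runs: Set[str]) -> None:
        super().__init__()
        self.active_runs = active_runs  # stand-in for "active DagRuns in the db"

    def collect_dags(self) -> None:
        # Pick up every active DagRun, whether or not anyone submitted it.
        self.dagruns.update(self.active_runs)


backfill = BackfillJob(["2016-04-01", "2016-04-02"])

active = {"example_dag@2016-04-28"}
scheduler = SchedulerJob(active)
scheduler.loop_once()
active.add("other_dag@2016-04-28")  # a run created elsewhere, never submitted
scheduler.loop_once()               # still picked up on the next loop
```

This is also why the explicit submit in the Scheduler is redundant in the sketch: anything it would submit shows up through collect_dags() on the next loop anyway.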


> DagRun Refactor (Scheduler 2.0)
> -------------------------------
>
>                 Key: AIRFLOW-14
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-14
>             Project: Apache Airflow
>          Issue Type: Improvement
>            Reporter: Jeremiah Lowin
>            Assignee: Jeremiah Lowin
>              Labels: backfill, dagrun, scheduler
>
> For full proposal, please see the Wiki: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=62694286
> Borrowing from that page: 
> *Description of New Workflow*
> DagRuns represent the state of a DAG at a certain point in time (perhaps they should be called DagInstances?). To run a DAG – or to manage the execution of a DAG – a DagRun must first be created. This can be done manually (simply by creating a DagRun object) or automatically, using methods like dag.schedule_dag(). Therefore, both scheduling new runs OR introducing ad-hoc runs can be done by any process at any time, simply by creating the appropriate object.
> Just creating a DagRun is not enough to actually run the DAG (just as creating a TaskInstance is not the same as actually running a task). We need a Job for that. The DagRunJob is fairly simple in structure. It maintains a set of DagRuns that it is tasked with executing, and loops over that set until all the DagRuns either succeed or fail. New DagRuns can be passed to the job explicitly via DagRunJob.submit_dagruns() or by defining its DagRunJob.collect_dagruns() method, which is called during each loop. When the DagRunJob is executing a specific DagRun, it locks it. Other DagRunJobs will not try to execute locked DagRuns. This way, many DagRunJobs can run simultaneously in either a local or distributed setting, and can even be pointed at the same DagRuns, without worrying about collisions or interference.
> The basic DagRunJob loop works like this:
> - refresh dags
> - collect new dagruns
> - process dagruns (including updating dagrun states for success/failure)
> - call executor/own heartbeat
> By tweaking the DagRunJob, we can easily recreate the behavior of the current SchedulerJob and BackfillJob. The Scheduler simply runs forever and picks up ALL active DagRuns in collect_dagruns(); Backfill generates DagRuns corresponding to the requested start/end dates and submits them to itself prior to initiating its loop.
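
The four-step loop in the quoted proposal could be sketched roughly as follows. This is a toy model under stated assumptions, not the proposed implementation: the DagRun class, the "one task per pass" processing, and the log list are invented for illustration, and locking is left out entirely. The method names submit_dagruns() and collect_dagruns() follow the proposal text.

```python
from typing import Dict, List

FINISHED = ("success", "failed")


class DagRun:
    """Toy DagRun: a queue of task outcomes (True means the task succeeds)."""

    def __init__(self, run_id: str, task_outcomes: List[bool]) -> None:
        self.run_id = run_id
        self.pending = list(task_outcomes)
        self.state = "running"


class DagRunJob:
    def __init__(self) -> None:
        self.dagruns: Dict[str, DagRun] = {}
        self.log: List[str] = []  # records the order of the loop steps

    def submit_dagruns(self, dagruns: List[DagRun]) -> None:
        for run in dagruns:
            self.dagruns[run.run_id] = run

    def refresh_dags(self) -> None:
        self.log.append("refresh")    # placeholder: reload DAG definitions

    def collect_dagruns(self) -> None:
        self.log.append("collect")    # placeholder hook; subclasses override

    def process_dagruns(self) -> None:
        self.log.append("process")
        for run in self.dagruns.values():
            if run.state in FINISHED:
                continue
            # "Execute" one task per pass; a run with no tasks just succeeds.
            ok = run.pending.pop(0) if run.pending else True
            if not ok:
                run.state = "failed"
            elif not run.pending:
                run.state = "success"

    def heartbeat(self) -> None:
        self.log.append("heartbeat")  # placeholder: executor/own heartbeat

    def run(self) -> None:
        # Loop until every DagRun has either succeeded or failed.
        while any(r.state not in FINISHED for r in self.dagruns.values()):
            self.refresh_dags()
            self.collect_dagruns()
            self.process_dagruns()
            self.heartbeat()


job = DagRunJob()
job.submit_dagruns([
    DagRun("run_a", [True, True]),
    DagRun("run_b", [True, False, True]),
])
job.run()
states = {run_id: run.state for run_id, run in job.dagruns.items()}
```

In this model a Backfill-style job is the base class as written (submit, then loop until done), while a Scheduler-style job would override collect_dagruns() and use a loop condition that never terminates.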



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)