You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ma...@apache.org on 2016/08/11 22:59:45 UTC

[6/7] incubator-airflow-site git commit: New version of the docs

http://git-wip-us.apache.org/repos/asf/incubator-airflow-site/blob/4af0850c/_modules/airflow/models.html
----------------------------------------------------------------------
diff --git a/_modules/airflow/models.html b/_modules/airflow/models.html
index d4ed8a6..2f044a4 100644
--- a/_modules/airflow/models.html
+++ b/_modules/airflow/models.html
@@ -198,6 +198,7 @@
 <span class="kn">from</span> <span class="nn">airflow.executors</span> <span class="kn">import</span> <span class="n">DEFAULT_EXECUTOR</span><span class="p">,</span> <span class="n">LocalExecutor</span>
 <span class="kn">from</span> <span class="nn">airflow</span> <span class="kn">import</span> <span class="n">configuration</span>
 <span class="kn">from</span> <span class="nn">airflow.exceptions</span> <span class="kn">import</span> <span class="n">AirflowException</span><span class="p">,</span> <span class="n">AirflowSkipException</span>
+<span class="kn">from</span> <span class="nn">airflow.dag.base_dag</span> <span class="kn">import</span> <span class="n">BaseDag</span><span class="p">,</span> <span class="n">BaseDagBag</span>
 <span class="kn">from</span> <span class="nn">airflow.utils.dates</span> <span class="kn">import</span> <span class="n">cron_presets</span><span class="p">,</span> <span class="n">date_range</span> <span class="k">as</span> <span class="n">utils_date_range</span>
 <span class="kn">from</span> <span class="nn">airflow.utils.db</span> <span class="kn">import</span> <span class="n">provide_session</span>
 <span class="kn">from</span> <span class="nn">airflow.utils.decorators</span> <span class="kn">import</span> <span class="n">apply_defaults</span>
@@ -205,6 +206,7 @@
 <span class="kn">from</span> <span class="nn">airflow.utils.helpers</span> <span class="kn">import</span> <span class="p">(</span>
     <span class="n">as_tuple</span><span class="p">,</span> <span class="n">is_container</span><span class="p">,</span> <span class="n">is_in</span><span class="p">,</span> <span class="n">validate_key</span><span class="p">,</span> <span class="n">pprinttable</span><span class="p">)</span>
 <span class="kn">from</span> <span class="nn">airflow.utils.logging</span> <span class="kn">import</span> <span class="n">LoggingMixin</span>
+<span class="kn">from</span> <span class="nn">airflow.utils.operator_resources</span> <span class="kn">import</span> <span class="n">Resources</span>
 <span class="kn">from</span> <span class="nn">airflow.utils.state</span> <span class="kn">import</span> <span class="n">State</span>
 <span class="kn">from</span> <span class="nn">airflow.utils.timeout</span> <span class="kn">import</span> <span class="n">timeout</span>
 <span class="kn">from</span> <span class="nn">airflow.utils.trigger_rule</span> <span class="kn">import</span> <span class="n">TriggerRule</span>
@@ -267,7 +269,7 @@
             <span class="n">dr</span><span class="o">.</span><span class="n">start_date</span> <span class="o">=</span> <span class="n">datetime</span><span class="o">.</span><span class="n">now</span><span class="p">()</span>
 
 
-<div class="viewcode-block" id="DagBag"><a class="viewcode-back" href="../../code.html#airflow.models.DagBag">[docs]</a><span class="k">class</span> <span class="nc">DagBag</span><span class="p">(</span><span class="n">LoggingMixin</span><span class="p">):</span>
+<div class="viewcode-block" id="DagBag"><a class="viewcode-back" href="../../code.html#airflow.models.DagBag">[docs]</a><span class="k">class</span> <span class="nc">DagBag</span><span class="p">(</span><span class="n">BaseDagBag</span><span class="p">,</span> <span class="n">LoggingMixin</span><span class="p">):</span>
     <span class="sd">&quot;&quot;&quot;</span>
 <span class="sd">    A dagbag is a collection of dags, parsed out of a folder tree and has high</span>
 <span class="sd">    level configuration settings, like what database to use as a backend and</span>
@@ -278,7 +280,7 @@
 <span class="sd">    independent settings sets.</span>
 
 <span class="sd">    :param dag_folder: the folder to scan to find DAGs</span>
-<span class="sd">    :type dag_folder: str</span>
+<span class="sd">    :type dag_folder: unicode</span>
 <span class="sd">    :param executor: the executor to use when executing task instances</span>
 <span class="sd">        in this DagBag</span>
 <span class="sd">    :param include_examples: whether to include the examples that ship</span>
@@ -293,25 +295,23 @@
             <span class="bp">self</span><span class="p">,</span>
             <span class="n">dag_folder</span><span class="o">=</span><span class="bp">None</span><span class="p">,</span>
             <span class="n">executor</span><span class="o">=</span><span class="n">DEFAULT_EXECUTOR</span><span class="p">,</span>
-            <span class="n">include_examples</span><span class="o">=</span><span class="n">configuration</span><span class="o">.</span><span class="n">getboolean</span><span class="p">(</span><span class="s1">&#39;core&#39;</span><span class="p">,</span> <span class="s1">&#39;LOAD_EXAMPLES&#39;</span><span class="p">),</span>
-            <span class="n">sync_to_db</span><span class="o">=</span><span class="bp">False</span><span class="p">):</span>
+            <span class="n">include_examples</span><span class="o">=</span><span class="n">configuration</span><span class="o">.</span><span class="n">getboolean</span><span class="p">(</span><span class="s1">&#39;core&#39;</span><span class="p">,</span> <span class="s1">&#39;LOAD_EXAMPLES&#39;</span><span class="p">)):</span>
 
         <span class="n">dag_folder</span> <span class="o">=</span> <span class="n">dag_folder</span> <span class="ow">or</span> <span class="n">DAGS_FOLDER</span>
         <span class="bp">self</span><span class="o">.</span><span class="n">logger</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">&quot;Filling up the DagBag from {}&quot;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">dag_folder</span><span class="p">))</span>
         <span class="bp">self</span><span class="o">.</span><span class="n">dag_folder</span> <span class="o">=</span> <span class="n">dag_folder</span>
         <span class="bp">self</span><span class="o">.</span><span class="n">dags</span> <span class="o">=</span> <span class="p">{}</span>
-        <span class="bp">self</span><span class="o">.</span><span class="n">sync_to_db</span> <span class="o">=</span> <span class="n">sync_to_db</span>
+        <span class="c1"># the file&#39;s last modified timestamp when we last read it</span>
         <span class="bp">self</span><span class="o">.</span><span class="n">file_last_changed</span> <span class="o">=</span> <span class="p">{}</span>
         <span class="bp">self</span><span class="o">.</span><span class="n">executor</span> <span class="o">=</span> <span class="n">executor</span>
         <span class="bp">self</span><span class="o">.</span><span class="n">import_errors</span> <span class="o">=</span> <span class="p">{}</span>
+
         <span class="k">if</span> <span class="n">include_examples</span><span class="p">:</span>
             <span class="n">example_dag_folder</span> <span class="o">=</span> <span class="n">os</span><span class="o">.</span><span class="n">path</span><span class="o">.</span><span class="n">join</span><span class="p">(</span>
                 <span class="n">os</span><span class="o">.</span><span class="n">path</span><span class="o">.</span><span class="n">dirname</span><span class="p">(</span><span class="n">__file__</span><span class="p">),</span>
                 <span class="s1">&#39;example_dags&#39;</span><span class="p">)</span>
             <span class="bp">self</span><span class="o">.</span><span class="n">collect_dags</span><span class="p">(</span><span class="n">example_dag_folder</span><span class="p">)</span>
         <span class="bp">self</span><span class="o">.</span><span class="n">collect_dags</span><span class="p">(</span><span class="n">dag_folder</span><span class="p">)</span>
-        <span class="k">if</span> <span class="n">sync_to_db</span><span class="p">:</span>
-            <span class="bp">self</span><span class="o">.</span><span class="n">deactivate_inactive_dags</span><span class="p">()</span>
 
 <div class="viewcode-block" id="DagBag.size"><a class="viewcode-back" href="../../code.html#airflow.models.DagBag.size">[docs]</a>    <span class="k">def</span> <span class="nf">size</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
         <span class="sd">&quot;&quot;&quot;</span>
@@ -330,7 +330,7 @@
             <span class="k">if</span> <span class="n">dag</span><span class="o">.</span><span class="n">is_subdag</span><span class="p">:</span>
                 <span class="n">root_dag_id</span> <span class="o">=</span> <span class="n">dag</span><span class="o">.</span><span class="n">parent_dag</span><span class="o">.</span><span class="n">dag_id</span>
 
-        <span class="c1"># If the root_dag_id is absent or expired</span>
+        <span class="c1"># If the dag corresponding to root_dag_id is absent or expired</span>
         <span class="n">orm_dag</span> <span class="o">=</span> <span class="n">DagModel</span><span class="o">.</span><span class="n">get_current</span><span class="p">(</span><span class="n">root_dag_id</span><span class="p">)</span>
         <span class="k">if</span> <span class="n">orm_dag</span> <span class="ow">and</span> <span class="p">(</span>
                 <span class="n">root_dag_id</span> <span class="ow">not</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">dags</span> <span class="ow">or</span>
@@ -339,10 +339,11 @@
                     <span class="n">dag</span><span class="o">.</span><span class="n">last_loaded</span> <span class="o">&lt;</span> <span class="n">orm_dag</span><span class="o">.</span><span class="n">last_expired</span>
                 <span class="p">)</span>
         <span class="p">):</span>
-            <span class="c1"># Reprocessing source file</span>
+            <span class="c1"># Reprocess source file</span>
             <span class="n">found_dags</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">process_file</span><span class="p">(</span>
                 <span class="n">filepath</span><span class="o">=</span><span class="n">orm_dag</span><span class="o">.</span><span class="n">fileloc</span><span class="p">,</span> <span class="n">only_if_updated</span><span class="o">=</span><span class="bp">False</span><span class="p">)</span>
 
+            <span class="c1"># If the source file no longer exports `dag_id`, delete it from self.dags</span>
             <span class="k">if</span> <span class="n">found_dags</span> <span class="ow">and</span> <span class="n">dag_id</span> <span class="ow">in</span> <span class="p">[</span><span class="n">dag</span><span class="o">.</span><span class="n">dag_id</span> <span class="k">for</span> <span class="n">dag</span> <span class="ow">in</span> <span class="n">found_dags</span><span class="p">]:</span>
                 <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">dags</span><span class="p">[</span><span class="n">dag_id</span><span class="p">]</span>
             <span class="k">elif</span> <span class="n">dag_id</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">dags</span><span class="p">:</span>
@@ -363,10 +364,10 @@
         <span class="k">try</span><span class="p">:</span>
             <span class="c1"># This failed before in what may have been a git sync</span>
             <span class="c1"># race condition</span>
-            <span class="n">dttm</span> <span class="o">=</span> <span class="n">datetime</span><span class="o">.</span><span class="n">fromtimestamp</span><span class="p">(</span><span class="n">os</span><span class="o">.</span><span class="n">path</span><span class="o">.</span><span class="n">getmtime</span><span class="p">(</span><span class="n">filepath</span><span class="p">))</span>
+            <span class="n">file_last_changed_on_disk</span> <span class="o">=</span> <span class="n">datetime</span><span class="o">.</span><span class="n">fromtimestamp</span><span class="p">(</span><span class="n">os</span><span class="o">.</span><span class="n">path</span><span class="o">.</span><span class="n">getmtime</span><span class="p">(</span><span class="n">filepath</span><span class="p">))</span>
             <span class="k">if</span> <span class="n">only_if_updated</span> \
                     <span class="ow">and</span> <span class="n">filepath</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">file_last_changed</span> \
-                    <span class="ow">and</span> <span class="n">dttm</span> <span class="o">==</span> <span class="bp">self</span><span class="o">.</span><span class="n">file_last_changed</span><span class="p">[</span><span class="n">filepath</span><span class="p">]:</span>
+                    <span class="ow">and</span> <span class="n">file_last_changed_on_disk</span> <span class="o">==</span> <span class="bp">self</span><span class="o">.</span><span class="n">file_last_changed</span><span class="p">[</span><span class="n">filepath</span><span class="p">]:</span>
                 <span class="k">return</span> <span class="n">found_dags</span>
 
         <span class="k">except</span> <span class="ne">Exception</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span>
@@ -395,7 +396,7 @@
                 <span class="k">except</span> <span class="ne">Exception</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span>
                     <span class="bp">self</span><span class="o">.</span><span class="n">logger</span><span class="o">.</span><span class="n">exception</span><span class="p">(</span><span class="s2">&quot;Failed to import: &quot;</span> <span class="o">+</span> <span class="n">filepath</span><span class="p">)</span>
                     <span class="bp">self</span><span class="o">.</span><span class="n">import_errors</span><span class="p">[</span><span class="n">filepath</span><span class="p">]</span> <span class="o">=</span> <span class="nb">str</span><span class="p">(</span><span class="n">e</span><span class="p">)</span>
-                    <span class="bp">self</span><span class="o">.</span><span class="n">file_last_changed</span><span class="p">[</span><span class="n">filepath</span><span class="p">]</span> <span class="o">=</span> <span class="n">dttm</span>
+                    <span class="bp">self</span><span class="o">.</span><span class="n">file_last_changed</span><span class="p">[</span><span class="n">filepath</span><span class="p">]</span> <span class="o">=</span> <span class="n">file_last_changed_on_disk</span>
 
         <span class="k">else</span><span class="p">:</span>
             <span class="n">zip_file</span> <span class="o">=</span> <span class="n">zipfile</span><span class="o">.</span><span class="n">ZipFile</span><span class="p">(</span><span class="n">filepath</span><span class="p">)</span>
@@ -426,7 +427,7 @@
                     <span class="k">except</span> <span class="ne">Exception</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span>
                         <span class="bp">self</span><span class="o">.</span><span class="n">logger</span><span class="o">.</span><span class="n">exception</span><span class="p">(</span><span class="s2">&quot;Failed to import: &quot;</span> <span class="o">+</span> <span class="n">filepath</span><span class="p">)</span>
                         <span class="bp">self</span><span class="o">.</span><span class="n">import_errors</span><span class="p">[</span><span class="n">filepath</span><span class="p">]</span> <span class="o">=</span> <span class="nb">str</span><span class="p">(</span><span class="n">e</span><span class="p">)</span>
-                        <span class="bp">self</span><span class="o">.</span><span class="n">file_last_changed</span><span class="p">[</span><span class="n">filepath</span><span class="p">]</span> <span class="o">=</span> <span class="n">dttm</span>
+                        <span class="bp">self</span><span class="o">.</span><span class="n">file_last_changed</span><span class="p">[</span><span class="n">filepath</span><span class="p">]</span> <span class="o">=</span> <span class="n">file_last_changed_on_disk</span>
 
         <span class="k">for</span> <span class="n">m</span> <span class="ow">in</span> <span class="n">mods</span><span class="p">:</span>
             <span class="k">for</span> <span class="n">dag</span> <span class="ow">in</span> <span class="nb">list</span><span class="p">(</span><span class="n">m</span><span class="o">.</span><span class="n">__dict__</span><span class="o">.</span><span class="n">values</span><span class="p">()):</span>
@@ -439,11 +440,11 @@
                     <span class="n">found_dags</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">dag</span><span class="p">)</span>
                     <span class="n">found_dags</span> <span class="o">+=</span> <span class="n">dag</span><span class="o">.</span><span class="n">subdags</span>
 
-        <span class="bp">self</span><span class="o">.</span><span class="n">file_last_changed</span><span class="p">[</span><span class="n">filepath</span><span class="p">]</span> <span class="o">=</span> <span class="n">dttm</span>
+        <span class="bp">self</span><span class="o">.</span><span class="n">file_last_changed</span><span class="p">[</span><span class="n">filepath</span><span class="p">]</span> <span class="o">=</span> <span class="n">file_last_changed_on_disk</span>
         <span class="k">return</span> <span class="n">found_dags</span></div>
 
     <span class="nd">@provide_session</span>
-<div class="viewcode-block" id="DagBag.kill_zombies"><a class="viewcode-back" href="../../code.html#airflow.models.DagBag.kill_zombies">[docs]</a>    <span class="k">def</span> <span class="nf">kill_zombies</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">session</span><span class="p">):</span>
+<div class="viewcode-block" id="DagBag.kill_zombies"><a class="viewcode-back" href="../../code.html#airflow.models.DagBag.kill_zombies">[docs]</a>    <span class="k">def</span> <span class="nf">kill_zombies</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">session</span><span class="o">=</span><span class="bp">None</span><span class="p">):</span>
         <span class="sd">&quot;&quot;&quot;</span>
 <span class="sd">        Fails tasks that haven&#39;t had a heartbeat in too long</span>
 <span class="sd">        &quot;&quot;&quot;</span>
@@ -477,6 +478,7 @@
                     <span class="n">ti</span><span class="o">.</span><span class="n">handle_failure</span><span class="p">(</span><span class="s2">&quot;{} killed as zombie&quot;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">ti</span><span class="p">))</span>
                     <span class="bp">self</span><span class="o">.</span><span class="n">logger</span><span class="o">.</span><span class="n">info</span><span class="p">(</span>
                         <span class="s1">&#39;Marked zombie job {} as failed&#39;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">ti</span><span class="p">))</span>
+                    <span class="n">Stats</span><span class="o">.</span><span class="n">incr</span><span class="p">(</span><span class="s1">&#39;zombies_killed&#39;</span><span class="p">)</span>
         <span class="n">session</span><span class="o">.</span><span class="n">commit</span><span class="p">()</span></div>
 
 <div class="viewcode-block" id="DagBag.bag_dag"><a class="viewcode-back" href="../../code.html#airflow.models.DagBag.bag_dag">[docs]</a>    <span class="k">def</span> <span class="nf">bag_dag</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">dag</span><span class="p">,</span> <span class="n">parent_dag</span><span class="p">,</span> <span class="n">root_dag</span><span class="p">):</span>
@@ -490,20 +492,6 @@
         <span class="k">for</span> <span class="n">task</span> <span class="ow">in</span> <span class="n">dag</span><span class="o">.</span><span class="n">tasks</span><span class="p">:</span>
             <span class="n">settings</span><span class="o">.</span><span class="n">policy</span><span class="p">(</span><span class="n">task</span><span class="p">)</span>
 
-        <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">sync_to_db</span><span class="p">:</span>
-            <span class="n">session</span> <span class="o">=</span> <span class="n">settings</span><span class="o">.</span><span class="n">Session</span><span class="p">()</span>
-            <span class="n">orm_dag</span> <span class="o">=</span> <span class="n">session</span><span class="o">.</span><span class="n">query</span><span class="p">(</span>
-                <span class="n">DagModel</span><span class="p">)</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span><span class="n">DagModel</span><span class="o">.</span><span class="n">dag_id</span> <span class="o">==</span> <span class="n">dag</span><span class="o">.</span><span class="n">dag_id</span><span class="p">)</span><span class="o">.</span><span class="n">first</span><span class="p">()</span>
-            <span class="k">if</span> <span class="ow">not</span> <span class="n">orm_dag</span><span class="p">:</span>
-                <span class="n">orm_dag</span> <span class="o">=</span> <span class="n">DagModel</span><span class="p">(</span><span class="n">dag_id</span><span class="o">=</span><span class="n">dag</span><span class="o">.</span><span class="n">dag_id</span><span class="p">)</span>
-            <span class="n">orm_dag</span><span class="o">.</span><span class="n">fileloc</span> <span class="o">=</span> <span class="n">root_dag</span><span class="o">.</span><span class="n">full_filepath</span>
-            <span class="n">orm_dag</span><span class="o">.</span><span class="n">is_subdag</span> <span class="o">=</span> <span class="n">dag</span><span class="o">.</span><span class="n">is_subdag</span>
-            <span class="n">orm_dag</span><span class="o">.</span><span class="n">owners</span> <span class="o">=</span> <span class="n">root_dag</span><span class="o">.</span><span class="n">owner</span>
-            <span class="n">orm_dag</span><span class="o">.</span><span class="n">is_active</span> <span class="o">=</span> <span class="bp">True</span>
-            <span class="n">session</span><span class="o">.</span><span class="n">merge</span><span class="p">(</span><span class="n">orm_dag</span><span class="p">)</span>
-            <span class="n">session</span><span class="o">.</span><span class="n">commit</span><span class="p">()</span>
-            <span class="n">session</span><span class="o">.</span><span class="n">close</span><span class="p">()</span>
-
         <span class="k">for</span> <span class="n">subdag</span> <span class="ow">in</span> <span class="n">dag</span><span class="o">.</span><span class="n">subdags</span><span class="p">:</span>
             <span class="n">subdag</span><span class="o">.</span><span class="n">full_filepath</span> <span class="o">=</span> <span class="n">dag</span><span class="o">.</span><span class="n">full_filepath</span>
             <span class="n">subdag</span><span class="o">.</span><span class="n">parent_dag</span> <span class="o">=</span> <span class="n">dag</span>
@@ -692,7 +680,8 @@
         <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_password</span> <span class="ow">and</span> <span class="bp">self</span><span class="o">.</span><span class="n">is_encrypted</span><span class="p">:</span>
             <span class="k">if</span> <span class="ow">not</span> <span class="n">ENCRYPTION_ON</span><span class="p">:</span>
                 <span class="k">raise</span> <span class="n">AirflowException</span><span class="p">(</span>
-                    <span class="s2">&quot;Can&#39;t decrypt, configuration is missing&quot;</span><span class="p">)</span>
+                    <span class="s2">&quot;Can&#39;t decrypt encrypted password for login={}, </span><span class="se">\</span>
+<span class="s2">                    FERNET_KEY configuration is missing&quot;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">login</span><span class="p">))</span>
             <span class="k">return</span> <span class="n">FERNET</span><span class="o">.</span><span class="n">decrypt</span><span class="p">(</span><span class="nb">bytes</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_password</span><span class="p">,</span> <span class="s1">&#39;utf-8&#39;</span><span class="p">))</span><span class="o">.</span><span class="n">decode</span><span class="p">()</span>
         <span class="k">else</span><span class="p">:</span>
             <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_password</span>
@@ -715,7 +704,8 @@
         <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_extra</span> <span class="ow">and</span> <span class="bp">self</span><span class="o">.</span><span class="n">is_extra_encrypted</span><span class="p">:</span>
             <span class="k">if</span> <span class="ow">not</span> <span class="n">ENCRYPTION_ON</span><span class="p">:</span>
                 <span class="k">raise</span> <span class="n">AirflowException</span><span class="p">(</span>
-                    <span class="s2">&quot;Can&#39;t decrypt `extra`, configuration is missing&quot;</span><span class="p">)</span>
+                    <span class="s2">&quot;Can&#39;t decrypt `extra` params for login={},</span><span class="se">\</span>
+<span class="s2">                    FERNET_KEY configuration is missing&quot;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">login</span><span class="p">))</span>
             <span class="k">return</span> <span class="n">FERNET</span><span class="o">.</span><span class="n">decrypt</span><span class="p">(</span><span class="nb">bytes</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_extra</span><span class="p">,</span> <span class="s1">&#39;utf-8&#39;</span><span class="p">))</span><span class="o">.</span><span class="n">decode</span><span class="p">()</span>
         <span class="k">else</span><span class="p">:</span>
             <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_extra</span>
@@ -891,8 +881,78 @@
 <span class="sd">        the orchestrator.</span>
 <span class="sd">        &quot;&quot;&quot;</span>
         <span class="n">dag</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">task</span><span class="o">.</span><span class="n">dag</span>
-        <span class="n">iso</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">execution_date</span><span class="o">.</span><span class="n">isoformat</span><span class="p">()</span>
-        <span class="n">cmd</span> <span class="o">=</span> <span class="s2">&quot;airflow run {self.dag_id} {self.task_id} {iso} &quot;</span>
+
+        <span class="n">should_pass_filepath</span> <span class="o">=</span> <span class="ow">not</span> <span class="n">pickle_id</span> <span class="ow">and</span> <span class="n">dag</span>
+        <span class="k">if</span> <span class="n">should_pass_filepath</span> <span class="ow">and</span> <span class="n">dag</span><span class="o">.</span><span class="n">full_filepath</span> <span class="o">!=</span> <span class="n">dag</span><span class="o">.</span><span class="n">filepath</span><span class="p">:</span>
+            <span class="n">path</span> <span class="o">=</span> <span class="s2">&quot;DAGS_FOLDER/{}&quot;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">dag</span><span class="o">.</span><span class="n">filepath</span><span class="p">)</span>
+        <span class="k">elif</span> <span class="n">should_pass_filepath</span> <span class="ow">and</span> <span class="n">dag</span><span class="o">.</span><span class="n">full_filepath</span><span class="p">:</span>
+            <span class="n">path</span> <span class="o">=</span> <span class="n">dag</span><span class="o">.</span><span class="n">full_filepath</span>
+        <span class="k">else</span><span class="p">:</span>
+            <span class="n">path</span> <span class="o">=</span> <span class="bp">None</span>
+
+        <span class="k">return</span> <span class="n">TaskInstance</span><span class="o">.</span><span class="n">generate_command</span><span class="p">(</span>
+            <span class="bp">self</span><span class="o">.</span><span class="n">dag_id</span><span class="p">,</span>
+            <span class="bp">self</span><span class="o">.</span><span class="n">task_id</span><span class="p">,</span>
+            <span class="bp">self</span><span class="o">.</span><span class="n">execution_date</span><span class="p">,</span>
+            <span class="n">mark_success</span><span class="o">=</span><span class="n">mark_success</span><span class="p">,</span>
+            <span class="n">ignore_dependencies</span><span class="o">=</span><span class="n">ignore_dependencies</span><span class="p">,</span>
+            <span class="n">ignore_depends_on_past</span><span class="o">=</span><span class="n">ignore_depends_on_past</span><span class="p">,</span>
+            <span class="n">force</span><span class="o">=</span><span class="n">force</span><span class="p">,</span>
+            <span class="n">local</span><span class="o">=</span><span class="n">local</span><span class="p">,</span>
+            <span class="n">pickle_id</span><span class="o">=</span><span class="n">pickle_id</span><span class="p">,</span>
+            <span class="n">file_path</span><span class="o">=</span><span class="n">path</span><span class="p">,</span>
+            <span class="n">raw</span><span class="o">=</span><span class="n">raw</span><span class="p">,</span>
+            <span class="n">job_id</span><span class="o">=</span><span class="n">job_id</span><span class="p">,</span>
+            <span class="n">pool</span><span class="o">=</span><span class="n">pool</span><span class="p">)</span></div>
+
+    <span class="nd">@staticmethod</span>
+<div class="viewcode-block" id="TaskInstance.generate_command"><a class="viewcode-back" href="../../code.html#airflow.models.TaskInstance.generate_command">[docs]</a>    <span class="k">def</span> <span class="nf">generate_command</span><span class="p">(</span><span class="n">dag_id</span><span class="p">,</span>
+                         <span class="n">task_id</span><span class="p">,</span>
+                         <span class="n">execution_date</span><span class="p">,</span>
+                         <span class="n">mark_success</span><span class="o">=</span><span class="bp">False</span><span class="p">,</span>
+                         <span class="n">ignore_dependencies</span><span class="o">=</span><span class="bp">False</span><span class="p">,</span>
+                         <span class="n">ignore_depends_on_past</span><span class="o">=</span><span class="bp">False</span><span class="p">,</span>
+                         <span class="n">force</span><span class="o">=</span><span class="bp">False</span><span class="p">,</span>
+                         <span class="n">local</span><span class="o">=</span><span class="bp">False</span><span class="p">,</span>
+                         <span class="n">pickle_id</span><span class="o">=</span><span class="bp">None</span><span class="p">,</span>
+                         <span class="n">file_path</span><span class="o">=</span><span class="bp">None</span><span class="p">,</span>
+                         <span class="n">raw</span><span class="o">=</span><span class="bp">False</span><span class="p">,</span>
+                         <span class="n">job_id</span><span class="o">=</span><span class="bp">None</span><span class="p">,</span>
+                         <span class="n">pool</span><span class="o">=</span><span class="bp">None</span>
+                         <span class="p">):</span>
+        <span class="sd">&quot;&quot;&quot;</span>
+<span class="sd">        Generates the shell command required to execute this task instance.</span>
+
+<span class="sd">        :param dag_id: DAG ID</span>
+<span class="sd">        :type dag_id: unicode</span>
+<span class="sd">        :param task_id: Task ID</span>
+<span class="sd">        :type task_id: unicode</span>
+<span class="sd">        :param execution_date: Execution date for the task</span>
+<span class="sd">        :type execution_date: datetime</span>
+<span class="sd">        :param mark_success: Whether to mark the task as successful</span>
+<span class="sd">        :type mark_success: bool</span>
+<span class="sd">        :param ignore_dependencies: Whether to ignore the dependencies and run</span>
+<span class="sd">        anyway</span>
+<span class="sd">        :type ignore_dependencies: bool</span>
+<span class="sd">        :param ignore_depends_on_past: Whether to ignore the depends on past</span>
+<span class="sd">        setting and run anyway</span>
+<span class="sd">        :type ignore_depends_on_past: bool</span>
+<span class="sd">        :param force: Whether to force running - see TaskInstance.run()</span>
+<span class="sd">        :type force: bool</span>
+<span class="sd">        :param local: Whether to run the task locally</span>
+<span class="sd">        :type local: bool</span>
+<span class="sd">        :param pickle_id: If the DAG was serialized to the DB, the ID</span>
+<span class="sd">        associated with the pickled DAG</span>
+<span class="sd">        :type pickle_id: unicode</span>
+<span class="sd">        :param file_path: path to the file containing the DAG definition</span>
+<span class="sd">        :param raw: raw mode (needs more details)</span>
+<span class="sd">        :param job_id: job ID (needs more details)</span>
+<span class="sd">        :param pool: the Airflow pool that the task should run in</span>
+<span class="sd">        :type pool: unicode</span>
+<span class="sd">        :return: shell command that can be used to run the task instance</span>
+<span class="sd">        &quot;&quot;&quot;</span>
+        <span class="n">iso</span> <span class="o">=</span> <span class="n">execution_date</span><span class="o">.</span><span class="n">isoformat</span><span class="p">()</span>
+        <span class="n">cmd</span> <span class="o">=</span> <span class="s2">&quot;airflow run {dag_id} {task_id} {iso} &quot;</span>
         <span class="n">cmd</span> <span class="o">+=</span> <span class="s2">&quot;--mark_success &quot;</span> <span class="k">if</span> <span class="n">mark_success</span> <span class="k">else</span> <span class="s2">&quot;&quot;</span>
         <span class="n">cmd</span> <span class="o">+=</span> <span class="s2">&quot;--pickle {pickle_id} &quot;</span> <span class="k">if</span> <span class="n">pickle_id</span> <span class="k">else</span> <span class="s2">&quot;&quot;</span>
         <span class="n">cmd</span> <span class="o">+=</span> <span class="s2">&quot;--job_id {job_id} &quot;</span> <span class="k">if</span> <span class="n">job_id</span> <span class="k">else</span> <span class="s2">&quot;&quot;</span>
@@ -902,11 +962,7 @@
         <span class="n">cmd</span> <span class="o">+=</span> <span class="s2">&quot;--local &quot;</span> <span class="k">if</span> <span class="n">local</span> <span class="k">else</span> <span class="s2">&quot;&quot;</span>
         <span class="n">cmd</span> <span class="o">+=</span> <span class="s2">&quot;--pool {pool} &quot;</span> <span class="k">if</span> <span class="n">pool</span> <span class="k">else</span> <span class="s2">&quot;&quot;</span>
         <span class="n">cmd</span> <span class="o">+=</span> <span class="s2">&quot;--raw &quot;</span> <span class="k">if</span> <span class="n">raw</span> <span class="k">else</span> <span class="s2">&quot;&quot;</span>
-        <span class="k">if</span> <span class="ow">not</span> <span class="n">pickle_id</span> <span class="ow">and</span> <span class="n">dag</span><span class="p">:</span>
-            <span class="k">if</span> <span class="n">dag</span><span class="o">.</span><span class="n">full_filepath</span> <span class="o">!=</span> <span class="n">dag</span><span class="o">.</span><span class="n">filepath</span><span class="p">:</span>
-                <span class="n">cmd</span> <span class="o">+=</span> <span class="s2">&quot;-sd DAGS_FOLDER/{dag.filepath} &quot;</span>
-            <span class="k">elif</span> <span class="n">dag</span><span class="o">.</span><span class="n">full_filepath</span><span class="p">:</span>
-                <span class="n">cmd</span> <span class="o">+=</span> <span class="s2">&quot;-sd {dag.full_filepath}&quot;</span>
+        <span class="n">cmd</span> <span class="o">+=</span> <span class="s2">&quot;-sd {file_path}&quot;</span> <span class="k">if</span> <span class="n">file_path</span> <span class="k">else</span> <span class="s2">&quot;&quot;</span>
         <span class="k">return</span> <span class="n">cmd</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="o">**</span><span class="nb">locals</span><span class="p">())</span></div>
 
     <span class="nd">@property</span>
@@ -1284,13 +1340,25 @@
             <span class="s2">&quot;{ti.execution_date} [{ti.state}]&gt;&quot;</span>
         <span class="p">)</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">ti</span><span class="o">=</span><span class="bp">self</span><span class="p">)</span>
 
+<div class="viewcode-block" id="TaskInstance.next_retry_datetime"><a class="viewcode-back" href="../../code.html#airflow.models.TaskInstance.next_retry_datetime">[docs]</a>    <span class="k">def</span> <span class="nf">next_retry_datetime</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
+        <span class="sd">&quot;&quot;&quot;</span>
+<span class="sd">        Get datetime of the next retry if the task instance fails. For exponential</span>
+<span class="sd">        backoff, retry_delay is used as base and will be converted to seconds.</span>
+<span class="sd">        &quot;&quot;&quot;</span>
+        <span class="n">delay</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">task</span><span class="o">.</span><span class="n">retry_delay</span>
+        <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">task</span><span class="o">.</span><span class="n">retry_exponential_backoff</span><span class="p">:</span>
+            <span class="n">delay_backoff_in_seconds</span> <span class="o">=</span> <span class="n">delay</span><span class="o">.</span><span class="n">total_seconds</span><span class="p">()</span> <span class="o">**</span> <span class="bp">self</span><span class="o">.</span><span class="n">try_number</span>
+            <span class="n">delay</span> <span class="o">=</span> <span class="n">timedelta</span><span class="p">(</span><span class="n">seconds</span><span class="o">=</span><span class="n">delay_backoff_in_seconds</span><span class="p">)</span>
+            <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">task</span><span class="o">.</span><span class="n">max_retry_delay</span><span class="p">:</span>
+                <span class="n">delay</span> <span class="o">=</span> <span class="nb">min</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">task</span><span class="o">.</span><span class="n">max_retry_delay</span><span class="p">,</span> <span class="n">delay</span><span class="p">)</span>
+        <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">end_date</span> <span class="o">+</span> <span class="n">delay</span></div>
+
 <div class="viewcode-block" id="TaskInstance.ready_for_retry"><a class="viewcode-back" href="../../code.html#airflow.models.TaskInstance.ready_for_retry">[docs]</a>    <span class="k">def</span> <span class="nf">ready_for_retry</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
         <span class="sd">&quot;&quot;&quot;</span>
 <span class="sd">        Checks on whether the task instance is in the right state and timeframe</span>
 <span class="sd">        to be retried.</span>
 <span class="sd">        &quot;&quot;&quot;</span>
-        <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">state</span> <span class="o">==</span> <span class="n">State</span><span class="o">.</span><span class="n">UP_FOR_RETRY</span> <span class="ow">and</span> \
-            <span class="bp">self</span><span class="o">.</span><span class="n">end_date</span> <span class="o">+</span> <span class="bp">self</span><span class="o">.</span><span class="n">task</span><span class="o">.</span><span class="n">retry_delay</span> <span class="o">&lt;</span> <span class="n">datetime</span><span class="o">.</span><span class="n">now</span><span class="p">()</span></div>
+        <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">state</span> <span class="o">==</span> <span class="n">State</span><span class="o">.</span><span class="n">UP_FOR_RETRY</span> <span class="ow">and</span> <span class="bp">self</span><span class="o">.</span><span class="n">next_retry_datetime</span><span class="p">()</span> <span class="o">&lt;</span> <span class="n">datetime</span><span class="o">.</span><span class="n">now</span><span class="p">()</span></div>
 
     <span class="nd">@provide_session</span>
 <div class="viewcode-block" id="TaskInstance.pool_full"><a class="viewcode-back" href="../../code.html#airflow.models.TaskInstance.pool_full">[docs]</a>    <span class="k">def</span> <span class="nf">pool_full</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">session</span><span class="p">):</span>
@@ -1308,9 +1376,7 @@
             <span class="o">.</span><span class="n">first</span><span class="p">()</span>
         <span class="p">)</span>
         <span class="k">if</span> <span class="ow">not</span> <span class="n">pool</span><span class="p">:</span>
-            <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span>
-                <span class="s2">&quot;Task specified a pool ({}) but the pool &quot;</span>
-                <span class="s2">&quot;doesn&#39;t exist!&quot;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">task</span><span class="o">.</span><span class="n">pool</span><span class="p">))</span>
+            <span class="k">return</span> <span class="bp">False</span>
         <span class="n">open_slots</span> <span class="o">=</span> <span class="n">pool</span><span class="o">.</span><span class="n">open_slots</span><span class="p">(</span><span class="n">session</span><span class="o">=</span><span class="n">session</span><span class="p">)</span>
 
         <span class="k">return</span> <span class="n">open_slots</span> <span class="o">&lt;=</span> <span class="mi">0</span></div>
@@ -1378,7 +1444,7 @@
             <span class="c1"># todo: move this to the scheduler</span>
                 <span class="bp">self</span><span class="o">.</span><span class="n">state</span> <span class="o">==</span> <span class="n">State</span><span class="o">.</span><span class="n">UP_FOR_RETRY</span> <span class="ow">and</span>
                 <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">ready_for_retry</span><span class="p">()):</span>
-            <span class="n">next_run</span> <span class="o">=</span> <span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">end_date</span> <span class="o">+</span> <span class="n">task</span><span class="o">.</span><span class="n">retry_delay</span><span class="p">)</span><span class="o">.</span><span class="n">isoformat</span><span class="p">()</span>
+            <span class="n">next_run</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">next_retry_datetime</span><span class="p">()</span><span class="o">.</span><span class="n">isoformat</span><span class="p">()</span>
             <span class="n">logging</span><span class="o">.</span><span class="n">info</span><span class="p">(</span>
                 <span class="s2">&quot;Not ready for retry yet. &quot;</span> <span class="o">+</span>
                 <span class="s2">&quot;Next run after {0}&quot;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">next_run</span><span class="p">)</span>
@@ -1510,6 +1576,9 @@
         <span class="k">if</span> <span class="ow">not</span> <span class="n">test_mode</span><span class="p">:</span>
             <span class="n">session</span><span class="o">.</span><span class="n">add</span><span class="p">(</span><span class="n">Log</span><span class="p">(</span><span class="n">State</span><span class="o">.</span><span class="n">FAILED</span><span class="p">,</span> <span class="bp">self</span><span class="p">))</span>
 
+        <span class="c1"># Log failure duration</span>
+        <span class="n">session</span><span class="o">.</span><span class="n">add</span><span class="p">(</span><span class="n">TaskFail</span><span class="p">(</span><span class="n">task</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">execution_date</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">start_date</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">end_date</span><span class="p">))</span>
+
         <span class="c1"># Let&#39;s go deeper</span>
         <span class="k">try</span><span class="p">:</span>
             <span class="k">if</span> <span class="n">task</span><span class="o">.</span><span class="n">retries</span> <span class="ow">and</span> <span class="bp">self</span><span class="o">.</span><span class="n">try_number</span> <span class="o">%</span> <span class="p">(</span><span class="n">task</span><span class="o">.</span><span class="n">retries</span> <span class="o">+</span> <span class="mi">1</span><span class="p">)</span> <span class="o">!=</span> <span class="mi">0</span><span class="p">:</span>
@@ -1592,17 +1661,25 @@
 <span class="sd">            {var.variable_name}.</span>
 <span class="sd">            &quot;&quot;&quot;</span>
             <span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
-                <span class="k">pass</span>
+                <span class="bp">self</span><span class="o">.</span><span class="n">var</span> <span class="o">=</span> <span class="bp">None</span>
 
             <span class="k">def</span> <span class="nf">__getattr__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">item</span><span class="p">):</span>
-                <span class="k">return</span> <span class="n">Variable</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="n">item</span><span class="p">)</span>
+                <span class="bp">self</span><span class="o">.</span><span class="n">var</span> <span class="o">=</span> <span class="n">Variable</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="n">item</span><span class="p">)</span>
+                <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">var</span>
+
+            <span class="k">def</span> <span class="nf">__repr__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
+                <span class="k">return</span> <span class="nb">str</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">var</span><span class="p">)</span>
 
         <span class="k">class</span> <span class="nc">VariableJsonAccessor</span><span class="p">:</span>
             <span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
-                <span class="k">pass</span>
+                <span class="bp">self</span><span class="o">.</span><span class="n">var</span> <span class="o">=</span> <span class="bp">None</span>
 
             <span class="k">def</span> <span class="nf">__getattr__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">item</span><span class="p">):</span>
-                <span class="k">return</span> <span class="n">Variable</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="n">item</span><span class="p">,</span> <span class="n">deserialize_json</span><span class="o">=</span><span class="bp">True</span><span class="p">)</span>
+                <span class="bp">self</span><span class="o">.</span><span class="n">var</span> <span class="o">=</span>  <span class="n">Variable</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="n">item</span><span class="p">,</span> <span class="n">deserialize_json</span><span class="o">=</span><span class="bp">True</span><span class="p">)</span>
+                <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">var</span>
+
+            <span class="k">def</span> <span class="nf">__repr__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
+                <span class="k">return</span> <span class="nb">str</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">var</span><span class="p">)</span>
 
         <span class="k">return</span> <span class="p">{</span>
             <span class="s1">&#39;dag&#39;</span><span class="p">:</span> <span class="n">task</span><span class="o">.</span><span class="n">dag</span><span class="p">,</span>
@@ -1755,6 +1832,29 @@
             <span class="k">return</span> <span class="n">pull_fn</span><span class="p">(</span><span class="n">task_id</span><span class="o">=</span><span class="n">task_ids</span><span class="p">)</span></div></div>
 
 
+<span class="k">class</span> <span class="nc">TaskFail</span><span class="p">(</span><span class="n">Base</span><span class="p">):</span>
+    <span class="sd">&quot;&quot;&quot;</span>
+<span class="sd">    TaskFail tracks the failed run durations of each task instance.</span>
+<span class="sd">    &quot;&quot;&quot;</span>
+
+    <span class="n">__tablename__</span> <span class="o">=</span> <span class="s2">&quot;task_fail&quot;</span>
+
+    <span class="n">task_id</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">String</span><span class="p">(</span><span class="n">ID_LEN</span><span class="p">),</span> <span class="n">primary_key</span><span class="o">=</span><span class="bp">True</span><span class="p">)</span>
+    <span class="n">dag_id</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">String</span><span class="p">(</span><span class="n">ID_LEN</span><span class="p">),</span> <span class="n">primary_key</span><span class="o">=</span><span class="bp">True</span><span class="p">)</span>
+    <span class="n">execution_date</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">DateTime</span><span class="p">,</span> <span class="n">primary_key</span><span class="o">=</span><span class="bp">True</span><span class="p">)</span>
+    <span class="n">start_date</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">DateTime</span><span class="p">)</span>
+    <span class="n">end_date</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">DateTime</span><span class="p">)</span>
+    <span class="n">duration</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">Float</span><span class="p">)</span>
+
+    <span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">task</span><span class="p">,</span> <span class="n">execution_date</span><span class="p">,</span> <span class="n">start_date</span><span class="p">,</span> <span class="n">end_date</span><span class="p">):</span>
+        <span class="bp">self</span><span class="o">.</span><span class="n">dag_id</span> <span class="o">=</span> <span class="n">task</span><span class="o">.</span><span class="n">dag_id</span>
+        <span class="bp">self</span><span class="o">.</span><span class="n">task_id</span> <span class="o">=</span> <span class="n">task</span><span class="o">.</span><span class="n">task_id</span>
+        <span class="bp">self</span><span class="o">.</span><span class="n">execution_date</span> <span class="o">=</span> <span class="n">execution_date</span>
+        <span class="bp">self</span><span class="o">.</span><span class="n">start_date</span> <span class="o">=</span> <span class="n">start_date</span>
+        <span class="bp">self</span><span class="o">.</span><span class="n">end_date</span> <span class="o">=</span> <span class="n">end_date</span>
+        <span class="bp">self</span><span class="o">.</span><span class="n">duration</span> <span class="o">=</span> <span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">end_date</span> <span class="o">-</span> <span class="bp">self</span><span class="o">.</span><span class="n">start_date</span><span class="p">)</span><span class="o">.</span><span class="n">total_seconds</span><span class="p">()</span>
+
+
 <span class="k">class</span> <span class="nc">Log</span><span class="p">(</span><span class="n">Base</span><span class="p">):</span>
     <span class="sd">&quot;&quot;&quot;</span>
 <span class="sd">    Used to actively log events to the database</span>
@@ -1831,6 +1931,12 @@
 <span class="sd">    :type retries: int</span>
 <span class="sd">    :param retry_delay: delay between retries</span>
 <span class="sd">    :type retry_delay: timedelta</span>
+<span class="sd">    :param retry_exponential_backoff: allow progressive longer waits between</span>
+<span class="sd">        retries by using exponential backoff algorithm on retry delay (delay</span>
+<span class="sd">        will be converted into seconds)</span>
+<span class="sd">    :type retry_exponential_backoff: bool</span>
+<span class="sd">    :param max_retry_delay: maximum delay interval between retries</span>
+<span class="sd">    :type max_retry_delay: timedelta</span>
 <span class="sd">    :param start_date: The ``start_date`` for the task, determines</span>
 <span class="sd">        the ``execution_date`` for the first task instance. The best practice</span>
 <span class="sd">        is to have the start_date rounded</span>
@@ -1908,6 +2014,9 @@
 <span class="sd">        using the constants defined in the static class</span>
 <span class="sd">        ``airflow.utils.TriggerRule``</span>
 <span class="sd">    :type trigger_rule: str</span>
+<span class="sd">    :param resources: A map of resource parameter names (the argument names of the</span>
+<span class="sd">        Resources constructor) to their values.</span>
+<span class="sd">    :type resources: dict</span>
 <span class="sd">    &quot;&quot;&quot;</span>
 
     <span class="c1"># For derived classes to define which fields will get jinjaified</span>
@@ -1928,6 +2037,8 @@
             <span class="n">email_on_failure</span><span class="o">=</span><span class="bp">True</span><span class="p">,</span>
             <span class="n">retries</span><span class="o">=</span><span class="mi">0</span><span class="p">,</span>
             <span class="n">retry_delay</span><span class="o">=</span><span class="n">timedelta</span><span class="p">(</span><span class="n">seconds</span><span class="o">=</span><span class="mi">300</span><span class="p">),</span>
+            <span class="n">retry_exponential_backoff</span><span class="o">=</span><span class="bp">False</span><span class="p">,</span>
+            <span class="n">max_retry_delay</span><span class="o">=</span><span class="bp">None</span><span class="p">,</span>
             <span class="n">start_date</span><span class="o">=</span><span class="bp">None</span><span class="p">,</span>
             <span class="n">end_date</span><span class="o">=</span><span class="bp">None</span><span class="p">,</span>
             <span class="n">schedule_interval</span><span class="o">=</span><span class="bp">None</span><span class="p">,</span>  <span class="c1"># not hooked as of now</span>
@@ -1946,6 +2057,7 @@
             <span class="n">on_success_callback</span><span class="o">=</span><span class="bp">None</span><span class="p">,</span>
             <span class="n">on_retry_callback</span><span class="o">=</span><span class="bp">None</span><span class="p">,</span>
             <span class="n">trigger_rule</span><span class="o">=</span><span class="n">TriggerRule</span><span class="o">.</span><span class="n">ALL_SUCCESS</span><span class="p">,</span>
+            <span class="n">resources</span><span class="o">=</span><span class="bp">None</span><span class="p">,</span>
             <span class="o">*</span><span class="n">args</span><span class="p">,</span>
             <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span>
 
@@ -2003,9 +2115,12 @@
         <span class="k">else</span><span class="p">:</span>
             <span class="n">logging</span><span class="o">.</span><span class="n">debug</span><span class="p">(</span><span class="s2">&quot;retry_delay isn&#39;t timedelta object, assuming secs&quot;</span><span class="p">)</span>
             <span class="bp">self</span><span class="o">.</span><span class="n">retry_delay</span> <span class="o">=</span> <span class="n">timedelta</span><span class="p">(</span><span class="n">seconds</span><span class="o">=</span><span class="n">retry_delay</span><span class="p">)</span>
+        <span class="bp">self</span><span class="o">.</span><span class="n">retry_exponential_backoff</span> <span class="o">=</span> <span class="n">retry_exponential_backoff</span>
+        <span class="bp">self</span><span class="o">.</span><span class="n">max_retry_delay</span> <span class="o">=</span> <span class="n">max_retry_delay</span>
         <span class="bp">self</span><span class="o">.</span><span class="n">params</span> <span class="o">=</span> <span class="n">params</span> <span class="ow">or</span> <span class="p">{}</span>  <span class="c1"># Available in templates!</span>
         <span class="bp">self</span><span class="o">.</span><span class="n">adhoc</span> <span class="o">=</span> <span class="n">adhoc</span>
         <span class="bp">self</span><span class="o">.</span><span class="n">priority_weight</span> <span class="o">=</span> <span class="n">priority_weight</span>
+        <span class="bp">self</span><span class="o">.</span><span class="n">resources</span> <span class="o">=</span> <span class="n">Resources</span><span class="p">(</span><span class="o">**</span><span class="p">(</span><span class="n">resources</span> <span class="ow">or</span> <span class="p">{}))</span>
 
         <span class="c1"># Private attributes</span>
         <span class="bp">self</span><span class="o">.</span><span class="n">_upstream_task_ids</span> <span class="o">=</span> <span class="p">[]</span>
@@ -2023,6 +2138,8 @@
             <span class="s1">&#39;email&#39;</span><span class="p">,</span>
             <span class="s1">&#39;email_on_retry&#39;</span><span class="p">,</span>
             <span class="s1">&#39;retry_delay&#39;</span><span class="p">,</span>
+            <span class="s1">&#39;retry_exponential_backoff&#39;</span><span class="p">,</span>
+            <span class="s1">&#39;max_retry_delay&#39;</span><span class="p">,</span>
             <span class="s1">&#39;start_date&#39;</span><span class="p">,</span>
             <span class="s1">&#39;schedule_interval&#39;</span><span class="p">,</span>
             <span class="s1">&#39;depends_on_past&#39;</span><span class="p">,</span>
@@ -2042,7 +2159,7 @@
             <span class="nb">all</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">__dict__</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="n">c</span><span class="p">,</span> <span class="bp">None</span><span class="p">)</span> <span class="o">==</span> <span class="n">other</span><span class="o">.</span><span class="n">__dict__</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="n">c</span><span class="p">,</span> <span class="bp">None</span><span class="p">)</span>
                 <span class="k">for</span> <span class="n">c</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_comps</span><span class="p">))</span>
 
-    <span class="k">def</span> <span class="nf">__neq__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">other</span><span class="p">):</span>
+    <span class="k">def</span> <span class="nf">__ne__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">other</span><span class="p">):</span>
         <span class="k">return</span> <span class="ow">not</span> <span class="bp">self</span> <span class="o">==</span> <span class="n">other</span>
 
     <span class="k">def</span> <span class="nf">__lt__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">other</span><span class="p">):</span>
@@ -2541,8 +2658,8 @@
     <span class="n">last_scheduler_run</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">DateTime</span><span class="p">)</span>
     <span class="c1"># Last time this DAG was pickled</span>
     <span class="n">last_pickled</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">DateTime</span><span class="p">)</span>
-    <span class="c1"># When the DAG received a refreshed signal last, used to know when</span>
-    <span class="c1"># we need to force refresh</span>
+    <span class="c1"># Time when the DAG last received a refresh signal</span>
+    <span class="c1"># (e.g. the DAG&#39;s &quot;refresh&quot; button was clicked in the web UI)</span>
     <span class="n">last_expired</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">DateTime</span><span class="p">)</span>
     <span class="c1"># Whether (one  of) the scheduler is scheduling this DAG at the moment</span>
     <span class="n">scheduler_lock</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">Boolean</span><span class="p">)</span>
@@ -2567,7 +2684,7 @@
 
 
 <span class="nd">@functools.total_ordering</span>
-<div class="viewcode-block" id="DAG"><a class="viewcode-back" href="../../code.html#airflow.models.DAG">[docs]</a><span class="k">class</span> <span class="nc">DAG</span><span class="p">(</span><span class="n">LoggingMixin</span><span class="p">):</span>
+<div class="viewcode-block" id="DAG"><a class="viewcode-back" href="../../code.html#airflow.models.DAG">[docs]</a><span class="k">class</span> <span class="nc">DAG</span><span class="p">(</span><span class="n">BaseDag</span><span class="p">,</span> <span class="n">LoggingMixin</span><span class="p">):</span>
     <span class="sd">&quot;&quot;&quot;</span>
 <span class="sd">    A dag (directed acyclic graph) is a collection of tasks with directional</span>
 <span class="sd">    dependencies. A dag also has a schedule, a start end an end date</span>
@@ -2655,8 +2772,14 @@
             <span class="k">del</span> <span class="bp">self</span><span class="o">.</span><span class="n">default_args</span><span class="p">[</span><span class="s1">&#39;params&#39;</span><span class="p">]</span>
 
         <span class="n">validate_key</span><span class="p">(</span><span class="n">dag_id</span><span class="p">)</span>
+
+        <span class="c1"># Properties from BaseDag</span>
+        <span class="bp">self</span><span class="o">.</span><span class="n">_dag_id</span> <span class="o">=</span> <span class="n">dag_id</span>
+        <span class="bp">self</span><span class="o">.</span><span class="n">_full_filepath</span> <span class="o">=</span> <span class="n">full_filepath</span> <span class="k">if</span> <span class="n">full_filepath</span> <span class="k">else</span> <span class="s1">&#39;&#39;</span>
+        <span class="bp">self</span><span class="o">.</span><span class="n">_concurrency</span> <span class="o">=</span> <span class="n">concurrency</span>
+        <span class="bp">self</span><span class="o">.</span><span class="n">_pickle_id</span> <span class="o">=</span> <span class="bp">None</span>
+
         <span class="bp">self</span><span class="o">.</span><span class="n">task_dict</span> <span class="o">=</span> <span class="nb">dict</span><span class="p">()</span>
-        <span class="bp">self</span><span class="o">.</span><span class="n">dag_id</span> <span class="o">=</span> <span class="n">dag_id</span>
         <span class="bp">self</span><span class="o">.</span><span class="n">start_date</span> <span class="o">=</span> <span class="n">start_date</span>
         <span class="bp">self</span><span class="o">.</span><span class="n">end_date</span> <span class="o">=</span> <span class="n">end_date</span>
         <span class="bp">self</span><span class="o">.</span><span class="n">schedule_interval</span> <span class="o">=</span> <span class="n">schedule_interval</span>
@@ -2666,14 +2789,12 @@
             <span class="bp">self</span><span class="o">.</span><span class="n">_schedule_interval</span> <span class="o">=</span> <span class="bp">None</span>
         <span class="k">else</span><span class="p">:</span>
             <span class="bp">self</span><span class="o">.</span><span class="n">_schedule_interval</span> <span class="o">=</span> <span class="n">schedule_interval</span>
-        <span class="bp">self</span><span class="o">.</span><span class="n">full_filepath</span> <span class="o">=</span> <span class="n">full_filepath</span> <span class="k">if</span> <span class="n">full_filepath</span> <span class="k">else</span> <span class="s1">&#39;&#39;</span>
         <span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">template_searchpath</span><span class="p">,</span> <span class="n">six</span><span class="o">.</span><span class="n">string_types</span><span class="p">):</span>
             <span class="n">template_searchpath</span> <span class="o">=</span> <span class="p">[</span><span class="n">template_searchpath</span><span class="p">]</span>
         <span class="bp">self</span><span class="o">.</span><span class="n">template_searchpath</span> <span class="o">=</span> <span class="n">template_searchpath</span>
         <span class="bp">self</span><span class="o">.</span><span class="n">parent_dag</span> <span class="o">=</span> <span class="bp">None</span>  <span class="c1"># Gets set when DAGs are loaded</span>
         <span class="bp">self</span><span class="o">.</span><span class="n">last_loaded</span> <span class="o">=</span> <span class="n">datetime</span><span class="o">.</span><span class="n">now</span><span class="p">()</span>
         <span class="bp">self</span><span class="o">.</span><span class="n">safe_dag_id</span> <span class="o">=</span> <span class="n">dag_id</span><span class="o">.</span><span class="n">replace</span><span class="p">(</span><span class="s1">&#39;.&#39;</span><span class="p">,</span> <span class="s1">&#39;__dot__&#39;</span><span class="p">)</span>
-        <span class="bp">self</span><span class="o">.</span><span class="n">concurrency</span> <span class="o">=</span> <span class="n">concurrency</span>
         <span class="bp">self</span><span class="o">.</span><span class="n">max_active_runs</span> <span class="o">=</span> <span class="n">max_active_runs</span>
         <span class="bp">self</span><span class="o">.</span><span class="n">dagrun_timeout</span> <span class="o">=</span> <span class="n">dagrun_timeout</span>
         <span class="bp">self</span><span class="o">.</span><span class="n">sla_miss_callback</span> <span class="o">=</span> <span class="n">sla_miss_callback</span>
@@ -2695,10 +2816,12 @@
     <span class="k">def</span> <span class="nf">__eq__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">other</span><span class="p">):</span>
         <span class="k">return</span> <span class="p">(</span>
             <span class="nb">type</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">==</span> <span class="nb">type</span><span class="p">(</span><span class="n">other</span><span class="p">)</span> <span class="ow">and</span>
-            <span class="nb">all</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">__dict__</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="n">c</span><span class="p">,</span> <span class="bp">None</span><span class="p">)</span> <span class="o">==</span> <span class="n">other</span><span class="o">.</span><span class="n">__dict__</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="n">c</span><span class="p">,</span> <span class="bp">None</span><span class="p">)</span>
+            <span class="c1"># Use getattr() instead of __dict__ as __dict__ doesn&#39;t return</span>
+            <span class="c1"># correct values for properties.</span>
+            <span class="nb">all</span><span class="p">(</span><span class="nb">getattr</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">c</span><span class="p">,</span> <span class="bp">None</span><span class="p">)</span> <span class="o">==</span> <span class="nb">getattr</span><span class="p">(</span><span class="n">other</span><span class="p">,</span> <span class="n">c</span><span class="p">,</span> <span class="bp">None</span><span class="p">)</span>
                 <span class="k">for</span> <span class="n">c</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_comps</span><span class="p">))</span>
 
-    <span class="k">def</span> <span class="nf">__neq__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">other</span><span class="p">):</span>
+    <span class="k">def</span> <span class="nf">__ne__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">other</span><span class="p">):</span>
         <span class="k">return</span> <span class="ow">not</span> <span class="bp">self</span> <span class="o">==</span> <span class="n">other</span>
 
     <span class="k">def</span> <span class="nf">__lt__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">other</span><span class="p">):</span>
@@ -2770,6 +2893,38 @@
         <span class="k">return</span> <span class="n">dttm</span></div>
 
     <span class="nd">@property</span>
+    <span class="k">def</span> <span class="nf">dag_id</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
+        <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_dag_id</span>
+
+    <span class="nd">@dag_id.setter</span>
+    <span class="k">def</span> <span class="nf">dag_id</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">value</span><span class="p">):</span>
+        <span class="bp">self</span><span class="o">.</span><span class="n">_dag_id</span> <span class="o">=</span> <span class="n">value</span>
+
+    <span class="nd">@property</span>
+    <span class="k">def</span> <span class="nf">full_filepath</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
+        <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_full_filepath</span>
+
+    <span class="nd">@full_filepath.setter</span>
+    <span class="k">def</span> <span class="nf">full_filepath</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">value</span><span class="p">):</span>
+        <span class="bp">self</span><span class="o">.</span><span class="n">_full_filepath</span> <span class="o">=</span> <span class="n">value</span>
+
+    <span class="nd">@property</span>
+    <span class="k">def</span> <span class="nf">concurrency</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
+        <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_concurrency</span>
+
+    <span class="nd">@concurrency.setter</span>
+    <span class="k">def</span> <span class="nf">concurrency</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">value</span><span class="p">):</span>
+        <span class="bp">self</span><span class="o">.</span><span class="n">_concurrency</span> <span class="o">=</span> <span class="n">value</span>
+
+    <span class="nd">@property</span>
+    <span class="k">def</span> <span class="nf">pickle_id</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
+        <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_pickle_id</span>
+
+    <span class="nd">@pickle_id.setter</span>
+    <span class="k">def</span> <span class="nf">pickle_id</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">value</span><span class="p">):</span>
+        <span class="bp">self</span><span class="o">.</span><span class="n">_pickle_id</span> <span class="o">=</span> <span class="n">value</span>
+
+    <span class="nd">@property</span>
     <span class="k">def</span> <span class="nf">tasks</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
         <span class="k">return</span> <span class="nb">list</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">task_dict</span><span class="o">.</span><span class="n">values</span><span class="p">())</span>
 
@@ -2866,6 +3021,15 @@
                 <span class="n">l</span> <span class="o">+=</span> <span class="n">task</span><span class="o">.</span><span class="n">subdag</span><span class="o">.</span><span class="n">subdags</span>
         <span class="k">return</span> <span class="n">l</span>
 
+    <span class="nd">@property</span>
+    <span class="k">def</span> <span class="nf">reached_max_runs</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
+        <span class="n">active_runs</span> <span class="o">=</span> <span class="n">DagRun</span><span class="o">.</span><span class="n">find</span><span class="p">(</span>
+            <span class="n">dag_id</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">dag_id</span><span class="p">,</span>
+            <span class="n">state</span><span class="o">=</span><span class="n">State</span><span class="o">.</span><span class="n">RUNNING</span><span class="p">,</span>
+            <span class="n">external_trigger</span><span class="o">=</span><span class="bp">False</span>
+        <span class="p">)</span>
+        <span class="k">return</span> <span class="nb">len</span><span class="p">(</span><span class="n">active_runs</span><span class="p">)</span> <span class="o">&gt;=</span> <span class="bp">self</span><span class="o">.</span><span class="n">max_active_runs</span>
+
     <span class="k">def</span> <span class="nf">resolve_template_files</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
         <span class="k">for</span> <span class="n">t</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">tasks</span><span class="p">:</span>
             <span class="n">t</span><span class="o">.</span><span class="n">resolve_template_files</span><span class="p">()</span>
@@ -2932,9 +3096,9 @@
         <span class="n">dates</span> <span class="o">=</span> <span class="n">utils_date_range</span><span class="p">(</span><span class="n">start_date</span><span class="p">,</span> <span class="n">end_date</span><span class="p">)</span>
         <span class="n">drs</span> <span class="o">=</span> <span class="n">session</span><span class="o">.</span><span class="n">query</span><span class="p">(</span><span class="n">DagModel</span><span class="p">)</span><span class="o">.</span><span class="n">filter_by</span><span class="p">(</span><span class="n">dag_id</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">dag_id</span><span class="p">)</span><span class="o">.</span><span class="n">all</span><span class="p">()</span>
         <span class="k">for</span> <span class="n">dr</span> <span class="ow">in</span> <span class="n">drs</span><span class="p">:</span>
-            <span class="n">dr</span><span class="o">.</span><span class="n">state</span> <span class="o">=</span> <span class="n">State</span><span class="o">.</span><span class="n">RUNNING</span>
+            <span class="n">dr</span><span class="o">.</span><span class="n">state</span> <span class="o">=</span> <span class="n">state</span>
 
-    <span class="k">def</span> <span class="nf">clear</span><span class="p">(</span>
+<div class="viewcode-block" id="DAG.clear"><a class="viewcode-back" href="../../code.html#airflow.models.DAG.clear">[docs]</a>    <span class="k">def</span> <span class="nf">clear</span><span class="p">(</span>
             <span class="bp">self</span><span class="p">,</span> <span class="n">start_date</span><span class="o">=</span><span class="bp">None</span><span class="p">,</span> <span class="n">end_date</span><span class="o">=</span><span class="bp">None</span><span class="p">,</span>
             <span class="n">only_failed</span><span class="o">=</span><span class="bp">False</span><span class="p">,</span>
             <span class="n">only_running</span><span class="o">=</span><span class="bp">False</span><span class="p">,</span>
@@ -2942,11 +3106,11 @@
             <span class="n">include_subdags</span><span class="o">=</span><span class="bp">True</span><span class="p">,</span>
             <span class="n">reset_dag_runs</span><span class="o">=</span><span class="bp">True</span><span class="p">,</span>
             <span class="n">dry_run</span><span class="o">=</span><span class="bp">False</span><span class="p">):</span>
-        <span class="n">session</span> <span class="o">=</span> <span class="n">settings</span><span class="o">.</span><span class="n">Session</span><span class="p">()</span>
         <span class="sd">&quot;&quot;&quot;</span>
 <span class="sd">        Clears a set of task instances associated with the current dag for</span>
 <span class="sd">        a specified date range.</span>
 <span class="sd">        &quot;&quot;&quot;</span>
+        <span class="n">session</span> <span class="o">=</span> <span class="n">settings</span><span class="o">.</span><span class="n">Session</span><span class="p">()</span>
         <span class="n">TI</span> <span class="o">=</span> <span class="n">TaskInstance</span>
         <span class="n">tis</span> <span class="o">=</span> <span class="n">session</span><span class="o">.</span><span class="n">query</span><span class="p">(</span><span class="n">TI</span><span class="p">)</span>
         <span class="k">if</span> <span class="n">include_subdags</span><span class="p">:</span>
@@ -2998,7 +3162,7 @@
 
         <span class="n">session</span><span class="o">.</span><span class="n">commit</span><span class="p">()</span>
         <span class="n">session</span><span class="o">.</span><span class="n">close</span><span class="p">()</span>
-        <span class="k">return</span> <span class="n">count</span>
+        <span class="k">return</span> <span class="n">count</span></div>
 
     <span class="k">def</span> <span class="nf">__deepcopy__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">memo</span><span class="p">):</span>
         <span class="c1"># Swiwtcharoo to go around deepcopying objects coming through the</span>
@@ -3230,7 +3394,81 @@
         <span class="n">run</span><span class="o">.</span><span class="n">verify_integrity</span><span class="p">(</span><span class="n">session</span><span class="o">=</span><span class="n">session</span><span class="p">)</span>
 
         <span class="n">run</span><span class="o">.</span><span class="n">refresh_from_db</span><span class="p">()</span>
-        <span class="k">return</span> <span class="n">run</span></div></div>
+        <span class="k">return</span> <span class="n">run</span></div>
+
+    <span class="nd">@staticmethod</span>
+    <span class="nd">@provide_session</span>
+<div class="viewcode-block" id="DAG.sync_to_db"><a class="viewcode-back" href="../../code.html#airflow.models.DAG.sync_to_db">[docs]</a>    <span class="k">def</span> <span class="nf">sync_to_db</span><span class="p">(</span><span class="n">dag</span><span class="p">,</span> <span class="n">owner</span><span class="p">,</span> <span class="n">sync_time</span><span class="p">,</span> <span class="n">session</span><span class="o">=</span><span class="bp">None</span><span class="p">):</span>
+        <span class="sd">&quot;&quot;&quot;</span>
+<span class="sd">        Save attributes about this DAG to the DB. Note that this method</span>
+<span class="sd">        can be called for both DAGs and SubDAGs. A SubDag is actually a</span>
+<span class="sd">        SubDagOperator.</span>
+
+<span class="sd">        :param dag: the DAG object to save to the DB</span>
+<span class="sd">        :type dag: DAG</span>
+<span class="sd">        :own</span>
+<span class="sd">        :param sync_time: The time that the DAG should be marked as sync&#39;ed</span>
+<span class="sd">        :type sync_time: datetime</span>
+<span class="sd">        :return: None</span>
+<span class="sd">        &quot;&quot;&quot;</span>
+        <span class="n">orm_dag</span> <span class="o">=</span> <span class="n">session</span><span class="o">.</span><span class="n">query</span><span class="p">(</span>
+            <span class="n">DagModel</span><span class="p">)</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span><span class="n">DagModel</span><span class="o">.</span><span class="n">dag_id</span> <span class="o">==</span> <span class="n">dag</span><span class="o">.</span><span class="n">dag_id</span><span class="p">)</span><span class="o">.</span><span class="n">first</span><span class="p">()</span>
+        <span class="k">if</span> <span class="ow">not</span> <span class="n">orm_dag</span><span class="p">:</span>
+            <span class="n">orm_dag</span> <span class="o">=</span> <span class="n">DagModel</span><span class="p">(</span><span class="n">dag_id</span><span class="o">=</span><span class="n">dag</span><span class="o">.</span><span class="n">dag_id</span><span class="p">)</span>
+            <span class="n">logging</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">&quot;Creating ORM DAG for </span><span class="si">%s</span><span class="s2">&quot;</span><span class="p">,</span>
+                         <span class="n">dag</span><span class="o">.</span><span class="n">dag_id</span><span class="p">)</span>
+        <span class="n">orm_dag</span><span class="o">.</span><span class="n">fileloc</span> <span class="o">=</span> <span class="n">dag</span><span class="o">.</span><span class="n">full_filepath</span>
+        <span class="n">orm_dag</span><span class="o">.</span><span class="n">is_subdag</span> <span class="o">=</span> <span class="n">dag</span><span class="o">.</span><span class="n">is_subdag</span>
+        <span class="n">orm_dag</span><span class="o">.</span><span class="n">owners</span> <span class="o">=</span> <span class="n">owner</span>
+        <span class="n">orm_dag</span><span class="o">.</span><span class="n">is_active</span> <span class="o">=</span> <span class="bp">True</span>
+        <span class="n">orm_dag</span><span class="o">.</span><span class="n">last_scheduler_run</span> <span class="o">=</span> <span class="n">sync_time</span>
+        <span class="n">session</span><span class="o">.</span><span class="n">merge</span><span class="p">(</span><span class="n">orm_dag</span><span class="p">)</span>
+        <span class="n">session</span><span class="o">.</span><span class="n">commit</span><span class="p">()</span>
+
+        <span class="k">for</span> <span class="n">subdag</span> <span class="ow">in</span> <span class="n">dag</span><span class="o">.</span><span class="n">subdags</span><span class="p">:</span>
+            <span class="n">DAG</span><span class="o">.</span><span class="n">sync_to_db</span><span class="p">(</span><span class="n">subdag</span><span class="p">,</span> <span class="n">owner</span><span class="p">,</span> <span class="n">sync_time</span><span class="p">,</span> <span class="n">session</span><span class="o">=</span><span class="n">session</span><span class="p">)</span></div>
+
+    <span class="nd">@staticmethod</span>
+    <span class="nd">@provide_session</span>
+<div class="viewcode-block" id="DAG.deactivate_unknown_dags"><a class="viewcode-back" href="../../code.html#airflow.models.DAG.deactivate_unknown_dags">[docs]</a>    <span class="k">def</span> <span class="nf">deactivate_unknown_dags</span><span class="p">(</span><span class="n">active_dag_ids</span><span class="p">,</span> <span class="n">session</span><span class="o">=</span><span class="bp">None</span><span class="p">):</span>
+        <span class="sd">&quot;&quot;&quot;</span>
+<span class="sd">        Given a list of known DAGs, deactivate any other DAGs that are</span>
+<span class="sd">        marked as active in the ORM</span>
+
+<span class="sd">        :param active_dag_ids: list of DAG IDs that are active</span>
+<span class="sd">        :type active_dag_ids: list[unicode]</span>
+<span class="sd">        :return: None</span>
+<span class="sd">        &quot;&quot;&quot;</span>
+
+        <span class="k">if</span> <span class="nb">len</span><span class="p">(</span><span class="n">active_dag_ids</span><span class="p">)</span> <span class="o">==</span> <span class="mi">0</span><span class="p">:</span>
+            <span class="k">return</span>
+        <span class="k">for</span> <span class="n">dag</span> <span class="ow">in</span> <span class="n">session</span><span class="o">.</span><span class="n">query</span><span class="p">(</span>
+                <span class="n">DagModel</span><span class="p">)</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span><span class="o">~</span><span class="n">DagModel</span><span class="o">.</span><span class="n">dag_id</span><span class="o">.</span><span class="n">in_</span><span class="p">(</span><span class="n">active_dag_ids</span><span class="p">))</span><span class="o">.</span><span

<TRUNCATED>