You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ma...@apache.org on 2016/08/11 22:59:45 UTC
[6/7] incubator-airflow-site git commit: New version of the docs
http://git-wip-us.apache.org/repos/asf/incubator-airflow-site/blob/4af0850c/_modules/airflow/models.html
----------------------------------------------------------------------
diff --git a/_modules/airflow/models.html b/_modules/airflow/models.html
index d4ed8a6..2f044a4 100644
--- a/_modules/airflow/models.html
+++ b/_modules/airflow/models.html
@@ -198,6 +198,7 @@
<span class="kn">from</span> <span class="nn">airflow.executors</span> <span class="kn">import</span> <span class="n">DEFAULT_EXECUTOR</span><span class="p">,</span> <span class="n">LocalExecutor</span>
<span class="kn">from</span> <span class="nn">airflow</span> <span class="kn">import</span> <span class="n">configuration</span>
<span class="kn">from</span> <span class="nn">airflow.exceptions</span> <span class="kn">import</span> <span class="n">AirflowException</span><span class="p">,</span> <span class="n">AirflowSkipException</span>
+<span class="kn">from</span> <span class="nn">airflow.dag.base_dag</span> <span class="kn">import</span> <span class="n">BaseDag</span><span class="p">,</span> <span class="n">BaseDagBag</span>
<span class="kn">from</span> <span class="nn">airflow.utils.dates</span> <span class="kn">import</span> <span class="n">cron_presets</span><span class="p">,</span> <span class="n">date_range</span> <span class="k">as</span> <span class="n">utils_date_range</span>
<span class="kn">from</span> <span class="nn">airflow.utils.db</span> <span class="kn">import</span> <span class="n">provide_session</span>
<span class="kn">from</span> <span class="nn">airflow.utils.decorators</span> <span class="kn">import</span> <span class="n">apply_defaults</span>
@@ -205,6 +206,7 @@
<span class="kn">from</span> <span class="nn">airflow.utils.helpers</span> <span class="kn">import</span> <span class="p">(</span>
<span class="n">as_tuple</span><span class="p">,</span> <span class="n">is_container</span><span class="p">,</span> <span class="n">is_in</span><span class="p">,</span> <span class="n">validate_key</span><span class="p">,</span> <span class="n">pprinttable</span><span class="p">)</span>
<span class="kn">from</span> <span class="nn">airflow.utils.logging</span> <span class="kn">import</span> <span class="n">LoggingMixin</span>
+<span class="kn">from</span> <span class="nn">airflow.utils.operator_resources</span> <span class="kn">import</span> <span class="n">Resources</span>
<span class="kn">from</span> <span class="nn">airflow.utils.state</span> <span class="kn">import</span> <span class="n">State</span>
<span class="kn">from</span> <span class="nn">airflow.utils.timeout</span> <span class="kn">import</span> <span class="n">timeout</span>
<span class="kn">from</span> <span class="nn">airflow.utils.trigger_rule</span> <span class="kn">import</span> <span class="n">TriggerRule</span>
@@ -267,7 +269,7 @@
<span class="n">dr</span><span class="o">.</span><span class="n">start_date</span> <span class="o">=</span> <span class="n">datetime</span><span class="o">.</span><span class="n">now</span><span class="p">()</span>
-<div class="viewcode-block" id="DagBag"><a class="viewcode-back" href="../../code.html#airflow.models.DagBag">[docs]</a><span class="k">class</span> <span class="nc">DagBag</span><span class="p">(</span><span class="n">LoggingMixin</span><span class="p">):</span>
+<div class="viewcode-block" id="DagBag"><a class="viewcode-back" href="../../code.html#airflow.models.DagBag">[docs]</a><span class="k">class</span> <span class="nc">DagBag</span><span class="p">(</span><span class="n">BaseDagBag</span><span class="p">,</span> <span class="n">LoggingMixin</span><span class="p">):</span>
<span class="sd">"""</span>
<span class="sd"> A dagbag is a collection of dags, parsed out of a folder tree and has high</span>
<span class="sd"> level configuration settings, like what database to use as a backend and</span>
@@ -278,7 +280,7 @@
<span class="sd"> independent settings sets.</span>
<span class="sd"> :param dag_folder: the folder to scan to find DAGs</span>
-<span class="sd"> :type dag_folder: str</span>
+<span class="sd"> :type dag_folder: unicode</span>
<span class="sd"> :param executor: the executor to use when executing task instances</span>
<span class="sd"> in this DagBag</span>
<span class="sd"> :param include_examples: whether to include the examples that ship</span>
@@ -293,25 +295,23 @@
<span class="bp">self</span><span class="p">,</span>
<span class="n">dag_folder</span><span class="o">=</span><span class="bp">None</span><span class="p">,</span>
<span class="n">executor</span><span class="o">=</span><span class="n">DEFAULT_EXECUTOR</span><span class="p">,</span>
- <span class="n">include_examples</span><span class="o">=</span><span class="n">configuration</span><span class="o">.</span><span class="n">getboolean</span><span class="p">(</span><span class="s1">'core'</span><span class="p">,</span> <span class="s1">'LOAD_EXAMPLES'</span><span class="p">),</span>
- <span class="n">sync_to_db</span><span class="o">=</span><span class="bp">False</span><span class="p">):</span>
+ <span class="n">include_examples</span><span class="o">=</span><span class="n">configuration</span><span class="o">.</span><span class="n">getboolean</span><span class="p">(</span><span class="s1">'core'</span><span class="p">,</span> <span class="s1">'LOAD_EXAMPLES'</span><span class="p">)):</span>
<span class="n">dag_folder</span> <span class="o">=</span> <span class="n">dag_folder</span> <span class="ow">or</span> <span class="n">DAGS_FOLDER</span>
<span class="bp">self</span><span class="o">.</span><span class="n">logger</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">"Filling up the DagBag from {}"</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">dag_folder</span><span class="p">))</span>
<span class="bp">self</span><span class="o">.</span><span class="n">dag_folder</span> <span class="o">=</span> <span class="n">dag_folder</span>
<span class="bp">self</span><span class="o">.</span><span class="n">dags</span> <span class="o">=</span> <span class="p">{}</span>
- <span class="bp">self</span><span class="o">.</span><span class="n">sync_to_db</span> <span class="o">=</span> <span class="n">sync_to_db</span>
+ <span class="c1"># the file's last modified timestamp when we last read it</span>
<span class="bp">self</span><span class="o">.</span><span class="n">file_last_changed</span> <span class="o">=</span> <span class="p">{}</span>
<span class="bp">self</span><span class="o">.</span><span class="n">executor</span> <span class="o">=</span> <span class="n">executor</span>
<span class="bp">self</span><span class="o">.</span><span class="n">import_errors</span> <span class="o">=</span> <span class="p">{}</span>
+
<span class="k">if</span> <span class="n">include_examples</span><span class="p">:</span>
<span class="n">example_dag_folder</span> <span class="o">=</span> <span class="n">os</span><span class="o">.</span><span class="n">path</span><span class="o">.</span><span class="n">join</span><span class="p">(</span>
<span class="n">os</span><span class="o">.</span><span class="n">path</span><span class="o">.</span><span class="n">dirname</span><span class="p">(</span><span class="n">__file__</span><span class="p">),</span>
<span class="s1">'example_dags'</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">collect_dags</span><span class="p">(</span><span class="n">example_dag_folder</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">collect_dags</span><span class="p">(</span><span class="n">dag_folder</span><span class="p">)</span>
- <span class="k">if</span> <span class="n">sync_to_db</span><span class="p">:</span>
- <span class="bp">self</span><span class="o">.</span><span class="n">deactivate_inactive_dags</span><span class="p">()</span>
<div class="viewcode-block" id="DagBag.size"><a class="viewcode-back" href="../../code.html#airflow.models.DagBag.size">[docs]</a> <span class="k">def</span> <span class="nf">size</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="sd">"""</span>
@@ -330,7 +330,7 @@
<span class="k">if</span> <span class="n">dag</span><span class="o">.</span><span class="n">is_subdag</span><span class="p">:</span>
<span class="n">root_dag_id</span> <span class="o">=</span> <span class="n">dag</span><span class="o">.</span><span class="n">parent_dag</span><span class="o">.</span><span class="n">dag_id</span>
- <span class="c1"># If the root_dag_id is absent or expired</span>
+ <span class="c1"># If the dag corresponding to root_dag_id is absent or expired</span>
<span class="n">orm_dag</span> <span class="o">=</span> <span class="n">DagModel</span><span class="o">.</span><span class="n">get_current</span><span class="p">(</span><span class="n">root_dag_id</span><span class="p">)</span>
<span class="k">if</span> <span class="n">orm_dag</span> <span class="ow">and</span> <span class="p">(</span>
<span class="n">root_dag_id</span> <span class="ow">not</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">dags</span> <span class="ow">or</span>
@@ -339,10 +339,11 @@
<span class="n">dag</span><span class="o">.</span><span class="n">last_loaded</span> <span class="o"><</span> <span class="n">orm_dag</span><span class="o">.</span><span class="n">last_expired</span>
<span class="p">)</span>
<span class="p">):</span>
- <span class="c1"># Reprocessing source file</span>
+ <span class="c1"># Reprocess source file</span>
<span class="n">found_dags</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">process_file</span><span class="p">(</span>
<span class="n">filepath</span><span class="o">=</span><span class="n">orm_dag</span><span class="o">.</span><span class="n">fileloc</span><span class="p">,</span> <span class="n">only_if_updated</span><span class="o">=</span><span class="bp">False</span><span class="p">)</span>
+ <span class="c1"># If the source file no longer exports `dag_id`, delete it from self.dags</span>
<span class="k">if</span> <span class="n">found_dags</span> <span class="ow">and</span> <span class="n">dag_id</span> <span class="ow">in</span> <span class="p">[</span><span class="n">dag</span><span class="o">.</span><span class="n">dag_id</span> <span class="k">for</span> <span class="n">dag</span> <span class="ow">in</span> <span class="n">found_dags</span><span class="p">]:</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">dags</span><span class="p">[</span><span class="n">dag_id</span><span class="p">]</span>
<span class="k">elif</span> <span class="n">dag_id</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">dags</span><span class="p">:</span>
@@ -363,10 +364,10 @@
<span class="k">try</span><span class="p">:</span>
<span class="c1"># This failed before in what may have been a git sync</span>
<span class="c1"># race condition</span>
- <span class="n">dttm</span> <span class="o">=</span> <span class="n">datetime</span><span class="o">.</span><span class="n">fromtimestamp</span><span class="p">(</span><span class="n">os</span><span class="o">.</span><span class="n">path</span><span class="o">.</span><span class="n">getmtime</span><span class="p">(</span><span class="n">filepath</span><span class="p">))</span>
+ <span class="n">file_last_changed_on_disk</span> <span class="o">=</span> <span class="n">datetime</span><span class="o">.</span><span class="n">fromtimestamp</span><span class="p">(</span><span class="n">os</span><span class="o">.</span><span class="n">path</span><span class="o">.</span><span class="n">getmtime</span><span class="p">(</span><span class="n">filepath</span><span class="p">))</span>
<span class="k">if</span> <span class="n">only_if_updated</span> \
<span class="ow">and</span> <span class="n">filepath</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">file_last_changed</span> \
- <span class="ow">and</span> <span class="n">dttm</span> <span class="o">==</span> <span class="bp">self</span><span class="o">.</span><span class="n">file_last_changed</span><span class="p">[</span><span class="n">filepath</span><span class="p">]:</span>
+ <span class="ow">and</span> <span class="n">file_last_changed_on_disk</span> <span class="o">==</span> <span class="bp">self</span><span class="o">.</span><span class="n">file_last_changed</span><span class="p">[</span><span class="n">filepath</span><span class="p">]:</span>
<span class="k">return</span> <span class="n">found_dags</span>
<span class="k">except</span> <span class="ne">Exception</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span>
@@ -395,7 +396,7 @@
<span class="k">except</span> <span class="ne">Exception</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">logger</span><span class="o">.</span><span class="n">exception</span><span class="p">(</span><span class="s2">"Failed to import: "</span> <span class="o">+</span> <span class="n">filepath</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">import_errors</span><span class="p">[</span><span class="n">filepath</span><span class="p">]</span> <span class="o">=</span> <span class="nb">str</span><span class="p">(</span><span class="n">e</span><span class="p">)</span>
- <span class="bp">self</span><span class="o">.</span><span class="n">file_last_changed</span><span class="p">[</span><span class="n">filepath</span><span class="p">]</span> <span class="o">=</span> <span class="n">dttm</span>
+ <span class="bp">self</span><span class="o">.</span><span class="n">file_last_changed</span><span class="p">[</span><span class="n">filepath</span><span class="p">]</span> <span class="o">=</span> <span class="n">file_last_changed_on_disk</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">zip_file</span> <span class="o">=</span> <span class="n">zipfile</span><span class="o">.</span><span class="n">ZipFile</span><span class="p">(</span><span class="n">filepath</span><span class="p">)</span>
@@ -426,7 +427,7 @@
<span class="k">except</span> <span class="ne">Exception</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">logger</span><span class="o">.</span><span class="n">exception</span><span class="p">(</span><span class="s2">"Failed to import: "</span> <span class="o">+</span> <span class="n">filepath</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">import_errors</span><span class="p">[</span><span class="n">filepath</span><span class="p">]</span> <span class="o">=</span> <span class="nb">str</span><span class="p">(</span><span class="n">e</span><span class="p">)</span>
- <span class="bp">self</span><span class="o">.</span><span class="n">file_last_changed</span><span class="p">[</span><span class="n">filepath</span><span class="p">]</span> <span class="o">=</span> <span class="n">dttm</span>
+ <span class="bp">self</span><span class="o">.</span><span class="n">file_last_changed</span><span class="p">[</span><span class="n">filepath</span><span class="p">]</span> <span class="o">=</span> <span class="n">file_last_changed_on_disk</span>
<span class="k">for</span> <span class="n">m</span> <span class="ow">in</span> <span class="n">mods</span><span class="p">:</span>
<span class="k">for</span> <span class="n">dag</span> <span class="ow">in</span> <span class="nb">list</span><span class="p">(</span><span class="n">m</span><span class="o">.</span><span class="n">__dict__</span><span class="o">.</span><span class="n">values</span><span class="p">()):</span>
@@ -439,11 +440,11 @@
<span class="n">found_dags</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">dag</span><span class="p">)</span>
<span class="n">found_dags</span> <span class="o">+=</span> <span class="n">dag</span><span class="o">.</span><span class="n">subdags</span>
- <span class="bp">self</span><span class="o">.</span><span class="n">file_last_changed</span><span class="p">[</span><span class="n">filepath</span><span class="p">]</span> <span class="o">=</span> <span class="n">dttm</span>
+ <span class="bp">self</span><span class="o">.</span><span class="n">file_last_changed</span><span class="p">[</span><span class="n">filepath</span><span class="p">]</span> <span class="o">=</span> <span class="n">file_last_changed_on_disk</span>
<span class="k">return</span> <span class="n">found_dags</span></div>
<span class="nd">@provide_session</span>
-<div class="viewcode-block" id="DagBag.kill_zombies"><a class="viewcode-back" href="../../code.html#airflow.models.DagBag.kill_zombies">[docs]</a> <span class="k">def</span> <span class="nf">kill_zombies</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">session</span><span class="p">):</span>
+<div class="viewcode-block" id="DagBag.kill_zombies"><a class="viewcode-back" href="../../code.html#airflow.models.DagBag.kill_zombies">[docs]</a> <span class="k">def</span> <span class="nf">kill_zombies</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">session</span><span class="o">=</span><span class="bp">None</span><span class="p">):</span>
<span class="sd">"""</span>
<span class="sd"> Fails tasks that haven't had a heartbeat in too long</span>
<span class="sd"> """</span>
@@ -477,6 +478,7 @@
<span class="n">ti</span><span class="o">.</span><span class="n">handle_failure</span><span class="p">(</span><span class="s2">"{} killed as zombie"</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">ti</span><span class="p">))</span>
<span class="bp">self</span><span class="o">.</span><span class="n">logger</span><span class="o">.</span><span class="n">info</span><span class="p">(</span>
<span class="s1">'Marked zombie job {} as failed'</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">ti</span><span class="p">))</span>
+ <span class="n">Stats</span><span class="o">.</span><span class="n">incr</span><span class="p">(</span><span class="s1">'zombies_killed'</span><span class="p">)</span>
<span class="n">session</span><span class="o">.</span><span class="n">commit</span><span class="p">()</span></div>
<div class="viewcode-block" id="DagBag.bag_dag"><a class="viewcode-back" href="../../code.html#airflow.models.DagBag.bag_dag">[docs]</a> <span class="k">def</span> <span class="nf">bag_dag</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">dag</span><span class="p">,</span> <span class="n">parent_dag</span><span class="p">,</span> <span class="n">root_dag</span><span class="p">):</span>
@@ -490,20 +492,6 @@
<span class="k">for</span> <span class="n">task</span> <span class="ow">in</span> <span class="n">dag</span><span class="o">.</span><span class="n">tasks</span><span class="p">:</span>
<span class="n">settings</span><span class="o">.</span><span class="n">policy</span><span class="p">(</span><span class="n">task</span><span class="p">)</span>
- <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">sync_to_db</span><span class="p">:</span>
- <span class="n">session</span> <span class="o">=</span> <span class="n">settings</span><span class="o">.</span><span class="n">Session</span><span class="p">()</span>
- <span class="n">orm_dag</span> <span class="o">=</span> <span class="n">session</span><span class="o">.</span><span class="n">query</span><span class="p">(</span>
- <span class="n">DagModel</span><span class="p">)</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span><span class="n">DagModel</span><span class="o">.</span><span class="n">dag_id</span> <span class="o">==</span> <span class="n">dag</span><span class="o">.</span><span class="n">dag_id</span><span class="p">)</span><span class="o">.</span><span class="n">first</span><span class="p">()</span>
- <span class="k">if</span> <span class="ow">not</span> <span class="n">orm_dag</span><span class="p">:</span>
- <span class="n">orm_dag</span> <span class="o">=</span> <span class="n">DagModel</span><span class="p">(</span><span class="n">dag_id</span><span class="o">=</span><span class="n">dag</span><span class="o">.</span><span class="n">dag_id</span><span class="p">)</span>
- <span class="n">orm_dag</span><span class="o">.</span><span class="n">fileloc</span> <span class="o">=</span> <span class="n">root_dag</span><span class="o">.</span><span class="n">full_filepath</span>
- <span class="n">orm_dag</span><span class="o">.</span><span class="n">is_subdag</span> <span class="o">=</span> <span class="n">dag</span><span class="o">.</span><span class="n">is_subdag</span>
- <span class="n">orm_dag</span><span class="o">.</span><span class="n">owners</span> <span class="o">=</span> <span class="n">root_dag</span><span class="o">.</span><span class="n">owner</span>
- <span class="n">orm_dag</span><span class="o">.</span><span class="n">is_active</span> <span class="o">=</span> <span class="bp">True</span>
- <span class="n">session</span><span class="o">.</span><span class="n">merge</span><span class="p">(</span><span class="n">orm_dag</span><span class="p">)</span>
- <span class="n">session</span><span class="o">.</span><span class="n">commit</span><span class="p">()</span>
- <span class="n">session</span><span class="o">.</span><span class="n">close</span><span class="p">()</span>
-
<span class="k">for</span> <span class="n">subdag</span> <span class="ow">in</span> <span class="n">dag</span><span class="o">.</span><span class="n">subdags</span><span class="p">:</span>
<span class="n">subdag</span><span class="o">.</span><span class="n">full_filepath</span> <span class="o">=</span> <span class="n">dag</span><span class="o">.</span><span class="n">full_filepath</span>
<span class="n">subdag</span><span class="o">.</span><span class="n">parent_dag</span> <span class="o">=</span> <span class="n">dag</span>
@@ -692,7 +680,8 @@
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_password</span> <span class="ow">and</span> <span class="bp">self</span><span class="o">.</span><span class="n">is_encrypted</span><span class="p">:</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">ENCRYPTION_ON</span><span class="p">:</span>
<span class="k">raise</span> <span class="n">AirflowException</span><span class="p">(</span>
- <span class="s2">"Can't decrypt, configuration is missing"</span><span class="p">)</span>
+ <span class="s2">"Can't decrypt encrypted password for login={}, </span><span class="se">\</span>
+<span class="s2"> FERNET_KEY configuration is missing"</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">login</span><span class="p">))</span>
<span class="k">return</span> <span class="n">FERNET</span><span class="o">.</span><span class="n">decrypt</span><span class="p">(</span><span class="nb">bytes</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_password</span><span class="p">,</span> <span class="s1">'utf-8'</span><span class="p">))</span><span class="o">.</span><span class="n">decode</span><span class="p">()</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_password</span>
@@ -715,7 +704,8 @@
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_extra</span> <span class="ow">and</span> <span class="bp">self</span><span class="o">.</span><span class="n">is_extra_encrypted</span><span class="p">:</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">ENCRYPTION_ON</span><span class="p">:</span>
<span class="k">raise</span> <span class="n">AirflowException</span><span class="p">(</span>
- <span class="s2">"Can't decrypt `extra`, configuration is missing"</span><span class="p">)</span>
+ <span class="s2">"Can't decrypt `extra` params for login={},</span><span class="se">\</span>
+<span class="s2"> FERNET_KEY configuration is missing"</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">login</span><span class="p">))</span>
<span class="k">return</span> <span class="n">FERNET</span><span class="o">.</span><span class="n">decrypt</span><span class="p">(</span><span class="nb">bytes</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_extra</span><span class="p">,</span> <span class="s1">'utf-8'</span><span class="p">))</span><span class="o">.</span><span class="n">decode</span><span class="p">()</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_extra</span>
@@ -891,8 +881,78 @@
<span class="sd"> the orchestrator.</span>
<span class="sd"> """</span>
<span class="n">dag</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">task</span><span class="o">.</span><span class="n">dag</span>
- <span class="n">iso</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">execution_date</span><span class="o">.</span><span class="n">isoformat</span><span class="p">()</span>
- <span class="n">cmd</span> <span class="o">=</span> <span class="s2">"airflow run {self.dag_id} {self.task_id} {iso} "</span>
+
+ <span class="n">should_pass_filepath</span> <span class="o">=</span> <span class="ow">not</span> <span class="n">pickle_id</span> <span class="ow">and</span> <span class="n">dag</span>
+ <span class="k">if</span> <span class="n">should_pass_filepath</span> <span class="ow">and</span> <span class="n">dag</span><span class="o">.</span><span class="n">full_filepath</span> <span class="o">!=</span> <span class="n">dag</span><span class="o">.</span><span class="n">filepath</span><span class="p">:</span>
+ <span class="n">path</span> <span class="o">=</span> <span class="s2">"DAGS_FOLDER/{}"</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">dag</span><span class="o">.</span><span class="n">filepath</span><span class="p">)</span>
+ <span class="k">elif</span> <span class="n">should_pass_filepath</span> <span class="ow">and</span> <span class="n">dag</span><span class="o">.</span><span class="n">full_filepath</span><span class="p">:</span>
+ <span class="n">path</span> <span class="o">=</span> <span class="n">dag</span><span class="o">.</span><span class="n">full_filepath</span>
+ <span class="k">else</span><span class="p">:</span>
+ <span class="n">path</span> <span class="o">=</span> <span class="bp">None</span>
+
+ <span class="k">return</span> <span class="n">TaskInstance</span><span class="o">.</span><span class="n">generate_command</span><span class="p">(</span>
+ <span class="bp">self</span><span class="o">.</span><span class="n">dag_id</span><span class="p">,</span>
+ <span class="bp">self</span><span class="o">.</span><span class="n">task_id</span><span class="p">,</span>
+ <span class="bp">self</span><span class="o">.</span><span class="n">execution_date</span><span class="p">,</span>
+ <span class="n">mark_success</span><span class="o">=</span><span class="n">mark_success</span><span class="p">,</span>
+ <span class="n">ignore_dependencies</span><span class="o">=</span><span class="n">ignore_dependencies</span><span class="p">,</span>
+ <span class="n">ignore_depends_on_past</span><span class="o">=</span><span class="n">ignore_depends_on_past</span><span class="p">,</span>
+ <span class="n">force</span><span class="o">=</span><span class="n">force</span><span class="p">,</span>
+ <span class="n">local</span><span class="o">=</span><span class="n">local</span><span class="p">,</span>
+ <span class="n">pickle_id</span><span class="o">=</span><span class="n">pickle_id</span><span class="p">,</span>
+ <span class="n">file_path</span><span class="o">=</span><span class="n">path</span><span class="p">,</span>
+ <span class="n">raw</span><span class="o">=</span><span class="n">raw</span><span class="p">,</span>
+ <span class="n">job_id</span><span class="o">=</span><span class="n">job_id</span><span class="p">,</span>
+ <span class="n">pool</span><span class="o">=</span><span class="n">pool</span><span class="p">)</span></div>
+
+ <span class="nd">@staticmethod</span>
+<div class="viewcode-block" id="TaskInstance.generate_command"><a class="viewcode-back" href="../../code.html#airflow.models.TaskInstance.generate_command">[docs]</a> <span class="k">def</span> <span class="nf">generate_command</span><span class="p">(</span><span class="n">dag_id</span><span class="p">,</span>
+ <span class="n">task_id</span><span class="p">,</span>
+ <span class="n">execution_date</span><span class="p">,</span>
+ <span class="n">mark_success</span><span class="o">=</span><span class="bp">False</span><span class="p">,</span>
+ <span class="n">ignore_dependencies</span><span class="o">=</span><span class="bp">False</span><span class="p">,</span>
+ <span class="n">ignore_depends_on_past</span><span class="o">=</span><span class="bp">False</span><span class="p">,</span>
+ <span class="n">force</span><span class="o">=</span><span class="bp">False</span><span class="p">,</span>
+ <span class="n">local</span><span class="o">=</span><span class="bp">False</span><span class="p">,</span>
+ <span class="n">pickle_id</span><span class="o">=</span><span class="bp">None</span><span class="p">,</span>
+ <span class="n">file_path</span><span class="o">=</span><span class="bp">None</span><span class="p">,</span>
+ <span class="n">raw</span><span class="o">=</span><span class="bp">False</span><span class="p">,</span>
+ <span class="n">job_id</span><span class="o">=</span><span class="bp">None</span><span class="p">,</span>
+ <span class="n">pool</span><span class="o">=</span><span class="bp">None</span>
+ <span class="p">):</span>
+ <span class="sd">"""</span>
+<span class="sd"> Generates the shell command required to execute this task instance.</span>
+
+<span class="sd"> :param dag_id: DAG ID</span>
+<span class="sd"> :type dag_id: unicode</span>
+<span class="sd"> :param task_id: Task ID</span>
+<span class="sd"> :type task_id: unicode</span>
+<span class="sd"> :param execution_date: Execution date for the task</span>
+<span class="sd"> :type execution_date: datetime</span>
+<span class="sd"> :param mark_success: Whether to mark the task as successful</span>
+<span class="sd"> :type mark_success: bool</span>
+<span class="sd"> :param ignore_dependencies: Whether to ignore the dependencies and run</span>
+<span class="sd"> anyway</span>
+<span class="sd"> :type ignore_dependencies: bool</span>
+<span class="sd"> :param ignore_depends_on_past: Whether to ignore the depends on past</span>
+<span class="sd"> setting and run anyway</span>
+<span class="sd"> :type ignore_depends_on_past: bool</span>
+<span class="sd"> :param force: Whether to force running - see TaskInstance.run()</span>
+<span class="sd"> :type force: bool</span>
+<span class="sd"> :param local: Whether to run the task locally</span>
+<span class="sd"> :type local: bool</span>
+<span class="sd"> :param pickle_id: If the DAG was serialized to the DB, the ID</span>
+<span class="sd"> associated with the pickled DAG</span>
+<span class="sd"> :type pickle_id: unicode</span>
+<span class="sd"> :param file_path: path to the file containing the DAG definition</span>
+<span class="sd"> :param raw: raw mode (needs more details)</span>
+<span class="sd"> :param job_id: job ID (needs more details)</span>
+<span class="sd"> :param pool: the Airflow pool that the task should run in</span>
+<span class="sd"> :type pool: unicode</span>
+<span class="sd"> :return: shell command that can be used to run the task instance</span>
+<span class="sd"> """</span>
+ <span class="n">iso</span> <span class="o">=</span> <span class="n">execution_date</span><span class="o">.</span><span class="n">isoformat</span><span class="p">()</span>
+ <span class="n">cmd</span> <span class="o">=</span> <span class="s2">"airflow run {dag_id} {task_id} {iso} "</span>
<span class="n">cmd</span> <span class="o">+=</span> <span class="s2">"--mark_success "</span> <span class="k">if</span> <span class="n">mark_success</span> <span class="k">else</span> <span class="s2">""</span>
<span class="n">cmd</span> <span class="o">+=</span> <span class="s2">"--pickle {pickle_id} "</span> <span class="k">if</span> <span class="n">pickle_id</span> <span class="k">else</span> <span class="s2">""</span>
<span class="n">cmd</span> <span class="o">+=</span> <span class="s2">"--job_id {job_id} "</span> <span class="k">if</span> <span class="n">job_id</span> <span class="k">else</span> <span class="s2">""</span>
@@ -902,11 +962,7 @@
<span class="n">cmd</span> <span class="o">+=</span> <span class="s2">"--local "</span> <span class="k">if</span> <span class="n">local</span> <span class="k">else</span> <span class="s2">""</span>
<span class="n">cmd</span> <span class="o">+=</span> <span class="s2">"--pool {pool} "</span> <span class="k">if</span> <span class="n">pool</span> <span class="k">else</span> <span class="s2">""</span>
<span class="n">cmd</span> <span class="o">+=</span> <span class="s2">"--raw "</span> <span class="k">if</span> <span class="n">raw</span> <span class="k">else</span> <span class="s2">""</span>
- <span class="k">if</span> <span class="ow">not</span> <span class="n">pickle_id</span> <span class="ow">and</span> <span class="n">dag</span><span class="p">:</span>
- <span class="k">if</span> <span class="n">dag</span><span class="o">.</span><span class="n">full_filepath</span> <span class="o">!=</span> <span class="n">dag</span><span class="o">.</span><span class="n">filepath</span><span class="p">:</span>
- <span class="n">cmd</span> <span class="o">+=</span> <span class="s2">"-sd DAGS_FOLDER/{dag.filepath} "</span>
- <span class="k">elif</span> <span class="n">dag</span><span class="o">.</span><span class="n">full_filepath</span><span class="p">:</span>
- <span class="n">cmd</span> <span class="o">+=</span> <span class="s2">"-sd {dag.full_filepath}"</span>
+ <span class="n">cmd</span> <span class="o">+=</span> <span class="s2">"-sd {file_path}"</span> <span class="k">if</span> <span class="n">file_path</span> <span class="k">else</span> <span class="s2">""</span>
<span class="k">return</span> <span class="n">cmd</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="o">**</span><span class="nb">locals</span><span class="p">())</span></div>
<span class="nd">@property</span>
@@ -1284,13 +1340,25 @@
<span class="s2">"{ti.execution_date} [{ti.state}]>"</span>
<span class="p">)</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">ti</span><span class="o">=</span><span class="bp">self</span><span class="p">)</span>
+<div class="viewcode-block" id="TaskInstance.next_retry_datetime"><a class="viewcode-back" href="../../code.html#airflow.models.TaskInstance.next_retry_datetime">[docs]</a> <span class="k">def</span> <span class="nf">next_retry_datetime</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
+ <span class="sd">"""</span>
+<span class="sd"> Get datetime of the next retry if the task instance fails. For exponential</span>
+<span class="sd"> backoff, retry_delay is used as base and will be converted to seconds.</span>
+<span class="sd"> """</span>
+ <span class="n">delay</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">task</span><span class="o">.</span><span class="n">retry_delay</span>
+ <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">task</span><span class="o">.</span><span class="n">retry_exponential_backoff</span><span class="p">:</span>
+ <span class="n">delay_backoff_in_seconds</span> <span class="o">=</span> <span class="n">delay</span><span class="o">.</span><span class="n">total_seconds</span><span class="p">()</span> <span class="o">**</span> <span class="bp">self</span><span class="o">.</span><span class="n">try_number</span>
+ <span class="n">delay</span> <span class="o">=</span> <span class="n">timedelta</span><span class="p">(</span><span class="n">seconds</span><span class="o">=</span><span class="n">delay_backoff_in_seconds</span><span class="p">)</span>
+ <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">task</span><span class="o">.</span><span class="n">max_retry_delay</span><span class="p">:</span>
+ <span class="n">delay</span> <span class="o">=</span> <span class="nb">min</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">task</span><span class="o">.</span><span class="n">max_retry_delay</span><span class="p">,</span> <span class="n">delay</span><span class="p">)</span>
+ <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">end_date</span> <span class="o">+</span> <span class="n">delay</span></div>
+
<div class="viewcode-block" id="TaskInstance.ready_for_retry"><a class="viewcode-back" href="../../code.html#airflow.models.TaskInstance.ready_for_retry">[docs]</a> <span class="k">def</span> <span class="nf">ready_for_retry</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="sd">"""</span>
<span class="sd"> Checks on whether the task instance is in the right state and timeframe</span>
<span class="sd"> to be retried.</span>
<span class="sd"> """</span>
- <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">state</span> <span class="o">==</span> <span class="n">State</span><span class="o">.</span><span class="n">UP_FOR_RETRY</span> <span class="ow">and</span> \
- <span class="bp">self</span><span class="o">.</span><span class="n">end_date</span> <span class="o">+</span> <span class="bp">self</span><span class="o">.</span><span class="n">task</span><span class="o">.</span><span class="n">retry_delay</span> <span class="o"><</span> <span class="n">datetime</span><span class="o">.</span><span class="n">now</span><span class="p">()</span></div>
+ <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">state</span> <span class="o">==</span> <span class="n">State</span><span class="o">.</span><span class="n">UP_FOR_RETRY</span> <span class="ow">and</span> <span class="bp">self</span><span class="o">.</span><span class="n">next_retry_datetime</span><span class="p">()</span> <span class="o"><</span> <span class="n">datetime</span><span class="o">.</span><span class="n">now</span><span class="p">()</span></div>
<span class="nd">@provide_session</span>
<div class="viewcode-block" id="TaskInstance.pool_full"><a class="viewcode-back" href="../../code.html#airflow.models.TaskInstance.pool_full">[docs]</a> <span class="k">def</span> <span class="nf">pool_full</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">session</span><span class="p">):</span>
@@ -1308,9 +1376,7 @@
<span class="o">.</span><span class="n">first</span><span class="p">()</span>
<span class="p">)</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">pool</span><span class="p">:</span>
- <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span>
- <span class="s2">"Task specified a pool ({}) but the pool "</span>
- <span class="s2">"doesn't exist!"</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">task</span><span class="o">.</span><span class="n">pool</span><span class="p">))</span>
+ <span class="k">return</span> <span class="bp">False</span>
<span class="n">open_slots</span> <span class="o">=</span> <span class="n">pool</span><span class="o">.</span><span class="n">open_slots</span><span class="p">(</span><span class="n">session</span><span class="o">=</span><span class="n">session</span><span class="p">)</span>
<span class="k">return</span> <span class="n">open_slots</span> <span class="o"><=</span> <span class="mi">0</span></div>
@@ -1378,7 +1444,7 @@
<span class="c1"># todo: move this to the scheduler</span>
<span class="bp">self</span><span class="o">.</span><span class="n">state</span> <span class="o">==</span> <span class="n">State</span><span class="o">.</span><span class="n">UP_FOR_RETRY</span> <span class="ow">and</span>
<span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">ready_for_retry</span><span class="p">()):</span>
- <span class="n">next_run</span> <span class="o">=</span> <span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">end_date</span> <span class="o">+</span> <span class="n">task</span><span class="o">.</span><span class="n">retry_delay</span><span class="p">)</span><span class="o">.</span><span class="n">isoformat</span><span class="p">()</span>
+ <span class="n">next_run</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">next_retry_datetime</span><span class="p">()</span><span class="o">.</span><span class="n">isoformat</span><span class="p">()</span>
<span class="n">logging</span><span class="o">.</span><span class="n">info</span><span class="p">(</span>
<span class="s2">"Not ready for retry yet. "</span> <span class="o">+</span>
<span class="s2">"Next run after {0}"</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">next_run</span><span class="p">)</span>
@@ -1510,6 +1576,9 @@
<span class="k">if</span> <span class="ow">not</span> <span class="n">test_mode</span><span class="p">:</span>
<span class="n">session</span><span class="o">.</span><span class="n">add</span><span class="p">(</span><span class="n">Log</span><span class="p">(</span><span class="n">State</span><span class="o">.</span><span class="n">FAILED</span><span class="p">,</span> <span class="bp">self</span><span class="p">))</span>
+ <span class="c1"># Log failure duration</span>
+ <span class="n">session</span><span class="o">.</span><span class="n">add</span><span class="p">(</span><span class="n">TaskFail</span><span class="p">(</span><span class="n">task</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">execution_date</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">start_date</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">end_date</span><span class="p">))</span>
+
<span class="c1"># Let's go deeper</span>
<span class="k">try</span><span class="p">:</span>
<span class="k">if</span> <span class="n">task</span><span class="o">.</span><span class="n">retries</span> <span class="ow">and</span> <span class="bp">self</span><span class="o">.</span><span class="n">try_number</span> <span class="o">%</span> <span class="p">(</span><span class="n">task</span><span class="o">.</span><span class="n">retries</span> <span class="o">+</span> <span class="mi">1</span><span class="p">)</span> <span class="o">!=</span> <span class="mi">0</span><span class="p">:</span>
@@ -1592,17 +1661,25 @@
<span class="sd"> {var.variable_name}.</span>
<span class="sd"> """</span>
<span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
- <span class="k">pass</span>
+ <span class="bp">self</span><span class="o">.</span><span class="n">var</span> <span class="o">=</span> <span class="bp">None</span>
<span class="k">def</span> <span class="nf">__getattr__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">item</span><span class="p">):</span>
- <span class="k">return</span> <span class="n">Variable</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="n">item</span><span class="p">)</span>
+ <span class="bp">self</span><span class="o">.</span><span class="n">var</span> <span class="o">=</span> <span class="n">Variable</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="n">item</span><span class="p">)</span>
+ <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">var</span>
+
+ <span class="k">def</span> <span class="nf">__repr__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
+ <span class="k">return</span> <span class="nb">str</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">var</span><span class="p">)</span>
<span class="k">class</span> <span class="nc">VariableJsonAccessor</span><span class="p">:</span>
<span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
- <span class="k">pass</span>
+ <span class="bp">self</span><span class="o">.</span><span class="n">var</span> <span class="o">=</span> <span class="bp">None</span>
<span class="k">def</span> <span class="nf">__getattr__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">item</span><span class="p">):</span>
- <span class="k">return</span> <span class="n">Variable</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="n">item</span><span class="p">,</span> <span class="n">deserialize_json</span><span class="o">=</span><span class="bp">True</span><span class="p">)</span>
+ <span class="bp">self</span><span class="o">.</span><span class="n">var</span> <span class="o">=</span> <span class="n">Variable</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="n">item</span><span class="p">,</span> <span class="n">deserialize_json</span><span class="o">=</span><span class="bp">True</span><span class="p">)</span>
+ <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">var</span>
+
+ <span class="k">def</span> <span class="nf">__repr__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
+ <span class="k">return</span> <span class="nb">str</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">var</span><span class="p">)</span>
<span class="k">return</span> <span class="p">{</span>
<span class="s1">'dag'</span><span class="p">:</span> <span class="n">task</span><span class="o">.</span><span class="n">dag</span><span class="p">,</span>
@@ -1755,6 +1832,29 @@
<span class="k">return</span> <span class="n">pull_fn</span><span class="p">(</span><span class="n">task_id</span><span class="o">=</span><span class="n">task_ids</span><span class="p">)</span></div></div>
+<span class="k">class</span> <span class="nc">TaskFail</span><span class="p">(</span><span class="n">Base</span><span class="p">):</span>
+ <span class="sd">"""</span>
+<span class="sd"> TaskFail tracks the failed run durations of each task instance.</span>
+<span class="sd"> """</span>
+
+ <span class="n">__tablename__</span> <span class="o">=</span> <span class="s2">"task_fail"</span>
+
+ <span class="n">task_id</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">String</span><span class="p">(</span><span class="n">ID_LEN</span><span class="p">),</span> <span class="n">primary_key</span><span class="o">=</span><span class="bp">True</span><span class="p">)</span>
+ <span class="n">dag_id</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">String</span><span class="p">(</span><span class="n">ID_LEN</span><span class="p">),</span> <span class="n">primary_key</span><span class="o">=</span><span class="bp">True</span><span class="p">)</span>
+ <span class="n">execution_date</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">DateTime</span><span class="p">,</span> <span class="n">primary_key</span><span class="o">=</span><span class="bp">True</span><span class="p">)</span>
+ <span class="n">start_date</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">DateTime</span><span class="p">)</span>
+ <span class="n">end_date</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">DateTime</span><span class="p">)</span>
+ <span class="n">duration</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">Float</span><span class="p">)</span>
+
+ <span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">task</span><span class="p">,</span> <span class="n">execution_date</span><span class="p">,</span> <span class="n">start_date</span><span class="p">,</span> <span class="n">end_date</span><span class="p">):</span>
+ <span class="bp">self</span><span class="o">.</span><span class="n">dag_id</span> <span class="o">=</span> <span class="n">task</span><span class="o">.</span><span class="n">dag_id</span>
+ <span class="bp">self</span><span class="o">.</span><span class="n">task_id</span> <span class="o">=</span> <span class="n">task</span><span class="o">.</span><span class="n">task_id</span>
+ <span class="bp">self</span><span class="o">.</span><span class="n">execution_date</span> <span class="o">=</span> <span class="n">execution_date</span>
+ <span class="bp">self</span><span class="o">.</span><span class="n">start_date</span> <span class="o">=</span> <span class="n">start_date</span>
+ <span class="bp">self</span><span class="o">.</span><span class="n">end_date</span> <span class="o">=</span> <span class="n">end_date</span>
+ <span class="bp">self</span><span class="o">.</span><span class="n">duration</span> <span class="o">=</span> <span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">end_date</span> <span class="o">-</span> <span class="bp">self</span><span class="o">.</span><span class="n">start_date</span><span class="p">)</span><span class="o">.</span><span class="n">total_seconds</span><span class="p">()</span>
+
+
<span class="k">class</span> <span class="nc">Log</span><span class="p">(</span><span class="n">Base</span><span class="p">):</span>
<span class="sd">"""</span>
<span class="sd"> Used to actively log events to the database</span>
@@ -1831,6 +1931,12 @@
<span class="sd"> :type retries: int</span>
<span class="sd"> :param retry_delay: delay between retries</span>
<span class="sd"> :type retry_delay: timedelta</span>
+<span class="sd"> :param retry_exponential_backoff: allow progressive longer waits between</span>
+<span class="sd"> retries by using exponential backoff algorithm on retry delay (delay</span>
+<span class="sd"> will be converted into seconds)</span>
+<span class="sd"> :type retry_exponential_backoff: bool</span>
+<span class="sd"> :param max_retry_delay: maximum delay interval between retries</span>
+<span class="sd"> :type max_retry_delay: timedelta</span>
<span class="sd"> :param start_date: The ``start_date`` for the task, determines</span>
<span class="sd"> the ``execution_date`` for the first task instance. The best practice</span>
<span class="sd"> is to have the start_date rounded</span>
@@ -1908,6 +2014,9 @@
<span class="sd"> using the constants defined in the static class</span>
<span class="sd"> ``airflow.utils.TriggerRule``</span>
<span class="sd"> :type trigger_rule: str</span>
+<span class="sd"> :param resources: A map of resource parameter names (the argument names of the</span>
+<span class="sd"> Resources constructor) to their values.</span>
+<span class="sd"> :type resources: dict</span>
<span class="sd"> """</span>
<span class="c1"># For derived classes to define which fields will get jinjaified</span>
@@ -1928,6 +2037,8 @@
<span class="n">email_on_failure</span><span class="o">=</span><span class="bp">True</span><span class="p">,</span>
<span class="n">retries</span><span class="o">=</span><span class="mi">0</span><span class="p">,</span>
<span class="n">retry_delay</span><span class="o">=</span><span class="n">timedelta</span><span class="p">(</span><span class="n">seconds</span><span class="o">=</span><span class="mi">300</span><span class="p">),</span>
+ <span class="n">retry_exponential_backoff</span><span class="o">=</span><span class="bp">False</span><span class="p">,</span>
+ <span class="n">max_retry_delay</span><span class="o">=</span><span class="bp">None</span><span class="p">,</span>
<span class="n">start_date</span><span class="o">=</span><span class="bp">None</span><span class="p">,</span>
<span class="n">end_date</span><span class="o">=</span><span class="bp">None</span><span class="p">,</span>
<span class="n">schedule_interval</span><span class="o">=</span><span class="bp">None</span><span class="p">,</span> <span class="c1"># not hooked as of now</span>
@@ -1946,6 +2057,7 @@
<span class="n">on_success_callback</span><span class="o">=</span><span class="bp">None</span><span class="p">,</span>
<span class="n">on_retry_callback</span><span class="o">=</span><span class="bp">None</span><span class="p">,</span>
<span class="n">trigger_rule</span><span class="o">=</span><span class="n">TriggerRule</span><span class="o">.</span><span class="n">ALL_SUCCESS</span><span class="p">,</span>
+ <span class="n">resources</span><span class="o">=</span><span class="bp">None</span><span class="p">,</span>
<span class="o">*</span><span class="n">args</span><span class="p">,</span>
<span class="o">**</span><span class="n">kwargs</span><span class="p">):</span>
@@ -2003,9 +2115,12 @@
<span class="k">else</span><span class="p">:</span>
<span class="n">logging</span><span class="o">.</span><span class="n">debug</span><span class="p">(</span><span class="s2">"retry_delay isn't timedelta object, assuming secs"</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">retry_delay</span> <span class="o">=</span> <span class="n">timedelta</span><span class="p">(</span><span class="n">seconds</span><span class="o">=</span><span class="n">retry_delay</span><span class="p">)</span>
+ <span class="bp">self</span><span class="o">.</span><span class="n">retry_exponential_backoff</span> <span class="o">=</span> <span class="n">retry_exponential_backoff</span>
+ <span class="bp">self</span><span class="o">.</span><span class="n">max_retry_delay</span> <span class="o">=</span> <span class="n">max_retry_delay</span>
<span class="bp">self</span><span class="o">.</span><span class="n">params</span> <span class="o">=</span> <span class="n">params</span> <span class="ow">or</span> <span class="p">{}</span> <span class="c1"># Available in templates!</span>
<span class="bp">self</span><span class="o">.</span><span class="n">adhoc</span> <span class="o">=</span> <span class="n">adhoc</span>
<span class="bp">self</span><span class="o">.</span><span class="n">priority_weight</span> <span class="o">=</span> <span class="n">priority_weight</span>
+ <span class="bp">self</span><span class="o">.</span><span class="n">resources</span> <span class="o">=</span> <span class="n">Resources</span><span class="p">(</span><span class="o">**</span><span class="p">(</span><span class="n">resources</span> <span class="ow">or</span> <span class="p">{}))</span>
<span class="c1"># Private attributes</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_upstream_task_ids</span> <span class="o">=</span> <span class="p">[]</span>
@@ -2023,6 +2138,8 @@
<span class="s1">'email'</span><span class="p">,</span>
<span class="s1">'email_on_retry'</span><span class="p">,</span>
<span class="s1">'retry_delay'</span><span class="p">,</span>
+ <span class="s1">'retry_exponential_backoff'</span><span class="p">,</span>
+ <span class="s1">'max_retry_delay'</span><span class="p">,</span>
<span class="s1">'start_date'</span><span class="p">,</span>
<span class="s1">'schedule_interval'</span><span class="p">,</span>
<span class="s1">'depends_on_past'</span><span class="p">,</span>
@@ -2042,7 +2159,7 @@
<span class="nb">all</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">__dict__</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="n">c</span><span class="p">,</span> <span class="bp">None</span><span class="p">)</span> <span class="o">==</span> <span class="n">other</span><span class="o">.</span><span class="n">__dict__</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="n">c</span><span class="p">,</span> <span class="bp">None</span><span class="p">)</span>
<span class="k">for</span> <span class="n">c</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_comps</span><span class="p">))</span>
- <span class="k">def</span> <span class="nf">__neq__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">other</span><span class="p">):</span>
+ <span class="k">def</span> <span class="nf">__ne__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">other</span><span class="p">):</span>
<span class="k">return</span> <span class="ow">not</span> <span class="bp">self</span> <span class="o">==</span> <span class="n">other</span>
<span class="k">def</span> <span class="nf">__lt__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">other</span><span class="p">):</span>
@@ -2541,8 +2658,8 @@
<span class="n">last_scheduler_run</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">DateTime</span><span class="p">)</span>
<span class="c1"># Last time this DAG was pickled</span>
<span class="n">last_pickled</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">DateTime</span><span class="p">)</span>
- <span class="c1"># When the DAG received a refreshed signal last, used to know when</span>
- <span class="c1"># we need to force refresh</span>
+ <span class="c1"># Time when the DAG last received a refresh signal</span>
+ <span class="c1"># (e.g. the DAG's "refresh" button was clicked in the web UI)</span>
<span class="n">last_expired</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">DateTime</span><span class="p">)</span>
<span class="c1"># Whether (one of) the scheduler is scheduling this DAG at the moment</span>
<span class="n">scheduler_lock</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">Boolean</span><span class="p">)</span>
@@ -2567,7 +2684,7 @@
<span class="nd">@functools.total_ordering</span>
-<div class="viewcode-block" id="DAG"><a class="viewcode-back" href="../../code.html#airflow.models.DAG">[docs]</a><span class="k">class</span> <span class="nc">DAG</span><span class="p">(</span><span class="n">LoggingMixin</span><span class="p">):</span>
+<div class="viewcode-block" id="DAG"><a class="viewcode-back" href="../../code.html#airflow.models.DAG">[docs]</a><span class="k">class</span> <span class="nc">DAG</span><span class="p">(</span><span class="n">BaseDag</span><span class="p">,</span> <span class="n">LoggingMixin</span><span class="p">):</span>
<span class="sd">"""</span>
<span class="sd"> A dag (directed acyclic graph) is a collection of tasks with directional</span>
<span class="sd"> dependencies. A dag also has a schedule, a start end an end date</span>
@@ -2655,8 +2772,14 @@
<span class="k">del</span> <span class="bp">self</span><span class="o">.</span><span class="n">default_args</span><span class="p">[</span><span class="s1">'params'</span><span class="p">]</span>
<span class="n">validate_key</span><span class="p">(</span><span class="n">dag_id</span><span class="p">)</span>
+
+ <span class="c1"># Properties from BaseDag</span>
+ <span class="bp">self</span><span class="o">.</span><span class="n">_dag_id</span> <span class="o">=</span> <span class="n">dag_id</span>
+ <span class="bp">self</span><span class="o">.</span><span class="n">_full_filepath</span> <span class="o">=</span> <span class="n">full_filepath</span> <span class="k">if</span> <span class="n">full_filepath</span> <span class="k">else</span> <span class="s1">''</span>
+ <span class="bp">self</span><span class="o">.</span><span class="n">_concurrency</span> <span class="o">=</span> <span class="n">concurrency</span>
+ <span class="bp">self</span><span class="o">.</span><span class="n">_pickle_id</span> <span class="o">=</span> <span class="bp">None</span>
+
<span class="bp">self</span><span class="o">.</span><span class="n">task_dict</span> <span class="o">=</span> <span class="nb">dict</span><span class="p">()</span>
- <span class="bp">self</span><span class="o">.</span><span class="n">dag_id</span> <span class="o">=</span> <span class="n">dag_id</span>
<span class="bp">self</span><span class="o">.</span><span class="n">start_date</span> <span class="o">=</span> <span class="n">start_date</span>
<span class="bp">self</span><span class="o">.</span><span class="n">end_date</span> <span class="o">=</span> <span class="n">end_date</span>
<span class="bp">self</span><span class="o">.</span><span class="n">schedule_interval</span> <span class="o">=</span> <span class="n">schedule_interval</span>
@@ -2666,14 +2789,12 @@
<span class="bp">self</span><span class="o">.</span><span class="n">_schedule_interval</span> <span class="o">=</span> <span class="bp">None</span>
<span class="k">else</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_schedule_interval</span> <span class="o">=</span> <span class="n">schedule_interval</span>
- <span class="bp">self</span><span class="o">.</span><span class="n">full_filepath</span> <span class="o">=</span> <span class="n">full_filepath</span> <span class="k">if</span> <span class="n">full_filepath</span> <span class="k">else</span> <span class="s1">''</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">template_searchpath</span><span class="p">,</span> <span class="n">six</span><span class="o">.</span><span class="n">string_types</span><span class="p">):</span>
<span class="n">template_searchpath</span> <span class="o">=</span> <span class="p">[</span><span class="n">template_searchpath</span><span class="p">]</span>
<span class="bp">self</span><span class="o">.</span><span class="n">template_searchpath</span> <span class="o">=</span> <span class="n">template_searchpath</span>
<span class="bp">self</span><span class="o">.</span><span class="n">parent_dag</span> <span class="o">=</span> <span class="bp">None</span> <span class="c1"># Gets set when DAGs are loaded</span>
<span class="bp">self</span><span class="o">.</span><span class="n">last_loaded</span> <span class="o">=</span> <span class="n">datetime</span><span class="o">.</span><span class="n">now</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">safe_dag_id</span> <span class="o">=</span> <span class="n">dag_id</span><span class="o">.</span><span class="n">replace</span><span class="p">(</span><span class="s1">'.'</span><span class="p">,</span> <span class="s1">'__dot__'</span><span class="p">)</span>
- <span class="bp">self</span><span class="o">.</span><span class="n">concurrency</span> <span class="o">=</span> <span class="n">concurrency</span>
<span class="bp">self</span><span class="o">.</span><span class="n">max_active_runs</span> <span class="o">=</span> <span class="n">max_active_runs</span>
<span class="bp">self</span><span class="o">.</span><span class="n">dagrun_timeout</span> <span class="o">=</span> <span class="n">dagrun_timeout</span>
<span class="bp">self</span><span class="o">.</span><span class="n">sla_miss_callback</span> <span class="o">=</span> <span class="n">sla_miss_callback</span>
@@ -2695,10 +2816,12 @@
<span class="k">def</span> <span class="nf">__eq__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">other</span><span class="p">):</span>
<span class="k">return</span> <span class="p">(</span>
<span class="nb">type</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">==</span> <span class="nb">type</span><span class="p">(</span><span class="n">other</span><span class="p">)</span> <span class="ow">and</span>
- <span class="nb">all</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">__dict__</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="n">c</span><span class="p">,</span> <span class="bp">None</span><span class="p">)</span> <span class="o">==</span> <span class="n">other</span><span class="o">.</span><span class="n">__dict__</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="n">c</span><span class="p">,</span> <span class="bp">None</span><span class="p">)</span>
+ <span class="c1"># Use getattr() instead of __dict__ as __dict__ doesn't return</span>
+ <span class="c1"># correct values for properties.</span>
+ <span class="nb">all</span><span class="p">(</span><span class="nb">getattr</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">c</span><span class="p">,</span> <span class="bp">None</span><span class="p">)</span> <span class="o">==</span> <span class="nb">getattr</span><span class="p">(</span><span class="n">other</span><span class="p">,</span> <span class="n">c</span><span class="p">,</span> <span class="bp">None</span><span class="p">)</span>
<span class="k">for</span> <span class="n">c</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_comps</span><span class="p">))</span>
- <span class="k">def</span> <span class="nf">__neq__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">other</span><span class="p">):</span>
+ <span class="k">def</span> <span class="nf">__ne__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">other</span><span class="p">):</span>
<span class="k">return</span> <span class="ow">not</span> <span class="bp">self</span> <span class="o">==</span> <span class="n">other</span>
<span class="k">def</span> <span class="nf">__lt__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">other</span><span class="p">):</span>
@@ -2770,6 +2893,38 @@
<span class="k">return</span> <span class="n">dttm</span></div>
<span class="nd">@property</span>
+ <span class="k">def</span> <span class="nf">dag_id</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
+ <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_dag_id</span>
+
+ <span class="nd">@dag_id.setter</span>
+ <span class="k">def</span> <span class="nf">dag_id</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">value</span><span class="p">):</span>
+ <span class="bp">self</span><span class="o">.</span><span class="n">_dag_id</span> <span class="o">=</span> <span class="n">value</span>
+
+ <span class="nd">@property</span>
+ <span class="k">def</span> <span class="nf">full_filepath</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
+ <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_full_filepath</span>
+
+ <span class="nd">@full_filepath.setter</span>
+ <span class="k">def</span> <span class="nf">full_filepath</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">value</span><span class="p">):</span>
+ <span class="bp">self</span><span class="o">.</span><span class="n">_full_filepath</span> <span class="o">=</span> <span class="n">value</span>
+
+ <span class="nd">@property</span>
+ <span class="k">def</span> <span class="nf">concurrency</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
+ <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_concurrency</span>
+
+ <span class="nd">@concurrency.setter</span>
+ <span class="k">def</span> <span class="nf">concurrency</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">value</span><span class="p">):</span>
+ <span class="bp">self</span><span class="o">.</span><span class="n">_concurrency</span> <span class="o">=</span> <span class="n">value</span>
+
+ <span class="nd">@property</span>
+ <span class="k">def</span> <span class="nf">pickle_id</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
+ <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_pickle_id</span>
+
+ <span class="nd">@pickle_id.setter</span>
+ <span class="k">def</span> <span class="nf">pickle_id</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">value</span><span class="p">):</span>
+ <span class="bp">self</span><span class="o">.</span><span class="n">_pickle_id</span> <span class="o">=</span> <span class="n">value</span>
+
+ <span class="nd">@property</span>
<span class="k">def</span> <span class="nf">tasks</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="nb">list</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">task_dict</span><span class="o">.</span><span class="n">values</span><span class="p">())</span>
@@ -2866,6 +3021,15 @@
<span class="n">l</span> <span class="o">+=</span> <span class="n">task</span><span class="o">.</span><span class="n">subdag</span><span class="o">.</span><span class="n">subdags</span>
<span class="k">return</span> <span class="n">l</span>
+ <span class="nd">@property</span>
+ <span class="k">def</span> <span class="nf">reached_max_runs</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
+ <span class="n">active_runs</span> <span class="o">=</span> <span class="n">DagRun</span><span class="o">.</span><span class="n">find</span><span class="p">(</span>
+ <span class="n">dag_id</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">dag_id</span><span class="p">,</span>
+ <span class="n">state</span><span class="o">=</span><span class="n">State</span><span class="o">.</span><span class="n">RUNNING</span><span class="p">,</span>
+ <span class="n">external_trigger</span><span class="o">=</span><span class="bp">False</span>
+ <span class="p">)</span>
+ <span class="k">return</span> <span class="nb">len</span><span class="p">(</span><span class="n">active_runs</span><span class="p">)</span> <span class="o">>=</span> <span class="bp">self</span><span class="o">.</span><span class="n">max_active_runs</span>
+
<span class="k">def</span> <span class="nf">resolve_template_files</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">for</span> <span class="n">t</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">tasks</span><span class="p">:</span>
<span class="n">t</span><span class="o">.</span><span class="n">resolve_template_files</span><span class="p">()</span>
@@ -2932,9 +3096,9 @@
<span class="n">dates</span> <span class="o">=</span> <span class="n">utils_date_range</span><span class="p">(</span><span class="n">start_date</span><span class="p">,</span> <span class="n">end_date</span><span class="p">)</span>
<span class="n">drs</span> <span class="o">=</span> <span class="n">session</span><span class="o">.</span><span class="n">query</span><span class="p">(</span><span class="n">DagModel</span><span class="p">)</span><span class="o">.</span><span class="n">filter_by</span><span class="p">(</span><span class="n">dag_id</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">dag_id</span><span class="p">)</span><span class="o">.</span><span class="n">all</span><span class="p">()</span>
<span class="k">for</span> <span class="n">dr</span> <span class="ow">in</span> <span class="n">drs</span><span class="p">:</span>
- <span class="n">dr</span><span class="o">.</span><span class="n">state</span> <span class="o">=</span> <span class="n">State</span><span class="o">.</span><span class="n">RUNNING</span>
+ <span class="n">dr</span><span class="o">.</span><span class="n">state</span> <span class="o">=</span> <span class="n">state</span>
- <span class="k">def</span> <span class="nf">clear</span><span class="p">(</span>
+<div class="viewcode-block" id="DAG.clear"><a class="viewcode-back" href="../../code.html#airflow.models.DAG.clear">[docs]</a> <span class="k">def</span> <span class="nf">clear</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span> <span class="n">start_date</span><span class="o">=</span><span class="bp">None</span><span class="p">,</span> <span class="n">end_date</span><span class="o">=</span><span class="bp">None</span><span class="p">,</span>
<span class="n">only_failed</span><span class="o">=</span><span class="bp">False</span><span class="p">,</span>
<span class="n">only_running</span><span class="o">=</span><span class="bp">False</span><span class="p">,</span>
@@ -2942,11 +3106,11 @@
<span class="n">include_subdags</span><span class="o">=</span><span class="bp">True</span><span class="p">,</span>
<span class="n">reset_dag_runs</span><span class="o">=</span><span class="bp">True</span><span class="p">,</span>
<span class="n">dry_run</span><span class="o">=</span><span class="bp">False</span><span class="p">):</span>
- <span class="n">session</span> <span class="o">=</span> <span class="n">settings</span><span class="o">.</span><span class="n">Session</span><span class="p">()</span>
<span class="sd">"""</span>
<span class="sd"> Clears a set of task instances associated with the current dag for</span>
<span class="sd"> a specified date range.</span>
<span class="sd"> """</span>
+ <span class="n">session</span> <span class="o">=</span> <span class="n">settings</span><span class="o">.</span><span class="n">Session</span><span class="p">()</span>
<span class="n">TI</span> <span class="o">=</span> <span class="n">TaskInstance</span>
<span class="n">tis</span> <span class="o">=</span> <span class="n">session</span><span class="o">.</span><span class="n">query</span><span class="p">(</span><span class="n">TI</span><span class="p">)</span>
<span class="k">if</span> <span class="n">include_subdags</span><span class="p">:</span>
@@ -2998,7 +3162,7 @@
<span class="n">session</span><span class="o">.</span><span class="n">commit</span><span class="p">()</span>
<span class="n">session</span><span class="o">.</span><span class="n">close</span><span class="p">()</span>
- <span class="k">return</span> <span class="n">count</span>
+ <span class="k">return</span> <span class="n">count</span></div>
<span class="k">def</span> <span class="nf">__deepcopy__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">memo</span><span class="p">):</span>
<span class="c1"># Swiwtcharoo to go around deepcopying objects coming through the</span>
@@ -3230,7 +3394,81 @@
<span class="n">run</span><span class="o">.</span><span class="n">verify_integrity</span><span class="p">(</span><span class="n">session</span><span class="o">=</span><span class="n">session</span><span class="p">)</span>
<span class="n">run</span><span class="o">.</span><span class="n">refresh_from_db</span><span class="p">()</span>
- <span class="k">return</span> <span class="n">run</span></div></div>
+ <span class="k">return</span> <span class="n">run</span></div>
+
+ <span class="nd">@staticmethod</span>
+ <span class="nd">@provide_session</span>
+<div class="viewcode-block" id="DAG.sync_to_db"><a class="viewcode-back" href="../../code.html#airflow.models.DAG.sync_to_db">[docs]</a> <span class="k">def</span> <span class="nf">sync_to_db</span><span class="p">(</span><span class="n">dag</span><span class="p">,</span> <span class="n">owner</span><span class="p">,</span> <span class="n">sync_time</span><span class="p">,</span> <span class="n">session</span><span class="o">=</span><span class="bp">None</span><span class="p">):</span>
+ <span class="sd">"""</span>
+<span class="sd"> Save attributes about this DAG to the DB. Note that this method</span>
+<span class="sd"> can be called for both DAGs and SubDAGs. A SubDag is actually a</span>
+<span class="sd"> SubDagOperator.</span>
+
+<span class="sd"> :param dag: the DAG object to save to the DB</span>
+<span class="sd"> :type dag: DAG</span>
+<span class="sd"> :own</span>
+<span class="sd"> :param sync_time: The time that the DAG should be marked as sync'ed</span>
+<span class="sd"> :type sync_time: datetime</span>
+<span class="sd"> :return: None</span>
+<span class="sd"> """</span>
+ <span class="n">orm_dag</span> <span class="o">=</span> <span class="n">session</span><span class="o">.</span><span class="n">query</span><span class="p">(</span>
+ <span class="n">DagModel</span><span class="p">)</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span><span class="n">DagModel</span><span class="o">.</span><span class="n">dag_id</span> <span class="o">==</span> <span class="n">dag</span><span class="o">.</span><span class="n">dag_id</span><span class="p">)</span><span class="o">.</span><span class="n">first</span><span class="p">()</span>
+ <span class="k">if</span> <span class="ow">not</span> <span class="n">orm_dag</span><span class="p">:</span>
+ <span class="n">orm_dag</span> <span class="o">=</span> <span class="n">DagModel</span><span class="p">(</span><span class="n">dag_id</span><span class="o">=</span><span class="n">dag</span><span class="o">.</span><span class="n">dag_id</span><span class="p">)</span>
+ <span class="n">logging</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">"Creating ORM DAG for </span><span class="si">%s</span><span class="s2">"</span><span class="p">,</span>
+ <span class="n">dag</span><span class="o">.</span><span class="n">dag_id</span><span class="p">)</span>
+ <span class="n">orm_dag</span><span class="o">.</span><span class="n">fileloc</span> <span class="o">=</span> <span class="n">dag</span><span class="o">.</span><span class="n">full_filepath</span>
+ <span class="n">orm_dag</span><span class="o">.</span><span class="n">is_subdag</span> <span class="o">=</span> <span class="n">dag</span><span class="o">.</span><span class="n">is_subdag</span>
+ <span class="n">orm_dag</span><span class="o">.</span><span class="n">owners</span> <span class="o">=</span> <span class="n">owner</span>
+ <span class="n">orm_dag</span><span class="o">.</span><span class="n">is_active</span> <span class="o">=</span> <span class="bp">True</span>
+ <span class="n">orm_dag</span><span class="o">.</span><span class="n">last_scheduler_run</span> <span class="o">=</span> <span class="n">sync_time</span>
+ <span class="n">session</span><span class="o">.</span><span class="n">merge</span><span class="p">(</span><span class="n">orm_dag</span><span class="p">)</span>
+ <span class="n">session</span><span class="o">.</span><span class="n">commit</span><span class="p">()</span>
+
+ <span class="k">for</span> <span class="n">subdag</span> <span class="ow">in</span> <span class="n">dag</span><span class="o">.</span><span class="n">subdags</span><span class="p">:</span>
+ <span class="n">DAG</span><span class="o">.</span><span class="n">sync_to_db</span><span class="p">(</span><span class="n">subdag</span><span class="p">,</span> <span class="n">owner</span><span class="p">,</span> <span class="n">sync_time</span><span class="p">,</span> <span class="n">session</span><span class="o">=</span><span class="n">session</span><span class="p">)</span></div>
+
+ <span class="nd">@staticmethod</span>
+ <span class="nd">@provide_session</span>
+<div class="viewcode-block" id="DAG.deactivate_unknown_dags"><a class="viewcode-back" href="../../code.html#airflow.models.DAG.deactivate_unknown_dags">[docs]</a> <span class="k">def</span> <span class="nf">deactivate_unknown_dags</span><span class="p">(</span><span class="n">active_dag_ids</span><span class="p">,</span> <span class="n">session</span><span class="o">=</span><span class="bp">None</span><span class="p">):</span>
+ <span class="sd">"""</span>
+<span class="sd"> Given a list of known DAGs, deactivate any other DAGs that are</span>
+<span class="sd"> marked as active in the ORM</span>
+
+<span class="sd"> :param active_dag_ids: list of DAG IDs that are active</span>
+<span class="sd"> :type active_dag_ids: list[unicode]</span>
+<span class="sd"> :return: None</span>
+<span class="sd"> """</span>
+
+ <span class="k">if</span> <span class="nb">len</span><span class="p">(</span><span class="n">active_dag_ids</span><span class="p">)</span> <span class="o">==</span> <span class="mi">0</span><span class="p">:</span>
+ <span class="k">return</span>
+ <span class="k">for</span> <span class="n">dag</span> <span class="ow">in</span> <span class="n">session</span><span class="o">.</span><span class="n">query</span><span class="p">(</span>
+ <span class="n">DagModel</span><span class="p">)</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span><span class="o">~</span><span class="n">DagModel</span><span class="o">.</span><span class="n">dag_id</span><span class="o">.</span><span class="n">in_</span><span class="p">(</span><span class="n">active_dag_ids</span><span class="p">))</span><span class="o">.</span><span
<TRUNCATED>