You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by cr...@apache.org on 2018/01/03 17:48:23 UTC

[23/35] incubator-airflow-site git commit: 1.9.0

http://git-wip-us.apache.org/repos/asf/incubator-airflow-site/blob/28a3eb60/_modules/airflow/models.html
----------------------------------------------------------------------
diff --git a/_modules/airflow/models.html b/_modules/airflow/models.html
index 0b043ea..485f9ae 100644
--- a/_modules/airflow/models.html
+++ b/_modules/airflow/models.html
@@ -13,6 +13,8 @@
 
   
   
+  
+  
 
   
 
@@ -80,7 +82,10 @@
           
             
             
-                <ul>
+              
+            
+            
+              <ul>
 <li class="toctree-l1"><a class="reference internal" href="../../project.html">Project</a></li>
 <li class="toctree-l1"><a class="reference internal" href="../../license.html">License</a></li>
 <li class="toctree-l1"><a class="reference internal" href="../../start.html">Quick Start</a></li>
@@ -175,6 +180,7 @@
 <span class="c1"># WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.</span>
 <span class="c1"># See the License for the specific language governing permissions and</span>
 <span class="c1"># limitations under the License.</span>
+
 <span class="kn">from</span> <span class="nn">__future__</span> <span class="k">import</span> <span class="n">absolute_import</span>
 <span class="kn">from</span> <span class="nn">__future__</span> <span class="k">import</span> <span class="n">division</span>
 <span class="kn">from</span> <span class="nn">__future__</span> <span class="k">import</span> <span class="n">print_function</span>
@@ -193,7 +199,7 @@
 <span class="kn">import</span> <span class="nn">getpass</span>
 <span class="kn">import</span> <span class="nn">imp</span>
 <span class="kn">import</span> <span class="nn">importlib</span>
-<span class="kn">import</span> <span class="nn">inspect</span>
+<span class="kn">import</span> <span class="nn">itertools</span>
 <span class="kn">import</span> <span class="nn">zipfile</span>
 <span class="kn">import</span> <span class="nn">jinja2</span>
 <span class="kn">import</span> <span class="nn">json</span>
@@ -208,12 +214,11 @@
 <span class="kn">import</span> <span class="nn">traceback</span>
 <span class="kn">import</span> <span class="nn">warnings</span>
 <span class="kn">import</span> <span class="nn">hashlib</span>
-
 <span class="kn">from</span> <span class="nn">urllib.parse</span> <span class="k">import</span> <span class="n">urlparse</span>
 
 <span class="kn">from</span> <span class="nn">sqlalchemy</span> <span class="k">import</span> <span class="p">(</span>
     <span class="n">Column</span><span class="p">,</span> <span class="n">Integer</span><span class="p">,</span> <span class="n">String</span><span class="p">,</span> <span class="n">DateTime</span><span class="p">,</span> <span class="n">Text</span><span class="p">,</span> <span class="n">Boolean</span><span class="p">,</span> <span class="n">ForeignKey</span><span class="p">,</span> <span class="n">PickleType</span><span class="p">,</span>
-    <span class="n">Index</span><span class="p">,</span> <span class="n">Float</span><span class="p">)</span>
+    <span class="n">Index</span><span class="p">,</span> <span class="n">Float</span><span class="p">,</span> <span class="n">LargeBinary</span><span class="p">)</span>
 <span class="kn">from</span> <span class="nn">sqlalchemy</span> <span class="k">import</span> <span class="n">func</span><span class="p">,</span> <span class="n">or_</span><span class="p">,</span> <span class="n">and_</span>
 <span class="kn">from</span> <span class="nn">sqlalchemy.ext.declarative</span> <span class="k">import</span> <span class="n">declarative_base</span><span class="p">,</span> <span class="n">declared_attr</span>
 <span class="kn">from</span> <span class="nn">sqlalchemy.dialects.mysql</span> <span class="k">import</span> <span class="n">LONGTEXT</span>
@@ -223,13 +228,15 @@
 <span class="kn">import</span> <span class="nn">six</span>
 
 <span class="kn">from</span> <span class="nn">airflow</span> <span class="k">import</span> <span class="n">settings</span><span class="p">,</span> <span class="n">utils</span>
-<span class="kn">from</span> <span class="nn">airflow.executors</span> <span class="k">import</span> <span class="n">DEFAULT_EXECUTOR</span><span class="p">,</span> <span class="n">LocalExecutor</span>
+<span class="kn">from</span> <span class="nn">airflow.executors</span> <span class="k">import</span> <span class="n">GetDefaultExecutor</span><span class="p">,</span> <span class="n">LocalExecutor</span>
 <span class="kn">from</span> <span class="nn">airflow</span> <span class="k">import</span> <span class="n">configuration</span>
 <span class="kn">from</span> <span class="nn">airflow.exceptions</span> <span class="k">import</span> <span class="n">AirflowException</span><span class="p">,</span> <span class="n">AirflowSkipException</span><span class="p">,</span> <span class="n">AirflowTaskTimeout</span>
 <span class="kn">from</span> <span class="nn">airflow.dag.base_dag</span> <span class="k">import</span> <span class="n">BaseDag</span><span class="p">,</span> <span class="n">BaseDagBag</span>
 <span class="kn">from</span> <span class="nn">airflow.ti_deps.deps.not_in_retry_period_dep</span> <span class="k">import</span> <span class="n">NotInRetryPeriodDep</span>
 <span class="kn">from</span> <span class="nn">airflow.ti_deps.deps.prev_dagrun_dep</span> <span class="k">import</span> <span class="n">PrevDagrunDep</span>
 <span class="kn">from</span> <span class="nn">airflow.ti_deps.deps.trigger_rule_dep</span> <span class="k">import</span> <span class="n">TriggerRuleDep</span>
+<span class="kn">from</span> <span class="nn">airflow.ti_deps.deps.task_concurrency_dep</span> <span class="k">import</span> <span class="n">TaskConcurrencyDep</span>
+
 <span class="kn">from</span> <span class="nn">airflow.ti_deps.dep_context</span> <span class="k">import</span> <span class="n">DepContext</span><span class="p">,</span> <span class="n">QUEUE_DEPS</span><span class="p">,</span> <span class="n">RUN_DEPS</span>
 <span class="kn">from</span> <span class="nn">airflow.utils.dates</span> <span class="k">import</span> <span class="n">cron_presets</span><span class="p">,</span> <span class="n">date_range</span> <span class="k">as</span> <span class="n">utils_date_range</span>
 <span class="kn">from</span> <span class="nn">airflow.utils.db</span> <span class="k">import</span> <span class="n">provide_session</span>
@@ -237,11 +244,11 @@
 <span class="kn">from</span> <span class="nn">airflow.utils.email</span> <span class="k">import</span> <span class="n">send_email</span>
 <span class="kn">from</span> <span class="nn">airflow.utils.helpers</span> <span class="k">import</span> <span class="p">(</span>
     <span class="n">as_tuple</span><span class="p">,</span> <span class="n">is_container</span><span class="p">,</span> <span class="n">is_in</span><span class="p">,</span> <span class="n">validate_key</span><span class="p">,</span> <span class="n">pprinttable</span><span class="p">)</span>
-<span class="kn">from</span> <span class="nn">airflow.utils.logging</span> <span class="k">import</span> <span class="n">LoggingMixin</span>
 <span class="kn">from</span> <span class="nn">airflow.utils.operator_resources</span> <span class="k">import</span> <span class="n">Resources</span>
 <span class="kn">from</span> <span class="nn">airflow.utils.state</span> <span class="k">import</span> <span class="n">State</span>
 <span class="kn">from</span> <span class="nn">airflow.utils.timeout</span> <span class="k">import</span> <span class="n">timeout</span>
 <span class="kn">from</span> <span class="nn">airflow.utils.trigger_rule</span> <span class="k">import</span> <span class="n">TriggerRule</span>
+<span class="kn">from</span> <span class="nn">airflow.utils.log.logging_mixin</span> <span class="k">import</span> <span class="n">LoggingMixin</span>
 
 <span class="n">Base</span> <span class="o">=</span> <span class="n">declarative_base</span><span class="p">()</span>
 <span class="n">ID_LEN</span> <span class="o">=</span> <span class="mi">250</span>
@@ -249,24 +256,36 @@
 
 <span class="n">Stats</span> <span class="o">=</span> <span class="n">settings</span><span class="o">.</span><span class="n">Stats</span>
 
-<span class="n">ENCRYPTION_ON</span> <span class="o">=</span> <span class="kc">False</span>
-<span class="k">try</span><span class="p">:</span>
-    <span class="kn">from</span> <span class="nn">cryptography.fernet</span> <span class="k">import</span> <span class="n">Fernet</span>
-    <span class="n">FERNET</span> <span class="o">=</span> <span class="n">Fernet</span><span class="p">(</span><span class="n">configuration</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">&#39;core&#39;</span><span class="p">,</span> <span class="s1">&#39;FERNET_KEY&#39;</span><span class="p">)</span><span class="o">.</span><span class="n">encode</span><span class="p">(</span><span class="s1">&#39;utf-8&#39;</span><span class="p">))</span>
-    <span class="n">ENCRYPTION_ON</span> <span class="o">=</span> <span class="kc">True</span>
-<span class="k">except</span><span class="p">:</span>
-    <span class="k">pass</span>
+<span class="k">def</span> <span class="nf">get_fernet</span><span class="p">():</span>
+    <span class="sd">&quot;&quot;&quot;</span>
+<span class="sd">    Deferred load of Fernet key.</span>
+
+<span class="sd">    This function could fail either because Cryptography is not installed</span>
+<span class="sd">    or because the Fernet key is invalid.</span>
+
+<span class="sd">    :return: Fernet object</span>
+<span class="sd">    :raises: AirflowException if there&#39;s a problem trying to load Fernet</span>
+<span class="sd">    &quot;&quot;&quot;</span>
+    <span class="k">try</span><span class="p">:</span>
+        <span class="kn">from</span> <span class="nn">cryptography.fernet</span> <span class="k">import</span> <span class="n">Fernet</span>
+    <span class="k">except</span><span class="p">:</span>
+        <span class="k">raise</span> <span class="n">AirflowException</span><span class="p">(</span><span class="s1">&#39;Failed to import Fernet, it may not be installed&#39;</span><span class="p">)</span>
+    <span class="k">try</span><span class="p">:</span>
+        <span class="k">return</span> <span class="n">Fernet</span><span class="p">(</span><span class="n">configuration</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">&#39;core&#39;</span><span class="p">,</span> <span class="s1">&#39;FERNET_KEY&#39;</span><span class="p">)</span><span class="o">.</span><span class="n">encode</span><span class="p">(</span><span class="s1">&#39;utf-8&#39;</span><span class="p">))</span>
+    <span class="k">except</span> <span class="ne">ValueError</span> <span class="k">as</span> <span class="n">ve</span><span class="p">:</span>
+        <span class="k">raise</span> <span class="n">AirflowException</span><span class="p">(</span><span class="s2">&quot;Could not create Fernet object: </span><span class="si">{}</span><span class="s2">&quot;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">ve</span><span class="p">))</span>
+
 
 <span class="k">if</span> <span class="s1">&#39;mysql&#39;</span> <span class="ow">in</span> <span class="n">settings</span><span class="o">.</span><span class="n">SQL_ALCHEMY_CONN</span><span class="p">:</span>
     <span class="n">LongText</span> <span class="o">=</span> <span class="n">LONGTEXT</span>
 <span class="k">else</span><span class="p">:</span>
     <span class="n">LongText</span> <span class="o">=</span> <span class="n">Text</span>
 
-<span class="c1"># used by DAG context_managers</span>
+<span class="c1"># Used by DAG context_managers</span>
 <span class="n">_CONTEXT_MANAGER_DAG</span> <span class="o">=</span> <span class="kc">None</span>
 
 
-<span class="k">def</span> <span class="nf">clear_task_instances</span><span class="p">(</span><span class="n">tis</span><span class="p">,</span> <span class="n">session</span><span class="p">,</span> <span class="n">activate_dag_runs</span><span class="o">=</span><span class="kc">True</span><span class="p">):</span>
+<span class="k">def</span> <span class="nf">clear_task_instances</span><span class="p">(</span><span class="n">tis</span><span class="p">,</span> <span class="n">session</span><span class="p">,</span> <span class="n">activate_dag_runs</span><span class="o">=</span><span class="kc">True</span><span class="p">,</span> <span class="n">dag</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
     <span class="sd">&quot;&quot;&quot;</span>
 <span class="sd">    Clears a set of task instances, but makes sure the running ones</span>
 <span class="sd">    get killed.</span>
@@ -277,26 +296,34 @@
             <span class="k">if</span> <span class="n">ti</span><span class="o">.</span><span class="n">job_id</span><span class="p">:</span>
                 <span class="n">ti</span><span class="o">.</span><span class="n">state</span> <span class="o">=</span> <span class="n">State</span><span class="o">.</span><span class="n">SHUTDOWN</span>
                 <span class="n">job_ids</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">ti</span><span class="o">.</span><span class="n">job_id</span><span class="p">)</span>
-        <span class="c1"># todo: this creates an issue with the webui tests</span>
-        <span class="c1"># elif ti.state != State.REMOVED:</span>
-        <span class="c1">#     ti.state = State.NONE</span>
-        <span class="c1">#     session.merge(ti)</span>
         <span class="k">else</span><span class="p">:</span>
-            <span class="n">session</span><span class="o">.</span><span class="n">delete</span><span class="p">(</span><span class="n">ti</span><span class="p">)</span>
+            <span class="n">task_id</span> <span class="o">=</span> <span class="n">ti</span><span class="o">.</span><span class="n">task_id</span>
+            <span class="k">if</span> <span class="n">dag</span> <span class="ow">and</span> <span class="n">dag</span><span class="o">.</span><span class="n">has_task</span><span class="p">(</span><span class="n">task_id</span><span class="p">):</span>
+                <span class="n">task</span> <span class="o">=</span> <span class="n">dag</span><span class="o">.</span><span class="n">get_task</span><span class="p">(</span><span class="n">task_id</span><span class="p">)</span>
+                <span class="n">task_retries</span> <span class="o">=</span> <span class="n">task</span><span class="o">.</span><span class="n">retries</span>
+                <span class="n">ti</span><span class="o">.</span><span class="n">max_tries</span> <span class="o">=</span> <span class="n">ti</span><span class="o">.</span><span class="n">try_number</span> <span class="o">+</span> <span class="n">task_retries</span> <span class="o">-</span> <span class="mi">1</span>
+            <span class="k">else</span><span class="p">:</span>
+                <span class="c1"># Ignore errors when updating max_tries if dag is None or</span>
+                <span class="c1"># task not found in dag since database records could be</span>
+                <span class="c1"># outdated. We make max_tries the maximum value of its</span>
+                <span class="c1"># original max_tries or the current task try number.</span>
+                <span class="n">ti</span><span class="o">.</span><span class="n">max_tries</span> <span class="o">=</span> <span class="nb">max</span><span class="p">(</span><span class="n">ti</span><span class="o">.</span><span class="n">max_tries</span><span class="p">,</span> <span class="n">ti</span><span class="o">.</span><span class="n">try_number</span> <span class="o">-</span> <span class="mi">1</span><span class="p">)</span>
+            <span class="n">ti</span><span class="o">.</span><span class="n">state</span> <span class="o">=</span> <span class="n">State</span><span class="o">.</span><span class="n">NONE</span>
+            <span class="n">session</span><span class="o">.</span><span class="n">merge</span><span class="p">(</span><span class="n">ti</span><span class="p">)</span>
+
     <span class="k">if</span> <span class="n">job_ids</span><span class="p">:</span>
         <span class="kn">from</span> <span class="nn">airflow.jobs</span> <span class="k">import</span> <span class="n">BaseJob</span> <span class="k">as</span> <span class="n">BJ</span>
         <span class="k">for</span> <span class="n">job</span> <span class="ow">in</span> <span class="n">session</span><span class="o">.</span><span class="n">query</span><span class="p">(</span><span class="n">BJ</span><span class="p">)</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span><span class="n">BJ</span><span class="o">.</span><span class="n">id</span><span class="o">.</span><span class="n">in_</span><span class="p">(</span><span class="n">job_ids</span><span class="p">))</span><span class="o">.</span><span class="n">all</span><span class="p">():</span>
             <span class="n">job</span><span class="o">.</span><span class="n">state</span> <span class="o">=</span> <span class="n">State</span><span class="o">.</span><span class="n">SHUTDOWN</span>
-    <span class="k">if</span> <span class="n">activate_dag_runs</span><span class="p">:</span>
-        <span class="n">execution_dates</span> <span class="o">=</span> <span class="p">{</span><span class="n">ti</span><span class="o">.</span><span class="n">execution_date</span> <span class="k">for</span> <span class="n">ti</span> <span class="ow">in</span> <span class="n">tis</span><span class="p">}</span>
-        <span class="n">dag_ids</span> <span class="o">=</span> <span class="p">{</span><span class="n">ti</span><span class="o">.</span><span class="n">dag_id</span> <span class="k">for</span> <span class="n">ti</span> <span class="ow">in</span> <span class="n">tis</span><span class="p">}</span>
+
+    <span class="k">if</span> <span class="n">activate_dag_runs</span> <span class="ow">and</span> <span class="n">tis</span><span class="p">:</span>
         <span class="n">drs</span> <span class="o">=</span> <span class="n">session</span><span class="o">.</span><span class="n">query</span><span class="p">(</span><span class="n">DagRun</span><span class="p">)</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span>
-            <span class="n">DagRun</span><span class="o">.</span><span class="n">dag_id</span><span class="o">.</span><span class="n">in_</span><span class="p">(</span><span class="n">dag_ids</span><span class="p">),</span>
-            <span class="n">DagRun</span><span class="o">.</span><span class="n">execution_date</span><span class="o">.</span><span class="n">in_</span><span class="p">(</span><span class="n">execution_dates</span><span class="p">),</span>
+            <span class="n">DagRun</span><span class="o">.</span><span class="n">dag_id</span><span class="o">.</span><span class="n">in_</span><span class="p">({</span><span class="n">ti</span><span class="o">.</span><span class="n">dag_id</span> <span class="k">for</span> <span class="n">ti</span> <span class="ow">in</span> <span class="n">tis</span><span class="p">}),</span>
+            <span class="n">DagRun</span><span class="o">.</span><span class="n">execution_date</span><span class="o">.</span><span class="n">in_</span><span class="p">({</span><span class="n">ti</span><span class="o">.</span><span class="n">execution_date</span> <span class="k">for</span> <span class="n">ti</span> <span class="ow">in</span> <span class="n">tis</span><span class="p">}),</span>
         <span class="p">)</span><span class="o">.</span><span class="n">all</span><span class="p">()</span>
         <span class="k">for</span> <span class="n">dr</span> <span class="ow">in</span> <span class="n">drs</span><span class="p">:</span>
             <span class="n">dr</span><span class="o">.</span><span class="n">state</span> <span class="o">=</span> <span class="n">State</span><span class="o">.</span><span class="n">RUNNING</span>
-            <span class="n">dr</span><span class="o">.</span><span class="n">start_date</span> <span class="o">=</span> <span class="n">datetime</span><span class="o">.</span><span class="n">now</span><span class="p">()</span>
+            <span class="n">dr</span><span class="o">.</span><span class="n">start_date</span> <span class="o">=</span> <span class="n">datetime</span><span class="o">.</span><span class="n">utcnow</span><span class="p">()</span>
 
 
 <div class="viewcode-block" id="DagBag"><a class="viewcode-back" href="../../code.html#airflow.models.DagBag">[docs]</a><span class="k">class</span> <span class="nc">DagBag</span><span class="p">(</span><span class="n">BaseDagBag</span><span class="p">,</span> <span class="n">LoggingMixin</span><span class="p">):</span>
@@ -316,19 +343,19 @@
 <span class="sd">    :param include_examples: whether to include the examples that ship</span>
 <span class="sd">        with airflow or not</span>
 <span class="sd">    :type include_examples: bool</span>
-<span class="sd">    :param sync_to_db: whether to sync the properties of the DAGs to</span>
-<span class="sd">        the metadata DB while finding them, typically should be done</span>
-<span class="sd">        by the scheduler job only</span>
-<span class="sd">    :type sync_to_db: bool</span>
 <span class="sd">    &quot;&quot;&quot;</span>
+
     <span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span>
             <span class="bp">self</span><span class="p">,</span>
             <span class="n">dag_folder</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
-            <span class="n">executor</span><span class="o">=</span><span class="n">DEFAULT_EXECUTOR</span><span class="p">,</span>
+            <span class="n">executor</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
             <span class="n">include_examples</span><span class="o">=</span><span class="n">configuration</span><span class="o">.</span><span class="n">getboolean</span><span class="p">(</span><span class="s1">&#39;core&#39;</span><span class="p">,</span> <span class="s1">&#39;LOAD_EXAMPLES&#39;</span><span class="p">)):</span>
 
+        <span class="c1"># do not use default arg in signature, to fix import cycle on plugin load</span>
+        <span class="k">if</span> <span class="n">executor</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
+            <span class="n">executor</span> <span class="o">=</span> <span class="n">GetDefaultExecutor</span><span class="p">()</span>
         <span class="n">dag_folder</span> <span class="o">=</span> <span class="n">dag_folder</span> <span class="ow">or</span> <span class="n">settings</span><span class="o">.</span><span class="n">DAGS_FOLDER</span>
-        <span class="bp">self</span><span class="o">.</span><span class="n">logger</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">&quot;Filling up the DagBag from </span><span class="si">{}</span><span class="s2">&quot;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">dag_folder</span><span class="p">))</span>
+        <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">&quot;Filling up the DagBag from </span><span class="si">%s</span><span class="s2">&quot;</span><span class="p">,</span> <span class="n">dag_folder</span><span class="p">)</span>
         <span class="bp">self</span><span class="o">.</span><span class="n">dag_folder</span> <span class="o">=</span> <span class="n">dag_folder</span>
         <span class="bp">self</span><span class="o">.</span><span class="n">dags</span> <span class="o">=</span> <span class="p">{}</span>
         <span class="c1"># the file&#39;s last modified timestamp when we last read it</span>
@@ -338,7 +365,7 @@
 
         <span class="k">if</span> <span class="n">include_examples</span><span class="p">:</span>
             <span class="n">example_dag_folder</span> <span class="o">=</span> <span class="n">os</span><span class="o">.</span><span class="n">path</span><span class="o">.</span><span class="n">join</span><span class="p">(</span>
-                <span class="n">os</span><span class="o">.</span><span class="n">path</span><span class="o">.</span><span class="n">dirname</span><span class="p">(</span><span class="n">__file__</span><span class="p">),</span>
+                <span class="n">os</span><span class="o">.</span><span class="n">path</span><span class="o">.</span><span class="n">dirname</span><span class="p">(</span><span class="vm">__file__</span><span class="p">),</span>
                 <span class="s1">&#39;example_dags&#39;</span><span class="p">)</span>
             <span class="bp">self</span><span class="o">.</span><span class="n">collect_dags</span><span class="p">(</span><span class="n">example_dag_folder</span><span class="p">)</span>
         <span class="bp">self</span><span class="o">.</span><span class="n">collect_dags</span><span class="p">(</span><span class="n">dag_folder</span><span class="p">)</span>
@@ -401,7 +428,7 @@
                 <span class="k">return</span> <span class="n">found_dags</span>
 
         <span class="k">except</span> <span class="ne">Exception</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span>
-            <span class="n">logging</span><span class="o">.</span><span class="n">exception</span><span class="p">(</span><span class="n">e</span><span class="p">)</span>
+            <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">exception</span><span class="p">(</span><span class="n">e</span><span class="p">)</span>
             <span class="k">return</span> <span class="n">found_dags</span>
 
         <span class="n">mods</span> <span class="o">=</span> <span class="p">[]</span>
@@ -409,11 +436,11 @@
             <span class="k">if</span> <span class="n">safe_mode</span> <span class="ow">and</span> <span class="n">os</span><span class="o">.</span><span class="n">path</span><span class="o">.</span><span class="n">isfile</span><span class="p">(</span><span class="n">filepath</span><span class="p">):</span>
                 <span class="k">with</span> <span class="nb">open</span><span class="p">(</span><span class="n">filepath</span><span class="p">,</span> <span class="s1">&#39;rb&#39;</span><span class="p">)</span> <span class="k">as</span> <span class="n">f</span><span class="p">:</span>
                     <span class="n">content</span> <span class="o">=</span> <span class="n">f</span><span class="o">.</span><span class="n">read</span><span class="p">()</span>
-                    <span class="k">if</span> <span class="ow">not</span> <span class="nb">all</span><span class="p">([</span><span class="n">s</span> <span class="ow">in</span> <span class="n">content</span> <span class="k">for</span> <span class="n">s</span> <span class="ow">in</span> <span class="p">(</span><span class="n">b</span><span class="s1">&#39;DAG&#39;</span><span class="p">,</span> <span class="n">b</span><span class="s1">&#39;airflow&#39;</span><span class="p">)]):</span>
+                    <span class="k">if</span> <span class="ow">not</span> <span class="nb">all</span><span class="p">([</span><span class="n">s</span> <span class="ow">in</span> <span class="n">content</span> <span class="k">for</span> <span class="n">s</span> <span class="ow">in</span> <span class="p">(</span><span class="sa">b</span><span class="s1">&#39;DAG&#39;</span><span class="p">,</span> <span class="sa">b</span><span class="s1">&#39;airflow&#39;</span><span class="p">)]):</span>
                         <span class="bp">self</span><span class="o">.</span><span class="n">file_last_changed</span><span class="p">[</span><span class="n">filepath</span><span class="p">]</span> <span class="o">=</span> <span class="n">file_last_changed_on_disk</span>
                         <span class="k">return</span> <span class="n">found_dags</span>
 
-            <span class="bp">self</span><span class="o">.</span><span class="n">logger</span><span class="o">.</span><span class="n">debug</span><span class="p">(</span><span class="s2">&quot;Importing </span><span class="si">{}</span><span class="s2">&quot;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">filepath</span><span class="p">))</span>
+            <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">debug</span><span class="p">(</span><span class="s2">&quot;Importing </span><span class="si">%s</span><span class="s2">&quot;</span><span class="p">,</span> <span class="n">filepath</span><span class="p">)</span>
             <span class="n">org_mod_name</span><span class="p">,</span> <span class="n">_</span> <span class="o">=</span> <span class="n">os</span><span class="o">.</span><span class="n">path</span><span class="o">.</span><span class="n">splitext</span><span class="p">(</span><span class="n">os</span><span class="o">.</span><span class="n">path</span><span class="o">.</span><span class="n">split</span><span class="p">(</span><span class="n">filepath</span><span class="p">)[</span><span class="o">-</span><span class="mi">1</span><span class="p">])</span>
             <span class="n">mod_name</span> <span class="o">=</span> <span class="p">(</span><span class="s1">&#39;unusual_prefix_&#39;</span> <span class="o">+</span>
                         <span class="n">hashlib</span><span class="o">.</span><span class="n">sha1</span><span class="p">(</span><span class="n">filepath</span><span class="o">.</span><span class="n">encode</span><span class="p">(</span><span class="s1">&#39;utf-8&#39;</span><span class="p">))</span><span class="o">.</span><span class="n">hexdigest</span><span class="p">()</span> <span class="o">+</span>
@@ -427,7 +454,7 @@
                     <span class="n">m</span> <span class="o">=</span> <span class="n">imp</span><span class="o">.</span><span class="n">load_source</span><span class="p">(</span><span class="n">mod_name</span><span class="p">,</span> <span class="n">filepath</span><span class="p">)</span>
                     <span class="n">mods</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">m</span><span class="p">)</span>
                 <span class="k">except</span> <span class="ne">Exception</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span>
-                    <span class="bp">self</span><span class="o">.</span><span class="n">logger</span><span class="o">.</span><span class="n">exception</span><span class="p">(</span><span class="s2">&quot;Failed to import: &quot;</span> <span class="o">+</span> <span class="n">filepath</span><span class="p">)</span>
+                    <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">exception</span><span class="p">(</span><span class="s2">&quot;Failed to import: </span><span class="si">%s</span><span class="s2">&quot;</span><span class="p">,</span> <span class="n">filepath</span><span class="p">)</span>
                     <span class="bp">self</span><span class="o">.</span><span class="n">import_errors</span><span class="p">[</span><span class="n">filepath</span><span class="p">]</span> <span class="o">=</span> <span class="nb">str</span><span class="p">(</span><span class="n">e</span><span class="p">)</span>
                     <span class="bp">self</span><span class="o">.</span><span class="n">file_last_changed</span><span class="p">[</span><span class="n">filepath</span><span class="p">]</span> <span class="o">=</span> <span class="n">file_last_changed_on_disk</span>
 
@@ -438,15 +465,12 @@
                 <span class="n">mod_name</span><span class="p">,</span> <span class="n">ext</span> <span class="o">=</span> <span class="n">os</span><span class="o">.</span><span class="n">path</span><span class="o">.</span><span class="n">splitext</span><span class="p">(</span><span class="n">mod</span><span class="o">.</span><span class="n">filename</span><span class="p">)</span>
                 <span class="k">if</span> <span class="ow">not</span> <span class="n">head</span> <span class="ow">and</span> <span class="p">(</span><span class="n">ext</span> <span class="o">==</span> <span class="s1">&#39;.py&#39;</span> <span class="ow">or</span> <span class="n">ext</span> <span class="o">==</span> <span class="s1">&#39;.pyc&#39;</span><span class="p">):</span>
                     <span class="k">if</span> <span class="n">mod_name</span> <span class="o">==</span> <span class="s1">&#39;__init__&#39;</span><span class="p">:</span>
-                        <span class="bp">self</span><span class="o">.</span><span class="n">logger</span><span class="o">.</span><span class="n">warning</span><span class="p">(</span><span class="s2">&quot;Found __init__.</span><span class="si">{0}</span><span class="s2"> at root of </span><span class="si">{1}</span><span class="s2">&quot;</span><span class="o">.</span>
-                                            <span class="nb">format</span><span class="p">(</span><span class="n">ext</span><span class="p">,</span> <span class="n">filepath</span><span class="p">))</span>
-
+                        <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">warning</span><span class="p">(</span><span class="s2">&quot;Found __init__.</span><span class="si">%s</span><span class="s2"> at root of </span><span class="si">%s</span><span class="s2">&quot;</span><span class="p">,</span> <span class="n">ext</span><span class="p">,</span> <span class="n">filepath</span><span class="p">)</span>
                     <span class="k">if</span> <span class="n">safe_mode</span><span class="p">:</span>
                         <span class="k">with</span> <span class="n">zip_file</span><span class="o">.</span><span class="n">open</span><span class="p">(</span><span class="n">mod</span><span class="o">.</span><span class="n">filename</span><span class="p">)</span> <span class="k">as</span> <span class="n">zf</span><span class="p">:</span>
-                            <span class="bp">self</span><span class="o">.</span><span class="n">logger</span><span class="o">.</span><span class="n">debug</span><span class="p">(</span><span class="s2">&quot;Reading </span><span class="si">{}</span><span class="s2"> from </span><span class="si">{}</span><span class="s2">&quot;</span><span class="o">.</span>
-                                              <span class="nb">format</span><span class="p">(</span><span class="n">mod</span><span class="o">.</span><span class="n">filename</span><span class="p">,</span> <span class="n">filepath</span><span class="p">))</span>
+                            <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">debug</span><span class="p">(</span><span class="s2">&quot;Reading </span><span class="si">%s</span><span class="s2"> from </span><span class="si">%s</span><span class="s2">&quot;</span><span class="p">,</span> <span class="n">mod</span><span class="o">.</span><span class="n">filename</span><span class="p">,</span> <span class="n">filepath</span><span class="p">)</span>
                             <span class="n">content</span> <span class="o">=</span> <span class="n">zf</span><span class="o">.</span><span class="n">read</span><span class="p">()</span>
-                            <span class="k">if</span> <span class="ow">not</span> <span class="nb">all</span><span class="p">([</span><span class="n">s</span> <span class="ow">in</span> <span class="n">content</span> <span class="k">for</span> <span class="n">s</span> <span class="ow">in</span> <span class="p">(</span><span class="n">b</span><span class="s1">&#39;DAG&#39;</span><span class="p">,</span> <span class="n">b</span><span class="s1">&#39;airflow&#39;</span><span class="p">)]):</span>
+                            <span class="k">if</span> <span class="ow">not</span> <span class="nb">all</span><span class="p">([</span><span class="n">s</span> <span class="ow">in</span> <span class="n">content</span> <span class="k">for</span> <span class="n">s</span> <span class="ow">in</span> <span class="p">(</span><span class="sa">b</span><span class="s1">&#39;DAG&#39;</span><span class="p">,</span> <span class="sa">b</span><span class="s1">&#39;airflow&#39;</span><span class="p">)]):</span>
                                 <span class="bp">self</span><span class="o">.</span><span class="n">file_last_changed</span><span class="p">[</span><span class="n">filepath</span><span class="p">]</span> <span class="o">=</span> <span class="p">(</span>
                                     <span class="n">file_last_changed_on_disk</span><span class="p">)</span>
                                 <span class="c1"># todo: create ignore list</span>
@@ -460,12 +484,12 @@
                         <span class="n">m</span> <span class="o">=</span> <span class="n">importlib</span><span class="o">.</span><span class="n">import_module</span><span class="p">(</span><span class="n">mod_name</span><span class="p">)</span>
                         <span class="n">mods</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">m</span><span class="p">)</span>
                     <span class="k">except</span> <span class="ne">Exception</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span>
-                        <span class="bp">self</span><span class="o">.</span><span class="n">logger</span><span class="o">.</span><span class="n">exception</span><span class="p">(</span><span class="s2">&quot;Failed to import: &quot;</span> <span class="o">+</span> <span class="n">filepath</span><span class="p">)</span>
+                        <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">exception</span><span class="p">(</span><span class="s2">&quot;Failed to import: </span><span class="si">%s</span><span class="s2">&quot;</span><span class="p">,</span> <span class="n">filepath</span><span class="p">)</span>
                         <span class="bp">self</span><span class="o">.</span><span class="n">import_errors</span><span class="p">[</span><span class="n">filepath</span><span class="p">]</span> <span class="o">=</span> <span class="nb">str</span><span class="p">(</span><span class="n">e</span><span class="p">)</span>
                         <span class="bp">self</span><span class="o">.</span><span class="n">file_last_changed</span><span class="p">[</span><span class="n">filepath</span><span class="p">]</span> <span class="o">=</span> <span class="n">file_last_changed_on_disk</span>
 
         <span class="k">for</span> <span class="n">m</span> <span class="ow">in</span> <span class="n">mods</span><span class="p">:</span>
-            <span class="k">for</span> <span class="n">dag</span> <span class="ow">in</span> <span class="nb">list</span><span class="p">(</span><span class="n">m</span><span class="o">.</span><span class="n">__dict__</span><span class="o">.</span><span class="n">values</span><span class="p">()):</span>
+            <span class="k">for</span> <span class="n">dag</span> <span class="ow">in</span> <span class="nb">list</span><span class="p">(</span><span class="n">m</span><span class="o">.</span><span class="vm">__dict__</span><span class="o">.</span><span class="n">values</span><span class="p">()):</span>
                 <span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">dag</span><span class="p">,</span> <span class="n">DAG</span><span class="p">):</span>
                     <span class="k">if</span> <span class="ow">not</span> <span class="n">dag</span><span class="o">.</span><span class="n">full_filepath</span><span class="p">:</span>
                         <span class="n">dag</span><span class="o">.</span><span class="n">full_filepath</span> <span class="o">=</span> <span class="n">filepath</span>
@@ -477,19 +501,17 @@
         <span class="bp">self</span><span class="o">.</span><span class="n">file_last_changed</span><span class="p">[</span><span class="n">filepath</span><span class="p">]</span> <span class="o">=</span> <span class="n">file_last_changed_on_disk</span>
         <span class="k">return</span> <span class="n">found_dags</span></div>
 
-    <span class="nd">@provide_session</span>
-<div class="viewcode-block" id="DagBag.kill_zombies"><a class="viewcode-back" href="../../code.html#airflow.models.DagBag.kill_zombies">[docs]</a>    <span class="k">def</span> <span class="nf">kill_zombies</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">session</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
+<div class="viewcode-block" id="DagBag.kill_zombies"><a class="viewcode-back" href="../../code.html#airflow.models.DagBag.kill_zombies">[docs]</a>    <span class="nd">@provide_session</span>
+    <span class="k">def</span> <span class="nf">kill_zombies</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">session</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
         <span class="sd">&quot;&quot;&quot;</span>
 <span class="sd">        Fails tasks that haven&#39;t had a heartbeat in too long</span>
 <span class="sd">        &quot;&quot;&quot;</span>
         <span class="kn">from</span> <span class="nn">airflow.jobs</span> <span class="k">import</span> <span class="n">LocalTaskJob</span> <span class="k">as</span> <span class="n">LJ</span>
-        <span class="bp">self</span><span class="o">.</span><span class="n">logger</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">&quot;Finding &#39;running&#39; jobs without a recent heartbeat&quot;</span><span class="p">)</span>
+        <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">&quot;Finding &#39;running&#39; jobs without a recent heartbeat&quot;</span><span class="p">)</span>
         <span class="n">TI</span> <span class="o">=</span> <span class="n">TaskInstance</span>
-        <span class="n">secs</span> <span class="o">=</span> <span class="p">(</span>
-            <span class="n">configuration</span><span class="o">.</span><span class="n">getint</span><span class="p">(</span><span class="s1">&#39;scheduler&#39;</span><span class="p">,</span> <span class="s1">&#39;scheduler_zombie_task_threshold&#39;</span><span class="p">))</span>
-        <span class="n">limit_dttm</span> <span class="o">=</span> <span class="n">datetime</span><span class="o">.</span><span class="n">now</span><span class="p">()</span> <span class="o">-</span> <span class="n">timedelta</span><span class="p">(</span><span class="n">seconds</span><span class="o">=</span><span class="n">secs</span><span class="p">)</span>
-        <span class="bp">self</span><span class="o">.</span><span class="n">logger</span><span class="o">.</span><span class="n">info</span><span class="p">(</span>
-            <span class="s2">&quot;Failing jobs without heartbeat after </span><span class="si">{}</span><span class="s2">&quot;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">limit_dttm</span><span class="p">))</span>
+        <span class="n">secs</span> <span class="o">=</span> <span class="n">configuration</span><span class="o">.</span><span class="n">getint</span><span class="p">(</span><span class="s1">&#39;scheduler&#39;</span><span class="p">,</span> <span class="s1">&#39;scheduler_zombie_task_threshold&#39;</span><span class="p">)</span>
+        <span class="n">limit_dttm</span> <span class="o">=</span> <span class="n">datetime</span><span class="o">.</span><span class="n">utcnow</span><span class="p">()</span> <span class="o">-</span> <span class="n">timedelta</span><span class="p">(</span><span class="n">seconds</span><span class="o">=</span><span class="n">secs</span><span class="p">)</span>
+        <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">&quot;Failing jobs without heartbeat after </span><span class="si">%s</span><span class="s2">&quot;</span><span class="p">,</span> <span class="n">limit_dttm</span><span class="p">)</span>
 
         <span class="n">tis</span> <span class="o">=</span> <span class="p">(</span>
             <span class="n">session</span><span class="o">.</span><span class="n">query</span><span class="p">(</span><span class="n">TI</span><span class="p">)</span>
@@ -509,9 +531,8 @@
                 <span class="k">if</span> <span class="n">ti</span><span class="o">.</span><span class="n">task_id</span> <span class="ow">in</span> <span class="n">dag</span><span class="o">.</span><span class="n">task_ids</span><span class="p">:</span>
                     <span class="n">task</span> <span class="o">=</span> <span class="n">dag</span><span class="o">.</span><span class="n">get_task</span><span class="p">(</span><span class="n">ti</span><span class="o">.</span><span class="n">task_id</span><span class="p">)</span>
                     <span class="n">ti</span><span class="o">.</span><span class="n">task</span> <span class="o">=</span> <span class="n">task</span>
-                    <span class="n">ti</span><span class="o">.</span><span class="n">handle_failure</span><span class="p">(</span><span class="s2">&quot;</span><span class="si">{}</span><span class="s2"> killed as zombie&quot;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">ti</span><span class="p">))</span>
-                    <span class="bp">self</span><span class="o">.</span><span class="n">logger</span><span class="o">.</span><span class="n">info</span><span class="p">(</span>
-                        <span class="s1">&#39;Marked zombie job </span><span class="si">{}</span><span class="s1"> as failed&#39;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">ti</span><span class="p">))</span>
+                    <span class="n">ti</span><span class="o">.</span><span class="n">handle_failure</span><span class="p">(</span><span class="s2">&quot;</span><span class="si">{}</span><span class="s2"> killed as zombie&quot;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="nb">str</span><span class="p">(</span><span class="n">ti</span><span class="p">)))</span>
+                    <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s1">&#39;Marked zombie job </span><span class="si">%s</span><span class="s1"> as failed&#39;</span><span class="p">,</span> <span class="n">ti</span><span class="p">)</span>
                     <span class="n">Stats</span><span class="o">.</span><span class="n">incr</span><span class="p">(</span><span class="s1">&#39;zombies_killed&#39;</span><span class="p">)</span>
         <span class="n">session</span><span class="o">.</span><span class="n">commit</span><span class="p">()</span></div>
 
@@ -521,7 +542,7 @@
 <span class="sd">        &quot;&quot;&quot;</span>
         <span class="bp">self</span><span class="o">.</span><span class="n">dags</span><span class="p">[</span><span class="n">dag</span><span class="o">.</span><span class="n">dag_id</span><span class="p">]</span> <span class="o">=</span> <span class="n">dag</span>
         <span class="n">dag</span><span class="o">.</span><span class="n">resolve_template_files</span><span class="p">()</span>
-        <span class="n">dag</span><span class="o">.</span><span class="n">last_loaded</span> <span class="o">=</span> <span class="n">datetime</span><span class="o">.</span><span class="n">now</span><span class="p">()</span>
+        <span class="n">dag</span><span class="o">.</span><span class="n">last_loaded</span> <span class="o">=</span> <span class="n">datetime</span><span class="o">.</span><span class="n">utcnow</span><span class="p">()</span>
 
         <span class="k">for</span> <span class="n">task</span> <span class="ow">in</span> <span class="n">dag</span><span class="o">.</span><span class="n">tasks</span><span class="p">:</span>
             <span class="n">settings</span><span class="o">.</span><span class="n">policy</span><span class="p">(</span><span class="n">task</span><span class="p">)</span>
@@ -531,7 +552,7 @@
             <span class="n">subdag</span><span class="o">.</span><span class="n">parent_dag</span> <span class="o">=</span> <span class="n">dag</span>
             <span class="n">subdag</span><span class="o">.</span><span class="n">is_subdag</span> <span class="o">=</span> <span class="kc">True</span>
             <span class="bp">self</span><span class="o">.</span><span class="n">bag_dag</span><span class="p">(</span><span class="n">subdag</span><span class="p">,</span> <span class="n">parent_dag</span><span class="o">=</span><span class="n">dag</span><span class="p">,</span> <span class="n">root_dag</span><span class="o">=</span><span class="n">root_dag</span><span class="p">)</span>
-        <span class="bp">self</span><span class="o">.</span><span class="n">logger</span><span class="o">.</span><span class="n">debug</span><span class="p">(</span><span class="s1">&#39;Loaded DAG </span><span class="si">{dag}</span><span class="s1">&#39;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="o">**</span><span class="nb">locals</span><span class="p">()))</span></div>
+        <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">debug</span><span class="p">(</span><span class="s1">&#39;Loaded DAG </span><span class="si">{dag}</span><span class="s1">&#39;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="o">**</span><span class="nb">locals</span><span class="p">()))</span></div>
 
 <div class="viewcode-block" id="DagBag.collect_dags"><a class="viewcode-back" href="../../code.html#airflow.models.DagBag.collect_dags">[docs]</a>    <span class="k">def</span> <span class="nf">collect_dags</span><span class="p">(</span>
             <span class="bp">self</span><span class="p">,</span>
@@ -546,7 +567,7 @@
 <span class="sd">        ignoring files that match any of the regex patterns specified</span>
 <span class="sd">        in the file.</span>
 <span class="sd">        &quot;&quot;&quot;</span>
-        <span class="n">start_dttm</span> <span class="o">=</span> <span class="n">datetime</span><span class="o">.</span><span class="n">now</span><span class="p">()</span>
+        <span class="n">start_dttm</span> <span class="o">=</span> <span class="n">datetime</span><span class="o">.</span><span class="n">utcnow</span><span class="p">()</span>
         <span class="n">dag_folder</span> <span class="o">=</span> <span class="n">dag_folder</span> <span class="ow">or</span> <span class="bp">self</span><span class="o">.</span><span class="n">dag_folder</span>
 
         <span class="c1"># Used to store stats around DagBag processing</span>
@@ -574,11 +595,11 @@
                             <span class="k">continue</span>
                         <span class="k">if</span> <span class="ow">not</span> <span class="nb">any</span><span class="p">(</span>
                                 <span class="p">[</span><span class="n">re</span><span class="o">.</span><span class="n">findall</span><span class="p">(</span><span class="n">p</span><span class="p">,</span> <span class="n">filepath</span><span class="p">)</span> <span class="k">for</span> <span class="n">p</span> <span class="ow">in</span> <span class="n">patterns</span><span class="p">]):</span>
-                            <span class="n">ts</span> <span class="o">=</span> <span class="n">datetime</span><span class="o">.</span><span class="n">now</span><span class="p">()</span>
+                            <span class="n">ts</span> <span class="o">=</span> <span class="n">datetime</span><span class="o">.</span><span class="n">utcnow</span><span class="p">()</span>
                             <span class="n">found_dags</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">process_file</span><span class="p">(</span>
                                 <span class="n">filepath</span><span class="p">,</span> <span class="n">only_if_updated</span><span class="o">=</span><span class="n">only_if_updated</span><span class="p">)</span>
 
-                            <span class="n">td</span> <span class="o">=</span> <span class="n">datetime</span><span class="o">.</span><span class="n">now</span><span class="p">()</span> <span class="o">-</span> <span class="n">ts</span>
+                            <span class="n">td</span> <span class="o">=</span> <span class="n">datetime</span><span class="o">.</span><span class="n">utcnow</span><span class="p">()</span> <span class="o">-</span> <span class="n">ts</span>
                             <span class="n">td</span> <span class="o">=</span> <span class="n">td</span><span class="o">.</span><span class="n">total_seconds</span><span class="p">()</span> <span class="o">+</span> <span class="p">(</span>
                                 <span class="nb">float</span><span class="p">(</span><span class="n">td</span><span class="o">.</span><span class="n">microseconds</span><span class="p">)</span> <span class="o">/</span> <span class="mi">1000000</span><span class="p">)</span>
                             <span class="n">stats</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">FileLoadStat</span><span class="p">(</span>
@@ -589,9 +610,9 @@
                                 <span class="nb">str</span><span class="p">([</span><span class="n">dag</span><span class="o">.</span><span class="n">dag_id</span> <span class="k">for</span> <span class="n">dag</span> <span class="ow">in</span> <span class="n">found_dags</span><span class="p">]),</span>
                             <span class="p">))</span>
                     <span class="k">except</span> <span class="ne">Exception</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span>
-                        <span class="n">logging</span><span class="o">.</span><span class="n">warning</span><span class="p">(</span><span class="n">e</span><span class="p">)</span>
+                        <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">warning</span><span class="p">(</span><span class="n">e</span><span class="p">)</span>
         <span class="n">Stats</span><span class="o">.</span><span class="n">gauge</span><span class="p">(</span>
-            <span class="s1">&#39;collect_dags&#39;</span><span class="p">,</span> <span class="p">(</span><span class="n">datetime</span><span class="o">.</span><span class="n">now</span><span class="p">()</span> <span class="o">-</span> <span class="n">start_dttm</span><span class="p">)</span><span class="o">.</span><span class="n">total_seconds</span><span class="p">(),</span> <span class="mi">1</span><span class="p">)</span>
+            <span class="s1">&#39;collect_dags&#39;</span><span class="p">,</span> <span class="p">(</span><span class="n">datetime</span><span class="o">.</span><span class="n">utcnow</span><span class="p">()</span> <span class="o">-</span> <span class="n">start_dttm</span><span class="p">)</span><span class="o">.</span><span class="n">total_seconds</span><span class="p">(),</span> <span class="mi">1</span><span class="p">)</span>
         <span class="n">Stats</span><span class="o">.</span><span class="n">gauge</span><span class="p">(</span>
             <span class="s1">&#39;dagbag_size&#39;</span><span class="p">,</span> <span class="nb">len</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">dags</span><span class="p">),</span> <span class="mi">1</span><span class="p">)</span>
         <span class="n">Stats</span><span class="o">.</span><span class="n">gauge</span><span class="p">(</span>
@@ -632,7 +653,7 @@
     <span class="k">def</span> <span class="nf">paused_dags</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
         <span class="n">session</span> <span class="o">=</span> <span class="n">settings</span><span class="o">.</span><span class="n">Session</span><span class="p">()</span>
         <span class="n">dag_ids</span> <span class="o">=</span> <span class="p">[</span><span class="n">dp</span><span class="o">.</span><span class="n">dag_id</span> <span class="k">for</span> <span class="n">dp</span> <span class="ow">in</span> <span class="n">session</span><span class="o">.</span><span class="n">query</span><span class="p">(</span><span class="n">DagModel</span><span class="p">)</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span>
-            <span class="n">DagModel</span><span class="o">.</span><span class="n">is_paused</span><span class="o">.</span><span class="n">is_</span><span class="p">(</span><span class="kc">True</span><span class="p">))]</span>
+            <span class="n">DagModel</span><span class="o">.</span><span class="n">is_paused</span><span class="o">.</span><span class="fm">__eq__</span><span class="p">(</span><span class="kc">True</span><span class="p">))]</span>
         <span class="n">session</span><span class="o">.</span><span class="n">commit</span><span class="p">()</span>
         <span class="n">session</span><span class="o">.</span><span class="n">close</span><span class="p">()</span>
         <span class="k">return</span> <span class="n">dag_ids</span></div>
@@ -656,7 +677,7 @@
         <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">superuser</span>
 
 
-<div class="viewcode-block" id="Connection"><a class="viewcode-back" href="../../code.html#airflow.models.Connection">[docs]</a><span class="k">class</span> <span class="nc">Connection</span><span class="p">(</span><span class="n">Base</span><span class="p">):</span>
+<div class="viewcode-block" id="Connection"><a class="viewcode-back" href="../../code.html#airflow.models.Connection">[docs]</a><span class="k">class</span> <span class="nc">Connection</span><span class="p">(</span><span class="n">Base</span><span class="p">,</span> <span class="n">LoggingMixin</span><span class="p">):</span>
     <span class="sd">&quot;&quot;&quot;</span>
 <span class="sd">    Placeholder to store information about different database instances</span>
 <span class="sd">    connection information. The idea here is that scripts use references to</span>
@@ -678,6 +699,7 @@
     <span class="n">_extra</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="s1">&#39;extra&#39;</span><span class="p">,</span> <span class="n">String</span><span class="p">(</span><span class="mi">5000</span><span class="p">))</span>
 
     <span class="n">_types</span> <span class="o">=</span> <span class="p">[</span>
+        <span class="p">(</span><span class="s1">&#39;docker&#39;</span><span class="p">,</span> <span class="s1">&#39;Docker Registry&#39;</span><span class="p">,),</span>
         <span class="p">(</span><span class="s1">&#39;fs&#39;</span><span class="p">,</span> <span class="s1">&#39;File (path)&#39;</span><span class="p">),</span>
         <span class="p">(</span><span class="s1">&#39;ftp&#39;</span><span class="p">,</span> <span class="s1">&#39;FTP&#39;</span><span class="p">,),</span>
         <span class="p">(</span><span class="s1">&#39;google_cloud_platform&#39;</span><span class="p">,</span> <span class="s1">&#39;Google Cloud Platform&#39;</span><span class="p">),</span>
@@ -700,6 +722,11 @@
         <span class="p">(</span><span class="s1">&#39;mssql&#39;</span><span class="p">,</span> <span class="s1">&#39;Microsoft SQL Server&#39;</span><span class="p">),</span>
         <span class="p">(</span><span class="s1">&#39;mesos_framework-id&#39;</span><span class="p">,</span> <span class="s1">&#39;Mesos Framework ID&#39;</span><span class="p">),</span>
         <span class="p">(</span><span class="s1">&#39;jira&#39;</span><span class="p">,</span> <span class="s1">&#39;JIRA&#39;</span><span class="p">,),</span>
+        <span class="p">(</span><span class="s1">&#39;redis&#39;</span><span class="p">,</span> <span class="s1">&#39;Redis&#39;</span><span class="p">,),</span>
+        <span class="p">(</span><span class="s1">&#39;wasb&#39;</span><span class="p">,</span> <span class="s1">&#39;Azure Blob Storage&#39;</span><span class="p">),</span>
+        <span class="p">(</span><span class="s1">&#39;databricks&#39;</span><span class="p">,</span> <span class="s1">&#39;Databricks&#39;</span><span class="p">,),</span>
+        <span class="p">(</span><span class="s1">&#39;aws&#39;</span><span class="p">,</span> <span class="s1">&#39;Amazon Web Services&#39;</span><span class="p">,),</span>
+        <span class="p">(</span><span class="s1">&#39;emr&#39;</span><span class="p">,</span> <span class="s1">&#39;Elastic MapReduce&#39;</span><span class="p">,),</span>
     <span class="p">]</span>
 
     <span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span>
@@ -736,51 +763,61 @@
 
     <span class="k">def</span> <span class="nf">get_password</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
         <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_password</span> <span class="ow">and</span> <span class="bp">self</span><span class="o">.</span><span class="n">is_encrypted</span><span class="p">:</span>
-            <span class="k">if</span> <span class="ow">not</span> <span class="n">ENCRYPTION_ON</span><span class="p">:</span>
+            <span class="k">try</span><span class="p">:</span>
+                <span class="n">fernet</span> <span class="o">=</span> <span class="n">get_fernet</span><span class="p">()</span>
+            <span class="k">except</span><span class="p">:</span>
                 <span class="k">raise</span> <span class="n">AirflowException</span><span class="p">(</span>
                     <span class="s2">&quot;Can&#39;t decrypt encrypted password for login=</span><span class="si">{}</span><span class="s2">, </span><span class="se">\</span>
 <span class="s2">                    FERNET_KEY configuration is missing&quot;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">login</span><span class="p">))</span>
-            <span class="k">return</span> <span class="n">FERNET</span><span class="o">.</span><span class="n">decrypt</span><span class="p">(</span><span class="nb">bytes</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_password</span><span class="p">,</span> <span class="s1">&#39;utf-8&#39;</span><span class="p">))</span><span class="o">.</span><span class="n">decode</span><span class="p">()</span>
+            <span class="k">return</span> <span class="n">fernet</span><span class="o">.</span><span class="n">decrypt</span><span class="p">(</span><span class="nb">bytes</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_password</span><span class="p">,</span> <span class="s1">&#39;utf-8&#39;</span><span class="p">))</span><span class="o">.</span><span class="n">decode</span><span class="p">()</span>
         <span class="k">else</span><span class="p">:</span>
             <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_password</span>
 
     <span class="k">def</span> <span class="nf">set_password</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">value</span><span class="p">):</span>
         <span class="k">if</span> <span class="n">value</span><span class="p">:</span>
             <span class="k">try</span><span class="p">:</span>
-                <span class="bp">self</span><span class="o">.</span><span class="n">_password</span> <span class="o">=</span> <span class="n">FERNET</span><span class="o">.</span><span class="n">encrypt</span><span class="p">(</span><span class="nb">bytes</span><span class="p">(</span><span class="n">value</span><span class="p">,</span> <span class="s1">&#39;utf-8&#39;</span><span class="p">))</span><span class="o">.</span><span class="n">decode</span><span class="p">()</span>
+                <span class="n">fernet</span> <span class="o">=</span> <span class="n">get_fernet</span><span class="p">()</span>
+                <span class="bp">self</span><span class="o">.</span><span class="n">_password</span> <span class="o">=</span> <span class="n">fernet</span><span class="o">.</span><span class="n">encrypt</span><span class="p">(</span><span class="nb">bytes</span><span class="p">(</span><span class="n">value</span><span class="p">,</span> <span class="s1">&#39;utf-8&#39;</span><span class="p">))</span><span class="o">.</span><span class="n">decode</span><span class="p">()</span>
                 <span class="bp">self</span><span class="o">.</span><span class="n">is_encrypted</span> <span class="o">=</span> <span class="kc">True</span>
-            <span class="k">except</span> <span class="ne">NameError</span><span class="p">:</span>
+            <span class="k">except</span> <span class="n">AirflowException</span><span class="p">:</span>
+                <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">exception</span><span class="p">(</span><span class="s2">&quot;Failed to load fernet while encrypting value, &quot;</span>
+                                    <span class="s2">&quot;using non-encrypted value.&quot;</span><span class="p">)</span>
                 <span class="bp">self</span><span class="o">.</span><span class="n">_password</span> <span class="o">=</span> <span class="n">value</span>
                 <span class="bp">self</span><span class="o">.</span><span class="n">is_encrypted</span> <span class="o">=</span> <span class="kc">False</span>
 
     <span class="nd">@declared_attr</span>
-    <span class="k">def</span> <span class="nf">password</span><span class="p">(</span><span class="n">cls</span><span class="p">):</span>
+    <span class="k">def</span> <span class="nf">password</span><span class="p">(</span><span class="bp">cls</span><span class="p">):</span>
         <span class="k">return</span> <span class="n">synonym</span><span class="p">(</span><span class="s1">&#39;_password&#39;</span><span class="p">,</span>
-                       <span class="n">descriptor</span><span class="o">=</span><span class="nb">property</span><span class="p">(</span><span class="n">cls</span><span class="o">.</span><span class="n">get_password</span><span class="p">,</span> <span class="n">cls</span><span class="o">.</span><span class="n">set_password</span><span class="p">))</span>
+                       <span class="n">descriptor</span><span class="o">=</span><span class="nb">property</span><span class="p">(</span><span class="bp">cls</span><span class="o">.</span><span class="n">get_password</span><span class="p">,</span> <span class="bp">cls</span><span class="o">.</span><span class="n">set_password</span><span class="p">))</span>
 
     <span class="k">def</span> <span class="nf">get_extra</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
         <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_extra</span> <span class="ow">and</span> <span class="bp">self</span><span class="o">.</span><span class="n">is_extra_encrypted</span><span class="p">:</span>
-            <span class="k">if</span> <span class="ow">not</span> <span class="n">ENCRYPTION_ON</span><span class="p">:</span>
+            <span class="k">try</span><span class="p">:</span>
+                <span class="n">fernet</span> <span class="o">=</span> <span class="n">get_fernet</span><span class="p">()</span>
+            <span class="k">except</span><span class="p">:</span>
                 <span class="k">raise</span> <span class="n">AirflowException</span><span class="p">(</span>
                     <span class="s2">&quot;Can&#39;t decrypt `extra` params for login=</span><span class="si">{}</span><span class="s2">,</span><span class="se">\</span>
 <span class="s2">                    FERNET_KEY configuration is missing&quot;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">login</span><span class="p">))</span>
-            <span class="k">return</span> <span class="n">FERNET</span><span class="o">.</span><span class="n">decrypt</span><span class="p">(</span><span class="nb">bytes</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_extra</span><span class="p">,</span> <span class="s1">&#39;utf-8&#39;</span><span class="p">))</span><span class="o">.</span><span class="n">decode</span><span class="p">()</span>
+            <span class="k">return</span> <span class="n">fernet</span><span class="o">.</span><span class="n">decrypt</span><span class="p">(</span><span class="nb">bytes</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_extra</span><span class="p">,</span> <span class="s1">&#39;utf-8&#39;</span><span class="p">))</span><span class="o">.</span><span class="n">decode</span><span class="p">()</span>
         <span class="k">else</span><span class="p">:</span>
             <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_extra</span>
 
     <span class="k">def</span> <span class="nf">set_extra</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">value</span><span class="p">):</span>
         <span class="k">if</span> <span class="n">value</span><span class="p">:</span>
             <span class="k">try</span><span class="p">:</span>
-                <span class="bp">self</span><span class="o">.</span><span class="n">_extra</span> <span class="o">=</span> <span class="n">FERNET</span><span class="o">.</span><span class="n">encrypt</span><span class="p">(</span><span class="nb">bytes</span><span class="p">(</span><span class="n">value</span><span class="p">,</span> <span class="s1">&#39;utf-8&#39;</span><span class="p">))</span><span class="o">.</span><span class="n">decode</span><span class="p">()</span>
+                <span class="n">fernet</span> <span class="o">=</span> <span class="n">get_fernet</span><span class="p">()</span>
+                <span class="bp">self</span><span class="o">.</span><span class="n">_extra</span> <span class="o">=</span> <span class="n">fernet</span><span class="o">.</span><span class="n">encrypt</span><span class="p">(</span><span class="nb">bytes</span><span class="p">(</span><span class="n">value</span><span class="p">,</span> <span class="s1">&#39;utf-8&#39;</span><span class="p">))</span><span class="o">.</span><span class="n">decode</span><span class="p">()</span>
                 <span class="bp">self</span><span class="o">.</span><span class="n">is_extra_encrypted</span> <span class="o">=</span> <span class="kc">True</span>
-            <span class="k">except</span> <span class="ne">NameError</span><span class="p">:</span>
+            <span class="k">except</span> <span class="n">AirflowException</span><span class="p">:</span>
+                <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">exception</span><span class="p">(</span><span class="s2">&quot;Failed to load fernet while encrypting value, &quot;</span>
+                                    <span class="s2">&quot;using non-encrypted value.&quot;</span><span class="p">)</span>
                 <span class="bp">self</span><span class="o">.</span><span class="n">_extra</span> <span class="o">=</span> <span class="n">value</span>
                 <span class="bp">self</span><span class="o">.</span><span class="n">is_extra_encrypted</span> <span class="o">=</span> <span class="kc">False</span>
 
     <span class="nd">@declared_attr</span>
-    <span class="k">def</span> <span class="nf">extra</span><span class="p">(</span><span class="n">cls</span><span class="p">):</span>
+    <span class="k">def</span> <span class="nf">extra</span><span class="p">(</span><span class="bp">cls</span><span class="p">):</span>
         <span class="k">return</span> <span class="n">synonym</span><span class="p">(</span><span class="s1">&#39;_extra&#39;</span><span class="p">,</span>
-                       <span class="n">descriptor</span><span class="o">=</span><span class="nb">property</span><span class="p">(</span><span class="n">cls</span><span class="o">.</span><span class="n">get_extra</span><span class="p">,</span> <span class="n">cls</span><span class="o">.</span><span class="n">set_extra</span><span class="p">))</span>
+                       <span class="n">descriptor</span><span class="o">=</span><span class="nb">property</span><span class="p">(</span><span class="bp">cls</span><span class="o">.</span><span class="n">get_extra</span><span class="p">,</span> <span class="bp">cls</span><span class="o">.</span><span class="n">set_extra</span><span class="p">))</span>
 
     <span class="k">def</span> <span class="nf">get_hook</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
         <span class="k">try</span><span class="p">:</span>
@@ -823,6 +860,15 @@
             <span class="k">elif</span> <span class="bp">self</span><span class="o">.</span><span class="n">conn_type</span> <span class="o">==</span> <span class="s1">&#39;jira&#39;</span><span class="p">:</span>
                 <span class="kn">from</span> <span class="nn">airflow.contrib.hooks.jira_hook</span> <span class="k">import</span> <span class="n">JiraHook</span>
                 <span class="k">return</span> <span class="n">JiraHook</span><span class="p">(</span><span class="n">jira_conn_id</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">conn_id</span><span class="p">)</span>
+            <span class="k">elif</span> <span class="bp">self</span><span class="o">.</span><span class="n">conn_type</span> <span class="o">==</span> <span class="s1">&#39;redis&#39;</span><span class="p">:</span>
+                <span class="kn">from</span> <span class="nn">airflow.contrib.hooks.redis_hook</span> <span class="k">import</span> <span class="n">RedisHook</span>
+                <span class="k">return</span> <span class="n">RedisHook</span><span class="p">(</span><span class="n">redis_conn_id</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">conn_id</span><span class="p">)</span>
+            <span class="k">elif</span> <span class="bp">self</span><span class="o">.</span><span class="n">conn_type</span> <span class="o">==</span> <span class="s1">&#39;wasb&#39;</span><span class="p">:</span>
+                <span class="kn">from</span> <span class="nn">airflow.contrib.hooks.wasb_hook</span> <span class="k">import</span> <span class="n">WasbHook</span>
+                <span class="k">return</span> <span class="n">WasbHook</span><span class="p">(</span><span class="n">wasb_conn_id</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">conn_id</span><span class="p">)</span>
+            <span class="k">elif</span> <span class="bp">self</span><span class="o">.</span><span class="n">conn_type</span> <span class="o">==</span> <span class="s1">&#39;docker&#39;</span><span class="p">:</span>
+                <span class="kn">from</span> <span class="nn">airflow.hooks.docker_hook</span> <span class="k">import</span> <span class="n">DockerHook</span>
+                <span class="k">return</span> <span class="n">DockerHook</span><span class="p">(</span><span class="n">docker_conn_id</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">conn_id</span><span class="p">)</span>
         <span class="k">except</span><span class="p">:</span>
             <span class="k">pass</span>
 
@@ -837,8 +883,8 @@
             <span class="k">try</span><span class="p">:</span>
                 <span class="n">obj</span> <span class="o">=</span> <span class="n">json</span><span class="o">.</span><span class="n">loads</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">extra</span><span class="p">)</span>
             <span class="k">except</span> <span class="ne">Exception</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span>
-                <span class="n">logging</span><span class="o">.</span><span class="n">exception</span><span class="p">(</span><span class="n">e</span><span class="p">)</span>
-                <span class="n">logging</span><span class="o">.</span><span class="n">error</span><span class="p">(</span><span class="s2">&quot;Failed parsing the json for conn_id </span><span class="si">%s</span><span class="s2">&quot;</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">conn_id</span><span class="p">)</span>
+                <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">exception</span><span class="p">(</span><span class="n">e</span><span class="p">)</span>
+                <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">error</span><span class="p">(</span><span class="s2">&quot;Failed parsing the json for conn_id </span><span class="si">%s</span><span class="s2">&quot;</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">conn_id</span><span class="p">)</span>
 
         <span class="k">return</span> <span class="n">obj</span></div>
 
@@ -869,13 +915,13 @@
         <span class="bp">self</span><span class="o">.</span><span class="n">pickle</span> <span class="o">=</span> <span class="n">dag</span>
 
 
-<div class="viewcode-block" id="TaskInstance"><a class="viewcode-back" href="../../code.html#airflow.models.TaskInstance">[docs]</a><span class="k">class</span> <span class="nc">TaskInstance</span><span class="p">(</span><span class="n">Base</span><span class="p">):</span>
+<div class="viewcode-block" id="TaskInstance"><a class="viewcode-back" href="../../code.html#airflow.models.TaskInstance">[docs]</a><span class="k">class</span> <span class="nc">TaskInstance</span><span class="p">(</span><span class="n">Base</span><span class="p">,</span> <span class="n">LoggingMixin</span><span class="p">):</span>
     <span class="sd">&quot;&quot;&quot;</span>
 <span class="sd">    Task instances store the state of a task instance. This table is the</span>
 <span class="sd">    authority and single source of truth around what tasks have run and the</span>
 <span class="sd">    state they are in.</span>
 
-<span class="sd">    The SqlAchemy model doesn&#39;t have a SqlAlchemy foreign key to the task or</span>
+<span class="sd">    The SqlAlchemy model doesn&#39;t have a SqlAlchemy foreign key to the task or</span>
 <span class="sd">    dag model deliberately to have more control over transactions.</span>
 
 <span class="sd">    Database transactions on this table should insure double triggers and</span>
@@ -892,7 +938,8 @@
     <span class="n">end_date</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">DateTime</span><span class="p">)</span>
     <span class="n">duration</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">Float</span><span class="p">)</span>
     <span class="n">state</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">String</span><span class="p">(</span><span class="mi">20</span><span class="p">))</span>
-    <span class="n">try_number</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">Integer</span><span class="p">,</span> <span class="n">default</span><span class="o">=</span><span class="mi">0</span><span class="p">)</span>
+    <span class="n">_try_number</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="s1">&#39;try_number&#39;</span><span class="p">,</span> <span class="n">Integer</span><span class="p">,</span> <span class="n">default</span><span class="o">=</span><span class="mi">0</span><span class="p">)</span>
+    <span class="n">max_tries</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">Integer</span><span class="p">)</span>
     <span class="n">hostname</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">String</span><span class="p">(</span><span class="mi">1000</span><span class="p">))</span>
     <span class="n">unixname</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">String</span><span class="p">(</span><span class="mi">1000</span><span class="p">))</span>
     <span class="n">job_id</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">Integer</span><span class="p">)</span>
@@ -919,18 +966,42 @@
         <span class="bp">self</span><span class="o">.</span><span class="n">pool</span> <span class="o">=</span> <span class="n">task</span><span class="o">.</span><span class="n">pool</span>
         <span class="bp">self</span><span class="o">.</span><span class="n">priority_weight</span> <span class="o">=</span> <span class="n">task</span><span class="o">.</span><span class="n">priority_weight_total</span>
         <span class="bp">self</span><span class="o">.</span><span class="n">try_number</span> <span class="o">=</span> <span class="mi">0</span>
+        <span class="bp">self</span><span class="o">.</span><span class="n">max_tries</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">task</span><span class="o">.</span><span class="n">retries</span>
         <span class="bp">self</span><span class="o">.</span><span class="n">unixname</span> <span class="o">=</span> <span class="n">getpass</span><span class="o">.</span><span class="n">getuser</span><span class="p">()</span>
         <span class="bp">self</span><span class="o">.</span><span class="n">run_as_user</span> <span class="o">=</span> <span class="n">task</span><span class="o">.</span><span class="n">run_as_user</span>
         <span class="k">if</span> <span class="n">state</span><span class="p">:</span>
             <span class="bp">self</span><span class="o">.</span><span class="n">state</span> <span class="o">=</span> <span class="n">state</span>
         <span class="bp">self</span><span class="o">.</span><span class="n">hostname</span> <span class="o">=</span> <span class="s1">&#39;&#39;</span>
         <span class="bp">self</span><span class="o">.</span><span class="n">init_on_load</span><span class="p">()</span>
+        <span class="bp">self</span><span class="o">.</span><span class="n">_log</span> <span class="o">=</span> <span class="n">logging</span><span class="o">.</span><span class="n">getLogger</span><span class="p">(</span><span class="s2">&quot;airflow.task&quot;</span><span class="p">)</span>
 
-    <span class="nd">@reconstructor</span>
-<div class="viewcode-block" id="TaskInstance.init_on_load"><a class="viewcode-back" href="../../code.html#airflow.models.TaskInstance.init_on_load">[docs]</a>    <span class="k">def</span> <span class="nf">init_on_load</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
+<div class="viewcode-block" id="TaskInstance.init_on_load"><a class="viewcode-back" href="../../code.html#airflow.models.TaskInstance.init_on_load">[docs]</a>    <span class="nd">@reconstructor</span>
+    <span class="k">def</span> <span class="nf">init_on_load</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
         <span class="sd">&quot;&quot;&quot; Initialize the attributes that aren&#39;t stored in the DB. &quot;&quot;&quot;</span>
         <span class="bp">self</span><span class="o">.</span><span class="n">test_mode</span> <span class="o">=</span> <span class="kc">False</span>  <span class="c1"># can be changed when calling &#39;run&#39;</span></div>
 
+    <span class="nd">@property</span>
+    <span class="k">def</span> <span class="nf">try_number</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
+        <span class="sd">&quot;&quot;&quot;</span>
+<span class="sd">        Return the try number that this task number will be when it is acutally</span>
+<span class="sd">        run.</span>
+
+<span class="sd">        If the TI is currently running, this will match the column in the</span>
+<span class="sd">        databse, in all othercases this will be incremenetd</span>
+<span class="sd">        &quot;&quot;&quot;</span>
+        <span class="c1"># This is designed so that task logs end up in the right file.</span>
+        <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">state</span> <span class="o">==</span> <span class="n">State</span><span class="o">.</span><span class="n">RUNNING</span><span class="p">:</span>
+            <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_try_number</span>
+        <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_try_number</span> <span class="o">+</span> <span class="mi">1</span>
+
+    <span class="nd">@try_number</span><span class="o">.</span><span class="n">setter</span>
+    <span class="k">def</span> <span class="nf">try_number</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">value</span><span class="p">):</span>
+        <span class="bp">self</span><span class="o">.</span><span class="n">_try_number</span> <span class="o">=</span> <span class="n">value</span>
+
+    <span class="nd">@property</span>
+    <span class="k">def</span> <span class="nf">next_try_number</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
+        <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_try_number</span> <span class="o">+</span> <span class="mi">1</span>
+
 <div class="viewcode-block" id="TaskInstance.command"><a class="viewcode-back" href="../../code.html#airflow.models.TaskInstance.command">[docs]</a>    <span class="k">def</span> <span class="nf">command</span><span class="p">(</span>
             <span class="bp">self</span><span class="p">,</span>
             <span class="n">mark_success</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
@@ -1007,8 +1078,8 @@
             <span class="n">pool</span><span class="o">=</span><span class="n">pool</span><span class="p">,</span>
             <span class="n">cfg_path</span><span class="o">=</span><span class="n">cfg_path</span><span class="p">)</span></div>
 
-    <span class="nd">@staticmethod</span>
-<div class="viewcode-block" id="TaskInstance.generate_command"><a class="viewcode-back" href="../../code.html#airflow.models.TaskInstance.generate_command">[docs]</a>    <span class="k">def</span> <span class="nf">generate_command</span><span class="p">(</span><span class="n">dag_id</span><span class="p">,</span>
+<div class="viewcode-block" id="TaskInstance.generate_command"><a class="viewcode-back" href="../../code.html#airflow.models.TaskInstance.generate_command">[docs]</a>    <span class="nd">@staticmethod</span>
+    <span class="k">def</span> <span class="nf">generate_command</span><span class="p">(</span><span class="n">

<TRUNCATED>