You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by cr...@apache.org on 2018/01/03 17:48:12 UTC

[12/35] incubator-airflow-site git commit: 1.9.0

http://git-wip-us.apache.org/repos/asf/incubator-airflow-site/blob/28a3eb60/_modules/python_operator.html
----------------------------------------------------------------------
diff --git a/_modules/python_operator.html b/_modules/python_operator.html
index d2def55..5e5cdc6 100644
--- a/_modules/python_operator.html
+++ b/_modules/python_operator.html
@@ -13,6 +13,8 @@
 
   
   
+  
+  
 
   
 
@@ -80,7 +82,10 @@
           
             
             
-                <ul>
+              
+            
+            
+              <ul>
 <li class="toctree-l1"><a class="reference internal" href="../project.html">Project</a></li>
 <li class="toctree-l1"><a class="reference internal" href="../license.html">License</a></li>
 <li class="toctree-l1"><a class="reference internal" href="../start.html">Quick Start</a></li>
@@ -177,13 +182,20 @@
 <span class="c1"># limitations under the License.</span>
 
 <span class="kn">from</span> <span class="nn">builtins</span> <span class="k">import</span> <span class="nb">str</span>
-<span class="kn">from</span> <span class="nn">datetime</span> <span class="k">import</span> <span class="n">datetime</span>
-<span class="kn">import</span> <span class="nn">logging</span>
-
-<span class="kn">from</span> <span class="nn">airflow.models</span> <span class="k">import</span> <span class="n">BaseOperator</span><span class="p">,</span> <span class="n">TaskInstance</span>
-<span class="kn">from</span> <span class="nn">airflow.utils.state</span> <span class="k">import</span> <span class="n">State</span>
+<span class="kn">import</span> <span class="nn">dill</span>
+<span class="kn">import</span> <span class="nn">inspect</span>
+<span class="kn">import</span> <span class="nn">os</span>
+<span class="kn">import</span> <span class="nn">pickle</span>
+<span class="kn">import</span> <span class="nn">subprocess</span>
+<span class="kn">import</span> <span class="nn">sys</span>
+<span class="kn">import</span> <span class="nn">types</span>
+
+<span class="kn">from</span> <span class="nn">airflow.exceptions</span> <span class="k">import</span> <span class="n">AirflowException</span>
+<span class="kn">from</span> <span class="nn">airflow.models</span> <span class="k">import</span> <span class="n">BaseOperator</span><span class="p">,</span> <span class="n">SkipMixin</span>
 <span class="kn">from</span> <span class="nn">airflow.utils.decorators</span> <span class="k">import</span> <span class="n">apply_defaults</span>
-<span class="kn">from</span> <span class="nn">airflow</span> <span class="k">import</span> <span class="n">settings</span>
+<span class="kn">from</span> <span class="nn">airflow.utils.file</span> <span class="k">import</span> <span class="n">TemporaryDirectory</span>
+
+<span class="kn">from</span> <span class="nn">textwrap</span> <span class="k">import</span> <span class="n">dedent</span>
 
 
 <div class="viewcode-block" id="PythonOperator"><a class="viewcode-back" href="../code.html#airflow.operators.PythonOperator">[docs]</a><span class="k">class</span> <span class="nc">PythonOperator</span><span class="p">(</span><span class="n">BaseOperator</span><span class="p">):</span>
@@ -226,7 +238,9 @@
             <span class="n">templates_dict</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
             <span class="n">templates_exts</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
             <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span>
-        <span class="nb">super</span><span class="p">(</span><span class="n">PythonOperator</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="n">__init__</span><span class="p">(</span><span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span>
+        <span class="nb">super</span><span class="p">(</span><span class="n">PythonOperator</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span><span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span>
+        <span class="k">if</span> <span class="ow">not</span> <span class="n">callable</span><span class="p">(</span><span class="n">python_callable</span><span class="p">):</span>
+            <span class="k">raise</span> <span class="n">AirflowException</span><span class="p">(</span><span class="s1">&#39;`python_callable` param must be callable&#39;</span><span class="p">)</span>
         <span class="bp">self</span><span class="o">.</span><span class="n">python_callable</span> <span class="o">=</span> <span class="n">python_callable</span>
         <span class="bp">self</span><span class="o">.</span><span class="n">op_args</span> <span class="o">=</span> <span class="n">op_args</span> <span class="ow">or</span> <span class="p">[]</span>
         <span class="bp">self</span><span class="o">.</span><span class="n">op_kwargs</span> <span class="o">=</span> <span class="n">op_kwargs</span> <span class="ow">or</span> <span class="p">{}</span>
@@ -241,12 +255,15 @@
             <span class="n">context</span><span class="p">[</span><span class="s1">&#39;templates_dict&#39;</span><span class="p">]</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">templates_dict</span>
             <span class="bp">self</span><span class="o">.</span><span class="n">op_kwargs</span> <span class="o">=</span> <span class="n">context</span>
 
-        <span class="n">return_value</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">python_callable</span><span class="p">(</span><span class="o">*</span><span class="bp">self</span><span class="o">.</span><span class="n">op_args</span><span class="p">,</span> <span class="o">**</span><span class="bp">self</span><span class="o">.</span><span class="n">op_kwargs</span><span class="p">)</span>
-        <span class="n">logging</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">&quot;Done. Returned value was: &quot;</span> <span class="o">+</span> <span class="nb">str</span><span class="p">(</span><span class="n">return_value</span><span class="p">))</span>
-        <span class="k">return</span> <span class="n">return_value</span></div>
+        <span class="n">return_value</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">execute_callable</span><span class="p">()</span>
+        <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">&quot;Done. Returned value was: </span><span class="si">%s</span><span class="s2">&quot;</span><span class="p">,</span> <span class="n">return_value</span><span class="p">)</span>
+        <span class="k">return</span> <span class="n">return_value</span>
+
+    <span class="k">def</span> <span class="nf">execute_callable</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
+        <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">python_callable</span><span class="p">(</span><span class="o">*</span><span class="bp">self</span><span class="o">.</span><span class="n">op_args</span><span class="p">,</span> <span class="o">**</span><span class="bp">self</span><span class="o">.</span><span class="n">op_kwargs</span><span class="p">)</span></div>
 
 
-<div class="viewcode-block" id="BranchPythonOperator"><a class="viewcode-back" href="../code.html#airflow.operators.BranchPythonOperator">[docs]</a><span class="k">class</span> <span class="nc">BranchPythonOperator</span><span class="p">(</span><span class="n">PythonOperator</span><span class="p">):</span>
+<div class="viewcode-block" id="BranchPythonOperator"><a class="viewcode-back" href="../code.html#airflow.operators.BranchPythonOperator">[docs]</a><span class="k">class</span> <span class="nc">BranchPythonOperator</span><span class="p">(</span><span class="n">PythonOperator</span><span class="p">,</span> <span class="n">SkipMixin</span><span class="p">):</span>
     <span class="sd">&quot;&quot;&quot;</span>
 <span class="sd">    Allows a workflow to &quot;branch&quot; or follow a single path following the</span>
 <span class="sd">    execution of this task.</span>
@@ -267,23 +284,20 @@
 <span class="sd">    &quot;&quot;&quot;</span>
     <span class="k">def</span> <span class="nf">execute</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">context</span><span class="p">):</span>
         <span class="n">branch</span> <span class="o">=</span> <span class="nb">super</span><span class="p">(</span><span class="n">BranchPythonOperator</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="n">execute</span><span class="p">(</span><span class="n">context</span><span class="p">)</span>
-        <span class="n">logging</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">&quot;Following branch &quot;</span> <span class="o">+</span> <span class="n">branch</span><span class="p">)</span>
-        <span class="n">logging</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">&quot;Marking other directly downstream tasks as skipped&quot;</span><span class="p">)</span>
-        <span class="n">session</span> <span class="o">=</span> <span class="n">settings</span><span class="o">.</span><span class="n">Session</span><span class="p">()</span>
-        <span class="k">for</span> <span class="n">task</span> <span class="ow">in</span> <span class="n">context</span><span class="p">[</span><span class="s1">&#39;task&#39;</span><span class="p">]</span><span class="o">.</span><span class="n">downstream_list</span><span class="p">:</span>
-            <span class="k">if</span> <span class="n">task</span><span class="o">.</span><span class="n">task_id</span> <span class="o">!=</span> <span class="n">branch</span><span class="p">:</span>
-                <span class="n">ti</span> <span class="o">=</span> <span class="n">TaskInstance</span><span class="p">(</span>
-                    <span class="n">task</span><span class="p">,</span> <span class="n">execution_date</span><span class="o">=</span><span class="n">context</span><span class="p">[</span><span class="s1">&#39;ti&#39;</span><span class="p">]</span><span class="o">.</span><span class="n">execution_date</span><span class="p">)</span>
-                <span class="n">ti</span><span class="o">.</span><span class="n">state</span> <span class="o">=</span> <span class="n">State</span><span class="o">.</span><span class="n">SKIPPED</span>
-                <span class="n">ti</span><span class="o">.</span><span class="n">start_date</span> <span class="o">=</span> <span class="n">datetime</span><span class="o">.</span><span class="n">now</span><span class="p">()</span>
-                <span class="n">ti</span><span class="o">.</span><span class="n">end_date</span> <span class="o">=</span> <span class="n">datetime</span><span class="o">.</span><span class="n">now</span><span class="p">()</span>
-                <span class="n">session</span><span class="o">.</span><span class="n">merge</span><span class="p">(</span><span class="n">ti</span><span class="p">)</span>
-        <span class="n">session</span><span class="o">.</span><span class="n">commit</span><span class="p">()</span>
-        <span class="n">session</span><span class="o">.</span><span class="n">close</span><span class="p">()</span>
-        <span class="n">logging</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">&quot;Done.&quot;</span><span class="p">)</span></div>
-
-
-<div class="viewcode-block" id="ShortCircuitOperator"><a class="viewcode-back" href="../code.html#airflow.operators.ShortCircuitOperator">[docs]</a><span class="k">class</span> <span class="nc">ShortCircuitOperator</span><span class="p">(</span><span class="n">PythonOperator</span><span class="p">):</span>
+        <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">&quot;Following branch </span><span class="si">%s</span><span class="s2">&quot;</span><span class="p">,</span> <span class="n">branch</span><span class="p">)</span>
+        <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">&quot;Marking other directly downstream tasks as skipped&quot;</span><span class="p">)</span>
+
+        <span class="n">downstream_tasks</span> <span class="o">=</span> <span class="n">context</span><span class="p">[</span><span class="s1">&#39;task&#39;</span><span class="p">]</span><span class="o">.</span><span class="n">downstream_list</span>
+        <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">debug</span><span class="p">(</span><span class="s2">&quot;Downstream task_ids </span><span class="si">%s</span><span class="s2">&quot;</span><span class="p">,</span> <span class="n">downstream_tasks</span><span class="p">)</span>
+
+        <span class="n">skip_tasks</span> <span class="o">=</span> <span class="p">[</span><span class="n">t</span> <span class="k">for</span> <span class="n">t</span> <span class="ow">in</span> <span class="n">downstream_tasks</span> <span class="k">if</span> <span class="n">t</span><span class="o">.</span><span class="n">task_id</span> <span class="o">!=</span> <span class="n">branch</span><span class="p">]</span>
+        <span class="k">if</span> <span class="n">downstream_tasks</span><span class="p">:</span>
+            <span class="bp">self</span><span class="o">.</span><span class="n">skip</span><span class="p">(</span><span class="n">context</span><span class="p">[</span><span class="s1">&#39;dag_run&#39;</span><span class="p">],</span> <span class="n">context</span><span class="p">[</span><span class="s1">&#39;ti&#39;</span><span class="p">]</span><span class="o">.</span><span class="n">execution_date</span><span class="p">,</span> <span class="n">skip_tasks</span><span class="p">)</span>
+
+        <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">&quot;Done.&quot;</span><span class="p">)</span></div>
+
+
+<div class="viewcode-block" id="ShortCircuitOperator"><a class="viewcode-back" href="../code.html#airflow.operators.ShortCircuitOperator">[docs]</a><span class="k">class</span> <span class="nc">ShortCircuitOperator</span><span class="p">(</span><span class="n">PythonOperator</span><span class="p">,</span> <span class="n">SkipMixin</span><span class="p">):</span>
     <span class="sd">&quot;&quot;&quot;</span>
 <span class="sd">    Allows a workflow to continue only if a condition is met. Otherwise, the</span>
 <span class="sd">    workflow &quot;short-circuits&quot; and downstream tasks are skipped.</span>
@@ -297,23 +311,220 @@
 <span class="sd">    &quot;&quot;&quot;</span>
     <span class="k">def</span> <span class="nf">execute</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">context</span><span class="p">):</span>
         <span class="n">condition</span> <span class="o">=</span> <span class="nb">super</span><span class="p">(</span><span class="n">ShortCircuitOperator</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="n">execute</span><span class="p">(</span><span class="n">context</span><span class="p">)</span>
-        <span class="n">logging</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">&quot;Condition result is </span><span class="si">{}</span><span class="s2">&quot;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">condition</span><span class="p">))</span>
+        <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">&quot;Condition result is </span><span class="si">%s</span><span class="s2">&quot;</span><span class="p">,</span> <span class="n">condition</span><span class="p">)</span>
+
         <span class="k">if</span> <span class="n">condition</span><span class="p">:</span>
-            <span class="n">logging</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s1">&#39;Proceeding with downstream tasks...&#39;</span><span class="p">)</span>
+            <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s1">&#39;Proceeding with downstream tasks...&#39;</span><span class="p">)</span>
             <span class="k">return</span>
+
+        <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s1">&#39;Skipping downstream tasks...&#39;</span><span class="p">)</span>
+
+        <span class="n">downstream_tasks</span> <span class="o">=</span> <span class="n">context</span><span class="p">[</span><span class="s1">&#39;task&#39;</span><span class="p">]</span><span class="o">.</span><span class="n">get_flat_relatives</span><span class="p">(</span><span class="n">upstream</span><span class="o">=</span><span class="kc">False</span><span class="p">)</span>
+        <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">debug</span><span class="p">(</span><span class="s2">&quot;Downstream task_ids </span><span class="si">%s</span><span class="s2">&quot;</span><span class="p">,</span> <span class="n">downstream_tasks</span><span class="p">)</span>
+
+        <span class="k">if</span> <span class="n">downstream_tasks</span><span class="p">:</span>
+            <span class="bp">self</span><span class="o">.</span><span class="n">skip</span><span class="p">(</span><span class="n">context</span><span class="p">[</span><span class="s1">&#39;dag_run&#39;</span><span class="p">],</span> <span class="n">context</span><span class="p">[</span><span class="s1">&#39;ti&#39;</span><span class="p">]</span><span class="o">.</span><span class="n">execution_date</span><span class="p">,</span> <span class="n">downstream_tasks</span><span class="p">)</span>
+
+        <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">&quot;Done.&quot;</span><span class="p">)</span></div>
+
+<span class="k">class</span> <span class="nc">PythonVirtualenvOperator</span><span class="p">(</span><span class="n">PythonOperator</span><span class="p">):</span>
+    <span class="sd">&quot;&quot;&quot;</span>
+<span class="sd">    Allows one to run a function in a virtualenv that is created and destroyed</span>
+<span class="sd">    automatically (with certain caveats).</span>
+
+<span class="sd">    The function must be defined using def, and not be part of a class. All imports</span>
+<span class="sd">    must happen inside the function and no variables outside of the scope may be referenced.</span>
+<span class="sd">    A global scope variable named virtualenv_string_args will be available (populated by</span>
+<span class="sd">    string_args). In addition, one can pass stuff through op_args and op_kwargs, and one</span>
+<span class="sd">    can use a return value.</span>
+
+<span class="sd">    Note that if your virtualenv runs in a different Python major version than Airflow,</span>
+<span class="sd">    you cannot use return values, op_args, or op_kwargs. You can use string_args though.</span>
+
+<span class="sd">    :param python_callable: A python function with no references to outside variables,</span>
+<span class="sd">        defined with def, which will be run in a virtualenv</span>
+<span class="sd">    :type python_callable: function</span>
+<span class="sd">    :param requirements: A list of requirements as specified in a pip install command</span>
+<span class="sd">    :type requirements: list(str)</span>
+<span class="sd">    :param python_version: The Python version to run the virtualenv with. Note that</span>
+<span class="sd">        both 2 and 2.7 are acceptable forms.</span>
+<span class="sd">    :type python_version: str</span>
+<span class="sd">    :param use_dill: Whether to use dill to serialize the args and result (pickle is default).</span>
+<span class="sd">        This allows more complex types but requires you to include dill in your requirements.</span>
+<span class="sd">    :type use_dill: bool</span>
+<span class="sd">    :param system_site_packages: Whether to include system_site_packages in your virtualenv.</span>
+<span class="sd">        See virtualenv documentation for more information.</span>
+<span class="sd">    :type system_site_packages: bool</span>
+<span class="sd">    :param op_args: A list of positional arguments to pass to python_callable.</span>
+<span class="sd">    :type op_args: list</span>
+<span class="sd">    :param op_kwargs: A dict of keyword arguments to pass to python_callable.</span>
+<span class="sd">    :type op_kwargs: dict</span>
+<span class="sd">    :param string_args: Strings that are present in the global var virtualenv_string_args,</span>
+<span class="sd">        available to python_callable at runtime as a list(str). Note that args are split</span>
+<span class="sd">        by newline.</span>
+<span class="sd">    :type string_args: list(str)</span>
+
+<span class="sd">    &quot;&quot;&quot;</span>
+    <span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">python_callable</span><span class="p">,</span> <span class="n">requirements</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">python_version</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">use_dill</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
+                 <span class="n">system_site_packages</span><span class="o">=</span><span class="kc">True</span><span class="p">,</span> <span class="n">op_args</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">op_kwargs</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">string_args</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
+                 <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span>
+        <span class="nb">super</span><span class="p">(</span><span class="n">PythonVirtualenvOperator</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span>
+            <span class="n">python_callable</span><span class="o">=</span><span class="n">python_callable</span><span class="p">,</span>
+            <span class="n">op_args</span><span class="o">=</span><span class="n">op_args</span><span class="p">,</span>
+            <span class="n">op_kwargs</span><span class="o">=</span><span class="n">op_kwargs</span><span class="p">,</span>
+            <span class="o">*</span><span class="n">args</span><span class="p">,</span>
+            <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span>
+        <span class="bp">self</span><span class="o">.</span><span class="n">requirements</span> <span class="o">=</span> <span class="n">requirements</span> <span class="ow">or</span> <span class="p">[]</span>
+        <span class="bp">self</span><span class="o">.</span><span class="n">string_args</span> <span class="o">=</span> <span class="n">string_args</span> <span class="ow">or</span> <span class="p">[]</span>
+        <span class="bp">self</span><span class="o">.</span><span class="n">python_version</span> <span class="o">=</span> <span class="n">python_version</span>
+        <span class="bp">self</span><span class="o">.</span><span class="n">use_dill</span> <span class="o">=</span> <span class="n">use_dill</span>
+        <span class="bp">self</span><span class="o">.</span><span class="n">system_site_packages</span> <span class="o">=</span> <span class="n">system_site_packages</span>
+        <span class="c1"># check that dill is present if needed</span>
+        <span class="n">dill_in_requirements</span> <span class="o">=</span> <span class="nb">map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="n">x</span><span class="o">.</span><span class="n">lower</span><span class="p">()</span><span class="o">.</span><span class="n">startswith</span><span class="p">(</span><span class="s1">&#39;dill&#39;</span><span class="p">),</span> <span class="bp">self</span><span class="o">.</span><span class="n">requirements</span><span class="p">)</span>
+        <span class="k">if</span> <span class="p">(</span><span class="ow">not</span> <span class="n">system_site_packages</span><span class="p">)</span> <span class="ow">and</span> <span class="n">use_dill</span> <span class="ow">and</span> <span class="ow">not</span> <span class="nb">any</span><span class="p">(</span><span class="n">dill_in_requirements</span><span class="p">):</span>
+            <span class="k">raise</span> <span class="n">AirflowException</span><span class="p">(</span><span class="s1">&#39;If using dill, dill must be in the environment &#39;</span> <span class="o">+</span>
+                                   <span class="s1">&#39;either via system_site_packages or requirements&#39;</span><span class="p">)</span>
+        <span class="c1"># check that a function is passed, and that it is not a lambda</span>
+        <span class="k">if</span> <span class="p">(</span><span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">python_callable</span><span class="p">,</span> <span class="n">types</span><span class="o">.</span><span class="n">FunctionType</span><span class="p">)</span>
+                <span class="ow">or</span> <span class="bp">self</span><span class="o">.</span><span class="n">python_callable</span><span class="o">.</span><span class="vm">__name__</span> <span class="o">==</span> <span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="mi">0</span><span class="p">)</span><span class="o">.</span><span class="vm">__name__</span><span class="p">):</span>
+            <span class="k">raise</span> <span class="n">AirflowException</span><span class="p">(</span><span class="s1">&#39;</span><span class="si">{}</span><span class="s1"> only supports functions for python_callable arg&#39;</span><span class="p">,</span>
+                                   <span class="bp">self</span><span class="o">.</span><span class="vm">__class__</span><span class="o">.</span><span class="vm">__name__</span><span class="p">)</span>
+        <span class="c1"># check that args are passed iff python major version matches</span>
+        <span class="k">if</span> <span class="p">(</span><span class="n">python_version</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span>
+                <span class="ow">and</span> <span class="nb">str</span><span class="p">(</span><span class="n">python_version</span><span class="p">)[</span><span class="mi">0</span><span class="p">]</span> <span class="o">!=</span> <span class="nb">str</span><span class="p">(</span><span class="n">sys</span><span class="o">.</span><span class="n">version_info</span><span class="p">[</span><span class="mi">0</span><span class="p">])</span>
+                <span class="ow">and</span> <span class="bp">self</span><span class="o">.</span><span class="n">_pass_op_args</span><span class="p">()):</span>
+            <span class="k">raise</span> <span class="n">AirflowException</span><span class="p">(</span><span class="s2">&quot;Passing op_args or op_kwargs is not supported across &quot;</span>
+                                   <span class="s2">&quot;different Python major versions &quot;</span>
+                                   <span class="s2">&quot;for PythonVirtualenvOperator. Please use string_args.&quot;</span><span class="p">)</span>
+
+    <span class="k">def</span> <span class="nf">execute_callable</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
+        <span class="k">with</span> <span class="n">TemporaryDirectory</span><span class="p">(</span><span class="n">prefix</span><span class="o">=</span><span class="s1">&#39;venv&#39;</span><span class="p">)</span> <span class="k">as</span> <span class="n">tmp_dir</span><span class="p">:</span>
+            <span class="c1"># generate filenames</span>
+            <span class="n">input_filename</span> <span class="o">=</span> <span class="n">os</span><span class="o">.</span><span class="n">path</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="n">tmp_dir</span><span class="p">,</span> <span class="s1">&#39;script.in&#39;</span><span class="p">)</span>
+            <span class="n">output_filename</span> <span class="o">=</span> <span class="n">os</span><span class="o">.</span><span class="n">path</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="n">tmp_dir</span><span class="p">,</span> <span class="s1">&#39;script.out&#39;</span><span class="p">)</span>
+            <span class="n">string_args_filename</span> <span class="o">=</span> <span class="n">os</span><span class="o">.</span><span class="n">path</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="n">tmp_dir</span><span class="p">,</span> <span class="s1">&#39;string_args.txt&#39;</span><span class="p">)</span>
+            <span class="n">script_filename</span> <span class="o">=</span> <span class="n">os</span><span class="o">.</span><span class="n">path</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="n">tmp_dir</span><span class="p">,</span> <span class="s1">&#39;script.py&#39;</span><span class="p">)</span>
+
+            <span class="c1"># set up virtualenv</span>
+            <span class="bp">self</span><span class="o">.</span><span class="n">_execute_in_subprocess</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_generate_virtualenv_cmd</span><span class="p">(</span><span class="n">tmp_dir</span><span class="p">))</span>
+            <span class="n">cmd</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_generate_pip_install_cmd</span><span class="p">(</span><span class="n">tmp_dir</span><span class="p">)</span>
+            <span class="k">if</span> <span class="n">cmd</span><span class="p">:</span>
+                <span class="bp">self</span><span class="o">.</span><span class="n">_execute_in_subprocess</span><span class="p">(</span><span class="n">cmd</span><span class="p">)</span>
+
+            <span class="bp">self</span><span class="o">.</span><span class="n">_write_args</span><span class="p">(</span><span class="n">input_filename</span><span class="p">)</span>
+            <span class="bp">self</span><span class="o">.</span><span class="n">_write_script</span><span class="p">(</span><span class="n">script_filename</span><span class="p">)</span>
+            <span class="bp">self</span><span class="o">.</span><span class="n">_write_string_args</span><span class="p">(</span><span class="n">string_args_filename</span><span class="p">)</span>
+
+            <span class="c1"># execute command in virtualenv</span>
+            <span class="bp">self</span><span class="o">.</span><span class="n">_execute_in_subprocess</span><span class="p">(</span>
+                <span class="bp">self</span><span class="o">.</span><span class="n">_generate_python_cmd</span><span class="p">(</span><span class="n">tmp_dir</span><span class="p">,</span>
+                                          <span class="n">script_filename</span><span class="p">,</span>
+                                          <span class="n">input_filename</span><span class="p">,</span>
+                                          <span class="n">output_filename</span><span class="p">,</span>
+                                          <span class="n">string_args_filename</span><span class="p">))</span>
+            <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_read_result</span><span class="p">(</span><span class="n">output_filename</span><span class="p">)</span>
+
+    <span class="k">def</span> <span class="nf">_pass_op_args</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
+        <span class="c1"># we should only pass op_args if any are given to us</span>
+        <span class="k">return</span> <span class="nb">len</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">op_args</span><span class="p">)</span> <span class="o">+</span> <span class="nb">len</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">op_kwargs</span><span class="p">)</span> <span class="o">&gt;</span> <span class="mi">0</span>
+
+    <span class="k">def</span> <span class="nf">_execute_in_subprocess</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">cmd</span><span class="p">):</span>
+        <span class="k">try</span><span class="p">:</span>
+            <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">&quot;Executing cmd</span><span class="se">\n</span><span class="si">{}</span><span class="s2">&quot;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">cmd</span><span class="p">))</span>
+            <span class="n">output</span> <span class="o">=</span> <span class="n">subprocess</span><span class="o">.</span><span class="n">check_output</span><span class="p">(</span><span class="n">cmd</span><span class="p">,</span> <span class="n">stderr</span><span class="o">=</span><span class="n">subprocess</span><span class="o">.</span><span class="n">STDOUT</span><span class="p">)</span>
+            <span class="k">if</span> <span class="n">output</span><span class="p">:</span>
+                <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">&quot;Got output</span><span class="se">\n</span><span class="si">{}</span><span class="s2">&quot;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">output</span><span class="p">))</span>
+        <span class="k">except</span> <span class="n">subprocess</span><span class="o">.</span><span class="n">CalledProcessError</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span>
+            <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">&quot;Got error output</span><span class="se">\n</span><span class="si">{}</span><span class="s2">&quot;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">e</span><span class="o">.</span><span class="n">output</span><span class="p">))</span>
+            <span class="k">raise</span>
+
+    <span class="k">def</span> <span class="nf">_write_string_args</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">filename</span><span class="p">):</span>
+        <span class="c1"># writes string_args to a file, which are read line by line</span>
+        <span class="k">with</span> <span class="nb">open</span><span class="p">(</span><span class="n">filename</span><span class="p">,</span> <span class="s1">&#39;w&#39;</span><span class="p">)</span> <span class="k">as</span> <span class="n">f</span><span class="p">:</span>
+            <span class="n">f</span><span class="o">.</span><span class="n">write</span><span class="p">(</span><span class="s1">&#39;</span><span class="se">\n</span><span class="s1">&#39;</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="nb">map</span><span class="p">(</span><span class="nb">str</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">string_args</span><span class="p">)))</span>
+
+    <span class="k">def</span> <span class="nf">_write_args</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">input_filename</span><span class="p">):</span>
+        <span class="c1"># serialize args to file</span>
+        <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_pass_op_args</span><span class="p">():</span>
+            <span class="k">with</span> <span class="nb">open</span><span class="p">(</span><span class="n">input_filename</span><span class="p">,</span> <span class="s1">&#39;wb&#39;</span><span class="p">)</span> <span class="k">as</span> <span class="n">f</span><span class="p">:</span>
+                <span class="n">arg_dict</span> <span class="o">=</span> <span class="p">({</span><span class="s1">&#39;args&#39;</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">op_args</span><span class="p">,</span> <span class="s1">&#39;kwargs&#39;</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">op_kwargs</span><span class="p">})</span>
+                <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">use_dill</span><span class="p">:</span>
+                    <span class="n">dill</span><span class="o">.</span><span class="n">dump</span><span class="p">(</span><span class="n">arg_dict</span><span class="p">,</span> <span class="n">f</span><span class="p">)</span>
+                <span class="k">else</span><span class="p">:</span>
+                    <span class="n">pickle</span><span class="o">.</span><span class="n">dump</span><span class="p">(</span><span class="n">arg_dict</span><span class="p">,</span> <span class="n">f</span><span class="p">)</span>
+
+    <span class="k">def</span> <span class="nf">_read_result</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">output_filename</span><span class="p">):</span>
+        <span class="k">if</span> <span class="n">os</span><span class="o">.</span><span class="n">stat</span><span class="p">(</span><span class="n">output_filename</span><span class="p">)</span><span class="o">.</span><span class="n">st_size</span> <span class="o">==</span> <span class="mi">0</span><span class="p">:</span>
+            <span class="k">return</span> <span class="kc">None</span>
+        <span class="k">with</span> <span class="nb">open</span><span class="p">(</span><span class="n">output_filename</span><span class="p">,</span> <span class="s1">&#39;rb&#39;</span><span class="p">)</span> <span class="k">as</span> <span class="n">f</span><span class="p">:</span>
+            <span class="k">try</span><span class="p">:</span>
+                <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">use_dill</span><span class="p">:</span>
+                    <span class="k">return</span> <span class="n">dill</span><span class="o">.</span><span class="n">load</span><span class="p">(</span><span class="n">f</span><span class="p">)</span>
+                <span class="k">else</span><span class="p">:</span>
+                    <span class="k">return</span> <span class="n">pickle</span><span class="o">.</span><span class="n">load</span><span class="p">(</span><span class="n">f</span><span class="p">)</span>
+            <span class="k">except</span> <span class="ne">ValueError</span><span class="p">:</span>
+                <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">error</span><span class="p">(</span><span class="s2">&quot;Error deserializing result. Note that result deserialization &quot;</span>
+                              <span class="s2">&quot;is not supported across major Python versions.&quot;</span><span class="p">)</span>
+                <span class="k">raise</span>
+
+    <span class="k">def</span> <span class="nf">_write_script</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">script_filename</span><span class="p">):</span>
+        <span class="k">with</span> <span class="nb">open</span><span class="p">(</span><span class="n">script_filename</span><span class="p">,</span> <span class="s1">&#39;w&#39;</span><span class="p">)</span> <span class="k">as</span> <span class="n">f</span><span class="p">:</span>
+            <span class="n">python_code</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_generate_python_code</span><span class="p">()</span>
+            <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">debug</span><span class="p">(</span><span class="s1">&#39;Writing code to file</span><span class="se">\n</span><span class="si">{}</span><span class="s1">&#39;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">python_code</span><span class="p">))</span>
+            <span class="n">f</span><span class="o">.</span><span class="n">write</span><span class="p">(</span><span class="n">python_code</span><span class="p">)</span>
+
+    <span class="k">def</span> <span class="nf">_generate_virtualenv_cmd</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">tmp_dir</span><span class="p">):</span>
+        <span class="n">cmd</span> <span class="o">=</span> <span class="p">[</span><span class="s1">&#39;virtualenv&#39;</span><span class="p">,</span> <span class="n">tmp_dir</span><span class="p">]</span>
+        <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">system_site_packages</span><span class="p">:</span>
+            <span class="n">cmd</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="s1">&#39;--system-site-packages&#39;</span><span class="p">)</span>
+        <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">python_version</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span>
+            <span class="n">cmd</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="s1">&#39;--python=python</span><span class="si">{}</span><span class="s1">&#39;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">python_version</span><span class="p">))</span>
+        <span class="k">return</span> <span class="n">cmd</span>
+
+    <span class="k">def</span> <span class="nf">_generate_pip_install_cmd</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">tmp_dir</span><span class="p">):</span>
+        <span class="k">if</span> <span class="nb">len</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">requirements</span><span class="p">)</span> <span class="o">==</span> <span class="mi">0</span><span class="p">:</span>
+            <span class="k">return</span> <span class="p">[]</span>
+        <span class="k">else</span><span class="p">:</span>
+            <span class="c1"># direct path alleviates need to activate</span>
+            <span class="n">cmd</span> <span class="o">=</span> <span class="p">[</span><span class="s1">&#39;</span><span class="si">{}</span><span class="s1">/bin/pip&#39;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">tmp_dir</span><span class="p">),</span> <span class="s1">&#39;install&#39;</span><span class="p">]</span>
+            <span class="k">return</span> <span class="n">cmd</span> <span class="o">+</span> <span class="bp">self</span><span class="o">.</span><span class="n">requirements</span>
+
+    <span class="k">def</span> <span class="nf">_generate_python_cmd</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">tmp_dir</span><span class="p">,</span> <span class="n">script_filename</span><span class="p">,</span> <span class="n">input_filename</span><span class="p">,</span> <span class="n">output_filename</span><span class="p">,</span> <span class="n">string_args_filename</span><span class="p">):</span>
+        <span class="c1"># direct path alleviates need to activate</span>
+        <span class="k">return</span> <span class="p">[</span><span class="s1">&#39;</span><span class="si">{}</span><span class="s1">/bin/python&#39;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">tmp_dir</span><span class="p">),</span> <span class="n">script_filename</span><span class="p">,</span> <span class="n">input_filename</span><span class="p">,</span> <span class="n">output_filename</span><span class="p">,</span> <span class="n">string_args_filename</span><span class="p">]</span>
+
+    <span class="k">def</span> <span class="nf">_generate_python_code</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
+        <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">use_dill</span><span class="p">:</span>
+            <span class="n">pickling_library</span> <span class="o">=</span> <span class="s1">&#39;dill&#39;</span>
+        <span class="k">else</span><span class="p">:</span>
+            <span class="n">pickling_library</span> <span class="o">=</span> <span class="s1">&#39;pickle&#39;</span>
+        <span class="n">fn</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">python_callable</span>
+        <span class="c1"># dont try to read pickle if we didnt pass anything</span>
+        <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_pass_op_args</span><span class="p">():</span>
+            <span class="n">load_args_line</span> <span class="o">=</span> <span class="s1">&#39;with open(sys.argv[1], &quot;rb&quot;) as f: arg_dict = </span><span class="si">{}</span><span class="s1">.load(f)&#39;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">pickling_library</span><span class="p">)</span>
         <span class="k">else</span><span class="p">:</span>
-            <span class="n">logging</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s1">&#39;Skipping downstream tasks...&#39;</span><span class="p">)</span>
-            <span class="n">session</span> <span class="o">=</span> <span class="n">settings</span><span class="o">.</span><span class="n">Session</span><span class="p">()</span>
-            <span class="k">for</span> <span class="n">task</span> <span class="ow">in</span> <span class="n">context</span><span class="p">[</span><span class="s1">&#39;task&#39;</span><span class="p">]</span><span class="o">.</span><span class="n">downstream_list</span><span class="p">:</span>
-                <span class="n">ti</span> <span class="o">=</span> <span class="n">TaskInstance</span><span class="p">(</span>
-                    <span class="n">task</span><span class="p">,</span> <span class="n">execution_date</span><span class="o">=</span><span class="n">context</span><span class="p">[</span><span class="s1">&#39;ti&#39;</span><span class="p">]</span><span class="o">.</span><span class="n">execution_date</span><span class="p">)</span>
-                <span class="n">ti</span><span class="o">.</span><span class="n">state</span> <span class="o">=</span> <span class="n">State</span><span class="o">.</span><span class="n">SKIPPED</span>
-                <span class="n">ti</span><span class="o">.</span><span class="n">start_date</span> <span class="o">=</span> <span class="n">datetime</span><span class="o">.</span><span class="n">now</span><span class="p">()</span>
-                <span class="n">ti</span><span class="o">.</span><span class="n">end_date</span> <span class="o">=</span> <span class="n">datetime</span><span class="o">.</span><span class="n">now</span><span class="p">()</span>
-                <span class="n">session</span><span class="o">.</span><span class="n">merge</span><span class="p">(</span><span class="n">ti</span><span class="p">)</span>
-            <span class="n">session</span><span class="o">.</span><span class="n">commit</span><span class="p">()</span>
-            <span class="n">session</span><span class="o">.</span><span class="n">close</span><span class="p">()</span>
-            <span class="n">logging</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">&quot;Done.&quot;</span><span class="p">)</span></div>
+            <span class="n">load_args_line</span> <span class="o">=</span> <span class="s1">&#39;arg_dict = {&quot;args&quot;: [], &quot;kwargs&quot;: </span><span class="si">{}</span><span class="s1">}&#39;</span>
+
+        <span class="c1"># no indents in original code so we can accept any type of indents in the original function</span>
+        <span class="c1"># we deserialize args, call function, serialize result if necessary</span>
+        <span class="k">return</span> <span class="n">dedent</span><span class="p">(</span><span class="s2">&quot;&quot;&quot;</span><span class="se">\</span>
+<span class="s2">        import </span><span class="si">{pickling_library}</span><span class="s2"></span>
+<span class="s2">        import sys</span>
+<span class="s2">        </span><span class="si">{load_args_code}</span><span class="s2"></span>
+<span class="s2">        args = arg_dict[&quot;args&quot;]</span>
+<span class="s2">        kwargs = arg_dict[&quot;kwargs&quot;]</span>
+<span class="s2">        with open(sys.argv[3], &#39;r&#39;) as f: virtualenv_string_args = list(map(lambda x: x.strip(), list(f)))</span>
+<span class="s2">        </span><span class="si">{python_callable_lines}</span><span class="s2"></span>
+<span class="s2">        res = </span><span class="si">{python_callable_name}</span><span class="s2">(*args, **kwargs)</span>
+<span class="s2">        with open(sys.argv[2], &#39;wb&#39;) as f: res is not None and </span><span class="si">{pickling_library}</span><span class="s2">.dump(res, f)</span>
+<span class="s2">        &quot;&quot;&quot;</span><span class="p">)</span><span class="o">.</span><span class="n">format</span><span class="p">(</span>
+                <span class="n">load_args_code</span><span class="o">=</span><span class="n">load_args_line</span><span class="p">,</span>
+                <span class="n">python_callable_lines</span><span class="o">=</span><span class="n">dedent</span><span class="p">(</span><span class="n">inspect</span><span class="o">.</span><span class="n">getsource</span><span class="p">(</span><span class="n">fn</span><span class="p">)),</span>
+                <span class="n">python_callable_name</span><span class="o">=</span><span class="n">fn</span><span class="o">.</span><span class="vm">__name__</span><span class="p">,</span>
+                <span class="n">pickling_library</span><span class="o">=</span><span class="n">pickling_library</span><span class="p">)</span>
+
+        <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">&quot;Done.&quot;</span><span class="p">)</span>
+
 </pre></div>
 
            </div>

http://git-wip-us.apache.org/repos/asf/incubator-airflow-site/blob/28a3eb60/_modules/qubole_operator.html
----------------------------------------------------------------------
diff --git a/_modules/qubole_operator.html b/_modules/qubole_operator.html
index fb33047..c6fdd28 100644
--- a/_modules/qubole_operator.html
+++ b/_modules/qubole_operator.html
@@ -13,6 +13,8 @@
 
   
   
+  
+  
 
   
 
@@ -30,6 +32,9 @@
   
 
   
+        <link rel="index" title="Index"
+              href="../genindex.html"/>
+        <link rel="search" title="Search" href="../search.html"/>
     <link rel="top" title="Airflow Documentation" href="../index.html"/>
         <link rel="up" title="Module code" href="index.html"/> 
 
@@ -40,6 +45,7 @@
 
 <body class="wy-body-for-nav" role="document">
 
+   
   <div class="wy-grid-for-nav">
 
     
@@ -76,7 +82,10 @@
           
             
             
-                <ul>
+              
+            
+            
+              <ul>
 <li class="toctree-l1"><a class="reference internal" href="../project.html">Project</a></li>
 <li class="toctree-l1"><a class="reference internal" href="../license.html">License</a></li>
 <li class="toctree-l1"><a class="reference internal" href="../start.html">Quick Start</a></li>
@@ -90,6 +99,8 @@
 <li class="toctree-l1"><a class="reference internal" href="../scheduler.html">Scheduling &amp; Triggers</a></li>
 <li class="toctree-l1"><a class="reference internal" href="../plugins.html">Plugins</a></li>
 <li class="toctree-l1"><a class="reference internal" href="../security.html">Security</a></li>
+<li class="toctree-l1"><a class="reference internal" href="../api.html">Experimental Rest API</a></li>
+<li class="toctree-l1"><a class="reference internal" href="../integration.html">Integration</a></li>
 <li class="toctree-l1"><a class="reference internal" href="../faq.html">FAQ</a></li>
 <li class="toctree-l1"><a class="reference internal" href="../code.html">API Reference</a></li>
 </ul>
@@ -104,8 +115,10 @@
 
       
       <nav class="wy-nav-top" role="navigation" aria-label="top navigation">
-        <i data-toggle="wy-nav-top" class="fa fa-bars"></i>
-        <a href="../index.html">Airflow</a>
+        
+          <i data-toggle="wy-nav-top" class="fa fa-bars"></i>
+          <a href="../index.html">Airflow</a>
+        
       </nav>
 
 
@@ -118,19 +131,36 @@
 
 
 
+
+
+
+
+
+
+
+
+
+
 <div role="navigation" aria-label="breadcrumbs navigation">
+
   <ul class="wy-breadcrumbs">
-    <li><a href="../index.html">Docs</a> &raquo;</li>
-      
+    
+      <li><a href="../index.html">Docs</a> &raquo;</li>
+        
           <li><a href="index.html">Module code</a> &raquo;</li>
-      
-    <li>qubole_operator</li>
+        
+      <li>qubole_operator</li>
+    
+    
       <li class="wy-breadcrumbs-aside">
         
-          
+            
         
       </li>
+    
   </ul>
+
+  
   <hr/>
 </div>
           <div role="main" class="document" itemscope="itemscope" itemtype="http://schema.org/Article">
@@ -151,9 +181,9 @@
 <span class="c1"># See the License for the specific language governing permissions and</span>
 <span class="c1"># limitations under the License.</span>
 
-<span class="kn">from</span> <span class="nn">airflow.models</span> <span class="kn">import</span> <span class="n">BaseOperator</span>
-<span class="kn">from</span> <span class="nn">airflow.utils.decorators</span> <span class="kn">import</span> <span class="n">apply_defaults</span>
-<span class="kn">from</span> <span class="nn">airflow.contrib.hooks.qubole_hook</span> <span class="kn">import</span> <span class="n">QuboleHook</span>
+<span class="kn">from</span> <span class="nn">airflow.models</span> <span class="k">import</span> <span class="n">BaseOperator</span>
+<span class="kn">from</span> <span class="nn">airflow.utils.decorators</span> <span class="k">import</span> <span class="n">apply_defaults</span>
+<span class="kn">from</span> <span class="nn">airflow.contrib.hooks.qubole_hook</span> <span class="k">import</span> <span class="n">QuboleHook</span>
 
 
 <div class="viewcode-block" id="QuboleOperator"><a class="viewcode-back" href="../code.html#airflow.contrib.operators.QuboleOperator">[docs]</a><span class="k">class</span> <span class="nc">QuboleOperator</span><span class="p">(</span><span class="n">BaseOperator</span><span class="p">):</span>
@@ -168,6 +198,7 @@
 <span class="sd">        :tags: array of tags to be assigned with the command</span>
 <span class="sd">        :cluster_label: cluster label on which the command will be executed</span>
 <span class="sd">        :name: name to be given to command</span>
+<span class="sd">        :notify: whether to send email on command completion or not (default is False)</span>
 
 <span class="sd">        **Arguments specific to command types**</span>
 
@@ -181,20 +212,28 @@
 <span class="sd">            :script_location: s3 location containing query statement</span>
 <span class="sd">            :macros: macro values which were used in query</span>
 <span class="sd">        hadoopcmd:</span>
-<span class="sd">            :sub_commnad: must be one these [&quot;jar&quot;, &quot;s3distcp&quot;, &quot;streaming&quot;] followed by 1 or more args</span>
+<span class="sd">            :sub_commnad: must be one these [&quot;jar&quot;, &quot;s3distcp&quot;, &quot;streaming&quot;] followed by</span>
+<span class="sd">                1 or more args</span>
 <span class="sd">        shellcmd:</span>
 <span class="sd">            :script: inline command with args</span>
 <span class="sd">            :script_location: s3 location containing query statement</span>
-<span class="sd">            :files: list of files in s3 bucket as file1,file2 format. These files will be copied into the working directory where the qubole command is being executed.</span>
-<span class="sd">            :archives: list of archives in s3 bucket as archive1,archive2 format. These will be unarchived intothe working directory where the qubole command is being executed</span>
-<span class="sd">            :parameters: any extra args which need to be passed to script (only when script_location is supplied)</span>
+<span class="sd">            :files: list of files in s3 bucket as file1,file2 format. These files will be</span>
+<span class="sd">                copied into the working directory where the qubole command is being</span>
+<span class="sd">                executed.</span>
+<span class="sd">            :archives: list of archives in s3 bucket as archive1,archive2 format. These</span>
+<span class="sd">                will be unarchived intothe working directory where the qubole command is</span>
+<span class="sd">                being executed</span>
+<span class="sd">            :parameters: any extra args which need to be passed to script (only when</span>
+<span class="sd">                script_location is supplied)</span>
 <span class="sd">        pigcmd:</span>
 <span class="sd">            :script: inline query statement (latin_statements)</span>
 <span class="sd">            :script_location: s3 location containing pig query</span>
-<span class="sd">            :parameters: any extra args which need to be passed to script (only when script_location is supplied</span>
+<span class="sd">            :parameters: any extra args which need to be passed to script (only when</span>
+<span class="sd">                script_location is supplied</span>
 <span class="sd">        sparkcmd:</span>
 <span class="sd">            :program: the complete Spark Program in Scala, SQL, Command, R, or Python</span>
-<span class="sd">            :cmdline: spark-submit command line, all required information must be specify in cmdline itself.</span>
+<span class="sd">            :cmdline: spark-submit command line, all required information must be specify</span>
+<span class="sd">                in cmdline itself.</span>
 <span class="sd">            :sql: inline sql query</span>
 <span class="sd">            :script_location: s3 location containing query statement</span>
 <span class="sd">            :language: language of the program, Scala, SQL, Command, R, or Python</span>
@@ -215,7 +254,7 @@
 <span class="sd">            :db_update_mode: allowinsert or updateonly</span>
 <span class="sd">            :db_update_keys: columns used to determine the uniqueness of rows</span>
 <span class="sd">            :export_dir: HDFS/S3 location from which data will be exported.</span>
-<span class="sd">            :fields_terminated_by: hex of the char used as column separator in the dataset.</span>
+<span class="sd">            :fields_terminated_by: hex of the char used as column separator in the dataset</span>
 <span class="sd">        dbimportcmd:</span>
 <span class="sd">            :mode: 1 (simple), 2 (advance)</span>
 <span class="sd">            :hive_table: Name of the hive table</span>
@@ -223,17 +262,32 @@
 <span class="sd">            :db_table: name of the db table</span>
 <span class="sd">            :where_clause: where clause, if any</span>
 <span class="sd">            :parallelism: number of parallel db connections to use for extracting data</span>
-<span class="sd">            :extract_query: SQL query to extract data from db. $CONDITIONS must be part of the where clause.</span>
+<span class="sd">            :extract_query: SQL query to extract data from db. $CONDITIONS must be part</span>
+<span class="sd">                of the where clause.</span>
 <span class="sd">            :boundary_query: Query to be used get range of row IDs to be extracted</span>
 <span class="sd">            :split_column: Column used as row ID to split data into ranges (mode 2)</span>
 
-<span class="sd">    .. note:: Following fields are template-supported : ``query``, ``script_location``, ``sub_command``, ``script``, ``files``,</span>
-<span class="sd">        ``archives``, ``program``, ``cmdline``, ``sql``, ``where_clause``, ``extract_query``, ``boundary_query``, ``macros``, ``tags``,</span>
-<span class="sd">        ``name``, ``parameters``. You can also use ``.txt`` files for template driven use cases.</span>
+<span class="sd">    .. note:: Following fields are template-supported : ``query``, ``script_location``,</span>
+<span class="sd">        ``sub_command``, ``script``, ``files``, ``archives``, ``program``, ``cmdline``,</span>
+<span class="sd">        ``sql``, ``where_clause``, ``extract_query``, ``boundary_query``, ``macros``,</span>
+<span class="sd">        ``tags``, ``name``, ``parameters``, ``dbtap_id``, ``hive_table``, ``db_table``,</span>
+<span class="sd">        ``split_column``, ``note_id``, ``db_update_keys``, ``export_dir``,</span>
+<span class="sd">        ``partition_spec``, ``qubole_conn_id``, ``arguments``, ``user_program_arguments``.</span>
+<span class="sd">        You can also use ``.txt`` files for template driven use cases.</span>
+
+<span class="sd">    .. note:: In QuboleOperator there is a default handler for task failures and retries,</span>
+<span class="sd">        which generally kills the command running at QDS for the corresponding task</span>
+<span class="sd">        instance. You can override this behavior by providing your own failure and retry</span>
+<span class="sd">        handler in task definition.</span>
 <span class="sd">    &quot;&quot;&quot;</span>
 
-    <span class="n">template_fields</span> <span class="o">=</span> <span class="p">(</span><span class="s1">&#39;query&#39;</span><span class="p">,</span> <span class="s1">&#39;script_location&#39;</span><span class="p">,</span> <span class="s1">&#39;sub_command&#39;</span><span class="p">,</span> <span class="s1">&#39;script&#39;</span><span class="p">,</span> <span class="s1">&#39;files&#39;</span><span class="p">,</span> <span class="s1">&#39;archives&#39;</span><span class="p">,</span> <span class="s1">&#39;program&#39;</span><span class="p">,</span> <span class="s1">&#39;cmdline&#39;</span><span class="p">,</span>
-                       <span class="s1">&#39;sql&#39;</span><span class="p">,</span> <span class="s1">&#39;where_clause&#39;</span><span class="p">,</span> <span class="s1">&#39;extract_query&#39;</span><span class="p">,</span> <span class="s1">&#39;boundary_query&#39;</span><span class="p">,</span> <span class="s1">&#39;macros&#39;</span><span class="p">,</span> <span class="s1">&#39;tags&#39;</span><span class="p">,</span> <span class="s1">&#39;name&#39;</span><span class="p">,</span> <span class="s1">&#39;parameters&#39;</span><span class="p">)</span>
+    <span class="n">template_fields</span> <span class="o">=</span> <span class="p">(</span><span class="s1">&#39;query&#39;</span><span class="p">,</span> <span class="s1">&#39;script_location&#39;</span><span class="p">,</span> <span class="s1">&#39;sub_command&#39;</span><span class="p">,</span> <span class="s1">&#39;script&#39;</span><span class="p">,</span> <span class="s1">&#39;files&#39;</span><span class="p">,</span>
+                       <span class="s1">&#39;archives&#39;</span><span class="p">,</span> <span class="s1">&#39;program&#39;</span><span class="p">,</span> <span class="s1">&#39;cmdline&#39;</span><span class="p">,</span> <span class="s1">&#39;sql&#39;</span><span class="p">,</span> <span class="s1">&#39;where_clause&#39;</span><span class="p">,</span> <span class="s1">&#39;tags&#39;</span><span class="p">,</span>
+                       <span class="s1">&#39;extract_query&#39;</span><span class="p">,</span> <span class="s1">&#39;boundary_query&#39;</span><span class="p">,</span> <span class="s1">&#39;macros&#39;</span><span class="p">,</span> <span class="s1">&#39;name&#39;</span><span class="p">,</span> <span class="s1">&#39;parameters&#39;</span><span class="p">,</span>
+                       <span class="s1">&#39;dbtap_id&#39;</span><span class="p">,</span> <span class="s1">&#39;hive_table&#39;</span><span class="p">,</span> <span class="s1">&#39;db_table&#39;</span><span class="p">,</span> <span class="s1">&#39;split_column&#39;</span><span class="p">,</span> <span class="s1">&#39;note_id&#39;</span><span class="p">,</span>
+                       <span class="s1">&#39;db_update_keys&#39;</span><span class="p">,</span> <span class="s1">&#39;export_dir&#39;</span><span class="p">,</span> <span class="s1">&#39;partition_spec&#39;</span><span class="p">,</span> <span class="s1">&#39;qubole_conn_id&#39;</span><span class="p">,</span>
+                       <span class="s1">&#39;arguments&#39;</span><span class="p">,</span> <span class="s1">&#39;user_program_arguments&#39;</span><span class="p">)</span>
+
     <span class="n">template_ext</span> <span class="o">=</span> <span class="p">(</span><span class="s1">&#39;.txt&#39;</span><span class="p">,)</span>
     <span class="n">ui_color</span> <span class="o">=</span> <span class="s1">&#39;#3064A1&#39;</span>
     <span class="n">ui_fgcolor</span> <span class="o">=</span> <span class="s1">&#39;#fff&#39;</span>
@@ -243,25 +297,32 @@
         <span class="bp">self</span><span class="o">.</span><span class="n">args</span> <span class="o">=</span> <span class="n">args</span>
         <span class="bp">self</span><span class="o">.</span><span class="n">kwargs</span> <span class="o">=</span> <span class="n">kwargs</span>
         <span class="bp">self</span><span class="o">.</span><span class="n">kwargs</span><span class="p">[</span><span class="s1">&#39;qubole_conn_id&#39;</span><span class="p">]</span> <span class="o">=</span> <span class="n">qubole_conn_id</span>
-        <span class="bp">self</span><span class="o">.</span><span class="n">hook</span> <span class="o">=</span> <span class="n">QuboleHook</span><span class="p">(</span><span class="o">*</span><span class="bp">self</span><span class="o">.</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="bp">self</span><span class="o">.</span><span class="n">kwargs</span><span class="p">)</span>
-        <span class="nb">super</span><span class="p">(</span><span class="n">QuboleOperator</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="n">__init__</span><span class="p">(</span><span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span>
+        <span class="nb">super</span><span class="p">(</span><span class="n">QuboleOperator</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span><span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span>
+
+        <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">on_failure_callback</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
+            <span class="bp">self</span><span class="o">.</span><span class="n">on_failure_callback</span> <span class="o">=</span> <span class="n">QuboleHook</span><span class="o">.</span><span class="n">handle_failure_retry</span>
+
+        <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">on_retry_callback</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
+            <span class="bp">self</span><span class="o">.</span><span class="n">on_retry_callback</span> <span class="o">=</span> <span class="n">QuboleHook</span><span class="o">.</span><span class="n">handle_failure_retry</span>
 
     <span class="k">def</span> <span class="nf">execute</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">context</span><span class="p">):</span>
-        <span class="c1"># Reinitiating the hook, as some template fields might have changed</span>
-        <span class="bp">self</span><span class="o">.</span><span class="n">hook</span> <span class="o">=</span> <span class="n">QuboleHook</span><span class="p">(</span><span class="o">*</span><span class="bp">self</span><span class="o">.</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="bp">self</span><span class="o">.</span><span class="n">kwargs</span><span class="p">)</span>
-        <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">hook</span><span class="o">.</span><span class="n">execute</span><span class="p">(</span><span class="n">context</span><span class="p">)</span>
+        <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">get_hook</span><span class="p">()</span><span class="o">.</span><span class="n">execute</span><span class="p">(</span><span class="n">context</span><span class="p">)</span>
 
-    <span class="k">def</span> <span class="nf">on_kill</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">ti</span><span class="p">):</span>
-        <span class="bp">self</span><span class="o">.</span><span class="n">hook</span><span class="o">.</span><span class="n">kill</span><span class="p">(</span><span class="n">ti</span><span class="p">)</span>
+    <span class="k">def</span> <span class="nf">on_kill</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">ti</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
+        <span class="bp">self</span><span class="o">.</span><span class="n">get_hook</span><span class="p">()</span><span class="o">.</span><span class="n">kill</span><span class="p">(</span><span class="n">ti</span><span class="p">)</span>
 
-    <span class="k">def</span> <span class="nf">get_results</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">ti</span><span class="o">=</span><span class="bp">None</span><span class="p">,</span> <span class="n">fp</span><span class="o">=</span><span class="bp">None</span><span class="p">,</span> <span class="n">inline</span><span class="o">=</span><span class="bp">True</span><span class="p">,</span> <span class="n">delim</span><span class="o">=</span><span class="bp">None</span><span class="p">,</span> <span class="n">fetch</span><span class="o">=</span><span class="bp">True</span><span class="p">):</span>
-        <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">hook</span><span class="o">.</span><span class="n">get_results</span><span class="p">(</span><span class="n">ti</span><span class="p">,</span> <span class="n">fp</span><span class="p">,</span> <span class="n">inline</span><span class="p">,</span> <span class="n">delim</span><span class="p">,</span> <span class="n">fetch</span><span class="p">)</span>
+    <span class="k">def</span> <span class="nf">get_results</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">ti</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">fp</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">inline</span><span class="o">=</span><span class="kc">True</span><span class="p">,</span> <span class="n">delim</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">fetch</span><span class="o">=</span><span class="kc">True</span><span class="p">):</span>
+        <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">get_hook</span><span class="p">()</span><span class="o">.</span><span class="n">get_results</span><span class="p">(</span><span class="n">ti</span><span class="p">,</span> <span class="n">fp</span><span class="p">,</span> <span class="n">inline</span><span class="p">,</span> <span class="n">delim</span><span class="p">,</span> <span class="n">fetch</span><span class="p">)</span>
 
     <span class="k">def</span> <span class="nf">get_log</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">ti</span><span class="p">):</span>
-        <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">hook</span><span class="o">.</span><span class="n">get_log</span><span class="p">(</span><span class="n">ti</span><span class="p">)</span>
+        <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">get_hook</span><span class="p">()</span><span class="o">.</span><span class="n">get_log</span><span class="p">(</span><span class="n">ti</span><span class="p">)</span>
 
     <span class="k">def</span> <span class="nf">get_jobs_id</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">ti</span><span class="p">):</span>
-        <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">hook</span><span class="o">.</span><span class="n">get_jobs_id</span><span class="p">(</span><span class="n">ti</span><span class="p">)</span>
+        <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">get_hook</span><span class="p">()</span><span class="o">.</span><span class="n">get_jobs_id</span><span class="p">(</span><span class="n">ti</span><span class="p">)</span>
+
+    <span class="k">def</span> <span class="nf">get_hook</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
+        <span class="c1"># Reinitiating the hook, as some template fields might have changed</span>
+        <span class="k">return</span> <span class="n">QuboleHook</span><span class="p">(</span><span class="o">*</span><span class="bp">self</span><span class="o">.</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="bp">self</span><span class="o">.</span><span class="n">kwargs</span><span class="p">)</span>
 
     <span class="k">def</span> <span class="nf">__getattribute__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">name</span><span class="p">):</span>
         <span class="k">if</span> <span class="n">name</span> <span class="ow">in</span> <span class="n">QuboleOperator</span><span class="o">.</span><span class="n">template_fields</span><span class="p">:</span>
@@ -270,19 +331,19 @@
             <span class="k">else</span><span class="p">:</span>
                 <span class="k">return</span> <span class="s1">&#39;&#39;</span>
         <span class="k">else</span><span class="p">:</span>
-            <span class="k">return</span> <span class="nb">object</span><span class="o">.</span><span class="n">__getattribute__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">name</span><span class="p">)</span>
+            <span class="k">return</span> <span class="nb">object</span><span class="o">.</span><span class="fm">__getattribute__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">name</span><span class="p">)</span>
 
     <span class="k">def</span> <span class="nf">__setattr__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">name</span><span class="p">,</span> <span class="n">value</span><span class="p">):</span>
         <span class="k">if</span> <span class="n">name</span> <span class="ow">in</span> <span class="n">QuboleOperator</span><span class="o">.</span><span class="n">template_fields</span><span class="p">:</span>
             <span class="bp">self</span><span class="o">.</span><span class="n">kwargs</span><span class="p">[</span><span class="n">name</span><span class="p">]</span> <span class="o">=</span> <span class="n">value</span>
         <span class="k">else</span><span class="p">:</span>
-            <span class="nb">object</span><span class="o">.</span><span class="n">__setattr__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">name</span><span class="p">,</span> <span class="n">value</span><span class="p">)</span></div>
-
-
-
+            <span class="nb">object</span><span class="o">.</span><span class="fm">__setattr__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">name</span><span class="p">,</span> <span class="n">value</span><span class="p">)</span></div>
 </pre></div>
 
            </div>
+           <div class="articleComments">
+            
+           </div>
           </div>
           <footer>
   
@@ -315,7 +376,8 @@
             VERSION:'',
             COLLAPSE_INDEX:false,
             FILE_SUFFIX:'.html',
-            HAS_SOURCE:  true
+            HAS_SOURCE:  true,
+            SOURCELINK_SUFFIX: '.txt'
         };
     </script>
       <script type="text/javascript" src="../_static/jquery.js"></script>