You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by cr...@apache.org on 2018/01/03 17:48:12 UTC
[12/35] incubator-airflow-site git commit: 1.9.0
http://git-wip-us.apache.org/repos/asf/incubator-airflow-site/blob/28a3eb60/_modules/python_operator.html
----------------------------------------------------------------------
diff --git a/_modules/python_operator.html b/_modules/python_operator.html
index d2def55..5e5cdc6 100644
--- a/_modules/python_operator.html
+++ b/_modules/python_operator.html
@@ -13,6 +13,8 @@
+
+
@@ -80,7 +82,10 @@
- <ul>
+
+
+
+ <ul>
<li class="toctree-l1"><a class="reference internal" href="../project.html">Project</a></li>
<li class="toctree-l1"><a class="reference internal" href="../license.html">License</a></li>
<li class="toctree-l1"><a class="reference internal" href="../start.html">Quick Start</a></li>
@@ -177,13 +182,20 @@
<span class="c1"># limitations under the License.</span>
<span class="kn">from</span> <span class="nn">builtins</span> <span class="k">import</span> <span class="nb">str</span>
-<span class="kn">from</span> <span class="nn">datetime</span> <span class="k">import</span> <span class="n">datetime</span>
-<span class="kn">import</span> <span class="nn">logging</span>
-
-<span class="kn">from</span> <span class="nn">airflow.models</span> <span class="k">import</span> <span class="n">BaseOperator</span><span class="p">,</span> <span class="n">TaskInstance</span>
-<span class="kn">from</span> <span class="nn">airflow.utils.state</span> <span class="k">import</span> <span class="n">State</span>
+<span class="kn">import</span> <span class="nn">dill</span>
+<span class="kn">import</span> <span class="nn">inspect</span>
+<span class="kn">import</span> <span class="nn">os</span>
+<span class="kn">import</span> <span class="nn">pickle</span>
+<span class="kn">import</span> <span class="nn">subprocess</span>
+<span class="kn">import</span> <span class="nn">sys</span>
+<span class="kn">import</span> <span class="nn">types</span>
+
+<span class="kn">from</span> <span class="nn">airflow.exceptions</span> <span class="k">import</span> <span class="n">AirflowException</span>
+<span class="kn">from</span> <span class="nn">airflow.models</span> <span class="k">import</span> <span class="n">BaseOperator</span><span class="p">,</span> <span class="n">SkipMixin</span>
<span class="kn">from</span> <span class="nn">airflow.utils.decorators</span> <span class="k">import</span> <span class="n">apply_defaults</span>
-<span class="kn">from</span> <span class="nn">airflow</span> <span class="k">import</span> <span class="n">settings</span>
+<span class="kn">from</span> <span class="nn">airflow.utils.file</span> <span class="k">import</span> <span class="n">TemporaryDirectory</span>
+
+<span class="kn">from</span> <span class="nn">textwrap</span> <span class="k">import</span> <span class="n">dedent</span>
<div class="viewcode-block" id="PythonOperator"><a class="viewcode-back" href="../code.html#airflow.operators.PythonOperator">[docs]</a><span class="k">class</span> <span class="nc">PythonOperator</span><span class="p">(</span><span class="n">BaseOperator</span><span class="p">):</span>
@@ -226,7 +238,9 @@
<span class="n">templates_dict</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">templates_exts</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span>
- <span class="nb">super</span><span class="p">(</span><span class="n">PythonOperator</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="n">__init__</span><span class="p">(</span><span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span>
+ <span class="nb">super</span><span class="p">(</span><span class="n">PythonOperator</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span><span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span>
+ <span class="k">if</span> <span class="ow">not</span> <span class="n">callable</span><span class="p">(</span><span class="n">python_callable</span><span class="p">):</span>
+ <span class="k">raise</span> <span class="n">AirflowException</span><span class="p">(</span><span class="s1">'`python_callable` param must be callable'</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">python_callable</span> <span class="o">=</span> <span class="n">python_callable</span>
<span class="bp">self</span><span class="o">.</span><span class="n">op_args</span> <span class="o">=</span> <span class="n">op_args</span> <span class="ow">or</span> <span class="p">[]</span>
<span class="bp">self</span><span class="o">.</span><span class="n">op_kwargs</span> <span class="o">=</span> <span class="n">op_kwargs</span> <span class="ow">or</span> <span class="p">{}</span>
@@ -241,12 +255,15 @@
<span class="n">context</span><span class="p">[</span><span class="s1">'templates_dict'</span><span class="p">]</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">templates_dict</span>
<span class="bp">self</span><span class="o">.</span><span class="n">op_kwargs</span> <span class="o">=</span> <span class="n">context</span>
- <span class="n">return_value</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">python_callable</span><span class="p">(</span><span class="o">*</span><span class="bp">self</span><span class="o">.</span><span class="n">op_args</span><span class="p">,</span> <span class="o">**</span><span class="bp">self</span><span class="o">.</span><span class="n">op_kwargs</span><span class="p">)</span>
- <span class="n">logging</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">"Done. Returned value was: "</span> <span class="o">+</span> <span class="nb">str</span><span class="p">(</span><span class="n">return_value</span><span class="p">))</span>
- <span class="k">return</span> <span class="n">return_value</span></div>
+ <span class="n">return_value</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">execute_callable</span><span class="p">()</span>
+ <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">"Done. Returned value was: </span><span class="si">%s</span><span class="s2">"</span><span class="p">,</span> <span class="n">return_value</span><span class="p">)</span>
+ <span class="k">return</span> <span class="n">return_value</span>
+
+ <span class="k">def</span> <span class="nf">execute_callable</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
+ <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">python_callable</span><span class="p">(</span><span class="o">*</span><span class="bp">self</span><span class="o">.</span><span class="n">op_args</span><span class="p">,</span> <span class="o">**</span><span class="bp">self</span><span class="o">.</span><span class="n">op_kwargs</span><span class="p">)</span></div>
-<div class="viewcode-block" id="BranchPythonOperator"><a class="viewcode-back" href="../code.html#airflow.operators.BranchPythonOperator">[docs]</a><span class="k">class</span> <span class="nc">BranchPythonOperator</span><span class="p">(</span><span class="n">PythonOperator</span><span class="p">):</span>
+<div class="viewcode-block" id="BranchPythonOperator"><a class="viewcode-back" href="../code.html#airflow.operators.BranchPythonOperator">[docs]</a><span class="k">class</span> <span class="nc">BranchPythonOperator</span><span class="p">(</span><span class="n">PythonOperator</span><span class="p">,</span> <span class="n">SkipMixin</span><span class="p">):</span>
<span class="sd">"""</span>
<span class="sd"> Allows a workflow to "branch" or follow a single path following the</span>
<span class="sd"> execution of this task.</span>
@@ -267,23 +284,20 @@
<span class="sd"> """</span>
<span class="k">def</span> <span class="nf">execute</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">context</span><span class="p">):</span>
<span class="n">branch</span> <span class="o">=</span> <span class="nb">super</span><span class="p">(</span><span class="n">BranchPythonOperator</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="n">execute</span><span class="p">(</span><span class="n">context</span><span class="p">)</span>
- <span class="n">logging</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">"Following branch "</span> <span class="o">+</span> <span class="n">branch</span><span class="p">)</span>
- <span class="n">logging</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">"Marking other directly downstream tasks as skipped"</span><span class="p">)</span>
- <span class="n">session</span> <span class="o">=</span> <span class="n">settings</span><span class="o">.</span><span class="n">Session</span><span class="p">()</span>
- <span class="k">for</span> <span class="n">task</span> <span class="ow">in</span> <span class="n">context</span><span class="p">[</span><span class="s1">'task'</span><span class="p">]</span><span class="o">.</span><span class="n">downstream_list</span><span class="p">:</span>
- <span class="k">if</span> <span class="n">task</span><span class="o">.</span><span class="n">task_id</span> <span class="o">!=</span> <span class="n">branch</span><span class="p">:</span>
- <span class="n">ti</span> <span class="o">=</span> <span class="n">TaskInstance</span><span class="p">(</span>
- <span class="n">task</span><span class="p">,</span> <span class="n">execution_date</span><span class="o">=</span><span class="n">context</span><span class="p">[</span><span class="s1">'ti'</span><span class="p">]</span><span class="o">.</span><span class="n">execution_date</span><span class="p">)</span>
- <span class="n">ti</span><span class="o">.</span><span class="n">state</span> <span class="o">=</span> <span class="n">State</span><span class="o">.</span><span class="n">SKIPPED</span>
- <span class="n">ti</span><span class="o">.</span><span class="n">start_date</span> <span class="o">=</span> <span class="n">datetime</span><span class="o">.</span><span class="n">now</span><span class="p">()</span>
- <span class="n">ti</span><span class="o">.</span><span class="n">end_date</span> <span class="o">=</span> <span class="n">datetime</span><span class="o">.</span><span class="n">now</span><span class="p">()</span>
- <span class="n">session</span><span class="o">.</span><span class="n">merge</span><span class="p">(</span><span class="n">ti</span><span class="p">)</span>
- <span class="n">session</span><span class="o">.</span><span class="n">commit</span><span class="p">()</span>
- <span class="n">session</span><span class="o">.</span><span class="n">close</span><span class="p">()</span>
- <span class="n">logging</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">"Done."</span><span class="p">)</span></div>
-
-
-<div class="viewcode-block" id="ShortCircuitOperator"><a class="viewcode-back" href="../code.html#airflow.operators.ShortCircuitOperator">[docs]</a><span class="k">class</span> <span class="nc">ShortCircuitOperator</span><span class="p">(</span><span class="n">PythonOperator</span><span class="p">):</span>
+ <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">"Following branch </span><span class="si">%s</span><span class="s2">"</span><span class="p">,</span> <span class="n">branch</span><span class="p">)</span>
+ <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">"Marking other directly downstream tasks as skipped"</span><span class="p">)</span>
+
+ <span class="n">downstream_tasks</span> <span class="o">=</span> <span class="n">context</span><span class="p">[</span><span class="s1">'task'</span><span class="p">]</span><span class="o">.</span><span class="n">downstream_list</span>
+ <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">debug</span><span class="p">(</span><span class="s2">"Downstream task_ids </span><span class="si">%s</span><span class="s2">"</span><span class="p">,</span> <span class="n">downstream_tasks</span><span class="p">)</span>
+
+ <span class="n">skip_tasks</span> <span class="o">=</span> <span class="p">[</span><span class="n">t</span> <span class="k">for</span> <span class="n">t</span> <span class="ow">in</span> <span class="n">downstream_tasks</span> <span class="k">if</span> <span class="n">t</span><span class="o">.</span><span class="n">task_id</span> <span class="o">!=</span> <span class="n">branch</span><span class="p">]</span>
+ <span class="k">if</span> <span class="n">downstream_tasks</span><span class="p">:</span>
+ <span class="bp">self</span><span class="o">.</span><span class="n">skip</span><span class="p">(</span><span class="n">context</span><span class="p">[</span><span class="s1">'dag_run'</span><span class="p">],</span> <span class="n">context</span><span class="p">[</span><span class="s1">'ti'</span><span class="p">]</span><span class="o">.</span><span class="n">execution_date</span><span class="p">,</span> <span class="n">skip_tasks</span><span class="p">)</span>
+
+ <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">"Done."</span><span class="p">)</span></div>
+
+
+<div class="viewcode-block" id="ShortCircuitOperator"><a class="viewcode-back" href="../code.html#airflow.operators.ShortCircuitOperator">[docs]</a><span class="k">class</span> <span class="nc">ShortCircuitOperator</span><span class="p">(</span><span class="n">PythonOperator</span><span class="p">,</span> <span class="n">SkipMixin</span><span class="p">):</span>
<span class="sd">"""</span>
<span class="sd"> Allows a workflow to continue only if a condition is met. Otherwise, the</span>
<span class="sd"> workflow "short-circuits" and downstream tasks are skipped.</span>
@@ -297,23 +311,220 @@
<span class="sd"> """</span>
<span class="k">def</span> <span class="nf">execute</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">context</span><span class="p">):</span>
<span class="n">condition</span> <span class="o">=</span> <span class="nb">super</span><span class="p">(</span><span class="n">ShortCircuitOperator</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="n">execute</span><span class="p">(</span><span class="n">context</span><span class="p">)</span>
- <span class="n">logging</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">"Condition result is </span><span class="si">{}</span><span class="s2">"</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">condition</span><span class="p">))</span>
+ <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">"Condition result is </span><span class="si">%s</span><span class="s2">"</span><span class="p">,</span> <span class="n">condition</span><span class="p">)</span>
+
<span class="k">if</span> <span class="n">condition</span><span class="p">:</span>
- <span class="n">logging</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s1">'Proceeding with downstream tasks...'</span><span class="p">)</span>
+ <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s1">'Proceeding with downstream tasks...'</span><span class="p">)</span>
<span class="k">return</span>
+
+ <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s1">'Skipping downstream tasks...'</span><span class="p">)</span>
+
+ <span class="n">downstream_tasks</span> <span class="o">=</span> <span class="n">context</span><span class="p">[</span><span class="s1">'task'</span><span class="p">]</span><span class="o">.</span><span class="n">get_flat_relatives</span><span class="p">(</span><span class="n">upstream</span><span class="o">=</span><span class="kc">False</span><span class="p">)</span>
+ <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">debug</span><span class="p">(</span><span class="s2">"Downstream task_ids </span><span class="si">%s</span><span class="s2">"</span><span class="p">,</span> <span class="n">downstream_tasks</span><span class="p">)</span>
+
+ <span class="k">if</span> <span class="n">downstream_tasks</span><span class="p">:</span>
+ <span class="bp">self</span><span class="o">.</span><span class="n">skip</span><span class="p">(</span><span class="n">context</span><span class="p">[</span><span class="s1">'dag_run'</span><span class="p">],</span> <span class="n">context</span><span class="p">[</span><span class="s1">'ti'</span><span class="p">]</span><span class="o">.</span><span class="n">execution_date</span><span class="p">,</span> <span class="n">downstream_tasks</span><span class="p">)</span>
+
+ <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">"Done."</span><span class="p">)</span></div>
+
+<span class="k">class</span> <span class="nc">PythonVirtualenvOperator</span><span class="p">(</span><span class="n">PythonOperator</span><span class="p">):</span>
+ <span class="sd">"""</span>
+<span class="sd"> Allows one to run a function in a virtualenv that is created and destroyed</span>
+<span class="sd"> automatically (with certain caveats).</span>
+
+<span class="sd"> The function must be defined using def, and not be part of a class. All imports</span>
+<span class="sd"> must happen inside the function and no variables outside of the scope may be referenced.</span>
+<span class="sd"> A global scope variable named virtualenv_string_args will be available (populated by</span>
+<span class="sd"> string_args). In addition, one can pass stuff through op_args and op_kwargs, and one</span>
+<span class="sd"> can use a return value.</span>
+
+<span class="sd"> Note that if your virtualenv runs in a different Python major version than Airflow,</span>
+<span class="sd"> you cannot use return values, op_args, or op_kwargs. You can use string_args though.</span>
+
+<span class="sd"> :param python_callable: A python function with no references to outside variables,</span>
+<span class="sd"> defined with def, which will be run in a virtualenv</span>
+<span class="sd"> :type python_callable: function</span>
+<span class="sd"> :param requirements: A list of requirements as specified in a pip install command</span>
+<span class="sd"> :type requirements: list(str)</span>
+<span class="sd"> :param python_version: The Python version to run the virtualenv with. Note that</span>
+<span class="sd"> both 2 and 2.7 are acceptable forms.</span>
+<span class="sd"> :type python_version: str</span>
+<span class="sd"> :param use_dill: Whether to use dill to serialize the args and result (pickle is default).</span>
+<span class="sd"> This allow more complex types but requires you to include dill in your requirements.</span>
+<span class="sd"> :type use_dill: bool</span>
+<span class="sd"> :param system_site_packages: Whether to include system_site_packages in your virtualenv.</span>
+<span class="sd"> See virtualenv documentation for more information.</span>
+<span class="sd"> :type system_site_packages: bool</span>
+<span class="sd"> :param op_args: A list of positional arguments to pass to python_callable.</span>
+<span class="sd"> :type op_kwargs: list</span>
+<span class="sd"> :param op_kwargs: A dict of keyword arguments to pass to python_callable.</span>
+<span class="sd"> :type op_kwargs: dict</span>
+<span class="sd"> :param string_args: Strings that are present in the global var virtualenv_string_args,</span>
+<span class="sd"> available to python_callable at runtime as a list(str). Note that args are split</span>
+<span class="sd"> by newline.</span>
+<span class="sd"> :type string_args: list(str)</span>
+
+<span class="sd"> """</span>
+ <span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">python_callable</span><span class="p">,</span> <span class="n">requirements</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">python_version</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">use_dill</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
+ <span class="n">system_site_packages</span><span class="o">=</span><span class="kc">True</span><span class="p">,</span> <span class="n">op_args</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">op_kwargs</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">string_args</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
+ <span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span>
+ <span class="nb">super</span><span class="p">(</span><span class="n">PythonVirtualenvOperator</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span>
+ <span class="n">python_callable</span><span class="o">=</span><span class="n">python_callable</span><span class="p">,</span>
+ <span class="n">op_args</span><span class="o">=</span><span class="n">op_args</span><span class="p">,</span>
+ <span class="n">op_kwargs</span><span class="o">=</span><span class="n">op_kwargs</span><span class="p">,</span>
+ <span class="o">*</span><span class="n">args</span><span class="p">,</span>
+ <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span>
+ <span class="bp">self</span><span class="o">.</span><span class="n">requirements</span> <span class="o">=</span> <span class="n">requirements</span> <span class="ow">or</span> <span class="p">[]</span>
+ <span class="bp">self</span><span class="o">.</span><span class="n">string_args</span> <span class="o">=</span> <span class="n">string_args</span> <span class="ow">or</span> <span class="p">[]</span>
+ <span class="bp">self</span><span class="o">.</span><span class="n">python_version</span> <span class="o">=</span> <span class="n">python_version</span>
+ <span class="bp">self</span><span class="o">.</span><span class="n">use_dill</span> <span class="o">=</span> <span class="n">use_dill</span>
+ <span class="bp">self</span><span class="o">.</span><span class="n">system_site_packages</span> <span class="o">=</span> <span class="n">system_site_packages</span>
+ <span class="c1"># check that dill is present if needed</span>
+ <span class="n">dill_in_requirements</span> <span class="o">=</span> <span class="nb">map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="n">x</span><span class="o">.</span><span class="n">lower</span><span class="p">()</span><span class="o">.</span><span class="n">startswith</span><span class="p">(</span><span class="s1">'dill'</span><span class="p">),</span> <span class="bp">self</span><span class="o">.</span><span class="n">requirements</span><span class="p">)</span>
+ <span class="k">if</span> <span class="p">(</span><span class="ow">not</span> <span class="n">system_site_packages</span><span class="p">)</span> <span class="ow">and</span> <span class="n">use_dill</span> <span class="ow">and</span> <span class="ow">not</span> <span class="nb">any</span><span class="p">(</span><span class="n">dill_in_requirements</span><span class="p">):</span>
+ <span class="k">raise</span> <span class="n">AirflowException</span><span class="p">(</span><span class="s1">'If using dill, dill must be in the environment '</span> <span class="o">+</span>
+ <span class="s1">'either via system_site_packages or requirements'</span><span class="p">)</span>
+ <span class="c1"># check that a function is passed, and that it is not a lambda</span>
+ <span class="k">if</span> <span class="p">(</span><span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">python_callable</span><span class="p">,</span> <span class="n">types</span><span class="o">.</span><span class="n">FunctionType</span><span class="p">)</span>
+ <span class="ow">or</span> <span class="bp">self</span><span class="o">.</span><span class="n">python_callable</span><span class="o">.</span><span class="vm">__name__</span> <span class="o">==</span> <span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="mi">0</span><span class="p">)</span><span class="o">.</span><span class="vm">__name__</span><span class="p">):</span>
+ <span class="k">raise</span> <span class="n">AirflowException</span><span class="p">(</span><span class="s1">'</span><span class="si">{}</span><span class="s1"> only supports functions for python_callable arg'</span><span class="p">,</span>
+ <span class="bp">self</span><span class="o">.</span><span class="vm">__class__</span><span class="o">.</span><span class="vm">__name__</span><span class="p">)</span>
+ <span class="c1"># check that args are passed iff python major version matches</span>
+ <span class="k">if</span> <span class="p">(</span><span class="n">python_version</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span>
+ <span class="ow">and</span> <span class="nb">str</span><span class="p">(</span><span class="n">python_version</span><span class="p">)[</span><span class="mi">0</span><span class="p">]</span> <span class="o">!=</span> <span class="nb">str</span><span class="p">(</span><span class="n">sys</span><span class="o">.</span><span class="n">version_info</span><span class="p">[</span><span class="mi">0</span><span class="p">])</span>
+ <span class="ow">and</span> <span class="bp">self</span><span class="o">.</span><span class="n">_pass_op_args</span><span class="p">()):</span>
+ <span class="k">raise</span> <span class="n">AirflowException</span><span class="p">(</span><span class="s2">"Passing op_args or op_kwargs is not supported across "</span>
+ <span class="s2">"different Python major versions "</span>
+ <span class="s2">"for PythonVirtualenvOperator. Please use string_args."</span><span class="p">)</span>
+
+ <span class="k">def</span> <span class="nf">execute_callable</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
+ <span class="k">with</span> <span class="n">TemporaryDirectory</span><span class="p">(</span><span class="n">prefix</span><span class="o">=</span><span class="s1">'venv'</span><span class="p">)</span> <span class="k">as</span> <span class="n">tmp_dir</span><span class="p">:</span>
+ <span class="c1"># generate filenames</span>
+ <span class="n">input_filename</span> <span class="o">=</span> <span class="n">os</span><span class="o">.</span><span class="n">path</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="n">tmp_dir</span><span class="p">,</span> <span class="s1">'script.in'</span><span class="p">)</span>
+ <span class="n">output_filename</span> <span class="o">=</span> <span class="n">os</span><span class="o">.</span><span class="n">path</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="n">tmp_dir</span><span class="p">,</span> <span class="s1">'script.out'</span><span class="p">)</span>
+ <span class="n">string_args_filename</span> <span class="o">=</span> <span class="n">os</span><span class="o">.</span><span class="n">path</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="n">tmp_dir</span><span class="p">,</span> <span class="s1">'string_args.txt'</span><span class="p">)</span>
+ <span class="n">script_filename</span> <span class="o">=</span> <span class="n">os</span><span class="o">.</span><span class="n">path</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="n">tmp_dir</span><span class="p">,</span> <span class="s1">'script.py'</span><span class="p">)</span>
+
+ <span class="c1"># set up virtualenv</span>
+ <span class="bp">self</span><span class="o">.</span><span class="n">_execute_in_subprocess</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_generate_virtualenv_cmd</span><span class="p">(</span><span class="n">tmp_dir</span><span class="p">))</span>
+ <span class="n">cmd</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_generate_pip_install_cmd</span><span class="p">(</span><span class="n">tmp_dir</span><span class="p">)</span>
+ <span class="k">if</span> <span class="n">cmd</span><span class="p">:</span>
+ <span class="bp">self</span><span class="o">.</span><span class="n">_execute_in_subprocess</span><span class="p">(</span><span class="n">cmd</span><span class="p">)</span>
+
+ <span class="bp">self</span><span class="o">.</span><span class="n">_write_args</span><span class="p">(</span><span class="n">input_filename</span><span class="p">)</span>
+ <span class="bp">self</span><span class="o">.</span><span class="n">_write_script</span><span class="p">(</span><span class="n">script_filename</span><span class="p">)</span>
+ <span class="bp">self</span><span class="o">.</span><span class="n">_write_string_args</span><span class="p">(</span><span class="n">string_args_filename</span><span class="p">)</span>
+
+ <span class="c1"># execute command in virtualenv</span>
+ <span class="bp">self</span><span class="o">.</span><span class="n">_execute_in_subprocess</span><span class="p">(</span>
+ <span class="bp">self</span><span class="o">.</span><span class="n">_generate_python_cmd</span><span class="p">(</span><span class="n">tmp_dir</span><span class="p">,</span>
+ <span class="n">script_filename</span><span class="p">,</span>
+ <span class="n">input_filename</span><span class="p">,</span>
+ <span class="n">output_filename</span><span class="p">,</span>
+ <span class="n">string_args_filename</span><span class="p">))</span>
+ <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_read_result</span><span class="p">(</span><span class="n">output_filename</span><span class="p">)</span>
+
+ <span class="k">def</span> <span class="nf">_pass_op_args</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
+ <span class="c1"># we should only pass op_args if any are given to us</span>
+ <span class="k">return</span> <span class="nb">len</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">op_args</span><span class="p">)</span> <span class="o">+</span> <span class="nb">len</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">op_kwargs</span><span class="p">)</span> <span class="o">></span> <span class="mi">0</span>
+
+ <span class="k">def</span> <span class="nf">_execute_in_subprocess</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">cmd</span><span class="p">):</span>
+ <span class="k">try</span><span class="p">:</span>
+ <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">"Executing cmd</span><span class="se">\n</span><span class="si">{}</span><span class="s2">"</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">cmd</span><span class="p">))</span>
+ <span class="n">output</span> <span class="o">=</span> <span class="n">subprocess</span><span class="o">.</span><span class="n">check_output</span><span class="p">(</span><span class="n">cmd</span><span class="p">,</span> <span class="n">stderr</span><span class="o">=</span><span class="n">subprocess</span><span class="o">.</span><span class="n">STDOUT</span><span class="p">)</span>
+ <span class="k">if</span> <span class="n">output</span><span class="p">:</span>
+ <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">"Got output</span><span class="se">\n</span><span class="si">{}</span><span class="s2">"</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">output</span><span class="p">))</span>
+ <span class="k">except</span> <span class="n">subprocess</span><span class="o">.</span><span class="n">CalledProcessError</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span>
+ <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">"Got error output</span><span class="se">\n</span><span class="si">{}</span><span class="s2">"</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">e</span><span class="o">.</span><span class="n">output</span><span class="p">))</span>
+ <span class="k">raise</span>
+
+ <span class="k">def</span> <span class="nf">_write_string_args</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">filename</span><span class="p">):</span>
+ <span class="c1"># writes string_args to a file, which are read line by line</span>
+ <span class="k">with</span> <span class="nb">open</span><span class="p">(</span><span class="n">filename</span><span class="p">,</span> <span class="s1">'w'</span><span class="p">)</span> <span class="k">as</span> <span class="n">f</span><span class="p">:</span>
+ <span class="n">f</span><span class="o">.</span><span class="n">write</span><span class="p">(</span><span class="s1">'</span><span class="se">\n</span><span class="s1">'</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="nb">map</span><span class="p">(</span><span class="nb">str</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">string_args</span><span class="p">)))</span>
+
+ <span class="k">def</span> <span class="nf">_write_args</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">input_filename</span><span class="p">):</span>
+ <span class="c1"># serialize args to file</span>
+ <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_pass_op_args</span><span class="p">():</span>
+ <span class="k">with</span> <span class="nb">open</span><span class="p">(</span><span class="n">input_filename</span><span class="p">,</span> <span class="s1">'wb'</span><span class="p">)</span> <span class="k">as</span> <span class="n">f</span><span class="p">:</span>
+ <span class="n">arg_dict</span> <span class="o">=</span> <span class="p">({</span><span class="s1">'args'</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">op_args</span><span class="p">,</span> <span class="s1">'kwargs'</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">op_kwargs</span><span class="p">})</span>
+ <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">use_dill</span><span class="p">:</span>
+ <span class="n">dill</span><span class="o">.</span><span class="n">dump</span><span class="p">(</span><span class="n">arg_dict</span><span class="p">,</span> <span class="n">f</span><span class="p">)</span>
+ <span class="k">else</span><span class="p">:</span>
+ <span class="n">pickle</span><span class="o">.</span><span class="n">dump</span><span class="p">(</span><span class="n">arg_dict</span><span class="p">,</span> <span class="n">f</span><span class="p">)</span>
+
+ <span class="k">def</span> <span class="nf">_read_result</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">output_filename</span><span class="p">):</span>
+ <span class="k">if</span> <span class="n">os</span><span class="o">.</span><span class="n">stat</span><span class="p">(</span><span class="n">output_filename</span><span class="p">)</span><span class="o">.</span><span class="n">st_size</span> <span class="o">==</span> <span class="mi">0</span><span class="p">:</span>
+ <span class="k">return</span> <span class="kc">None</span>
+ <span class="k">with</span> <span class="nb">open</span><span class="p">(</span><span class="n">output_filename</span><span class="p">,</span> <span class="s1">'rb'</span><span class="p">)</span> <span class="k">as</span> <span class="n">f</span><span class="p">:</span>
+ <span class="k">try</span><span class="p">:</span>
+ <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">use_dill</span><span class="p">:</span>
+ <span class="k">return</span> <span class="n">dill</span><span class="o">.</span><span class="n">load</span><span class="p">(</span><span class="n">f</span><span class="p">)</span>
+ <span class="k">else</span><span class="p">:</span>
+ <span class="k">return</span> <span class="n">pickle</span><span class="o">.</span><span class="n">load</span><span class="p">(</span><span class="n">f</span><span class="p">)</span>
+ <span class="k">except</span> <span class="ne">ValueError</span><span class="p">:</span>
+ <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">error</span><span class="p">(</span><span class="s2">"Error deserializing result. Note that result deserialization "</span>
+ <span class="s2">"is not supported across major Python versions."</span><span class="p">)</span>
+ <span class="k">raise</span>
+
+ <span class="k">def</span> <span class="nf">_write_script</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">script_filename</span><span class="p">):</span>
+ <span class="k">with</span> <span class="nb">open</span><span class="p">(</span><span class="n">script_filename</span><span class="p">,</span> <span class="s1">'w'</span><span class="p">)</span> <span class="k">as</span> <span class="n">f</span><span class="p">:</span>
+ <span class="n">python_code</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_generate_python_code</span><span class="p">()</span>
+ <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">debug</span><span class="p">(</span><span class="s1">'Writing code to file</span><span class="se">\n</span><span class="si">{}</span><span class="s1">'</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">python_code</span><span class="p">))</span>
+ <span class="n">f</span><span class="o">.</span><span class="n">write</span><span class="p">(</span><span class="n">python_code</span><span class="p">)</span>
+
+ <span class="k">def</span> <span class="nf">_generate_virtualenv_cmd</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">tmp_dir</span><span class="p">):</span>
+ <span class="n">cmd</span> <span class="o">=</span> <span class="p">[</span><span class="s1">'virtualenv'</span><span class="p">,</span> <span class="n">tmp_dir</span><span class="p">]</span>
+ <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">system_site_packages</span><span class="p">:</span>
+ <span class="n">cmd</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="s1">'--system-site-packages'</span><span class="p">)</span>
+ <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">python_version</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span>
+ <span class="n">cmd</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="s1">'--python=python</span><span class="si">{}</span><span class="s1">'</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">python_version</span><span class="p">))</span>
+ <span class="k">return</span> <span class="n">cmd</span>
+
+ <span class="k">def</span> <span class="nf">_generate_pip_install_cmd</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">tmp_dir</span><span class="p">):</span>
+ <span class="k">if</span> <span class="nb">len</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">requirements</span><span class="p">)</span> <span class="o">==</span> <span class="mi">0</span><span class="p">:</span>
+ <span class="k">return</span> <span class="p">[]</span>
+ <span class="k">else</span><span class="p">:</span>
+ <span class="c1"># direct path alleviates need to activate</span>
+ <span class="n">cmd</span> <span class="o">=</span> <span class="p">[</span><span class="s1">'</span><span class="si">{}</span><span class="s1">/bin/pip'</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">tmp_dir</span><span class="p">),</span> <span class="s1">'install'</span><span class="p">]</span>
+ <span class="k">return</span> <span class="n">cmd</span> <span class="o">+</span> <span class="bp">self</span><span class="o">.</span><span class="n">requirements</span>
+
+ <span class="k">def</span> <span class="nf">_generate_python_cmd</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">tmp_dir</span><span class="p">,</span> <span class="n">script_filename</span><span class="p">,</span> <span class="n">input_filename</span><span class="p">,</span> <span class="n">output_filename</span><span class="p">,</span> <span class="n">string_args_filename</span><span class="p">):</span>
+ <span class="c1"># direct path alleviates need to activate</span>
+ <span class="k">return</span> <span class="p">[</span><span class="s1">'</span><span class="si">{}</span><span class="s1">/bin/python'</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">tmp_dir</span><span class="p">),</span> <span class="n">script_filename</span><span class="p">,</span> <span class="n">input_filename</span><span class="p">,</span> <span class="n">output_filename</span><span class="p">,</span> <span class="n">string_args_filename</span><span class="p">]</span>
+
+ <span class="k">def</span> <span class="nf">_generate_python_code</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
+ <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">use_dill</span><span class="p">:</span>
+ <span class="n">pickling_library</span> <span class="o">=</span> <span class="s1">'dill'</span>
+ <span class="k">else</span><span class="p">:</span>
+ <span class="n">pickling_library</span> <span class="o">=</span> <span class="s1">'pickle'</span>
+ <span class="n">fn</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">python_callable</span>
+ <span class="c1"># dont try to read pickle if we didnt pass anything</span>
+ <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_pass_op_args</span><span class="p">():</span>
+ <span class="n">load_args_line</span> <span class="o">=</span> <span class="s1">'with open(sys.argv[1], "rb") as f: arg_dict = </span><span class="si">{}</span><span class="s1">.load(f)'</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">pickling_library</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
- <span class="n">logging</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s1">'Skipping downstream tasks...'</span><span class="p">)</span>
- <span class="n">session</span> <span class="o">=</span> <span class="n">settings</span><span class="o">.</span><span class="n">Session</span><span class="p">()</span>
- <span class="k">for</span> <span class="n">task</span> <span class="ow">in</span> <span class="n">context</span><span class="p">[</span><span class="s1">'task'</span><span class="p">]</span><span class="o">.</span><span class="n">downstream_list</span><span class="p">:</span>
- <span class="n">ti</span> <span class="o">=</span> <span class="n">TaskInstance</span><span class="p">(</span>
- <span class="n">task</span><span class="p">,</span> <span class="n">execution_date</span><span class="o">=</span><span class="n">context</span><span class="p">[</span><span class="s1">'ti'</span><span class="p">]</span><span class="o">.</span><span class="n">execution_date</span><span class="p">)</span>
- <span class="n">ti</span><span class="o">.</span><span class="n">state</span> <span class="o">=</span> <span class="n">State</span><span class="o">.</span><span class="n">SKIPPED</span>
- <span class="n">ti</span><span class="o">.</span><span class="n">start_date</span> <span class="o">=</span> <span class="n">datetime</span><span class="o">.</span><span class="n">now</span><span class="p">()</span>
- <span class="n">ti</span><span class="o">.</span><span class="n">end_date</span> <span class="o">=</span> <span class="n">datetime</span><span class="o">.</span><span class="n">now</span><span class="p">()</span>
- <span class="n">session</span><span class="o">.</span><span class="n">merge</span><span class="p">(</span><span class="n">ti</span><span class="p">)</span>
- <span class="n">session</span><span class="o">.</span><span class="n">commit</span><span class="p">()</span>
- <span class="n">session</span><span class="o">.</span><span class="n">close</span><span class="p">()</span>
- <span class="n">logging</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">"Done."</span><span class="p">)</span></div>
+ <span class="n">load_args_line</span> <span class="o">=</span> <span class="s1">'arg_dict = {"args": [], "kwargs": </span><span class="si">{}</span><span class="s1">}'</span>
+
+ <span class="c1"># no indents in original code so we can accept any type of indents in the original function</span>
+ <span class="c1"># we deserialize args, call function, serialize result if necessary</span>
+ <span class="k">return</span> <span class="n">dedent</span><span class="p">(</span><span class="s2">"""</span><span class="se">\</span>
+<span class="s2"> import </span><span class="si">{pickling_library}</span><span class="s2"></span>
+<span class="s2"> import sys</span>
+<span class="s2"> </span><span class="si">{load_args_code}</span><span class="s2"></span>
+<span class="s2"> args = arg_dict["args"]</span>
+<span class="s2"> kwargs = arg_dict["kwargs"]</span>
+<span class="s2"> with open(sys.argv[3], 'r') as f: virtualenv_string_args = list(map(lambda x: x.strip(), list(f)))</span>
+<span class="s2"> </span><span class="si">{python_callable_lines}</span><span class="s2"></span>
+<span class="s2"> res = </span><span class="si">{python_callable_name}</span><span class="s2">(*args, **kwargs)</span>
+<span class="s2"> with open(sys.argv[2], 'wb') as f: res is not None and </span><span class="si">{pickling_library}</span><span class="s2">.dump(res, f)</span>
+<span class="s2"> """</span><span class="p">)</span><span class="o">.</span><span class="n">format</span><span class="p">(</span>
+ <span class="n">load_args_code</span><span class="o">=</span><span class="n">load_args_line</span><span class="p">,</span>
+ <span class="n">python_callable_lines</span><span class="o">=</span><span class="n">dedent</span><span class="p">(</span><span class="n">inspect</span><span class="o">.</span><span class="n">getsource</span><span class="p">(</span><span class="n">fn</span><span class="p">)),</span>
+ <span class="n">python_callable_name</span><span class="o">=</span><span class="n">fn</span><span class="o">.</span><span class="vm">__name__</span><span class="p">,</span>
+ <span class="n">pickling_library</span><span class="o">=</span><span class="n">pickling_library</span><span class="p">)</span>
+
+ <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">"Done."</span><span class="p">)</span>
+
</pre></div>
</div>
http://git-wip-us.apache.org/repos/asf/incubator-airflow-site/blob/28a3eb60/_modules/qubole_operator.html
----------------------------------------------------------------------
diff --git a/_modules/qubole_operator.html b/_modules/qubole_operator.html
index fb33047..c6fdd28 100644
--- a/_modules/qubole_operator.html
+++ b/_modules/qubole_operator.html
@@ -13,6 +13,8 @@
+
+
@@ -30,6 +32,9 @@
+ <link rel="index" title="Index"
+ href="../genindex.html"/>
+ <link rel="search" title="Search" href="../search.html"/>
<link rel="top" title="Airflow Documentation" href="../index.html"/>
<link rel="up" title="Module code" href="index.html"/>
@@ -40,6 +45,7 @@
<body class="wy-body-for-nav" role="document">
+
<div class="wy-grid-for-nav">
@@ -76,7 +82,10 @@
- <ul>
+
+
+
+ <ul>
<li class="toctree-l1"><a class="reference internal" href="../project.html">Project</a></li>
<li class="toctree-l1"><a class="reference internal" href="../license.html">License</a></li>
<li class="toctree-l1"><a class="reference internal" href="../start.html">Quick Start</a></li>
@@ -90,6 +99,8 @@
<li class="toctree-l1"><a class="reference internal" href="../scheduler.html">Scheduling & Triggers</a></li>
<li class="toctree-l1"><a class="reference internal" href="../plugins.html">Plugins</a></li>
<li class="toctree-l1"><a class="reference internal" href="../security.html">Security</a></li>
+<li class="toctree-l1"><a class="reference internal" href="../api.html">Experimental Rest API</a></li>
+<li class="toctree-l1"><a class="reference internal" href="../integration.html">Integration</a></li>
<li class="toctree-l1"><a class="reference internal" href="../faq.html">FAQ</a></li>
<li class="toctree-l1"><a class="reference internal" href="../code.html">API Reference</a></li>
</ul>
@@ -104,8 +115,10 @@
<nav class="wy-nav-top" role="navigation" aria-label="top navigation">
- <i data-toggle="wy-nav-top" class="fa fa-bars"></i>
- <a href="../index.html">Airflow</a>
+
+ <i data-toggle="wy-nav-top" class="fa fa-bars"></i>
+ <a href="../index.html">Airflow</a>
+
</nav>
@@ -118,19 +131,36 @@
+
+
+
+
+
+
+
+
+
+
<div role="navigation" aria-label="breadcrumbs navigation">
+
<ul class="wy-breadcrumbs">
- <li><a href="../index.html">Docs</a> »</li>
-
+
+ <li><a href="../index.html">Docs</a> »</li>
+
<li><a href="index.html">Module code</a> »</li>
-
- <li>qubole_operator</li>
+
+ <li>qubole_operator</li>
+
+
<li class="wy-breadcrumbs-aside">
-
+
</li>
+
</ul>
+
+
<hr/>
</div>
<div role="main" class="document" itemscope="itemscope" itemtype="http://schema.org/Article">
@@ -151,9 +181,9 @@
<span class="c1"># See the License for the specific language governing permissions and</span>
<span class="c1"># limitations under the License.</span>
-<span class="kn">from</span> <span class="nn">airflow.models</span> <span class="kn">import</span> <span class="n">BaseOperator</span>
-<span class="kn">from</span> <span class="nn">airflow.utils.decorators</span> <span class="kn">import</span> <span class="n">apply_defaults</span>
-<span class="kn">from</span> <span class="nn">airflow.contrib.hooks.qubole_hook</span> <span class="kn">import</span> <span class="n">QuboleHook</span>
+<span class="kn">from</span> <span class="nn">airflow.models</span> <span class="k">import</span> <span class="n">BaseOperator</span>
+<span class="kn">from</span> <span class="nn">airflow.utils.decorators</span> <span class="k">import</span> <span class="n">apply_defaults</span>
+<span class="kn">from</span> <span class="nn">airflow.contrib.hooks.qubole_hook</span> <span class="k">import</span> <span class="n">QuboleHook</span>
<div class="viewcode-block" id="QuboleOperator"><a class="viewcode-back" href="../code.html#airflow.contrib.operators.QuboleOperator">[docs]</a><span class="k">class</span> <span class="nc">QuboleOperator</span><span class="p">(</span><span class="n">BaseOperator</span><span class="p">):</span>
@@ -168,6 +198,7 @@
<span class="sd"> :tags: array of tags to be assigned with the command</span>
<span class="sd"> :cluster_label: cluster label on which the command will be executed</span>
<span class="sd"> :name: name to be given to command</span>
+<span class="sd"> :notify: whether to send email on command completion or not (default is False)</span>
<span class="sd"> **Arguments specific to command types**</span>
@@ -181,20 +212,28 @@
<span class="sd"> :script_location: s3 location containing query statement</span>
<span class="sd"> :macros: macro values which were used in query</span>
<span class="sd"> hadoopcmd:</span>
-<span class="sd"> :sub_commnad: must be one these ["jar", "s3distcp", "streaming"] followed by 1 or more args</span>
+<span class="sd"> :sub_commnad: must be one these ["jar", "s3distcp", "streaming"] followed by</span>
+<span class="sd"> 1 or more args</span>
<span class="sd"> shellcmd:</span>
<span class="sd"> :script: inline command with args</span>
<span class="sd"> :script_location: s3 location containing query statement</span>
-<span class="sd"> :files: list of files in s3 bucket as file1,file2 format. These files will be copied into the working directory where the qubole command is being executed.</span>
-<span class="sd"> :archives: list of archives in s3 bucket as archive1,archive2 format. These will be unarchived intothe working directory where the qubole command is being executed</span>
-<span class="sd"> :parameters: any extra args which need to be passed to script (only when script_location is supplied)</span>
+<span class="sd"> :files: list of files in s3 bucket as file1,file2 format. These files will be</span>
+<span class="sd"> copied into the working directory where the qubole command is being</span>
+<span class="sd"> executed.</span>
+<span class="sd"> :archives: list of archives in s3 bucket as archive1,archive2 format. These</span>
+<span class="sd"> will be unarchived intothe working directory where the qubole command is</span>
+<span class="sd"> being executed</span>
+<span class="sd"> :parameters: any extra args which need to be passed to script (only when</span>
+<span class="sd"> script_location is supplied)</span>
<span class="sd"> pigcmd:</span>
<span class="sd"> :script: inline query statement (latin_statements)</span>
<span class="sd"> :script_location: s3 location containing pig query</span>
-<span class="sd"> :parameters: any extra args which need to be passed to script (only when script_location is supplied</span>
+<span class="sd"> :parameters: any extra args which need to be passed to script (only when</span>
+<span class="sd"> script_location is supplied</span>
<span class="sd"> sparkcmd:</span>
<span class="sd"> :program: the complete Spark Program in Scala, SQL, Command, R, or Python</span>
-<span class="sd"> :cmdline: spark-submit command line, all required information must be specify in cmdline itself.</span>
+<span class="sd"> :cmdline: spark-submit command line, all required information must be specify</span>
+<span class="sd"> in cmdline itself.</span>
<span class="sd"> :sql: inline sql query</span>
<span class="sd"> :script_location: s3 location containing query statement</span>
<span class="sd"> :language: language of the program, Scala, SQL, Command, R, or Python</span>
@@ -215,7 +254,7 @@
<span class="sd"> :db_update_mode: allowinsert or updateonly</span>
<span class="sd"> :db_update_keys: columns used to determine the uniqueness of rows</span>
<span class="sd"> :export_dir: HDFS/S3 location from which data will be exported.</span>
-<span class="sd"> :fields_terminated_by: hex of the char used as column separator in the dataset.</span>
+<span class="sd"> :fields_terminated_by: hex of the char used as column separator in the dataset</span>
<span class="sd"> dbimportcmd:</span>
<span class="sd"> :mode: 1 (simple), 2 (advance)</span>
<span class="sd"> :hive_table: Name of the hive table</span>
@@ -223,17 +262,32 @@
<span class="sd"> :db_table: name of the db table</span>
<span class="sd"> :where_clause: where clause, if any</span>
<span class="sd"> :parallelism: number of parallel db connections to use for extracting data</span>
-<span class="sd"> :extract_query: SQL query to extract data from db. $CONDITIONS must be part of the where clause.</span>
+<span class="sd"> :extract_query: SQL query to extract data from db. $CONDITIONS must be part</span>
+<span class="sd"> of the where clause.</span>
<span class="sd"> :boundary_query: Query to be used get range of row IDs to be extracted</span>
<span class="sd"> :split_column: Column used as row ID to split data into ranges (mode 2)</span>
-<span class="sd"> .. note:: Following fields are template-supported : ``query``, ``script_location``, ``sub_command``, ``script``, ``files``,</span>
-<span class="sd"> ``archives``, ``program``, ``cmdline``, ``sql``, ``where_clause``, ``extract_query``, ``boundary_query``, ``macros``, ``tags``,</span>
-<span class="sd"> ``name``, ``parameters``. You can also use ``.txt`` files for template driven use cases.</span>
+<span class="sd"> .. note:: Following fields are template-supported : ``query``, ``script_location``,</span>
+<span class="sd"> ``sub_command``, ``script``, ``files``, ``archives``, ``program``, ``cmdline``,</span>
+<span class="sd"> ``sql``, ``where_clause``, ``extract_query``, ``boundary_query``, ``macros``,</span>
+<span class="sd"> ``tags``, ``name``, ``parameters``, ``dbtap_id``, ``hive_table``, ``db_table``,</span>
+<span class="sd"> ``split_column``, ``note_id``, ``db_update_keys``, ``export_dir``,</span>
+<span class="sd"> ``partition_spec``, ``qubole_conn_id``, ``arguments``, ``user_program_arguments``.</span>
+<span class="sd"> You can also use ``.txt`` files for template driven use cases.</span>
+
+<span class="sd"> .. note:: In QuboleOperator there is a default handler for task failures and retries,</span>
+<span class="sd"> which generally kills the command running at QDS for the corresponding task</span>
+<span class="sd"> instance. You can override this behavior by providing your own failure and retry</span>
+<span class="sd"> handler in task definition.</span>
<span class="sd"> """</span>
- <span class="n">template_fields</span> <span class="o">=</span> <span class="p">(</span><span class="s1">'query'</span><span class="p">,</span> <span class="s1">'script_location'</span><span class="p">,</span> <span class="s1">'sub_command'</span><span class="p">,</span> <span class="s1">'script'</span><span class="p">,</span> <span class="s1">'files'</span><span class="p">,</span> <span class="s1">'archives'</span><span class="p">,</span> <span class="s1">'program'</span><span class="p">,</span> <span class="s1">'cmdline'</span><span class="p">,</span>
- <span class="s1">'sql'</span><span class="p">,</span> <span class="s1">'where_clause'</span><span class="p">,</span> <span class="s1">'extract_query'</span><span class="p">,</span> <span class="s1">'boundary_query'</span><span class="p">,</span> <span class="s1">'macros'</span><span class="p">,</span> <span class="s1">'tags'</span><span class="p">,</span> <span class="s1">'name'</span><span class="p">,</span> <span class="s1">'parameters'</span><span class="p">)</span>
+ <span class="n">template_fields</span> <span class="o">=</span> <span class="p">(</span><span class="s1">'query'</span><span class="p">,</span> <span class="s1">'script_location'</span><span class="p">,</span> <span class="s1">'sub_command'</span><span class="p">,</span> <span class="s1">'script'</span><span class="p">,</span> <span class="s1">'files'</span><span class="p">,</span>
+ <span class="s1">'archives'</span><span class="p">,</span> <span class="s1">'program'</span><span class="p">,</span> <span class="s1">'cmdline'</span><span class="p">,</span> <span class="s1">'sql'</span><span class="p">,</span> <span class="s1">'where_clause'</span><span class="p">,</span> <span class="s1">'tags'</span><span class="p">,</span>
+ <span class="s1">'extract_query'</span><span class="p">,</span> <span class="s1">'boundary_query'</span><span class="p">,</span> <span class="s1">'macros'</span><span class="p">,</span> <span class="s1">'name'</span><span class="p">,</span> <span class="s1">'parameters'</span><span class="p">,</span>
+ <span class="s1">'dbtap_id'</span><span class="p">,</span> <span class="s1">'hive_table'</span><span class="p">,</span> <span class="s1">'db_table'</span><span class="p">,</span> <span class="s1">'split_column'</span><span class="p">,</span> <span class="s1">'note_id'</span><span class="p">,</span>
+ <span class="s1">'db_update_keys'</span><span class="p">,</span> <span class="s1">'export_dir'</span><span class="p">,</span> <span class="s1">'partition_spec'</span><span class="p">,</span> <span class="s1">'qubole_conn_id'</span><span class="p">,</span>
+ <span class="s1">'arguments'</span><span class="p">,</span> <span class="s1">'user_program_arguments'</span><span class="p">)</span>
+
<span class="n">template_ext</span> <span class="o">=</span> <span class="p">(</span><span class="s1">'.txt'</span><span class="p">,)</span>
<span class="n">ui_color</span> <span class="o">=</span> <span class="s1">'#3064A1'</span>
<span class="n">ui_fgcolor</span> <span class="o">=</span> <span class="s1">'#fff'</span>
@@ -243,25 +297,32 @@
<span class="bp">self</span><span class="o">.</span><span class="n">args</span> <span class="o">=</span> <span class="n">args</span>
<span class="bp">self</span><span class="o">.</span><span class="n">kwargs</span> <span class="o">=</span> <span class="n">kwargs</span>
<span class="bp">self</span><span class="o">.</span><span class="n">kwargs</span><span class="p">[</span><span class="s1">'qubole_conn_id'</span><span class="p">]</span> <span class="o">=</span> <span class="n">qubole_conn_id</span>
- <span class="bp">self</span><span class="o">.</span><span class="n">hook</span> <span class="o">=</span> <span class="n">QuboleHook</span><span class="p">(</span><span class="o">*</span><span class="bp">self</span><span class="o">.</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="bp">self</span><span class="o">.</span><span class="n">kwargs</span><span class="p">)</span>
- <span class="nb">super</span><span class="p">(</span><span class="n">QuboleOperator</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="n">__init__</span><span class="p">(</span><span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span>
+ <span class="nb">super</span><span class="p">(</span><span class="n">QuboleOperator</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span><span class="o">*</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span>
+
+ <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">on_failure_callback</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
+ <span class="bp">self</span><span class="o">.</span><span class="n">on_failure_callback</span> <span class="o">=</span> <span class="n">QuboleHook</span><span class="o">.</span><span class="n">handle_failure_retry</span>
+
+ <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">on_retry_callback</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
+ <span class="bp">self</span><span class="o">.</span><span class="n">on_retry_callback</span> <span class="o">=</span> <span class="n">QuboleHook</span><span class="o">.</span><span class="n">handle_failure_retry</span>
<span class="k">def</span> <span class="nf">execute</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">context</span><span class="p">):</span>
- <span class="c1"># Reinitiating the hook, as some template fields might have changed</span>
- <span class="bp">self</span><span class="o">.</span><span class="n">hook</span> <span class="o">=</span> <span class="n">QuboleHook</span><span class="p">(</span><span class="o">*</span><span class="bp">self</span><span class="o">.</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="bp">self</span><span class="o">.</span><span class="n">kwargs</span><span class="p">)</span>
- <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">hook</span><span class="o">.</span><span class="n">execute</span><span class="p">(</span><span class="n">context</span><span class="p">)</span>
+ <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">get_hook</span><span class="p">()</span><span class="o">.</span><span class="n">execute</span><span class="p">(</span><span class="n">context</span><span class="p">)</span>
- <span class="k">def</span> <span class="nf">on_kill</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">ti</span><span class="p">):</span>
- <span class="bp">self</span><span class="o">.</span><span class="n">hook</span><span class="o">.</span><span class="n">kill</span><span class="p">(</span><span class="n">ti</span><span class="p">)</span>
+ <span class="k">def</span> <span class="nf">on_kill</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">ti</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
+ <span class="bp">self</span><span class="o">.</span><span class="n">get_hook</span><span class="p">()</span><span class="o">.</span><span class="n">kill</span><span class="p">(</span><span class="n">ti</span><span class="p">)</span>
- <span class="k">def</span> <span class="nf">get_results</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">ti</span><span class="o">=</span><span class="bp">None</span><span class="p">,</span> <span class="n">fp</span><span class="o">=</span><span class="bp">None</span><span class="p">,</span> <span class="n">inline</span><span class="o">=</span><span class="bp">True</span><span class="p">,</span> <span class="n">delim</span><span class="o">=</span><span class="bp">None</span><span class="p">,</span> <span class="n">fetch</span><span class="o">=</span><span class="bp">True</span><span class="p">):</span>
- <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">hook</span><span class="o">.</span><span class="n">get_results</span><span class="p">(</span><span class="n">ti</span><span class="p">,</span> <span class="n">fp</span><span class="p">,</span> <span class="n">inline</span><span class="p">,</span> <span class="n">delim</span><span class="p">,</span> <span class="n">fetch</span><span class="p">)</span>
+ <span class="k">def</span> <span class="nf">get_results</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">ti</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">fp</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">inline</span><span class="o">=</span><span class="kc">True</span><span class="p">,</span> <span class="n">delim</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">fetch</span><span class="o">=</span><span class="kc">True</span><span class="p">):</span>
+ <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">get_hook</span><span class="p">()</span><span class="o">.</span><span class="n">get_results</span><span class="p">(</span><span class="n">ti</span><span class="p">,</span> <span class="n">fp</span><span class="p">,</span> <span class="n">inline</span><span class="p">,</span> <span class="n">delim</span><span class="p">,</span> <span class="n">fetch</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">get_log</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">ti</span><span class="p">):</span>
- <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">hook</span><span class="o">.</span><span class="n">get_log</span><span class="p">(</span><span class="n">ti</span><span class="p">)</span>
+ <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">get_hook</span><span class="p">()</span><span class="o">.</span><span class="n">get_log</span><span class="p">(</span><span class="n">ti</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">get_jobs_id</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">ti</span><span class="p">):</span>
- <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">hook</span><span class="o">.</span><span class="n">get_jobs_id</span><span class="p">(</span><span class="n">ti</span><span class="p">)</span>
+ <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">get_hook</span><span class="p">()</span><span class="o">.</span><span class="n">get_jobs_id</span><span class="p">(</span><span class="n">ti</span><span class="p">)</span>
+
+ <span class="k">def</span> <span class="nf">get_hook</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
+ <span class="c1"># Reinitiating the hook, as some template fields might have changed</span>
+ <span class="k">return</span> <span class="n">QuboleHook</span><span class="p">(</span><span class="o">*</span><span class="bp">self</span><span class="o">.</span><span class="n">args</span><span class="p">,</span> <span class="o">**</span><span class="bp">self</span><span class="o">.</span><span class="n">kwargs</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">__getattribute__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">name</span><span class="p">):</span>
<span class="k">if</span> <span class="n">name</span> <span class="ow">in</span> <span class="n">QuboleOperator</span><span class="o">.</span><span class="n">template_fields</span><span class="p">:</span>
@@ -270,19 +331,19 @@
<span class="k">else</span><span class="p">:</span>
<span class="k">return</span> <span class="s1">''</span>
<span class="k">else</span><span class="p">:</span>
- <span class="k">return</span> <span class="nb">object</span><span class="o">.</span><span class="n">__getattribute__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">name</span><span class="p">)</span>
+ <span class="k">return</span> <span class="nb">object</span><span class="o">.</span><span class="fm">__getattribute__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">name</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">__setattr__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">name</span><span class="p">,</span> <span class="n">value</span><span class="p">):</span>
<span class="k">if</span> <span class="n">name</span> <span class="ow">in</span> <span class="n">QuboleOperator</span><span class="o">.</span><span class="n">template_fields</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">kwargs</span><span class="p">[</span><span class="n">name</span><span class="p">]</span> <span class="o">=</span> <span class="n">value</span>
<span class="k">else</span><span class="p">:</span>
- <span class="nb">object</span><span class="o">.</span><span class="n">__setattr__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">name</span><span class="p">,</span> <span class="n">value</span><span class="p">)</span></div>
-
-
-
+ <span class="nb">object</span><span class="o">.</span><span class="fm">__setattr__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">name</span><span class="p">,</span> <span class="n">value</span><span class="p">)</span></div>
</pre></div>
</div>
+ <div class="articleComments">
+
+ </div>
</div>
<footer>
@@ -315,7 +376,8 @@
VERSION:'',
COLLAPSE_INDEX:false,
FILE_SUFFIX:'.html',
- HAS_SOURCE: true
+ HAS_SOURCE: true,
+ SOURCELINK_SUFFIX: '.txt'
};
</script>
<script type="text/javascript" src="../_static/jquery.js"></script>